From 6aa4fa5b4f71e830dba55f3ea3f9530737db2d8b Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 17 Apr 2014 12:44:17 +0000
Subject: [PATCH] OPENDJ-1439 Change number stops progressing with cross domain replication
---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java | 5
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 56 +++++++------
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java | 135 +++++++++++++++++++--------------
opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java | 5 -
4 files changed, 112 insertions(+), 89 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
index 2dc0607..a1abe41 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
@@ -21,7 +21,7 @@
* CDDL HEADER END
*
*
- * Copyright 2013 ForgeRock AS
+ * Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.server.changelog.api;
@@ -74,9 +74,6 @@
* by this DB and return the changeNumber associated to this record.
* <p>
* Note: this method disregards the changeNumber in the provided record.
- * <p>
- * FIXME will be removed when ECLServerHandler will not be responsible anymore
- * for lazily building the ChangeNumberIndexDB.
*
* @param record
* The {@link ChangeNumberIndexRecord} to add to this DB.
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 5179986..1fe38e0 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -92,14 +92,14 @@
private final MultiDomainServerState mediumConsistencyRUV =
new MultiDomainServerState();
/**
- * Holds the cross domain medium consistency CSN for the current replication
- * server.
+ * Holds the cross domain medium consistency baseDN and CSN for the current
+ * replication server.
*
* @see <a href=
* "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
* >OpenDJ Domain Names - medium consistency CSN</a>
*/
- private volatile CSN mediumConsistencyCSN;
+ private volatile Pair<DN, CSN> mediumConsistency;
/**
* Holds the last time each replica was seen alive, whether via updates or
@@ -182,7 +182,7 @@
}
lastAliveCSNs.update(baseDN, heartbeatCSN);
- tryNotify(baseDN);
+ tryNotify();
}
/**
@@ -207,7 +207,7 @@
// only keep the oldest CSN that will be the new cursor's starting point
newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
lastAliveCSNs.update(baseDN, csn);
- tryNotify(baseDN);
+ tryNotify();
}
/**
@@ -239,16 +239,16 @@
{
replicasOffline.update(baseDN, offlineCSN);
lastAliveCSNs.update(baseDN, offlineCSN);
- tryNotify(baseDN);
+ tryNotify();
}
/**
* Notifies the Change number indexer thread if it will be able to do some
* work.
*/
- private void tryNotify(DN baseDN)
+ private void tryNotify()
{
- if (canMoveForwardMediumConsistencyPoint(baseDN))
+ if (canMoveForwardMediumConsistencyPoint())
{
synchronized (this)
{
@@ -257,13 +257,14 @@
}
}
- private boolean canMoveForwardMediumConsistencyPoint(DN baseDN)
+ private boolean canMoveForwardMediumConsistencyPoint()
{
- final CSN mcCSN = mediumConsistencyCSN;
- if (mcCSN != null)
+ final Pair<DN, CSN> mc = mediumConsistency;
+ if (mc != null)
{
- final int serverId = mcCSN.getServerId();
- CSN lastTimeSameReplicaSeenAlive = lastAliveCSNs.getCSN(baseDN, serverId);
+ final CSN mcCSN = mc.getSecond();
+ final CSN lastTimeSameReplicaSeenAlive =
+ lastAliveCSNs.getCSN(mc.getFirst(), mcCSN.getServerId());
return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive);
}
return true;
@@ -441,7 +442,8 @@
}
wait();
}
- // advance cursor, success/failure will be checked later
+ // try to recycle the exhausted cursors,
+ // success/failure will be checked later
nextChangeForInsertDBCursor.next();
// loop to check whether new changes have been added to the
// ReplicaDBs
@@ -452,7 +454,7 @@
final DN baseDN = nextChangeForInsertDBCursor.getData();
// FIXME problem: what if the serverId is not part of the ServerState?
// right now, change number will be blocked
- if (!canMoveForwardMediumConsistencyPoint(baseDN))
+ if (!canMoveForwardMediumConsistencyPoint())
{
// the oldest record to insert is newer than the medium consistency
// point. Let's wait for a change that can be published.
@@ -460,7 +462,7 @@
{
// double check to protect against a missed call to notify()
if (!isShutdownInitiated()
- && !canMoveForwardMediumConsistencyPoint(baseDN))
+ && !canMoveForwardMediumConsistencyPoint())
{
wait();
// loop to check if changes older than the medium consistency
@@ -479,7 +481,8 @@
changelogDB.getChangeNumberIndexDB().addRecord(record);
moveForwardMediumConsistencyPoint(csn, baseDN);
- // advance cursor, success/failure will be checked later
+ // advance the cursor we just read from,
+ // success/failure will be checked later
nextChangeForInsertDBCursor.next();
}
catch (InterruptedException ignored)
@@ -517,20 +520,21 @@
}
}
- private void moveForwardMediumConsistencyPoint(final CSN csn, final DN baseDN)
+ private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
+ final DN mcBaseDN)
{
// update, so it becomes the previous cookie for the next change
- mediumConsistencyRUV.update(baseDN, csn);
- mediumConsistencyCSN = csn;
- final CSN offlineCSN = replicasOffline.getCSN(baseDN, csn.getServerId());
+ mediumConsistencyRUV.update(mcBaseDN, mcCSN);
+ mediumConsistency = Pair.of(mcBaseDN, mcCSN);
+ final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcCSN.getServerId());
if (offlineCSN != null
- && offlineCSN.isOlderThan(mediumConsistencyCSN)
+ && offlineCSN.isOlderThan(mcCSN)
// If no new updates has been seen for this replica
- && lastAliveCSNs.removeCSN(baseDN, offlineCSN))
+ && lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN))
{
- removeCursor(baseDN, csn);
- replicasOffline.removeCSN(baseDN, offlineCSN);
- mediumConsistencyRUV.removeCSN(baseDN, offlineCSN);
+ removeCursor(mcBaseDN, mcCSN);
+ replicasOffline.removeCSN(mcBaseDN, offlineCSN);
+ mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
}
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 8cb6ed4..c60411c 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -88,11 +88,15 @@
@Override
public String toString()
{
- return "csn=" + getCSN() + ", baseDN=" + baseDN;
+ return "UpdateMsg("
+ + "\"" + baseDN + " " + getCSN().getServerId() + "\""
+ + ", csn=" + getCSN().toStringUI()
+ + ")";
}
}
- private static DN BASE_DN;
+ private static DN BASE_DN1;
+ private static DN BASE_DN2;
private static DN ADMIN_DATA_DN;
private static final int serverId1 = 101;
private static final int serverId2 = 102;
@@ -111,7 +115,8 @@
public static void classSetup() throws Exception
{
TestCaseUtils.startFakeServer();
- BASE_DN = DN.decode("dc=example,dc=com");
+ BASE_DN1 = DN.decode("dc=example,dc=com");
+ BASE_DN2 = DN.decode("dc=world,dc=company");
ADMIN_DATA_DN = DN.decode("cn=admin data");
}
@@ -152,10 +157,10 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBOneInitialDS() throws Exception
{
- addReplica(BASE_DN, serverId1);
+ addReplica(BASE_DN1, serverId1);
startCNIndexer();
- final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
+ final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
publishUpdateMsg(msg1);
assertExternalChangelogContent(msg1);
}
@@ -163,12 +168,12 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void nonEmptyDBOneInitialDS() throws Exception
{
- final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
- addReplica(BASE_DN, serverId1);
+ final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
+ addReplica(BASE_DN1, serverId1);
setDBInitialRecords(msg1);
startCNIndexer();
- final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId1, 2);
+ final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
publishUpdateMsg(msg2);
assertExternalChangelogContent(msg2);
}
@@ -176,36 +181,52 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoInitialDSs() throws Exception
{
- addReplica(BASE_DN, serverId1);
- addReplica(BASE_DN, serverId2);
+ addReplica(BASE_DN1, serverId1);
+ addReplica(BASE_DN1, serverId2);
startCNIndexer();
- final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
- final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
+ final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
+ final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
publishUpdateMsg(msg2, msg1);
assertExternalChangelogContent(msg1);
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+ public void emptyDBTwoInitialDSsDifferentDomains() throws Exception
+ {
+ addReplica(BASE_DN1, serverId1);
+ addReplica(BASE_DN2, serverId2);
+ startCNIndexer();
+
+ final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
+ final ReplicatedUpdateMsg msg2 = msg(BASE_DN2, serverId2, 2);
+ publishUpdateMsg(msg1, msg2);
+ assertExternalChangelogContent(msg1);
+ final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
+ publishUpdateMsg(msg3);
+ assertExternalChangelogContent(msg1, msg2);
+ }
+
+ @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void nonEmptyDBTwoInitialDSs() throws Exception
{
- final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
- final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
- addReplica(BASE_DN, serverId1);
- addReplica(BASE_DN, serverId2);
+ final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
+ final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
+ addReplica(BASE_DN1, serverId1);
+ addReplica(BASE_DN1, serverId2);
setDBInitialRecords(msg1, msg2);
startCNIndexer();
- final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId2, 3);
- final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
+ final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
+ final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
publishUpdateMsg(msg3, msg4);
assertExternalChangelogContent(msg3);
- final ReplicatedUpdateMsg msg5 = msg(BASE_DN, serverId1, 5);
+ final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId1, 5);
publishUpdateMsg(msg5);
assertExternalChangelogContent(msg3);
- final ReplicatedUpdateMsg msg6 = msg(BASE_DN, serverId2, 6);
+ final ReplicatedUpdateMsg msg6 = msg(BASE_DN1, serverId2, 6);
publishUpdateMsg(msg6);
assertExternalChangelogContent(msg3, msg4, msg5);
}
@@ -213,14 +234,14 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoDSsOneSendsNoUpdatesForSomeTime() throws Exception
{
- addReplica(BASE_DN, serverId1);
- addReplica(BASE_DN, serverId2);
+ addReplica(BASE_DN1, serverId1);
+ addReplica(BASE_DN1, serverId2);
startCNIndexer();
- final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN, serverId2, 1);
- final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN, serverId2);
- final ReplicatedUpdateMsg msg2Sid1 = msg(BASE_DN, serverId1, 2);
- final ReplicatedUpdateMsg msg3Sid2 = msg(BASE_DN, serverId2, 3);
+ final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1);
+ final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN1, serverId2);
+ final ReplicatedUpdateMsg msg2Sid1 = msg(BASE_DN1, serverId1, 2);
+ final ReplicatedUpdateMsg msg3Sid2 = msg(BASE_DN1, serverId2, 3);
// simulate no messages received during some time for replica 2
publishUpdateMsg(msg1Sid2, emptySid2, emptySid2, emptySid2, msg3Sid2, msg2Sid1);
assertExternalChangelogContent(msg1Sid2, msg2Sid1);
@@ -230,8 +251,8 @@
public void emptyDBThreeInitialDSsOneIsNotECLEnabledDomain() throws Exception
{
addReplica(ADMIN_DATA_DN, serverId1);
- addReplica(BASE_DN, serverId2);
- addReplica(BASE_DN, serverId3);
+ addReplica(BASE_DN1, serverId2);
+ addReplica(BASE_DN1, serverId3);
startCNIndexer();
// cn=admin data will does not participate in the external changelog
@@ -240,8 +261,8 @@
publishUpdateMsg(msg1);
assertExternalChangelogContent();
- final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
- final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId3, 3);
+ final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
+ final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId3, 3);
publishUpdateMsg(msg2, msg3);
assertExternalChangelogContent(msg2);
}
@@ -249,18 +270,18 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
{
- addReplica(BASE_DN, serverId1);
+ addReplica(BASE_DN1, serverId1);
startCNIndexer();
- final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
+ final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
publishUpdateMsg(msg1);
- addReplica(BASE_DN, serverId2);
- final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
+ addReplica(BASE_DN1, serverId2);
+ final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
publishUpdateMsg(msg2);
assertExternalChangelogContent(msg1);
- final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId1, 3);
+ final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
publishUpdateMsg(msg3);
assertExternalChangelogContent(msg1, msg2);
}
@@ -268,36 +289,36 @@
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoInitialDSsOneSendingHeartbeats() throws Exception
{
- addReplica(BASE_DN, serverId1);
- addReplica(BASE_DN, serverId2);
+ addReplica(BASE_DN1, serverId1);
+ addReplica(BASE_DN1, serverId2);
startCNIndexer();
- final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
- final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
+ final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
+ final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
publishUpdateMsg(msg1, msg2);
assertExternalChangelogContent(msg1);
- sendHeartbeat(BASE_DN, serverId1, 3);
+ sendHeartbeat(BASE_DN1, serverId1, 3);
assertExternalChangelogContent(msg1, msg2);
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoInitialDSsOneGoingOffline() throws Exception
{
- addReplica(BASE_DN, serverId1);
- addReplica(BASE_DN, serverId2);
+ addReplica(BASE_DN1, serverId1);
+ addReplica(BASE_DN1, serverId2);
startCNIndexer();
- final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
- final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
+ final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
+ final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
publishUpdateMsg(msg1, msg2);
assertExternalChangelogContent(msg1);
- replicaOffline(BASE_DN, serverId2, 3);
+ replicaOffline(BASE_DN1, serverId2, 3);
// MCP cannot move forward since no new updates from serverId1
assertExternalChangelogContent(msg1);
- final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
+ final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
publishUpdateMsg(msg4);
// MCP moved forward after receiving update from serverId1
// (last replica in the domain)
@@ -321,7 +342,7 @@
@Override
protected boolean isECLEnabledDomain(DN baseDN)
{
- return BASE_DN.equals(baseDN);
+ return BASE_DN1.equals(baseDN) || BASE_DN2.equals(baseDN);
}
};
cnIndexer.start();
@@ -423,10 +444,10 @@
* Asserts which records have been added to the CNIndexDB since starting the
* {@link ChangeNumberIndexer} thread.
*/
- private void assertExternalChangelogContent(ReplicatedUpdateMsg... msgs)
+ private void assertExternalChangelogContent(ReplicatedUpdateMsg... expectedMsgs)
throws Exception
{
- if (msgs.length == 0)
+ if (expectedMsgs.length == 0)
{
verify(cnIndexDB, never()).addRecord(any(ChangeNumberIndexRecord.class));
return;
@@ -441,18 +462,18 @@
final MultiDomainServerState previousCookie =
new MultiDomainServerState(initialCookie.toString());
// check it was not called more than expected
- String desc1 = "actual was:<" + allValues + ">, but expected was:<" + Arrays.toString(msgs) + ">";
- assertThat(allValues.size()).as(desc1).isEqualTo(msgs.length);
- for (int i = 0; i < msgs.length; i++)
+ String desc1 = "actual was:<" + allValues + ">, but expected was:<" + Arrays.toString(expectedMsgs) + ">";
+ assertThat(allValues).as(desc1).hasSize(expectedMsgs.length);
+ for (int i = 0; i < expectedMsgs.length; i++)
{
- final ReplicatedUpdateMsg msg = msgs[i];
+ final ReplicatedUpdateMsg expectedMsg = expectedMsgs[i];
final ChangeNumberIndexRecord record = allValues.get(i);
// check content in order
- String desc2 = "actual was:<" + record + ">, but expected was:<" + msg + ">";
- assertThat(record.getBaseDN()).as(desc2).isEqualTo(msg.getBaseDN());
- assertThat(record.getCSN()).as(desc2).isEqualTo(msg.getCSN());
+ String desc2 = "actual was:<" + record + ">, but expected was:<" + expectedMsg + ">";
+ assertThat(record.getBaseDN()).as(desc2).isEqualTo(expectedMsg.getBaseDN());
+ assertThat(record.getCSN()).as(desc2).isEqualTo(expectedMsg.getCSN());
assertThat(record.getPreviousCookie()).as(desc2).isEqualTo(previousCookie.toString());
- previousCookie.update(msg.getBaseDN(), msg.getCSN());
+ previousCookie.update(expectedMsg.getBaseDN(), expectedMsg.getCSN());
}
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java
index 6267d77..f9d5d78 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java
@@ -21,7 +21,7 @@
* CDDL HEADER END
*
*
- * Copyright 2013 ForgeRock AS
+ * Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.server.changelog.je;
@@ -81,7 +81,8 @@
@Override
public String toString()
{
- return "currentRecord=" + current + " nextMessages=" + msgs;
+ return getClass().getSimpleName() + "(currentRecord=" + current
+ + " nextMessages=" + msgs + ")";
}
}
\ No newline at end of file
--
Gitblit v1.10.0