From 89f7cfbe75c879e8faa5b8b327e4c6d9fc2713eb Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 22 Nov 2013 09:24:10 +0000
Subject: [PATCH] Checkpoint commit for OPENDJ-1174 Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB
---
opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java | 20 +++++
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 55 ++++++++++++-
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java | 46 +++++++++++
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java | 16 ++++
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java | 58 +++++++++++++-
opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java | 30 +++++++
6 files changed, 216 insertions(+), 9 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
index 7afe2d2..4186d3e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -209,6 +209,26 @@
}
/**
+ * Removes the mapping to the provided CSN if it is present in this
+ * MultiDomainServerState.
+ *
+ * @param baseDN
+ * the replication domain's baseDN
+ * @param expectedCSN
+ * the CSN to be removed
+ * @return true if the CSN could be removed, false otherwise.
+ */
+ public boolean removeCSN(DN baseDN, CSN expectedCSN)
+ {
+ ServerState ss = list.get(baseDN);
+ if (ss != null)
+ {
+ return ss.removeCSN(expectedCSN);
+ }
+ return false;
+ }
+
+ /**
* Test if this object equals the provided other object.
* @param other The other object with which we want to test equality.
* @return Returns True if this equals other, else return false.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
index ae33dff..edfb1a9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -192,6 +192,36 @@
}
/**
+ * Removes the mapping to the provided CSN if it is present in this
+ * ServerState.
+ *
+ * @param expectedCSN
+ * the CSN to be removed
+ * @return true if the CSN could be removed, false otherwise.
+ */
+ public boolean removeCSN(CSN expectedCSN)
+ {
+ if (expectedCSN == null)
+ return false;
+
+ synchronized (serverIdToCSN)
+ {
+ for (Iterator<CSN> iter = serverIdToCSN.values().iterator();
+ iter.hasNext();)
+ {
+ final CSN csn = iter.next();
+ if (expectedCSN.equals(csn))
+ {
+ iter.remove();
+ saved = false;
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
* Replace the Server State with another ServerState.
*
* @param serverState The ServerState.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index dbb68f7..93b8cd3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -64,8 +64,7 @@
/*
* mediumConsistencyRUV and lastSeenUpdates must be thread safe, because
* 1) initialization can happen while the replication server starts receiving
- * updates 2) many updates can happen concurrently. This solution also avoids
- * using a queue that could fill up before we have consumed all its content.
+ * updates 2) many updates can happen concurrently.
*/
/**
* Holds the cross domain medium consistency Replication Update Vector for the
@@ -98,6 +97,8 @@
*/
private final MultiDomainServerState lastSeenUpdates =
new MultiDomainServerState();
+ private final MultiDomainServerState replicasOffline =
+ new MultiDomainServerState();
/**
* Composite cursor across all the replicaDBs for all the replication domains.
@@ -168,6 +169,21 @@
}
/**
+ * Signals a replica went offline.
+ *
+ * @param baseDN
+ * the replica's replication domain
+ * @param offlineCSN
+ * the serverId and time of the replica that went offline
+ */
+ public void replicaOffline(DN baseDN, CSN offlineCSN)
+ {
+ lastSeenUpdates.update(baseDN, offlineCSN);
+ replicasOffline.update(baseDN, offlineCSN);
+ tryNotify(baseDN);
+ }
+
+ /**
* Notifies the Change number indexer thread if it will be able to do some
* work.
*/
@@ -187,8 +203,8 @@
final CSN mcCSN = mediumConsistencyCSN;
if (mcCSN != null)
{
- final CSN lastSeenSameServerId =
- lastSeenUpdates.getCSN(baseDN, mcCSN.getServerId());
+ final int serverId = mcCSN.getServerId();
+ final CSN lastSeenSameServerId = lastSeenUpdates.getCSN(baseDN, serverId);
return mcCSN.isOlderThan(lastSeenSameServerId);
}
return true;
@@ -374,6 +390,37 @@
// update, so it becomes the previous cookie for the next change
mediumConsistencyRUV.update(baseDN, csn);
mediumConsistencyCSN = csn;
+ final CSN offlineCSN = replicasOffline.getCSN(baseDN, csn.getServerId());
+ if (offlineCSN != null
+ && offlineCSN.isOlderThan(mediumConsistencyCSN)
+ // If no new updates has been seen for this replica
+ && lastSeenUpdates.removeCSN(baseDN, offlineCSN))
+ {
+ removeCursor(baseDN, csn);
+ replicasOffline.removeCSN(baseDN, offlineCSN);
+ mediumConsistencyRUV.removeCSN(baseDN, offlineCSN);
+ }
+ }
+
+ private void removeCursor(final DN baseDN, final CSN csn)
+ {
+ for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry : allCursors
+ .entrySet())
+ {
+ if (baseDN.equals(entry.getKey()))
+ {
+ final Set<Integer> serverIds = entry.getValue().keySet();
+ for (Iterator<Integer> iter = serverIds.iterator(); iter.hasNext();)
+ {
+ final int serverId = iter.next();
+ if (csn.getServerId() == serverId)
+ {
+ iter.remove();
+ return;
+ }
+ }
+ }
+ }
}
private void createNewCursors() throws ChangelogException
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java
index 1d58eca..928edde 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java
@@ -93,6 +93,23 @@
assertEquals(state.toString(), expected);
}
+ @Test
+ public void testUpdateMultiDomainServerState() throws Exception
+ {
+ final DN dn1 = DN.decode("o=test1");
+ final DN dn2 = DN.decode("o=test2");
+
+ final MultiDomainServerState state1 = new MultiDomainServerState();
+ state1.update(dn1, csn3);
+ state1.update(dn2, csn2);
+ final MultiDomainServerState state2 = new MultiDomainServerState();
+ state2.update(state1);
+
+ assertSame(csn3, state2.getCSN(dn1, csn3.getServerId()));
+ assertSame(csn2, state2.getCSN(dn2, csn2.getServerId()));
+ assertTrue(state1.equalsTo(state2));
+ }
+
@Test(dependsOnMethods = { "testUpdateCSN" })
public void testEqualsTo() throws Exception
{
@@ -136,4 +153,33 @@
assertTrue(state.isEmpty());
}
+ @Test(dependsOnMethods = { "testUpdateCSN" })
+ public void testRemoveCSN() throws Exception
+ {
+ final DN dn1 = DN.decode("o=test1");
+ final DN dn2 = DN.decode("o=test2");
+ final DN dn3 = DN.decode("o=test3");
+
+ final MultiDomainServerState state = new MultiDomainServerState();
+
+ assertTrue(state.update(dn1, csn1));
+ assertTrue(state.update(dn2, csn1));
+ assertTrue(state.update(dn2, csn2));
+ assertNull(state.getCSN(dn3, 42));
+
+ assertFalse(state.removeCSN(dn3, csn1));
+ assertSame(csn1, state.getCSN(dn1, csn1.getServerId()));
+ assertSame(csn1, state.getCSN(dn2, csn1.getServerId()));
+ assertSame(csn2, state.getCSN(dn2, csn2.getServerId()));
+
+ assertFalse(state.removeCSN(dn1, csn2));
+ assertSame(csn1, state.getCSN(dn1, csn1.getServerId()));
+ assertSame(csn1, state.getCSN(dn2, csn1.getServerId()));
+ assertSame(csn2, state.getCSN(dn2, csn2.getServerId()));
+
+ assertTrue(state.removeCSN(dn2, csn1));
+ assertSame(csn1, state.getCSN(dn1, csn1.getServerId()));
+ assertNull(state.getCSN(dn2, csn1.getServerId()));
+ assertSame(csn2, state.getCSN(dn2, csn2.getServerId()));
+ }
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
index 73beefa..4d48ca5 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
@@ -152,4 +152,20 @@
assertTrue(state.cover(csn1Server2));
assertFalse(state.cover(csn0Server3));
}
+
+ @Test
+ public void testRemoveCSN() throws Exception
+ {
+ final CSN csn1Server1 = new CSN(1, 0, 1);
+ final CSN csn2Server1 = new CSN(2, 0, 1);
+ final CSN csn1Server2 = new CSN(1, 0, 2);
+
+ final ServerState state = new ServerState();
+ assertTrue(state.update(csn1Server1));
+
+ assertFalse(state.removeCSN(null));
+ assertFalse(state.removeCSN(csn2Server1));
+ assertFalse(state.removeCSN(csn1Server2));
+ assertTrue(state.removeCSN(csn1Server1));
+ }
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 8cab868..5791d30 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -153,7 +153,6 @@
final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
publishUpdateMsg(msg1);
-
assertAddedRecords(msg1);
}
@@ -167,7 +166,6 @@
final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId1, 2);
publishUpdateMsg(msg2);
-
assertAddedRecords(msg2);
}
@@ -181,7 +179,6 @@
final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
publishUpdateMsg(msg2, msg1);
-
assertAddedRecords(msg1);
}
@@ -210,7 +207,7 @@
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBTwoCursorsOneEmptyForSomeTime() throws Exception
+ public void emptyDBTwoDSsOneSendsNoUpdatesForSomeTime() throws Exception
{
addReplica(BASE_DN, serverId1);
addReplica(BASE_DN, serverId2);
@@ -222,7 +219,6 @@
final ReplicatedUpdateMsg msg3Sid2 = msg(BASE_DN, serverId2, 3);
// simulate no messages received during some time for replica 2
publishUpdateMsg(msg1Sid2, emptySid2, emptySid2, emptySid2, msg3Sid2, msg2Sid1);
-
assertAddedRecords(msg1Sid2, msg2Sid1);
}
@@ -245,6 +241,46 @@
assertAddedRecords(msg1, msg2);
}
+ @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+ public void emptyDBTwoInitialDSsOneSendingHeartbeats() throws Exception
+ {
+ addReplica(BASE_DN, serverId1);
+ addReplica(BASE_DN, serverId2);
+ startIndexer();
+
+ final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
+ final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
+ publishUpdateMsg(msg1, msg2);
+ assertAddedRecords(msg1);
+
+ sendHeartbeat(BASE_DN, serverId1, 3);
+ assertAddedRecords(msg1, msg2);
+ }
+
+ @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+ public void emptyDBTwoInitialDSsOneGoingOffline() throws Exception
+ {
+ addReplica(BASE_DN, serverId1);
+ addReplica(BASE_DN, serverId2);
+ startIndexer();
+
+ final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
+ final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
+ publishUpdateMsg(msg1, msg2);
+ assertAddedRecords(msg1);
+
+ replicaOffline(BASE_DN, serverId2, 3);
+ // MCP cannot move forward since no new updates from serverId1
+ assertAddedRecords(msg1);
+
+ final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
+ publishUpdateMsg(msg4);
+ // MCP moved forward after receiving update from serverId1
+ // (last replica in the domain)
+ assertAddedRecords(msg1, msg2, msg4);
+ }
+
+
private void addReplica(DN baseDN, int serverId) throws Exception
{
final SequentialDBCursor cursor = new SequentialDBCursor();
@@ -320,6 +356,18 @@
waitForWaitingState(indexer);
}
+ private void sendHeartbeat(DN baseDN, int serverId, int time) throws Exception
+ {
+ indexer.publishHeartbeat(baseDN, new CSN(time, 0, serverId));
+ waitForWaitingState(indexer);
+ }
+
+ private void replicaOffline(DN baseDN, int serverId, int time) throws Exception
+ {
+ indexer.replicaOffline(baseDN, new CSN(time, 0, serverId));
+ waitForWaitingState(indexer);
+ }
+
private void waitForWaitingState(final Thread t)
{
State state = t.getState();
--
Gitblit v1.10.0