From 69a6335cadea27e7095cc502b5e62bcc01b3806f Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 20 May 2014 14:08:38 +0000
Subject: [PATCH] OPENDJ-1259 (CR-3563) Make the Medium Consistency Point support replicas temporarily leaving the topology
---
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java | 182 ++++++++++++++++++++++++++++++++++--
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 77 ++++++++++----
2 files changed, 225 insertions(+), 34 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 63af1c3..ad1f3c0 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -269,16 +269,32 @@
}
// ensure that all initial replicas alive information have been updated
// with CSNs that are acceptable for moving the medium consistency forward
- return allInitialReplicasArePastOldestPossibleCSN();
+ return allInitialReplicasAreOfflineOrAlive();
}
- private boolean allInitialReplicasArePastOldestPossibleCSN()
+ /**
+ * Returns true only if the initial replicas known from the changelog state DB
+ * are either:
+ * <ul>
+ * <li>offline, so do not wait for them in order to compute medium consistency
+ * </li>
+ * <li>alive, because we received heartbeats or changes (so their last alive
+ * CSN has been updated to something past the oldest possible CSN), we have
+ * enough info to compute medium consistency</li>
+ * </ul>
+ * In this case, we have enough information to compute medium consistency
+ * without waiting any more.
+ */
+ private boolean allInitialReplicasAreOfflineOrAlive()
{
for (DN baseDN : lastAliveCSNs)
{
for (CSN csn : lastAliveCSNs.getServerState(baseDN))
{
- if (csn.getTime() == 0)
+ if (// oldest possible CSN?
+ csn.getTime() == 0
+ // replica is not offline
+ && replicasOffline.getCSN(baseDN, csn.getServerId()) == null)
{
return false;
}
@@ -361,6 +377,9 @@
if (isECLEnabledDomain(baseDN))
{
replicasOffline.update(baseDN, offlineCSN);
+ // a replica offline message could also be the very last time
+ // we heard from this replica :)
+ lastAliveCSNs.update(baseDN, offlineCSN);
}
}
}
@@ -527,10 +546,6 @@
new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
changelogDB.getChangeNumberIndexDB().addRecord(record);
moveForwardMediumConsistencyPoint(csn, baseDN);
-
- // advance the cursor we just read from,
- // success/failure will be checked later
- nextChangeForInsertDBCursor.next();
}
catch (InterruptedException ignored)
{
@@ -568,6 +583,7 @@
private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
final DN mcBaseDN) throws ChangelogException
{
+ boolean callNextOnCursor = true;
// update, so it becomes the previous cookie for the next change
mediumConsistencyRUV.update(mcBaseDN, mcCSN);
mediumConsistency = Pair.of(mcBaseDN, mcCSN);
@@ -583,17 +599,34 @@
}
else if (offlineCSN.isOlderThan(mcCSN))
{
- /*
- * replica is not back online and Medium consistency point has gone past
- * its last offline time: remove everything known about it: cursor,
- * offlineCSN from lastAliveCSN and remove all knowledge of this replica
- * from the medium consistency RUV.
- */
- removeCursor(mcBaseDN, mcCSN);
- lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
- mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
+ Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
+ pair = getCursor(mcBaseDN, mcCSN.getServerId());
+ Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond();
+ if (iter != null && !iter.hasNext())
+ {
+ /*
+ * replica is not back online, Medium consistency point has gone past
+ * its last offline time, and there are no more changes after the
+ * offline CSN in the cursor: remove everything known about it:
+ * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
+ * this replica from the medium consistency RUV.
+ */
+ iter.remove();
+ StaticUtils.close(pair.getFirst());
+ resetNextChangeForInsertDBCursor();
+ callNextOnCursor = false;
+ lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
+ mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
+ }
}
}
+
+ if (callNextOnCursor)
+ {
+ // advance the cursor we just read from,
+ // success/failure will be checked later
+ nextChangeForInsertDBCursor.next();
+ }
}
private void removeAllCursors()
@@ -611,8 +644,8 @@
newCursors.clear();
}
- private void removeCursor(final DN baseDN, final CSN csn)
- throws ChangelogException
+ private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
+ getCursor(final DN baseDN, final int serverId) throws ChangelogException
{
for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
: allCursors.entrySet())
@@ -623,16 +656,14 @@
entry1.getValue().entrySet().iterator(); iter.hasNext();)
{
final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next();
- if (csn.getServerId() == entry2.getKey())
+ if (serverId == entry2.getKey())
{
- iter.remove();
- StaticUtils.close(entry2.getValue());
- resetNextChangeForInsertDBCursor();
- return;
+ return Pair.of(entry2.getValue(), iter);
}
}
}
}
+ return Pair.empty();
}
private boolean recycleExhaustedCursors() throws ChangelogException
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index e6939dd..8c79991 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -33,6 +33,8 @@
import java.util.Map;
import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.common.CSN;
@@ -53,6 +55,23 @@
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
+/**
+ * Test for ChangeNumberIndexer class. All dependencies to the changelog DB
+ * interfaces are mocked. The ChangeNumberIndexer class simulates what the RS
+ * does to compute a changeNumber. The tests setup various topologies with their
+ * replicas.
+ * <p>
+ * All tests are written with this layout:
+ * <ul>
+ * <li>Initial setup where RS is stopped. Data are set into the changelog state
+ * DB, the replica DBs and the change number index DB.</li>
+ * <li>Simulate RS startup by calling {@link #startCNIndexer(DN...)}. This will
+ * start the change number indexer thread that will start computing change
+ * numbers and inserting them in the change number index db.</li>
+ * <li>Send events to the change number indexer thread by publishing update
+ * messages, sending heartbeat messages or replica offline messages.</li>
+ * </ul>
+ */
@SuppressWarnings("javadoc")
public class ChangeNumberIndexerTest extends DirectoryServerTestCase
{
@@ -103,12 +122,15 @@
private static final int serverId2 = 102;
private static final int serverId3 = 103;
+ @Mock
private ChangelogDB changelogDB;
+ @Mock
private ChangeNumberIndexDB cnIndexDB;
+ @Mock
private ReplicationDomainDB domainDB;
- private Map<Pair<DN, Integer>, SequentialDBCursor> cursors =
- new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
+ private Map<Pair<DN, Integer>, SequentialDBCursor> cursors;
private ChangelogState initialState;
+ private Map<DN, ServerState> domainNewestCSNs;
private ChangeNumberIndexer cnIndexer;
private MultiDomainServerState initialCookie;
@@ -130,14 +152,14 @@
@BeforeMethod
public void setup() throws Exception
{
- changelogDB = mock(ChangelogDB.class);
- cnIndexDB = mock(ChangeNumberIndexDB.class);
- domainDB = mock(ReplicationDomainDB.class);
+ MockitoAnnotations.initMocks(this);
when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
initialState = new ChangelogState();
initialCookie = new MultiDomainServerState();
+ cursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
+ domainNewestCSNs = new HashMap<DN, ServerState>();
}
@AfterMethod
@@ -160,6 +182,7 @@
{
addReplica(BASE_DN1, serverId1);
startCNIndexer(BASE_DN1);
+ assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
publishUpdateMsg(msg1);
@@ -171,8 +194,9 @@
{
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
addReplica(BASE_DN1, serverId1);
- setDBInitialRecords(msg1);
+ setCNIndexDBInitialRecords(msg1);
startCNIndexer(BASE_DN1);
+ assertExternalChangelogContent();
final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
publishUpdateMsg(msg2);
@@ -185,6 +209,7 @@
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
startCNIndexer(BASE_DN1);
+ assertExternalChangelogContent();
// simulate messages received out of order
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -202,6 +227,7 @@
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN2, serverId2);
startCNIndexer(BASE_DN1, BASE_DN2);
+ assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
final ReplicatedUpdateMsg msg2 = msg(BASE_DN2, serverId2, 2);
@@ -235,6 +261,7 @@
{
addReplica(BASE_DN1, serverId1);
startCNIndexer(BASE_DN1);
+ assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
publishUpdateMsg(msg1);
@@ -264,8 +291,9 @@
final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
- setDBInitialRecords(msg1, msg2);
+ setCNIndexDBInitialRecords(msg1, msg2);
startCNIndexer(BASE_DN1);
+ assertExternalChangelogContent();
final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
@@ -287,6 +315,7 @@
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
startCNIndexer(BASE_DN1);
+ assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1);
final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN1, serverId2);
@@ -304,6 +333,7 @@
addReplica(BASE_DN1, serverId2);
addReplica(BASE_DN1, serverId3);
startCNIndexer(BASE_DN1);
+ assertExternalChangelogContent();
// cn=admin data will does not participate in the external changelog
// so it cannot add to it
@@ -322,6 +352,7 @@
{
addReplica(BASE_DN1, serverId1);
startCNIndexer(BASE_DN1);
+ assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
publishUpdateMsg(msg1);
@@ -341,6 +372,7 @@
{
addReplica(BASE_DN1, serverId1);
startCNIndexer(BASE_DN1);
+ assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
publishUpdateMsg(msg1);
@@ -360,6 +392,7 @@
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
startCNIndexer(BASE_DN1);
+ assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -376,6 +409,7 @@
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
startCNIndexer(BASE_DN1);
+ assertExternalChangelogContent();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -402,16 +436,128 @@
assertExternalChangelogContent(msg1, msg2, msg4, msg5);
}
+ @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+ public void emptyDBTwoDSsOneInitiallyOffline() throws Exception
+ {
+ addReplica(BASE_DN1, serverId1);
+ addReplica(BASE_DN1, serverId2);
+ initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1));
+ startCNIndexer(BASE_DN1);
+ assertExternalChangelogContent();
+
+ final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
+ publishUpdateMsg(msg2);
+ // MCP does not wait for temporarily offline serverId1
+ assertExternalChangelogContent(msg2);
+
+ // serverId1 is back online, wait for changes from serverId2
+ final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
+ publishUpdateMsg(msg3);
+ assertExternalChangelogContent(msg2);
+ final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4);
+ publishUpdateMsg(msg4);
+ // MCP moves forward
+ assertExternalChangelogContent(msg2, msg3);
+ }
+
+ /**
+ * Scenario:
+ * <ol>
+ * <li>Replica 1 publishes one change</li>
+ * <li>Replica 1 sends offline message</li>
+ * <li>RS stops</li>
+ * <li>RS starts</li>
+ * </ol>
+ */
+ @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+ public void emptyDBTwoDSsOneInitiallyWithChangesThenOffline() throws Exception
+ {
+ addReplica(BASE_DN1, serverId1);
+ addReplica(BASE_DN1, serverId2);
+ final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
+ publishUpdateMsg(msg1);
+ initialState.addOfflineReplica(BASE_DN1, new CSN(2, 1, serverId1));
+ startCNIndexer(BASE_DN1);
+
+ // blocked until we receive info for serverId2
+ assertExternalChangelogContent();
+
+ sendHeartbeat(BASE_DN1, serverId2, 3);
+ // MCP moves forward
+ assertExternalChangelogContent(msg1);
+
+ // do not wait for temporarily offline serverId1
+ final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
+ publishUpdateMsg(msg3);
+ assertExternalChangelogContent(msg1, msg3);
+
+ // serverId1 is back online, wait for changes from serverId2
+ final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
+ publishUpdateMsg(msg4);
+ assertExternalChangelogContent(msg1, msg3);
+
+ final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId2, 5);
+ publishUpdateMsg(msg5);
+ // MCP moves forward
+ assertExternalChangelogContent(msg1, msg3, msg4);
+ }
+
+ /**
+ * Scenario:
+ * <ol>
+ * <li>Replica 1 sends offline message</li>
+ * <li>Replica 1 starts</li>
+ * <li>Replica 1 publishes one change</li>
+ * <li>Replica 1 publishes a second change</li>
+ * <li>RS stops</li>
+ * <li>RS starts</li>
+ * </ol>
+ */
+ @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+ public void emptyDBTwoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception
+ {
+ addReplica(BASE_DN1, serverId1);
+ addReplica(BASE_DN1, serverId2);
+ initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1));
+ final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
+ final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
+ publishUpdateMsg(msg2, msg3);
+ startCNIndexer(BASE_DN1);
+ assertExternalChangelogContent();
+
+ // MCP moves forward because serverId1 is not really offline
+ // since because we received a message from it after the offline replica msg
+ final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4);
+ publishUpdateMsg(msg4);
+ assertExternalChangelogContent(msg2, msg3);
+
+ // back to normal operations
+ sendHeartbeat(BASE_DN1, serverId1, 4);
+ assertExternalChangelogContent(msg2, msg3, msg4);
+ }
+
private void addReplica(DN baseDN, int serverId) throws Exception
{
final SequentialDBCursor cursor = new SequentialDBCursor();
cursors.put(Pair.of(baseDN, serverId), cursor);
when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
.thenReturn(cursor);
- when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(new ServerState());
+ when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(
+ getDomainNewestCSNs(baseDN));
initialState.addServerIdToDomain(serverId, baseDN);
}
+ private ServerState getDomainNewestCSNs(final DN baseDN)
+ {
+ ServerState serverState = domainNewestCSNs.get(baseDN);
+ if (serverState == null)
+ {
+ serverState = new ServerState();
+ domainNewestCSNs.put(baseDN, serverState);
+ }
+ return serverState;
+ }
+
private void startCNIndexer(DN... eclEnabledDomains)
{
final List<DN> eclEnabledDomainList = Arrays.asList(eclEnabledDomains);
@@ -432,8 +578,8 @@
if (cnIndexer != null)
{
cnIndexer.initiateShutdown();
- cnIndexer.interrupt();
cnIndexer.join();
+ cnIndexer = null;
}
}
@@ -447,7 +593,7 @@
return new ReplicatedUpdateMsg(baseDN, new CSN(0, 0, serverId), true);
}
- private void setDBInitialRecords(ReplicatedUpdateMsg... msgs) throws Exception
+ private void setCNIndexDBInitialRecords(ReplicatedUpdateMsg... msgs) throws Exception
{
// Initialize the previous cookie that will be used to compare the records
// added to the CNIndexDB at the end of this test
@@ -489,7 +635,16 @@
{
if (!msg.isEmptyCursor())
{
- cnIndexer.publishUpdateMsg(msg.getBaseDN(), msg);
+ if (cnIndexer != null)
+ {
+ // indexer is running
+ cnIndexer.publishUpdateMsg(msg.getBaseDN(), msg);
+ }
+ else
+ {
+ // we are only setting up initial state, update the domain newest CSNs
+ getDomainNewestCSNs(msg.getBaseDN()).update(msg.getCSN());
+ }
}
}
waitForWaitingState(cnIndexer);
@@ -509,6 +664,10 @@
private void waitForWaitingState(final Thread t)
{
+ if (t == null)
+ { // not started yet, do not wait
+ return;
+ }
State state = t.getState();
while (!state.equals(State.WAITING)
&& !state.equals(State.TIMED_WAITING)
@@ -568,6 +727,7 @@
@Test(dataProvider = "precedingCSNDataProvider")
public void getPrecedingCSN(CSN start, CSN expected)
{
+ cnIndexer = new ChangeNumberIndexer(changelogDB, initialState);
CSN precedingCSN = this.cnIndexer.getPrecedingCSN(start);
assertThat(precedingCSN).isEqualTo(expected);
}
--
Gitblit v1.10.0