From 1cc3f8f7ff4172a409f8d0ccd4b5b2ab80c54c94 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 22 Nov 2013 08:24:47 +0000
Subject: [PATCH] Checkpoint commit for OPENDJ-1174 Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB
---
opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java | 17 +++-
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 110 +++++++++++++++++----------
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java | 48 ++++++++---
opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java | 13 ++-
4 files changed, 121 insertions(+), 67 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
index 9465346..7afe2d2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -189,16 +189,23 @@
}
/**
- * Returns the ServerState associated to the provided replication domain's
- * baseDN.
+ * Returns the CSN associated to the provided replication domain's baseDN and
+ * serverId.
*
* @param baseDN
* the replication domain's baseDN
+ * @param serverId
+ * the serverId
* @return the associated ServerState
*/
- public ServerState get(DN baseDN)
+ public CSN getCSN(DN baseDN, int serverId)
{
- return list.get(baseDN);
+ final ServerState ss = list.get(baseDN);
+ if (ss != null)
+ {
+ return ss.getCSN(serverId);
+ }
+ return null;
}
/**
@@ -260,7 +267,7 @@
public static Map<DN, ServerState> splitGenStateToServerStates(
String multiDomainServerState) throws DirectoryException
{
- final Map<DN, ServerState> startStates = new TreeMap<DN, ServerState>();
+ Map<DN, ServerState> startStates = new TreeMap<DN, ServerState>();
if (multiDomainServerState != null && multiDomainServerState.length() > 0)
{
try
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
index 1b20cb8..ae33dff 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -140,9 +140,12 @@
}
/**
- * Update the Server State with a CSN.
+ * Forward update the Server State with a CSN. The provided CSN will be put on
+ * the current object only if it is newer than the existing CSN for the same
+ * serverId or if there is no existing CSN.
*
- * @param csn The committed CSN.
+ * @param csn
+ * The committed CSN.
* @return a boolean indicating if the update was meaningful.
*/
public boolean update(CSN csn)
@@ -154,9 +157,9 @@
synchronized (serverIdToCSN)
{
- int serverId = csn.getServerId();
- CSN oldCSN = serverIdToCSN.get(serverId);
- if (oldCSN == null || csn.isNewerThan(oldCSN))
+ final int serverId = csn.getServerId();
+ final CSN existingCSN = serverIdToCSN.get(serverId);
+ if (existingCSN == null || csn.isNewerThan(existingCSN))
{
serverIdToCSN.put(serverId, csn);
return true;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index c82bc4e..dbb68f7 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -31,6 +31,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import org.opends.messages.Message;
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
@@ -61,29 +62,41 @@
private ChangelogState changelogState;
/*
- * previousCookie and mediumConsistencyPoint must be thread safe, because
+ * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because
* 1) initialization can happen while the replication server starts receiving
* updates 2) many updates can happen concurrently. This solution also avoids
* using a queue that could fill up before we have consumed all its content.
*/
/**
+ * Holds the cross domain medium consistency Replication Update Vector for the
+ * current replication server, also known as the previous cookie.
+ * <p>
* Stores the value of the cookie before the change currently processed is
* inserted in the DB. After insert, it is updated with the CSN of the change
* currently processed (thus becoming the "current" cookie just before the
* change is returned.
- */
- private final MultiDomainServerState previousCookie =
- new MultiDomainServerState();
-
- /**
- * Holds the medium consistency point for the current replication server.
*
* @see <a href=
* "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
- * >OpenDJ Domain Names for a description of what the medium consistency point
- * is</a>
+ * >OpenDJ Domain Names - medium consistency RUV</a>
*/
- private final MultiDomainServerState mediumConsistencyPoint =
+ private final MultiDomainServerState mediumConsistencyRUV =
+ new MultiDomainServerState();
+ /**
+ * Holds the cross domain medium consistency CSN for the current replication
+ * server.
+ *
+ * @see <a href=
+ * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
+ * >OpenDJ Domain Names - medium consistency CSN</a>
+ */
+ private volatile CSN mediumConsistencyCSN;
+
+ /**
+ * Holds the most recent changes or heartbeats received for each serverIds
+ * cross domain.
+ */
+ private final MultiDomainServerState lastSeenUpdates =
new MultiDomainServerState();
/**
@@ -103,8 +116,8 @@
private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors =
new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>();
/** This map can be updated by multiple threads. */
- private ConcurrentMap<Integer, DN> newCursors =
- new ConcurrentSkipListMap<Integer, DN>();
+ private ConcurrentMap<CSN, DN> newCursors =
+ new ConcurrentSkipListMap<CSN, DN>();
/**
* Builds a ChangeNumberIndexer object.
@@ -131,11 +144,8 @@
*/
public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
{
- mediumConsistencyPoint.update(baseDN, heartbeatCSN);
- final CompositeDBCursor<DN> localCursor = crossDomainDBCursor;
- final DN changeBaseDN = localCursor.getData();
- final CSN changeCSN = localCursor.getRecord().getCSN();
- tryNotify(changeBaseDN, changeCSN);
+ lastSeenUpdates.update(baseDN, heartbeatCSN);
+ tryNotify(baseDN);
}
/**
@@ -152,18 +162,18 @@
throws ChangelogException
{
final CSN csn = updateMsg.getCSN();
- mediumConsistencyPoint.update(baseDN, csn);
- newCursors.put(csn.getServerId(), baseDN);
- tryNotify(baseDN, csn);
+ lastSeenUpdates.update(baseDN, csn);
+ newCursors.put(csn, baseDN);
+ tryNotify(baseDN);
}
/**
* Notifies the Change number indexer thread if it will be able to do some
* work.
*/
- private void tryNotify(final DN baseDN, final CSN csn)
+ private void tryNotify(DN baseDN)
{
- if (mediumConsistencyPoint.cover(baseDN, csn))
+ if (canMoveForwardMediumConsistencyPoint(baseDN))
{
synchronized (this)
{
@@ -172,13 +182,25 @@
}
}
+ private boolean canMoveForwardMediumConsistencyPoint(DN baseDN)
+ {
+ final CSN mcCSN = mediumConsistencyCSN;
+ if (mcCSN != null)
+ {
+ final CSN lastSeenSameServerId =
+ lastSeenUpdates.getCSN(baseDN, mcCSN.getServerId());
+ return mcCSN.isOlderThan(lastSeenSameServerId);
+ }
+ return true;
+ }
+
private void initialize() throws ChangelogException, DirectoryException
{
final ChangeNumberIndexRecord newestRecord =
changelogDB.getChangeNumberIndexDB().getNewestRecord();
if (newestRecord != null)
{
- previousCookie.update(
+ mediumConsistencyRUV.update(
new MultiDomainServerState(newestRecord.getPreviousCookie()));
}
@@ -190,13 +212,12 @@
final DN baseDN = entry.getKey();
for (Integer serverId : entry.getValue())
{
- final ServerState previousSS = previousCookie.get(baseDN);
- final CSN csn = previousSS != null ? previousSS.getCSN(serverId) : null;
+ final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
ensureCursorExists(baseDN, serverId, csn);
}
ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
- mediumConsistencyPoint.update(baseDN, latestKnownState);
+ lastSeenUpdates.update(baseDN, latestKnownState);
}
crossDomainDBCursor = newCompositeDBCursor();
@@ -206,14 +227,11 @@
final UpdateMsg record = crossDomainDBCursor.getRecord();
if (!record.getCSN().equals(newestRecord.getCSN()))
{
- // TODO JNR remove
- throw new RuntimeException("They do not equal! recordCSN="
- + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN());
+ // TODO JNR i18n safety check, should never happen
+ throw new ChangelogException(Message.raw("They do not equal! recordCSN="
+ + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN()));
}
- // TODO JNR is it possible to use the following line instead?
- // previousCookie.update(newestRecord.getBaseDN(), record.getCSN());
- // TODO JNR would this mean updating the if above?
- previousCookie.update(crossDomainDBCursor.getData(), record.getCSN());
+ mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN());
crossDomainDBCursor.next();
}
@@ -281,7 +299,7 @@
}
catch (ChangelogException e)
{
- // TODO Auto-generated catch block
+ // TODO JNR error message i18n
if (debugEnabled())
TRACER.debugCaught(DebugLogLevel.ERROR, e);
return;
@@ -310,14 +328,14 @@
final DN baseDN = crossDomainDBCursor.getData();
// FIXME problem: what if the serverId is not part of the ServerState?
// right now, thread will be blocked
- if (!mediumConsistencyPoint.cover(baseDN, csn))
+ if (!canMoveForwardMediumConsistencyPoint(baseDN))
{
// the oldest record to insert is newer than the medium consistency
// point. Let's wait for a change that can be published.
synchronized (this)
{
// double check to protect against a missed call to notify()
- if (!mediumConsistencyPoint.cover(baseDN, csn))
+ if (!canMoveForwardMediumConsistencyPoint(baseDN))
{
wait();
// loop to check if changes older than the medium consistency
@@ -329,11 +347,11 @@
// OK, the oldest change is older than the medium consistency point
// let's publish it to the CNIndexDB
+ final String previousCookie = mediumConsistencyRUV.toString();
final ChangeNumberIndexRecord record =
- new ChangeNumberIndexRecord(previousCookie.toString(), baseDN, csn);
+ new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
changelogDB.getChangeNumberIndexDB().addRecord(record);
- // update, so it becomes the previous cookie for the next change
- previousCookie.update(baseDN, csn);
+ moveForwardMediumConsistencyPoint(csn, baseDN);
// advance cursor, success/failure will be checked later
crossDomainDBCursor.next();
@@ -351,16 +369,24 @@
}
}
+ private void moveForwardMediumConsistencyPoint(final CSN csn, final DN baseDN)
+ {
+ // update, so it becomes the previous cookie for the next change
+ mediumConsistencyRUV.update(baseDN, csn);
+ mediumConsistencyCSN = csn;
+ }
+
private void createNewCursors() throws ChangelogException
{
if (!newCursors.isEmpty())
{
boolean newCursorAdded = false;
- for (Iterator<Entry<Integer, DN>> iter = newCursors.entrySet().iterator();
+ for (Iterator<Entry<CSN, DN>> iter = newCursors.entrySet().iterator();
iter.hasNext();)
{
- final Entry<Integer, DN> entry = iter.next();
- if (!ensureCursorExists(entry.getValue(), entry.getKey(), null))
+ final Entry<CSN, DN> entry = iter.next();
+ final CSN csn = entry.getKey();
+ if (!ensureCursorExists(entry.getValue(), csn.getServerId(), null))
{
newCursorAdded = true;
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 0627df3..8cab868 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -27,6 +27,7 @@
package org.opends.server.replication.server.changelog.je;
import java.lang.Thread.State;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -106,7 +107,7 @@
new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
private ChangelogState initialState;
private ChangeNumberIndexer indexer;
- private MultiDomainServerState previousCookie;
+ private MultiDomainServerState initialCookie;
@BeforeClass
public static void classSetup() throws Exception
@@ -131,7 +132,7 @@
when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
initialState = new ChangelogState();
- previousCookie = new MultiDomainServerState();
+ initialCookie = new MultiDomainServerState();
}
@@ -181,7 +182,7 @@
final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
publishUpdateMsg(msg2, msg1);
- assertAddedRecords(msg1, msg2);
+ assertAddedRecords(msg1);
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -197,14 +198,20 @@
final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId2, 3);
final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
publishUpdateMsg(msg3, msg4);
+ assertAddedRecords(msg3);
- assertAddedRecords(msg3, msg4);
+ final ReplicatedUpdateMsg msg5 = msg(BASE_DN, serverId1, 5);
+ publishUpdateMsg(msg5);
+ assertAddedRecords(msg3);
+
+ final ReplicatedUpdateMsg msg6 = msg(BASE_DN, serverId2, 6);
+ publishUpdateMsg(msg6);
+ assertAddedRecords(msg3, msg4, msg5);
}
- @Test(enabled = false, dependsOnMethods = { EMPTY_DB_NO_DS })
+ @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBTwoCursorsOneEmptyForSomeTime() throws Exception
{
- // TODO JNR make this tests work
addReplica(BASE_DN, serverId1);
addReplica(BASE_DN, serverId2);
startIndexer();
@@ -216,7 +223,7 @@
// simulate no messages received during some time for replica 2
publishUpdateMsg(msg1Sid2, emptySid2, emptySid2, emptySid2, msg3Sid2, msg2Sid1);
- assertAddedRecords(msg1Sid2, msg2Sid1, msg3Sid2);
+ assertAddedRecords(msg1Sid2, msg2Sid1);
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -231,7 +238,10 @@
addReplica(BASE_DN, serverId2);
final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
publishUpdateMsg(msg2);
+ assertAddedRecords(msg1);
+ final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId1, 3);
+ publishUpdateMsg(msg3);
assertAddedRecords(msg1, msg2);
}
@@ -275,13 +285,13 @@
final DN baseDN = newestMsg.getBaseDN();
final CSN csn = newestMsg.getCSN();
when(cnIndexDB.getNewestRecord()).thenReturn(
- new ChangeNumberIndexRecord(previousCookie.toString(), baseDN, csn));
+ new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn));
final SequentialDBCursor cursor =
cursors.get(Pair.of(baseDN, csn.getServerId()));
cursor.add(newestMsg);
cursor.next(); // simulate the cursor had been initialized with this change
}
- previousCookie.update(msg.getBaseDN(), msg.getCSN());
+ initialCookie.update(msg.getBaseDN(), msg.getCSN());
}
}
@@ -321,6 +331,10 @@
assertThat(state).isEqualTo(State.WAITING);
}
+ /**
+ * Asserts which records have been added to the CNIndexDB since starting the
+ * {@link ChangeNumberIndexer} thread.
+ */
private void assertAddedRecords(ReplicatedUpdateMsg... msgs) throws Exception
{
final ArgumentCaptor<ChangeNumberIndexRecord> arg =
@@ -328,17 +342,21 @@
verify(cnIndexDB, atLeast(0)).addRecord(arg.capture());
final List<ChangeNumberIndexRecord> allValues = arg.getAllValues();
- // recheck it was not called more than expected
- assertThat(allValues).hasSameSizeAs(msgs);
+ // clone initial state to avoid modifying it
+ final MultiDomainServerState previousCookie =
+ new MultiDomainServerState(initialCookie.toString());
+ // check it was not called more than expected
+ String desc1 = "actual was:<" + allValues + ">, but expected was:<" + Arrays.toString(msgs) + ">";
+ assertThat(allValues.size()).as(desc1).isEqualTo(msgs.length);
for (int i = 0; i < msgs.length; i++)
{
final ReplicatedUpdateMsg msg = msgs[i];
final ChangeNumberIndexRecord record = allValues.get(i);
// check content in order
- String description = "expected: <" + msg + ">, but got: <" + record + ">";
- assertThat(record.getBaseDN()).as(description).isEqualTo(msg.getBaseDN());
- assertThat(record.getCSN()).as(description).isEqualTo(msg.getCSN());
- assertThat(record.getPreviousCookie()).as(description).isEqualTo(previousCookie.toString());
+ String desc2 = "actual was:<" + record + ">, but expected was:<" + msg + ">";
+ assertThat(record.getBaseDN()).as(desc2).isEqualTo(msg.getBaseDN());
+ assertThat(record.getCSN()).as(desc2).isEqualTo(msg.getCSN());
+ assertThat(record.getPreviousCookie()).as(desc2).isEqualTo(previousCookie.toString());
previousCookie.update(msg.getBaseDN(), msg.getCSN());
}
}
--
Gitblit v1.10.0