From 3f27a7ede5ca9df06137254aa32d41d023ac105d Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Tue, 16 Sep 2014 15:05:25 +0000
Subject: [PATCH] OPENDJ-1444 CR-4537 Remove previous cookie from storage of ChangeNumberIndexDB
---
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 145 ++++++++++++++++++++---------------------------
1 files changed, 62 insertions(+), 83 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index f8e6f52..0903ed3 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -48,14 +48,18 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
* Thread responsible for inserting replicated changes into the ChangeNumber
- * Index DB (CNIndexDB for short). Only changes older than the medium
- * consistency point are inserted in the CNIndexDB. As a consequence this class
- * is also responsible for maintaining the medium consistency point.
+ * Index DB (CNIndexDB for short).
+ * <p>
+ * Only changes older than the medium consistency point are inserted in the
+ * CNIndexDB. As a consequence this class is also responsible for maintaining
+ * the medium consistency point (indirectly through an
+ * {@code ECLMultiDomainDBCursor}).
*/
public class ChangeNumberIndexer extends DirectoryThread
{
@@ -78,27 +82,10 @@
/*
* The following MultiDomainServerState fields must be thread safe, because
* 1) initialization can happen while the replication server starts receiving
- * updates 2) many updates can happen concurrently.
+ * updates
+ * 2) many updates can happen concurrently.
*/
/**
- * Holds the cross domain medium consistency Replication Update Vector for the
- * current replication server, also known as the previous cookie.
- * <p>
- * Stores the value of the cookie before the change currently processed is
- * inserted in the DB. After insert, it is updated with the CSN of the change
- * currently processed (thus becoming the "current" cookie just before the
- * change is returned.
- * <p>
- * Note: This object is only updated by changes/updates.
- *
- * @see <a href=
- * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
- * >OpenDJ Domain Names - medium consistency RUV</a>
- */
- private final MultiDomainServerState mediumConsistencyRUV =
- new MultiDomainServerState();
-
- /**
* Holds the last time each replica was seen alive, whether via updates or
* heartbeat notifications, or offline notifications. Data is held for each
* serverId cross domain.
@@ -108,11 +95,10 @@
* <p>
* Note: This object is updated by both heartbeats and changes/updates.
*/
- private final MultiDomainServerState lastAliveCSNs =
- new MultiDomainServerState();
+ private final MultiDomainServerState lastAliveCSNs = new MultiDomainServerState();
+
/** Note: This object is updated by replica offline messages. */
- private final MultiDomainServerState replicasOffline =
- new MultiDomainServerState();
+ private final MultiDomainServerState replicasOffline = new MultiDomainServerState();
/**
* Cursor across all the replicaDBs for all the replication domains. It is
@@ -315,25 +301,50 @@
}
/**
- * Restores in memory data needed to build the CNIndexDB, including the medium
- * consistency point.
+ * Restores in memory data needed to build the CNIndexDB. In particular,
+ * initializes the changes cursor to the medium consistency point.
*/
private void initialize() throws ChangelogException, DirectoryException
{
- final ChangeNumberIndexRecord newestRecord =
- changelogDB.getChangeNumberIndexDB().getNewestRecord();
+ final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
+
+ initializeLastAliveCSNs(domainDB);
+ initializeNextChangeCursor(domainDB);
+ initializeOfflineReplicas();
+
+ // this will not be used any more. Discard for garbage collection.
+ this.changelogState = null;
+ }
+
+ private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException
+ {
+ final MultiDomainServerState cookieWithNewestCSN = getCookieInitializedWithNewestCSN();
+
+ MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint =
+ domainDB.getCursorFrom(cookieWithNewestCSN, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
+
+ nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, cursorInitializedToMediumConsistencyPoint);
+ nextChangeForInsertDBCursor.next();
+ }
+
+ /** Returns a cookie initialised with the newest CSN for each replica. */
+ private MultiDomainServerState getCookieInitializedWithNewestCSN() throws ChangelogException
+ {
+ final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord();
+ final MultiDomainServerState cookieWithNewestCSN = new MultiDomainServerState();
if (newestRecord != null)
{
- // restore the mediumConsistencyRUV from DB
- mediumConsistencyRUV.update(
- new MultiDomainServerState(newestRecord.getPreviousCookie()));
- // Do not update with the newestRecord CSN
- // as it will be used for a sanity check later in the same method
+ final CSN newestCsn = newestRecord.getCSN();
+ for (DN baseDN : changelogState.getDomainToServerIds().keySet())
+ {
+ cookieWithNewestCSN.update(baseDN, newestCsn);
+ }
}
+ return cookieWithNewestCSN;
+ }
- // initialize the DB cursor and the last seen updates
- // to ensure the medium consistency CSN can move forward
- final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
+ private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB)
+ {
for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
{
final DN baseDN = entry.getKey();
@@ -352,34 +363,10 @@
lastAliveCSNs.update(baseDN, latestKnownState);
}
}
+ }
- nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate,
- domainDB.getCursorFrom(mediumConsistencyRUV, AFTER_MATCHING_KEY));
- nextChangeForInsertDBCursor.next();
-
- if (newestRecord != null)
- {
- // restore the "previousCookie" state before shutdown
- UpdateMsg record = nextChangeForInsertDBCursor.getRecord();
- if (record instanceof ReplicaOfflineMsg)
- {
- // ignore: replica offline messages are never stored in the CNIndexDB
- nextChangeForInsertDBCursor.next();
- record = nextChangeForInsertDBCursor.getRecord();
- }
-
- // sanity check: ensure that when initializing the cursors at the previous
- // cookie, the next change we find is the newest record in the CNIndexDB
- if (!record.getCSN().equals(newestRecord.getCSN()))
- {
- throw new ChangelogException(ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get(
- newestRecord.getCSN().toStringUI(), record.getCSN().toStringUI()));
- }
- // Now we can update the mediumConsistencyRUV
- mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN());
- nextChangeForInsertDBCursor.next();
- }
-
+ private void initializeOfflineReplicas()
+ {
final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas();
for (DN baseDN : offlineReplicas)
{
@@ -394,9 +381,6 @@
}
}
}
-
- // this will not be used any more. Discard for garbage collection.
- this.changelogState = null;
}
private CSN oldestPossibleCSN(int serverId)
@@ -491,10 +475,10 @@
// OK, the oldest change is older than the medium consistency point
// let's publish it to the CNIndexDB.
- final String previousCookie = mediumConsistencyRUV.toString();
- final long changeNumber = changelogDB.getChangeNumberIndexDB().addRecord(
- new ChangeNumberIndexRecord(previousCookie, baseDN, csn));
- notifyEntryAddedToChangelog(baseDN, changeNumber, previousCookie, msg);
+ final long changeNumber = changelogDB.getChangeNumberIndexDB()
+ .addRecord(new ChangeNumberIndexRecord(baseDN, csn));
+ MultiDomainServerState cookie = nextChangeForInsertDBCursor.toCookie();
+ notifyEntryAddedToChangelog(baseDN, changeNumber, cookie, msg);
moveForwardMediumConsistencyPoint(csn, baseDN);
}
catch (InterruptedException ignored)
@@ -532,18 +516,18 @@
* the change number of the newly added entry. It will be greater
* than zero for entries added to the change number index and less
* than or equal to zero for entries added to any replica DB
- * @param cookieString
- * a string representing the cookie of the newly added entry. This is
- * only meaningful for entries added to the change number index
+ * @param cookie
+ * the cookie of the newly added entry. This is only meaningful for
+ * entries added to the change number index
* @param msg
* the update message of the newly added entry
* @throws ChangelogException
* If a problem occurs while notifying of the newly added entry.
*/
protected void notifyEntryAddedToChangelog(DN baseDN, long changeNumber,
- String cookieString, UpdateMsg msg) throws ChangelogException
+ MultiDomainServerState cookie, UpdateMsg msg) throws ChangelogException
{
- ChangelogBackend.getInstance().notifyChangeNumberEntryAdded(baseDN, changeNumber, cookieString, msg);
+ ChangelogBackend.getInstance().notifyChangeNumberEntryAdded(baseDN, changeNumber, cookie.toString(), msg);
}
/**
@@ -561,12 +545,8 @@
TRACER.debugError(msg.toString());
}
- private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
- final DN mcBaseDN) throws ChangelogException
+ private void moveForwardMediumConsistencyPoint(final CSN mcCSN, final DN mcBaseDN) throws ChangelogException
{
- // update, so it becomes the previous cookie for the next change
- mediumConsistencyRUV.update(mcBaseDN, mcCSN);
-
final int mcServerId = mcCSN.getServerId();
final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
@@ -587,7 +567,6 @@
* from the medium consistency RUV).
*/
lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
- mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
}
}
--
Gitblit v1.10.0