From 3f27a7ede5ca9df06137254aa32d41d023ac105d Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Tue, 16 Sep 2014 15:05:25 +0000
Subject: [PATCH] OPENDJ-1444 CR-4537 Remove previous cookie from storage of ChangeNumberIndexDB

---
 opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java |  145 ++++++++++++++++++++---------------------------
 1 files changed, 62 insertions(+), 83 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index f8e6f52..0903ed3 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -48,14 +48,18 @@
 
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
 import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
 import static org.opends.server.util.StaticUtils.*;
 
 /**
  * Thread responsible for inserting replicated changes into the ChangeNumber
- * Index DB (CNIndexDB for short). Only changes older than the medium
- * consistency point are inserted in the CNIndexDB. As a consequence this class
- * is also responsible for maintaining the medium consistency point.
+ * Index DB (CNIndexDB for short).
+ * <p>
+ * Only changes older than the medium consistency point are inserted in the
+ * CNIndexDB. As a consequence this class is also responsible for maintaining
+ * the medium consistency point (indirectly through an
+ * {@code ECLMultiDomainDBCursor}).
  */
 public class ChangeNumberIndexer extends DirectoryThread
 {
@@ -78,27 +82,10 @@
   /*
    * The following MultiDomainServerState fields must be thread safe, because
    * 1) initialization can happen while the replication server starts receiving
-   * updates 2) many updates can happen concurrently.
+   * updates
+   * 2) many updates can happen concurrently.
    */
   /**
-   * Holds the cross domain medium consistency Replication Update Vector for the
-   * current replication server, also known as the previous cookie.
-   * <p>
-   * Stores the value of the cookie before the change currently processed is
-   * inserted in the DB. After insert, it is updated with the CSN of the change
-   * currently processed (thus becoming the "current" cookie just before the
-   * change is returned.
-   * <p>
-   * Note: This object is only updated by changes/updates.
-   *
-   * @see <a href=
-   * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
-   * >OpenDJ Domain Names - medium consistency RUV</a>
-   */
-  private final MultiDomainServerState mediumConsistencyRUV =
-      new MultiDomainServerState();
-
-  /**
    * Holds the last time each replica was seen alive, whether via updates or
    * heartbeat notifications, or offline notifications. Data is held for each
    * serverId cross domain.
@@ -108,11 +95,10 @@
    * <p>
    * Note: This object is updated by both heartbeats and changes/updates.
    */
-  private final MultiDomainServerState lastAliveCSNs =
-      new MultiDomainServerState();
+  private final MultiDomainServerState lastAliveCSNs = new MultiDomainServerState();
+
   /** Note: This object is updated by replica offline messages. */
-  private final MultiDomainServerState replicasOffline =
-      new MultiDomainServerState();
+  private final MultiDomainServerState replicasOffline = new MultiDomainServerState();
 
   /**
    * Cursor across all the replicaDBs for all the replication domains. It is
@@ -315,25 +301,50 @@
   }
 
   /**
-   * Restores in memory data needed to build the CNIndexDB, including the medium
-   * consistency point.
+   * Restores in memory data needed to build the CNIndexDB. In particular,
+   * initializes the changes cursor to the medium consistency point.
    */
   private void initialize() throws ChangelogException, DirectoryException
   {
-    final ChangeNumberIndexRecord newestRecord =
-        changelogDB.getChangeNumberIndexDB().getNewestRecord();
+    final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
+
+    initializeLastAliveCSNs(domainDB);
+    initializeNextChangeCursor(domainDB);
+    initializeOfflineReplicas();
+
+    // this will not be used any more. Discard for garbage collection.
+    this.changelogState = null;
+  }
+
+  private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException
+  {
+    final MultiDomainServerState cookieWithNewestCSN = getCookieInitializedWithNewestCSN();
+
+    MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint =
+        domainDB.getCursorFrom(cookieWithNewestCSN, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
+
+    nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, cursorInitializedToMediumConsistencyPoint);
+    nextChangeForInsertDBCursor.next();
+  }
+
+  /** Returns a cookie initialised with the newest CSN for each replica. */
+  private MultiDomainServerState getCookieInitializedWithNewestCSN() throws ChangelogException
+  {
+    final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord();
+    final MultiDomainServerState cookieWithNewestCSN = new MultiDomainServerState();
     if (newestRecord != null)
     {
-      // restore the mediumConsistencyRUV from DB
-      mediumConsistencyRUV.update(
-          new MultiDomainServerState(newestRecord.getPreviousCookie()));
-      // Do not update with the newestRecord CSN
-      // as it will be used for a sanity check later in the same method
+      final CSN newestCsn = newestRecord.getCSN();
+      for (DN baseDN : changelogState.getDomainToServerIds().keySet())
+      {
+        cookieWithNewestCSN.update(baseDN, newestCsn);
+      }
     }
+    return cookieWithNewestCSN;
+  }
 
-    // initialize the DB cursor and the last seen updates
-    // to ensure the medium consistency CSN can move forward
-    final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
+  private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB)
+  {
     for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
     {
       final DN baseDN = entry.getKey();
@@ -352,34 +363,10 @@
         lastAliveCSNs.update(baseDN, latestKnownState);
       }
     }
+  }
 
-    nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate,
-        domainDB.getCursorFrom(mediumConsistencyRUV, AFTER_MATCHING_KEY));
-    nextChangeForInsertDBCursor.next();
-
-    if (newestRecord != null)
-    {
-      // restore the "previousCookie" state before shutdown
-      UpdateMsg record = nextChangeForInsertDBCursor.getRecord();
-      if (record instanceof ReplicaOfflineMsg)
-      {
-        // ignore: replica offline messages are never stored in the CNIndexDB
-        nextChangeForInsertDBCursor.next();
-        record = nextChangeForInsertDBCursor.getRecord();
-      }
-
-      // sanity check: ensure that when initializing the cursors at the previous
-      // cookie, the next change we find is the newest record in the CNIndexDB
-      if (!record.getCSN().equals(newestRecord.getCSN()))
-      {
-        throw new ChangelogException(ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get(
-            newestRecord.getCSN().toStringUI(), record.getCSN().toStringUI()));
-      }
-      // Now we can update the mediumConsistencyRUV
-      mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN());
-      nextChangeForInsertDBCursor.next();
-    }
-
+  private void initializeOfflineReplicas()
+  {
     final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas();
     for (DN baseDN : offlineReplicas)
     {
@@ -394,9 +381,6 @@
         }
       }
     }
-
-    // this will not be used any more. Discard for garbage collection.
-    this.changelogState = null;
   }
 
   private CSN oldestPossibleCSN(int serverId)
@@ -491,10 +475,10 @@
 
           // OK, the oldest change is older than the medium consistency point
           // let's publish it to the CNIndexDB.
-          final String previousCookie = mediumConsistencyRUV.toString();
-          final long changeNumber = changelogDB.getChangeNumberIndexDB().addRecord(
-              new ChangeNumberIndexRecord(previousCookie, baseDN, csn));
-          notifyEntryAddedToChangelog(baseDN, changeNumber, previousCookie, msg);
+          final long changeNumber = changelogDB.getChangeNumberIndexDB()
+              .addRecord(new ChangeNumberIndexRecord(baseDN, csn));
+          MultiDomainServerState cookie = nextChangeForInsertDBCursor.toCookie();
+          notifyEntryAddedToChangelog(baseDN, changeNumber, cookie, msg);
           moveForwardMediumConsistencyPoint(csn, baseDN);
         }
         catch (InterruptedException ignored)
@@ -532,18 +516,18 @@
    *          the change number of the newly added entry. It will be greater
    *          than zero for entries added to the change number index and less
    *          than or equal to zero for entries added to any replica DB
-   * @param cookieString
-   *          a string representing the cookie of the newly added entry. This is
-   *          only meaningful for entries added to the change number index
+   * @param cookie
+   *          the cookie of the newly added entry. This is only meaningful for
+   *          entries added to the change number index
    * @param msg
    *          the update message of the newly added entry
    * @throws ChangelogException
    *           If a problem occurs while notifying of the newly added entry.
    */
   protected void notifyEntryAddedToChangelog(DN baseDN, long changeNumber,
-      String cookieString, UpdateMsg msg) throws ChangelogException
+      MultiDomainServerState cookie, UpdateMsg msg) throws ChangelogException
   {
-    ChangelogBackend.getInstance().notifyChangeNumberEntryAdded(baseDN, changeNumber, cookieString, msg);
+    ChangelogBackend.getInstance().notifyChangeNumberEntryAdded(baseDN, changeNumber, cookie.toString(), msg);
   }
 
   /**
@@ -561,12 +545,8 @@
     TRACER.debugError(msg.toString());
   }
 
-  private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
-      final DN mcBaseDN) throws ChangelogException
+  private void moveForwardMediumConsistencyPoint(final CSN mcCSN, final DN mcBaseDN) throws ChangelogException
   {
-    // update, so it becomes the previous cookie for the next change
-    mediumConsistencyRUV.update(mcBaseDN, mcCSN);
-
     final int mcServerId = mcCSN.getServerId();
     final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
     final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
@@ -587,7 +567,6 @@
          * from the medium consistency RUV).
          */
         lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
-        mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
       }
     }
 

--
Gitblit v1.10.0