From 1cc3f8f7ff4172a409f8d0ccd4b5b2ab80c54c94 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 22 Nov 2013 08:24:47 +0000
Subject: [PATCH] Checkpoint commit for OPENDJ-1174 Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB

---
 opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java                                       |   17 +++-
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java                             |  110 +++++++++++++++++----------
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java |   48 ++++++++---
 opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java                                                  |   13 ++-
 4 files changed, 121 insertions(+), 67 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
index 9465346..7afe2d2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -189,16 +189,23 @@
   }
 
   /**
-   * Returns the ServerState associated to the provided replication domain's
-   * baseDN.
+   * Returns the CSN associated to the provided replication domain's baseDN and
+   * serverId.
    *
    * @param baseDN
    *          the replication domain's baseDN
+   * @param serverId
+   *          the serverId
    * @return the associated ServerState
    */
-  public ServerState get(DN baseDN)
+  public CSN getCSN(DN baseDN, int serverId)
   {
-    return list.get(baseDN);
+    final ServerState ss = list.get(baseDN);
+    if (ss != null)
+    {
+      return ss.getCSN(serverId);
+    }
+    return null;
   }
 
   /**
@@ -260,7 +267,7 @@
   public static Map<DN, ServerState> splitGenStateToServerStates(
       String multiDomainServerState) throws DirectoryException
   {
-    final Map<DN, ServerState> startStates = new TreeMap<DN, ServerState>();
+    Map<DN, ServerState> startStates = new TreeMap<DN, ServerState>();
     if (multiDomainServerState != null && multiDomainServerState.length() > 0)
     {
       try
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
index 1b20cb8..ae33dff 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -140,9 +140,12 @@
   }
 
   /**
-   * Update the Server State with a CSN.
+   * Forward update the Server State with a CSN. The provided CSN will be put on
+   * the current object only if it is newer than the existing CSN for the same
+   * serverId or if there is no existing CSN.
    *
-   * @param csn The committed CSN.
+   * @param csn
+   *          The committed CSN.
    * @return a boolean indicating if the update was meaningful.
    */
   public boolean update(CSN csn)
@@ -154,9 +157,9 @@
 
     synchronized (serverIdToCSN)
     {
-      int serverId = csn.getServerId();
-      CSN oldCSN = serverIdToCSN.get(serverId);
-      if (oldCSN == null || csn.isNewerThan(oldCSN))
+      final int serverId = csn.getServerId();
+      final CSN existingCSN = serverIdToCSN.get(serverId);
+      if (existingCSN == null || csn.isNewerThan(existingCSN))
       {
         serverIdToCSN.put(serverId, csn);
         return true;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index c82bc4e..dbb68f7 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -31,6 +31,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 
+import org.opends.messages.Message;
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.CSN;
@@ -61,29 +62,41 @@
   private ChangelogState changelogState;
 
   /*
-   * previousCookie and mediumConsistencyPoint must be thread safe, because
+   * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because
    * 1) initialization can happen while the replication server starts receiving
    * updates 2) many updates can happen concurrently. This solution also avoids
    * using a queue that could fill up before we have consumed all its content.
    */
   /**
+   * Holds the cross domain medium consistency Replication Update Vector for the
+   * current replication server, also known as the previous cookie.
+   * <p>
    * Stores the value of the cookie before the change currently processed is
    * inserted in the DB. After insert, it is updated with the CSN of the change
    * currently processed (thus becoming the "current" cookie just before the
    * change is returned.
-   */
-  private final MultiDomainServerState previousCookie =
-      new MultiDomainServerState();
-
-  /**
-   * Holds the medium consistency point for the current replication server.
    *
    * @see <a href=
    * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
-   * >OpenDJ Domain Names for a description of what the medium consistency point
-   * is</a>
+   * >OpenDJ Domain Names - medium consistency RUV</a>
    */
-  private final MultiDomainServerState mediumConsistencyPoint =
+  private final MultiDomainServerState mediumConsistencyRUV =
+      new MultiDomainServerState();
+  /**
+   * Holds the cross domain medium consistency CSN for the current replication
+   * server.
+   *
+   * @see <a href=
+   * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
+   * >OpenDJ Domain Names - medium consistency CSN</a>
+   */
+  private volatile CSN mediumConsistencyCSN;
+
+  /**
+   * Holds the most recent changes or heartbeats received for each serverIds
+   * cross domain.
+   */
+  private final MultiDomainServerState lastSeenUpdates =
       new MultiDomainServerState();
 
   /**
@@ -103,8 +116,8 @@
   private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors =
       new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>();
   /** This map can be updated by multiple threads. */
-  private ConcurrentMap<Integer, DN> newCursors =
-      new ConcurrentSkipListMap<Integer, DN>();
+  private ConcurrentMap<CSN, DN> newCursors =
+      new ConcurrentSkipListMap<CSN, DN>();
 
   /**
    * Builds a ChangeNumberIndexer object.
@@ -131,11 +144,8 @@
    */
   public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
   {
-    mediumConsistencyPoint.update(baseDN, heartbeatCSN);
-    final CompositeDBCursor<DN> localCursor = crossDomainDBCursor;
-    final DN changeBaseDN = localCursor.getData();
-    final CSN changeCSN = localCursor.getRecord().getCSN();
-    tryNotify(changeBaseDN, changeCSN);
+    lastSeenUpdates.update(baseDN, heartbeatCSN);
+    tryNotify(baseDN);
   }
 
   /**
@@ -152,18 +162,18 @@
       throws ChangelogException
   {
     final CSN csn = updateMsg.getCSN();
-    mediumConsistencyPoint.update(baseDN, csn);
-    newCursors.put(csn.getServerId(), baseDN);
-    tryNotify(baseDN, csn);
+    lastSeenUpdates.update(baseDN, csn);
+    newCursors.put(csn, baseDN);
+    tryNotify(baseDN);
   }
 
   /**
    * Notifies the Change number indexer thread if it will be able to do some
    * work.
    */
-  private void tryNotify(final DN baseDN, final CSN csn)
+  private void tryNotify(DN baseDN)
   {
-    if (mediumConsistencyPoint.cover(baseDN, csn))
+    if (canMoveForwardMediumConsistencyPoint(baseDN))
     {
       synchronized (this)
       {
@@ -172,13 +182,25 @@
     }
   }
 
+  private boolean canMoveForwardMediumConsistencyPoint(DN baseDN)
+  {
+    final CSN mcCSN = mediumConsistencyCSN;
+    if (mcCSN != null)
+    {
+      final CSN lastSeenSameServerId =
+          lastSeenUpdates.getCSN(baseDN, mcCSN.getServerId());
+      return mcCSN.isOlderThan(lastSeenSameServerId);
+    }
+    return true;
+  }
+
   private void initialize() throws ChangelogException, DirectoryException
   {
     final ChangeNumberIndexRecord newestRecord =
         changelogDB.getChangeNumberIndexDB().getNewestRecord();
     if (newestRecord != null)
     {
-      previousCookie.update(
+      mediumConsistencyRUV.update(
           new MultiDomainServerState(newestRecord.getPreviousCookie()));
     }
 
@@ -190,13 +212,12 @@
       final DN baseDN = entry.getKey();
       for (Integer serverId : entry.getValue())
       {
-        final ServerState previousSS = previousCookie.get(baseDN);
-        final CSN csn = previousSS != null ? previousSS.getCSN(serverId) : null;
+        final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
         ensureCursorExists(baseDN, serverId, csn);
       }
 
       ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
-      mediumConsistencyPoint.update(baseDN, latestKnownState);
+      lastSeenUpdates.update(baseDN, latestKnownState);
     }
 
     crossDomainDBCursor = newCompositeDBCursor();
@@ -206,14 +227,11 @@
       final UpdateMsg record = crossDomainDBCursor.getRecord();
       if (!record.getCSN().equals(newestRecord.getCSN()))
       {
-        // TODO JNR remove
-        throw new RuntimeException("They do not equal! recordCSN="
-            + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN());
+        // TODO JNR i18n safety check, should never happen
+        throw new ChangelogException(Message.raw("They do not equal! recordCSN="
+            + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN()));
       }
-      // TODO JNR is it possible to use the following line instead?
-      // previousCookie.update(newestRecord.getBaseDN(), record.getCSN());
-      // TODO JNR would this mean updating the if above?
-      previousCookie.update(crossDomainDBCursor.getData(), record.getCSN());
+      mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN());
       crossDomainDBCursor.next();
     }
 
@@ -281,7 +299,7 @@
     }
     catch (ChangelogException e)
     {
-      // TODO Auto-generated catch block
+      // TODO JNR error message i18n
       if (debugEnabled())
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
       return;
@@ -310,14 +328,14 @@
         final DN baseDN = crossDomainDBCursor.getData();
         // FIXME problem: what if the serverId is not part of the ServerState?
         // right now, thread will be blocked
-        if (!mediumConsistencyPoint.cover(baseDN, csn))
+        if (!canMoveForwardMediumConsistencyPoint(baseDN))
         {
           // the oldest record to insert is newer than the medium consistency
           // point. Let's wait for a change that can be published.
           synchronized (this)
           {
             // double check to protect against a missed call to notify()
-            if (!mediumConsistencyPoint.cover(baseDN, csn))
+            if (!canMoveForwardMediumConsistencyPoint(baseDN))
             {
               wait();
               // loop to check if changes older than the medium consistency
@@ -329,11 +347,11 @@
 
         // OK, the oldest change is older than the medium consistency point
         // let's publish it to the CNIndexDB
+        final String previousCookie = mediumConsistencyRUV.toString();
         final ChangeNumberIndexRecord record =
-            new ChangeNumberIndexRecord(previousCookie.toString(), baseDN, csn);
+            new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
         changelogDB.getChangeNumberIndexDB().addRecord(record);
-        // update, so it becomes the previous cookie for the next change
-        previousCookie.update(baseDN, csn);
+        moveForwardMediumConsistencyPoint(csn, baseDN);
 
         // advance cursor, success/failure will be checked later
         crossDomainDBCursor.next();
@@ -351,16 +369,24 @@
     }
   }
 
+  private void moveForwardMediumConsistencyPoint(final CSN csn, final DN baseDN)
+  {
+    // update, so it becomes the previous cookie for the next change
+    mediumConsistencyRUV.update(baseDN, csn);
+    mediumConsistencyCSN = csn;
+  }
+
   private void createNewCursors() throws ChangelogException
   {
     if (!newCursors.isEmpty())
     {
       boolean newCursorAdded = false;
-      for (Iterator<Entry<Integer, DN>> iter = newCursors.entrySet().iterator();
+      for (Iterator<Entry<CSN, DN>> iter = newCursors.entrySet().iterator();
           iter.hasNext();)
       {
-        final Entry<Integer, DN> entry = iter.next();
-        if (!ensureCursorExists(entry.getValue(), entry.getKey(), null))
+        final Entry<CSN, DN> entry = iter.next();
+        final CSN csn = entry.getKey();
+        if (!ensureCursorExists(entry.getValue(), csn.getServerId(), null))
         {
           newCursorAdded = true;
         }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 0627df3..8cab868 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -27,6 +27,7 @@
 package org.opends.server.replication.server.changelog.je;
 
 import java.lang.Thread.State;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -106,7 +107,7 @@
       new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
   private ChangelogState initialState;
   private ChangeNumberIndexer indexer;
-  private MultiDomainServerState previousCookie;
+  private MultiDomainServerState initialCookie;
 
   @BeforeClass
   public static void classSetup() throws Exception
@@ -131,7 +132,7 @@
     when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
 
     initialState = new ChangelogState();
-    previousCookie = new MultiDomainServerState();
+    initialCookie = new MultiDomainServerState();
   }
 
 
@@ -181,7 +182,7 @@
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
     publishUpdateMsg(msg2, msg1);
 
-    assertAddedRecords(msg1, msg2);
+    assertAddedRecords(msg1);
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -197,14 +198,20 @@
     final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId2, 3);
     final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
     publishUpdateMsg(msg3, msg4);
+    assertAddedRecords(msg3);
 
-    assertAddedRecords(msg3, msg4);
+    final ReplicatedUpdateMsg msg5 = msg(BASE_DN, serverId1, 5);
+    publishUpdateMsg(msg5);
+    assertAddedRecords(msg3);
+
+    final ReplicatedUpdateMsg msg6 = msg(BASE_DN, serverId2, 6);
+    publishUpdateMsg(msg6);
+    assertAddedRecords(msg3, msg4, msg5);
   }
 
-  @Test(enabled = false, dependsOnMethods = { EMPTY_DB_NO_DS })
+  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoCursorsOneEmptyForSomeTime() throws Exception
   {
-    // TODO JNR make this tests work
     addReplica(BASE_DN, serverId1);
     addReplica(BASE_DN, serverId2);
     startIndexer();
@@ -216,7 +223,7 @@
     // simulate no messages received during some time for replica 2
     publishUpdateMsg(msg1Sid2, emptySid2, emptySid2, emptySid2, msg3Sid2, msg2Sid1);
 
-    assertAddedRecords(msg1Sid2, msg2Sid1, msg3Sid2);
+    assertAddedRecords(msg1Sid2, msg2Sid1);
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -231,7 +238,10 @@
     addReplica(BASE_DN, serverId2);
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
     publishUpdateMsg(msg2);
+    assertAddedRecords(msg1);
 
+    final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId1, 3);
+    publishUpdateMsg(msg3);
     assertAddedRecords(msg1, msg2);
   }
 
@@ -275,13 +285,13 @@
         final DN baseDN = newestMsg.getBaseDN();
         final CSN csn = newestMsg.getCSN();
         when(cnIndexDB.getNewestRecord()).thenReturn(
-            new ChangeNumberIndexRecord(previousCookie.toString(), baseDN, csn));
+            new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn));
         final SequentialDBCursor cursor =
             cursors.get(Pair.of(baseDN, csn.getServerId()));
         cursor.add(newestMsg);
         cursor.next(); // simulate the cursor had been initialized with this change
       }
-      previousCookie.update(msg.getBaseDN(), msg.getCSN());
+      initialCookie.update(msg.getBaseDN(), msg.getCSN());
     }
   }
 
@@ -321,6 +331,10 @@
     assertThat(state).isEqualTo(State.WAITING);
   }
 
+  /**
+   * Asserts which records have been added to the CNIndexDB since starting the
+   * {@link ChangeNumberIndexer} thread.
+   */
   private void assertAddedRecords(ReplicatedUpdateMsg... msgs) throws Exception
   {
     final ArgumentCaptor<ChangeNumberIndexRecord> arg =
@@ -328,17 +342,21 @@
     verify(cnIndexDB, atLeast(0)).addRecord(arg.capture());
     final List<ChangeNumberIndexRecord> allValues = arg.getAllValues();
 
-    // recheck it was not called more than expected
-    assertThat(allValues).hasSameSizeAs(msgs);
+    // clone initial state to avoid modifying it
+    final MultiDomainServerState previousCookie =
+        new MultiDomainServerState(initialCookie.toString());
+    // check it was not called more than expected
+    String desc1 = "actual was:<" + allValues + ">, but expected was:<" + Arrays.toString(msgs) + ">";
+    assertThat(allValues.size()).as(desc1).isEqualTo(msgs.length);
     for (int i = 0; i < msgs.length; i++)
     {
       final ReplicatedUpdateMsg msg = msgs[i];
       final ChangeNumberIndexRecord record = allValues.get(i);
       // check content in order
-      String description = "expected: <" + msg + ">, but got: <" + record + ">";
-      assertThat(record.getBaseDN()).as(description).isEqualTo(msg.getBaseDN());
-      assertThat(record.getCSN()).as(description).isEqualTo(msg.getCSN());
-      assertThat(record.getPreviousCookie()).as(description).isEqualTo(previousCookie.toString());
+      String desc2 = "actual was:<" + record + ">, but expected was:<" + msg + ">";
+      assertThat(record.getBaseDN()).as(desc2).isEqualTo(msg.getBaseDN());
+      assertThat(record.getCSN()).as(desc2).isEqualTo(msg.getCSN());
+      assertThat(record.getPreviousCookie()).as(desc2).isEqualTo(previousCookie.toString());
       previousCookie.update(msg.getBaseDN(), msg.getCSN());
     }
   }

--
Gitblit v1.10.0