From 6aa4fa5b4f71e830dba55f3ea3f9530737db2d8b Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 17 Apr 2014 12:44:17 +0000
Subject: [PATCH] OPENDJ-1439 Change number stops progressing with cross domain replication 

---
 opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java |   56 ++++++++++++++++++++++++++++++--------------------------
 1 files changed, 30 insertions(+), 26 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 5179986..1fe38e0 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -92,14 +92,14 @@
   private final MultiDomainServerState mediumConsistencyRUV =
       new MultiDomainServerState();
   /**
-   * Holds the cross domain medium consistency CSN for the current replication
-   * server.
+   * Holds the cross domain medium consistency baseDN and CSN for the current
+   * replication server.
    *
    * @see <a href=
    * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
    * >OpenDJ Domain Names - medium consistency CSN</a>
    */
-  private volatile CSN mediumConsistencyCSN;
+  private volatile Pair<DN, CSN> mediumConsistency;
 
   /**
    * Holds the last time each replica was seen alive, whether via updates or
@@ -182,7 +182,7 @@
     }
 
     lastAliveCSNs.update(baseDN, heartbeatCSN);
-    tryNotify(baseDN);
+    tryNotify();
   }
 
   /**
@@ -207,7 +207,7 @@
     // only keep the oldest CSN that will be the new cursor's starting point
     newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
     lastAliveCSNs.update(baseDN, csn);
-    tryNotify(baseDN);
+    tryNotify();
   }
 
   /**
@@ -239,16 +239,16 @@
   {
     replicasOffline.update(baseDN, offlineCSN);
     lastAliveCSNs.update(baseDN, offlineCSN);
-    tryNotify(baseDN);
+    tryNotify();
   }
 
   /**
    * Notifies the Change number indexer thread if it will be able to do some
    * work.
    */
-  private void tryNotify(DN baseDN)
+  private void tryNotify()
   {
-    if (canMoveForwardMediumConsistencyPoint(baseDN))
+    if (canMoveForwardMediumConsistencyPoint())
     {
       synchronized (this)
       {
@@ -257,13 +257,14 @@
     }
   }
 
-  private boolean canMoveForwardMediumConsistencyPoint(DN baseDN)
+  private boolean canMoveForwardMediumConsistencyPoint()
   {
-    final CSN mcCSN = mediumConsistencyCSN;
-    if (mcCSN != null)
+    final Pair<DN, CSN> mc = mediumConsistency;
+    if (mc != null)
     {
-      final int serverId = mcCSN.getServerId();
-      CSN lastTimeSameReplicaSeenAlive = lastAliveCSNs.getCSN(baseDN, serverId);
+      final CSN mcCSN = mc.getSecond();
+      final CSN lastTimeSameReplicaSeenAlive =
+          lastAliveCSNs.getCSN(mc.getFirst(), mcCSN.getServerId());
       return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive);
     }
     return true;
@@ -441,7 +442,8 @@
               }
               wait();
             }
-            // advance cursor, success/failure will be checked later
+            // try to recycle the exhausted cursors,
+            // success/failure will be checked later
             nextChangeForInsertDBCursor.next();
             // loop to check whether new changes have been added to the
             // ReplicaDBs
@@ -452,7 +454,7 @@
           final DN baseDN = nextChangeForInsertDBCursor.getData();
           // FIXME problem: what if the serverId is not part of the ServerState?
           // right now, change number will be blocked
-          if (!canMoveForwardMediumConsistencyPoint(baseDN))
+          if (!canMoveForwardMediumConsistencyPoint())
           {
             // the oldest record to insert is newer than the medium consistency
             // point. Let's wait for a change that can be published.
@@ -460,7 +462,7 @@
             {
               // double check to protect against a missed call to notify()
               if (!isShutdownInitiated()
-                  && !canMoveForwardMediumConsistencyPoint(baseDN))
+                  && !canMoveForwardMediumConsistencyPoint())
               {
                 wait();
                 // loop to check if changes older than the medium consistency
@@ -479,7 +481,8 @@
           changelogDB.getChangeNumberIndexDB().addRecord(record);
           moveForwardMediumConsistencyPoint(csn, baseDN);
 
-          // advance cursor, success/failure will be checked later
+          // advance the cursor we just read from,
+          // success/failure will be checked later
           nextChangeForInsertDBCursor.next();
         }
         catch (InterruptedException ignored)
@@ -517,20 +520,21 @@
     }
   }
 
-  private void moveForwardMediumConsistencyPoint(final CSN csn, final DN baseDN)
+  private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
+      final DN mcBaseDN)
   {
     // update, so it becomes the previous cookie for the next change
-    mediumConsistencyRUV.update(baseDN, csn);
-    mediumConsistencyCSN = csn;
-    final CSN offlineCSN = replicasOffline.getCSN(baseDN, csn.getServerId());
+    mediumConsistencyRUV.update(mcBaseDN, mcCSN);
+    mediumConsistency = Pair.of(mcBaseDN, mcCSN);
+    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcCSN.getServerId());
     if (offlineCSN != null
-        && offlineCSN.isOlderThan(mediumConsistencyCSN)
+        && offlineCSN.isOlderThan(mcCSN)
         // If no new updates has been seen for this replica
-        && lastAliveCSNs.removeCSN(baseDN, offlineCSN))
+        && lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN))
     {
-      removeCursor(baseDN, csn);
-      replicasOffline.removeCSN(baseDN, offlineCSN);
-      mediumConsistencyRUV.removeCSN(baseDN, offlineCSN);
+      removeCursor(mcBaseDN, mcCSN);
+      replicasOffline.removeCSN(mcBaseDN, offlineCSN);
+      mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
     }
   }
 

--
Gitblit v1.10.0