From 5eb7b26eabf15d047fd913597aa508bb78c1d7e7 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 20 May 2014 10:14:29 +0000
Subject: [PATCH] OPENDJ-1259 (CR-3563) Make the Medium Consistency Point support replicas temporarily leaving the topology

---
 opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java |   77 +++++++++++++++++++++++++++-----------
 1 files changed, 54 insertions(+), 23 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index c93ca98..628ca99 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -271,16 +271,32 @@
     }
     // ensure that all initial replicas alive information have been updated
     // with CSNs that are acceptable for moving the medium consistency forward
-    return allInitialReplicasArePastOldestPossibleCSN();
+    return allInitialReplicasAreOfflineOrAlive();
   }
 
-  private boolean allInitialReplicasArePastOldestPossibleCSN()
+  /**
+   * Returns true only if the initial replicas known from the changelog state DB
+   * are either:
+   * <ul>
+   * <li>offline, so do not wait for them in order to compute medium consistency
+   * </li>
+   * <li>alive, because we received heartbeats or changes (so their last alive
+   * CSN has been updated to something past the oldest possible CSN), we have
+   * enough info to compute medium consistency</li>
+   * </ul>
+   * In this case, we have enough information to compute medium consistency
+   * without waiting any more.
+   */
+  private boolean allInitialReplicasAreOfflineOrAlive()
   {
     for (DN baseDN : lastAliveCSNs)
     {
       for (CSN csn : lastAliveCSNs.getServerState(baseDN))
       {
-        if (csn.getTime() == 0)
+        if (// oldest possible CSN?
+            csn.getTime() == 0
+            // replica is not offline
+            && replicasOffline.getCSN(baseDN, csn.getServerId()) == null)
         {
           return false;
         }
@@ -363,6 +379,9 @@
         if (isECLEnabledDomain(baseDN))
         {
           replicasOffline.update(baseDN, offlineCSN);
+          // a replica offline message could also be the very last time
+          // we heard from this replica :)
+          lastAliveCSNs.update(baseDN, offlineCSN);
         }
       }
     }
@@ -528,10 +547,6 @@
               new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
           changelogDB.getChangeNumberIndexDB().addRecord(record);
           moveForwardMediumConsistencyPoint(csn, baseDN);
-
-          // advance the cursor we just read from,
-          // success/failure will be checked later
-          nextChangeForInsertDBCursor.next();
         }
         catch (InterruptedException ignored)
         {
@@ -571,6 +586,7 @@
   private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
       final DN mcBaseDN) throws ChangelogException
   {
+    boolean callNextOnCursor = true;
     // update, so it becomes the previous cookie for the next change
     mediumConsistencyRUV.update(mcBaseDN, mcCSN);
     mediumConsistency = Pair.of(mcBaseDN, mcCSN);
@@ -586,17 +602,34 @@
       }
       else if (offlineCSN.isOlderThan(mcCSN))
       {
-        /*
-         * replica is not back online and Medium consistency point has gone past
-         * its last offline time: remove everything known about it: cursor,
-         * offlineCSN from lastAliveCSN and remove all knowledge of this replica
-         * from the medium consistency RUV.
-         */
-        removeCursor(mcBaseDN, mcCSN);
-        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
-        mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
+        Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
+            pair = getCursor(mcBaseDN, mcCSN.getServerId());
+        Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond();
+        if (iter != null && !iter.hasNext())
+        {
+          /*
+           * replica is not back online, Medium consistency point has gone past
+           * its last offline time, and there are no more changes after the
+           * offline CSN in the cursor: remove everything known about it:
+           * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
+           * this replica from the medium consistency RUV.
+           */
+          iter.remove();
+          StaticUtils.close(pair.getFirst());
+          resetNextChangeForInsertDBCursor();
+          callNextOnCursor = false;
+          lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
+          mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
+        }
       }
     }
+
+    if (callNextOnCursor)
+    {
+      // advance the cursor we just read from,
+      // success/failure will be checked later
+      nextChangeForInsertDBCursor.next();
+    }
   }
 
   private void removeAllCursors()
@@ -614,8 +647,8 @@
     newCursors.clear();
   }
 
-  private void removeCursor(final DN baseDN, final CSN csn)
-      throws ChangelogException
+  private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
+      getCursor(final DN baseDN, final int serverId) throws ChangelogException
   {
     for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
         : allCursors.entrySet())
@@ -626,16 +659,14 @@
             entry1.getValue().entrySet().iterator(); iter.hasNext();)
         {
           final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next();
-          if (csn.getServerId() == entry2.getKey())
+          if (serverId == entry2.getKey())
           {
-            iter.remove();
-            StaticUtils.close(entry2.getValue());
-            resetNextChangeForInsertDBCursor();
-            return;
+            return Pair.of(entry2.getValue(), iter);
           }
         }
       }
     }
+    return Pair.empty();
   }
 
   private boolean recycleExhaustedCursors() throws ChangelogException

--
Gitblit v1.10.0