From de36fa06856d8d04652401bb24e49c3259aef154 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 30 Apr 2014 10:26:42 +0000
Subject: [PATCH] OPENDJ-1259 (CR-3443) Make the Medium Consistency Point support replicas temporarily leaving the topology

---
 opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java |   51 +++++++++++++++++++++++++++++++++++++++++----------
 1 files changed, 41 insertions(+), 10 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 3b44c84..5ee48e5 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -103,7 +103,8 @@
 
   /**
    * Holds the last time each replica was seen alive, whether via updates or
-   * heartbeats received. Data is held for each serverId cross domain.
+   * heartbeat notifications, or offline notifications. Data is held for each
+   * serverId cross domain.
    * <p>
    * Updates are persistent and stored in the replicaDBs, heartbeats are
    * transient and are easily constructed on normal operations.
@@ -221,6 +222,7 @@
    * @return true if the provided baseDN is enabled for the external changelog,
    *         false if the provided baseDN is disabled for the external changelog
    *         or unknown to multimaster replication.
+   * @see MultimasterReplication#isECLEnabledDomain(DN)
    */
   protected boolean isECLEnabledDomain(DN baseDN)
   {
@@ -351,6 +353,20 @@
       nextChangeForInsertDBCursor.next();
     }
 
+    for (Entry<DN, List<CSN>> entry : changelogState.getOfflineReplicas()
+        .entrySet())
+    {
+      final DN baseDN = entry.getKey();
+      final List<CSN> offlineCSNs = entry.getValue();
+      for (CSN offlineCSN : offlineCSNs)
+      {
+        if (isECLEnabledDomain(baseDN))
+        {
+          replicasOffline.update(baseDN, offlineCSN);
+        }
+      }
+    }
+
     // this will not be used any more. Discard for garbage collection.
     this.changelogState = null;
   }
@@ -554,20 +570,33 @@
   }
 
   private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
-      final DN mcBaseDN)
+      final DN mcBaseDN) throws ChangelogException
   {
     // update, so it becomes the previous cookie for the next change
     mediumConsistencyRUV.update(mcBaseDN, mcCSN);
     mediumConsistency = Pair.of(mcBaseDN, mcCSN);
-    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcCSN.getServerId());
-    if (offlineCSN != null
-        && offlineCSN.isOlderThan(mcCSN)
-        // If no new updates has been seen for this replica
-        && lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN))
+    final int mcServerId = mcCSN.getServerId();
+    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
+    final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
+    if (offlineCSN != null)
     {
-      removeCursor(mcBaseDN, mcCSN);
-      replicasOffline.removeCSN(mcBaseDN, offlineCSN);
-      mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
+      if (lastAliveCSN != null && offlineCSN.isOlderThan(lastAliveCSN))
+      {
+        // replica is back online, we can forget the last time it was offline
+        replicasOffline.removeCSN(mcBaseDN, offlineCSN);
+      }
+      else if (offlineCSN.isOlderThan(mcCSN))
+      {
+        /*
+         * replica is not back online and Medium consistency point has gone past
+         * its last offline time: remove everything known about it: cursor,
+         * offlineCSN from lastAliveCSN and remove all knowledge of this replica
+         * from the medium consistency RUV.
+         */
+        removeCursor(mcBaseDN, mcCSN);
+        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
+        mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
+      }
     }
   }
 
@@ -587,6 +616,7 @@
   }
 
   private void removeCursor(final DN baseDN, final CSN csn)
+      throws ChangelogException
   {
     for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
         : allCursors.entrySet())
@@ -601,6 +631,7 @@
           {
             iter.remove();
             StaticUtils.close(entry2.getValue());
+            resetNextChangeForInsertDBCursor();
             return;
           }
         }

--
Gitblit v1.10.0