From 9a13d05fcb1b17c52c7b91b8445d334bce3f9e28 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 22 Nov 2013 09:24:10 +0000
Subject: [PATCH] Checkpoint commit for OPENDJ-1174 Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB

---
 opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java |   55 +++++++++++++++++++++++++++++++++++++++++++++++++++----
 1 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index dbb68f7..93b8cd3 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -64,8 +64,7 @@
   /*
    * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because
    * 1) initialization can happen while the replication server starts receiving
-   * updates 2) many updates can happen concurrently. This solution also avoids
-   * using a queue that could fill up before we have consumed all its content.
+   * updates 2) many updates can happen concurrently.
    */
   /**
    * Holds the cross domain medium consistency Replication Update Vector for the
@@ -98,6 +97,8 @@
    */
   private final MultiDomainServerState lastSeenUpdates =
       new MultiDomainServerState();
+  private final MultiDomainServerState replicasOffline =
+      new MultiDomainServerState();
 
   /**
    * Composite cursor across all the replicaDBs for all the replication domains.
@@ -168,6 +169,21 @@
   }
 
   /**
+   * Signals a replica went offline.
+   *
+   * @param baseDN
+   *          the replica's replication domain
+   * @param offlineCSN
+   *          the serverId and time of the replica that went offline
+   */
+  public void replicaOffline(DN baseDN, CSN offlineCSN)
+  {
+    lastSeenUpdates.update(baseDN, offlineCSN);
+    replicasOffline.update(baseDN, offlineCSN);
+    tryNotify(baseDN);
+  }
+
+  /**
    * Notifies the Change number indexer thread if it will be able to do some
    * work.
    */
@@ -187,8 +203,8 @@
     final CSN mcCSN = mediumConsistencyCSN;
     if (mcCSN != null)
     {
-      final CSN lastSeenSameServerId =
-          lastSeenUpdates.getCSN(baseDN, mcCSN.getServerId());
+      final int serverId = mcCSN.getServerId();
+      final CSN lastSeenSameServerId = lastSeenUpdates.getCSN(baseDN, serverId);
       return mcCSN.isOlderThan(lastSeenSameServerId);
     }
     return true;
@@ -374,6 +390,37 @@
     // update, so it becomes the previous cookie for the next change
     mediumConsistencyRUV.update(baseDN, csn);
     mediumConsistencyCSN = csn;
+    final CSN offlineCSN = replicasOffline.getCSN(baseDN, csn.getServerId());
+    if (offlineCSN != null
+        && offlineCSN.isOlderThan(mediumConsistencyCSN)
+        // If no new updates has been seen for this replica
+        && lastSeenUpdates.removeCSN(baseDN, offlineCSN))
+    {
+      removeCursor(baseDN, csn);
+      replicasOffline.removeCSN(baseDN, offlineCSN);
+      mediumConsistencyRUV.removeCSN(baseDN, offlineCSN);
+    }
+  }
+
+  private void removeCursor(final DN baseDN, final CSN csn)
+  {
+    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry : allCursors
+        .entrySet())
+    {
+      if (baseDN.equals(entry.getKey()))
+      {
+        final Set<Integer> serverIds = entry.getValue().keySet();
+        for (Iterator<Integer> iter = serverIds.iterator(); iter.hasNext();)
+        {
+          final int serverId = iter.next();
+          if (csn.getServerId() == serverId)
+          {
+            iter.remove();
+            return;
+          }
+        }
+      }
+    }
   }
 
   private void createNewCursors() throws ChangelogException

--
Gitblit v1.10.0