From 60b4019ba512ad303ab5f0dbd06c3203ba53e940 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 05 Jun 2014 10:46:07 +0000
Subject: [PATCH] OPENDJ-1453 (CR-3667) Change time heart beat change numbers should be synced with updates

---
 opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java |   99 ++++++++++++++++++++++++++++++++-----------------
 1 files changed, 64 insertions(+), 35 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 2419dc4..8a068f6 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -84,6 +84,8 @@
    * inserted in the DB. After insert, it is updated with the CSN of the change
    * currently processed (thus becoming the "current" cookie just before the
    * change is returned.
+   * <p>
+   * Note: This object is only updated by changes/updates.
    *
    * @see <a href=
    * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
@@ -91,15 +93,6 @@
    */
   private final MultiDomainServerState mediumConsistencyRUV =
       new MultiDomainServerState();
-  /**
-   * Holds the cross domain medium consistency baseDN and CSN for the current
-   * replication server.
-   *
-   * @see <a href=
-   * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
-   * >OpenDJ Domain Names - medium consistency CSN</a>
-   */
-  private volatile Pair<DN, CSN> mediumConsistency;
 
   /**
    * Holds the last time each replica was seen alive, whether via updates or
@@ -108,9 +101,12 @@
    * <p>
    * Updates are persistent and stored in the replicaDBs, heartbeats are
    * transient and are easily constructed on normal operations.
+   * <p>
+   * Note: This object is updated by both heartbeats and changes/updates.
    */
   private final MultiDomainServerState lastAliveCSNs =
       new MultiDomainServerState();
+  /** Note: This object is updated by replica offline messages. */
   private final MultiDomainServerState replicasOffline =
       new MultiDomainServerState();
 
@@ -119,8 +115,12 @@
    * positioned on the next change that needs to be inserted in the CNIndexDB.
    * <p>
    * Note: it is only accessed from the {@link #run()} method.
+   *
+   * Note: this field is never {@code null}.
    */
-  private CompositeDBCursor<DN> nextChangeForInsertDBCursor;
+  @SuppressWarnings("unchecked")
+  private CompositeDBCursor<DN> nextChangeForInsertDBCursor =
+      new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false);
 
   /**
    * New cursors for this Map must be created from the {@link #run()} method,
@@ -182,8 +182,9 @@
       return;
     }
 
+    final CSN oldestCSNBefore = getOldestLastAliveCSN();
     lastAliveCSNs.update(baseDN, heartbeatCSN);
-    tryNotify();
+    tryNotify(oldestCSNBefore);
   }
 
   /**
@@ -222,8 +223,9 @@
     final CSN csn = updateMsg.getCSN();
     // only keep the oldest CSN that will be the new cursor's starting point
     newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
+    final CSN oldestCSNBefore = getOldestLastAliveCSN();
     lastAliveCSNs.update(baseDN, csn);
-    tryNotify();
+    tryNotify(oldestCSNBefore);
   }
 
   /**
@@ -254,18 +256,29 @@
    */
   public void replicaOffline(DN baseDN, CSN offlineCSN)
   {
+    if (!isECLEnabledDomain(baseDN))
+    {
+      return;
+    }
+
     replicasOffline.update(baseDN, offlineCSN);
+    final CSN oldestCSNBefore = getOldestLastAliveCSN();
     lastAliveCSNs.update(baseDN, offlineCSN);
-    tryNotify();
+    tryNotify(oldestCSNBefore);
+  }
+
+  private CSN getOldestLastAliveCSN()
+  {
+    return lastAliveCSNs.getOldestCSNExcluding(replicasOffline).getSecond();
   }
 
   /**
    * Notifies the Change number indexer thread if it will be able to do some
    * work.
    */
-  private void tryNotify()
+  private void tryNotify(final CSN oldestCSNBefore)
   {
-    if (canMoveForwardMediumConsistencyPoint())
+    if (mightMoveForwardMediumConsistencyPoint(oldestCSNBefore))
     {
       synchronized (this)
       {
@@ -274,19 +287,32 @@
     }
   }
 
-  private boolean canMoveForwardMediumConsistencyPoint()
+  /**
+   * Used for waking up the {@link ChangeNumberIndexer} thread because it might
+   * have some work to do.
+   */
+  private boolean mightMoveForwardMediumConsistencyPoint(CSN oldestCSNBefore)
   {
-    final Pair<DN, CSN> mc = mediumConsistency;
-    if (mc != null)
-    {
-      final CSN mcCSN = mc.getSecond();
-      final CSN lastTimeSameReplicaSeenAlive =
-          lastAliveCSNs.getCSN(mc.getFirst(), mcCSN.getServerId());
-      return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive);
-    }
+    final CSN oldestCSNAfter = getOldestLastAliveCSN();
     // ensure that all initial replicas alive information have been updated
     // with CSNs that are acceptable for moving the medium consistency forward
-    return allInitialReplicasAreOfflineOrAlive();
+    return allInitialReplicasAreOfflineOrAlive()
+        && oldestCSNBefore != null // then oldestCSNAfter cannot be null
+        // has the oldest CSN changed?
+        && oldestCSNBefore.isOlderThan(oldestCSNAfter);
+  }
+
+  /**
+   * Used by the {@link ChangeNumberIndexer} thread to determine whether the CSN
+   * must be persisted to the change number index DB.
+   */
+  private boolean canMoveForwardMediumConsistencyPoint(CSN nextCSNToPersist)
+  {
+    // ensure the alive information of all initial replicas has been updated
+    // with CSNs that are acceptable for moving the medium consistency forward
+    return allInitialReplicasAreOfflineOrAlive()
+        // can we persist the next CSN?
+        && nextCSNToPersist.isOlderThanOrEqualTo(getOldestLastAliveCSN());
   }
 
   /**
@@ -299,8 +325,8 @@
    * CSN has been updated to something past the oldest possible CSN), we have
    * enough info to compute medium consistency</li>
    * </ul>
-   * In this case, we have enough information to compute medium consistency
-   * without waiting any more.
+   * In both cases, we have enough information to compute medium consistency
+   * without waiting any further.
    */
   private boolean allInitialReplicasAreOfflineOrAlive()
   {
@@ -308,11 +334,11 @@
     {
       for (CSN csn : lastAliveCSNs.getServerState(baseDN))
       {
-        if (// oldest possible CSN?
-            csn.getTime() == 0
-            // replica is not offline
+        if (csn.getTime() == 0
             && replicasOffline.getCSN(baseDN, csn.getServerId()) == null)
         {
+          // this is the oldest possible CSN, but the replica is not offline
+          // we must wait for more up-to-date information from this replica
           return false;
         }
       }
@@ -537,16 +563,19 @@
           final DN baseDN = nextChangeForInsertDBCursor.getData();
           // FIXME problem: what if the serverId is not part of the ServerState?
           // right now, change number will be blocked
-          if (!canMoveForwardMediumConsistencyPoint())
+          if (!canMoveForwardMediumConsistencyPoint(csn))
           {
             // the oldest record to insert is newer than the medium consistency
             // point. Let's wait for a change that can be published.
             synchronized (this)
             {
               // double check to protect against a missed call to notify()
-              if (!isShutdownInitiated()
-                  && !canMoveForwardMediumConsistencyPoint())
+              if (!canMoveForwardMediumConsistencyPoint(csn))
               {
+                if (isShutdownInitiated())
+                {
+                  return;
+                }
                 wait();
                 // loop to check if changes older than the medium consistency
                 // point have been added to the ReplicaDBs
@@ -601,10 +630,10 @@
   private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
       final DN mcBaseDN) throws ChangelogException
   {
-    boolean callNextOnCursor = true;
     // update, so it becomes the previous cookie for the next change
     mediumConsistencyRUV.update(mcBaseDN, mcCSN);
-    mediumConsistency = Pair.of(mcBaseDN, mcCSN);
+
+    boolean callNextOnCursor = true;
     final int mcServerId = mcCSN.getServerId();
     final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
     final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);

--
Gitblit v1.10.0