From 5f803832687ee9d56deec9946d6be7f3772e7688 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 17 Dec 2013 11:08:12 +0000
Subject: [PATCH] OPENDJ-1231 (CR-2724) Make the Medium Consistency Point support replica heartbeats and replicas shutting down

---
 opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java                        |   15 +++
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                    |   24 +++++
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java             |   20 ++++-
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java              |   39 ++++++---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                       |   62 +-------------
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java |   36 +++-----
 6 files changed, 99 insertions(+), 97 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
index cfed222..f6273b6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -209,6 +209,19 @@
   }
 
   /**
+   * Returns the ServerState associated to the provided replication domain's
+   * baseDN.
+   *
+   * @param baseDN
+   *          the replication domain's baseDN
+   * @return the associated ServerState, can be null if none is held for baseDN
+   */
+  public ServerState getServerState(DN baseDN)
+  {
+    return list.get(baseDN);
+  }
+
+  /**
    * Returns the CSN associated to the provided replication domain's baseDN and
    * serverId.
    *
@@ -216,7 +229,7 @@
    *          the replication domain's baseDN
    * @param serverId
    *          the serverId
-   * @return the associated ServerState
+   * @return the associated CSN
    */
   public CSN getCSN(DN baseDN, int serverId)
   {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 91e799a..70ee7cd 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -167,8 +167,6 @@
    */
   private int assuredTimeoutTimerPurgeCounter = 0;
 
-  private ServerState ctHeartbeatState;
-
   /**
    * Creates a new ReplicationServerDomain associated to the baseDN.
    *
@@ -2558,19 +2556,6 @@
   }
 
   /**
-   * Return the state that contain for each server the time of eligibility.
-   * @return the state.
-   */
-  public ServerState getChangeTimeHeartbeatState()
-  {
-    if (ctHeartbeatState == null)
-    {
-      ctHeartbeatState = getLatestServerState().duplicate();
-    }
-    return ctHeartbeatState;
-  }
-
-  /**
    * Returns the oldest known state for the domain, made of the oldest CSN
    * stored for each serverId.
    * <p>
@@ -2593,31 +2578,13 @@
    *
    * @return the eligible CSN.
    */
-  public CSN getEligibleCSN()
+  CSN getEligibleCSN()
   {
     CSN eligibleCSN = null;
-
-    final ServerState newestCSNs = domainDB.getDomainNewestCSNs(baseDN);
-    for (final CSN replicaNewestCSN : newestCSNs)
+    for (final CSN lastAliveCSN : domainDB.getDomainLastAliveCSNs(baseDN))
     {
       // Should it be considered for eligibility ?
-      int serverId = replicaNewestCSN.getServerId();
-      CSN heartbeatLastCSN = getChangeTimeHeartbeatState().getCSN(serverId);
-
-      // If the most recent UpdateMsg or CLHeartbeatMsg received is very old
-      // then the domain is considered down and not considered for eligibility
-      /*
-      if ((heartbeatLastDN != null) &&
-          (TimeThread.getTime()- heartbeatLastDN.getTime() > 5000))
-      {
-        if (debugEnabled())
-          TRACER.debugInfo("In " + this.getName() +
-            " Server " + sid
-            + " is not considered for eligibility ... potentially down");
-        continue;
-      }
-      */
-
+      final int serverId = lastAliveCSN.getServerId();
       if (!isServerConnected(serverId))
       {
         if (debugEnabled())
@@ -2628,13 +2595,9 @@
         continue;
       }
 
-      if (eligibleCSN == null || replicaNewestCSN.isNewerThan(eligibleCSN))
+      if (eligibleCSN == null || lastAliveCSN.isNewerThan(eligibleCSN))
       {
-        eligibleCSN = replicaNewestCSN;
-      }
-      if (heartbeatLastCSN != null && heartbeatLastCSN.isNewerThan(eligibleCSN))
-      {
-        eligibleCSN = heartbeatLastCSN;
+        eligibleCSN = lastAliveCSN;
       }
     }
 
@@ -2671,7 +2634,7 @@
    * @param msg The message to process.
    */
   public void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
-      ChangeTimeHeartbeatMsg msg )
+      ChangeTimeHeartbeatMsg msg)
   {
     try
     {
@@ -2689,7 +2652,7 @@
 
     try
     {
-      storeReceivedCTHeartbeat(msg.getCSN());
+      domainDB.replicaHeartbeat(baseDN, msg.getCSN());
       if (senderHandler.isDataServer())
       {
         // If we are the first replication server warned,
@@ -2722,17 +2685,6 @@
   }
 
   /**
-   * Store a change time value received from a data server.
-   * @param csn The provided change time.
-   */
-  public void storeReceivedCTHeartbeat(CSN csn)
-  {
-    // TODO:Maybe we can spare processing by only storing CSN (timestamp)
-    // instead of a server state.
-    getChangeTimeHeartbeatState().update(csn);
-  }
-
-  /**
    * This methods count the changes, server by server :
    * - from a serverState start point
    * - to (inclusive) an end point (the provided endCSN).
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index ed3b7db..7fe4069 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -47,8 +47,8 @@
   long getDomainChangesCount(DN baseDN);
 
   /**
-   * Returns the oldest {@link CSN}s of each serverId for the specified
-   * replication domain.
+   * Returns the oldest {@link CSN}s from the replicaDBs for each serverId in
+   * the specified replication domain.
    *
    * @param baseDN
    *          the replication domain baseDN
@@ -59,8 +59,8 @@
   ServerState getDomainOldestCSNs(DN baseDN);
 
   /**
-   * Returns the newest {@link CSN}s of each serverId for the specified
-   * replication domain.
+   * Returns the newest {@link CSN}s from the replicaDBs for each serverId in
+   * the specified replication domain.
    *
    * @param baseDN
    *          the replication domain baseDN
@@ -71,6 +71,18 @@
   ServerState getDomainNewestCSNs(DN baseDN);
 
   /**
+   * Returns the last time each serverId was seen alive for the specified
+   * replication domain.
+   *
+   * @param baseDN
+   *          the replication domain baseDN
+   * @return a new ServerState object holding the {serverId => CSN} Map. Can be
+   *         null if the config that computes change numbers is set to false or
+   *         if domain is not replicated.
+   */
+  ServerState getDomainLastAliveCSNs(DN baseDN);
+
+  /**
    * Retrieves the latest trim date for the specified replication domain.
    * <p>
    * FIXME will be removed when ECLServerHandler will not be responsible anymore
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 4c46759..40cccc9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -101,12 +101,13 @@
   private volatile CSN mediumConsistencyCSN;
 
   /**
-   * Holds the most recent changes or heartbeats received for each serverIds
-   * cross domain. changes are stored in the replicaDBs and hence persistent,
-   * heartbeats are transient because they are easily constructed on normal
-   * operations.
+   * Holds the last time each replica was seen alive, whether via updates or
+   * heartbeats received. Data is held per serverId, across all domains.
+   * <p>
+   * Updates are persistent and stored in the replicaDBs; heartbeats are
+   * transient and are easily reconstructed during normal operations.
    */
-  private final MultiDomainServerState lastSeenUpdates =
+  private final MultiDomainServerState lastAliveCSNs =
       new MultiDomainServerState();
   private final MultiDomainServerState replicasOffline =
       new MultiDomainServerState();
@@ -174,7 +175,7 @@
    */
   public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
   {
-    lastSeenUpdates.update(baseDN, heartbeatCSN);
+    lastAliveCSNs.update(baseDN, heartbeatCSN);
     tryNotify(baseDN);
   }
 
@@ -192,13 +193,27 @@
       throws ChangelogException
   {
     final CSN csn = updateMsg.getCSN();
-    lastSeenUpdates.update(baseDN, csn);
+    lastAliveCSNs.update(baseDN, csn);
     // only keep the oldest CSN that will be the new cursor's starting point
     newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
     tryNotify(baseDN);
   }
 
   /**
+   * Returns the last time each serverId was seen alive for the specified
+   * replication domain.
+   *
+   * @param baseDN
+   *          the replication domain baseDN
+   * @return the internal ServerState holding the {serverId => CSN} Map, not a
+   *         copy (callers must copy it). Can be null if domain is not replicated.
+   */
+  public ServerState getDomainLastAliveCSNs(DN baseDN)
+  {
+    return lastAliveCSNs.getServerState(baseDN);
+  }
+
+  /**
    * Signals a replica went offline.
    *
    * @param baseDN
@@ -208,7 +223,7 @@
    */
   public void replicaOffline(DN baseDN, CSN offlineCSN)
   {
-    lastSeenUpdates.update(baseDN, offlineCSN);
+    lastAliveCSNs.update(baseDN, offlineCSN);
     replicasOffline.update(baseDN, offlineCSN);
     tryNotify(baseDN);
   }
@@ -234,8 +249,8 @@
     if (mcCSN != null)
     {
       final int serverId = mcCSN.getServerId();
-      final CSN lastSeenSameServerId = lastSeenUpdates.getCSN(baseDN, serverId);
-      return mcCSN.isOlderThan(lastSeenSameServerId);
+      CSN lastTimeSameReplicaSeenAlive = lastAliveCSNs.getCSN(baseDN, serverId);
+      return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive);
     }
     return true;
   }
@@ -269,7 +284,7 @@
       }
 
       ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
-      lastSeenUpdates.update(baseDN, latestKnownState);
+      lastAliveCSNs.update(baseDN, latestKnownState);
     }
     resetNextChangeForInsertDBCursor();
 
@@ -491,7 +506,7 @@
     if (offlineCSN != null
         && offlineCSN.isOlderThan(mediumConsistencyCSN)
         // If no new updates has been seen for this replica
-        && lastSeenUpdates.removeCSN(baseDN, offlineCSN))
+        && lastAliveCSNs.removeCSN(baseDN, offlineCSN))
     {
       removeCursor(baseDN, csn);
       replicasOffline.removeCSN(baseDN, offlineCSN);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 61991fc..7a52433 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -539,6 +539,23 @@
 
   /** {@inheritDoc} */
   @Override
+  public ServerState getDomainLastAliveCSNs(DN baseDN)
+  {
+    final ChangeNumberIndexer indexer = this.cnIndexer.get();
+    if (indexer != null)
+    {
+      final ServerState results = indexer.getDomainLastAliveCSNs(baseDN);
+      if (results != null)
+      {
+        // return a copy to protect against concurrent modifications
+        return results.duplicate();
+      }
+    }
+    return null;
+  }
+
+  /** {@inheritDoc} */
+  @Override
   public void removeDomain(DN baseDN) throws ChangelogException
   {
     // Remember the first exception because :
@@ -791,8 +808,11 @@
   @Override
   public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN)
   {
-    // TODO implement this when the changelogDB will be responsible for
-    // maintaining the medium consistency point
+    final ChangeNumberIndexer indexer = cnIndexer.get();
+    if (indexer != null)
+    {
+      indexer.publishHeartbeat(baseDN, heartbeatCSN);
+    }
   }
 
   /** {@inheritDoc} */
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
index e347da3..beb510a 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -552,9 +552,8 @@
     debugInfo(tn, "Starting test\n\n");
 
     // root entry returned
-    final InternalSearchOperation op = searchOnChangelog(
-        "(objectclass=*)", Collections.<String>emptySet(), createControls(""), 1, tn);
-    waitOpResult(op, ResultCode.SUCCESS);
+    searchOnChangelog("(objectclass=*)", Collections.<String> emptySet(), createControls(""),
+        1, ResultCode.SUCCESS, tn);
 
     debugInfo(tn, "Ending test successfully");
   }
@@ -585,8 +584,7 @@
   /** Add an entry in the database */
   private void addEntry(Entry entry) throws Exception
   {
-    AddOperation addOp = connection.processAdd(entry);
-    waitOpResult(addOp, ResultCode.SUCCESS);
+    waitOpResult(connection.processAdd(entry), ResultCode.SUCCESS);
     assertNotNull(getEntry(entry.getDN(), 1000, true));
   }
 
@@ -868,10 +866,8 @@
       throws Exception
   {
     debugInfo(testName, "Search with cookie=[" + cookie + "] filter=[" + filterString + "]");
-    final InternalSearchOperation searchOp = searchOnChangelog(
-        filterString, ALL_ATTRIBUTES, createControls(cookie), expectedNbEntries, testName);
-    waitOpResult(searchOp, expectedResultCode);
-    return searchOp;
+    return searchOnChangelog(filterString, ALL_ATTRIBUTES, createControls(cookie),
+        expectedNbEntries, expectedResultCode, testName);
   }
 
   private InternalSearchOperation searchOnChangelog(String filterString,
@@ -879,15 +875,13 @@
       throws Exception
   {
     debugInfo(testName, " Search: " + filterString);
-    final InternalSearchOperation searchOp = searchOnChangelog(
-        filterString, ALL_ATTRIBUTES, NO_CONTROL, expectedNbEntries, testName);
-    waitOpResult(searchOp, expectedResultCode);
-    return searchOp;
+    return searchOnChangelog(filterString, ALL_ATTRIBUTES, NO_CONTROL,
+        expectedNbEntries, expectedResultCode, testName);
   }
 
   private InternalSearchOperation searchOnChangelog(String filterString,
       Set<String> attributes, List<Control> controls, int expectedNbEntries,
-      String testName) throws Exception
+      ResultCode expectedResultCode, String testName) throws Exception
   {
     InternalSearchOperation op = null;
     int cnt = 0;
@@ -912,6 +906,7 @@
     final List<SearchResultEntry> entries = op.getSearchEntries();
     assertThat(entries).hasSize(expectedNbEntries);
     debugAndWriteEntries(getLDIFWriter(), entries, testName);
+    waitOpResult(op, expectedResultCode);
     return op;
   }
 
@@ -2104,7 +2099,6 @@
       ReplicationServerDomain rsd1 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN);
       debugInfo(tn, rsd1.getBaseDN()
           + " LatestServerState=" + rsd1.getLatestServerState()
-          + " ChangeTimeHeartBeatState=" + rsd1.getChangeTimeHeartbeatState()
           + " eligibleCSN=" + rsd1.getEligibleCSN()
           + " rs eligibleCSN=" + replicationServer.getEligibleCSN(null));
       // FIXME:ECL Enable this test by adding an assert on the right value
@@ -2112,7 +2106,6 @@
       ReplicationServerDomain rsd2 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN2);
       debugInfo(tn, rsd2.getBaseDN()
           + " LatestServerState=" + rsd2.getLatestServerState()
-          + " ChangeTimeHeartBeatState=" + rsd2.getChangeTimeHeartbeatState()
           + " eligibleCSN=" + rsd2.getEligibleCSN()
           + " rs eligibleCSN=" + replicationServer.getEligibleCSN(null));
       // FIXME:ECL Enable this test by adding an assert on the right value
@@ -2882,13 +2875,10 @@
     }
     finally
     {
-      final DeleteOperation delOp1 = connection.processDelete(
-          DN.decode("cn=Fiona Jensen," + TEST_ROOT_DN_STRING2));
-      waitOpResult(delOp1, ResultCode.SUCCESS);
-      final DeleteOperation delOp2 = connection.processDelete(TEST_ROOT_DN2);
-      waitOpResult(delOp2, ResultCode.SUCCESS);
-      final DeleteOperation delOp3 = connection.processDelete(baseDN3);
-      waitOpResult(delOp3, ResultCode.SUCCESS);
+      final DN fionaDN = DN.decode("cn=Fiona Jensen," + TEST_ROOT_DN_STRING2);
+      waitOpResult(connection.processDelete(fionaDN), ResultCode.SUCCESS);
+      waitOpResult(connection.processDelete(TEST_ROOT_DN2), ResultCode.SUCCESS);
+      waitOpResult(connection.processDelete(baseDN3), ResultCode.SUCCESS);
 
       remove(domain21, domain2, domain3);
       removeTestBackend(backend2, backend3);

--
Gitblit v1.10.0