From 4d8fd404b195b5ae6b4115c33d95828dfc9f2662 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 30 Apr 2014 13:09:39 +0000
Subject: [PATCH] OPENDJ-1440 On startup, change number can progress without waiting for any heartbeat from known replicas

---
 opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java |   39 +++++++++----------
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java                             |   30 ++++++++++++++
 2 files changed, 48 insertions(+), 21 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index ac445f7..152e914 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -265,6 +265,23 @@
           lastAliveCSNs.getCSN(mc.getFirst(), mcCSN.getServerId());
       return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive);
     }
+    // ensure that all initial replicas alive information have been updated
+    // with CSNs that are acceptable for moving the medium consistency forward
+    return allInitialReplicasArePastOldestPossibleCSN();
+  }
+
+  private boolean allInitialReplicasArePastOldestPossibleCSN()
+  {
+    for (DN baseDN : lastAliveCSNs)
+    {
+      for (CSN csn : lastAliveCSNs.getServerState(baseDN))
+      {
+        if (csn.getTime() == 0)
+        {
+          return false;
+        }
+      }
+    }
     return true;
   }
 
@@ -299,8 +316,14 @@
 
       for (Integer serverId : entry.getValue())
       {
-        final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
+        /*
+         * initialize with the oldest possible CSN in order for medium
+         * consistency to wait for all replicas to be alive before moving
+         * forward
+         */
+        lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
         // start after the actual CSN when initializing from the previous cookie
+        final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
         ensureCursorExists(baseDN, serverId, csn);
       }
 
@@ -330,6 +353,11 @@
     this.changelogState = null;
   }
 
+  private CSN oldestPossibleCSN(int serverId)
+  {
+    return new CSN(0, 0, serverId);
+  }
+
   private void resetNextChangeForInsertDBCursor() throws ChangelogException
   {
     final Map<DBCursor<UpdateMsg>, DN> cursors =
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index d23936e..43b8078 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -151,7 +151,7 @@
   @Test
   public void emptyDBNoDS() throws Exception
   {
-    startCNIndexer();
+    startCNIndexer(BASE_DN1);
     assertExternalChangelogContent();
   }
 
@@ -159,7 +159,7 @@
   public void emptyDBOneInitialDS() throws Exception
   {
     addReplica(BASE_DN1, serverId1);
-    startCNIndexer();
+    startCNIndexer(BASE_DN1);
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     publishUpdateMsg(msg1);
@@ -172,7 +172,7 @@
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     addReplica(BASE_DN1, serverId1);
     setDBInitialRecords(msg1);
-    startCNIndexer();
+    startCNIndexer(BASE_DN1);
 
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
     publishUpdateMsg(msg2);
@@ -184,11 +184,15 @@
   {
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
-    startCNIndexer();
+    startCNIndexer(BASE_DN1);
 
+    // simulate messages received out of order
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
-    publishUpdateMsg(msg2, msg1);
+    publishUpdateMsg(msg2);
+    // do not start publishing to the changelog until we hear from serverId1
+    assertExternalChangelogContent();
+    publishUpdateMsg(msg1);
     assertExternalChangelogContent(msg1);
   }
 
@@ -197,7 +201,7 @@
   {
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN2, serverId2);
-    startCNIndexer();
+    startCNIndexer(BASE_DN1, BASE_DN2);
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN2, serverId2, 2);
@@ -216,7 +220,7 @@
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
     setDBInitialRecords(msg1, msg2);
-    startCNIndexer();
+    startCNIndexer(BASE_DN1);
 
     final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
     final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
@@ -237,7 +241,7 @@
   {
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
-    startCNIndexer();
+    startCNIndexer(BASE_DN1);
 
     final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1);
     final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN1, serverId2);
@@ -254,7 +258,7 @@
     addReplica(ADMIN_DATA_DN, serverId1);
     addReplica(BASE_DN1, serverId2);
     addReplica(BASE_DN1, serverId3);
-    startCNIndexer();
+    startCNIndexer(BASE_DN1);
 
     // cn=admin data will does not participate in the external changelog
     // so it cannot add to it
@@ -272,7 +276,7 @@
   public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
   {
     addReplica(BASE_DN1, serverId1);
-    startCNIndexer();
+    startCNIndexer(BASE_DN1);
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     publishUpdateMsg(msg1);
@@ -292,7 +296,7 @@
   {
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
-    startCNIndexer();
+    startCNIndexer(BASE_DN1);
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -308,7 +312,7 @@
   {
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
-    startCNIndexer();
+    startCNIndexer(BASE_DN1);
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -336,14 +340,15 @@
     initialState.addServerIdToDomain(serverId, baseDN);
   }
 
-  private void startCNIndexer()
+  private void startCNIndexer(DN... eclEnabledDomains)
   {
+    final List<DN> eclEnabledDomainList = Arrays.asList(eclEnabledDomains);
     cnIndexer = new ChangeNumberIndexer(changelogDB, initialState)
     {
       @Override
       protected boolean isECLEnabledDomain(DN baseDN)
       {
-        return BASE_DN1.equals(baseDN) || BASE_DN2.equals(baseDN);
+        return eclEnabledDomainList.contains(baseDN);
       }
     };
     cnIndexer.start();
@@ -448,12 +453,6 @@
   private void assertExternalChangelogContent(ReplicatedUpdateMsg... expectedMsgs)
       throws Exception
   {
-    if (expectedMsgs.length == 0)
-    {
-      verify(cnIndexDB, never()).addRecord(any(ChangeNumberIndexRecord.class));
-      return;
-    }
-
     final ArgumentCaptor<ChangeNumberIndexRecord> arg =
         ArgumentCaptor.forClass(ChangeNumberIndexRecord.class);
     verify(cnIndexDB, atLeast(0)).addRecord(arg.capture());

--
Gitblit v1.10.0