From 5eb7b26eabf15d047fd913597aa508bb78c1d7e7 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 20 May 2014 10:14:29 +0000
Subject: [PATCH] OPENDJ-1259 (CR-3563) Make the Medium Consistency Point support replicas temporarily leaving the topology

---
 opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java                             |   77 ++++++++++----
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java |  182 ++++++++++++++++++++++++++++++++++--
 2 files changed, 225 insertions(+), 34 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index c93ca98..628ca99 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -271,16 +271,32 @@
     }
     // ensure that all initial replicas alive information have been updated
     // with CSNs that are acceptable for moving the medium consistency forward
-    return allInitialReplicasArePastOldestPossibleCSN();
+    return allInitialReplicasAreOfflineOrAlive();
   }
 
-  private boolean allInitialReplicasArePastOldestPossibleCSN()
+  /**
+   * Returns true only if the initial replicas known from the changelog state DB
+   * are either:
+   * <ul>
+   * <li>offline, so do not wait for them in order to compute medium consistency
+   * </li>
+   * <li>alive, because we received heartbeats or changes (so their last alive
+   * CSN has been updated to something past the oldest possible CSN), we have
+   * enough info to compute medium consistency</li>
+   * </ul>
+   * In this case, we have enough information to compute medium consistency
+   * without waiting any more.
+   */
+  private boolean allInitialReplicasAreOfflineOrAlive()
   {
     for (DN baseDN : lastAliveCSNs)
     {
       for (CSN csn : lastAliveCSNs.getServerState(baseDN))
       {
-        if (csn.getTime() == 0)
+        if (// oldest possible CSN?
+            csn.getTime() == 0
+            // replica is not offline
+            && replicasOffline.getCSN(baseDN, csn.getServerId()) == null)
         {
           return false;
         }
@@ -363,6 +379,9 @@
         if (isECLEnabledDomain(baseDN))
         {
           replicasOffline.update(baseDN, offlineCSN);
+          // a replica offline message could also be the very last time
+          // we heard from this replica :)
+          lastAliveCSNs.update(baseDN, offlineCSN);
         }
       }
     }
@@ -528,10 +547,6 @@
               new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
           changelogDB.getChangeNumberIndexDB().addRecord(record);
           moveForwardMediumConsistencyPoint(csn, baseDN);
-
-          // advance the cursor we just read from,
-          // success/failure will be checked later
-          nextChangeForInsertDBCursor.next();
         }
         catch (InterruptedException ignored)
         {
@@ -571,6 +586,7 @@
   private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
       final DN mcBaseDN) throws ChangelogException
   {
+    boolean callNextOnCursor = true;
     // update, so it becomes the previous cookie for the next change
     mediumConsistencyRUV.update(mcBaseDN, mcCSN);
     mediumConsistency = Pair.of(mcBaseDN, mcCSN);
@@ -586,17 +602,34 @@
       }
       else if (offlineCSN.isOlderThan(mcCSN))
       {
-        /*
-         * replica is not back online and Medium consistency point has gone past
-         * its last offline time: remove everything known about it: cursor,
-         * offlineCSN from lastAliveCSN and remove all knowledge of this replica
-         * from the medium consistency RUV.
-         */
-        removeCursor(mcBaseDN, mcCSN);
-        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
-        mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
+        Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
+            pair = getCursor(mcBaseDN, mcCSN.getServerId());
+        Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond();
+        if (iter != null && !iter.hasNext())
+        {
+          /*
+           * replica is not back online, Medium consistency point has gone past
+           * its last offline time, and there are no more changes after the
+           * offline CSN in the cursor: remove everything known about it:
+           * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
+           * this replica from the medium consistency RUV.
+           */
+          iter.remove();
+          StaticUtils.close(pair.getFirst());
+          resetNextChangeForInsertDBCursor();
+          callNextOnCursor = false;
+          lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
+          mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
+        }
       }
     }
+
+    if (callNextOnCursor)
+    {
+      // advance the cursor we just read from,
+      // success/failure will be checked later
+      nextChangeForInsertDBCursor.next();
+    }
   }
 
   private void removeAllCursors()
@@ -614,8 +647,8 @@
     newCursors.clear();
   }
 
-  private void removeCursor(final DN baseDN, final CSN csn)
-      throws ChangelogException
+  private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
+      getCursor(final DN baseDN, final int serverId) throws ChangelogException
   {
     for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
         : allCursors.entrySet())
@@ -626,16 +659,14 @@
             entry1.getValue().entrySet().iterator(); iter.hasNext();)
         {
           final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next();
-          if (csn.getServerId() == entry2.getKey())
+          if (serverId == entry2.getKey())
           {
-            iter.remove();
-            StaticUtils.close(entry2.getValue());
-            resetNextChangeForInsertDBCursor();
-            return;
+            return Pair.of(entry2.getValue(), iter);
           }
         }
       }
     }
+    return Pair.empty();
   }
 
   private boolean recycleExhaustedCursors() throws ChangelogException
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 9b472ed..05e9770 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -32,6 +32,8 @@
 import java.util.Map;
 
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 import org.opends.server.DirectoryServerTestCase;
 import org.opends.server.TestCaseUtils;
 import org.opends.server.replication.common.CSN;
@@ -52,6 +54,23 @@
 import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.*;
 
+/**
+ * Test for ChangeNumberIndexer class. All dependencies to the changelog DB
+ * interfaces are mocked. The ChangeNumberIndexer class simulates what the RS
+ * does to compute a changeNumber. The tests setup various topologies with their
+ * replicas.
+ * <p>
+ * All tests are written with this layout:
+ * <ul>
+ * <li>Initial setup where RS is stopped. Data are set into the changelog state
+ * DB, the replica DBs and the change number index DB.</li>
+ * <li>Simulate RS startup by calling {@link #startCNIndexer(DN...)}. This will
+ * start the change number indexer thread that will start computing change
+ * numbers and inserting them in the change number index db.</li>
+ * <li>Send events to the change number indexer thread by publishing update
+ * messages, sending heartbeat messages or replica offline messages.</li>
+ * </ul>
+ */
 @SuppressWarnings("javadoc")
 public class ChangeNumberIndexerTest extends DirectoryServerTestCase
 {
@@ -102,12 +121,15 @@
   private static final int serverId2 = 102;
   private static final int serverId3 = 103;
 
+  @Mock
   private ChangelogDB changelogDB;
+  @Mock
   private ChangeNumberIndexDB cnIndexDB;
+  @Mock
   private ReplicationDomainDB domainDB;
-  private Map<Pair<DN, Integer>, SequentialDBCursor> cursors =
-      new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
+  private Map<Pair<DN, Integer>, SequentialDBCursor> cursors;
   private ChangelogState initialState;
+  private Map<DN, ServerState> domainNewestCSNs;
   private ChangeNumberIndexer cnIndexer;
   private MultiDomainServerState initialCookie;
 
@@ -129,14 +151,14 @@
   @BeforeMethod
   public void setup() throws Exception
   {
-    changelogDB = mock(ChangelogDB.class);
-    cnIndexDB = mock(ChangeNumberIndexDB.class);
-    domainDB = mock(ReplicationDomainDB.class);
+    MockitoAnnotations.initMocks(this);
     when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
     when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
 
     initialState = new ChangelogState();
     initialCookie = new MultiDomainServerState();
+    cursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
+    domainNewestCSNs = new HashMap<DN, ServerState>();
   }
 
   @AfterMethod
@@ -159,6 +181,7 @@
   {
     addReplica(BASE_DN1, serverId1);
     startCNIndexer(BASE_DN1);
+    assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     publishUpdateMsg(msg1);
@@ -170,8 +193,9 @@
   {
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     addReplica(BASE_DN1, serverId1);
-    setDBInitialRecords(msg1);
+    setCNIndexDBInitialRecords(msg1);
     startCNIndexer(BASE_DN1);
+    assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
     publishUpdateMsg(msg2);
@@ -184,6 +208,7 @@
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
     startCNIndexer(BASE_DN1);
+    assertExternalChangelogContent();
 
     // simulate messages received out of order
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -201,6 +226,7 @@
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN2, serverId2);
     startCNIndexer(BASE_DN1, BASE_DN2);
+    assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN2, serverId2, 2);
@@ -234,6 +260,7 @@
   {
     addReplica(BASE_DN1, serverId1);
     startCNIndexer(BASE_DN1);
+    assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     publishUpdateMsg(msg1);
@@ -263,8 +290,9 @@
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
-    setDBInitialRecords(msg1, msg2);
+    setCNIndexDBInitialRecords(msg1, msg2);
     startCNIndexer(BASE_DN1);
+    assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
     final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
@@ -286,6 +314,7 @@
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
     startCNIndexer(BASE_DN1);
+    assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1);
     final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN1, serverId2);
@@ -303,6 +332,7 @@
     addReplica(BASE_DN1, serverId2);
     addReplica(BASE_DN1, serverId3);
     startCNIndexer(BASE_DN1);
+    assertExternalChangelogContent();
 
     // cn=admin data will does not participate in the external changelog
     // so it cannot add to it
@@ -321,6 +351,7 @@
   {
     addReplica(BASE_DN1, serverId1);
     startCNIndexer(BASE_DN1);
+    assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     publishUpdateMsg(msg1);
@@ -340,6 +371,7 @@
   {
     addReplica(BASE_DN1, serverId1);
     startCNIndexer(BASE_DN1);
+    assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     publishUpdateMsg(msg1);
@@ -359,6 +391,7 @@
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
     startCNIndexer(BASE_DN1);
+    assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -375,6 +408,7 @@
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
     startCNIndexer(BASE_DN1);
+    assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -401,16 +435,128 @@
     assertExternalChangelogContent(msg1, msg2, msg4, msg5);
   }
 
+  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+  public void emptyDBTwoDSsOneInitiallyOffline() throws Exception
+  {
+    addReplica(BASE_DN1, serverId1);
+    addReplica(BASE_DN1, serverId2);
+    initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1));
+    startCNIndexer(BASE_DN1);
+    assertExternalChangelogContent();
+
+    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
+    publishUpdateMsg(msg2);
+    // MCP does not wait for temporarily offline serverId1
+    assertExternalChangelogContent(msg2);
+
+    // serverId1 is back online, wait for changes from serverId2
+    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
+    publishUpdateMsg(msg3);
+    assertExternalChangelogContent(msg2);
+    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4);
+    publishUpdateMsg(msg4);
+    // MCP moves forward
+    assertExternalChangelogContent(msg2, msg3);
+  }
+
+  /**
+   * Scenario:
+   * <ol>
+   * <li>Replica 1 publishes one change</li>
+   * <li>Replica 1 sends offline message</li>
+   * <li>RS stops</li>
+   * <li>RS starts</li>
+   * </ol>
+   */
+  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+  public void emptyDBTwoDSsOneInitiallyWithChangesThenOffline() throws Exception
+  {
+    addReplica(BASE_DN1, serverId1);
+    addReplica(BASE_DN1, serverId2);
+    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
+    publishUpdateMsg(msg1);
+    initialState.addOfflineReplica(BASE_DN1, new CSN(2, 1, serverId1));
+    startCNIndexer(BASE_DN1);
+
+    // blocked until we receive info for serverId2
+    assertExternalChangelogContent();
+
+    sendHeartbeat(BASE_DN1, serverId2, 3);
+    // MCP moves forward
+    assertExternalChangelogContent(msg1);
+
+    // do not wait for temporarily offline serverId1
+    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
+    publishUpdateMsg(msg3);
+    assertExternalChangelogContent(msg1, msg3);
+
+    // serverId1 is back online, wait for changes from serverId2
+    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
+    publishUpdateMsg(msg4);
+    assertExternalChangelogContent(msg1, msg3);
+
+    final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId2, 5);
+    publishUpdateMsg(msg5);
+    // MCP moves forward
+    assertExternalChangelogContent(msg1, msg3, msg4);
+  }
+
+  /**
+   * Scenario:
+   * <ol>
+   * <li>Replica 1 sends offline message</li>
+   * <li>Replica 1 starts</li>
+   * <li>Replica 1 publishes one change</li>
+   * <li>Replica 1 publishes a second change</li>
+   * <li>RS stops</li>
+   * <li>RS starts</li>
+   * </ol>
+   */
+  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+  public void emptyDBTwoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception
+  {
+    addReplica(BASE_DN1, serverId1);
+    addReplica(BASE_DN1, serverId2);
+    initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1));
+    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
+    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
+    publishUpdateMsg(msg2, msg3);
+    startCNIndexer(BASE_DN1);
+    assertExternalChangelogContent();
+
+    // MCP moves forward because serverId1 is not really offline
+    // since because we received a message from it after the offline replica msg
+    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4);
+    publishUpdateMsg(msg4);
+    assertExternalChangelogContent(msg2, msg3);
+
+    // back to normal operations
+    sendHeartbeat(BASE_DN1, serverId1, 4);
+    assertExternalChangelogContent(msg2, msg3, msg4);
+  }
+
   private void addReplica(DN baseDN, int serverId) throws Exception
   {
     final SequentialDBCursor cursor = new SequentialDBCursor();
     cursors.put(Pair.of(baseDN, serverId), cursor);
     when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
         .thenReturn(cursor);
-    when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(new ServerState());
+    when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(
+        getDomainNewestCSNs(baseDN));
     initialState.addServerIdToDomain(serverId, baseDN);
   }
 
+  private ServerState getDomainNewestCSNs(final DN baseDN)
+  {
+    ServerState serverState = domainNewestCSNs.get(baseDN);
+    if (serverState == null)
+    {
+      serverState = new ServerState();
+      domainNewestCSNs.put(baseDN, serverState);
+    }
+    return serverState;
+  }
+
   private void startCNIndexer(DN... eclEnabledDomains)
   {
     final List<DN> eclEnabledDomainList = Arrays.asList(eclEnabledDomains);
@@ -431,8 +577,8 @@
     if (cnIndexer != null)
     {
       cnIndexer.initiateShutdown();
-      cnIndexer.interrupt();
       cnIndexer.join();
+      cnIndexer = null;
     }
   }
 
@@ -446,7 +592,7 @@
     return new ReplicatedUpdateMsg(baseDN, new CSN(0, 0, serverId), true);
   }
 
-  private void setDBInitialRecords(ReplicatedUpdateMsg... msgs) throws Exception
+  private void setCNIndexDBInitialRecords(ReplicatedUpdateMsg... msgs) throws Exception
   {
     // Initialize the previous cookie that will be used to compare the records
     // added to the CNIndexDB at the end of this test
@@ -488,7 +634,16 @@
     {
       if (!msg.isEmptyCursor())
       {
-        cnIndexer.publishUpdateMsg(msg.getBaseDN(), msg);
+        if (cnIndexer != null)
+        {
+          // indexer is running
+          cnIndexer.publishUpdateMsg(msg.getBaseDN(), msg);
+        }
+        else
+        {
+          // we are only setting up initial state, update the domain newest CSNs
+          getDomainNewestCSNs(msg.getBaseDN()).update(msg.getCSN());
+        }
       }
     }
     waitForWaitingState(cnIndexer);
@@ -508,6 +663,10 @@
 
   private void waitForWaitingState(final Thread t)
   {
+    if (t == null)
+    { // not started yet, do not wait
+      return;
+    }
     State state = t.getState();
     while (!state.equals(State.WAITING)
         && !state.equals(State.TIMED_WAITING)
@@ -567,6 +726,7 @@
   @Test(dataProvider = "precedingCSNDataProvider")
   public void getPrecedingCSN(CSN start, CSN expected)
   {
+    cnIndexer = new ChangeNumberIndexer(changelogDB, initialState);
     CSN precedingCSN = this.cnIndexer.getPrecedingCSN(start);
     assertThat(precedingCSN).isEqualTo(expected);
   }

--
Gitblit v1.10.0