From 8c0b9cbba08dee4c4ad6fe00357a018cdb54e280 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 05 Jun 2014 10:46:07 +0000
Subject: [PATCH] OPENDJ-1453 (CR-3667) Change time heart beat change numbers should be synced with updates

---
 opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java                                       |   53 +++++++-
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java                             |   99 ++++++++++-----
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java           |  109 ++++++++++++++----
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java |   46 +++++-
 4 files changed, 229 insertions(+), 78 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
index f6273b6..f451cab 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2006-2009 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2013 ForgeRock AS
+ *      Portions Copyright 2011-2014 ForgeRock AS
  */
 package org.opends.server.replication.common;
 
@@ -40,6 +40,8 @@
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.ResultCode;
 
+import com.forgerock.opendj.util.Pair;
+
 import static org.opends.messages.ReplicationMessages.*;
 
 /**
@@ -98,7 +100,9 @@
   public boolean update(DN baseDN, CSN csn)
   {
     if (csn == null)
+    {
       return false;
+    }
 
     ServerState serverState = list.get(baseDN);
     if (serverState == null)
@@ -242,6 +246,45 @@
   }
 
   /**
+   * Returns the oldest Pair&lt;DN, CSN&gt; held in current object, excluding
+   * the provided CSNs. Said otherwise, the value returned is the oldest
+   * Pair&lt;DN, CSN&gt; included in the current object, that is not part of the
+   * excludedCSNs.
+   *
+   * @param excludedCSNs
+   *          the CSNs that cannot be returned
+   * @return the oldest Pair&lt;DN, CSN&gt; included in the current object that
+   *         is not part of the excludedCSNs, or {@link Pair#EMPTY} if no such
+   *         older CSN exists.
+   */
+  public Pair<DN, CSN> getOldestCSNExcluding(MultiDomainServerState excludedCSNs)
+  {
+    Pair<DN, CSN> oldest = Pair.empty();
+    for (Entry<DN, ServerState> entry : list.entrySet())
+    {
+      final DN baseDN = entry.getKey();
+      final ServerState value = entry.getValue();
+      for (Entry<Integer, CSN> entry2 : value.getServerIdToCSNMap().entrySet())
+      {
+        final CSN csn = entry2.getValue();
+        if (!isReplicaExcluded(excludedCSNs, baseDN, csn)
+            && (oldest == Pair.EMPTY || csn.isOlderThan(oldest.getSecond())))
+        {
+          oldest = Pair.of(baseDN, csn);
+        }
+      }
+    }
+    return oldest;
+  }
+
+  private boolean isReplicaExcluded(MultiDomainServerState excluded, DN baseDN,
+      CSN csn)
+  {
+    return excluded != null
+        && csn.equals(excluded.getCSN(baseDN, csn.getServerId()));
+  }
+
+  /**
    * Removes the mapping to the provided CSN if it is present in this
    * MultiDomainServerState.
    *
@@ -253,12 +296,8 @@
    */
   public boolean removeCSN(DN baseDN, CSN expectedCSN)
   {
-    ServerState ss = list.get(baseDN);
-    if (ss != null)
-    {
-      return ss.removeCSN(expectedCSN);
-    }
-    return false;
+    final ServerState ss = list.get(baseDN);
+    return ss != null && ss.removeCSN(expectedCSN);
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 2419dc4..8a068f6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -84,6 +84,8 @@
    * inserted in the DB. After insert, it is updated with the CSN of the change
    * currently processed (thus becoming the "current" cookie just before the
    * change is returned.
+   * <p>
+   * Note: This object is only updated by changes/updates.
    *
    * @see <a href=
    * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
@@ -91,15 +93,6 @@
    */
   private final MultiDomainServerState mediumConsistencyRUV =
       new MultiDomainServerState();
-  /**
-   * Holds the cross domain medium consistency baseDN and CSN for the current
-   * replication server.
-   *
-   * @see <a href=
-   * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
-   * >OpenDJ Domain Names - medium consistency CSN</a>
-   */
-  private volatile Pair<DN, CSN> mediumConsistency;
 
   /**
    * Holds the last time each replica was seen alive, whether via updates or
@@ -108,9 +101,12 @@
    * <p>
    * Updates are persistent and stored in the replicaDBs, heartbeats are
    * transient and are easily constructed on normal operations.
+   * <p>
+   * Note: This object is updated by both heartbeats and changes/updates.
    */
   private final MultiDomainServerState lastAliveCSNs =
       new MultiDomainServerState();
+  /** Note: This object is updated by replica offline messages. */
   private final MultiDomainServerState replicasOffline =
       new MultiDomainServerState();
 
@@ -119,8 +115,12 @@
    * positioned on the next change that needs to be inserted in the CNIndexDB.
    * <p>
    * Note: it is only accessed from the {@link #run()} method.
+   *
+   * @NonNull
    */
-  private CompositeDBCursor<DN> nextChangeForInsertDBCursor;
+  @SuppressWarnings("unchecked")
+  private CompositeDBCursor<DN> nextChangeForInsertDBCursor =
+      new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false);
 
   /**
    * New cursors for this Map must be created from the {@link #run()} method,
@@ -182,8 +182,9 @@
       return;
     }
 
+    final CSN oldestCSNBefore = getOldestLastAliveCSN();
     lastAliveCSNs.update(baseDN, heartbeatCSN);
-    tryNotify();
+    tryNotify(oldestCSNBefore);
   }
 
   /**
@@ -222,8 +223,9 @@
     final CSN csn = updateMsg.getCSN();
     // only keep the oldest CSN that will be the new cursor's starting point
     newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
+    final CSN oldestCSNBefore = getOldestLastAliveCSN();
     lastAliveCSNs.update(baseDN, csn);
-    tryNotify();
+    tryNotify(oldestCSNBefore);
   }
 
   /**
@@ -254,18 +256,29 @@
    */
   public void replicaOffline(DN baseDN, CSN offlineCSN)
   {
+    if (!isECLEnabledDomain(baseDN))
+    {
+      return;
+    }
+
     replicasOffline.update(baseDN, offlineCSN);
+    final CSN oldestCSNBefore = getOldestLastAliveCSN();
     lastAliveCSNs.update(baseDN, offlineCSN);
-    tryNotify();
+    tryNotify(oldestCSNBefore);
+  }
+
+  private CSN getOldestLastAliveCSN()
+  {
+    return lastAliveCSNs.getOldestCSNExcluding(replicasOffline).getSecond();
   }
 
   /**
    * Notifies the Change number indexer thread if it will be able to do some
    * work.
    */
-  private void tryNotify()
+  private void tryNotify(final CSN oldestCSNBefore)
   {
-    if (canMoveForwardMediumConsistencyPoint())
+    if (mightMoveForwardMediumConsistencyPoint(oldestCSNBefore))
     {
       synchronized (this)
       {
@@ -274,19 +287,32 @@
     }
   }
 
-  private boolean canMoveForwardMediumConsistencyPoint()
+  /**
+   * Used for waking up the {@link ChangeNumberIndexer} thread because it might
+   * have some work to do.
+   */
+  private boolean mightMoveForwardMediumConsistencyPoint(CSN oldestCSNBefore)
   {
-    final Pair<DN, CSN> mc = mediumConsistency;
-    if (mc != null)
-    {
-      final CSN mcCSN = mc.getSecond();
-      final CSN lastTimeSameReplicaSeenAlive =
-          lastAliveCSNs.getCSN(mc.getFirst(), mcCSN.getServerId());
-      return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive);
-    }
+    final CSN oldestCSNAfter = getOldestLastAliveCSN();
     // ensure that all initial replicas alive information have been updated
     // with CSNs that are acceptable for moving the medium consistency forward
-    return allInitialReplicasAreOfflineOrAlive();
+    return allInitialReplicasAreOfflineOrAlive()
+        && oldestCSNBefore != null // then oldestCSNAfter cannot be null
+        // has the oldest CSN changed?
+        && oldestCSNBefore.isOlderThan(oldestCSNAfter);
+  }
+
+  /**
+   * Used by the {@link ChangeNumberIndexer} thread to determine whether the CSN
+   * must be persisted to the change number index DB.
+   */
+  private boolean canMoveForwardMediumConsistencyPoint(CSN nextCSNToPersist)
+  {
+    // ensure that all initial replicas alive information have been updated
+    // with CSNs that are acceptable for moving the medium consistency forward
+    return allInitialReplicasAreOfflineOrAlive()
+        // can we persist the next CSN?
+        && nextCSNToPersist.isOlderThanOrEqualTo(getOldestLastAliveCSN());
   }
 
   /**
@@ -299,8 +325,8 @@
    * CSN has been updated to something past the oldest possible CSN), we have
    * enough info to compute medium consistency</li>
    * </ul>
-   * In this case, we have enough information to compute medium consistency
-   * without waiting any more.
+   * In both cases, we have enough information to compute medium consistency
+   * without waiting any further.
    */
   private boolean allInitialReplicasAreOfflineOrAlive()
   {
@@ -308,11 +334,11 @@
     {
       for (CSN csn : lastAliveCSNs.getServerState(baseDN))
       {
-        if (// oldest possible CSN?
-            csn.getTime() == 0
-            // replica is not offline
+        if (csn.getTime() == 0
             && replicasOffline.getCSN(baseDN, csn.getServerId()) == null)
         {
+          // this is the oldest possible CSN, but the replica is not offline
+          // we must wait for more up to date information from this replica
           return false;
         }
       }
@@ -537,16 +563,19 @@
           final DN baseDN = nextChangeForInsertDBCursor.getData();
           // FIXME problem: what if the serverId is not part of the ServerState?
           // right now, change number will be blocked
-          if (!canMoveForwardMediumConsistencyPoint())
+          if (!canMoveForwardMediumConsistencyPoint(csn))
           {
             // the oldest record to insert is newer than the medium consistency
             // point. Let's wait for a change that can be published.
             synchronized (this)
             {
               // double check to protect against a missed call to notify()
-              if (!isShutdownInitiated()
-                  && !canMoveForwardMediumConsistencyPoint())
+              if (!canMoveForwardMediumConsistencyPoint(csn))
               {
+                if (isShutdownInitiated())
+                {
+                  return;
+                }
                 wait();
                 // loop to check if changes older than the medium consistency
                 // point have been added to the ReplicaDBs
@@ -601,10 +630,10 @@
   private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
       final DN mcBaseDN) throws ChangelogException
   {
-    boolean callNextOnCursor = true;
     // update, so it becomes the previous cookie for the next change
     mediumConsistencyRUV.update(mcBaseDN, mcCSN);
-    mediumConsistency = Pair.of(mcBaseDN, mcCSN);
+
+    boolean callNextOnCursor = true;
     final int mcServerId = mcCSN.getServerId();
     final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
     final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java
index 5f9d77e..09b6860 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java
@@ -21,14 +21,17 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2013 ForgeRock AS
+ *      Copyright 2013-2014 ForgeRock AS
  */
 package org.opends.server.replication.common;
 
 import org.opends.server.replication.ReplicationTestCase;
 import org.opends.server.types.DN;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.forgerock.opendj.util.Pair;
+
 import static org.assertj.core.api.Assertions.*;
 import static org.testng.Assert.*;
 
@@ -40,6 +43,18 @@
   private static final CSN csn2 = new CSN(4, 5, 6);
   private static final CSN csn3 = new CSN(7, 8, 3);
 
+  private static DN dn1;
+  private static DN dn2;
+  private static DN dn3;
+
+  @BeforeClass
+  public void setBaseDNs() throws Exception
+  {
+    dn1 = DN.decode("o=test1");
+    dn2 = DN.decode("o=test2");
+    dn3 = DN.decode("o=test3");
+  }
+
   @Test
   public void testDecodeAndEncode1() throws Exception
   {
@@ -60,9 +75,6 @@
   @Test
   public void testUpdateCSN() throws Exception
   {
-    final DN dn1 = DN.decode("o=test1");
-    final DN dn2 = DN.decode("o=test2");
-
     final MultiDomainServerState state = new MultiDomainServerState();
     assertTrue(state.update(dn1, csn1));
     assertTrue(state.update(dn2, csn2));
@@ -77,9 +89,6 @@
   @Test
   public void testUpdateServerState() throws Exception
   {
-    final DN dn1 = DN.decode("o=test1");
-    final DN dn2 = DN.decode("o=test2");
-
     final MultiDomainServerState state = new MultiDomainServerState();
     final ServerState ss1 = new ServerState();
     assertTrue(ss1.update(csn3));
@@ -95,9 +104,6 @@
   @Test
   public void testUpdateMultiDomainServerState() throws Exception
   {
-    final DN dn1 = DN.decode("o=test1");
-    final DN dn2 = DN.decode("o=test2");
-
     final MultiDomainServerState state1 = new MultiDomainServerState();
     state1.update(dn1, csn3);
     state1.update(dn2, csn2);
@@ -112,9 +118,6 @@
   @Test(dependsOnMethods = { "testUpdateCSN" })
   public void testEqualsTo() throws Exception
   {
-    final DN dn1 = DN.decode("o=test1");
-    final DN dn2 = DN.decode("o=test2");
-
     final MultiDomainServerState state1 = new MultiDomainServerState();
     assertTrue(state1.update(dn1, csn3));
 
@@ -134,9 +137,6 @@
   @Test(dependsOnMethods = { "testUpdateCSN" })
   public void testIsEmpty() throws Exception
   {
-    final DN dn1 = DN.decode("o=test1");
-    final DN dn2 = DN.decode("o=test2");
-
     final MultiDomainServerState state = new MultiDomainServerState();
     assertTrue(state.isEmpty());
 
@@ -155,15 +155,7 @@
   @Test(dependsOnMethods = { "testUpdateCSN" })
   public void testRemoveCSN() throws Exception
   {
-    final DN dn1 = DN.decode("o=test1");
-    final DN dn2 = DN.decode("o=test2");
-    final DN dn3 = DN.decode("o=test3");
-
-    final MultiDomainServerState state = new MultiDomainServerState();
-
-    assertTrue(state.update(dn1, csn1));
-    assertTrue(state.update(dn2, csn1));
-    assertTrue(state.update(dn2, csn2));
+    final MultiDomainServerState state = getLastAliveCSNs();
     assertNull(state.getCSN(dn3, 42));
 
     assertFalse(state.removeCSN(dn3, csn1));
@@ -181,4 +173,71 @@
     assertNull(state.getCSN(dn2, csn1.getServerId()));
     assertSame(csn2, state.getCSN(dn2, csn2.getServerId()));
   }
+
+  private MultiDomainServerState getLastAliveCSNs()
+  {
+    final MultiDomainServerState lastAliveCSNs = new MultiDomainServerState();
+    assertTrue(lastAliveCSNs.update(dn1, csn1));
+    assertTrue(lastAliveCSNs.update(dn2, csn1));
+    assertTrue(lastAliveCSNs.update(dn2, csn2));
+    return lastAliveCSNs;
+  }
+
+  @Test(dependsOnMethods = { "testUpdateCSN" })
+  public void testGetOldestCSNExcluding_null() throws Exception
+  {
+    final MultiDomainServerState lastAliveCSNs = getLastAliveCSNs();
+    assertEquals(lastAliveCSNs.getOldestCSNExcluding(null), Pair.of(dn1, csn1));
+  }
+
+  @Test(dependsOnMethods = { "testUpdateCSN" })
+  public void testGetOldestCSNExcluding_empty() throws Exception
+  {
+    final MultiDomainServerState lastAliveCSNs = getLastAliveCSNs();
+    final MultiDomainServerState excluded = new MultiDomainServerState();
+
+    assertEquals(lastAliveCSNs.getOldestCSNExcluding(excluded), Pair.of(dn1, csn1));
+  }
+
+  @Test(dependsOnMethods = { "testUpdateCSN" })
+  public void testGetOldestCSNExcluding_currentOldestCSN_givesNewOldestCSN() throws Exception
+  {
+    final MultiDomainServerState lastAliveCSNs = getLastAliveCSNs();
+
+    final MultiDomainServerState excluded = new MultiDomainServerState();
+    excluded.update(dn1, csn1);
+
+    assertEquals(lastAliveCSNs.getOldestCSNExcluding(excluded), Pair.of(dn2, csn1));
+  }
+
+  @Test(dependsOnMethods = { "testUpdateCSN" })
+  public void testGetOldestCSNExcluding_CSNOlderThanCurrentOldestCSN_givesNewOldestCSN() throws Exception
+  {
+    final MultiDomainServerState lastAliveCSNs = getLastAliveCSNs();
+
+    final MultiDomainServerState excluded = new MultiDomainServerState();
+    excluded.update(dn1, csn1);
+    final CSN olderThanCSN1 = new CSN(0, 2, 3);
+    assertEquals(olderThanCSN1.getServerId(), csn1.getServerId());
+    assertTrue(olderThanCSN1.isOlderThan(csn1));
+    excluded.update(dn2, olderThanCSN1);
+
+    assertEquals(lastAliveCSNs.getOldestCSNExcluding(excluded), Pair.of(dn2, csn1));
+  }
+
+  @Test(dependsOnMethods = { "testUpdateCSN" })
+  public void testGetOldestCSNExcluding_CSNNewerThanCurrentOldestCSN_givesNewOldestCSN() throws Exception
+  {
+    final MultiDomainServerState lastAliveCSNs = getLastAliveCSNs();
+
+    final MultiDomainServerState excluded = new MultiDomainServerState();
+    excluded.update(dn1, csn1);
+    final CSN newerThanCSN1 = new CSN(42, 2, 3);
+    assertEquals(newerThanCSN1.getServerId(), csn1.getServerId());
+    assertTrue(newerThanCSN1.isNewerThan(csn1));
+    excluded.update(dn2, newerThanCSN1);
+
+    assertEquals(lastAliveCSNs.getOldestCSNExcluding(excluded), Pair.of(dn2, csn1));
+  }
+
 }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 05e9770..b91bdc1 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -355,6 +355,7 @@
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     publishUpdateMsg(msg1);
+    assertExternalChangelogContent(msg1);
 
     addReplica(BASE_DN1, serverId2);
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -486,19 +487,19 @@
     assertExternalChangelogContent(msg1);
 
     // do not wait for temporarily offline serverId1
-    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
-    publishUpdateMsg(msg3);
-    assertExternalChangelogContent(msg1, msg3);
+    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4);
+    publishUpdateMsg(msg4);
+    assertExternalChangelogContent(msg1, msg4);
 
     // serverId1 is back online, wait for changes from serverId2
-    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
-    publishUpdateMsg(msg4);
-    assertExternalChangelogContent(msg1, msg3);
-
-    final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId2, 5);
+    final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId1, 5);
     publishUpdateMsg(msg5);
+    assertExternalChangelogContent(msg1, msg4);
+
+    final ReplicatedUpdateMsg msg6 = msg(BASE_DN1, serverId2, 6);
+    publishUpdateMsg(msg6);
     // MCP moves forward
-    assertExternalChangelogContent(msg1, msg3, msg4);
+    assertExternalChangelogContent(msg1, msg4, msg5);
   }
 
   /**
@@ -525,16 +526,39 @@
     assertExternalChangelogContent();
 
     // MCP moves forward because serverId1 is not really offline
-    // since because we received a message from it after the offline replica msg
+    // since we received a message from it newer than the offline replica msg
     final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4);
     publishUpdateMsg(msg4);
     assertExternalChangelogContent(msg2, msg3);
 
     // back to normal operations
-    sendHeartbeat(BASE_DN1, serverId1, 4);
+    sendHeartbeat(BASE_DN1, serverId1, 5);
     assertExternalChangelogContent(msg2, msg3, msg4);
   }
 
+  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+  public void emptyDBTwoDSsOneKilled() throws Exception
+  {
+    addReplica(BASE_DN1, serverId1);
+    addReplica(BASE_DN1, serverId2);
+    startCNIndexer(BASE_DN1);
+    assertExternalChangelogContent();
+
+    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
+    publishUpdateMsg(msg1);
+    // MCP cannot move forward: no news yet from serverId2
+    assertExternalChangelogContent();
+
+    sendHeartbeat(BASE_DN1, serverId2, 2);
+    // MCP moves forward: we know what serverId2 is at
+    assertExternalChangelogContent(msg1);
+
+    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
+    publishUpdateMsg(msg3);
+    // MCP cannot move forward: serverId2 is the oldest CSN
+    assertExternalChangelogContent(msg1);
+  }
+
   private void addReplica(DN baseDN, int serverId) throws Exception
   {
     final SequentialDBCursor cursor = new SequentialDBCursor();

--
Gitblit v1.10.0