From 89f7cfbe75c879e8faa5b8b327e4c6d9fc2713eb Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 22 Nov 2013 09:24:10 +0000
Subject: [PATCH] Checkpoint commit for OPENDJ-1174 Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB

---
 opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java                                       |   20 +++++
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java                             |   55 ++++++++++++-
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java           |   46 +++++++++++
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java                      |   16 ++++
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java |   58 +++++++++++++-
 opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java                                                  |   30 +++++++
 6 files changed, 216 insertions(+), 9 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
index 7afe2d2..4186d3e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -209,6 +209,26 @@
   }
 
   /**
+   * Removes the mapping to the provided CSN if it is present in this
+   * MultiDomainServerState.
+   *
+   * @param baseDN
+   *          the replication domain's baseDN
+   * @param expectedCSN
+   *          the CSN to be removed
+   * @return true if the CSN could be removed, false otherwise.
+   */
+  public boolean removeCSN(DN baseDN, CSN expectedCSN)
+  {
+    ServerState ss = list.get(baseDN);
+    if (ss != null)
+    {
+      return ss.removeCSN(expectedCSN);
+    }
+    return false;
+  }
+
+  /**
    * Test if this object equals the provided other object.
    * @param other The other object with which we want to test equality.
    * @return      Returns True if this equals other, else return false.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
index ae33dff..edfb1a9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -192,6 +192,36 @@
   }
 
   /**
+   * Removes the mapping to the provided CSN if it is present in this
+   * ServerState.
+   *
+   * @param expectedCSN
+   *          the CSN to be removed
+   * @return true if the CSN could be removed, false otherwise.
+   */
+  public boolean removeCSN(CSN expectedCSN)
+  {
+    if (expectedCSN == null)
+      return false;
+
+    synchronized (serverIdToCSN)
+    {
+      for (Iterator<CSN> iter = serverIdToCSN.values().iterator();
+          iter.hasNext();)
+      {
+        final CSN csn = iter.next();
+        if (expectedCSN.equals(csn))
+        {
+          iter.remove();
+          saved = false;
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
    * Replace the Server State with another ServerState.
    *
    * @param serverState The ServerState.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index dbb68f7..93b8cd3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -64,8 +64,7 @@
   /*
    * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because
    * 1) initialization can happen while the replication server starts receiving
-   * updates 2) many updates can happen concurrently. This solution also avoids
-   * using a queue that could fill up before we have consumed all its content.
+   * updates 2) many updates can happen concurrently.
    */
   /**
    * Holds the cross domain medium consistency Replication Update Vector for the
@@ -98,6 +97,8 @@
    */
   private final MultiDomainServerState lastSeenUpdates =
       new MultiDomainServerState();
+  private final MultiDomainServerState replicasOffline =
+      new MultiDomainServerState();
 
   /**
    * Composite cursor across all the replicaDBs for all the replication domains.
@@ -168,6 +169,21 @@
   }
 
   /**
+   * Signals a replica went offline.
+   *
+   * @param baseDN
+   *          the replica's replication domain
+   * @param offlineCSN
+   *          the serverId and time of the replica that went offline
+   */
+  public void replicaOffline(DN baseDN, CSN offlineCSN)
+  {
+    lastSeenUpdates.update(baseDN, offlineCSN);
+    replicasOffline.update(baseDN, offlineCSN);
+    tryNotify(baseDN);
+  }
+
+  /**
    * Notifies the Change number indexer thread if it will be able to do some
    * work.
    */
@@ -187,8 +203,8 @@
     final CSN mcCSN = mediumConsistencyCSN;
     if (mcCSN != null)
     {
-      final CSN lastSeenSameServerId =
-          lastSeenUpdates.getCSN(baseDN, mcCSN.getServerId());
+      final int serverId = mcCSN.getServerId();
+      final CSN lastSeenSameServerId = lastSeenUpdates.getCSN(baseDN, serverId);
       return mcCSN.isOlderThan(lastSeenSameServerId);
     }
     return true;
@@ -374,6 +390,37 @@
     // update, so it becomes the previous cookie for the next change
     mediumConsistencyRUV.update(baseDN, csn);
     mediumConsistencyCSN = csn;
+    final CSN offlineCSN = replicasOffline.getCSN(baseDN, csn.getServerId());
+    if (offlineCSN != null
+        && offlineCSN.isOlderThan(mediumConsistencyCSN)
+        // If no new updates has been seen for this replica
+        && lastSeenUpdates.removeCSN(baseDN, offlineCSN))
+    {
+      removeCursor(baseDN, csn);
+      replicasOffline.removeCSN(baseDN, offlineCSN);
+      mediumConsistencyRUV.removeCSN(baseDN, offlineCSN);
+    }
+  }
+
+  private void removeCursor(final DN baseDN, final CSN csn)
+  {
+    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry : allCursors
+        .entrySet())
+    {
+      if (baseDN.equals(entry.getKey()))
+      {
+        final Set<Integer> serverIds = entry.getValue().keySet();
+        for (Iterator<Integer> iter = serverIds.iterator(); iter.hasNext();)
+        {
+          final int serverId = iter.next();
+          if (csn.getServerId() == serverId)
+          {
+            iter.remove();
+            return;
+          }
+        }
+      }
+    }
   }
 
   private void createNewCursors() throws ChangelogException
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java
index 1d58eca..928edde 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java
@@ -93,6 +93,23 @@
     assertEquals(state.toString(), expected);
   }
 
+  @Test
+  public void testUpdateMultiDomainServerState() throws Exception
+  {
+    final DN dn1 = DN.decode("o=test1");
+    final DN dn2 = DN.decode("o=test2");
+
+    final MultiDomainServerState state1 = new MultiDomainServerState();
+    state1.update(dn1, csn3);
+    state1.update(dn2, csn2);
+    final MultiDomainServerState state2 = new MultiDomainServerState();
+    state2.update(state1);
+
+    assertSame(csn3, state2.getCSN(dn1, csn3.getServerId()));
+    assertSame(csn2, state2.getCSN(dn2, csn2.getServerId()));
+    assertTrue(state1.equalsTo(state2));
+  }
+
   @Test(dependsOnMethods = { "testUpdateCSN" })
   public void testEqualsTo() throws Exception
   {
@@ -136,4 +153,33 @@
     assertTrue(state.isEmpty());
   }
 
+  @Test(dependsOnMethods = { "testUpdateCSN" })
+  public void testRemoveCSN() throws Exception
+  {
+    final DN dn1 = DN.decode("o=test1");
+    final DN dn2 = DN.decode("o=test2");
+    final DN dn3 = DN.decode("o=test3");
+
+    final MultiDomainServerState state = new MultiDomainServerState();
+
+    assertTrue(state.update(dn1, csn1));
+    assertTrue(state.update(dn2, csn1));
+    assertTrue(state.update(dn2, csn2));
+    assertNull(state.getCSN(dn3, 42));
+
+    assertFalse(state.removeCSN(dn3, csn1));
+    assertSame(csn1, state.getCSN(dn1, csn1.getServerId()));
+    assertSame(csn1, state.getCSN(dn2, csn1.getServerId()));
+    assertSame(csn2, state.getCSN(dn2, csn2.getServerId()));
+
+    assertFalse(state.removeCSN(dn1, csn2));
+    assertSame(csn1, state.getCSN(dn1, csn1.getServerId()));
+    assertSame(csn1, state.getCSN(dn2, csn1.getServerId()));
+    assertSame(csn2, state.getCSN(dn2, csn2.getServerId()));
+
+    assertTrue(state.removeCSN(dn2, csn1));
+    assertSame(csn1, state.getCSN(dn1, csn1.getServerId()));
+    assertNull(state.getCSN(dn2, csn1.getServerId()));
+    assertSame(csn2, state.getCSN(dn2, csn2.getServerId()));
+  }
 }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
index 73beefa..4d48ca5 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
@@ -152,4 +152,20 @@
     assertTrue(state.cover(csn1Server2));
     assertFalse(state.cover(csn0Server3));
   }
+
+  @Test
+  public void testRemoveCSN() throws Exception
+  {
+    final CSN csn1Server1 = new CSN(1, 0, 1);
+    final CSN csn2Server1 = new CSN(2, 0, 1);
+    final CSN csn1Server2 = new CSN(1, 0, 2);
+
+    final ServerState state = new ServerState();
+    assertTrue(state.update(csn1Server1));
+
+    assertFalse(state.removeCSN(null));
+    assertFalse(state.removeCSN(csn2Server1));
+    assertFalse(state.removeCSN(csn1Server2));
+    assertTrue(state.removeCSN(csn1Server1));
+  }
 }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 8cab868..5791d30 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -153,7 +153,6 @@
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
     publishUpdateMsg(msg1);
-
     assertAddedRecords(msg1);
   }
 
@@ -167,7 +166,6 @@
 
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId1, 2);
     publishUpdateMsg(msg2);
-
     assertAddedRecords(msg2);
   }
 
@@ -181,7 +179,6 @@
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
     publishUpdateMsg(msg2, msg1);
-
     assertAddedRecords(msg1);
   }
 
@@ -210,7 +207,7 @@
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
-  public void emptyDBTwoCursorsOneEmptyForSomeTime() throws Exception
+  public void emptyDBTwoDSsOneSendsNoUpdatesForSomeTime() throws Exception
   {
     addReplica(BASE_DN, serverId1);
     addReplica(BASE_DN, serverId2);
@@ -222,7 +219,6 @@
     final ReplicatedUpdateMsg msg3Sid2 = msg(BASE_DN, serverId2, 3);
     // simulate no messages received during some time for replica 2
     publishUpdateMsg(msg1Sid2, emptySid2, emptySid2, emptySid2, msg3Sid2, msg2Sid1);
-
     assertAddedRecords(msg1Sid2, msg2Sid1);
   }
 
@@ -245,6 +241,46 @@
     assertAddedRecords(msg1, msg2);
   }
 
+  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+  public void emptyDBTwoInitialDSsOneSendingHeartbeats() throws Exception
+  {
+    addReplica(BASE_DN, serverId1);
+    addReplica(BASE_DN, serverId2);
+    startIndexer();
+
+    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
+    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
+    publishUpdateMsg(msg1, msg2);
+    assertAddedRecords(msg1);
+
+    sendHeartbeat(BASE_DN, serverId1, 3);
+    assertAddedRecords(msg1, msg2);
+  }
+
+  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+  public void emptyDBTwoInitialDSsOneGoingOffline() throws Exception
+  {
+    addReplica(BASE_DN, serverId1);
+    addReplica(BASE_DN, serverId2);
+    startIndexer();
+
+    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
+    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
+    publishUpdateMsg(msg1, msg2);
+    assertAddedRecords(msg1);
+
+    replicaOffline(BASE_DN, serverId2, 3);
+    // MCP cannot move forward since no new updates from serverId1
+    assertAddedRecords(msg1);
+
+    final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
+    publishUpdateMsg(msg4);
+    // MCP moved forward after receiving update from serverId1
+    // (last replica in the domain)
+    assertAddedRecords(msg1, msg2, msg4);
+  }
+
+
   private void addReplica(DN baseDN, int serverId) throws Exception
   {
     final SequentialDBCursor cursor = new SequentialDBCursor();
@@ -320,6 +356,18 @@
     waitForWaitingState(indexer);
   }
 
+  private void sendHeartbeat(DN baseDN, int serverId, int time) throws Exception
+  {
+    indexer.publishHeartbeat(baseDN, new CSN(time, 0, serverId));
+    waitForWaitingState(indexer);
+  }
+
+  private void replicaOffline(DN baseDN, int serverId, int time) throws Exception
+  {
+    indexer.replicaOffline(baseDN, new CSN(time, 0, serverId));
+    waitForWaitingState(indexer);
+  }
+
   private void waitForWaitingState(final Thread t)
   {
     State state = t.getState();

--
Gitblit v1.10.0