mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
22.24.2013 9a13d05fcb1b17c52c7b91b8445d334bce3f9e28
Checkpoint commit for OPENDJ-1174 Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB


Properly implemented the medium consistency point algorithm with heartbeats and replicas going offline.


ChangeNumberIndexer.java:
Added replicasOffline instance field.
Added methods replicaOffline() and removeCursor().
In moveForwardMediumConsistencyPoint(), updated the code to cater for offline replica.

ChangeNumberIndexerTest.java:
Added new tests for heartbeat and replica going offline.


MultiDomainServerState.java, ServerState.java:
Added removeCSN().

MultiDomainServerStateTest.java, ServerStateTest.java:
Added more tests.
6 files modified
225 ■■■■■ changed files
opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java 20 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/ServerState.java 30 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 55 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java 46 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java 16 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java 58 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -209,6 +209,26 @@
  }
  /**
   * Removes the mapping to the provided CSN if it is present in this
   * MultiDomainServerState.
   *
   * @param baseDN
   *          the replication domain's baseDN
   * @param expectedCSN
   *          the CSN to be removed
   * @return true if the CSN could be removed, false otherwise.
   */
  public boolean removeCSN(DN baseDN, CSN expectedCSN)
  {
    ServerState ss = list.get(baseDN);
    if (ss != null)
    {
      return ss.removeCSN(expectedCSN);
    }
    return false;
  }
  /**
   * Test if this object equals the provided other object.
   * @param other The other object with which we want to test equality.
   * @return      Returns True if this equals other, else return false.
opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -192,6 +192,36 @@
  }
  /**
   * Removes the mapping to the provided CSN if it is present in this
   * ServerState.
   *
   * @param expectedCSN
   *          the CSN to be removed
   * @return true if the CSN could be removed, false otherwise.
   */
  public boolean removeCSN(CSN expectedCSN)
  {
    if (expectedCSN == null)
      return false;
    synchronized (serverIdToCSN)
    {
      for (Iterator<CSN> iter = serverIdToCSN.values().iterator();
          iter.hasNext();)
      {
        final CSN csn = iter.next();
        if (expectedCSN.equals(csn))
        {
          iter.remove();
          saved = false;
          return true;
        }
      }
    }
    return false;
  }
  /**
   * Replace the Server State with another ServerState.
   *
   * @param serverState The ServerState.
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -64,8 +64,7 @@
  /*
   * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because
   * 1) initialization can happen while the replication server starts receiving
   * updates 2) many updates can happen concurrently. This solution also avoids
   * using a queue that could fill up before we have consumed all its content.
   * updates 2) many updates can happen concurrently.
   */
  /**
   * Holds the cross domain medium consistency Replication Update Vector for the
@@ -98,6 +97,8 @@
   */
  private final MultiDomainServerState lastSeenUpdates =
      new MultiDomainServerState();
  private final MultiDomainServerState replicasOffline =
      new MultiDomainServerState();
  /**
   * Composite cursor across all the replicaDBs for all the replication domains.
@@ -168,6 +169,21 @@
  }
  /**
   * Signals a replica went offline.
   *
   * @param baseDN
   *          the replica's replication domain
   * @param offlineCSN
   *          the serverId and time of the replica that went offline
   */
  public void replicaOffline(DN baseDN, CSN offlineCSN)
  {
    lastSeenUpdates.update(baseDN, offlineCSN);
    replicasOffline.update(baseDN, offlineCSN);
    tryNotify(baseDN);
  }
  /**
   * Notifies the Change number indexer thread if it will be able to do some
   * work.
   */
@@ -187,8 +203,8 @@
    final CSN mcCSN = mediumConsistencyCSN;
    if (mcCSN != null)
    {
      final CSN lastSeenSameServerId =
          lastSeenUpdates.getCSN(baseDN, mcCSN.getServerId());
      final int serverId = mcCSN.getServerId();
      final CSN lastSeenSameServerId = lastSeenUpdates.getCSN(baseDN, serverId);
      return mcCSN.isOlderThan(lastSeenSameServerId);
    }
    return true;
@@ -374,6 +390,37 @@
    // update, so it becomes the previous cookie for the next change
    mediumConsistencyRUV.update(baseDN, csn);
    mediumConsistencyCSN = csn;
    final CSN offlineCSN = replicasOffline.getCSN(baseDN, csn.getServerId());
    if (offlineCSN != null
        && offlineCSN.isOlderThan(mediumConsistencyCSN)
        // If no new updates has been seen for this replica
        && lastSeenUpdates.removeCSN(baseDN, offlineCSN))
    {
      removeCursor(baseDN, csn);
      replicasOffline.removeCSN(baseDN, offlineCSN);
      mediumConsistencyRUV.removeCSN(baseDN, offlineCSN);
    }
  }
  private void removeCursor(final DN baseDN, final CSN csn)
  {
    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry : allCursors
        .entrySet())
    {
      if (baseDN.equals(entry.getKey()))
      {
        final Set<Integer> serverIds = entry.getValue().keySet();
        for (Iterator<Integer> iter = serverIds.iterator(); iter.hasNext();)
        {
          final int serverId = iter.next();
          if (csn.getServerId() == serverId)
          {
            iter.remove();
            return;
          }
        }
      }
    }
  }
  private void createNewCursors() throws ChangelogException
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java
@@ -93,6 +93,23 @@
    assertEquals(state.toString(), expected);
  }
  @Test
  public void testUpdateMultiDomainServerState() throws Exception
  {
    final DN dn1 = DN.decode("o=test1");
    final DN dn2 = DN.decode("o=test2");
    final MultiDomainServerState state1 = new MultiDomainServerState();
    state1.update(dn1, csn3);
    state1.update(dn2, csn2);
    final MultiDomainServerState state2 = new MultiDomainServerState();
    state2.update(state1);
    assertSame(csn3, state2.getCSN(dn1, csn3.getServerId()));
    assertSame(csn2, state2.getCSN(dn2, csn2.getServerId()));
    assertTrue(state1.equalsTo(state2));
  }
  @Test(dependsOnMethods = { "testUpdateCSN" })
  public void testEqualsTo() throws Exception
  {
@@ -136,4 +153,33 @@
    assertTrue(state.isEmpty());
  }
  @Test(dependsOnMethods = { "testUpdateCSN" })
  public void testRemoveCSN() throws Exception
  {
    final DN dn1 = DN.decode("o=test1");
    final DN dn2 = DN.decode("o=test2");
    final DN dn3 = DN.decode("o=test3");
    final MultiDomainServerState state = new MultiDomainServerState();
    assertTrue(state.update(dn1, csn1));
    assertTrue(state.update(dn2, csn1));
    assertTrue(state.update(dn2, csn2));
    assertNull(state.getCSN(dn3, 42));
    assertFalse(state.removeCSN(dn3, csn1));
    assertSame(csn1, state.getCSN(dn1, csn1.getServerId()));
    assertSame(csn1, state.getCSN(dn2, csn1.getServerId()));
    assertSame(csn2, state.getCSN(dn2, csn2.getServerId()));
    assertFalse(state.removeCSN(dn1, csn2));
    assertSame(csn1, state.getCSN(dn1, csn1.getServerId()));
    assertSame(csn1, state.getCSN(dn2, csn1.getServerId()));
    assertSame(csn2, state.getCSN(dn2, csn2.getServerId()));
    assertTrue(state.removeCSN(dn2, csn1));
    assertSame(csn1, state.getCSN(dn1, csn1.getServerId()));
    assertNull(state.getCSN(dn2, csn1.getServerId()));
    assertSame(csn2, state.getCSN(dn2, csn2.getServerId()));
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
@@ -152,4 +152,20 @@
    assertTrue(state.cover(csn1Server2));
    assertFalse(state.cover(csn0Server3));
  }
  @Test
  public void testRemoveCSN() throws Exception
  {
    final CSN csn1Server1 = new CSN(1, 0, 1);
    final CSN csn2Server1 = new CSN(2, 0, 1);
    final CSN csn1Server2 = new CSN(1, 0, 2);
    final ServerState state = new ServerState();
    assertTrue(state.update(csn1Server1));
    assertFalse(state.removeCSN(null));
    assertFalse(state.removeCSN(csn2Server1));
    assertFalse(state.removeCSN(csn1Server2));
    assertTrue(state.removeCSN(csn1Server1));
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -153,7 +153,6 @@
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    publishUpdateMsg(msg1);
    assertAddedRecords(msg1);
  }
@@ -167,7 +166,6 @@
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId1, 2);
    publishUpdateMsg(msg2);
    assertAddedRecords(msg2);
  }
@@ -181,7 +179,6 @@
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    publishUpdateMsg(msg2, msg1);
    assertAddedRecords(msg1);
  }
@@ -210,7 +207,7 @@
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoCursorsOneEmptyForSomeTime() throws Exception
  public void emptyDBTwoDSsOneSendsNoUpdatesForSomeTime() throws Exception
  {
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN, serverId2);
@@ -222,7 +219,6 @@
    final ReplicatedUpdateMsg msg3Sid2 = msg(BASE_DN, serverId2, 3);
    // simulate no messages received during some time for replica 2
    publishUpdateMsg(msg1Sid2, emptySid2, emptySid2, emptySid2, msg3Sid2, msg2Sid1);
    assertAddedRecords(msg1Sid2, msg2Sid1);
  }
@@ -245,6 +241,46 @@
    assertAddedRecords(msg1, msg2);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoInitialDSsOneSendingHeartbeats() throws Exception
  {
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    startIndexer();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    publishUpdateMsg(msg1, msg2);
    assertAddedRecords(msg1);
    sendHeartbeat(BASE_DN, serverId1, 3);
    assertAddedRecords(msg1, msg2);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoInitialDSsOneGoingOffline() throws Exception
  {
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    startIndexer();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    publishUpdateMsg(msg1, msg2);
    assertAddedRecords(msg1);
    replicaOffline(BASE_DN, serverId2, 3);
    // MCP cannot move forward since no new updates from serverId1
    assertAddedRecords(msg1);
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
    publishUpdateMsg(msg4);
    // MCP moved forward after receiving update from serverId1
    // (last replica in the domain)
    assertAddedRecords(msg1, msg2, msg4);
  }
  private void addReplica(DN baseDN, int serverId) throws Exception
  {
    final SequentialDBCursor cursor = new SequentialDBCursor();
@@ -320,6 +356,18 @@
    waitForWaitingState(indexer);
  }
  private void sendHeartbeat(DN baseDN, int serverId, int time) throws Exception
  {
    indexer.publishHeartbeat(baseDN, new CSN(time, 0, serverId));
    waitForWaitingState(indexer);
  }
  private void replicaOffline(DN baseDN, int serverId, int time) throws Exception
  {
    indexer.replicaOffline(baseDN, new CSN(time, 0, serverId));
    waitForWaitingState(indexer);
  }
  private void waitForWaitingState(final Thread t)
  {
    State state = t.getState();