mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
17.05.2014 9737dd7d653611c9d9ee38640685a68d43abbca4
OPENDJ-1440 On startup, change number can progress without waiting for any heartbeat from known replicas

On startup, if a replication server knows about several replicas, it must wait until it has received some form of liveness information (a change or a heartbeat) from each of them before it can move the medium consistency point forward.
Changes or heartbeats received after replication server started are acceptable.
Likewise, changes that would have been received before the replication server stopped are also acceptable.

This is fixed by initializing, on replication server startup, the lastAliveCSN of each known replica with the oldest possible CSN (timestamp == 0).
Then, when checking whether the medium consistency point can move forward while no medium consistency CSN is set yet, the lastAliveCSN of every known replica must have a non-zero timestamp.


ChangeNumberIndexer.java:
In canMoveForwardMediumConsistencyPoint(), call allInitialReplicasArePastOldestPossibleCSN() if the medium consistency CSN is not set.
Added methods oldestPossibleCSN() and allInitialReplicasArePastOldestPossibleCSN().

ChangeNumberIndexerTest.java:
In emptyDBTwoInitialDSs(), slightly modified the code to reproduce this bug.
In startCNIndexer(), added the initial ECL enabled domains as a parameter.
2 files modified
69 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 30 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java 39 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -267,6 +267,23 @@
          lastAliveCSNs.getCSN(mc.getFirst(), mcCSN.getServerId());
      return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive);
    }
    // ensure that all initial replicas alive information have been updated
    // with CSNs that are acceptable for moving the medium consistency forward
    return allInitialReplicasArePastOldestPossibleCSN();
  }
  private boolean allInitialReplicasArePastOldestPossibleCSN()
  {
    for (DN baseDN : lastAliveCSNs)
    {
      for (CSN csn : lastAliveCSNs.getServerState(baseDN))
      {
        if (csn.getTime() == 0)
        {
          return false;
        }
      }
    }
    return true;
  }
@@ -301,8 +318,14 @@
      for (Integer serverId : entry.getValue())
      {
        final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
        /*
         * initialize with the oldest possible CSN in order for medium
         * consistency to wait for all replicas to be alive before moving
         * forward
         */
        lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
        // start after the actual CSN when initializing from the previous cookie
        final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
        ensureCursorExists(baseDN, serverId, csn);
      }
@@ -332,6 +355,11 @@
    this.changelogState = null;
  }
  private CSN oldestPossibleCSN(int serverId)
  {
    return new CSN(0, 0, serverId);
  }
  private void resetNextChangeForInsertDBCursor() throws ChangelogException
  {
    final Map<DBCursor<UpdateMsg>, DN> cursors =
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -150,7 +150,7 @@
  /**
   * Starts the change number indexer with no replica added and checks that
   * nothing is written to the change number index.
   */
  @Test
  public void emptyDBNoDS() throws Exception
  {
    // NOTE(review): this is a diff view; the two calls below look like the
    // removed line (startCNIndexer()) and its replacement
    // (startCNIndexer(BASE_DN1)), not two consecutive calls in the real test.
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
  }
@@ -158,7 +158,7 @@
  public void emptyDBOneInitialDS() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
@@ -171,7 +171,7 @@
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    addReplica(BASE_DN1, serverId1);
    setDBInitialRecords(msg1);
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
    publishUpdateMsg(msg2);
@@ -183,11 +183,15 @@
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    // simulate messages received out of order
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
    publishUpdateMsg(msg2, msg1);
    publishUpdateMsg(msg2);
    // do not start publishing to the changelog until we hear from serverId1
    assertExternalChangelogContent();
    publishUpdateMsg(msg1);
    assertExternalChangelogContent(msg1);
  }
@@ -196,7 +200,7 @@
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN2, serverId2);
    startCNIndexer();
    startCNIndexer(BASE_DN1, BASE_DN2);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN2, serverId2, 2);
@@ -215,7 +219,7 @@
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    setDBInitialRecords(msg1, msg2);
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
@@ -236,7 +240,7 @@
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1);
    final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN1, serverId2);
@@ -253,7 +257,7 @@
    addReplica(ADMIN_DATA_DN, serverId1);
    addReplica(BASE_DN1, serverId2);
    addReplica(BASE_DN1, serverId3);
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    // cn=admin data does not participate in the external changelog,
    // so it cannot add to it
@@ -271,7 +275,7 @@
  public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
@@ -291,7 +295,7 @@
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -307,7 +311,7 @@
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -335,14 +339,15 @@
    initialState.addServerIdToDomain(serverId, baseDN);
  }
  private void startCNIndexer()
  private void startCNIndexer(DN... eclEnabledDomains)
  {
    final List<DN> eclEnabledDomainList = Arrays.asList(eclEnabledDomains);
    cnIndexer = new ChangeNumberIndexer(changelogDB, initialState)
    {
      @Override
      protected boolean isECLEnabledDomain(DN baseDN)
      {
        return BASE_DN1.equals(baseDN) || BASE_DN2.equals(baseDN);
        return eclEnabledDomainList.contains(baseDN);
      }
    };
    cnIndexer.start();
@@ -447,12 +452,6 @@
  private void assertExternalChangelogContent(ReplicatedUpdateMsg... expectedMsgs)
      throws Exception
  {
    if (expectedMsgs.length == 0)
    {
      verify(cnIndexDB, never()).addRecord(any(ChangeNumberIndexRecord.class));
      return;
    }
    final ArgumentCaptor<ChangeNumberIndexRecord> arg =
        ArgumentCaptor.forClass(ChangeNumberIndexRecord.class);
    verify(cnIndexDB, atLeast(0)).addRecord(arg.capture());