mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
17.44.2014 6aa4fa5b4f71e830dba55f3ea3f9530737db2d8b
OPENDJ-1439 Change number stops progressing with cross domain replication 

The changeNumber progression was blocked when the mediumConsistencyCSN was from a different baseDN than the new change to add to the changeNumber index DB.
Fixed this problem by also storing the baseDN of the mediumConsistencyCSN in the ChangeNumberIndexer class.


ChangeNumberIndexer.java:
Renamed mediumConsistencyCSN to mediumConsistency and changed its type from CSN to Pair<DN, CSN>.
In tryNotify(), removed the now useless baseDN parameter (replaced with mediumConsistency field).
Improved comments.

ChangeNumberIndexerTest.java:
Renamed BASE_DN to BASE_DN1.
Added BASE_DN2 and test emptyDBTwoInitialDSsDifferentDomains().
In assertExternalChangelogContent(), did some renaming.

SequentialDBCursor.java:
Improved toString().

ChangeNumberIndexDB.java:
Removed obsolete FIXME.
4 files modified
201 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java 5 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 56 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java 135 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java 5 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
@@ -21,7 +21,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 *      Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.api;
@@ -74,9 +74,6 @@
   * by this DB and return the changeNumber associated to this record.
   * <p>
   * Note: this method disregards the changeNumber in the provided record.
   * <p>
   * FIXME will be removed when ECLServerHandler will not be responsible anymore
   * for lazily building the ChangeNumberIndexDB.
   *
   * @param record
   *          The {@link ChangeNumberIndexRecord} to add to this DB.
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -92,14 +92,14 @@
  private final MultiDomainServerState mediumConsistencyRUV =
      new MultiDomainServerState();
  /**
   * Holds the cross domain medium consistency CSN for the current replication
   * server.
   * Holds the cross domain medium consistency baseDN and CSN for the current
   * replication server.
   *
   * @see <a href=
   * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
   * >OpenDJ Domain Names - medium consistency CSN</a>
   */
  private volatile CSN mediumConsistencyCSN;
  private volatile Pair<DN, CSN> mediumConsistency;
  /**
   * Holds the last time each replica was seen alive, whether via updates or
@@ -182,7 +182,7 @@
    }
    lastAliveCSNs.update(baseDN, heartbeatCSN);
    tryNotify(baseDN);
    tryNotify();
  }
  /**
@@ -207,7 +207,7 @@
    // only keep the oldest CSN that will be the new cursor's starting point
    newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
    lastAliveCSNs.update(baseDN, csn);
    tryNotify(baseDN);
    tryNotify();
  }
  /**
@@ -239,16 +239,16 @@
  {
    replicasOffline.update(baseDN, offlineCSN);
    lastAliveCSNs.update(baseDN, offlineCSN);
    tryNotify(baseDN);
    tryNotify();
  }
  /**
   * Notifies the Change number indexer thread if it will be able to do some
   * work.
   */
  private void tryNotify(DN baseDN)
  private void tryNotify()
  {
    if (canMoveForwardMediumConsistencyPoint(baseDN))
    if (canMoveForwardMediumConsistencyPoint())
    {
      synchronized (this)
      {
@@ -257,13 +257,14 @@
    }
  }
  private boolean canMoveForwardMediumConsistencyPoint(DN baseDN)
  private boolean canMoveForwardMediumConsistencyPoint()
  {
    final CSN mcCSN = mediumConsistencyCSN;
    if (mcCSN != null)
    final Pair<DN, CSN> mc = mediumConsistency;
    if (mc != null)
    {
      final int serverId = mcCSN.getServerId();
      CSN lastTimeSameReplicaSeenAlive = lastAliveCSNs.getCSN(baseDN, serverId);
      final CSN mcCSN = mc.getSecond();
      final CSN lastTimeSameReplicaSeenAlive =
          lastAliveCSNs.getCSN(mc.getFirst(), mcCSN.getServerId());
      return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive);
    }
    return true;
@@ -441,7 +442,8 @@
              }
              wait();
            }
            // advance cursor, success/failure will be checked later
            // try to recycle the exhausted cursors,
            // success/failure will be checked later
            nextChangeForInsertDBCursor.next();
            // loop to check whether new changes have been added to the
            // ReplicaDBs
@@ -452,7 +454,7 @@
          final DN baseDN = nextChangeForInsertDBCursor.getData();
          // FIXME problem: what if the serverId is not part of the ServerState?
          // right now, change number will be blocked
          if (!canMoveForwardMediumConsistencyPoint(baseDN))
          if (!canMoveForwardMediumConsistencyPoint())
          {
            // the oldest record to insert is newer than the medium consistency
            // point. Let's wait for a change that can be published.
@@ -460,7 +462,7 @@
            {
              // double check to protect against a missed call to notify()
              if (!isShutdownInitiated()
                  && !canMoveForwardMediumConsistencyPoint(baseDN))
                  && !canMoveForwardMediumConsistencyPoint())
              {
                wait();
                // loop to check if changes older than the medium consistency
@@ -479,7 +481,8 @@
          changelogDB.getChangeNumberIndexDB().addRecord(record);
          moveForwardMediumConsistencyPoint(csn, baseDN);
          // advance cursor, success/failure will be checked later
          // advance the cursor we just read from,
          // success/failure will be checked later
          nextChangeForInsertDBCursor.next();
        }
        catch (InterruptedException ignored)
@@ -517,20 +520,21 @@
    }
  }
  private void moveForwardMediumConsistencyPoint(final CSN csn, final DN baseDN)
  private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
      final DN mcBaseDN)
  {
    // update, so it becomes the previous cookie for the next change
    mediumConsistencyRUV.update(baseDN, csn);
    mediumConsistencyCSN = csn;
    final CSN offlineCSN = replicasOffline.getCSN(baseDN, csn.getServerId());
    mediumConsistencyRUV.update(mcBaseDN, mcCSN);
    mediumConsistency = Pair.of(mcBaseDN, mcCSN);
    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcCSN.getServerId());
    if (offlineCSN != null
        && offlineCSN.isOlderThan(mediumConsistencyCSN)
        && offlineCSN.isOlderThan(mcCSN)
        // If no new updates has been seen for this replica
        && lastAliveCSNs.removeCSN(baseDN, offlineCSN))
        && lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN))
    {
      removeCursor(baseDN, csn);
      replicasOffline.removeCSN(baseDN, offlineCSN);
      mediumConsistencyRUV.removeCSN(baseDN, offlineCSN);
      removeCursor(mcBaseDN, mcCSN);
      replicasOffline.removeCSN(mcBaseDN, offlineCSN);
      mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
    }
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -88,11 +88,15 @@
    @Override
    public String toString()
    {
      return "csn=" + getCSN() + ", baseDN=" + baseDN;
      return "UpdateMsg("
          + "\"" + baseDN + " " + getCSN().getServerId() + "\""
          + ", csn=" + getCSN().toStringUI()
          + ")";
    }
  }
  private static DN BASE_DN;
  private static DN BASE_DN1;
  private static DN BASE_DN2;
  private static DN ADMIN_DATA_DN;
  private static final int serverId1 = 101;
  private static final int serverId2 = 102;
@@ -111,7 +115,8 @@
  public static void classSetup() throws Exception
  {
    TestCaseUtils.startFakeServer();
    BASE_DN = DN.decode("dc=example,dc=com");
    BASE_DN1 = DN.decode("dc=example,dc=com");
    BASE_DN2 = DN.decode("dc=world,dc=company");
    ADMIN_DATA_DN = DN.decode("cn=admin data");
  }
@@ -152,10 +157,10 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBOneInitialDS() throws Exception
  {
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN1, serverId1);
    startCNIndexer();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
    assertExternalChangelogContent(msg1);
  }
@@ -163,12 +168,12 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void nonEmptyDBOneInitialDS() throws Exception
  {
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    addReplica(BASE_DN, serverId1);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    addReplica(BASE_DN1, serverId1);
    setDBInitialRecords(msg1);
    startCNIndexer();
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId1, 2);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
    publishUpdateMsg(msg2);
    assertExternalChangelogContent(msg2);
  }
@@ -176,36 +181,52 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoInitialDSs() throws Exception
  {
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
    publishUpdateMsg(msg2, msg1);
    assertExternalChangelogContent(msg1);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoInitialDSsDifferentDomains() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN2, serverId2);
    startCNIndexer();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN2, serverId2, 2);
    publishUpdateMsg(msg1, msg2);
    assertExternalChangelogContent(msg1);
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
    publishUpdateMsg(msg3);
    assertExternalChangelogContent(msg1, msg2);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void nonEmptyDBTwoInitialDSs() throws Exception
  {
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    setDBInitialRecords(msg1, msg2);
    startCNIndexer();
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId2, 3);
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
    publishUpdateMsg(msg3, msg4);
    assertExternalChangelogContent(msg3);
    final ReplicatedUpdateMsg msg5 = msg(BASE_DN, serverId1, 5);
    final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId1, 5);
    publishUpdateMsg(msg5);
    assertExternalChangelogContent(msg3);
    final ReplicatedUpdateMsg msg6 = msg(BASE_DN, serverId2, 6);
    final ReplicatedUpdateMsg msg6 = msg(BASE_DN1, serverId2, 6);
    publishUpdateMsg(msg6);
    assertExternalChangelogContent(msg3, msg4, msg5);
  }
@@ -213,14 +234,14 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneSendsNoUpdatesForSomeTime() throws Exception
  {
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer();
    final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN, serverId2, 1);
    final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN, serverId2);
    final ReplicatedUpdateMsg msg2Sid1 = msg(BASE_DN, serverId1, 2);
    final ReplicatedUpdateMsg msg3Sid2 = msg(BASE_DN, serverId2, 3);
    final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1);
    final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN1, serverId2);
    final ReplicatedUpdateMsg msg2Sid1 = msg(BASE_DN1, serverId1, 2);
    final ReplicatedUpdateMsg msg3Sid2 = msg(BASE_DN1, serverId2, 3);
    // simulate no messages received during some time for replica 2
    publishUpdateMsg(msg1Sid2, emptySid2, emptySid2, emptySid2, msg3Sid2, msg2Sid1);
    assertExternalChangelogContent(msg1Sid2, msg2Sid1);
@@ -230,8 +251,8 @@
  public void emptyDBThreeInitialDSsOneIsNotECLEnabledDomain() throws Exception
  {
    addReplica(ADMIN_DATA_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    addReplica(BASE_DN, serverId3);
    addReplica(BASE_DN1, serverId2);
    addReplica(BASE_DN1, serverId3);
    startCNIndexer();
    // cn=admin data will does not participate in the external changelog
@@ -240,8 +261,8 @@
    publishUpdateMsg(msg1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId3, 3);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId3, 3);
    publishUpdateMsg(msg2, msg3);
    assertExternalChangelogContent(msg2);
  }
@@ -249,18 +270,18 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
  {
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN1, serverId1);
    startCNIndexer();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
    addReplica(BASE_DN, serverId2);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    addReplica(BASE_DN1, serverId2);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
    publishUpdateMsg(msg2);
    assertExternalChangelogContent(msg1);
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId1, 3);
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
    publishUpdateMsg(msg3);
    assertExternalChangelogContent(msg1, msg2);
  }
@@ -268,36 +289,36 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoInitialDSsOneSendingHeartbeats() throws Exception
  {
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
    publishUpdateMsg(msg1, msg2);
    assertExternalChangelogContent(msg1);
    sendHeartbeat(BASE_DN, serverId1, 3);
    sendHeartbeat(BASE_DN1, serverId1, 3);
    assertExternalChangelogContent(msg1, msg2);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoInitialDSsOneGoingOffline() throws Exception
  {
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
    publishUpdateMsg(msg1, msg2);
    assertExternalChangelogContent(msg1);
    replicaOffline(BASE_DN, serverId2, 3);
    replicaOffline(BASE_DN1, serverId2, 3);
    // MCP cannot move forward since no new updates from serverId1
    assertExternalChangelogContent(msg1);
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
    publishUpdateMsg(msg4);
    // MCP moved forward after receiving update from serverId1
    // (last replica in the domain)
@@ -321,7 +342,7 @@
      @Override
      protected boolean isECLEnabledDomain(DN baseDN)
      {
        return BASE_DN.equals(baseDN);
        return BASE_DN1.equals(baseDN) || BASE_DN2.equals(baseDN);
      }
    };
    cnIndexer.start();
@@ -423,10 +444,10 @@
   * Asserts which records have been added to the CNIndexDB since starting the
   * {@link ChangeNumberIndexer} thread.
   */
  private void assertExternalChangelogContent(ReplicatedUpdateMsg... msgs)
  private void assertExternalChangelogContent(ReplicatedUpdateMsg... expectedMsgs)
      throws Exception
  {
    if (msgs.length == 0)
    if (expectedMsgs.length == 0)
    {
      verify(cnIndexDB, never()).addRecord(any(ChangeNumberIndexRecord.class));
      return;
@@ -441,18 +462,18 @@
    final MultiDomainServerState previousCookie =
        new MultiDomainServerState(initialCookie.toString());
    // check it was not called more than expected
    String desc1 = "actual was:<" + allValues + ">, but expected was:<" + Arrays.toString(msgs) + ">";
    assertThat(allValues.size()).as(desc1).isEqualTo(msgs.length);
    for (int i = 0; i < msgs.length; i++)
    String desc1 = "actual was:<" + allValues + ">, but expected was:<" + Arrays.toString(expectedMsgs) + ">";
    assertThat(allValues).as(desc1).hasSize(expectedMsgs.length);
    for (int i = 0; i < expectedMsgs.length; i++)
    {
      final ReplicatedUpdateMsg msg = msgs[i];
      final ReplicatedUpdateMsg expectedMsg = expectedMsgs[i];
      final ChangeNumberIndexRecord record = allValues.get(i);
      // check content in order
      String desc2 = "actual was:<" + record + ">, but expected was:<" + msg + ">";
      assertThat(record.getBaseDN()).as(desc2).isEqualTo(msg.getBaseDN());
      assertThat(record.getCSN()).as(desc2).isEqualTo(msg.getCSN());
      String desc2 = "actual was:<" + record + ">, but expected was:<" + expectedMsg + ">";
      assertThat(record.getBaseDN()).as(desc2).isEqualTo(expectedMsg.getBaseDN());
      assertThat(record.getCSN()).as(desc2).isEqualTo(expectedMsg.getCSN());
      assertThat(record.getPreviousCookie()).as(desc2).isEqualTo(previousCookie.toString());
      previousCookie.update(msg.getBaseDN(), msg.getCSN());
      previousCookie.update(expectedMsg.getBaseDN(), expectedMsg.getCSN());
    }
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java
@@ -21,7 +21,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 *      Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
@@ -81,7 +81,8 @@
  @Override
  public String toString()
  {
    return "currentRecord=" + current + " nextMessages=" + msgs;
        return getClass().getSimpleName() + "(currentRecord=" + current
                + " nextMessages=" + msgs + ")";
  }
}