mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
22.24.2013 53ac6966e65a907785505fd1da1ac196730cd442
Checkpoint commit for OPENDJ-1174 Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB


Properly implemented the medium consistency point algorithm.


ChangeNumberIndexer.java:
Renamed instance field previousCookie to mediumConsistencyRUV and mediumConsistencyPoint to lastSeenUpdates.
Added mediumConsistencyCSN instance field.
Changed newCursors from ConcurrentMap<Integer, DN> to ConcurrentMap<CSN, DN>.
In tryNotify(), removed CSN parameter.
Added methods canMoveForwardMediumConsistencyPoint() and moveForwardMediumConsistencyPoint().


ChangeNumberIndexerTest.java:
Updated to reflect how the medium consistency point algorithm should work.

MultiDomainServerState.java:
Renamed get() to getCSN().

ServerState.java
In update(), updated javadocs + renamed local variables.
4 files modified
188 ■■■■■ changed files
opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java 17 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/ServerState.java 13 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 110 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java 48 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -189,16 +189,23 @@
  }
  /**
   * Returns the ServerState associated to the provided replication domain's
   * baseDN.
   * Returns the CSN associated to the provided replication domain's baseDN and
   * serverId.
   *
   * @param baseDN
   *          the replication domain's baseDN
   * @param serverId
   *          the serverId
   * @return the associated ServerState
   */
  public ServerState get(DN baseDN)
  public CSN getCSN(DN baseDN, int serverId)
  {
    return list.get(baseDN);
    final ServerState ss = list.get(baseDN);
    if (ss != null)
    {
      return ss.getCSN(serverId);
    }
    return null;
  }
  /**
@@ -260,7 +267,7 @@
  public static Map<DN, ServerState> splitGenStateToServerStates(
      String multiDomainServerState) throws DirectoryException
  {
    final Map<DN, ServerState> startStates = new TreeMap<DN, ServerState>();
    Map<DN, ServerState> startStates = new TreeMap<DN, ServerState>();
    if (multiDomainServerState != null && multiDomainServerState.length() > 0)
    {
      try
opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -140,9 +140,12 @@
  }
  /**
   * Update the Server State with a CSN.
   * Forward update the Server State with a CSN. The provided CSN will be put on
   * the current object only if it is newer than the existing CSN for the same
   * serverId or if there is no existing CSN.
   *
   * @param csn The committed CSN.
   * @param csn
   *          The committed CSN.
   * @return a boolean indicating if the update was meaningful.
   */
  public boolean update(CSN csn)
@@ -154,9 +157,9 @@
    synchronized (serverIdToCSN)
    {
      int serverId = csn.getServerId();
      CSN oldCSN = serverIdToCSN.get(serverId);
      if (oldCSN == null || csn.isNewerThan(oldCSN))
      final int serverId = csn.getServerId();
      final CSN existingCSN = serverIdToCSN.get(serverId);
      if (existingCSN == null || csn.isNewerThan(existingCSN))
      {
        serverIdToCSN.put(serverId, csn);
        return true;
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -31,6 +31,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.opends.messages.Message;
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
@@ -61,29 +62,41 @@
  private ChangelogState changelogState;
  /*
   * previousCookie and mediumConsistencyPoint must be thread safe, because
   * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because
   * 1) initialization can happen while the replication server starts receiving
   * updates 2) many updates can happen concurrently. This solution also avoids
   * using a queue that could fill up before we have consumed all its content.
   */
  /**
   * Holds the cross domain medium consistency Replication Update Vector for the
   * current replication server, also known as the previous cookie.
   * <p>
   * Stores the value of the cookie before the change currently processed is
   * inserted in the DB. After insert, it is updated with the CSN of the change
   * currently processed (thus becoming the "current" cookie just before the
   * change is returned.
   */
  private final MultiDomainServerState previousCookie =
      new MultiDomainServerState();
  /**
   * Holds the medium consistency point for the current replication server.
   *
   * @see <a href=
   * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
   * >OpenDJ Domain Names for a description of what the medium consistency point
   * is</a>
   * >OpenDJ Domain Names - medium consistency RUV</a>
   */
  private final MultiDomainServerState mediumConsistencyPoint =
  private final MultiDomainServerState mediumConsistencyRUV =
      new MultiDomainServerState();
  /**
   * Holds the cross domain medium consistency CSN for the current replication
   * server.
   *
   * @see <a href=
   * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
   * >OpenDJ Domain Names - medium consistency CSN</a>
   */
  private volatile CSN mediumConsistencyCSN;
  /**
   * Holds the most recent changes or heartbeats received for each serverIds
   * cross domain.
   */
  private final MultiDomainServerState lastSeenUpdates =
      new MultiDomainServerState();
  /**
@@ -103,8 +116,8 @@
  private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors =
      new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>();
  /** This map can be updated by multiple threads. */
  private ConcurrentMap<Integer, DN> newCursors =
      new ConcurrentSkipListMap<Integer, DN>();
  private ConcurrentMap<CSN, DN> newCursors =
      new ConcurrentSkipListMap<CSN, DN>();
  /**
   * Builds a ChangeNumberIndexer object.
@@ -131,11 +144,8 @@
   */
  public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
  {
    mediumConsistencyPoint.update(baseDN, heartbeatCSN);
    final CompositeDBCursor<DN> localCursor = crossDomainDBCursor;
    final DN changeBaseDN = localCursor.getData();
    final CSN changeCSN = localCursor.getRecord().getCSN();
    tryNotify(changeBaseDN, changeCSN);
    lastSeenUpdates.update(baseDN, heartbeatCSN);
    tryNotify(baseDN);
  }
  /**
@@ -152,18 +162,18 @@
      throws ChangelogException
  {
    final CSN csn = updateMsg.getCSN();
    mediumConsistencyPoint.update(baseDN, csn);
    newCursors.put(csn.getServerId(), baseDN);
    tryNotify(baseDN, csn);
    lastSeenUpdates.update(baseDN, csn);
    newCursors.put(csn, baseDN);
    tryNotify(baseDN);
  }
  /**
   * Notifies the Change number indexer thread if it will be able to do some
   * work.
   */
  private void tryNotify(final DN baseDN, final CSN csn)
  private void tryNotify(DN baseDN)
  {
    if (mediumConsistencyPoint.cover(baseDN, csn))
    if (canMoveForwardMediumConsistencyPoint(baseDN))
    {
      synchronized (this)
      {
@@ -172,13 +182,25 @@
    }
  }
  private boolean canMoveForwardMediumConsistencyPoint(DN baseDN)
  {
    final CSN mcCSN = mediumConsistencyCSN;
    if (mcCSN != null)
    {
      final CSN lastSeenSameServerId =
          lastSeenUpdates.getCSN(baseDN, mcCSN.getServerId());
      return mcCSN.isOlderThan(lastSeenSameServerId);
    }
    return true;
  }
  private void initialize() throws ChangelogException, DirectoryException
  {
    final ChangeNumberIndexRecord newestRecord =
        changelogDB.getChangeNumberIndexDB().getNewestRecord();
    if (newestRecord != null)
    {
      previousCookie.update(
      mediumConsistencyRUV.update(
          new MultiDomainServerState(newestRecord.getPreviousCookie()));
    }
@@ -190,13 +212,12 @@
      final DN baseDN = entry.getKey();
      for (Integer serverId : entry.getValue())
      {
        final ServerState previousSS = previousCookie.get(baseDN);
        final CSN csn = previousSS != null ? previousSS.getCSN(serverId) : null;
        final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
        ensureCursorExists(baseDN, serverId, csn);
      }
      ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
      mediumConsistencyPoint.update(baseDN, latestKnownState);
      lastSeenUpdates.update(baseDN, latestKnownState);
    }
    crossDomainDBCursor = newCompositeDBCursor();
@@ -206,14 +227,11 @@
      final UpdateMsg record = crossDomainDBCursor.getRecord();
      if (!record.getCSN().equals(newestRecord.getCSN()))
      {
        // TODO JNR remove
        throw new RuntimeException("They do not equal! recordCSN="
            + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN());
        // TODO JNR i18n safety check, should never happen
        throw new ChangelogException(Message.raw("They do not equal! recordCSN="
            + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN()));
      }
      // TODO JNR is it possible to use the following line instead?
      // previousCookie.update(newestRecord.getBaseDN(), record.getCSN());
      // TODO JNR would this mean updating the if above?
      previousCookie.update(crossDomainDBCursor.getData(), record.getCSN());
      mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN());
      crossDomainDBCursor.next();
    }
@@ -281,7 +299,7 @@
    }
    catch (ChangelogException e)
    {
      // TODO Auto-generated catch block
      // TODO JNR error message i18n
      if (debugEnabled())
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      return;
@@ -310,14 +328,14 @@
        final DN baseDN = crossDomainDBCursor.getData();
        // FIXME problem: what if the serverId is not part of the ServerState?
        // right now, thread will be blocked
        if (!mediumConsistencyPoint.cover(baseDN, csn))
        if (!canMoveForwardMediumConsistencyPoint(baseDN))
        {
          // the oldest record to insert is newer than the medium consistency
          // point. Let's wait for a change that can be published.
          synchronized (this)
          {
            // double check to protect against a missed call to notify()
            if (!mediumConsistencyPoint.cover(baseDN, csn))
            if (!canMoveForwardMediumConsistencyPoint(baseDN))
            {
              wait();
              // loop to check if changes older than the medium consistency
@@ -329,11 +347,11 @@
        // OK, the oldest change is older than the medium consistency point
        // let's publish it to the CNIndexDB
        final String previousCookie = mediumConsistencyRUV.toString();
        final ChangeNumberIndexRecord record =
            new ChangeNumberIndexRecord(previousCookie.toString(), baseDN, csn);
            new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
        changelogDB.getChangeNumberIndexDB().addRecord(record);
        // update, so it becomes the previous cookie for the next change
        previousCookie.update(baseDN, csn);
        moveForwardMediumConsistencyPoint(csn, baseDN);
        // advance cursor, success/failure will be checked later
        crossDomainDBCursor.next();
@@ -351,16 +369,24 @@
    }
  }
  private void moveForwardMediumConsistencyPoint(final CSN csn, final DN baseDN)
  {
    // update, so it becomes the previous cookie for the next change
    mediumConsistencyRUV.update(baseDN, csn);
    mediumConsistencyCSN = csn;
  }
  private void createNewCursors() throws ChangelogException
  {
    if (!newCursors.isEmpty())
    {
      boolean newCursorAdded = false;
      for (Iterator<Entry<Integer, DN>> iter = newCursors.entrySet().iterator();
      for (Iterator<Entry<CSN, DN>> iter = newCursors.entrySet().iterator();
          iter.hasNext();)
      {
        final Entry<Integer, DN> entry = iter.next();
        if (!ensureCursorExists(entry.getValue(), entry.getKey(), null))
        final Entry<CSN, DN> entry = iter.next();
        final CSN csn = entry.getKey();
        if (!ensureCursorExists(entry.getValue(), csn.getServerId(), null))
        {
          newCursorAdded = true;
        }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -27,6 +27,7 @@
package org.opends.server.replication.server.changelog.je;
import java.lang.Thread.State;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -106,7 +107,7 @@
      new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
  private ChangelogState initialState;
  private ChangeNumberIndexer indexer;
  private MultiDomainServerState previousCookie;
  private MultiDomainServerState initialCookie;
  @BeforeClass
  public static void classSetup() throws Exception
@@ -131,7 +132,7 @@
    when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
    initialState = new ChangelogState();
    previousCookie = new MultiDomainServerState();
    initialCookie = new MultiDomainServerState();
  }
@@ -181,7 +182,7 @@
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    publishUpdateMsg(msg2, msg1);
    assertAddedRecords(msg1, msg2);
    assertAddedRecords(msg1);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -197,14 +198,20 @@
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId2, 3);
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
    publishUpdateMsg(msg3, msg4);
    assertAddedRecords(msg3);
    assertAddedRecords(msg3, msg4);
    final ReplicatedUpdateMsg msg5 = msg(BASE_DN, serverId1, 5);
    publishUpdateMsg(msg5);
    assertAddedRecords(msg3);
    final ReplicatedUpdateMsg msg6 = msg(BASE_DN, serverId2, 6);
    publishUpdateMsg(msg6);
    assertAddedRecords(msg3, msg4, msg5);
  }
  @Test(enabled = false, dependsOnMethods = { EMPTY_DB_NO_DS })
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoCursorsOneEmptyForSomeTime() throws Exception
  {
    // TODO JNR make this tests work
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    startIndexer();
@@ -216,7 +223,7 @@
    // simulate no messages received during some time for replica 2
    publishUpdateMsg(msg1Sid2, emptySid2, emptySid2, emptySid2, msg3Sid2, msg2Sid1);
    assertAddedRecords(msg1Sid2, msg2Sid1, msg3Sid2);
    assertAddedRecords(msg1Sid2, msg2Sid1);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -231,7 +238,10 @@
    addReplica(BASE_DN, serverId2);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    publishUpdateMsg(msg2);
    assertAddedRecords(msg1);
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId1, 3);
    publishUpdateMsg(msg3);
    assertAddedRecords(msg1, msg2);
  }
@@ -275,13 +285,13 @@
        final DN baseDN = newestMsg.getBaseDN();
        final CSN csn = newestMsg.getCSN();
        when(cnIndexDB.getNewestRecord()).thenReturn(
            new ChangeNumberIndexRecord(previousCookie.toString(), baseDN, csn));
            new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn));
        final SequentialDBCursor cursor =
            cursors.get(Pair.of(baseDN, csn.getServerId()));
        cursor.add(newestMsg);
        cursor.next(); // simulate the cursor had been initialized with this change
      }
      previousCookie.update(msg.getBaseDN(), msg.getCSN());
      initialCookie.update(msg.getBaseDN(), msg.getCSN());
    }
  }
@@ -321,6 +331,10 @@
    assertThat(state).isEqualTo(State.WAITING);
  }
  /**
   * Asserts which records have been added to the CNIndexDB since starting the
   * {@link ChangeNumberIndexer} thread.
   */
  private void assertAddedRecords(ReplicatedUpdateMsg... msgs) throws Exception
  {
    final ArgumentCaptor<ChangeNumberIndexRecord> arg =
@@ -328,17 +342,21 @@
    verify(cnIndexDB, atLeast(0)).addRecord(arg.capture());
    final List<ChangeNumberIndexRecord> allValues = arg.getAllValues();
    // recheck it was not called more than expected
    assertThat(allValues).hasSameSizeAs(msgs);
    // clone initial state to avoid modifying it
    final MultiDomainServerState previousCookie =
        new MultiDomainServerState(initialCookie.toString());
    // check it was not called more than expected
    String desc1 = "actual was:<" + allValues + ">, but expected was:<" + Arrays.toString(msgs) + ">";
    assertThat(allValues.size()).as(desc1).isEqualTo(msgs.length);
    for (int i = 0; i < msgs.length; i++)
    {
      final ReplicatedUpdateMsg msg = msgs[i];
      final ChangeNumberIndexRecord record = allValues.get(i);
      // check content in order
      String description = "expected: <" + msg + ">, but got: <" + record + ">";
      assertThat(record.getBaseDN()).as(description).isEqualTo(msg.getBaseDN());
      assertThat(record.getCSN()).as(description).isEqualTo(msg.getCSN());
      assertThat(record.getPreviousCookie()).as(description).isEqualTo(previousCookie.toString());
      String desc2 = "actual was:<" + record + ">, but expected was:<" + msg + ">";
      assertThat(record.getBaseDN()).as(desc2).isEqualTo(msg.getBaseDN());
      assertThat(record.getCSN()).as(desc2).isEqualTo(msg.getCSN());
      assertThat(record.getPreviousCookie()).as(desc2).isEqualTo(previousCookie.toString());
      previousCookie.update(msg.getBaseDN(), msg.getCSN());
    }
  }