mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
05.46.2014 60b4019ba512ad303ab5f0dbd06c3203ba53e940
OPENDJ-1453 (CR-3667) Change time heart beat change numbers should be synced with updates

Added a test case for the change number computation + fixed the code to support it.
In ChangeNumberIndexer.run(), fixed a bug where shutdown forces an insert in the Change number index DB.

ChangeNumberIndexerTest.java:
Added emptyDBTwoDSsOneKilled test.
Fixed timestamps in a few tests.

ChangeNumberIndexer.java:
Initialized nextChangeForInsertDBCursor field to avoid NPE on startup.
Added getOldestLastAliveCSN(), mightMoveForwardMediumConsistencyPoint() + used it in tryNotify().
Changed canMoveForwardMediumConsistencyPoint() to canMoveForwardMediumConsistencyPoint(CSN).
Removed mediumConsistency field, unused now.
In run(), fixed a bug where shutdown forces an insert in the Change number index DB.
Updated javadocs.

MultiDomainServerState.java, MultiDomainServerStateTest.java:
Added getOldestCSNExcluding() and isReplicaExcluded().
4 files modified
307 ■■■■ changed files
opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java 53 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 99 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java 109 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java 46 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.common;
@@ -40,6 +40,8 @@
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
import com.forgerock.opendj.util.Pair;
import static org.opends.messages.ReplicationMessages.*;
/**
@@ -98,7 +100,9 @@
  public boolean update(DN baseDN, CSN csn)
  {
    if (csn == null)
    {
      return false;
    }
    ServerState serverState = list.get(baseDN);
    if (serverState == null)
@@ -242,6 +246,45 @@
  }
  /**
   * Returns the oldest Pair<DN, CSN> held in current object, excluding
   * the provided CSNs. Said otherwise, the value returned is the oldest
   * Pair<DN, CSN> included in the current object, that is not part of the
   * excludedCSNs.
   *
   * @param excludedCSNs
   *          the CSNs that cannot be returned
   * @return the oldest Pair<DN, CSN> included in the current object that
   *         is not part of the excludedCSNs, or {@link Pair#EMPTY} if no such
   *         older CSN exists.
   */
  public Pair<DN, CSN> getOldestCSNExcluding(MultiDomainServerState excludedCSNs)
  {
    Pair<DN, CSN> oldest = Pair.empty();
    for (Entry<DN, ServerState> entry : list.entrySet())
    {
      final DN baseDN = entry.getKey();
      final ServerState value = entry.getValue();
      for (Entry<Integer, CSN> entry2 : value.getServerIdToCSNMap().entrySet())
      {
        final CSN csn = entry2.getValue();
        if (!isReplicaExcluded(excludedCSNs, baseDN, csn)
            && (oldest == Pair.EMPTY || csn.isOlderThan(oldest.getSecond())))
        {
          oldest = Pair.of(baseDN, csn);
        }
      }
    }
    return oldest;
  }
  private boolean isReplicaExcluded(MultiDomainServerState excluded, DN baseDN,
      CSN csn)
  {
    return excluded != null
        && csn.equals(excluded.getCSN(baseDN, csn.getServerId()));
  }
  /**
   * Removes the mapping to the provided CSN if it is present in this
   * MultiDomainServerState.
   *
@@ -253,12 +296,8 @@
   */
  public boolean removeCSN(DN baseDN, CSN expectedCSN)
  {
    ServerState ss = list.get(baseDN);
    if (ss != null)
    {
      return ss.removeCSN(expectedCSN);
    }
    return false;
    final ServerState ss = list.get(baseDN);
    return ss != null && ss.removeCSN(expectedCSN);
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -84,6 +84,8 @@
   * inserted in the DB. After insert, it is updated with the CSN of the change
   * currently processed (thus becoming the "current" cookie just before the
   * change is returned.
   * <p>
   * Note: This object is only updated by changes/updates.
   *
   * @see <a href=
   * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
@@ -91,15 +93,6 @@
   */
  private final MultiDomainServerState mediumConsistencyRUV =
      new MultiDomainServerState();
  /**
   * Holds the cross domain medium consistency baseDN and CSN for the current
   * replication server.
   *
   * @see <a href=
   * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
   * >OpenDJ Domain Names - medium consistency CSN</a>
   */
  private volatile Pair<DN, CSN> mediumConsistency;
  /**
   * Holds the last time each replica was seen alive, whether via updates or
@@ -108,9 +101,12 @@
   * <p>
   * Updates are persistent and stored in the replicaDBs, heartbeats are
   * transient and are easily constructed on normal operations.
   * <p>
   * Note: This object is updated by both heartbeats and changes/updates.
   */
  private final MultiDomainServerState lastAliveCSNs =
      new MultiDomainServerState();
  /** Note: This object is updated by replica offline messages. */
  private final MultiDomainServerState replicasOffline =
      new MultiDomainServerState();
@@ -119,8 +115,12 @@
   * positioned on the next change that needs to be inserted in the CNIndexDB.
   * <p>
   * Note: it is only accessed from the {@link #run()} method.
   *
   * @NonNull
   */
  private CompositeDBCursor<DN> nextChangeForInsertDBCursor;
  @SuppressWarnings("unchecked")
  private CompositeDBCursor<DN> nextChangeForInsertDBCursor =
      new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false);
  /**
   * New cursors for this Map must be created from the {@link #run()} method,
@@ -182,8 +182,9 @@
      return;
    }
    final CSN oldestCSNBefore = getOldestLastAliveCSN();
    lastAliveCSNs.update(baseDN, heartbeatCSN);
    tryNotify();
    tryNotify(oldestCSNBefore);
  }
  /**
@@ -222,8 +223,9 @@
    final CSN csn = updateMsg.getCSN();
    // only keep the oldest CSN that will be the new cursor's starting point
    newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
    final CSN oldestCSNBefore = getOldestLastAliveCSN();
    lastAliveCSNs.update(baseDN, csn);
    tryNotify();
    tryNotify(oldestCSNBefore);
  }
  /**
@@ -254,18 +256,29 @@
   */
  public void replicaOffline(DN baseDN, CSN offlineCSN)
  {
    if (!isECLEnabledDomain(baseDN))
    {
      return;
    }
    replicasOffline.update(baseDN, offlineCSN);
    final CSN oldestCSNBefore = getOldestLastAliveCSN();
    lastAliveCSNs.update(baseDN, offlineCSN);
    tryNotify();
    tryNotify(oldestCSNBefore);
  }
  private CSN getOldestLastAliveCSN()
  {
    return lastAliveCSNs.getOldestCSNExcluding(replicasOffline).getSecond();
  }
  /**
   * Notifies the Change number indexer thread if it will be able to do some
   * work.
   */
  private void tryNotify()
  private void tryNotify(final CSN oldestCSNBefore)
  {
    if (canMoveForwardMediumConsistencyPoint())
    if (mightMoveForwardMediumConsistencyPoint(oldestCSNBefore))
    {
      synchronized (this)
      {
@@ -274,19 +287,32 @@
    }
  }
  private boolean canMoveForwardMediumConsistencyPoint()
  /**
   * Used for waking up the {@link ChangeNumberIndexer} thread because it might
   * have some work to do.
   */
  private boolean mightMoveForwardMediumConsistencyPoint(CSN oldestCSNBefore)
  {
    final Pair<DN, CSN> mc = mediumConsistency;
    if (mc != null)
    {
      final CSN mcCSN = mc.getSecond();
      final CSN lastTimeSameReplicaSeenAlive =
          lastAliveCSNs.getCSN(mc.getFirst(), mcCSN.getServerId());
      return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive);
    }
    final CSN oldestCSNAfter = getOldestLastAliveCSN();
    // ensure that all initial replicas alive information have been updated
    // with CSNs that are acceptable for moving the medium consistency forward
    return allInitialReplicasAreOfflineOrAlive();
    return allInitialReplicasAreOfflineOrAlive()
        && oldestCSNBefore != null // then oldestCSNAfter cannot be null
        // has the oldest CSN changed?
        && oldestCSNBefore.isOlderThan(oldestCSNAfter);
  }
  /**
   * Used by the {@link ChangeNumberIndexer} thread to determine whether the CSN
   * must be persisted to the change number index DB.
   */
  private boolean canMoveForwardMediumConsistencyPoint(CSN nextCSNToPersist)
  {
    // ensure that all initial replicas alive information have been updated
    // with CSNs that are acceptable for moving the medium consistency forward
    return allInitialReplicasAreOfflineOrAlive()
        // can we persist the next CSN?
        && nextCSNToPersist.isOlderThanOrEqualTo(getOldestLastAliveCSN());
  }
  /**
@@ -299,8 +325,8 @@
   * CSN has been updated to something past the oldest possible CSN), we have
   * enough info to compute medium consistency</li>
   * </ul>
   * In this case, we have enough information to compute medium consistency
   * without waiting any more.
   * In both cases, we have enough information to compute medium consistency
   * without waiting any further.
   */
  private boolean allInitialReplicasAreOfflineOrAlive()
  {
@@ -308,11 +334,11 @@
    {
      for (CSN csn : lastAliveCSNs.getServerState(baseDN))
      {
        if (// oldest possible CSN?
            csn.getTime() == 0
            // replica is not offline
        if (csn.getTime() == 0
            && replicasOffline.getCSN(baseDN, csn.getServerId()) == null)
        {
          // this is the oldest possible CSN, but the replica is not offline
          // we must wait for more up to date information from this replica
          return false;
        }
      }
@@ -537,16 +563,19 @@
          final DN baseDN = nextChangeForInsertDBCursor.getData();
          // FIXME problem: what if the serverId is not part of the ServerState?
          // right now, change number will be blocked
          if (!canMoveForwardMediumConsistencyPoint())
          if (!canMoveForwardMediumConsistencyPoint(csn))
          {
            // the oldest record to insert is newer than the medium consistency
            // point. Let's wait for a change that can be published.
            synchronized (this)
            {
              // double check to protect against a missed call to notify()
              if (!isShutdownInitiated()
                  && !canMoveForwardMediumConsistencyPoint())
              if (!canMoveForwardMediumConsistencyPoint(csn))
              {
                if (isShutdownInitiated())
                {
                  return;
                }
                wait();
                // loop to check if changes older than the medium consistency
                // point have been added to the ReplicaDBs
@@ -601,10 +630,10 @@
  private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
      final DN mcBaseDN) throws ChangelogException
  {
    boolean callNextOnCursor = true;
    // update, so it becomes the previous cookie for the next change
    mediumConsistencyRUV.update(mcBaseDN, mcCSN);
    mediumConsistency = Pair.of(mcBaseDN, mcCSN);
    boolean callNextOnCursor = true;
    final int mcServerId = mcCSN.getServerId();
    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
    final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java
@@ -21,14 +21,17 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 *      Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.common;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.types.DN;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.forgerock.opendj.util.Pair;
import static org.assertj.core.api.Assertions.*;
import static org.testng.Assert.*;
@@ -40,6 +43,18 @@
  private static final CSN csn2 = new CSN(4, 5, 6);
  private static final CSN csn3 = new CSN(7, 8, 3);
  private static DN dn1;
  private static DN dn2;
  private static DN dn3;
  @BeforeClass
  public void setBaseDNs() throws Exception
  {
    dn1 = DN.decode("o=test1");
    dn2 = DN.decode("o=test2");
    dn3 = DN.decode("o=test3");
  }
  @Test
  public void testDecodeAndEncode1() throws Exception
  {
@@ -60,9 +75,6 @@
  @Test
  public void testUpdateCSN() throws Exception
  {
    final DN dn1 = DN.decode("o=test1");
    final DN dn2 = DN.decode("o=test2");
    final MultiDomainServerState state = new MultiDomainServerState();
    assertTrue(state.update(dn1, csn1));
    assertTrue(state.update(dn2, csn2));
@@ -77,9 +89,6 @@
  @Test
  public void testUpdateServerState() throws Exception
  {
    final DN dn1 = DN.decode("o=test1");
    final DN dn2 = DN.decode("o=test2");
    final MultiDomainServerState state = new MultiDomainServerState();
    final ServerState ss1 = new ServerState();
    assertTrue(ss1.update(csn3));
@@ -95,9 +104,6 @@
  @Test
  public void testUpdateMultiDomainServerState() throws Exception
  {
    final DN dn1 = DN.decode("o=test1");
    final DN dn2 = DN.decode("o=test2");
    final MultiDomainServerState state1 = new MultiDomainServerState();
    state1.update(dn1, csn3);
    state1.update(dn2, csn2);
@@ -112,9 +118,6 @@
  @Test(dependsOnMethods = { "testUpdateCSN" })
  public void testEqualsTo() throws Exception
  {
    final DN dn1 = DN.decode("o=test1");
    final DN dn2 = DN.decode("o=test2");
    final MultiDomainServerState state1 = new MultiDomainServerState();
    assertTrue(state1.update(dn1, csn3));
@@ -134,9 +137,6 @@
  @Test(dependsOnMethods = { "testUpdateCSN" })
  public void testIsEmpty() throws Exception
  {
    final DN dn1 = DN.decode("o=test1");
    final DN dn2 = DN.decode("o=test2");
    final MultiDomainServerState state = new MultiDomainServerState();
    assertTrue(state.isEmpty());
@@ -155,15 +155,7 @@
  @Test(dependsOnMethods = { "testUpdateCSN" })
  public void testRemoveCSN() throws Exception
  {
    final DN dn1 = DN.decode("o=test1");
    final DN dn2 = DN.decode("o=test2");
    final DN dn3 = DN.decode("o=test3");
    final MultiDomainServerState state = new MultiDomainServerState();
    assertTrue(state.update(dn1, csn1));
    assertTrue(state.update(dn2, csn1));
    assertTrue(state.update(dn2, csn2));
    final MultiDomainServerState state = getLastAliveCSNs();
    assertNull(state.getCSN(dn3, 42));
    assertFalse(state.removeCSN(dn3, csn1));
@@ -181,4 +173,71 @@
    assertNull(state.getCSN(dn2, csn1.getServerId()));
    assertSame(csn2, state.getCSN(dn2, csn2.getServerId()));
  }
  private MultiDomainServerState getLastAliveCSNs()
  {
    final MultiDomainServerState lastAliveCSNs = new MultiDomainServerState();
    assertTrue(lastAliveCSNs.update(dn1, csn1));
    assertTrue(lastAliveCSNs.update(dn2, csn1));
    assertTrue(lastAliveCSNs.update(dn2, csn2));
    return lastAliveCSNs;
  }
  @Test(dependsOnMethods = { "testUpdateCSN" })
  public void testGetOldestCSNExcluding_null() throws Exception
  {
    final MultiDomainServerState lastAliveCSNs = getLastAliveCSNs();
    assertEquals(lastAliveCSNs.getOldestCSNExcluding(null), Pair.of(dn1, csn1));
  }
  @Test(dependsOnMethods = { "testUpdateCSN" })
  public void testGetOldestCSNExcluding_empty() throws Exception
  {
    final MultiDomainServerState lastAliveCSNs = getLastAliveCSNs();
    final MultiDomainServerState excluded = new MultiDomainServerState();
    assertEquals(lastAliveCSNs.getOldestCSNExcluding(excluded), Pair.of(dn1, csn1));
  }
  @Test(dependsOnMethods = { "testUpdateCSN" })
  public void testGetOldestCSNExcluding_currentOldestCSN_givesNewOldestCSN() throws Exception
  {
    final MultiDomainServerState lastAliveCSNs = getLastAliveCSNs();
    final MultiDomainServerState excluded = new MultiDomainServerState();
    excluded.update(dn1, csn1);
    assertEquals(lastAliveCSNs.getOldestCSNExcluding(excluded), Pair.of(dn2, csn1));
  }
  @Test(dependsOnMethods = { "testUpdateCSN" })
  public void testGetOldestCSNExcluding_CSNOlderThanCurrentOldestCSN_givesNewOldestCSN() throws Exception
  {
    final MultiDomainServerState lastAliveCSNs = getLastAliveCSNs();
    final MultiDomainServerState excluded = new MultiDomainServerState();
    excluded.update(dn1, csn1);
    final CSN olderThanCSN1 = new CSN(0, 2, 3);
    assertEquals(olderThanCSN1.getServerId(), csn1.getServerId());
    assertTrue(olderThanCSN1.isOlderThan(csn1));
    excluded.update(dn2, olderThanCSN1);
    assertEquals(lastAliveCSNs.getOldestCSNExcluding(excluded), Pair.of(dn2, csn1));
  }
  @Test(dependsOnMethods = { "testUpdateCSN" })
  public void testGetOldestCSNExcluding_CSNNewerThanCurrentOldestCSN_givesNewOldestCSN() throws Exception
  {
    final MultiDomainServerState lastAliveCSNs = getLastAliveCSNs();
    final MultiDomainServerState excluded = new MultiDomainServerState();
    excluded.update(dn1, csn1);
    final CSN newerThanCSN1 = new CSN(42, 2, 3);
    assertEquals(newerThanCSN1.getServerId(), csn1.getServerId());
    assertTrue(newerThanCSN1.isNewerThan(csn1));
    excluded.update(dn2, newerThanCSN1);
    assertEquals(lastAliveCSNs.getOldestCSNExcluding(excluded), Pair.of(dn2, csn1));
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -355,6 +355,7 @@
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
    assertExternalChangelogContent(msg1);
    addReplica(BASE_DN1, serverId2);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -486,19 +487,19 @@
    assertExternalChangelogContent(msg1);
    // do not wait for temporarily offline serverId1
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
    publishUpdateMsg(msg3);
    assertExternalChangelogContent(msg1, msg3);
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4);
    publishUpdateMsg(msg4);
    assertExternalChangelogContent(msg1, msg4);
    // serverId1 is back online, wait for changes from serverId2
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
    publishUpdateMsg(msg4);
    assertExternalChangelogContent(msg1, msg3);
    final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId2, 5);
    final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId1, 5);
    publishUpdateMsg(msg5);
    assertExternalChangelogContent(msg1, msg4);
    final ReplicatedUpdateMsg msg6 = msg(BASE_DN1, serverId2, 6);
    publishUpdateMsg(msg6);
    // MCP moves forward
    assertExternalChangelogContent(msg1, msg3, msg4);
    assertExternalChangelogContent(msg1, msg4, msg5);
  }
  /**
@@ -525,16 +526,39 @@
    assertExternalChangelogContent();
    // MCP moves forward because serverId1 is not really offline
    // since because we received a message from it after the offline replica msg
    // since we received a message from it newer than the offline replica msg
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4);
    publishUpdateMsg(msg4);
    assertExternalChangelogContent(msg2, msg3);
    // back to normal operations
    sendHeartbeat(BASE_DN1, serverId1, 4);
    sendHeartbeat(BASE_DN1, serverId1, 5);
    assertExternalChangelogContent(msg2, msg3, msg4);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneKilled() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
    // MCP cannot move forward: no news yet from serverId2
    assertExternalChangelogContent();
    sendHeartbeat(BASE_DN1, serverId2, 2);
    // MCP moves forward: we know what serverId2 is at
    assertExternalChangelogContent(msg1);
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
    publishUpdateMsg(msg3);
    // MCP cannot move forward: serverId2 is the oldest CSN
    assertExternalChangelogContent(msg1);
  }
  private void addReplica(DN baseDN, int serverId) throws Exception
  {
    final SequentialDBCursor cursor = new SequentialDBCursor();