mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
20.14.2014 5eb7b26eabf15d047fd913597aa508bb78c1d7e7
OPENDJ-1259 (CR-3563) Make the Medium Consistency Point support replicas temporarily leaving the topology


Added more tests about how changeNumber computation is performed WRT replicas offline messages and RS restarts.


ChangeNumberIndexerTest.java:
Tested additional scenarios: emptyDBTwoDSsOneInitiallyOffline(), emptyDBTwoDSsOneInitiallyWithChangesThenOffline(), emptyDBTwoDSsOneInitiallyPersistedOfflineThenChanges().
In several tests, called assertExternalChangelogContent() after calling startCNIndexer().
Renamed setDBInitialRecords() to setCNIndexDBInitialRecords().
Added more javadocs.
Used @Mock + MockitoAnnotations.initMocks() instead of several calls to Mockito.mock().

ChangeNumberIndexer.java:
Changed the code to support the additional scenarios.
Renamed allInitialReplicasArePastOldestPossibleCSN() to allInitialReplicasAreOfflineOrAlive().
Changed removeCursor() into getCursor().
2 files modified
249 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 69 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java 180 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -271,16 +271,32 @@
    }
    // ensure that all initial replicas alive information have been updated
    // with CSNs that are acceptable for moving the medium consistency forward
    return allInitialReplicasArePastOldestPossibleCSN();
    return allInitialReplicasAreOfflineOrAlive();
  }
  private boolean allInitialReplicasArePastOldestPossibleCSN()
  /**
   * Returns true only if the initial replicas known from the changelog state DB
   * are either:
   * <ul>
   * <li>offline, so do not wait for them in order to compute medium consistency
   * </li>
   * <li>alive, because we received heartbeats or changes (so their last alive
   * CSN has been updated to something past the oldest possible CSN), we have
   * enough info to compute medium consistency</li>
   * </ul>
   * In this case, we have enough information to compute medium consistency
   * without waiting any more.
   */
  private boolean allInitialReplicasAreOfflineOrAlive()
  {
    for (DN baseDN : lastAliveCSNs)
    {
      for (CSN csn : lastAliveCSNs.getServerState(baseDN))
      {
        if (csn.getTime() == 0)
        if (// oldest possible CSN?
            csn.getTime() == 0
            // replica is not offline
            && replicasOffline.getCSN(baseDN, csn.getServerId()) == null)
        {
          return false;
        }
@@ -363,6 +379,9 @@
        if (isECLEnabledDomain(baseDN))
        {
          replicasOffline.update(baseDN, offlineCSN);
          // a replica offline message could also be the very last time
          // we heard from this replica :)
          lastAliveCSNs.update(baseDN, offlineCSN);
        }
      }
    }
@@ -528,10 +547,6 @@
              new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
          changelogDB.getChangeNumberIndexDB().addRecord(record);
          moveForwardMediumConsistencyPoint(csn, baseDN);
          // advance the cursor we just read from,
          // success/failure will be checked later
          nextChangeForInsertDBCursor.next();
        }
        catch (InterruptedException ignored)
        {
@@ -571,6 +586,7 @@
  private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
      final DN mcBaseDN) throws ChangelogException
  {
    boolean callNextOnCursor = true;
    // update, so it becomes the previous cookie for the next change
    mediumConsistencyRUV.update(mcBaseDN, mcCSN);
    mediumConsistency = Pair.of(mcBaseDN, mcCSN);
@@ -586,19 +602,36 @@
      }
      else if (offlineCSN.isOlderThan(mcCSN))
      {
        Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
            pair = getCursor(mcBaseDN, mcCSN.getServerId());
        Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond();
        if (iter != null && !iter.hasNext())
        {
        /*
         * replica is not back online and Medium consistency point has gone past
         * its last offline time: remove everything known about it: cursor,
         * offlineCSN from lastAliveCSN and remove all knowledge of this replica
         * from the medium consistency RUV.
           * replica is not back online, Medium consistency point has gone past
           * its last offline time, and there are no more changes after the
           * offline CSN in the cursor: remove everything known about it:
           * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
           * this replica from the medium consistency RUV.
         */
        removeCursor(mcBaseDN, mcCSN);
          iter.remove();
          StaticUtils.close(pair.getFirst());
          resetNextChangeForInsertDBCursor();
          callNextOnCursor = false;
        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
        mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
      }
    }
  }
    if (callNextOnCursor)
    {
      // advance the cursor we just read from,
      // success/failure will be checked later
      nextChangeForInsertDBCursor.next();
    }
  }
  private void removeAllCursors()
  {
    if (nextChangeForInsertDBCursor != null)
@@ -614,8 +647,8 @@
    newCursors.clear();
  }
  private void removeCursor(final DN baseDN, final CSN csn)
      throws ChangelogException
  private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
      getCursor(final DN baseDN, final int serverId) throws ChangelogException
  {
    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
        : allCursors.entrySet())
@@ -626,16 +659,14 @@
            entry1.getValue().entrySet().iterator(); iter.hasNext();)
        {
          final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next();
          if (csn.getServerId() == entry2.getKey())
          if (serverId == entry2.getKey())
          {
            iter.remove();
            StaticUtils.close(entry2.getValue());
            resetNextChangeForInsertDBCursor();
            return;
            return Pair.of(entry2.getValue(), iter);
          }
        }
      }
    }
    return Pair.empty();
  }
  private boolean recycleExhaustedCursors() throws ChangelogException
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -32,6 +32,8 @@
import java.util.Map;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.common.CSN;
@@ -52,6 +54,23 @@
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
/**
 * Test for ChangeNumberIndexer class. All dependencies to the changelog DB
 * interfaces are mocked. The ChangeNumberIndexer class simulates what the RS
 * does to compute a changeNumber. The tests setup various topologies with their
 * replicas.
 * <p>
 * All tests are written with this layout:
 * <ul>
 * <li>Initial setup where RS is stopped. Data are set into the changelog state
 * DB, the replica DBs and the change number index DB.</li>
 * <li>Simulate RS startup by calling {@link #startCNIndexer(DN...)}. This will
 * start the change number indexer thread that will start computing change
 * numbers and inserting them in the change number index db.</li>
 * <li>Send events to the change number indexer thread by publishing update
 * messages, sending heartbeat messages or replica offline messages.</li>
 * </ul>
 */
@SuppressWarnings("javadoc")
public class ChangeNumberIndexerTest extends DirectoryServerTestCase
{
@@ -102,12 +121,15 @@
  private static final int serverId2 = 102;
  private static final int serverId3 = 103;
  @Mock
  private ChangelogDB changelogDB;
  @Mock
  private ChangeNumberIndexDB cnIndexDB;
  @Mock
  private ReplicationDomainDB domainDB;
  private Map<Pair<DN, Integer>, SequentialDBCursor> cursors =
      new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
  private Map<Pair<DN, Integer>, SequentialDBCursor> cursors;
  private ChangelogState initialState;
  private Map<DN, ServerState> domainNewestCSNs;
  private ChangeNumberIndexer cnIndexer;
  private MultiDomainServerState initialCookie;
@@ -129,14 +151,14 @@
  @BeforeMethod
  public void setup() throws Exception
  {
    changelogDB = mock(ChangelogDB.class);
    cnIndexDB = mock(ChangeNumberIndexDB.class);
    domainDB = mock(ReplicationDomainDB.class);
    MockitoAnnotations.initMocks(this);
    when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
    when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
    initialState = new ChangelogState();
    initialCookie = new MultiDomainServerState();
    cursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
    domainNewestCSNs = new HashMap<DN, ServerState>();
  }
  @AfterMethod
@@ -159,6 +181,7 @@
  {
    addReplica(BASE_DN1, serverId1);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
@@ -170,8 +193,9 @@
  {
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    addReplica(BASE_DN1, serverId1);
    setDBInitialRecords(msg1);
    setCNIndexDBInitialRecords(msg1);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
    publishUpdateMsg(msg2);
@@ -184,6 +208,7 @@
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    // simulate messages received out of order
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -201,6 +226,7 @@
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN2, serverId2);
    startCNIndexer(BASE_DN1, BASE_DN2);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN2, serverId2, 2);
@@ -234,6 +260,7 @@
  {
    addReplica(BASE_DN1, serverId1);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
@@ -263,8 +290,9 @@
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    setDBInitialRecords(msg1, msg2);
    setCNIndexDBInitialRecords(msg1, msg2);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
@@ -286,6 +314,7 @@
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1);
    final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN1, serverId2);
@@ -303,6 +332,7 @@
    addReplica(BASE_DN1, serverId2);
    addReplica(BASE_DN1, serverId3);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    // cn=admin data will does not participate in the external changelog
    // so it cannot add to it
@@ -321,6 +351,7 @@
  {
    addReplica(BASE_DN1, serverId1);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
@@ -340,6 +371,7 @@
  {
    addReplica(BASE_DN1, serverId1);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
@@ -359,6 +391,7 @@
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -375,6 +408,7 @@
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -401,16 +435,128 @@
    assertExternalChangelogContent(msg1, msg2, msg4, msg5);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneInitiallyOffline() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1));
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
    publishUpdateMsg(msg2);
    // MCP does not wait for temporarily offline serverId1
    assertExternalChangelogContent(msg2);
    // serverId1 is back online, wait for changes from serverId2
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
    publishUpdateMsg(msg3);
    assertExternalChangelogContent(msg2);
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4);
    publishUpdateMsg(msg4);
    // MCP moves forward
    assertExternalChangelogContent(msg2, msg3);
  }
  /**
   * Scenario:
   * <ol>
   * <li>Replica 1 publishes one change</li>
   * <li>Replica 1 sends offline message</li>
   * <li>RS stops</li>
   * <li>RS starts</li>
   * </ol>
   */
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneInitiallyWithChangesThenOffline() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
    initialState.addOfflineReplica(BASE_DN1, new CSN(2, 1, serverId1));
    startCNIndexer(BASE_DN1);
    // blocked until we receive info for serverId2
    assertExternalChangelogContent();
    sendHeartbeat(BASE_DN1, serverId2, 3);
    // MCP moves forward
    assertExternalChangelogContent(msg1);
    // do not wait for temporarily offline serverId1
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
    publishUpdateMsg(msg3);
    assertExternalChangelogContent(msg1, msg3);
    // serverId1 is back online, wait for changes from serverId2
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
    publishUpdateMsg(msg4);
    assertExternalChangelogContent(msg1, msg3);
    final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId2, 5);
    publishUpdateMsg(msg5);
    // MCP moves forward
    assertExternalChangelogContent(msg1, msg3, msg4);
  }
  /**
   * Scenario:
   * <ol>
   * <li>Replica 1 sends offline message</li>
   * <li>Replica 1 starts</li>
   * <li>Replica 1 publishes one change</li>
   * <li>Replica 1 publishes a second change</li>
   * <li>RS stops</li>
   * <li>RS starts</li>
   * </ol>
   */
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1));
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
    publishUpdateMsg(msg2, msg3);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    // MCP moves forward because serverId1 is not really offline
    // since because we received a message from it after the offline replica msg
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4);
    publishUpdateMsg(msg4);
    assertExternalChangelogContent(msg2, msg3);
    // back to normal operations
    sendHeartbeat(BASE_DN1, serverId1, 4);
    assertExternalChangelogContent(msg2, msg3, msg4);
  }
  private void addReplica(DN baseDN, int serverId) throws Exception
  {
    final SequentialDBCursor cursor = new SequentialDBCursor();
    cursors.put(Pair.of(baseDN, serverId), cursor);
    when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
        .thenReturn(cursor);
    when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(new ServerState());
    when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(
        getDomainNewestCSNs(baseDN));
    initialState.addServerIdToDomain(serverId, baseDN);
  }
  private ServerState getDomainNewestCSNs(final DN baseDN)
  {
    ServerState serverState = domainNewestCSNs.get(baseDN);
    if (serverState == null)
    {
      serverState = new ServerState();
      domainNewestCSNs.put(baseDN, serverState);
    }
    return serverState;
  }
  private void startCNIndexer(DN... eclEnabledDomains)
  {
    final List<DN> eclEnabledDomainList = Arrays.asList(eclEnabledDomains);
@@ -431,8 +577,8 @@
    if (cnIndexer != null)
    {
      cnIndexer.initiateShutdown();
      cnIndexer.interrupt();
      cnIndexer.join();
      cnIndexer = null;
    }
  }
@@ -446,7 +592,7 @@
    return new ReplicatedUpdateMsg(baseDN, new CSN(0, 0, serverId), true);
  }
  private void setDBInitialRecords(ReplicatedUpdateMsg... msgs) throws Exception
  private void setCNIndexDBInitialRecords(ReplicatedUpdateMsg... msgs) throws Exception
  {
    // Initialize the previous cookie that will be used to compare the records
    // added to the CNIndexDB at the end of this test
@@ -488,8 +634,17 @@
    {
      if (!msg.isEmptyCursor())
      {
        if (cnIndexer != null)
        {
          // indexer is running
        cnIndexer.publishUpdateMsg(msg.getBaseDN(), msg);
      }
        else
        {
          // we are only setting up initial state, update the domain newest CSNs
          getDomainNewestCSNs(msg.getBaseDN()).update(msg.getCSN());
        }
      }
    }
    waitForWaitingState(cnIndexer);
  }
@@ -508,6 +663,10 @@
  private void waitForWaitingState(final Thread t)
  {
    if (t == null)
    { // not started yet, do not wait
      return;
    }
    State state = t.getState();
    while (!state.equals(State.WAITING)
        && !state.equals(State.TIMED_WAITING)
@@ -567,6 +726,7 @@
  @Test(dataProvider = "precedingCSNDataProvider")
  public void getPrecedingCSN(CSN start, CSN expected)
  {
    cnIndexer = new ChangeNumberIndexer(changelogDB, initialState);
    CSN precedingCSN = this.cnIndexer.getPrecedingCSN(start);
    assertThat(precedingCSN).isEqualTo(expected);
  }