mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
20.08.2014 69a6335cadea27e7095cc502b5e62bcc01b3806f
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -269,16 +269,32 @@
    }
    // ensure that all initial replicas alive information have been updated
    // with CSNs that are acceptable for moving the medium consistency forward
    return allInitialReplicasArePastOldestPossibleCSN();
    return allInitialReplicasAreOfflineOrAlive();
  }
  private boolean allInitialReplicasArePastOldestPossibleCSN()
  /**
   * Returns true only if the initial replicas known from the changelog state DB
   * are either:
   * <ul>
   * <li>offline, so do not wait for them in order to compute medium consistency
   * </li>
   * <li>alive, because we received heartbeats or changes (so their last alive
   * CSN has been updated to something past the oldest possible CSN), we have
   * enough info to compute medium consistency</li>
   * </ul>
   * In this case, we have enough information to compute medium consistency
   * without waiting any more.
   */
  private boolean allInitialReplicasAreOfflineOrAlive()
  {
    for (DN baseDN : lastAliveCSNs)
    {
      for (CSN csn : lastAliveCSNs.getServerState(baseDN))
      {
        if (csn.getTime() == 0)
        if (// oldest possible CSN?
            csn.getTime() == 0
            // replica is not offline
            && replicasOffline.getCSN(baseDN, csn.getServerId()) == null)
        {
          return false;
        }
@@ -361,6 +377,9 @@
        if (isECLEnabledDomain(baseDN))
        {
          replicasOffline.update(baseDN, offlineCSN);
          // a replica offline message could also be the very last time
          // we heard from this replica :)
          lastAliveCSNs.update(baseDN, offlineCSN);
        }
      }
    }
@@ -527,10 +546,6 @@
              new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
          changelogDB.getChangeNumberIndexDB().addRecord(record);
          moveForwardMediumConsistencyPoint(csn, baseDN);
          // advance the cursor we just read from,
          // success/failure will be checked later
          nextChangeForInsertDBCursor.next();
        }
        catch (InterruptedException ignored)
        {
@@ -568,6 +583,7 @@
  private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
      final DN mcBaseDN) throws ChangelogException
  {
    boolean callNextOnCursor = true;
    // update, so it becomes the previous cookie for the next change
    mediumConsistencyRUV.update(mcBaseDN, mcCSN);
    mediumConsistency = Pair.of(mcBaseDN, mcCSN);
@@ -583,17 +599,34 @@
      }
      else if (offlineCSN.isOlderThan(mcCSN))
      {
        /*
         * replica is not back online and Medium consistency point has gone past
         * its last offline time: remove everything known about it: cursor,
         * offlineCSN from lastAliveCSN and remove all knowledge of this replica
         * from the medium consistency RUV.
         */
        removeCursor(mcBaseDN, mcCSN);
        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
        mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
        Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
            pair = getCursor(mcBaseDN, mcCSN.getServerId());
        Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond();
        if (iter != null && !iter.hasNext())
        {
          /*
           * replica is not back online, Medium consistency point has gone past
           * its last offline time, and there are no more changes after the
           * offline CSN in the cursor: remove everything known about it:
           * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
           * this replica from the medium consistency RUV.
           */
          iter.remove();
          StaticUtils.close(pair.getFirst());
          resetNextChangeForInsertDBCursor();
          callNextOnCursor = false;
          lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
          mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
        }
      }
    }
    if (callNextOnCursor)
    {
      // advance the cursor we just read from,
      // success/failure will be checked later
      nextChangeForInsertDBCursor.next();
    }
  }
  private void removeAllCursors()
@@ -611,8 +644,8 @@
    newCursors.clear();
  }
  private void removeCursor(final DN baseDN, final CSN csn)
      throws ChangelogException
  private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
      getCursor(final DN baseDN, final int serverId) throws ChangelogException
  {
    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
        : allCursors.entrySet())
@@ -623,16 +656,14 @@
            entry1.getValue().entrySet().iterator(); iter.hasNext();)
        {
          final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next();
          if (csn.getServerId() == entry2.getKey())
          if (serverId == entry2.getKey())
          {
            iter.remove();
            StaticUtils.close(entry2.getValue());
            resetNextChangeForInsertDBCursor();
            return;
            return Pair.of(entry2.getValue(), iter);
          }
        }
      }
    }
    return Pair.empty();
  }
  private boolean recycleExhaustedCursors() throws ChangelogException
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -33,6 +33,8 @@
import java.util.Map;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.common.CSN;
@@ -53,6 +55,23 @@
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
/**
 * Test for ChangeNumberIndexer class. All dependencies to the changelog DB
 * interfaces are mocked. The ChangeNumberIndexer class simulates what the RS
 * does to compute a changeNumber. The tests setup various topologies with their
 * replicas.
 * <p>
 * All tests are written with this layout:
 * <ul>
 * <li>Initial setup where RS is stopped. Data are set into the changelog state
 * DB, the replica DBs and the change number index DB.</li>
 * <li>Simulate RS startup by calling {@link #startCNIndexer(DN...)}. This will
 * start the change number indexer thread that will start computing change
 * numbers and inserting them in the change number index db.</li>
 * <li>Send events to the change number indexer thread by publishing update
 * messages, sending heartbeat messages or replica offline messages.</li>
 * </ul>
 */
@SuppressWarnings("javadoc")
public class ChangeNumberIndexerTest extends DirectoryServerTestCase
{
@@ -103,12 +122,15 @@
  private static final int serverId2 = 102;
  private static final int serverId3 = 103;
  @Mock
  private ChangelogDB changelogDB;
  @Mock
  private ChangeNumberIndexDB cnIndexDB;
  @Mock
  private ReplicationDomainDB domainDB;
  private Map<Pair<DN, Integer>, SequentialDBCursor> cursors =
      new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
  private Map<Pair<DN, Integer>, SequentialDBCursor> cursors;
  private ChangelogState initialState;
  private Map<DN, ServerState> domainNewestCSNs;
  private ChangeNumberIndexer cnIndexer;
  private MultiDomainServerState initialCookie;
@@ -130,14 +152,14 @@
  @BeforeMethod
  public void setup() throws Exception
  {
    changelogDB = mock(ChangelogDB.class);
    cnIndexDB = mock(ChangeNumberIndexDB.class);
    domainDB = mock(ReplicationDomainDB.class);
    MockitoAnnotations.initMocks(this);
    when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
    when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
    initialState = new ChangelogState();
    initialCookie = new MultiDomainServerState();
    cursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
    domainNewestCSNs = new HashMap<DN, ServerState>();
  }
  @AfterMethod
@@ -160,6 +182,7 @@
  {
    addReplica(BASE_DN1, serverId1);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
@@ -171,8 +194,9 @@
  {
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    addReplica(BASE_DN1, serverId1);
    setDBInitialRecords(msg1);
    setCNIndexDBInitialRecords(msg1);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
    publishUpdateMsg(msg2);
@@ -185,6 +209,7 @@
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    // simulate messages received out of order
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -202,6 +227,7 @@
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN2, serverId2);
    startCNIndexer(BASE_DN1, BASE_DN2);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN2, serverId2, 2);
@@ -235,6 +261,7 @@
  {
    addReplica(BASE_DN1, serverId1);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
@@ -264,8 +291,9 @@
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    setDBInitialRecords(msg1, msg2);
    setCNIndexDBInitialRecords(msg1, msg2);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
@@ -287,6 +315,7 @@
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1);
    final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN1, serverId2);
@@ -304,6 +333,7 @@
    addReplica(BASE_DN1, serverId2);
    addReplica(BASE_DN1, serverId3);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    // cn=admin data will does not participate in the external changelog
    // so it cannot add to it
@@ -322,6 +352,7 @@
  {
    addReplica(BASE_DN1, serverId1);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
@@ -341,6 +372,7 @@
  {
    addReplica(BASE_DN1, serverId1);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
@@ -360,6 +392,7 @@
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -376,6 +409,7 @@
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -402,16 +436,128 @@
    assertExternalChangelogContent(msg1, msg2, msg4, msg5);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneInitiallyOffline() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1));
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
    publishUpdateMsg(msg2);
    // MCP does not wait for temporarily offline serverId1
    assertExternalChangelogContent(msg2);
    // serverId1 is back online, wait for changes from serverId2
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
    publishUpdateMsg(msg3);
    assertExternalChangelogContent(msg2);
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4);
    publishUpdateMsg(msg4);
    // MCP moves forward
    assertExternalChangelogContent(msg2, msg3);
  }
  /**
   * Scenario:
   * <ol>
   * <li>Replica 1 publishes one change</li>
   * <li>Replica 1 sends offline message</li>
   * <li>RS stops</li>
   * <li>RS starts</li>
   * </ol>
   */
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneInitiallyWithChangesThenOffline() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
    initialState.addOfflineReplica(BASE_DN1, new CSN(2, 1, serverId1));
    startCNIndexer(BASE_DN1);
    // blocked until we receive info for serverId2
    assertExternalChangelogContent();
    sendHeartbeat(BASE_DN1, serverId2, 3);
    // MCP moves forward
    assertExternalChangelogContent(msg1);
    // do not wait for temporarily offline serverId1
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
    publishUpdateMsg(msg3);
    assertExternalChangelogContent(msg1, msg3);
    // serverId1 is back online, wait for changes from serverId2
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
    publishUpdateMsg(msg4);
    assertExternalChangelogContent(msg1, msg3);
    final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId2, 5);
    publishUpdateMsg(msg5);
    // MCP moves forward
    assertExternalChangelogContent(msg1, msg3, msg4);
  }
  /**
   * Scenario:
   * <ol>
   * <li>Replica 1 sends offline message</li>
   * <li>Replica 1 starts</li>
   * <li>Replica 1 publishes one change</li>
   * <li>Replica 1 publishes a second change</li>
   * <li>RS stops</li>
   * <li>RS starts</li>
   * </ol>
   */
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1));
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
    publishUpdateMsg(msg2, msg3);
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
    // MCP moves forward because serverId1 is not really offline
    // since because we received a message from it after the offline replica msg
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4);
    publishUpdateMsg(msg4);
    assertExternalChangelogContent(msg2, msg3);
    // back to normal operations
    sendHeartbeat(BASE_DN1, serverId1, 4);
    assertExternalChangelogContent(msg2, msg3, msg4);
  }
  private void addReplica(DN baseDN, int serverId) throws Exception
  {
    final SequentialDBCursor cursor = new SequentialDBCursor();
    cursors.put(Pair.of(baseDN, serverId), cursor);
    when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
        .thenReturn(cursor);
    when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(new ServerState());
    when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(
        getDomainNewestCSNs(baseDN));
    initialState.addServerIdToDomain(serverId, baseDN);
  }
  private ServerState getDomainNewestCSNs(final DN baseDN)
  {
    ServerState serverState = domainNewestCSNs.get(baseDN);
    if (serverState == null)
    {
      serverState = new ServerState();
      domainNewestCSNs.put(baseDN, serverState);
    }
    return serverState;
  }
  private void startCNIndexer(DN... eclEnabledDomains)
  {
    final List<DN> eclEnabledDomainList = Arrays.asList(eclEnabledDomains);
@@ -432,8 +578,8 @@
    if (cnIndexer != null)
    {
      cnIndexer.initiateShutdown();
      cnIndexer.interrupt();
      cnIndexer.join();
      cnIndexer = null;
    }
  }
@@ -447,7 +593,7 @@
    return new ReplicatedUpdateMsg(baseDN, new CSN(0, 0, serverId), true);
  }
  private void setDBInitialRecords(ReplicatedUpdateMsg... msgs) throws Exception
  private void setCNIndexDBInitialRecords(ReplicatedUpdateMsg... msgs) throws Exception
  {
    // Initialize the previous cookie that will be used to compare the records
    // added to the CNIndexDB at the end of this test
@@ -489,7 +635,16 @@
    {
      if (!msg.isEmptyCursor())
      {
        cnIndexer.publishUpdateMsg(msg.getBaseDN(), msg);
        if (cnIndexer != null)
        {
          // indexer is running
          cnIndexer.publishUpdateMsg(msg.getBaseDN(), msg);
        }
        else
        {
          // we are only setting up initial state, update the domain newest CSNs
          getDomainNewestCSNs(msg.getBaseDN()).update(msg.getCSN());
        }
      }
    }
    waitForWaitingState(cnIndexer);
@@ -509,6 +664,10 @@
  private void waitForWaitingState(final Thread t)
  {
    if (t == null)
    { // not started yet, do not wait
      return;
    }
    State state = t.getState();
    while (!state.equals(State.WAITING)
        && !state.equals(State.TIMED_WAITING)
@@ -568,6 +727,7 @@
  @Test(dataProvider = "precedingCSNDataProvider")
  public void getPrecedingCSN(CSN start, CSN expected)
  {
    cnIndexer = new ChangeNumberIndexer(changelogDB, initialState);
    CSN precedingCSN = this.cnIndexer.getPrecedingCSN(start);
    assertThat(precedingCSN).isEqualTo(expected);
  }