mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
30.09.2014 4d8fd404b195b5ae6b4115c33d95828dfc9f2662
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -265,6 +265,23 @@
          lastAliveCSNs.getCSN(mc.getFirst(), mcCSN.getServerId());
      return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive);
    }
    // ensure that all initial replicas alive information have been updated
    // with CSNs that are acceptable for moving the medium consistency forward
    return allInitialReplicasArePastOldestPossibleCSN();
  }
  private boolean allInitialReplicasArePastOldestPossibleCSN()
  {
    for (DN baseDN : lastAliveCSNs)
    {
      for (CSN csn : lastAliveCSNs.getServerState(baseDN))
      {
        if (csn.getTime() == 0)
        {
          return false;
        }
      }
    }
    return true;
  }
@@ -299,8 +316,14 @@
      for (Integer serverId : entry.getValue())
      {
        final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
        /*
         * initialize with the oldest possible CSN in order for medium
         * consistency to wait for all replicas to be alive before moving
         * forward
         */
        lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
        // start after the actual CSN when initializing from the previous cookie
        final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
        ensureCursorExists(baseDN, serverId, csn);
      }
@@ -330,6 +353,11 @@
    this.changelogState = null;
  }
  private CSN oldestPossibleCSN(int serverId)
  {
    return new CSN(0, 0, serverId);
  }
  private void resetNextChangeForInsertDBCursor() throws ChangelogException
  {
    final Map<DBCursor<UpdateMsg>, DN> cursors =
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -151,7 +151,7 @@
  @Test
  public void emptyDBNoDS() throws Exception
  {
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    assertExternalChangelogContent();
  }
@@ -159,7 +159,7 @@
  public void emptyDBOneInitialDS() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
@@ -172,7 +172,7 @@
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    addReplica(BASE_DN1, serverId1);
    setDBInitialRecords(msg1);
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
    publishUpdateMsg(msg2);
@@ -184,11 +184,15 @@
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    // simulate messages received out of order
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
    publishUpdateMsg(msg2, msg1);
    publishUpdateMsg(msg2);
    // do not start publishing to the changelog until we hear from serverId1
    assertExternalChangelogContent();
    publishUpdateMsg(msg1);
    assertExternalChangelogContent(msg1);
  }
@@ -197,7 +201,7 @@
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN2, serverId2);
    startCNIndexer();
    startCNIndexer(BASE_DN1, BASE_DN2);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN2, serverId2, 2);
@@ -216,7 +220,7 @@
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    setDBInitialRecords(msg1, msg2);
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
@@ -237,7 +241,7 @@
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1);
    final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN1, serverId2);
@@ -254,7 +258,7 @@
    addReplica(ADMIN_DATA_DN, serverId1);
    addReplica(BASE_DN1, serverId2);
    addReplica(BASE_DN1, serverId3);
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    // cn=admin data will does not participate in the external changelog
    // so it cannot add to it
@@ -272,7 +276,7 @@
  public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
@@ -292,7 +296,7 @@
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -308,7 +312,7 @@
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer();
    startCNIndexer(BASE_DN1);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -336,14 +340,15 @@
    initialState.addServerIdToDomain(serverId, baseDN);
  }
  private void startCNIndexer()
  private void startCNIndexer(DN... eclEnabledDomains)
  {
    final List<DN> eclEnabledDomainList = Arrays.asList(eclEnabledDomains);
    cnIndexer = new ChangeNumberIndexer(changelogDB, initialState)
    {
      @Override
      protected boolean isECLEnabledDomain(DN baseDN)
      {
        return BASE_DN1.equals(baseDN) || BASE_DN2.equals(baseDN);
        return eclEnabledDomainList.contains(baseDN);
      }
    };
    cnIndexer.start();
@@ -448,12 +453,6 @@
  private void assertExternalChangelogContent(ReplicatedUpdateMsg... expectedMsgs)
      throws Exception
  {
    if (expectedMsgs.length == 0)
    {
      verify(cnIndexDB, never()).addRecord(any(ChangeNumberIndexRecord.class));
      return;
    }
    final ArgumentCaptor<ChangeNumberIndexRecord> arg =
        ArgumentCaptor.forClass(ChangeNumberIndexRecord.class);
    verify(cnIndexDB, atLeast(0)).addRecord(arg.capture());