mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
09.35.2013 bcd9325b7d47b6932d140a15ee761252e130ab7e
OPENDJ-1116 Introduce abstraction for the changelog DB


ReplicationServerDomain.java:
Renamed getStartState() to getOldestState().

ServerState.java:
In duplicateOnlyOlderThan(), changed parameter from CSN to long.


ReplicationServer.java:
Consequence of the change to ServerState and ReplicationServerDomain.
In getECLChangeNumberLimits(), brought the database empty code to the top of the method: to get something more readable.

ECLServerHandler.java:
Consequence of the change to ServerState and ReplicationServerDomain.
Changed a few names / comments.

JEChangeNumberIndexDB.java
Consequence of the change to ReplicationServerDomain.


ExternalChangeLogTest.java
Consequence of the change to ReplicationServerDomain.
Code cleanup:
- removed gblCSN instance field
- inlined sleep()
- used connection.process*() + inlined runModifyOperation() and runDeleteOperation().
- renamed getReplicationDomainStartState() to getDomainOldestState() + added better asserts
6 files modified
438 ■■■■■ changed files
opends/src/server/org/opends/server/replication/common/ServerState.java 15 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 25 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 66 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 6 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java 15 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java 311 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -529,16 +529,19 @@
  }
  /**
   * Build a copy of the ServerState with only CSNs older than
   * a specific CSN. This is used when building the initial
   * Cookie in the External Changelog, to cope with purged changes.
   * @param csn The CSN to compare the ServerState with
   * Build a copy of the ServerState with only CSNs older than a provided
   * timestamp. This is used when building the initial Cookie in the External
   * Changelog, to cope with purged changes.
   *
   * @param timestamp
   *          The timestamp to compare the ServerState against
   * @return a copy of the ServerState which only contains the CSNs older than
   *         csn.
   */
  public ServerState duplicateOnlyOlderThan(CSN csn)
  public ServerState duplicateOnlyOlderThan(long timestamp)
  {
    ServerState newState = new ServerState();
    final CSN csn = new CSN(timestamp, 0, 0);
    final ServerState newState = new ServerState();
    synchronized (serverIdToCSN)
    {
      for (CSN change : serverIdToCSN.values())
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -186,8 +186,8 @@
          .append(")")
          .append("] [nextNonEligibleMsg=").append(nextNonEligibleMsg)
          .append("] [startState=").append(startState)
          .append("] [stopState=").append(stopState)
          .append("] [currentState=").append(currentState)
          .append("] [stopState=").append(stopState)
          .append("]]");
    }
@@ -735,11 +735,10 @@
      }
      // skip unused domains
      final ServerState latestServerState = domain.getLatestServerState();
      if (latestServerState.isEmpty())
      final ServerState latestState = domain.getLatestServerState();
      if (latestState.isEmpty())
        continue;
      // Creates the new domain context
      final DomainContext newDomainCtxt = new DomainContext();
      newDomainCtxt.active = true;
@@ -749,7 +748,7 @@
      // Assign the start state for the domain
      if (isPersistent == PERSISTENT_CHANGES_ONLY)
      {
        newDomainCtxt.startState = latestServerState;
        newDomainCtxt.startState = latestState;
        startStatesFromProvidedCookie.remove(domain.getBaseDN());
      }
      else
@@ -767,10 +766,9 @@
          // what we have in the replication changelog
          if (newDomainCtxt.startState == null)
          {
            CSN latestTrimCSN =
                new CSN(newDomainCtxt.domainLatestTrimDate, 0, 0);
            newDomainCtxt.startState =
                domain.getStartState().duplicateOnlyOlderThan(latestTrimCSN);
                domain.getOldestState().duplicateOnlyOlderThan(
                    newDomainCtxt.domainLatestTrimDate);
          }
        }
        else
@@ -790,7 +788,7 @@
          }
        }
        newDomainCtxt.stopState = latestServerState;
        newDomainCtxt.stopState = latestState;
      }
      newDomainCtxt.currentState = new ServerState();
@@ -860,12 +858,11 @@
      ServerState cookie)
  {
    /*
    when the provided startState is older than the replication
    changelogdb startState, it means that the replication
    changelog db has been trimmed and the cookie is not valid
    anymore.
    when the provided startState is older than the replication changelogdb
    oldestState, it means that the replication changelog db has been trimmed and
    the cookie is not valid anymore.
    */
    for (CSN dbOldestChange : rsDomain.getStartState())
    for (CSN dbOldestChange : rsDomain.getOldestState())
    {
      CSN providedChange = cookie.getCSN(dbOldestChange.getServerId());
      if (providedChange != null && providedChange.isOlderThan(dbOldestChange))
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1398,20 +1398,16 @@
     */
    try
    {
      boolean dbEmpty = true;
      long oldestChangeNumber = 0;
      long newestChangeNumber = 0;
      final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB();
      final CNIndexRecord oldestCNRecord = cnIndexDB.getOldestRecord();
      final CNIndexRecord newestCNRecord = cnIndexDB.getNewestRecord();
      boolean noCookieForNewestCN = true;
      CSN csnForNewestCN = null;
      DN baseDNForNewestCN = null;
      if (oldestCNRecord != null)
      final CNIndexRecord oldestRecord = cnIndexDB.getOldestRecord();
      final CNIndexRecord newestRecord = cnIndexDB.getNewestRecord();
      if (oldestRecord == null)
      {
        if (newestCNRecord == null)
        // The database is empty
        long lastGeneratedCN = cnIndexDB.getLastGeneratedChangeNumber();
        return new long[] { lastGeneratedCN, lastGeneratedCN };
      }
      if (newestRecord == null) // oldestCNRecord != null
        {
          // Edge case: DB was cleaned or closed in between calls to
          // getOldest*() and getNewest*().
@@ -1420,21 +1416,15 @@
              ERR_READING_OLDEST_THEN_NEWEST_IN_CHANGENUMBER_DATABASE.get());
        }
        dbEmpty = false;
        oldestChangeNumber = oldestCNRecord.getChangeNumber();
        newestChangeNumber = newestCNRecord.getChangeNumber();
      long oldestChangeNumber = oldestRecord.getChangeNumber();
      long newestChangeNumber = newestRecord.getChangeNumber();
        // Get the generalized state associated with the current newest change
        // number and initializes from it the startStates table
        String newestCNGenState = newestCNRecord.getPreviousCookie();
        noCookieForNewestCN =
            newestCNGenState == null || newestCNGenState.length() == 0;
      // number and initializes from the startState table
      final String cookie = newestRecord.getPreviousCookie();
      boolean noCookieForNewestCN = cookie == null || cookie.length() == 0;
        csnForNewestCN = newestCNRecord.getCSN();
        baseDNForNewestCN = newestCNRecord.getBaseDN();
      }
      long newestDate = 0;
      long newestTime = newestRecord.getCSN().getTime();
      for (ReplicationServerDomain rsDomain : getReplicationServerDomains())
      {
        if (contains(
@@ -1447,10 +1437,9 @@
        if (noCookieForNewestCN)
        {
          // Count changes of this domain from the beginning of the changelog
          CSN trimCSN = new CSN(rsDomain.getLatestDomainTrimDate(), 0, 0);
          ec = rsDomain.getEligibleCount(
              rsDomain.getStartState().duplicateOnlyOlderThan(trimCSN),
              maxOldestChangeNumber);
          final ServerState startState = rsDomain.getOldestState()
              .duplicateOnlyOlderThan(rsDomain.getLatestDomainTrimDate());
          ec = rsDomain.getEligibleCount(startState, maxOldestChangeNumber);
        }
        else
        {
@@ -1458,19 +1447,15 @@
          // BUT
          // There is nothing related to this domain in the newest CNIndexRecord
          // (may be this domain was disabled when this record was returned).
          // In that case, are counted the changes from
          // the date of the most recent change from this newest CNIndexRecord
          if (newestDate == 0)
          {
            newestDate = csnForNewestCN.getTime();
          }
          // In that case, are counted the changes from the time of the most
          // recent change
          // And count changes of this domain from the date of the
          // newest seqnum record (that does not refer to this domain)
          CSN csnx = new CSN(newestDate, csnForNewestCN.getSeqnum(), 0);
          CSN csnx = new CSN(newestTime, newestRecord.getCSN().getSeqnum(), 0);
          ec = rsDomain.getEligibleCount(csnx, maxOldestChangeNumber);
          if (baseDNForNewestCN.equals(rsDomain.getBaseDN()))
          if (newestRecord.getBaseDN().equals(rsDomain.getBaseDN()))
            ec--;
        }
@@ -1482,15 +1467,6 @@
        if (ec > 0 && oldestChangeNumber == 0)
          oldestChangeNumber = 1;
      }
      if (dbEmpty)
      {
        // The database was empty, just keep increasing numbers since last time
        // we generated one change number.
        long lastGeneratedCN = cnIndexDB.getLastGeneratedChangeNumber();
        oldestChangeNumber += lastGeneratedCN;
        newestChangeNumber += lastGeneratedCN;
      }
      return new long[] { oldestChangeNumber, newestChangeNumber };
    }
    catch (ChangelogException e)
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2568,15 +2568,15 @@
  }
  /**
   * Returns the start state of the domain, made of the oldest CSN stored for
   * each serverId.
   * Returns the oldest known state for the domain, made of the oldest CSN
   * stored for each serverId.
   * <p>
   * Note: Because the replication changelogDB trimming always keep one change
   * whatever its date, the CSN contained in the returned state can be very old.
   *
   * @return the start state of the domain.
   */
  public ServerState getStartState()
  public ServerState getOldestState()
  {
    return domainDB.getDomainOldestCSNs(baseDN);
  }
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -375,14 +375,15 @@
            continue;
          }
          // Purge up to wherever the other DBs have been purged to.
          // FIXME there is an opportunity for a phantom record in the current
          // DB if the replicaDB gets purged after the next if statement.
          // FIXME there is an opportunity for a phantom record in the CNIndexDB
          // if the replicaDB gets purged after call to domain.getOldestState().
          final CSN csn = record.getCSN();
          final ServerState startState = domain.getStartState();
          final CSN fcsn = startState.getCSN(csn.getServerId());
          final ServerState oldestState = domain.getOldestState();
          final CSN fcsn = oldestState.getCSN(csn.getServerId());
          if (csn.isOlderThan(fcsn))
          {
            // This change which has already been purged from the corresponding
            // replicaDB => purge it from CNIndexDB
            cursor.delete();
            continue;
          }
@@ -397,7 +398,7 @@
            if (debugEnabled())
              TRACER.debugInfo("JEChangeNumberIndexDB:clear() - ChangeVector:"
                  + csnVector + " -- StartState:" + startState);
                  + csnVector + " -- StartState:" + oldestState);
          }
          catch(Exception e)
          {
@@ -409,7 +410,7 @@
          if (csnVector == null
              || (csnVector.getCSN(csn.getServerId()) != null
                    && !csnVector.cover(startState)))
                    && !csnVector.cover(oldestState)))
          {
            cursor.delete();
            if (debugEnabled())
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -45,7 +45,6 @@
import org.opends.server.core.*;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.plugins.InvocationCounterPlugin;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.*;
import org.opends.server.replication.ReplicationTestCase;
@@ -115,8 +114,6 @@
  /** The LDAPStatistics object associated with the LDAP connection handler. */
  private LDAPStatistics ldapStatistics;
  private CSN gblCSN;
  private int brokerSessionTimeout = 5000;
  private int maxWindow = 100;
@@ -363,10 +360,10 @@
    ECLCompatWriteReadAllOps(1);
    // Write 4 additional changes and read ECL from a provided change number
    int ts = ECLCompatWriteReadAllOps(5);
    CSN csn = ECLCompatWriteReadAllOps(5);
    // Test request from a provided change number - read 6
    ECLCompatReadFrom(6);
    ECLCompatReadFrom(6, csn);
    // Test request from a provided change number interval - read 5-7
    ECLCompatReadFromTo(5,7);
@@ -376,7 +373,7 @@
    // Test first and last change number, add a new change, do not
    // search again the ECL, but search for first and last
    ECLCompatTestLimitsAndAdd(1,8, ts);
    ECLCompatTestLimitsAndAdd(1, 8, 4);
    // Test CNIndexDB is purged when replication change log is purged
    ECLPurgeCNIndexDBAfterChangelogClear();
@@ -392,11 +389,11 @@
  public void ECLReplicationServerFullTest16() throws Exception
  {
    // Persistent search in init + changes mode
    ECLPsearch(false, true);
    CSN csn = ECLPsearch(false, true);
    // Test Filter on replication csn
    // TODO: test with optimization when code done.
    ECLFilterOnReplicationCsn();
    ECLFilterOnReplicationCSN(csn);
  }
  private void ECLIsNotASupportedSuffix() throws Exception
@@ -499,7 +496,7 @@
      debugInfo(tn, "publishes:" + delMsg2);
      // wait for the server to take these changes into account
      sleep(500);
      Thread.sleep(500);
      // open ECL broker
      serverECL = openReplicationSession(
@@ -585,12 +582,7 @@
  /** Add an entry in the database */
  private void addEntry(Entry entry) throws Exception
  {
    AddOperation addOp = new AddOperationBasis(connection,
        InternalClientConnection.nextOperationID(), InternalClientConnection
        .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
        entry.getUserAttributes(), entry.getOperationalAttributes());
    addOp.setInternalOperation(true);
    addOp.run();
    AddOperation addOp = connection.processAdd(entry);
    waitOpResult(addOp, ResultCode.SUCCESS);
    assertNotNull(getEntry(entry.getDN(), 1000, true));
  }
@@ -629,9 +621,9 @@
      DomainFakeCfg domainConf = new DomainFakeCfg(baseDN2, 1602, replServers);
      domain2 = startNewDomain(domainConf, null,null);
      sleep(1000);
      Thread.sleep(1000);
      addEntry(createEntry(baseDN2));
      sleep(2000);
      Thread.sleep(2000);
      // Search on ECL from start on all suffixes
      String cookie = "";
@@ -698,23 +690,23 @@
      s2test2 = openReplicationSession(TEST_ROOT_DN2, SERVER_ID_2,
          100, replicationServerPort, brokerSessionTimeout, true, EMPTY_DN_GENID);
      sleep(500);
      Thread.sleep(500);
      // Produce updates
      long time = TimeThread.getTime();
      int ts = 1;
      CSN csn = new CSN(time, ts++, s1test.getServerId());
      publishDeleteMsgInOTest(s1test, csn, tn, 1);
      CSN csn1 = new CSN(time, ts++, s1test.getServerId());
      publishDeleteMsgInOTest(s1test, csn1, tn, 1);
      csn = new CSN(time++, ts++, s2test2.getServerId());
      publishDeleteMsgInOTest2(s2test2, csn, tn, 2);
      CSN csn2 = new CSN(time, ts++, s2test2.getServerId());
      publishDeleteMsgInOTest2(s2test2, csn2, tn, 2);
      CSN csn3 = new CSN(time++, ts++, s2test2.getServerId());
      CSN csn3 = new CSN(time, ts++, s2test2.getServerId());
      publishDeleteMsgInOTest2(s2test2, csn3, tn, 3);
      csn = new CSN(time++, ts++, s1test.getServerId());
      publishDeleteMsgInOTest(s1test, csn, tn, 4);
      sleep(1500);
      CSN csn4 = new CSN(time, ts++, s1test.getServerId());
      publishDeleteMsgInOTest(s1test, csn4, tn, 4);
      Thread.sleep(1500);
      // Changes are :
      //               s1          s2
@@ -749,9 +741,9 @@
      cookie = getCookie(searchOp.getSearchEntries(), 1, tn, ldifWriter, cookie);
      // Now publishes a new change and search from the previous cookie
      CSN csn5 = new CSN(time++, ts++, s1test.getServerId());
      CSN csn5 = new CSN(time, ts++, s1test.getServerId());
      publishDeleteMsgInOTest(s1test, csn5, tn, 5);
      sleep(500);
      Thread.sleep(500);
      // Changes are :
      //               s1         s2
@@ -773,30 +765,29 @@
      s2test = openReplicationSession(TEST_ROOT_DN,  1204,
          100, replicationServerPort, brokerSessionTimeout, true);
      sleep(500);
      Thread.sleep(500);
      time = TimeThread.getTime();
      csn = new CSN(time++, ts++, s1test2.getServerId());
      publishDeleteMsgInOTest2(s1test2, csn, tn, 6);
      CSN csn6 = new CSN(time, ts++, s1test2.getServerId());
      publishDeleteMsgInOTest2(s1test2, csn6, tn, 6);
      csn = new CSN(time++, ts++, s2test.getServerId());
      publishDeleteMsgInOTest(s2test, csn, tn, 7);
      CSN csn7 = new CSN(time, ts++, s2test.getServerId());
      publishDeleteMsgInOTest(s2test, csn7, tn, 7);
      CSN csn8 = new CSN(time++, ts++, s1test2.getServerId());
      CSN csn8 = new CSN(time, ts++, s1test2.getServerId());
      publishDeleteMsgInOTest2(s1test2, csn8, tn, 8);
      CSN csn9 = new CSN(time++, ts++, s2test.getServerId());
      CSN csn9 = new CSN(time, ts++, s2test.getServerId());
      publishDeleteMsgInOTest(s2test, csn9, tn, 9);
      sleep(500);
      Thread.sleep(500);
      ServerState startState = getReplicationDomainStartState(TEST_ROOT_DN);
      assertEquals(startState.getCSN(s1test.getServerId()).getSeqnum(), 1);
      assertTrue(startState.getCSN(s2test.getServerId()) != null);
      assertEquals(startState.getCSN(s2test.getServerId()).getSeqnum(), 7);
      final ServerState oldestState = getDomainOldestState(TEST_ROOT_DN);
      assertEquals(oldestState.getCSN(s1test.getServerId()), csn1);
      assertEquals(oldestState.getCSN(s2test.getServerId()), csn7);
      startState = getReplicationDomainStartState(TEST_ROOT_DN2);
      assertEquals(startState.getCSN(s2test2.getServerId()).getSeqnum(), 2);
      assertEquals(startState.getCSN(s1test2.getServerId()).getSeqnum(), 6);
      final ServerState oldestState2 = getDomainOldestState(TEST_ROOT_DN2);
      assertEquals(oldestState2.getCSN(s2test2.getServerId()), csn2);
      assertEquals(oldestState2.getCSN(s1test2.getServerId()), csn6);
      // Test lastExternalChangelogCookie attribute of the ECL
      MultiDomainServerState expectedLastCookie =
@@ -847,9 +838,9 @@
    debugInfo(tn, "Ending test successfully");
  }
  private ServerState getReplicationDomainStartState(DN baseDN)
  private ServerState getDomainOldestState(DN baseDN)
  {
    return replicationServer.getReplicationServerDomain(baseDN).getStartState();
    return replicationServer.getReplicationServerDomain(baseDN).getOldestState();
  }
  private String getCookie(List<SearchResultEntry> entries,
@@ -1002,8 +993,8 @@
      // 5. Assert that a request with an "old" cookie - one that refers to
      //    changes that have been removed by the replication changelog trimming
      //    returns the appropriate error.
      debugInfo(tn, "d1 trimdate" + getReplicationDomainStartState(TEST_ROOT_DN));
      debugInfo(tn, "d2 trimdate" + getReplicationDomainStartState(TEST_ROOT_DN2));
      debugInfo(tn, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN));
      debugInfo(tn, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2));
      searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, tn, UNWILLING_TO_PERFORM);
      assertEquals(searchOp.getSearchEntries().size(), 0);
      assertTrue(searchOp.getErrorMessage().toString().startsWith(
@@ -1130,7 +1121,7 @@
      ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp);
      server01.publish(modDNMsg);
      debugInfo(tn, " publishes " + modDNMsg.getCSN());
      sleep(1000);
      Thread.sleep(1000);
      String cookie= "";
      InternalSearchOperation searchOp =
@@ -1340,7 +1331,7 @@
  /**
   * Test persistent search
   */
  private void ECLPsearch(boolean changesOnly, boolean compatMode) throws Exception
  private CSN ECLPsearch(boolean changesOnly, boolean compatMode) throws Exception
  {
    String tn = "ECLPsearch_" + changesOnly + "_" + compatMode;
    debugInfo(tn, "Starting test \n\n");
@@ -1360,6 +1351,7 @@
    }
    assertNotNull(ldapStatistics);
    try
    {
      // Create broker on suffix
      ReplicationBroker server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1,
@@ -1373,7 +1365,7 @@
            "11111111-1112-1113-1114-111111111114");
      debugInfo(tn, " publishing " + delMsg.getCSN());
      server01.publish(delMsg);
      sleep(500); // let's be sure the message is in the RS
      Thread.sleep(500); // let's be sure the message is in the RS
      // Creates cookie control
      String cookie = "";
@@ -1416,7 +1408,7 @@
      LDAPMessage message;
      message = new LDAPMessage(2, searchRequest, controls);
      w.writeMessage(message);
      sleep(500);
      Thread.sleep(500);
      SearchResultDoneProtocolOp searchResultDone;
@@ -1467,8 +1459,7 @@
         "11111111-1112-1113-1114-111111111115");
      debugInfo(tn, " publishing " + delMsg.getCSN());
      server01.publish(delMsg);
      this.gblCSN = csn;
      sleep(1000);
      Thread.sleep(1000);
      debugInfo(tn, delMsg.getCSN() +
      " published , psearch will now wait for new entries");
@@ -1502,7 +1493,7 @@
          break;
        }
      }
      sleep(1000);
      Thread.sleep(1000);
      // Check we received change 2
      for (LDAPAttribute a : searchResultEntry.getAttributes())
@@ -1581,10 +1572,16 @@
      }
      close(s);
      while (!s.isClosed()) sleep(100);
      while (!s.isClosed())
        Thread.sleep(100);
      return csn;
    }
    finally
    {
    debugInfo(tn, "Ends test successfully");
  }
  }
  private SearchRequestProtocolOp createSearchRequest(String filterString,
      final Set<String> attributes) throws LDAPException
@@ -1647,7 +1644,7 @@
            "11111111-1111-1111-1111-111111111111");
      debugInfo(tn, " publishing " + delMsg1);
      server01.publish(delMsg1);
      sleep(500); // let's be sure the message is in the RS
      Thread.sleep(500); // let's be sure the message is in the RS
      // Produce update 2
      CSN csn2 = new CSN(TimeThread.getTime(), ts++, SERVER_ID_2);
@@ -1656,7 +1653,7 @@
            "22222222-2222-2222-2222-222222222222");
      debugInfo(tn, " publishing " + delMsg2);
      server02.publish(delMsg2);
      sleep(500); // let's be sure the message is in the RS
      Thread.sleep(500); // let's be sure the message is in the RS
      // Produce update 3
      CSN csn3 = new CSN(TimeThread.getTime(), ts++, SERVER_ID_2);
@@ -1665,7 +1662,7 @@
            "33333333-3333-3333-3333-333333333333");
      debugInfo(tn, " publishing " + delMsg3);
      server02.publish(delMsg3);
      sleep(500); // let's be sure the message is in the RS
      Thread.sleep(500); // let's be sure the message is in the RS
      // Creates cookie control
      String cookie = "";
@@ -1723,15 +1720,15 @@
      LDAPMessage message;
      message = new LDAPMessage(2, searchRequest1, controls);
      w1.writeMessage(message);
      sleep(500);
      Thread.sleep(500);
      message = new LDAPMessage(2, searchRequest2, controls);
      w2.writeMessage(message);
      sleep(500);
      Thread.sleep(500);
      message = new LDAPMessage(2, searchRequest3, controls);
      w3.writeMessage(message);
      sleep(500);
      Thread.sleep(500);
      SearchResultEntryProtocolOp searchResultEntry = null;
      SearchResultDoneProtocolOp searchResultDone = null;
@@ -1857,7 +1854,7 @@
         "44444444-4444-4444-4444-444444444444");
      debugInfo(tn, " publishing " + delMsg11);
      server01.publish(delMsg11);
      sleep(500);
      Thread.sleep(500);
      debugInfo(tn, delMsg11.getCSN() + " published additionally ");
      // Produces additional change
@@ -1867,7 +1864,7 @@
         "55555555-5555-5555-5555-555555555555");
      debugInfo(tn, " publishing " + delMsg12 );
      server02.publish(delMsg12);
      sleep(500);
      Thread.sleep(500);
      debugInfo(tn, delMsg12.getCSN()  + " published additionally ");
      // Produces additional change
@@ -1877,7 +1874,7 @@
         "66666666-6666-6666-6666-666666666666");
      debugInfo(tn, " publishing " + delMsg13);
      server02.publish(delMsg13);
      sleep(500);
      Thread.sleep(500);
      debugInfo(tn, delMsg13.getCSN()  + " published additionally ");
      // wait 11
@@ -1910,7 +1907,7 @@
          break;
        }
      }
      sleep(1000);
      Thread.sleep(1000);
      debugInfo(tn, "Search 1 successfully receives additional changes");
      // wait 12 & 13
@@ -1943,7 +1940,7 @@
          break;
        }
      }
      sleep(1000);
      Thread.sleep(1000);
      debugInfo(tn, "Search 2 successfully receives additional changes");
      // wait 11 & 12 & 13
@@ -1976,7 +1973,7 @@
          break;
        }
      }
      sleep(1000);
      Thread.sleep(1000);
      // Check we received change 13
      for (LDAPAttribute a : searchResultEntry.getAttributes())
@@ -2010,7 +2007,7 @@
        close(s);
        while (!s.isClosed())
        {
          sleep(100);
          Thread.sleep(100);
        }
      }
    }
@@ -2082,14 +2079,6 @@
  }
  /**
   * Utility - sleeping as long as required
   */
  private void sleep(long time) throws InterruptedException
  {
    Thread.sleep(time);
  }
  /**
   * Utility - log debug message - highlight it is from the test and not
   * from the server code. Makes easier to observe the test steps.
   */
@@ -2150,6 +2139,9 @@
    }
  }
  /**
   * FIXME this test actually tests nothing: there are no asserts.
   */
  private void ChangeTimeHeartbeatTest() throws Exception
  {
    String tn = "ChangeTimeHeartbeatTest";
@@ -2170,23 +2162,23 @@
      s2test2 = openReplicationSession(TEST_ROOT_DN2, SERVER_ID_2,
          100, replicationServerPort, brokerSessionTimeout, true, EMPTY_DN_GENID);
      sleep(500);
      Thread.sleep(500);
      // Produce updates
      long time = TimeThread.getTime();
      int ts = 1;
      CSN csn = new CSN(time, ts++, s1test.getServerId());
      publishDeleteMsgInOTest(s1test, csn, tn, 1);
      CSN csn1 = new CSN(time, ts++, s1test.getServerId());
      publishDeleteMsgInOTest(s1test, csn1, tn, 1);
      csn = new CSN(time++, ts++, s2test2.getServerId());
      publishDeleteMsgInOTest(s2test2, csn, tn, 2);
      CSN csn2 = new CSN(time, ts++, s2test2.getServerId());
      publishDeleteMsgInOTest(s2test2, csn2, tn, 2);
      CSN csn3 = new CSN(time++, ts++, s2test2.getServerId());
      CSN csn3 = new CSN(time, ts++, s2test2.getServerId());
      publishDeleteMsgInOTest(s2test2, csn3, tn, 3);
      csn = new CSN(time++, ts++, s1test.getServerId());
      publishDeleteMsgInOTest(s1test, csn, tn, 4);
      sleep(500);
      CSN csn4 = new CSN(time, ts++, s1test.getServerId());
      publishDeleteMsgInOTest(s1test, csn4, tn, 4);
      Thread.sleep(500);
      // --
      s1test2 = openReplicationSession(TEST_ROOT_DN2,  1203,
@@ -2194,26 +2186,24 @@
      s2test = openReplicationSession(TEST_ROOT_DN,  1204,
          100, replicationServerPort, brokerSessionTimeout, true);
      sleep(500);
      Thread.sleep(500);
      // Test startState ("first cookie") of the ECL
      time = TimeThread.getTime();
      csn = new CSN(time++, ts++, s1test2.getServerId());
      publishDeleteMsgInOTest2(s1test2, csn, tn, 6);
      CSN csn6 = new CSN(time, ts++, s1test2.getServerId());
      publishDeleteMsgInOTest2(s1test2, csn6, tn, 6);
      csn = new CSN(time++, ts++, s2test.getServerId());
      publishDeleteMsgInOTest(s2test, csn, tn, 7);
      CSN csn7 = new CSN(time, ts++, s2test.getServerId());
      publishDeleteMsgInOTest(s2test, csn7, tn, 7);
      CSN csn8 = new CSN(time++, ts++, s1test2.getServerId());
      CSN csn8 = new CSN(time, ts++, s1test2.getServerId());
      publishDeleteMsgInOTest2(s1test2, csn8, tn, 8);
      CSN csn9 = new CSN(time++, ts++, s2test.getServerId());
      CSN csn9 = new CSN(time, ts++, s2test.getServerId());
      publishDeleteMsgInOTest(s2test, csn9, tn, 9);
      sleep(500);
      Thread.sleep(500);
      ReplicationServerDomain rsd1 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN);
      rsd1.getLatestServerState();
      rsd1.getChangeTimeHeartbeatState();
      debugInfo(tn, rsd1.getBaseDN()
          + " LatestServerState=" + rsd1.getLatestServerState()
          + " ChangeTimeHeartBeatState=" + rsd1.getChangeTimeHeartbeatState()
@@ -2222,8 +2212,6 @@
      // FIXME:ECL Enable this test by adding an assert on the right value
      ReplicationServerDomain rsd2 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN2);
      rsd2.getLatestServerState();
      rsd2.getChangeTimeHeartbeatState();
      debugInfo(tn, rsd2.getBaseDN()
          + " LatestServerState=" + rsd2.getLatestServerState()
          + " ChangeTimeHeartBeatState=" + rsd2.getChangeTimeHeartbeatState()
@@ -2248,28 +2236,18 @@
    String tn = "ECLCompatEmpty";
    debugInfo(tn, "Starting test\n\n");
    // search on 'cn=changelog'
    String filter = "(objectclass=*)";
    debugInfo(tn, " Search: " + filter);
    InternalSearchOperation op = connection.processSearch(
        "cn=changelog",
        SearchScope.WHOLE_SUBTREE,
        filter);
    // success
    final InternalSearchOperation op = connection.processSearch(
        "cn=changelog", SearchScope.WHOLE_SUBTREE, "(objectclass=*)");
    assertEquals(op.getResultCode(), ResultCode.SUCCESS, op.getErrorMessage().toString());
    // root entry returned
    assertEquals(op.getEntriesSent(), 1);
    assertEquals(op.getEntriesSent(), 1, "The root entry should have been returned");
    debugInfo(tn, "Ending test successfully");
  }
  private int ECLCompatWriteReadAllOps(long firstChangeNumber) throws Exception
  private CSN ECLCompatWriteReadAllOps(long firstChangeNumber) throws Exception
  {
    String tn = "ECLCompatWriteReadAllOps/" + firstChangeNumber;
    debugInfo(tn, "Starting test\n\n");
    final int nbChanges = 4;
    try
    {
      LDIFWriter ldifWriter = getLDIFWriter();
@@ -2280,8 +2258,7 @@
      String user1entryUUID = "11111111-1112-1113-1114-111111111115";
      String baseUUID       = "22222222-2222-2222-2222-222222222222";
      CSN[] csns = generateCSNs(nbChanges, SERVER_ID_1);
      gblCSN = csns[1];
      CSN[] csns = generateCSNs(4, SERVER_ID_1);
      // Publish DEL
      DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID);
@@ -2296,8 +2273,8 @@
          + "entryUUID: "+user1entryUUID+"\n";
      Entry entry = TestCaseUtils.entryFromLdifString(lentry);
      AddMsg addMsg = new AddMsg(
          gblCSN,
          DN.decode("uid="+tn+"2," + TEST_ROOT_DN_STRING),
          csns[1],
          entry.getDN(),
          user1entryUUID,
          baseUUID,
          entry.getObjectClassAttribute(),
@@ -2324,14 +2301,14 @@
      ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp);
      server01.publish(modDNMsg);
      debugInfo(tn, " publishes " + modDNMsg.getCSN());
      sleep(1000);
      Thread.sleep(1000);
      String filter = "(targetdn=*"+tn.toLowerCase()+"*,o=test)";
      InternalSearchOperation searchOp = searchOnChangelog(filter, tn, SUCCESS);
      // test 4 entries returned
      assertEntries(searchOp.getSearchEntries(), firstChangeNumber, tn,
          ldifWriter, user1entryUUID, csns[0], gblCSN, csns[2], csns[3]);
          ldifWriter, user1entryUUID, csns);
      stop(server01);
@@ -2343,11 +2320,14 @@
      searchOp = searchOnChangelog(filter, tn, SUCCESS);
      assertEntries(searchOp.getSearchEntries(), firstChangeNumber, tn,
          ldifWriter, user1entryUUID, csns[0], gblCSN, csns[2], csns[3]);
      assertEquals(searchOp.getSearchEntries().size(), nbChanges);
          ldifWriter, user1entryUUID, csns);
      assertEquals(searchOp.getSearchEntries().size(), csns.length);
      return csns[1];
    }
    finally
    {
    debugInfo(tn, "Ending test with success");
    return nbChanges;
    }
  }
  private void assertEntries(List<SearchResultEntry> entries,
@@ -2409,7 +2389,7 @@
    assertThat(actualDN).isEqualToIgnoringCase(expectedDN);
  }
  private void ECLCompatReadFrom(long firstChangeNumber) throws Exception
  private void ECLCompatReadFrom(long firstChangeNumber, Object csn) throws Exception
  {
    String tn = "ECLCompatReadFrom/" + firstChangeNumber;
    debugInfo(tn, "Starting test\n\n");
@@ -2432,10 +2412,10 @@
    // check the entry has the right content
    SearchResultEntry resultEntry = entries.get(0);
    assertTrue("changenumber=6,cn=changelog".equalsIgnoreCase(resultEntry.getDN().toNormalizedString()));
    checkValue(resultEntry, "replicationcsn", gblCSN.toString());
    checkValue(resultEntry, "replicationcsn", csn.toString());
    checkValue(resultEntry, "replicaidentifier", String.valueOf(SERVER_ID_1));
    checkValue(resultEntry, "changetype", "add");
    checkValue(resultEntry, "changelogcookie", "o=test:" + gblCSN + ";");
    checkValue(resultEntry, "changelogcookie", "o=test:" + csn + ";");
    checkValue(resultEntry, "targetentryuuid", user1entryUUID);
    checkValue(resultEntry, "changenumber", "6");
@@ -2511,14 +2491,14 @@
  /**
   * Read the ECL in compat mode providing an unknown change number.
   */
  private void ECLFilterOnReplicationCsn() throws Exception
  private void ECLFilterOnReplicationCSN(CSN csn) throws Exception
  {
    String tn = "ECLFilterOnReplicationCsn";
    debugInfo(tn, "Starting test\n\n");
    LDIFWriter ldifWriter = getLDIFWriter();
    String filter = "(replicationcsn=" + this.gblCSN + ")";
    String filter = "(replicationcsn=" + csn + ")";
    InternalSearchOperation searchOp = searchOnChangelog(filter, tn, SUCCESS);
    assertEquals(searchOp.getSearchEntries().size(), 1);
@@ -2528,7 +2508,7 @@
    // check the DEL entry has the right content
    SearchResultEntry resultEntry = entries.get(0);
    checkValue(resultEntry, "replicationcsn", gblCSN.toString());
    checkValue(resultEntry, "replicationcsn", csn.toString());
    // TODO:ECL check values of the other attributes
    debugInfo(tn, "Ending test with success");
@@ -2619,7 +2599,7 @@
    while (!cnIndexDB.isEmpty())
    {
      debugInfo(tn, "cnIndexDB.count=" + cnIndexDB.count());
      sleep(200);
      Thread.sleep(200);
    }
    debugInfo(tn, "Ending test with success");
@@ -2744,7 +2724,7 @@
        csn1, user1entryUUID);
    server01.publish(delMsg);
    debugInfo(tn, " publishes " + delMsg.getCSN());
    sleep(500);
    Thread.sleep(500);
    stop(server01);
@@ -2779,7 +2759,7 @@
    DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csn1, user1entryUUID);
    server01.publish(delMsg);
    debugInfo(tn, " publishes " + delMsg.getCSN());
    sleep(300);
    Thread.sleep(300);
    // From begin to now : 1 change
    assertEquals(rsdtest.getEligibleCount(fromStart, now()), 1);
@@ -2788,7 +2768,7 @@
    delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csn2, user1entryUUID);
    server01.publish(delMsg);
    debugInfo(tn, " publishes " + delMsg.getCSN());
    sleep(300);
    Thread.sleep(300);
    // From begin to now : 2 changes
    assertEquals(rsdtest.getEligibleCount(fromStart, now()), 2);
@@ -2815,7 +2795,7 @@
    delMsg = newDeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, csn3, user1entryUUID);
    server01.publish(delMsg);
    debugInfo(tn, " publishes " + delMsg.getCSN());
    sleep(300);
    Thread.sleep(300);
    fromStateBeforeCSN2.update(csn2);
@@ -2838,7 +2818,7 @@
        delMsg = newDeleteMsg("uid="+tn+i+"," + TEST_ROOT_DN_STRING, csnx, user1entryUUID);
        server01.publish(delMsg);
      }
      sleep(1000);
      Thread.sleep(1000);
      debugInfo(tn, "Perfs test in compat - search lastChangeNumber");
      Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains();
      excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
@@ -2923,12 +2903,12 @@
      domainConf = new DomainFakeCfg(TEST_ROOT_DN2, 1704, replServers);
      domain21 = startNewDomain(domainConf, eclInclude, eclInclude);
      sleep(1000);
      Thread.sleep(1000);
      addEntry(createEntry(TEST_ROOT_DN2));
      addEntry(createEntry(baseDN3));
      String lentry =
      Entry uentry1 = TestCaseUtils.entryFromLdifString(
          "dn: cn=Fiona Jensen," + TEST_ROOT_DN_STRING2 + "\n"
          + "objectclass: top\n"
          + "objectclass: person\n"
@@ -2937,12 +2917,10 @@
          + "cn: Fiona Jensen\n"
          + "sn: Jensen\n"
          + "uid: fiona\n"
          + "telephonenumber: 12121212";
      Entry uentry1 = TestCaseUtils.entryFromLdifString(lentry);
          + "telephonenumber: 12121212");
      addEntry(uentry1); // add fiona in o=test2
      lentry =
      Entry uentry2 = TestCaseUtils.entryFromLdifString(
          "dn: cn=Robert Hue," + baseDN3 + "\n"
          + "objectclass: top\n"
          + "objectclass: person\n"
@@ -2951,30 +2929,30 @@
          + "cn: Robert Hue\n"
          + "sn: Robby\n"
          + "uid: robert\n"
          + "telephonenumber: 131313";
      Entry uentry2 = TestCaseUtils.entryFromLdifString(lentry);
          + "telephonenumber: 131313");
      addEntry(uentry2); // add robert in o=test3
      // mod 'sn' of fiona (o=test2) with 'sn' configured as ecl-incl-att
      runModifyOperation(uentry1, createMods("sn", "newsn"));
      final ModifyOperation modOp1 = connection.processModify(
          uentry1.getDN(), createMods("sn", "newsn"));
      waitOpResult(modOp1, ResultCode.SUCCESS);
      // mod 'telephonenumber' of robert (o=test3)
      runModifyOperation(uentry2, createMods("telephonenumber", "555555"));
      final ModifyOperation modOp2 = connection.processModify(
          uentry2.getDN(), createMods("telephonenumber", "555555"));
      waitOpResult(modOp2, ResultCode.SUCCESS);
      // moddn robert (o=test3) to robert2 (o=test3)
      ModifyDNOperation modDNOp = new ModifyDNOperationBasis(connection,
          InternalClientConnection.nextOperationID(),
          InternalClientConnection.nextMessageID(),
          null,
      ModifyDNOperation modDNOp = connection.processModifyDN(
          DN.decode("cn=Robert Hue," + baseDN3),
          RDN.decode("cn=Robert Hue2"), true,
          baseDN3);
      modDNOp.run();
      waitOpResult(modDNOp, ResultCode.SUCCESS);
      // del robert (o=test3)
      runDeleteOperation("cn=Robert Hue2," + baseDN3);
      sleep(1000);
      final DeleteOperation delOp = connection.processDelete(DN.decode("cn=Robert Hue2," + baseDN3));
      waitOpResult(delOp, ResultCode.SUCCESS);
      Thread.sleep(1000);
      // Search on ECL from start on all suffixes
      String cookie = "";
@@ -2984,7 +2962,7 @@
      assertThat(entries).hasSize(8);
      debugAndWriteEntries(null, entries, tn);
      sleep(2000);
      Thread.sleep(2000);
      for (SearchResultEntry resultEntry : entries)
      {
@@ -3026,9 +3004,13 @@
    }
    finally
    {
      runDeleteOperation("cn=Fiona Jensen," + TEST_ROOT_DN_STRING2);
      runDeleteOperation(TEST_ROOT_DN_STRING2);
      runDeleteOperation(baseDN3.toString());
      final DeleteOperation delOp1 = connection.processDelete(
          DN.decode("cn=Fiona Jensen," + TEST_ROOT_DN_STRING2));
      waitOpResult(delOp1, ResultCode.SUCCESS);
      final DeleteOperation delOp2 = connection.processDelete(TEST_ROOT_DN2);
      waitOpResult(delOp2, ResultCode.SUCCESS);
      final DeleteOperation delOp3 = connection.processDelete(baseDN3);
      waitOpResult(delOp3, ResultCode.SUCCESS);
      remove(domain21, domain2, domain3);
      removeTestBackend(backend2, backend3);
@@ -3067,25 +3049,6 @@
    return newDomain;
  }
  private void runModifyOperation(Entry entry, List<Modification> mods)
      throws Exception
  {
    final ModifyOperation operation =
        new ModifyOperationBasis(connection, 1, 1, null, entry.getDN(), mods);
    operation.run();
    waitOpResult(operation, ResultCode.SUCCESS);
  }
  private void runDeleteOperation(String dn) throws Exception
  {
    final DeleteOperation delOp = new DeleteOperationBasis(connection,
        InternalClientConnection.nextOperationID(),
        InternalClientConnection.nextMessageID(), null,
        DN.decode(dn));
    delOp.run();
    waitOpResult(delOp, ResultCode.SUCCESS);
  }
  private List<Modification> createMods(String attributeName, String valueString)
  {
    Attribute attr = Attributes.create(attributeName, valueString);
@@ -3113,7 +3076,7 @@
    while (operation.getResultCode() == ResultCode.UNDEFINED
        || operation.getResultCode() != expectedResult)
    {
      sleep(50);
      Thread.sleep(50);
      i++;
      if (i > 10)
      {