From bcd9325b7d47b6932d140a15ee761252e130ab7e Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 09 Oct 2013 13:35:05 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java                             |   86 +++------
 opends/src/server/org/opends/server/replication/server/ECLServerHandler.java                              |   25 +-
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java            |   15 +
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java |  315 +++++++++++++++++----------------------
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                       |    6 
 opends/src/server/org/opends/server/replication/common/ServerState.java                                   |   15 +
 6 files changed, 201 insertions(+), 261 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/common/ServerState.java b/opends/src/server/org/opends/server/replication/common/ServerState.java
index 888a1a2..ef5bd1b 100644
--- a/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -529,16 +529,19 @@
   }
 
   /**
-   * Build a copy of the ServerState with only CSNs older than
-   * a specific CSN. This is used when building the initial
-   * Cookie in the External Changelog, to cope with purged changes.
-   * @param csn The CSN to compare the ServerState with
+   * Build a copy of the ServerState with only CSNs older than a provided
+   * timestamp. This is used when building the initial Cookie in the External
+   * Changelog, to cope with purged changes.
+   *
+   * @param timestamp
+   *          The timestamp to compare the ServerState against
    * @return a copy of the ServerState which only contains the CSNs older than
    *         csn.
    */
-  public ServerState duplicateOnlyOlderThan(CSN csn)
+  public ServerState duplicateOnlyOlderThan(long timestamp)
   {
-    ServerState newState = new ServerState();
+    final CSN csn = new CSN(timestamp, 0, 0);
+    final ServerState newState = new ServerState();
     synchronized (serverIdToCSN)
     {
       for (CSN change : serverIdToCSN.values())
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index e72dabe..259711d 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -186,8 +186,8 @@
           .append(")")
           .append("] [nextNonEligibleMsg=").append(nextNonEligibleMsg)
           .append("] [startState=").append(startState)
-          .append("] [stopState=").append(stopState)
           .append("] [currentState=").append(currentState)
+          .append("] [stopState=").append(stopState)
           .append("]]");
     }
 
@@ -735,11 +735,10 @@
       }
 
       // skip unused domains
-      final ServerState latestServerState = domain.getLatestServerState();
-      if (latestServerState.isEmpty())
+      final ServerState latestState = domain.getLatestServerState();
+      if (latestState.isEmpty())
         continue;
 
-
       // Creates the new domain context
       final DomainContext newDomainCtxt = new DomainContext();
       newDomainCtxt.active = true;
@@ -749,7 +748,7 @@
       // Assign the start state for the domain
       if (isPersistent == PERSISTENT_CHANGES_ONLY)
       {
-        newDomainCtxt.startState = latestServerState;
+        newDomainCtxt.startState = latestState;
         startStatesFromProvidedCookie.remove(domain.getBaseDN());
       }
       else
@@ -767,10 +766,9 @@
           // what we have in the replication changelog
           if (newDomainCtxt.startState == null)
           {
-            CSN latestTrimCSN =
-                new CSN(newDomainCtxt.domainLatestTrimDate, 0, 0);
             newDomainCtxt.startState =
-                domain.getStartState().duplicateOnlyOlderThan(latestTrimCSN);
+                domain.getOldestState().duplicateOnlyOlderThan(
+                    newDomainCtxt.domainLatestTrimDate);
           }
         }
         else
@@ -790,7 +788,7 @@
           }
         }
 
-        newDomainCtxt.stopState = latestServerState;
+        newDomainCtxt.stopState = latestState;
       }
       newDomainCtxt.currentState = new ServerState();
 
@@ -860,12 +858,11 @@
       ServerState cookie)
   {
     /*
-    when the provided startState is older than the replication
-    changelogdb startState, it means that the replication
-    changelog db has been trimmed and the cookie is not valid
-    anymore.
+    when the provided startState is older than the replication changelogdb
+    oldestState, it means that the replication changelog db has been trimmed and
+    the cookie is not valid anymore.
     */
-    for (CSN dbOldestChange : rsDomain.getStartState())
+    for (CSN dbOldestChange : rsDomain.getOldestState())
     {
       CSN providedChange = cookie.getCSN(dbOldestChange.getServerId());
       if (providedChange != null && providedChange.isOlderThan(dbOldestChange))
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index c0fdf78..32c7017 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1398,43 +1398,33 @@
      */
     try
     {
-      boolean dbEmpty = true;
-      long oldestChangeNumber = 0;
-      long newestChangeNumber = 0;
-
       final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB();
-      final CNIndexRecord oldestCNRecord = cnIndexDB.getOldestRecord();
-      final CNIndexRecord newestCNRecord = cnIndexDB.getNewestRecord();
-
-      boolean noCookieForNewestCN = true;
-      CSN csnForNewestCN = null;
-      DN baseDNForNewestCN = null;
-      if (oldestCNRecord != null)
+      final CNIndexRecord oldestRecord = cnIndexDB.getOldestRecord();
+      final CNIndexRecord newestRecord = cnIndexDB.getNewestRecord();
+      if (oldestRecord == null)
       {
-        if (newestCNRecord == null)
-        {
-          // Edge case: DB was cleaned or closed in between calls to
-          // getOldest*() and getNewest*().
-          // The only remaining solution is to fail fast.
-          throw new ChangelogException(
-              ERR_READING_OLDEST_THEN_NEWEST_IN_CHANGENUMBER_DATABASE.get());
-        }
-
-        dbEmpty = false;
-        oldestChangeNumber = oldestCNRecord.getChangeNumber();
-        newestChangeNumber = newestCNRecord.getChangeNumber();
-
-        // Get the generalized state associated with the current newest change
-        // number and initializes from it the startStates table
-        String newestCNGenState = newestCNRecord.getPreviousCookie();
-        noCookieForNewestCN =
-            newestCNGenState == null || newestCNGenState.length() == 0;
-
-        csnForNewestCN = newestCNRecord.getCSN();
-        baseDNForNewestCN = newestCNRecord.getBaseDN();
+        // The database is empty
+        long lastGeneratedCN = cnIndexDB.getLastGeneratedChangeNumber();
+        return new long[] { lastGeneratedCN, lastGeneratedCN };
+      }
+      if (newestRecord == null) // oldestCNRecord != null
+      {
+        // Edge case: DB was cleaned or closed in between calls to
+        // getOldest*() and getNewest*().
+        // The only remaining solution is to fail fast.
+        throw new ChangelogException(
+            ERR_READING_OLDEST_THEN_NEWEST_IN_CHANGENUMBER_DATABASE.get());
       }
 
-      long newestDate = 0;
+      long oldestChangeNumber = oldestRecord.getChangeNumber();
+      long newestChangeNumber = newestRecord.getChangeNumber();
+
+      // Get the generalized state associated with the current newest change
+      // number and initializes from the startState table
+      final String cookie = newestRecord.getPreviousCookie();
+      boolean noCookieForNewestCN = cookie == null || cookie.length() == 0;
+
+      long newestTime = newestRecord.getCSN().getTime();
       for (ReplicationServerDomain rsDomain : getReplicationServerDomains())
       {
         if (contains(
@@ -1447,30 +1437,25 @@
         if (noCookieForNewestCN)
         {
           // Count changes of this domain from the beginning of the changelog
-          CSN trimCSN = new CSN(rsDomain.getLatestDomainTrimDate(), 0, 0);
-          ec = rsDomain.getEligibleCount(
-              rsDomain.getStartState().duplicateOnlyOlderThan(trimCSN),
-              maxOldestChangeNumber);
+          final ServerState startState = rsDomain.getOldestState()
+              .duplicateOnlyOlderThan(rsDomain.getLatestDomainTrimDate());
+          ec = rsDomain.getEligibleCount(startState, maxOldestChangeNumber);
         }
         else
         {
           // There are records in the CNIndexDB (so already returned to clients)
           // BUT
           // There is nothing related to this domain in the newest CNIndexRecord
-          // (may be this domain was disabled when this record was returned).
-          // In that case, are counted the changes from
-          // the date of the most recent change from this newest CNIndexRecord
-          if (newestDate == 0)
-          {
-            newestDate = csnForNewestCN.getTime();
-          }
+          // (maybe this domain was disabled when this record was returned).
+          // In that case, are counted the changes from the time of the most
+          // recent change
 
           // And count changes of this domain from the date of the
           // newest seqnum record (that does not refer to this domain)
-          CSN csnx = new CSN(newestDate, csnForNewestCN.getSeqnum(), 0);
+          CSN csnx = new CSN(newestTime, newestRecord.getCSN().getSeqnum(), 0);
           ec = rsDomain.getEligibleCount(csnx, maxOldestChangeNumber);
 
-          if (baseDNForNewestCN.equals(rsDomain.getBaseDN()))
+          if (newestRecord.getBaseDN().equals(rsDomain.getBaseDN()))
             ec--;
         }
 
@@ -1482,15 +1467,6 @@
         if (ec > 0 && oldestChangeNumber == 0)
           oldestChangeNumber = 1;
       }
-
-      if (dbEmpty)
-      {
-        // The database was empty, just keep increasing numbers since last time
-        // we generated one change number.
-        long lastGeneratedCN = cnIndexDB.getLastGeneratedChangeNumber();
-        oldestChangeNumber += lastGeneratedCN;
-        newestChangeNumber += lastGeneratedCN;
-      }
       return new long[] { oldestChangeNumber, newestChangeNumber };
     }
     catch (ChangelogException e)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index a3a9043..bf5d0ee 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2568,15 +2568,15 @@
   }
 
   /**
-   * Returns the start state of the domain, made of the oldest CSN stored for
-   * each serverId.
+   * Returns the oldest known state for the domain, made of the oldest CSN
+   * stored for each serverId.
    * <p>
    * Note: Because the replication changelogDB trimming always keep one change
    * whatever its date, the CSN contained in the returned state can be very old.
    *
    * @return the start state of the domain.
    */
-  public ServerState getStartState()
+  public ServerState getOldestState()
   {
     return domainDB.getDomainOldestCSNs(baseDN);
   }
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
index d2d7795..f246160 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -375,14 +375,15 @@
             continue;
           }
 
-          // Purge up to wherever the other DBs have been purged to.
-          // FIXME there is an opportunity for a phantom record in the current
-          // DB if the replicaDB gets purged after the next if statement.
+          // FIXME there is an opportunity for a phantom record in the CNIndexDB
+          // if the replicaDB gets purged after call to domain.getOldestState().
           final CSN csn = record.getCSN();
-          final ServerState startState = domain.getStartState();
-          final CSN fcsn = startState.getCSN(csn.getServerId());
+          final ServerState oldestState = domain.getOldestState();
+          final CSN fcsn = oldestState.getCSN(csn.getServerId());
           if (csn.isOlderThan(fcsn))
           {
+            // This change which has already been purged from the corresponding
+            // replicaDB => purge it from CNIndexDB
             cursor.delete();
             continue;
           }
@@ -397,7 +398,7 @@
 
             if (debugEnabled())
               TRACER.debugInfo("JEChangeNumberIndexDB:clear() - ChangeVector:"
-                  + csnVector + " -- StartState:" + startState);
+                  + csnVector + " -- StartState:" + oldestState);
           }
           catch(Exception e)
           {
@@ -409,7 +410,7 @@
 
           if (csnVector == null
               || (csnVector.getCSN(csn.getServerId()) != null
-                    && !csnVector.cover(startState)))
+                    && !csnVector.cover(oldestState)))
           {
             cursor.delete();
             if (debugEnabled())
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
index 0f7e6ea..9f1ab7a 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -45,7 +45,6 @@
 import org.opends.server.core.*;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.plugins.InvocationCounterPlugin;
-import org.opends.server.protocols.internal.InternalClientConnection;
 import org.opends.server.protocols.internal.InternalSearchOperation;
 import org.opends.server.protocols.ldap.*;
 import org.opends.server.replication.ReplicationTestCase;
@@ -115,8 +114,6 @@
   /** The LDAPStatistics object associated with the LDAP connection handler. */
   private LDAPStatistics ldapStatistics;
 
-  private CSN gblCSN;
-
   private int brokerSessionTimeout = 5000;
   private int maxWindow = 100;
 
@@ -363,10 +360,10 @@
     ECLCompatWriteReadAllOps(1);
 
     // Write 4 additional changes and read ECL from a provided change number
-    int ts = ECLCompatWriteReadAllOps(5);
+    CSN csn = ECLCompatWriteReadAllOps(5);
 
     // Test request from a provided change number - read 6
-    ECLCompatReadFrom(6);
+    ECLCompatReadFrom(6, csn);
 
     // Test request from a provided change number interval - read 5-7
     ECLCompatReadFromTo(5,7);
@@ -376,7 +373,7 @@
 
     // Test first and last change number, add a new change, do not
     // search again the ECL, but search for first and last
-    ECLCompatTestLimitsAndAdd(1,8, ts);
+    ECLCompatTestLimitsAndAdd(1, 8, 4);
 
     // Test CNIndexDB is purged when replication change log is purged
     ECLPurgeCNIndexDBAfterChangelogClear();
@@ -392,11 +389,11 @@
   public void ECLReplicationServerFullTest16() throws Exception
   {
     // Persistent search in init + changes mode
-    ECLPsearch(false, true);
+    CSN csn = ECLPsearch(false, true);
 
     // Test Filter on replication csn
     // TODO: test with optimization when code done.
-    ECLFilterOnReplicationCsn();
+    ECLFilterOnReplicationCSN(csn);
   }
 
   private void ECLIsNotASupportedSuffix() throws Exception
@@ -499,7 +496,7 @@
       debugInfo(tn, "publishes:" + delMsg2);
 
       // wait for the server to take these changes into account
-      sleep(500);
+      Thread.sleep(500);
 
       // open ECL broker
       serverECL = openReplicationSession(
@@ -585,12 +582,7 @@
   /** Add an entry in the database */
   private void addEntry(Entry entry) throws Exception
   {
-    AddOperation addOp = new AddOperationBasis(connection,
-        InternalClientConnection.nextOperationID(), InternalClientConnection
-        .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
-        entry.getUserAttributes(), entry.getOperationalAttributes());
-    addOp.setInternalOperation(true);
-    addOp.run();
+    AddOperation addOp = connection.processAdd(entry);
     waitOpResult(addOp, ResultCode.SUCCESS);
     assertNotNull(getEntry(entry.getDN(), 1000, true));
   }
@@ -629,9 +621,9 @@
       DomainFakeCfg domainConf = new DomainFakeCfg(baseDN2, 1602, replServers);
       domain2 = startNewDomain(domainConf, null,null);
 
-      sleep(1000);
+      Thread.sleep(1000);
       addEntry(createEntry(baseDN2));
-      sleep(2000);
+      Thread.sleep(2000);
 
       // Search on ECL from start on all suffixes
       String cookie = "";
@@ -698,23 +690,23 @@
 
       s2test2 = openReplicationSession(TEST_ROOT_DN2, SERVER_ID_2,
           100, replicationServerPort, brokerSessionTimeout, true, EMPTY_DN_GENID);
-      sleep(500);
+      Thread.sleep(500);
 
       // Produce updates
       long time = TimeThread.getTime();
       int ts = 1;
-      CSN csn = new CSN(time, ts++, s1test.getServerId());
-      publishDeleteMsgInOTest(s1test, csn, tn, 1);
+      CSN csn1 = new CSN(time, ts++, s1test.getServerId());
+      publishDeleteMsgInOTest(s1test, csn1, tn, 1);
 
-      csn = new CSN(time++, ts++, s2test2.getServerId());
-      publishDeleteMsgInOTest2(s2test2, csn, tn, 2);
+      CSN csn2 = new CSN(time, ts++, s2test2.getServerId());
+      publishDeleteMsgInOTest2(s2test2, csn2, tn, 2);
 
-      CSN csn3 = new CSN(time++, ts++, s2test2.getServerId());
+      CSN csn3 = new CSN(time, ts++, s2test2.getServerId());
       publishDeleteMsgInOTest2(s2test2, csn3, tn, 3);
 
-      csn = new CSN(time++, ts++, s1test.getServerId());
-      publishDeleteMsgInOTest(s1test, csn, tn, 4);
-      sleep(1500);
+      CSN csn4 = new CSN(time, ts++, s1test.getServerId());
+      publishDeleteMsgInOTest(s1test, csn4, tn, 4);
+      Thread.sleep(1500);
 
       // Changes are :
       //               s1          s2
@@ -749,9 +741,9 @@
       cookie = getCookie(searchOp.getSearchEntries(), 1, tn, ldifWriter, cookie);
 
       // Now publishes a new change and search from the previous cookie
-      CSN csn5 = new CSN(time++, ts++, s1test.getServerId());
+      CSN csn5 = new CSN(time, ts++, s1test.getServerId());
       publishDeleteMsgInOTest(s1test, csn5, tn, 5);
-      sleep(500);
+      Thread.sleep(500);
 
       // Changes are :
       //               s1         s2
@@ -773,30 +765,29 @@
 
       s2test = openReplicationSession(TEST_ROOT_DN,  1204,
           100, replicationServerPort, brokerSessionTimeout, true);
-      sleep(500);
+      Thread.sleep(500);
 
       time = TimeThread.getTime();
-      csn = new CSN(time++, ts++, s1test2.getServerId());
-      publishDeleteMsgInOTest2(s1test2, csn, tn, 6);
+      CSN csn6 = new CSN(time, ts++, s1test2.getServerId());
+      publishDeleteMsgInOTest2(s1test2, csn6, tn, 6);
 
-      csn = new CSN(time++, ts++, s2test.getServerId());
-      publishDeleteMsgInOTest(s2test, csn, tn, 7);
+      CSN csn7 = new CSN(time, ts++, s2test.getServerId());
+      publishDeleteMsgInOTest(s2test, csn7, tn, 7);
 
-      CSN csn8 = new CSN(time++, ts++, s1test2.getServerId());
+      CSN csn8 = new CSN(time, ts++, s1test2.getServerId());
       publishDeleteMsgInOTest2(s1test2, csn8, tn, 8);
 
-      CSN csn9 = new CSN(time++, ts++, s2test.getServerId());
+      CSN csn9 = new CSN(time, ts++, s2test.getServerId());
       publishDeleteMsgInOTest(s2test, csn9, tn, 9);
-      sleep(500);
+      Thread.sleep(500);
 
-      ServerState startState = getReplicationDomainStartState(TEST_ROOT_DN);
-      assertEquals(startState.getCSN(s1test.getServerId()).getSeqnum(), 1);
-      assertTrue(startState.getCSN(s2test.getServerId()) != null);
-      assertEquals(startState.getCSN(s2test.getServerId()).getSeqnum(), 7);
+      final ServerState oldestState = getDomainOldestState(TEST_ROOT_DN);
+      assertEquals(oldestState.getCSN(s1test.getServerId()), csn1);
+      assertEquals(oldestState.getCSN(s2test.getServerId()), csn7);
 
-      startState = getReplicationDomainStartState(TEST_ROOT_DN2);
-      assertEquals(startState.getCSN(s2test2.getServerId()).getSeqnum(), 2);
-      assertEquals(startState.getCSN(s1test2.getServerId()).getSeqnum(), 6);
+      final ServerState oldestState2 = getDomainOldestState(TEST_ROOT_DN2);
+      assertEquals(oldestState2.getCSN(s2test2.getServerId()), csn2);
+      assertEquals(oldestState2.getCSN(s1test2.getServerId()), csn6);
 
       // Test lastExternalChangelogCookie attribute of the ECL
       MultiDomainServerState expectedLastCookie =
@@ -847,9 +838,9 @@
     debugInfo(tn, "Ending test successfully");
   }
 
-  private ServerState getReplicationDomainStartState(DN baseDN)
+  private ServerState getDomainOldestState(DN baseDN)
   {
-    return replicationServer.getReplicationServerDomain(baseDN).getStartState();
+    return replicationServer.getReplicationServerDomain(baseDN).getOldestState();
   }
 
   private String getCookie(List<SearchResultEntry> entries,
@@ -1002,8 +993,8 @@
       // 5. Assert that a request with an "old" cookie - one that refers to
       //    changes that have been removed by the replication changelog trimming
       //    returns the appropriate error.
-      debugInfo(tn, "d1 trimdate" + getReplicationDomainStartState(TEST_ROOT_DN));
-      debugInfo(tn, "d2 trimdate" + getReplicationDomainStartState(TEST_ROOT_DN2));
+      debugInfo(tn, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN));
+      debugInfo(tn, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2));
       searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, tn, UNWILLING_TO_PERFORM);
       assertEquals(searchOp.getSearchEntries().size(), 0);
       assertTrue(searchOp.getErrorMessage().toString().startsWith(
@@ -1130,7 +1121,7 @@
       ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp);
       server01.publish(modDNMsg);
       debugInfo(tn, " publishes " + modDNMsg.getCSN());
-      sleep(1000);
+      Thread.sleep(1000);
 
       String cookie= "";
       InternalSearchOperation searchOp =
@@ -1340,7 +1331,7 @@
   /**
    * Test persistent search
    */
-  private void ECLPsearch(boolean changesOnly, boolean compatMode) throws Exception
+  private CSN ECLPsearch(boolean changesOnly, boolean compatMode) throws Exception
   {
     String tn = "ECLPsearch_" + changesOnly + "_" + compatMode;
     debugInfo(tn, "Starting test \n\n");
@@ -1360,6 +1351,7 @@
     }
     assertNotNull(ldapStatistics);
 
+    try
     {
       // Create broker on suffix
       ReplicationBroker server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1,
@@ -1373,7 +1365,7 @@
             "11111111-1112-1113-1114-111111111114");
       debugInfo(tn, " publishing " + delMsg.getCSN());
       server01.publish(delMsg);
-      sleep(500); // let's be sure the message is in the RS
+      Thread.sleep(500); // let's be sure the message is in the RS
 
       // Creates cookie control
       String cookie = "";
@@ -1416,7 +1408,7 @@
       LDAPMessage message;
       message = new LDAPMessage(2, searchRequest, controls);
       w.writeMessage(message);
-      sleep(500);
+      Thread.sleep(500);
 
       SearchResultDoneProtocolOp searchResultDone;
 
@@ -1467,8 +1459,7 @@
          "11111111-1112-1113-1114-111111111115");
       debugInfo(tn, " publishing " + delMsg.getCSN());
       server01.publish(delMsg);
-      this.gblCSN = csn;
-      sleep(1000);
+      Thread.sleep(1000);
 
       debugInfo(tn, delMsg.getCSN() +
       " published , psearch will now wait for new entries");
@@ -1502,7 +1493,7 @@
           break;
         }
       }
-      sleep(1000);
+      Thread.sleep(1000);
 
       // Check we received change 2
       for (LDAPAttribute a : searchResultEntry.getAttributes())
@@ -1581,9 +1572,15 @@
       }
 
       close(s);
-      while (!s.isClosed()) sleep(100);
+      while (!s.isClosed())
+        Thread.sleep(100);
+
+      return csn;
     }
-    debugInfo(tn, "Ends test successfully");
+    finally
+    {
+      debugInfo(tn, "Ends test successfully");
+    }
   }
 
   private SearchRequestProtocolOp createSearchRequest(String filterString,
@@ -1647,7 +1644,7 @@
             "11111111-1111-1111-1111-111111111111");
       debugInfo(tn, " publishing " + delMsg1);
       server01.publish(delMsg1);
-      sleep(500); // let's be sure the message is in the RS
+      Thread.sleep(500); // let's be sure the message is in the RS
 
       // Produce update 2
       CSN csn2 = new CSN(TimeThread.getTime(), ts++, SERVER_ID_2);
@@ -1656,7 +1653,7 @@
             "22222222-2222-2222-2222-222222222222");
       debugInfo(tn, " publishing " + delMsg2);
       server02.publish(delMsg2);
-      sleep(500); // let's be sure the message is in the RS
+      Thread.sleep(500); // let's be sure the message is in the RS
 
       // Produce update 3
       CSN csn3 = new CSN(TimeThread.getTime(), ts++, SERVER_ID_2);
@@ -1665,7 +1662,7 @@
             "33333333-3333-3333-3333-333333333333");
       debugInfo(tn, " publishing " + delMsg3);
       server02.publish(delMsg3);
-      sleep(500); // let's be sure the message is in the RS
+      Thread.sleep(500); // let's be sure the message is in the RS
 
       // Creates cookie control
       String cookie = "";
@@ -1723,15 +1720,15 @@
       LDAPMessage message;
       message = new LDAPMessage(2, searchRequest1, controls);
       w1.writeMessage(message);
-      sleep(500);
+      Thread.sleep(500);
 
       message = new LDAPMessage(2, searchRequest2, controls);
       w2.writeMessage(message);
-      sleep(500);
+      Thread.sleep(500);
 
       message = new LDAPMessage(2, searchRequest3, controls);
       w3.writeMessage(message);
-      sleep(500);
+      Thread.sleep(500);
 
       SearchResultEntryProtocolOp searchResultEntry = null;
       SearchResultDoneProtocolOp searchResultDone = null;
@@ -1857,7 +1854,7 @@
          "44444444-4444-4444-4444-444444444444");
       debugInfo(tn, " publishing " + delMsg11);
       server01.publish(delMsg11);
-      sleep(500);
+      Thread.sleep(500);
       debugInfo(tn, delMsg11.getCSN() + " published additionally ");
 
       // Produces additional change
@@ -1867,7 +1864,7 @@
          "55555555-5555-5555-5555-555555555555");
       debugInfo(tn, " publishing " + delMsg12 );
       server02.publish(delMsg12);
-      sleep(500);
+      Thread.sleep(500);
       debugInfo(tn, delMsg12.getCSN()  + " published additionally ");
 
       // Produces additional change
@@ -1877,7 +1874,7 @@
          "66666666-6666-6666-6666-666666666666");
       debugInfo(tn, " publishing " + delMsg13);
       server02.publish(delMsg13);
-      sleep(500);
+      Thread.sleep(500);
       debugInfo(tn, delMsg13.getCSN()  + " published additionally ");
 
       // wait 11
@@ -1910,7 +1907,7 @@
           break;
         }
       }
-      sleep(1000);
+      Thread.sleep(1000);
       debugInfo(tn, "Search 1 successfully receives additional changes");
 
       // wait 12 & 13
@@ -1943,7 +1940,7 @@
           break;
         }
       }
-      sleep(1000);
+      Thread.sleep(1000);
       debugInfo(tn, "Search 2 successfully receives additional changes");
 
       // wait 11 & 12 & 13
@@ -1976,7 +1973,7 @@
           break;
         }
       }
-      sleep(1000);
+      Thread.sleep(1000);
 
       // Check we received change 13
       for (LDAPAttribute a : searchResultEntry.getAttributes())
@@ -2010,7 +2007,7 @@
         close(s);
         while (!s.isClosed())
         {
-          sleep(100);
+          Thread.sleep(100);
         }
       }
     }
@@ -2082,14 +2079,6 @@
   }
 
   /**
-   * Utility - sleeping as long as required
-   */
-  private void sleep(long time) throws InterruptedException
-  {
-    Thread.sleep(time);
-  }
-
-  /**
    * Utility - log debug message - highlight it is from the test and not
    * from the server code. Makes easier to observe the test steps.
    */
@@ -2150,6 +2139,9 @@
     }
   }
 
+  /**
+   * FIXME this test actually tests nothing: there are no asserts.
+   */
   private void ChangeTimeHeartbeatTest() throws Exception
   {
     String tn = "ChangeTimeHeartbeatTest";
@@ -2170,23 +2162,23 @@
 
       s2test2 = openReplicationSession(TEST_ROOT_DN2, SERVER_ID_2,
           100, replicationServerPort, brokerSessionTimeout, true, EMPTY_DN_GENID);
-      sleep(500);
+      Thread.sleep(500);
 
       // Produce updates
       long time = TimeThread.getTime();
       int ts = 1;
-      CSN csn = new CSN(time, ts++, s1test.getServerId());
-      publishDeleteMsgInOTest(s1test, csn, tn, 1);
+      CSN csn1 = new CSN(time, ts++, s1test.getServerId());
+      publishDeleteMsgInOTest(s1test, csn1, tn, 1);
 
-      csn = new CSN(time++, ts++, s2test2.getServerId());
-      publishDeleteMsgInOTest(s2test2, csn, tn, 2);
+      CSN csn2 = new CSN(time, ts++, s2test2.getServerId());
+      publishDeleteMsgInOTest(s2test2, csn2, tn, 2);
 
-      CSN csn3 = new CSN(time++, ts++, s2test2.getServerId());
+      CSN csn3 = new CSN(time, ts++, s2test2.getServerId());
       publishDeleteMsgInOTest(s2test2, csn3, tn, 3);
 
-      csn = new CSN(time++, ts++, s1test.getServerId());
-      publishDeleteMsgInOTest(s1test, csn, tn, 4);
-      sleep(500);
+      CSN csn4 = new CSN(time, ts++, s1test.getServerId());
+      publishDeleteMsgInOTest(s1test, csn4, tn, 4);
+      Thread.sleep(500);
 
       // --
       s1test2 = openReplicationSession(TEST_ROOT_DN2,  1203,
@@ -2194,26 +2186,24 @@
 
       s2test = openReplicationSession(TEST_ROOT_DN,  1204,
           100, replicationServerPort, brokerSessionTimeout, true);
-      sleep(500);
+      Thread.sleep(500);
 
       // Test startState ("first cookie") of the ECL
       time = TimeThread.getTime();
-      csn = new CSN(time++, ts++, s1test2.getServerId());
-      publishDeleteMsgInOTest2(s1test2, csn, tn, 6);
+      CSN csn6 = new CSN(time, ts++, s1test2.getServerId());
+      publishDeleteMsgInOTest2(s1test2, csn6, tn, 6);
 
-      csn = new CSN(time++, ts++, s2test.getServerId());
-      publishDeleteMsgInOTest(s2test, csn, tn, 7);
+      CSN csn7 = new CSN(time, ts++, s2test.getServerId());
+      publishDeleteMsgInOTest(s2test, csn7, tn, 7);
 
-      CSN csn8 = new CSN(time++, ts++, s1test2.getServerId());
+      CSN csn8 = new CSN(time, ts++, s1test2.getServerId());
       publishDeleteMsgInOTest2(s1test2, csn8, tn, 8);
 
-      CSN csn9 = new CSN(time++, ts++, s2test.getServerId());
+      CSN csn9 = new CSN(time, ts++, s2test.getServerId());
       publishDeleteMsgInOTest(s2test, csn9, tn, 9);
-      sleep(500);
+      Thread.sleep(500);
 
       ReplicationServerDomain rsd1 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN);
-      rsd1.getLatestServerState();
-      rsd1.getChangeTimeHeartbeatState();
       debugInfo(tn, rsd1.getBaseDN()
           + " LatestServerState=" + rsd1.getLatestServerState()
           + " ChangeTimeHeartBeatState=" + rsd1.getChangeTimeHeartbeatState()
@@ -2222,8 +2212,6 @@
       // FIXME:ECL Enable this test by adding an assert on the right value
 
       ReplicationServerDomain rsd2 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN2);
-      rsd2.getLatestServerState();
-      rsd2.getChangeTimeHeartbeatState();
       debugInfo(tn, rsd2.getBaseDN()
           + " LatestServerState=" + rsd2.getLatestServerState()
           + " ChangeTimeHeartBeatState=" + rsd2.getChangeTimeHeartbeatState()
@@ -2248,28 +2236,18 @@
     String tn = "ECLCompatEmpty";
     debugInfo(tn, "Starting test\n\n");
 
-    // search on 'cn=changelog'
-    String filter = "(objectclass=*)";
-    debugInfo(tn, " Search: " + filter);
-    InternalSearchOperation op = connection.processSearch(
-        "cn=changelog",
-        SearchScope.WHOLE_SUBTREE,
-        filter);
-
-    // success
+    final InternalSearchOperation op = connection.processSearch(
+        "cn=changelog", SearchScope.WHOLE_SUBTREE, "(objectclass=*)");
     assertEquals(op.getResultCode(), ResultCode.SUCCESS, op.getErrorMessage().toString());
-
-    // root entry returned
-    assertEquals(op.getEntriesSent(), 1);
+    assertEquals(op.getEntriesSent(), 1, "The root entry should have been returned");
     debugInfo(tn, "Ending test successfully");
   }
 
-  private int ECLCompatWriteReadAllOps(long firstChangeNumber) throws Exception
+  private CSN ECLCompatWriteReadAllOps(long firstChangeNumber) throws Exception
   {
     String tn = "ECLCompatWriteReadAllOps/" + firstChangeNumber;
     debugInfo(tn, "Starting test\n\n");
-    final int nbChanges = 4;
-
+    try
     {
       LDIFWriter ldifWriter = getLDIFWriter();
 
@@ -2280,8 +2258,7 @@
       String user1entryUUID = "11111111-1112-1113-1114-111111111115";
       String baseUUID       = "22222222-2222-2222-2222-222222222222";
 
-      CSN[] csns = generateCSNs(nbChanges, SERVER_ID_1);
-      gblCSN = csns[1];
+      CSN[] csns = generateCSNs(4, SERVER_ID_1);
 
       // Publish DEL
       DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID);
@@ -2296,8 +2273,8 @@
           + "entryUUID: "+user1entryUUID+"\n";
       Entry entry = TestCaseUtils.entryFromLdifString(lentry);
       AddMsg addMsg = new AddMsg(
-          gblCSN,
-          DN.decode("uid="+tn+"2," + TEST_ROOT_DN_STRING),
+          csns[1],
+          entry.getDN(),
           user1entryUUID,
           baseUUID,
           entry.getObjectClassAttribute(),
@@ -2324,14 +2301,14 @@
       ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp);
       server01.publish(modDNMsg);
       debugInfo(tn, " publishes " + modDNMsg.getCSN());
-      sleep(1000);
+      Thread.sleep(1000);
 
       String filter = "(targetdn=*"+tn.toLowerCase()+"*,o=test)";
       InternalSearchOperation searchOp = searchOnChangelog(filter, tn, SUCCESS);
 
       // test 4 entries returned
       assertEntries(searchOp.getSearchEntries(), firstChangeNumber, tn,
-          ldifWriter, user1entryUUID, csns[0], gblCSN, csns[2], csns[3]);
+          ldifWriter, user1entryUUID, csns);
 
       stop(server01);
 
@@ -2343,11 +2320,14 @@
       searchOp = searchOnChangelog(filter, tn, SUCCESS);
 
       assertEntries(searchOp.getSearchEntries(), firstChangeNumber, tn,
-          ldifWriter, user1entryUUID, csns[0], gblCSN, csns[2], csns[3]);
-      assertEquals(searchOp.getSearchEntries().size(), nbChanges);
+          ldifWriter, user1entryUUID, csns);
+      assertEquals(searchOp.getSearchEntries().size(), csns.length);
+      return csns[1];
     }
-    debugInfo(tn, "Ending test with success");
-    return nbChanges;
+    finally
+    {
+      debugInfo(tn, "Ending test with success");
+    }
   }
 
   private void assertEntries(List<SearchResultEntry> entries,
@@ -2409,7 +2389,7 @@
     assertThat(actualDN).isEqualToIgnoringCase(expectedDN);
   }
 
-  private void ECLCompatReadFrom(long firstChangeNumber) throws Exception
+  private void ECLCompatReadFrom(long firstChangeNumber, Object csn) throws Exception
   {
     String tn = "ECLCompatReadFrom/" + firstChangeNumber;
     debugInfo(tn, "Starting test\n\n");
@@ -2432,10 +2412,10 @@
     // check the entry has the right content
     SearchResultEntry resultEntry = entries.get(0);
     assertTrue("changenumber=6,cn=changelog".equalsIgnoreCase(resultEntry.getDN().toNormalizedString()));
-    checkValue(resultEntry, "replicationcsn", gblCSN.toString());
+    checkValue(resultEntry, "replicationcsn", csn.toString());
     checkValue(resultEntry, "replicaidentifier", String.valueOf(SERVER_ID_1));
     checkValue(resultEntry, "changetype", "add");
-    checkValue(resultEntry, "changelogcookie", "o=test:" + gblCSN + ";");
+    checkValue(resultEntry, "changelogcookie", "o=test:" + csn + ";");
     checkValue(resultEntry, "targetentryuuid", user1entryUUID);
     checkValue(resultEntry, "changenumber", "6");
 
@@ -2511,14 +2491,14 @@
   /**
    * Read the ECL in compat mode providing an unknown change number.
    */
-  private void ECLFilterOnReplicationCsn() throws Exception
+  private void ECLFilterOnReplicationCSN(CSN csn) throws Exception
   {
     String tn = "ECLFilterOnReplicationCsn";
     debugInfo(tn, "Starting test\n\n");
 
     LDIFWriter ldifWriter = getLDIFWriter();
 
-    String filter = "(replicationcsn=" + this.gblCSN + ")";
+    String filter = "(replicationcsn=" + csn + ")";
     InternalSearchOperation searchOp = searchOnChangelog(filter, tn, SUCCESS);
     assertEquals(searchOp.getSearchEntries().size(), 1);
 
@@ -2528,7 +2508,7 @@
 
     // check the DEL entry has the right content
     SearchResultEntry resultEntry = entries.get(0);
-    checkValue(resultEntry, "replicationcsn", gblCSN.toString());
+    checkValue(resultEntry, "replicationcsn", csn.toString());
     // TODO:ECL check values of the other attributes
 
     debugInfo(tn, "Ending test with success");
@@ -2619,7 +2599,7 @@
     while (!cnIndexDB.isEmpty())
     {
       debugInfo(tn, "cnIndexDB.count=" + cnIndexDB.count());
-      sleep(200);
+      Thread.sleep(200);
     }
 
     debugInfo(tn, "Ending test with success");
@@ -2744,7 +2724,7 @@
         csn1, user1entryUUID);
     server01.publish(delMsg);
     debugInfo(tn, " publishes " + delMsg.getCSN());
-    sleep(500);
+    Thread.sleep(500);
 
     stop(server01);
 
@@ -2779,7 +2759,7 @@
     DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csn1, user1entryUUID);
     server01.publish(delMsg);
     debugInfo(tn, " publishes " + delMsg.getCSN());
-    sleep(300);
+    Thread.sleep(300);
 
     // From begin to now : 1 change
     assertEquals(rsdtest.getEligibleCount(fromStart, now()), 1);
@@ -2788,7 +2768,7 @@
     delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csn2, user1entryUUID);
     server01.publish(delMsg);
     debugInfo(tn, " publishes " + delMsg.getCSN());
-    sleep(300);
+    Thread.sleep(300);
 
     // From begin to now : 2 changes
     assertEquals(rsdtest.getEligibleCount(fromStart, now()), 2);
@@ -2815,7 +2795,7 @@
     delMsg = newDeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, csn3, user1entryUUID);
     server01.publish(delMsg);
     debugInfo(tn, " publishes " + delMsg.getCSN());
-    sleep(300);
+    Thread.sleep(300);
 
     fromStateBeforeCSN2.update(csn2);
 
@@ -2838,7 +2818,7 @@
         delMsg = newDeleteMsg("uid="+tn+i+"," + TEST_ROOT_DN_STRING, csnx, user1entryUUID);
         server01.publish(delMsg);
       }
-      sleep(1000);
+      Thread.sleep(1000);
       debugInfo(tn, "Perfs test in compat - search lastChangeNumber");
       Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains();
       excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
@@ -2923,12 +2903,12 @@
       domainConf = new DomainFakeCfg(TEST_ROOT_DN2, 1704, replServers);
       domain21 = startNewDomain(domainConf, eclInclude, eclInclude);
 
-      sleep(1000);
+      Thread.sleep(1000);
 
       addEntry(createEntry(TEST_ROOT_DN2));
       addEntry(createEntry(baseDN3));
 
-      String lentry =
+      Entry uentry1 = TestCaseUtils.entryFromLdifString(
           "dn: cn=Fiona Jensen," + TEST_ROOT_DN_STRING2 + "\n"
           + "objectclass: top\n"
           + "objectclass: person\n"
@@ -2937,12 +2917,10 @@
           + "cn: Fiona Jensen\n"
           + "sn: Jensen\n"
           + "uid: fiona\n"
-          + "telephonenumber: 12121212";
-
-      Entry uentry1 = TestCaseUtils.entryFromLdifString(lentry);
+          + "telephonenumber: 12121212");
       addEntry(uentry1); // add fiona in o=test2
 
-      lentry =
+      Entry uentry2 = TestCaseUtils.entryFromLdifString(
           "dn: cn=Robert Hue," + baseDN3 + "\n"
           + "objectclass: top\n"
           + "objectclass: person\n"
@@ -2951,30 +2929,30 @@
           + "cn: Robert Hue\n"
           + "sn: Robby\n"
           + "uid: robert\n"
-          + "telephonenumber: 131313";
-      Entry uentry2 = TestCaseUtils.entryFromLdifString(lentry);
+          + "telephonenumber: 131313");
       addEntry(uentry2); // add robert in o=test3
 
       // mod 'sn' of fiona (o=test2) with 'sn' configured as ecl-incl-att
-      runModifyOperation(uentry1, createMods("sn", "newsn"));
+      final ModifyOperation modOp1 = connection.processModify(
+          uentry1.getDN(), createMods("sn", "newsn"));
+      waitOpResult(modOp1, ResultCode.SUCCESS);
 
       // mod 'telephonenumber' of robert (o=test3)
-      runModifyOperation(uentry2, createMods("telephonenumber", "555555"));
+      final ModifyOperation modOp2 = connection.processModify(
+          uentry2.getDN(), createMods("telephonenumber", "555555"));
+      waitOpResult(modOp2, ResultCode.SUCCESS);
 
       // moddn robert (o=test3) to robert2 (o=test3)
-      ModifyDNOperation modDNOp = new ModifyDNOperationBasis(connection,
-          InternalClientConnection.nextOperationID(),
-          InternalClientConnection.nextMessageID(),
-          null,
+      ModifyDNOperation modDNOp = connection.processModifyDN(
           DN.decode("cn=Robert Hue," + baseDN3),
           RDN.decode("cn=Robert Hue2"), true,
           baseDN3);
-      modDNOp.run();
       waitOpResult(modDNOp, ResultCode.SUCCESS);
 
       // del robert (o=test3)
-      runDeleteOperation("cn=Robert Hue2," + baseDN3);
-      sleep(1000);
+      final DeleteOperation delOp = connection.processDelete(DN.decode("cn=Robert Hue2," + baseDN3));
+      waitOpResult(delOp, ResultCode.SUCCESS);
+      Thread.sleep(1000);
 
       // Search on ECL from start on all suffixes
       String cookie = "";
@@ -2984,7 +2962,7 @@
       assertThat(entries).hasSize(8);
       debugAndWriteEntries(null, entries, tn);
 
-      sleep(2000);
+      Thread.sleep(2000);
 
       for (SearchResultEntry resultEntry : entries)
       {
@@ -3026,9 +3004,13 @@
     }
     finally
     {
-      runDeleteOperation("cn=Fiona Jensen," + TEST_ROOT_DN_STRING2);
-      runDeleteOperation(TEST_ROOT_DN_STRING2);
-      runDeleteOperation(baseDN3.toString());
+      final DeleteOperation delOp1 = connection.processDelete(
+          DN.decode("cn=Fiona Jensen," + TEST_ROOT_DN_STRING2));
+      waitOpResult(delOp1, ResultCode.SUCCESS);
+      final DeleteOperation delOp2 = connection.processDelete(TEST_ROOT_DN2);
+      waitOpResult(delOp2, ResultCode.SUCCESS);
+      final DeleteOperation delOp3 = connection.processDelete(baseDN3);
+      waitOpResult(delOp3, ResultCode.SUCCESS);
 
       remove(domain21, domain2, domain3);
       removeTestBackend(backend2, backend3);
@@ -3067,25 +3049,6 @@
     return newDomain;
   }
 
-  private void runModifyOperation(Entry entry, List<Modification> mods)
-      throws Exception
-  {
-    final ModifyOperation operation =
-        new ModifyOperationBasis(connection, 1, 1, null, entry.getDN(), mods);
-    operation.run();
-    waitOpResult(operation, ResultCode.SUCCESS);
-  }
-
-  private void runDeleteOperation(String dn) throws Exception
-  {
-    final DeleteOperation delOp = new DeleteOperationBasis(connection,
-        InternalClientConnection.nextOperationID(),
-        InternalClientConnection.nextMessageID(), null,
-        DN.decode(dn));
-    delOp.run();
-    waitOpResult(delOp, ResultCode.SUCCESS);
-  }
-
   private List<Modification> createMods(String attributeName, String valueString)
   {
     Attribute attr = Attributes.create(attributeName, valueString);
@@ -3113,7 +3076,7 @@
     while (operation.getResultCode() == ResultCode.UNDEFINED
         || operation.getResultCode() != expectedResult)
     {
-      sleep(50);
+      Thread.sleep(50);
       i++;
       if (i > 10)
       {

--
Gitblit v1.10.0