From 7b670ce888f308da0db49a828c56c390eb9860db Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 21 Jul 2014 12:53:41 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4037) Persistent searches on external changelog do not return changes for new replicas and new domains

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                                 |   39 +--
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java |   12 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java                           |   25 +
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java                                |   13 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java                           |   11 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java                         |    8 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java                             |   43 +--
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java                             |   16 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java              |  418 ++++++++++++++---------------------------
 9 files changed, 217 insertions(+), 368 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index a194d6d..65a95fc 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -41,6 +41,7 @@
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.plugin.MultimasterReplication;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.ReplicationServer;
@@ -237,10 +238,13 @@
       return previousValue;
     }
 
-    // we just created a new domain => update all cursors
-    for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+    if (MultimasterReplication.isECLEnabledDomain(baseDN))
     {
-      cursor.addDomain(baseDN, null);
+      // we just created a new domain => update all cursors
+      for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+      {
+        cursor.addDomain(baseDN, null);
+      }
     }
     return newValue;
   }
@@ -417,10 +421,6 @@
    */
   public void clearDB() throws ChangelogException
   {
-    if (debugEnabled())
-    {
-      TRACER.debugInfo("clear the FileChangelogDB");
-    }
     if (!dbDirectory.exists())
     {
       return;
@@ -868,7 +868,7 @@
             }
           }
 
-          for (final Map<Integer, FileReplicaDB> domainMap: domainToReplicaDBs.values())
+          for (final Map<Integer, FileReplicaDB> domainMap : domainToReplicaDBs.values())
           {
             for (final FileReplicaDB replicaDB : domainMap.values())
             {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
index a8579e4..c17755f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
@@ -36,7 +36,6 @@
  * <p>
  * The cursor provides a java.sql.ResultSet like API :
  * <pre>
- * {@code
  *  FileReplicaDBCursor cursor = ...;
  *  try {
  *    while (cursor.next()) {
@@ -116,11 +115,8 @@
       lastNonNullCurrentCSN = nextRecord.getKey();
       return true;
     }
-    else
-    {
-      nextRecord = null;
-      return false;
-    }
+    nextRecord = null;
+    return false;
   }
 
   /** {@inheritDoc} */
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 68b9688..8a9390d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -425,13 +425,26 @@
       {
         try
         {
-          while (!domainsToClear.isEmpty())
+          if (!domainsToClear.isEmpty())
           {
-            final DN baseDNToClear = domainsToClear.first();
-            nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
-            // Only release the waiting thread
-            // once this domain's state has been cleared.
-            domainsToClear.remove(baseDNToClear);
+            final DN cursorData = nextChangeForInsertDBCursor.getData();
+            final boolean callNextOnCursor =
+                cursorData != null && domainsToClear.contains(cursorData);
+            while (!domainsToClear.isEmpty())
+            {
+              final DN baseDNToClear = domainsToClear.first();
+              nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
+              // Only release the waiting thread
+              // once this domain's state has been cleared.
+              domainsToClear.remove(baseDNToClear);
+            }
+
+            if (callNextOnCursor)
+            {
+              // The next change to consume comes from a domain to be removed.
+              // Call DBCursor.next() to ensure this domain is removed
+              nextChangeForInsertDBCursor.next();
+            }
           }
 
           // Do not call DBCursor.next() here
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index 38afd7d..ae54930 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -129,21 +129,31 @@
 
   private void removeNoLongerNeededCursors()
   {
-    for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> iterator =
-        cursors.entrySet().iterator(); iterator.hasNext();)
+    for (final Iterator<Data> iter = removedCursorsIterator(); iter.hasNext();)
     {
-      final Entry<DBCursor<UpdateMsg>, Data> entry = iterator.next();
-      final Data data = entry.getValue();
-      if (isCursorNoLongerNeededFor(data))
+      final Data dataToFind = iter.next();
+      for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> cursorIter =
+          cursors.entrySet().iterator(); cursorIter.hasNext();)
       {
-        entry.getKey().close();
-        iterator.remove();
-        cursorRemoved(data);
+        final Entry<DBCursor<UpdateMsg>, Data> entry = cursorIter.next();
+        if (dataToFind.equals(entry.getValue()))
+        {
+          entry.getKey().close();
+          cursorIter.remove();
+        }
       }
+      iter.remove();
     }
   }
 
   /**
+   * Returns an Iterator over the data associated to cursors that must be removed.
+   *
+   * @return an Iterator over the data associated to cursors that must be removed.
+   */
+  protected abstract Iterator<Data> removedCursorsIterator();
+
+  /**
    * Adds a cursor to this composite cursor. It first calls
    * {@link DBCursor#next()} to verify whether it is exhausted or not.
    *
@@ -191,23 +201,6 @@
   protected abstract void incorporateNewCursors() throws ChangelogException;
 
   /**
-   * Returns whether the cursor associated to the provided data should be removed.
-   *
-   * @param data the data associated to the cursor to be tested
-   * @return true if the cursor associated to the provided data should be removed,
-   *         false otherwise
-   */
-  protected abstract boolean isCursorNoLongerNeededFor(Data data);
-
-  /**
-   * Notifies that the cursor associated to the provided data has been removed.
-   *
-   * @param data
-   *          the data associated to the removed cursor
-   */
-  protected abstract void cursorRemoved(Data data);
-
-  /**
    * Returns the data associated to the cursor that returned the current record.
    *
    * @return the data associated to the cursor that returned the current record.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
index 28b56a3..c249a78 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
@@ -24,6 +24,7 @@
  */
 package org.opends.server.replication.server.changelog.je;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -110,16 +111,10 @@
 
   /** {@inheritDoc} */
   @Override
-  protected boolean isCursorNoLongerNeededFor(Void data)
+  @SuppressWarnings("unchecked")
+  protected Iterator<Void> removedCursorsIterator()
   {
-    return false; // Not needed
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  protected void cursorRemoved(Void data)
-  {
-    // Not used so far
+    return Collections.EMPTY_LIST.iterator(); // nothing to remove
   }
 
   /** {@inheritDoc} */
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 4ebee02..ce8b030 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -41,6 +41,7 @@
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.plugin.MultimasterReplication;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.ReplicationServer;
@@ -186,7 +187,7 @@
     }
   }
 
-  private Map<Integer, JEReplicaDB> getDomainMap(DN baseDN)
+  private Map<Integer, JEReplicaDB> getDomainMap(final DN baseDN)
   {
     final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
     if (domainMap != null)
@@ -196,29 +197,12 @@
     return Collections.emptyMap();
   }
 
-  private JEReplicaDB getReplicaDB(DN baseDN, int serverId)
+  private JEReplicaDB getReplicaDB(final DN baseDN, final int serverId)
   {
     return getDomainMap(baseDN).get(serverId);
   }
 
   /**
-   * Provision resources for the specified serverId in the specified replication
-   * domain.
-   *
-   * @param baseDN
-   *          the replication domain where to add the serverId
-   * @param serverId
-   *          the server Id to add to the replication domain
-   * @throws ChangelogException
-   *           If a database error happened.
-   */
-  private void commission(DN baseDN, int serverId, ReplicationServer rs)
-      throws ChangelogException
-  {
-    getOrCreateReplicaDB(baseDN, serverId, rs);
-  }
-
-  /**
    * Returns a {@link JEReplicaDB}, possibly creating it.
    *
    * @param baseDN
@@ -279,10 +263,13 @@
       return previousValue;
     }
 
-    // we just created a new domain => update all cursors
-    for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+    if (MultimasterReplication.isECLEnabledDomain(baseDN))
     {
-      cursor.addDomain(baseDN, null);
+      // we just created a new domain => update all cursors
+      for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+      {
+        cursor.addDomain(baseDN, null);
+      }
     }
     return newValue;
   }
@@ -360,7 +347,7 @@
     {
       for (int serverId : entry.getValue())
       {
-        commission(entry.getKey(), serverId, replicationServer);
+        getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer);
       }
     }
   }
@@ -695,7 +682,7 @@
       {
         try
         {
-          cnIndexDB = new JEChangeNumberIndexDB(this.replicationEnv);
+          cnIndexDB = new JEChangeNumberIndexDB(replicationEnv);
         }
         catch (Exception e)
         {
@@ -774,10 +761,10 @@
 
   /** {@inheritDoc} */
   @Override
-  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startAfterCSN)
+  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
       throws ChangelogException
   {
-    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
+    final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
     if (replicaDB != null)
     {
       final DBCursor<UpdateMsg> cursor =
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
index 773ecb3..cf88679 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
@@ -107,16 +107,9 @@
 
   /** {@inheritDoc} */
   @Override
-  protected boolean isCursorNoLongerNeededFor(DN baseDN)
+  protected Iterator<DN> removedCursorsIterator()
   {
-    return removeDomains.contains(baseDN);
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  protected void cursorRemoved(DN baseDN)
-  {
-    removeDomains.remove(baseDN);
+    return removeDomains.iterator();
   }
 
   /** {@inheritDoc} */
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
index cdaae64..63d9c92 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -89,6 +89,16 @@
 public class ExternalChangeLogTest extends ReplicationTestCase
 {
 
+  private static class Results
+  {
+
+    public final List<SearchResultEntryProtocolOp> searchResultEntries =
+        new ArrayList<SearchResultEntryProtocolOp>();
+    public long searchReferences;
+    public long searchesDone;
+
+  }
+
   private static final int SERVER_ID_1 = 1201;
   private static final int SERVER_ID_2 = 1202;
 
@@ -182,14 +192,15 @@
   @Test(enabled=true, dependsOnMethods = { "PrimaryTest"})
   public void TestWithAndWithoutControl() throws Exception
   {
+    final String tn = "TestWithAndWithoutControl";
     replicationServer.getChangelogDB().setPurgeDelay(0);
     // Write changes and read ECL from start
-    ECLCompatWriteReadAllOps(1);
+    ECLCompatWriteReadAllOps(1, tn);
 
     ECLCompatNoControl(1);
 
     // Write additional changes and read ECL from a provided change number
-    ECLCompatWriteReadAllOps(5);
+    ECLCompatWriteReadAllOps(5, tn);
   }
 
   @Test(enabled=false, dependsOnMethods = { "PrimaryTest"})
@@ -287,12 +298,13 @@
   @Test(enabled=false, groups="slow", dependsOnMethods = { "PrimaryTest"})
   public void ECLReplicationServerFullTest15() throws Exception
   {
+    final String tn = "ECLReplicationServerFullTest15";
     replicationServer.getChangelogDB().setPurgeDelay(0);
     // Write 4 changes and read ECL from start
-    ECLCompatWriteReadAllOps(1);
+    ECLCompatWriteReadAllOps(1, tn);
 
     // Write 4 additional changes and read ECL from a provided change number
-    CSN csn = ECLCompatWriteReadAllOps(5);
+    CSN csn = ECLCompatWriteReadAllOps(5, tn);
 
     // Test request from a provided change number - read 6
     ECLCompatReadFrom(6, csn);
@@ -889,15 +901,12 @@
 
       final CSN[] csns = generateCSNs(3, SERVER_ID_1);
       publishDeleteMsgInOTest(server01, csns[0], testName, 1);
-
-      Thread.sleep(1000);
-
-      // Test that last cookie has been updated
-      String cookieNotEmpty = readLastCookie();
-      debugInfo(testName, "Store cookie not empty=\"" + cookieNotEmpty + "\"");
-
+      final String firstCookie = assertLastCookieDifferentThanLastValue("");
+      String lastCookie = firstCookie;
       publishDeleteMsgInOTest(server01, csns[1], testName, 2);
+      lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
       publishDeleteMsgInOTest(server01, csns[2], testName, 3);
+      lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
 
       // ---
       // 2. Now set up a very short purge delay on the replication changelogs
@@ -924,7 +933,7 @@
       //    returns the appropriate error.
       debugInfo(testName, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN));
       debugInfo(testName, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2));
-      searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, 0, testName, UNWILLING_TO_PERFORM);
+      searchOp = searchOnCookieChangelog("(targetDN=*)", firstCookie, 0, testName, UNWILLING_TO_PERFORM);
       assertTrue(searchOp.getErrorMessage().toString().startsWith(
           ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(TEST_ROOT_DN_STRING).toString()),
           searchOp.getErrorMessage().toString());
@@ -956,26 +965,21 @@
 
       final CSN[] csns = generateCSNs(3, SERVER_ID_1);
       publishDeleteMsgInOTest(server01, csns[0], testName, 1);
-
-      Thread.sleep(1000);
-
-      // Test that last cookie has been updated
-      String cookieNotEmpty = readLastCookie();
-      debugInfo(testName, "Store cookie not empty=\"" + cookieNotEmpty + "\"");
-
+      final String firstCookie = assertLastCookieDifferentThanLastValue("");
+      String lastCookie = firstCookie;
       publishDeleteMsgInOTest(server01, csns[1], testName, 2);
+      lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
       publishDeleteMsgInOTest(server01, csns[2], testName, 3);
+      lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
 
       // ---
       // 2. Now remove the domain by sending a reset message
-      ResetGenerationIdMsg msg = new ResetGenerationIdMsg(23657);
-      server01.publish(msg);
+      server01.publish(new ResetGenerationIdMsg(23657));
 
       // ---
       // 3. Assert that a request with an empty cookie returns nothing
       // since replication changelog has been cleared
       String cookie= "";
-      InternalSearchOperation searchOp = null;
       searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
 
       // ---
@@ -983,7 +987,7 @@
       // since replication changelog has been cleared
       cookie = readLastCookie();
       debugInfo(testName, "2. Search with last cookie=" + cookie + "\"");
-      searchOp = searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
+      searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
 
       // ---
       // 5. Assert that a request with an "old" cookie - one that refers to
@@ -991,7 +995,8 @@
       //    returns the appropriate error.
       debugInfo(testName, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN));
       debugInfo(testName, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2));
-      searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, 0, testName, UNWILLING_TO_PERFORM);
+      final InternalSearchOperation searchOp =
+          searchOnCookieChangelog("(targetDN=*)", firstCookie, 0, testName, UNWILLING_TO_PERFORM);
       assertThat(searchOp.getErrorMessage().toString()).contains("unknown replicated domain", TEST_ROOT_DN_STRING.toString());
     }
     finally
@@ -1001,6 +1006,23 @@
     debugInfo(testName, "Ending test successfully");
   }
 
+  private String assertLastCookieDifferentThanLastValue(final String lastCookie) throws Exception
+  {
+    int cnt = 0;
+    while (cnt < 100)
+    {
+      final String newCookie = readLastCookie();
+      if (!newCookie.equals(lastCookie))
+      {
+        return newCookie;
+      }
+      cnt++;
+      Thread.sleep(10);
+    }
+    Assertions.fail("Expected last cookie would have been updated, but it always stayed at value '" + lastCookie + "'");
+    return null;// dead code
+  }
+
   private void debugAndWriteEntries(LDIFWriter ldifWriter,
       List<SearchResultEntry> entries, String tn) throws Exception
   {
@@ -1068,10 +1090,11 @@
 
       // Publish ADD
       csnCounter++;
-      String lentry = "dn: uid="+tn+"2," + TEST_ROOT_DN_STRING + "\n"
-          + "objectClass: top\n" + "objectClass: domain\n"
-          + "entryUUID: "+user1entryUUID+"\n";
-      Entry entry = TestCaseUtils.entryFromLdifString(lentry);
+      Entry entry = TestCaseUtils.entryFromLdifString(
+          "dn: uid=" + tn + "2," + TEST_ROOT_DN_STRING + "\n"
+          + "objectClass: top\n"
+          + "objectClass: domain\n"
+          + "entryUUID: " + user1entryUUID + "\n");
       AddMsg addMsg = new AddMsg(
           csns[csnCounter],
           DN.decode("uid="+tn+"2," + TEST_ROOT_DN_STRING),
@@ -1407,49 +1430,27 @@
 
       InvocationCounterPlugin.resetAllCounters();
 
-      long searchEntries;
-      long searchReferences = ldapStatistics.getSearchResultReferences();
-      long searchesDone     = ldapStatistics.getSearchResultsDone();
+      final Results results = new Results();
+      results.searchReferences = ldapStatistics.getSearchResultReferences();
+      results.searchesDone     = ldapStatistics.getSearchResultsDone();
 
       debugInfo(tn, "Search Persistent filter=(targetDN=*"+tn+"*,o=test)");
-      LDAPMessage message = new LDAPMessage(2, searchRequest, controls);
-      w.writeMessage(message);
+      w.writeMessage(new LDAPMessage(2, searchRequest, controls));
       Thread.sleep(500);
 
       if (!changesOnly)
       {
         // Wait for change 1
         debugInfo(tn, "Waiting for init search expected to return change 1");
-        searchEntries = 0;
+        readMessages(tn, r, results, 1, "Init search Result=");
+        for (SearchResultEntryProtocolOp searchResultEntry : results.searchResultEntries)
         {
-          while (searchEntries < 1 && (message = r.readMessage()) != null)
-          {
-            debugInfo(tn, "Init search Result=" +
-                message.getProtocolOpType() + message + " " + searchEntries);
-            switch (message.getProtocolOpType())
-            {
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
-              SearchResultEntryProtocolOp searchResultEntry =
-                  message.getSearchResultEntryProtocolOp();
-              searchEntries++;
-              // FIXME:ECL Double check 1 is really the valid value here.
-              checkValue(searchResultEntry.toSearchResultEntry(),"changenumber",
-                  (compatMode?"1":"0"));
-              break;
-
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
-              searchReferences++;
-              break;
-
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
-              assertSuccessful(message);
-              searchesDone++;
-              break;
-            }
-          }
+          // FIXME:ECL Double check 1 is really the valid value here.
+          final String cn = compatMode ? "1" : "0";
+          checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
         }
         debugInfo(tn, "INIT search done with success. searchEntries="
-            + searchEntries + " #searchesDone="+ searchesDone);
+            + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
       }
 
       // Produces change 2
@@ -1465,30 +1466,8 @@
       " published , psearch will now wait for new entries");
 
       // wait for the 1 new entry
-      searchEntries = 0;
-      SearchResultEntryProtocolOp searchResultEntry = null;
-      while (searchEntries < 1 && (message = r.readMessage()) != null)
-      {
-        debugInfo(tn, "psearch search  Result=" +
-            message.getProtocolOpType() + message);
-        switch (message.getProtocolOpType())
-        {
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
-          searchResultEntry = message.getSearchResultEntryProtocolOp();
-          searchEntries++;
-          break;
-
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
-          searchReferences++;
-          break;
-
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
-          assertSuccessful(message);
-//        assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
-          searchesDone++;
-          break;
-        }
-      }
+      readMessages(tn, r, results, 1, "psearch search  Result=");
+      SearchResultEntryProtocolOp searchResultEntry = results.searchResultEntries.get(0);
       Thread.sleep(1000);
 
       // Check we received change 2
@@ -1518,11 +1497,12 @@
             createSearchRequest("(targetDN=*directpsearch*,o=test)", null);
 
         debugInfo(tn, "ACI test : sending search");
-        message = new LDAPMessage(2, searchRequest, createCookieControl(""));
-        w.writeMessage(message);
+        w.writeMessage(new LDAPMessage(2, searchRequest, createCookieControl("")));
 
-        searchesDone=0;
-        searchEntries = 0;
+        LDAPMessage message;
+        int searchesDone = 0;
+        int searchEntries = 0;
+        int searchReferences = 0;
         while ((searchesDone==0) && (message = r.readMessage()) != null)
         {
           debugInfo(tn, "ACI test : message returned " +
@@ -1714,125 +1694,53 @@
 
       InvocationCounterPlugin.resetAllCounters();
 
-      ldapStatistics.getSearchRequests();
-      long searchEntries    = ldapStatistics.getSearchResultEntries();
-      ldapStatistics.getSearchResultReferences();
-      long searchesDone     = ldapStatistics.getSearchResultsDone();
+      final Results results = new Results();
+      results.searchesDone = ldapStatistics.getSearchResultsDone();
 
-      LDAPMessage message;
-      message = new LDAPMessage(2, searchRequest1, controls);
-      w1.writeMessage(message);
+      w1.writeMessage(new LDAPMessage(2, searchRequest1, controls));
       Thread.sleep(500);
-
-      message = new LDAPMessage(2, searchRequest2, controls);
-      w2.writeMessage(message);
+      w2.writeMessage(new LDAPMessage(2, searchRequest2, controls));
       Thread.sleep(500);
-
-      message = new LDAPMessage(2, searchRequest3, controls);
-      w3.writeMessage(message);
+      w3.writeMessage(new LDAPMessage(2, searchRequest3, controls));
       Thread.sleep(500);
 
       if (!changesOnly)
       {
         debugInfo(tn, "Search1  Persistent filter=" + searchRequest1.getFilter()
                   + " expected to return change " + csn1);
-        searchEntries = 0;
-        message = null;
-
         {
-          while (searchEntries < 1 && (message = r1.readMessage()) != null)
+          readMessages(tn, r1, results, 1, "Search1 Result=");
+          final int searchEntries = results.searchResultEntries.size();
+          if (searchEntries == 1)
           {
-            debugInfo(tn, "Search1 Result=" +
-                message.getProtocolOpType() + " " + message);
-            switch (message.getProtocolOpType())
-            {
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
-              SearchResultEntryProtocolOp searchResultEntry =
-                  message.getSearchResultEntryProtocolOp();
-              searchEntries++;
-              if (searchEntries==1)
-              {
-                checkValue(searchResultEntry.toSearchResultEntry(),"replicationcsn",csn1.toString());
-                checkValue(searchResultEntry.toSearchResultEntry(),"changenumber",
-                    (compatMode?"10":"0"));
-              }
-              break;
-
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
-              break;
-
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
-              assertSuccessful(message);
-              searchesDone++;
-              break;
-            }
+            final SearchResultEntryProtocolOp searchResultEntry = results.searchResultEntries.get(1);
+            final String cn = compatMode ? "10" : "0";
+            checkValue(searchResultEntry.toSearchResultEntry(),"replicationcsn",csn1.toString());
+            checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
           }
+          debugInfo(tn, "Search1 done with success. searchEntries="
+              + searchEntries + " #searchesDone=" + results.searchesDone);
         }
-        debugInfo(tn, "Search1 done with success. searchEntries="
-            + searchEntries + " #searchesDone="+ searchesDone);
 
-        searchEntries = 0;
-        message = null;
         {
           debugInfo(tn, "Search 2  Persistent filter=" + searchRequest2.getFilter()
               + " expected to return change " + csn2 + " & " + csn3);
-          while (searchEntries < 2 && (message = r2.readMessage()) != null)
+          readMessages(tn, r2, results, 2, "Search 2 Result=");
+          for (SearchResultEntryProtocolOp searchResultEntry : results.searchResultEntries)
           {
-            debugInfo(tn, "Search 2 Result=" +
-                message.getProtocolOpType() + message);
-            switch (message.getProtocolOpType())
-            {
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
-              SearchResultEntryProtocolOp searchResultEntry =
-                  message.getSearchResultEntryProtocolOp();
-              searchEntries++;
-              checkValue(searchResultEntry.toSearchResultEntry(),"changenumber",
-                  (compatMode?"10":"0"));
-              break;
-
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
-              break;
-
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
-              assertSuccessful(message);
-              searchesDone++;
-              break;
-            }
+            final String cn = compatMode ? "10" : "0";
+            checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
           }
+          debugInfo(tn, "Search2 done with success. searchEntries="
+              + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
         }
-        debugInfo(tn, "Search2 done with success. searchEntries="
-            + searchEntries + " #searchesDone="+ searchesDone);
 
 
-        searchEntries = 0;
-        message = null;
-        {
-          debugInfo(tn, "Search3  Persistent filter=" + searchRequest3.getFilter()
-              + " expected to return change top + " + csn1 + " & " + csn2 + " & " + csn3);
-          while (searchEntries < 4 && (message = r3.readMessage()) != null)
-          {
-            debugInfo(tn, "Search3 Result=" +
-                message.getProtocolOpType() + " " + message);
-
-            switch (message.getProtocolOpType())
-            {
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
-              searchEntries++;
-              break;
-
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
-              break;
-
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
-              assertSuccessful(message);
-              searchesDone++;
-              break;
-            }
-          }
-        }
+        debugInfo(tn, "Search3  Persistent filter=" + searchRequest3.getFilter()
+            + " expected to return change top + " + csn1 + " & " + csn2 + " & " + csn3);
+        readMessages(tn, r3, results, 4, "Search3 Result=");
         debugInfo(tn, "Search3 done with success. searchEntries="
-            + searchEntries + " #searchesDone="+ searchesDone);
-
+            + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
       }
 
       // Produces additional change
@@ -1866,82 +1774,19 @@
       debugInfo(tn, delMsg13.getCSN()  + " published additionally ");
 
       // wait 11
-      searchEntries = 0;
-      message = null;
-      while (searchEntries < 1 && (message = r1.readMessage()) != null)
-      {
-        debugInfo(tn, "Search 11 Result=" +
-            message.getProtocolOpType() + " " + message);
-        switch (message.getProtocolOpType())
-        {
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
-          searchEntries++;
-          break;
-
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
-          break;
-
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
-          assertSuccessful(message);
-//        assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
-          searchesDone++;
-          break;
-        }
-      }
+      readMessages(tn, r1, results, 1, "Search 11 Result=");
       Thread.sleep(1000);
       debugInfo(tn, "Search 1 successfully receives additional changes");
 
       // wait 12 & 13
-      searchEntries = 0;
-      message = null;
-      while (searchEntries < 2 && (message = r2.readMessage()) != null)
-      {
-        debugInfo(tn, "psearch search 12 Result=" +
-            message.getProtocolOpType() + " " + message);
-        switch (message.getProtocolOpType())
-        {
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
-          searchEntries++;
-          break;
-
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
-          break;
-
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
-          assertSuccessful(message);
-//        assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
-          searchesDone++;
-          break;
-        }
-      }
+      readMessages(tn, r2, results, 2, "psearch search 12 Result=");
       Thread.sleep(1000);
       debugInfo(tn, "Search 2 successfully receives additional changes");
 
       // wait 11 & 12 & 13
-      searchEntries = 0;
-      SearchResultEntryProtocolOp searchResultEntry = null;
-      message = null;
-      while (searchEntries < 3 && (message = r3.readMessage()) != null)
-      {
-        debugInfo(tn, "psearch search 13 Result=" +
-            message.getProtocolOpType() + " " + message);
-        switch (message.getProtocolOpType())
-        {
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
-          searchResultEntry = message.getSearchResultEntryProtocolOp();
-          searchEntries++;
-          break;
-
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
-          break;
-
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
-          assertSuccessful(message);
-//        assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
-          searchesDone++;
-          break;
-        }
-      }
+      readMessages(tn, r3, results, 3, "psearch search 13 Result=");
+      SearchResultEntryProtocolOp searchResultEntry =
+          results.searchResultEntries.get(results.searchResultEntries.size() - 1);
       Thread.sleep(1000);
 
       // Check we received change 13
@@ -1956,6 +1801,35 @@
     debugInfo(tn, "Ends test successfully");
   }
 
+  private void readMessages(String tn, org.opends.server.tools.LDAPReader r,
+      final Results results, final int i, final String string) throws Exception
+  {
+    results.searchResultEntries.clear();
+
+    LDAPMessage message;
+    while (results.searchResultEntries.size() < i
+        && (message = r.readMessage()) != null)
+    {
+      debugInfo(tn, string + message.getProtocolOpType() + " " + message);
+
+      switch (message.getProtocolOpType())
+      {
+      case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
+        results.searchResultEntries.add(message.getSearchResultEntryProtocolOp());
+        break;
+
+      case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
+        results.searchReferences++;
+        break;
+
+      case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
+        assertSuccessful(message);
+        results.searchesDone++;
+        break;
+      }
+    }
+  }
+
   private void assertSuccessful(LDAPMessage message)
   {
     SearchResultDoneProtocolOp doneOp = message.getSearchResultDoneProtocolOp();
@@ -2002,10 +1876,9 @@
       new BindRequestProtocolOp(
           ByteString.valueOf(bindDN),
           3, ByteString.valueOf(password));
-    LDAPMessage message = new LDAPMessage(1, bindRequest);
-    w.writeMessage(message);
+    w.writeMessage(new LDAPMessage(1, bindRequest));
 
-    message = r.readMessage();
+    final LDAPMessage message = r.readMessage();
     BindResponseProtocolOp bindResponse = message.getBindResponseProtocolOp();
 //  assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
     assertEquals(bindResponse.getResultCode(), expected);
@@ -2199,9 +2072,9 @@
     debugInfo(tn, "Ending test successfully");
   }
 
-  private CSN ECLCompatWriteReadAllOps(long firstChangeNumber) throws Exception
+  private CSN ECLCompatWriteReadAllOps(long firstChangeNumber, String testName) throws Exception
   {
-    String tn = "ECLCompatWriteReadAllOps/" + firstChangeNumber;
+    String tn = testName + "-ECLCompatWriteReadAllOps/" + firstChangeNumber;
     debugInfo(tn, "Starting test\n\n");
     LDAPReplicationDomain domain = null;
     try
@@ -2219,17 +2092,16 @@
       CSN[] csns = generateCSNs(4, SERVER_ID_1);
 
       // Publish DEL
-      DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID);
+      DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "-1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID);
       server01.publish(delMsg);
       debugInfo(tn, " publishes " + delMsg.getCSN());
 
       // Publish ADD
-      String lentry =
-          "dn: uid="+tn+"2," + TEST_ROOT_DN_STRING + "\n"
+      Entry entry = TestCaseUtils.entryFromLdifString(
+          "dn: uid=" + tn + "-2," + TEST_ROOT_DN_STRING + "\n"
           + "objectClass: top\n"
           + "objectClass: domain\n"
-          + "entryUUID: "+user1entryUUID+"\n";
-      Entry entry = TestCaseUtils.entryFromLdifString(lentry);
+          + "entryUUID: " + user1entryUUID + "\n");
       AddMsg addMsg = new AddMsg(
           csns[1],
           entry.getDN(),
@@ -2242,7 +2114,7 @@
       debugInfo(tn, " publishes " + addMsg.getCSN());
 
       // Publish MOD
-      DN baseDN = DN.decode("uid="+tn+"3," + TEST_ROOT_DN_STRING);
+      DN baseDN = DN.decode("uid="+tn+"-3," + TEST_ROOT_DN_STRING);
       List<Modification> mods = createMods("description", "new value");
       ModifyMsg modMsg = new ModifyMsg(csns[2], baseDN, mods, user1entryUUID);
       server01.publish(modMsg);
@@ -2250,7 +2122,7 @@
 
       // Publish modDN
       ModifyDNOperation op = new ModifyDNOperationBasis(connection, 1, 1, null,
-          DN.decode("uid="+tn+"4," + TEST_ROOT_DN_STRING), // entryDN
+          DN.decode("uid="+tn+"-4," + TEST_ROOT_DN_STRING), // entryDN
           RDN.decode("uid="+tn+"new4"), // new rdn
           true,  // deleteoldrdn
           TEST_ROOT_DN2); // new superior
@@ -2260,8 +2132,8 @@
       server01.publish(modDNMsg);
       debugInfo(tn, " publishes " + modDNMsg.getCSN());
 
-      String filter = "(targetdn=*" + tn + "*,o=test)";
-      InternalSearchOperation searchOp = searchOnChangelog(filter, 4, tn, SUCCESS);
+      InternalSearchOperation searchOp =
+          searchOnChangelog("(targetdn=*" + tn + "*,o=test)", 4, tn, SUCCESS);
 
       // test 4 entries returned
       final LDIFWriter ldifWriter = getLDIFWriter();
@@ -2271,7 +2143,7 @@
       stop(server01);
 
       // Test with filter on change number
-      filter =
+      String filter =
           "(&(targetdn=*" + tn + "*,o=test)"
             + "(&(changenumber>=" + firstChangeNumber + ")"
               + "(changenumber<=" + (firstChangeNumber + 3) + ")))";
@@ -2334,7 +2206,7 @@
       long firstChangeNumber, int i, String tn, CSN csn)
   {
     final long changeNumber = firstChangeNumber + i;
-    final String targetDN = "uid=" + tn + (i + 1) + "," + TEST_ROOT_DN_STRING;
+    final String targetDN = "uid=" + tn + "-" + (i + 1) + "," + TEST_ROOT_DN_STRING;
 
     assertDNEquals(resultEntry, changeNumber);
     checkValue(resultEntry, "changenumber", String.valueOf(changeNumber));
@@ -2347,9 +2219,11 @@
 
   private void assertDNEquals(SearchResultEntry resultEntry, long changeNumber)
   {
-    String actualDN = resultEntry.getDN().toNormalizedString();
-    String expectedDN = "changenumber=" + changeNumber + ",cn=changelog";
-    assertThat(actualDN).isEqualToIgnoringCase(expectedDN);
+    final String actualDN = resultEntry.getDN().toNormalizedString();
+    final String expectedDN = "changenumber=" + changeNumber + ",cn=changelog";
+    assertThat(actualDN)
+        .as("Unexpected DN for entry " + resultEntry)
+        .isEqualToIgnoringCase(expectedDN);
   }
 
   private void ECLCompatReadFrom(long firstChangeNumber, Object csn) throws Exception
@@ -2543,7 +2417,7 @@
     while (!cnIndexDB.isEmpty())
     {
       debugInfo(tn, "cnIndexDB.count=" + cnIndexDB.count());
-      Thread.sleep(200);
+      Thread.sleep(10);
     }
 
     debugInfo(tn, "Ending test with success");
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
index 7c76a7a..da8bcd0 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -25,6 +25,9 @@
  */
 package org.opends.server.replication.server.changelog.je;
 
+import java.util.Collections;
+import java.util.Iterator;
+
 import org.opends.server.DirectoryServerTestCase;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -50,14 +53,9 @@
     }
 
     @Override
-    protected boolean isCursorNoLongerNeededFor(String data)
+    protected Iterator<String> removedCursorsIterator()
     {
-      return false;
-    }
-
-    @Override
-    protected void cursorRemoved(String data)
-    {
+      return Collections.EMPTY_LIST.iterator();
     }
   }
 

--
Gitblit v1.10.0