mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
21.53.2014 acfcc9e05552e3d2fe37f8d9b8ac0827a204ee3a
OPENDJ-1441 (CR-4037) Persistent searches on external changelog do not return changes for new replicas and new domains


Fixed a problem introduced in r10912.
Problem was due to removing domains from the MultiDomainDBCursor:
In CompositeDBCursor.removeNoLongerNeededCursors(), the code iterates over the cursors and then forgets the baseDNs to remove only if they match a cursor.
The problem is that it should be the other way around: iterate over the baseDNs to remove and always forget them, whether or not a matching cursor is found.



ChangeNumberIndexer.java:
In run(), better handled the removed domains.

CompositeDBCursor.java:
In removeNoLongerNeededCursors(), iterate over the baseDNs to remove, find a cursor and remove it if found, then always forget the baseDN.
Added abstract method removedCursorsIterator().
Removed isCursorNoLongerNeededFor() and cursorRemoved().

DomainDBCursor.java, MultiDomainDBCursor.java, CompositeDBCursorTest.java:
Consequence of the changes to CompositeDBCursor.



ExternalChangeLogTest.java:
Code cleanup.
Extracted method readMessages() to factorize code.
Added method assertLastCookieDifferentThanLastValue() to loop until last cookie is updated.
Added inner class Results.

FileChangelogDB.java, JEChangelogDB.java:
In getExistingOrNewDomainMap(), only add the new domain if the baseDN is from an ECL enabled domain.

FileReplicaDBCursor.java:
Fixed javadoc.
9 files modified
585 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 16 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 25 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java 43 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java 13 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 39 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java 11 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java 418 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java 12 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -41,6 +41,7 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
@@ -237,10 +238,13 @@
      return previousValue;
    }
    // we just created a new domain => update all cursors
    for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
    if (MultimasterReplication.isECLEnabledDomain(baseDN))
    {
      cursor.addDomain(baseDN, null);
      // we just created a new domain => update all cursors
      for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
      {
        cursor.addDomain(baseDN, null);
      }
    }
    return newValue;
  }
@@ -417,10 +421,6 @@
   */
  public void clearDB() throws ChangelogException
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("clear the FileChangelogDB");
    }
    if (!dbDirectory.exists())
    {
      return;
@@ -868,7 +868,7 @@
            }
          }
          for (final Map<Integer, FileReplicaDB> domainMap: domainToReplicaDBs.values())
          for (final Map<Integer, FileReplicaDB> domainMap : domainToReplicaDBs.values())
          {
            for (final FileReplicaDB replicaDB : domainMap.values())
            {
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
@@ -36,7 +36,6 @@
 * <p>
 * The cursor provides a java.sql.ResultSet like API :
 * <pre>
 * {@code
 *  FileReplicaDBCursor cursor = ...;
 *  try {
 *    while (cursor.next()) {
@@ -116,11 +115,8 @@
      lastNonNullCurrentCSN = nextRecord.getKey();
      return true;
    }
    else
    {
      nextRecord = null;
      return false;
    }
    nextRecord = null;
    return false;
  }
  /** {@inheritDoc} */
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -425,13 +425,26 @@
      {
        try
        {
          while (!domainsToClear.isEmpty())
          if (!domainsToClear.isEmpty())
          {
            final DN baseDNToClear = domainsToClear.first();
            nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
            // Only release the waiting thread
            // once this domain's state has been cleared.
            domainsToClear.remove(baseDNToClear);
            final DN cursorData = nextChangeForInsertDBCursor.getData();
            final boolean callNextOnCursor =
                cursorData != null && domainsToClear.contains(cursorData);
            while (!domainsToClear.isEmpty())
            {
              final DN baseDNToClear = domainsToClear.first();
              nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
              // Only release the waiting thread
              // once this domain's state has been cleared.
              domainsToClear.remove(baseDNToClear);
            }
            if (callNextOnCursor)
            {
              // The next change to consume comes from a domain to be removed.
              // Call DBCursor.next() to ensure this domain is removed
              nextChangeForInsertDBCursor.next();
            }
          }
          // Do not call DBCursor.next() here
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -129,21 +129,31 @@
  private void removeNoLongerNeededCursors()
  {
    for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> iterator =
        cursors.entrySet().iterator(); iterator.hasNext();)
    for (final Iterator<Data> iter = removedCursorsIterator(); iter.hasNext();)
    {
      final Entry<DBCursor<UpdateMsg>, Data> entry = iterator.next();
      final Data data = entry.getValue();
      if (isCursorNoLongerNeededFor(data))
      final Data dataToFind = iter.next();
      for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> cursorIter =
          cursors.entrySet().iterator(); cursorIter.hasNext();)
      {
        entry.getKey().close();
        iterator.remove();
        cursorRemoved(data);
        final Entry<DBCursor<UpdateMsg>, Data> entry = cursorIter.next();
        if (dataToFind.equals(entry.getValue()))
        {
          entry.getKey().close();
          cursorIter.remove();
        }
      }
      iter.remove();
    }
  }
  /**
   * Returns an Iterator over the data associated to cursors that must be removed.
   * <p>
   * The returned iterator is consumed by {@code removeNoLongerNeededCursors()},
   * which calls {@link Iterator#remove()} on it for every element it visits.
   * Implementations returning a non-empty iterator must therefore support
   * removal, so that each processed element is forgotten whether or not a
   * matching cursor was found.
   *
   * @return an Iterator over the data associated to cursors that must be removed.
   */
  protected abstract Iterator<Data> removedCursorsIterator();
  /**
   * Adds a cursor to this composite cursor. It first calls
   * {@link DBCursor#next()} to verify whether it is exhausted or not.
   *
@@ -191,23 +201,6 @@
  protected abstract void incorporateNewCursors() throws ChangelogException;
  /**
   * Returns whether the cursor associated to the provided data should be removed.
   *
   * @param data the data associated to the cursor to be tested
   * @return true if the cursor associated to the provided data should be removed,
   *         false otherwise
   */
  protected abstract boolean isCursorNoLongerNeededFor(Data data);
  /**
   * Notifies that the cursor associated to the provided data has been removed.
   *
   * @param data
   *          the data associated to the removed cursor
   */
  protected abstract void cursorRemoved(Data data);
  /**
   * Returns the data associated to the cursor that returned the current record.
   *
   * @return the data associated to the cursor that returned the current record.
opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
@@ -24,6 +24,7 @@
 */
package org.opends.server.replication.server.changelog.je;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -110,16 +111,10 @@
  /** {@inheritDoc} */
  @Override
  protected boolean isCursorNoLongerNeededFor(Void data)
  @SuppressWarnings("unchecked")
  protected Iterator<Void> removedCursorsIterator()
  {
    return false; // Not needed
  }
  /** {@inheritDoc} */
  @Override
  protected void cursorRemoved(Void data)
  {
    // Not used so far
    return Collections.EMPTY_LIST.iterator(); // nothing to remove
  }
  /** {@inheritDoc} */
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -41,6 +41,7 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
@@ -186,7 +187,7 @@
    }
  }
  private Map<Integer, JEReplicaDB> getDomainMap(DN baseDN)
  private Map<Integer, JEReplicaDB> getDomainMap(final DN baseDN)
  {
    final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
    if (domainMap != null)
@@ -196,29 +197,12 @@
    return Collections.emptyMap();
  }
  private JEReplicaDB getReplicaDB(DN baseDN, int serverId)
  private JEReplicaDB getReplicaDB(final DN baseDN, final int serverId)
  {
    return getDomainMap(baseDN).get(serverId);
  }
  /**
   * Provision resources for the specified serverId in the specified replication
   * domain.
   *
   * @param baseDN
   *          the replication domain where to add the serverId
   * @param serverId
   *          the server Id to add to the replication domain
   * @throws ChangelogException
   *           If a database error happened.
   */
  private void commission(DN baseDN, int serverId, ReplicationServer rs)
      throws ChangelogException
  {
    getOrCreateReplicaDB(baseDN, serverId, rs);
  }
  /**
   * Returns a {@link JEReplicaDB}, possibly creating it.
   *
   * @param baseDN
@@ -279,10 +263,13 @@
      return previousValue;
    }
    // we just created a new domain => update all cursors
    for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
    if (MultimasterReplication.isECLEnabledDomain(baseDN))
    {
      cursor.addDomain(baseDN, null);
      // we just created a new domain => update all cursors
      for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
      {
        cursor.addDomain(baseDN, null);
      }
    }
    return newValue;
  }
@@ -360,7 +347,7 @@
    {
      for (int serverId : entry.getValue())
      {
        commission(entry.getKey(), serverId, replicationServer);
        getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer);
      }
    }
  }
@@ -695,7 +682,7 @@
      {
        try
        {
          cnIndexDB = new JEChangeNumberIndexDB(this.replicationEnv);
          cnIndexDB = new JEChangeNumberIndexDB(replicationEnv);
        }
        catch (Exception e)
        {
@@ -774,10 +761,10 @@
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startAfterCSN)
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
      throws ChangelogException
  {
    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      final DBCursor<UpdateMsg> cursor =
opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
@@ -107,16 +107,9 @@
  /** {@inheritDoc} */
  @Override
  protected boolean isCursorNoLongerNeededFor(DN baseDN)
  protected Iterator<DN> removedCursorsIterator()
  {
    return removeDomains.contains(baseDN);
  }
  /** {@inheritDoc} */
  @Override
  protected void cursorRemoved(DN baseDN)
  {
    removeDomains.remove(baseDN);
    return removeDomains.iterator();
  }
  /** {@inheritDoc} */
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -89,6 +89,16 @@
public class ExternalChangeLogTest extends ReplicationTestCase
{
  /**
   * Mutable holder for what a test has read back from an LDAP search: the
   * result entries collected so far plus running counters.
   * <p>
   * NOTE(review): presumably filled in by {@code readMessages()}, which clears
   * {@link #searchResultEntries} before reading - confirm against its full body.
   */
  private static class Results
  {
    /** Search result entries collected by the most recent read. */
    public final List<SearchResultEntryProtocolOp> searchResultEntries =
        new ArrayList<SearchResultEntryProtocolOp>();
    /** Counter of search result references; callers seed it from the LDAP statistics. */
    public long searchReferences;
    /** Counter of search-result-done messages; callers seed it from the LDAP statistics. */
    public long searchesDone;
  }
  private static final int SERVER_ID_1 = 1201;
  private static final int SERVER_ID_2 = 1202;
@@ -182,14 +192,15 @@
  @Test(enabled=true, dependsOnMethods = { "PrimaryTest"})
  public void TestWithAndWithoutControl() throws Exception
  {
    final String tn = "TestWithAndWithoutControl";
    replicationServer.getChangelogDB().setPurgeDelay(0);
    // Write changes and read ECL from start
    ECLCompatWriteReadAllOps(1);
    ECLCompatWriteReadAllOps(1, tn);
    ECLCompatNoControl(1);
    // Write additional changes and read ECL from a provided change number
    ECLCompatWriteReadAllOps(5);
    ECLCompatWriteReadAllOps(5, tn);
  }
  @Test(enabled=false, dependsOnMethods = { "PrimaryTest"})
@@ -287,12 +298,13 @@
  @Test(enabled=false, groups="slow", dependsOnMethods = { "PrimaryTest"})
  public void ECLReplicationServerFullTest15() throws Exception
  {
    final String tn = "ECLReplicationServerFullTest15";
    replicationServer.getChangelogDB().setPurgeDelay(0);
    // Write 4 changes and read ECL from start
    ECLCompatWriteReadAllOps(1);
    ECLCompatWriteReadAllOps(1, tn);
    // Write 4 additional changes and read ECL from a provided change number
    CSN csn = ECLCompatWriteReadAllOps(5);
    CSN csn = ECLCompatWriteReadAllOps(5, tn);
    // Test request from a provided change number - read 6
    ECLCompatReadFrom(6, csn);
@@ -889,15 +901,12 @@
      final CSN[] csns = generateCSNs(3, SERVER_ID_1);
      publishDeleteMsgInOTest(server01, csns[0], testName, 1);
      Thread.sleep(1000);
      // Test that last cookie has been updated
      String cookieNotEmpty = readLastCookie();
      debugInfo(testName, "Store cookie not empty=\"" + cookieNotEmpty + "\"");
      final String firstCookie = assertLastCookieDifferentThanLastValue("");
      String lastCookie = firstCookie;
      publishDeleteMsgInOTest(server01, csns[1], testName, 2);
      lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
      publishDeleteMsgInOTest(server01, csns[2], testName, 3);
      lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
      // ---
      // 2. Now set up a very short purge delay on the replication changelogs
@@ -924,7 +933,7 @@
      //    returns the appropriate error.
      debugInfo(testName, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN));
      debugInfo(testName, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2));
      searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, 0, testName, UNWILLING_TO_PERFORM);
      searchOp = searchOnCookieChangelog("(targetDN=*)", firstCookie, 0, testName, UNWILLING_TO_PERFORM);
      assertTrue(searchOp.getErrorMessage().toString().startsWith(
          ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(TEST_ROOT_DN_STRING).toString()),
          searchOp.getErrorMessage().toString());
@@ -956,26 +965,21 @@
      final CSN[] csns = generateCSNs(3, SERVER_ID_1);
      publishDeleteMsgInOTest(server01, csns[0], testName, 1);
      Thread.sleep(1000);
      // Test that last cookie has been updated
      String cookieNotEmpty = readLastCookie();
      debugInfo(testName, "Store cookie not empty=\"" + cookieNotEmpty + "\"");
      final String firstCookie = assertLastCookieDifferentThanLastValue("");
      String lastCookie = firstCookie;
      publishDeleteMsgInOTest(server01, csns[1], testName, 2);
      lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
      publishDeleteMsgInOTest(server01, csns[2], testName, 3);
      lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
      // ---
      // 2. Now remove the domain by sending a reset message
      ResetGenerationIdMsg msg = new ResetGenerationIdMsg(23657);
      server01.publish(msg);
      server01.publish(new ResetGenerationIdMsg(23657));
      // ---
      // 3. Assert that a request with an empty cookie returns nothing
      // since replication changelog has been cleared
      String cookie= "";
      InternalSearchOperation searchOp = null;
      searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
      // ---
@@ -983,7 +987,7 @@
      // since replication changelog has been cleared
      cookie = readLastCookie();
      debugInfo(testName, "2. Search with last cookie=" + cookie + "\"");
      searchOp = searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
      searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
      // ---
      // 5. Assert that a request with an "old" cookie - one that refers to
@@ -991,7 +995,8 @@
      //    returns the appropriate error.
      debugInfo(testName, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN));
      debugInfo(testName, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2));
      searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, 0, testName, UNWILLING_TO_PERFORM);
      final InternalSearchOperation searchOp =
          searchOnCookieChangelog("(targetDN=*)", firstCookie, 0, testName, UNWILLING_TO_PERFORM);
      assertThat(searchOp.getErrorMessage().toString()).contains("unknown replicated domain", TEST_ROOT_DN_STRING.toString());
    }
    finally
@@ -1001,6 +1006,23 @@
    debugInfo(testName, "Ending test successfully");
  }
  private String assertLastCookieDifferentThanLastValue(final String lastCookie) throws Exception
  {
    int cnt = 0;
    while (cnt < 100)
    {
      final String newCookie = readLastCookie();
      if (!newCookie.equals(lastCookie))
      {
        return newCookie;
      }
      cnt++;
      Thread.sleep(10);
    }
    Assertions.fail("Expected last cookie would have been updated, but it always stayed at value '" + lastCookie + "'");
    return null;// dead code
  }
  private void debugAndWriteEntries(LDIFWriter ldifWriter,
      List<SearchResultEntry> entries, String tn) throws Exception
  {
@@ -1068,10 +1090,11 @@
      // Publish ADD
      csnCounter++;
      String lentry = "dn: uid="+tn+"2," + TEST_ROOT_DN_STRING + "\n"
          + "objectClass: top\n" + "objectClass: domain\n"
          + "entryUUID: "+user1entryUUID+"\n";
      Entry entry = TestCaseUtils.entryFromLdifString(lentry);
      Entry entry = TestCaseUtils.entryFromLdifString(
          "dn: uid=" + tn + "2," + TEST_ROOT_DN_STRING + "\n"
          + "objectClass: top\n"
          + "objectClass: domain\n"
          + "entryUUID: " + user1entryUUID + "\n");
      AddMsg addMsg = new AddMsg(
          csns[csnCounter],
          DN.decode("uid="+tn+"2," + TEST_ROOT_DN_STRING),
@@ -1407,49 +1430,27 @@
      InvocationCounterPlugin.resetAllCounters();
      long searchEntries;
      long searchReferences = ldapStatistics.getSearchResultReferences();
      long searchesDone     = ldapStatistics.getSearchResultsDone();
      final Results results = new Results();
      results.searchReferences = ldapStatistics.getSearchResultReferences();
      results.searchesDone     = ldapStatistics.getSearchResultsDone();
      debugInfo(tn, "Search Persistent filter=(targetDN=*"+tn+"*,o=test)");
      LDAPMessage message = new LDAPMessage(2, searchRequest, controls);
      w.writeMessage(message);
      w.writeMessage(new LDAPMessage(2, searchRequest, controls));
      Thread.sleep(500);
      if (!changesOnly)
      {
        // Wait for change 1
        debugInfo(tn, "Waiting for init search expected to return change 1");
        searchEntries = 0;
        readMessages(tn, r, results, 1, "Init search Result=");
        for (SearchResultEntryProtocolOp searchResultEntry : results.searchResultEntries)
        {
          while (searchEntries < 1 && (message = r.readMessage()) != null)
          {
            debugInfo(tn, "Init search Result=" +
                message.getProtocolOpType() + message + " " + searchEntries);
            switch (message.getProtocolOpType())
            {
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
              SearchResultEntryProtocolOp searchResultEntry =
                  message.getSearchResultEntryProtocolOp();
              searchEntries++;
              // FIXME:ECL Double check 1 is really the valid value here.
              checkValue(searchResultEntry.toSearchResultEntry(),"changenumber",
                  (compatMode?"1":"0"));
              break;
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
              searchReferences++;
              break;
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
              assertSuccessful(message);
              searchesDone++;
              break;
            }
          }
          // FIXME:ECL Double check 1 is really the valid value here.
          final String cn = compatMode ? "1" : "0";
          checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
        }
        debugInfo(tn, "INIT search done with success. searchEntries="
            + searchEntries + " #searchesDone="+ searchesDone);
            + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
      }
      // Produces change 2
@@ -1465,30 +1466,8 @@
      " published , psearch will now wait for new entries");
      // wait for the 1 new entry
      searchEntries = 0;
      SearchResultEntryProtocolOp searchResultEntry = null;
      while (searchEntries < 1 && (message = r.readMessage()) != null)
      {
        debugInfo(tn, "psearch search  Result=" +
            message.getProtocolOpType() + message);
        switch (message.getProtocolOpType())
        {
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
          searchResultEntry = message.getSearchResultEntryProtocolOp();
          searchEntries++;
          break;
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
          searchReferences++;
          break;
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
          assertSuccessful(message);
//        assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
          searchesDone++;
          break;
        }
      }
      readMessages(tn, r, results, 1, "psearch search  Result=");
      SearchResultEntryProtocolOp searchResultEntry = results.searchResultEntries.get(0);
      Thread.sleep(1000);
      // Check we received change 2
@@ -1518,11 +1497,12 @@
            createSearchRequest("(targetDN=*directpsearch*,o=test)", null);
        debugInfo(tn, "ACI test : sending search");
        message = new LDAPMessage(2, searchRequest, createCookieControl(""));
        w.writeMessage(message);
        w.writeMessage(new LDAPMessage(2, searchRequest, createCookieControl("")));
        searchesDone=0;
        searchEntries = 0;
        LDAPMessage message;
        int searchesDone = 0;
        int searchEntries = 0;
        int searchReferences = 0;
        while ((searchesDone==0) && (message = r.readMessage()) != null)
        {
          debugInfo(tn, "ACI test : message returned " +
@@ -1714,125 +1694,53 @@
      InvocationCounterPlugin.resetAllCounters();
      ldapStatistics.getSearchRequests();
      long searchEntries    = ldapStatistics.getSearchResultEntries();
      ldapStatistics.getSearchResultReferences();
      long searchesDone     = ldapStatistics.getSearchResultsDone();
      final Results results = new Results();
      results.searchesDone = ldapStatistics.getSearchResultsDone();
      LDAPMessage message;
      message = new LDAPMessage(2, searchRequest1, controls);
      w1.writeMessage(message);
      w1.writeMessage(new LDAPMessage(2, searchRequest1, controls));
      Thread.sleep(500);
      message = new LDAPMessage(2, searchRequest2, controls);
      w2.writeMessage(message);
      w2.writeMessage(new LDAPMessage(2, searchRequest2, controls));
      Thread.sleep(500);
      message = new LDAPMessage(2, searchRequest3, controls);
      w3.writeMessage(message);
      w3.writeMessage(new LDAPMessage(2, searchRequest3, controls));
      Thread.sleep(500);
      if (!changesOnly)
      {
        debugInfo(tn, "Search1  Persistent filter=" + searchRequest1.getFilter()
                  + " expected to return change " + csn1);
        searchEntries = 0;
        message = null;
        {
          while (searchEntries < 1 && (message = r1.readMessage()) != null)
          readMessages(tn, r1, results, 1, "Search1 Result=");
          final int searchEntries = results.searchResultEntries.size();
          if (searchEntries == 1)
          {
            debugInfo(tn, "Search1 Result=" +
                message.getProtocolOpType() + " " + message);
            switch (message.getProtocolOpType())
            {
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
              SearchResultEntryProtocolOp searchResultEntry =
                  message.getSearchResultEntryProtocolOp();
              searchEntries++;
              if (searchEntries==1)
              {
                checkValue(searchResultEntry.toSearchResultEntry(),"replicationcsn",csn1.toString());
                checkValue(searchResultEntry.toSearchResultEntry(),"changenumber",
                    (compatMode?"10":"0"));
              }
              break;
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
              break;
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
              assertSuccessful(message);
              searchesDone++;
              break;
            }
            final SearchResultEntryProtocolOp searchResultEntry = results.searchResultEntries.get(1);
            final String cn = compatMode ? "10" : "0";
            checkValue(searchResultEntry.toSearchResultEntry(),"replicationcsn",csn1.toString());
            checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
          }
          debugInfo(tn, "Search1 done with success. searchEntries="
              + searchEntries + " #searchesDone=" + results.searchesDone);
        }
        debugInfo(tn, "Search1 done with success. searchEntries="
            + searchEntries + " #searchesDone="+ searchesDone);
        searchEntries = 0;
        message = null;
        {
          debugInfo(tn, "Search 2  Persistent filter=" + searchRequest2.getFilter()
              + " expected to return change " + csn2 + " & " + csn3);
          while (searchEntries < 2 && (message = r2.readMessage()) != null)
          readMessages(tn, r2, results, 2, "Search 2 Result=");
          for (SearchResultEntryProtocolOp searchResultEntry : results.searchResultEntries)
          {
            debugInfo(tn, "Search 2 Result=" +
                message.getProtocolOpType() + message);
            switch (message.getProtocolOpType())
            {
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
              SearchResultEntryProtocolOp searchResultEntry =
                  message.getSearchResultEntryProtocolOp();
              searchEntries++;
              checkValue(searchResultEntry.toSearchResultEntry(),"changenumber",
                  (compatMode?"10":"0"));
              break;
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
              break;
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
              assertSuccessful(message);
              searchesDone++;
              break;
            }
            final String cn = compatMode ? "10" : "0";
            checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
          }
          debugInfo(tn, "Search2 done with success. searchEntries="
              + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
        }
        debugInfo(tn, "Search2 done with success. searchEntries="
            + searchEntries + " #searchesDone="+ searchesDone);
        searchEntries = 0;
        message = null;
        {
          debugInfo(tn, "Search3  Persistent filter=" + searchRequest3.getFilter()
              + " expected to return change top + " + csn1 + " & " + csn2 + " & " + csn3);
          while (searchEntries < 4 && (message = r3.readMessage()) != null)
          {
            debugInfo(tn, "Search3 Result=" +
                message.getProtocolOpType() + " " + message);
            switch (message.getProtocolOpType())
            {
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
              searchEntries++;
              break;
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
              break;
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
              assertSuccessful(message);
              searchesDone++;
              break;
            }
          }
        }
        debugInfo(tn, "Search3  Persistent filter=" + searchRequest3.getFilter()
            + " expected to return change top + " + csn1 + " & " + csn2 + " & " + csn3);
        readMessages(tn, r3, results, 4, "Search3 Result=");
        debugInfo(tn, "Search3 done with success. searchEntries="
            + searchEntries + " #searchesDone="+ searchesDone);
            + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
      }
      // Produces additional change
@@ -1866,82 +1774,19 @@
      debugInfo(tn, delMsg13.getCSN()  + " published additionally ");
      // wait 11
      searchEntries = 0;
      message = null;
      while (searchEntries < 1 && (message = r1.readMessage()) != null)
      {
        debugInfo(tn, "Search 11 Result=" +
            message.getProtocolOpType() + " " + message);
        switch (message.getProtocolOpType())
        {
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
          searchEntries++;
          break;
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
          break;
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
          assertSuccessful(message);
//        assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
          searchesDone++;
          break;
        }
      }
      readMessages(tn, r1, results, 1, "Search 11 Result=");
      Thread.sleep(1000);
      debugInfo(tn, "Search 1 successfully receives additional changes");
      // wait 12 & 13
      searchEntries = 0;
      message = null;
      while (searchEntries < 2 && (message = r2.readMessage()) != null)
      {
        debugInfo(tn, "psearch search 12 Result=" +
            message.getProtocolOpType() + " " + message);
        switch (message.getProtocolOpType())
        {
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
          searchEntries++;
          break;
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
          break;
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
          assertSuccessful(message);
//        assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
          searchesDone++;
          break;
        }
      }
      readMessages(tn, r2, results, 2, "psearch search 12 Result=");
      Thread.sleep(1000);
      debugInfo(tn, "Search 2 successfully receives additional changes");
      // wait 11 & 12 & 13
      searchEntries = 0;
      SearchResultEntryProtocolOp searchResultEntry = null;
      message = null;
      while (searchEntries < 3 && (message = r3.readMessage()) != null)
      {
        debugInfo(tn, "psearch search 13 Result=" +
            message.getProtocolOpType() + " " + message);
        switch (message.getProtocolOpType())
        {
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
          searchResultEntry = message.getSearchResultEntryProtocolOp();
          searchEntries++;
          break;
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
          break;
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
          assertSuccessful(message);
//        assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
          searchesDone++;
          break;
        }
      }
      readMessages(tn, r3, results, 3, "psearch search 13 Result=");
      SearchResultEntryProtocolOp searchResultEntry =
          results.searchResultEntries.get(results.searchResultEntries.size() - 1);
      Thread.sleep(1000);
      // Check we received change 13
@@ -1956,6 +1801,35 @@
    debugInfo(tn, "Ends test successfully");
  }
  private void readMessages(String tn, org.opends.server.tools.LDAPReader r,
      final Results results, final int i, final String string) throws Exception
  {
    results.searchResultEntries.clear();
    LDAPMessage message;
    while (results.searchResultEntries.size() < i
        && (message = r.readMessage()) != null)
    {
      debugInfo(tn, string + message.getProtocolOpType() + " " + message);
      switch (message.getProtocolOpType())
      {
      case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
        results.searchResultEntries.add(message.getSearchResultEntryProtocolOp());
        break;
      case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
        results.searchReferences++;
        break;
      case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
        assertSuccessful(message);
        results.searchesDone++;
        break;
      }
    }
  }
  private void assertSuccessful(LDAPMessage message)
  {
    SearchResultDoneProtocolOp doneOp = message.getSearchResultDoneProtocolOp();
@@ -2002,10 +1876,9 @@
      new BindRequestProtocolOp(
          ByteString.valueOf(bindDN),
          3, ByteString.valueOf(password));
    LDAPMessage message = new LDAPMessage(1, bindRequest);
    w.writeMessage(message);
    w.writeMessage(new LDAPMessage(1, bindRequest));
    message = r.readMessage();
    final LDAPMessage message = r.readMessage();
    BindResponseProtocolOp bindResponse = message.getBindResponseProtocolOp();
//  assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
    assertEquals(bindResponse.getResultCode(), expected);
@@ -2199,9 +2072,9 @@
    debugInfo(tn, "Ending test successfully");
  }
  private CSN ECLCompatWriteReadAllOps(long firstChangeNumber) throws Exception
  private CSN ECLCompatWriteReadAllOps(long firstChangeNumber, String testName) throws Exception
  {
    String tn = "ECLCompatWriteReadAllOps/" + firstChangeNumber;
    String tn = testName + "-ECLCompatWriteReadAllOps/" + firstChangeNumber;
    debugInfo(tn, "Starting test\n\n");
    LDAPReplicationDomain domain = null;
    try
@@ -2219,17 +2092,16 @@
      CSN[] csns = generateCSNs(4, SERVER_ID_1);
      // Publish DEL
      DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID);
      DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "-1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID);
      server01.publish(delMsg);
      debugInfo(tn, " publishes " + delMsg.getCSN());
      // Publish ADD
      String lentry =
          "dn: uid="+tn+"2," + TEST_ROOT_DN_STRING + "\n"
      Entry entry = TestCaseUtils.entryFromLdifString(
          "dn: uid=" + tn + "-2," + TEST_ROOT_DN_STRING + "\n"
          + "objectClass: top\n"
          + "objectClass: domain\n"
          + "entryUUID: "+user1entryUUID+"\n";
      Entry entry = TestCaseUtils.entryFromLdifString(lentry);
          + "entryUUID: " + user1entryUUID + "\n");
      AddMsg addMsg = new AddMsg(
          csns[1],
          entry.getDN(),
@@ -2242,7 +2114,7 @@
      debugInfo(tn, " publishes " + addMsg.getCSN());
      // Publish MOD
      DN baseDN = DN.decode("uid="+tn+"3," + TEST_ROOT_DN_STRING);
      DN baseDN = DN.decode("uid="+tn+"-3," + TEST_ROOT_DN_STRING);
      List<Modification> mods = createMods("description", "new value");
      ModifyMsg modMsg = new ModifyMsg(csns[2], baseDN, mods, user1entryUUID);
      server01.publish(modMsg);
@@ -2250,7 +2122,7 @@
      // Publish modDN
      ModifyDNOperation op = new ModifyDNOperationBasis(connection, 1, 1, null,
          DN.decode("uid="+tn+"4," + TEST_ROOT_DN_STRING), // entryDN
          DN.decode("uid="+tn+"-4," + TEST_ROOT_DN_STRING), // entryDN
          RDN.decode("uid="+tn+"new4"), // new rdn
          true,  // deleteoldrdn
          TEST_ROOT_DN2); // new superior
@@ -2260,8 +2132,8 @@
      server01.publish(modDNMsg);
      debugInfo(tn, " publishes " + modDNMsg.getCSN());
      String filter = "(targetdn=*" + tn + "*,o=test)";
      InternalSearchOperation searchOp = searchOnChangelog(filter, 4, tn, SUCCESS);
      InternalSearchOperation searchOp =
          searchOnChangelog("(targetdn=*" + tn + "*,o=test)", 4, tn, SUCCESS);
      // test 4 entries returned
      final LDIFWriter ldifWriter = getLDIFWriter();
@@ -2271,7 +2143,7 @@
      stop(server01);
      // Test with filter on change number
      filter =
      String filter =
          "(&(targetdn=*" + tn + "*,o=test)"
            + "(&(changenumber>=" + firstChangeNumber + ")"
              + "(changenumber<=" + (firstChangeNumber + 3) + ")))";
@@ -2334,7 +2206,7 @@
      long firstChangeNumber, int i, String tn, CSN csn)
  {
    final long changeNumber = firstChangeNumber + i;
    final String targetDN = "uid=" + tn + (i + 1) + "," + TEST_ROOT_DN_STRING;
    final String targetDN = "uid=" + tn + "-" + (i + 1) + "," + TEST_ROOT_DN_STRING;
    assertDNEquals(resultEntry, changeNumber);
    checkValue(resultEntry, "changenumber", String.valueOf(changeNumber));
@@ -2347,9 +2219,11 @@
  private void assertDNEquals(SearchResultEntry resultEntry, long changeNumber)
  {
    String actualDN = resultEntry.getDN().toNormalizedString();
    String expectedDN = "changenumber=" + changeNumber + ",cn=changelog";
    assertThat(actualDN).isEqualToIgnoringCase(expectedDN);
    final String actualDN = resultEntry.getDN().toNormalizedString();
    final String expectedDN = "changenumber=" + changeNumber + ",cn=changelog";
    assertThat(actualDN)
        .as("Unexpected DN for entry " + resultEntry)
        .isEqualToIgnoringCase(expectedDN);
  }
  private void ECLCompatReadFrom(long firstChangeNumber, Object csn) throws Exception
@@ -2543,7 +2417,7 @@
    while (!cnIndexDB.isEmpty())
    {
      debugInfo(tn, "cnIndexDB.count=" + cnIndexDB.count());
      Thread.sleep(200);
      Thread.sleep(10);
    }
    debugInfo(tn, "Ending test with success");
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -25,6 +25,9 @@
 */
package org.opends.server.replication.server.changelog.je;
import java.util.Collections;
import java.util.Iterator;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -50,14 +53,9 @@
    }
    @Override
    protected boolean isCursorNoLongerNeededFor(String data)
    protected Iterator<String> removedCursorsIterator()
    {
      return false;
    }
    @Override
    protected void cursorRemoved(String data)
    {
      return Collections.EMPTY_LIST.iterator();
    }
  }