mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
17.05.2013 d356a45009acfa47a94f3d023ac38b4c1faadd13
OPENDJ-1231 Make the Medium Consistency Point support replica heartbeats


Code cleanup: removed code that has been obsoleted by having the separate ChangeNumberIndexer thread.
Matt and I think that this code was necessary because the change number was computed lazily.
Now that the change number is computed eagerly, there is no need for this obsolete code.


Additional potential work that might be done later:
- Could we remove the counter records from the replicaDBs?
- Could we remove ECLServerHandler.eligibleCSN()?


ReplicationServer.java:
In getECLChangeNumberLimits():
- removed the for loop inferring the "eligibleCount", i.e. the number of changes that are in the replicaDBs but not yet in the ChangeNumberIndexDB
- removed all the parameters which are now useless

ReplicationServerDomain.java, ReplicationDomainDB.java, JEChangelogDB.java, JEReplicaDB.java, ReplicationDB.java:
Transitively removed all the methods that were called by ReplicationServer.getECLChangeNumberLimits().

ECLServerHandler.java, FirstChangeNumberVirtualAttributeProvider.java, LastChangeNumberVirtualAttributeProvider.java:
Removed all the code that was getting and passing in the parameters of ReplicationServer.getECLChangeNumberLimits().

ExternalChangeLogTest.java:
Removed now useless test ECLReplicationServerFullTest10().

JEReplicaDBTest.java:
Renamed testDbCounts() to testGetOldestNewestCSNs().
Removed now useless test testGetCountNoCounterRecords()
11 files modified
790 ■■■■■ changed files
opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java 12 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java 12 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 130 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 62 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java 52 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 11 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java 20 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java 232 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java 130 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java 126 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java
@@ -37,10 +37,8 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.SearchOperation;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.*;
import org.opends.server.util.ServerConstants;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import static org.opends.messages.ExtensionMessages.*;
@@ -91,16 +89,10 @@
    {
      ECLWorkflowElement eclwe = (ECLWorkflowElement)
      DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG");
      if (eclwe!=null)
      if (eclwe != null)
      {
        // Set a list of excluded domains (also exclude 'cn=changelog' itself)
        Set<String> excludedDomains =
          MultimasterReplication.getECLDisabledDomains();
        excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
        final ReplicationServer rs = eclwe.getReplicationServer();
        final long[] limits = rs.getECLChangeNumberLimits(
            rs.getEligibleCSN(excludedDomains), excludedDomains);
        final long[] limits = rs.getECLChangeNumberLimits();
        value = String.valueOf(limits[0]);
      }
    }
opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java
@@ -37,10 +37,8 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.SearchOperation;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.*;
import org.opends.server.util.ServerConstants;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import static org.opends.messages.ExtensionMessages.*;
@@ -91,16 +89,10 @@
    {
      ECLWorkflowElement eclwe = (ECLWorkflowElement)
      DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG");
      if (eclwe!=null)
      if (eclwe != null)
      {
        // Set a list of excluded domains (also exclude 'cn=changelog' itself)
        Set<String> excludedDomains =
          MultimasterReplication.getECLDisabledDomains();
        excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
        final ReplicationServer rs = eclwe.getReplicationServer();
        final long[] limits = rs.getECLChangeNumberLimits(
            rs.getEligibleCSN(excludedDomains), excludedDomains);
        final long[] limits = rs.getECLChangeNumberLimits();
        value = String.valueOf(limits[1]);
      }
    }
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -639,8 +639,7 @@
     * Get the changeNumberLimits (from the eligibleCSN obtained at the start of
     * this method) in order to have the oldest and newest change numbers.
     */
    final long[] limits = replicationServer.getECLChangeNumberLimits(
        eligibleCSN, excludedBaseDNs);
    final long[] limits = replicationServer.getECLChangeNumberLimits();
    final long oldestChangeNumber = limits[0];
    final long newestChangeNumber = limits[1];
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1381,133 +1381,37 @@
  /**
   * Get the oldest and newest change numbers.
   * <p>
   * Implementation detail (but it could be more than a detail): The newest
   * change number seem to be a "potential" newest number. It adds up the
   * newesst change number to the number of changes coming from a domain's
   * ReplicaDBs.
   *
   * @param endCSN
   *          The CSN used as the upper limit when computing the newest change
   *          number
   * @param excludedBaseDNs
   *          The baseDNs that are excluded from the ECL.
   * @return an array of size 2 holding the oldest and newest change numbers at
   *         indexes 0 and 1.
   * @throws DirectoryException
   *           When it happens.
   */
  public long[] getECLChangeNumberLimits(CSN endCSN,
      Set<String> excludedBaseDNs) throws DirectoryException
  public long[] getECLChangeNumberLimits() throws DirectoryException
  {
    /* The content of the CNIndexDB depends on the SEARCH operations done before
     * requesting the change number. If no operations, CNIndexDB is empty.
     * The limits we want to get are the "potential" limits if a request was
     * done, the CNIndexDB is probably not complete to do that.
     *
     * The oldest change number is :
     *  - the oldest record from the CNIndexDB
     *  - if none because CNIndexDB empty,
     *      then
     *        if no change in replchangelog then return 0
     *        else return 1 (change number that WILL be returned to next search)
     *
     * The newest change number is :
     *  - initialized with the newest record from the CNIndexDB (0 if none)
     *    and consider the genState associated
     *  - to the newest change number, we add the count of updates in the
     *     replchangelog FROM that genState TO the crossDomainEligibleCSN
     *     (this diff is done domain by domain)
     */
    try
    {
      final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB();
      final ChangeNumberIndexRecord oldestRecord = cnIndexDB.getOldestRecord();
      final ChangeNumberIndexRecord newestRecord = cnIndexDB.getNewestRecord();
      boolean dbEmpty = true;
      long oldestChangeNumber = 0;
      long newestChangeNumber = 0;
      boolean noCookieForNewestCN = true;
      CSN csnForNewestCN = null;
      DN baseDNForNewestCN = null;
      if (oldestRecord != null)
      if (oldestRecord == null)
      {
        if (newestRecord == null)
        {
          // Edge case: DB was cleaned or closed in between calls to
          // getOldest*() and getNewest*().
          // The only remaining solution is to fail fast.
          throw new ChangelogException(
              ERR_READING_OLDEST_THEN_NEWEST_IN_CHANGENUMBER_DATABASE.get());
        }
        dbEmpty = false;
        oldestChangeNumber = oldestRecord.getChangeNumber();
        newestChangeNumber = newestRecord.getChangeNumber();
        // Get the generalized state associated with the current newest change
        // number and initializes from it the startStates table
        final String cookie = newestRecord.getPreviousCookie();
        noCookieForNewestCN = cookie == null || cookie.length() == 0;
        csnForNewestCN = newestRecord.getCSN();
        baseDNForNewestCN = newestRecord.getBaseDN();
      }
      long newestTime = csnForNewestCN != null ? csnForNewestCN.getTime() : 0;
      for (ReplicationServerDomain rsDomain : getReplicationServerDomains())
      {
        if (contains(
            excludedBaseDNs, rsDomain.getBaseDN().toNormalizedString()))
          continue;
        // for this domain, have the state in the replchangelog
        // where the newest change number update is
        long ec;
        if (noCookieForNewestCN)
        {
          // Count changes of this domain from the beginning of the changelog
          final ServerState startState = rsDomain.getOldestState()
              .duplicateOnlyOlderThan(rsDomain.getLatestDomainTrimDate());
          ec = rsDomain.getEligibleCount(startState, endCSN);
        }
        else
        {
          // There are records in the CNIndexDB (so already returned to clients)
          // BUT
          // There is nothing related to this domain in the newest record
          // (maybe this domain was disabled when this record was returned).
          // In that case, are counted the changes from the time of the most
          // recent change
          // And count changes of this domain from the date of the
          // newest seqnum record (that does not refer to this domain)
          CSN csnx = new CSN(newestTime, csnForNewestCN.getSeqnum(), 0);
          ec = rsDomain.getEligibleCount(csnx, endCSN);
          if (baseDNForNewestCN.equals(rsDomain.getBaseDN()))
            ec--;
        }
        // cumulates on domains
        newestChangeNumber += ec;
        // CNIndexDB is empty and there are eligible updates in the replication
        // changelog then init oldest change number
        if (ec > 0 && oldestChangeNumber == 0)
          oldestChangeNumber = 1;
      }
      if (dbEmpty)
      {
        // The database was empty, just keep increasing numbers since last time
        // The database is empty, just keep increasing numbers since last time
        // we generated one change number.
        long lastGeneratedCN = cnIndexDB.getLastGeneratedChangeNumber();
        oldestChangeNumber += lastGeneratedCN;
        newestChangeNumber += lastGeneratedCN;
        final long lastGeneratedCN = cnIndexDB.getLastGeneratedChangeNumber();
        return new long[] { lastGeneratedCN, lastGeneratedCN };
      }
      return new long[] { oldestChangeNumber, newestChangeNumber };
      final ChangeNumberIndexRecord newestRecord = cnIndexDB.getNewestRecord();
      if (newestRecord == null)
      {
        // Edge case: DB was cleaned (or purged) in between calls to
        // getOldest*() and getNewest*().
        // The only remaining solution is to fail fast.
        throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
            ERR_READING_OLDEST_THEN_NEWEST_IN_CHANGENUMBER_DATABASE.get());
      }
      return new long[] { oldestRecord.getChangeNumber(),
        newestRecord.getChangeNumber() };
    }
    catch (ChangelogException e)
    {
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1312,19 +1312,6 @@
    return domainDB.getCursorFrom(baseDN, startAfterServerState);
  }
 /**
  * Count the number of changes in the replication changelog for the provided
  * serverID, between 2 provided CSNs.
  * @param serverId Identifier of the server for which to compute the count.
  * @param from lower limit CSN.
  * @param to   upper limit CSN.
  * @return the number of changes.
  */
  public long getCount(int serverId, CSN from, CSN to)
  {
    return domainDB.getCount(baseDN, serverId, from, to);
  }
  /**
   * Returns the change count for that ReplicationServerDomain.
   *
@@ -2685,55 +2672,6 @@
  }
  /**
   * This methods count the changes, server by server :
   * - from a serverState start point
   * - to (inclusive) an end point (the provided endCSN).
   * @param startState The provided start server state.
   * @param endCSN The provided end CSN.
   * @return The number of changes between startState and endCSN.
   */
  public long getEligibleCount(ServerState startState, CSN endCSN)
  {
    long res = 0;
    for (CSN csn : getLatestServerState())
    {
      CSN startCSN = startState.getCSN(csn.getServerId());
      long serverIdRes = getCount(csn.getServerId(), startCSN, endCSN);
      // The startPoint is excluded when counting the ECL eligible changes
      if (startCSN != null && serverIdRes > 0)
      {
        serverIdRes--;
      }
      res += serverIdRes;
    }
    return res;
  }
  /**
   * This methods count the changes, server by server:
   * - from a start CSN
   * - to (inclusive) an end point (the provided endCSN).
   * @param startCSN The provided start CSN.
   * @param endCSN The provided end CSN.
   * @return The number of changes between startTime and endCSN.
   */
  public long getEligibleCount(CSN startCSN, CSN endCSN)
  {
    long res = 0;
    for (CSN csn : getLatestServerState())
    {
      int serverId = csn.getServerId();
      CSN lStartCSN =
          new CSN(startCSN.getTime(), startCSN.getSeqnum(), serverId);
      res += getCount(serverId, lStartCSN, endCSN);
    }
    return res;
  }
  /**
   * Get the latest (more recent) trim date of the changelog dbs associated
   * to this domain.
   * @return The latest trim date.
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -113,58 +113,6 @@
  // serverId methods
  /**
   * Return the number of changes inclusive between 2 provided {@link CSN}s for
   * the specified serverId and replication domain. i.e. the <code>from</code>
   * and <code>to</code> CSNs are included in the count.
   * <p>
   * Note that:
   * <ol>
   * <li>If <code>from</code> is null, the count starts at the oldest CSN in the
   * database.</li>
   * <li>If <code>to</code> is null, the count is 0.</li>
   * <li>If both from and to are present, then the count includes them both
   * <code>to</code> is null, the count ends at the newest CSN in the database.
   * </li>
   * <li>incidentally, if both <code>from</code> and <code>to</code> are null,
   * the total count of entries in the replica database is returned.</li>
   * </ol>
   * <h6>Example</h6>
   * <p>
   * Given the following replica database for baseDN "dc=example,dc=com" and
   * serverId 1:
   *
   * <pre>
   * CSN1  <=  Oldest
   * CSN2
   * CSN3
   * CSN4
   * CSN5  <=  Newest
   * </pre>
   *
   * Then:
   *
   * <pre>
   * assertEquals(getCount(&quot;dc=example,dc=com&quot;, 1, CSN1, CSN1), 1);
   * assertEquals(getCount(&quot;dc=example,dc=com&quot;, 1, CSN1, CSN2), 2);
   * assertEquals(getCount(&quot;dc=example,dc=com&quot;, 1, CSN1, CSN5), 5);
   * assertEquals(getCount(&quot;dc=example,dc=com&quot;, 1, null, CSN5), 5);
   * assertEquals(getCount(&quot;dc=example,dc=com&quot;, 1, CSN1, null), 0);
   * assertEquals(getCount(&quot;dc=example,dc=com&quot;, 1, null, null), 5);
   * </pre>
   *
   * @param baseDN
   *          the replication domain baseDN
   * @param serverId
   *          the serverId on which to act
   * @param from
   *          The older CSN where to start the count
   * @param to
   *          The newer CSN where to end the count
   * @return The computed number of changes
   */
  long getCount(DN baseDN, int serverId, CSN from, CSN to);
  /**
   * Generates a {@link DBCursor} across all the replicaDBs for the specified
   * replication domain, with all cursors starting after the provided CSN.
   * <p>
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -489,17 +489,6 @@
    StaticUtils.recursiveDelete(dbDirectory);
  }
  /** {@inheritDoc} */
  @Override
  public long getCount(DN baseDN, int serverId, CSN from, CSN to)
  {
    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      return replicaDB.getCount(from, to);
    }
    return 0;
  }
  /** {@inheritDoc} */
  @Override
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -652,24 +652,4 @@
    db.setCounterRecordWindowSize(size);
  }
  /**
   * Return the number of changes between 2 provided CSNs.
   * This a alternative to traverseAndCount, expected to be much more efficient
   * when there is a huge number of changes in the Db.
   * @param from The lower (older) CSN.
   * @param to   The upper (newer) CSN.
   * @return The computed number of changes.
   */
  public long getCount(CSN from, CSN to)
  {
    // Now that we always keep the last CSN in the DB to avoid expiring cookies
    // too quickly, we need to check if the "to" is older than the trim date.
    if (to == null || !to.isOlderThan(new CSN(latestTrimDate, 0, 0)))
    {
      flush();
      return db.count(from, to);
    }
    return 0;
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -59,8 +59,6 @@
 */
public class ReplicationDB
{
  private static final int START = 0;
  private static final int STOP = 1;
  private Database db;
  private ReplicationDbEnv dbenv;
@@ -901,236 +899,6 @@
  }
  /**
   * Count the number of changes between 2 changes numbers (inclusive).
   * @param start The lower limit of the count.
   * @param stop The higher limit of the count.
   * @return The number of changes between provided start and stop CSN.
   * Returns 0 when an error occurs.
   */
  public long count(CSN start, CSN stop)
  {
    dbCloseLock.readLock().lock();
    try
    {
      // If the DB has been closed then return immediately.
      if (isDBClosed())
      {
        return 0;
      }
      if (start == null && stop == null)
      {
        return db.count();
      }
      int[] counterValues = new int[2];
      int[] distanceToCounterRecords = new int[2];
      // Step 1 : from the start point, traverse db to the next counter record
      // or to the stop point.
      findFirstCounterRecordAfterStartPoint(start, stop, counterValues,
          distanceToCounterRecords);
      // cases
      if (counterValues[START] == 0)
        return distanceToCounterRecords[START];
      // Step 2 : from the stop point, traverse db to the next counter record
      // or to the start point.
      if (!findFirstCounterRecordBeforeStopPoint(start, stop, counterValues,
          distanceToCounterRecords))
      {
        // database is empty
        return 0;
      }
      // Step 3 : Now consolidates the result
      return computeDistance(counterValues, distanceToCounterRecords);
    }
    catch (DatabaseException e)
    {
      dbenv.shutdownOnException(e);
    }
    finally
    {
      dbCloseLock.readLock().unlock();
    }
    return 0;
  }
  private void findFirstCounterRecordAfterStartPoint(CSN start, CSN stop,
      int[] counterValues, int[] distanceToCounterRecords)
      throws DatabaseException
  {
    Cursor cursor = db.openCursor(null, null);
    try
    {
      OperationStatus status;
      DatabaseEntry key;
      DatabaseEntry data = new DatabaseEntry();
      if (start != null)
      {
        key = createReplicationKey(start);
        status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
        if (status == OperationStatus.NOTFOUND)
          status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
      }
      else
      {
        key = new DatabaseEntry();
        // JNR: I suspect this is equivalent to writing cursor.getFirst().
        // If it is, then please change the code to make it clearer.
        status = cursor.getNext(key, data, LockMode.DEFAULT);
      }
      while (status == OperationStatus.SUCCESS)
      {
        // test whether the record is a regular change or a counter
        final CSN csn = toCSN(key.getData());
        if (isACounterRecord(csn))
        {
          // we have found the counter record
          counterValues[START] = decodeCounterValue(data.getData());
          break;
        }
        // it is a regular change record
        if (csn.isNewerThan(stop))
        { // we are outside the range: we reached the 'stop' target
          break;
        }
        distanceToCounterRecords[START]++;
        status = cursor.getNext(key, data, LockMode.DEFAULT);
        // loop to update the distance and possibly find a counter record
      }
    }
    finally
    {
      close(cursor);
    }
  }
  private boolean findFirstCounterRecordBeforeStopPoint(CSN start, CSN stop,
      int[] counterValues, int[] distanceToCounterRecords)
      throws DatabaseException
  {
    Cursor cursor = db.openCursor(null, null);
    try
    {
      DatabaseEntry key = createReplicationKey(stop);
      DatabaseEntry data = new DatabaseEntry();
      OperationStatus status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
      if (status != OperationStatus.SUCCESS)
      {
        key = new DatabaseEntry();
        data = new DatabaseEntry();
        status = cursor.getLast(key, data, LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        {
          return false;
        }
      }
      while (status == OperationStatus.SUCCESS)
      {
        final CSN csn = toCSN(key.getData());
        if (isACounterRecord(csn))
        {
          // we have found the counter record
          counterValues[STOP] = decodeCounterValue(data.getData());
          break;
        }
        // it is a regular change record
        if (csn.isOlderThan(start))
        { // we are outside the range: we reached the 'start' target
          break;
        }
        distanceToCounterRecords[STOP]++;
        status = cursor.getPrev(key, data, LockMode.DEFAULT);
        // loop to update the distance and possibly find a counter record
      }
      return true;
    }
    finally
    {
      close(cursor);
    }
  }
  /**
   * The diagram below shows a visual description of how the distance between
   * two CSNs in the database is computed.
   *
   * <pre>
   *     +--------+                        +--------+
   *     | CASE 1 |                        | CASE 2 |
   *     +--------+                        +--------+
   *
   *             CSN                               CSN
   *             -----                             -----
   *   START  => -----                   START  => -----
   *     ^       -----                     ^       -----
   *     |       -----                     |       -----
   *   dist 1    -----                   dist 1    -----
   *     |       -----                     |       -----
   *     v       -----                     v       -----
   *   CR 1&2 => [1000]                   CR 1  => [1000]
   *     ^       -----                             -----
   *     |       -----                             -----
   *   dist 2    -----                             -----
   *     |       -----                             -----
   *     v       -----                             -----
   *   STOP   => -----                             -----
   *             -----                             -----
   *     CR   => [2000]                   CR 2  => [2000]
   *             -----                     ^       -----
   *                                       |       -----
   *                                     dist 2    -----
   *                                       |       -----
   *                                       v       -----
   *                                     STOP   => -----
   * </pre>
   *
   * Explanation of the terms used:
   * <dl>
   * <dt>START</dt>
   * <dd>Start CSN for the count</dd>
   * <dt>STOP</dt>
   * <dd>Stop CSN for the count</dd>
   * <dt>dist</dt>
   * <dd>Distance from START (or STOP) to the counter record</dd>
   * <dt>CSN</dt>
   * <dd>Stands for "Change Sequence Number". Below it, the database is
   * symbolized, where each record is represented by using dashes "-----". The
   * database is ordered.</dd>
   * <dt>CR</dt>
   * <dd>Stands for "Counter Record". Counter Records are inserted in the
   * database along with real CSNs, but they are not real changes. They are only
   * used to speed up calculating the distance between 2 CSNs without the need
   * to scan the whole database in between.</dd>
   * </dl>
   */
  private long computeDistance(int[] counterValues,
      int[] distanceToCounterRecords)
  {
    if (counterValues[START] != 0)
    {
      if (counterValues[START] == counterValues[STOP])
      {
        // only one counter record between from and to - no need to use it
        return distanceToCounterRecords[START] + distanceToCounterRecords[STOP];
      }
      // at least 2 counter records between from and to
      return distanceToCounterRecords[START]
          + (counterValues[STOP] - counterValues[START])
          + distanceToCounterRecords[STOP];
    }
    return 0;
  }
  /**
   * Whether a provided CSN represents a counter record. A counter record is
   * used to store the time.
   *
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -63,7 +63,6 @@
import org.opends.server.tools.LDAPWriter;
import org.opends.server.types.*;
import org.opends.server.util.LDIFWriter;
import org.opends.server.util.ServerConstants;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
@@ -308,13 +307,6 @@
    ECLSimultaneousPsearches();
  }
  @Test(enabled=true, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"})
  public void ECLReplicationServerFullTest10() throws Exception
  {
    // Test eligible count method.
    ECLGetEligibleCountTest();
  }
  // TODO:ECL Test SEARCH abandon and check everything shutdown and cleaned
  // TODO:ECL Test PSEARCH abandon and check everything shutdown and cleaned
  // TODO:ECL Test invalid DN in cookie returns UNWILLING + message
@@ -2613,128 +2605,6 @@
    return null;
  }
  private void ECLGetEligibleCountTest() throws Exception
  {
    String tn = "ECLGetEligibleCountTest";
    debugInfo(tn, "Starting test\n\n");
    String user1entryUUID = "11111111-1112-1113-1114-111111111115";
    final CSN[] csns = generateCSNs(4, SERVER_ID_1);
    final CSN csn1 = csns[0];
    final CSN csn2 = csns[1];
    final CSN csn3 = csns[2];
    getCNIndexDB().setPurgeDelay(0);
    ReplicationServerDomain rsdtest = replicationServer.getReplicationServerDomain(TEST_ROOT_DN);
    // this empty state will force to count from the start of the DB
    final ServerState fromStart = new ServerState();
    // The replication changelog is empty
    assertEquals(rsdtest.getEligibleCount(fromStart, csns[0]), 0);
    // Creates broker on o=test
    ReplicationBroker server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1,
        1000, replicationServerPort, brokerSessionTimeout, true);
    // Publish one first message
    DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csn1, user1entryUUID);
    server01.publish(delMsg);
    debugInfo(tn, " publishes " + delMsg.getCSN());
    Thread.sleep(300);
    // From begin to now : 1 change
    assertEquals(rsdtest.getEligibleCount(fromStart, now()), 1);
    // Publish one second message
    delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csn2, user1entryUUID);
    server01.publish(delMsg);
    debugInfo(tn, " publishes " + delMsg.getCSN());
    // From begin to now : 2 changes
    searchOnChangelog("(changenumber>=1)", 2, tn, SUCCESS);
    assertEquals(rsdtest.getEligibleCount(fromStart, now()), 2);
    // From begin to first change (inclusive) : 1 change = csn1
    assertEquals(rsdtest.getEligibleCount(fromStart, csn1), 1);
    final ServerState fromStateBeforeCSN1 = new ServerState();
    fromStateBeforeCSN1.update(csn1);
    // From state/csn1(exclusive) to csn1 (inclusive) : 0 change
    assertEquals(rsdtest.getEligibleCount(fromStateBeforeCSN1, csn1), 0);
    // From state/csn1(exclusive) to csn2 (inclusive) : 1 change = csn2
    assertEquals(rsdtest.getEligibleCount(fromStateBeforeCSN1, csn2), 1);
    final ServerState fromStateBeforeCSN2 = new ServerState();
    fromStateBeforeCSN2.update(csn2);
    // From state/csn2(exclusive) to now (inclusive) : 0 change
    assertEquals(rsdtest.getEligibleCount(fromStateBeforeCSN2, now()), 0);
    // Publish one third message
    delMsg = newDeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, csn3, user1entryUUID);
    server01.publish(delMsg);
    debugInfo(tn, " publishes " + delMsg.getCSN());
    Thread.sleep(300);
    fromStateBeforeCSN2.update(csn2);
    // From state/csn2(exclusive) to now : 1 change = csn3
    assertEquals(rsdtest.getEligibleCount(fromStateBeforeCSN2, now()), 1);
    boolean perfs=false;
    if (perfs)
    {
      // number of msgs used by the test
      final int maxMsg = 999999;
      // We need an RS configured with a window size bigger than the number
      // of msg used by the test.
      assertTrue(maxMsg<maxWindow);
      debugInfo(tn, "Perf test in compat mode - will generate " + maxMsg + " msgs.");
      for (int i=4; i<=maxMsg; i++)
      {
        CSN csnx = new CSN(TimeThread.getTime(), i, SERVER_ID_1);
        delMsg = newDeleteMsg("uid="+tn+i+"," + TEST_ROOT_DN_STRING, csnx, user1entryUUID);
        server01.publish(delMsg);
      }
      Thread.sleep(1000);
      debugInfo(tn, "Perfs test in compat - search lastChangeNumber");
      Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains();
      excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
      long t1 = TimeThread.getTime();
      long[] limits = replicationServer.getECLChangeNumberLimits(
          replicationServer.getEligibleCSN(excludedDomains), excludedDomains);
      assertEquals(limits[1], maxMsg);
      long t2 = TimeThread.getTime();
      debugInfo(tn, "Perfs - " + maxMsg + " counted in (ms):" + (t2 - t1));
      String filter = "(changenumber>=" + maxMsg + ")";
      searchOnChangelog(filter, 1, tn, SUCCESS);
      long t3 = TimeThread.getTime();
      debugInfo(tn, "Perfs - last change searched in (ms):" + (t3 - t2));
      filter = "(changenumber>=" + maxMsg + ")";
      searchOnChangelog(filter, 1, tn, SUCCESS);
      long t4 = TimeThread.getTime();
      debugInfo(tn, "Perfs - last change searched in (ms):" + (t4 - t3));
      filter = "(changenumber>=" + (maxMsg - 2) + ")";
      searchOnChangelog(filter, 3, tn, SUCCESS);
      long t5 = TimeThread.getTime();
      debugInfo(tn, "Perfs - last 3 changes searched in (ms):" + (t5 - t4));
    }
    stop(server01);
    debugInfo(tn, "Ending test with success");
  }
  private CSN now()
  {
    return new CSN(TimeThread.getTime(), 1, SERVER_ID_1);
  }
  /**
   * Test ECl entry attributes, and there configuration.
   */
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -316,60 +316,13 @@
    }
  }
  @Test
  public void testGetCountNoCounterRecords() throws Exception
  {
    ReplicationServer replicationServer = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100000, 10);
      JEReplicaDB replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = newCSNs(1, System.currentTimeMillis(), 5);
      for (CSN csn : csns)
      {
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
      }
      replicaDB.flush();
      assertEquals(replicaDB.getCount(csns[0], csns[0]), 1);
      assertEquals(replicaDB.getCount(csns[0], csns[1]), 2);
      assertEquals(replicaDB.getCount(csns[0], csns[4]), 5);
      assertEquals(replicaDB.getCount(null, csns[4]), 5);
      assertEquals(replicaDB.getCount(csns[0], null), 0);
      assertEquals(replicaDB.getCount(null, null), 5);
    }
    finally
    {
      remove(replicationServer);
    }
  }
  /**
   * Test the logic that manages counter records in the JEReplicaDB in order to
   * optimize the counting of record in the replication changelog db.
   * optimize the oldest and newest records in the replication changelog db.
   */
  @Test(enabled=true, groups = { "opendj-256" })
  void testDbCounts() throws Exception
  void testGetOldestNewestCSNs() throws Exception
  {
    // FIXME: for some reason this test is always failing in Jenkins when run as
    // part of the unit tests. Here is the output (the failure is 100%
    // reproducible and always has the same value of 3004):
    //
    // Failed Test:
    // org.opends.server.replication.server.JEReplicaDBTest#testDbCounts
    // [testng] Failure Cause: java.lang.AssertionError: AFTER PURGE
    // expected:<8000> but was:<3004>
    // [testng] org.testng.Assert.fail(Assert.java:84)
    // [testng] org.testng.Assert.failNotEquals(Assert.java:438)
    // [testng] org.testng.Assert.assertEquals(Assert.java:108)
    // [testng] org.testng.Assert.assertEquals(Assert.java:323)
    // [testng]
    // org.opends.server.replication.server.JEReplicaDBTest.testDBCount(JEReplicaDBTest.java:594)
    // [testng]
    // org.opends.server.replication.server.JEReplicaDBTest.testDbCounts(JEReplicaDBTest.java:389)
    // It's worth testing with 2 different setting for counterRecord
    // - a counter record is put every 10 Update msg in the db - just a unit
    //   setting.
@@ -381,13 +334,12 @@
    // - when start and stop are after the first counter record,
    // - when start and stop are before and after more than one counter record,
    // After a purge.
    // After shutdowning/closing and reopening the db.
    testDBCount(40, 10);
    // FIXME next line is the one failing with the stacktrace above
    testDBCount(4000, 1000);
    // After shutting down/closing and reopening the db.
    testGetOldestNewestCSNs(40, 10);
    testGetOldestNewestCSNs(4000, 1000);
  }
  private void testDBCount(int max, int counterWindow) throws Exception
  private void testGetOldestNewestCSNs(int max, int counterWindow) throws Exception
  {
    String tn = "testDBCount("+max+","+counterWindow+")";
    debugInfo(tn, "Starting test");
@@ -421,41 +373,6 @@
      assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
      // Test count in different subcases trying to handle all special cases
      // regarding the 'counter' record and 'count' algorithm
      assertCount(tn, replicaDB, csns[1], csns[1], 1, "FROM change1 TO change1 ");
      assertCount(tn, replicaDB, csns[1], csns[2], 2, "FROM change1 TO change2 ");
      assertCount(tn, replicaDB, csns[1], csns[counterWindow], counterWindow,
          "FROM change1 TO counterWindow=" + counterWindow);
      final int j = counterWindow + 1;
      assertCount(tn, replicaDB, csns[1], csns[j], j,
          "FROM change1 TO counterWindow+1=" + j);
      final int k = 2 * counterWindow;
      assertCount(tn, replicaDB, csns[1], csns[k], k,
          "FROM change1 TO 2*counterWindow=" + k);
      final int l = k + 1;
      assertCount(tn, replicaDB, csns[1], csns[l], l,
          "FROM change1 TO 2*counterWindow+1=" + l);
      assertCount(tn, replicaDB, csns[2], csns[5], 4,
          "FROM change2 TO change5 ");
      assertCount(tn, replicaDB, csns[(counterWindow + 2)], csns[(counterWindow + 5)], 4,
          "FROM counterWindow+2 TO counterWindow+5 ");
      assertCount(tn, replicaDB, csns[2], csns[(counterWindow + 5)], counterWindow + 4,
          "FROM change2 TO counterWindow+5 ");
      assertCount(tn, replicaDB, csns[(counterWindow + 4)], csns[(counterWindow + 4)], 1,
          "FROM counterWindow+4 TO counterWindow+4 ");
      CSN olderThanOldest = null;
      CSN newerThanNewest = new CSN(System.currentTimeMillis() + (2*(max+1)), 100, 1);
      // Now we want to test with start and stop outside of the db
      assertCount(tn, replicaDB, csns[1], newerThanNewest, max,
          "FROM our first generated change TO now (> newest change in the db)");
      assertCount(tn, replicaDB, olderThanOldest, newerThanNewest, max,
          "FROM null (start of time) TO now (> newest change in the db)");
      // Now we want to test that after closing and reopening the db, the
      // counting algo is well reinitialized and when new messages are added
      // the new counter are correctly generated.
@@ -468,9 +385,6 @@
      assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
      assertCount(tn, replicaDB, csns[1], newerThanNewest, max,
          "FROM our first generated change TO now (> newest change in the db)");
      // Populate the db with 'max' msg
      for (int i=max+1; i<=(2*max); i++)
      {
@@ -483,33 +397,15 @@
      assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN");
      assertCount(tn, replicaDB, csns[1], newerThanNewest, 2 * max,
          "FROM our first generated change TO now (> newest change in the db)");
      //
      replicaDB.setPurgeDelay(100);
      sleep(4000);
      long totalCount = replicaDB.getCount(null, null);
      debugInfo(tn, "FROM our first generated change TO now (> newest change in the db)" + " After purge, total count=" + totalCount);
      sleep(1000);
      String testcase = "AFTER PURGE (oldest, newest)=";
      debugInfo(tn, testcase + replicaDB.getOldestCSN() + replicaDB.getNewestCSN());
      assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Newest=");
      int expectedCnt;
      if (totalCount>1)
      {
        final int newestSeqnum = replicaDB.getNewestCSN().getSeqnum();
        final int oldestSeqnum = replicaDB.getOldestCSN().getSeqnum();
        expectedCnt = ((newestSeqnum - oldestSeqnum + 1)/2) + 1;
      }
      else
      {
        expectedCnt = 0;
      }
      assertCount(tn, replicaDB, csns[1], newerThanNewest, expectedCnt, "AFTER PURGE");
      // Clear ...
      debugInfo(tn,"clear:");
      replicaDB.clear();
@@ -530,12 +426,4 @@
    }
  }
  private void assertCount(String tn, JEReplicaDB replicaDB, CSN from, CSN to,
      int expectedCount, String testcase)
  {
    long actualCount = replicaDB.getCount(from, to);
    debugInfo(tn, testcase + " actualCount=" + actualCount);
    assertEquals(actualCount, expectedCount, testcase);
  }
}