mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
04.03.2013 e3add78e22e130ef6ece000a7989f4f4317d4da8
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -118,7 +118,7 @@
           "[draftCompat=" + draftCompat +
           "] [persistent=" + isPersistent +
           "] [startChangeNumber=" + lastChangeNumber +
           "] [isEndOfDraftCNReached=" + isEndOfCNIndexDBReached +
           "] [isEndOfCNIndexDBReached=" + isEndOfCNIndexDBReached +
           "] [searchPhase=" + searchPhase +
           "] [startCookie=" + startCookie +
           "] [previousCookie=" + previousCookie +
@@ -574,7 +574,7 @@
    if (startChangeNumber <= 1)
    {
      // Request filter DOES NOT contain any first change number
      // So we'll generate from the first change number in the DraftCNdb
      // So we'll generate from the first change number in the CNIndexDB
      final CNIndexRecord firstCNRecord = cnIndexDB.getFirstRecord();
      if (firstCNRecord == null)
      { // DB is empty or closed
@@ -590,7 +590,7 @@
    // Request filter DOES contain a startChangeNumber
    // Read the draftCNDb to see whether it contains startChangeNumber
    // Read the CNIndexDB to see whether it contains startChangeNumber
    CNIndexRecord startCNRecord = cnIndexDB.getRecord(startChangeNumber);
    if (startCNRecord != null)
    {
@@ -600,11 +600,11 @@
      return crossDomainStartState;
    }
    // startChangeNumber provided in the request IS NOT in the DraftCNDb
    // startChangeNumber provided in the request IS NOT in the CNIndexDB
    /*
     * Get the draftLimits (from the eligibleCSN got at the beginning of the
     * operation) in order to have the first and possible last change number.
     * Get the changeNumberLimits (from the eligibleCSN obtained at the start of
     * this method) in order to have the first and last change numbers.
     */
    final long[] limits = replicationServer.getECLChangeNumberLimits(
        eligibleCSN, excludedBaseDNs);
@@ -643,7 +643,7 @@
      cnIndexDBCursor = cnIndexDB.getCursorFrom(lastKey);
      return crossDomainStartState;
      // TODO:ECL ... ok we'll start from the end of the draftCNDb BUT ...
      // TODO:ECL ... ok we'll start from the end of the CNIndexDB BUT ...
      // this may be very long. Work on perf improvement here.
    }
@@ -828,7 +828,7 @@
                startStatesFromProvidedCookie.toString() ,sb.toString()));
      }
      // the next record from the DraftCNdb should be the one
      // the next record from the CNIndexDB should be the one
      startCookie = providedCookie;
      // Initializes each and every domain with the next(first) eligible message
@@ -1347,10 +1347,10 @@
  private boolean assignChangeNumber(final ECLUpdateMsg oldestChange)
      throws ChangelogException
  {
    // We also need to check if the draftCNdb is consistent with
    // We also need to check if the CNIndexDB is consistent with
    // the changelogdb.
    // if not, 2 potential reasons
    // a/ : changelog has been purged (trim)let's traverse the draftCNDb
    // a/ : changelog has been purged (trim)let's traverse the CNIndexDB
    // b/ : changelog is late .. let's traverse the changelogDb
    // The following loop allows to loop until being on the same cn
    // in the 2 dbs
@@ -1363,7 +1363,7 @@
    {
      if (isEndOfCNIndexDBReached)
      {
        // we are at the end of the DraftCNdb in the append mode
        // we are at the end of the CNIndexDB in the append mode
        assignNewChangeNumberAndStore(oldestChange);
        return true;
      }
@@ -1371,19 +1371,19 @@
      // the next change from the CNIndexDB
      final CNIndexRecord currentRecord = cnIndexDBCursor.getRecord();
      final CSN csnFromDraftCNDb = currentRecord.getCSN();
      final DN dnFromDraftCNDb = currentRecord.getBaseDN();
      final CSN csnFromCNIndexDB = currentRecord.getCSN();
      final DN dnFromCNIndexDB = currentRecord.getBaseDN();
      if (debugEnabled())
        TRACER.debugInfo("assignChangeNumber() generating change number "
            + " comparing the 2 db DNs :" + dnFromChangelogDb + "?="
            + csnFromChangelogDb + " timestamps:"
            + new Date(csnFromChangelogDb.getTime()) + " ?older"
            + new Date(csnFromDraftCNDb.getTime()));
            + new Date(csnFromCNIndexDB.getTime()));
      if (areSameChange(csnFromChangelogDb, dnFromChangelogDb,
          csnFromDraftCNDb, dnFromDraftCNDb))
          csnFromCNIndexDB, dnFromCNIndexDB))
      {
        if (debugEnabled())
          TRACER.debugInfo("assignChangeNumber() generating change number "
@@ -1395,7 +1395,7 @@
      }
      if (!csnFromDraftCNDb.older(csnFromChangelogDb))
      if (!csnFromCNIndexDB.older(csnFromChangelogDb))
      {
        // the change from the changelogDb is older
        // it should have been stored lately
@@ -1408,16 +1408,16 @@
      }
      // the change from the DraftCNDb is older
      // the change from the CNIndexDB is older
      // that means that the change has been purged from the
      // changelogDb (and DraftCNdb not yet been trimmed)
      // changelogDb (and CNIndexDB not yet been trimmed)
      try
      {
        // let's traverse the DraftCNdb searching for the change
        // let's traverse the CNIndexDB searching for the change
        // found in the changelogDb.
        if (debugEnabled())
          TRACER.debugInfo("assignChangeNumber() generating change number "
              + " will skip " + csnFromDraftCNDb
              + " will skip " + csnFromCNIndexDB
              + " and read next change from the CNIndexDB.");
        isEndOfCNIndexDBReached = !cnIndexDBCursor.next();
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -536,7 +536,7 @@
       * large and therefore won't contain all the changes. Some changes may
       * only be stored in the backing DB of the servers.
       * The total size of the receive queue is calculated by doing the sum of
       * the number of missing changes for every dbHandler.
       * the number of missing changes for every replicaDB.
       */
      ServerState latestState = replicationServerDomain.getLatestServerState();
      return ServerState.diffChanges(latestState, serverState);
opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitor.java
@@ -263,7 +263,7 @@
    // - in the ServerHandler for a given DS1, the stored state contains :
    // -- the max CSN produced by DS1
    // -- the last CSN consumed by DS1 from DS2..n
    // - in the RSdomain/dbHandler, the built-in state contains :
    // - in the ReplicationDomainDB/ReplicaDB, the built-in state contains:
    // -- the max CSN produced by each server
    // So for a given DS connected we can take the state and the max from
    // the DS/state.
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1376,20 +1376,20 @@
  public long[] getECLChangeNumberLimits(CSN crossDomainEligibleCSN,
      Set<String> excludedBaseDNs) throws DirectoryException
  {
    /* The content of the DraftCNdb depends on the SEARCH operations done before
     * requesting the change number. If no operations, DraftCNdb is empty.
    /* The content of the CNIndexDB depends on the SEARCH operations done before
     * requesting the change number. If no operations, CNIndexDB is empty.
     * The limits we want to get are the "potential" limits if a request was
     * done, the DraftCNdb is probably not complete to do that.
     * done, the CNIndexDB is probably not complete to do that.
     *
     * The first change number is :
     *  - the first record from the DraftCNdb
     *  - if none because DraftCNdb empty,
     *  - the first record from the CNIndexDB
     *  - if none because CNIndexDB empty,
     *      then
     *        if no change in replchangelog then return 0
     *        else return 1 (change number that WILL be returned to next search)
     *
     * The last change number is :
     *  - initialized with the last record from the DraftCNdb (0 if none)
     *  - initialized with the last record from the CNIndexDB (0 if none)
     *    and consider the genState associated
     *  - to the last change number, we add the count of updates in the
     *     replchangelog FROM that genState TO the crossDomainEligibleCSN
@@ -1451,12 +1451,12 @@
        }
        else
        {
          // There are records in the draftDB (so already returned to clients)
          // There are records in the CNIndexDB (so already returned to clients)
          // BUT
          // There is nothing related to this domain in the last draft record
          // There is nothing related to this domain in the last CNIndexRecord
          // (may be this domain was disabled when this record was returned).
          // In that case, are counted the changes from
          // the date of the most recent change from this last draft record
          // the date of the most recent change from this last CNIndexRecord
          if (newestDate == 0)
          {
            newestDate = csnForLastCN.getTime();
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2830,9 +2830,8 @@
    long res = 0;
    for (CSN csn : getLatestServerState())
    {
      int serverId = csn.getServerId();
      CSN lStartCSN = new CSN(startCSN.getTime(), startCSN.getSeqnum(),
          serverId);
      CSN lStartCSN =
          new CSN(startCSN.getTime(), startCSN.getSeqnum(), csn.getServerId());
      res += getCount(lStartCSN, endCSN);
    }
    return res;
opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
@@ -104,13 +104,13 @@
  /**
   * Generate a new {@link ChangeNumberIndexDBCursor} that allows to browse the
   * db managed by this DBHandler and starting at the position defined by a
   * given changeNumber.
   * db managed by this object and starting at the position defined by a given
   * changeNumber.
   *
   * @param startChangeNumber
   *          The position where the iterator must start.
   * @return a new ReplicationIterator that allows to browse this DB managed by
   *         this DBHandler and starting at the position defined by a given
   *         this object and starting at the position defined by a given
   *         changeNumber.
   * @throws ChangelogException
   *           if a database problem occurs.
opends/src/server/org/opends/server/replication/server/changelog/api/package-info.java
@@ -31,7 +31,7 @@
 * <ul>
 * <li>a changelog of all the changes that happened on each server in the
 * replication domain / suffix,</li>
 * <li>a draft changelog,</li>
 * <li>a changelog as defined by draft-good-ldap-changelog,</li>
 * <li>a state database containing specific information about each serverId in
 * the suffix, and in particular the generationId for each server.</li>
 * </ul>
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
@@ -49,7 +49,7 @@
/**
 * This class implements the interface between the underlying database
 * and the dbHandler class.
 * and the {@link JEChangeNumberIndexDB} class.
 * This is the only class that should have code using the BDB interfaces.
 */
public class DraftCNDB
@@ -76,7 +76,7 @@
    this.dbenv = dbenv;
    // Get or create the associated ReplicationServerDomain and Db.
    db = dbenv.getOrCreateDraftCNDb();
    db = dbenv.getOrCreateCNIndexDB();
  }
  /**
@@ -658,7 +658,7 @@
      dbenv.clearDb(oldDb);
      // RE-create the db
      db = dbenv.getOrCreateDraftCNDb();
      db = dbenv.getOrCreateCNIndexDB();
    }
    catch(Exception e)
    {
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
File was renamed from opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
@@ -64,7 +64,7 @@
 * This class publishes some monitoring information below <code>
 * cn=monitor</code>.
 */
public class DraftCNDbHandler implements ChangeNumberIndexDB, Runnable
public class JEChangeNumberIndexDB implements ChangeNumberIndexDB, Runnable
{
  /**
   * The tracer object for the debug logger.
@@ -108,14 +108,14 @@
  /**
   * Creates a new dbHandler associated to a given LDAP server.
   * Creates a new JEChangeNumberIndexDB associated to a given LDAP server.
   *
   * @param replicationServer The ReplicationServer that creates this dbHandler.
   * @param replicationServer The ReplicationServer that creates this instance.
   * @param dbenv the Database Env to use to create the ReplicationServer DB.
   * server for this domain.
   * @throws ChangelogException If a database problem happened
   */
  public DraftCNDbHandler(ReplicationServer replicationServer,
  public JEChangeNumberIndexDB(ReplicationServer replicationServer,
      ReplicationDbEnv dbenv) throws ChangelogException
  {
    this.replicationServer = replicationServer;
@@ -133,7 +133,8 @@
        new AtomicLong((lastRecord != null) ? lastRecord.getChangeNumber() : 0);
    // Trimming thread
    thread = new DirectoryThread(this, "Replication DraftCN db");
    thread =
        new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer");
    thread.start();
    // Monitoring registration
@@ -157,7 +158,7 @@
    db.addRecord(record);
    if (debugEnabled())
      TRACER.debugInfo("In DraftCNDbhandler.add, added: " + record);
      TRACER.debugInfo("In JEChangeNumberIndexDB.add, added: " + record);
  }
  /** {@inheritDoc} */
@@ -227,7 +228,7 @@
  public ChangeNumberIndexDBCursor getCursorFrom(long startChangeNumber)
      throws ChangelogException
  {
    return new DraftCNDbIterator(db, startChangeNumber);
    return new JEChangeNumberIndexDBCursor(db, startChangeNumber);
  }
  /** {@inheritDoc} */
@@ -332,14 +333,13 @@
      {
        for (int j = 0; j < 50; j++)
        {
          // let's traverse the DraftCNDb
          // let's traverse the CNIndexDB
          if (!cursor.next())
          {
            cursor.close();
            return;
          }
          // From the draftCNDb change record, get the domain and CSN
          final CNIndexRecord record = cursor.currentRecord();
          if (baseDNToClear != null && baseDNToClear.equals(record.getBaseDN()))
          {
@@ -352,8 +352,7 @@
          if (domain == null)
          {
            // the domain has been removed since the record was written in the
            // draftCNDb, thus it makes no sense to keep the record in the
            // draftCNDb.
            // CNIndexDB, thus it makes no sense to keep this record in the DB.
            cursor.delete();
            continue;
          }
@@ -379,12 +378,12 @@
            csnVector = csnStartStates.get(record.getBaseDN());
            if (debugEnabled())
              TRACER.debugInfo("DraftCNDBHandler:clear() - ChangeVector:"
              TRACER.debugInfo("JEChangeNumberIndexDB:clear() - ChangeVector:"
                  + csnVector + " -- StartState:" + startState);
          }
          catch(Exception e)
          {
            // We couldn't parse the mdss from the DraftCNData Value
            // We could not parse the MultiDomainServerState from the record
            cursor.delete();
            continue;
          }
@@ -395,7 +394,7 @@
          {
            cursor.delete();
            if (debugEnabled())
              TRACER.debugInfo("DraftCNDBHandler:clear() - deleted " + csn
              TRACER.debugInfo("JEChangeNumberIndexDB:clear() - deleted " + csn
                  + "Not covering startState");
            continue;
          }
@@ -427,8 +426,8 @@
  }
  /**
   * This internal class is used to implement the Monitoring capabilities
   * of the dbHandler.
   * This internal class is used to implement the Monitoring capabilities of the
   * JEChangeNumberIndexDB.
   */
  private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
  {
@@ -498,7 +497,8 @@
  @Override
  public String toString()
  {
    return "draftCNdb:" + " " + firstChangeNumber + " " + lastChangeNumber;
    return "JEChangeNumberIndexDB: " + firstChangeNumber + " "
        + lastChangeNumber;
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBCursor.java
File was renamed from opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbIterator.java
@@ -39,7 +39,7 @@
 * This class allows to iterate through the changes received from a given
 * LDAP Server Identifier.
 */
public class DraftCNDbIterator implements ChangeNumberIndexDBCursor
public class JEChangeNumberIndexDBCursor implements ChangeNumberIndexDBCursor
{
  private static final DebugTracer TRACER = getTracer();
  private DraftCNDBCursor draftCNDbCursor;
@@ -55,7 +55,7 @@
   * @throws ChangelogException
   *           If a database problem happened.
   */
  public DraftCNDbIterator(DraftCNDB db, long startChangeNumber)
  public JEChangeNumberIndexDBCursor(DraftCNDB db, long startChangeNumber)
      throws ChangelogException
  {
    draftCNDbCursor = db.openReadCursor(startChangeNumber);
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -59,8 +59,8 @@
  /**
   * This map contains the List of updates received from each LDAP server.
   */
  private final Map<DN, Map<Integer, DbHandler>> sourceDbHandlers =
      new ConcurrentHashMap<DN, Map<Integer, DbHandler>>();
  private final Map<DN, Map<Integer, JEReplicaDB>> domainToReplicaDBs =
      new ConcurrentHashMap<DN, Map<Integer, JEReplicaDB>>();
  private ReplicationDbEnv dbEnv;
  private final String dbDirectoryName;
  private final File dbDirectory;
@@ -152,9 +152,9 @@
    }
  }
  private Map<Integer, DbHandler> getDomainMap(DN baseDN)
  private Map<Integer, JEReplicaDB> getDomainMap(DN baseDN)
  {
    final Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDN);
    final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
    if (domainMap != null)
    {
      return domainMap;
@@ -162,7 +162,7 @@
    return Collections.emptyMap();
  }
  private DbHandler getDbHandler(DN baseDN, int serverId)
  private JEReplicaDB getReplicaDB(DN baseDN, int serverId)
  {
    return getDomainMap(baseDN).get(serverId);
  }
@@ -181,44 +181,44 @@
  private void commission(DN baseDN, int serverId, ReplicationServer rs)
      throws ChangelogException
  {
    getOrCreateDbHandler(baseDN, serverId, rs);
    getOrCreateReplicaDB(baseDN, serverId, rs);
  }
  /**
   * Returns a DbHandler, possibly creating it.
   * Returns a {@link JEReplicaDB}, possibly creating it.
   *
   * @param baseDN
   *          the baseDN for which to create a DbHandler
   *          the baseDN for which to create a ReplicaDB
   * @param serverId
   *          the baseserverId for which to create a DbHandler
   *          the serverId for which to create a ReplicaDB
   * @param rs
   *          the ReplicationServer
   * @return a Pair with the DbHandler and a a boolean indicating if it has been
   *         created
   * @return a Pair with the JEReplicaDB and a boolean indicating whether it had
   *         to be created
   * @throws ChangelogException
   *           if a problem occurred with the database
   */
  Pair<DbHandler, Boolean> getOrCreateDbHandler(DN baseDN,
  Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
      int serverId, ReplicationServer rs) throws ChangelogException
  {
    synchronized (sourceDbHandlers)
    synchronized (domainToReplicaDBs)
    {
      Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDN);
      Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
      if (domainMap == null)
      {
        domainMap = new ConcurrentHashMap<Integer, DbHandler>();
        sourceDbHandlers.put(baseDN, domainMap);
        domainMap = new ConcurrentHashMap<Integer, JEReplicaDB>();
        domainToReplicaDBs.put(baseDN, domainMap);
      }
      DbHandler dbHandler = domainMap.get(serverId);
      if (dbHandler == null)
      JEReplicaDB replicaDB = domainMap.get(serverId);
      if (replicaDB == null)
      {
        dbHandler =
            new DbHandler(serverId, baseDN, rs, dbEnv, rs.getQueueSize());
        domainMap.put(serverId, dbHandler);
        return Pair.of(dbHandler, true);
        replicaDB =
            new JEReplicaDB(serverId, baseDN, rs, dbEnv, rs.getQueueSize());
        domainMap.put(serverId, replicaDB);
        return Pair.of(replicaDB, true);
      }
      return Pair.of(dbHandler, false);
      return Pair.of(replicaDB, false);
    }
  }
@@ -318,7 +318,7 @@
    // - then throw the first encountered exception
    ChangelogException firstException = null;
    for (DN baseDN : this.sourceDbHandlers.keySet())
    for (DN baseDN : this.domainToReplicaDBs.keySet())
    {
      removeDomain(baseDN);
    }
@@ -377,10 +377,10 @@
  @Override
  public long getCount(DN baseDN, CSN from, CSN to)
  {
    DbHandler dbHandler = getDbHandler(baseDN, from.getServerId());
    if (dbHandler != null)
    JEReplicaDB replicaDB = getReplicaDB(baseDN, from.getServerId());
    if (replicaDB != null)
    {
      return dbHandler.getCount(from, to);
      return replicaDB.getCount(from, to);
    }
    return 0;
  }
@@ -390,9 +390,9 @@
  public long getDomainChangesCount(DN baseDN)
  {
    long entryCount = 0;
    for (DbHandler dbHandler : getDomainMap(baseDN).values())
    for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
    {
      entryCount += dbHandler.getChangesCount();
      entryCount += replicaDB.getChangesCount();
    }
    return entryCount;
  }
@@ -401,17 +401,17 @@
  @Override
  public void shutdownDomain(DN baseDN)
  {
    shutdownDbHandlers(getDomainMap(baseDN));
    sourceDbHandlers.remove(baseDN);
    shutdownReplicaDBs(getDomainMap(baseDN));
    domainToReplicaDBs.remove(baseDN);
  }
  private void shutdownDbHandlers(Map<Integer, DbHandler> domainMap)
  private void shutdownReplicaDBs(Map<Integer, JEReplicaDB> domainMap)
  {
    synchronized (domainMap)
    {
      for (DbHandler dbHandler : domainMap.values())
      for (JEReplicaDB replicaDB : domainMap.values())
      {
        dbHandler.shutdown();
        replicaDB.shutdown();
      }
      domainMap.clear();
    }
@@ -422,9 +422,9 @@
  public ServerState getDomainOldestCSNs(DN baseDN)
  {
    final ServerState result = new ServerState();
    for (DbHandler dbHandler : getDomainMap(baseDN).values())
    for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
    {
      result.update(dbHandler.getOldestCSN());
      result.update(replicaDB.getOldestCSN());
    }
    return result;
  }
@@ -434,9 +434,9 @@
  public ServerState getDomainNewestCSNs(DN baseDN)
  {
    final ServerState result = new ServerState();
    for (DbHandler dbHandler : getDomainMap(baseDN).values())
    for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
    {
      result.update(dbHandler.getNewestCSN());
      result.update(replicaDB.getNewestCSN());
    }
    return result;
  }
@@ -451,22 +451,22 @@
    ChangelogException firstException = null;
    // 1- clear the replica DBs
    final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN);
    final Map<Integer, JEReplicaDB> domainMap = getDomainMap(baseDN);
    synchronized (domainMap)
    {
      for (DbHandler dbHandler : domainMap.values())
      for (JEReplicaDB replicaDB : domainMap.values())
      {
        try
        {
          dbHandler.clear();
          replicaDB.clear();
        }
        catch (ChangelogException e)
        {
          firstException = e;
        }
      }
      shutdownDbHandlers(domainMap);
      sourceDbHandlers.remove(baseDN);
      shutdownReplicaDBs(domainMap);
      domainToReplicaDBs.remove(baseDN);
    }
    // 2- clear the ChangeNumber index DB
@@ -511,11 +511,11 @@
  @Override
  public void setPurgeDelay(long delay)
  {
    for (Map<Integer, DbHandler> domainMap : sourceDbHandlers.values())
    for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values())
    {
      for (DbHandler dbHandler : domainMap.values())
      for (JEReplicaDB replicaDB : domainMap.values())
      {
        dbHandler.setPurgeDelay(delay);
        replicaDB.setPurgeDelay(delay);
      }
    }
  }
@@ -525,11 +525,11 @@
  public long getDomainLatestTrimDate(DN baseDN)
  {
    long latest = 0;
    for (DbHandler dbHandler : getDomainMap(baseDN).values())
    for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
    {
      if (latest == 0 || latest < dbHandler.getLatestTrimDate())
      if (latest == 0 || latest < replicaDB.getLatestTrimDate())
      {
        latest = dbHandler.getLatestTrimDate();
        latest = replicaDB.getLatestTrimDate();
      }
    }
    return latest;
@@ -539,12 +539,12 @@
  @Override
  public CSN getCSNAfter(DN baseDN, int serverId, CSN startAfterCSN)
  {
    final DbHandler dbHandler = getDbHandler(baseDN, serverId);
    final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    ReplicaDBCursor cursor = null;
    try
    {
      cursor = dbHandler.generateCursorFrom(startAfterCSN);
      cursor = replicaDB.generateCursorFrom(startAfterCSN);
      if (cursor != null && cursor.getChange() != null)
      {
        return cursor.getChange().getCSN();
@@ -572,7 +572,7 @@
      {
        try
        {
          cnIndexDB = new DraftCNDbHandler(replicationServer, this.dbEnv);
          cnIndexDB = new JEChangeNumberIndexDB(replicationServer, this.dbEnv);
        }
        catch (Exception e)
        {
@@ -595,12 +595,12 @@
  public ReplicaDBCursor getCursorFrom(DN baseDN, int serverId,
      CSN startAfterCSN)
  {
    DbHandler dbHandler = getDbHandler(baseDN, serverId);
    if (dbHandler != null)
    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      try
      {
        ReplicaDBCursor cursor = dbHandler.generateCursorFrom(startAfterCSN);
        ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN);
        cursor.next();
        return cursor;
      }
@@ -617,12 +617,12 @@
  public boolean publishUpdateMsg(DN baseDN, int serverId,
      UpdateMsg updateMsg) throws ChangelogException
  {
    final Pair<DbHandler, Boolean> pair =
        getOrCreateDbHandler(baseDN, serverId, replicationServer);
    final DbHandler dbHandler = pair.getFirst();
    final Pair<JEReplicaDB, Boolean> pair =
        getOrCreateReplicaDB(baseDN, serverId, replicationServer);
    final JEReplicaDB replicaDB = pair.getFirst();
    final boolean wasCreated = pair.getSecond();
    dbHandler.add(updateMsg);
    replicaDB.add(updateMsg);
    return wasCreated;
  }
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
File was renamed from opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -65,7 +65,7 @@
 *
 * This class publish some monitoring information below cn=monitor.
 */
public class DbHandler implements Runnable
public class JEReplicaDB implements Runnable
{
  /**
   * The msgQueue holds all the updates not yet saved to stable storage.
@@ -129,17 +129,17 @@
  private long trimAge;
  /**
   * Creates a new dbHandler associated to a given LDAP server.
   * Creates a new ReplicaDB associated to a given LDAP server.
   *
   * @param id Identifier of the DB.
   * @param baseDN the baseDN for which this DB was created.
   * @param replicationServer The ReplicationServer that creates this dbHandler.
   * @param replicationServer The ReplicationServer that creates this ReplicaDB.
   * @param dbenv the Database Env to use to create the ReplicationServer DB.
   * server for this domain.
   * @param queueSize The queueSize to use when creating the dbHandler.
   * @param queueSize The queueSize to use when creating the ReplicaDB.
   * @throws ChangelogException If a database problem happened
   */
  public DbHandler(int id, DN baseDN, ReplicationServer replicationServer,
  public JEReplicaDB(int id, DN baseDN, ReplicationServer replicationServer,
      ReplicationDbEnv dbenv, int queueSize) throws ChangelogException
  {
    this.replicationServer = replicationServer;
@@ -260,13 +260,13 @@
  /**
   * Generate a new {@link ReplicaDBCursor} that allows to browse the db managed
   * by this dbHandler and starting at the position defined by a given CSN.
   * by this ReplicaDB and starting at the position defined by a given CSN.
   *
   * @param startAfterCSN
   *          The position where the cursor must start. If null, start from the
   *          oldest CSN
   * @return a new {@link ReplicaDBCursor} that allows to browse the db managed
   *         by this dbHandler and starting at the position defined by a given
   *         by this ReplicaDB and starting at the position defined by a given
   *         CSN.
   * @throws ChangelogException
   *           if a database problem happened.
@@ -306,7 +306,7 @@
  }
  /**
   * Shutdown this dbHandler.
   * Shutdown this ReplicaDB.
   */
  public void shutdown()
  {
@@ -519,8 +519,8 @@
  }
  /**
   * This internal class is used to implement the Monitoring capabilities
   * of the dbHandler.
   * This internal class is used to implement the Monitoring capabilities of the
   * ReplicaDB.
   */
  private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
  {
@@ -625,7 +625,7 @@
  }
  /**
   * Return the size of the msgQueue (the memory cache of the DbHandler).
   * Return the size of the msgQueue (the memory cache of the ReplicaDB).
   * For test purpose.
   * @return The memory queue size.
   */
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -40,7 +40,7 @@
{
  private UpdateMsg currentChange;
  private ReplServerDBCursor cursor;
  private DbHandler dbHandler;
  private JEReplicaDB replicaDB;
  private ReplicationDB db;
  private CSN lastNonNullCurrentCSN;
@@ -53,16 +53,16 @@
   * @param startAfterCSN
   *          The CSN after which the cursor must start.If null, start from the
   *          oldest CSN
   * @param dbHandler
   *          The associated DbHandler.
   * @param replicaDB
   *          The associated JEReplicaDB.
   * @throws ChangelogException
   *           if a database problem happened.
   */
  public JEReplicaDBCursor(ReplicationDB db, CSN startAfterCSN,
      DbHandler dbHandler) throws ChangelogException
      JEReplicaDB replicaDB) throws ChangelogException
  {
    this.db = db;
    this.dbHandler = dbHandler;
    this.replicaDB = replicaDB;
    this.lastNonNullCurrentCSN = startAfterCSN;
    try
@@ -78,7 +78,7 @@
    if (cursor == null)
    {
      // flush the queue into the db
      dbHandler.flush();
      replicaDB.flush();
      // look again in the db
      cursor = db.openReadCursor(startAfterCSN);
@@ -111,7 +111,7 @@
          cursor.close();
          cursor = null;
        }
        dbHandler.flush();
        replicaDB.flush();
        try
        {
          cursor = db.openReadCursor(lastNonNullCurrentCSN);
@@ -141,7 +141,7 @@
        cursor.close();
        cursor = null;
      }
      this.dbHandler = null;
      this.replicaDB = null;
      this.db = null;
    }
  }
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -54,7 +54,7 @@
/**
 * This class implements the interface between the underlying database
 * and the dbHandler class.
 * and the JEReplicaDB class.
 * This is the only class that should have code using the BDB interfaces.
 */
public class ReplicationDB
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -190,8 +190,7 @@
  }
  /**
   * Read the list of known servers from the database and start dbHandler for
   * each of them.
   * Read and return the list of known servers from the database.
   *
   * @return the {@link ChangelogState} read from the changelogState DB
   * @throws ChangelogException
@@ -591,7 +590,7 @@
     * @return the retrieved or created db.
     * @throws ChangelogException when a problem occurs.
     */
    public Database getOrCreateDraftCNDb() throws ChangelogException
    public Database getOrCreateCNIndexDB() throws ChangelogException
    {
      try
      {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -27,7 +27,10 @@
 */
package org.opends.server.replication.server;
import java.io.*;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.net.Socket;
import java.util.*;
@@ -55,7 +58,7 @@
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.changelog.je.DraftCNDbHandler;
import org.opends.server.replication.server.changelog.je.JEChangeNumberIndexDB;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.tools.LDAPSearch;
import org.opends.server.tools.LDAPWriter;
@@ -67,7 +70,10 @@
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation;
import org.testng.Assert;
import org.testng.annotations.*;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.assertj.core.api.Assertions.*;
import static org.opends.messages.ReplicationMessages.*;
@@ -372,8 +378,8 @@
    // search again the ECL, but search for first and last
    ECLCompatTestLimitsAndAdd(1,8, ts);
    // Test DraftCNDb is purged when replication change log is purged
    ECLPurgeDraftCNDbAfterChangelogClear();
    // Test CNIndexDB is purged when replication change log is purged
    ECLPurgeCNIndexDBAfterChangelogClear();
    // Test first and last are updated
    ECLCompatTestLimits(0,0, true);
@@ -2593,26 +2599,26 @@
  }
  /**
   * Put a short purge delay to the draftCNDB, clear the changelogDB,
   * expect the draftCNDb to be purged accordingly.
   * Put a short purge delay to the CNIndexDB, clear the changelogDB, expect the
   * CNIndexDB to be purged accordingly.
   */
  private void ECLPurgeDraftCNDbAfterChangelogClear() throws Exception
  private void ECLPurgeCNIndexDBAfterChangelogClear() throws Exception
  {
    String tn = "ECLPurgeDraftCNDbAfterChangelogClear";
    String tn = "ECLPurgeCNIndexDBAfterChangelogClear";
    debugInfo(tn, "Starting test\n\n");
    DraftCNDbHandler draftdb =
        (DraftCNDbHandler) replicationServer.getChangeNumberIndexDB();
    assertEquals(draftdb.count(), 8);
    draftdb.setPurgeDelay(1000);
    JEChangeNumberIndexDB cnIndexDB =
        (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB();
    assertEquals(cnIndexDB.count(), 8);
    cnIndexDB.setPurgeDelay(1000);
    clearChangelogDB(replicationServer);
    // Expect changes purged from the changelog db to be sometimes
    // also purged from the DraftCNDb.
    while (!draftdb.isEmpty())
    // also purged from the CNIndexDB.
    while (!cnIndexDB.isEmpty())
    {
      debugInfo(tn, "draftdb.count=" + draftdb.count());
      debugInfo(tn, "cnIndexDB.count=" + cnIndexDB.count());
      sleep(200);
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java
@@ -43,27 +43,32 @@
import org.opends.server.util.StaticUtils;
import org.testng.annotations.Test;
import static org.opends.server.replication.server.changelog.je.DbHandlerTest.*;
import static org.opends.server.replication.server.changelog.je.JEReplicaDBTest.*;
import static org.testng.Assert.*;
/**
 * Test the DraftCNDbHandler class with 2 kinds of cleaning of the db : -
 * Test the JEChangeNumberIndexDB class with 2 kinds of cleaning of the db : -
 * periodic trim - call to clear method()
 */
@SuppressWarnings("javadoc")
public class DraftCNDbHandlerTest extends ReplicationTestCase
public class JEChangeNumberIndexDBTest extends ReplicationTestCase
{
  /**
   * This test makes basic operations of a DraftCNDb : - create the db - add
   * records - read them with a cursor - set a very short trim period - wait for
   * the db to be trimmed / here since the changes are not stored in the
   * replication changelog, the draftCNDb will be cleared.
   * This test makes basic operations of a JEChangeNumberIndexDB:
   * <ol>
   * <li>create the db</li>
   * <li>add records</li>
   * <li>read them with a cursor</li>
   * <li>set a very short trim period</li>
   * <li>wait for the db to be trimmed / here since the changes are not stored
   * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li>
   * </ol>
   */
  @Test()
  void testDraftCNDbHandlerTrim() throws Exception
  void testTrim() throws Exception
  {
    ReplicationServer replicationServer = null;
    DraftCNDbHandler handler = null;
    JEChangeNumberIndexDB cnIndexDB = null;
    try
    {
      TestCaseUtils.startServer();
@@ -76,8 +81,8 @@
        2, 0, 100, null);
      replicationServer = new ReplicationServer(conf);
      handler = newDraftCNDbHandler(replicationServer);
      handler.setPurgeDelay(0);
      cnIndexDB = newCNIndexDB(replicationServer);
      cnIndexDB.setPurgeDelay(0);
      // Prepare data to be stored in the db
      int cn1 = 3;
@@ -95,16 +100,16 @@
      CSN[] csns = newCSNs(1, 0, 3);
      // Add records
      handler.addRecord(new CNIndexRecord(cn1, value1, baseDN1, csns[0]));
      handler.addRecord(new CNIndexRecord(cn2, value2, baseDN2, csns[1]));
      handler.addRecord(new CNIndexRecord(cn3, value3, baseDN3, csns[2]));
      cnIndexDB.addRecord(new CNIndexRecord(cn1, value1, baseDN1, csns[0]));
      cnIndexDB.addRecord(new CNIndexRecord(cn2, value2, baseDN2, csns[1]));
      cnIndexDB.addRecord(new CNIndexRecord(cn3, value3, baseDN3, csns[2]));
      // The ChangeNumber should not get purged
      final long firstChangeNumber = handler.getFirstRecord().getChangeNumber();
      final long firstChangeNumber = cnIndexDB.getFirstRecord().getChangeNumber();
      assertEquals(firstChangeNumber, cn1);
      assertEquals(handler.getLastRecord().getChangeNumber(), cn3);
      assertEquals(cnIndexDB.getLastRecord().getChangeNumber(), cn3);
      DraftCNDBCursor dbc = handler.getReadCursor(firstChangeNumber);
      DraftCNDBCursor dbc = cnIndexDB.getReadCursor(firstChangeNumber);
      try
      {
        assertEqualTo(dbc.currentRecord(), csns[0], baseDN1, value1);
@@ -123,21 +128,21 @@
        StaticUtils.close(dbc);
      }
      handler.setPurgeDelay(100);
      cnIndexDB.setPurgeDelay(100);
      // Check the db is cleared.
      while (!handler.isEmpty())
      while (!cnIndexDB.isEmpty())
      {
        Thread.sleep(200);
      }
      assertNull(handler.getFirstRecord());
      assertNull(handler.getLastRecord());
      assertEquals(handler.count(), 0);
      assertNull(cnIndexDB.getFirstRecord());
      assertNull(cnIndexDB.getLastRecord());
      assertEquals(cnIndexDB.count(), 0);
    }
    finally
    {
      if (handler != null)
        handler.shutdown();
      if (cnIndexDB != null)
        cnIndexDB.shutdown();
      remove(replicationServer);
    }
  }
@@ -149,11 +154,11 @@
    assertEquals(data.getPreviousCookie(), cookie);
  }
  private DraftCNDbHandler newDraftCNDbHandler(ReplicationServer rs) throws Exception
  private JEChangeNumberIndexDB newCNIndexDB(ReplicationServer rs) throws Exception
  {
    File testRoot = createCleanDir();
    ReplicationDbEnv dbEnv = new ReplicationDbEnv(testRoot.getPath(), rs);
    return new DraftCNDbHandler(rs, dbEnv);
    return new JEChangeNumberIndexDB(rs, dbEnv);
  }
  private File createCleanDir() throws IOException
@@ -161,7 +166,7 @@
    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
    String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot
            + File.separator + "build");
    path = path + File.separator + "unit-tests" + File.separator + "DraftCNDbHandler";
    path = path + File.separator + "unit-tests" + File.separator + "JEChangeNumberIndexDB";
    final File testRoot = new File(path);
    TestCaseUtils.deleteDirectory(testRoot);
    testRoot.mkdirs();
@@ -169,20 +174,21 @@
  }
  /**
   * This test makes basic operations of a DraftCNDb and explicitly calls
   * the clear() method instead of waiting for the periodic trim to clear
   * This test makes basic operations of a JEChangeNumberIndexDB and explicitly
   * calls the clear() method instead of waiting for the periodic trim to clear
   * it.
   * - create the db
   * - add records
   * - read them with a cursor
   * - clear the db.
   * @throws Exception
   * <ol>
   * <li>create the db</li>
   * <li>add records</li>
   * <li>read them with a cursor</li>
   * <li>clear the db</li>
   * </ol>
   */
  @Test()
  void testDraftCNDbHandlerClear() throws Exception
  void testClear() throws Exception
  {
    ReplicationServer replicationServer = null;
    DraftCNDbHandler handler = null;
    JEChangeNumberIndexDB cnIndexDB = null;
    try
    {
      TestCaseUtils.startServer();
@@ -195,10 +201,10 @@
        2, 0, 100, null);
      replicationServer = new ReplicationServer(conf);
      handler = newDraftCNDbHandler(replicationServer);
      handler.setPurgeDelay(0);
      cnIndexDB = newCNIndexDB(replicationServer);
      cnIndexDB.setPurgeDelay(0);
      assertTrue(handler.isEmpty());
      assertTrue(cnIndexDB.isEmpty());
      // Prepare data to be stored in the db
      int cn1 = 3;
@@ -216,50 +222,51 @@
      CSN[] csns = newCSNs(1, 0, 3);
      // Add records
      handler.addRecord(new CNIndexRecord(cn1, value1, baseDN1, csns[0]));
      handler.addRecord(new CNIndexRecord(cn2, value2, baseDN2, csns[1]));
      handler.addRecord(new CNIndexRecord(cn3, value3, baseDN3, csns[2]));
      cnIndexDB.addRecord(new CNIndexRecord(cn1, value1, baseDN1, csns[0]));
      cnIndexDB.addRecord(new CNIndexRecord(cn2, value2, baseDN2, csns[1]));
      cnIndexDB.addRecord(new CNIndexRecord(cn3, value3, baseDN3, csns[2]));
      Thread.sleep(500);
      // Checks
      assertEquals(handler.getFirstRecord().getChangeNumber(), cn1);
      assertEquals(handler.getLastRecord().getChangeNumber(), cn3);
      assertEquals(cnIndexDB.getFirstRecord().getChangeNumber(), cn1);
      assertEquals(cnIndexDB.getLastRecord().getChangeNumber(), cn3);
      assertEquals(handler.count(), 3, "Db count");
      assertFalse(handler.isEmpty());
      assertEquals(cnIndexDB.count(), 3, "Db count");
      assertFalse(cnIndexDB.isEmpty());
      assertEquals(getPreviousCookie(handler, cn1), value1);
      assertEquals(getPreviousCookie(handler, cn2), value2);
      assertEquals(getPreviousCookie(handler, cn3), value3);
      assertEquals(getPreviousCookie(cnIndexDB, cn1), value1);
      assertEquals(getPreviousCookie(cnIndexDB, cn2), value2);
      assertEquals(getPreviousCookie(cnIndexDB, cn3), value3);
      ChangeNumberIndexDBCursor cursor = handler.getCursorFrom(cn1);
      ChangeNumberIndexDBCursor cursor = cnIndexDB.getCursorFrom(cn1);
      assertCursorReadsInOrder(cursor, cn1, cn2, cn3);
      cursor = handler.getCursorFrom(cn2);
      cursor = cnIndexDB.getCursorFrom(cn2);
      assertCursorReadsInOrder(cursor, cn2, cn3);
      cursor = handler.getCursorFrom(cn3);
      cursor = cnIndexDB.getCursorFrom(cn3);
      assertCursorReadsInOrder(cursor, cn3);
      handler.clear();
      cnIndexDB.clear();
      // Check the db is cleared.
      assertNull(handler.getFirstRecord());
      assertNull(handler.getLastRecord());
      assertEquals(handler.count(), 0);
      assertTrue(handler.isEmpty());
      assertNull(cnIndexDB.getFirstRecord());
      assertNull(cnIndexDB.getLastRecord());
      assertEquals(cnIndexDB.count(), 0);
      assertTrue(cnIndexDB.isEmpty());
    }
    finally
    {
      if (handler != null)
        handler.shutdown();
      if (cnIndexDB != null)
        cnIndexDB.shutdown();
      remove(replicationServer);
    }
  }
  private String getPreviousCookie(DraftCNDbHandler handler, long changeNumber) throws Exception
  private String getPreviousCookie(JEChangeNumberIndexDB cnIndexDB,
      long changeNumber) throws Exception
  {
    ChangeNumberIndexDBCursor cursor = handler.getCursorFrom(changeNumber);
    ChangeNumberIndexDBCursor cursor = cnIndexDB.getCursorFrom(changeNumber);
    try
    {
      return cursor.getRecord().getPreviousCookie();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DbHandlerTest.java
@@ -52,10 +52,10 @@
import static org.testng.Assert.*;
/**
 * Test the dbHandler class
 * Test the JEReplicaDB class
 */
@SuppressWarnings("javadoc")
public class DbHandlerTest extends ReplicationTestCase
public class JEReplicaDBTest extends ReplicationTestCase
{
  /** The tracer object for the debug logger */
  private static final DebugTracer TRACER = getTracer();
@@ -80,66 +80,66 @@
  }
  @Test(enabled=true)
  void testDbHandlerTrim() throws Exception
  void testTrim() throws Exception
  {
    ReplicationServer replicationServer = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100, 5000);
      DbHandler handler = newDbHandler(replicationServer);
      JEReplicaDB replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = newCSNs(1, 0, 5);
      handler.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
      handler.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
      handler.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid"));
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid"));
      DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN, csns[3], "uid");
      //--
      // Iterator tests with memory queue only populated
      // verify that memory queue is populated
      assertEquals(handler.getQueueSize(),3);
      assertEquals(replicaDB.getQueueSize(), 3);
      assertFoundInOrder(handler, csns[0], csns[1], csns[2]);
      assertNotFound(handler, csns[4]);
      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
      assertNotFound(replicaDB, csns[4]);
      //--
      // Iterator tests with db only populated
      Thread.sleep(1000); // let the time for flush to happen
      // verify that memory queue is empty (all changes flushed in the db)
      assertEquals(handler.getQueueSize(),0);
      assertEquals(replicaDB.getQueueSize(), 0);
      assertFoundInOrder(handler, csns[0], csns[1], csns[2]);
      assertNotFound(handler, csns[4]);
      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
      assertNotFound(replicaDB, csns[4]);
      assertEquals(handler.getOldestCSN(), csns[0]);
      assertEquals(handler.getNewestCSN(), csns[2]);
      assertEquals(replicaDB.getOldestCSN(), csns[0]);
      assertEquals(replicaDB.getNewestCSN(), csns[2]);
      //--
      // Cursor tests with db and memory queue populated
      // all changes in the db - add one in the memory queue
      handler.add(update4);
      replicaDB.add(update4);
      // verify memory queue contains this one
      assertEquals(handler.getQueueSize(),1);
      assertEquals(replicaDB.getQueueSize(), 1);
      assertFoundInOrder(handler, csns[0], csns[1], csns[2], csns[3]);
      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]);
      // Test cursor from existing CSN at the limit between queue and db
      assertFoundInOrder(handler, csns[2], csns[3]);
      assertFoundInOrder(handler, csns[3]);
      assertNotFound(handler, csns[4]);
      assertFoundInOrder(replicaDB, csns[2], csns[3]);
      assertFoundInOrder(replicaDB, csns[3]);
      assertNotFound(replicaDB, csns[4]);
      handler.setPurgeDelay(1);
      replicaDB.setPurgeDelay(1);
      boolean purged = false;
      int count = 300;  // wait at most 60 seconds
      while (!purged && (count > 0))
      {
        CSN oldestCSN = handler.getOldestCSN();
        CSN newestCSN = handler.getNewestCSN();
        CSN oldestCSN = replicaDB.getOldestCSN();
        CSN newestCSN = replicaDB.getNewestCSN();
        if (!oldestCSN.equals(csns[3]) || !newestCSN.equals(csns[3]))
        {
          TestCaseUtils.sleep(100);
@@ -176,10 +176,10 @@
    return new ReplicationServer(conf);
  }
  private DbHandler newDbHandler(ReplicationServer replicationServer) throws Exception
  private JEReplicaDB newReplicaDB(ReplicationServer replicationServer) throws Exception
  {
    JEChangelogDB changelogDB = (JEChangelogDB) replicationServer.getChangelogDB();
    return changelogDB.getOrCreateDbHandler(TEST_ROOT_DN, 1, replicationServer).getFirst();
    return changelogDB.getOrCreateReplicaDB(TEST_ROOT_DN, 1, replicationServer).getFirst();
  }
  private File createCleanDir() throws IOException
@@ -187,21 +187,21 @@
    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
    String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot
            + File.separator + "build");
    path = path + File.separator + "unit-tests" + File.separator + "dbHandler";
    path = path + File.separator + "unit-tests" + File.separator + "JEReplicaDB";
    final File testRoot = new File(path);
    TestCaseUtils.deleteDirectory(testRoot);
    testRoot.mkdirs();
    return testRoot;
  }
  private void assertFoundInOrder(DbHandler handler, CSN... csns) throws Exception
  private void assertFoundInOrder(JEReplicaDB replicaDB, CSN... csns) throws Exception
  {
    if (csns.length == 0)
    {
      return;
    }
    ReplicaDBCursor cursor = handler.generateCursorFrom(csns[0]);
    ReplicaDBCursor cursor = replicaDB.generateCursorFrom(csns[0]);
    try
    {
      assertNull(cursor.getChange());
@@ -220,12 +220,12 @@
    }
  }
  private void assertNotFound(DbHandler handler, CSN csn)
  private void assertNotFound(JEReplicaDB replicaDB, CSN csn)
  {
    ReplicaDBCursor cursor = null;
    try
    {
      cursor = handler.generateCursorFrom(csn);
      cursor = replicaDB.generateCursorFrom(csn);
      fail("Expected exception");
    }
    catch (ChangelogException e)
@@ -239,37 +239,37 @@
  }
  /**
   * Test the feature of clearing a dbHandler used by a replication server.
   * The clear feature is used when a replication server receives a request
   * to reset the generationId of a given domain.
   * Test the feature of clearing a JEReplicaDB used by a replication server.
   * The clear feature is used when a replication server receives a request to
   * reset the generationId of a given domain.
   */
  @Test(enabled=true)
  void testDbHandlerClear() throws Exception
  void testClear() throws Exception
  {
    ReplicationServer replicationServer = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100, 5000);
      DbHandler handler = newDbHandler(replicationServer);
      JEReplicaDB replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = newCSNs(1, 0, 3);
      // Add the changes
      handler.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
      handler.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
      handler.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid"));
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid"));
      // Check they are here
      assertEquals(csns[0], handler.getOldestCSN());
      assertEquals(csns[2], handler.getNewestCSN());
      assertEquals(csns[0], replicaDB.getOldestCSN());
      assertEquals(csns[2], replicaDB.getNewestCSN());
      // Clear ...
      handler.clear();
      replicaDB.clear();
      // Check the db is cleared.
      assertEquals(null, handler.getOldestCSN());
      assertEquals(null, handler.getNewestCSN());
      assertEquals(null, replicaDB.getOldestCSN());
      assertEquals(null, replicaDB.getNewestCSN());
    }
    finally
@@ -287,29 +287,29 @@
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100000, 10);
      DbHandler handler = newDbHandler(replicationServer);
      JEReplicaDB replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = newCSNs(1, System.currentTimeMillis(), 6);
      for (int i = 0; i < 5; i++)
      {
        if (i != 3)
        {
          handler.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
          replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        }
      }
      handler.flush();
      replicaDB.flush();
      cursor = handler.generateCursorFrom(csns[0]);
      cursor = replicaDB.generateCursorFrom(csns[0]);
      assertTrue(cursor.next());
      assertEquals(cursor.getChange().getCSN(), csns[1]);
      StaticUtils.close(cursor);
      cursor = handler.generateCursorFrom(csns[3]);
      cursor = replicaDB.generateCursorFrom(csns[3]);
      assertTrue(cursor.next());
      assertEquals(cursor.getChange().getCSN(), csns[4]);
      StaticUtils.close(cursor);
      cursor = handler.generateCursorFrom(csns[4]);
      cursor = replicaDB.generateCursorFrom(csns[4]);
      assertFalse(cursor.next());
      assertNull(cursor.getChange());
    }
@@ -328,21 +328,21 @@
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100000, 10);
      DbHandler handler = newDbHandler(replicationServer);
      JEReplicaDB replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = newCSNs(1, System.currentTimeMillis(), 5);
      for (CSN csn : csns)
      {
        handler.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
      }
      handler.flush();
      replicaDB.flush();
      assertEquals(handler.getCount(csns[0], csns[0]), 1);
      assertEquals(handler.getCount(csns[0], csns[1]), 2);
      assertEquals(handler.getCount(csns[0], csns[4]), 5);
      assertEquals(handler.getCount(null, csns[4]), 5);
      assertEquals(handler.getCount(csns[0], null), 0);
      assertEquals(handler.getCount(null, null), 5);
      assertEquals(replicaDB.getCount(csns[0], csns[0]), 1);
      assertEquals(replicaDB.getCount(csns[0], csns[1]), 2);
      assertEquals(replicaDB.getCount(csns[0], csns[4]), 5);
      assertEquals(replicaDB.getCount(null, csns[4]), 5);
      assertEquals(replicaDB.getCount(csns[0], null), 0);
      assertEquals(replicaDB.getCount(null, null), 5);
    }
    finally
    {
@@ -351,7 +351,7 @@
  }
  /**
   * Test the logic that manages counter records in the DbHandler in order to
   * Test the logic that manages counter records in the JEReplicaDB in order to
   * optimize the counting of record in the replication changelog db.
   */
  @Test(enabled=true, groups = { "opendj-256" })
@@ -362,7 +362,7 @@
    // reproducible and always has the same value of 3004):
    //
    // Failed Test:
    // org.opends.server.replication.server.DbHandlerTest#testDbCounts
    // org.opends.server.replication.server.JEReplicaDBTest#testDbCounts
    // [testng] Failure Cause: java.lang.AssertionError: AFTER PURGE
    // expected:<8000> but was:<3004>
    // [testng] org.testng.Assert.fail(Assert.java:84)
@@ -370,9 +370,9 @@
    // [testng] org.testng.Assert.assertEquals(Assert.java:108)
    // [testng] org.testng.Assert.assertEquals(Assert.java:323)
    // [testng]
    // org.opends.server.replication.server.DbHandlerTest.testDBCount(DbHandlerTest.java:594)
    // org.opends.server.replication.server.JEReplicaDBTest.testDBCount(JEReplicaDBTest.java:594)
    // [testng]
    // org.opends.server.replication.server.DbHandlerTest.testDbCounts(DbHandlerTest.java:389)
    // org.opends.server.replication.server.JEReplicaDBTest.testDbCounts(JEReplicaDBTest.java:389)
    // It's worth testing with 2 different setting for counterRecord
    // - a counter record is put every 10 Update msg in the db - just a unit
@@ -399,7 +399,7 @@
    File testRoot = null;
    ReplicationServer replicationServer = null;
    ReplicationDbEnv dbEnv = null;
    DbHandler handler = null;
    JEReplicaDB replicaDB = null;
    try
    {
      TestCaseUtils.startServer();
@@ -407,8 +407,8 @@
      testRoot = createCleanDir();
      dbEnv = new ReplicationDbEnv(testRoot.getPath(), replicationServer);
      handler = new DbHandler(1, TEST_ROOT_DN, replicationServer, dbEnv, 10);
      handler.setCounterRecordWindowSize(counterWindow);
      replicaDB = new JEReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv, 10);
      replicaDB.setCounterRecordWindowSize(counterWindow);
      // Populate the db with 'max' msg
      int mySeqnum = 1;
@@ -417,37 +417,37 @@
      for (int i=1; i<=max; i++)
      {
        csns[i] = new CSN(now + i, mySeqnum, 1);
        handler.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        mySeqnum+=2;
      }
      handler.flush();
      replicaDB.flush();
      assertEquals(handler.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(handler.getNewestCSN(), csns[max], "Wrong newest CSN");
      assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
      // Test count in different subcases trying to handle all special cases
      // regarding the 'counter' record and 'count' algorithm
      assertCount(tn, handler, csns[1], csns[1], 1, "FROM change1 TO change1 ");
      assertCount(tn, handler, csns[1], csns[2], 2, "FROM change1 TO change2 ");
      assertCount(tn, handler, csns[1], csns[counterWindow], counterWindow,
      assertCount(tn, replicaDB, csns[1], csns[1], 1, "FROM change1 TO change1 ");
      assertCount(tn, replicaDB, csns[1], csns[2], 2, "FROM change1 TO change2 ");
      assertCount(tn, replicaDB, csns[1], csns[counterWindow], counterWindow,
          "FROM change1 TO counterWindow=" + counterWindow);
      final int j = counterWindow + 1;
      assertCount(tn, handler, csns[1], csns[j], j,
      assertCount(tn, replicaDB, csns[1], csns[j], j,
          "FROM change1 TO counterWindow+1=" + j);
      final int k = 2 * counterWindow;
      assertCount(tn, handler, csns[1], csns[k], k,
      assertCount(tn, replicaDB, csns[1], csns[k], k,
          "FROM change1 TO 2*counterWindow=" + k);
      final int l = k + 1;
      assertCount(tn, handler, csns[1], csns[l], l,
      assertCount(tn, replicaDB, csns[1], csns[l], l,
          "FROM change1 TO 2*counterWindow+1=" + l);
      assertCount(tn, handler, csns[2], csns[5], 4,
      assertCount(tn, replicaDB, csns[2], csns[5], 4,
          "FROM change2 TO change5 ");
      assertCount(tn, handler, csns[(counterWindow + 2)], csns[(counterWindow + 5)], 4,
      assertCount(tn, replicaDB, csns[(counterWindow + 2)], csns[(counterWindow + 5)], 4,
          "FROM counterWindow+2 TO counterWindow+5 ");
      assertCount(tn, handler, csns[2], csns[(counterWindow + 5)], counterWindow + 4,
      assertCount(tn, replicaDB, csns[2], csns[(counterWindow + 5)], counterWindow + 4,
          "FROM change2 TO counterWindow+5 ");
      assertCount(tn, handler, csns[(counterWindow + 4)], csns[(counterWindow + 4)], 1,
      assertCount(tn, replicaDB, csns[(counterWindow + 4)], csns[(counterWindow + 4)], 1,
          "FROM counterWindow+4 TO counterWindow+4 ");
      CSN olderThanOldest = null;
@@ -455,78 +455,78 @@
      // Now we want to test with start and stop outside of the db
      assertCount(tn, handler, csns[1], newerThanNewest, max,
      assertCount(tn, replicaDB, csns[1], newerThanNewest, max,
          "FROM our first generated change TO now (> newest change in the db)");
      assertCount(tn, handler, olderThanOldest, newerThanNewest, max,
      assertCount(tn, replicaDB, olderThanOldest, newerThanNewest, max,
          "FROM null (start of time) TO now (> newest change in the db)");
      // Now we want to test that after closing and reopening the db, the
      // counting algo is well reinitialized and when new messages are added
      // the new counter are correctly generated.
      debugInfo(tn,"SHUTDOWN handler and recreate");
      handler.shutdown();
      debugInfo(tn, "SHUTDOWN replicaDB and recreate");
      replicaDB.shutdown();
      handler = new DbHandler(1, TEST_ROOT_DN, replicationServer, dbEnv, 10);
      handler.setCounterRecordWindowSize(counterWindow);
      replicaDB = new JEReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv, 10);
      replicaDB.setCounterRecordWindowSize(counterWindow);
      assertEquals(handler.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(handler.getNewestCSN(), csns[max], "Wrong newest CSN");
      assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
      assertCount(tn, handler, csns[1], newerThanNewest, max,
      assertCount(tn, replicaDB, csns[1], newerThanNewest, max,
          "FROM our first generated change TO now (> newest change in the db)");
      // Populate the db with 'max' msg
      for (int i=max+1; i<=(2*max); i++)
      {
        csns[i] = new CSN(now + i, mySeqnum, 1);
        handler.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        mySeqnum+=2;
      }
      handler.flush();
      replicaDB.flush();
      assertEquals(handler.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(handler.getNewestCSN(), csns[2 * max], "Wrong newest CSN");
      assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN");
      assertCount(tn, handler, csns[1], newerThanNewest, 2 * max,
      assertCount(tn, replicaDB, csns[1], newerThanNewest, 2 * max,
          "FROM our first generated change TO now (> newest change in the db)");
      //
      handler.setPurgeDelay(100);
      replicaDB.setPurgeDelay(100);
      sleep(4000);
      long totalCount = handler.getCount(null, null);
      long totalCount = replicaDB.getCount(null, null);
      debugInfo(tn, "FROM our first generated change TO now (> newest change in the db)" + " After purge, total count=" + totalCount);
      String testcase = "AFTER PURGE (oldest, newest)=";
      debugInfo(tn, testcase + handler.getOldestCSN() + handler.getNewestCSN());
      assertEquals(handler.getNewestCSN(), csns[2 * max], "Newest=");
      debugInfo(tn, testcase + replicaDB.getOldestCSN() + replicaDB.getNewestCSN());
      assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Newest=");
      int expectedCnt;
      if (totalCount>1)
      {
        final int newestSeqnum = handler.getNewestCSN().getSeqnum();
        final int oldestSeqnum = handler.getOldestCSN().getSeqnum();
        final int newestSeqnum = replicaDB.getNewestCSN().getSeqnum();
        final int oldestSeqnum = replicaDB.getOldestCSN().getSeqnum();
        expectedCnt = ((newestSeqnum - oldestSeqnum + 1)/2) + 1;
      }
      else
      {
        expectedCnt = 0;
      }
      assertCount(tn, handler, csns[1], newerThanNewest, expectedCnt, "AFTER PURGE");
      assertCount(tn, replicaDB, csns[1], newerThanNewest, expectedCnt, "AFTER PURGE");
      // Clear ...
      debugInfo(tn,"clear:");
      handler.clear();
      replicaDB.clear();
      // Check the db is cleared.
      assertEquals(null, handler.getOldestCSN());
      assertEquals(null, handler.getNewestCSN());
      assertEquals(null, replicaDB.getOldestCSN());
      assertEquals(null, replicaDB.getNewestCSN());
      debugInfo(tn,"Success");
    }
    finally
    {
      if (handler != null)
        handler.shutdown();
      if (replicaDB != null)
        replicaDB.shutdown();
      if (dbEnv != null)
        dbEnv.shutdown();
      remove(replicationServer);
@@ -534,10 +534,10 @@
    }
  }
  private void assertCount(String tn, DbHandler handler, CSN from, CSN to,
  private void assertCount(String tn, JEReplicaDB replicaDB, CSN from, CSN to,
      int expectedCount, String testcase)
  {
    long actualCount = handler.getCount(from, to);
    long actualCount = replicaDB.getCount(from, to);
    debugInfo(tn, testcase + " actualCount=" + actualCount);
    assertEquals(actualCount, expectedCount, testcase);
  }