mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
06.41.2013 8ed297692b7674b67b8d05a26fa9b04c20930e37
OPENDJ-1116 Introduce abstraction for the changelog DB

Added ChangelogDB interface to abstract all the remaining code away from JE.



ChangelogDB.java, ChangelogState.java, Pair.java: ADDED

JEChangelogDB.java: ADDED
Created from code in ReplicationServer and ReplicationServerDomain.


ReplicationServer.java:
Moved a lot of code to JEChangelogDB.
Added ChangelogDB field and delegated existing method calls to it.
Added getChangelogDB().
Made getChangeNumberIndexDB() package protected for the tests.

ReplicationServerDomain.java:
Moved a lot of code to JEChangelogDB.
Replaced sourceDbHandlers field with ChangelogDB field.
Renamed publishMessage() to publishUpdateMsg().
FIXME!!!! Bug: generationIdSavedStatus is not protected by synchronized (sourceDbHandlers) !!!!!

ReplicationDbEnv.java:
Changed void initializeFromChangelogStateDB() to ChangelogState readChangelogState()


ExternalChangeLogTest.java:
Moved from package org.opends.server.replication to org.opends.server.replication.server.
Extracted method getReplicationDomainStartState()

ChangelogException.java:
Changed one ctor visibility to public.
4 files added
1 files renamed
4 files modified
1386 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/ChangelogState.java 104 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 147 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 235 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java 229 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogException.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 452 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java 19 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/util/Pair.java 162 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java 36 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ChangelogState.java
New file
@@ -0,0 +1,104 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 */
package org.opends.server.replication.server;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
 * This is the changelog state stored in the changelogStateDB. For each
 * replication domain, it contains:
 * <ul>
 * <li>its generationId</li>
 * <li>the list of serverIds composing it</li>
 * </ul>
 * <p>
 * This class is used during replication initialization to decouple the code
 * that reads the changelogStateDB from the code that makes use of its data.
 */
public class ChangelogState
{
  private final Map<String, Long> domainToGenerationId =
      new HashMap<String, Long>();
  private final Map<String, List<Integer>> domainToServerIds =
      new HashMap<String, List<Integer>>();
  /**
   * Sets the generationId for the supplied replication domain.
   *
   * @param baseDn
   *          the targeted replication domain baseDN
   * @param generationId
   *          the generation Id to set
   */
  public void setDomainGenerationId(String baseDn, long generationId)
  {
    domainToGenerationId.put(baseDn, generationId);
  }
  /**
   * Adds the serverId to the serverIds list of the supplied replication domain.
   *
   * @param serverId
   *          the serverId to add
   * @param baseDn
   *          the targeted replication domain baseDN
   */
  public void addServerIdToDomain(int serverId, String baseDn)
  {
    List<Integer> serverIds = domainToServerIds.get(baseDn);
    if (serverIds == null)
    {
      serverIds = new LinkedList<Integer>();
      domainToServerIds.put(baseDn, serverIds);
    }
    serverIds.add(serverId);
  }
  /**
   * Returns the Map of domainBaseDN => generationId.
   *
   * @return a Map of domainBaseDN => generationId
   */
  public Map<String, Long> getDomainToGenerationId()
  {
    return domainToGenerationId;
  }
  /**
   * Returns the Map of domainBaseDN => List&lt;serverId&gt;.
   *
   * @return a Map of domainBaseDN => List&lt;serverId&gt;.
   */
  public Map<String, List<Integer>> getDomainToServerIds()
  {
    return domainToServerIds;
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -27,7 +27,6 @@
 */
package org.opends.server.replication.server;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.net.*;
@@ -54,10 +53,9 @@
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.changelog.api.CNIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.je.DbHandler;
import org.opends.server.replication.server.changelog.je.DraftCNDbHandler;
import org.opends.server.replication.server.changelog.je.ReplicationDbEnv;
import org.opends.server.replication.server.changelog.je.JEChangelogDB;
import org.opends.server.types.*;
import org.opends.server.util.LDIFReader;
import org.opends.server.util.ServerConstants;
@@ -101,10 +99,9 @@
          new HashMap<String, ReplicationServerDomain>();
  private volatile boolean shutdown = false;
  private ReplicationDbEnv dbEnv;
  private int rcvWindow;
  private int queueSize;
  private String dbDirname = null;
  private final ChangelogDB changelogDB = new JEChangelogDB(this);
  /**
   * The delay (in sec) after which the changes must be deleted from the
@@ -225,30 +222,11 @@
      replicationServerUrls = new ArrayList<String>();
    queueSize = configuration.getQueueSize();
    purgeDelay = configuration.getReplicationPurgeDelay();
    dbDirname = configuration.getReplicationDBDirectory();
    rcvWindow = configuration.getWindowSize();
    if (dbDirname == null)
    {
      dbDirname = "changelogDb";
    }
    // Check that this path exists or create it.
    File f = getFileForPath(dbDirname);
    try
    {
      if (!f.exists())
      {
        f.mkdir();
      }
    }
    catch (Exception e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(e.getLocalizedMessage());
      mb.append(" ");
      mb.append(String.valueOf(getFileForPath(dbDirname)));
      Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString());
      throw new ConfigException(msg, e);
    }
    this.changelogDB.setReplicationDBDirectory(configuration
        .getReplicationDBDirectory());
    groupId = (byte)configuration.getGroupId();
    weight = configuration.getWeight();
    assuredTimeout = configuration.getAssuredTimeout();
@@ -504,10 +482,7 @@
    try
    {
      // Initialize the replicationServer database.
      dbEnv = new ReplicationDbEnv(getFileForPath(dbDirname).getAbsolutePath(),
          this);
      dbEnv.initializeFromChangelogStateDB();
      this.changelogDB.initializeDB();
      setServerURL();
      listenSocket = new ServerSocket();
@@ -539,16 +514,9 @@
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " successfully initialized");
    } catch (ChangelogException e)
    {
      Message message = ERR_COULD_NOT_READ_DB.get(
              getFileForPath(dbDirname).getAbsolutePath(),
              e.getLocalizedMessage());
      logError(message);
    } catch (UnknownHostException e)
    {
      Message message = ERR_UNKNOWN_HOSTNAME.get();
      logError(message);
      logError(ERR_UNKNOWN_HOSTNAME.get());
    } catch (IOException e)
    {
      Message message =
@@ -827,37 +795,12 @@
    shutdownECL();
    if (dbEnv != null)
    {
      dbEnv.shutdown();
    }
    this.changelogDB.shutdownDB();
    // Remove this instance from the global instance list
    allInstances.remove(this);
  }
  /**
   * Creates a new DB handler for this ReplicationServer and the serverId and DN
   * given in parameter.
   *
   * @param serverId
   *          The serverId for which the dbHandler must be created.
   * @param baseDn
   *          The DN for which the dbHandler must be created.
   * @return The new DB handler for this ReplicationServer and the serverId and
   *         DN given in parameter.
   * @throws ChangelogException
   *           in case of underlying database problem.
   */
  public DbHandler newDbHandler(int serverId, String baseDn)
      throws ChangelogException
  {
    return new DbHandler(serverId, baseDn, this, dbEnv, queueSize);
  }
  /**
   * Clears the generationId for the replicationServerDomain related to the
   * provided baseDn.
@@ -867,18 +810,6 @@
   */
  public void clearGenerationId(String baseDn)
  {
    try
    {
      dbEnv.clearGenerationId(baseDn);
    }
    catch (Exception ignored)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
      }
    }
    synchronized (cnIndexDBLock)
    {
      if (cnIndexDB != null)
@@ -962,11 +893,7 @@
    if (newPurgeDelay != purgeDelay)
    {
      purgeDelay = newPurgeDelay;
      // propagate
      for (ReplicationServerDomain domain : getReplicationServerDomains())
      {
        domain.setPurgeDelay(purgeDelay*1000);
      }
      this.changelogDB.setPurgeDelay(purgeDelay * 1000);
    }
    rcvWindow = configuration.getWindowSize();
@@ -1047,7 +974,7 @@
    }
    final String newDir = configuration.getReplicationDBDirectory();
    if (newDir != null && !dbDirname.equals(newDir))
    if (newDir != null && !this.changelogDB.getDBDirName().equals(newDir))
    {
      return new ConfigChangeResult(ResultCode.SUCCESS, true);
    }
@@ -1597,7 +1524,7 @@
   * @throws DirectoryException
   *           when needed.
   */
  public ChangeNumberIndexDB getChangeNumberIndexDB() throws DirectoryException
  ChangeNumberIndexDB getChangeNumberIndexDB() throws DirectoryException
  {
    synchronized (cnIndexDBLock)
    {
@@ -1605,9 +1532,9 @@
      {
        if (cnIndexDB == null)
        {
          cnIndexDB = new DraftCNDbHandler(this, this.dbEnv);
          cnIndexDB = this.changelogDB.newChangeNumberIndexDB();
          final CNIndexRecord lastCNRecord = cnIndexDB.getLastRecord();
          // initialization of the lastGeneratedChangeNumebr from the DB content
          // initialization of the lastGeneratedChangeNumber from the DB content
          // if DB is empty => last record does not exist => default to 0
          lastGeneratedChangeNumber =
              (lastCNRecord != null) ? lastCNRecord.getChangeNumber() : 0;
@@ -1617,7 +1544,8 @@
      catch (Exception e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
        Message message = ERR_CHANGENUMBER_DATABASE.get(e.getMessage());
        Message message =
            ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage());
        throw new DirectoryException(OPERATIONS_ERROR, message, e);
      }
    }
@@ -1816,6 +1744,16 @@
  }
  /**
   * Returns the changelogDB.
   *
   * @return the changelogDB.
   */
  ChangelogDB getChangelogDB()
  {
    return this.changelogDB;
  }
  /**
   * Get the replication server DB directory.
   * This is useful for tests to be able to do some cleanup. Might even be
   * useful for the server some day.
@@ -1824,7 +1762,7 @@
   */
  public String getDbDirName()
  {
    return dbDirname;
    return this.changelogDB.getDBDirName();
  }
  /*
@@ -1896,33 +1834,4 @@
        + baseDNs.keySet();
  }
  /**
   * Initializes the generationId for the specified replication domain.
   *
   * @param baseDn
   *          the replication domain
   * @param generationId
   *          the the generationId value for initialization
   */
  public void initDomainGenerationID(String baseDn, long generationId)
  {
    getReplicationServerDomain(baseDn, true).initGenerationID(generationId);
  }
  /**
   * Adds the specified serverId to the specified replication domain.
   *
   * @param serverId
   *          the server Id to add to the replication domain
   * @param baseDn
   *          the replication domain where to add the serverId
   * @throws ChangelogException
   *           If a database error happened.
   */
  public void addServerIdToDomain(int serverId, String baseDn)
      throws ChangelogException
  {
    DbHandler dbHandler = newDbHandler(serverId, baseDn);
    getReplicationServerDomain(baseDn, true).setDbHandler(serverId, dbHandler);
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -30,6 +30,7 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -46,9 +47,9 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
import org.opends.server.replication.server.changelog.je.DbHandler;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
@@ -117,17 +118,26 @@
  private final Queue<MessageHandler> otherHandlers =
    new ConcurrentLinkedQueue<MessageHandler>();
  /**
   * This map contains the List of updates received from each LDAP server.
   */
  private final Map<Integer, DbHandler> sourceDbHandlers =
      new ConcurrentHashMap<Integer, DbHandler>();
  private final ChangelogDB changelogDB;
  /** The ReplicationServer that created the current instance. */
  private ReplicationServer localReplicationServer;
  /** GenerationId management. */
  /**
   * The generationId of the current replication domain. The generationId is
   * computed by hashing the first 1000 entries in the DB.
   */
  private volatile long generationId = -1;
  private boolean generationIdSavedStatus = false;
  /**
   * JNR, this is legacy code, hard to follow logic. I think what this field
   * tries to say is: "is the generationId in use anywhere?", i.e. is there a
   * replication topology in place? As soon as an answer to any of these
   * question comes true, then it is set to true.
   * <p>
   * It looks like the only use of this field is to prevent the
   * {@link #generationId} from being reset by
   * {@link #resetGenerationIdIfPossible()}.
   */
  private volatile boolean generationIdSavedStatus = false;
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
@@ -177,6 +187,7 @@
    this.assuredTimeoutTimer = new Timer("Replication server RS("
        + localReplicationServer.getServerId()
        + ") assured timer for domain \"" + baseDn + "\"", true);
    this.changelogDB = localReplicationServer.getChangelogDB();
    DirectoryServer.registerMonitorProvider(this);
  }
@@ -252,7 +263,7 @@
      }
    }
    if (!publishMessage(update, serverId))
    if (!publishUpdateMsg(update, serverId))
    {
      return;
    }
@@ -390,43 +401,46 @@
    }
  }
  private boolean publishMessage(UpdateMsg update, int serverId)
  private boolean publishUpdateMsg(UpdateMsg updateMsg, int serverId)
  {
    // look for the dbHandler that is responsible for the LDAP server which
    // generated the change.
    DbHandler dbHandler;
    synchronized (sourceDbHandlers)
    try
    {
      dbHandler = sourceDbHandlers.get(serverId);
      if (dbHandler == null)
      if (this.changelogDB.publishUpdateMsg(baseDn, serverId, updateMsg))
      {
        try
        {
          dbHandler = localReplicationServer.newDbHandler(serverId, baseDn);
          generationIdSavedStatus = true;
        } catch (ChangelogException e)
        /*
         * JNR: Matt and I had a hard time figuring out where to put this
         * synchronized block. We elected to put it here, but without a strong
         * conviction.
         */
        synchronized (generationIDLock)
        {
          /*
           * Because of database problem we can't save any more changes
           * from at least one LDAP server.
           * This replicationServer therefore can't do it's job properly anymore
           * and needs to close all its connections and shutdown itself.
           * JNR: I think the generationIdSavedStatus is set to true because
           * method above created a ReplicaDB which assumes the generationId was
           * communicated to another server. Hence setting true on this field
           * prevent the generationId from being reset.
           */
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
          mb.append(" ");
          mb.append(stackTraceToSingleLineString(e));
          logError(mb.toMessage());
          localReplicationServer.shutdown();
          return false;
          generationIdSavedStatus = true;
        }
        sourceDbHandlers.put(serverId, dbHandler);
      }
      return true;
    }
    // Publish the messages to the source handler
    dbHandler.add(update);
    return true;
    catch (ChangelogException e)
    {
      /*
       * Because of database problem we can't save any more changes from at
       * least one LDAP server. This replicationServer therefore can't do it's
       * job properly anymore and needs to close all its connections and
       * shutdown itself.
       */
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(" ");
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      localReplicationServer.shutdown();
      return false;
    }
  }
  private NotAssuredUpdateMsg addUpdate(ServerHandler sHandler,
@@ -1261,7 +1275,7 @@
   */
  public Set<Integer> getServerIds()
  {
    return sourceDbHandlers.keySet();
    return changelogDB.getDomainServerIds(baseDn);
  }
  /**
@@ -1278,29 +1292,7 @@
   */
  public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN)
  {
    DbHandler dbHandler = sourceDbHandlers.get(serverId);
    if (dbHandler == null)
    {
      return null;
    }
    ReplicaDBCursor cursor;
    try
    {
      cursor = dbHandler.generateCursorFrom(startAfterCSN);
    }
    catch (Exception e)
    {
      return null;
    }
    if (!cursor.next())
    {
      close(cursor);
      return null;
    }
    return cursor;
    return changelogDB.getCursorFrom(baseDn, serverId, startAfterCSN);
  }
 /**
@@ -1313,12 +1305,7 @@
  */
  public long getCount(int serverId, CSN from, CSN to)
  {
    DbHandler dbHandler = sourceDbHandlers.get(serverId);
    if (dbHandler != null)
    {
      return dbHandler.getCount(from, to);
    }
    return 0;
    return changelogDB.getCount(baseDn, serverId, from, to);
  }
  /**
@@ -1328,12 +1315,7 @@
   */
  public long getChangesCount()
  {
    long entryCount = 0;
    for (DbHandler dbHandler : sourceDbHandlers.values())
    {
      entryCount += dbHandler.getChangesCount();
    }
    return entryCount;
    return changelogDB.getDomainChangesCount(baseDn);
  }
  /**
@@ -1346,24 +1328,6 @@
  }
  /**
   * Sets the provided DbHandler associated to the provided serverId.
   *
   * @param serverId  the serverId for the server to which is
   *                  associated the DbHandler.
   * @param dbHandler the dbHandler associated to the serverId.
   *
   * @throws ChangelogException If a database error happened.
   */
  public void setDbHandler(int serverId, DbHandler dbHandler)
    throws ChangelogException
  {
    synchronized (sourceDbHandlers)
    {
      sourceDbHandlers.put(serverId, dbHandler);
    }
  }
  /**
   * Retrieves the destination handlers for a routable message.
   *
   * @param msg The message to route.
@@ -1734,20 +1698,7 @@
    stopAllServers(true);
    shutdownDbHandlers();
  }
  /** Shutdown all the dbHandlers. */
  private void shutdownDbHandlers()
  {
    synchronized (sourceDbHandlers)
    {
      for (DbHandler dbHandler : sourceDbHandlers.values())
      {
        dbHandler.shutdown();
      }
      sourceDbHandlers.clear();
    }
    changelogDB.shutdownDomain(baseDn);
  }
  /**
@@ -1758,9 +1709,9 @@
  public ServerState getDbServerState()
  {
    ServerState serverState = new ServerState();
    for (DbHandler db : sourceDbHandlers.values())
    for (CSN lastCSN : changelogDB.getDomainLastCSNs(baseDn).values())
    {
      serverState.update(db.getLastChange());
      serverState.update(lastCSN);
    }
    return serverState;
  }
@@ -2235,24 +2186,7 @@
  public void clearDbs()
  {
    // Reset the localchange and state db for the current domain
    synchronized (sourceDbHandlers)
    {
      for (DbHandler dbHandler : sourceDbHandlers.values())
      {
        try
        {
          dbHandler.clear();
        } catch (Exception e)
        {
          // TODO: i18n
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(),
              e.getMessage() + " " + stackTraceToSingleLineString(e)));
          logError(mb.toMessage());
        }
      }
      shutdownDbHandlers();
    }
    changelogDB.clearDomain(baseDn);
    try
    {
      localReplicationServer.clearGenerationId(baseDn);
@@ -2397,20 +2331,6 @@
  }
  /**
   * Set the purge delay on all the db Handlers for this Domain
   * of Replication.
   *
   * @param delay The new purge delay to use.
   */
  public void setPurgeDelay(long delay)
  {
    for (DbHandler dbHandler : sourceDbHandlers.values())
    {
      dbHandler.setPurgeDelay(delay);
    }
  }
  /**
   * Get the map of connected DSs.
   * @return The map of connected DSs
   */
@@ -2667,7 +2587,6 @@
    {
      for (int serverId : dbState)
      {
        DbHandler h = sourceDbHandlers.get(serverId);
        CSN mostRecentDbCSN = dbState.getCSN(serverId);
        try {
          // Is the most recent change in the Db newer than eligible CSN ?
@@ -2676,19 +2595,8 @@
          if (eligibleCSN.olderOrEqual(mostRecentDbCSN))
          {
            // let's try to seek the first change <= eligibleCSN
            ReplicaDBCursor cursor = null;
            try {
              cursor = h.generateCursorFrom(eligibleCSN);
              if (cursor != null && cursor.getChange() != null) {
                CSN newCSN = cursor.getChange().getCSN();
                result.update(newCSN);
              }
            } catch (ChangelogException e) {
              // there's no change older than eligibleCSN (case of s3/csn31)
              result.update(new CSN(0, 0, serverId));
            } finally {
              close(cursor);
            }
            CSN newCSN = changelogDB.getCSNAfter(baseDn, serverId, eligibleCSN);
            result.update(newCSN);
          } else {
            // for this serverId, all changes in the ChangelogDb are holder
            // than eligibleCSN, the most recent in the db is our guy.
@@ -2721,9 +2629,9 @@
  public ServerState getStartState()
  {
    ServerState domainStartState = new ServerState();
    for (DbHandler dbHandler : sourceDbHandlers.values())
    for (CSN firstCSN : changelogDB.getDomainFirstCSNs(baseDn).values())
    {
      domainStartState.update(dbHandler.getFirstChange());
      domainStartState.update(firstCSN);
    }
    return domainStartState;
  }
@@ -2741,10 +2649,12 @@
  {
    CSN eligibleCSN = null;
    for (DbHandler db : sourceDbHandlers.values())
    for (Entry<Integer, CSN> entry :
      changelogDB.getDomainLastCSNs(baseDn).entrySet())
    {
      // Consider this producer (DS/db).
      int serverId = db.getServerId();
      final int serverId = entry.getKey();
      final CSN changelogLastCSN = entry.getValue();
      // Should it be considered for eligibility ?
      CSN heartbeatLastCSN =
@@ -2774,7 +2684,6 @@
        continue;
      }
      CSN changelogLastCSN = db.getLastChange();
      if (changelogLastCSN != null
          && (eligibleCSN == null || changelogLastCSN.newer(eligibleCSN)))
      {
@@ -2935,15 +2844,7 @@
   */
  public long getLatestDomainTrimDate()
  {
    long latest = 0;
    for (DbHandler db : sourceDbHandlers.values())
    {
      if (latest == 0 || latest < db.getLatestTrimDate())
      {
        latest = db.getLatestTrimDate();
      }
    }
    return latest;
    return changelogDB.getDomainLatestTrimDate(baseDn);
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
New file
@@ -0,0 +1,229 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.api;
import java.util.Map;
import java.util.Set;
import org.opends.server.config.ConfigException;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
/**
 * The changelogDB stores the replication data on persistent storage.
 * <p>
 * This interface allows to:
 * <ul>
 * <li>set the storage directory and the purge interval</li>
 * <li>get access to the {@link ChangeNumberIndexDB}</li>
 * <li>query or control the replication domain database(s) (composed of one or
 * more ReplicaDBs)</li>
 * <li>query/update each ReplicaDB</li>
 * </ul>
 */
public interface ChangelogDB
{
  // DB control methods
  /**
   * Set the directory to be used by the replication database.
   *
   * @param dbDirName
   *          the directory for use by the replication database
   * @throws ConfigException
   *           if a problem occurs opening the directory
   */
  void setReplicationDBDirectory(String dbDirName) throws ConfigException;
  /**
   * Get the replication server database directory. This is used by tests to do
   * some cleanup.
   *
   * @return the database directory name
   */
  String getDBDirName();
  /**
   * Initializes the replication database.
   */
  void initializeDB();
  /**
   * Sets the purge delay for the replication database. This purge delay is a
   * best effort.
   *
   * @param delayInMillis
   *          the purge delay in milliseconds
   */
  void setPurgeDelay(long delayInMillis);
  /**
   * Shutdown the replication database.
   */
  void shutdownDB();
  /**
   * Returns a new {@link ChangeNumberIndexDB} object.
   *
   * @return a new {@link ChangeNumberIndexDB} object
   * @throws ChangelogException
   *           If a database problem happened
   */
  ChangeNumberIndexDB newChangeNumberIndexDB() throws ChangelogException;
  // Domain methods
  /**
   * Returns the serverIds for the servers that are or have been part of the
   * provided replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   * @return a set of integers holding the serverIds
   */
  Set<Integer> getDomainServerIds(String baseDn);
  /**
   * Get the number of changes for the specified replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   * @return the number of changes.
   */
  long getDomainChangesCount(String baseDn);
  /**
   * Returns the FIRST {@link CSN}s of each serverId for the specified
   * replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   * @return a {serverId => FIRST CSN} Map
   */
  Map<Integer, CSN> getDomainFirstCSNs(String baseDn);
  /**
   * Returns the LAST {@link CSN}s of each serverId for the specified
   * replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   * @return a {serverId => LAST CSN} Map
   */
  Map<Integer, CSN> getDomainLastCSNs(String baseDn);
  /**
   * Retrieves the latest trim date for the specified replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   * @return the domain latest trim date
   */
  long getDomainLatestTrimDate(String baseDn);
  /**
   * Shutdown the specified replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   */
  void shutdownDomain(String baseDn);
  /**
   * Clear DB and shutdown for the specified replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   */
  void clearDomain(String baseDn);
  // serverId methods
  /**
   * Return the number of changes between 2 provided {@link CSN}s for the
   * specified serverId and replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   * @param serverId
   *          the serverId on which to act
   * @param from
   *          The lower (older) CSN
   * @param to
   *          The upper (newer) CSN
   * @return The computed number of changes
   */
  long getCount(String baseDn, int serverId, CSN from, CSN to);
  /**
   * Returns the {@link CSN} situated immediately after the specified
   * {@link CSN} for the specified serverId and replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   * @param serverId
   *          the serverId for which we want the information
   * @param startAfterCSN
   *          The position where the iterator must start
   * @return a new ReplicationIterator that allows to browse the db managed by
   *         this dbHandler and starting at the position defined by a given CSN.
   */
  CSN getCSNAfter(String baseDn, int serverId, CSN startAfterCSN);
  /**
   * Generates a non empty {@link ReplicaDBCursor} for the specified serverId
   * and replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   * @param serverId
   *          the serverId on which to act
   * @param startAfterCSN
   *          The position where the iterator must start
   * @return a {@link ReplicaDBCursor} if the ReplicaDB is not empty, null
   *         otherwise
   */
  ReplicaDBCursor getCursorFrom(String baseDn, int serverId, CSN startAfterCSN);
  /**
   * for the specified serverId and replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   * @param serverId
   *          the serverId on which to act
   * @param updateMsg
   *          the update message to publish to the replicaDB
   * @return true if a db had to be created to publish this message
   * @throws ChangelogException
   *           If a database problem happened
   */
  boolean publishUpdateMsg(String baseDn, int serverId, UpdateMsg updateMsg)
      throws ChangelogException;
}
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogException.java
@@ -70,7 +70,7 @@
   * @param cause
   *          The underlying cause that triggered this exception.
   */
  protected ChangelogException(Message message, Throwable cause)
  public ChangelogException(Message message, Throwable cause)
  {
    super(message, cause);
  }
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
New file
@@ -0,0 +1,452 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
import java.io.File;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.Pair;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * JE implementation of the ChangelogDB.
 */
public class JEChangelogDB implements ChangelogDB
{
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
  /**
   * This map contains the List of updates received from each LDAP server.
   */
  private final Map<String, Map<Integer, DbHandler>> sourceDbHandlers =
      new ConcurrentHashMap<String, Map<Integer, DbHandler>>();
  private ReplicationDbEnv dbEnv;
  private String dbDirName = null;
  private File dbDirectory;
  /** The local replication server. */
  private final ReplicationServer replicationServer;
  /**
   * Builds an instance of this class.
   *
   * @param replicationServer
   *          the local replication server.
   */
  public JEChangelogDB(ReplicationServer replicationServer)
  {
    this.replicationServer = replicationServer;
  }
  private Map<Integer, DbHandler> getDomainMap(String baseDn)
  {
    final Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDn);
    if (domainMap != null)
    {
      return domainMap;
    }
    return Collections.emptyMap();
  }
  private DbHandler getDbHandler(String baseDn, int serverId)
  {
    return getDomainMap(baseDn).get(serverId);
  }
  /**
   * Provision resources for the specified serverId in the specified replication
   * domain.
   *
   * @param baseDn
   *          the replication domain where to add the serverId
   * @param serverId
   *          the server Id to add to the replication domain
   * @throws ChangelogException
   *           If a database error happened.
   */
  private void commission(String baseDn, int serverId, ReplicationServer rs)
      throws ChangelogException
  {
    getOrCreateDbHandler(baseDn, serverId, rs);
  }
  private Pair<DbHandler, Boolean> getOrCreateDbHandler(String baseDn,
      int serverId, ReplicationServer rs) throws ChangelogException
  {
    synchronized (sourceDbHandlers)
    {
      Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDn);
      if (domainMap == null)
      {
        domainMap = new ConcurrentHashMap<Integer, DbHandler>();
        sourceDbHandlers.put(baseDn, domainMap);
      }
      DbHandler dbHandler = domainMap.get(serverId);
      if (dbHandler == null)
      {
        dbHandler =
            new DbHandler(serverId, baseDn, rs, dbEnv, rs.getQueueSize());
        domainMap.put(serverId, dbHandler);
        return Pair.of(dbHandler, true);
      }
      return Pair.of(dbHandler, false);
    }
  }
  /** {@inheritDoc} */
  @Override
  public void initializeDB()
  {
    try
    {
      dbEnv = new ReplicationDbEnv(getFileForPath(dbDirName).getAbsolutePath(),
          replicationServer);
      initializeChangelogState(dbEnv.readChangelogState());
    }
    catch (ChangelogException e)
    {
      Message message =
          ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(), e
              .getLocalizedMessage());
      logError(message);
    }
  }
  private void initializeChangelogState(final ChangelogState changelogState)
      throws ChangelogException
  {
    for (Map.Entry<String, Long> entry :
      changelogState.getDomainToGenerationId().entrySet())
    {
      replicationServer.getReplicationServerDomain(entry.getKey(), true)
          .initGenerationID(entry.getValue());
    }
    for (Map.Entry<String, List<Integer>> entry : changelogState
        .getDomainToServerIds().entrySet())
    {
      final String baseDn = entry.getKey();
      for (int serverId : entry.getValue())
      {
        commission(baseDn, serverId, replicationServer);
      }
    }
  }
  /** {@inheritDoc} */
  @Override
  public void shutdownDB()
  {
    if (dbEnv != null)
    {
      dbEnv.shutdown();
    }
  }
  /** {@inheritDoc} */
  @Override
  public Set<Integer> getDomainServerIds(String baseDn)
  {
    return getDomainMap(baseDn).keySet();
  }
  /** {@inheritDoc} */
  @Override
  public long getCount(String baseDn, int serverId, CSN from, CSN to)
  {
    DbHandler dbHandler = getDbHandler(baseDn, serverId);
    if (dbHandler != null)
    {
      return dbHandler.getCount(from, to);
    }
    return 0;
  }
  /** {@inheritDoc} */
  @Override
  public long getDomainChangesCount(String baseDn)
  {
    long entryCount = 0;
    for (DbHandler dbHandler : getDomainMap(baseDn).values())
    {
      entryCount += dbHandler.getChangesCount();
    }
    return entryCount;
  }
  /** {@inheritDoc} */
  @Override
  public void shutdownDomain(String baseDn)
  {
    shutdownDbHandlers(getDomainMap(baseDn));
  }
  private void shutdownDbHandlers(Map<Integer, DbHandler> domainMap)
  {
    synchronized (domainMap)
    {
      for (DbHandler dbHandler : domainMap.values())
      {
        dbHandler.shutdown();
      }
      domainMap.clear();
    }
  }
  /** {@inheritDoc} */
  @Override
  public Map<Integer, CSN> getDomainFirstCSNs(String baseDn)
  {
    final Map<Integer, DbHandler> domainMap = getDomainMap(baseDn);
    final Map<Integer, CSN> results =
        new HashMap<Integer, CSN>(domainMap.size());
    for (DbHandler dbHandler : domainMap.values())
    {
      results.put(dbHandler.getServerId(), dbHandler.getFirstChange());
    }
    return results;
  }
  /** {@inheritDoc} */
  @Override
  public Map<Integer, CSN> getDomainLastCSNs(String baseDn)
  {
    final Map<Integer, DbHandler> domainMap = getDomainMap(baseDn);
    final Map<Integer, CSN> results =
        new HashMap<Integer, CSN>(domainMap.size());
    for (DbHandler dbHandler : domainMap.values())
    {
      results.put(dbHandler.getServerId(), dbHandler.getLastChange());
    }
    return results;
  }
  /** {@inheritDoc} */
  @Override
  public void clearDomain(String baseDn)
  {
    final Map<Integer, DbHandler> domainMap = getDomainMap(baseDn);
    synchronized (domainMap)
    {
      for (DbHandler dbHandler : domainMap.values())
      {
        try
        {
          dbHandler.clear();
        }
        catch (Exception e)
        {
          // TODO: i18n
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(), e
              .getMessage()
              + " " + stackTraceToSingleLineString(e)));
          logError(mb.toMessage());
        }
      }
      shutdownDbHandlers(domainMap);
    }
    try
    {
      dbEnv.clearGenerationId(baseDn);
    }
    catch (Exception ignored)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
      }
    }
  }
  /** {@inheritDoc} */
  @Override
  public void setPurgeDelay(long delay)
  {
    for (Map<Integer, DbHandler> domainMap : sourceDbHandlers.values())
    {
      for (DbHandler dbHandler : domainMap.values())
      {
        dbHandler.setPurgeDelay(delay);
      }
    }
  }
  /** {@inheritDoc} */
  @Override
  public long getDomainLatestTrimDate(String baseDn)
  {
    long latest = 0;
    for (DbHandler dbHandler : getDomainMap(baseDn).values())
    {
      if (latest == 0 || latest < dbHandler.getLatestTrimDate())
      {
        latest = dbHandler.getLatestTrimDate();
      }
    }
    return latest;
  }
  /** {@inheritDoc} */
  @Override
  public CSN getCSNAfter(String baseDn, int serverId, CSN startAfterCSN)
  {
    final DbHandler dbHandler = getDbHandler(baseDn, serverId);
    ReplicaDBCursor cursor = null;
    try
    {
      cursor = dbHandler.generateCursorFrom(startAfterCSN);
      if (cursor != null && cursor.getChange() != null)
      {
        return cursor.getChange().getCSN();
      }
      return null;
    }
    catch (ChangelogException e)
    {
      // there's no change older than startAfterCSN
      return new CSN(0, 0, serverId);
    }
    finally
    {
      close(cursor);
    }
  }
  /** {@inheritDoc} */
  @Override
  public ChangeNumberIndexDB newChangeNumberIndexDB() throws ChangelogException
  {
    return new DraftCNDbHandler(replicationServer, this.dbEnv);
  }
  /** {@inheritDoc} */
  @Override
  public void setReplicationDBDirectory(String dbDirName)
      throws ConfigException
  {
    if (dbDirName == null)
    {
      dbDirName = "changelogDb";
    }
    this.dbDirName = dbDirName;
    // Check that this path exists or create it.
    dbDirectory = getFileForPath(this.dbDirName);
    try
    {
      if (!dbDirectory.exists())
      {
        dbDirectory.mkdir();
      }
    }
    catch (Exception e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(e.getLocalizedMessage());
      mb.append(" ");
      mb.append(String.valueOf(dbDirectory));
      Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString());
      throw new ConfigException(msg, e);
    }
  }
  /** {@inheritDoc} */
  @Override
  public String getDBDirName()
  {
    return this.dbDirName;
  }
  /** {@inheritDoc} */
  @Override
  public ReplicaDBCursor getCursorFrom(String baseDn, int serverId,
      CSN startAfterCSN)
  {
    DbHandler dbHandler = getDbHandler(baseDn, serverId);
    if (dbHandler == null)
    {
      return null;
    }
    ReplicaDBCursor it;
    try
    {
      it = dbHandler.generateCursorFrom(startAfterCSN);
    }
    catch (Exception e)
    {
      return null;
    }
    if (!it.next())
    {
      close(it);
      return null;
    }
    return it;
  }
  /** {@inheritDoc} */
  @Override
  public boolean publishUpdateMsg(String baseDn, int serverId,
      UpdateMsg updateMsg) throws ChangelogException
  {
    final Pair<DbHandler, Boolean> pair =
        getOrCreateDbHandler(baseDn, serverId, replicationServer);
    final DbHandler dbHandler = pair.getFirst();
    final boolean wasCreated = pair.getSecond();
    dbHandler.add(updateMsg);
    return wasCreated;
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -34,6 +34,7 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -166,12 +167,14 @@
  }
  /**
   * Read the list of known servers from the database and start dbHandler
   * for each of them.
   * Read the list of known servers from the database and start dbHandler for
   * each of them.
   *
   * @throws ChangelogException in case of underlying Exception
   * @return the {@link ChangelogState} read from the changelogState DB
   * @throws ChangelogException
   *           if a database problem occurs
   */
  public void initializeFromChangelogStateDB() throws ChangelogException
  public ChangelogState readChangelogState() throws ChangelogException
  {
    DatabaseEntry key = new DatabaseEntry();
    DatabaseEntry data = new DatabaseEntry();
@@ -179,6 +182,8 @@
    try
    {
      final ChangelogState result = new ChangelogState();
      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
      while (status == OperationStatus.SUCCESS)
      {
@@ -197,7 +202,7 @@
          if (debugEnabled())
            debug("has read baseDn=" + baseDn + " generationId=" +generationId);
          replicationServer.initDomainGenerationID(baseDn, generationId);
          result.setDomainGenerationId(baseDn, generationId);
        }
        else
        {
@@ -207,11 +212,13 @@
          if (debugEnabled())
            debug("has read: baseDn=" + baseDn + " serverId=" + serverId);
          replicationServer.addServerIdToDomain(serverId, baseDn);
          result.addServerIdToDomain(serverId, baseDn);
        }
        status = cursor.getNext(key, data, LockMode.DEFAULT);
      }
      return result;
    }
    catch (RuntimeException e)
    {
opends/src/server/org/opends/server/util/Pair.java
New file
@@ -0,0 +1,162 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 */
package org.opends.server.util;
/**
 * Ordered pair of various objects.
 *
 * @param <F>
 *          type of the first pair element
 * @param <S>
 *          type of the second pair element
 */
public class Pair<F, S>
{
  /** An empty Pair. */
  public static final Pair<?, ?> EMPTY = Pair.of(null, null);
  /** The first pair element. */
  private final F first;
  /** The second pair element. */
  private final S second;
  /**
   * Default ctor.
   *
   * @param first
   *          the first element of the constructed pair
   * @param second
   *          the second element of the constructed pair
   */
  private Pair(F first, S second)
  {
    this.first = first;
    this.second = second;
  }
  /**
   * Factory method to build a new Pair.
   *
   * @param first
   *          the first element of the constructed pair
   * @param second
   *          the second element of the constructed pair
   * @param <F>
   *          type of the first pair element
   * @param <S>
   *          type of the second pair element
   * @return A new Pair built with the provided elements
   */
  public static <F, S> Pair<F, S> of(F first, S second)
  {
    return new Pair<F, S>(first, second);
  }
  /**
   * Returns an empty Pair matching the required types.
   *
   * @param <F>
   *          type of the first pair element
   * @param <S>
   *          type of the second pair element
   * @return An empty Pair matching the required types
   */
  @SuppressWarnings("unchecked")
  public static <F, S> Pair<F, S> empty()
  {
    return (Pair<F, S>) EMPTY;
  }
  /**
   * Returns the first element of this pair.
   *
   * @return the first element of this pair
   */
  public F getFirst()
  {
    return first;
  }
  /**
   * Returns the second element of this pair.
   *
   * @return the second element of this pair
   */
  public S getSecond()
  {
    return second;
  }
  /** {@inheritDoc} */
  @Override
  public int hashCode()
  {
    final int prime = 31;
    int result = 1;
    result = prime * result + ((first == null) ? 0 : first.hashCode());
    result = prime * result + ((second == null) ? 0 : second.hashCode());
    return result;
  }
  /** {@inheritDoc} */
  @Override
  public boolean equals(Object obj)
  {
    if (this == obj)
      return true;
    if (obj == null)
      return false;
    if (getClass() != obj.getClass())
      return false;
    Pair<?, ?> other = (Pair<?, ?>) obj;
    if (first == null)
    {
      if (other.first != null)
        return false;
    }
    else if (!first.equals(other.first))
      return false;
    if (second == null)
    {
      if (other.second != null)
        return false;
    }
    else if (!second.equals(other.second))
      return false;
    return true;
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return "Pair [" + first + ", " + second + "]";
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -25,7 +25,7 @@
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication;
package org.opends.server.replication.server;
import java.io.*;
import java.net.Socket;
@@ -44,6 +44,7 @@
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.*;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.common.MultiDomainServerState;
@@ -53,7 +54,6 @@
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.je.DraftCNDbHandler;
@@ -828,14 +828,12 @@
      publishDeleteMsgInOTest(s2test, csn9, tn, 9);
      sleep(500);
      ReplicationServerDomain rsd = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING);
      ServerState startState = rsd.getStartState();
      ServerState startState = getReplicationDomainStartState(TEST_ROOT_DN_STRING);
      assertEquals(startState.getCSN(s1test.getServerId()).getSeqnum(), 1);
      assertTrue(startState.getCSN(s2test.getServerId()) != null);
      assertEquals(startState.getCSN(s2test.getServerId()).getSeqnum(), 7);
      rsd = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING2);
      startState = rsd.getStartState();
      startState = getReplicationDomainStartState(TEST_ROOT_DN_STRING2);
      assertEquals(startState.getCSN(s2test2.getServerId()).getSeqnum(), 2);
      assertEquals(startState.getCSN(s1test2.getServerId()).getSeqnum(), 6);
@@ -891,6 +889,11 @@
    debugInfo(tn, "Ending test successfully");
  }
  private ServerState getReplicationDomainStartState(String baseDn)
  {
    return replicationServer.getReplicationServerDomain(baseDn).getStartState();
  }
  private String getCookie(List<SearchResultEntry> entries,
      int expectedNbEntries, String tn, LDIFWriter ldifWriter, String cookie)
      throws Exception
@@ -979,8 +982,6 @@
    debugInfo(tn, "Starting test");
    ReplicationBroker server01 = null;
    ReplicationServerDomain d1 = null;
    ReplicationServerDomain d2 = null;
    try
    {
@@ -1010,10 +1011,7 @@
      // ---
      // 2. Now set up a very short purge delay on the replication changelogs
      // so that this test can play with a trimmed changelog.
      d1 = replicationServer.getReplicationServerDomain("o=test");
      d2 = replicationServer.getReplicationServerDomain("o=test2");
      d1.setPurgeDelay(1);
      d2.setPurgeDelay(1);
      replicationServer.getChangelogDB().setPurgeDelay(1);
      // Sleep longer than this delay - so that the changelog is trimmed
      Thread.sleep(1000);
@@ -1047,8 +1045,8 @@
      //    returns the appropriate error.
      publishDeleteMsgInOTest(server01, csns[3], tn, 1);
      debugInfo(tn, "d1 trimdate" + d1.getStartState());
      debugInfo(tn, "d2 trimdate" + d2.getStartState());
      debugInfo(tn, "d1 trimdate" + getReplicationDomainStartState("o=test"));
      debugInfo(tn, "d2 trimdate" + getReplicationDomainStartState("o=test2"));
      searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, tn, UNWILLING_TO_PERFORM);
      assertEquals(searchOp.getSearchEntries().size(), 0);
      assertTrue(searchOp.getErrorMessage().toString().startsWith(
@@ -1059,15 +1057,7 @@
    {
      stop(server01);
      // And reset changelog purge delay for the other tests.
      if (d1 != null)
      {
        d1.setPurgeDelay(15 * 1000);
      }
      if (d2 != null)
      {
        d2.setPurgeDelay(15 * 1000);
      }
      replicationServer.getChangelogDB().setPurgeDelay(15 * 1000);
      replicationServer.clearDb();
    }
    debugInfo(tn, "Ending test successfully");