From 8ed297692b7674b67b8d05a26fa9b04c20930e37 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 06 Sep 2013 21:41:58 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java                             |  147 +----
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                    |  452 ++++++++++++++++++
 opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java                 |   19 
 opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java                     |  229 +++++++++
 opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogException.java              |    2 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java |   36 
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                       |  235 ++------
 opends/src/server/org/opends/server/util/Pair.java                                                        |  162 ++++++
 opends/src/server/org/opends/server/replication/server/ChangelogState.java                                |  104 ++++
 9 files changed, 1,070 insertions(+), 316 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ChangelogState.java b/opends/src/server/org/opends/server/replication/server/ChangelogState.java
new file mode 100644
index 0000000..18ea636
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/ChangelogState.java
@@ -0,0 +1,104 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.replication.server;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is the changelog state stored in the changelogStateDB. For each
+ * replication domain, it contains:
+ * <ul>
+ * <li>its generationId</li>
+ * <li>the list of serverIds composing it</li>
+ * </ul>
+ * <p>
+ * This class is used during replication initialization to decouple the code
+ * that reads the changelogStateDB from the code that makes use of its data.
+ */
+public class ChangelogState
+{
+
+  private final Map<String, Long> domainToGenerationId =
+      new HashMap<String, Long>();
+  private final Map<String, List<Integer>> domainToServerIds =
+      new HashMap<String, List<Integer>>();
+
+  /**
+   * Sets the generationId for the supplied replication domain.
+   *
+   * @param baseDn
+   *          the targeted replication domain baseDN
+   * @param generationId
+   *          the generation Id to set
+   */
+  public void setDomainGenerationId(String baseDn, long generationId)
+  {
+    domainToGenerationId.put(baseDn, generationId);
+  }
+
+  /**
+   * Adds the serverId to the serverIds list of the supplied replication domain.
+   *
+   * @param serverId
+   *          the serverId to add
+   * @param baseDn
+   *          the targeted replication domain baseDN
+   */
+  public void addServerIdToDomain(int serverId, String baseDn)
+  {
+    List<Integer> serverIds = domainToServerIds.get(baseDn);
+    if (serverIds == null)
+    {
+      serverIds = new LinkedList<Integer>();
+      domainToServerIds.put(baseDn, serverIds);
+    }
+    serverIds.add(serverId);
+  }
+
+  /**
+   * Returns the Map of domainBaseDN => generationId.
+   *
+   * @return a Map of domainBaseDN => generationId
+   */
+  public Map<String, Long> getDomainToGenerationId()
+  {
+    return domainToGenerationId;
+  }
+
+  /**
+   * Returns the Map of domainBaseDN => List&lt;serverId&gt;.
+   *
+   * @return a Map of domainBaseDN => List&lt;serverId&gt;.
+   */
+  public Map<String, List<Integer>> getDomainToServerIds()
+  {
+    return domainToServerIds;
+  }
+}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 62fee3a..f351728 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -27,7 +27,6 @@
  */
 package org.opends.server.replication.server;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.StringReader;
 import java.net.*;
@@ -54,10 +53,9 @@
 import org.opends.server.replication.protocol.*;
 import org.opends.server.replication.server.changelog.api.CNIndexRecord;
 import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.je.DbHandler;
-import org.opends.server.replication.server.changelog.je.DraftCNDbHandler;
-import org.opends.server.replication.server.changelog.je.ReplicationDbEnv;
+import org.opends.server.replication.server.changelog.je.JEChangelogDB;
 import org.opends.server.types.*;
 import org.opends.server.util.LDIFReader;
 import org.opends.server.util.ServerConstants;
@@ -101,10 +99,9 @@
           new HashMap<String, ReplicationServerDomain>();
 
   private volatile boolean shutdown = false;
-  private ReplicationDbEnv dbEnv;
   private int rcvWindow;
   private int queueSize;
-  private String dbDirname = null;
+  private final ChangelogDB changelogDB = new JEChangelogDB(this);
 
   /**
    * The delay (in sec) after which the changes must be deleted from the
@@ -225,30 +222,11 @@
       replicationServerUrls = new ArrayList<String>();
     queueSize = configuration.getQueueSize();
     purgeDelay = configuration.getReplicationPurgeDelay();
-    dbDirname = configuration.getReplicationDBDirectory();
     rcvWindow = configuration.getWindowSize();
-    if (dbDirname == null)
-    {
-      dbDirname = "changelogDb";
-    }
-    // Check that this path exists or create it.
-    File f = getFileForPath(dbDirname);
-    try
-    {
-      if (!f.exists())
-      {
-        f.mkdir();
-      }
-    }
-    catch (Exception e)
-    {
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(e.getLocalizedMessage());
-      mb.append(" ");
-      mb.append(String.valueOf(getFileForPath(dbDirname)));
-      Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString());
-      throw new ConfigException(msg, e);
-    }
+
+    this.changelogDB.setReplicationDBDirectory(configuration
+        .getReplicationDBDirectory());
+
     groupId = (byte)configuration.getGroupId();
     weight = configuration.getWeight();
     assuredTimeout = configuration.getAssuredTimeout();
@@ -504,10 +482,7 @@
 
     try
     {
-      // Initialize the replicationServer database.
-      dbEnv = new ReplicationDbEnv(getFileForPath(dbDirname).getAbsolutePath(),
-          this);
-      dbEnv.initializeFromChangelogStateDB();
+      this.changelogDB.initializeDB();
 
       setServerURL();
       listenSocket = new ServerSocket();
@@ -539,16 +514,9 @@
       if (debugEnabled())
         TRACER.debugInfo("RS " +getMonitorInstanceName()+
             " successfully initialized");
-    } catch (ChangelogException e)
-    {
-      Message message = ERR_COULD_NOT_READ_DB.get(
-              getFileForPath(dbDirname).getAbsolutePath(),
-              e.getLocalizedMessage());
-      logError(message);
     } catch (UnknownHostException e)
     {
-      Message message = ERR_UNKNOWN_HOSTNAME.get();
-      logError(message);
+      logError(ERR_UNKNOWN_HOSTNAME.get());
     } catch (IOException e)
     {
       Message message =
@@ -827,37 +795,12 @@
 
     shutdownECL();
 
-    if (dbEnv != null)
-    {
-      dbEnv.shutdown();
-    }
+    this.changelogDB.shutdownDB();
 
     // Remove this instance from the global instance list
     allInstances.remove(this);
   }
 
-
-  /**
-   * Creates a new DB handler for this ReplicationServer and the serverId and DN
-   * given in parameter.
-   *
-   * @param serverId
-   *          The serverId for which the dbHandler must be created.
-   * @param baseDn
-   *          The DN for which the dbHandler must be created.
-   * @return The new DB handler for this ReplicationServer and the serverId and
-   *         DN given in parameter.
-   * @throws ChangelogException
-   *           in case of underlying database problem.
-   */
-  public DbHandler newDbHandler(int serverId, String baseDn)
-      throws ChangelogException
-  {
-    return new DbHandler(serverId, baseDn, this, dbEnv, queueSize);
-  }
-
-
-
   /**
    * Clears the generationId for the replicationServerDomain related to the
    * provided baseDn.
@@ -867,18 +810,6 @@
    */
   public void clearGenerationId(String baseDn)
   {
-    try
-    {
-      dbEnv.clearGenerationId(baseDn);
-    }
-    catch (Exception ignored)
-    {
-      if (debugEnabled())
-      {
-        TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
-      }
-    }
-
     synchronized (cnIndexDBLock)
     {
       if (cnIndexDB != null)
@@ -962,11 +893,7 @@
     if (newPurgeDelay != purgeDelay)
     {
       purgeDelay = newPurgeDelay;
-      // propagate
-      for (ReplicationServerDomain domain : getReplicationServerDomains())
-      {
-        domain.setPurgeDelay(purgeDelay*1000);
-      }
+      this.changelogDB.setPurgeDelay(purgeDelay * 1000);
     }
 
     rcvWindow = configuration.getWindowSize();
@@ -1047,7 +974,7 @@
     }
 
     final String newDir = configuration.getReplicationDBDirectory();
-    if (newDir != null && !dbDirname.equals(newDir))
+    if (newDir != null && !this.changelogDB.getDBDirName().equals(newDir))
     {
       return new ConfigChangeResult(ResultCode.SUCCESS, true);
     }
@@ -1597,7 +1524,7 @@
    * @throws DirectoryException
    *           when needed.
    */
-  public ChangeNumberIndexDB getChangeNumberIndexDB() throws DirectoryException
+  ChangeNumberIndexDB getChangeNumberIndexDB() throws DirectoryException
   {
     synchronized (cnIndexDBLock)
     {
@@ -1605,9 +1532,9 @@
       {
         if (cnIndexDB == null)
         {
-          cnIndexDB = new DraftCNDbHandler(this, this.dbEnv);
+          cnIndexDB = this.changelogDB.newChangeNumberIndexDB();
           final CNIndexRecord lastCNRecord = cnIndexDB.getLastRecord();
-          // initialization of the lastGeneratedChangeNumebr from the DB content
+          // initialization of the lastGeneratedChangeNumber from the DB content
           // if DB is empty => last record does not exist => default to 0
           lastGeneratedChangeNumber =
               (lastCNRecord != null) ? lastCNRecord.getChangeNumber() : 0;
@@ -1617,7 +1544,8 @@
       catch (Exception e)
       {
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
-        Message message = ERR_CHANGENUMBER_DATABASE.get(e.getMessage());
+        Message message =
+            ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage());
         throw new DirectoryException(OPERATIONS_ERROR, message, e);
       }
     }
@@ -1816,6 +1744,16 @@
   }
 
   /**
+   * Returns the changelogDB.
+   *
+   * @return the changelogDB.
+   */
+  ChangelogDB getChangelogDB()
+  {
+    return this.changelogDB;
+  }
+
+  /**
    * Get the replication server DB directory.
    * This is useful for tests to be able to do some cleanup. Might even be
    * useful for the server some day.
@@ -1824,7 +1762,7 @@
    */
   public String getDbDirName()
   {
-    return dbDirname;
+    return this.changelogDB.getDBDirName();
   }
 
   /*
@@ -1896,33 +1834,4 @@
         + baseDNs.keySet();
   }
 
-  /**
-   * Initializes the generationId for the specified replication domain.
-   *
-   * @param baseDn
-   *          the replication domain
-   * @param generationId
-   *          the the generationId value for initialization
-   */
-  public void initDomainGenerationID(String baseDn, long generationId)
-  {
-    getReplicationServerDomain(baseDn, true).initGenerationID(generationId);
-  }
-
-  /**
-   * Adds the specified serverId to the specified replication domain.
-   *
-   * @param serverId
-   *          the server Id to add to the replication domain
-   * @param baseDn
-   *          the replication domain where to add the serverId
-   * @throws ChangelogException
-   *           If a database error happened.
-   */
-  public void addServerIdToDomain(int serverId, String baseDn)
-      throws ChangelogException
-  {
-    DbHandler dbHandler = newDbHandler(serverId, baseDn);
-    getReplicationServerDomain(baseDn, true).setDbHandler(serverId, dbHandler);
-  }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index c4e4eee..eef08e4 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -30,6 +30,7 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.*;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
@@ -46,9 +47,9 @@
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.*;
 import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
-import org.opends.server.replication.server.changelog.je.DbHandler;
 import org.opends.server.types.*;
 
 import static org.opends.messages.ReplicationMessages.*;
@@ -117,17 +118,26 @@
   private final Queue<MessageHandler> otherHandlers =
     new ConcurrentLinkedQueue<MessageHandler>();
 
-  /**
-   * This map contains the List of updates received from each LDAP server.
-   */
-  private final Map<Integer, DbHandler> sourceDbHandlers =
-      new ConcurrentHashMap<Integer, DbHandler>();
+  private final ChangelogDB changelogDB;
   /** The ReplicationServer that created the current instance. */
   private ReplicationServer localReplicationServer;
 
-  /** GenerationId management. */
+  /**
+   * The generationId of the current replication domain. The generationId is
+   * computed by hashing the first 1000 entries in the DB.
+   */
   private volatile long generationId = -1;
-  private boolean generationIdSavedStatus = false;
+  /**
+   * JNR, this is legacy code, hard to follow logic. I think what this field
+   * tries to say is: "is the generationId in use anywhere?", i.e. is there a
+   * replication topology in place? As soon as an answer to any of these
+   * question comes true, then it is set to true.
+   * <p>
+   * It looks like the only use of this field is to prevent the
+   * {@link #generationId} from being reset by
+   * {@link #resetGenerationIdIfPossible()}.
+   */
+  private volatile boolean generationIdSavedStatus = false;
 
   /** The tracer object for the debug logger. */
   private static final DebugTracer TRACER = getTracer();
@@ -177,6 +187,7 @@
     this.assuredTimeoutTimer = new Timer("Replication server RS("
         + localReplicationServer.getServerId()
         + ") assured timer for domain \"" + baseDn + "\"", true);
+    this.changelogDB = localReplicationServer.getChangelogDB();
 
     DirectoryServer.registerMonitorProvider(this);
   }
@@ -252,7 +263,7 @@
       }
     }
 
-    if (!publishMessage(update, serverId))
+    if (!publishUpdateMsg(update, serverId))
     {
       return;
     }
@@ -390,43 +401,46 @@
     }
   }
 
-  private boolean publishMessage(UpdateMsg update, int serverId)
+  private boolean publishUpdateMsg(UpdateMsg updateMsg, int serverId)
   {
-    // look for the dbHandler that is responsible for the LDAP server which
-    // generated the change.
-    DbHandler dbHandler;
-    synchronized (sourceDbHandlers)
+    try
     {
-      dbHandler = sourceDbHandlers.get(serverId);
-      if (dbHandler == null)
+      if (this.changelogDB.publishUpdateMsg(baseDn, serverId, updateMsg))
       {
-        try
-        {
-          dbHandler = localReplicationServer.newDbHandler(serverId, baseDn);
-          generationIdSavedStatus = true;
-        } catch (ChangelogException e)
+        /*
+         * JNR: Matt and I had a hard time figuring out where to put this
+         * synchronized block. We elected to put it here, but without a strong
+         * conviction.
+         */
+        synchronized (generationIDLock)
         {
           /*
-           * Because of database problem we can't save any more changes
-           * from at least one LDAP server.
-           * This replicationServer therefore can't do it's job properly anymore
-           * and needs to close all its connections and shutdown itself.
+           * JNR: I think the generationIdSavedStatus is set to true because
+           * method above created a ReplicaDB which assumes the generationId was
+           * communicated to another server. Hence setting true on this field
+           * prevent the generationId from being reset.
            */
-          MessageBuilder mb = new MessageBuilder();
-          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-          mb.append(" ");
-          mb.append(stackTraceToSingleLineString(e));
-          logError(mb.toMessage());
-          localReplicationServer.shutdown();
-          return false;
+          generationIdSavedStatus = true;
         }
-        sourceDbHandlers.put(serverId, dbHandler);
       }
+      return true;
     }
-
-    // Publish the messages to the source handler
-    dbHandler.add(update);
-    return true;
+    catch (ChangelogException e)
+    {
+      /*
+       * Because of database problem we can't save any more changes from at
+       * least one LDAP server. This replicationServer therefore can't do it's
+       * job properly anymore and needs to close all its connections and
+       * shutdown itself.
+       */
+      MessageBuilder mb = new MessageBuilder();
+      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+      mb.append(" ");
+      mb.append(stackTraceToSingleLineString(e));
+      logError(mb.toMessage());
+      localReplicationServer.shutdown();
+      return false;
+    }
   }
 
   private NotAssuredUpdateMsg addUpdate(ServerHandler sHandler,
@@ -1261,7 +1275,7 @@
    */
   public Set<Integer> getServerIds()
   {
-    return sourceDbHandlers.keySet();
+    return changelogDB.getDomainServerIds(baseDn);
   }
 
   /**
@@ -1278,29 +1292,7 @@
    */
   public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN)
   {
-    DbHandler dbHandler = sourceDbHandlers.get(serverId);
-    if (dbHandler == null)
-    {
-      return null;
-    }
-
-    ReplicaDBCursor cursor;
-    try
-    {
-      cursor = dbHandler.generateCursorFrom(startAfterCSN);
-    }
-    catch (Exception e)
-    {
-      return null;
-    }
-
-    if (!cursor.next())
-    {
-      close(cursor);
-      return null;
-    }
-
-    return cursor;
+    return changelogDB.getCursorFrom(baseDn, serverId, startAfterCSN);
   }
 
  /**
@@ -1313,12 +1305,7 @@
   */
   public long getCount(int serverId, CSN from, CSN to)
   {
-    DbHandler dbHandler = sourceDbHandlers.get(serverId);
-    if (dbHandler != null)
-    {
-      return dbHandler.getCount(from, to);
-    }
-    return 0;
+    return changelogDB.getCount(baseDn, serverId, from, to);
   }
 
   /**
@@ -1328,12 +1315,7 @@
    */
   public long getChangesCount()
   {
-    long entryCount = 0;
-    for (DbHandler dbHandler : sourceDbHandlers.values())
-    {
-      entryCount += dbHandler.getChangesCount();
-    }
-    return entryCount;
+    return changelogDB.getDomainChangesCount(baseDn);
   }
 
   /**
@@ -1346,24 +1328,6 @@
   }
 
   /**
-   * Sets the provided DbHandler associated to the provided serverId.
-   *
-   * @param serverId  the serverId for the server to which is
-   *                  associated the DbHandler.
-   * @param dbHandler the dbHandler associated to the serverId.
-   *
-   * @throws ChangelogException If a database error happened.
-   */
-  public void setDbHandler(int serverId, DbHandler dbHandler)
-    throws ChangelogException
-  {
-    synchronized (sourceDbHandlers)
-    {
-      sourceDbHandlers.put(serverId, dbHandler);
-    }
-  }
-
-  /**
    * Retrieves the destination handlers for a routable message.
    *
    * @param msg The message to route.
@@ -1734,20 +1698,7 @@
 
     stopAllServers(true);
 
-    shutdownDbHandlers();
-  }
-
-  /** Shutdown all the dbHandlers. */
-  private void shutdownDbHandlers()
-  {
-    synchronized (sourceDbHandlers)
-    {
-      for (DbHandler dbHandler : sourceDbHandlers.values())
-      {
-        dbHandler.shutdown();
-      }
-      sourceDbHandlers.clear();
-    }
+    changelogDB.shutdownDomain(baseDn);
   }
 
   /**
@@ -1758,9 +1709,9 @@
   public ServerState getDbServerState()
   {
     ServerState serverState = new ServerState();
-    for (DbHandler db : sourceDbHandlers.values())
+    for (CSN lastCSN : changelogDB.getDomainLastCSNs(baseDn).values())
     {
-      serverState.update(db.getLastChange());
+      serverState.update(lastCSN);
     }
     return serverState;
   }
@@ -2235,24 +2186,7 @@
   public void clearDbs()
   {
     // Reset the localchange and state db for the current domain
-    synchronized (sourceDbHandlers)
-    {
-      for (DbHandler dbHandler : sourceDbHandlers.values())
-      {
-        try
-        {
-          dbHandler.clear();
-        } catch (Exception e)
-        {
-          // TODO: i18n
-          MessageBuilder mb = new MessageBuilder();
-          mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(),
-              e.getMessage() + " " + stackTraceToSingleLineString(e)));
-          logError(mb.toMessage());
-        }
-      }
-      shutdownDbHandlers();
-    }
+    changelogDB.clearDomain(baseDn);
     try
     {
       localReplicationServer.clearGenerationId(baseDn);
@@ -2397,20 +2331,6 @@
   }
 
   /**
-   * Set the purge delay on all the db Handlers for this Domain
-   * of Replication.
-   *
-   * @param delay The new purge delay to use.
-   */
-  public void setPurgeDelay(long delay)
-  {
-    for (DbHandler dbHandler : sourceDbHandlers.values())
-    {
-      dbHandler.setPurgeDelay(delay);
-    }
-  }
-
-  /**
    * Get the map of connected DSs.
    * @return The map of connected DSs
    */
@@ -2667,7 +2587,6 @@
     {
       for (int serverId : dbState)
       {
-        DbHandler h = sourceDbHandlers.get(serverId);
         CSN mostRecentDbCSN = dbState.getCSN(serverId);
         try {
           // Is the most recent change in the Db newer than eligible CSN ?
@@ -2676,19 +2595,8 @@
           if (eligibleCSN.olderOrEqual(mostRecentDbCSN))
           {
             // let's try to seek the first change <= eligibleCSN
-            ReplicaDBCursor cursor = null;
-            try {
-              cursor = h.generateCursorFrom(eligibleCSN);
-              if (cursor != null && cursor.getChange() != null) {
-                CSN newCSN = cursor.getChange().getCSN();
-                result.update(newCSN);
-              }
-            } catch (ChangelogException e) {
-              // there's no change older than eligibleCSN (case of s3/csn31)
-              result.update(new CSN(0, 0, serverId));
-            } finally {
-              close(cursor);
-            }
+            CSN newCSN = changelogDB.getCSNAfter(baseDn, serverId, eligibleCSN);
+            result.update(newCSN);
           } else {
             // for this serverId, all changes in the ChangelogDb are holder
             // than eligibleCSN, the most recent in the db is our guy.
@@ -2721,9 +2629,9 @@
   public ServerState getStartState()
   {
     ServerState domainStartState = new ServerState();
-    for (DbHandler dbHandler : sourceDbHandlers.values())
+    for (CSN firstCSN : changelogDB.getDomainFirstCSNs(baseDn).values())
     {
-      domainStartState.update(dbHandler.getFirstChange());
+      domainStartState.update(firstCSN);
     }
     return domainStartState;
   }
@@ -2741,10 +2649,12 @@
   {
     CSN eligibleCSN = null;
 
-    for (DbHandler db : sourceDbHandlers.values())
+    for (Entry<Integer, CSN> entry :
+      changelogDB.getDomainLastCSNs(baseDn).entrySet())
     {
       // Consider this producer (DS/db).
-      int serverId = db.getServerId();
+      final int serverId = entry.getKey();
+      final CSN changelogLastCSN = entry.getValue();
 
       // Should it be considered for eligibility ?
       CSN heartbeatLastCSN =
@@ -2774,7 +2684,6 @@
         continue;
       }
 
-      CSN changelogLastCSN = db.getLastChange();
       if (changelogLastCSN != null
           && (eligibleCSN == null || changelogLastCSN.newer(eligibleCSN)))
       {
@@ -2935,15 +2844,7 @@
    */
   public long getLatestDomainTrimDate()
   {
-    long latest = 0;
-    for (DbHandler db : sourceDbHandlers.values())
-    {
-      if (latest == 0 || latest < db.getLatestTrimDate())
-      {
-        latest = db.getLatestTrimDate();
-      }
-    }
-    return latest;
+    return changelogDB.getDomainLatestTrimDate(baseDn);
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
new file mode 100644
index 0000000..9686ee0
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -0,0 +1,229 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.api;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.opends.server.config.ConfigException;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+
+/**
+ * The changelogDB stores the replication data on persistent storage.
+ * <p>
+ * This interface allows to:
+ * <ul>
+ * <li>set the storage directory and the purge interval</li>
+ * <li>get access to the {@link ChangeNumberIndexDB}</li>
+ * <li>query or control the replication domain database(s) (composed of one or
+ * more ReplicaDBs)</li>
+ * <li>query/update each ReplicaDB</li>
+ * </ul>
+ */
+public interface ChangelogDB
+{
+
+  // DB control methods
+
+  /**
+   * Set the directory to be used by the replication database.
+   *
+   * @param dbDirName
+   *          the directory for use by the replication database
+   * @throws ConfigException
+   *           if a problem occurs opening the directory
+   */
+  void setReplicationDBDirectory(String dbDirName) throws ConfigException;
+
+  /**
+   * Get the replication server database directory. This is used by tests to do
+   * some cleanup.
+   *
+   * @return the database directory name
+   */
+  String getDBDirName();
+
+  /**
+   * Initializes the replication database.
+   */
+  void initializeDB();
+
+  /**
+   * Sets the purge delay for the replication database. This purge delay is a
+   * best effort.
+   *
+   * @param delayInMillis
+   *          the purge delay in milliseconds
+   */
+  void setPurgeDelay(long delayInMillis);
+
+  /**
+   * Shutdown the replication database.
+   */
+  void shutdownDB();
+
+  /**
+   * Returns a new {@link ChangeNumberIndexDB} object.
+   *
+   * @return a new {@link ChangeNumberIndexDB} object
+   * @throws ChangelogException
+   *           If a database problem happened
+   */
+  ChangeNumberIndexDB newChangeNumberIndexDB() throws ChangelogException;
+
+  // Domain methods
+
+  /**
+   * Returns the serverIds for the servers that are or have been part of the
+   * provided replication domain.
+   *
+   * @param baseDn
+   *          the replication domain baseDn
+   * @return a set of integers holding the serverIds
+   */
+  Set<Integer> getDomainServerIds(String baseDn);
+
+  /**
+   * Get the number of changes for the specified replication domain.
+   *
+   * @param baseDn
+   *          the replication domain baseDn
+   * @return the number of changes.
+   */
+  long getDomainChangesCount(String baseDn);
+
+  /**
+   * Returns the FIRST {@link CSN}s of each serverId for the specified
+   * replication domain.
+   *
+   * @param baseDn
+   *          the replication domain baseDn
+   * @return a {serverId => FIRST CSN} Map
+   */
+  Map<Integer, CSN> getDomainFirstCSNs(String baseDn);
+
+  /**
+   * Returns the LAST {@link CSN}s of each serverId for the specified
+   * replication domain.
+   *
+   * @param baseDn
+   *          the replication domain baseDn
+   * @return a {serverId => LAST CSN} Map
+   */
+  Map<Integer, CSN> getDomainLastCSNs(String baseDn);
+
+  /**
+   * Retrieves the latest trim date for the specified replication domain.
+   *
+   * @param baseDn
+   *          the replication domain baseDn
+   * @return the domain latest trim date
+   */
+  long getDomainLatestTrimDate(String baseDn);
+
+  /**
+   * Shutdown the specified replication domain.
+   *
+   * @param baseDn
+   *          the replication domain baseDn
+   */
+  void shutdownDomain(String baseDn);
+
+  /**
+   * Clear DB and shutdown for the specified replication domain.
+   *
+   * @param baseDn
+   *          the replication domain baseDn
+   */
+  void clearDomain(String baseDn);
+
+  // serverId methods
+
+  /**
+   * Return the number of changes between 2 provided {@link CSN}s for the
+   * specified serverId and replication domain.
+   *
+   * @param baseDn
+   *          the replication domain baseDn
+   * @param serverId
+   *          the serverId on which to act
+   * @param from
+   *          The lower (older) CSN
+   * @param to
+   *          The upper (newer) CSN
+   * @return The computed number of changes
+   */
+  long getCount(String baseDn, int serverId, CSN from, CSN to);
+
+  /**
+   * Returns the {@link CSN} situated immediately after the specified
+   * {@link CSN} for the specified serverId and replication domain.
+   *
+   * @param baseDn
+   *          the replication domain baseDn
+   * @param serverId
+   *          the serverId for which we want the information
+   * @param startAfterCSN
+   *          The position where the iterator must start
+   * @return a new ReplicationIterator that allows to browse the db managed by
+   *         this dbHandler and starting at the position defined by a given CSN.
+   */
+  CSN getCSNAfter(String baseDn, int serverId, CSN startAfterCSN);
+
+  /**
+   * Generates a non empty {@link ReplicaDBCursor} for the specified serverId
+   * and replication domain.
+   *
+   * @param baseDn
+   *          the replication domain baseDn
+   * @param serverId
+   *          the serverId on which to act
+   * @param startAfterCSN
+   *          The position where the iterator must start
+   * @return a {@link ReplicaDBCursor} if the ReplicaDB is not empty, null
+   *         otherwise
+   */
+  ReplicaDBCursor getCursorFrom(String baseDn, int serverId, CSN startAfterCSN);
+
+  /**
+   * for the specified serverId and replication domain.
+   *
+   * @param baseDn
+   *          the replication domain baseDn
+   * @param serverId
+   *          the serverId on which to act
+   * @param updateMsg
+   *          the update message to publish to the replicaDB
+   * @return true if a db had to be created to publish this message
+   * @throws ChangelogException
+   *           If a database problem happened
+   */
+  boolean publishUpdateMsg(String baseDn, int serverId, UpdateMsg updateMsg)
+      throws ChangelogException;
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogException.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogException.java
index cc6f328..aec222a 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogException.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogException.java
@@ -70,7 +70,7 @@
    * @param cause
    *          The underlying cause that triggered this exception.
    */
-  protected ChangelogException(Message message, Throwable cause)
+  public ChangelogException(Message message, Throwable cause)
   {
     super(message, cause);
   }
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
new file mode 100644
index 0000000..bea6ac5
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -0,0 +1,452 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.io.File;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.opends.messages.Message;
+import org.opends.messages.MessageBuilder;
+import org.opends.server.config.ConfigException;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ChangelogState;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
+import org.opends.server.types.DebugLogLevel;
+import org.opends.server.util.Pair;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
+
+/**
+ * JE implementation of the ChangelogDB.
+ */
+public class JEChangelogDB implements ChangelogDB
+{
+
+  /** The tracer object for the debug logger. */
+  private static final DebugTracer TRACER = getTracer();
+
+  /**
+   * This map contains the List of updates received from each LDAP server.
+   */
+  private final Map<String, Map<Integer, DbHandler>> sourceDbHandlers =
+      new ConcurrentHashMap<String, Map<Integer, DbHandler>>();
+  private ReplicationDbEnv dbEnv;
+  private String dbDirName = null;
+  private File dbDirectory;
+
+  /** The local replication server. */
+  private final ReplicationServer replicationServer;
+
+  /**
+   * Builds an instance of this class.
+   *
+   * @param replicationServer
+   *          the local replication server.
+   */
+  public JEChangelogDB(ReplicationServer replicationServer)
+  {
+    this.replicationServer = replicationServer;
+  }
+
+  private Map<Integer, DbHandler> getDomainMap(String baseDn)
+  {
+    final Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDn);
+    if (domainMap != null)
+    {
+      return domainMap;
+    }
+    return Collections.emptyMap();
+  }
+
+  private DbHandler getDbHandler(String baseDn, int serverId)
+  {
+    return getDomainMap(baseDn).get(serverId);
+  }
+
+  /**
+   * Provision resources for the specified serverId in the specified replication
+   * domain.
+   *
+   * @param baseDn
+   *          the replication domain where to add the serverId
+   * @param serverId
+   *          the server Id to add to the replication domain
+   * @throws ChangelogException
+   *           If a database error happened.
+   */
+  private void commission(String baseDn, int serverId, ReplicationServer rs)
+      throws ChangelogException
+  {
+    getOrCreateDbHandler(baseDn, serverId, rs);
+  }
+
+  private Pair<DbHandler, Boolean> getOrCreateDbHandler(String baseDn,
+      int serverId, ReplicationServer rs) throws ChangelogException
+  {
+    synchronized (sourceDbHandlers)
+    {
+      Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDn);
+      if (domainMap == null)
+      {
+        domainMap = new ConcurrentHashMap<Integer, DbHandler>();
+        sourceDbHandlers.put(baseDn, domainMap);
+      }
+
+      DbHandler dbHandler = domainMap.get(serverId);
+      if (dbHandler == null)
+      {
+        dbHandler =
+            new DbHandler(serverId, baseDn, rs, dbEnv, rs.getQueueSize());
+        domainMap.put(serverId, dbHandler);
+        return Pair.of(dbHandler, true);
+      }
+      return Pair.of(dbHandler, false);
+    }
+  }
+
+
+  /** {@inheritDoc} */
+  @Override
+  public void initializeDB()
+  {
+    try
+    {
+      dbEnv = new ReplicationDbEnv(getFileForPath(dbDirName).getAbsolutePath(),
+          replicationServer);
+      initializeChangelogState(dbEnv.readChangelogState());
+    }
+    catch (ChangelogException e)
+    {
+      Message message =
+          ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(), e
+              .getLocalizedMessage());
+      logError(message);
+    }
+  }
+
+  private void initializeChangelogState(final ChangelogState changelogState)
+      throws ChangelogException
+  {
+    for (Map.Entry<String, Long> entry :
+      changelogState.getDomainToGenerationId().entrySet())
+    {
+      replicationServer.getReplicationServerDomain(entry.getKey(), true)
+          .initGenerationID(entry.getValue());
+    }
+    for (Map.Entry<String, List<Integer>> entry : changelogState
+        .getDomainToServerIds().entrySet())
+    {
+      final String baseDn = entry.getKey();
+      for (int serverId : entry.getValue())
+      {
+        commission(baseDn, serverId, replicationServer);
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void shutdownDB()
+  {
+    if (dbEnv != null)
+    {
+      dbEnv.shutdown();
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public Set<Integer> getDomainServerIds(String baseDn)
+  {
+    return getDomainMap(baseDn).keySet();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getCount(String baseDn, int serverId, CSN from, CSN to)
+  {
+    DbHandler dbHandler = getDbHandler(baseDn, serverId);
+    if (dbHandler != null)
+    {
+      return dbHandler.getCount(from, to);
+    }
+    return 0;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getDomainChangesCount(String baseDn)
+  {
+    long entryCount = 0;
+    for (DbHandler dbHandler : getDomainMap(baseDn).values())
+    {
+      entryCount += dbHandler.getChangesCount();
+    }
+    return entryCount;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void shutdownDomain(String baseDn)
+  {
+    shutdownDbHandlers(getDomainMap(baseDn));
+  }
+
+  private void shutdownDbHandlers(Map<Integer, DbHandler> domainMap)
+  {
+    synchronized (domainMap)
+    {
+      for (DbHandler dbHandler : domainMap.values())
+      {
+        dbHandler.shutdown();
+      }
+      domainMap.clear();
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public Map<Integer, CSN> getDomainFirstCSNs(String baseDn)
+  {
+    final Map<Integer, DbHandler> domainMap = getDomainMap(baseDn);
+    final Map<Integer, CSN> results =
+        new HashMap<Integer, CSN>(domainMap.size());
+    for (DbHandler dbHandler : domainMap.values())
+    {
+      results.put(dbHandler.getServerId(), dbHandler.getFirstChange());
+    }
+    return results;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public Map<Integer, CSN> getDomainLastCSNs(String baseDn)
+  {
+    final Map<Integer, DbHandler> domainMap = getDomainMap(baseDn);
+    final Map<Integer, CSN> results =
+        new HashMap<Integer, CSN>(domainMap.size());
+    for (DbHandler dbHandler : domainMap.values())
+    {
+      results.put(dbHandler.getServerId(), dbHandler.getLastChange());
+    }
+    return results;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void clearDomain(String baseDn)
+  {
+    final Map<Integer, DbHandler> domainMap = getDomainMap(baseDn);
+    synchronized (domainMap)
+    {
+      for (DbHandler dbHandler : domainMap.values())
+      {
+        try
+        {
+          dbHandler.clear();
+        }
+        catch (Exception e)
+        {
+          // TODO: i18n
+          MessageBuilder mb = new MessageBuilder();
+          mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(), e
+              .getMessage()
+              + " " + stackTraceToSingleLineString(e)));
+          logError(mb.toMessage());
+        }
+      }
+      shutdownDbHandlers(domainMap);
+    }
+
+    try
+    {
+      dbEnv.clearGenerationId(baseDn);
+    }
+    catch (Exception ignored)
+    {
+      if (debugEnabled())
+      {
+        TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void setPurgeDelay(long delay)
+  {
+    for (Map<Integer, DbHandler> domainMap : sourceDbHandlers.values())
+    {
+      for (DbHandler dbHandler : domainMap.values())
+      {
+        dbHandler.setPurgeDelay(delay);
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getDomainLatestTrimDate(String baseDn)
+  {
+    long latest = 0;
+    for (DbHandler dbHandler : getDomainMap(baseDn).values())
+    {
+      if (latest == 0 || latest < dbHandler.getLatestTrimDate())
+      {
+        latest = dbHandler.getLatestTrimDate();
+      }
+    }
+    return latest;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public CSN getCSNAfter(String baseDn, int serverId, CSN startAfterCSN)
+  {
+    final DbHandler dbHandler = getDbHandler(baseDn, serverId);
+
+    ReplicaDBCursor cursor = null;
+    try
+    {
+      cursor = dbHandler.generateCursorFrom(startAfterCSN);
+      if (cursor != null && cursor.getChange() != null)
+      {
+        return cursor.getChange().getCSN();
+      }
+      return null;
+    }
+    catch (ChangelogException e)
+    {
+      // there's no change older than startAfterCSN
+      return new CSN(0, 0, serverId);
+    }
+    finally
+    {
+      close(cursor);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public ChangeNumberIndexDB newChangeNumberIndexDB() throws ChangelogException
+  {
+    return new DraftCNDbHandler(replicationServer, this.dbEnv);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void setReplicationDBDirectory(String dbDirName)
+      throws ConfigException
+  {
+    if (dbDirName == null)
+    {
+      dbDirName = "changelogDb";
+    }
+    this.dbDirName = dbDirName;
+
+    // Check that this path exists or create it.
+    dbDirectory = getFileForPath(this.dbDirName);
+    try
+    {
+      if (!dbDirectory.exists())
+      {
+        dbDirectory.mkdir();
+      }
+    }
+    catch (Exception e)
+    {
+      MessageBuilder mb = new MessageBuilder();
+      mb.append(e.getLocalizedMessage());
+      mb.append(" ");
+      mb.append(String.valueOf(dbDirectory));
+      Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString());
+      throw new ConfigException(msg, e);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String getDBDirName()
+  {
+    return this.dbDirName;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public ReplicaDBCursor getCursorFrom(String baseDn, int serverId,
+      CSN startAfterCSN)
+  {
+    DbHandler dbHandler = getDbHandler(baseDn, serverId);
+    if (dbHandler == null)
+    {
+      return null;
+    }
+
+    ReplicaDBCursor it;
+    try
+    {
+      it = dbHandler.generateCursorFrom(startAfterCSN);
+    }
+    catch (Exception e)
+    {
+      return null;
+    }
+
+    if (!it.next())
+    {
+      close(it);
+      return null;
+    }
+
+    return it;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean publishUpdateMsg(String baseDn, int serverId,
+      UpdateMsg updateMsg) throws ChangelogException
+  {
+    final Pair<DbHandler, Boolean> pair =
+        getOrCreateDbHandler(baseDn, serverId, replicationServer);
+    final DbHandler dbHandler = pair.getFirst();
+    final boolean wasCreated = pair.getSecond();
+
+    dbHandler.add(updateMsg);
+    return wasCreated;
+  }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index d4965c6..4f1f166 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -34,6 +34,7 @@
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
 import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 
@@ -166,12 +167,14 @@
   }
 
   /**
-   * Read the list of known servers from the database and start dbHandler
-   * for each of them.
+   * Read the list of known servers from the database and start dbHandler for
+   * each of them.
    *
-   * @throws ChangelogException in case of underlying Exception
+   * @return the {@link ChangelogState} read from the changelogState DB
+   * @throws ChangelogException
+   *           if a database problem occurs
    */
-  public void initializeFromChangelogStateDB() throws ChangelogException
+  public ChangelogState readChangelogState() throws ChangelogException
   {
     DatabaseEntry key = new DatabaseEntry();
     DatabaseEntry data = new DatabaseEntry();
@@ -179,6 +182,8 @@
 
     try
     {
+      final ChangelogState result = new ChangelogState();
+
       OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
       while (status == OperationStatus.SUCCESS)
       {
@@ -197,7 +202,7 @@
           if (debugEnabled())
             debug("has read baseDn=" + baseDn + " generationId=" +generationId);
 
-          replicationServer.initDomainGenerationID(baseDn, generationId);
+          result.setDomainGenerationId(baseDn, generationId);
         }
         else
         {
@@ -207,11 +212,13 @@
           if (debugEnabled())
             debug("has read: baseDn=" + baseDn + " serverId=" + serverId);
 
-          replicationServer.addServerIdToDomain(serverId, baseDn);
+          result.addServerIdToDomain(serverId, baseDn);
         }
 
         status = cursor.getNext(key, data, LockMode.DEFAULT);
       }
+
+      return result;
     }
     catch (RuntimeException e)
     {
diff --git a/opends/src/server/org/opends/server/util/Pair.java b/opends/src/server/org/opends/server/util/Pair.java
new file mode 100644
index 0000000..188b6f4
--- /dev/null
+++ b/opends/src/server/org/opends/server/util/Pair.java
@@ -0,0 +1,162 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.util;
+
+
+/**
+ * Ordered pair of various objects.
+ *
+ * @param <F>
+ *          type of the first pair element
+ * @param <S>
+ *          type of the second pair element
+ */
+public class Pair<F, S>
+{
+
+  /** An empty Pair. */
+  public static final Pair<?, ?> EMPTY = Pair.of(null, null);
+
+  /** The first pair element. */
+  private final F first;
+  /** The second pair element. */
+  private final S second;
+
+  /**
+   * Default ctor.
+   *
+   * @param first
+   *          the first element of the constructed pair
+   * @param second
+   *          the second element of the constructed pair
+   */
+  private Pair(F first, S second)
+  {
+    this.first = first;
+    this.second = second;
+  }
+
+  /**
+   * Factory method to build a new Pair.
+   *
+   * @param first
+   *          the first element of the constructed pair
+   * @param second
+   *          the second element of the constructed pair
+   * @param <F>
+   *          type of the first pair element
+   * @param <S>
+   *          type of the second pair element
+   * @return A new Pair built with the provided elements
+   */
+  public static <F, S> Pair<F, S> of(F first, S second)
+  {
+    return new Pair<F, S>(first, second);
+  }
+
+  /**
+   * Returns an empty Pair matching the required types.
+   *
+   * @param <F>
+   *          type of the first pair element
+   * @param <S>
+   *          type of the second pair element
+   * @return An empty Pair matching the required types
+   */
+  @SuppressWarnings("unchecked")
+  public static <F, S> Pair<F, S> empty()
+  {
+    return (Pair<F, S>) EMPTY;
+  }
+
+  /**
+   * Returns the first element of this pair.
+   *
+   * @return the first element of this pair
+   */
+  public F getFirst()
+  {
+    return first;
+  }
+
+  /**
+   * Returns the second element of this pair.
+   *
+   * @return the second element of this pair
+   */
+  public S getSecond()
+  {
+    return second;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int hashCode()
+  {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((first == null) ? 0 : first.hashCode());
+    result = prime * result + ((second == null) ? 0 : second.hashCode());
+    return result;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    Pair<?, ?> other = (Pair<?, ?>) obj;
+    if (first == null)
+    {
+      if (other.first != null)
+        return false;
+    }
+    else if (!first.equals(other.first))
+      return false;
+    if (second == null)
+    {
+      if (other.second != null)
+        return false;
+    }
+    else if (!second.equals(other.second))
+      return false;
+    return true;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    return "Pair [" + first + ", " + second + "]";
+  }
+
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
similarity index 98%
rename from opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
rename to opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
index 50ce40a..fe08024 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -25,7 +25,7 @@
  *      Copyright 2006-2010 Sun Microsystems, Inc.
  *      Portions copyright 2011-2013 ForgeRock AS
  */
-package org.opends.server.replication;
+package org.opends.server.replication.server;
 
 import java.io.*;
 import java.net.Socket;
@@ -44,6 +44,7 @@
 import org.opends.server.protocols.internal.InternalClientConnection;
 import org.opends.server.protocols.internal.InternalSearchOperation;
 import org.opends.server.protocols.ldap.*;
+import org.opends.server.replication.ReplicationTestCase;
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.CSNGenerator;
 import org.opends.server.replication.common.MultiDomainServerState;
@@ -53,7 +54,6 @@
 import org.opends.server.replication.plugin.LDAPReplicationDomain;
 import org.opends.server.replication.plugin.MultimasterReplication;
 import org.opends.server.replication.protocol.*;
-import org.opends.server.replication.server.ReplServerFakeConfiguration;
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.server.ReplicationServerDomain;
 import org.opends.server.replication.server.changelog.je.DraftCNDbHandler;
@@ -828,14 +828,12 @@
       publishDeleteMsgInOTest(s2test, csn9, tn, 9);
       sleep(500);
 
-      ReplicationServerDomain rsd = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING);
-      ServerState startState = rsd.getStartState();
+      ServerState startState = getReplicationDomainStartState(TEST_ROOT_DN_STRING);
       assertEquals(startState.getCSN(s1test.getServerId()).getSeqnum(), 1);
       assertTrue(startState.getCSN(s2test.getServerId()) != null);
       assertEquals(startState.getCSN(s2test.getServerId()).getSeqnum(), 7);
 
-      rsd = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING2);
-      startState = rsd.getStartState();
+      startState = getReplicationDomainStartState(TEST_ROOT_DN_STRING2);
       assertEquals(startState.getCSN(s2test2.getServerId()).getSeqnum(), 2);
       assertEquals(startState.getCSN(s1test2.getServerId()).getSeqnum(), 6);
 
@@ -891,6 +889,11 @@
     debugInfo(tn, "Ending test successfully");
   }
 
+  private ServerState getReplicationDomainStartState(String baseDn)
+  {
+    return replicationServer.getReplicationServerDomain(baseDn).getStartState();
+  }
+
   private String getCookie(List<SearchResultEntry> entries,
       int expectedNbEntries, String tn, LDIFWriter ldifWriter, String cookie)
       throws Exception
@@ -979,8 +982,6 @@
     debugInfo(tn, "Starting test");
 
     ReplicationBroker server01 = null;
-    ReplicationServerDomain d1 = null;
-    ReplicationServerDomain d2 = null;
 
     try
     {
@@ -1010,10 +1011,7 @@
       // ---
       // 2. Now set up a very short purge delay on the replication changelogs
       // so that this test can play with a trimmed changelog.
-      d1 = replicationServer.getReplicationServerDomain("o=test");
-      d2 = replicationServer.getReplicationServerDomain("o=test2");
-      d1.setPurgeDelay(1);
-      d2.setPurgeDelay(1);
+      replicationServer.getChangelogDB().setPurgeDelay(1);
 
       // Sleep longer than this delay - so that the changelog is trimmed
       Thread.sleep(1000);
@@ -1047,8 +1045,8 @@
       //    returns the appropriate error.
       publishDeleteMsgInOTest(server01, csns[3], tn, 1);
 
-      debugInfo(tn, "d1 trimdate" + d1.getStartState());
-      debugInfo(tn, "d2 trimdate" + d2.getStartState());
+      debugInfo(tn, "d1 trimdate" + getReplicationDomainStartState("o=test"));
+      debugInfo(tn, "d2 trimdate" + getReplicationDomainStartState("o=test2"));
       searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, tn, UNWILLING_TO_PERFORM);
       assertEquals(searchOp.getSearchEntries().size(), 0);
       assertTrue(searchOp.getErrorMessage().toString().startsWith(
@@ -1059,15 +1057,7 @@
     {
       stop(server01);
       // And reset changelog purge delay for the other tests.
-      if (d1 != null)
-      {
-        d1.setPurgeDelay(15 * 1000);
-      }
-      if (d2 != null)
-      {
-        d2.setPurgeDelay(15 * 1000);
-      }
-
+      replicationServer.getChangelogDB().setPurgeDelay(15 * 1000);
       replicationServer.clearDb();
     }
     debugInfo(tn, "Ending test successfully");

--
Gitblit v1.10.0