From c90277aee027fd834936b44a621a142cf71de444 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Fri, 14 Dec 2007 15:55:42 +0000
Subject: [PATCH] - Partial fix for #1302: on startup servers should wait to catchup before accepting connections - Fix for #795: Changelog fail over unit tests

---
 opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java                                     |  741 ++++++++++++++-------
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java |  477 ++++++++++++++
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java                                     |    3 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java         |    3 
 opends/src/messages/messages/replication.properties                                                               |   16 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java         |  684 ++++++++++++++++++++
 6 files changed, 1,659 insertions(+), 265 deletions(-)

diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index 07cdeb9..f69a1cf 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -76,8 +76,8 @@
 NOTICE_SERVER_DISCONNECT_16=%s has disconnected from this replication server
 NOTICE_NO_CHANGELOG_SERVER_LISTENING_17=There is no replication server \
  listening on %s
-NOTICE_CHANGELOG_MISSING_CHANGES_18=The replication server %s is missing some \
- changes that this server has already processed on suffix %s
+NOTICE_FOUND_CHANGELOGS_WITH_MY_CHANGES_18=Found %d replication server(s) with \
+up to date chnages for suffix %s
 NOTICE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER_19=More than one replication \
  server should be configured
 SEVERE_ERR_EXCEPTION_STARTING_SESSION_20=Caught Exception during initial \
@@ -85,8 +85,8 @@
 MILD_ERR_CANNOT_RECOVER_CHANGES_21=Error when searching old changes from the \
  database for base DN %s
 NOTICE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES_22=Could not find a \
- replication server that has seen all the local changes on suffix %s. Going to replay \
- changes
+ replication server that has seen all the local changes on suffix %s. Found %d \
+replications server(s) not up to date. Going to replay changes
 NOTICE_COULD_NOT_FIND_CHANGELOG_23=Could not connect to any replication \
  server on suffix %s, retrying...
 NOTICE_EXCEPTION_CLOSING_DATABASE_24=Error closing changelog database %s :
@@ -249,4 +249,12 @@
 SEVERE_ERR_REPLICATONBACKEND_EXPORT_LDIF_FAILED_99=The replication \
   server backend cannot export its entries in LDIF format because the \
   export-ldif command must be run as a task
+DEBUG_GOING_TO_SEARCH_FOR_CHANGES_100=The replication server is late \
+regarding our changes: going to send missing ones
+DEBUG_SENDING_CHANGE_101=Sending change number: %s
+DEBUG_CHANGES_SENT_102=All missing changes sent to replication server
+SEVERE_ERR_PUBLISHING_FAKE_OPS_103=Caught exception publishing fake operations \
+for domain %s to replication server %s : %s
+SEVERE_ERR_COMPUTING_FAKE_OPS_104=Caught exception computing fake operations \
+for domain %s for replication server %s : %s
 
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index 94fddf0..049910b 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -25,6 +25,7 @@
  *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.plugin;
+
 import org.opends.messages.*;
 
 import static org.opends.server.loggers.ErrorLogger.logError;
@@ -41,6 +42,8 @@
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.TreeSet;
 import java.util.concurrent.Semaphore;
@@ -61,17 +64,16 @@
 import org.opends.server.types.SearchResultReference;
 import org.opends.server.types.SearchScope;
 
-
 /**
  * The broker for Multi-master Replication.
  */
 public class ReplicationBroker implements InternalSearchListener
 {
+
   /**
    * The tracer object for the debug logger.
    */
   private static final DebugTracer TRACER = getTracer();
-
   private boolean shutdown = false;
   private Collection<String> servers;
   private boolean connected = false;
@@ -95,23 +97,23 @@
   private long generationId = -1;
   private ReplSessionSecurity replSessionSecurity;
 
+  // Trick for avoiding a inner class for many parameters return for
+  // performHandshake method.
+  private String tmpReadableServerName = null;
+
   /**
    * The time in milliseconds between heartbeats from the replication
    * server.  Zero means heartbeats are off.
    */
   private long heartbeatInterval = 0;
-
-
   /**
    * A thread to monitor heartbeats on the session.
    */
   private HeartbeatMonitor heartbeatMonitor = null;
-
   /**
    * The number of times the connection was lost.
    */
   private int numLostConnections = 0;
-
   /**
    * When the broker cannot connect to any replication server
    * it log an error and keeps continuing every second.
@@ -121,7 +123,6 @@
    * finally succeed to connect.
    */
   private boolean connectionError = false;
-
   private final Object connectPhaseLock = new Object();
 
   /**
@@ -149,9 +150,9 @@
    * @param replSessionSecurity The session security configuration.
    */
   public ReplicationBroker(ServerState state, DN baseDn, short serverID,
-      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
-      int maxSendDelay, int window, long heartbeatInterval,
-      long generationId, ReplSessionSecurity replSessionSecurity)
+    int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
+    int maxSendDelay, int window, long heartbeatInterval,
+    long generationId, ReplSessionSecurity replSessionSecurity)
   {
     this.baseDn = baseDn;
     this.serverID = serverID;
@@ -164,7 +165,7 @@
       new TreeSet<FakeOperation>(new FakeOperationComparator());
     this.rcvWindow = window;
     this.maxRcvWindow = window;
-    this.halfRcvWindow = window/2;
+    this.halfRcvWindow = window / 2;
     this.heartbeatInterval = heartbeatInterval;
     this.protocolVersion = ProtocolVersion.currentVersion();
     this.generationId = generationId;
@@ -194,7 +195,6 @@
     this.connect();
   }
 
-
   /**
    * Connect to a ReplicationServer.
    *
@@ -202,7 +202,7 @@
    */
   private void connect()
   {
-    ReplServerStartMessage replServerStartMsg = null;
+    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
 
     // Stop any existing heartbeat monitor from a previous session.
     if (heartbeatMonitor != null)
@@ -211,187 +211,121 @@
       heartbeatMonitor = null;
     }
 
-    // checkState is true for the first loop on all replication servers
-    // looking for one already up-to-date.
-    // If we found some responding replication servers but none up-to-date
-    // then we set check-state to false and do a second loop where the first
-    // found will be the one elected and then we will update this replication
-    // server.
-    boolean checkState = true;
-    boolean receivedResponse = true;
-
-    // TODO: We are doing here 2 loops opening , closing , reopening session to
-    // the same servers .. risk to have 'same server id' erros.
-    // Would be better to do only one loop, keeping the best candidate while
-    // traversing the list of replication servers to connect to.
-    if (servers.size()==1)
-    {
-      checkState = false;
-    }
-
     synchronized (connectPhaseLock)
     {
-      while ((!connected) && (!shutdown) && (receivedResponse))
+      /*
+       * Connect to each replication server and get their ServerState then find
+       * out which one is the best to connect to.
+       */
+      for (String server : servers)
       {
-        receivedResponse = false;
-        for (String server : servers)
-        {
-          int separator = server.lastIndexOf(':');
-          String port = server.substring(separator + 1);
-          String hostname = server.substring(0, separator);
+        // Connect to server and get reply message
+        ReplServerStartMessage replServerStartMsg =
+          performHandshake(server, false);
+        tmpReadableServerName = null; // Not needed now
 
+        // Store reply message in list
+        if (replServerStartMsg != null)
+        {
+          ServerState rsState = replServerStartMsg.getServerState();
+          rsStates.put(server, rsState);
+        }
+      } // for servers
+
+      ReplServerStartMessage replServerStartMsg = null;
+
+      if (rsStates.size() > 0)
+      {
+
+        // At least one server answered, find the best one.
+        String bestServer = computeBestReplicationServer(state, rsStates,
+          serverID, baseDn);
+
+        // Best found, now connect to this one
+        replServerStartMsg = performHandshake(bestServer, true);
+
+        if (replServerStartMsg != null)
+        {
           try
           {
             /*
-             * Open a socket connection to the next candidate.
-             */
-            InetSocketAddress ServerAddr = new InetSocketAddress(
-                InetAddress.getByName(hostname), Integer.parseInt(port));
-            Socket socket = new Socket();
-            socket.setReceiveBufferSize(1000000);
-            socket.setTcpNoDelay(true);
-            socket.connect(ServerAddr, 500);
-            session = replSessionSecurity.createClientSession(server, socket);
-            boolean isSslEncryption =
-                 replSessionSecurity.isSslEncryption(server);
-            /*
-             * Send our ServerStartMessage.
-             */
-            ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
-                maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
-                halfRcvWindow*2, heartbeatInterval, state,
-                protocolVersion, generationId, isSslEncryption);
-            session.publish(msg);
-
-
-            /*
-             * Read the ReplServerStartMessage that should come back.
-             */
-            session.setSoTimeout(1000);
-            replServerStartMsg = (ReplServerStartMessage) session.receive();
-            receivedResponse = true;
-
-            /*
-             * We have sent our own protocol version to the replication server.
-             * The replication server will use the same one (or an older one
-             * if it is an old replication server).
-             */
-            protocolVersion = ProtocolVersion.minWithCurrent(
-                replServerStartMsg.getVersion());
-            session.setSoTimeout(timeout);
-
-            if (!isSslEncryption)
-            {
-              session.stopEncryption();
-            }
-
-            /*
              * We must not publish changes to a replicationServer that has not
              * seen all our previous changes because this could cause some
              * other ldap servers to miss those changes.
              * Check that the ReplicationServer has seen all our previous
              * changes.
-             * If not, try another replicationServer.
-             * If no other replicationServer has seen all our changes, recover
-             * those changes and send them again to any replicationServer.
              */
             ChangeNumber replServerMaxChangeNumber =
               replServerStartMsg.getServerState().getMaxChangeNumber(serverID);
+
             if (replServerMaxChangeNumber == null)
+            {
               replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
+            }
             ChangeNumber ourMaxChangeNumber =
               state.getMaxChangeNumber(serverID);
-            if ((ourMaxChangeNumber == null) ||
-                (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
+
+            if ((ourMaxChangeNumber != null) &&
+              (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
             {
-              replicationServer = ServerAddr.toString();
-              maxSendWindow = replServerStartMsg.getWindowSize();
-              connected = true;
-              startHeartBeat();
-              break;
-            }
-            else
-            {
-              if (checkState == true)
+
+              // Replication server is missing some of our changes: let's send
+              // them to him.
+              replayOperations.clear();
+
+              Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
+              logError(message);
+
+              /*
+               * Get all the changes that have not been seen by this
+               * replication server and populate the replayOperations
+               * list.
+               */
+              InternalSearchOperation op = seachForChangedEntries(
+                baseDn, replServerMaxChangeNumber, this);
+              if (op.getResultCode() != ResultCode.SUCCESS)
               {
-                /* This replicationServer is missing some
-                 * of our changes, we are going to try another server
-                 * but before log a notice message
+                /*
+                 * An error happened trying to search for the updates
+                 * This server will start acepting again new updates but
+                 * some inconsistencies will stay between servers.
+                 * Log an error for the repair tool
+                 * that will need to resynchronize the servers.
                  */
-                Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server,
-                    baseDn.toNormalizedString());
+                message = ERR_CANNOT_RECOVER_CHANGES.get(
+                  baseDn.toNormalizedString());
+                logError(message);
+              } else
+              {
+                for (FakeOperation replayOp : replayOperations)
+                {
+                  message = DEBUG_SENDING_CHANGE.get(replayOp.getChangeNumber().
+                    toString());
+                  logError(message);
+                  session.publish(replayOp.generateMessage());
+                }
+                message = DEBUG_CHANGES_SENT.get();
                 logError(message);
               }
-              else
-              {
-                replayOperations.clear();
-
-                 // TODO: i18n
-                logError(Message.raw("going to search for changes"));
-
-                /*
-                 * Get all the changes that have not been seen by this
-                 * replicationServer and populate the replayOperations
-                 * list.
-                 */
-                InternalSearchOperation op = seachForChangedEntries(
-                    baseDn, replServerMaxChangeNumber, this);
-                if (op.getResultCode() != ResultCode.SUCCESS)
-                {
-                  /*
-                   * An error happened trying to search for the updates
-                   * This server will start acepting again new updates but
-                   * some inconsistencies will stay between servers.
-                   * Log an error for the repair tool
-                   * that will need to resynchronize the servers.
-                   */
-                  Message message = ERR_CANNOT_RECOVER_CHANGES.get(
-                      baseDn.toNormalizedString());
-                  logError(message);
-                  replicationServer = ServerAddr.toString();
-                  maxSendWindow = replServerStartMsg.getWindowSize();
-                  connected = true;
-                  startHeartBeat();
-                }
-                else
-                {
-                  replicationServer = ServerAddr.toString();
-                  maxSendWindow = replServerStartMsg.getWindowSize();
-                  connected = true;
-                  for (FakeOperation replayOp : replayOperations)
-                  {
-                    logError(Message.raw("sendingChange")); // TODO: i18n
-                    session.publish(replayOp.generateMessage());
-                  }
-                  startHeartBeat();
-                  logError(Message.raw("changes sent")); // TODO: i18n
-                  break;
-                }
-              }
             }
-          }
-          catch (ConnectException e)
+
+            replicationServer = tmpReadableServerName;
+            maxSendWindow = replServerStartMsg.getWindowSize();
+            connected = true;
+            startHeartBeat();
+          } catch (IOException e)
           {
-            /*
-             * There was no server waiting on this host:port
-             * Log a notice and try the next replicationServer in the list
-             */
-            if (!connectionError )
-            {
-              // the error message is only logged once to avoid overflowing
-              // the error log
-              Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
-              logError(message);
-            }
-          }
-          catch (Exception e)
-          {
-            Message message = ERR_EXCEPTION_STARTING_SESSION.get(
-                baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
-                stackTraceToSingleLineString(e));
+            Message message = ERR_PUBLISHING_FAKE_OPS.get(
+              baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() +
+              stackTraceToSingleLineString(e));
             logError(message);
-          }
-          finally
+          } catch (Exception e)
+          {
+            Message message = ERR_COMPUTING_FAKE_OPS.get(
+              baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() +
+              stackTraceToSingleLineString(e));
+            logError(message);
+          } finally
           {
             if (connected == false)
             {
@@ -402,81 +336,60 @@
                   session.close();
                 } catch (IOException e)
                 {
-                  // The session was already closed, just ignore.
+                // The session was already closed, just ignore.
                 }
                 session = null;
               }
             }
           }
-        } // for servers
-
-        // We have traversed all the replication servers
-
-        if ((!connected) && (checkState == true) && receivedResponse)
-        {
-          /*
-           * We could not find a replicationServer that has seen all the
-           * changes that this server has already processed, start again
-           * the loop looking for any replicationServer.
-           */
-          Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
-              baseDn.toNormalizedString());
-          logError(message);
-          checkState = false;
-        }
-      }
-
-      // We have traversed all the replication servers as many times as needed
-      // to find one if one is up and running.
+        } // Could perform handshake with best
+      } // Reached some servers
 
       if (connected)
       {
-        // This server has connected correctly.
         // Log a message to let the administrator know that the failure was
         // resolved.
-        // wakeup all the thread that were waiting on the window
+        // Wakeup all the thread that were waiting on the window
         // on the previous connection.
         connectionError = false;
         if (sendWindow != null)
+        {
           sendWindow.release(Integer.MAX_VALUE);
+        }
         this.sendWindow = new Semaphore(maxSendWindow);
         connectPhaseLock.notify();
 
         if ((replServerStartMsg.getGenerationId() == this.generationId) ||
-           (replServerStartMsg.getGenerationId() == -1))
+          (replServerStartMsg.getGenerationId() == -1))
         {
           Message message =
             NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
-                baseDn.toString(),
-                replicationServer,
-                Long.toString(this.generationId));
+            baseDn.toString(),
+            replicationServer,
+            Long.toString(this.generationId));
           logError(message);
-        }
-        else
+        } else
         {
           Message message =
             NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
-                baseDn.toString(),
-                replicationServer,
-                Long.toString(this.generationId),
-                Long.toString(replServerStartMsg.getGenerationId()));
+            baseDn.toString(),
+            replicationServer,
+            Long.toString(this.generationId),
+            Long.toString(replServerStartMsg.getGenerationId()));
           logError(message);
         }
-      }
-      else
+      } else
       {
         /*
-         * This server could not find any replicationServer
-         * It's going to start in degraded mode.
-         * Log a message
+         * This server could not find any replicationServer. It's going to start
+         * in degraded mode. Log a message.
          */
         if (!connectionError)
         {
-          checkState = false;
           connectionError = true;
           connectPhaseLock.notify();
           Message message =
-              NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString());
+            NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString());
           logError(message);
         }
       }
@@ -484,6 +397,315 @@
   }
 
   /**
+   * Connect to the provided server performing the handshake (start messages
+   * exchange) and return the reply message from the replication server.
+   *
+   * @param server Server to connect to.
+   * @param keepConnection Do we keep session opened or not after handshake.
+   * @return The ReplServerStartMessage the server replied. Null if could not
+   *         get an answer.
+   */
+  public ReplServerStartMessage performHandshake(String server,
+    boolean keepConnection)
+  {
+    ReplServerStartMessage replServerStartMsg = null;
+
+    // Parse server string.
+    int separator = server.lastIndexOf(':');
+    String port = server.substring(separator + 1);
+    String hostname = server.substring(0, separator);
+
+    boolean error = false;
+    try
+    {
+      /*
+       * Open a socket connection to the next candidate.
+       */
+      int intPort = Integer.parseInt(port);
+      InetSocketAddress serverAddr = new InetSocketAddress(
+        InetAddress.getByName(hostname), intPort);
+      tmpReadableServerName = serverAddr.toString();
+      Socket socket = new Socket();
+      socket.setReceiveBufferSize(1000000);
+      socket.setTcpNoDelay(true);
+      socket.connect(serverAddr, 500);
+      session = replSessionSecurity.createClientSession(server, socket);
+      boolean isSslEncryption =
+        replSessionSecurity.isSslEncryption(server);
+      /*
+       * Send our ServerStartMessage.
+       */
+      ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
+        maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
+        halfRcvWindow * 2, heartbeatInterval, state,
+        protocolVersion, generationId, isSslEncryption);
+      session.publish(msg);
+
+      /*
+       * Read the ReplServerStartMessage that should come back.
+       */
+      session.setSoTimeout(1000);
+      replServerStartMsg = (ReplServerStartMessage) session.receive();
+
+      /*
+       * We have sent our own protocol version to the replication server.
+       * The replication server will use the same one (or an older one
+       * if it is an old replication server).
+       */
+      protocolVersion = ProtocolVersion.minWithCurrent(
+        replServerStartMsg.getVersion());
+      session.setSoTimeout(timeout);
+
+      if (!isSslEncryption)
+      {
+        session.stopEncryption();
+      }
+    } catch (ConnectException e)
+    {
+      /*
+       * There was no server waiting on this host:port
+       * Log a notice and try the next replicationServer in the list
+       */
+      if (!connectionError)
+      {
+        // the error message is only logged once to avoid overflowing
+        // the error log
+        Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
+        logError(message);
+      }
+      error = true;
+    } catch (Exception e)
+    {
+      Message message = ERR_EXCEPTION_STARTING_SESSION.get(
+        baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
+        stackTraceToSingleLineString(e));
+      logError(message);
+      error = true;
+    }
+
+    // Close session if requested
+    if (!keepConnection || error)
+    {
+      if (session != null)
+      {
+        try
+        {
+          session.close();
+        } catch (IOException e)
+        {
+        // The session was already closed, just ignore.
+        }
+        session = null;
+      }
+      if (error)
+      {
+        replServerStartMsg = null;
+      } // Be sure to return null.
+    }
+
+    return replServerStartMsg;
+  }
+
+  /**
+   * Returns the replication server that best fits our need so that we can
+   * connect to it.
+   *
+   * Note: this method put as public static for unit testing purpose.
+   *
+   * @param myState The local server state.
+   * @param rsStates The list of available replication servers and their
+   *                 associated server state.
+   * @param serverId The server id for the suffix we are working for.
+   * @param baseDn The suffix for which we are working for.
+   * @return The computed best replication server.
+   */
+  public static String computeBestReplicationServer(ServerState myState,
+    HashMap<String, ServerState> rsStates, short serverId, DN baseDn)
+  {
+
+    /*
+     * Find replication servers who are up to date (or more up to date than us,
+     * if for instance we failed and restarted, having sent some changes to the
+     * RS but without having time to store our own state) regarding our own
+     * server id. Then, among them, choose the server that is the most up to
+     * date regarding the whole topology.
+     *
+     * If no server is up to date regarding our own server id, find the one who
+     * is the most up to date regarding our server id.
+     */
+
+    // Should never happen (sanity check)
+    if ((myState == null) || (rsStates == null) || (rsStates.size() < 1) ||
+      (baseDn == null))
+    {
+      return null;
+    }
+
+    String bestServer = null;
+    // Servers up to dates with regard to our changes
+    HashMap<String, ServerState> upToDateServers =
+      new HashMap<String, ServerState>();
+    // Servers late with regard to our changes
+    HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
+
+    /*
+     * Start loop to differenciate up to date servers from late ones.
+     */
+    ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId);
+    if (myChangeNumber == null)
+    {
+      myChangeNumber = new ChangeNumber(0, 0, serverId);
+    }
+    for (String repServer : rsStates.keySet())
+    {
+
+      ServerState rsState = rsStates.get(repServer);
+      ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(serverId);
+      if (rsChangeNumber == null)
+      {
+        rsChangeNumber = new ChangeNumber(0, 0, serverId);
+      }
+
+      // Store state in right list
+      if (myChangeNumber.olderOrEqual(rsChangeNumber))
+      {
+        upToDateServers.put(repServer, rsState);
+      } else
+      {
+        lateOnes.put(repServer, rsState);
+      }
+    }
+
+    if (upToDateServers.size() > 0)
+    {
+
+      /*
+       * Some up to date servers, among them, choose the one that has the
+       * maximum number of changes to send us. This is the most up to date one
+       * regarding the whole topology. This server is the one which has the less
+       * difference with the topology server state. For comparison, we need to
+       * compute the difference for each server id with the topology server
+       * state.
+       */
+
+      Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
+        upToDateServers.size(),
+        baseDn.toNormalizedString());
+      logError(message);
+
+      /*
+       * First of all, compute the virtual server state for the whole topology,
+       * which is composed of the most up to date change numbers for
+       * each server id in the topology.
+       */
+      ServerState topoState = new ServerState();
+      for (ServerState curState : upToDateServers.values())
+      {
+
+        Iterator<Short> it = curState.iterator();
+        while (it.hasNext())
+        {
+          Short sId = it.next();
+          ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
+          if (curSidCn == null)
+          {
+            curSidCn = new ChangeNumber(0, 0, sId);
+          }
+          // Update topology state
+          topoState.update(curSidCn);
+        }
+      } // For up to date servers
+
+      // Min of the max shifts
+      long minShift = -1L;
+      for (String upServer : upToDateServers.keySet())
+      {
+
+        /*
+         * Compute the maximum difference between the time of a server id's
+         * change number and the time of the matching server id's change
+         * number in the topology server state.
+         *
+         * Note: we could have used the sequence number here instead of the
+         * timestamp, but this would have caused a problem when the sequence
+         * number loops and comes back to 0 (computation would have becomen
+         * meaningless).
+         */
+        long shift = -1L;
+        ServerState curState = upToDateServers.get(upServer);
+        Iterator<Short> it = curState.iterator();
+        while (it.hasNext())
+        {
+          Short sId = it.next();
+          ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
+          if (curSidCn == null)
+          {
+            curSidCn = new ChangeNumber(0, 0, sId);
+          }
+          // Cannot be null as checked at construction time
+          ChangeNumber topoCurSidCn = topoState.getMaxChangeNumber(sId);
+          // Cannot be negative as topoState computed as being the max CN
+          // for each server id in the topology
+          long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime();
+          if (tmpShift > shift)
+          {
+            shift = tmpShift;
+          }
+        }
+
+        if ((minShift < 0) // First time in loop
+          || (shift < minShift))
+        {
+          // This sever is even closer to topo state
+          bestServer = upServer;
+          minShift = shift;
+        }
+      } // For up to date servers
+
+    } else
+    {
+      /*
+       * We could not find a replication server that has seen all the
+       * changes that this server has already processed,
+       */
+      // lateOnes cannot be empty
+      Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
+        baseDn.toNormalizedString(), lateOnes.size());
+      logError(message);
+
+      // Min of the shifts
+      long minShift = -1L;
+      for (String lateServer : lateOnes.keySet())
+      {
+
+        /*
+         * Choose the server who is the closest to us regarding our server id
+         * (this is the most up to date regarding our server id).
+         */
+        ServerState curState = lateOnes.get(lateServer);
+        ChangeNumber ourSidCn = curState.getMaxChangeNumber(serverId);
+        if (ourSidCn == null)
+        {
+          ourSidCn = new ChangeNumber(0, 0, serverId);
+        }
+        // Cannot be negative as our Cn for our server id is strictly
+        // greater than those of the servers in late server list
+        long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime();
+
+        if ((minShift < 0) // First time in loop
+          || (tmpShift < minShift))
+        {
+          // This sever is even closer to topo state
+          bestServer = lateServer;
+          minShift = tmpShift;
+        }
+      } // For late servers
+    }
+
+    return bestServer;
+  }
+
+  /**
    * Search for the changes that happened since fromChangeNumber
    * based on the historical attribute.
    * @param baseDn the base DN
@@ -493,26 +715,26 @@
    * @throws Exception when raised.
    */
   public static InternalSearchOperation seachForChangedEntries(
-      DN baseDn,
-      ChangeNumber fromChangeNumber,
-      InternalSearchListener resultListener)
-  throws Exception
+    DN baseDn,
+    ChangeNumber fromChangeNumber,
+    InternalSearchListener resultListener)
+    throws Exception
   {
     InternalClientConnection conn =
       InternalClientConnection.getRootConnection();
     LDAPFilter filter = LDAPFilter.decode(
-        "("+ Historical.HISTORICALATTRIBUTENAME +
-        ">=dummy:" + fromChangeNumber + ")");
+      "(" + Historical.HISTORICALATTRIBUTENAME +
+      ">=dummy:" + fromChangeNumber + ")");
     LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
     attrs.add(Historical.HISTORICALATTRIBUTENAME);
     attrs.add(Historical.ENTRYUIDNAME);
     return conn.processSearch(
-        new ASN1OctetString(baseDn.toString()),
-        SearchScope.WHOLE_SUBTREE,
-        DereferencePolicy.NEVER_DEREF_ALIASES,
-        0, 0, false, filter,
-        attrs,
-        resultListener);
+      new ASN1OctetString(baseDn.toString()),
+      SearchScope.WHOLE_SUBTREE,
+      DereferencePolicy.NEVER_DEREF_ALIASES,
+      0, 0, false, filter,
+      attrs,
+      resultListener);
   }
 
   /**
@@ -524,14 +746,13 @@
     if (heartbeatInterval > 0)
     {
       heartbeatMonitor =
-           new HeartbeatMonitor("Replication Heartbeat Monitor on " +
-               baseDn + " with " + getReplicationServer(),
-               session, heartbeatInterval);
+        new HeartbeatMonitor("Replication Heartbeat Monitor on " +
+        baseDn + " with " + getReplicationServer(),
+        session, heartbeatInterval);
       heartbeatMonitor.start();
     }
   }
 
-
   /**
    * restart the ReplicationBroker.
    */
@@ -556,7 +777,7 @@
       }
     } catch (IOException e1)
     {
-      // ignore
+    // ignore
     }
 
     if (failingSession == session)
@@ -572,7 +793,7 @@
       {
         MessageBuilder mb = new MessageBuilder();
         mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
-         baseDn.toNormalizedString(), e.getLocalizedMessage()));
+          baseDn.toNormalizedString(), e.getLocalizedMessage()));
         mb.append(stackTraceToSingleLineString(e));
         logError(mb.toMessage());
       }
@@ -583,13 +804,12 @@
           Thread.sleep(500);
         } catch (InterruptedException e)
         {
-          // ignore
+        // ignore
         }
       }
     }
   }
 
-
   /**
    * Publish a message to the other servers.
    * @param msg the message to publish
@@ -598,7 +818,7 @@
   {
     boolean done = false;
 
-    while (!done)
+    while (!done && !shutdown)
     {
       if (connectionError)
       {
@@ -611,7 +831,7 @@
         if (debugEnabled())
         {
           debugInfo("ReplicationBroker.publish() Publishing a " +
-              " message is not possible due to existing connection error.");
+            " message is not possible due to existing connection error.");
         }
 
         return;
@@ -642,9 +862,8 @@
           // want to hold off reconnection in case the connection dropped.
           credit =
             currentWindowSemaphore.tryAcquire(
-                (long) 500, TimeUnit.MILLISECONDS);
-        }
-        else
+            (long) 500, TimeUnit.MILLISECONDS);
+        } else
         {
           credit = true;
         }
@@ -685,24 +904,22 @@
             if (debugEnabled())
             {
               debugInfo("ReplicationBroker.publish() " +
-                  "IO exception raised : " + e.getLocalizedMessage());
+                "IO exception raised : " + e.getLocalizedMessage());
             }
           }
         }
-      }
-      catch (InterruptedException e)
+      } catch (InterruptedException e)
       {
         // just loop.
         if (debugEnabled())
         {
           debugInfo("ReplicationBroker.publish() " +
-              "Interrupted exception raised." + e.getLocalizedMessage());
+            "Interrupted exception raised." + e.getLocalizedMessage());
         }
       }
     }
   }
 
-
   /**
    * Receive a message.
    * This method is not multithread safe and should either always be
@@ -730,8 +947,7 @@
         {
           WindowMessage windowMsg = (WindowMessage) msg;
           sendWindow.release(windowMsg.getNumAck());
-        }
-        else
+        } else
         {
           if (msg instanceof UpdateMessage)
           {
@@ -752,11 +968,11 @@
         if (shutdown == false)
         {
           Message message =
-              NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer);
+            NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer);
           logError(message);
 
           debugInfo("ReplicationBroker.receive() " + baseDn +
-              " Exception raised." + e + e.getLocalizedMessage());
+            " Exception raised." + e + e.getLocalizedMessage());
           this.reStart(failingSession);
         }
       }
@@ -764,7 +980,6 @@
     return null;
   }
 
-
   /**
    * stop the server.
    */
@@ -773,8 +988,10 @@
     replicationServer = "stopped";
     shutdown = true;
     connected = false;
-    if (heartbeatMonitor!= null)
+    if (heartbeatMonitor != null)
+    {
       heartbeatMonitor.shutdown();
+    }
     try
     {
       if (debugEnabled())
@@ -784,9 +1001,12 @@
       }
 
       if (session != null)
+      {
         session.close();
+      }
     } catch (IOException e)
-    {}
+    {
+    }
   }
 
   /**
@@ -834,12 +1054,13 @@
   {
     return replicationServer;
   }
+
   /**
    * {@inheritDoc}
    */
   public void handleInternalSearchEntry(
-      InternalSearchOperation searchOperation,
-      SearchResultEntry searchEntry)
+    InternalSearchOperation searchOperation,
+    SearchResultEntry searchEntry)
   {
     /*
      * Only deal with modify operation so far
@@ -862,10 +1083,10 @@
    * {@inheritDoc}
    */
   public void handleInternalSearchReference(
-      InternalSearchOperation searchOperation,
-      SearchResultReference searchReference)
+    InternalSearchOperation searchOperation,
+    SearchResultReference searchReference)
   {
-    // TODO to be implemented
+  // TODO to be implemented
   }
 
   /**
@@ -906,9 +1127,12 @@
   public int getCurrentSendWindow()
   {
     if (connected)
+    {
       return sendWindow.availablePermits();
-    else
+    } else
+    {
       return 0;
+    }
   }
 
   /**
@@ -920,7 +1144,6 @@
     return numLostConnections;
   }
 
-
   /**
    * Change some config parameters.
    *
@@ -933,8 +1156,8 @@
    * @param heartbeatInterval   The heartbeat interval.
    */
   public void changeConfig(Collection<String> replicationServers,
-      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
-      int maxSendDelay, int window, long heartbeatInterval)
+    int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
+    int maxSendDelay, int window, long heartbeatInterval)
   {
     this.servers = replicationServers;
     this.maxRcvWindow = window;
@@ -943,9 +1166,9 @@
     this.maxReceiveQueue = maxReceiveQueue;
     this.maxSendDelay = maxSendDelay;
     this.maxSendQueue = maxSendQueue;
-    // TODO : Changing those parameters requires to either restart a new
-    // session with the replicationServer or renegociate the parameters that
-    // were sent in the ServerStart message
+  // TODO : Changing those parameters requires to either restart a new
+  // session with the replicationServer or renegociate the parameters that
+  // were sent in the ServerStart message
   }
 
   /**
@@ -968,7 +1191,11 @@
     return !connectionError;
   }
 
-  private boolean debugEnabled() { return true; }
+  private boolean debugEnabled()
+  {
+    return true;
+  }
+
   private static final void debugInfo(String s)
   {
     logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 397a00a..c1add4c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -133,9 +133,6 @@
 
   // At startup, the listen thread wait on this flag for the connet
   // thread to look for other servers in the topology.
-  // TODO when a replication server is out of date (has old changes
-  // to receive from other servers, the listen thread should not accept
-  // connection from ldap servers. (issue 1302)
   private boolean connectedInTopology = false;
   private final Object connectedInTopologyLock = new Object();
 
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
new file mode 100644
index 0000000..5321783
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
@@ -0,0 +1,684 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.plugin;
+
+import java.util.HashMap;
+import static org.opends.server.replication.plugin.ReplicationBroker.*;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.testng.Assert.*;
+
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.Severity;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.types.DN;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test the algorithm for find the best replication server among the configured
+ * ones.
+ */
+public class ComputeBestServerTest extends ReplicationTestCase
+{
+
+  // The tracer object for the debug logger
+  private static final DebugTracer TRACER = getTracer();
+
+  private void debugInfo(String s)
+  {
+    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
+    if (debugEnabled())
+    {
+      TRACER.debugInfo("** TEST **" + s);
+    }
+  }
+
+  private void debugInfo(String message, Exception e)
+  {
+    debugInfo(message + stackTraceToSingleLineString(e));
+  }
+
+  /**
+   * Set up the environment.
+   *
+   * @throws Exception
+   *           If the environment could not be set up.
+   */
+  @BeforeClass
+  @Override
+  public void setUp() throws Exception
+  {
+  // Don't need server context in these tests
+  }
+
+  /**
+   * Clean up the environment.
+   *
+   * @throws Exception
+   *           If the environment could not be set up.
+   */
+  @AfterClass
+  @Override
+  public void classCleanUp() throws Exception
+  {
+  // Don't need server context in these tests
+  }
+
+  /**
+   * Test with one replication server, nobody has a change number (simulates)
+   * very first connection.
+   *
+   * @throws Exception If a problem occured
+   */
+  @Test
+  public void testNullCNBoth() throws Exception
+  {
+    String testCase = "testNullCNBoth";
+
+    debugInfo("Starting " + testCase);
+
+    // definitions for server ids
+    short myId1 = 1;
+    short myId2 = 2;
+    short myId3 = 3;
+
+    // definitions for server names
+    final String WINNER = "winner";
+
+    // Create my state
+    ServerState mySt = new ServerState();     
+    ChangeNumber cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+    mySt.update(cn);
+    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+    mySt.update(cn);
+
+    // Create replication servers state list
+    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+    // State for server 1
+    ServerState aState = new ServerState();
+    cn = new ChangeNumber(0L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(0L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(WINNER, aState);
+
+    String bestServer =
+      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+    assertEquals(bestServer, WINNER, "Wrong best replication server.");
+  }
+  
+  /**
+   * Test with one replication server, only replication server has a non null
+   * changenumber for ds server id
+   * @throws Exception If a problem occured
+   */
+  @Test
+  public void testNullCNDS() throws Exception
+  {
+    String testCase = "testNullCNDS";
+
+    debugInfo("Starting " + testCase);
+
+    // definitions for server ids
+    short myId1 = 1;
+    short myId2 = 2;
+    short myId3 = 3;
+    // definitions for server names
+    final String WINNER = "winner";
+
+    // Create my state
+    ServerState mySt = new ServerState();
+    ChangeNumber cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+    mySt.update(cn);
+    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+    mySt.update(cn);
+
+    // Create replication servers state list
+    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+    // State for server 1
+    ServerState aState = new ServerState();
+    cn = new ChangeNumber(0L, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(0L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(0L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(WINNER, aState);
+
+    String bestServer =
+      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+    assertEquals(bestServer, WINNER, "Wrong best replication server.");
+  }
+  
+  /**
+   * Test with one replication server, only ds server has a non null
+   * changenumber for ds server id but rs has a null one.
+   *
+   * @throws Exception If a problem occured
+   */
+  @Test
+  public void testNullCNRS() throws Exception
+  {
+    String testCase = "testNullCNRS";
+
+    debugInfo("Starting " + testCase);
+
+    // definitions for server ids
+    short myId1 = 1;
+    short myId2 = 2;
+    short myId3 = 3;
+    
+    // definitions for server names
+    final String WINNER = "winner";
+
+    // Create my state
+    ServerState mySt = new ServerState();
+    ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
+    mySt.update(cn);
+    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+    mySt.update(cn);
+    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+    mySt.update(cn);
+
+    // Create replication servers state list
+    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+    // State for server 1
+    ServerState aState = new ServerState();
+    cn = new ChangeNumber(0L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(0L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(WINNER, aState);
+
+    String bestServer =
+      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+    assertEquals(bestServer, WINNER, "Wrong best replication server.");
+  }
+
+  /**
+   * Test with one replication server, up to date.
+   *
+   * @throws Exception If a problem occured
+   */
+  @Test
+  public void test1ServerUp() throws Exception
+  {
+    String testCase = "test1ServerUp";
+
+    debugInfo("Starting " + testCase);
+
+    // definitions for server ids
+    short myId1 = 1;
+    short myId2 = 2;
+    short myId3 = 3;
+    // definitions for server names
+    final String WINNER = "winner";
+
+    // Create my state
+    ServerState mySt = new ServerState();
+    ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
+    mySt.update(cn);
+    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+    mySt.update(cn);
+    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+    mySt.update(cn);
+
+    // Create replication servers state list
+    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+    // State for server 1
+    ServerState aState = new ServerState();
+    cn = new ChangeNumber(1L, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(1L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(1L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(WINNER, aState);
+
+    String bestServer =
+      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+    assertEquals(bestServer, WINNER, "Wrong best replication server.");
+  }
+
+  /**
+   * Test with 2 replication servers, up to date.
+   *
+   * @throws Exception If a problem occured
+   */
+  @Test
+  public void test2ServersUp() throws Exception
+  {
+    String testCase = "test2ServersUp";
+
+    debugInfo("Starting " + testCase);
+
+    // definitions for server ids
+    short myId1 = 1;
+    short myId2 = 2;
+    short myId3 = 3;
+    // definitions for server names
+    final String WINNER = "winner";
+    final String LOOSER1 = "looser1";
+
+    // Create my state
+    ServerState mySt = new ServerState();
+    ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
+    mySt.update(cn);
+    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+    mySt.update(cn);
+    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+    mySt.update(cn);
+
+    // Create replication servers state list
+    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+    // State for server 1
+    ServerState aState = new ServerState();
+    cn = new ChangeNumber(1L, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(1L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(1L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(LOOSER1, aState);
+
+    // State for server 2
+    aState = new ServerState();
+    cn = new ChangeNumber(2L, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(1L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(1L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(WINNER, aState);
+
+    String bestServer =
+      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+    assertEquals(bestServer, WINNER, "Wrong best replication server.");
+  }
+  
+  /**
+   * Test with 3 replication servers, up to date.
+   *
+   * @throws Exception If a problem occured
+   */
+  @Test
+  public void test3ServersUp() throws Exception
+  {
+    String testCase = "test3ServersUp";
+
+    debugInfo("Starting " + testCase);
+
+    // definitions for server ids
+    short myId1 = 1;
+    short myId2 = 2;
+    short myId3 = 3;
+    // definitions for server names
+    final String WINNER = "winner";
+    final String LOOSER1 = "looser1";
+    final String LOOSER2 = "looser2";
+
+    // Create my state
+    ServerState mySt = new ServerState();
+    ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
+    mySt.update(cn);
+    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+    mySt.update(cn);
+    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+    mySt.update(cn);
+
+    // Create replication servers state list
+    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+    // State for server 1
+    ServerState aState = new ServerState();
+    cn = new ChangeNumber(1L, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(1L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(1L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(LOOSER1, aState);
+
+    // State for server 2
+    aState = new ServerState();
+    cn = new ChangeNumber(2L, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(1L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(3L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(WINNER, aState);
+
+    // State for server 3
+    aState = new ServerState();
+    cn = new ChangeNumber(3L, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(2L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(1L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(LOOSER2, aState);
+
+    String bestServer =
+      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+    assertEquals(bestServer, WINNER, "Wrong best replication server.");
+  }
+  
+  /**
+   * Test with one replication server, late.
+   *
+   * @throws Exception If a problem occured
+   */
+  @Test
+  public void test1ServerLate() throws Exception
+  {
+    String testCase = "test1ServerLate";
+
+    debugInfo("Starting " + testCase);
+
+    // definitions for server ids
+    short myId1 = 1;
+    short myId2 = 2;
+    short myId3 = 3;
+    // definitions for server names
+    final String WINNER = "winner";
+
+    // Create my state
+    ServerState mySt = new ServerState();
+    ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
+    mySt.update(cn);
+    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+    mySt.update(cn);
+    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+    mySt.update(cn);
+
+    // Create replication servers state list
+    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+    // State for server 1
+    ServerState aState = new ServerState();
+    cn = new ChangeNumber(0L, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(1L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(1L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(WINNER, aState);
+
+    String bestServer =
+      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+    assertEquals(bestServer, WINNER, "Wrong best replication server.");
+  }
+  
+  /**
+   * Test with 2 replication servers, late.
+   *
+   * @throws Exception If a problem occured
+   */
+  @Test
+  public void test2ServersLate() throws Exception
+  {
+    String testCase = "test2ServersLate";
+
+    debugInfo("Starting " + testCase);
+
+    // definitions for server ids
+    short myId1 = 1;
+    short myId2 = 2;
+    short myId3 = 3;
+    // definitions for server names
+    final String WINNER = "winner";
+    final String LOOSER1 = "looser1";
+
+    // Create my state
+    ServerState mySt = new ServerState();
+    ChangeNumber cn = new ChangeNumber(2L, 0, myId1);
+    mySt.update(cn);
+    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+    mySt.update(cn);
+    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+    mySt.update(cn);
+
+    // Create replication servers state list
+    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+    // State for server 1
+    ServerState aState = new ServerState();
+    cn = new ChangeNumber(0L, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(10L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(10L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(LOOSER1, aState);
+
+    // State for server 2
+    aState = new ServerState();
+    cn = new ChangeNumber(1L, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(0L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(0L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(WINNER, aState);
+
+    String bestServer =
+      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+    assertEquals(bestServer, WINNER, "Wrong best replication server.");
+  }
+  
+  /**
+   * Test with 3 replication servers, late.
+   *
+   * @throws Exception If a problem occured
+   */
+  @Test
+  public void test3ServersLate() throws Exception
+  {
+    String testCase = "test3ServersLate";
+
+    debugInfo("Starting " + testCase);
+
+    // definitions for server ids
+    short myId1 = 1;
+    short myId2 = 2;
+    short myId3 = 3;
+    // definitions for server names
+    final String WINNER = "winner";
+    final String LOOSER1 = "looser1";
+    final String LOOSER2 = "looser2";
+
+    // Create my state
+    ServerState mySt = new ServerState();
+    ChangeNumber cn = new ChangeNumber(4L, 0, myId1);
+    mySt.update(cn);
+    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+    mySt.update(cn);
+    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+    mySt.update(cn);
+
+    // Create replication servers state list
+    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+    // State for server 1
+    ServerState aState = new ServerState();
+    cn = new ChangeNumber(1L, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(10L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(10L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(LOOSER1, aState);
+
+    // State for server 2
+    aState = new ServerState();
+    cn = new ChangeNumber(3L, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(0L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(0L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(WINNER, aState);
+
+    // State for server 3
+    aState = new ServerState();
+    cn = new ChangeNumber(2L, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(10L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(10L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(LOOSER2, aState);
+
+    String bestServer =
+      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+    assertEquals(bestServer, WINNER, "Wrong best replication server.");
+  }
+  
+  /**
+   * Test with 6 replication servers, some up, some late, one null
+   *
+   * @throws Exception If a problem occured
+   */
+  @Test
+  public void test6ServersMixed() throws Exception
+  {
+    String testCase = "test6ServersMixed";
+
+    debugInfo("Starting " + testCase);
+
+    // definitions for server ids
+    short myId1 = 1;
+    short myId2 = 2;
+    short myId3 = 3;    
+    
+    // definitions for server names
+    final String WINNER = "winner";
+    final String LOOSER1 = "looser1";
+    final String LOOSER2 = "looser2";
+    final String LOOSER3 = "looser3";
+    final String LOOSER4 = "looser4";
+    final String LOOSER5 = "looser5";
+
+    // Create my state
+    ServerState mySt = new ServerState();
+    ChangeNumber cn = new ChangeNumber(5L, 0, myId1);
+    mySt.update(cn);
+    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+    mySt.update(cn);
+    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+    mySt.update(cn);
+
+    // Create replication servers state list
+    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+    // State for server 1
+    ServerState aState = new ServerState();
+    cn = new ChangeNumber(4L, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(10L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(10L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(LOOSER1, aState);
+
+    // State for server 2
+    aState = new ServerState();
+    cn = new ChangeNumber(7L, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(6L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(5L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(LOOSER2, aState);
+
+    // State for server 3
+    aState = new ServerState();
+    cn = new ChangeNumber(3L, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(10L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(10L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(LOOSER3, aState);
+    
+    // State for server 4
+    aState = new ServerState();
+    cn = new ChangeNumber(6L, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(6L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(8L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(WINNER, aState);
+    
+    // State for server 5 (null one for our serverid)
+    aState = new ServerState();
+    cn = new ChangeNumber(5L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(5L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(LOOSER4, aState);
+    
+    // State for server 6
+    aState = new ServerState();
+    cn = new ChangeNumber(5L, 0, myId1);
+    aState.update(cn);
+    cn = new ChangeNumber(7L, 0, myId2);
+    aState.update(cn);
+    cn = new ChangeNumber(6L, 0, myId3);
+    aState.update(cn);
+    rsStates.put(LOOSER5, aState);
+
+    String bestServer =
+      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+    assertEquals(bestServer, WINNER, "Wrong best replication server.");
+  }
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
new file mode 100644
index 0000000..2c8116d
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
@@ -0,0 +1,477 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.plugin;
+
+import java.io.IOException;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.testng.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.Severity;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.server.ReplServerFakeConfiguration;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.types.ConfigChangeResult;
+import org.opends.server.types.DN;
+import org.opends.server.types.ResultCode;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test if the replication domain is able to switch of replication rerver
+ * if there is some replication server failure.
+ */
+@Test(sequential = true)
+public class ReplicationServerFailoverTest extends ReplicationTestCase
+{
+
+  private static final String BASEDN_STRING = "dc=example,dc=com";
+  private static final short DS1_ID = 1;
+  private static final short DS2_ID = 2;
+  private static final short RS1_ID = 11;
+  private static final short RS2_ID = 12;
+  private int rs1Port = -1;
+  private int rs2Port = -1;
+  private ReplicationDomain rd1 = null;
+  private ReplicationDomain rd2 = null;
+  private ReplicationServer rs1 = null;
+  private ReplicationServer rs2 = null;
+
+  // The tracer object for the debug logger
+  private static final DebugTracer TRACER = getTracer();
+
+  private void debugInfo(String s)
+  {
+    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
+    if (debugEnabled())
+    {
+      TRACER.debugInfo("** TEST **" + s);
+    }
+  }
+
+  private void debugInfo(String message, Exception e)
+  {
+    debugInfo(message + stackTraceToSingleLineString(e));
+  }
+
+  private void initTest()
+  {
+    rs1Port = -1;
+    rs2Port = -1;
+    rd1 = null;
+    rd2 = null;
+    rs1 = null;
+    rs2 = null;
+    findFreePorts();
+  }
+
+  private void endTest()
+  {
+    if (rd1 != null)
+    {
+      rd1.shutdown();
+      rd1 = null;
+    }
+
+    if (rd2 != null)
+    {
+      rd2.shutdown();
+      rd2 = null;
+    }
+
+    if (rs1 != null)
+    {
+      rs1.shutdown();
+      rs1 = null;
+    }
+
+    if (rs2 != null)
+    {
+      rs2.shutdown();
+      rs2 = null;
+    }
+    rs1Port = -1;
+    rs2Port = -1;
+  }
+
+  /**
+   * Test the failover feature when one RS fails:
+   * 1 DS (DS1) and 2 RS (RS1 and RS2) in topology.
+   * DS1 connected to RS1 (DS1<->RS1)
+   * Both RS are connected together (RS1<->RS2)
+   * RS1 fails, DS1 should be connected to RS2
+   *
+   * @throws Exception If a problem occured
+   */
+  @Test
+  public void testFailOverSingle() throws Exception
+  {
+    String testCase = "testFailOverSingle";
+
+    debugInfo("Starting " + testCase);
+
+    initTest();
+
+    // Start RS1
+    rs1 = createReplicationServer(RS1_ID, testCase);
+    // Start RS2
+    rs2 = createReplicationServer(RS2_ID, testCase);
+
+    // Start DS1
+    DN baseDn = DN.decode(BASEDN_STRING);
+    rd1 = createReplicationDomain(baseDn, DS1_ID, testCase);
+
+    // DS1 connected to RS1 ?
+    String msg = "Before " + RS1_ID + " failure";
+    checkConnection(DS1_ID, RS1_ID, msg);
+
+    // Simulate RS1 failure
+    rs1.shutdown();
+    // Let time for failover to happen
+    sleep(5000);
+
+    // DS1 connected to RS2 ?
+    msg = "After " + RS1_ID + " failure";
+    checkConnection(DS1_ID, RS2_ID, msg);
+
+    endTest();
+  }
+
+  /**
+   * Test the failover feature when one RS fails:
+   * 2 DS (DS1 and DS2) and 2 RS (RS1 and RS2) in topology.
+   * Each DS connected to its own RS (DS1<->RS1, DS2<->RS2)
+   * Both RS are connected together (RS1<->RS2)
+   * RS1 fails, DS1 and DS2 should be both connected to RS2
+   * RS1 comes back (no change)
+   * RS2 fails, DS1 and DS2 should be both connected to RS1
+   *
+   * @throws Exception If a problem occured
+   */
+  @Test(enabled = false)
+  // This test to be run in standalone, not in precommit
+  // because the timing is important as we restart servers after they fail
+  // and thus cannot warrenty that the recovering server is the right one if
+  // the sleep time is not enough with regard to thread scheduling in heavy
+  // precommit environment
+  public void testFailOverMulti() throws Exception
+  {
+    String testCase = "testFailOverMulti";
+
+    debugInfo("Starting " + testCase);
+
+    initTest();
+
+    // Start RS1
+    rs1 = createReplicationServer(RS1_ID, testCase);
+    // Start RS2
+    rs2 = createReplicationServer(RS2_ID, testCase);
+
+    // Start DS1
+    DN baseDn = DN.decode(BASEDN_STRING);
+    rd1 = createReplicationDomain(baseDn, DS1_ID, testCase);
+    // Start DS2
+    rd2 = createReplicationDomain(baseDn, DS2_ID, testCase);
+
+    // DS1 connected to RS1 ?
+    String msg = "Before " + RS1_ID + " failure";
+    checkConnection(DS1_ID, RS1_ID, msg);
+    // DS2 connected to RS2 ?
+    checkConnection(DS2_ID, RS2_ID, msg);
+
+    // Simulate RS1 failure
+    rs1.shutdown();
+    // Let time for failover to happen
+    sleep(5000);
+
+    // DS1 connected to RS2 ?
+    msg = "After " + RS1_ID + " failure";
+    checkConnection(DS1_ID, RS2_ID, msg);
+    // DS2 connected to RS2 ?
+    checkConnection(DS2_ID, RS2_ID, msg);
+
+    // Restart RS1
+    rs1 = createReplicationServer(RS1_ID, testCase);
+    // Let time for RS1 to restart
+    sleep(5000);
+
+    // DS1 connected to RS2 ?
+    msg = "Before " + RS2_ID + " failure";
+    checkConnection(DS1_ID, RS2_ID, msg);
+    // DS2 connected to RS2 ?
+    checkConnection(DS2_ID, RS2_ID, msg);
+
+    // Simulate RS2 failure
+    rs2.shutdown();
+    // Let time for failover to happen
+    sleep(5000);
+
+    // DS1 connected to RS1 ?
+    msg = "After " + RS2_ID + " failure";
+    checkConnection(DS1_ID, RS1_ID, msg);
+    // DS2 connected to RS1 ?
+    checkConnection(DS2_ID, RS1_ID, msg);
+
+    // Restart RS2
+    rs2 = createReplicationServer(RS2_ID, testCase);
+    // Let time for RS2 to restart
+    sleep(5000);
+
+    // DS1 connected to RS1 ?
+    msg = "After " + RS2_ID + " restart";
+    checkConnection(DS1_ID, RS1_ID, msg);
+    // DS2 connected to RS1 ?
+    checkConnection(DS2_ID, RS1_ID, msg);
+
+    endTest();
+  }
+
+  private void sleep(long time)
+  {
+    try
+    {
+      Thread.sleep(time);
+    } catch (InterruptedException ex)
+    {
+      fail("Error sleeping " + stackTraceToSingleLineString(ex));
+    }
+  }
+
+  /**
+   * Check connection of the provided replication domain to the provided
+   * replication server.
+   */
+  private void checkConnection(short dsId, short rsId, String msg)
+  {
+
+    int rsPort = -1;
+    ReplicationDomain rd = null;
+    if (dsId == DS1_ID)
+    {
+      rd = rd1;
+    } else if (dsId == DS2_ID)
+    {
+      rd = rd2;
+    } else
+    {
+      fail("Unknown replication domain server id.");
+    }
+
+    if (rsId == RS1_ID)
+    {
+      rsPort = rs1Port;
+    } else if (rsId == RS2_ID)
+    {
+      rsPort = rs2Port;
+    } else
+    {
+      fail("Unknown replication server id.");
+    }
+
+    // Connected ?
+    assertEquals(rd.isConnected(), true,
+      "Replication domain " + dsId +
+      " is not connected to a replication server (" + msg + ")");
+    // Right port ?
+    String serverStr = rd.getReplicationServer();
+    int index = serverStr.lastIndexOf(':');
+    if ((index == -1) || (index >= serverStr.length()))
+      fail("Enable to find port number in: " + serverStr);
+    String rdPortStr = serverStr.substring(index + 1);
+    int rdPort = -1;
+    try
+    {
+      rdPort = (new Integer(rdPortStr)).intValue();
+    } catch (Exception e)
+    {
+      fail("Enable to get an int from: " + rdPortStr);
+    }
+    assertEquals(rdPort, rsPort,
+      "Replication domain " + dsId +
+      " is not connected to right replication server port (" +
+      rdPort + ") was expecting " + rsPort +
+      " (" + msg + ")");
+  }
+
+  /**
+   * Find needed free TCP ports.
+   */
+  private void findFreePorts()
+  {
+    try
+    {
+      ServerSocket socket1 = TestCaseUtils.bindFreePort();
+      ServerSocket socket2 = TestCaseUtils.bindFreePort();
+      rs1Port = socket1.getLocalPort();
+      rs2Port = socket2.getLocalPort();
+      socket1.close();
+      socket2.close();
+    } catch (IOException e)
+    {
+      fail("Unable to determinate some free ports " +
+        stackTraceToSingleLineString(e));
+    }
+  }
+
+  /**
+   * Creates a new ReplicationServer.
+   */
+  private ReplicationServer createReplicationServer(short serverId,
+    String suffix)
+  {
+    SortedSet<String> replServers = new TreeSet<String>();
+    try
+    {
+      int port = -1;
+      if (serverId == RS1_ID)
+      {
+        port = rs1Port;
+        replServers.add("localhost:" + rs2Port);
+      } else if (serverId == RS2_ID)
+      {
+        port = rs2Port;
+        replServers.add("localhost:" + rs1Port);
+      } else
+      {
+        fail("Unknown replication server id.");
+      }
+
+      String dir = "genid" + serverId + suffix + "Db";
+      ReplServerFakeConfiguration conf =
+        new ReplServerFakeConfiguration(port, dir, 0, serverId, 0, 100,
+        replServers);
+      ReplicationServer replicationServer = new ReplicationServer(conf);
+      return replicationServer;
+
+    } catch (Exception e)
+    {
+      fail("createReplicationServer " + stackTraceToSingleLineString(e));
+    }
+    return null;
+  }
+
+  /**
+   * Creates a new ReplicationDomain.
+   */
+  private ReplicationDomain createReplicationDomain(DN baseDn, short serverId,
+    String suffix)
+  {
+
+    SortedSet<String> replServers = new TreeSet<String>();
+    try
+    {
+      if (serverId == DS1_ID)
+      {
+        replServers.add("localhost:" + rs1Port);
+      } else if (serverId == DS2_ID)
+      {
+        replServers.add("localhost:" + rs2Port);
+      } else
+      {
+        fail("Unknown replication domain server id.");
+      }
+
+      DomainFakeCfg domainConf =
+        new DomainFakeCfg(baseDn, serverId, replServers);
+      //domainConf.setHeartbeatInterval(500);
+      ReplicationDomain replicationDomain =
+        MultimasterReplication.createNewDomain(domainConf);
+
+      // Add other server (doing that after connection insure we connect to
+      // the right server)
+      // WARNING: only works because for the moment, applying changes to conf
+      // does not force reconnection in replication domain
+      // when it is coded, the reconnect may 1 of both servers and we can not
+      // guaranty anymore that we reach the server we want at the beginning.
+      if (serverId == DS1_ID)
+      {
+        replServers.add("localhost:" + rs2Port);
+      } else if (serverId == DS2_ID)
+      {
+        replServers.add("localhost:" + rs1Port);
+      } else
+      {
+        fail("Unknown replication domain server id.");
+      }
+      domainConf = new DomainFakeCfg(baseDn, serverId, replServers);
+      ConfigChangeResult chgRes =
+        replicationDomain.applyConfigurationChange(domainConf);
+      if ((chgRes == null) ||
+        (!chgRes.getResultCode().equals(ResultCode.SUCCESS)))
+      {
+        fail("Could not change replication domain config" +
+          " (add some replication servers).");
+      }
+
+      return replicationDomain;
+
+    } catch (Exception e)
+    {
+      fail("createReplicationDomain " + stackTraceToSingleLineString(e));
+    }
+    return null;
+  }
+
+  /**
+   * Set up the environment.
+   *
+   * @throws Exception
+   *           If the environment could not be set up.
+   */
+  @BeforeClass
+  @Override
+  public void setUp() throws Exception
+  {
+    super.setUp();
+  // In case we need to extend
+  }
+
+  /**
+   * Clean up the environment.
+   *
+   * @throws Exception
+   *           If the environment could not be set up.
+   */
+  @AfterClass
+  @Override
+  public void classCleanUp() throws Exception
+  {
+    super.classCleanUp();
+  // In case we need it extend
+  }
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index fc5180d..210d849 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -785,7 +785,8 @@
           }
           else
           {
-            fail("ReplicationServer transmission failed: no expected message class.");
+            fail("ReplicationServer transmission failed: no expected message" +
+              " class: " + msg2);
             break;
           }
         }

--
Gitblit v1.10.0