From b4f8838b15342670c31753a484abf0129e3c9653 Mon Sep 17 00:00:00 2001
From: jcduff <jcduff@localhost>
Date: Thu, 23 Oct 2008 14:04:24 +0000
Subject: [PATCH] The commit will bring the following features :     - An updated version of the underlying database. BDB JE 3.3 is now used.     - Attribute API refactoring providing a better abstraction and offering improved performances.     - A new GUI called the Control-Panel to replace the Status-Panel: the specifications for this       GUI are available on OpenDS Wiki and contains a link to a mockup.        See <https://www.opends.org/wiki/page/ControlPanelUISpecification>.     - Some changes in the replication protocol to implement "Assured Replication Mode". The        specifications are on OpenDS Wiki at <https://www.opends.org/wiki/page/AssuredMode> and section 7       described some of the replication changes required to support this. Assured Replication is not finished,       but the main replication protocol changes to support it are done. As explained by Gilles on an email on       the Dev mailing list (http://markmail.org/message/46rgo3meq3vriy4a), with these changes the newer versions       of OpenDS may not be able to replicate with OpenDS 1.0 instances.     - Support for Service Tags on the platforms where the functionality is available and enabled. Specifications       are published at <https://www.opends.org/wiki/page/OpenDSServiceTagEnabled>. For more information on       Service Tags see <http://wikis.sun.com/display/ServiceTag/Sun+Service+Tag+FAQ>.     - The Admin Connector service. In order to provide agentry of the OpenDS server at any time, a new service       has been added, dedicated to the administration, configuration and monitoring of the server.       An overview of the Admin Connector service and it's use is available on the       OpenDS wiki <https://www.opends.org/wiki/page/ManagingAdministrationTrafficToTheServer>     - Updates to the various command line tools to support the Admin Connector service.     - Some internal re-architecting of the server to put the foundation of future developments such as virtual       directory services. The new NetworkGroups and WorkFlow internal services which have been specified in       <https://www.opends.org/wiki/page/BasicOperationRoutingThroughNetworkGroup> are now implemented.     - Many bug fixes...

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java | 1998 ++++++++++++++++++++++++++++++++++++++--------------------
 1 files changed, 1,296 insertions(+), 702 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index f85e133..a8ac0c2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -30,6 +30,7 @@
 
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.common.StatusMachine.*;
 
 import org.opends.server.loggers.debug.DebugTracer;
 import static org.opends.messages.ReplicationMessages.*;
@@ -40,8 +41,8 @@
 import java.util.List;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -49,47 +50,59 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.opends.server.admin.std.server.MonitorProviderCfg;
 import org.opends.server.api.MonitorProvider;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.DirectoryServer;
+import org.opends.server.replication.common.AssuredMode;
 import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.RSInfo;
 import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.common.ServerStatus;
+import org.opends.server.replication.common.StatusMachine;
+import org.opends.server.replication.common.StatusMachineEvent;
 import org.opends.server.replication.protocol.*;
 import org.opends.server.types.Attribute;
-import org.opends.server.types.AttributeType;
-import org.opends.server.types.AttributeValue;
+import org.opends.server.types.AttributeBuilder;
+import org.opends.server.types.Attributes;
 import org.opends.server.types.DN;
 import org.opends.server.types.InitializationException;
 import org.opends.server.util.TimeThread;
 
 /**
  * This class defines a server handler, which handles all interaction with a
- * replication server.
+ * peer server (RS or DS).
  */
 public class ServerHandler extends MonitorProvider<MonitorProviderCfg>
 {
+
   /**
    * The tracer object for the debug logger.
    */
   private static final DebugTracer TRACER = getTracer();
-
   /**
    * Time during which the server will wait for existing thread to stop
-   * during the shutdown.
+   * during the shutdownWriter.
    */
   private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
 
+  /*
+   * Properties, filled if remote server is either a DS or a RS
+   */
   private short serverId;
   private ProtocolSession session;
   private final MsgQueue msgQueue = new MsgQueue();
   private MsgQueue lateQueue = new MsgQueue();
-  private final Map<ChangeNumber, AckMessageList> waitingAcks  =
-          new HashMap<ChangeNumber, AckMessageList>();
+  private final Map<ChangeNumber, AckMessageList> waitingAcks =
+    new HashMap<ChangeNumber, AckMessageList>();
   private ReplicationServerDomain replicationServerDomain = null;
   private String serverURL;
   private int outCount = 0; // number of update sent to the server
+
   private int inCount = 0;  // number of updates received from the server
+
   private int inAckCount = 0;
   private int outAckCount = 0;
   private int maxReceiveQueue = 0;
@@ -105,10 +118,9 @@
   private boolean serverIsLDAPserver;
   private boolean following = false;
   private ServerState serverState;
-  private boolean active = true;
+  private boolean activeWriter = true;
   private ServerWriter writer = null;
   private DN baseDn = null;
-  private String serverAddressURL;
   private int rcvWindow;
   private int rcvWindowSizeHalf;
   private int maxRcvWindow;
@@ -116,42 +128,63 @@
   private Semaphore sendWindow;
   private int sendWindowSize;
   private boolean flowControl = false; // indicate that the server is
-                                       // flow controlled and should
-                                       // be stopped from sending messages.
+  // flow controlled and should
+  // be stopped from sending messages.
+
   private int saturationCount = 0;
   private short replicationServerId;
-
-  private short protocolVersion;
+  private short protocolVersion = -1;
   private long generationId = -1;
 
+  // Group id of this remote server
+  private byte groupId = (byte) -1;
 
+  /*
+   * Properties filled only if remote server is a DS
+   */
+
+  // Status of this DS
+  private ServerStatus status = ServerStatus.INVALID_STATUS;
+  // Referrals URLs this DS is exporting
+  private List<String> refUrls = new ArrayList<String>();
+  // Assured replication enabled on DS or not
+  private boolean assuredFlag = false;
+  // DS assured mode (relevant if assured replication enabled)
+  private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
+  // DS safe data level (relevant if assured mode is safe data)
+  private byte safeDataLevel = (byte) -1;
+
+  /*
+   * Properties filled only if remote server is a RS
+   */
+  private String serverAddressURL;
   /**
    * When this Handler is related to a remote replication server
    * this collection will contain as many elements as there are
    * LDAP servers connected to the remote replication server.
    */
-  private final Map<Short, LightweightServerHandler> connectedServers =
+  private final Map<Short, LightweightServerHandler> directoryServers =
     new ConcurrentHashMap<Short, LightweightServerHandler>();
-
   /**
    * The time in milliseconds between heartbeats from the replication
    * server.  Zero means heartbeats are off.
    */
   private long heartbeatInterval = 0;
-
   /**
    * The thread that will send heartbeats.
    */
   HeartbeatThread heartbeatThread = null;
-
+  /**
+   * Set when ServerWriter is stopping.
+   */
+  private boolean shutdownWriter = false;
   /**
    * Set when ServerHandler is stopping.
    */
-  private boolean shutdown = false;
-
+  private AtomicBoolean shuttingDown = new AtomicBoolean(false);
   private static final Map<ChangeNumber, ReplServerAckMessageList>
-   changelogsWaitingAcks =
-       new HashMap<ChangeNumber, ReplServerAckMessageList>();
+    changelogsWaitingAcks =
+    new HashMap<ChangeNumber, ReplServerAckMessageList>();
 
   /**
    * Creates a new server handler instance with the provided socket.
@@ -167,13 +200,58 @@
     this.session = session;
     this.maxQueueSize = queueSize;
     this.maxQueueBytesSize = queueSize * 100;
-    this.protocolVersion = ProtocolVersion.currentVersion();
+    this.protocolVersion = ProtocolVersion.getCurrentVersion();
   }
 
   /**
-   * Do the exchange of start messages to know if the remote
-   * server is an LDAP or replication server and to exchange serverID.
-   * Then create the reader and writer thread.
+   * Creates a DSInfo structure representing this remote DS.
+   * @return The DSInfo structure representing this remote DS
+   */
+  public DSInfo toDSInfo()
+  {
+    DSInfo dsInfo = new DSInfo(serverId, replicationServerId, generationId,
+      status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls);
+
+    return dsInfo;
+  }
+
+  /**
+   * Creates a RSInfo structure representing this remote RS.
+   * @return The RSInfo structure representing this remote RS
+   */
+  public RSInfo toRSInfo()
+  {
+    RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
+
+    return rsInfo;
+  }
+
+  /**
+   * Do the handshake with either the DS or RS and then create the reader and
+   * writer thread.
+   *
+   * There are 2 possible handshake sequences: DS<->RS and RS<->RS. Each one are
+   * divided into 2 logical consecutive phases (phase 1 and phase 2):
+   *
+   * DS<->RS (DS (always initiating connection) always sends first message):
+   * -------
+   *
+   * phase 1:
+   * DS --- ServerStartMsg ---> RS
+   * DS <--- ReplServerStartMsg --- RS
+   * phase 2:
+   * DS --- StartSessionMsg ---> RS
+   * DS <--- TopologyMsg --- RS
+   *
+   * RS<->RS (RS initiating connection always sends first message):
+   * -------
+   *
+   * phase 1:
+   * RS1 --- ReplServerStartMsg ---> RS2
+   * RS1 <--- ReplServerStartMsg --- RS2
+   * phase 2:
+   * RS1 --- TopologyMsg ---> RS2
+   * RS1 <--- TopologyMsg --- RS2
    *
    * @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
    *               null if this is an incoming connection (listen).
@@ -189,94 +267,163 @@
    *                          handler.
    */
   public void start(DN baseDn, short replicationServerId,
-                    String replicationServerURL,
-                    int windowSize, boolean sslEncryption,
-                    ReplicationServer replicationServer)
+    String replicationServerURL,
+    int windowSize, boolean sslEncryption,
+    ReplicationServer replicationServer)
   {
+
+    // The handshake phase must be done by blocking any access to structures
+    // keeping info on connected servers, so that one can safely check for
+    // pre-existence of a server, send a coherent snapshot of known topology
+    // to peers, update the local view of the topology...
+    //
+    // For instance a kind of problem could be that while we connect with a
+    // peer RS, a DS is connecting at the same time and we could publish the
+    // connected DSs to the peer RS forgetting this last DS in the TopologyMsg.
+    //
+    // This method and every others that need to read/make changes to the
+    // structures holding topology for the domain should:
+    // - call ReplicationServerDomain.lock()
+    // - read/modify structures
+    // - call ReplicationServerDomain.release()
+    //
+    // More information is provided in comment of ReplicationServerDomain.lock()
+
+    // If domain already exists, lock it until handshake is finished otherwise
+    // it will be created and locked later in the method
+    if (baseDn != null)
+    {
+      ReplicationServerDomain rsd =
+        replicationServer.getReplicationServerDomain(baseDn, false);
+      if (rsd != null)
+      {
+        try
+        {
+          rsd.lock();
+        } catch (InterruptedException ex)
+        {
+          // Thread interrupted, return.
+          return;
+        }
+      }
+    }
+
+    long oldGenerationId = -100;
+
     if (debugEnabled())
       TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() +
-                " starts a new LS or RS " +
-                ((baseDn == null)?"incoming connection":"outgoing connection"));
+        " starts a new LS or RS " +
+        ((baseDn == null) ? "incoming connection" : "outgoing connection"));
 
     this.replicationServerId = replicationServerId;
-    rcvWindowSizeHalf = windowSize/2;
+    rcvWindowSizeHalf = windowSize / 2;
     maxRcvWindow = windowSize;
     rcvWindow = windowSize;
     long localGenerationId = -1;
-    boolean handshakeOnly = false;
+    ReplServerStartMsg outReplServerStartMsg = null;
+
+    /**
+     * This boolean prevents from logging a polluting error when connection\
+     * aborted from a DS that wanted only to perform handshake phase 1 in order
+     * to determine the best suitable RS:
+     * 1) -> ServerStartMsg
+     * 2) <- ReplServerStartMsg
+     * 3) connection closure
+     */
+    boolean log_error_message = true;
 
     try
     {
-      if (baseDn != null)
+      /*
+       * PROCEDE WITH FIRST PHASE OF HANDSHAKE:
+       * ServerStartMsg then ReplServerStartMsg (with a DS)
+       * OR
+       * ReplServerStartMsg then ReplServerStartMsg (with a RS)
+       */
+
+      if (baseDn != null) // Outgoing connection
+
       {
         // This is an outgoing connection. Publish our start message.
         this.baseDn = baseDn;
 
         // Get or create the ReplicationServerDomain
         replicationServerDomain =
-                replicationServer.getReplicationServerDomain(baseDn, true);
+          replicationServer.getReplicationServerDomain(baseDn, true);
+        if (!replicationServerDomain.hasLock())
+        {
+          try
+          {
+            replicationServerDomain.lock();
+          } catch (InterruptedException ex)
+          {
+            // Thread interrupted, return.
+            return;
+          }
+        }
         localGenerationId = replicationServerDomain.getGenerationId();
 
         ServerState localServerState =
-                replicationServerDomain.getDbServerState();
-        ReplServerStartMessage msg =
-          new ReplServerStartMessage(replicationServerId, replicationServerURL,
-                                    baseDn, windowSize, localServerState,
-                                    protocolVersion, localGenerationId,
-                                    sslEncryption);
+          replicationServerDomain.getDbServerState();
+        outReplServerStartMsg = new ReplServerStartMsg(replicationServerId,
+          replicationServerURL,
+          baseDn, windowSize, localServerState,
+          protocolVersion, localGenerationId,
+          sslEncryption,
+          replicationServer.getGroupId(),
+          replicationServerDomain.
+          getReplicationServer().getDegradedStatusThreshold());
 
-        session.publish(msg);
+        session.publish(outReplServerStartMsg);
       }
 
-      // Wait and process ServerStart or ReplServerStart
-      ReplicationMessage msg = session.receive();
-      if (msg instanceof ServerStartMessage)
+      // Wait and process ServerStartMsg or ReplServerStartMsg
+      ReplicationMsg msg = session.receive();
+      if (msg instanceof ServerStartMsg)
       {
         // The remote server is an LDAP Server.
-        ServerStartMessage receivedMsg = (ServerStartMessage) msg;
+        ServerStartMsg serverStartMsg = (ServerStartMsg) msg;
 
-        generationId = receivedMsg.getGenerationId();
+        generationId = serverStartMsg.getGenerationId();
         protocolVersion = ProtocolVersion.minWithCurrent(
-            receivedMsg.getVersion());
-        serverId = receivedMsg.getServerId();
-        serverURL = receivedMsg.getServerURL();
-        this.baseDn = receivedMsg.getBaseDn();
-        this.serverState = receivedMsg.getServerState();
+          serverStartMsg.getVersion());
+        serverId = serverStartMsg.getServerId();
+        serverURL = serverStartMsg.getServerURL();
+        this.baseDn = serverStartMsg.getBaseDn();
+        this.serverState = serverStartMsg.getServerState();
+        this.groupId = serverStartMsg.getGroupId();
 
-        maxReceiveDelay = receivedMsg.getMaxReceiveDelay();
-        maxReceiveQueue = receivedMsg.getMaxReceiveQueue();
-        maxSendDelay = receivedMsg.getMaxSendDelay();
-        maxSendQueue = receivedMsg.getMaxSendQueue();
-        heartbeatInterval = receivedMsg.getHeartbeatInterval();
-
-        handshakeOnly = receivedMsg.isHandshakeOnly();
+        maxReceiveDelay = serverStartMsg.getMaxReceiveDelay();
+        maxReceiveQueue = serverStartMsg.getMaxReceiveQueue();
+        maxSendDelay = serverStartMsg.getMaxSendDelay();
+        maxSendQueue = serverStartMsg.getMaxSendQueue();
+        heartbeatInterval = serverStartMsg.getHeartbeatInterval();
 
         // The session initiator decides whether to use SSL.
-        sslEncryption = receivedMsg.getSSLEncryption();
+        sslEncryption = serverStartMsg.getSSLEncryption();
 
         if (maxReceiveQueue > 0)
-          restartReceiveQueue = (maxReceiveQueue > 1000 ?
-                                  maxReceiveQueue - 200 :
-                                  maxReceiveQueue*8/10);
+          restartReceiveQueue = (maxReceiveQueue > 1000 ? maxReceiveQueue -
+            200 : maxReceiveQueue * 8 / 10);
         else
           restartReceiveQueue = 0;
 
         if (maxSendQueue > 0)
-          restartSendQueue = (maxSendQueue  > 1000 ? maxSendQueue - 200 :
-            maxSendQueue*8/10);
+          restartSendQueue =
+            (maxSendQueue > 1000 ? maxSendQueue - 200 : maxSendQueue * 8 /
+            10);
         else
           restartSendQueue = 0;
 
         if (maxReceiveDelay > 0)
-          restartReceiveDelay = (maxReceiveDelay > 10 ? maxReceiveDelay -1 :
-            maxReceiveDelay);
+          restartReceiveDelay = (maxReceiveDelay > 10 ? maxReceiveDelay - 1
+            : maxReceiveDelay);
         else
           restartReceiveDelay = 0;
 
         if (maxSendDelay > 0)
-          restartSendDelay = (maxSendDelay > 10 ?
-                              maxSendDelay -1 :
-                              maxSendDelay);
+          restartSendDelay =
+            (maxSendDelay > 10 ? maxSendDelay - 1 : maxSendDelay);
         else
           restartSendDelay = 0;
 
@@ -289,25 +436,53 @@
 
         // Get or Create the ReplicationServerDomain
         replicationServerDomain =
-                replicationServer.getReplicationServerDomain(this.baseDn, true);
+          replicationServer.getReplicationServerDomain(this.baseDn, true);
 
-        replicationServerDomain.waitDisconnection(receivedMsg.getServerId());
-        replicationServerDomain.mayResetGenerationId();
+        // Hack to be sure that if a server disconnects and reconnect, we
+        // let the reader thread see the closure and cleanup any reference
+        // to old connection
+        replicationServerDomain.waitDisconnection(serverStartMsg.getServerId());
+
+        if (!replicationServerDomain.hasLock())
+        {
+          try
+          {
+            replicationServerDomain.lock();
+          } catch (InterruptedException ex)
+          {
+            // Thread interrupted, return.
+            return;
+          }
+        }
+
+        // Duplicate server ?
+        if (!replicationServerDomain.checkForDuplicateDS(this))
+        {
+          closeSession(null);
+          if ((replicationServerDomain != null) &&
+            replicationServerDomain.hasLock())
+            replicationServerDomain.release();
+          return;
+        }
 
         localGenerationId = replicationServerDomain.getGenerationId();
 
         ServerState localServerState =
-                replicationServerDomain.getDbServerState();
+          replicationServerDomain.getDbServerState();
         // This an incoming connection. Publish our start message
-        ReplServerStartMessage myStartMsg =
-          new ReplServerStartMessage(replicationServerId, replicationServerURL,
-                                    this.baseDn, windowSize, localServerState,
-                                    protocolVersion, localGenerationId,
-                                    sslEncryption);
-        session.publish(myStartMsg);
-        sendWindowSize = receivedMsg.getWindowSize();
+        ReplServerStartMsg replServerStartMsg =
+          new ReplServerStartMsg(replicationServerId, replicationServerURL,
+          this.baseDn, windowSize, localServerState,
+          protocolVersion, localGenerationId,
+          sslEncryption,
+          replicationServer.getGroupId(),
+          replicationServerDomain.
+          getReplicationServer().getDegradedStatusThreshold());
+        session.publish(replServerStartMsg);
+        sendWindowSize = serverStartMsg.getWindowSize();
 
-        /* Until here session is encrypted then it depends on the negotiation */
+        /* Until here session is encrypted then it depends on the
+        negotiation */
         if (!sslEncryption)
         {
           session.stopEncryption();
@@ -315,309 +490,550 @@
 
         if (debugEnabled())
         {
-          Set<String> ss = this.serverState.toStringSet();
-          Set<String> lss =
-                  replicationServerDomain.getDbServerState().toStringSet();
-          TRACER.debugInfo("In " + replicationServerDomain.
-                   getReplicationServer().getMonitorInstanceName() +
-                   ", SH received START from LS serverId=" + serverId +
-                   " baseDN=" + this.baseDn +
-                   " generationId=" + generationId +
-                   " localGenerationId=" + localGenerationId +
-                   " state=" + ss +
-                   " and sent ReplServerStart with state=" + lss);
+          TRACER.debugInfo("In " +
+            replicationServerDomain.getReplicationServer().
+            getMonitorInstanceName() + ":" +
+            "\nSH HANDSHAKE RECEIVED:\n" + serverStartMsg.toString() +
+            "\nAND REPLIED:\n" + replServerStartMsg.toString());
         }
-
-        /*
-         * If we have already a generationID set for the domain
-         * then
-         *   if the connecting replica has not the same
-         *   then it is degraded locally and notified by an error message
-         * else
-         *   we set the generationID from the one received
-         *   (unsaved yet on disk . will be set with the 1rst change received)
-         */
-        if (localGenerationId>0)
-        {
-          if (generationId != localGenerationId)
-          {
-            Message message = NOTE_BAD_GENERATION_ID.get(
-                receivedMsg.getBaseDn().toNormalizedString(),
-                Short.toString(receivedMsg.getServerId()),
-                Long.toString(generationId),
-                Long.toString(localGenerationId));
-
-            ErrorMessage errorMsg =
-              new ErrorMessage(replicationServerId, serverId, message);
-            session.publish(errorMsg);
-          }
-        }
-        else
-        {
-          // We are an empty Replication Server
-          if ((generationId>0)&&(!serverState.isEmpty()))
-          {
-            // If the LDAP server has already sent changes
-            // it is not expected to connect to an empty RS
-            Message message = NOTE_BAD_GENERATION_ID.get(
-                receivedMsg.getBaseDn().toNormalizedString(),
-                Short.toString(receivedMsg.getServerId()),
-                Long.toString(generationId),
-                Long.toString(localGenerationId));
-
-            ErrorMessage errorMsg =
-              new ErrorMessage(replicationServerId, serverId, message);
-            session.publish(errorMsg);
-          }
-          else
-          {
-            replicationServerDomain.setGenerationId(generationId, false);
-          }
-        }
-      }
-      else if (msg instanceof ReplServerStartMessage)
+      } else if (msg instanceof ReplServerStartMsg)
       {
         // The remote server is a replication server
-        ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg;
+        ReplServerStartMsg inReplServerStartMsg = (ReplServerStartMsg) msg;
         protocolVersion = ProtocolVersion.minWithCurrent(
-            receivedMsg.getVersion());
-        generationId = receivedMsg.getGenerationId();
-        serverId = receivedMsg.getServerId();
-        serverURL = receivedMsg.getServerURL();
+          inReplServerStartMsg.getVersion());
+        generationId = inReplServerStartMsg.getGenerationId();
+        serverId = inReplServerStartMsg.getServerId();
+        serverURL = inReplServerStartMsg.getServerURL();
         int separator = serverURL.lastIndexOf(':');
         serverAddressURL =
-          session.getRemoteAddress() + ":" + serverURL.substring(separator + 1);
+          session.getRemoteAddress() + ":" + serverURL.substring(separator +
+          1);
         serverIsLDAPserver = false;
-        this.baseDn = receivedMsg.getBaseDn();
-        if (baseDn == null)
+        if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
+        {
+          // We support connection from a V1 RS
+          // Only V2 protocol has the group id in repl server start message
+          this.groupId = inReplServerStartMsg.getGroupId();
+        }
+        this.baseDn = inReplServerStartMsg.getBaseDn();
+
+        if (baseDn == null) // Reply to incoming RS
+
         {
           // Get or create the ReplicationServerDomain
-          replicationServerDomain = replicationServer.
-                  getReplicationServerDomain(this.baseDn, true);
+          replicationServerDomain =
+            replicationServer.getReplicationServerDomain(this.baseDn, true);
+          if (!replicationServerDomain.hasLock())
+          {
+            try
+            {
+              /**
+               * Take the lock on the domain.
+               * WARNING: Here we try to acquire the lock with a timeout. This
+               * is for preventing a deadlock that may happen if there are cross
+               * connection attempts (for same domain) from this replication
+               * server and from a peer one:
+               * Here is the scenario:
+               * - RS1 connect thread takes the domain lock and starts
+               * connection to RS2
+               * - at the same time RS2 connect thread takes his domain lock and
+               * start connection to RS2
+               * - RS2 listen thread starts processing received
+               * ReplServerStartMsg from RS1 and wants to acquire the lock on
+               * the domain (here) but cannot as RS2 connect thread already has
+               * it
+               * - RS1 listen thread starts processing received
+               * ReplServerStartMsg from RS2 and wants to acquire the lock on
+               * the domain (here) but cannot as RS1 connect thread already has
+               * it
+               * => Deadlock: 4 threads are locked.
+               * So to prevent that in such situation, the listen threads here
+               * will both timeout trying to acquire the lock. The random time
+               * for the timeout should allow on connection attempt to be
+               * aborted whereas the other one should have time to finish in the
+               * same time.
+               * Warning: the minimum time (3s) should be big enough to allow
+               * normal situation connections to terminate. The added random
+               * time should represent a big enough range so that the chance to
+               * have one listen thread timing out a lot before the peer one is
+               * great. When the first listen thread times out, the remote
+               * connect thread should release the lock and allow the peer
+               * listen thread to take the lock it was waiting for and process
+               * the connection attempt.
+               */
+              Random random = new Random();
+              int randomTime = random.nextInt(6); // Random from 0 to 5
+              // Wait at least 3 seconds + (0 to 5 seconds)
+              long timeout = (long) (3000 + ( randomTime * 1000 ) );
+              boolean noTimeout = replicationServerDomain.tryLock(timeout);
+              if (!noTimeout)
+              {
+                // Timeout
+                Message message = NOTE_TIMEOUT_WHEN_CROSS_CONNECTION.get(
+                  this.baseDn.toNormalizedString(),
+                  Short.toString(serverId),
+                  Short.toString(replicationServer.getServerId()));
+                closeSession(message);
+                return;
+              }
+            } catch (InterruptedException ex)
+            {
+              // Thread interrupted, return.
+              return;
+            }
+          }
           localGenerationId = replicationServerDomain.getGenerationId();
-          ServerState serverState = replicationServerDomain.getDbServerState();
+          ServerState domServerState =
+            replicationServerDomain.getDbServerState();
 
           // The session initiator decides whether to use SSL.
-          sslEncryption = receivedMsg.getSSLEncryption();
+          sslEncryption = inReplServerStartMsg.getSSLEncryption();
 
           // Publish our start message
-          ReplServerStartMessage outMsg =
-            new ReplServerStartMessage(replicationServerId,
-                                       replicationServerURL,
-                                       this.baseDn, windowSize, serverState,
-                                       protocolVersion,
-                                       localGenerationId,
-                                       sslEncryption);
-          session.publish(outMsg);
-        }
-        else
-        {
-          this.baseDn = baseDn;
-        }
-        this.serverState = receivedMsg.getServerState();
-        sendWindowSize = receivedMsg.getWindowSize();
+          outReplServerStartMsg = new ReplServerStartMsg(replicationServerId,
+            replicationServerURL,
+            this.baseDn, windowSize, domServerState,
+            protocolVersion,
+            localGenerationId,
+            sslEncryption,
+            replicationServer.getGroupId(),
+            replicationServerDomain.
+            getReplicationServer().getDegradedStatusThreshold());
 
-        /* Until here session is encrypted then it depends on the negociation */
-        if (!sslEncryption)
-        {
-          session.stopEncryption();
-        }
+          if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
+          {
+            session.publish(outReplServerStartMsg);
+          } else {
+            // We support connection from a V1 RS, send PDU with V1 form
+            session.publish(outReplServerStartMsg,
+              ProtocolVersion.REPLICATION_PROTOCOL_V1);
+          }
 
-        if (debugEnabled())
-        {
-          Set<String> ss = this.serverState.toStringSet();
-          Set<String> lss =
-                  replicationServerDomain.getDbServerState().toStringSet();
-          TRACER.debugInfo("In " + replicationServerDomain.
-                   getReplicationServer().getMonitorInstanceName() +
-                   ", SH received START from RS serverId=" + serverId +
-                   " baseDN=" + this.baseDn +
-                   " generationId=" + generationId +
-                   " localGenerationId=" + localGenerationId +
-                   " state=" + ss +
-                   " and sent ReplServerStart with state=" + lss);
-        }
-
-        // if the remote RS and the local RS have the same genID
-        // then it's ok and nothing else to do
-        if (generationId == localGenerationId)
-        {
           if (debugEnabled())
           {
             TRACER.debugInfo("In " +
-                    replicationServerDomain.getReplicationServer().
-              getMonitorInstanceName() + " RS with serverID=" + serverId +
-              " is connected with the right generation ID");
+              replicationServerDomain.getReplicationServer().
+              getMonitorInstanceName() + ":" +
+              "\nSH HANDSHAKE RECEIVED:\n" + inReplServerStartMsg.toString() +
+              "\nAND REPLIED:\n" + outReplServerStartMsg.toString());
           }
-        }
-        else
+        } else
         {
-          if (localGenerationId>0)
+          // Did the remote RS answer with the DN we provided him ?
+          if (!(this.baseDn.equals(baseDn)))
           {
-            // if the local RS is initialized
-            if (generationId>0)
-            {
-              // if the remote RS is initialized
-              if (generationId != localGenerationId)
-              {
-                // if the 2 RS have different generationID
-                if (replicationServerDomain.getGenerationIdSavedStatus())
-                {
-                  // it the present RS has received changes regarding its
-                  //     gen ID and so won't change without a reset
-                  // then  we are just degrading the peer.
-                  Message message = NOTE_BAD_GENERATION_ID.get(
-                      this.baseDn.toNormalizedString(),
-                      Short.toString(receivedMsg.getServerId()),
-                      Long.toString(generationId),
-                      Long.toString(localGenerationId));
-
-                  ErrorMessage errorMsg =
-                    new ErrorMessage(replicationServerId, serverId, message);
-                  session.publish(errorMsg);
-                }
-                else
-                {
-                  // The present RS has never received changes regarding its
-                  // gen ID.
-                  //
-                  // Example case:
-                  // - we are in RS1
-                  // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
-                  // - RS1 has genId1 from LS1 /genId1 comes from data in suffix
-                  // - we are in RS1 and we receive a START msg from RS2
-                  // - Each RS keeps its genID / is degraded and when LS2 will
-                  //   be populated from LS1 everything will becomes ok.
-                  //
-                  // Issue:
-                  // FIXME : Would it be a good idea in some cases to just
-                  //         set the gen ID received from the peer RS
-                  //         specially if the peer has a non nul state and
-                  //         we have a nul state ?
-                  // replicationServerDomain.
-                  // setGenerationId(generationId, false);
-                  Message message = NOTE_BAD_GENERATION_ID.get(
-                      this.baseDn.toNormalizedString(),
-                      Short.toString(receivedMsg.getServerId()),
-                      Long.toString(generationId),
-                      Long.toString(localGenerationId));
-
-                  ErrorMessage errorMsg =
-                    new ErrorMessage(replicationServerId, serverId, message);
-                  session.publish(errorMsg);
-                }
-              }
-            }
-            else
-            {
-              // The remote RS had no genId while the local one has one genID.
-              // In our start msg, we have just sent our local genID to
-              // the remote RS that will immediatly adopt it
-              // So let's store our local genID as the genID of the remote RS.
-              // It is necessary to do so, in order to not have a 'bad genID'
-              // error when we will try to send updates to the remote RS
-              // (before receiving the infoMsg from the remote RS !!!)
-              generationId = localGenerationId;
-            }
+            Message message = ERR_RS_DN_DOES_NOT_MATCH.get(
+              this.baseDn.toString(),
+              baseDn.toString());
+            closeSession(message);
+            if ((replicationServerDomain != null) &&
+              replicationServerDomain.hasLock())
+              replicationServerDomain.release();
+            return;
           }
-          else
+
+          if (debugEnabled())
           {
-            // The local RS is not initialized - take the one received
-            replicationServerDomain.setGenerationId(generationId, false);
+            TRACER.debugInfo("In " +
+              replicationServerDomain.getReplicationServer().
+              getMonitorInstanceName() + ":" +
+              "\nSH HANDSHAKE SENT:\n" + outReplServerStartMsg.toString() +
+              "\nAND RECEIVED:\n" + inReplServerStartMsg.toString());
           }
         }
-      }
-      else
+        this.serverState = inReplServerStartMsg.getServerState();
+        sendWindowSize = inReplServerStartMsg.getWindowSize();
+
+        // Duplicate server ?
+        if (!replicationServerDomain.checkForDuplicateRS(this))
+        {
+          closeSession(null);
+          if ((replicationServerDomain != null) &&
+            replicationServerDomain.hasLock())
+            replicationServerDomain.release();
+          return;
+        }
+
+        /* Until here session is encrypted then it depends on the
+        negociation */
+        if (!sslEncryption)
+        {
+          session.stopEncryption();
+        }
+      } else
       {
-        // TODO : log error
-        return;   // we did not recognize the message, ignore it
+        // We did not recognize the message, close session as what
+        // can happen after is undetermined and we do not want the server to
+        // be disturbed
+        closeSession(null);
+        if ((replicationServerDomain != null) &&
+          replicationServerDomain.hasLock())
+          replicationServerDomain.release();
+        return;
       }
 
-      // Get or create the ReplicationServerDomain
-      replicationServerDomain = replicationServer.
-              getReplicationServerDomain(this.baseDn,true);
+      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
+      { // Only protocol version above V1 has a phase 2 handshake
 
-      if (!handshakeOnly)
-      {
-        boolean started;
-        if (serverIsLDAPserver)
+        /*
+         * NOW PROCEDE WITH SECOND PHASE OF HANDSHAKE:
+         * TopologyMsg then TopologyMsg (with a RS)
+         * OR
+         * StartSessionMsg then TopologyMsg (with a DS)
+         */
+
+        TopologyMsg outTopoMsg = null;
+
+        if (baseDn != null) // Outgoing connection to a RS
+
         {
-          started = replicationServerDomain.startServer(this);
-        }
-        else
-        {
-          started = replicationServerDomain.startReplicationServer(this);
+          // Send our own TopologyMsg to remote RS
+          outTopoMsg = replicationServerDomain.createTopologyMsgForRS();
+          session.publish(outTopoMsg);
         }
 
-        if (started)
+        // Wait and process TopologyMsg or StartSessionMsg
+        log_error_message = false;
+        ReplicationMsg msg2 = session.receive();
+        log_error_message = true;
+        if (msg2 instanceof TopologyMsg)
         {
-          // sendWindow MUST be created before starting the writer
-          sendWindow = new Semaphore(sendWindowSize);
+          // Remote RS sent his topo msg
+          TopologyMsg inTopoMsg = (TopologyMsg) msg2;
 
-          writer = new ServerWriter(session, serverId,
-              this, replicationServerDomain);
-          reader = new ServerReader(session, serverId,
-              this, replicationServerDomain);
+          // CONNECTION WITH A RS
 
-          reader.start();
-          writer.start();
-
-          // Create a thread to send heartbeat messages.
-          if (heartbeatInterval > 0)
-          {
-            heartbeatThread = new HeartbeatThread(
-                "replication Heartbeat to " + serverURL +
-                " for " + this.baseDn,
-                session, heartbeatInterval/3);
-            heartbeatThread.start();
-          }
-
-          DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
-          DirectoryServer.registerMonitorProvider(this);
-        }
-        else
-        {
-          // the connection is not valid, close it.
-          try
+          // if the remote RS and the local RS have the same genID
+          // then it's ok and nothing else to do
+          if (generationId == localGenerationId)
           {
             if (debugEnabled())
             {
               TRACER.debugInfo("In " +
-                  replicationServerDomain.getReplicationServer().
-                  getMonitorInstanceName() + " RS failed to start locally " +
-                  " the connection from serverID="+serverId);
+                replicationServerDomain.getReplicationServer().
+                getMonitorInstanceName() + " RS with serverID=" + serverId +
+                " is connected with the right generation ID");
             }
-            session.close();
-          } catch (IOException e1)
+          } else
           {
-            // ignore
+            if (localGenerationId > 0)
+            {
+              // if the local RS is initialized
+              if (generationId > 0)
+              {
+                // if the remote RS is initialized
+                if (generationId != localGenerationId)
+                {
+                  // if the 2 RS have different generationID
+                  if (replicationServerDomain.getGenerationIdSavedStatus())
+                  {
+                    // it the present RS has received changes regarding its
+                    //     gen ID and so won't change without a reset
+                    // then  we are just degrading the peer.
+                    Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
+                      this.baseDn.toNormalizedString(),
+                      Short.toString(serverId),
+                      Long.toString(generationId),
+                      Long.toString(localGenerationId));
+                    logError(message);
+                  } else
+                  {
+                    // The present RS has never received changes regarding its
+                    // gen ID.
+                    //
+                    // Example case:
+                    // - we are in RS1
+                    // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
+                    // - RS1 has genId1 from LS1 /genId1 comes from data in
+                    //   suffix
+                    // - we are in RS1 and we receive a START msg from RS2
+                    // - Each RS keeps its genID / is degraded and when LS2
+                    //   will be populated from LS1 everything will become ok.
+                    //
+                    // Issue:
+                    // FIXME : Would it be a good idea in some cases to just
+                    //         set the gen ID received from the peer RS
+                    //         specially if the peer has a non null state and
+                    //         we have a nul state ?
+                    // replicationServerDomain.
+                    // setGenerationId(generationId, false);
+                    Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
+                      this.baseDn.toNormalizedString(),
+                      Short.toString(serverId),
+                      Long.toString(generationId),
+                      Long.toString(localGenerationId));
+                    logError(message);
+                  }
+                }
+              } else
+              {
+                // The remote RS has no genId. We don't change anything for the
+                // current RS.
+              }
+            } else
+            {
+              // The local RS is not initialized - take the one received
+              // WARNING: Must be done before computing topo message to send
+              // to peer server as topo message must embed valid generation id
+              // for our server
+              oldGenerationId =
+                replicationServerDomain.setGenerationId(generationId, false);
+            }
           }
+
+          if (baseDn == null) // Reply to the RS (incoming connection)
+
+          {
+            // Send our own TopologyMsg to remote RS
+            outTopoMsg = replicationServerDomain.createTopologyMsgForRS();
+            session.publish(outTopoMsg);
+
+            if (debugEnabled())
+            {
+              TRACER.debugInfo("In " +
+                replicationServerDomain.getReplicationServer().
+                getMonitorInstanceName() + ":" +
+                "\nSH HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() +
+                "\nAND REPLIED:\n" + outTopoMsg.toString());
+            }
+          } else
+          {
+            if (debugEnabled())
+            {
+              TRACER.debugInfo("In " +
+                replicationServerDomain.getReplicationServer().
+                getMonitorInstanceName() + ":" +
+                "\nSH HANDSHAKE SENT:\n" + outTopoMsg.toString() +
+                "\nAND RECEIVED:\n" + inTopoMsg.toString());
+            }
+          }
+
+          // Alright, connected with new RS (either outgoing or incoming
+          // connection): store handler.
+          Map<Short, ServerHandler> connectedRSs =
+            replicationServerDomain.getConnectedRSs();
+          connectedRSs.put(serverId, this);
+
+          // Process TopologyMsg sent by remote RS: store matching new info
+          // (this will also warn our connected DSs of the new received info)
+          replicationServerDomain.receiveTopoInfoFromRS(inTopoMsg, this, false);
+
+        } else if (msg2 instanceof StartSessionMsg)
+        {
+          // CONNECTION WITH A DS
+
+          // Process StartSessionMsg sent by remote DS
+          StartSessionMsg startSessionMsg = (StartSessionMsg) msg2;
+
+          this.status = startSessionMsg.getStatus();
+          // Sanity check: is it a valid initial status?
+          if (!isValidInitialStatus(this.status))
+          {
+            Message mesg = ERR_RS_INVALID_INIT_STATUS.get(
+              this.status.toString(), this.baseDn.toString(),
+              Short.toString(serverId));
+            closeSession(mesg);
+            if ((replicationServerDomain != null) &&
+              replicationServerDomain.hasLock())
+              replicationServerDomain.release();
+            return;
+          }
+          this.refUrls = startSessionMsg.getReferralsURLs();
+          this.assuredFlag = startSessionMsg.isAssured();
+          this.assuredMode = startSessionMsg.getAssuredMode();
+          this.safeDataLevel = startSessionMsg.getSafeDataLevel();
+
+          /*
+           * If we have already a generationID set for the domain
+           * then
+           *   if the connecting replica has not the same
+           *   then it is degraded locally and notified by an error message
+           * else
+           *   we set the generationID from the one received
+           *   (unsaved yet on disk . will be set with the 1rst change
+           * received)
+           */
+          if (localGenerationId > 0)
+          {
+            if (generationId != localGenerationId)
+            {
+              Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get(
+                this.baseDn.toNormalizedString(),
+                Short.toString(serverId),
+                Long.toString(generationId),
+                Long.toString(localGenerationId));
+              logError(message);
+            }
+          } else
+          {
+            // We are an empty Replicationserver
+            if ((generationId > 0) && (!serverState.isEmpty()))
+            {
+              // If the LDAP server has already sent changes
+              // it is not expected to connect to an empty RS
+              Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get(
+                this.baseDn.toNormalizedString(),
+                Short.toString(serverId),
+                Long.toString(generationId),
+                Long.toString(localGenerationId));
+              logError(message);
+            } else
+            {
+              // The local RS is not initialized - take the one received
+              // WARNING: Must be done before computing topo message to send
+              // to peer server as topo message must embed valid generation id
+              // for our server
+              oldGenerationId =
+                replicationServerDomain.setGenerationId(generationId, false);
+            }
+          }
+
+          // Send our own TopologyMsg to DS
+          outTopoMsg = replicationServerDomain.createTopologyMsgForDS(
+            this.serverId);
+          session.publish(outTopoMsg);
+
+          if (debugEnabled())
+          {
+            TRACER.debugInfo("In " +
+              replicationServerDomain.getReplicationServer().
+              getMonitorInstanceName() + ":" +
+              "\nSH HANDSHAKE RECEIVED:\n" + startSessionMsg.toString() +
+              "\nAND REPLIED:\n" + outTopoMsg.toString());
+          }
+
+          // Alright, connected with new DS: store handler.
+          Map<Short, ServerHandler> connectedDSs =
+            replicationServerDomain.getConnectedDSs();
+          connectedDSs.put(serverId, this);
+
+          // Tell peer DSs a new DS just connected to us
+          // No need to resend topo msg to this just new DS so not null
+          // argument
+          replicationServerDomain.sendTopoInfoToDSs(this);
+          // Tell peer RSs a new DS just connected to us
+          replicationServerDomain.sendTopoInfoToRSs();
+        } else
+        {
+          // We did not recognize the message, close session as what
+          // can happen after is undetermined and we do not want the server to
+          // be disturbed
+          closeSession(null);
+          if ((replicationServerDomain != null) &&
+            replicationServerDomain.hasLock())
+            replicationServerDomain.release();
+          return;
         }
       }
-      else
+
+      /*
+       * FINALIZE INITIALIZATION:
+       * CREATE READER AND WRITER, HEARTBEAT SYSTEM AND UPDATE MONITORING
+       * SYSTEM
+       */
+
+      // Disable timeout for next communications
+      session.setSoTimeout(0);
+      // sendWindow MUST be created before starting the writer
+      sendWindow = new Semaphore(sendWindowSize);
+
+      writer = new ServerWriter(session, serverId,
+        this, replicationServerDomain);
+      reader = new ServerReader(session, serverId,
+        this, replicationServerDomain);
+
+      reader.start();
+      writer.start();
+
+      // Create a thread to send heartbeat messages.
+      if (heartbeatInterval > 0)
       {
-        // For a hanshakeOnly connection, let's only create a reader
-        // in order to detect the connection closure.
-        reader = new ServerReader(session, serverId,
-            this, replicationServerDomain);
-        reader.start();
+        heartbeatThread = new HeartbeatThread(
+          "Replication Heartbeat to DS " + serverURL + " " + serverId +
+          " for " + this.baseDn + " in RS " + replicationServerId,
+          session, heartbeatInterval / 3);
+        heartbeatThread.start();
+      }
+
+      // Create the status analyzer for the domain if not already started
+      if (serverIsLDAPserver)
+      {
+        if (!replicationServerDomain.isRunningStatusAnalyzer())
+        {
+          if (debugEnabled())
+            TRACER.debugInfo("In " + replicationServerDomain.
+              getReplicationServer().
+              getMonitorInstanceName() +
+              " SH for remote server " + this.getMonitorInstanceName() +
+              " is starting status analyzer");
+          replicationServerDomain.startStatusAnalyzer();
+        }
+      }
+
+      DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
+      DirectoryServer.registerMonitorProvider(this);
+    } catch (NotSupportedOldVersionPDUException e)
+    {
+      // We do not need to support DS V1 connection, we just accept RS V1
+      // connection:
+      // We just trash the message, log the event for debug purpose and close
+      // the connection
+      if (debugEnabled())
+      TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + ":"
+        + e.getMessage());
+      closeSession(null);
+    } catch (Exception e)
+    {
+      // We do not want polluting error log if error is due to normal session
+      // aborted after handshake phase one from a DS that is searching for best
+      // suitable RS.
+      if ( log_error_message || (baseDn != null) )
+      {
+        // some problem happened, reject the connection
+        MessageBuilder mb = new MessageBuilder();
+        mb.append(ERR_REPLICATION_SERVER_CONNECTION_ERROR.get(
+          this.getMonitorInstanceName()));
+        mb.append(": " + stackTraceToSingleLineString(e));
+        closeSession(mb.toMessage());
+      } else
+      {
+        closeSession(null);
+      }
+
+      // If generation id of domain was changed, set it back to old value
+      // We may have changed it as it was -1 and we received a value >0 from
+      // peer server and the last topo message sent may have failed being
+      // sent: in that case retrieve old value of generation id for
+      // replication server domain
+      if (oldGenerationId != -100)
+      {
+        replicationServerDomain.setGenerationId(oldGenerationId, false);
       }
     }
-    catch (Exception e)
+
+    // Release domain
+    if ((replicationServerDomain != null) &&
+      replicationServerDomain.hasLock())
+      replicationServerDomain.release();
+  }
+
+  /*
+   * Close the session logging the passed error message
+   * Log nothing if message is null.
+   */
+  private void closeSession(Message msg)
+  {
+    if (msg != null)
     {
-      // some problem happened, reject the connection
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get(
-          this.getMonitorInstanceName()));
-      mb.append(stackTraceToSingleLineString(e));
-      logError(mb.toMessage());
-      try
-      {
-        session.close();
-      } catch (IOException e1)
-      {
-        // ignore
-      }
+      logError(msg);
+    }
+    try
+    {
+      session.close();
+    } catch (IOException ee)
+    {
+      // ignore
     }
   }
 
@@ -720,7 +1136,7 @@
    * @return true is saturated false if not saturated.
    */
   public boolean isSaturated(ChangeNumber changeNumber,
-                             ServerHandler sourceHandler)
+    ServerHandler sourceHandler)
   {
     synchronized (msgQueue)
     {
@@ -730,23 +1146,23 @@
         return true;
 
       if ((sourceHandler.maxSendQueue > 0) &&
-          (size >= sourceHandler.maxSendQueue))
+        (size >= sourceHandler.maxSendQueue))
         return true;
 
       if (!msgQueue.isEmpty())
       {
-        UpdateMessage firstUpdate = msgQueue.first();
+        UpdateMsg firstUpdate = msgQueue.first();
 
         if (firstUpdate != null)
         {
           long timeDiff = changeNumber.getTimeSec() -
-          firstUpdate.getChangeNumber().getTimeSec();
+            firstUpdate.getChangeNumber().getTimeSec();
 
           if ((maxReceiveDelay > 0) && (timeDiff >= maxReceiveDelay))
             return true;
 
           if ((sourceHandler.maxSendDelay > 0) &&
-              (timeDiff >= sourceHandler.maxSendDelay))
+            (timeDiff >= sourceHandler.maxSendDelay))
             return true;
         }
       }
@@ -770,22 +1186,22 @@
       if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue))
         return false;
       if ((source != null) && (source.maxSendQueue > 0) &&
-           (queueSize >= source.restartSendQueue))
+        (queueSize >= source.restartSendQueue))
         return false;
 
       if (!msgQueue.isEmpty())
       {
-        UpdateMessage firstUpdate = msgQueue.first();
-        UpdateMessage lastUpdate = msgQueue.last();
+        UpdateMsg firstUpdate = msgQueue.first();
+        UpdateMsg lastUpdate = msgQueue.last();
 
         if ((firstUpdate != null) && (lastUpdate != null))
         {
           long timeDiff = lastUpdate.getChangeNumber().getTimeSec() -
-               firstUpdate.getChangeNumber().getTimeSec();
+            firstUpdate.getChangeNumber().getTimeSec();
           if ((maxReceiveDelay > 0) && (timeDiff >= restartReceiveDelay))
             return false;
-          if ((source != null) && (source.maxSendDelay > 0)
-               && (timeDiff >= source.restartSendDelay))
+          if ((source != null) && (source.maxSendDelay > 0) && (timeDiff >=
+            source.restartSendDelay))
             return false;
         }
       }
@@ -810,34 +1226,28 @@
    */
   public int getRcvMsgQueueSize()
   {
-   synchronized (msgQueue)
-   {
-    /*
-     * When the server is up to date or close to be up to date,
-     * the number of updates to be sent is the size of the receive queue.
-     */
-     if (isFollowing())
-       return msgQueue.count();
-     else
-     {
-       /*
-        * When the server  is not able to follow, the msgQueue
-        * may become too large and therefore won't contain all the
-        * changes. Some changes may only be stored in the backing DB
-        * of the servers.
-        * The total size of teh receieve queue is calculated by doing
-        * the sum of the number of missing changes for every dbHandler.
-        */
-       int totalCount = 0;
-       ServerState dbState = replicationServerDomain.getDbServerState();
-       for (short id : dbState)
-       {
-         totalCount = ChangeNumber.diffSeqNum(dbState.getMaxChangeNumber(id),
-             serverState.getMaxChangeNumber(id));
-       }
-       return totalCount;
-     }
-   }
+    synchronized (msgQueue)
+    {
+      /*
+       * When the server is up to date or close to be up to date,
+       * the number of updates to be sent is the size of the receive queue.
+       */
+      if (isFollowing())
+        return msgQueue.count();
+      else
+      {
+        /*
+         * When the server  is not able to follow, the msgQueue
+         * may become too large and therefore won't contain all the
+         * changes. Some changes may only be stored in the backing DB
+         * of the servers.
+         * The total size of teh receieve queue is calculated by doing
+         * the sum of the number of missing changes for every dbHandler.
+         */
+        ServerState dbState = replicationServerDomain.getDbServerState();
+        return ServerState.diffChanges(dbState, serverState);
+      }
+    }
   }
 
   /**
@@ -859,7 +1269,7 @@
       return 0;
 
     long currentTime = TimeThread.getTime();
-    return ((currentTime - olderUpdateTime)/1000);
+    return ((currentTime - olderUpdateTime) / 1000);
   }
 
   /**
@@ -870,7 +1280,7 @@
    */
   public Long getApproxFirstMissingDate()
   {
-    Long result = (long)0;
+    Long result = (long) 0;
 
     // Get the older CN received
     ChangeNumber olderUpdateCN = getOlderUpdateCN();
@@ -878,7 +1288,7 @@
     {
       // If not present in the local RS db,
       // then approximate with the older update time
-      result=olderUpdateCN.getTime();
+      result = olderUpdateCN.getTime();
     }
     return result;
   }
@@ -892,7 +1302,7 @@
     ChangeNumber olderUpdateCN = getOlderUpdateCN();
     if (olderUpdateCN == null)
       return 0;
-    return  olderUpdateCN.getTime();
+    return olderUpdateCN.getTime();
   }
 
   /**
@@ -909,15 +1319,13 @@
       {
         if (msgQueue.isEmpty())
         {
-          result=null;
-        }
-        else
+          result = null;
+        } else
         {
-          UpdateMessage msg = msgQueue.first();
+          UpdateMsg msg = msgQueue.first();
           result = msg.getChangeNumber();
         }
-      }
-      else
+      } else
       {
         if (lateQueue.isEmpty())
         {
@@ -950,24 +1358,21 @@
                 iteratorSortedSet.add(iterator);
               }
             }
-            UpdateMessage msg = iteratorSortedSet.first().getChange();
+            UpdateMsg msg = iteratorSortedSet.first().getChange();
             result = msg.getChangeNumber();
-          }
-          catch(Exception e)
+          } catch (Exception e)
           {
-            result=null;
-          }
-          finally
+            result = null;
+          } finally
           {
             for (ReplicationIterator iterator : iteratorSortedSet)
             {
               iterator.releaseCursor();
             }
           }
-        }
-        else
+        } else
         {
-          UpdateMessage msg = lateQueue.first();
+          UpdateMsg msg = lateQueue.first();
           result = msg.getChangeNumber();
         }
       }
@@ -995,33 +1400,14 @@
   }
 
   /**
-   * Add an update the list of updates that must be sent to the server
+   * Add an update to the list of updates that must be sent to the server
    * managed by this ServerHandler.
    *
    * @param update The update that must be added to the list of updates.
    * @param sourceHandler The server that sent the update.
    */
-  public void add(UpdateMessage update, ServerHandler sourceHandler)
+  public void add(UpdateMsg update, ServerHandler sourceHandler)
   {
-    /*
-     * Ignore updates from a server that is degraded due to
-     * its inconsistent generationId
-     */
-    long referenceGenerationId = replicationServerDomain.getGenerationId();
-    if ((referenceGenerationId>0) &&
-        (referenceGenerationId != generationId))
-    {
-      logError(ERR_IGNORING_UPDATE_TO.get(
-               this.replicationServerDomain.getReplicationServer().
-                 getMonitorInstanceName(),
-               update.getDn(),
-               this.getMonitorInstanceName(),
-               Long.toString(generationId),
-               Long.toString(referenceGenerationId)));
-
-      return;
-    }
-
     synchronized (msgQueue)
     {
       /*
@@ -1063,10 +1449,10 @@
    * @return the next update that must be sent to the server managed by this
    *         ServerHandler.
    */
-  public UpdateMessage take()
+  public UpdateMsg take()
   {
     boolean interrupted = true;
-    UpdateMessage msg = getnextMessage();
+    UpdateMsg msg = getnextMessage();
 
     /*
      * When we remove a message from the queue we need to check if another
@@ -1080,8 +1466,7 @@
       try
       {
         replicationServerDomain.checkAllSaturation();
-      }
-      catch (IOException e)
+      } catch (IOException e)
       {
       }
     }
@@ -1090,13 +1475,13 @@
     {
       try
       {
-        acquired = sendWindow.tryAcquire((long)500, TimeUnit.MILLISECONDS);
+        acquired = sendWindow.tryAcquire((long) 500, TimeUnit.MILLISECONDS);
         interrupted = false;
       } catch (InterruptedException e)
       {
         // loop until not interrupted
       }
-    } while (((interrupted) || (!acquired )) && (!shutdown));
+    } while (((interrupted) || (!acquired)) && (!shutdownWriter));
     this.incrementOutCount();
     return msg;
   }
@@ -1107,10 +1492,10 @@
    *
    * @return The next update that must be sent to the server.
    */
-  private UpdateMessage getnextMessage()
+  private UpdateMsg getnextMessage()
   {
-    UpdateMessage msg;
-    while (active == true)
+    UpdateMsg msg;
+    while (activeWriter == true)
     {
       if (following == false)
       {
@@ -1157,8 +1542,7 @@
               if (iterator.getChange() != null)
               {
                 iteratorSortedSet.add(iterator);
-              }
-              else
+              } else
               {
                 iterator.releaseCursor();
               }
@@ -1201,8 +1585,7 @@
                 setFollowing(true);
               }
             }
-          }
-          else
+          } else
           {
             msg = lateQueue.first();
             synchronized (msgQueue)
@@ -1212,7 +1595,7 @@
                 /* we finally catch up with the regular queue */
                 setFollowing(true);
                 lateQueue.clear();
-                UpdateMessage msg1;
+                UpdateMsg msg1;
                 do
                 {
                   msg1 = msgQueue.removeFirst();
@@ -1222,8 +1605,7 @@
               }
             }
           }
-        }
-        else
+        } else
         {
           /* get the next change from the lateQueue */
           msg = lateQueue.removeFirst();
@@ -1240,7 +1622,7 @@
             while (msgQueue.isEmpty())
             {
               msgQueue.wait(500);
-              if (!active)
+              if (!activeWriter)
                 return null;
             }
           } catch (InterruptedException e)
@@ -1259,11 +1641,11 @@
           }
         }
       }
-      /*
-       * Need to loop because following flag may have gone to false between
-       * the first check at the beginning of this method
-       * and the second check just above.
-       */
+    /*
+     * Need to loop because following flag may have gone to false between
+     * the first check at the beginning of this method
+     * and the second check just above.
+     */
     }
     return null;
   }
@@ -1274,7 +1656,7 @@
    * @param msg the last update sent.
    * @return boolean indicating if the update was meaningful.
    */
-  public boolean  updateServerState(UpdateMessage msg)
+  public boolean updateServerState(UpdateMsg msg)
   {
     return serverState.update(msg.getChangeNumber());
   }
@@ -1290,45 +1672,6 @@
   }
 
   /**
-   * Stop this server handler processing.
-   */
-  public void stopHandler()
-  {
-    active = false;
-
-    // Stop the remote LSHandler
-    for (LightweightServerHandler lsh : connectedServers.values())
-    {
-      lsh.stopHandler();
-    }
-    connectedServers.clear();
-
-    try
-    {
-      session.close();
-    } catch (IOException e)
-    {
-      // ignore.
-    }
-
-    synchronized (msgQueue)
-    {
-      /* wake up the writer thread on an empty queue so that it disappear */
-      msgQueue.clear();
-      msgQueue.notify();
-      msgQueue.notifyAll();
-    }
-
-    // Stop the heartbeat thread.
-    if (heartbeatThread != null)
-    {
-      heartbeatThread.shutdown();
-    }
-
-    DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
-  }
-
-  /**
    * Send the ack to the server that did the original modification.
    *
    * @param changeNumber The ChangeNumber of the update that is acked.
@@ -1336,7 +1679,7 @@
    */
   public void sendAck(ChangeNumber changeNumber) throws IOException
   {
-    AckMessage ack = new AckMessage(changeNumber);
+    AckMsg ack = new AckMsg(changeNumber);
     session.publish(ack);
     outAckCount++;
   }
@@ -1347,7 +1690,7 @@
    * @param message The ack message that was received.
    * @param ackingServerId The  id of the server that acked the change.
    */
-  public void ack(AckMessage message, short ackingServerId)
+  public void ack(AckMsg message, short ackingServerId)
   {
     ChangeNumber changeNumber = message.getChangeNumber();
     AckMessageList ackList;
@@ -1377,7 +1720,7 @@
    * @param message the ack message that was received.
    * @param ackingServerId The  id of the server that acked the change.
    */
-  public static void ackChangelog(AckMessage message, short ackingServerId)
+  public static void ackChangelog(AckMsg message, short ackingServerId)
   {
     ChangeNumber changeNumber = message.getChangeNumber();
     ReplServerAckMessageList ackList;
@@ -1397,9 +1740,9 @@
     if (completedFlag)
     {
       ReplicationServerDomain replicationServerDomain =
-              ackList.getChangelogCache();
+        ackList.getChangelogCache();
       replicationServerDomain.sendAck(changeNumber, false,
-                             ackList.getReplicationServerId());
+        ackList.getReplicationServerId());
     }
   }
 
@@ -1410,11 +1753,11 @@
    * @param nbWaitedAck  The number of ack that must be received before
    *               the update is fully acked.
    */
-  public void addWaitingAck(UpdateMessage update, int nbWaitedAck)
+  public void addWaitingAck(UpdateMsg update, int nbWaitedAck)
   {
     AckMessageList ackList = new AckMessageList(update.getChangeNumber(),
-                                                nbWaitedAck);
-    synchronized(waitingAcks)
+      nbWaitedAck);
+    synchronized (waitingAcks)
     {
       waitingAcks.put(update.getChangeNumber(), ackList);
     }
@@ -1434,16 +1777,16 @@
    *                    the update is fully acked.
    */
   public static void addWaitingAck(
-      UpdateMessage update,
-      short ChangelogServerId, ReplicationServerDomain replicationServerDomain,
-      int nbWaitedAck)
+    UpdateMsg update,
+    short ChangelogServerId, ReplicationServerDomain replicationServerDomain,
+    int nbWaitedAck)
   {
     ReplServerAckMessageList ackList =
-          new ReplServerAckMessageList(update.getChangeNumber(),
-                                      nbWaitedAck,
-                                      ChangelogServerId,
-                                      replicationServerDomain);
-    synchronized(changelogsWaitingAcks)
+      new ReplServerAckMessageList(update.getChangeNumber(),
+      nbWaitedAck,
+      ChangelogServerId,
+      replicationServerDomain);
+    synchronized (changelogsWaitingAcks)
     {
       changelogsWaitingAcks.put(update.getChangeNumber(), ackList);
     }
@@ -1486,7 +1829,7 @@
    */
   @Override
   public void initializeMonitorProvider(MonitorProviderCfg configuration)
-                          throws ConfigException,InitializationException
+    throws ConfigException, InitializationException
   {
     // Nothing to do for now
   }
@@ -1501,12 +1844,12 @@
   public String getMonitorInstanceName()
   {
     String str = baseDn.toString() +
-                 " " + serverURL + " " + String.valueOf(serverId);
+      " " + serverURL + " " + String.valueOf(serverId);
 
     if (serverIsLDAPserver)
-      return "Direct LDAP Server " + str;
+      return "Directory Server " + str;
     else
-      return "Remote Repl Server " + str;
+      return "Remote Replication Server " + str;
   }
 
   /**
@@ -1536,7 +1879,6 @@
   public void updateMonitorData()
   {
     // As long as getUpdateInterval() returns 0, this will never get called
-
   }
 
   /**
@@ -1553,19 +1895,20 @@
     ArrayList<Attribute> attributes = new ArrayList<Attribute>();
     if (serverIsLDAPserver)
     {
-      attributes.add(new Attribute("LDAP-Server", serverURL));
-      attributes.add(new Attribute("connected-to", this.replicationServerDomain.
-          getReplicationServer().getMonitorInstanceName()));
+      attributes.add(Attributes.create("LDAP-Server", serverURL));
+      attributes.add(Attributes.create("connected-to",
+          this.replicationServerDomain.getReplicationServer()
+              .getMonitorInstanceName()));
 
     }
     else
     {
-      attributes.add(new Attribute("ReplicationServer-Server", serverURL));
+      attributes.add(Attributes.create("ReplicationServer-Server",
+          serverURL));
     }
-    attributes.add(new Attribute("server-id",
-                                 String.valueOf(serverId)));
-    attributes.add(new Attribute("base-dn",
-                                 baseDn.toString()));
+    attributes.add(Attributes.create("server-id", String
+        .valueOf(serverId)));
+    attributes.add(Attributes.create("base-dn", baseDn.toString()));
 
     if (serverIsLDAPserver)
     {
@@ -1576,106 +1919,85 @@
 
         // Oldest missing update
         Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
-        if ((approxFirstMissingDate != null) && (approxFirstMissingDate>0))
+        if ((approxFirstMissingDate != null) && (approxFirstMissingDate > 0))
         {
           Date date = new Date(approxFirstMissingDate);
-          attributes.add(new Attribute("approx-older-change-not-synchronized",
-              date.toString()));
-          attributes.add(
-              new Attribute("approx-older-change-not-synchronized-millis",
-                  String.valueOf(approxFirstMissingDate)));
+          attributes.add(Attributes.create(
+              "approx-older-change-not-synchronized", date.toString()));
+          attributes.add(Attributes.create(
+              "approx-older-change-not-synchronized-millis", String
+                  .valueOf(approxFirstMissingDate)));
         }
 
         // Missing changes
         long missingChanges = md.getMissingChanges(serverId);
-        attributes.add(new Attribute("missing-changes",
-            String.valueOf(missingChanges)));
+        attributes.add(Attributes.create("missing-changes", String
+            .valueOf(missingChanges)));
 
         // Replication delay
         long delay = md.getApproxDelay(serverId);
-        attributes.add(new Attribute("approximate-delay",
-            String.valueOf(delay)));
+        attributes.add(Attributes.create("approximate-delay", String
+            .valueOf(delay)));
       }
-      catch(Exception e)
+      catch (Exception e)
       {
         // TODO: improve the log
         // We failed retrieving the remote monitor data.
-        attributes.add(new Attribute("error",
+        attributes.add(Attributes.create("error",
             stackTraceToSingleLineString(e)));
       }
     }
 
     attributes.add(
-        new Attribute("queue-size", String.valueOf(msgQueue.count())));
+        Attributes.create("queue-size", String.valueOf(msgQueue.count())));
     attributes.add(
-        new Attribute(
+        Attributes.create(
             "queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
     attributes.add(
-        new Attribute(
+        Attributes.create(
             "following", String.valueOf(following)));
 
     // Deprecated
-    attributes.add(new Attribute("max-waiting-changes",
-                                  String.valueOf(maxQueueSize)));
-    attributes.add(new Attribute("update-sent",
-                                 String.valueOf(getOutCount())));
-    attributes.add(new Attribute("update-received",
-                                 String.valueOf(getInCount())));
+    attributes.add(Attributes.create("max-waiting-changes", String
+        .valueOf(maxQueueSize)));
+    attributes.add(Attributes.create("update-sent", String
+        .valueOf(getOutCount())));
+    attributes.add(Attributes.create("update-received", String
+        .valueOf(getInCount())));
 
     // Deprecated as long as assured is not exposed
-    attributes.add(new Attribute("update-waiting-acks",
-        String.valueOf(getWaitingAckSize())));
-    attributes.add(new Attribute("ack-sent", String.valueOf(getOutAckCount())));
-    attributes.add(new Attribute("ack-received",
-                                 String.valueOf(getInAckCount())));
+    attributes.add(Attributes.create("update-waiting-acks", String
+        .valueOf(getWaitingAckSize())));
+    attributes.add(Attributes.create("ack-sent", String
+        .valueOf(getOutAckCount())));
+    attributes.add(Attributes.create("ack-received", String
+        .valueOf(getInAckCount())));
 
     // Window stats
-    attributes.add(new Attribute("max-send-window",
-                                 String.valueOf(sendWindowSize)));
-    attributes.add(new Attribute("current-send-window",
-                                String.valueOf(sendWindow.availablePermits())));
-    attributes.add(new Attribute("max-rcv-window",
-                                 String.valueOf(maxRcvWindow)));
-    attributes.add(new Attribute("current-rcv-window",
-                                 String.valueOf(rcvWindow)));
-
-    /*
-     * FIXME:PGB DEPRECATED
-     *
-    // Missing changes
-    attributes.add(new Attribute("waiting-changes",
-        String.valueOf(getRcvMsgQueueSize())));
-    // Age of oldest missing change
-
-    // Date of the oldest missing change
-    long olderUpdateTime = getOlderUpdateTime();
-    if (olderUpdateTime != 0)
-    {
-      Date date = new Date(getOlderUpdateTime());
-      attributes.add(new Attribute("older-change-not-synchronized",
-                                 String.valueOf(date.toString())));
-    }
-    */
+    attributes.add(Attributes.create("max-send-window", String
+        .valueOf(sendWindowSize)));
+    attributes.add(Attributes.create("current-send-window", String
+        .valueOf(sendWindow.availablePermits())));
+    attributes.add(Attributes.create("max-rcv-window", String
+        .valueOf(maxRcvWindow)));
+    attributes.add(Attributes.create("current-rcv-window", String
+        .valueOf(rcvWindow)));
 
     /* get the Server State */
-    final String ATTR_SERVER_STATE = "server-state";
-    AttributeType type =
-      DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE);
-    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
+    AttributeBuilder builder = new AttributeBuilder("server-state");
     for (String str : serverState.toStringSet())
     {
-      values.add(new AttributeValue(type,str));
+      builder.add(str);
     }
-    Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values);
-    attributes.add(attr);
+    attributes.add(builder.toAttribute());
 
     // Encryption
-    attributes.add(new Attribute("ssl-encryption",
-        String.valueOf(session.isEncrypted())));
+    attributes.add(Attributes.create("ssl-encryption", String
+        .valueOf(session.isEncrypted())));
 
     // Data generation
-    attributes.add(new Attribute("generation-id",
-        String.valueOf(generationId)));
+    attributes.add(Attributes.create("generation-id", String
+        .valueOf(generationId)));
 
     return attributes;
   }
@@ -1685,23 +2007,63 @@
    */
   public void shutdown()
   {
-    shutdown  = true;
+    /*
+     * Shutdown ServerWriter
+     */
+    shutdownWriter = true;
+    activeWriter = false;
+    synchronized (msgQueue)
+    {
+      /* wake up the writer thread on an empty queue so that it disappear */
+      msgQueue.clear();
+      msgQueue.notify();
+      msgQueue.notifyAll();
+    }
+
+    /*
+     * Close session to end ServerReader or ServerWriter
+     */
     try
     {
       session.close();
     } catch (IOException e)
     {
-      // Service is closing.
+      // ignore.
     }
 
-    stopHandler();
+    /*
+     * Stop the remote LSHandler
+     */
+    for (LightweightServerHandler lsh : directoryServers.values())
+    {
+      lsh.stopHandler();
+    }
+    directoryServers.clear();
 
+    /*
+     * Stop the heartbeat thread.
+     */
+    if (heartbeatThread != null)
+    {
+      heartbeatThread.shutdown();
+    }
+
+    DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
+
+    /*
+     * Be sure to wait for ServerWriter and ServerReader death
+     * It does not matter if we try to stop a thread which is us (reader
+     * or writer), but we must not wait for our own thread death.
+     */
     try
     {
-      if (writer != null) {
+      if ((writer != null) && (!(Thread.currentThread().equals(writer))))
+      {
+
         writer.join(SHUTDOWN_JOIN_TIMEOUT);
       }
-      if (reader != null) {
+      if ((reader != null) && (!(Thread.currentThread().equals(reader))))
+      {
         reader.join(SHUTDOWN_JOIN_TIMEOUT);
       }
     } catch (InterruptedException e)
@@ -1726,8 +2088,7 @@
 
 
       localString += serverId + " " + serverURL + " " + baseDn;
-    }
-    else
+    } else
       localString = "Unknown server";
 
     return localString;
@@ -1735,7 +2096,7 @@
 
   /**
    * Decrement the protocol window, then check if it is necessary
-   * to send a WindowMessage and send it.
+   * to send a WindowMsg and send it.
    *
    * @throws IOException when the session becomes unavailable.
    */
@@ -1746,7 +2107,7 @@
   }
 
   /**
-   * Check the protocol window and send WindowMessage if necessary.
+   * Check the protocol window and send WindowMsg if necessary.
    *
    * @throws IOException when the session becomes unavailable.
    */
@@ -1763,7 +2124,7 @@
       }
       if (!flowControl)
       {
-        WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
+        WindowMsg msg = new WindowMsg(rcvWindowSizeHalf);
         session.publish(msg);
         outAckCount++;
         rcvWindow += rcvWindowSizeHalf;
@@ -1778,7 +2139,7 @@
    * @param windowMsg The Window Message containing the information
    *                  necessary for updating the window size.
    */
-  public void updateWindow(WindowMessage windowMsg)
+  public void updateWindow(WindowMsg windowMsg)
   {
     sendWindow.release(windowMsg.getNumAck());
   }
@@ -1797,107 +2158,204 @@
    *
    * @param msg The message to be processed.
    */
-  public void process(RoutableMessage msg)
+  public void process(RoutableMsg msg)
   {
     if (debugEnabled())
-       TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
-                 getMonitorInstanceName() +
-                 " SH for remote server " + this.getMonitorInstanceName() +
-                 " processes received msg=" + msg);
+      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
+        getMonitorInstanceName() +
+        " SH for remote server " + this.getMonitorInstanceName() + ":" +
+        "\nprocesses received msg:\n" + msg);
     replicationServerDomain.process(msg, this);
   }
 
   /**
-   * Sends the provided ReplServerInfoMessage.
+   * Sends the provided TopologyMsg to the peer server.
    *
-   * @param info The ReplServerInfoMessage message to be sent.
+   * @param topoMsg The TopologyMsg message to be sent.
    * @throws IOException When it occurs while sending the message,
    *
    */
-   public void sendInfo(ReplServerInfoMessage info)
-   throws IOException
-   {
-     if (debugEnabled())
-       TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
-           getMonitorInstanceName() +
-           " SH for remote server " + this.getMonitorInstanceName() +
-           " sends message=" + info);
+  public void sendTopoInfo(TopologyMsg topoMsg)
+    throws IOException
+  {
+    // V1 Rs do not support the TopologyMsg
+    if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
+    {
+      if (debugEnabled())
+        TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
+          getMonitorInstanceName() +
+          " SH for remote server " + this.getMonitorInstanceName() + ":" +
+          "\nsends message:\n" + topoMsg);
 
-     session.publish(info);
-   }
+      session.publish(topoMsg);
+    }
+  }
 
-   /**
-    *
-    * Sets the replication server from the message provided.
-    *
-    * @param infoMsg The information message.
-    */
-   public void receiveReplServerInfo(ReplServerInfoMessage infoMsg)
-   {
-     if (debugEnabled())
-       TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
-           getMonitorInstanceName() +
-           " SH for remote server " + this.getMonitorInstanceName() +
-           " sets replServerInfo " + "<" + infoMsg + ">");
+  /**
+   * Stores topology information received from a peer RS and that must be kept
+   * in RS handler.
+   *
+   * @param topoMsg The received topology message
+   */
+  public void receiveTopoInfoFromRS(TopologyMsg topoMsg)
+  {
+    // Store info for remote RS
+    List<RSInfo> rsInfos = topoMsg.getRsList();
+    // List should only contain RS info for sender
+    RSInfo rsInfo = rsInfos.get(0);
+    generationId = rsInfo.getGenerationId();
+    groupId = rsInfo.getGroupId();
 
-     List<String> newRemoteLDAPservers = infoMsg.getConnectedServers();
-     generationId = infoMsg.getGenerationId();
+    /**
+     * Store info for DSs connected to the peer RS
+     */
+    List<DSInfo> dsInfos = topoMsg.getDsList();
 
-     synchronized(connectedServers)
-     {
-       // Removes the existing structures
-       for (LightweightServerHandler lsh : connectedServers.values())
-       {
-         lsh.stopHandler();
-       }
-       connectedServers.clear();
+    // Removes the existing structures
+    for (LightweightServerHandler lsh : directoryServers.values())
+    {
+      lsh.stopHandler();
+    }
+    directoryServers.clear();
 
-       // Creates the new structure according to the message received.
-       for (String newConnectedServer : newRemoteLDAPservers)
-       {
-         LightweightServerHandler lsh
-         = new LightweightServerHandler(newConnectedServer, this);
-         lsh.startHandler();
-         connectedServers.put(lsh.getServerId(), lsh);
-       }
-     }
-   }
+    // Creates the new structure according to the message received.
+    for (DSInfo dsInfo : dsInfos)
+    {
+      LightweightServerHandler lsh = new LightweightServerHandler(this,
+        serverId, dsInfo.getDsId(), dsInfo.getGenerationId(),
+        dsInfo.getGroupId(), dsInfo.getStatus(), dsInfo.getRefUrls(),
+        dsInfo.isAssured(), dsInfo.getAssuredMode(),
+        dsInfo.getSafeDataLevel());
+      lsh.startHandler();
+      directoryServers.put(lsh.getServerId(), lsh);
+    }
+  }
 
-   /**
-    * When this handler is connected to a replication server, specifies if
-    * a wanted server is connected to this replication server.
-    *
-    * @param wantedServer The server we want to know if it is connected
-    * to the replication server represented by this handler.
-    * @return boolean True is the wanted server is connected to the server
-    * represented by this handler.
-    */
-   public boolean isRemoteLDAPServer(short wantedServer)
-   {
-     synchronized(connectedServers)
-     {
-       for (LightweightServerHandler server : connectedServers.values())
-       {
-         if (wantedServer == server.getServerId())
-         {
-           return true;
-         }
-       }
-       return false;
-     }
-   }
+  /**
+   * Process message of a remote server changing his status.
+   * @param csMsg The message containing the new status
+   * @return The new server status of the DS
+   */
+  public ServerStatus processNewStatus(ChangeStatusMsg csMsg)
+  {
 
-   /**
-    * When the handler is connected to a replication server, specifies the
-    * replication server has remote LDAP servers connected to it.
-    *
-    * @return boolean True is the replication server has remote LDAP servers
-    * connected to it.
-    */
-   public boolean hasRemoteLDAPServers()
-   {
-     return !connectedServers.isEmpty();
-   }
+    // Sanity check
+    if (!serverIsLDAPserver)
+    {
+      Message msg =
+        ERR_RECEIVED_CHANGE_STATUS_NOT_FROM_DS.get(baseDn.toString(),
+        Short.toString(serverId), csMsg.toString());
+      logError(msg);
+      return ServerStatus.INVALID_STATUS;
+    }
+
+    // Get the status the DS just entered
+    ServerStatus reqStatus = csMsg.getNewStatus();
+    // Translate new status to a state machine event
+    StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus);
+    if (event == StatusMachineEvent.INVALID_EVENT)
+    {
+      Message msg = ERR_RS_INVALID_NEW_STATUS.get(reqStatus.toString(),
+        baseDn.toString(), Short.toString(serverId));
+      logError(msg);
+      return ServerStatus.INVALID_STATUS;
+    }
+
+    // Check state machine allows this new status
+    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
+    if (newStatus == ServerStatus.INVALID_STATUS)
+    {
+      Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
+        Short.toString(serverId), status.toString(), event.toString());
+      logError(msg);
+      return ServerStatus.INVALID_STATUS;
+    }
+
+    status = newStatus;
+
+    return status;
+  }
+
+  /**
+   * Change the status according to the event generated from the status
+   * analyzer.
+   * @param event The event to be used for new status computation
+   * @return The new status of the DS
+   * @throws IOException When raised by the underlying session
+   */
+  public ServerStatus changeStatusFromStatusAnalyzer(StatusMachineEvent event)
+    throws IOException
+  {
+    // Check state machine allows this new status (Sanity check)
+    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
+    if (newStatus == ServerStatus.INVALID_STATUS)
+    {
+      Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
+        Short.toString(serverId), status.toString(), event.toString());
+      logError(msg);
+      // Status analyzer must only change from NORMAL_STATUS to DEGRADED_STATUS
+      // and vice versa. We may are being trying to change the status while for
+      // instance another status has just been entered: e.g a full update has
+      // just been engaged. In that case, just ignore attempt to change the
+      // status
+      return newStatus;
+    }
+
+    // Send message requesting to change the DS status
+    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus,
+      ServerStatus.INVALID_STATUS);
+
+    if (debugEnabled())
+    {
+      TRACER.debugInfo(
+        "In RS " +
+        replicationServerDomain.getReplicationServer().getServerId() +
+        " Sending change status from status analyzer to " + getServerId() +
+        " for baseDn " + baseDn + ":\n" + csMsg);
+    }
+
+    session.publish(csMsg);
+
+    status = newStatus;
+
+    return newStatus;
+  }
+
+  /**
+   * When this handler is connected to a replication server, specifies if
+   * a wanted server is connected to this replication server.
+   *
+   * @param wantedServer The server we want to know if it is connected
+   * to the replication server represented by this handler.
+   * @return boolean True is the wanted server is connected to the server
+   * represented by this handler.
+   */
+  public boolean isRemoteLDAPServer(short wantedServer)
+  {
+    synchronized (directoryServers)
+    {
+      for (LightweightServerHandler server : directoryServers.values())
+      {
+        if (wantedServer == server.getServerId())
+        {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+
+  /**
+   * When the handler is connected to a replication server, specifies the
+   * replication server has remote LDAP servers connected to it.
+   *
+   * @return boolean True is the replication server has remote LDAP servers
+   * connected to it.
+   */
+  public boolean hasRemoteLDAPServers()
+  {
+    return !directoryServers.isEmpty();
+  }
 
   /**
    * Send an InitializeRequestMessage to the server connected through this
@@ -1906,36 +2364,36 @@
    * @param msg The message to be processed
    * @throws IOException when raised by the underlying session
    */
-  public void send(RoutableMessage msg) throws IOException
+  public void send(RoutableMsg msg) throws IOException
   {
     if (debugEnabled())
-          TRACER.debugInfo("In " +
-              replicationServerDomain.getReplicationServer().
-              getMonitorInstanceName() +
-              " SH for remote server " + this.getMonitorInstanceName() +
-              " sends message=" + msg);
+      TRACER.debugInfo("In " +
+        replicationServerDomain.getReplicationServer().
+        getMonitorInstanceName() +
+        " SH for remote server " + this.getMonitorInstanceName() + ":" +
+        "\nsends message:\n" + msg);
     session.publish(msg);
   }
 
   /**
-   * Send an ErrorMessage to the peer.
+   * Send an ErrorMsg to the peer.
    *
    * @param errorMsg The message to be sent
    * @throws IOException when raised by the underlying session
    */
-  public void sendError(ErrorMessage errorMsg) throws IOException
+  public void sendError(ErrorMsg errorMsg) throws IOException
   {
     session.publish(errorMsg);
   }
 
   /**
-   * Process the reception of a WindowProbe message.
+   * Process the reception of a WindowProbeMsg message.
    *
    * @param  windowProbeMsg The message to process.
    *
    * @throws IOException    When the session becomes unavailable.
    */
-  public void process(WindowProbe windowProbeMsg) throws IOException
+  public void process(WindowProbeMsg windowProbeMsg) throws IOException
   {
     if (rcvWindow > 0)
     {
@@ -1945,11 +2403,10 @@
       // lets update the LDAP server with out current window size and hope
       // that everything will work better in the futur.
       // TODO also log an error message.
-      WindowMessage msg = new WindowMessage(rcvWindow);
+      WindowMsg msg = new WindowMsg(rcvWindow);
       session.publish(msg);
       outAckCount++;
-    }
-    else
+    } else
     {
       // Both the LDAP server and the replication server believes that the
       // window is closed. Lets check the flowcontrol in case we
@@ -1968,27 +2425,6 @@
   }
 
   /**
-   * Resets the generationId for this domain.
-   */
-  public void warnBadGenerationId()
-  {
-    // Notify the peer that it is now invalid regarding the generationId
-    // We are now waiting a startServer message from this server with
-    // a valid generationId.
-    try
-    {
-      Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString());
-      ErrorMessage errorMsg =
-        new ErrorMessage(serverId, replicationServerId, message);
-      session.publish(errorMsg);
-    }
-    catch (Exception e)
-    {
-      // FIXME Log exception when sending reset error message
-    }
-  }
-
-  /**
    * Sends a message containing a generationId to a peer server.
    * The peer is expected to be a replication server.
    *
@@ -1996,8 +2432,8 @@
    * @throws IOException When it occurs while sending the message,
    *
    */
-  public void forwardGenerationIdToRS(ResetGenerationId msg)
-  throws IOException
+  public void forwardGenerationIdToRS(ResetGenerationIdMsg msg)
+    throws IOException
   {
     session.publish(msg);
   }
@@ -2027,8 +2463,166 @@
    * Return a Set containing the servers known by this replicationServer.
    * @return a set containing the servers known by this replicationServer.
    */
-  public Set<Short> getConnectedServerIds()
+  public Set<Short> getConnectedDirectoryServerIds()
   {
-    return connectedServers.keySet();
+    return directoryServers.keySet();
+  }
+
+  /**
+   * Get the map of connected DSs
+   * (to the RS represented by this server handler).
+   * @return The map of connected DSs
+   */
+  public Map<Short, LightweightServerHandler> getConnectedDSs()
+  {
+    return directoryServers;
+  }
+
+  /**
+   * Order the peer DS server to change his status or close the connection
+   * according to the requested new generation id.
+   * @param newGenId The new generation id to take into account
+   * @throws IOException If IO error occurred.
+   */
+  public void changeStatusForResetGenId(long newGenId)
+    throws IOException
+  {
+    StatusMachineEvent event = null;
+
+    if (newGenId == -1)
+    {
+      // The generation id is being made invalid, let's put the DS
+      // into BAD_GEN_ID_STATUS
+      event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
+    } else
+    {
+      if (newGenId == generationId)
+      {
+        if (status == ServerStatus.BAD_GEN_ID_STATUS)
+        {
+          // This server has the good new reference generation id.
+          // Close connection with him to force his reconnection: DS will
+          // reconnect in NORMAL_STATUS or DEGRADED_STATUS.
+
+          if (debugEnabled())
+          {
+            TRACER.debugInfo(
+              "In RS " +
+              replicationServerDomain.getReplicationServer().getServerId() +
+              ". Closing connection to DS " + getServerId() +
+              " for baseDn " + baseDn + " to force reconnection as new local" +
+              " generation id and remote one match and DS is in bad gen id: " +
+              newGenId);
+          }
+
+          // Connection closure must not be done calling RSD.stopHandler() as it
+          // would rewait the RSD lock that we already must have entering this
+          // method. This would lead to a reentrant lock which we do not want.
+          // So simply close the session, this will make the hang up appear
+          // after the reader thread that took the RSD lock realeases it.
+          try
+          {
+            if (session != null)
+              session.close();
+          } catch (IOException e)
+          {
+            // ignore
+          }
+
+          // NOT_CONNECTED_STATUS is the last one in RS session life: handler
+          // will soon disappear after this method call...
+          status = ServerStatus.NOT_CONNECTED_STATUS;
+          return;
+        } else
+        {
+          if (debugEnabled())
+          {
+            TRACER.debugInfo(
+              "In RS " +
+              replicationServerDomain.getReplicationServer().getServerId() +
+              ". DS " + getServerId() + " for baseDn " + baseDn +
+              " has already generation id " + newGenId +
+              " so no ChangeStatusMsg sent to him.");
+          }
+          return;
+        }
+      } else
+      {
+        // This server has a bad generation id compared to new reference one,
+        // let's put it into BAD_GEN_ID_STATUS
+        event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
+      }
+    }
+
+    if ((event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT) &&
+      (status == ServerStatus.FULL_UPDATE_STATUS))
+    {
+      // Prevent useless error message (full update status cannot lead to bad
+      // gen status)
+      Message message = NOTE_BAD_GEN_ID_IN_FULL_UPDATE.get(
+        Short.toString(replicationServerDomain.
+        getReplicationServer().getServerId()),
+        baseDn.toString(),
+        Short.toString(serverId),
+        Long.toString(generationId),
+        Long.toString(newGenId));
+      logError(message);
+      return;
+    }
+
+    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
+
+    if (newStatus == ServerStatus.INVALID_STATUS)
+    {
+      Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
+        Short.toString(serverId), status.toString(), event.toString());
+      logError(msg);
+      return;
+    }
+
+    // Send message requesting to change the DS status
+    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus,
+      ServerStatus.INVALID_STATUS);
+
+    if (debugEnabled())
+    {
+      TRACER.debugInfo(
+        "In RS " +
+        replicationServerDomain.getReplicationServer().getServerId() +
+        " Sending change status for reset gen id to " + getServerId() +
+        " for baseDn " + baseDn + ":\n" + csMsg);
+    }
+
+    session.publish(csMsg);
+
+    status = newStatus;
+  }
+
+  /**
+   * Set the shut down flag to true and returns the previous value of the flag.
+   * @return The previous value of the shut down flag
+   */
+  public boolean engageShutdown()
+  {
+    // Use thread safe boolean
+    return shuttingDown.getAndSet(true);
+  }
+
+  /**
+   * Gets the status of the connected DS.
+   * @return The status of the connected DS.
+   */
+  public ServerStatus getStatus()
+  {
+    return status;
+  }
+
+  /**
+   * Gets the protocol version used with this remote server.
+   * @return The protocol version used with this remote server.
+   */
+  public short getProtocolVersion()
+  {
+    return protocolVersion;
   }
 }

--
Gitblit v1.10.0