From b4f8838b15342670c31753a484abf0129e3c9653 Mon Sep 17 00:00:00 2001
From: jcduff <jcduff@localhost>
Date: Thu, 23 Oct 2008 14:04:24 +0000
Subject: [PATCH] The commit will bring the following features :     - An updated version of the underlying database. BDB JE 3.3 is now used.     - Attribute API refactoring providing a better abstraction and offering improved performance.     - A new GUI called the Control-Panel to replace the Status-Panel: the specifications for this       GUI are available on OpenDS Wiki and contain a link to a mockup.        See <https://www.opends.org/wiki/page/ControlPanelUISpecification>.     - Some changes in the replication protocol to implement "Assured Replication Mode". The        specifications are on OpenDS Wiki at <https://www.opends.org/wiki/page/AssuredMode> and section 7       describes some of the replication changes required to support this. Assured Replication is not finished,       but the main replication protocol changes to support it are done. As explained by Gilles on an email on       the Dev mailing list (http://markmail.org/message/46rgo3meq3vriy4a), with these changes the newer versions       of OpenDS may not be able to replicate with OpenDS 1.0 instances.     - Support for Service Tags on the platforms where the functionality is available and enabled. Specifications       are published at <https://www.opends.org/wiki/page/OpenDSServiceTagEnabled>. For more information on       Service Tags see <http://wikis.sun.com/display/ServiceTag/Sun+Service+Tag+FAQ>.     - The Admin Connector service. In order to provide agentry of the OpenDS server at any time, a new service       has been added, dedicated to the administration, configuration and monitoring of the server.       An overview of the Admin Connector service and its use is available on the       OpenDS wiki <https://www.opends.org/wiki/page/ManagingAdministrationTrafficToTheServer>     - Updates to the various command line tools to support the Admin Connector service.     
- Some internal re-architecting of the server to put the foundation of future developments such as virtual       directory services. The new NetworkGroups and WorkFlow internal services which have been specified in       <https://www.opends.org/wiki/page/BasicOperationRoutingThroughNetworkGroup> are now implemented.     - Many bug fixes...

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 2129 ++++++++++++++++++++++++++++++++++++-----------------------
 1 files changed, 1,296 insertions(+), 833 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 1a0faa3..a7c3dae 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -25,6 +25,7 @@
  *      Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
+
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
 
@@ -49,19 +50,25 @@
 
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.protocol.AckMessage;
-import org.opends.server.replication.protocol.ErrorMessage;
-import org.opends.server.replication.protocol.RoutableMessage;
-import org.opends.server.replication.protocol.UpdateMessage;
-import org.opends.server.replication.protocol.ReplServerInfoMessage;
-import org.opends.server.replication.protocol.MonitorMessage;
-import org.opends.server.replication.protocol.MonitorRequestMessage;
-import org.opends.server.replication.protocol.ResetGenerationId;
+import org.opends.server.replication.protocol.AckMsg;
+import org.opends.server.replication.protocol.ErrorMsg;
+import org.opends.server.replication.protocol.RoutableMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.MonitorMsg;
+import org.opends.server.replication.protocol.MonitorRequestMsg;
+import org.opends.server.replication.protocol.ResetGenerationIdMsg;
 import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.ResultCode;
 import org.opends.server.util.TimeThread;
 import com.sleepycat.je.DatabaseException;
+import java.util.concurrent.locks.ReentrantLock;
+import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.RSInfo;
+import org.opends.server.replication.common.ServerStatus;
+import org.opends.server.replication.common.StatusMachineEvent;
+import org.opends.server.replication.protocol.ChangeStatusMsg;
 
 /**
  * This class define an in-memory cache that will be used to store
@@ -81,8 +88,12 @@
  */
 public class ReplicationServerDomain
 {
+
   private final Object flowControlLock = new Object();
   private final DN baseDn;
+  // The Status analyzer that periodically verifies if the connected DSs are
+  // late or not
+  private StatusAnalyzer statusAnalyzer = null;
 
   /*
    * The following map contains one balanced tree for each replica ID
@@ -94,7 +105,7 @@
    * to this replication server.
    *
    */
-  private final Map<Short, ServerHandler> connectedServers =
+  private final Map<Short, ServerHandler> directoryServers =
     new ConcurrentHashMap<Short, ServerHandler>();
 
   /*
@@ -106,7 +117,6 @@
    * We add new TreeSet in the HashMap when a new replication server register
    * to this replication server.
    */
-
   private final Map<Short, ServerHandler> replicationServers =
     new ConcurrentHashMap<Short, ServerHandler>();
 
@@ -121,7 +131,6 @@
   /* GenerationId management */
   private long generationId = -1;
   private boolean generationIdSavedStatus = false;
-
   /**
    * The tracer object for the debug logger.
    */
@@ -138,12 +147,11 @@
    * The worker thread is awoke on this semaphore, or on timeout.
    */
   Semaphore remoteMonitorResponsesSemaphore;
-
   /**
    * The monitor data consolidated over the topology.
    */
-  private  MonitorData monitorData = new MonitorData();
-  private  MonitorData wrkMonitorData;
+  private MonitorData monitorData = new MonitorData();
+  private MonitorData wrkMonitorData;
 
   /**
    * Creates a new ReplicationServerDomain associated to the DN baseDn.
@@ -168,8 +176,8 @@
    * @throws IOException When an IO exception happens during the update
    *         processing.
    */
-  public void put(UpdateMessage update, ServerHandler sourceHandler)
-              throws IOException
+  public void put(UpdateMsg update, ServerHandler sourceHandler)
+    throws IOException
   {
     /*
      * TODO : In case that the source server is a LDAP server this method
@@ -177,7 +185,7 @@
      * other replication server before pushing it to the LDAP servers
      */
 
-    short id  = update.getChangeNumber().getServerId();
+    short id = update.getChangeNumber().getServerId();
     sourceHandler.updateServerState(update);
     sourceHandler.incrementInCount();
 
@@ -188,11 +196,10 @@
       {
         if (sourceHandler.isReplicationServer())
           ServerHandler.addWaitingAck(update, sourceHandler.getServerId(),
-                                      this, count - 1);
+            this, count - 1);
         else
           sourceHandler.addWaitingAck(update, count - 1);
-      }
-      else
+      } else
       {
         sourceHandler.sendAck(update.getChangeNumber());
       }
@@ -208,15 +215,14 @@
     DbHandler dbHandler = null;
     synchronized (sourceDbHandlers)
     {
-      dbHandler   = sourceDbHandlers.get(id);
+      dbHandler = sourceDbHandlers.get(id);
       if (dbHandler == null)
       {
         try
         {
           dbHandler = replicationServer.newDbHandler(id, baseDn);
           generationIdSavedStatus = true;
-        }
-        catch (DatabaseException e)
+        } catch (DatabaseException e)
         {
           /*
            * Because of database problem we can't save any more changes
@@ -246,6 +252,26 @@
     {
       for (ServerHandler handler : replicationServers.values())
       {
+        /**
+         * Ignore updates to RS with bad gen id
+         * (no system managed status for a RS)
+         */
+        if ( (generationId>0) && (generationId != handler.getGenerationId()) )
+        {
+          if (debugEnabled())
+            TRACER.debugInfo("In RS " +
+              replicationServer.getServerId() +
+              " for dn " + baseDn.toNormalizedString() + ", update " +
+              update.getChangeNumber().toString() +
+              " will not be sent to replication server " +
+              Short.toString(handler.getServerId()) + " with generation id " +
+              Long.toString(handler.getGenerationId()) +
+              " different from local " +
+              "generation id " + Long.toString(generationId));
+
+          continue;
+        }
+
         handler.add(update, sourceHandler);
       }
     }
@@ -253,7 +279,7 @@
     /*
      * Push the message to the LDAP servers
      */
-    for (ServerHandler handler : connectedServers.values())
+    for (ServerHandler handler : directoryServers.values())
     {
       // don't forward the change to the server that just sent it
       if (handler == sourceHandler)
@@ -261,6 +287,46 @@
         continue;
       }
 
+      /**
+       * Ignore updates to DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS
+       *
+       * The RSD lock should not be taken here as it is acceptable to have a
+       * delay between the time the server has a wrong status and the fact we
+       * detect it: the updates that succeed to pass during this time will have
+       * no impact on remote server. But it is interesting to not saturate
+       * uselessly the network if the updates are not necessary so this check to
+       * stop sending updates is interesting anyway. Not taking the RSD lock
+       * allows to have better performances in normal mode (most of the time).
+       */
+      ServerStatus dsStatus = handler.getStatus();
+      if ( (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
+        (dsStatus == ServerStatus.FULL_UPDATE_STATUS) )
+      {
+        if (debugEnabled())
+        {
+          if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
+            TRACER.debugInfo("In RS " +
+              replicationServer.getServerId() +
+              " for dn " + baseDn.toNormalizedString() + ", update " +
+              update.getChangeNumber().toString() +
+              " will not be sent to directory server " +
+              Short.toString(handler.getServerId()) + " with generation id " +
+              Long.toString(handler.getGenerationId()) +
+              " different from local " +
+              "generation id " + Long.toString(generationId));
+          if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
+            TRACER.debugInfo("In RS " +
+              replicationServer.getServerId() +
+              " for dn " + baseDn.toNormalizedString() + ", update " +
+              update.getChangeNumber().toString() +
+              " will not be sent to directory server " +
+              Short.toString(handler.getServerId()) +
+              " as it is in full update");
+        }
+
+        continue;
+      }
+
       handler.add(update, sourceHandler);
     }
 
@@ -273,7 +339,7 @@
    */
   public void waitDisconnection(short serverId)
   {
-    if (connectedServers.containsKey(serverId))
+    if (directoryServers.containsKey(serverId))
     {
       // try again
       try
@@ -286,51 +352,6 @@
   }
 
   /**
-   * Create initialize context necessary for finding the changes
-   * that must be sent to a given LDAP or replication server.
-   *
-   * @param handler handler for the server that must be started
-   * @throws Exception when method has failed
-   * @return A boolean indicating if the start was successfull.
-   */
-  public boolean startServer(ServerHandler handler) throws Exception
-  {
-    /*
-     * create the balanced tree that will be used to forward changes
-     */
-    synchronized (connectedServers)
-    {
-      ServerHandler oldHandler = connectedServers.get(handler.getServerId());
-
-      if (connectedServers.containsKey(handler.getServerId()))
-      {
-        // looks like two LDAP servers have the same serverId
-        // log an error message and drop this connection.
-        Message message = ERR_DUPLICATE_SERVER_ID.get(
-            oldHandler.toString(), handler.toString(), handler.getServerId());
-        logError(message);
-        return false;
-      }
-      connectedServers.put(handler.getServerId(), handler);
-
-      // It can be that the server that connects here is the
-      // first server connected for a domain.
-      // In that case, we will establish the appriopriate connections
-      // to the other repl servers for this domain and receive
-      // their ReplServerInfo messages.
-      // FIXME: Is it necessary to end this above processing BEFORE listening
-      //        to incoming messages for that domain ? But the replica
-      //        would raise Read Timeout for replica that connects.
-
-      // Update the remote replication servers with our list
-      // of connected LDAP servers
-      sendReplServerInfo();
-
-      return true;
-    }
-  }
-
-  /**
    * Stop operations with a list of servers.
    *
    * @param replServers the replication servers for which
@@ -346,43 +367,105 @@
   }
 
   /**
+   * Checks that a DS is not connected with same id.
+   *
+   * @param handler the DS we want to check
+   * @return true if this is not a duplicate server
+   */
+  public boolean checkForDuplicateDS(ServerHandler handler)
+  {
+    ServerHandler oldHandler = directoryServers.get(handler.getServerId());
+
+    if (directoryServers.containsKey(handler.getServerId()))
+    {
+      // looks like two LDAP servers have the same serverId
+      // log an error message and drop this connection.
+      Message message = ERR_DUPLICATE_SERVER_ID.get(
+        replicationServer.getMonitorInstanceName(), oldHandler.toString(),
+        handler.toString(), handler.getServerId());
+      logError(message);
+      return false;
+    }
+    return true;
+  }
+
+  /**
    * Stop operations with a given server.
    *
    * @param handler the server for which we want to stop operations
    */
   public void stopServer(ServerHandler handler)
   {
-    if (debugEnabled())
-      TRACER.debugInfo(
-        "In RS " + this.replicationServer.getMonitorInstanceName() +
-        " for " + baseDn + " " +
-        " stopServer " + handler.getMonitorInstanceName());
+    /*
+     * We must prevent deadlock on replication server domain lock, when for
+     * instance this code is called from dying ServerReader but also dying
+     * ServerWriter at the same time, or from a thread that wants to shut down
+     * the handler. So use a thread safe flag to know if the job must be done
+     * or not (is already being processed or not).
+     */
+    if (!handler.engageShutdown())
+    // Only do this once (prevent other threads from entering here again)
+    {
+      if (debugEnabled())
+        TRACER.debugInfo(
+          "In RS " + this.replicationServer.getMonitorInstanceName() +
+          " for " + baseDn + " " +
+          " stopServer " + handler.getMonitorInstanceName());
 
+      try
+      {
+        // Acquire lock on domain (see more details in comment of start()
+        // method of ServerHandler)
+        lock();
+      } catch (InterruptedException ex)
+      {
+        // Try doing job anyway...
+      }
 
       if (handler.isReplicationServer())
       {
         if (replicationServers.containsValue(handler))
         {
           replicationServers.remove(handler.getServerId());
-          handler.stopHandler();
+          handler.shutdown();
 
-          // Update the remote replication servers with our list
-          // of connected LDAP servers
-          sendReplServerInfo();
+          // Check if generation id has to be reset
+          mayResetGenerationId();
+          // Warn our DSs that a RS or DS has quit (does not use this
+          // handler as already removed from list)
+          sendTopoInfoToDSs(null);
         }
-      }
-      else
+      } else
       {
-        if (connectedServers.containsValue(handler))
+        if (directoryServers.containsValue(handler))
         {
-          connectedServers.remove(handler.getServerId());
-          handler.stopHandler();
+          // If this is the last DS for the domain, shutdown the status analyzer
+          if (directoryServers.size() == 1)
+          {
+            if (debugEnabled())
+              TRACER.debugInfo("In " +
+                replicationServer.getMonitorInstanceName() +
+                " remote server " + handler.getMonitorInstanceName() +
+                " is the last DS to be stopped: stopping status analyzer");
+            stopStatusAnalyzer();
+          }
 
+          directoryServers.remove(handler.getServerId());
+          handler.shutdown();
+
+          // Check if generation id has to be reset
+          mayResetGenerationId();
           // Update the remote replication servers with our list
           // of connected LDAP servers
-          sendReplServerInfo();
+          sendTopoInfoToRSs();
+          // Warn our DSs that a RS or DS has quit (does not use this
+          // handler as already removed from list)
+          sendTopoInfoToDSs(null);
         }
       }
+
+      release();
+    }
   }
 
   /**
@@ -403,7 +486,7 @@
     // topology and the generationId has never been saved, then we can reset
     // it and the next LDAP server to connect will become the new reference.
     boolean lDAPServersConnectedInTheTopology = false;
-    if (connectedServers.isEmpty())
+    if (directoryServers.isEmpty())
     {
       for (ServerHandler rsh : replicationServers.values())
       {
@@ -411,12 +494,11 @@
         {
           if (debugEnabled())
             TRACER.debugInfo(
-                "In RS " + this.replicationServer.getMonitorInstanceName() +
-                " for " + baseDn + " " +
-                " mayResetGenerationId skip RS" + rsh.getMonitorInstanceName() +
-                " thas different genId");
-        }
-        else
+              "In RS " + this.replicationServer.getMonitorInstanceName() +
+              " for " + baseDn + " " +
+              " mayResetGenerationId skip RS" + rsh.getMonitorInstanceName() +
+              " that has different genId");
+        } else
         {
           if (rsh.hasRemoteLDAPServers())
           {
@@ -424,15 +506,15 @@
 
             if (debugEnabled())
               TRACER.debugInfo(
-                  "In RS " + this.replicationServer.getMonitorInstanceName() +
-                  " for " + baseDn + " " +
-                  " mayResetGenerationId RS" + rsh.getMonitorInstanceName() +
-              " has servers connected to it - will not reset generationId");
+                "In RS " + this.replicationServer.getMonitorInstanceName() +
+                " for " + baseDn + " " +
+                " mayResetGenerationId RS" + rsh.getMonitorInstanceName() +
+                " has servers connected to it - will not reset generationId");
+            break;
           }
         }
       }
-    }
-    else
+    } else
     {
       lDAPServersConnectedInTheTopology = true;
       if (debugEnabled())
@@ -442,73 +524,59 @@
           " has servers connected to it - will not reset generationId");
     }
 
-    if ((!lDAPServersConnectedInTheTopology) && (!this.generationIdSavedStatus)
-        && (generationId != -1))
+    if ((!lDAPServersConnectedInTheTopology) &&
+      (!this.generationIdSavedStatus) &&
+      (generationId != -1))
     {
       setGenerationId(-1, false);
     }
   }
 
   /**
-   * Create initialize context necessary for finding the changes
-   * that must be sent to a given replication server.
+   * Checks that a RS is not already connected.
    *
-   * @param handler the server ID to which we want to forward changes
-   * @throws Exception in case of errors
-   * @return A boolean indicating if the start was successfull.
+   * @param handler the RS we want to check
+   * @return true if this is not a duplicate server
    */
-  public boolean startReplicationServer(ServerHandler handler) throws Exception
+  public boolean checkForDuplicateRS(ServerHandler handler)
   {
-    /*
-     * create the balanced tree that will be used to forward changes
-     */
-    synchronized (replicationServers)
+    ServerHandler oldHandler = replicationServers.get(handler.getServerId());
+    if ((oldHandler != null))
     {
-      ServerHandler oldHandler = replicationServers.get(handler.getServerId());
-      if ((oldHandler != null))
+      if (oldHandler.getServerAddressURL().equals(
+        handler.getServerAddressURL()))
       {
-        if (oldHandler.getServerAddressURL().equals(
-            handler.getServerAddressURL()))
-        {
-          // this is the same server, this means that our ServerStart messages
-          // have been sent at about the same time and 2 connections
-          // have been established.
-          // Silently drop this connection.
-        }
-        else
-        {
-          // looks like two replication servers have the same serverId
-          // log an error message and drop this connection.
-          Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.
-              get(oldHandler.getServerAddressURL(),
-                  handler.getServerAddressURL(), handler.getServerId());
-          logError(message);
-        }
-        return false;
+        // this is the same server, this means that our ServerStart messages
+        // have been sent at about the same time and 2 connections
+        // have been established.
+        // Silently drop this connection.
+        } else
+      {
+        // looks like two replication servers have the same serverId
+        // log an error message and drop this connection.
+        Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get(
+          replicationServer.getMonitorInstanceName(), oldHandler.
+          getServerAddressURL(), handler.getServerAddressURL(),
+          handler.getServerId());
+        logError(message);
       }
-      replicationServers.put(handler.getServerId(), handler);
-
-      // Update this server with the list of LDAP servers
-      // already connected
-      handler.sendInfo(
-          new ReplServerInfoMessage(getConnectedLDAPservers(),generationId));
-
-      return true;
+      return false;
     }
+    return true;
   }
 
-/**
- * Get the next update that need to be sent to a given LDAP server.
- * This call is blocking when no update is available or when dependencies
- * do not allow to send the next available change
- *
- * @param  handler  The server handler for the target directory server.
- *
- * @return the update that must be forwarded
- */
-  public UpdateMessage take(ServerHandler handler)
+  /**
+   * Get the next update that need to be sent to a given LDAP server.
+   * This call is blocking when no update is available or when dependencies
+   * do not allow to send the next available change
+   *
+   * @param  handler  The server handler for the target directory server.
+   *
+   * @return the update that must be forwarded
+   */
+  public UpdateMsg take(ServerHandler handler)
   {
-    UpdateMessage msg;
+    UpdateMsg msg;
     /*
      * Get the balanced tree that we use to sort the changes to be
      * sent to the replica from the cookie
@@ -547,7 +615,6 @@
     return mySet;
   }
 
-
   /**
    * Return a Set containing the servers known by this replicationServer.
    * @return a set containing the servers known by this replicationServer.
@@ -567,7 +634,7 @@
   {
     List<String> mySet = new ArrayList<String>(0);
 
-    for (ServerHandler handler : connectedServers.values())
+    for (ServerHandler handler : directoryServers.values())
     {
       mySet.add(String.valueOf(handler.getServerId()));
     }
@@ -577,7 +644,7 @@
   /**
    * Creates and returns an iterator.
    * When the iterator is not used anymore, the caller MUST call the
-   * ReplicationIterator.releaseCursor() method to free the ressources
+   * ReplicationIterator.releaseCursor() method to free the resources
    * and locks used by the ReplicationIterator.
    *
    * @param serverId Identifier of the server for which the iterator is created.
@@ -586,7 +653,7 @@
    * for the provided server Id.
    */
   public ReplicationIterator getChangelogIterator(short serverId,
-                    ChangeNumber changeNumber)
+    ChangeNumber changeNumber)
   {
     DbHandler handler = sourceDbHandlers.get(serverId);
     if (handler == null)
@@ -595,10 +662,9 @@
     try
     {
       return handler.generateIterator(changeNumber);
-    }
-    catch (Exception e)
+    } catch (Exception e)
     {
-     return null;
+      return null;
     }
   }
 
@@ -630,17 +696,17 @@
    * Sets the provided DbHandler associated to the provided serverId.
    *
    * @param serverId  the serverId for the server to which is
-   *                  associated the Dbhandler.
+   *                  associated the DbHandler.
    * @param dbHandler the dbHandler associated to the serverId.
    *
    * @throws DatabaseException If a database error happened.
    */
   public void setDbHandler(short serverId, DbHandler dbHandler)
-  throws DatabaseException
+    throws DatabaseException
   {
     synchronized (sourceDbHandlers)
     {
-      sourceDbHandlers.put(serverId , dbHandler);
+      sourceDbHandlers.put(serverId, dbHandler);
     }
   }
 
@@ -651,30 +717,29 @@
    */
   private int NumServers()
   {
-    return replicationServers.size() + connectedServers.size();
+    return replicationServers.size() + directoryServers.size();
   }
 
-
   /**
    * Add an ack to the list of ack received for a given change.
    *
    * @param message The ack message received.
    * @param fromServerId The identifier of the server that sent the ack.
    */
-  public void ack(AckMessage message, short fromServerId)
+  public void ack(AckMsg message, short fromServerId)
   {
     /*
      * there are 2 possible cases here :
      *  - the message that was acked comes from a server to which
      *    we are directly connected.
-     *    In this case, we can find the handler from the connectedServers map
+     *    In this case, we can find the handler from the directoryServers map
      *  - the message that was acked comes from a server to which we are not
      *    connected.
      *    In this case we need to find the replication server that forwarded
      *    the change and send back the ack to this server.
      */
-    ServerHandler handler = connectedServers.get(
-                                       message.getChangeNumber().getServerId());
+    ServerHandler handler = directoryServers.get(
+      message.getChangeNumber().getServerId());
     if (handler != null)
       handler.ack(message, fromServerId);
     else
@@ -690,17 +755,16 @@
    * @param senderHandler The handler of the server that published this message.
    * @return The list of destination handlers.
    */
-  protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
-      ServerHandler senderHandler)
+  protected List<ServerHandler> getDestinationServers(RoutableMsg msg,
+    ServerHandler senderHandler)
   {
     List<ServerHandler> servers =
       new ArrayList<ServerHandler>();
 
-    if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
+    if (msg.getDestination() == RoutableMsg.THE_CLOSEST_SERVER)
     {
       // TODO Import from the "closest server" to be implemented
-    }
-    else if (msg.getDestination() == RoutableMessage.ALL_SERVERS)
+    } else if (msg.getDestination() == RoutableMsg.ALL_SERVERS)
     {
       if (!senderHandler.isReplicationServer())
       {
@@ -716,24 +780,22 @@
       }
 
       // Sends to all connected LDAP servers
-      for (ServerHandler destinationHandler : connectedServers.values())
+      for (ServerHandler destinationHandler : directoryServers.values())
       {
         // Don't loop on the sender
         if (destinationHandler == senderHandler)
           continue;
         servers.add(destinationHandler);
       }
-    }
-    else
+    } else
     {
       // Destination is one server
       ServerHandler destinationHandler =
-        connectedServers.get(msg.getDestination());
+        directoryServers.get(msg.getDestination());
       if (destinationHandler != null)
       {
         servers.add(destinationHandler);
-      }
-      else
+      } else
       {
         // the targeted server is NOT connected
         // Let's search for THE changelog server that MAY
@@ -763,50 +825,49 @@
    * @param senderHandler The server handler of the server that emitted
    * the message.
    */
-  public void process(RoutableMessage msg, ServerHandler senderHandler)
+  public void process(RoutableMsg msg, ServerHandler senderHandler)
   {
 
     // Test the message for which a ReplicationServer is expected
     // to be the destination
     if (msg.getDestination() == this.replicationServer.getServerId())
     {
-      if (msg instanceof ErrorMessage)
+      if (msg instanceof ErrorMsg)
       {
-        ErrorMessage errorMsg = (ErrorMessage)msg;
+        ErrorMsg errorMsg = (ErrorMsg) msg;
         logError(ERR_ERROR_MSG_RECEIVED.get(
-            errorMsg.getDetails()));
-      }
-      else if (msg instanceof MonitorRequestMessage)
+          errorMsg.getDetails()));
+      } else if (msg instanceof MonitorRequestMsg)
       {
-        MonitorRequestMessage replServerMonitorRequestMsg =
-          (MonitorRequestMessage) msg;
+        MonitorRequestMsg replServerMonitorRequestMsg =
+          (MonitorRequestMsg) msg;
 
-        MonitorMessage monitorMsg =
-          new MonitorMessage(
-              replServerMonitorRequestMsg.getDestination(),
-              replServerMonitorRequestMsg.getsenderID());
+        MonitorMsg monitorMsg =
+          new MonitorMsg(
+          replServerMonitorRequestMsg.getDestination(),
+          replServerMonitorRequestMsg.getsenderID());
 
         // Populate for each connected LDAP Server
         // from the states stored in the serverHandler.
         // - the server state
         // - the older missing change
-        for (ServerHandler lsh : this.connectedServers.values())
+        for (ServerHandler lsh : this.directoryServers.values())
         {
           monitorMsg.setServerState(
-              lsh.getServerId(),
-              lsh.getServerState(),
-              lsh.getApproxFirstMissingDate(),
-              true);
+            lsh.getServerId(),
+            lsh.getServerState(),
+            lsh.getApproxFirstMissingDate(),
+            true);
         }
 
         // Same for the connected RS
         for (ServerHandler rsh : this.replicationServers.values())
         {
           monitorMsg.setServerState(
-              rsh.getServerId(),
-              rsh.getServerState(),
-              rsh.getApproxFirstMissingDate(),
-              false);
+            rsh.getServerId(),
+            rsh.getServerState(),
+            rsh.getApproxFirstMissingDate(),
+            false);
         }
 
         // Populate the RS state in the msg from the DbState
@@ -816,26 +877,23 @@
         try
         {
           senderHandler.send(monitorMsg);
-        }
-        catch(Exception e)
+        } catch (Exception e)
         {
           // We log the error. The requestor will detect a timeout or
           // any other failure on the connection.
           logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
-              Short.toString((msg.getDestination()))));
+            Short.toString((msg.getDestination()))));
         }
-      }
-      else if (msg instanceof MonitorMessage)
+      } else if (msg instanceof MonitorMsg)
       {
-        MonitorMessage monitorMsg =
-          (MonitorMessage) msg;
+        MonitorMsg monitorMsg =
+          (MonitorMsg) msg;
 
         receivesMonitorDataResponse(monitorMsg);
-      }
-      else
+      } else
       {
         logError(NOTE_ERR_ROUTING_TO_SERVER.get(
-            msg.getClass().getCanonicalName()));
+          msg.getClass().getCanonicalName()));
       }
       return;
     }
@@ -846,21 +904,20 @@
     {
       MessageBuilder mb = new MessageBuilder();
       mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
-      mb.append(" In Replication Server=" + this.replicationServer.
-          getMonitorInstanceName());
+      mb.append(" In Replication Server=" +
+        this.replicationServer.getMonitorInstanceName());
       mb.append(" domain =" + this.baseDn);
       mb.append(" unroutable message =" + msg.toString());
       mb.append(" routing table is empty");
-      ErrorMessage errMsg = new ErrorMessage(
-          this.replicationServer.getServerId(),
-          msg.getsenderID(),
-          mb.toMessage());
+      ErrorMsg errMsg = new ErrorMsg(
+        this.replicationServer.getServerId(),
+        msg.getsenderID(),
+        mb.toMessage());
       logError(mb.toMessage());
       try
       {
         senderHandler.send(errMsg);
-      }
-      catch(IOException ioe)
+      } catch (IOException ioe)
       {
         // TODO Handle error properly (sender timeout in addition)
         /*
@@ -871,18 +928,16 @@
         mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
         mb2.append(stackTraceToSingleLineString(ioe));
         logError(mb2.toMessage());
-        senderHandler.shutdown();
+        stopServer(senderHandler);
       }
-    }
-    else
+    } else
     {
       for (ServerHandler targetHandler : servers)
       {
         try
         {
           targetHandler.send(msg);
-        }
-        catch(IOException ioe)
+        } catch (IOException ioe)
         {
           /*
            * An error happened trying the send a routabled message
@@ -899,733 +954,1141 @@
           MessageBuilder mb1 = new MessageBuilder();
           mb1.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
           mb1.append("serverID:" + msg.getDestination());
-          ErrorMessage errMsg = new ErrorMessage(
-              msg.getsenderID(), mb1.toMessage());
+          ErrorMsg errMsg = new ErrorMsg(
+            msg.getsenderID(), mb1.toMessage());
           try
           {
             senderHandler.send(errMsg);
-          }
-          catch(IOException ioe1)
+          } catch (IOException ioe1)
           {
             // an error happened on the sender session trying to recover
             // from an error on the receiver session.
             // We don't have much solution left beside closing the sessions.
-            senderHandler.shutdown();
-            targetHandler.shutdown();
+            stopServer(senderHandler);
+            stopServer(targetHandler);
           }
-          // TODO Handle error properly (sender timeout in addition)
+        // TODO Handle error properly (sender timeout in addition)
         }
       }
     }
 
   }
 
-    /**
-     * Send back an ack to the server that sent the change.
-     *
-     * @param changeNumber The ChangeNumber of the change that must be acked.
-     * @param isLDAPserver This boolean indicates if the server that sent the
-     *                     change was an LDAP server or a ReplicationServer.
-     */
-    public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
+  /**
+   * Send back an ack to the server that sent the change.
+   *
+   * @param changeNumber The ChangeNumber of the change that must be acked.
+   * @param isLDAPserver This boolean indicates if the server that sent the
+   *                     change was an LDAP server or a ReplicationServer.
+   */
+  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
+  {
+    short serverId = changeNumber.getServerId();
+    sendAck(changeNumber, isLDAPserver, serverId);
+  }
+
+  /**
+   *
+   * Send back an ack to a server that sent the change.
+   *
+   * @param changeNumber The ChangeNumber of the change that must be acked.
+   * @param isLDAPserver This boolean indicates if the server that sent the
+   *                     change was an LDAP server or a ReplicationServer.
+   * @param serverId     The identifier of the server from which we
+   *                     received the change..
+   */
+  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
+    short serverId)
+  {
+    ServerHandler handler;
+    if (isLDAPserver)
+      handler = directoryServers.get(serverId);
+    else
+      handler = replicationServers.get(serverId);
+
+    // TODO : check for null handler and log error
+    try
     {
-      short serverId = changeNumber.getServerId();
-      sendAck(changeNumber, isLDAPserver, serverId);
+      handler.sendAck(changeNumber);
+    } catch (IOException e)
+    {
+      /*
+       * An error happened trying the send back an ack to this server.
+       * Log an error and close the connection to this server.
+       */
+      MessageBuilder mb = new MessageBuilder();
+      mb.append(ERR_CHANGELOG_ERROR_SENDING_ACK.get(this.toString()));
+      mb.append(stackTraceToSingleLineString(e));
+      logError(mb.toMessage());
+      stopServer(handler);
+    }
+  }
+
+  /**
+   * Shutdown this ReplicationServerDomain.
+   */
+  public void shutdown()
+  {
+    // Close session with other changelogs
+    for (ServerHandler serverHandler : replicationServers.values())
+    {
+      stopServer(serverHandler);
     }
 
-    /**
-     *
-     * Send back an ack to a server that sent the change.
-     *
-     * @param changeNumber The ChangeNumber of the change that must be acked.
-     * @param isLDAPserver This boolean indicates if the server that sent the
-     *                     change was an LDAP server or a ReplicationServer.
-     * @param serverId     The identifier of the server from which we
-     *                     received the change..
-     */
-    public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
-        short serverId)
+    // Close session with other LDAP servers
+    for (ServerHandler serverHandler : directoryServers.values())
     {
-      ServerHandler handler;
-      if (isLDAPserver)
-        handler = connectedServers.get(serverId);
-      else
-        handler = replicationServers.get(serverId);
+      stopServer(serverHandler);
+    }
 
-      // TODO : check for null handler and log error
+    // Shutdown the dbHandlers
+    synchronized (sourceDbHandlers)
+    {
+      for (DbHandler dbHandler : sourceDbHandlers.values())
+      {
+        dbHandler.shutdown();
+      }
+      sourceDbHandlers.clear();
+    }
+  }
+
+  /**
+   * Returns the ServerState describing the last change from this replica.
+   *
+   * @return The ServerState describing the last change from this replica.
+   */
+  public ServerState getDbServerState()
+  {
+    ServerState serverState = new ServerState();
+    for (DbHandler db : sourceDbHandlers.values())
+    {
+      serverState.update(db.getLastChange());
+    }
+    return serverState;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString()
+  {
+    return "ReplicationServerDomain " + baseDn;
+  }
+
+  /**
+   * Check if some server Handler should be removed from flow control state.
+   * @throws IOException If an error happened.
+   */
+  public void checkAllSaturation() throws IOException
+  {
+    for (ServerHandler handler : replicationServers.values())
+    {
+      handler.checkWindow();
+    }
+
+    for (ServerHandler handler : directoryServers.values())
+    {
+      handler.checkWindow();
+    }
+  }
+
+  /**
+   * Check if a server that was in flow control can now restart
+   * sending updates.
+   * @param sourceHandler The server that must be checked.
+   * @return true if the server can restart sending changes.
+   *         false if the server can't restart sending changes.
+   */
+  public boolean restartAfterSaturation(ServerHandler sourceHandler)
+  {
+    for (ServerHandler handler : replicationServers.values())
+    {
+      if (!handler.restartAfterSaturation(sourceHandler))
+        return false;
+    }
+
+    for (ServerHandler handler : directoryServers.values())
+    {
+      if (!handler.restartAfterSaturation(sourceHandler))
+        return false;
+    }
+    return true;
+  }
+
+  /**
+   * Send a TopologyMsg to all the connected directory servers in order to
+   * let.
+   * them know the topology (every known DSs and RSs)
+   * @param notThisOne If not null, the topology message will not be sent to
+   * this passed server.
+   */
+  public void sendTopoInfoToDSs(ServerHandler notThisOne)
+  {
+    for (ServerHandler handler : directoryServers.values())
+    {
+      if ((notThisOne == null) || // All DSs requested
+        ((notThisOne != null) && (handler != notThisOne)))
+      // All except passed one
+      {
+        TopologyMsg topoMsg = createTopologyMsgForDS(handler.getServerId());
+        try
+        {
+          handler.sendTopoInfo(topoMsg);
+        } catch (IOException e)
+        {
+          Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
+            baseDn.toString(),
+            "directory", Short.toString(handler.getServerId()), e.getMessage());
+          logError(message);
+        }
+      }
+    }
+  }
+
+  /**
+   * Send a TopologyMsg to all the connected replication servers
+   * in order to let them know our connected LDAP servers.
+   */
+  public void sendTopoInfoToRSs()
+  {
+    TopologyMsg topoMsg = createTopologyMsgForRS();
+    for (ServerHandler handler : replicationServers.values())
+    {
       try
       {
-        handler.sendAck(changeNumber);
+        handler.sendTopoInfo(topoMsg);
       } catch (IOException e)
       {
-        /*
-         * An error happened trying the send back an ack to this server.
-         * Log an error and close the connection to this server.
-         */
-        MessageBuilder mb = new MessageBuilder();
-        mb.append(ERR_CHANGELOG_ERROR_SENDING_ACK.get(this.toString()));
-        mb.append(stackTraceToSingleLineString(e));
-        logError(mb.toMessage());
-        handler.shutdown();
+        Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
+          baseDn.toString(),
+          "replication", Short.toString(handler.getServerId()),
+          e.getMessage());
+        logError(message);
+      }
+    }
+  }
+
+  /**
+   * Creates a TopologyMsg filled with information to be sent to a remote RS.
+   * We send remote RS the info of every DS that are directly connected to us
+   * plus our own info as RS.
+   * @return A suitable TopologyMsg PDU to be sent to a peer RS
+   */
+  public TopologyMsg createTopologyMsgForRS()
+  {
+    List<DSInfo> dsInfos = new ArrayList<DSInfo>();
+
+    // Go through every DSs
+    for (ServerHandler serverHandler : directoryServers.values())
+    {
+      dsInfos.add(serverHandler.toDSInfo());
+    }
+
+    // Create info for us (local RS)
+    List<RSInfo> rsInfos = new ArrayList<RSInfo>();
+    RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
+      generationId, replicationServer.getGroupId());
+    rsInfos.add(localRSInfo);
+
+    return new TopologyMsg(dsInfos, rsInfos);
+  }
+
+  /**
+   * Creates a TopologyMsg filled with information to be sent to a DS.
+   * We send remote DS the info of every known DS and RS in the topology (our
+   * directly connected DSs plus the DSs connected to other RSs) except himself.
+   * Also put info related to local RS.
+   *
+   * @param destDsId The id of the DS the TopologyMsg PDU is to be sent to and
+   * that we must not include in the list DS list.
+   * @return A suitable TopologyMsg PDU to be sent to a peer DS
+   */
+  public TopologyMsg createTopologyMsgForDS(short destDsId)
+  {
+    List<DSInfo> dsInfos = new ArrayList<DSInfo>();
+    List<RSInfo> rsInfos = new ArrayList<RSInfo>();
+
+    // Go through every DSs (except recipient of msg)
+    for (ServerHandler serverHandler : directoryServers.values())
+    {
+      if (serverHandler.getServerId() == destDsId)
+        continue;
+      dsInfos.add(serverHandler.toDSInfo());
+    }
+
+    // Add our own info (local RS)
+    RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
+      generationId, replicationServer.getGroupId());
+    rsInfos.add(localRSInfo);
+
+    // Go through every peer RSs (and get their connected DSs), also add info
+    // for RSs
+    for (ServerHandler serverHandler : replicationServers.values())
+    {
+      // Put RS info
+      rsInfos.add(serverHandler.toRSInfo());
+      // Put his DSs info
+      Map<Short, LightweightServerHandler> lsList =
+        serverHandler.getConnectedDSs();
+      for (LightweightServerHandler ls : lsList.values())
+      {
+        dsInfos.add(ls.toDSInfo());
       }
     }
 
-    /**
-     * Shutdown this ReplicationServerDomain.
-     */
-    public void shutdown()
+    return new TopologyMsg(dsInfos, rsInfos);
+  }
+
+  /**
+   * Get the generationId associated to this domain.
+   *
+   * @return The generationId
+   */
+  public long getGenerationId()
+  {
+    return generationId;
+  }
+
+  /**
+   * Get the generationId saved status.
+   *
+   * @return The generationId saved status.
+   */
+  public boolean getGenerationIdSavedStatus()
+  {
+    return generationIdSavedStatus;
+  }
+
+  /**
+   * Sets the provided value as the new in memory generationId.
+   *
+   * @param generationId The new value of generationId.
+   * @param savedStatus  The saved status of the generationId.
+   * @return The old generation id
+   */
+  synchronized public long setGenerationId(long generationId,
+    boolean savedStatus)
+  {
+    if (debugEnabled())
+      TRACER.debugInfo(
+        "In " + this.replicationServer.getMonitorInstanceName() +
+        " baseDN=" + baseDn +
+        " RCache.set GenerationId=" + generationId);
+
+    long oldGenerationId = this.generationId;
+
+    if (this.generationId != generationId)
     {
-      // Close session with other changelogs
-      for (ServerHandler serverHandler : replicationServers.values())
-      {
-        serverHandler.shutdown();
-      }
+      // we are changing of genId
+      clearDbs();
 
-      // Close session with other LDAP servers
-      for (ServerHandler serverHandler : connectedServers.values())
-      {
-        serverHandler.shutdown();
-      }
+      this.generationId = generationId;
+      this.generationIdSavedStatus = savedStatus;
+    }
 
-      // Shutdown the dbHandlers
-      synchronized (sourceDbHandlers)
+    return oldGenerationId;
+  }
+
+  /**
+   * Resets the generationID.
+   *
+   * @param senderHandler The handler associated to the server
+   *        that requested to reset the generationId.
+   * @param genIdMsg The reset generation ID msg received.
+   */
+  public void resetGenerationId(ServerHandler senderHandler,
+    ResetGenerationIdMsg genIdMsg)
+  {
+    if (debugEnabled())
+    {
+      TRACER.debugInfo(
+        "In RS " + getReplicationServer().getServerId() +
+        " Receiving ResetGenerationIdMsg from " + senderHandler.getServerId() +
+        " for baseDn " + baseDn + ":\n" + genIdMsg);
+    }
+
+    try
+    {
+      // Acquire lock on domain (see more details in comment of start() method
+      // of ServerHandler)
+      lock();
+    } catch (InterruptedException ex)
+    {
+      // Try doing job anyway...
+    }
+
+    long newGenId = genIdMsg.getGenerationId();
+
+    if (newGenId != this.generationId)
+    {
+      setGenerationId(newGenId, false);
+    }
+    else
+    {
+      // Order to take a gen id we already have, just ignore
+      if (debugEnabled())
       {
-        for (DbHandler dbHandler : sourceDbHandlers.values())
+        TRACER.debugInfo(
+          "In RS " + getReplicationServer().getServerId()
+          + " Reset generation id requested for baseDn " + baseDn
+          + " but generation id was already " + this.generationId
+          + ":\n" + genIdMsg);
+      }
+    }
+
+    // If we are the first replication server warned,
+    // then forwards the reset message to the remote replication servers
+    for (ServerHandler rsHandler : replicationServers.values())
+    {
+      try
+      {
+        // After we'll have sent the message , the remote RS will adopt
+        // the new genId
+        rsHandler.setGenerationId(newGenId);
+        if (senderHandler.isLDAPserver())
         {
-          dbHandler.shutdown();
+          rsHandler.forwardGenerationIdToRS(genIdMsg);
         }
-        sourceDbHandlers.clear();
+      } catch (IOException e)
+      {
+        logError(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID.get(baseDn.toString(),
+          e.getMessage()));
       }
     }
 
-    /**
-     * Returns the ServerState describing the last change from this replica.
-     *
-     * @return The ServerState describing the last change from this replica.
-     */
-    public ServerState getDbServerState()
+    // Change status of the connected DSs according to the requested new
+    // reference generation id
+    for (ServerHandler dsHandler : directoryServers.values())
     {
-      ServerState serverState = new ServerState();
-      for (DbHandler db : sourceDbHandlers.values())
+      try
       {
-        serverState.update(db.getLastChange());
-      }
-      return serverState;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public String toString()
-    {
-      return "ReplicationServerDomain " + baseDn;
-    }
-
-    /**
-     * Check if some server Handler should be removed from flow control state.
-     * @throws IOException If an error happened.
-     */
-    public void checkAllSaturation() throws IOException
-    {
-      for (ServerHandler handler : replicationServers.values())
+        dsHandler.changeStatusForResetGenId(newGenId);
+      } catch (IOException e)
       {
-        handler.checkWindow();
-      }
-
-      for (ServerHandler handler : connectedServers.values())
-      {
-        handler.checkWindow();
+        logError(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID.get(baseDn.
+          toString(),
+          Short.toString(dsHandler.getServerId()),
+          e.getMessage()));
       }
     }
 
-    /**
-     * Check if a server that was in flow control can now restart
-     * sending updates.
-     * @param sourceHandler The server that must be checked.
-     * @return true if the server can restart sending changes.
-     *         false if the server can't restart sending changes.
-     */
-    public boolean restartAfterSaturation(ServerHandler sourceHandler)
-    {
-      for (ServerHandler handler : replicationServers.values())
-      {
-        if (!handler.restartAfterSaturation(sourceHandler))
-          return false;
-      }
+    // Update every peers (RS/DS) with potential topology changes (status
+    // change). Rather than doing that each time a DS has a status change
+    // (consecutive to reset gen id message), we prefer advertising once for
+    // all after changes (less packet sent), here at the end of the reset msg
+    // treatment.
+    sendTopoInfoToDSs(null);
+    sendTopoInfoToRSs();
 
-      for (ServerHandler handler : connectedServers.values())
-      {
-        if (!handler.restartAfterSaturation(sourceHandler))
-          return false;
-      }
+    Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString(),
+      Long.toString(newGenId));
+    logError(message);
+
+    release();
+  }
+
+  /**
+   * Process message of a remote server changing his status.
+   * @param senderHandler The handler associated to the server
+   *        that changed his status.
+   * @param csMsg The message containing the new status
+   */
+  public void processNewStatus(ServerHandler senderHandler,
+    ChangeStatusMsg csMsg)
+  {
+    if (debugEnabled())
+    {
+      TRACER.debugInfo(
+        "In RS " + getReplicationServer().getServerId() +
+        " Receiving ChangeStatusMsg from " + senderHandler.getServerId() +
+        " for baseDn " + baseDn + ":\n" + csMsg);
+    }
+
+    try
+    {
+      // Acquire lock on domain (see more details in comment of start() method
+      // of ServerHandler)
+      lock();
+    } catch (InterruptedException ex)
+    {
+      // Try doing job anyway...
+    }
+
+    ServerStatus newStatus = senderHandler.processNewStatus(csMsg);
+    if (newStatus == ServerStatus.INVALID_STATUS)
+    {
+      // Already logged an error in processNewStatus()
+      // just return not to forward a bad status to topology
+      release();
+      return;
+    }
+
+    // Update every peers (RS/DS) with topology changes
+    sendTopoInfoToDSs(senderHandler);
+    sendTopoInfoToRSs();
+
+    Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
+      Short.toString(senderHandler.getServerId()),
+      baseDn.toString(),
+      newStatus.toString());
+    logError(message);
+
+    release();
+  }
+
+  /**
+   * Change the status of a directory server according to the event generated
+   * from the status analyzer.
+   * @param serverHandler The handler of the directory server to update
+   * @param event The event to be used for new status computation
+   * @return True if we have been interrupted (must stop), false otherwise
+   */
+  public boolean changeStatusFromStatusAnalyzer(ServerHandler serverHandler,
+    StatusMachineEvent event)
+  {
+    try
+    {
+      // Acquire lock on domain (see more details in comment of start() method
+      // of ServerHandler)
+      lock();
+    } catch (InterruptedException ex)
+    {
+      // We have been interrupted for dying, from stopStatusAnalyzer
+      // to prevent deadlock in this situation:
+      // RS is being shutdown, and stopServer will call stopStatusAnalyzer.
+      // Domain lock is taken by shutdown thread while status analyzer thread
+      // is willing to change the status of a server at the same time so is
+      // waiting for the domain lock at the same time. As shutdown thread is
+      // waiting for analyzer thread death, a deadlock occurs. So we force
+      // interruption of the status analyzer thread death after 2 seconds if
+      // it has not finished (see StatusAnalyzer.waitForShutdown). This allows
+      // to have the analyzer thread taking the domain lock only when the status
+      // of a DS has to be changed. See more comments in run method of
+      // StatusAnalyzer.
+      if (debugEnabled())
+        TRACER.debugInfo(
+          "Status analyzer for domain " + baseDn + " has been interrupted when"
+          + " trying to acquire domain lock for changing the status of DS " +
+          serverHandler.getServerId());
       return true;
     }
 
-    /**
-     * Send a ReplServerInfoMessage to all the connected replication servers
-     * in order to let them know our connected LDAP servers.
-     */
-    private void sendReplServerInfo()
+    ServerStatus newStatus = ServerStatus.INVALID_STATUS;
+    ServerStatus oldStatus = serverHandler.getStatus();
+    try
     {
-      ReplServerInfoMessage info =
-        new ReplServerInfoMessage(getConnectedLDAPservers(), generationId);
-      for (ServerHandler handler : replicationServers.values())
+      newStatus = serverHandler.changeStatusFromStatusAnalyzer(event);
+    } catch (IOException e)
+    {
+      logError(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER.get(baseDn.
+        toString(),
+        Short.toString(serverHandler.getServerId()),
+        e.getMessage()));
+    }
+
+    if ( (newStatus == ServerStatus.INVALID_STATUS) ||
+      (newStatus == oldStatus) )
+    {
+      // Change was impossible or already occurred (see StatusAnalyzer comments)
+      release();
+      return false;
+    }
+
+    // Update every peers (RS/DS) with topology changes
+    sendTopoInfoToDSs(serverHandler);
+    sendTopoInfoToRSs();
+
+    release();
+    return false;
+  }
+
+  /**
+   * Clears the Db associated with that cache.
+   */
+  public void clearDbs()
+  {
+    // Reset the localchange and state db for the current domain
+    synchronized (sourceDbHandlers)
+    {
+      for (DbHandler dbHandler : sourceDbHandlers.values())
       {
         try
         {
-          handler.sendInfo(info);
-        }
-        catch (IOException e)
+          dbHandler.clear();
+        } catch (Exception e)
         {
-          /*
-           * An error happened trying the send back an ack to this server.
-           * Log an error and close the connection to this server.
-           */
+          // TODO: i18n
           MessageBuilder mb = new MessageBuilder();
-          mb.append(ERR_CHANGELOG_ERROR_SENDING_INFO.get(this.toString()));
-          mb.append(stackTraceToSingleLineString(e));
+          mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(),
+            e.getMessage() + " " +
+            stackTraceToSingleLineString(e)));
           logError(mb.toMessage());
-          handler.shutdown();
         }
       }
-    }
+      sourceDbHandlers.clear();
 
-    /**
-     * Get the generationId associated to this domain.
-     *
-     * @return The generationId
-     */
-    public long getGenerationId()
-    {
-      return generationId;
-    }
-
-    /**
-     * Get the generationId saved status.
-     *
-     * @return The generationId saved status.
-     */
-    public boolean getGenerationIdSavedStatus()
-    {
-      return generationIdSavedStatus;
-    }
-
-    /**
-     * Sets the provided value as the new in memory generationId.
-     *
-     * @param generationId The new value of generationId.
-     * @param savedStatus  The saved status of the generationId.
-     */
-    synchronized public void setGenerationId(long generationId,
-        boolean savedStatus)
-    {
       if (debugEnabled())
         TRACER.debugInfo(
           "In " + this.replicationServer.getMonitorInstanceName() +
           " baseDN=" + baseDn +
-          " RCache.set GenerationId=" + generationId);
-
-      if (this.generationId != generationId)
-      {
-        // we are changing of genId
-        clearDbs();
-
-        this.generationId = generationId;
-        this.generationIdSavedStatus = savedStatus;
-
-        // they have a generationId different from the reference one
-        for (ServerHandler handler : connectedServers.values())
-        {
-          if (generationId != handler.getGenerationId())
-          {
-            // Notify our remote LS that from now on have a different genID
-            handler.warnBadGenerationId();
-          }
-        }
-      }
-    }
-
-    /**
-     * Resets the generationID.
-     *
-     * @param senderHandler The handler associated to the server
-     *        that requested to reset the generationId.
-     * @param genIdMsg The reset generation ID msg received.
-     */
-    public void resetGenerationId(ServerHandler senderHandler,
-        ResetGenerationId genIdMsg)
-    {
-      long newGenId = genIdMsg.getGenerationId();
-
-      if (newGenId != this.generationId)
-      {
-        this.setGenerationId(newGenId, false);
-      }
-
-      // If we are the first replication server warned,
-      // then forwards the reset message to the remote replication servers
-      for (ServerHandler rsHandler : replicationServers.values())
-      {
-        try
-        {
-          // After we'll have send the mssage , the remote RS will adopt
-          // the new genId
-          rsHandler.setGenerationId(newGenId);
-          if (senderHandler.isLDAPserver())
-          {
-            rsHandler.forwardGenerationIdToRS(genIdMsg);
-          }
-        }
-        catch (IOException e)
-        {
-          logError(ERR_CHANGELOG_ERROR_SENDING_INFO.
-              get(rsHandler.getMonitorInstanceName()));
-        }
-      }
-    }
-
-    /**
-     * Clears the Db associated with that cache.
-     */
-    public void clearDbs()
-    {
-      // Reset the localchange and state db for the current domain
-      synchronized (sourceDbHandlers)
-      {
-        for (DbHandler dbHandler : sourceDbHandlers.values())
-        {
-          try
-          {
-            dbHandler.clear();
-          }
-          catch (Exception e)
-          {
-            // TODO: i18n
-            MessageBuilder mb = new MessageBuilder();
-            mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(),
-                e.getMessage() + " " +
-                stackTraceToSingleLineString(e)));
-            logError(mb.toMessage());
-          }
-        }
-        sourceDbHandlers.clear();
-
-        if (debugEnabled())
-          TRACER.debugInfo(
-              "In " + this.replicationServer.getMonitorInstanceName() +
-              " baseDN=" + baseDn +
           " The source db handler has been cleared");
-      }
-      try
-      {
-        replicationServer.clearGenerationId(baseDn);
-      }
-      catch (Exception e)
-      {
-        // TODO: i18n
-        logError(Message.raw(
-            "Exception caught while clearing generationId:" +
-            e.getLocalizedMessage()));
-      }
     }
-
-    /**
-     * Returns whether the provided server is in degraded
-     * state due to the fact that the peer server has an invalid
-     * generationId for this domain.
-     *
-     * @param serverId The serverId for which we want to know the
-     *                 the state.
-     * @return Whether it is degraded or not.
-     */
-
-    public boolean isDegradedDueToGenerationId(short serverId)
+    try
     {
-      if (debugEnabled())
-        TRACER.debugInfo(
-            "In " + this.replicationServer.getMonitorInstanceName() +
-            " baseDN=" + baseDn +
-            " isDegraded serverId=" + serverId +
-            " given local generation Id=" + this.generationId);
+      replicationServer.clearGenerationId(baseDn);
+    } catch (Exception e)
+    {
+      // TODO: i18n
+      logError(Message.raw(
+        "Exception caught while clearing generationId:" +
+        e.getLocalizedMessage()));
+    }
+  }
 
-      ServerHandler handler = replicationServers.get(serverId);
+  /**
+   * Returns whether the provided server is in degraded
+   * state due to the fact that the peer server has an invalid
+   * generationId for this domain.
+   *
+   * @param serverId The serverId for which we want to know the
+   *                 the state.
+   * @return Whether it is degraded or not.
+   */
+  public boolean isDegradedDueToGenerationId(short serverId)
+  {
+    if (debugEnabled())
+      TRACER.debugInfo(
+        "In " + this.replicationServer.getMonitorInstanceName() +
+        " baseDN=" + baseDn +
+        " isDegraded serverId=" + serverId +
+        " given local generation Id=" + this.generationId);
+
+    ServerHandler handler = replicationServers.get(serverId);
+    if (handler == null)
+    {
+      handler = directoryServers.get(serverId);
       if (handler == null)
       {
-        handler = connectedServers.get(serverId);
-        if (handler == null)
-        {
-          return false;
-        }
+        return false;
       }
-
-      if (debugEnabled())
-        TRACER.debugInfo(
-            "In " + this.replicationServer.getMonitorInstanceName() +
-            " baseDN=" + baseDn +
-            " Compute degradation of serverId=" + serverId +
-            " LS server generation Id=" + handler.getGenerationId());
-      return (handler.getGenerationId() != this.generationId);
     }
 
-    /**
-     * Return the associated replication server.
-     * @return The replication server.
-     */
-    public ReplicationServer getReplicationServer()
+    if (debugEnabled())
+      TRACER.debugInfo(
+        "In " + this.replicationServer.getMonitorInstanceName() +
+        " baseDN=" + baseDn +
+        " Compute degradation of serverId=" + serverId +
+        " LS server generation Id=" + handler.getGenerationId());
+    return (handler.getGenerationId() != this.generationId);
+  }
+
+  /**
+   * Return the associated replication server.
+   * @return The replication server.
+   */
+  public ReplicationServer getReplicationServer()
+  {
+    return replicationServer;
+  }
+
+  /**
+   * Process topology information received from a peer RS.
+   * @param topoMsg The just received topo message from remote RS
+   * @param handler The handler that received the message.
+   * @param allowResetGenId True for allowing to reset the generation id (
+   * when called after initial handshake)
+   * @throws IOException If an error occurred.
+   */
+  public void receiveTopoInfoFromRS(TopologyMsg topoMsg, ServerHandler handler,
+    boolean allowResetGenId)
+    throws IOException
+  {
+    if (debugEnabled())
     {
-      return replicationServer;
+      TRACER.debugInfo(
+        "In RS " + getReplicationServer().getServerId() +
+        " Receiving TopologyMsg from " + handler.getServerId() +
+        " for baseDn " + baseDn + ":\n" + topoMsg);
     }
 
-    /**
-     * Process reception of a ReplServerInfoMessage.
-     *
-     * @param infoMsg The received message.
-     * @param handler The handler that received the message.
-     * @throws IOException when raised by the underlying session.
-     */
-    public void receiveReplServerInfo(
-        ReplServerInfoMessage infoMsg, ServerHandler handler) throws IOException
+    try
     {
-      if (debugEnabled())
-      {
-        if (handler.isReplicationServer())
-          TRACER.debugInfo(
-           "In RS " + getReplicationServer().getServerId() +
-           " Receiving replServerInfo from " + handler.getServerId() +
-           " baseDn=" + baseDn +
-           " genId=" + infoMsg.getGenerationId());
-      }
+      // Acquire lock on domain (see more details in comment of start() method
+      // of ServerHandler)
+      lock();
+    } catch (InterruptedException ex)
+    {
+      // Try doing job anyway...
+    }
 
+    /*
+     * Store DS connected to remote RS and update information about the peer RS
+     */
+    handler.receiveTopoInfoFromRS(topoMsg);
+
+    /*
+     * Handle generation id
+     */
+    if (allowResetGenId)
+    {
+      // Check if generation id has to be resetted
       mayResetGenerationId();
       if (generationId < 0)
         generationId = handler.getGenerationId();
-      if (generationId > 0 && (generationId != infoMsg.getGenerationId()))
-      {
-        Message message = NOTE_BAD_GENERATION_ID.get(
-            baseDn.toNormalizedString(),
-            Short.toString(handler.getServerId()),
-            Long.toString(infoMsg.getGenerationId()),
-            Long.toString(generationId));
-
-        ErrorMessage errorMsg = new ErrorMessage(
-            getReplicationServer().getServerId(),
-            handler.getServerId(),
-            message);
-        handler.sendError(errorMsg);
-      }
     }
 
-    /* =======================
-     * Monitor Data generation
-     * =======================
-     */
-
-    /**
-     * Retrieves the global monitor data.
-     * @return The monitor data.
-     * @throws DirectoryException When an error occurs.
-     */
-    synchronized protected MonitorData getMonitorData()
-      throws DirectoryException
+    if (generationId > 0 && (generationId != handler.getGenerationId()))
     {
-      if (monitorData.getBuildDate() + monitorDataLifeTime
-          > TimeThread.getTime())
-      {
-        if (debugEnabled())
-          TRACER.debugInfo(
-          "In " + this.replicationServer.getMonitorInstanceName() +
-          " baseDn=" + baseDn + " getRemoteMonitorData in cache");
-        // The current data are still valid. No need to renew them.
-        return monitorData;
-      }
+      Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
+        baseDn.toNormalizedString(),
+        Short.toString(handler.getServerId()),
+        Long.toString(handler.getGenerationId()),
+        Long.toString(generationId));
+      logError(message);
 
-      wrkMonitorData = new MonitorData();
-      synchronized(wrkMonitorData)
-      {
-        if (debugEnabled())
-          TRACER.debugInfo(
-          "In " + this.replicationServer.getMonitorInstanceName() +
-          " baseDn=" + baseDn + " Computing monitor data ");
-
-        // Let's process our directly connected LSes
-        // - in the ServerHandler for a given LS1, the stored state contains :
-        //   - the max CN produced by LS1
-        //   - the last CN consumed by LS1 from LS2..n
-        // - in the RSdomain/dbHandler, the built-in state contains :
-        //   - the max CN produced by each server
-        // So for a given LS connected we can take the state and the max from
-        // the LS/state.
-
-        for (ServerHandler directlsh : connectedServers.values())
-        {
-          short serverID = directlsh.getServerId();
-
-          // the state comes from the state stored in the SH
-          ServerState directlshState = directlsh.getServerState().duplicate();
-
-          // the max CN sent by that LS also comes from the SH
-          ChangeNumber maxcn = directlshState.getMaxChangeNumber(serverID);
-          if (maxcn == null)
-          {
-            // This directly connected LS has never produced any change
-            maxcn = new ChangeNumber(0, 0 , serverID);
-          }
-          wrkMonitorData.setMaxCN(serverID, maxcn);
-          wrkMonitorData.setLDAPServerState(serverID, directlshState);
-          wrkMonitorData.setFirstMissingDate(serverID, directlsh.
-                                             getApproxFirstMissingDate());
-        }
-
-        // Then initialize the max CN for the LS that produced something
-        // - from our own local db state
-        // - whatever they are directly or undirectly connected
-        ServerState dbServerState = getDbServerState();
-        Iterator<Short> it = dbServerState.iterator();
-        while (it.hasNext())
-        {
-          short sid = it.next();
-          ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid);
-          wrkMonitorData.setMaxCN(sid, storedCN);
-        }
-
-        // Now we have used all available local informations
-        // and we need the remote ones.
-        if (debugEnabled())
-          TRACER.debugInfo(
-            "In " + this.replicationServer.getMonitorInstanceName() +
-            " baseDn=" + baseDn + " Local monitor data: " +
-            wrkMonitorData.toString());
-      }
-
-      // Send Request to the other Replication Servers
-      if (remoteMonitorResponsesSemaphore == null)
-      {
-        remoteMonitorResponsesSemaphore = new Semaphore(0);
-        short requestCnt = sendMonitorDataRequest();
-        // Wait reponses from them or timeout
-        waitMonitorDataResponses(requestCnt);
-      }
-      else
-      {
-        // The processing of renewing the monitor cache is already running
-        // We'll make it sleeping until the end
-        // TODO: unit test for this case.
-        while (remoteMonitorResponsesSemaphore!=null)
-        {
-          waitMonitorDataResponses(1);
-        }
-      }
-
-      wrkMonitorData.completeComputing();
-
-      // Store the new computed data as the reference
-      synchronized(monitorData)
-      {
-        // Now we have the expected answers or an error occured
-        monitorData = wrkMonitorData;
-        wrkMonitorData = null;
-        if (debugEnabled())
-          TRACER.debugInfo(
-          "In " + this.replicationServer.getMonitorInstanceName() +
-          " baseDn=" + baseDn + " *** Computed MonitorData: " +
-          monitorData.toString());
-      }
-      return monitorData;
+      ErrorMsg errorMsg = new ErrorMsg(
+        getReplicationServer().getServerId(),
+        handler.getServerId(),
+        message);
+      handler.sendError(errorMsg);
     }
 
-
-    /**
-     * Sends a MonitorRequest message to all connected RS.
-     * @return the number of requests sent.
-     * @throws DirectoryException when a problem occurs.
+    /*
+     * Sends the currently known topology information to every connected
+     * DS we have.
      */
-    protected short sendMonitorDataRequest()
-      throws DirectoryException
-    {
-      short sent=0;
-      try
-      {
-        for (ServerHandler rs : replicationServers.values())
-        {
-          MonitorRequestMessage msg = new
-            MonitorRequestMessage(this.replicationServer.getServerId(),
-              rs.getServerId());
-          rs.send(msg);
-          sent++;
-        }
-      }
-      catch(Exception e)
-      {
-        Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get();
-        logError(message);
-        throw new DirectoryException(ResultCode.OTHER,
-            message, e);
-      }
-      return sent;
-    }
+    sendTopoInfoToDSs(null);
 
-    /**
-     * Wait for the expected count of received MonitorMessage.
-     * @param expectedResponses The number of expected answers.
-     * @throws DirectoryException When an error occurs.
-     */
-    protected void waitMonitorDataResponses(int expectedResponses)
-      throws DirectoryException
-    {
-      try
-      {
-        if (debugEnabled())
-          TRACER.debugInfo(
-          "In " + this.replicationServer.getMonitorInstanceName() +
-          " baseDn=" + baseDn +
-          " waiting for " + expectedResponses
-          + " expected monitor messages");
+    release();
+  }
 
-        boolean allPermitsAcquired =
-          remoteMonitorResponsesSemaphore.tryAcquire(
-              expectedResponses,
-              (long) 5000, TimeUnit.MILLISECONDS);
-
-        if (!allPermitsAcquired)
-        {
-          logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
-          // let's go on in best effort even with limited data received.
-        }
-        else
-        {
-          if (debugEnabled())
-            TRACER.debugInfo(
-            "In " + this.replicationServer.getMonitorInstanceName() +
-            " baseDn=" + baseDn +
-            " Successfully received all " + expectedResponses
-            + " expected monitor messages");
-        }
-      }
-      catch(Exception e)
-      {
-        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
-      }
-      finally
-      {
-        remoteMonitorResponsesSemaphore = null;
-      }
-    }
-
-    /**
-     * Processes a Monitor message receives from a remote Replication Server
-     * and stores the data received.
-     *
-     * @param msg The message to be processed.
-     */
-    public void receivesMonitorDataResponse(MonitorMessage msg)
+  /* =======================
+   * Monitor Data generation
+   * =======================
+   */
+  /**
+   * Retrieves the global monitor data.
+   * @return The monitor data.
+   * @throws DirectoryException When an error occurs.
+   */
+  synchronized protected MonitorData getMonitorData()
+    throws DirectoryException
+  {
+    if (monitorData.getBuildDate() + monitorDataLifeTime > TimeThread.getTime())
     {
       if (debugEnabled())
         TRACER.debugInfo(
+          "In " + this.replicationServer.getMonitorInstanceName() +
+          " baseDn=" + baseDn + " getRemoteMonitorData in cache");
+      // The current data are still valid. No need to renew them.
+      return monitorData;
+    }
+
+    wrkMonitorData = new MonitorData();
+    synchronized (wrkMonitorData)
+    {
+      if (debugEnabled())
+        TRACER.debugInfo(
+          "In " + this.replicationServer.getMonitorInstanceName() +
+          " baseDn=" + baseDn + " Computing monitor data ");
+
+      // Let's process our directly connected LSes
+      // - in the ServerHandler for a given LS1, the stored state contains :
+      //   - the max CN produced by LS1
+      //   - the last CN consumed by LS1 from LS2..n
+      // - in the RSdomain/dbHandler, the built-in state contains :
+      //   - the max CN produced by each server
+      // So for a given LS connected we can take the state and the max from
+      // the LS/state.
+
+      for (ServerHandler directlsh : directoryServers.values())
+      {
+        short serverID = directlsh.getServerId();
+
+        // the state comes from the state stored in the SH
+        ServerState directlshState = directlsh.getServerState().duplicate();
+
+        // the max CN sent by that LS also comes from the SH
+        ChangeNumber maxcn = directlshState.getMaxChangeNumber(serverID);
+        if (maxcn == null)
+        {
+          // This directly connected LS has never produced any change
+          maxcn = new ChangeNumber(0, 0, serverID);
+        }
+        wrkMonitorData.setMaxCN(serverID, maxcn);
+        wrkMonitorData.setLDAPServerState(serverID, directlshState);
+        wrkMonitorData.setFirstMissingDate(serverID,
+          directlsh.getApproxFirstMissingDate());
+      }
+
+      // Then initialize the max CN for the LS that produced something
+      // - from our own local db state
+      // - whether they are directly or indirectly connected
+      ServerState dbServerState = getDbServerState();
+      Iterator<Short> it = dbServerState.iterator();
+      while (it.hasNext())
+      {
+        short sid = it.next();
+        ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid);
+        wrkMonitorData.setMaxCN(sid, storedCN);
+      }
+
+      // Now we have used all available local information
+      // and we need the remote ones.
+      if (debugEnabled())
+        TRACER.debugInfo(
+          "In " + this.replicationServer.getMonitorInstanceName() +
+          " baseDn=" + baseDn + " Local monitor data: " +
+          wrkMonitorData.toString());
+    }
+
+    // Send Request to the other Replication Servers
+    if (remoteMonitorResponsesSemaphore == null)
+    {
+      remoteMonitorResponsesSemaphore = new Semaphore(0);
+      short requestCnt = sendMonitorDataRequest();
+      // Wait for responses from them or timeout
+      waitMonitorDataResponses(requestCnt);
+    } else
+    {
+      // The processing of renewing the monitor cache is already running
+      // We'll make it sleeping until the end
+      // TODO: unit test for this case.
+      while (remoteMonitorResponsesSemaphore != null)
+      {
+        waitMonitorDataResponses(1);
+      }
+    }
+
+    wrkMonitorData.completeComputing();
+
+    // Store the new computed data as the reference
+    synchronized (monitorData)
+    {
+      // Now we have the expected answers or an error occurred
+      monitorData = wrkMonitorData;
+      wrkMonitorData = null;
+      if (debugEnabled())
+        TRACER.debugInfo(
+          "In " + this.replicationServer.getMonitorInstanceName() +
+          " baseDn=" + baseDn + " *** Computed MonitorData: " +
+          monitorData.toString());
+    }
+    return monitorData;
+  }
+
+  /**
+   * Sends a MonitorRequest message to all connected RS.
+   * @return the number of requests sent.
+   * @throws DirectoryException when a problem occurs.
+   */
+  protected short sendMonitorDataRequest()
+    throws DirectoryException
+  {
+    short sent = 0;
+    try
+    {
+      for (ServerHandler rs : replicationServers.values())
+      {
+        MonitorRequestMsg msg =
+          new MonitorRequestMsg(this.replicationServer.getServerId(),
+          rs.getServerId());
+        rs.send(msg);
+        sent++;
+      }
+    } catch (Exception e)
+    {
+      Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get();
+      logError(message);
+      throw new DirectoryException(ResultCode.OTHER,
+        message, e);
+    }
+    return sent;
+  }
+
+  /**
+   * Wait for the expected count of received MonitorMsg.
+   * @param expectedResponses The number of expected answers.
+   * @throws DirectoryException When an error occurs.
+   */
+  protected void waitMonitorDataResponses(int expectedResponses)
+    throws DirectoryException
+  {
+    try
+    {
+      if (debugEnabled())
+        TRACER.debugInfo(
+          "In " + this.replicationServer.getMonitorInstanceName() +
+          " baseDn=" + baseDn +
+          " waiting for " + expectedResponses + " expected monitor messages");
+
+      boolean allPermitsAcquired =
+        remoteMonitorResponsesSemaphore.tryAcquire(
+        expectedResponses,
+        (long) 5000, TimeUnit.MILLISECONDS);
+
+      if (!allPermitsAcquired)
+      {
+        logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
+      // let's go on in best effort even with limited data received.
+      } else
+      {
+        if (debugEnabled())
+          TRACER.debugInfo(
+            "In " + this.replicationServer.getMonitorInstanceName() +
+            " baseDn=" + baseDn +
+            " Successfully received all " + expectedResponses +
+            " expected monitor messages");
+      }
+    } catch (Exception e)
+    {
+      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
+    } finally
+    {
+      remoteMonitorResponsesSemaphore = null;
+    }
+  }
+
+  /**
+   * Processes a Monitor message received from a remote Replication Server
+   * and stores the data received.
+   *
+   * @param msg The message to be processed.
+   */
+  public void receivesMonitorDataResponse(MonitorMsg msg)
+  {
+    if (debugEnabled())
+      TRACER.debugInfo(
         "In " + this.replicationServer.getMonitorInstanceName() +
         "Receiving " + msg + " from " + msg.getsenderID() +
         remoteMonitorResponsesSemaphore);
 
-      if (remoteMonitorResponsesSemaphore == null)
-      {
-        // Let's ignore the remote monitor data just received
-        // since the computing processing has been ended.
-        // An error - probably a timemout - occured that was already logged
-        logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
-            Short.toString(msg.getsenderID())));
-        return;
-      }
+    if (remoteMonitorResponsesSemaphore == null)
+    {
+      // Let's ignore the remote monitor data just received
+      // since the computing processing has been ended.
+      // An error - probably a timeout - occurred that was already logged
+      logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
+        Short.toString(msg.getsenderID())));
+      return;
+    }
 
-      try
+    try
+    {
+      synchronized (wrkMonitorData)
       {
-        synchronized(wrkMonitorData)
+        // Here is the RS state : list <serverID, lastChangeNumber>
+        // For each LDAP Server, we keep the max CN across the RSes
+        ServerState replServerState = msg.getReplServerDbState();
+        wrkMonitorData.setMaxCNs(replServerState);
+
+        // Store the remote LDAP servers states
+        Iterator<Short> lsidIterator = msg.ldapIterator();
+        while (lsidIterator.hasNext())
         {
-          // Here is the RS state : list <serverID, lastChangeNumber>
-          // For each LDAP Server, we keep the max CN accross the RSes
-          ServerState replServerState = msg.getReplServerDbState();
-          wrkMonitorData.setMaxCNs(replServerState);
+          short sid = lsidIterator.next();
+          wrkMonitorData.setLDAPServerState(sid,
+            msg.getLDAPServerState(sid).duplicate());
+          wrkMonitorData.setFirstMissingDate(sid,
+            msg.getLDAPApproxFirstMissingDate(sid));
+        }
 
-          // Store the remote LDAP servers states
-          Iterator<Short> lsidIterator = msg.ldapIterator();
-          while (lsidIterator.hasNext())
+        // Process the latency reported by the remote RSi on its connections
+        // to the other RSes
+        Iterator<Short> rsidIterator = msg.rsIterator();
+        while (rsidIterator.hasNext())
+        {
+          short rsid = rsidIterator.next();
+          if (rsid == replicationServer.getServerId())
           {
-            short sid = lsidIterator.next();
-            wrkMonitorData.setLDAPServerState(sid,
-                msg.getLDAPServerState(sid).duplicate());
-            wrkMonitorData.setFirstMissingDate(sid,
-                msg.getLDAPApproxFirstMissingDate(sid));
-          }
-
-          // Process the latency reported by the remote RSi on its connections
-          // to the other RSes
-          Iterator<Short> rsidIterator = msg.rsIterator();
-          while (rsidIterator.hasNext())
-          {
-            short rsid = rsidIterator.next();
-            if (rsid == replicationServer.getServerId())
+            // this is the latency of the remote RSi regarding the current RS
+            // let's update the fmd of my connected LS
+            for (ServerHandler connectedlsh : directoryServers.values())
             {
-              // this is the latency of the remote RSi regarding the current RS
-              // let's update the fmd of my connected LS
-              for (ServerHandler connectedlsh : connectedServers.values())
+              short connectedlsid = connectedlsh.getServerId();
+              Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
+              wrkMonitorData.setFirstMissingDate(connectedlsid, newfmd);
+            }
+          } else
+          {
+            // this is the latency of the remote RSi regarding another RSj
+            // let's update the latency of the LSes connected to RSj
+            ServerHandler rsjHdr = replicationServers.get(rsid);
+            if (rsjHdr != null)
+            {
+              for (short remotelsid : rsjHdr.getConnectedDirectoryServerIds())
               {
-                short connectedlsid = connectedlsh.getServerId();
                 Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
-                wrkMonitorData.setFirstMissingDate(connectedlsid, newfmd);
-              }
-            }
-            else
-            {
-              // this is the latency of the remote RSi regarding another RSj
-              // let's update the latency of the LSes connected to RSj
-              ServerHandler rsjHdr = replicationServers.get(rsid);
-              if (rsjHdr != null)
-              {
-                for(short remotelsid : rsjHdr.getConnectedServerIds())
-                {
-                  Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
-                  wrkMonitorData.setFirstMissingDate(remotelsid, newfmd);
-                }
+                wrkMonitorData.setFirstMissingDate(remotelsid, newfmd);
               }
             }
           }
+        }
+        if (debugEnabled())
+        {
           if (debugEnabled())
-          {
-            if (debugEnabled())
-              TRACER.debugInfo(
+            TRACER.debugInfo(
               "In " + this.replicationServer.getMonitorInstanceName() +
               " baseDn=" + baseDn +
               " Processed msg from " + msg.getsenderID() +
               " New monitor data: " + wrkMonitorData.toString());
-          }
         }
-
-        // Decreases the number of expected responses and potentially
-        // wakes up the waiting requestor thread.
-        remoteMonitorResponsesSemaphore.release();
-
       }
-      catch (Exception e)
-      {
-        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage() +
-            stackTraceToSingleLineString(e)));
 
-        // If an exception occurs while processing one of the expected message,
-        // the processing is aborted and the waiting thread is awoke.
-        remoteMonitorResponsesSemaphore.notifyAll();
-      }
-    }
+      // Decreases the number of expected responses and potentially
+      // wakes up the waiting requestor thread.
+      remoteMonitorResponsesSemaphore.release();
 
-    /**
-     * Set the purge delay on all the db Handlers for this Domain
-     * of Replicaiton.
-     *
-     * @param delay The new purge delay to use.
-     */
-    void setPurgeDelay(long delay)
+    } catch (Exception e)
     {
-      for (DbHandler handler : sourceDbHandlers.values())
+      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage() +
+        stackTraceToSingleLineString(e)));
+
+      // If an exception occurs while processing one of the expected messages,
+      // the processing is aborted and the waiting thread is awakened.
+      remoteMonitorResponsesSemaphore.notifyAll();
+    }
+  }
+
+  /**
+   * Set the purge delay on all the db Handlers for this Domain
+   * of Replication.
+   *
+   * @param delay The new purge delay to use.
+   */
+  void setPurgeDelay(long delay)
+  {
+    for (DbHandler handler : sourceDbHandlers.values())
+    {
+      handler.setPurgeDelay(delay);
+    }
+  }
+
+  /**
+   * Get the map of connected DSs.
+   * @return The map of connected DSs
+   */
+  public Map<Short, ServerHandler> getConnectedDSs()
+  {
+    return directoryServers;
+  }
+
+  /**
+   * Get the map of connected RSs.
+   * @return The map of connected RSs
+   */
+  public Map<Short, ServerHandler> getConnectedRSs()
+  {
+    return replicationServers;
+  }
+  /**
+   * A synchronization mechanism is created to ensure exclusive access to the
+   * domain. The goal is to have a consistent view of the topology by locking
+   * the structures holding the topology view of the domain: directoryServers
+   * and replicationServers. When a connection is established with a peer DS or
+   * RS, the lock should be taken before updating these structures, then
+   * released. The same mechanism should be used when updating any data related
+   * to the view of the topology: for instance if the status of a DS is changed,
+   * the lock should be taken before updating the matching server handler and
+   * sending the topology messages to peers and released after.... This allows
+   * every member of the topology to have a consistent view of the topology and
+   * to be sure it will not miss some information.
+   * So the locking system must be called (not exhaustive list):
+   * - when connection established with a DS or RS
+   * - when connection ended with a DS or RS
+   * - when receiving a TopologyMsg and updating structures
+   * - when creating and sending a TopologyMsg
+   * - when a DS status is changing (ChangeStatusMsg received or sent)...
+   */
+  private ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * Tests if the current thread has the lock on this domain.
+   * @return True if the current thread has the lock.
+   */
+  public boolean hasLock()
+  {
+    return (lock.getHoldCount() > 0);
+  }
+
+  /**
+   * Takes the lock on this domain (blocking until lock can be acquired) or
+   * calling thread is interrupted.
+   * @throws java.lang.InterruptedException If interrupted.
+   */
+  public void lock() throws InterruptedException
+  {
+    lock.lockInterruptibly();
+  }
+
+  /**
+   * Releases the lock on this domain.
+   */
+  public void release()
+  {
+    lock.unlock();
+  }
+
+  /**
+   * Tries to acquire the lock on the domain within a given amount of time.
+   * @param timeout The amount of milliseconds to wait for acquiring the lock.
+   * @return True if the lock was acquired, false if timeout occurred.
+   * @throws java.lang.InterruptedException When call was interrupted.
+   */
+  public boolean tryLock(long timeout) throws InterruptedException
+  {
+    return lock.tryLock(timeout, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Starts the status analyzer for the domain.
+   */
+  public void startStatusAnalyzer()
+  {
+    if (statusAnalyzer == null)
+    {
+      int degradedStatusThreshold =
+        replicationServer.getDegradedStatusThreshold();
+      if (degradedStatusThreshold > 0) // 0 means no status analyzer
       {
-        handler.setPurgeDelay(delay);
+        statusAnalyzer = new StatusAnalyzer(this, degradedStatusThreshold);
+        statusAnalyzer.start();
       }
     }
+  }
+
+  /**
+   * Stops the status analyzer for the domain.
+   */
+  public void stopStatusAnalyzer()
+  {
+    if (statusAnalyzer != null)
+    {
+      statusAnalyzer.shutdown();
+      statusAnalyzer.waitForShutdown();
+      statusAnalyzer = null;
+    }
+  }
+
+  /**
+   * Tests if the status analyzer for this domain is running.
+   * @return True if the status analyzer is running, false otherwise.
+   */
+  public boolean isRunningStatusAnalyzer()
+  {
+    return (statusAnalyzer != null);
+  }
+
+  /**
+   * Update the status analyzer with the new threshold value.
+   * @param degradedStatusThreshold The new threshold value.
+   */
+  public void updateStatusAnalyzer(int degradedStatusThreshold)
+  {
+    if (statusAnalyzer != null)
+    {
+      statusAnalyzer.setDeradedStatusThreshold(degradedStatusThreshold);
+    }
+  }
 }

--
Gitblit v1.10.0