mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
10.43.2009 ccc4127f23f63214f4dc2f94d26a021a3ec2eec6
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -26,181 +26,217 @@
 */
package org.opends.server.replication.server;
import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.StatusMachine.*;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.opends.messages.Message;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachine;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.HeartbeatThread;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
import org.opends.server.replication.protocol.WindowProbeMsg;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.Attributes;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.InitializationException;
import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;
/**
 * This class defines a server handler, which handles all interaction with a
 * peer server (RS or DS).
 * This class defines a server handler  :
 * - that is a MessageHandler (see this class for more details)
 * - that handles all interaction with a peer server (RS or DS).
 */
public class ServerHandler extends MonitorProvider<MonitorProviderCfg>
public abstract class ServerHandler extends MessageHandler
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * Time during which the server will wait for existing thread to stop
   * during the shutdownWriter.
   */
  private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
  /*
   * Properties, filled if remote server is either a DS or a RS
  /**
   * Close the session and log the provided error message
   * Log nothing if message is null.
   * @param providedSession The provided closing session.
   * @param providedMsg     The provided error message.
   * @param handler         The handler that manages that session.
   */
  private short serverId;
  private ProtocolSession session;
  private final MsgQueue msgQueue = new MsgQueue();
  private MsgQueue lateQueue = new MsgQueue();
  private ReplicationServerDomain replicationServerDomain = null;
  private String serverURL;
  static protected void closeSession(ProtocolSession providedSession,
      Message providedMsg, ServerHandler handler)
  {
    if (providedMsg != null)
    {
      if (debugEnabled())
        TRACER.debugInfo("In "
            + ((handler!=null)?handler.toString():"Replication Server")
            + " closing session with err=" +
            providedMsg.toString());
      logError(providedMsg);
    }
    try
    {
      if (providedSession!=null)
        providedSession.close();
    } catch (IOException ee)
    {
      // ignore
    }
  }
  // Number of update sent to the server
  private int outCount = 0;
  // Number of updates received from the server
  private int inCount = 0;
  /**
   * The serverId of the remote server.
   */
  protected short serverId;
  /**
   * The session opened with the remote server.
   */
  protected ProtocolSession session;
  // Number of updates received from the server in assured safe read mode
  private int assuredSrReceivedUpdates = 0;
  // Number of updates received from the server in assured safe read mode that
  // timed out
  private AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger();
  // Number of updates sent to the server in assured safe read mode
  private int assuredSrSentUpdates = 0;
  // Number of updates sent to the server in assured safe read mode that timed
  // out
  private AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
  // Number of updates received from the server in assured safe data mode
  private int assuredSdReceivedUpdates = 0;
  // Number of updates received from the server in assured safe data mode that
  // timed out
  private AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger();
  // Number of updates sent to the server in assured safe data mode
  private int assuredSdSentUpdates = 0;
  // Number of updates sent to the server in assured safe data mode that timed
  // out
  private AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger();
  /**
   * The serverURL of the remote server.
   */
  protected String serverURL;
  /**
   * Number of updates received from the server in assured safe read mode.
   */
  protected int assuredSrReceivedUpdates = 0;
  /**
   * Number of updates received from the server in assured safe read mode that
   * timed out.
   */
  protected AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger();
  /**
   * Number of updates sent to the server in assured safe read mode.
   */
  protected int assuredSrSentUpdates = 0;
  /**
   * Number of updates sent to the server in assured safe read mode that timed
   * out.
   */
  protected AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
  /**
  // Number of updates received from the server in assured safe data mode.
   */
  protected int assuredSdReceivedUpdates = 0;
  /**
   * Number of updates received from the server in assured safe data mode that
   * timed out.
   */
  protected AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger();
  /**
   * Number of updates sent to the server in assured safe data mode.
   */
  protected int assuredSdSentUpdates = 0;
  private int maxReceiveQueue = 0;
  private int maxSendQueue = 0;
  private int maxReceiveDelay = 0;
  private int maxSendDelay = 0;
  private int maxQueueSize = 5000;
  private int maxQueueBytesSize = maxQueueSize * 100;
  private int restartReceiveQueue;
  private int restartSendQueue;
  private int restartReceiveDelay;
  private int restartSendDelay;
  private boolean serverIsLDAPserver;
  private boolean following = false;
  private ServerState serverState;
  private boolean activeWriter = true;
  private ServerWriter writer = null;
  private String baseDn = null;
  /**
   * Number of updates sent to the server in assured safe data mode that timed
   * out.
   */
  protected AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger();
  /**
   * The associated ServerWriter that sends messages to the remote server.
   */
  protected ServerReader reader;
  /**
   * The associated ServerReader that receives messages from the remote server.
   */
  protected ServerWriter writer = null;
  // window
  private int rcvWindow;
  private int rcvWindowSizeHalf;
  private int maxRcvWindow;
  private ServerReader reader;
  private Semaphore sendWindow;
  private int sendWindowSize;
  private boolean flowControl = false; // indicate that the server is
  // flow controlled and should
  // be stopped from sending messages.
  /**
   * Semaphore that the writer uses to control the flow to the remote server.
   */
  protected Semaphore sendWindow;
  /**
   * The initial size of the sending window.
   */
  int sendWindowSize;
  private int saturationCount = 0;
  private short replicationServerId;
  private short protocolVersion = -1;
  private long generationId = -1;
  // Group id of this remote server
  private byte groupId = (byte) -1;
  /*
   * Properties filled only if remote server is a DS
   */
  // Status of this DS (only used if this server handler represents a DS)
  private ServerStatus status = ServerStatus.INVALID_STATUS;
  // Referrals URLs this DS is exporting
  private List<String> refUrls = new ArrayList<String>();
  // Assured replication enabled on DS or not
  private boolean assuredFlag = false;
  // DS assured mode (relevant if assured replication enabled)
  private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
  // DS safe data level (relevant if assured mode is safe data)
  private byte safeDataLevel = (byte) -1;
  /*
   * Properties filled only if remote server is a RS
   */
  private String serverAddressURL;
  /**
   * When this Handler is related to a remote replication server
   * this collection will contain as many elements as there are
   * LDAP servers connected to the remote replication server.
   * The protocol version established with the remote server.
   */
  private final Map<Short, LightweightServerHandler> directoryServers =
    new ConcurrentHashMap<Short, LightweightServerHandler>();
  protected short protocolVersion = -1;
  /**
   * remote generation id.
   */
  protected long generationId = -1;
  /**
   * The generation id of the hosting RS.
   */
  protected long localGenerationId = -1;
  /**
   * The generation id before procesing a new start handshake.
   */
  protected long oldGenerationId = -1;
  /**
   * Group id of this remote server.
   */
  protected byte groupId = (byte) -1;
  /**
   * The SSL encryption provided by the creator/starter of this handler.
   */
  protected boolean initSslEncryption;
  /**
   * The SSL encryption after the negociation with the peer.
   */
  protected boolean sslEncryption;
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  protected long heartbeatInterval = 0;
  /**
   * The thread that will send heartbeats.
   */
  HeartbeatThread heartbeatThread = null;
  /**
   * Set when ServerWriter is stopping.
   */
  private boolean shutdownWriter = false;
  protected boolean shutdownWriter = false;
  /**
   * Set when ServerHandler is stopping.
   */
  private AtomicBoolean shuttingDown = new AtomicBoolean(false);
  /**
   * Creates a new server handler instance with the provided socket.
   *
@@ -208,846 +244,142 @@
   *                 communicate with the remote entity.
   * @param queueSize The maximum number of update that will be kept
   *                  in memory by this ServerHandler.
   * @param replicationServerURL The URL of the hosting replication server.
   * @param replicationServerId The serverId of the hosting replication server.
   * @param replicationServer The hosting replication server.
   * @param rcvWindowSize The window size to receive from the remote server.
   */
  public ServerHandler(ProtocolSession session, int queueSize)
  public ServerHandler(
      ProtocolSession session,
      int queueSize,
      String replicationServerURL,
      short replicationServerId,
      ReplicationServer replicationServer,
      int rcvWindowSize)
  {
    super("Server Handler");
    super(queueSize, replicationServerURL,
        replicationServerId, replicationServer);
    this.session = session;
    this.maxQueueSize = queueSize;
    this.maxQueueBytesSize = queueSize * 100;
    this.protocolVersion = ProtocolVersion.getCurrentVersion();
    this.rcvWindowSizeHalf = rcvWindowSize / 2;
    this.maxRcvWindow = rcvWindowSize;
    this.rcvWindow = rcvWindowSize;
  }
  /**
   * Creates a DSInfo structure representing this remote DS.
   * @return The DSInfo structure representing this remote DS
   * Abort a start procedure currently establishing.
   * @param reason The provided reason.
   */
  public DSInfo toDSInfo()
  protected void abortStart(Message reason)
  {
    DSInfo dsInfo = new DSInfo(serverId, replicationServerId, generationId,
      status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls);
    return dsInfo;
  }
  /**
   * Creates a RSInfo structure representing this remote RS.
   * @return The RSInfo structure representing this remote RS
   */
  public RSInfo toRSInfo()
  {
    RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
    return rsInfo;
  }
  /**
   * Do the handshake with either the DS or RS and then create the reader and
   * writer thread.
   *
   * There are 2 possible handshake sequences: DS<->RS and RS<->RS. Each one are
   * divided into 2 logical consecutive phases (phase 1 and phase 2):
   *
   * DS<->RS (DS (always initiating connection) always sends first message):
   * -------
   *
   * phase 1:
   * DS --- ServerStartMsg ---> RS
   * DS <--- ReplServerStartMsg --- RS
   * phase 2:
   * DS --- StartSessionMsg ---> RS
   * DS <--- TopologyMsg --- RS
   *
   * RS<->RS (RS initiating connection always sends first message):
   * -------
   *
   * phase 1:
   * RS1 --- ReplServerStartMsg ---> RS2
   * RS1 <--- ReplServerStartMsg --- RS2
   * phase 2:
   * RS1 --- TopologyMsg ---> RS2
   * RS1 <--- TopologyMsg --- RS2
   *
   * @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
   *               null if this is an incoming connection (listen).
   * @param replicationServerId The identifier of the replicationServer that
   *                            creates this server handler.
   * @param replicationServerURL The URL of the replicationServer that creates
   *                             this server handler.
   * @param windowSize the window size that this server handler must use.
   * @param sslEncryption For outgoing connections indicates whether encryption
   *                      should be used after the exchange of start messages.
   *                      Ignored for incoming connections.
   * @param replicationServer the ReplicationServer that created this server
   *                          handler.
   */
  public void start(String baseDn, short replicationServerId,
    String replicationServerURL,
    int windowSize, boolean sslEncryption,
    ReplicationServer replicationServer)
  {
    // The handshake phase must be done by blocking any access to structures
    // keeping info on connected servers, so that one can safely check for
    // pre-existence of a server, send a coherent snapshot of known topology
    // to peers, update the local view of the topology...
    //
    // For instance a kind of problem could be that while we connect with a
    // peer RS, a DS is connecting at the same time and we could publish the
    // connected DSs to the peer RS forgetting this last DS in the TopologyMsg.
    //
    // This method and every others that need to read/make changes to the
    // structures holding topology for the domain should:
    // - call ReplicationServerDomain.lock()
    // - read/modify structures
    // - call ReplicationServerDomain.release()
    //
    // More information is provided in comment of ReplicationServerDomain.lock()
    // If domain already exists, lock it until handshake is finished otherwise
    // it will be created and locked later in the method
    if (baseDn != null)
    // We did not recognize the message, close session as what
    // can happen after is undetermined and we do not want the server to
    // be disturbed
    if (session!=null)
    {
      ReplicationServerDomain rsd =
        replicationServer.getReplicationServerDomain(baseDn, false);
      if (rsd != null)
      try
      {
        try
        {
          rsd.lock();
        } catch (InterruptedException ex)
        {
          // Thread interrupted, return.
          return;
        }
        session.publish(
            new ErrorMsg(
                replicationServerDomain.getReplicationServer().getServerId(),
                serverId,
                reason));
      }
      catch(Exception e)
      {
      }
      closeSession(session, reason, this);
    }
    long oldGenerationId = -100;
    if ((replicationServerDomain != null) &&
        replicationServerDomain.hasLock())
      replicationServerDomain.release();
    if (debugEnabled())
      TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() +
        " starts a new LS or RS " +
        ((baseDn == null) ? "incoming connection" : "outgoing connection"));
    this.replicationServerId = replicationServerId;
    rcvWindowSizeHalf = windowSize / 2;
    maxRcvWindow = windowSize;
    rcvWindow = windowSize;
    long localGenerationId = -1;
    ReplServerStartMsg outReplServerStartMsg = null;
    /**
     * This boolean prevents from logging a polluting error when connection\
     * aborted from a DS that wanted only to perform handshake phase 1 in order
     * to determine the best suitable RS:
     * 1) -> ServerStartMsg
     * 2) <- ReplServerStartMsg
     * 3) connection closure
     */
    boolean log_error_message = true;
    try
    // If generation id of domain was changed, set it back to old value
    // We may have changed it as it was -1 and we received a value >0 from
    // peer server and the last topo message sent may have failed being
    // sent: in that case retrieve old value of generation id for
    // replication server domain
    if (oldGenerationId != -100)
    {
      /*
       * PROCEDE WITH FIRST PHASE OF HANDSHAKE:
       * ServerStartMsg then ReplServerStartMsg (with a DS)
       * OR
       * ReplServerStartMsg then ReplServerStartMsg (with a RS)
       */
      replicationServerDomain.setGenerationId(oldGenerationId, false);
    }
  }
      if (baseDn != null) // Outgoing connection
  /**
   * Check the protocol window and send WindowMsg if necessary.
   *
   * @throws IOException when the session becomes unavailable.
   */
  public synchronized void checkWindow() throws IOException
  {
    if (rcvWindow < rcvWindowSizeHalf)
    {
      if (flowControl)
      {
        // This is an outgoing connection. Publish our start message.
        this.baseDn = baseDn;
        // Get or create the ReplicationServerDomain
        replicationServerDomain =
          replicationServer.getReplicationServerDomain(baseDn, true);
        if (!replicationServerDomain.hasLock())
        if (replicationServerDomain.restartAfterSaturation(this))
        {
          try
          {
            replicationServerDomain.lock();
          } catch (InterruptedException ex)
          {
            // Thread interrupted, return.
            return;
          }
          flowControl = false;
        }
        localGenerationId = replicationServerDomain.getGenerationId();
      }
      if (!flowControl)
      {
        WindowMsg msg = new WindowMsg(rcvWindowSizeHalf);
        session.publish(msg);
        rcvWindow += rcvWindowSizeHalf;
      }
    }
  }
        ServerState localServerState =
          replicationServerDomain.getDbServerState();
        outReplServerStartMsg = new ReplServerStartMsg(replicationServerId,
          replicationServerURL,
          baseDn, windowSize, localServerState,
          protocolVersion, localGenerationId,
          sslEncryption,
          replicationServer.getGroupId(),
          replicationServerDomain.
          getReplicationServer().getDegradedStatusThreshold());
  /**
   * Decrement the protocol window, then check if it is necessary
   * to send a WindowMsg and send it.
   *
   * @throws IOException when the session becomes unavailable.
   */
  public synchronized void decAndCheckWindow() throws IOException
  {
    rcvWindow--;
    checkWindow();
  }
        session.publish(outReplServerStartMsg);
  /**
   * Set the shut down flag to true and returns the previous value of the flag.
   * @return The previous value of the shut down flag
   */
  public boolean engageShutdown()
  {
    // Use thread safe boolean
    return shuttingDown.getAndSet(true);
  }
  /**
   * Finalize the initialization, create reader, writer, heartbeat system
   * and monitoring system.
   * @throws DirectoryException When an exception is raised.
   */
  protected void finalizeStart()
  throws DirectoryException
  {
    // FIXME:ECL We should refactor so that a SH always have a session
    if (session != null)
    {
      try
      {
        // Disable timeout for next communications
        session.setSoTimeout(0);
      }
      catch(Exception e)
      {
      }
      // Wait and process ServerStartMsg or ReplServerStartMsg
      ReplicationMsg msg = session.receive();
      if (msg instanceof ServerStartMsg)
      {
        // The remote server is an LDAP Server.
        ServerStartMsg serverStartMsg = (ServerStartMsg) msg;
        generationId = serverStartMsg.getGenerationId();
        protocolVersion = ProtocolVersion.minWithCurrent(
          serverStartMsg.getVersion());
        serverId = serverStartMsg.getServerId();
        serverURL = serverStartMsg.getServerURL();
        this.baseDn = serverStartMsg.getBaseDn();
        this.serverState = serverStartMsg.getServerState();
        this.groupId = serverStartMsg.getGroupId();
        maxReceiveDelay = serverStartMsg.getMaxReceiveDelay();
        maxReceiveQueue = serverStartMsg.getMaxReceiveQueue();
        maxSendDelay = serverStartMsg.getMaxSendDelay();
        maxSendQueue = serverStartMsg.getMaxSendQueue();
        heartbeatInterval = serverStartMsg.getHeartbeatInterval();
        // The session initiator decides whether to use SSL.
        sslEncryption = serverStartMsg.getSSLEncryption();
        if (maxReceiveQueue > 0)
          restartReceiveQueue = (maxReceiveQueue > 1000 ? maxReceiveQueue -
            200 : maxReceiveQueue * 8 / 10);
        else
          restartReceiveQueue = 0;
        if (maxSendQueue > 0)
          restartSendQueue =
            (maxSendQueue > 1000 ? maxSendQueue - 200 : maxSendQueue * 8 /
            10);
        else
          restartSendQueue = 0;
        if (maxReceiveDelay > 0)
          restartReceiveDelay = (maxReceiveDelay > 10 ? maxReceiveDelay - 1
            : maxReceiveDelay);
        else
          restartReceiveDelay = 0;
        if (maxSendDelay > 0)
          restartSendDelay =
            (maxSendDelay > 10 ? maxSendDelay - 1 : maxSendDelay);
        else
          restartSendDelay = 0;
        if (heartbeatInterval < 0)
        {
          heartbeatInterval = 0;
        }
        serverIsLDAPserver = true;
        // Get or Create the ReplicationServerDomain
        replicationServerDomain =
          replicationServer.getReplicationServerDomain(this.baseDn, true);
        // Hack to be sure that if a server disconnects and reconnect, we
        // let the reader thread see the closure and cleanup any reference
        // to old connection
        replicationServerDomain.waitDisconnection(serverStartMsg.getServerId());
        if (!replicationServerDomain.hasLock())
        {
          try
          {
            replicationServerDomain.lock();
          } catch (InterruptedException ex)
          {
            // Thread interrupted, return.
            return;
          }
        }
        // Duplicate server ?
        if (!replicationServerDomain.checkForDuplicateDS(this))
        {
          closeSession(null);
          if ((replicationServerDomain != null) &&
            replicationServerDomain.hasLock())
            replicationServerDomain.release();
          return;
        }
        localGenerationId = replicationServerDomain.getGenerationId();
        ServerState localServerState =
          replicationServerDomain.getDbServerState();
        // This an incoming connection. Publish our start message
        ReplServerStartMsg replServerStartMsg =
          new ReplServerStartMsg(replicationServerId, replicationServerURL,
          this.baseDn, windowSize, localServerState,
          protocolVersion, localGenerationId,
          sslEncryption,
          replicationServer.getGroupId(),
          replicationServerDomain.
          getReplicationServer().getDegradedStatusThreshold());
        session.publish(replServerStartMsg);
        sendWindowSize = serverStartMsg.getWindowSize();
        /* Until here session is encrypted then it depends on the
        negotiation */
        if (!sslEncryption)
        {
          session.stopEncryption();
        }
        if (debugEnabled())
        {
          TRACER.debugInfo("In " +
            replicationServerDomain.getReplicationServer().
            getMonitorInstanceName() + ":" +
            "\nSH HANDSHAKE RECEIVED:\n" + serverStartMsg.toString() +
            "\nAND REPLIED:\n" + replServerStartMsg.toString());
        }
      } else if (msg instanceof ReplServerStartMsg)
      {
        // The remote server is a replication server
        ReplServerStartMsg inReplServerStartMsg = (ReplServerStartMsg) msg;
        protocolVersion = ProtocolVersion.minWithCurrent(
          inReplServerStartMsg.getVersion());
        generationId = inReplServerStartMsg.getGenerationId();
        serverId = inReplServerStartMsg.getServerId();
        serverURL = inReplServerStartMsg.getServerURL();
        int separator = serverURL.lastIndexOf(':');
        serverAddressURL =
          session.getRemoteAddress() + ":" + serverURL.substring(separator +
          1);
        serverIsLDAPserver = false;
        if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
        {
          // We support connection from a V1 RS
          // Only V2 protocol has the group id in repl server start message
          this.groupId = inReplServerStartMsg.getGroupId();
        }
        this.baseDn = inReplServerStartMsg.getBaseDn();
        if (baseDn == null) // Reply to incoming RS
        {
          // Get or create the ReplicationServerDomain
          replicationServerDomain =
            replicationServer.getReplicationServerDomain(this.baseDn, true);
          if (!replicationServerDomain.hasLock())
          {
            try
            {
              /**
               * Take the lock on the domain.
               * WARNING: Here we try to acquire the lock with a timeout. This
               * is for preventing a deadlock that may happen if there are cross
               * connection attempts (for same domain) from this replication
               * server and from a peer one:
               * Here is the scenario:
               * - RS1 connect thread takes the domain lock and starts
               * connection to RS2
               * - at the same time RS2 connect thread takes his domain lock and
               * start connection to RS2
               * - RS2 listen thread starts processing received
               * ReplServerStartMsg from RS1 and wants to acquire the lock on
               * the domain (here) but cannot as RS2 connect thread already has
               * it
               * - RS1 listen thread starts processing received
               * ReplServerStartMsg from RS2 and wants to acquire the lock on
               * the domain (here) but cannot as RS1 connect thread already has
               * it
               * => Deadlock: 4 threads are locked.
               * So to prevent that in such situation, the listen threads here
               * will both timeout trying to acquire the lock. The random time
               * for the timeout should allow on connection attempt to be
               * aborted whereas the other one should have time to finish in the
               * same time.
               * Warning: the minimum time (3s) should be big enough to allow
               * normal situation connections to terminate. The added random
               * time should represent a big enough range so that the chance to
               * have one listen thread timing out a lot before the peer one is
               * great. When the first listen thread times out, the remote
               * connect thread should release the lock and allow the peer
               * listen thread to take the lock it was waiting for and process
               * the connection attempt.
               */
              Random random = new Random();
              int randomTime = random.nextInt(6); // Random from 0 to 5
              // Wait at least 3 seconds + (0 to 5 seconds)
              long timeout = (long) (3000 + ( randomTime * 1000 ) );
              boolean noTimeout = replicationServerDomain.tryLock(timeout);
              if (!noTimeout)
              {
                // Timeout
                Message message = NOTE_TIMEOUT_WHEN_CROSS_CONNECTION.get(
                  this.baseDn,
                  Short.toString(serverId),
                  Short.toString(replicationServer.getServerId()));
                closeSession(message);
                return;
              }
            } catch (InterruptedException ex)
            {
              // Thread interrupted, return.
              return;
            }
          }
          localGenerationId = replicationServerDomain.getGenerationId();
          ServerState domServerState =
            replicationServerDomain.getDbServerState();
          // The session initiator decides whether to use SSL.
          sslEncryption = inReplServerStartMsg.getSSLEncryption();
          // Publish our start message
          outReplServerStartMsg = new ReplServerStartMsg(replicationServerId,
            replicationServerURL,
            this.baseDn, windowSize, domServerState,
            protocolVersion,
            localGenerationId,
            sslEncryption,
            replicationServer.getGroupId(),
            replicationServerDomain.
            getReplicationServer().getDegradedStatusThreshold());
          if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
          {
            session.publish(outReplServerStartMsg);
          } else {
            // We support connection from a V1 RS, send PDU with V1 form
            session.publish(outReplServerStartMsg,
              ProtocolVersion.REPLICATION_PROTOCOL_V1);
          }
          if (debugEnabled())
          {
            TRACER.debugInfo("In " +
              replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() + ":" +
              "\nSH HANDSHAKE RECEIVED:\n" + inReplServerStartMsg.toString() +
              "\nAND REPLIED:\n" + outReplServerStartMsg.toString());
          }
        } else
        {
          // Did the remote RS answer with the DN we provided him ?
          if (!(this.baseDn.equals(baseDn)))
          {
            Message message = ERR_RS_DN_DOES_NOT_MATCH.get(
              this.baseDn.toString(),
              baseDn.toString());
            closeSession(message);
            if ((replicationServerDomain != null) &&
              replicationServerDomain.hasLock())
              replicationServerDomain.release();
            return;
          }
          if (debugEnabled())
          {
            TRACER.debugInfo("In " +
              replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() + ":" +
              "\nSH HANDSHAKE SENT:\n" + outReplServerStartMsg.toString() +
              "\nAND RECEIVED:\n" + inReplServerStartMsg.toString());
          }
        }
        this.serverState = inReplServerStartMsg.getServerState();
        sendWindowSize = inReplServerStartMsg.getWindowSize();
        // Duplicate server ?
        if (!replicationServerDomain.checkForDuplicateRS(this))
        {
          closeSession(null);
          if ((replicationServerDomain != null) &&
            replicationServerDomain.hasLock())
            replicationServerDomain.release();
          return;
        }
        /* Until here session is encrypted then it depends on the
        negociation */
        if (!sslEncryption)
        {
          session.stopEncryption();
        }
      } else
      {
        // We did not recognize the message, close session as what
        // can happen after is undetermined and we do not want the server to
        // be disturbed
        closeSession(null);
        if ((replicationServerDomain != null) &&
          replicationServerDomain.hasLock())
          replicationServerDomain.release();
        return;
      }
      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
      { // Only protocol version above V1 has a phase 2 handshake
        /*
         * NOW PROCEDE WITH SECOND PHASE OF HANDSHAKE:
         * TopologyMsg then TopologyMsg (with a RS)
         * OR
         * StartSessionMsg then TopologyMsg (with a DS)
         */
        TopologyMsg outTopoMsg = null;
        if (baseDn != null) // Outgoing connection to a RS
        {
          // Send our own TopologyMsg to remote RS
          outTopoMsg = replicationServerDomain.createTopologyMsgForRS();
          session.publish(outTopoMsg);
        }
        // Wait and process TopologyMsg or StartSessionMsg
        log_error_message = false;
        ReplicationMsg msg2 = session.receive();
        log_error_message = true;
        if (msg2 instanceof TopologyMsg)
        {
          // Remote RS sent his topo msg
          TopologyMsg inTopoMsg = (TopologyMsg) msg2;
          // CONNECTION WITH A RS
          // if the remote RS and the local RS have the same genID
          // then it's ok and nothing else to do
          if (generationId == localGenerationId)
          {
            if (debugEnabled())
            {
              TRACER.debugInfo("In " +
                replicationServerDomain.getReplicationServer().
                getMonitorInstanceName() + " RS with serverID=" + serverId +
                " is connected with the right generation ID");
            }
          } else
          {
            if (localGenerationId > 0)
            {
              // if the local RS is initialized
              if (generationId > 0)
              {
                // if the remote RS is initialized
                if (generationId != localGenerationId)
                {
                  // if the 2 RS have different generationID
                  if (replicationServerDomain.getGenerationIdSavedStatus())
                  {
                    // if the present RS has received changes regarding its
                    //     gen ID and so won't change without a reset
                    // then  we are just degrading the peer.
                    Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
                      this.baseDn,
                      Short.toString(serverId),
                      Long.toString(generationId),
                      Long.toString(localGenerationId));
                    logError(message);
                  } else
                  {
                    // The present RS has never received changes regarding its
                    // gen ID.
                    //
                    // Example case:
                    // - we are in RS1
                    // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
                    // - RS1 has genId1 from LS1 /genId1 comes from data in
                    //   suffix
                    // - we are in RS1 and we receive a START msg from RS2
                    // - Each RS keeps its genID / is degraded and when LS2
                    //   will be populated from LS1 everything will become ok.
                    //
                    // Issue:
                    // FIXME : Would it be a good idea in some cases to just
                    //         set the gen ID received from the peer RS
                    //         specially if the peer has a non null state and
                    //         we have a nul state ?
                    // replicationServerDomain.
                    // setGenerationId(generationId, false);
                    Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
                      this.baseDn,
                      Short.toString(serverId),
                      Long.toString(generationId),
                      Long.toString(localGenerationId));
                    logError(message);
                  }
                }
              } else
              {
                // The remote RS has no genId. We don't change anything for the
                // current RS.
              }
            } else
            {
              // The local RS is not initialized - take the one received
              // WARNING: Must be done before computing topo message to send
              // to peer server as topo message must embed valid generation id
              // for our server
              oldGenerationId =
                replicationServerDomain.setGenerationId(generationId, false);
            }
          }
          if (baseDn == null) // Reply to the RS (incoming connection)
          {
            // Send our own TopologyMsg to remote RS
            outTopoMsg = replicationServerDomain.createTopologyMsgForRS();
            session.publish(outTopoMsg);
            if (debugEnabled())
            {
              TRACER.debugInfo("In " +
                replicationServerDomain.getReplicationServer().
                getMonitorInstanceName() + ":" +
                "\nSH HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() +
                "\nAND REPLIED:\n" + outTopoMsg.toString());
            }
          } else
          {
            if (debugEnabled())
            {
              TRACER.debugInfo("In " +
                replicationServerDomain.getReplicationServer().
                getMonitorInstanceName() + ":" +
                "\nSH HANDSHAKE SENT:\n" + outTopoMsg.toString() +
                "\nAND RECEIVED:\n" + inTopoMsg.toString());
            }
          }
          // Alright, connected with new RS (either outgoing or incoming
          // connection): store handler.
          Map<Short, ServerHandler> connectedRSs =
            replicationServerDomain.getConnectedRSs();
          connectedRSs.put(serverId, this);
          // Process TopologyMsg sent by remote RS: store matching new info
          // (this will also warn our connected DSs of the new received info)
          replicationServerDomain.receiveTopoInfoFromRS(inTopoMsg, this, false);
        } else if (msg2 instanceof StartSessionMsg)
        {
          // CONNECTION WITH A DS
          // Process StartSessionMsg sent by remote DS
          StartSessionMsg startSessionMsg = (StartSessionMsg) msg2;
          this.status = startSessionMsg.getStatus();
          // Sanity check: is it a valid initial status?
          if (!isValidInitialStatus(this.status))
          {
            Message mesg = ERR_RS_INVALID_INIT_STATUS.get(
              this.status.toString(), this.baseDn.toString(),
              Short.toString(serverId));
            closeSession(mesg);
            if ((replicationServerDomain != null) &&
              replicationServerDomain.hasLock())
              replicationServerDomain.release();
            return;
          }
          this.refUrls = startSessionMsg.getReferralsURLs();
          this.assuredFlag = startSessionMsg.isAssured();
          this.assuredMode = startSessionMsg.getAssuredMode();
          this.safeDataLevel = startSessionMsg.getSafeDataLevel();
          /*
           * If we have already a generationID set for the domain
           * then
           *   if the connecting replica has not the same
           *   then it is degraded locally and notified by an error message
           * else
           *   we set the generationID from the one received
           *   (unsaved yet on disk . will be set with the 1rst change
           * received)
           */
          if (localGenerationId > 0)
          {
            if (generationId != localGenerationId)
            {
              Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get(
                this.baseDn,
                Short.toString(serverId),
                Long.toString(generationId),
                Long.toString(localGenerationId));
              logError(message);
            }
          } else
          {
            // We are an empty Replicationserver
            if ((generationId > 0) && (!serverState.isEmpty()))
            {
              // If the LDAP server has already sent changes
              // it is not expected to connect to an empty RS
              Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get(
                this.baseDn,
                Short.toString(serverId),
                Long.toString(generationId),
                Long.toString(localGenerationId));
              logError(message);
            } else
            {
              // The local RS is not initialized - take the one received
              // WARNING: Must be done before computing topo message to send
              // to peer server as topo message must embed valid generation id
              // for our server
              oldGenerationId =
                replicationServerDomain.setGenerationId(generationId, false);
            }
          }
          // Send our own TopologyMsg to DS
          outTopoMsg = replicationServerDomain.createTopologyMsgForDS(
            this.serverId);
          session.publish(outTopoMsg);
          if (debugEnabled())
          {
            TRACER.debugInfo("In " +
              replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() + ":" +
              "\nSH HANDSHAKE RECEIVED:\n" + startSessionMsg.toString() +
              "\nAND REPLIED:\n" + outTopoMsg.toString());
          }
          // Alright, connected with new DS: store handler.
          Map<Short, ServerHandler> connectedDSs =
            replicationServerDomain.getConnectedDSs();
          connectedDSs.put(serverId, this);
          // Tell peer DSs a new DS just connected to us
          // No need to resend topo msg to this just new DS so not null
          // argument
          replicationServerDomain.sendTopoInfoToDSs(this);
          // Tell peer RSs a new DS just connected to us
          replicationServerDomain.sendTopoInfoToRSs();
        } else
        {
          // We did not recognize the message, close session as what
          // can happen after is undetermined and we do not want the server to
          // be disturbed
          closeSession(null);
          if ((replicationServerDomain != null) &&
            replicationServerDomain.hasLock())
            replicationServerDomain.release();
          return;
        }
      } else
      {
        // Terminate connection from a V1 RS
        // if the remote RS and the local RS have the same genID
        // then it's ok and nothing else to do
        if (generationId == localGenerationId)
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("In " +
              replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() + " RS V1 with serverID=" + serverId +
              " is connected with the right generation ID");
          }
        } else
        {
          if (localGenerationId > 0)
          {
            // if the local RS is initialized
            if (generationId > 0)
            {
              // if the remote RS is initialized
              if (generationId != localGenerationId)
              {
                // if the 2 RS have different generationID
                if (replicationServerDomain.getGenerationIdSavedStatus())
                {
                  // if the present RS has received changes regarding its
                  //     gen ID and so won't change without a reset
                  // then  we are just degrading the peer.
                  Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
                    this.baseDn,
                    Short.toString(serverId),
                    Long.toString(generationId),
                    Long.toString(localGenerationId));
                  logError(message);
                } else
                {
                  // The present RS has never received changes regarding its
                  // gen ID.
                  //
                  // Example case:
                  // - we are in RS1
                  // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
                  // - RS1 has genId1 from LS1 /genId1 comes from data in
                  //   suffix
                  // - we are in RS1 and we receive a START msg from RS2
                  // - Each RS keeps its genID / is degraded and when LS2
                  //   will be populated from LS1 everything will become ok.
                  //
                  // Issue:
                  // FIXME : Would it be a good idea in some cases to just
                  //         set the gen ID received from the peer RS
                  //         specially if the peer has a non null state and
                  //         we have a nul state ?
                  // replicationServerDomain.
                  // setGenerationId(generationId, false);
                  Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
                    this.baseDn,
                    Short.toString(serverId),
                    Long.toString(generationId),
                    Long.toString(localGenerationId));
                  logError(message);
                }
              }
            } else
            {
              // The remote RS has no genId. We don't change anything for the
              // current RS.
            }
          } else
          {
            // The local RS is not initialized - take the one received
            oldGenerationId =
              replicationServerDomain.setGenerationId(generationId, false);
          }
        }
        // Alright, connected with new incoming V1 RS: store handler.
        Map<Short, ServerHandler> connectedRSs =
          replicationServerDomain.getConnectedRSs();
        connectedRSs.put(serverId, this);
        // Note: the supported scenario for V1->V2 upgrade is to upgrade 1 by 1
        // all the servers of the topology. We prefer not not send a TopologyMsg
        // for giving partial/false information to the V2 servers as for
        // instance we don't have the connected DS of the V1 RS...When the V1
        // RS will be upgraded in his turn, topo info will be sent and accurate.
        // That way, there is  no risk to have false/incomplete information in
        // other servers.
      }
      /*
       * FINALIZE INITIALIZATION:
       * CREATE READER AND WRITER, HEARTBEAT SYSTEM AND UPDATE MONITORING
       * SYSTEM
       */
      // Disable timeout for next communications
      session.setSoTimeout(0);
      // sendWindow MUST be created before starting the writer
      sendWindow = new Semaphore(sendWindowSize);
      writer = new ServerWriter(session, serverId,
        this, replicationServerDomain);
          this, replicationServerDomain);
      reader = new ServerReader(session, serverId,
        this, replicationServerDomain);
          this, replicationServerDomain);
      reader.start();
      writer.start();
@@ -1056,440 +388,30 @@
      if (heartbeatInterval > 0)
      {
        heartbeatThread = new HeartbeatThread(
          "Replication Heartbeat to DS " + serverURL + " " + serverId +
          " for " + this.baseDn + " in RS " + replicationServerId,
          session, heartbeatInterval / 3);
            "Replication Heartbeat to " + this +
            " in RS " + replicationServerDomain.getReplicationServer().
            getMonitorInstanceName(),
            session, heartbeatInterval / 3);
        heartbeatThread.start();
      }
      // Create the status analyzer for the domain if not already started
      if (serverIsLDAPserver)
      {
        if (!replicationServerDomain.isRunningStatusAnalyzer())
        {
          if (debugEnabled())
            TRACER.debugInfo("In " + replicationServerDomain.
              getReplicationServer().
              getMonitorInstanceName() +
              " SH for remote server " + this.getMonitorInstanceName() +
              " is starting status analyzer");
          replicationServerDomain.startStatusAnalyzer();
        }
      }
      DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
      DirectoryServer.registerMonitorProvider(this);
    } catch (NotSupportedOldVersionPDUException e)
    {
      // We do not need to support DS V1 connection, we just accept RS V1
      // connection:
      // We just trash the message, log the event for debug purpose and close
      // the connection
      if (debugEnabled())
      TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + ":"
        + e.getMessage());
      closeSession(null);
    } catch (Exception e)
    {
      // We do not want polluting error log if error is due to normal session
      // aborted after handshake phase one from a DS that is searching for best
      // suitable RS.
      if ( log_error_message || (baseDn != null) )
      {
        // some problem happened, reject the connection
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_REPLICATION_SERVER_CONNECTION_ERROR.get(
          this.getMonitorInstanceName()));
        mb.append(": " + stackTraceToSingleLineString(e));
        closeSession(mb.toMessage());
      } else
      {
        closeSession(null);
      }
      // If generation id of domain was changed, set it back to old value
      // We may have changed it as it was -1 and we received a value >0 from
      // peer server and the last topo message sent may have failed being
      // sent: in that case retrieve old value of generation id for
      // replication server domain
      if (oldGenerationId != -100)
      {
        replicationServerDomain.setGenerationId(oldGenerationId, false);
      }
    }
    // Release domain
    if ((replicationServerDomain != null) &&
      replicationServerDomain.hasLock())
      replicationServerDomain.release();
  }
  /*
   * Close the session logging the passed error message
   * Log nothing if message is null.
   */
  private void closeSession(Message msg)
  {
    if (msg != null)
    {
      logError(msg);
    }
    try
    {
      session.close();
    } catch (IOException ee)
    {
      // ignore
    }
    DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
    DirectoryServer.registerMonitorProvider(this);
  }
  /**
   * get the Server Id.
   * Sends a message containing a generationId to a peer server.
   * The peer is expected to be a replication server.
   *
   * @return the ID of the server to which this object is linked
   */
  public short getServerId()
  {
    return serverId;
  }
  /**
   * Retrieves the Address URL for this server handler.
   * @param  msg         The GenerationIdMessage message to be sent.
   * @throws IOException When it occurs while sending the message,
   *
   * @return  The Address URL for this server handler,
   *          in the form of an IP address and port separated by a colon.
   */
  public String getServerAddressURL()
  public void forwardGenerationIdToRS(ResetGenerationIdMsg msg)
  throws IOException
  {
    return serverAddressURL;
  }
  /**
   * Retrieves the URL for this server handler.
   *
   * @return  The URL for this server handler, in the form of an address and
   *          port separated by a colon.
   */
  public String getServerURL()
  {
    return serverURL;
  }
  /**
   * Increase the counter of updates sent to the server.
   */
  public void incrementOutCount()
  {
    outCount++;
  }
  /**
   * Increase the counter of update received from the server.
   */
  public void incrementInCount()
  {
    inCount++;
  }
  /**
   * Get the count of updates received from the server.
   * @return the count of update received from the server.
   */
  public int getInCount()
  {
    return inCount;
  }
  /**
   * Get the count of updates sent to this server.
   * @return  The count of update sent to this server.
   */
  public int getOutCount()
  {
    return outCount;
  }
  /**
   * Get the number of updates received from the server in assured safe read
   * mode.
   * @return The number of updates received from the server in assured safe read
   * mode
   */
  public int getAssuredSrReceivedUpdates()
  {
    return assuredSrReceivedUpdates;
  }
  /**
   * Get the number of updates received from the server in assured safe read
   * mode that timed out.
   * @return The number of updates received from the server in assured safe read
   * mode that timed out.
   */
  public AtomicInteger getAssuredSrReceivedUpdatesTimeout()
  {
    return assuredSrReceivedUpdatesTimeout;
  }
  /**
   * Get the number of updates sent to the server in assured safe read mode.
   * @return The number of updates sent to the server in assured safe read mode
   */
  public int getAssuredSrSentUpdates()
  {
    return assuredSrSentUpdates;
  }
  /**
   * Get the number of updates sent to the server in assured safe read mode that
   * timed out.
   * @return The number of updates sent to the server in assured safe read mode
   * that timed out.
   */
  public AtomicInteger getAssuredSrSentUpdatesTimeout()
  {
    return assuredSrSentUpdatesTimeout;
  }
    /**
   * Get the number of updates received from the server in assured safe data
   * mode.
   * @return The number of updates received from the server in assured safe data
   * mode
   */
  public int getAssuredSdReceivedUpdates()
  {
    return assuredSdReceivedUpdates;
  }
  /**
   * Get the number of updates received from the server in assured safe data
   * mode that timed out.
   * @return The number of updates received from the server in assured safe data
   * mode that timed out.
   */
  public AtomicInteger getAssuredSdReceivedUpdatesTimeout()
  {
    return assuredSdReceivedUpdatesTimeout;
  }
  /**
   * Get the number of updates sent to the server in assured safe data mode.
   * @return The number of updates sent to the server in assured safe data mode
   */
  public int getAssuredSdSentUpdates()
  {
    return assuredSdSentUpdates;
  }
  /**
   * Get the number of updates sent to the server in assured safe data mode that
   * timed out.
   * @return The number of updates sent to the server in assured safe data mode
   * that timed out.
   */
  public AtomicInteger getAssuredSdSentUpdatesTimeout()
  {
    return assuredSdSentUpdatesTimeout;
  }
  /**
   * Increment the number of updates received from the server in assured safe
   * read mode.
   */
  public void incrementAssuredSrReceivedUpdates()
  {
    assuredSrReceivedUpdates++;
  }
  /**
   * Increment the number of updates received from the server in assured safe
   * read mode that timed out.
   */
  public void incrementAssuredSrReceivedUpdatesTimeout()
  {
    assuredSrReceivedUpdatesTimeout.incrementAndGet();
  }
  /**
   * Increment the number of updates sent to the server in assured safe read
   * mode.
   */
  public void incrementAssuredSrSentUpdates()
  {
    assuredSrSentUpdates++;
  }
  /**
   * Increment the number of updates sent to the server in assured safe read
   * mode that timed out.
   */
  public void incrementAssuredSrSentUpdatesTimeout()
  {
    assuredSrSentUpdatesTimeout.incrementAndGet();
  }
  /**
   * Increment the number of updates received from the server in assured safe
   * data mode.
   */
  public void incrementAssuredSdReceivedUpdates()
  {
    assuredSdReceivedUpdates++;
  }
  /**
   * Increment the number of updates received from the server in assured safe
   * data mode that timed out.
   */
  public void incrementAssuredSdReceivedUpdatesTimeout()
  {
    assuredSdReceivedUpdatesTimeout.incrementAndGet();
  }
  /**
   * Increment the number of updates sent to the server in assured safe data
   * mode.
   */
  public void incrementAssuredSdSentUpdates()
  {
    assuredSdSentUpdates++;
  }
  /**
   * Increment the number of updates sent to the server in assured safe data
   * mode that timed out.
   */
  public void incrementAssuredSdSentUpdatesTimeout()
  {
    assuredSdSentUpdatesTimeout.incrementAndGet();
  }
  /**
   * Check is this server is saturated (this server has already been
   * sent a bunch of updates and has not processed them so they are staying
   * in the message queue for this server an the size of the queue
   * for this server is above the configured limit.
   *
   * The limit can be defined in number of updates or with a maximum delay
   *
   * @param changeNumber The changenumber to use to make the delay calculations.
   * @param sourceHandler The ServerHandler which is sending the update.
   * @return true is saturated false if not saturated.
   */
  public boolean isSaturated(ChangeNumber changeNumber,
    ServerHandler sourceHandler)
  {
    synchronized (msgQueue)
    {
      int size = msgQueue.count();
      if ((maxReceiveQueue > 0) && (size >= maxReceiveQueue))
        return true;
      if ((sourceHandler.maxSendQueue > 0) &&
        (size >= sourceHandler.maxSendQueue))
        return true;
      if (!msgQueue.isEmpty())
      {
        UpdateMsg firstUpdate = msgQueue.first();
        if (firstUpdate != null)
        {
          long timeDiff = changeNumber.getTimeSec() -
            firstUpdate.getChangeNumber().getTimeSec();
          if ((maxReceiveDelay > 0) && (timeDiff >= maxReceiveDelay))
            return true;
          if ((sourceHandler.maxSendDelay > 0) &&
            (timeDiff >= sourceHandler.maxSendDelay))
            return true;
        }
      }
      return false;
    }
  }
  /**
   * Check that the size of the Server Handler messages Queue has lowered
   * below the limit and therefore allowing the reception of messages
   * from other servers to restart.
   * @param source The ServerHandler which was sending the update.
   *        can be null.
   * @return true if the processing can restart
   */
  public boolean restartAfterSaturation(ServerHandler source)
  {
    synchronized (msgQueue)
    {
      int queueSize = msgQueue.count();
      if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue))
        return false;
      if ((source != null) && (source.maxSendQueue > 0) &&
        (queueSize >= source.restartSendQueue))
        return false;
      if (!msgQueue.isEmpty())
      {
        UpdateMsg firstUpdate = msgQueue.first();
        UpdateMsg lastUpdate = msgQueue.last();
        if ((firstUpdate != null) && (lastUpdate != null))
        {
          long timeDiff = lastUpdate.getChangeNumber().getTimeSec() -
            firstUpdate.getChangeNumber().getTimeSec();
          if ((maxReceiveDelay > 0) && (timeDiff >= restartReceiveDelay))
            return false;
          if ((source != null) && (source.maxSendDelay > 0) && (timeDiff >=
            source.restartSendDelay))
            return false;
        }
      }
    }
    return true;
  }
  /**
   * Check if the server associated to this ServerHandler is a replication
   * server.
   * @return true if the server associated to this ServerHandler is a
   *         replication server.
   */
  public boolean isReplicationServer()
  {
    return (!serverIsLDAPserver);
  }
  /**
   * Get the number of message in the receive message queue.
   * @return Size of the receive message queue.
   */
  public int getRcvMsgQueueSize()
  {
    synchronized (msgQueue)
    {
      /*
       * When the server is up to date or close to be up to date,
       * the number of updates to be sent is the size of the receive queue.
       */
      if (isFollowing())
        return msgQueue.count();
      else
      {
        /**
         * When the server  is not able to follow, the msgQueue
         * may become too large and therefore won't contain all the
         * changes. Some changes may only be stored in the backing DB
         * of the servers.
         * The total size of the receive queue is calculated by doing
         * the sum of the number of missing changes for every dbHandler.
         */
        ServerState dbState = replicationServerDomain.getDbServerState();
        return ServerState.diffChanges(dbState, serverState);
      }
    }
    session.publish(msg);
  }
  /**
@@ -1536,478 +458,133 @@
  }
  /**
   * Get the older update time for that server.
   * @return The older update time.
   * Get the number of updates received from the server in assured safe data
   * mode.
   * @return The number of updates received from the server in assured safe data
   * mode
   */
  public long getOlderUpdateTime()
  public int getAssuredSdReceivedUpdates()
  {
    ChangeNumber olderUpdateCN = getOlderUpdateCN();
    if (olderUpdateCN == null)
      return 0;
    return olderUpdateCN.getTime();
    return assuredSdReceivedUpdates;
  }
  /**
   * Get the older Change Number for that server.
   * Returns null when the queue is empty.
   * @return The older change number.
   * Get the number of updates received from the server in assured safe data
   * mode that timed out.
   * @return The number of updates received from the server in assured safe data
   * mode that timed out.
   */
  public ChangeNumber getOlderUpdateCN()
  public AtomicInteger getAssuredSdReceivedUpdatesTimeout()
  {
    ChangeNumber result = null;
    synchronized (msgQueue)
    {
      if (isFollowing())
      {
        if (msgQueue.isEmpty())
        {
          result = null;
        } else
        {
          UpdateMsg msg = msgQueue.first();
          result = msg.getChangeNumber();
        }
      } else
      {
        if (lateQueue.isEmpty())
        {
          // isFollowing is false AND lateQueue is empty
          // We may be at the very moment when the writer has emptyed the
          // lateQueue when it sent the last update. The writer will fill again
          // the lateQueue when it will send the next update but we are not yet
          // there. So let's take the last change not sent directly from
          // the db.
          ReplicationIteratorComparator comparator =
            new ReplicationIteratorComparator();
          SortedSet<ReplicationIterator> iteratorSortedSet =
            new TreeSet<ReplicationIterator>(comparator);
          try
          {
            // Build a list of candidates iterator (i.e. db i.e. server)
            for (short serverId : replicationServerDomain.getServers())
            {
              // get the last already sent CN from that server
              ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
              // get an iterator in this server db from that last change
              ReplicationIterator iterator =
                replicationServerDomain.getChangelogIterator(serverId, lastCsn);
              // if that iterator has changes, then it is a candidate
              // it is added in the sorted list at a position given by its
              // current change (see ReplicationIteratorComparator).
              if ((iterator != null) && (iterator.getChange() != null))
              {
                iteratorSortedSet.add(iterator);
              }
            }
            UpdateMsg msg = iteratorSortedSet.first().getChange();
            result = msg.getChangeNumber();
          } catch (Exception e)
          {
            result = null;
          } finally
          {
            for (ReplicationIterator iterator : iteratorSortedSet)
            {
              iterator.releaseCursor();
            }
          }
        } else
        {
          UpdateMsg msg = lateQueue.first();
          result = msg.getChangeNumber();
        }
      }
    }
    return result;
    return assuredSdReceivedUpdatesTimeout;
  }
  /**
   * Check if the LDAP server can follow the speed of the other servers.
   * @return true when the server has all the not yet sent changes
   *         in its queue.
   * Get the number of updates sent to the server in assured safe data mode.
   * @return The number of updates sent to the server in assured safe data mode
   */
  public boolean isFollowing()
  public int getAssuredSdSentUpdates()
  {
    return following;
    return assuredSdSentUpdates;
  }
  /**
   * Set the following flag of this server.
   * @param following the value that should be set.
   * Get the number of updates sent to the server in assured safe data mode that
   * timed out.
   * @return The number of updates sent to the server in assured safe data mode
   * that timed out.
   */
  public void setFollowing(boolean following)
  public AtomicInteger getAssuredSdSentUpdatesTimeout()
  {
    this.following = following;
    return assuredSdSentUpdatesTimeout;
  }
  /**
   * Add an update to the list of updates that must be sent to the server
   * managed by this ServerHandler.
   * Get the number of updates received from the server in assured safe read
   * mode.
   * @return The number of updates received from the server in assured safe read
   * mode
   */
  public int getAssuredSrReceivedUpdates()
  {
    return assuredSrReceivedUpdates;
  }
  /**
   * Get the number of updates received from the server in assured safe read
   * mode that timed out.
   * @return The number of updates received from the server in assured safe read
   * mode that timed out.
   */
  public AtomicInteger getAssuredSrReceivedUpdatesTimeout()
  {
    return assuredSrReceivedUpdatesTimeout;
  }
  /**
   * Get the number of updates sent to the server in assured safe read mode.
   * @return The number of updates sent to the server in assured safe read mode
   */
  public int getAssuredSrSentUpdates()
  {
    return assuredSrSentUpdates;
  }
  /**
   * Get the number of updates sent to the server in assured safe read mode that
   * timed out.
   * @return The number of updates sent to the server in assured safe read mode
   * that timed out.
   */
  public AtomicInteger getAssuredSrSentUpdatesTimeout()
  {
    return assuredSrSentUpdatesTimeout;
  }
  /**
   * Returns the Replication Server Domain to which belongs this server handler.
   *
   * @param update The update that must be added to the list of updates.
   * @param sourceHandler The server that sent the update.
   * @return The replication server domain.
   */
  public void add(UpdateMsg update, ServerHandler sourceHandler)
  public ReplicationServerDomain getDomain()
  {
    synchronized (msgQueue)
    {
      /*
       * If queue was empty the writer thread was probably asleep
       * waiting for some changes, wake it up
       */
      if (msgQueue.isEmpty())
        msgQueue.notify();
      msgQueue.add(update);
      /* TODO : size should be configurable
       * and larger than max-receive-queue-size
       */
      while ((msgQueue.count() > maxQueueSize) ||
          (msgQueue.bytesCount() > maxQueueBytesSize))
      {
        setFollowing(false);
        msgQueue.removeFirst();
      }
    }
    if (isSaturated(update.getChangeNumber(), sourceHandler))
    {
      sourceHandler.setSaturated(true);
    }
  }
  private void setSaturated(boolean value)
  {
    flowControl = value;
    return this.replicationServerDomain;
  }
  /**
   * Select the next update that must be sent to the server managed by this
   * ServerHandler.
   *
   * @return the next update that must be sent to the server managed by this
   *         ServerHandler.
   * Returns the value of generationId for that handler.
   * @return The value of the generationId.
   */
  public UpdateMsg take()
  public long getGenerationId()
  {
    boolean interrupted = true;
    UpdateMsg msg = getnextMessage();
    /*
     * When we remove a message from the queue we need to check if another
     * server is waiting in flow control because this queue was too long.
     * This check might cause a performance penalty an therefore it
     * is not done for every message removed but only every few messages.
     */
    if (++saturationCount > 10)
    {
      saturationCount = 0;
      try
      {
        replicationServerDomain.checkAllSaturation();
      } catch (IOException e)
      {
      }
    }
    boolean acquired = false;
    do
    {
      try
      {
        acquired = sendWindow.tryAcquire((long) 500, TimeUnit.MILLISECONDS);
        interrupted = false;
      } catch (InterruptedException e)
      {
        // loop until not interrupted
      }
    } while (((interrupted) || (!acquired)) && (!shutdownWriter));
    if (msg != null)
    {
      incrementOutCount();
      if (msg.isAssured())
      {
        if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
        {
          incrementAssuredSrSentUpdates();
        } else
        {
          if (!isLDAPserver())
            incrementAssuredSdSentUpdates();
        }
      }
    }
    return msg;
    return generationId;
  }
  /**
   * Get the next update that must be sent to the server
   * from the message queue or from the database.
   *
   * @return The next update that must be sent to the server.
   * Gets the group id of the server represented by this object.
   * @return The group id of the server represented by this object.
   */
  private UpdateMsg getnextMessage()
  public byte getGroupId()
  {
    UpdateMsg msg;
    while (activeWriter == true)
    {
      if (following == false)
      {
        /* this server is late with regard to some other masters
         * in the topology or just joined the topology.
         * In such cases, we can't keep all changes in the queue
         * without saturating the memory, we therefore use
         * a lateQueue that is filled with a few changes from the changelogDB
         * If this server is able to close the gap, it will start using again
         * the regular msgQueue later.
         */
        if (lateQueue.isEmpty())
        {
          /*
           * Start from the server State
           * Loop until the queue high mark or until no more changes
           *   for each known LDAP master
           *      get the next CSN after this last one :
           *         - try to get next from the file
           *         - if not found in the file
           *             - try to get the next from the queue
           *   select the smallest of changes
           *   check if it is in the memory tree
           *     yes : lock memory tree.
           *           check all changes from the list, remove the ones that
           *           are already sent
           *           unlock memory tree
           *           restart as usual
           *   load this change on the delayList
           *
           */
          ReplicationIteratorComparator comparator =
            new ReplicationIteratorComparator();
          SortedSet<ReplicationIterator> iteratorSortedSet =
            new TreeSet<ReplicationIterator>(comparator);
          /* fill the lateQueue */
          for (short serverId : replicationServerDomain.getServers())
          {
            ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
            ReplicationIterator iterator =
              replicationServerDomain.getChangelogIterator(serverId, lastCsn);
            if (iterator != null)
            {
              if (iterator.getChange() != null)
              {
                iteratorSortedSet.add(iterator);
              } else
              {
                iterator.releaseCursor();
              }
            }
          }
          // The loop below relies on the fact that it is sorted based
          // on the currentChange of each iterator to consider the next
          // change across all servers.
          // Hence it is necessary to remove and eventual add again an iterator
          // when looping in order to keep consistent the order of the
          // iterators (see ReplicationIteratorComparator.
          while (!iteratorSortedSet.isEmpty() &&
                 (lateQueue.count()<100) &&
                 (lateQueue.bytesCount()<50000) )
          {
            ReplicationIterator iterator = iteratorSortedSet.first();
            iteratorSortedSet.remove(iterator);
            lateQueue.add(iterator.getChange());
            if (iterator.next())
              iteratorSortedSet.add(iterator);
            else
              iterator.releaseCursor();
          }
          for (ReplicationIterator iterator : iteratorSortedSet)
          {
            iterator.releaseCursor();
          }
          /*
           * Check if the first change in the lateQueue is also on the regular
           * queue
           */
          if (lateQueue.isEmpty())
          {
            synchronized (msgQueue)
            {
              if ((msgQueue.count() < maxQueueSize) &&
                  (msgQueue.bytesCount() < maxQueueBytesSize))
              {
                setFollowing(true);
              }
            }
          } else
          {
            msg = lateQueue.first();
            synchronized (msgQueue)
            {
              if (msgQueue.contains(msg))
              {
                /* we finally catch up with the regular queue */
                setFollowing(true);
                lateQueue.clear();
                UpdateMsg msg1;
                do
                {
                  msg1 = msgQueue.removeFirst();
                } while (!msg.getChangeNumber().equals(msg1.getChangeNumber()));
                this.updateServerState(msg);
                return msg;
              }
            }
          }
        } else
        {
          /* get the next change from the lateQueue */
          msg = lateQueue.removeFirst();
          this.updateServerState(msg);
          return msg;
        }
      }
      synchronized (msgQueue)
      {
        if (following == true)
        {
          try
          {
            while (msgQueue.isEmpty() && (following == true))
            {
              msgQueue.wait(500);
              if (!activeWriter)
                return null;
            }
          } catch (InterruptedException e)
          {
            return null;
          }
          if (following == true)
          {
            msg = msgQueue.removeFirst();
            if (this.updateServerState(msg))
            {
              /*
               * Only push the message if it has not yet been seen
               * by the other server.
               * Otherwise just loop to select the next message.
               */
              return msg;
            }
          }
        }
      }
    /*
     * Need to loop because following flag may have gone to false between
     * the first check at the beginning of this method
     * and the second check just above.
     */
    }
    return null;
    return groupId;
  }
  /**
   * Update the serverState with the last message sent.
   *
   * @param msg the last update sent.
   * @return boolean indicating if the update was meaningful.
   * Get our heartbeat interval.
   * @return Our heartbeat interval.
   */
  public boolean updateServerState(UpdateMsg msg)
  public long getHeartbeatInterval()
  {
    return serverState.update(msg.getChangeNumber());
    return heartbeatInterval;
  }
  /**
   * Get the state of this server.
   *
   * @return ServerState the state for this server..
   * Get the count of updates received from the server.
   * @return the count of update received from the server.
   */
  public ServerState getServerState()
  public int getInCount()
  {
    return serverState;
  }
  /**
   * Sends an ack message to the server represented by this object.
   *
   * @param ack The ack message to be sent.
   * @throws IOException In case of Exception thrown sending the ack.
   */
  public void sendAck(AckMsg ack) throws IOException
  {
    session.publish(ack);
  }
  /**
   * Check type of server handled.
   *
   * @return true if the handled server is an LDAP server.
   *         false if the handled server is a replicationServer
   */
  public boolean isLDAPserver()
  {
    return serverIsLDAPserver;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public void initializeMonitorProvider(MonitorProviderCfg configuration)
    throws ConfigException, InitializationException
  {
    // Nothing to do for now
  }
  /**
   * Retrieves the name of this monitor provider.  It should be unique among all
   * monitor providers, including all instances of the same monitor provider.
   *
   * @return  The name of this monitor provider.
   */
  @Override
  public String getMonitorInstanceName()
  {
    String str = serverURL + " " + String.valueOf(serverId);
    if (serverIsLDAPserver)
      return "Connected Replica " + str +
                ",cn=" + replicationServerDomain.getMonitorInstanceName();
    else
      return "Connected Replication Server " + str +
                ",cn=" + replicationServerDomain.getMonitorInstanceName();
  }
  /**
   * Retrieves the length of time in milliseconds that should elapse between
   * calls to the <CODE>updateMonitorData()</CODE> method.  A negative or zero
   * return value indicates that the <CODE>updateMonitorData()</CODE> method
   * should not be periodically invoked.
   *
   * @return  The length of time in milliseconds that should elapse between
   *          calls to the <CODE>updateMonitorData()</CODE> method.
   */
  @Override
  public long getUpdateInterval()
  {
    /* we don't wont to do polling on this monitor */
    return 0;
  }
  /**
   * Performs any processing periodic processing that may be desired to update
   * the information associated with this monitor.  Note that best-effort
   * attempts will be made to ensure that calls to this method come
   * <CODE>getUpdateInterval()</CODE> milliseconds apart, but no guarantees will
   * be made.
   */
  @Override
  public void updateMonitorData()
  {
    // As long as getUpdateInterval() returns 0, this will never get called
    return inCount;
  }
  /**
@@ -2021,101 +598,11 @@
  @Override
  public ArrayList<Attribute> getMonitorData()
  {
    ArrayList<Attribute> attributes = new ArrayList<Attribute>();
    if (serverIsLDAPserver)
    {
      attributes.add(Attributes.create("replica", serverURL));
      attributes.add(Attributes.create("connected-to",
          this.replicationServerDomain.getReplicationServer()
              .getMonitorInstanceName()));
    // Get the generic ones
    ArrayList<Attribute> attributes = super.getMonitorData();
    }
    else
    {
      attributes.add(Attributes.create("Replication-Server",
          serverURL));
    }
    attributes.add(Attributes.create("server-id", String
        .valueOf(serverId)));
    attributes.add(Attributes.create("domain-name", baseDn.toString()));
    try
    {
      MonitorData md;
      md = replicationServerDomain.computeMonitorData();
      if (serverIsLDAPserver)
      {
        // Oldest missing update
        Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
        if ((approxFirstMissingDate != null) && (approxFirstMissingDate > 0))
        {
          Date date = new Date(approxFirstMissingDate);
          attributes.add(Attributes.create(
              "approx-older-change-not-synchronized", date.toString()));
          attributes.add(Attributes.create(
              "approx-older-change-not-synchronized-millis", String
              .valueOf(approxFirstMissingDate)));
        }
        // Missing changes
        long missingChanges = md.getMissingChanges(serverId);
        attributes.add(Attributes.create("missing-changes", String
            .valueOf(missingChanges)));
        // Replication delay
        long delay = md.getApproxDelay(serverId);
        attributes.add(Attributes.create("approximate-delay", String
            .valueOf(delay)));
        /* get the Server State */
        AttributeBuilder builder = new AttributeBuilder("server-state");
        ServerState state = md.getLDAPServerState(serverId);
        if (state != null)
        {
          for (String str : state.toStringSet())
          {
            builder.add(str);
          }
          attributes.add(builder.toAttribute());
        }
      }
      else
      {
        // Missing changes
        long missingChanges = md.getMissingChangesRS(serverId);
        attributes.add(Attributes.create("missing-changes", String
            .valueOf(missingChanges)));
        /* get the Server State */
        AttributeBuilder builder = new AttributeBuilder("server-state");
        ServerState state = md.getRSStates(serverId);
        if (state != null)
        {
          for (String str : state.toStringSet())
          {
            builder.add(str);
          }
          attributes.add(builder.toAttribute());
        }
      }
    }
    catch (Exception e)
    {
      Message message =
        ERR_ERROR_RETRIEVING_MONITOR_DATA.get(stackTraceToSingleLineString(e));
      // We failed retrieving the monitor data.
      attributes.add(Attributes.create("error", message.toString()));
    }
    attributes.add(
        Attributes.create("queue-size", String.valueOf(msgQueue.count())));
    attributes.add(
        Attributes.create(
            "queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
    attributes.add(
        Attributes.create(
            "following", String.valueOf(following)));
    attributes.add(Attributes.create("server-id", String.valueOf(serverId)));
    attributes.add(Attributes.create("domain-name", getServiceId().toString()));
    // Deprecated
    attributes.add(Attributes.create("max-waiting-changes", String
@@ -2129,23 +616,23 @@
    attributes.add(Attributes.create("assured-sr-received-updates", String
        .valueOf(getAssuredSrReceivedUpdates())));
    attributes.add(Attributes.create("assured-sr-received-updates-timeout",
      String .valueOf(getAssuredSrReceivedUpdatesTimeout())));
        String .valueOf(getAssuredSrReceivedUpdatesTimeout())));
    attributes.add(Attributes.create("assured-sr-sent-updates", String
        .valueOf(getAssuredSrSentUpdates())));
    attributes.add(Attributes.create("assured-sr-sent-updates-timeout", String
        .valueOf(getAssuredSrSentUpdatesTimeout())));
    attributes.add(Attributes.create("assured-sd-received-updates", String
        .valueOf(getAssuredSdReceivedUpdates())));
    if (!isLDAPserver())
    if (!isDataServer())
    {
      attributes.add(Attributes.create("assured-sd-sent-updates",
        String.valueOf(getAssuredSdSentUpdates())));
          String.valueOf(getAssuredSdSentUpdates())));
      attributes.add(Attributes.create("assured-sd-sent-updates-timeout",
        String.valueOf(getAssuredSdSentUpdatesTimeout())));
          String.valueOf(getAssuredSdSentUpdatesTimeout())));
    } else
    {
      attributes.add(Attributes.create("assured-sd-received-updates-timeout",
        String.valueOf(getAssuredSdReceivedUpdatesTimeout())));
          String.valueOf(getAssuredSdReceivedUpdatesTimeout())));
    }
    // Window stats
@@ -2170,44 +657,484 @@
  }
  /**
   * Retrieves the name of this monitor provider.  It should be unique among all
   * monitor providers, including all instances of the same monitor provider.
   *
   * @return  The name of this monitor provider.
   */
  @Override
  public abstract String getMonitorInstanceName();
  /**
   * Get the older update time for that server.
   * @return The older update time.
   */
  public long getOlderUpdateTime()
  {
    ChangeNumber olderUpdateCN = getOlderUpdateCN();
    if (olderUpdateCN == null)
      return 0;
    return olderUpdateCN.getTime();
  }
  /**
   * Get the count of updates sent to this server.
   * @return  The count of update sent to this server.
   */
  public int getOutCount()
  {
    return outCount;
  }
  /**
   * Gets the protocol version used with this remote server.
   * @return The protocol version used with this remote server.
   */
  public short getProtocolVersion()
  {
    return protocolVersion;
  }
  /**
   * get the Server Id.
   *
   * @return the ID of the server to which this object is linked
   */
  public short getServerId()
  {
    return serverId;
  }
  /**
   * Retrieves the URL for this server handler.
   *
   * @return  The URL for this server handler, in the form of an address and
   *          port separated by a colon.
   */
  public String getServerURL()
  {
    return serverURL;
  }
  /**
   * Return the ServerStatus.
   * @return The server status.
   */
  protected abstract ServerStatus getStatus();
  /**
   * Retrieves the length of time in milliseconds that should elapse between
   * calls to the <CODE>updateMonitorData()</CODE> method.  A negative or zero
   * return value indicates that the <CODE>updateMonitorData()</CODE> method
   * should not be periodically invoked.
   *
   * @return  The length of time in milliseconds that should elapse between
   *          calls to the <CODE>updateMonitorData()</CODE> method.
   */
  @Override
  public long getUpdateInterval()
  {
    /* we don't wont to do polling on this monitor */
    return 0;
  }
  /**
   * Increment the number of updates received from the server in assured safe
   * data mode.
   */
  public void incrementAssuredSdReceivedUpdates()
  {
    assuredSdReceivedUpdates++;
  }
  /**
   * Increment the number of updates received from the server in assured safe
   * data mode that timed out.
   */
  public void incrementAssuredSdReceivedUpdatesTimeout()
  {
    assuredSdReceivedUpdatesTimeout.incrementAndGet();
  }
  /**
   * Increment the number of updates sent to the server in assured safe data
   * mode.
   */
  public void incrementAssuredSdSentUpdates()
  {
    assuredSdSentUpdates++;
  }
  /**
   * Increment the number of updates sent to the server in assured safe data
   * mode that timed out.
   */
  public void incrementAssuredSdSentUpdatesTimeout()
  {
    assuredSdSentUpdatesTimeout.incrementAndGet();
  }
  /**
   * Increment the number of updates received from the server in assured safe
   * read mode.
   */
  public void incrementAssuredSrReceivedUpdates()
  {
    assuredSrReceivedUpdates++;
  }
  /**
   * Increment the number of updates received from the server in assured safe
   * read mode that timed out.
   */
  public void incrementAssuredSrReceivedUpdatesTimeout()
  {
    assuredSrReceivedUpdatesTimeout.incrementAndGet();
  }
  /**
   * Increment the number of updates sent to the server in assured safe read
   * mode.
   */
  public void incrementAssuredSrSentUpdates()
  {
    assuredSrSentUpdates++;
  }
  /**
   * Increment the number of updates sent to the server in assured safe read
   * mode that timed out.
   */
  public void incrementAssuredSrSentUpdatesTimeout()
  {
    assuredSrSentUpdatesTimeout.incrementAndGet();
  }
  /**
   * Increase the counter of update received from the server.
   */
  public void incrementInCount()
  {
    inCount++;
  }
  /**
   * Increase the counter of updates sent to the server.
   */
  public void incrementOutCount()
  {
    outCount++;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public void initializeMonitorProvider(MonitorProviderCfg configuration)
  throws ConfigException, InitializationException
  {
    // Nothing to do for now
  }
  /**
   * Check if the server associated to this ServerHandler is a data server
   * in the topology.
   * @return true if the server is a data server.
   */
  public abstract boolean isDataServer();
  /**
   * Check if the server associated to this ServerHandler is a replication
   * server.
   * @return true if the server is a replication server.
   */
  public boolean isReplicationServer()
  {
    return (!this.isDataServer());
  }
  /**
   * Lock the domain potentially with a timeout.
   * @param timedout The provided timeout.
   * @throws DirectoryException When an exception occurs.
   */
  protected void lockDomain(boolean timedout)
  throws DirectoryException
  {
    // The handshake phase must be done by blocking any access to structures
    // keeping info on connected servers, so that one can safely check for
    // pre-existence of a server, send a coherent snapshot of known topology
    // to peers, update the local view of the topology...
    //
    // For instance a kind of problem could be that while we connect with a
    // peer RS, a DS is connecting at the same time and we could publish the
    // connected DSs to the peer RS forgetting this last DS in the TopologyMsg.
    //
    // This method and every others that need to read/make changes to the
    // structures holding topology for the domain should:
    // - call ReplicationServerDomain.lock()
    // - read/modify structures
    // - call ReplicationServerDomain.release()
    //
    // More information is provided in comment of ReplicationServerDomain.lock()
    // If domain already exists, lock it until handshake is finished otherwise
    // it will be created and locked later in the method
    try
    {
      if (!timedout)
      {
        // !timedout
        if (!replicationServerDomain.hasLock())
          replicationServerDomain.lock();
      }
      else
      {
        // timedout
        /**
         * Take the lock on the domain.
         * WARNING: Here we try to acquire the lock with a timeout. This
         * is for preventing a deadlock that may happen if there are cross
         * connection attempts (for same domain) from this replication
         * server and from a peer one:
         * Here is the scenario:
         * - RS1 connect thread takes the domain lock and starts
         * connection to RS2
         * - at the same time RS2 connect thread takes his domain lock and
         * start connection to RS2
         * - RS2 listen thread starts processing received
         * ReplServerStartMsg from RS1 and wants to acquire the lock on
         * the domain (here) but cannot as RS2 connect thread already has
         * it
         * - RS1 listen thread starts processing received
         * ReplServerStartMsg from RS2 and wants to acquire the lock on
         * the domain (here) but cannot as RS1 connect thread already has
         * it
         * => Deadlock: 4 threads are locked.
         * So to prevent that in such situation, the listen threads here
         * will both timeout trying to acquire the lock. The random time
         * for the timeout should allow on connection attempt to be
         * aborted whereas the other one should have time to finish in the
         * same time.
         * Warning: the minimum time (3s) should be big enough to allow
         * normal situation connections to terminate. The added random
         * time should represent a big enough range so that the chance to
         * have one listen thread timing out a lot before the peer one is
         * great. When the first listen thread times out, the remote
         * connect thread should release the lock and allow the peer
         * listen thread to take the lock it was waiting for and process
         * the connection attempt.
         */
        Random random = new Random();
        int randomTime = random.nextInt(6); // Random from 0 to 5
        // Wait at least 3 seconds + (0 to 5 seconds)
        long timeout = (long) (3000 + ( randomTime * 1000 ) );
        boolean noTimeout = replicationServerDomain.tryLock(timeout);
        if (!noTimeout)
        {
          // Timeout
          Message message = NOTE_TIMEOUT_WHEN_CROSS_CONNECTION.get(
              getServiceId(),
              Short.toString(serverId),
              Short.toString(replicationServerId));
          throw new DirectoryException(ResultCode.OTHER, message);
        }
      }
    }
    catch (InterruptedException e)
    {
      // Thread interrupted
      Message message = ERR_EXCEPTION_LOCKING_RS_DOMAIN.get(e.getMessage());
      logError(message);
    }
  }
  /**
   * Processes a routable message.
   *
   * @param msg The message to be processed.
   */
  public void process(RoutableMsg msg)
  {
    if (debugEnabled())
      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + this +
          " processes received msg:\n" + msg);
    replicationServerDomain.process(msg, this);
  }
  /**
   * Process the reception of a WindowProbeMsg message.
   *
   * @param  windowProbeMsg The message to process.
   *
   * @throws IOException    When the session becomes unavailable.
   */
  public void process(WindowProbeMsg windowProbeMsg) throws IOException
  {
    if (rcvWindow > 0)
    {
      // The LDAP server believes that its window is closed
      // while it is not, this means that some problem happened in the
      // window exchange procedure !
      // lets update the LDAP server with out current window size and hope
      // that everything will work better in the futur.
      // TODO also log an error message.
      WindowMsg msg = new WindowMsg(rcvWindow);
      session.publish(msg);
    } else
    {
      // Both the LDAP server and the replication server believes that the
      // window is closed. Lets check the flowcontrol in case we
      // can now resume operations and send a windowMessage if necessary.
      checkWindow();
    }
  }
  /**
   * Send an InitializeRequestMessage to the server connected through this
   * handler.
   *
   * @param msg The message to be processed
   * @throws IOException when raised by the underlying session
   */
  public void send(RoutableMsg msg) throws IOException
  {
    if (debugEnabled())
      TRACER.debugInfo("In " +
          replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + this +
          " publishes message:\n" + msg);
    session.publish(msg);
  }
  /**
   * Sends an ack message to the server represented by this object.
   *
   * @param ack The ack message to be sent.
   * @throws IOException In case of Exception thrown sending the ack.
   */
  public void sendAck(AckMsg ack) throws IOException
  {
    session.publish(ack);
  }
  /**
   * Send an ErrorMsg to the peer.
   *
   * @param errorMsg The message to be sent
   * @throws IOException when raised by the underlying session
   */
  public void sendError(ErrorMsg errorMsg) throws IOException
  {
    session.publish(errorMsg);
  }
  /**
   * Send the ReplServerStartMsg to the remote server (RS or DS).
   * @param requestedProtocolVersion The provided protocol version.
   * @return The ReplServerStartMsg sent.
   * @throws IOException When an exception occurs.
   */
  public ReplServerStartMsg sendStartToRemote(short requestedProtocolVersion)
  throws IOException
  {
    this.localGenerationId = replicationServerDomain.getGenerationId();
    ReplServerStartMsg outReplServerStartMsg
    = new ReplServerStartMsg(
        replicationServerId,
        replicationServerURL,
        getServiceId(),
        maxRcvWindow,
        replicationServerDomain.getDbServerState(),
        protocolVersion,
        localGenerationId,
        sslEncryption,
        getLocalGroupId(),
        replicationServerDomain.
        getReplicationServer().getDegradedStatusThreshold());
    if (requestedProtocolVersion>0)
      session.publish(outReplServerStartMsg, requestedProtocolVersion);
    else
      session.publish(outReplServerStartMsg);
    return outReplServerStartMsg;
  }
  /**
   * Sends the provided TopologyMsg to the peer server.
   *
   * @param topoMsg The TopologyMsg message to be sent.
   * @throws IOException When it occurs while sending the message,
   *
   */
  public void sendTopoInfo(TopologyMsg topoMsg)
  throws IOException
  {
    // V1 Rs do not support the TopologyMsg
    if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      session.publish(topoMsg);
    }
  }
  /**
   * Set a new generation ID.
   *
   * @param generationId The new generation ID
   *
   */
  public void setGenerationId(long generationId)
  {
    this.generationId = generationId;
  }
  /**
   * Sets the replication server domain associated.
   * @param rsd The provided replication server domain.
   */
  protected void setReplicationServerDomain(ReplicationServerDomain rsd)
  {
    this.replicationServerDomain = rsd;
  }
  /**
   * Sets the window size when used when sending to the remote.
   * @param size The provided window size.
   */
  protected void setSendWindowSize(int size)
  {
    this.sendWindowSize = size;
  }
  /**
   * Requests to shutdown the writer.
   */
  protected void shutdownWriter()
  {
    shutdownWriter = true;
  }
  /**
   * Shutdown This ServerHandler.
   */
  public void shutdown()
  {
    /*
     * Shutdown ServerWriter
     */
    shutdownWriter = true;
    activeWriter = false;
    synchronized (msgQueue)
    {
      /* wake up the writer thread on an empty queue so that it disappear */
      msgQueue.clear();
      msgQueue.notify();
      msgQueue.notifyAll();
    }
    shutdownWriter();
    setConsumerActive(false);
    super.shutdown();
    /*
     * Close session to end ServerReader or ServerWriter
     */
    try
    if (session != null)
    {
      session.close();
    } catch (IOException e)
    {
      // ignore.
    }
    /*
     * Stop the remote LSHandler
     */
    synchronized (directoryServers)
    {
      for (LightweightServerHandler lsh : directoryServers.values())
      // Close session to end ServerReader or ServerWriter
      try
      {
        lsh.stopHandler();
        session.close();
      } catch (IOException e)
      {
        // ignore.
      }
      directoryServers.clear();
    }
    /*
@@ -2240,68 +1167,93 @@
    {
      // don't try anymore to join and return.
    }
    if (debugEnabled())
      TRACER.debugInfo("SH.shutdowned(" + this + ")");
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    String localString;
    if (serverId != 0)
    {
      if (serverIsLDAPserver)
        localString = "Directory Server ";
      else
        localString = "Replication Server ";
      localString += serverId + " " + serverURL + " " + baseDn;
    } else
      localString = "Unknown server";
    return localString;
  }
  /**
   * Decrement the protocol window, then check if it is necessary
   * to send a WindowMsg and send it.
   * Select the next update that must be sent to the server managed by this
   * ServerHandler.
   *
   * @throws IOException when the session becomes unavailable.
   * @return the next update that must be sent to the server managed by this
   *         ServerHandler.
   */
  public synchronized void decAndCheckWindow() throws IOException
  public UpdateMsg take()
  {
    rcvWindow--;
    checkWindow();
  }
    boolean interrupted = true;
    UpdateMsg msg = getnextMessage(true); // synchronous:block until msg
  /**
   * Check the protocol window and send WindowMsg if necessary.
   *
   * @throws IOException when the session becomes unavailable.
   */
  public synchronized void checkWindow() throws IOException
  {
    if (rcvWindow < rcvWindowSizeHalf)
    /*
     * When we remove a message from the queue we need to check if another
     * server is waiting in flow control because this queue was too long.
     * This check might cause a performance penalty an therefore it
     * is not done for every message removed but only every few messages.
     */
    if (++saturationCount > 10)
    {
      if (flowControl)
      saturationCount = 0;
      try
      {
        if (replicationServerDomain.restartAfterSaturation(this))
        {
          flowControl = false;
        }
      }
      if (!flowControl)
        replicationServerDomain.checkAllSaturation();
      } catch (IOException e)
      {
        WindowMsg msg = new WindowMsg(rcvWindowSizeHalf);
        session.publish(msg);
        rcvWindow += rcvWindowSizeHalf;
      }
    }
    boolean acquired = false;
    do
    {
      try
      {
        acquired = sendWindow.tryAcquire((long) 500, TimeUnit.MILLISECONDS);
        interrupted = false;
      } catch (InterruptedException e)
      {
        // loop until not interrupted
      }
    } while (((interrupted) || (!acquired)) && (!shutdownWriter));
    if (msg != null)
    {
      incrementOutCount();
      if (msg.isAssured())
      {
        if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
        {
          incrementAssuredSrSentUpdates();
        } else
        {
          if (!isDataServer())
            incrementAssuredSdSentUpdates();
        }
      }
    }
    return msg;
  }
  /**
   * Creates a RSInfo structure representing this remote RS.
   * @return The RSInfo structure representing this remote RS
   */
  public RSInfo toRSInfo()
  {
    RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
    return rsInfo;
  }
  /**
   * Performs any processing periodic processing that may be desired to update
   * the information associated with this monitor.  Note that best-effort
   * attempts will be made to ensure that calls to this method come
   * <CODE>getUpdateInterval()</CODE> milliseconds apart, but no guarantees will
   * be made.
   */
  @Override
  public void updateMonitorData()
  {
    // As long as getUpdateInterval() returns 0, this will never get called
  }
  /**
   * Update the send window size based on the credit specified in the
   * given window message.
   *
@@ -2314,510 +1266,122 @@
  }
  /**
   * Get our heartbeat interval.
   * @return Our heartbeat interval.
   * Log the messages involved in the start handshake.
   * @param inStartMsg The message received first.
   * @param outStartMsg The message sent in response.
   */
  public long getHeartbeatInterval()
  {
    return heartbeatInterval;
  }
  /**
   * Processes a routable message.
   *
   * @param msg The message to be processed.
   */
  public void process(RoutableMsg msg)
  protected void logStartHandshakeRCVandSND(
      StartMsg inStartMsg,
      StartMsg outStartMsg)
  {
    if (debugEnabled())
      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
        getMonitorInstanceName() +
        " SH for remote server " + this.getMonitorInstanceName() + ":" +
        "\nprocesses received msg:\n" + msg);
    replicationServerDomain.process(msg, this);
  }
  /**
   * Sends the provided TopologyMsg to the peer server.
   *
   * @param topoMsg The TopologyMsg message to be sent.
   * @throws IOException When it occurs while sending the message,
   *
   */
  public void sendTopoInfo(TopologyMsg topoMsg)
    throws IOException
  {
    // V1 Rs do not support the TopologyMsg
    if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      if (debugEnabled())
        TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          " SH for remote server " + this.getMonitorInstanceName() + ":" +
          "\nsends message:\n" + topoMsg);
      session.publish(topoMsg);
    }
  }
  /**
   * Stores topology information received from a peer RS and that must be kept
   * in RS handler.
   *
   * @param topoMsg The received topology message
   */
  public void receiveTopoInfoFromRS(TopologyMsg topoMsg)
  {
    // Store info for remote RS
    List<RSInfo> rsInfos = topoMsg.getRsList();
    // List should only contain RS info for sender
    RSInfo rsInfo = rsInfos.get(0);
    generationId = rsInfo.getGenerationId();
    groupId = rsInfo.getGroupId();
    /**
     * Store info for DSs connected to the peer RS
     */
    List<DSInfo> dsInfos = topoMsg.getDsList();
    synchronized (directoryServers)
    {
      // Removes the existing structures
      for (LightweightServerHandler lsh : directoryServers.values())
      {
        lsh.stopHandler();
      }
      directoryServers.clear();
      // Creates the new structure according to the message received.
      for (DSInfo dsInfo : dsInfos)
      {
        LightweightServerHandler lsh = new LightweightServerHandler(this,
            serverId, dsInfo.getDsId(), dsInfo.getGenerationId(),
            dsInfo.getGroupId(), dsInfo.getStatus(), dsInfo.getRefUrls(),
            dsInfo.isAssured(), dsInfo.getAssuredMode(),
            dsInfo.getSafeDataLevel());
        lsh.startHandler();
        directoryServers.put(lsh.getServerId(), lsh);
      }
    }
  }
  /**
   * Process message of a remote server changing his status.
   * @param csMsg The message containing the new status
   * @return The new server status of the DS
   */
  public ServerStatus processNewStatus(ChangeStatusMsg csMsg)
  {
    // Sanity check
    if (!serverIsLDAPserver)
    {
      Message msg =
        ERR_RECEIVED_CHANGE_STATUS_NOT_FROM_DS.get(baseDn.toString(),
        Short.toString(serverId), csMsg.toString());
      logError(msg);
      return ServerStatus.INVALID_STATUS;
    }
    // Get the status the DS just entered
    ServerStatus reqStatus = csMsg.getNewStatus();
    // Translate new status to a state machine event
    StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus);
    if (event == StatusMachineEvent.INVALID_EVENT)
    {
      Message msg = ERR_RS_INVALID_NEW_STATUS.get(reqStatus.toString(),
        baseDn.toString(), Short.toString(serverId));
      logError(msg);
      return ServerStatus.INVALID_STATUS;
    }
    // Check state machine allows this new status
    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
    if (newStatus == ServerStatus.INVALID_STATUS)
    {
      Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
        Short.toString(serverId), status.toString(), event.toString());
      logError(msg);
      return ServerStatus.INVALID_STATUS;
    }
    status = newStatus;
    return status;
  }
  /**
   * Change the status according to the event generated from the status
   * analyzer.
   * @param event The event to be used for new status computation
   * @return The new status of the DS
   * @throws IOException When raised by the underlying session
   */
  public ServerStatus changeStatusFromStatusAnalyzer(StatusMachineEvent event)
    throws IOException
  {
    // Check state machine allows this new status (Sanity check)
    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
    if (newStatus == ServerStatus.INVALID_STATUS)
    {
      Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
        Short.toString(serverId), status.toString(), event.toString());
      logError(msg);
      // Status analyzer must only change from NORMAL_STATUS to DEGRADED_STATUS
      // and vice versa. We may are being trying to change the status while for
      // instance another status has just been entered: e.g a full update has
      // just been engaged. In that case, just ignore attempt to change the
      // status
      return newStatus;
    }
    // Send message requesting to change the DS status
    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus,
      ServerStatus.INVALID_STATUS);
    if (debugEnabled())
    {
      TRACER.debugInfo(
        "In RS " +
        replicationServerDomain.getReplicationServer().getServerId() +
        " Sending change status from status analyzer to " + getServerId() +
        " for baseDn " + baseDn + ":\n" + csMsg);
    }
    session.publish(csMsg);
    status = newStatus;
    return newStatus;
  }
  /**
   * When this handler is connected to a replication server, specifies if
   * a wanted server is connected to this replication server.
   *
   * @param wantedServer The server we want to know if it is connected
   * to the replication server represented by this handler.
   * @return boolean True is the wanted server is connected to the server
   * represented by this handler.
   */
  public boolean isRemoteLDAPServer(short wantedServer)
  {
    synchronized (directoryServers)
    {
      for (LightweightServerHandler server : directoryServers.values())
      {
        if (wantedServer == server.getServerId())
        {
          return true;
        }
      }
      return false;
    }
  }
  /**
   * When the handler is connected to a replication server, specifies the
   * replication server has remote LDAP servers connected to it.
   *
   * @return boolean True is the replication server has remote LDAP servers
   * connected to it.
   */
  public boolean hasRemoteLDAPServers()
  {
    synchronized (directoryServers)
    {
      return !directoryServers.isEmpty();
    }
  }
  /**
   * Send an InitializeRequestMessage to the server connected through this
   * handler.
   *
   * @param msg The message to be processed
   * @throws IOException when raised by the underlying session
   */
  public void send(RoutableMsg msg) throws IOException
  {
    if (debugEnabled())
      TRACER.debugInfo("In " +
        replicationServerDomain.getReplicationServer().
        getMonitorInstanceName() +
        " SH for remote server " + this.getMonitorInstanceName() + ":" +
        "\nsends message:\n" + msg);
    session.publish(msg);
  }
  /**
   * Send an ErrorMsg to the peer.
   *
   * @param errorMsg The message to be sent
   * @throws IOException when raised by the underlying session
   */
  public void sendError(ErrorMsg errorMsg) throws IOException
  {
    session.publish(errorMsg);
  }
  /**
   * Process the reception of a WindowProbeMsg message.
   *
   * @param  windowProbeMsg The message to process.
   *
   * @throws IOException    When the session becomes unavailable.
   */
  public void process(WindowProbeMsg windowProbeMsg) throws IOException
  {
    if (rcvWindow > 0)
    {
      // The LDAP server believes that its window is closed
      // while it is not, this means that some problem happened in the
      // window exchange procedure !
      // lets update the LDAP server with out current window size and hope
      // that everything will work better in the futur.
      // TODO also log an error message.
      WindowMsg msg = new WindowMsg(rcvWindow);
      session.publish(msg);
    } else
    {
      // Both the LDAP server and the replication server believes that the
      // window is closed. Lets check the flowcontrol in case we
      // can now resume operations and send a windowMessage if necessary.
      checkWindow();
        getMonitorInstanceName() + ", " +
        this.getClass().getSimpleName() + " " + this + ":" +
        "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg.toString()+
        "\nAND REPLIED:\n" + outStartMsg.toString());
    }
  }
  /**
   * Returns the value of generationId for that handler.
   * @return The value of the generationId.
   * Log the messages involved in the start handshake.
   * @param outStartMsg The message sent first.
   * @param inStartMsg The message received in response.
   */
  public long getGenerationId()
  protected void logStartHandshakeSNDandRCV(
      StartMsg outStartMsg,
      StartMsg inStartMsg)
  {
    return generationId;
  }
  /**
   * Sends a message containing a generationId to a peer server.
   * The peer is expected to be a replication server.
   *
   * @param  msg         The GenerationIdMessage message to be sent.
   * @throws IOException When it occurs while sending the message,
   *
   */
  public void forwardGenerationIdToRS(ResetGenerationIdMsg msg)
    throws IOException
  {
    session.publish(msg);
  }
  /**
   * Set a new generation ID.
   *
   * @param generationId The new generation ID
   *
   */
  public void setGenerationId(long generationId)
  {
    this.generationId = generationId;
  }
  /**
   * Returns the Replication Server Domain to which belongs this server handler.
   *
   * @return The replication server domain.
   */
  public ReplicationServerDomain getDomain()
  {
    return this.replicationServerDomain;
  }
  /**
   * Return a Set containing the servers known by this replicationServer.
   * @return a set containing the servers known by this replicationServer.
   */
  public Set<Short> getConnectedDirectoryServerIds()
  {
    synchronized (directoryServers)
    {
      return directoryServers.keySet();
    }
  }
  /**
   * Order the peer DS server to change his status or close the connection
   * according to the requested new generation id.
   * @param newGenId The new generation id to take into account
   * @throws IOException If IO error occurred.
   */
  public void changeStatusForResetGenId(long newGenId)
    throws IOException
  {
    StatusMachineEvent event = null;
    if (newGenId == -1)
    {
      // The generation id is being made invalid, let's put the DS
      // into BAD_GEN_ID_STATUS
      event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
    } else
    {
      if (newGenId == generationId)
      {
        if (status == ServerStatus.BAD_GEN_ID_STATUS)
        {
          // This server has the good new reference generation id.
          // Close connection with him to force his reconnection: DS will
          // reconnect in NORMAL_STATUS or DEGRADED_STATUS.
          if (debugEnabled())
          {
            TRACER.debugInfo(
              "In RS " +
              replicationServerDomain.getReplicationServer().getServerId() +
              ". Closing connection to DS " + getServerId() +
              " for baseDn " + baseDn + " to force reconnection as new local" +
              " generation id and remote one match and DS is in bad gen id: " +
              newGenId);
          }
          // Connection closure must not be done calling RSD.stopHandler() as it
          // would rewait the RSD lock that we already must have entering this
          // method. This would lead to a reentrant lock which we do not want.
          // So simply close the session, this will make the hang up appear
          // after the reader thread that took the RSD lock realeases it.
          try
          {
            if (session != null)
              session.close();
          } catch (IOException e)
          {
            // ignore
          }
          // NOT_CONNECTED_STATUS is the last one in RS session life: handler
          // will soon disappear after this method call...
          status = ServerStatus.NOT_CONNECTED_STATUS;
          return;
        } else
        {
          if (debugEnabled())
          {
            TRACER.debugInfo(
              "In RS " +
              replicationServerDomain.getReplicationServer().getServerId() +
              ". DS " + getServerId() + " for baseDn " + baseDn +
              " has already generation id " + newGenId +
              " so no ChangeStatusMsg sent to him.");
          }
          return;
        }
      } else
      {
        // This server has a bad generation id compared to new reference one,
        // let's put it into BAD_GEN_ID_STATUS
        event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
      }
    }
    if ((event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT) &&
      (status == ServerStatus.FULL_UPDATE_STATUS))
    {
      // Prevent useless error message (full update status cannot lead to bad
      // gen status)
      Message message = NOTE_BAD_GEN_ID_IN_FULL_UPDATE.get(
        Short.toString(replicationServerDomain.
        getReplicationServer().getServerId()),
        baseDn.toString(),
        Short.toString(serverId),
        Long.toString(generationId),
        Long.toString(newGenId));
      logError(message);
      return;
    }
    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
    if (newStatus == ServerStatus.INVALID_STATUS)
    {
      Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
        Short.toString(serverId), status.toString(), event.toString());
      logError(msg);
      return;
    }
    // Send message requesting to change the DS status
    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus,
      ServerStatus.INVALID_STATUS);
    if (debugEnabled())
    {
      TRACER.debugInfo(
        "In RS " +
        replicationServerDomain.getReplicationServer().getServerId() +
        " Sending change status for reset gen id to " + getServerId() +
        " for baseDn " + baseDn + ":\n" + csMsg);
      TRACER.debugInfo("In " +
        replicationServerDomain.getReplicationServer().
        getMonitorInstanceName() + ", " +
        this.getClass().getSimpleName() + " " + this + ":" +
        "\nSH START HANDSHAKE SENT("+ this +
        "):\n" + outStartMsg.toString()+
        "\nAND RECEIVED:\n" + inStartMsg.toString());
    }
    session.publish(csMsg);
    status = newStatus;
  }
  /**
   * Set the shut down flag to true and returns the previous value of the flag.
   * @return The previous value of the shut down flag
   * Log the messages involved in the Topology handshake.
   * @param inTopoMsg The message received first.
   * @param outTopoMsg The message sent in response.
   */
  public boolean engageShutdown()
  protected void logTopoHandshakeRCVandSND(
      TopologyMsg inTopoMsg,
      TopologyMsg outTopoMsg)
  {
    // Use thread safe boolean
    return shuttingDown.getAndSet(true);
  }
  /**
   * Gets the status of the connected DS.
   * @return The status of the connected DS.
   */
  public ServerStatus getStatus()
  {
    return status;
  }
  /**
   * Gets the protocol version used with this remote server.
   * @return The protocol version used with this remote server.
   */
  public short getProtocolVersion()
  {
    return protocolVersion;
  }
  /**
   * Add the DSinfos of the connected Directory Servers
   * to the List of DSInfo provided as a parameter.
   *
   * @param dsInfos The List of DSInfo that should be updated
   *                with the DSInfo for the directoryServers
   *                connected to this ServerHandler.
   */
  public void addDSInfos(List<DSInfo> dsInfos)
  {
    synchronized (directoryServers)
    if (debugEnabled())
    {
      for (LightweightServerHandler ls : directoryServers.values())
      {
        dsInfos.add(ls.toDSInfo());
      }
      TRACER.debugInfo("In " +
          replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + ":" +
          "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() +
          "\nAND REPLIED:\n" + outTopoMsg.toString());
    }
  }
  /**
   * Gets the group id of the server represented by this object.
   * @return The group id of the server represented by this object.
   * Log the messages involved in the Topology handshake.
   * @param outTopoMsg The message sent first.
   * @param inTopoMsg The message received in response.
   */
  public byte getGroupId()
  protected void logTopoHandshakeSNDandRCV(
      TopologyMsg outTopoMsg,
      TopologyMsg inTopoMsg)
  {
    return groupId;
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + ":" +
          "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg.toString() +
          "\nAND RECEIVED:\n" + inTopoMsg.toString());
    }
  }
  /**
   * Log the messages involved in the Topology/StartSession handshake.
   * @param inStartSessionMsg The message received first.
   * @param outTopoMsg The message sent in response.
   */
  protected void logStartSessionHandshake(
      StartSessionMsg inStartSessionMsg,
      TopologyMsg outTopoMsg)
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + " :" +
          "\nSH SESSION HANDSHAKE RECEIVED:\n" +
          "\nAND REPLIED:\n" + outTopoMsg.toString());
    }
  }
  /**
   * Log the messages involved in the Topology/StartSession handshake.
   * @param inStartECLSessionMsg The message received first.
   */
  protected void logStartECLSessionHandshake(
      StartECLSessionMsg inStartECLSessionMsg)
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + " :" +
          "\nSH SESSION HANDSHAKE RECEIVED:\n" +
          inStartECLSessionMsg.toString());
    }
  }
}