mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

jcduff
23.04.2008 b4f8838b15342670c31753a484abf0129e3c9653
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -30,6 +30,7 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.StatusMachine.*;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.messages.ReplicationMessages.*;
@@ -40,8 +41,8 @@
import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -49,47 +50,59 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachine;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.*;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
import org.opends.server.types.InitializationException;
import org.opends.server.util.TimeThread;
/**
 * This class defines a server handler, which handles all interaction with a
 * replication server.
 * peer server (RS or DS).
 */
public class ServerHandler extends MonitorProvider<MonitorProviderCfg>
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * Time during which the server will wait for existing thread to stop
   * during the shutdown.
   * during the shutdownWriter.
   */
  private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
  /*
   * Properties, filled if remote server is either a DS or a RS
   */
  private short serverId;
  private ProtocolSession session;
  private final MsgQueue msgQueue = new MsgQueue();
  private MsgQueue lateQueue = new MsgQueue();
  private final Map<ChangeNumber, AckMessageList> waitingAcks  =
          new HashMap<ChangeNumber, AckMessageList>();
  private final Map<ChangeNumber, AckMessageList> waitingAcks =
    new HashMap<ChangeNumber, AckMessageList>();
  private ReplicationServerDomain replicationServerDomain = null;
  private String serverURL;
  private int outCount = 0; // number of update sent to the server
  private int inCount = 0;  // number of updates received from the server
  private int inAckCount = 0;
  private int outAckCount = 0;
  private int maxReceiveQueue = 0;
@@ -105,10 +118,9 @@
  private boolean serverIsLDAPserver;
  private boolean following = false;
  private ServerState serverState;
  private boolean active = true;
  private boolean activeWriter = true;
  private ServerWriter writer = null;
  private DN baseDn = null;
  private String serverAddressURL;
  private int rcvWindow;
  private int rcvWindowSizeHalf;
  private int maxRcvWindow;
@@ -116,42 +128,63 @@
  private Semaphore sendWindow;
  private int sendWindowSize;
  private boolean flowControl = false; // indicate that the server is
                                       // flow controlled and should
                                       // be stopped from sending messages.
  // flow controlled and should
  // be stopped from sending messages.
  private int saturationCount = 0;
  private short replicationServerId;
  private short protocolVersion;
  private short protocolVersion = -1;
  private long generationId = -1;
  // Group id of this remote server
  private byte groupId = (byte) -1;
  /*
   * Properties filled only if remote server is a DS
   */
  // Status of this DS
  private ServerStatus status = ServerStatus.INVALID_STATUS;
  // Referrals URLs this DS is exporting
  private List<String> refUrls = new ArrayList<String>();
  // Assured replication enabled on DS or not
  private boolean assuredFlag = false;
  // DS assured mode (relevant if assured replication enabled)
  private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
  // DS safe data level (relevant if assured mode is safe data)
  private byte safeDataLevel = (byte) -1;
  /*
   * Properties filled only if remote server is a RS
   */
  private String serverAddressURL;
  /**
   * When this Handler is related to a remote replication server
   * this collection will contain as many elements as there are
   * LDAP servers connected to the remote replication server.
   */
  private final Map<Short, LightweightServerHandler> connectedServers =
  private final Map<Short, LightweightServerHandler> directoryServers =
    new ConcurrentHashMap<Short, LightweightServerHandler>();
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  /**
   * The thread that will send heartbeats.
   */
  HeartbeatThread heartbeatThread = null;
  /**
   * Set when ServerWriter is stopping.
   */
  private boolean shutdownWriter = false;
  /**
   * Set when ServerHandler is stopping.
   */
  private boolean shutdown = false;
  private AtomicBoolean shuttingDown = new AtomicBoolean(false);
  private static final Map<ChangeNumber, ReplServerAckMessageList>
   changelogsWaitingAcks =
       new HashMap<ChangeNumber, ReplServerAckMessageList>();
    changelogsWaitingAcks =
    new HashMap<ChangeNumber, ReplServerAckMessageList>();
  /**
   * Creates a new server handler instance with the provided socket.
@@ -167,13 +200,58 @@
    this.session = session;
    this.maxQueueSize = queueSize;
    this.maxQueueBytesSize = queueSize * 100;
    this.protocolVersion = ProtocolVersion.currentVersion();
    this.protocolVersion = ProtocolVersion.getCurrentVersion();
  }
  /**
   * Do the exchange of start messages to know if the remote
   * server is an LDAP or replication server and to exchange serverID.
   * Then create the reader and writer thread.
   * Creates a DSInfo structure representing this remote DS.
   * @return The DSInfo structure representing this remote DS
   */
  public DSInfo toDSInfo()
  {
    DSInfo dsInfo = new DSInfo(serverId, replicationServerId, generationId,
      status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls);
    return dsInfo;
  }
  /**
   * Creates a RSInfo structure representing this remote RS.
   * @return The RSInfo structure representing this remote RS
   */
  public RSInfo toRSInfo()
  {
    RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
    return rsInfo;
  }
  /**
   * Do the handshake with either the DS or RS and then create the reader and
   * writer thread.
   *
   * There are 2 possible handshake sequences: DS<->RS and RS<->RS. Each one are
   * divided into 2 logical consecutive phases (phase 1 and phase 2):
   *
   * DS<->RS (DS (always initiating connection) always sends first message):
   * -------
   *
   * phase 1:
   * DS --- ServerStartMsg ---> RS
   * DS <--- ReplServerStartMsg --- RS
   * phase 2:
   * DS --- StartSessionMsg ---> RS
   * DS <--- TopologyMsg --- RS
   *
   * RS<->RS (RS initiating connection always sends first message):
   * -------
   *
   * phase 1:
   * RS1 --- ReplServerStartMsg ---> RS2
   * RS1 <--- ReplServerStartMsg --- RS2
   * phase 2:
   * RS1 --- TopologyMsg ---> RS2
   * RS1 <--- TopologyMsg --- RS2
   *
   * @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
   *               null if this is an incoming connection (listen).
@@ -189,94 +267,163 @@
   *                          handler.
   */
  public void start(DN baseDn, short replicationServerId,
                    String replicationServerURL,
                    int windowSize, boolean sslEncryption,
                    ReplicationServer replicationServer)
    String replicationServerURL,
    int windowSize, boolean sslEncryption,
    ReplicationServer replicationServer)
  {
    // The handshake phase must be done by blocking any access to structures
    // keeping info on connected servers, so that one can safely check for
    // pre-existence of a server, send a coherent snapshot of known topology
    // to peers, update the local view of the topology...
    //
    // For instance a kind of problem could be that while we connect with a
    // peer RS, a DS is connecting at the same time and we could publish the
    // connected DSs to the peer RS forgetting this last DS in the TopologyMsg.
    //
    // This method and every others that need to read/make changes to the
    // structures holding topology for the domain should:
    // - call ReplicationServerDomain.lock()
    // - read/modify structures
    // - call ReplicationServerDomain.release()
    //
    // More information is provided in comment of ReplicationServerDomain.lock()
    // If domain already exists, lock it until handshake is finished otherwise
    // it will be created and locked later in the method
    if (baseDn != null)
    {
      ReplicationServerDomain rsd =
        replicationServer.getReplicationServerDomain(baseDn, false);
      if (rsd != null)
      {
        try
        {
          rsd.lock();
        } catch (InterruptedException ex)
        {
          // Thread interrupted, return.
          return;
        }
      }
    }
    long oldGenerationId = -100;
    if (debugEnabled())
      TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() +
                " starts a new LS or RS " +
                ((baseDn == null)?"incoming connection":"outgoing connection"));
        " starts a new LS or RS " +
        ((baseDn == null) ? "incoming connection" : "outgoing connection"));
    this.replicationServerId = replicationServerId;
    rcvWindowSizeHalf = windowSize/2;
    rcvWindowSizeHalf = windowSize / 2;
    maxRcvWindow = windowSize;
    rcvWindow = windowSize;
    long localGenerationId = -1;
    boolean handshakeOnly = false;
    ReplServerStartMsg outReplServerStartMsg = null;
    /**
     * This boolean prevents from logging a polluting error when connection\
     * aborted from a DS that wanted only to perform handshake phase 1 in order
     * to determine the best suitable RS:
     * 1) -> ServerStartMsg
     * 2) <- ReplServerStartMsg
     * 3) connection closure
     */
    boolean log_error_message = true;
    try
    {
      if (baseDn != null)
      /*
       * PROCEDE WITH FIRST PHASE OF HANDSHAKE:
       * ServerStartMsg then ReplServerStartMsg (with a DS)
       * OR
       * ReplServerStartMsg then ReplServerStartMsg (with a RS)
       */
      if (baseDn != null) // Outgoing connection
      {
        // This is an outgoing connection. Publish our start message.
        this.baseDn = baseDn;
        // Get or create the ReplicationServerDomain
        replicationServerDomain =
                replicationServer.getReplicationServerDomain(baseDn, true);
          replicationServer.getReplicationServerDomain(baseDn, true);
        if (!replicationServerDomain.hasLock())
        {
          try
          {
            replicationServerDomain.lock();
          } catch (InterruptedException ex)
          {
            // Thread interrupted, return.
            return;
          }
        }
        localGenerationId = replicationServerDomain.getGenerationId();
        ServerState localServerState =
                replicationServerDomain.getDbServerState();
        ReplServerStartMessage msg =
          new ReplServerStartMessage(replicationServerId, replicationServerURL,
                                    baseDn, windowSize, localServerState,
                                    protocolVersion, localGenerationId,
                                    sslEncryption);
          replicationServerDomain.getDbServerState();
        outReplServerStartMsg = new ReplServerStartMsg(replicationServerId,
          replicationServerURL,
          baseDn, windowSize, localServerState,
          protocolVersion, localGenerationId,
          sslEncryption,
          replicationServer.getGroupId(),
          replicationServerDomain.
          getReplicationServer().getDegradedStatusThreshold());
        session.publish(msg);
        session.publish(outReplServerStartMsg);
      }
      // Wait and process ServerStart or ReplServerStart
      ReplicationMessage msg = session.receive();
      if (msg instanceof ServerStartMessage)
      // Wait and process ServerStartMsg or ReplServerStartMsg
      ReplicationMsg msg = session.receive();
      if (msg instanceof ServerStartMsg)
      {
        // The remote server is an LDAP Server.
        ServerStartMessage receivedMsg = (ServerStartMessage) msg;
        ServerStartMsg serverStartMsg = (ServerStartMsg) msg;
        generationId = receivedMsg.getGenerationId();
        generationId = serverStartMsg.getGenerationId();
        protocolVersion = ProtocolVersion.minWithCurrent(
            receivedMsg.getVersion());
        serverId = receivedMsg.getServerId();
        serverURL = receivedMsg.getServerURL();
        this.baseDn = receivedMsg.getBaseDn();
        this.serverState = receivedMsg.getServerState();
          serverStartMsg.getVersion());
        serverId = serverStartMsg.getServerId();
        serverURL = serverStartMsg.getServerURL();
        this.baseDn = serverStartMsg.getBaseDn();
        this.serverState = serverStartMsg.getServerState();
        this.groupId = serverStartMsg.getGroupId();
        maxReceiveDelay = receivedMsg.getMaxReceiveDelay();
        maxReceiveQueue = receivedMsg.getMaxReceiveQueue();
        maxSendDelay = receivedMsg.getMaxSendDelay();
        maxSendQueue = receivedMsg.getMaxSendQueue();
        heartbeatInterval = receivedMsg.getHeartbeatInterval();
        handshakeOnly = receivedMsg.isHandshakeOnly();
        maxReceiveDelay = serverStartMsg.getMaxReceiveDelay();
        maxReceiveQueue = serverStartMsg.getMaxReceiveQueue();
        maxSendDelay = serverStartMsg.getMaxSendDelay();
        maxSendQueue = serverStartMsg.getMaxSendQueue();
        heartbeatInterval = serverStartMsg.getHeartbeatInterval();
        // The session initiator decides whether to use SSL.
        sslEncryption = receivedMsg.getSSLEncryption();
        sslEncryption = serverStartMsg.getSSLEncryption();
        if (maxReceiveQueue > 0)
          restartReceiveQueue = (maxReceiveQueue > 1000 ?
                                  maxReceiveQueue - 200 :
                                  maxReceiveQueue*8/10);
          restartReceiveQueue = (maxReceiveQueue > 1000 ? maxReceiveQueue -
            200 : maxReceiveQueue * 8 / 10);
        else
          restartReceiveQueue = 0;
        if (maxSendQueue > 0)
          restartSendQueue = (maxSendQueue  > 1000 ? maxSendQueue - 200 :
            maxSendQueue*8/10);
          restartSendQueue =
            (maxSendQueue > 1000 ? maxSendQueue - 200 : maxSendQueue * 8 /
            10);
        else
          restartSendQueue = 0;
        if (maxReceiveDelay > 0)
          restartReceiveDelay = (maxReceiveDelay > 10 ? maxReceiveDelay -1 :
            maxReceiveDelay);
          restartReceiveDelay = (maxReceiveDelay > 10 ? maxReceiveDelay - 1
            : maxReceiveDelay);
        else
          restartReceiveDelay = 0;
        if (maxSendDelay > 0)
          restartSendDelay = (maxSendDelay > 10 ?
                              maxSendDelay -1 :
                              maxSendDelay);
          restartSendDelay =
            (maxSendDelay > 10 ? maxSendDelay - 1 : maxSendDelay);
        else
          restartSendDelay = 0;
@@ -289,25 +436,53 @@
        // Get or Create the ReplicationServerDomain
        replicationServerDomain =
                replicationServer.getReplicationServerDomain(this.baseDn, true);
          replicationServer.getReplicationServerDomain(this.baseDn, true);
        replicationServerDomain.waitDisconnection(receivedMsg.getServerId());
        replicationServerDomain.mayResetGenerationId();
        // Hack to be sure that if a server disconnects and reconnect, we
        // let the reader thread see the closure and cleanup any reference
        // to old connection
        replicationServerDomain.waitDisconnection(serverStartMsg.getServerId());
        if (!replicationServerDomain.hasLock())
        {
          try
          {
            replicationServerDomain.lock();
          } catch (InterruptedException ex)
          {
            // Thread interrupted, return.
            return;
          }
        }
        // Duplicate server ?
        if (!replicationServerDomain.checkForDuplicateDS(this))
        {
          closeSession(null);
          if ((replicationServerDomain != null) &&
            replicationServerDomain.hasLock())
            replicationServerDomain.release();
          return;
        }
        localGenerationId = replicationServerDomain.getGenerationId();
        ServerState localServerState =
                replicationServerDomain.getDbServerState();
          replicationServerDomain.getDbServerState();
        // This an incoming connection. Publish our start message
        ReplServerStartMessage myStartMsg =
          new ReplServerStartMessage(replicationServerId, replicationServerURL,
                                    this.baseDn, windowSize, localServerState,
                                    protocolVersion, localGenerationId,
                                    sslEncryption);
        session.publish(myStartMsg);
        sendWindowSize = receivedMsg.getWindowSize();
        ReplServerStartMsg replServerStartMsg =
          new ReplServerStartMsg(replicationServerId, replicationServerURL,
          this.baseDn, windowSize, localServerState,
          protocolVersion, localGenerationId,
          sslEncryption,
          replicationServer.getGroupId(),
          replicationServerDomain.
          getReplicationServer().getDegradedStatusThreshold());
        session.publish(replServerStartMsg);
        sendWindowSize = serverStartMsg.getWindowSize();
        /* Until here session is encrypted then it depends on the negotiation */
        /* Until here session is encrypted then it depends on the
        negotiation */
        if (!sslEncryption)
        {
          session.stopEncryption();
@@ -315,309 +490,550 @@
        if (debugEnabled())
        {
          Set<String> ss = this.serverState.toStringSet();
          Set<String> lss =
                  replicationServerDomain.getDbServerState().toStringSet();
          TRACER.debugInfo("In " + replicationServerDomain.
                   getReplicationServer().getMonitorInstanceName() +
                   ", SH received START from LS serverId=" + serverId +
                   " baseDN=" + this.baseDn +
                   " generationId=" + generationId +
                   " localGenerationId=" + localGenerationId +
                   " state=" + ss +
                   " and sent ReplServerStart with state=" + lss);
          TRACER.debugInfo("In " +
            replicationServerDomain.getReplicationServer().
            getMonitorInstanceName() + ":" +
            "\nSH HANDSHAKE RECEIVED:\n" + serverStartMsg.toString() +
            "\nAND REPLIED:\n" + replServerStartMsg.toString());
        }
        /*
         * If we have already a generationID set for the domain
         * then
         *   if the connecting replica has not the same
         *   then it is degraded locally and notified by an error message
         * else
         *   we set the generationID from the one received
         *   (unsaved yet on disk . will be set with the 1rst change received)
         */
        if (localGenerationId>0)
        {
          if (generationId != localGenerationId)
          {
            Message message = NOTE_BAD_GENERATION_ID.get(
                receivedMsg.getBaseDn().toNormalizedString(),
                Short.toString(receivedMsg.getServerId()),
                Long.toString(generationId),
                Long.toString(localGenerationId));
            ErrorMessage errorMsg =
              new ErrorMessage(replicationServerId, serverId, message);
            session.publish(errorMsg);
          }
        }
        else
        {
          // We are an empty Replication Server
          if ((generationId>0)&&(!serverState.isEmpty()))
          {
            // If the LDAP server has already sent changes
            // it is not expected to connect to an empty RS
            Message message = NOTE_BAD_GENERATION_ID.get(
                receivedMsg.getBaseDn().toNormalizedString(),
                Short.toString(receivedMsg.getServerId()),
                Long.toString(generationId),
                Long.toString(localGenerationId));
            ErrorMessage errorMsg =
              new ErrorMessage(replicationServerId, serverId, message);
            session.publish(errorMsg);
          }
          else
          {
            replicationServerDomain.setGenerationId(generationId, false);
          }
        }
      }
      else if (msg instanceof ReplServerStartMessage)
      } else if (msg instanceof ReplServerStartMsg)
      {
        // The remote server is a replication server
        ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg;
        ReplServerStartMsg inReplServerStartMsg = (ReplServerStartMsg) msg;
        protocolVersion = ProtocolVersion.minWithCurrent(
            receivedMsg.getVersion());
        generationId = receivedMsg.getGenerationId();
        serverId = receivedMsg.getServerId();
        serverURL = receivedMsg.getServerURL();
          inReplServerStartMsg.getVersion());
        generationId = inReplServerStartMsg.getGenerationId();
        serverId = inReplServerStartMsg.getServerId();
        serverURL = inReplServerStartMsg.getServerURL();
        int separator = serverURL.lastIndexOf(':');
        serverAddressURL =
          session.getRemoteAddress() + ":" + serverURL.substring(separator + 1);
          session.getRemoteAddress() + ":" + serverURL.substring(separator +
          1);
        serverIsLDAPserver = false;
        this.baseDn = receivedMsg.getBaseDn();
        if (baseDn == null)
        if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
        {
          // We support connection from a V1 RS
          // Only V2 protocol has the group id in repl server start message
          this.groupId = inReplServerStartMsg.getGroupId();
        }
        this.baseDn = inReplServerStartMsg.getBaseDn();
        if (baseDn == null) // Reply to incoming RS
        {
          // Get or create the ReplicationServerDomain
          replicationServerDomain = replicationServer.
                  getReplicationServerDomain(this.baseDn, true);
          replicationServerDomain =
            replicationServer.getReplicationServerDomain(this.baseDn, true);
          if (!replicationServerDomain.hasLock())
          {
            try
            {
              /**
               * Take the lock on the domain.
               * WARNING: Here we try to acquire the lock with a timeout. This
               * is for preventing a deadlock that may happen if there are cross
               * connection attempts (for same domain) from this replication
               * server and from a peer one:
               * Here is the scenario:
               * - RS1 connect thread takes the domain lock and starts
               * connection to RS2
               * - at the same time RS2 connect thread takes his domain lock and
               * start connection to RS2
               * - RS2 listen thread starts processing received
               * ReplServerStartMsg from RS1 and wants to acquire the lock on
               * the domain (here) but cannot as RS2 connect thread already has
               * it
               * - RS1 listen thread starts processing received
               * ReplServerStartMsg from RS2 and wants to acquire the lock on
               * the domain (here) but cannot as RS1 connect thread already has
               * it
               * => Deadlock: 4 threads are locked.
               * So to prevent that in such situation, the listen threads here
               * will both timeout trying to acquire the lock. The random time
               * for the timeout should allow on connection attempt to be
               * aborted whereas the other one should have time to finish in the
               * same time.
               * Warning: the minimum time (3s) should be big enough to allow
               * normal situation connections to terminate. The added random
               * time should represent a big enough range so that the chance to
               * have one listen thread timing out a lot before the peer one is
               * great. When the first listen thread times out, the remote
               * connect thread should release the lock and allow the peer
               * listen thread to take the lock it was waiting for and process
               * the connection attempt.
               */
              Random random = new Random();
              int randomTime = random.nextInt(6); // Random from 0 to 5
              // Wait at least 3 seconds + (0 to 5 seconds)
              long timeout = (long) (3000 + ( randomTime * 1000 ) );
              boolean noTimeout = replicationServerDomain.tryLock(timeout);
              if (!noTimeout)
              {
                // Timeout
                Message message = NOTE_TIMEOUT_WHEN_CROSS_CONNECTION.get(
                  this.baseDn.toNormalizedString(),
                  Short.toString(serverId),
                  Short.toString(replicationServer.getServerId()));
                closeSession(message);
                return;
              }
            } catch (InterruptedException ex)
            {
              // Thread interrupted, return.
              return;
            }
          }
          localGenerationId = replicationServerDomain.getGenerationId();
          ServerState serverState = replicationServerDomain.getDbServerState();
          ServerState domServerState =
            replicationServerDomain.getDbServerState();
          // The session initiator decides whether to use SSL.
          sslEncryption = receivedMsg.getSSLEncryption();
          sslEncryption = inReplServerStartMsg.getSSLEncryption();
          // Publish our start message
          ReplServerStartMessage outMsg =
            new ReplServerStartMessage(replicationServerId,
                                       replicationServerURL,
                                       this.baseDn, windowSize, serverState,
                                       protocolVersion,
                                       localGenerationId,
                                       sslEncryption);
          session.publish(outMsg);
        }
        else
        {
          this.baseDn = baseDn;
        }
        this.serverState = receivedMsg.getServerState();
        sendWindowSize = receivedMsg.getWindowSize();
          outReplServerStartMsg = new ReplServerStartMsg(replicationServerId,
            replicationServerURL,
            this.baseDn, windowSize, domServerState,
            protocolVersion,
            localGenerationId,
            sslEncryption,
            replicationServer.getGroupId(),
            replicationServerDomain.
            getReplicationServer().getDegradedStatusThreshold());
        /* Until here session is encrypted then it depends on the negociation */
        if (!sslEncryption)
        {
          session.stopEncryption();
        }
          if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
          {
            session.publish(outReplServerStartMsg);
          } else {
            // We support connection from a V1 RS, send PDU with V1 form
            session.publish(outReplServerStartMsg,
              ProtocolVersion.REPLICATION_PROTOCOL_V1);
          }
        if (debugEnabled())
        {
          Set<String> ss = this.serverState.toStringSet();
          Set<String> lss =
                  replicationServerDomain.getDbServerState().toStringSet();
          TRACER.debugInfo("In " + replicationServerDomain.
                   getReplicationServer().getMonitorInstanceName() +
                   ", SH received START from RS serverId=" + serverId +
                   " baseDN=" + this.baseDn +
                   " generationId=" + generationId +
                   " localGenerationId=" + localGenerationId +
                   " state=" + ss +
                   " and sent ReplServerStart with state=" + lss);
        }
        // if the remote RS and the local RS have the same genID
        // then it's ok and nothing else to do
        if (generationId == localGenerationId)
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("In " +
                    replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() + " RS with serverID=" + serverId +
              " is connected with the right generation ID");
              replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() + ":" +
              "\nSH HANDSHAKE RECEIVED:\n" + inReplServerStartMsg.toString() +
              "\nAND REPLIED:\n" + outReplServerStartMsg.toString());
          }
        }
        else
        } else
        {
          if (localGenerationId>0)
          // Did the remote RS answer with the DN we provided him ?
          if (!(this.baseDn.equals(baseDn)))
          {
            // if the local RS is initialized
            if (generationId>0)
            {
              // if the remote RS is initialized
              if (generationId != localGenerationId)
              {
                // if the 2 RS have different generationID
                if (replicationServerDomain.getGenerationIdSavedStatus())
                {
                  // it the present RS has received changes regarding its
                  //     gen ID and so won't change without a reset
                  // then  we are just degrading the peer.
                  Message message = NOTE_BAD_GENERATION_ID.get(
                      this.baseDn.toNormalizedString(),
                      Short.toString(receivedMsg.getServerId()),
                      Long.toString(generationId),
                      Long.toString(localGenerationId));
                  ErrorMessage errorMsg =
                    new ErrorMessage(replicationServerId, serverId, message);
                  session.publish(errorMsg);
                }
                else
                {
                  // The present RS has never received changes regarding its
                  // gen ID.
                  //
                  // Example case:
                  // - we are in RS1
                  // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
                  // - RS1 has genId1 from LS1 /genId1 comes from data in suffix
                  // - we are in RS1 and we receive a START msg from RS2
                  // - Each RS keeps its genID / is degraded and when LS2 will
                  //   be populated from LS1 everything will becomes ok.
                  //
                  // Issue:
                  // FIXME : Would it be a good idea in some cases to just
                  //         set the gen ID received from the peer RS
                  //         specially if the peer has a non nul state and
                  //         we have a nul state ?
                  // replicationServerDomain.
                  // setGenerationId(generationId, false);
                  Message message = NOTE_BAD_GENERATION_ID.get(
                      this.baseDn.toNormalizedString(),
                      Short.toString(receivedMsg.getServerId()),
                      Long.toString(generationId),
                      Long.toString(localGenerationId));
                  ErrorMessage errorMsg =
                    new ErrorMessage(replicationServerId, serverId, message);
                  session.publish(errorMsg);
                }
              }
            }
            else
            {
              // The remote RS had no genId while the local one has one genID.
              // In our start msg, we have just sent our local genID to
              // the remote RS that will immediatly adopt it
              // So let's store our local genID as the genID of the remote RS.
              // It is necessary to do so, in order to not have a 'bad genID'
              // error when we will try to send updates to the remote RS
              // (before receiving the infoMsg from the remote RS !!!)
              generationId = localGenerationId;
            }
            Message message = ERR_RS_DN_DOES_NOT_MATCH.get(
              this.baseDn.toString(),
              baseDn.toString());
            closeSession(message);
            if ((replicationServerDomain != null) &&
              replicationServerDomain.hasLock())
              replicationServerDomain.release();
            return;
          }
          else
          if (debugEnabled())
          {
            // The local RS is not initialized - take the one received
            replicationServerDomain.setGenerationId(generationId, false);
            TRACER.debugInfo("In " +
              replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() + ":" +
              "\nSH HANDSHAKE SENT:\n" + outReplServerStartMsg.toString() +
              "\nAND RECEIVED:\n" + inReplServerStartMsg.toString());
          }
        }
      }
      else
        this.serverState = inReplServerStartMsg.getServerState();
        sendWindowSize = inReplServerStartMsg.getWindowSize();
        // Duplicate server ?
        if (!replicationServerDomain.checkForDuplicateRS(this))
        {
          closeSession(null);
          if ((replicationServerDomain != null) &&
            replicationServerDomain.hasLock())
            replicationServerDomain.release();
          return;
        }
        /* Until here session is encrypted then it depends on the
        negociation */
        if (!sslEncryption)
        {
          session.stopEncryption();
        }
      } else
      {
        // TODO : log error
        return;   // we did not recognize the message, ignore it
        // We did not recognize the message, close session as what
        // can happen after is undetermined and we do not want the server to
        // be disturbed
        closeSession(null);
        if ((replicationServerDomain != null) &&
          replicationServerDomain.hasLock())
          replicationServerDomain.release();
        return;
      }
      // Get or create the ReplicationServerDomain
      replicationServerDomain = replicationServer.
              getReplicationServerDomain(this.baseDn,true);
      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
      { // Only protocol version above V1 has a phase 2 handshake
      if (!handshakeOnly)
      {
        boolean started;
        if (serverIsLDAPserver)
        /*
         * NOW PROCEDE WITH SECOND PHASE OF HANDSHAKE:
         * TopologyMsg then TopologyMsg (with a RS)
         * OR
         * StartSessionMsg then TopologyMsg (with a DS)
         */
        TopologyMsg outTopoMsg = null;
        if (baseDn != null) // Outgoing connection to a RS
        {
          started = replicationServerDomain.startServer(this);
        }
        else
        {
          started = replicationServerDomain.startReplicationServer(this);
          // Send our own TopologyMsg to remote RS
          outTopoMsg = replicationServerDomain.createTopologyMsgForRS();
          session.publish(outTopoMsg);
        }
        if (started)
        // Wait and process TopologyMsg or StartSessionMsg
        log_error_message = false;
        ReplicationMsg msg2 = session.receive();
        log_error_message = true;
        if (msg2 instanceof TopologyMsg)
        {
          // sendWindow MUST be created before starting the writer
          sendWindow = new Semaphore(sendWindowSize);
          // Remote RS sent his topo msg
          TopologyMsg inTopoMsg = (TopologyMsg) msg2;
          writer = new ServerWriter(session, serverId,
              this, replicationServerDomain);
          reader = new ServerReader(session, serverId,
              this, replicationServerDomain);
          // CONNECTION WITH A RS
          reader.start();
          writer.start();
          // Create a thread to send heartbeat messages.
          if (heartbeatInterval > 0)
          {
            heartbeatThread = new HeartbeatThread(
                "replication Heartbeat to " + serverURL +
                " for " + this.baseDn,
                session, heartbeatInterval/3);
            heartbeatThread.start();
          }
          DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
          DirectoryServer.registerMonitorProvider(this);
        }
        else
        {
          // the connection is not valid, close it.
          try
          // if the remote RS and the local RS have the same genID
          // then it's ok and nothing else to do
          if (generationId == localGenerationId)
          {
            if (debugEnabled())
            {
              TRACER.debugInfo("In " +
                  replicationServerDomain.getReplicationServer().
                  getMonitorInstanceName() + " RS failed to start locally " +
                  " the connection from serverID="+serverId);
                replicationServerDomain.getReplicationServer().
                getMonitorInstanceName() + " RS with serverID=" + serverId +
                " is connected with the right generation ID");
            }
            session.close();
          } catch (IOException e1)
          } else
          {
            // ignore
            if (localGenerationId > 0)
            {
              // if the local RS is initialized
              if (generationId > 0)
              {
                // if the remote RS is initialized
                if (generationId != localGenerationId)
                {
                  // if the 2 RS have different generationID
                  if (replicationServerDomain.getGenerationIdSavedStatus())
                  {
                    // it the present RS has received changes regarding its
                    //     gen ID and so won't change without a reset
                    // then  we are just degrading the peer.
                    Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
                      this.baseDn.toNormalizedString(),
                      Short.toString(serverId),
                      Long.toString(generationId),
                      Long.toString(localGenerationId));
                    logError(message);
                  } else
                  {
                    // The present RS has never received changes regarding its
                    // gen ID.
                    //
                    // Example case:
                    // - we are in RS1
                    // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
                    // - RS1 has genId1 from LS1 /genId1 comes from data in
                    //   suffix
                    // - we are in RS1 and we receive a START msg from RS2
                    // - Each RS keeps its genID / is degraded and when LS2
                    //   will be populated from LS1 everything will become ok.
                    //
                    // Issue:
                    // FIXME : Would it be a good idea in some cases to just
                    //         set the gen ID received from the peer RS
                    //         specially if the peer has a non null state and
                    //         we have a nul state ?
                    // replicationServerDomain.
                    // setGenerationId(generationId, false);
                    Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
                      this.baseDn.toNormalizedString(),
                      Short.toString(serverId),
                      Long.toString(generationId),
                      Long.toString(localGenerationId));
                    logError(message);
                  }
                }
              } else
              {
                // The remote RS has no genId. We don't change anything for the
                // current RS.
              }
            } else
            {
              // The local RS is not initialized - take the one received
              // WARNING: Must be done before computing topo message to send
              // to peer server as topo message must embed valid generation id
              // for our server
              oldGenerationId =
                replicationServerDomain.setGenerationId(generationId, false);
            }
          }
          if (baseDn == null) // Reply to the RS (incoming connection)
          {
            // Send our own TopologyMsg to remote RS
            outTopoMsg = replicationServerDomain.createTopologyMsgForRS();
            session.publish(outTopoMsg);
            if (debugEnabled())
            {
              TRACER.debugInfo("In " +
                replicationServerDomain.getReplicationServer().
                getMonitorInstanceName() + ":" +
                "\nSH HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() +
                "\nAND REPLIED:\n" + outTopoMsg.toString());
            }
          } else
          {
            if (debugEnabled())
            {
              TRACER.debugInfo("In " +
                replicationServerDomain.getReplicationServer().
                getMonitorInstanceName() + ":" +
                "\nSH HANDSHAKE SENT:\n" + outTopoMsg.toString() +
                "\nAND RECEIVED:\n" + inTopoMsg.toString());
            }
          }
          // Alright, connected with new RS (either outgoing or incoming
          // connection): store handler.
          Map<Short, ServerHandler> connectedRSs =
            replicationServerDomain.getConnectedRSs();
          connectedRSs.put(serverId, this);
          // Process TopologyMsg sent by remote RS: store matching new info
          // (this will also warn our connected DSs of the new received info)
          replicationServerDomain.receiveTopoInfoFromRS(inTopoMsg, this, false);
        } else if (msg2 instanceof StartSessionMsg)
        {
          // CONNECTION WITH A DS
          // Process StartSessionMsg sent by remote DS
          StartSessionMsg startSessionMsg = (StartSessionMsg) msg2;
          this.status = startSessionMsg.getStatus();
          // Sanity check: is it a valid initial status?
          if (!isValidInitialStatus(this.status))
          {
            Message mesg = ERR_RS_INVALID_INIT_STATUS.get(
              this.status.toString(), this.baseDn.toString(),
              Short.toString(serverId));
            closeSession(mesg);
            if ((replicationServerDomain != null) &&
              replicationServerDomain.hasLock())
              replicationServerDomain.release();
            return;
          }
          this.refUrls = startSessionMsg.getReferralsURLs();
          this.assuredFlag = startSessionMsg.isAssured();
          this.assuredMode = startSessionMsg.getAssuredMode();
          this.safeDataLevel = startSessionMsg.getSafeDataLevel();
          /*
           * If we have already a generationID set for the domain
           * then
           *   if the connecting replica has not the same
           *   then it is degraded locally and notified by an error message
           * else
           *   we set the generationID from the one received
           *   (unsaved yet on disk . will be set with the 1rst change
           * received)
           */
          if (localGenerationId > 0)
          {
            if (generationId != localGenerationId)
            {
              Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get(
                this.baseDn.toNormalizedString(),
                Short.toString(serverId),
                Long.toString(generationId),
                Long.toString(localGenerationId));
              logError(message);
            }
          } else
          {
            // We are an empty Replicationserver
            if ((generationId > 0) && (!serverState.isEmpty()))
            {
              // If the LDAP server has already sent changes
              // it is not expected to connect to an empty RS
              Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get(
                this.baseDn.toNormalizedString(),
                Short.toString(serverId),
                Long.toString(generationId),
                Long.toString(localGenerationId));
              logError(message);
            } else
            {
              // The local RS is not initialized - take the one received
              // WARNING: Must be done before computing topo message to send
              // to peer server as topo message must embed valid generation id
              // for our server
              oldGenerationId =
                replicationServerDomain.setGenerationId(generationId, false);
            }
          }
          // Send our own TopologyMsg to DS
          outTopoMsg = replicationServerDomain.createTopologyMsgForDS(
            this.serverId);
          session.publish(outTopoMsg);
          if (debugEnabled())
          {
            TRACER.debugInfo("In " +
              replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() + ":" +
              "\nSH HANDSHAKE RECEIVED:\n" + startSessionMsg.toString() +
              "\nAND REPLIED:\n" + outTopoMsg.toString());
          }
          // Alright, connected with new DS: store handler.
          Map<Short, ServerHandler> connectedDSs =
            replicationServerDomain.getConnectedDSs();
          connectedDSs.put(serverId, this);
          // Tell peer DSs a new DS just connected to us
          // No need to resend topo msg to this just new DS so not null
          // argument
          replicationServerDomain.sendTopoInfoToDSs(this);
          // Tell peer RSs a new DS just connected to us
          replicationServerDomain.sendTopoInfoToRSs();
        } else
        {
          // We did not recognize the message, close session as what
          // can happen after is undetermined and we do not want the server to
          // be disturbed
          closeSession(null);
          if ((replicationServerDomain != null) &&
            replicationServerDomain.hasLock())
            replicationServerDomain.release();
          return;
        }
      }
      else
      /*
       * FINALIZE INITIALIZATION:
       * CREATE READER AND WRITER, HEARTBEAT SYSTEM AND UPDATE MONITORING
       * SYSTEM
       */
      // Disable timeout for next communications
      session.setSoTimeout(0);
      // sendWindow MUST be created before starting the writer
      sendWindow = new Semaphore(sendWindowSize);
      writer = new ServerWriter(session, serverId,
        this, replicationServerDomain);
      reader = new ServerReader(session, serverId,
        this, replicationServerDomain);
      reader.start();
      writer.start();
      // Create a thread to send heartbeat messages.
      if (heartbeatInterval > 0)
      {
        // For a hanshakeOnly connection, let's only create a reader
        // in order to detect the connection closure.
        reader = new ServerReader(session, serverId,
            this, replicationServerDomain);
        reader.start();
        heartbeatThread = new HeartbeatThread(
          "Replication Heartbeat to DS " + serverURL + " " + serverId +
          " for " + this.baseDn + " in RS " + replicationServerId,
          session, heartbeatInterval / 3);
        heartbeatThread.start();
      }
      // Create the status analyzer for the domain if not already started
      if (serverIsLDAPserver)
      {
        if (!replicationServerDomain.isRunningStatusAnalyzer())
        {
          if (debugEnabled())
            TRACER.debugInfo("In " + replicationServerDomain.
              getReplicationServer().
              getMonitorInstanceName() +
              " SH for remote server " + this.getMonitorInstanceName() +
              " is starting status analyzer");
          replicationServerDomain.startStatusAnalyzer();
        }
      }
      DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
      DirectoryServer.registerMonitorProvider(this);
    } catch (NotSupportedOldVersionPDUException e)
    {
      // We do not need to support DS V1 connection, we just accept RS V1
      // connection:
      // We just trash the message, log the event for debug purpose and close
      // the connection
      if (debugEnabled())
      TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + ":"
        + e.getMessage());
      closeSession(null);
    } catch (Exception e)
    {
      // We do not want polluting error log if error is due to normal session
      // aborted after handshake phase one from a DS that is searching for best
      // suitable RS.
      if ( log_error_message || (baseDn != null) )
      {
        // some problem happened, reject the connection
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_REPLICATION_SERVER_CONNECTION_ERROR.get(
          this.getMonitorInstanceName()));
        mb.append(": " + stackTraceToSingleLineString(e));
        closeSession(mb.toMessage());
      } else
      {
        closeSession(null);
      }
      // If generation id of domain was changed, set it back to old value
      // We may have changed it as it was -1 and we received a value >0 from
      // peer server and the last topo message sent may have failed being
      // sent: in that case retrieve old value of generation id for
      // replication server domain
      if (oldGenerationId != -100)
      {
        replicationServerDomain.setGenerationId(oldGenerationId, false);
      }
    }
    catch (Exception e)
    // Release domain
    if ((replicationServerDomain != null) &&
      replicationServerDomain.hasLock())
      replicationServerDomain.release();
  }
  /*
   * Close the session logging the passed error message
   * Log nothing if message is null.
   */
  private void closeSession(Message msg)
  {
    if (msg != null)
    {
      // some problem happened, reject the connection
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get(
          this.getMonitorInstanceName()));
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      try
      {
        session.close();
      } catch (IOException e1)
      {
        // ignore
      }
      logError(msg);
    }
    try
    {
      session.close();
    } catch (IOException ee)
    {
      // ignore
    }
  }
@@ -720,7 +1136,7 @@
   * @return true is saturated false if not saturated.
   */
  public boolean isSaturated(ChangeNumber changeNumber,
                             ServerHandler sourceHandler)
    ServerHandler sourceHandler)
  {
    synchronized (msgQueue)
    {
@@ -730,23 +1146,23 @@
        return true;
      if ((sourceHandler.maxSendQueue > 0) &&
          (size >= sourceHandler.maxSendQueue))
        (size >= sourceHandler.maxSendQueue))
        return true;
      if (!msgQueue.isEmpty())
      {
        UpdateMessage firstUpdate = msgQueue.first();
        UpdateMsg firstUpdate = msgQueue.first();
        if (firstUpdate != null)
        {
          long timeDiff = changeNumber.getTimeSec() -
          firstUpdate.getChangeNumber().getTimeSec();
            firstUpdate.getChangeNumber().getTimeSec();
          if ((maxReceiveDelay > 0) && (timeDiff >= maxReceiveDelay))
            return true;
          if ((sourceHandler.maxSendDelay > 0) &&
              (timeDiff >= sourceHandler.maxSendDelay))
            (timeDiff >= sourceHandler.maxSendDelay))
            return true;
        }
      }
@@ -770,22 +1186,22 @@
      if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue))
        return false;
      if ((source != null) && (source.maxSendQueue > 0) &&
           (queueSize >= source.restartSendQueue))
        (queueSize >= source.restartSendQueue))
        return false;
      if (!msgQueue.isEmpty())
      {
        UpdateMessage firstUpdate = msgQueue.first();
        UpdateMessage lastUpdate = msgQueue.last();
        UpdateMsg firstUpdate = msgQueue.first();
        UpdateMsg lastUpdate = msgQueue.last();
        if ((firstUpdate != null) && (lastUpdate != null))
        {
          long timeDiff = lastUpdate.getChangeNumber().getTimeSec() -
               firstUpdate.getChangeNumber().getTimeSec();
            firstUpdate.getChangeNumber().getTimeSec();
          if ((maxReceiveDelay > 0) && (timeDiff >= restartReceiveDelay))
            return false;
          if ((source != null) && (source.maxSendDelay > 0)
               && (timeDiff >= source.restartSendDelay))
          if ((source != null) && (source.maxSendDelay > 0) && (timeDiff >=
            source.restartSendDelay))
            return false;
        }
      }
@@ -810,34 +1226,28 @@
   */
  public int getRcvMsgQueueSize()
  {
   synchronized (msgQueue)
   {
    /*
     * When the server is up to date or close to be up to date,
     * the number of updates to be sent is the size of the receive queue.
     */
     if (isFollowing())
       return msgQueue.count();
     else
     {
       /*
        * When the server  is not able to follow, the msgQueue
        * may become too large and therefore won't contain all the
        * changes. Some changes may only be stored in the backing DB
        * of the servers.
        * The total size of teh receieve queue is calculated by doing
        * the sum of the number of missing changes for every dbHandler.
        */
       int totalCount = 0;
       ServerState dbState = replicationServerDomain.getDbServerState();
       for (short id : dbState)
       {
         totalCount = ChangeNumber.diffSeqNum(dbState.getMaxChangeNumber(id),
             serverState.getMaxChangeNumber(id));
       }
       return totalCount;
     }
   }
    synchronized (msgQueue)
    {
      /*
       * When the server is up to date or close to be up to date,
       * the number of updates to be sent is the size of the receive queue.
       */
      if (isFollowing())
        return msgQueue.count();
      else
      {
        /*
         * When the server  is not able to follow, the msgQueue
         * may become too large and therefore won't contain all the
         * changes. Some changes may only be stored in the backing DB
         * of the servers.
         * The total size of teh receieve queue is calculated by doing
         * the sum of the number of missing changes for every dbHandler.
         */
        ServerState dbState = replicationServerDomain.getDbServerState();
        return ServerState.diffChanges(dbState, serverState);
      }
    }
  }
  /**
@@ -859,7 +1269,7 @@
      return 0;
    long currentTime = TimeThread.getTime();
    return ((currentTime - olderUpdateTime)/1000);
    return ((currentTime - olderUpdateTime) / 1000);
  }
  /**
@@ -870,7 +1280,7 @@
   */
  public Long getApproxFirstMissingDate()
  {
    Long result = (long)0;
    Long result = (long) 0;
    // Get the older CN received
    ChangeNumber olderUpdateCN = getOlderUpdateCN();
@@ -878,7 +1288,7 @@
    {
      // If not present in the local RS db,
      // then approximate with the older update time
      result=olderUpdateCN.getTime();
      result = olderUpdateCN.getTime();
    }
    return result;
  }
@@ -892,7 +1302,7 @@
    ChangeNumber olderUpdateCN = getOlderUpdateCN();
    if (olderUpdateCN == null)
      return 0;
    return  olderUpdateCN.getTime();
    return olderUpdateCN.getTime();
  }
  /**
@@ -909,15 +1319,13 @@
      {
        if (msgQueue.isEmpty())
        {
          result=null;
        }
        else
          result = null;
        } else
        {
          UpdateMessage msg = msgQueue.first();
          UpdateMsg msg = msgQueue.first();
          result = msg.getChangeNumber();
        }
      }
      else
      } else
      {
        if (lateQueue.isEmpty())
        {
@@ -950,24 +1358,21 @@
                iteratorSortedSet.add(iterator);
              }
            }
            UpdateMessage msg = iteratorSortedSet.first().getChange();
            UpdateMsg msg = iteratorSortedSet.first().getChange();
            result = msg.getChangeNumber();
          }
          catch(Exception e)
          } catch (Exception e)
          {
            result=null;
          }
          finally
            result = null;
          } finally
          {
            for (ReplicationIterator iterator : iteratorSortedSet)
            {
              iterator.releaseCursor();
            }
          }
        }
        else
        } else
        {
          UpdateMessage msg = lateQueue.first();
          UpdateMsg msg = lateQueue.first();
          result = msg.getChangeNumber();
        }
      }
@@ -995,33 +1400,14 @@
  }
  /**
   * Add an update the list of updates that must be sent to the server
   * Add an update to the list of updates that must be sent to the server
   * managed by this ServerHandler.
   *
   * @param update The update that must be added to the list of updates.
   * @param sourceHandler The server that sent the update.
   */
  public void add(UpdateMessage update, ServerHandler sourceHandler)
  public void add(UpdateMsg update, ServerHandler sourceHandler)
  {
    /*
     * Ignore updates from a server that is degraded due to
     * its inconsistent generationId
     */
    long referenceGenerationId = replicationServerDomain.getGenerationId();
    if ((referenceGenerationId>0) &&
        (referenceGenerationId != generationId))
    {
      logError(ERR_IGNORING_UPDATE_TO.get(
               this.replicationServerDomain.getReplicationServer().
                 getMonitorInstanceName(),
               update.getDn(),
               this.getMonitorInstanceName(),
               Long.toString(generationId),
               Long.toString(referenceGenerationId)));
      return;
    }
    synchronized (msgQueue)
    {
      /*
@@ -1063,10 +1449,10 @@
   * @return the next update that must be sent to the server managed by this
   *         ServerHandler.
   */
  public UpdateMessage take()
  public UpdateMsg take()
  {
    boolean interrupted = true;
    UpdateMessage msg = getnextMessage();
    UpdateMsg msg = getnextMessage();
    /*
     * When we remove a message from the queue we need to check if another
@@ -1080,8 +1466,7 @@
      try
      {
        replicationServerDomain.checkAllSaturation();
      }
      catch (IOException e)
      } catch (IOException e)
      {
      }
    }
@@ -1090,13 +1475,13 @@
    {
      try
      {
        acquired = sendWindow.tryAcquire((long)500, TimeUnit.MILLISECONDS);
        acquired = sendWindow.tryAcquire((long) 500, TimeUnit.MILLISECONDS);
        interrupted = false;
      } catch (InterruptedException e)
      {
        // loop until not interrupted
      }
    } while (((interrupted) || (!acquired )) && (!shutdown));
    } while (((interrupted) || (!acquired)) && (!shutdownWriter));
    this.incrementOutCount();
    return msg;
  }
@@ -1107,10 +1492,10 @@
   *
   * @return The next update that must be sent to the server.
   */
  private UpdateMessage getnextMessage()
  private UpdateMsg getnextMessage()
  {
    UpdateMessage msg;
    while (active == true)
    UpdateMsg msg;
    while (activeWriter == true)
    {
      if (following == false)
      {
@@ -1157,8 +1542,7 @@
              if (iterator.getChange() != null)
              {
                iteratorSortedSet.add(iterator);
              }
              else
              } else
              {
                iterator.releaseCursor();
              }
@@ -1201,8 +1585,7 @@
                setFollowing(true);
              }
            }
          }
          else
          } else
          {
            msg = lateQueue.first();
            synchronized (msgQueue)
@@ -1212,7 +1595,7 @@
                /* we finally catch up with the regular queue */
                setFollowing(true);
                lateQueue.clear();
                UpdateMessage msg1;
                UpdateMsg msg1;
                do
                {
                  msg1 = msgQueue.removeFirst();
@@ -1222,8 +1605,7 @@
              }
            }
          }
        }
        else
        } else
        {
          /* get the next change from the lateQueue */
          msg = lateQueue.removeFirst();
@@ -1240,7 +1622,7 @@
            while (msgQueue.isEmpty())
            {
              msgQueue.wait(500);
              if (!active)
              if (!activeWriter)
                return null;
            }
          } catch (InterruptedException e)
@@ -1259,11 +1641,11 @@
          }
        }
      }
      /*
       * Need to loop because following flag may have gone to false between
       * the first check at the beginning of this method
       * and the second check just above.
       */
    /*
     * Need to loop because following flag may have gone to false between
     * the first check at the beginning of this method
     * and the second check just above.
     */
    }
    return null;
  }
@@ -1274,7 +1656,7 @@
   * @param msg the last update sent.
   * @return boolean indicating if the update was meaningful.
   */
  public boolean  updateServerState(UpdateMessage msg)
  public boolean updateServerState(UpdateMsg msg)
  {
    return serverState.update(msg.getChangeNumber());
  }
@@ -1290,45 +1672,6 @@
  }
  /**
   * Stop this server handler processing.
   */
  public void stopHandler()
  {
    active = false;
    // Stop the remote LSHandler
    for (LightweightServerHandler lsh : connectedServers.values())
    {
      lsh.stopHandler();
    }
    connectedServers.clear();
    try
    {
      session.close();
    } catch (IOException e)
    {
      // ignore.
    }
    synchronized (msgQueue)
    {
      /* wake up the writer thread on an empty queue so that it disappear */
      msgQueue.clear();
      msgQueue.notify();
      msgQueue.notifyAll();
    }
    // Stop the heartbeat thread.
    if (heartbeatThread != null)
    {
      heartbeatThread.shutdown();
    }
    DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
  }
  /**
   * Send the ack to the server that did the original modification.
   *
   * @param changeNumber The ChangeNumber of the update that is acked.
@@ -1336,7 +1679,7 @@
   */
  public void sendAck(ChangeNumber changeNumber) throws IOException
  {
    AckMessage ack = new AckMessage(changeNumber);
    AckMsg ack = new AckMsg(changeNumber);
    session.publish(ack);
    outAckCount++;
  }
@@ -1347,7 +1690,7 @@
   * @param message The ack message that was received.
   * @param ackingServerId The  id of the server that acked the change.
   */
  public void ack(AckMessage message, short ackingServerId)
  public void ack(AckMsg message, short ackingServerId)
  {
    ChangeNumber changeNumber = message.getChangeNumber();
    AckMessageList ackList;
@@ -1377,7 +1720,7 @@
   * @param message the ack message that was received.
   * @param ackingServerId The  id of the server that acked the change.
   */
  public static void ackChangelog(AckMessage message, short ackingServerId)
  public static void ackChangelog(AckMsg message, short ackingServerId)
  {
    ChangeNumber changeNumber = message.getChangeNumber();
    ReplServerAckMessageList ackList;
@@ -1397,9 +1740,9 @@
    if (completedFlag)
    {
      ReplicationServerDomain replicationServerDomain =
              ackList.getChangelogCache();
        ackList.getChangelogCache();
      replicationServerDomain.sendAck(changeNumber, false,
                             ackList.getReplicationServerId());
        ackList.getReplicationServerId());
    }
  }
@@ -1410,11 +1753,11 @@
   * @param nbWaitedAck  The number of ack that must be received before
   *               the update is fully acked.
   */
  public void addWaitingAck(UpdateMessage update, int nbWaitedAck)
  public void addWaitingAck(UpdateMsg update, int nbWaitedAck)
  {
    AckMessageList ackList = new AckMessageList(update.getChangeNumber(),
                                                nbWaitedAck);
    synchronized(waitingAcks)
      nbWaitedAck);
    synchronized (waitingAcks)
    {
      waitingAcks.put(update.getChangeNumber(), ackList);
    }
@@ -1434,16 +1777,16 @@
   *                    the update is fully acked.
   */
  public static void addWaitingAck(
      UpdateMessage update,
      short ChangelogServerId, ReplicationServerDomain replicationServerDomain,
      int nbWaitedAck)
    UpdateMsg update,
    short ChangelogServerId, ReplicationServerDomain replicationServerDomain,
    int nbWaitedAck)
  {
    ReplServerAckMessageList ackList =
          new ReplServerAckMessageList(update.getChangeNumber(),
                                      nbWaitedAck,
                                      ChangelogServerId,
                                      replicationServerDomain);
    synchronized(changelogsWaitingAcks)
      new ReplServerAckMessageList(update.getChangeNumber(),
      nbWaitedAck,
      ChangelogServerId,
      replicationServerDomain);
    synchronized (changelogsWaitingAcks)
    {
      changelogsWaitingAcks.put(update.getChangeNumber(), ackList);
    }
@@ -1486,7 +1829,7 @@
   */
  @Override
  public void initializeMonitorProvider(MonitorProviderCfg configuration)
                          throws ConfigException,InitializationException
    throws ConfigException, InitializationException
  {
    // Nothing to do for now
  }
@@ -1501,12 +1844,12 @@
  public String getMonitorInstanceName()
  {
    String str = baseDn.toString() +
                 " " + serverURL + " " + String.valueOf(serverId);
      " " + serverURL + " " + String.valueOf(serverId);
    if (serverIsLDAPserver)
      return "Direct LDAP Server " + str;
      return "Directory Server " + str;
    else
      return "Remote Repl Server " + str;
      return "Remote Replication Server " + str;
  }
  /**
@@ -1536,7 +1879,6 @@
  public void updateMonitorData()
  {
    // As long as getUpdateInterval() returns 0, this will never get called
  }
  /**
@@ -1553,19 +1895,20 @@
    ArrayList<Attribute> attributes = new ArrayList<Attribute>();
    if (serverIsLDAPserver)
    {
      attributes.add(new Attribute("LDAP-Server", serverURL));
      attributes.add(new Attribute("connected-to", this.replicationServerDomain.
          getReplicationServer().getMonitorInstanceName()));
      attributes.add(Attributes.create("LDAP-Server", serverURL));
      attributes.add(Attributes.create("connected-to",
          this.replicationServerDomain.getReplicationServer()
              .getMonitorInstanceName()));
    }
    else
    {
      attributes.add(new Attribute("ReplicationServer-Server", serverURL));
      attributes.add(Attributes.create("ReplicationServer-Server",
          serverURL));
    }
    attributes.add(new Attribute("server-id",
                                 String.valueOf(serverId)));
    attributes.add(new Attribute("base-dn",
                                 baseDn.toString()));
    attributes.add(Attributes.create("server-id", String
        .valueOf(serverId)));
    attributes.add(Attributes.create("base-dn", baseDn.toString()));
    if (serverIsLDAPserver)
    {
@@ -1576,106 +1919,85 @@
        // Oldest missing update
        Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
        if ((approxFirstMissingDate != null) && (approxFirstMissingDate>0))
        if ((approxFirstMissingDate != null) && (approxFirstMissingDate > 0))
        {
          Date date = new Date(approxFirstMissingDate);
          attributes.add(new Attribute("approx-older-change-not-synchronized",
              date.toString()));
          attributes.add(
              new Attribute("approx-older-change-not-synchronized-millis",
                  String.valueOf(approxFirstMissingDate)));
          attributes.add(Attributes.create(
              "approx-older-change-not-synchronized", date.toString()));
          attributes.add(Attributes.create(
              "approx-older-change-not-synchronized-millis", String
                  .valueOf(approxFirstMissingDate)));
        }
        // Missing changes
        long missingChanges = md.getMissingChanges(serverId);
        attributes.add(new Attribute("missing-changes",
            String.valueOf(missingChanges)));
        attributes.add(Attributes.create("missing-changes", String
            .valueOf(missingChanges)));
        // Replication delay
        long delay = md.getApproxDelay(serverId);
        attributes.add(new Attribute("approximate-delay",
            String.valueOf(delay)));
        attributes.add(Attributes.create("approximate-delay", String
            .valueOf(delay)));
      }
      catch(Exception e)
      catch (Exception e)
      {
        // TODO: improve the log
        // We failed retrieving the remote monitor data.
        attributes.add(new Attribute("error",
        attributes.add(Attributes.create("error",
            stackTraceToSingleLineString(e)));
      }
    }
    attributes.add(
        new Attribute("queue-size", String.valueOf(msgQueue.count())));
        Attributes.create("queue-size", String.valueOf(msgQueue.count())));
    attributes.add(
        new Attribute(
        Attributes.create(
            "queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
    attributes.add(
        new Attribute(
        Attributes.create(
            "following", String.valueOf(following)));
    // Deprecated
    attributes.add(new Attribute("max-waiting-changes",
                                  String.valueOf(maxQueueSize)));
    attributes.add(new Attribute("update-sent",
                                 String.valueOf(getOutCount())));
    attributes.add(new Attribute("update-received",
                                 String.valueOf(getInCount())));
    attributes.add(Attributes.create("max-waiting-changes", String
        .valueOf(maxQueueSize)));
    attributes.add(Attributes.create("update-sent", String
        .valueOf(getOutCount())));
    attributes.add(Attributes.create("update-received", String
        .valueOf(getInCount())));
    // Deprecated as long as assured is not exposed
    attributes.add(new Attribute("update-waiting-acks",
        String.valueOf(getWaitingAckSize())));
    attributes.add(new Attribute("ack-sent", String.valueOf(getOutAckCount())));
    attributes.add(new Attribute("ack-received",
                                 String.valueOf(getInAckCount())));
    attributes.add(Attributes.create("update-waiting-acks", String
        .valueOf(getWaitingAckSize())));
    attributes.add(Attributes.create("ack-sent", String
        .valueOf(getOutAckCount())));
    attributes.add(Attributes.create("ack-received", String
        .valueOf(getInAckCount())));
    // Window stats
    attributes.add(new Attribute("max-send-window",
                                 String.valueOf(sendWindowSize)));
    attributes.add(new Attribute("current-send-window",
                                String.valueOf(sendWindow.availablePermits())));
    attributes.add(new Attribute("max-rcv-window",
                                 String.valueOf(maxRcvWindow)));
    attributes.add(new Attribute("current-rcv-window",
                                 String.valueOf(rcvWindow)));
    /*
     * FIXME:PGB DEPRECATED
     *
    // Missing changes
    attributes.add(new Attribute("waiting-changes",
        String.valueOf(getRcvMsgQueueSize())));
    // Age of oldest missing change
    // Date of the oldest missing change
    long olderUpdateTime = getOlderUpdateTime();
    if (olderUpdateTime != 0)
    {
      Date date = new Date(getOlderUpdateTime());
      attributes.add(new Attribute("older-change-not-synchronized",
                                 String.valueOf(date.toString())));
    }
    */
    attributes.add(Attributes.create("max-send-window", String
        .valueOf(sendWindowSize)));
    attributes.add(Attributes.create("current-send-window", String
        .valueOf(sendWindow.availablePermits())));
    attributes.add(Attributes.create("max-rcv-window", String
        .valueOf(maxRcvWindow)));
    attributes.add(Attributes.create("current-rcv-window", String
        .valueOf(rcvWindow)));
    /* get the Server State */
    final String ATTR_SERVER_STATE = "server-state";
    AttributeType type =
      DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE);
    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
    AttributeBuilder builder = new AttributeBuilder("server-state");
    for (String str : serverState.toStringSet())
    {
      values.add(new AttributeValue(type,str));
      builder.add(str);
    }
    Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values);
    attributes.add(attr);
    attributes.add(builder.toAttribute());
    // Encryption
    attributes.add(new Attribute("ssl-encryption",
        String.valueOf(session.isEncrypted())));
    attributes.add(Attributes.create("ssl-encryption", String
        .valueOf(session.isEncrypted())));
    // Data generation
    attributes.add(new Attribute("generation-id",
        String.valueOf(generationId)));
    attributes.add(Attributes.create("generation-id", String
        .valueOf(generationId)));
    return attributes;
  }
@@ -1685,23 +2007,63 @@
   */
  public void shutdown()
  {
    shutdown  = true;
    /*
     * Shutdown ServerWriter
     */
    shutdownWriter = true;
    activeWriter = false;
    synchronized (msgQueue)
    {
      /* wake up the writer thread on an empty queue so that it disappear */
      msgQueue.clear();
      msgQueue.notify();
      msgQueue.notifyAll();
    }
    /*
     * Close session to end ServerReader or ServerWriter
     */
    try
    {
      session.close();
    } catch (IOException e)
    {
      // Service is closing.
      // ignore.
    }
    stopHandler();
    /*
     * Stop the remote LSHandler
     */
    for (LightweightServerHandler lsh : directoryServers.values())
    {
      lsh.stopHandler();
    }
    directoryServers.clear();
    /*
     * Stop the heartbeat thread.
     */
    if (heartbeatThread != null)
    {
      heartbeatThread.shutdown();
    }
    DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
    /*
     * Be sure to wait for ServerWriter and ServerReader death
     * It does not matter if we try to stop a thread which is us (reader
     * or writer), but we must not wait for our own thread death.
     */
    try
    {
      if (writer != null) {
      if ((writer != null) && (!(Thread.currentThread().equals(writer))))
      {
        writer.join(SHUTDOWN_JOIN_TIMEOUT);
      }
      if (reader != null) {
      if ((reader != null) && (!(Thread.currentThread().equals(reader))))
      {
        reader.join(SHUTDOWN_JOIN_TIMEOUT);
      }
    } catch (InterruptedException e)
@@ -1726,8 +2088,7 @@
      localString += serverId + " " + serverURL + " " + baseDn;
    }
    else
    } else
      localString = "Unknown server";
    return localString;
@@ -1735,7 +2096,7 @@
  /**
   * Decrement the protocol window, then check if it is necessary
   * to send a WindowMessage and send it.
   * to send a WindowMsg and send it.
   *
   * @throws IOException when the session becomes unavailable.
   */
@@ -1746,7 +2107,7 @@
  }
  /**
   * Check the protocol window and send WindowMessage if necessary.
   * Check the protocol window and send WindowMsg if necessary.
   *
   * @throws IOException when the session becomes unavailable.
   */
@@ -1763,7 +2124,7 @@
      }
      if (!flowControl)
      {
        WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
        WindowMsg msg = new WindowMsg(rcvWindowSizeHalf);
        session.publish(msg);
        outAckCount++;
        rcvWindow += rcvWindowSizeHalf;
@@ -1778,7 +2139,7 @@
   * @param windowMsg The Window Message containing the information
   *                  necessary for updating the window size.
   */
  public void updateWindow(WindowMessage windowMsg)
  public void updateWindow(WindowMsg windowMsg)
  {
    sendWindow.release(windowMsg.getNumAck());
  }
@@ -1797,107 +2158,204 @@
   *
   * @param msg The message to be processed.
   */
  public void process(RoutableMessage msg)
  public void process(RoutableMsg msg)
  {
    if (debugEnabled())
       TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
                 getMonitorInstanceName() +
                 " SH for remote server " + this.getMonitorInstanceName() +
                 " processes received msg=" + msg);
      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
        getMonitorInstanceName() +
        " SH for remote server " + this.getMonitorInstanceName() + ":" +
        "\nprocesses received msg:\n" + msg);
    replicationServerDomain.process(msg, this);
  }
  /**
   * Sends the provided ReplServerInfoMessage.
   * Sends the provided TopologyMsg to the peer server.
   *
   * @param info The ReplServerInfoMessage message to be sent.
   * @param topoMsg The TopologyMsg message to be sent.
   * @throws IOException When it occurs while sending the message,
   *
   */
   public void sendInfo(ReplServerInfoMessage info)
   throws IOException
   {
     if (debugEnabled())
       TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
           getMonitorInstanceName() +
           " SH for remote server " + this.getMonitorInstanceName() +
           " sends message=" + info);
  public void sendTopoInfo(TopologyMsg topoMsg)
    throws IOException
  {
    // V1 Rs do not support the TopologyMsg
    if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      if (debugEnabled())
        TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          " SH for remote server " + this.getMonitorInstanceName() + ":" +
          "\nsends message:\n" + topoMsg);
     session.publish(info);
   }
      session.publish(topoMsg);
    }
  }
   /**
    *
    * Sets the replication server from the message provided.
    *
    * @param infoMsg The information message.
    */
   public void receiveReplServerInfo(ReplServerInfoMessage infoMsg)
   {
     if (debugEnabled())
       TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
           getMonitorInstanceName() +
           " SH for remote server " + this.getMonitorInstanceName() +
           " sets replServerInfo " + "<" + infoMsg + ">");
  /**
   * Stores topology information received from a peer RS and that must be kept
   * in RS handler.
   *
   * @param topoMsg The received topology message
   */
  public void receiveTopoInfoFromRS(TopologyMsg topoMsg)
  {
    // Store info for remote RS
    List<RSInfo> rsInfos = topoMsg.getRsList();
    // List should only contain RS info for sender
    RSInfo rsInfo = rsInfos.get(0);
    generationId = rsInfo.getGenerationId();
    groupId = rsInfo.getGroupId();
     List<String> newRemoteLDAPservers = infoMsg.getConnectedServers();
     generationId = infoMsg.getGenerationId();
    /**
     * Store info for DSs connected to the peer RS
     */
    List<DSInfo> dsInfos = topoMsg.getDsList();
     synchronized(connectedServers)
     {
       // Removes the existing structures
       for (LightweightServerHandler lsh : connectedServers.values())
       {
         lsh.stopHandler();
       }
       connectedServers.clear();
    // Removes the existing structures
    for (LightweightServerHandler lsh : directoryServers.values())
    {
      lsh.stopHandler();
    }
    directoryServers.clear();
       // Creates the new structure according to the message received.
       for (String newConnectedServer : newRemoteLDAPservers)
       {
         LightweightServerHandler lsh
         = new LightweightServerHandler(newConnectedServer, this);
         lsh.startHandler();
         connectedServers.put(lsh.getServerId(), lsh);
       }
     }
   }
    // Creates the new structure according to the message received.
    for (DSInfo dsInfo : dsInfos)
    {
      LightweightServerHandler lsh = new LightweightServerHandler(this,
        serverId, dsInfo.getDsId(), dsInfo.getGenerationId(),
        dsInfo.getGroupId(), dsInfo.getStatus(), dsInfo.getRefUrls(),
        dsInfo.isAssured(), dsInfo.getAssuredMode(),
        dsInfo.getSafeDataLevel());
      lsh.startHandler();
      directoryServers.put(lsh.getServerId(), lsh);
    }
  }
   /**
    * When this handler is connected to a replication server, specifies if
    * a wanted server is connected to this replication server.
    *
    * @param wantedServer The server we want to know if it is connected
    * to the replication server represented by this handler.
    * @return boolean True is the wanted server is connected to the server
    * represented by this handler.
    */
   public boolean isRemoteLDAPServer(short wantedServer)
   {
     synchronized(connectedServers)
     {
       for (LightweightServerHandler server : connectedServers.values())
       {
         if (wantedServer == server.getServerId())
         {
           return true;
         }
       }
       return false;
     }
   }
  /**
   * Process message of a remote server changing his status.
   * @param csMsg The message containing the new status
   * @return The new server status of the DS
   */
  public ServerStatus processNewStatus(ChangeStatusMsg csMsg)
  {
   /**
    * When the handler is connected to a replication server, specifies the
    * replication server has remote LDAP servers connected to it.
    *
    * @return boolean True is the replication server has remote LDAP servers
    * connected to it.
    */
   public boolean hasRemoteLDAPServers()
   {
     return !connectedServers.isEmpty();
   }
    // Sanity check
    if (!serverIsLDAPserver)
    {
      Message msg =
        ERR_RECEIVED_CHANGE_STATUS_NOT_FROM_DS.get(baseDn.toString(),
        Short.toString(serverId), csMsg.toString());
      logError(msg);
      return ServerStatus.INVALID_STATUS;
    }
    // Get the status the DS just entered
    ServerStatus reqStatus = csMsg.getNewStatus();
    // Translate new status to a state machine event
    StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus);
    if (event == StatusMachineEvent.INVALID_EVENT)
    {
      Message msg = ERR_RS_INVALID_NEW_STATUS.get(reqStatus.toString(),
        baseDn.toString(), Short.toString(serverId));
      logError(msg);
      return ServerStatus.INVALID_STATUS;
    }
    // Check state machine allows this new status
    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
    if (newStatus == ServerStatus.INVALID_STATUS)
    {
      Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
        Short.toString(serverId), status.toString(), event.toString());
      logError(msg);
      return ServerStatus.INVALID_STATUS;
    }
    status = newStatus;
    return status;
  }
  /**
   * Change the status according to the event generated from the status
   * analyzer.
   * @param event The event to be used for new status computation
   * @return The new status of the DS
   * @throws IOException When raised by the underlying session
   */
  public ServerStatus changeStatusFromStatusAnalyzer(StatusMachineEvent event)
    throws IOException
  {
    // Check state machine allows this new status (Sanity check)
    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
    if (newStatus == ServerStatus.INVALID_STATUS)
    {
      Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
        Short.toString(serverId), status.toString(), event.toString());
      logError(msg);
      // Status analyzer must only change from NORMAL_STATUS to DEGRADED_STATUS
      // and vice versa. We may are being trying to change the status while for
      // instance another status has just been entered: e.g a full update has
      // just been engaged. In that case, just ignore attempt to change the
      // status
      return newStatus;
    }
    // Send message requesting to change the DS status
    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus,
      ServerStatus.INVALID_STATUS);
    if (debugEnabled())
    {
      TRACER.debugInfo(
        "In RS " +
        replicationServerDomain.getReplicationServer().getServerId() +
        " Sending change status from status analyzer to " + getServerId() +
        " for baseDn " + baseDn + ":\n" + csMsg);
    }
    session.publish(csMsg);
    status = newStatus;
    return newStatus;
  }
  /**
   * When this handler is connected to a replication server, specifies if
   * a wanted server is connected to this replication server.
   *
   * @param wantedServer The server we want to know if it is connected
   * to the replication server represented by this handler.
   * @return boolean True is the wanted server is connected to the server
   * represented by this handler.
   */
  public boolean isRemoteLDAPServer(short wantedServer)
  {
    synchronized (directoryServers)
    {
      for (LightweightServerHandler server : directoryServers.values())
      {
        if (wantedServer == server.getServerId())
        {
          return true;
        }
      }
      return false;
    }
  }
  /**
   * When the handler is connected to a replication server, specifies the
   * replication server has remote LDAP servers connected to it.
   *
   * @return boolean True is the replication server has remote LDAP servers
   * connected to it.
   */
  public boolean hasRemoteLDAPServers()
  {
    return !directoryServers.isEmpty();
  }
  /**
   * Send an InitializeRequestMessage to the server connected through this
@@ -1906,36 +2364,36 @@
   * @param msg The message to be processed
   * @throws IOException when raised by the underlying session
   */
  public void send(RoutableMessage msg) throws IOException
  public void send(RoutableMsg msg) throws IOException
  {
    if (debugEnabled())
          TRACER.debugInfo("In " +
              replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() +
              " SH for remote server " + this.getMonitorInstanceName() +
              " sends message=" + msg);
      TRACER.debugInfo("In " +
        replicationServerDomain.getReplicationServer().
        getMonitorInstanceName() +
        " SH for remote server " + this.getMonitorInstanceName() + ":" +
        "\nsends message:\n" + msg);
    session.publish(msg);
  }
  /**
   * Send an ErrorMessage to the peer.
   * Send an ErrorMsg to the peer.
   *
   * @param errorMsg The message to be sent
   * @throws IOException when raised by the underlying session
   */
  public void sendError(ErrorMessage errorMsg) throws IOException
  public void sendError(ErrorMsg errorMsg) throws IOException
  {
    session.publish(errorMsg);
  }
  /**
   * Process the reception of a WindowProbe message.
   * Process the reception of a WindowProbeMsg message.
   *
   * @param  windowProbeMsg The message to process.
   *
   * @throws IOException    When the session becomes unavailable.
   */
  public void process(WindowProbe windowProbeMsg) throws IOException
  public void process(WindowProbeMsg windowProbeMsg) throws IOException
  {
    if (rcvWindow > 0)
    {
@@ -1945,11 +2403,10 @@
      // lets update the LDAP server with out current window size and hope
      // that everything will work better in the futur.
      // TODO also log an error message.
      WindowMessage msg = new WindowMessage(rcvWindow);
      WindowMsg msg = new WindowMsg(rcvWindow);
      session.publish(msg);
      outAckCount++;
    }
    else
    } else
    {
      // Both the LDAP server and the replication server believes that the
      // window is closed. Lets check the flowcontrol in case we
@@ -1968,27 +2425,6 @@
  }
  /**
   * Resets the generationId for this domain.
   */
  public void warnBadGenerationId()
  {
    // Notify the peer that it is now invalid regarding the generationId
    // We are now waiting a startServer message from this server with
    // a valid generationId.
    try
    {
      Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString());
      ErrorMessage errorMsg =
        new ErrorMessage(serverId, replicationServerId, message);
      session.publish(errorMsg);
    }
    catch (Exception e)
    {
      // FIXME Log exception when sending reset error message
    }
  }
  /**
   * Sends a message containing a generationId to a peer server.
   * The peer is expected to be a replication server.
   *
@@ -1996,8 +2432,8 @@
   * @throws IOException When it occurs while sending the message,
   *
   */
  public void forwardGenerationIdToRS(ResetGenerationId msg)
  throws IOException
  public void forwardGenerationIdToRS(ResetGenerationIdMsg msg)
    throws IOException
  {
    session.publish(msg);
  }
@@ -2027,8 +2463,166 @@
   * Return a Set containing the servers known by this replicationServer.
   * @return a set containing the servers known by this replicationServer.
   */
  public Set<Short> getConnectedServerIds()
  public Set<Short> getConnectedDirectoryServerIds()
  {
    return connectedServers.keySet();
    return directoryServers.keySet();
  }
  /**
   * Get the map of connected DSs
   * (to the RS represented by this server handler).
   * @return The map of connected DSs
   */
  public Map<Short, LightweightServerHandler> getConnectedDSs()
  {
    return directoryServers;
  }
  /**
   * Order the peer DS server to change his status or close the connection
   * according to the requested new generation id.
   * @param newGenId The new generation id to take into account
   * @throws IOException If IO error occurred.
   */
  public void changeStatusForResetGenId(long newGenId)
    throws IOException
  {
    StatusMachineEvent event = null;
    if (newGenId == -1)
    {
      // The generation id is being made invalid, let's put the DS
      // into BAD_GEN_ID_STATUS
      event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
    } else
    {
      if (newGenId == generationId)
      {
        if (status == ServerStatus.BAD_GEN_ID_STATUS)
        {
          // This server has the good new reference generation id.
          // Close connection with him to force his reconnection: DS will
          // reconnect in NORMAL_STATUS or DEGRADED_STATUS.
          if (debugEnabled())
          {
            TRACER.debugInfo(
              "In RS " +
              replicationServerDomain.getReplicationServer().getServerId() +
              ". Closing connection to DS " + getServerId() +
              " for baseDn " + baseDn + " to force reconnection as new local" +
              " generation id and remote one match and DS is in bad gen id: " +
              newGenId);
          }
          // Connection closure must not be done calling RSD.stopHandler() as it
          // would rewait the RSD lock that we already must have entering this
          // method. This would lead to a reentrant lock which we do not want.
          // So simply close the session, this will make the hang up appear
          // after the reader thread that took the RSD lock realeases it.
          try
          {
            if (session != null)
              session.close();
          } catch (IOException e)
          {
            // ignore
          }
          // NOT_CONNECTED_STATUS is the last one in RS session life: handler
          // will soon disappear after this method call...
          status = ServerStatus.NOT_CONNECTED_STATUS;
          return;
        } else
        {
          if (debugEnabled())
          {
            TRACER.debugInfo(
              "In RS " +
              replicationServerDomain.getReplicationServer().getServerId() +
              ". DS " + getServerId() + " for baseDn " + baseDn +
              " has already generation id " + newGenId +
              " so no ChangeStatusMsg sent to him.");
          }
          return;
        }
      } else
      {
        // This server has a bad generation id compared to new reference one,
        // let's put it into BAD_GEN_ID_STATUS
        event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
      }
    }
    if ((event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT) &&
      (status == ServerStatus.FULL_UPDATE_STATUS))
    {
      // Prevent useless error message (full update status cannot lead to bad
      // gen status)
      Message message = NOTE_BAD_GEN_ID_IN_FULL_UPDATE.get(
        Short.toString(replicationServerDomain.
        getReplicationServer().getServerId()),
        baseDn.toString(),
        Short.toString(serverId),
        Long.toString(generationId),
        Long.toString(newGenId));
      logError(message);
      return;
    }
    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
    if (newStatus == ServerStatus.INVALID_STATUS)
    {
      Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
        Short.toString(serverId), status.toString(), event.toString());
      logError(msg);
      return;
    }
    // Send message requesting to change the DS status
    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus,
      ServerStatus.INVALID_STATUS);
    if (debugEnabled())
    {
      TRACER.debugInfo(
        "In RS " +
        replicationServerDomain.getReplicationServer().getServerId() +
        " Sending change status for reset gen id to " + getServerId() +
        " for baseDn " + baseDn + ":\n" + csMsg);
    }
    session.publish(csMsg);
    status = newStatus;
  }
  /**
   * Set the shut down flag to true and returns the previous value of the flag.
   * @return The previous value of the shut down flag
   */
  public boolean engageShutdown()
  {
    // Use thread safe boolean
    return shuttingDown.getAndSet(true);
  }
  /**
   * Gets the status of the connected DS.
   * @return The status of the connected DS.
   */
  public ServerStatus getStatus()
  {
    return status;
  }
  /**
   * Gets the protocol version used with this remote server.
   * @return The protocol version used with this remote server.
   */
  public short getProtocolVersion()
  {
    return protocolVersion;
  }
}