mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
08.02.2009 c02dd7f87e9ba574f06e5cc1eb36ebeb76b9f446
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -43,6 +43,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -61,6 +62,7 @@
import org.opends.server.replication.protocol.HeartbeatMonitor;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartDSMsg;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -68,6 +70,7 @@
import org.opends.server.replication.protocol.ServerStartMsg;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
@@ -112,9 +115,6 @@
  // Our replication domain
  private ReplicationDomain domain = null;
  // Trick for avoiding a inner class for many parameters return for
  // performPhaseOneHandshake method.
  private String tmpReadableServerName = null;
  /**
   * The expected duration in milliseconds between heartbeats received
   * from the replication server.  Zero means heartbeats are off.
@@ -183,7 +183,7 @@
   * @param groupId The group id of our domain.
   * @param changeTimeHeartbeatInterval The interval (in ms) between Change
   *        time  heartbeats are sent to the RS,
   *        or zero if no CN heartbeat shoud be sent.
   *        or zero if no CN heartbeat should be sent.
   */
  public ReplicationBroker(ReplicationDomain replicationDomain,
    ServerState state, String baseDn, int serverID2, int window,
@@ -290,23 +290,93 @@
  /**
   * Bag class for keeping info we get from a server in order to compute the
   * best one to connect to.
   * best one to connect to. This is in fact a wrapper to a
   * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4).
   */
  public static class ServerInfo
  {
    private ServerState serverState = null;
    private short protocolVersion;
    private long generationId;
    private byte groupId = (byte) -1;
    private int serverId;
    private String serverURL;
    private String baseDn = null;
    private int windowSize;
    private ServerState serverState;
    private boolean sslEncryption;
    private int degradedStatusThreshold = -1;
    // Keeps the -1 value if created with a ReplServerStartMsg
    private int weight = -1;
    // Keeps the -1 value if created with a ReplServerStartMsg
    private int connectedDSNumber = -1;
    /**
     * Constructor.
     * @param serverState Server state of the RS
     * @param groupId Group id of the RS
     * Create a new instance of ServerInfo wrapping the passed message.
     * @param msg Message to wrap.
     * @return The new instance wrapping the passed message.
     * @throws IllegalArgumentException If the passed message has an unexpected
     *                                  type.
     */
    public ServerInfo(ServerState serverState, byte groupId)
    public static ServerInfo newServerInfo(
      ReplicationMsg msg) throws IllegalArgumentException
    {
      this.serverState = serverState;
      this.groupId = groupId;
      if (msg instanceof ReplServerStartMsg)
      {
        // This is a ReplServerStartMsg (RS uses protocol V3 or under)
        ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg)msg;
        return new ServerInfo(replServerStartMsg);
      }
      else if (msg instanceof ReplServerStartDSMsg)
      {
        // This is a ReplServerStartDSMsg (RS uses protocol V4 or higher)
        ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg)msg;
        return new ServerInfo(replServerStartDSMsg);
      }
      // Unsupported message type: should not happen
      throw new IllegalArgumentException("Unexpected PDU type: " +
        msg.getClass().getName() + " :\n" + msg.toString());
    }
    /**
     * Constructs a ServerInfo object wrapping a ReplServerStartMsg.
     * @param replServerStartMsg The ReplServerStartMsg this object will wrap.
     */
    private ServerInfo(ReplServerStartMsg replServerStartMsg)
    {
      this.protocolVersion = replServerStartMsg.getVersion();
      this.generationId = replServerStartMsg.getGenerationId();
      this.groupId = replServerStartMsg.getGroupId();
      this.serverId = replServerStartMsg.getServerId();
      this.serverURL = replServerStartMsg.getServerURL();
      this.baseDn = replServerStartMsg.getBaseDn();
      this.windowSize = replServerStartMsg.getWindowSize();
      this.serverState = replServerStartMsg.getServerState();
      this.sslEncryption = replServerStartMsg.getSSLEncryption();
      this.degradedStatusThreshold =
        replServerStartMsg.getDegradedStatusThreshold();
    }
    /**
     * Constructs a ServerInfo object wrapping a ReplServerStartDSMsg.
     * @param replServerStartDSMsg The ReplServerStartDSMsg this object will
     * wrap.
     */
    private ServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
    {
      this.protocolVersion = replServerStartDSMsg.getVersion();
      this.generationId = replServerStartDSMsg.getGenerationId();
      this.groupId = replServerStartDSMsg.getGroupId();
      this.serverId = replServerStartDSMsg.getServerId();
      this.serverURL = replServerStartDSMsg.getServerURL();
      this.baseDn = replServerStartDSMsg.getBaseDn();
      this.windowSize = replServerStartDSMsg.getWindowSize();
      this.serverState = replServerStartDSMsg.getServerState();
      this.sslEncryption = replServerStartDSMsg.getSSLEncryption();
      this.degradedStatusThreshold =
        replServerStartDSMsg.getDegradedStatusThreshold();
      this.weight = replServerStartDSMsg.getWeight();
      this.connectedDSNumber = replServerStartDSMsg.getConnectedDSNumber();
    }
    /**
@@ -326,6 +396,98 @@
    {
      return groupId;
    }
    /**
     * Get the server protocol version.
     * @return the protocolVersion
     */
    public short getProtocolVersion()
    {
      return protocolVersion;
    }
    /**
     * Get the generation id.
     * @return the generationId
     */
    public long getGenerationId()
    {
      return generationId;
    }
    /**
     * Get the server id.
     * @return the serverId
     */
    public int getServerId()
    {
      return serverId;
    }
    /**
     * Get the server URL.
     * @return the serverURL
     */
    public String getServerURL()
    {
      return serverURL;
    }
    /**
     * Get the base dn.
     * @return the baseDn
     */
    public String getBaseDn()
    {
      return baseDn;
    }
    /**
     * Get the window size.
     * @return the windowSize
     */
    public int getWindowSize()
    {
      return windowSize;
    }
    /**
     * Get the ssl encryption.
     * @return the sslEncryption
     */
    public boolean isSslEncryption()
    {
      return sslEncryption;
    }
    /**
     * Get the degraded status threshold.
     * @return the degradedStatusThreshold
     */
    public int getDegradedStatusThreshold()
    {
      return degradedStatusThreshold;
    }
    /**
     * Get the weight.
     * @return the weight. Null if this object is a wrapper for
     * a ReplServerStartMsg.
     */
    public int getWeight()
    {
      return weight;
    }
    /**
     * Get the connected DS number.
     * @return the connectedDSNumber. Null if this object is a wrapper for
     * a ReplServerStartMsg.
     */
    public int getConnectedDSNumber()
    {
      return connectedDSNumber;
    }
  }
  private void connect()
@@ -342,10 +504,34 @@
  }
  /**
   * Contacts all replication servers to get information from them and being
   * able to choose the more suitable.
   * @return the collected information.
   */
  private Map<String, ServerInfo> collectReplicationServersInfo() {
    Map<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    for (String server : servers)
    {
      // Connect to server and get info about it
      ServerInfo serverInfo = performPhaseOneHandshake(server, false);
      // Store server info in list
      if (serverInfo != null)
      {
        rsInfos.put(server, serverInfo);
      }
    }
    return rsInfos;
  }
  /**
   * Special aspects of connecting as ECL compared to connecting as data server
   * are :
   * - 1 single RS configured
   * - so no choice of the prefered RS
   * - so no choice of the preferred RS
   * - No same groupID polling
   * - ?? Heartbeat
   * - Start handshake is :
@@ -358,10 +544,10 @@
    // FIXME:ECL List of RS to connect is for now limited to one RS only
    String bestServer = this.servers.iterator().next();
    ReplServerStartMsg inReplServerStartMsg
    ReplServerStartDSMsg inReplServerStartDSMsg
      = performECLPhaseOneHandshake(bestServer, true);
    if (inReplServerStartMsg!=null)
    if (inReplServerStartDSMsg!=null)
      performECLPhaseTwoHandshake(bestServer);
  }
@@ -392,8 +578,6 @@
   */
  private void connectAsDataServer()
  {
    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    // May have created a broker with null replication domain for
    // unit test purpose.
    if (domain != null)
@@ -418,24 +602,12 @@
       */
      if (debugEnabled())
        TRACER.debugInfo("phase 1 : will perform PhaseOneH with each RS in " +
            " order to elect the prefered one");
      for (String server : servers)
      {
        // Connect to server and get reply message
        ReplServerStartMsg replServerStartMsg =
          performPhaseOneHandshake(server, false);
            " order to elect the preferred one");
        // Store reply message info in list
        if (replServerStartMsg != null)
        {
          ServerInfo serverInfo =
            new ServerInfo(replServerStartMsg.getServerState(),
            replServerStartMsg.getGroupId());
          rsInfos.put(server, serverInfo);
        }
      } // for servers
      // Get info from every available replication servers
      Map<String, ServerInfo> rsInfos = collectReplicationServersInfo();
      ReplServerStartMsg replServerStartMsg = null;
      ServerInfo serverInfo = null;
      if (rsInfos.size() > 0)
      {
@@ -446,19 +618,17 @@
        // Best found, now initialize connection to this one (handshake phase 1)
        if (debugEnabled())
          TRACER.debugInfo(
              "phase 2 : will perform PhaseOneH with the prefered RS.");
        replServerStartMsg = performPhaseOneHandshake(bestServer, true);
              "phase 2 : will perform PhaseOneH with the preferred RS.");
        serverInfo = performPhaseOneHandshake(bestServer, true);
        if (replServerStartMsg != null) // Handshake phase 1 exchange went well
        if (serverInfo != null) // Handshake phase 1 exchange went well
        {
          ServerInfo bestServerInfo = rsInfos.get(bestServer);
          // Compute in which status we are starting the session to tell the RS
          ServerStatus initStatus =
            computeInitialServerStatus(replServerStartMsg.getGenerationId(),
            bestServerInfo.getServerState(),
            replServerStartMsg.getDegradedStatusThreshold(),
            computeInitialServerStatus(serverInfo.getGenerationId(),
            serverInfo.getServerState(),
            serverInfo.getDegradedStatusThreshold(),
            this.getGenerationID());
          // Perfom session start (handshake phase 2)
@@ -485,7 +655,7 @@
               * reconnection at that time to retrieve a server with our group
               * id.
               */
              byte tmpRsGroupId = bestServerInfo.getGroupId();
              byte tmpRsGroupId = serverInfo.getGroupId();
              boolean someServersWithSameGroupId =
                hasSomeServerWithSameGroupId(topologyMsg.getRsList());
@@ -493,10 +663,10 @@
              if ((tmpRsGroupId == groupId) ||
                ((tmpRsGroupId != groupId) && !someServersWithSameGroupId))
              {
                replicationServer = tmpReadableServerName;
                maxSendWindow = replServerStartMsg.getWindowSize();
                rsGroupId = replServerStartMsg.getGroupId();
                rsServerId = replServerStartMsg.getServerId();
                replicationServer = session.getReadableRemoteAddress();
                maxSendWindow = serverInfo.getWindowSize();
                rsGroupId = serverInfo.getGroupId();
                rsServerId = serverInfo.getServerId();
                rsServerUrl = bestServer;
                // May have created a broker with null replication domain for
@@ -504,8 +674,8 @@
                if (domain != null)
                {
                  domain.sessionInitiated(
                      initStatus, replServerStartMsg.getServerState(),
                      replServerStartMsg.getGenerationId(),
                      initStatus, serverInfo.getServerState(),
                      serverInfo.getGenerationId(),
                      session);
                }
                receiveTopo(topologyMsg);
@@ -524,7 +694,7 @@
                 startSameGroupIdPoller();
                }
                startRSHeartBeatMonitoring();
                if (replServerStartMsg.getVersion()
                if (serverInfo.getProtocolVersion()
                    >= ProtocolVersion.REPLICATION_PROTOCOL_V3)
                {
                  startChangeTimeHeartBeatPublishing();
@@ -584,8 +754,8 @@
        rcvWindow = maxRcvWindow;
        connectPhaseLock.notify();
        if ((replServerStartMsg.getGenerationId() == this.getGenerationID()) ||
          (replServerStartMsg.getGenerationId() == -1))
        if ((serverInfo.getGenerationId() == this.getGenerationID()) ||
          (serverInfo.getGenerationId() == -1))
        {
          Message message =
            NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
@@ -602,7 +772,7 @@
            baseDn.toString(),
            replicationServer,
            Long.toString(this.getGenerationID()),
            Long.toString(replServerStartMsg.getGenerationId()));
            Long.toString(serverInfo.getGenerationId()));
          logError(message);
        }
      } else
@@ -709,19 +879,19 @@
  /**
   * Connect to the provided server performing the first phase handshake
   * (start messages exchange) and return the reply message from the replication
   * server.
   * server, wrapped in a ServerInfo object.
   *
   * @param server Server to connect to.
   * @param keepConnection Do we keep session opened or not after handshake.
   *        Use true if want to perform handshake phase 2 with the same session
   *        and keep the session to create as the current one.
   * @return The ReplServerStartMsg the server replied. Null if could not
   * @return The answer from the server . Null if could not
   *         get an answer.
   */
  private ReplServerStartMsg performPhaseOneHandshake(String server,
  private ServerInfo performPhaseOneHandshake(String server,
    boolean keepConnection)
  {
    ReplServerStartMsg replServerStartMsg = null;
    ServerInfo serverInfo = null;
    // Parse server string.
    int separator = server.lastIndexOf(':');
@@ -738,8 +908,6 @@
      int intPort = Integer.parseInt(port);
      InetSocketAddress serverAddr = new InetSocketAddress(
        InetAddress.getByName(hostname), intPort);
      if (keepConnection)
        tmpReadableServerName = serverAddr.toString();
      Socket socket = new Socket();
      socket.setReceiveBufferSize(1000000);
      socket.setTcpNoDelay(true);
@@ -759,19 +927,23 @@
      localSession.publish(serverStartMsg);
      /*
       * Read the ReplServerStartMsg that should come back.
       * Read the ReplServerStartMsg or ReplServerStartDSMsg that should come
       * back.
       */
      replServerStartMsg = (ReplServerStartMsg) localSession.receive();
      ReplicationMsg msg = localSession.receive();
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + baseDn +
          "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
          "\nAND RECEIVED:\n" + replServerStartMsg.toString());
      }
        {
          TRACER.debugInfo("In RB for " + baseDn +
            "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
            "\nAND RECEIVED:\n" + msg.toString());
        }
      // Wrap received message in a server info object
      serverInfo = ServerInfo.newServerInfo(msg);
      // Sanity check
      String repDn = replServerStartMsg.getBaseDn();
      String repDn = serverInfo.getBaseDn();
      if (!(this.baseDn.equals(repDn)))
      {
        Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
@@ -786,7 +958,7 @@
       * if it is an old replication server).
       */
      protocolVersion = ProtocolVersion.minWithCurrent(
          replServerStartMsg.getVersion());
        serverInfo.getProtocolVersion());
      localSession.setProtocolVersion(protocolVersion);
@@ -839,10 +1011,25 @@
    {
      if (localSession != null)
      {
        if (debugEnabled())
          TRACER.debugInfo("In RB, closing session after phase 1");
        if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          // V4 protocol introduces a StopMsg to properly end communications
          if (!error)
          {
            try
            {
              localSession.publish(new StopMsg());
            } catch (IOException ioe)
            {
              // Anyway, going to close session, so nothing to do
            }
          }
        }
        try
        {
          if (debugEnabled())
            TRACER.debugInfo("In RB, closing session after phase 1");
          localSession.close();
        } catch (IOException e)
        {
@@ -852,7 +1039,7 @@
      }
      if (error)
      {
        replServerStartMsg = null;
        serverInfo = null;
      } // Be sure to return null.
    }
@@ -864,7 +1051,7 @@
      session = localSession;
    }
    return replServerStartMsg;
    return serverInfo;
  }
  /**
@@ -876,13 +1063,13 @@
   * @param keepConnection Do we keep session opened or not after handshake.
   *        Use true if want to perform handshake phase 2 with the same session
   *        and keep the session to create as the current one.
   * @return The ReplServerStartMsg the server replied. Null if could not
   * @return The ReplServerStartDSMsg the server replied. Null if could not
   *         get an answer.
   */
  private ReplServerStartMsg performECLPhaseOneHandshake(String server,
  private ReplServerStartDSMsg performECLPhaseOneHandshake(String server,
    boolean keepConnection)
  {
    ReplServerStartMsg replServerStartMsg = null;
    ReplServerStartDSMsg replServerStartDSMsg = null;
    // Parse server string.
    int separator = server.lastIndexOf(':');
@@ -899,8 +1086,6 @@
      int intPort = Integer.parseInt(port);
      InetSocketAddress serverAddr = new InetSocketAddress(
        InetAddress.getByName(hostname), intPort);
      if (keepConnection)
        tmpReadableServerName = serverAddr.toString();
      Socket socket = new Socket();
      socket.setReceiveBufferSize(1000000);
      socket.setTcpNoDelay(true);
@@ -920,17 +1105,17 @@
      localSession.publish(serverStartECLMsg);
      // Read the ReplServerStartMsg that should come back.
      replServerStartMsg = (ReplServerStartMsg) localSession.receive();
      replServerStartDSMsg = (ReplServerStartDSMsg) localSession.receive();
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + baseDn +
          "\nRB HANDSHAKE SENT:\n" + serverStartECLMsg.toString() +
          "\nAND RECEIVED:\n" + replServerStartMsg.toString());
          "\nAND RECEIVED:\n" + replServerStartDSMsg.toString());
      }
      // Sanity check
      String repDn = replServerStartMsg.getBaseDn();
      String repDn = replServerStartDSMsg.getBaseDn();
      if (!(this.baseDn.equals(repDn)))
      {
        Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
@@ -946,7 +1131,7 @@
       */
      if (keepConnection)
        protocolVersion = ProtocolVersion.minWithCurrent(
          replServerStartMsg.getVersion());
          replServerStartDSMsg.getVersion());
      localSession.setProtocolVersion(protocolVersion);
      if (!isSslEncryption)
@@ -998,10 +1183,22 @@
    {
      if (localSession != null)
      {
        if (debugEnabled())
          TRACER.debugInfo("In RB, closing session after phase 1");
        // V4 protocol introduces a StopMsg to properly end communications
        if (!error)
        {
          try
          {
            localSession.publish(new StopMsg());
          } catch (IOException ioe)
          {
            // Anyway, going to close session, so nothing to do
          }
        }
        try
        {
          if (debugEnabled())
            TRACER.debugInfo("In RB, closing session after phase 1");
          localSession.close();
        } catch (IOException e)
        {
@@ -1011,7 +1208,7 @@
      }
      if (error)
      {
        replServerStartMsg = null;
        replServerStartDSMsg = null;
      } // Be sure to return null.
    }
@@ -1023,7 +1220,7 @@
      session = localSession;
    }
    return replServerStartMsg;
    return replServerStartDSMsg;
  }
  /**
@@ -1184,8 +1381,7 @@
   * @return The computed best replication server.
   */
  public static String computeBestReplicationServer(ServerState myState,
    HashMap<String, ServerInfo> rsInfos, int serverId2, String baseDn,
    byte groupId)
    Map<String, ServerInfo> rsInfos, int serverId2, String baseDn, byte groupId)
  {
    /*
     * Preference is given to servers with the requested group id:
@@ -1195,7 +1391,7 @@
     */
    // Filter for servers with same group id
    HashMap<String, ServerInfo> sameGroupIdRsInfos =
    Map<String, ServerInfo> sameGroupIdRsInfos =
      new HashMap<String, ServerInfo>();
    for (String repServer : rsInfos.keySet())
@@ -1231,7 +1427,7 @@
   * @return The computed best replication server.
   */
  private static String searchForBestReplicationServer(ServerState myState,
    HashMap<String, ServerInfo> rsInfos, int serverId2, String baseDn)
    Map<String, ServerInfo> rsInfos, int serverId2, String baseDn)
  {
    /*
     * Find replication servers who are up to date (or more up to date than us,
@@ -1266,7 +1462,7 @@
    HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
    /*
     * Start loop to differenciate up to date servers from late ones.
     * Start loop to differentiate up to date servers from late ones.
     */
    ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId2);
    if (myChangeNumber == null)
@@ -1321,6 +1517,7 @@
        if (ReplicationServer.isLocalReplicationServer(upServer))
        {
          localRS = true;
          break;
        }
      }
      if (localRS)
@@ -1459,7 +1656,8 @@
        new HeartbeatMonitor("Replication Heartbeat Monitor on RS " +
        getReplicationServer() + " " + rsServerId + " for " + baseDn +
        " in DS " + serverId,
        session, heartbeatInterval);
        session, heartbeatInterval, (protocolVersion >=
        ProtocolVersion.REPLICATION_PROTOCOL_V4));
      heartbeatMonitor.start();
    }
  }
@@ -1513,16 +1711,28 @@
   */
  public void reStart(ProtocolSession failingSession)
  {
    try
    if (failingSession != null)
    {
      if (failingSession != null)
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // V4 protocol introduces a StopMsg to properly end communications
        try
        {
          failingSession.publish(new StopMsg());
        } catch (IOException ioe)
        {
          // Anyway, going to close session, so nothing to do
        }
      }
      try
      {
        failingSession.close();
        numLostConnections++;
      } catch (IOException e1)
      {
        // ignore
      }
    } catch (IOException e1)
    {
      // ignore
      numLostConnections++;
    }
    if (failingSession == session)
@@ -1708,6 +1918,19 @@
          TopologyMsg topoMsg = (TopologyMsg)msg;
          receiveTopo(topoMsg);
        }
        else if (msg instanceof StopMsg)
        {
          /*
           * RS performs a proper disconnection
           */
          Message message =
            NOTE_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(replicationServer,
            Integer.toString(rsServerId), baseDn.toString(),
            Integer.toString(serverId));
          logError(message);
          // Try to find a suitable RS
          this.reStart(failingSession);
        }
        else
        {
          return msg;
@@ -1723,10 +1946,10 @@
          {
            /*
             * If we did not initiate the close on our side, log a message.
             * We did not initiate the close on our side, log an error message.
             */
            Message message =
              NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer,
              ERR_REPLICATION_SERVER_BADLY_DISCONNECTED.get(replicationServer,
                  Integer.toString(rsServerId), baseDn.toString(),
                  Integer.toString(serverId));
            logError(message);
@@ -1783,14 +2006,26 @@
    rsGroupId = (byte) -1;
    rsServerId = -1;
    rsServerUrl = null;
    try
    if (session != null)
    {
      if (session != null)
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // V4 protocol introduces a StopMsg to properly end communications
        try
        {
          session.publish(new StopMsg());
        } catch (IOException ioe)
        {
          // Anyway, going to close session, so nothing to do
        }
      }
      try
      {
        session.close();
      } catch (IOException e)
      {
      }
    } catch (IOException e)
    {
    }
  }
@@ -1896,7 +2131,7 @@
      Collection<String> replicationServers, int window, long heartbeatInterval,
      byte groupId)
  {
    // These parameters needs to be renegociated with the ReplicationServer
    // These parameters needs to be renegotiated with the ReplicationServer
    // so if they have changed, that requires restarting the session with
    // the ReplicationServer.
    Boolean needToRestartSession = false;
@@ -1945,7 +2180,7 @@
  private boolean debugEnabled()
  {
    return true;
    return false;
  }
  private static final void debugInfo(String s)
@@ -2057,13 +2292,13 @@
                continue;
              // Connect to server and get reply message
              ReplServerStartMsg replServerStartMsg =
              ServerInfo serverInfo =
                performPhaseOneHandshake(server, false);
              // Store reply message info in list
              if (replServerStartMsg != null)
              // Is it a server with our group id ?
              if (serverInfo != null)
              {
                if (groupId == replServerStartMsg.getGroupId())
                if (groupId == serverInfo.getGroupId())
                {
                  // Found one server with the same group id as us, disconnect
                  // session to force reconnection to a server with same group
@@ -2072,6 +2307,20 @@
                    Byte.toString(groupId), baseDn.toString(),
                    Integer.toString(serverId));
                  logError(message);
                  if (protocolVersion >=
                    ProtocolVersion.REPLICATION_PROTOCOL_V4)
                  {
                    // V4 protocol introduces a StopMsg to properly end
                    // communications
                    try
                    {
                      session.publish(new StopMsg());
                    } catch (IOException ioe)
                    {
                      // Anyway, going to close session, so nothing to do
                    }
                  }
                  try
                  {
                    session.close();