From 305d344b81f1fd8eb96c6c938ae0be0c268f45af Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Thu, 08 Oct 2009 16:02:17 +0000
Subject: [PATCH] - Addition of ReplServerStartDSMsg now sent to a DS connecting to a RS  in handshake phase instead of a ReplServerStartMsg. ReplServerStartDSMsg  contains same thing as ReplServerStartMsg but also contains

---
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java |  467 ++++++++++++++++++++++++++++++++++++++++++++-------------
 1 files changed, 358 insertions(+), 109 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 9b39dc7..e058301 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -43,6 +43,7 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -61,6 +62,7 @@
 import org.opends.server.replication.protocol.HeartbeatMonitor;
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplServerStartDSMsg;
 import org.opends.server.replication.protocol.ReplServerStartMsg;
 import org.opends.server.replication.protocol.ReplSessionSecurity;
 import org.opends.server.replication.protocol.ReplicationMsg;
@@ -68,6 +70,7 @@
 import org.opends.server.replication.protocol.ServerStartMsg;
 import org.opends.server.replication.protocol.StartECLSessionMsg;
 import org.opends.server.replication.protocol.StartSessionMsg;
+import org.opends.server.replication.protocol.StopMsg;
 import org.opends.server.replication.protocol.TopologyMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.protocol.WindowMsg;
@@ -112,9 +115,6 @@
   // Our replication domain
   private ReplicationDomain domain = null;
 
-  // Trick for avoiding a inner class for many parameters return for
-  // performPhaseOneHandshake method.
-  private String tmpReadableServerName = null;
   /**
    * The expected duration in milliseconds between heartbeats received
    * from the replication server.  Zero means heartbeats are off.
@@ -183,7 +183,7 @@
    * @param groupId The group id of our domain.
    * @param changeTimeHeartbeatInterval The interval (in ms) between Change
    *        time  heartbeats are sent to the RS,
-   *        or zero if no CN heartbeat shoud be sent.
+   *        or zero if no CN heartbeat should be sent.
    */
   public ReplicationBroker(ReplicationDomain replicationDomain,
     ServerState state, String baseDn, int serverID2, int window,
@@ -290,23 +290,93 @@
 
   /**
    * Bag class for keeping info we get from a server in order to compute the
-   * best one to connect to.
+   * best one to connect to. This is in fact a wrapper to a
+   * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4).
    */
   public static class ServerInfo
   {
-
-    private ServerState serverState = null;
+    private short protocolVersion;
+    private long generationId;
     private byte groupId = (byte) -1;
+    private int serverId;
+    private String serverURL;
+    private String baseDn = null;
+    private int windowSize;
+    private ServerState serverState;
+    private boolean sslEncryption;
+    private int degradedStatusThreshold = -1;
+    // Keeps the -1 value if created with a ReplServerStartMsg
+    private int weight = -1;
+    // Keeps the -1 value if created with a ReplServerStartMsg
+    private int connectedDSNumber = -1;
 
     /**
-     * Constructor.
-     * @param serverState Server state of the RS
-     * @param groupId Group id of the RS
+     * Create a new instance of ServerInfo wrapping the passed message.
+     * @param msg Message to wrap.
+     * @return The new instance wrapping the passed message.
+     * @throws IllegalArgumentException If the passed message has an unexpected
+     *                                  type.
      */
-    public ServerInfo(ServerState serverState, byte groupId)
+    public static ServerInfo newServerInfo(
+      ReplicationMsg msg) throws IllegalArgumentException
     {
-      this.serverState = serverState;
-      this.groupId = groupId;
+      if (msg instanceof ReplServerStartMsg)
+      {
+        // This is a ReplServerStartMsg (RS uses protocol V3 or under)
+        ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg)msg;
+        return new ServerInfo(replServerStartMsg);
+      }
+      else if (msg instanceof ReplServerStartDSMsg)
+      {
+        // This is a ReplServerStartDSMsg (RS uses protocol V4 or higher)
+        ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg)msg;
+        return new ServerInfo(replServerStartDSMsg);
+      }
+
+      // Unsupported message type: should not happen
+      throw new IllegalArgumentException("Unexpected PDU type: " +
+        msg.getClass().getName() + " :\n" + msg.toString());
+    }
+
+    /**
+     * Constructs a ServerInfo object wrapping a ReplServerStartMsg.
+     * @param replServerStartMsg The ReplServerStartMsg this object will wrap.
+     */
+    private ServerInfo(ReplServerStartMsg replServerStartMsg)
+    {
+      this.protocolVersion = replServerStartMsg.getVersion();
+      this.generationId = replServerStartMsg.getGenerationId();
+      this.groupId = replServerStartMsg.getGroupId();
+      this.serverId = replServerStartMsg.getServerId();
+      this.serverURL = replServerStartMsg.getServerURL();
+      this.baseDn = replServerStartMsg.getBaseDn();
+      this.windowSize = replServerStartMsg.getWindowSize();
+      this.serverState = replServerStartMsg.getServerState();
+      this.sslEncryption = replServerStartMsg.getSSLEncryption();
+      this.degradedStatusThreshold =
+        replServerStartMsg.getDegradedStatusThreshold();
+    }
+
+    /**
+     * Constructs a ServerInfo object wrapping a ReplServerStartDSMsg.
+     * @param replServerStartDSMsg The ReplServerStartDSMsg this object will
+     * wrap.
+     */
+    private ServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
+    {
+      this.protocolVersion = replServerStartDSMsg.getVersion();
+      this.generationId = replServerStartDSMsg.getGenerationId();
+      this.groupId = replServerStartDSMsg.getGroupId();
+      this.serverId = replServerStartDSMsg.getServerId();
+      this.serverURL = replServerStartDSMsg.getServerURL();
+      this.baseDn = replServerStartDSMsg.getBaseDn();
+      this.windowSize = replServerStartDSMsg.getWindowSize();
+      this.serverState = replServerStartDSMsg.getServerState();
+      this.sslEncryption = replServerStartDSMsg.getSSLEncryption();
+      this.degradedStatusThreshold =
+        replServerStartDSMsg.getDegradedStatusThreshold();
+      this.weight = replServerStartDSMsg.getWeight();
+      this.connectedDSNumber = replServerStartDSMsg.getConnectedDSNumber();
     }
 
     /**
@@ -326,6 +396,98 @@
     {
       return groupId;
     }
+
+    /**
+     * Get the server protocol version.
+     * @return the protocolVersion
+     */
+    public short getProtocolVersion()
+    {
+      return protocolVersion;
+    }
+
+    /**
+     * Get the generation id.
+     * @return the generationId
+     */
+    public long getGenerationId()
+    {
+      return generationId;
+    }
+
+    /**
+     * Get the server id.
+     * @return the serverId
+     */
+    public int getServerId()
+    {
+      return serverId;
+    }
+
+    /**
+     * Get the server URL.
+     * @return the serverURL
+     */
+    public String getServerURL()
+    {
+      return serverURL;
+    }
+
+    /**
+     * Get the base dn.
+     * @return the baseDn
+     */
+    public String getBaseDn()
+    {
+      return baseDn;
+    }
+
+    /**
+     * Get the window size.
+     * @return the windowSize
+     */
+    public int getWindowSize()
+    {
+      return windowSize;
+    }
+
+    /**
+     * Get the ssl encryption.
+     * @return the sslEncryption
+     */
+    public boolean isSslEncryption()
+    {
+      return sslEncryption;
+    }
+
+    /**
+     * Get the degraded status threshold.
+     * @return the degradedStatusThreshold
+     */
+    public int getDegradedStatusThreshold()
+    {
+      return degradedStatusThreshold;
+    }
+
+    /**
+     * Get the weight.
+     * @return the weight. Null if this object is a wrapper for
+     * a ReplServerStartMsg.
+     */
+    public int getWeight()
+    {
+      return weight;
+    }
+
+    /**
+     * Get the connected DS number.
+     * @return the connectedDSNumber. Null if this object is a wrapper for
+     * a ReplServerStartMsg.
+     */
+    public int getConnectedDSNumber()
+    {
+      return connectedDSNumber;
+    }
   }
 
   private void connect()
@@ -342,10 +504,34 @@
   }
 
   /**
+   * Contacts all replication servers to get information from them and being
+   * able to choose the more suitable.
+   * @return the collected information.
+   */
+  private Map<String, ServerInfo> collectReplicationServersInfo() {
+
+    Map<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+
+    for (String server : servers)
+    {
+      // Connect to server and get info about it
+      ServerInfo serverInfo = performPhaseOneHandshake(server, false);
+
+      // Store server info in list
+      if (serverInfo != null)
+      {
+        rsInfos.put(server, serverInfo);
+      }
+    }
+
+    return rsInfos;
+  }
+
+  /**
    * Special aspects of connecting as ECL compared to connecting as data server
    * are :
    * - 1 single RS configured
-   * - so no choice of the prefered RS
+   * - so no choice of the preferred RS
    * - No same groupID polling
    * - ?? Heartbeat
    * - Start handshake is :
@@ -358,10 +544,10 @@
     // FIXME:ECL List of RS to connect is for now limited to one RS only
     String bestServer = this.servers.iterator().next();
 
-    ReplServerStartMsg inReplServerStartMsg
+    ReplServerStartDSMsg inReplServerStartDSMsg
       = performECLPhaseOneHandshake(bestServer, true);
 
-    if (inReplServerStartMsg!=null)
+    if (inReplServerStartDSMsg!=null)
       performECLPhaseTwoHandshake(bestServer);
   }
 
@@ -392,8 +578,6 @@
    */
   private void connectAsDataServer()
   {
-    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
-
     // May have created a broker with null replication domain for
     // unit test purpose.
     if (domain != null)
@@ -418,24 +602,12 @@
        */
       if (debugEnabled())
         TRACER.debugInfo("phase 1 : will perform PhaseOneH with each RS in " +
-            " order to elect the prefered one");
-      for (String server : servers)
-      {
-        // Connect to server and get reply message
-        ReplServerStartMsg replServerStartMsg =
-          performPhaseOneHandshake(server, false);
+            " order to elect the preferred one");
 
-        // Store reply message info in list
-        if (replServerStartMsg != null)
-        {
-          ServerInfo serverInfo =
-            new ServerInfo(replServerStartMsg.getServerState(),
-            replServerStartMsg.getGroupId());
-          rsInfos.put(server, serverInfo);
-        }
-      } // for servers
+      // Get info from every available replication servers
+      Map<String, ServerInfo> rsInfos = collectReplicationServersInfo();
 
-      ReplServerStartMsg replServerStartMsg = null;
+      ServerInfo serverInfo = null;
 
       if (rsInfos.size() > 0)
       {
@@ -446,19 +618,17 @@
         // Best found, now initialize connection to this one (handshake phase 1)
         if (debugEnabled())
           TRACER.debugInfo(
-              "phase 2 : will perform PhaseOneH with the prefered RS.");
-        replServerStartMsg = performPhaseOneHandshake(bestServer, true);
+              "phase 2 : will perform PhaseOneH with the preferred RS.");
+        serverInfo = performPhaseOneHandshake(bestServer, true);
 
-        if (replServerStartMsg != null) // Handshake phase 1 exchange went well
+        if (serverInfo != null) // Handshake phase 1 exchange went well
 
         {
-          ServerInfo bestServerInfo = rsInfos.get(bestServer);
-
           // Compute in which status we are starting the session to tell the RS
           ServerStatus initStatus =
-            computeInitialServerStatus(replServerStartMsg.getGenerationId(),
-            bestServerInfo.getServerState(),
-            replServerStartMsg.getDegradedStatusThreshold(),
+            computeInitialServerStatus(serverInfo.getGenerationId(),
+            serverInfo.getServerState(),
+            serverInfo.getDegradedStatusThreshold(),
             this.getGenerationID());
 
           // Perfom session start (handshake phase 2)
@@ -485,7 +655,7 @@
                * reconnection at that time to retrieve a server with our group
                * id.
                */
-              byte tmpRsGroupId = bestServerInfo.getGroupId();
+              byte tmpRsGroupId = serverInfo.getGroupId();
               boolean someServersWithSameGroupId =
                 hasSomeServerWithSameGroupId(topologyMsg.getRsList());
 
@@ -493,10 +663,10 @@
               if ((tmpRsGroupId == groupId) ||
                 ((tmpRsGroupId != groupId) && !someServersWithSameGroupId))
               {
-                replicationServer = tmpReadableServerName;
-                maxSendWindow = replServerStartMsg.getWindowSize();
-                rsGroupId = replServerStartMsg.getGroupId();
-                rsServerId = replServerStartMsg.getServerId();
+                replicationServer = session.getReadableRemoteAddress();
+                maxSendWindow = serverInfo.getWindowSize();
+                rsGroupId = serverInfo.getGroupId();
+                rsServerId = serverInfo.getServerId();
                 rsServerUrl = bestServer;
 
                 // May have created a broker with null replication domain for
@@ -504,8 +674,8 @@
                 if (domain != null)
                 {
                   domain.sessionInitiated(
-                      initStatus, replServerStartMsg.getServerState(),
-                      replServerStartMsg.getGenerationId(),
+                      initStatus, serverInfo.getServerState(),
+                      serverInfo.getGenerationId(),
                       session);
                 }
                 receiveTopo(topologyMsg);
@@ -524,7 +694,7 @@
                  startSameGroupIdPoller();
                 }
                 startRSHeartBeatMonitoring();
-                if (replServerStartMsg.getVersion()
+                if (serverInfo.getProtocolVersion()
                     >= ProtocolVersion.REPLICATION_PROTOCOL_V3)
                 {
                   startChangeTimeHeartBeatPublishing();
@@ -584,8 +754,8 @@
         rcvWindow = maxRcvWindow;
         connectPhaseLock.notify();
 
-        if ((replServerStartMsg.getGenerationId() == this.getGenerationID()) ||
-          (replServerStartMsg.getGenerationId() == -1))
+        if ((serverInfo.getGenerationId() == this.getGenerationID()) ||
+          (serverInfo.getGenerationId() == -1))
         {
           Message message =
             NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
@@ -602,7 +772,7 @@
             baseDn.toString(),
             replicationServer,
             Long.toString(this.getGenerationID()),
-            Long.toString(replServerStartMsg.getGenerationId()));
+            Long.toString(serverInfo.getGenerationId()));
           logError(message);
         }
       } else
@@ -709,19 +879,19 @@
   /**
    * Connect to the provided server performing the first phase handshake
    * (start messages exchange) and return the reply message from the replication
-   * server.
+   * server, wrapped in a ServerInfo object.
    *
    * @param server Server to connect to.
    * @param keepConnection Do we keep session opened or not after handshake.
    *        Use true if want to perform handshake phase 2 with the same session
    *        and keep the session to create as the current one.
-   * @return The ReplServerStartMsg the server replied. Null if could not
+   * @return The answer from the server . Null if could not
    *         get an answer.
    */
-  private ReplServerStartMsg performPhaseOneHandshake(String server,
+  private ServerInfo performPhaseOneHandshake(String server,
     boolean keepConnection)
   {
-    ReplServerStartMsg replServerStartMsg = null;
+    ServerInfo serverInfo = null;
 
     // Parse server string.
     int separator = server.lastIndexOf(':');
@@ -738,8 +908,6 @@
       int intPort = Integer.parseInt(port);
       InetSocketAddress serverAddr = new InetSocketAddress(
         InetAddress.getByName(hostname), intPort);
-      if (keepConnection)
-        tmpReadableServerName = serverAddr.toString();
       Socket socket = new Socket();
       socket.setReceiveBufferSize(1000000);
       socket.setTcpNoDelay(true);
@@ -759,19 +927,23 @@
       localSession.publish(serverStartMsg);
 
       /*
-       * Read the ReplServerStartMsg that should come back.
+       * Read the ReplServerStartMsg or ReplServerStartDSMsg that should come
+       * back.
        */
-      replServerStartMsg = (ReplServerStartMsg) localSession.receive();
+      ReplicationMsg msg = localSession.receive();
 
       if (debugEnabled())
-      {
-        TRACER.debugInfo("In RB for " + baseDn +
-          "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
-          "\nAND RECEIVED:\n" + replServerStartMsg.toString());
-      }
+        {
+          TRACER.debugInfo("In RB for " + baseDn +
+            "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
+            "\nAND RECEIVED:\n" + msg.toString());
+        }
+
+      // Wrap received message in a server info object
+      serverInfo = ServerInfo.newServerInfo(msg);
 
       // Sanity check
-      String repDn = replServerStartMsg.getBaseDn();
+      String repDn = serverInfo.getBaseDn();
       if (!(this.baseDn.equals(repDn)))
       {
         Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
@@ -786,7 +958,7 @@
        * if it is an old replication server).
        */
       protocolVersion = ProtocolVersion.minWithCurrent(
-          replServerStartMsg.getVersion());
+        serverInfo.getProtocolVersion());
       localSession.setProtocolVersion(protocolVersion);
 
 
@@ -839,10 +1011,25 @@
     {
       if (localSession != null)
       {
+        if (debugEnabled())
+          TRACER.debugInfo("In RB, closing session after phase 1");
+
+        if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+        {
+          // V4 protocol introduces a StopMsg to properly end communications
+          if (!error)
+          {
+            try
+            {
+              localSession.publish(new StopMsg());
+            } catch (IOException ioe)
+            {
+              // Anyway, going to close session, so nothing to do
+            }
+          }
+        }
         try
         {
-          if (debugEnabled())
-            TRACER.debugInfo("In RB, closing session after phase 1");
           localSession.close();
         } catch (IOException e)
         {
@@ -852,7 +1039,7 @@
       }
       if (error)
       {
-        replServerStartMsg = null;
+        serverInfo = null;
       } // Be sure to return null.
 
     }
@@ -864,7 +1051,7 @@
       session = localSession;
     }
 
-    return replServerStartMsg;
+    return serverInfo;
   }
 
   /**
@@ -876,13 +1063,13 @@
    * @param keepConnection Do we keep session opened or not after handshake.
    *        Use true if want to perform handshake phase 2 with the same session
    *        and keep the session to create as the current one.
-   * @return The ReplServerStartMsg the server replied. Null if could not
+   * @return The ReplServerStartDSMsg the server replied. Null if could not
    *         get an answer.
    */
-  private ReplServerStartMsg performECLPhaseOneHandshake(String server,
+  private ReplServerStartDSMsg performECLPhaseOneHandshake(String server,
     boolean keepConnection)
   {
-    ReplServerStartMsg replServerStartMsg = null;
+    ReplServerStartDSMsg replServerStartDSMsg = null;
 
     // Parse server string.
     int separator = server.lastIndexOf(':');
@@ -899,8 +1086,6 @@
       int intPort = Integer.parseInt(port);
       InetSocketAddress serverAddr = new InetSocketAddress(
         InetAddress.getByName(hostname), intPort);
-      if (keepConnection)
-        tmpReadableServerName = serverAddr.toString();
       Socket socket = new Socket();
       socket.setReceiveBufferSize(1000000);
       socket.setTcpNoDelay(true);
@@ -920,17 +1105,17 @@
       localSession.publish(serverStartECLMsg);
 
       // Read the ReplServerStartMsg that should come back.
-      replServerStartMsg = (ReplServerStartMsg) localSession.receive();
+      replServerStartDSMsg = (ReplServerStartDSMsg) localSession.receive();
 
       if (debugEnabled())
       {
         TRACER.debugInfo("In RB for " + baseDn +
           "\nRB HANDSHAKE SENT:\n" + serverStartECLMsg.toString() +
-          "\nAND RECEIVED:\n" + replServerStartMsg.toString());
+          "\nAND RECEIVED:\n" + replServerStartDSMsg.toString());
       }
 
       // Sanity check
-      String repDn = replServerStartMsg.getBaseDn();
+      String repDn = replServerStartDSMsg.getBaseDn();
       if (!(this.baseDn.equals(repDn)))
       {
         Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
@@ -946,7 +1131,7 @@
        */
       if (keepConnection)
         protocolVersion = ProtocolVersion.minWithCurrent(
-          replServerStartMsg.getVersion());
+          replServerStartDSMsg.getVersion());
       localSession.setProtocolVersion(protocolVersion);
 
       if (!isSslEncryption)
@@ -998,10 +1183,22 @@
     {
       if (localSession != null)
       {
+        if (debugEnabled())
+          TRACER.debugInfo("In RB, closing session after phase 1");
+
+        // V4 protocol introduces a StopMsg to properly end communications
+        if (!error)
+        {
+          try
+          {
+            localSession.publish(new StopMsg());
+          } catch (IOException ioe)
+          {
+            // Anyway, going to close session, so nothing to do
+          }
+        }
         try
         {
-          if (debugEnabled())
-            TRACER.debugInfo("In RB, closing session after phase 1");
           localSession.close();
         } catch (IOException e)
         {
@@ -1011,7 +1208,7 @@
       }
       if (error)
       {
-        replServerStartMsg = null;
+        replServerStartDSMsg = null;
       } // Be sure to return null.
 
     }
@@ -1023,7 +1220,7 @@
       session = localSession;
     }
 
-    return replServerStartMsg;
+    return replServerStartDSMsg;
   }
 
   /**
@@ -1184,8 +1381,7 @@
    * @return The computed best replication server.
    */
   public static String computeBestReplicationServer(ServerState myState,
-    HashMap<String, ServerInfo> rsInfos, int serverId2, String baseDn,
-    byte groupId)
+    Map<String, ServerInfo> rsInfos, int serverId2, String baseDn, byte groupId)
   {
     /*
      * Preference is given to servers with the requested group id:
@@ -1195,7 +1391,7 @@
      */
 
     // Filter for servers with same group id
-    HashMap<String, ServerInfo> sameGroupIdRsInfos =
+    Map<String, ServerInfo> sameGroupIdRsInfos =
       new HashMap<String, ServerInfo>();
 
     for (String repServer : rsInfos.keySet())
@@ -1231,7 +1427,7 @@
    * @return The computed best replication server.
    */
   private static String searchForBestReplicationServer(ServerState myState,
-    HashMap<String, ServerInfo> rsInfos, int serverId2, String baseDn)
+    Map<String, ServerInfo> rsInfos, int serverId2, String baseDn)
   {
     /*
      * Find replication servers who are up to date (or more up to date than us,
@@ -1266,7 +1462,7 @@
     HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
 
     /*
-     * Start loop to differenciate up to date servers from late ones.
+     * Start loop to differentiate up to date servers from late ones.
      */
     ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId2);
     if (myChangeNumber == null)
@@ -1321,6 +1517,7 @@
         if (ReplicationServer.isLocalReplicationServer(upServer))
         {
           localRS = true;
+          break;
         }
       }
       if (localRS)
@@ -1459,7 +1656,8 @@
         new HeartbeatMonitor("Replication Heartbeat Monitor on RS " +
         getReplicationServer() + " " + rsServerId + " for " + baseDn +
         " in DS " + serverId,
-        session, heartbeatInterval);
+        session, heartbeatInterval, (protocolVersion >=
+        ProtocolVersion.REPLICATION_PROTOCOL_V4));
       heartbeatMonitor.start();
     }
   }
@@ -1513,16 +1711,28 @@
    */
   public void reStart(ProtocolSession failingSession)
   {
-    try
+
+    if (failingSession != null)
     {
-      if (failingSession != null)
+      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+      {
+        // V4 protocol introduces a StopMsg to properly end communications
+        try
+        {
+          failingSession.publish(new StopMsg());
+        } catch (IOException ioe)
+        {
+          // Anyway, going to close session, so nothing to do
+        }
+      }
+      try
       {
         failingSession.close();
-        numLostConnections++;
+      } catch (IOException e1)
+      {
+        // ignore
       }
-    } catch (IOException e1)
-    {
-      // ignore
+      numLostConnections++;
     }
 
     if (failingSession == session)
@@ -1708,6 +1918,19 @@
           TopologyMsg topoMsg = (TopologyMsg)msg;
           receiveTopo(topoMsg);
         }
+        else if (msg instanceof StopMsg)
+        {
+          /*
+           * RS performs a proper disconnection
+           */
+          Message message =
+            NOTE_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(replicationServer,
+            Integer.toString(rsServerId), baseDn.toString(),
+            Integer.toString(serverId));
+          logError(message);
+          // Try to find a suitable RS
+          this.reStart(failingSession);
+        }
         else
         {
           return msg;
@@ -1723,10 +1946,10 @@
 
           {
             /*
-             * If we did not initiate the close on our side, log a message.
+             * We did not initiate the close on our side, log an error message.
              */
             Message message =
-              NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer,
+              ERR_REPLICATION_SERVER_BADLY_DISCONNECTED.get(replicationServer,
                   Integer.toString(rsServerId), baseDn.toString(),
                   Integer.toString(serverId));
             logError(message);
@@ -1783,14 +2006,26 @@
     rsGroupId = (byte) -1;
     rsServerId = -1;
     rsServerUrl = null;
-    try
+
+    if (session != null)
     {
-      if (session != null)
+      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+      {
+        // V4 protocol introduces a StopMsg to properly end communications
+        try
+        {
+          session.publish(new StopMsg());
+        } catch (IOException ioe)
+        {
+          // Anyway, going to close session, so nothing to do
+        }
+      }
+      try
       {
         session.close();
+      } catch (IOException e)
+      {
       }
-    } catch (IOException e)
-    {
     }
   }
 
@@ -1896,7 +2131,7 @@
       Collection<String> replicationServers, int window, long heartbeatInterval,
       byte groupId)
   {
-    // These parameters needs to be renegociated with the ReplicationServer
+    // These parameters needs to be renegotiated with the ReplicationServer
     // so if they have changed, that requires restarting the session with
     // the ReplicationServer.
     Boolean needToRestartSession = false;
@@ -1945,7 +2180,7 @@
 
   private boolean debugEnabled()
   {
-    return true;
+    return false;
   }
 
   private static final void debugInfo(String s)
@@ -2057,13 +2292,13 @@
                 continue;
 
               // Connect to server and get reply message
-              ReplServerStartMsg replServerStartMsg =
+              ServerInfo serverInfo =
                 performPhaseOneHandshake(server, false);
 
-              // Store reply message info in list
-              if (replServerStartMsg != null)
+              // Is it a server with our group id ?
+              if (serverInfo != null)
               {
-                if (groupId == replServerStartMsg.getGroupId())
+                if (groupId == serverInfo.getGroupId())
                 {
                   // Found one server with the same group id as us, disconnect
                   // session to force reconnection to a server with same group
@@ -2072,6 +2307,20 @@
                     Byte.toString(groupId), baseDn.toString(),
                     Integer.toString(serverId));
                   logError(message);
+
+                  if (protocolVersion >=
+                    ProtocolVersion.REPLICATION_PROTOCOL_V4)
+                  {
+                    // V4 protocol introduces a StopMsg to properly end
+                    // communications
+                    try
+                    {
+                      session.publish(new StopMsg());
+                    } catch (IOException ioe)
+                    {
+                      // Anyway, going to close session, so nothing to do
+                    }
+                  }
                   try
                   {
                     session.close();

--
Gitblit v1.10.0