From c02dd7f87e9ba574f06e5cc1eb36ebeb76b9f446 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Thu, 08 Oct 2009 16:02:17 +0000
Subject: [PATCH] - Addition of ReplServerStartDSMsg now sent to a DS connecting to a RS  in handshake phase instead of a ReplServerStartMsg. ReplServerStartDSMsg  contains same thing as ReplServerStartMsg but also contains

---
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java                                |    2 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java                                    |    7 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java     |   68 +
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java                                 |  127 +++
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StopMsg.java                                            |   70 ++
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java  |    8 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java                                   |    8 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java                                        |  108 +-
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java                                     |   12 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                              |   30 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java                                         |   29 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java                                      |    8 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java                             |   39 +
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java  |   27 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java                                    |   31 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java                                         |   18 
 opendj-sdk/opends/src/messages/messages/replication.properties                                                              |   23 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java               |    8 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java             |    5 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java                               |  408 ++++++++++++
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java                                         |   34 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java |   23 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java                                    |    9 
 opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java                                   |  467 ++++++++++---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java                                    |  120 ++
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java                                            |    2 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java        |   13 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java       |    6 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java                                  |    2 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java                                   |   21 
 opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml                             |   35 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java                                     |   79 ++
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java                                      |    4 
 opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java                                   |    2 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java        |  141 +++
 opendj-sdk/opends/resource/schema/02-config.ldif                                                                            |    8 
 36 files changed, 1,655 insertions(+), 347 deletions(-)

diff --git a/opendj-sdk/opends/resource/schema/02-config.ldif b/opendj-sdk/opends/resource/schema/02-config.ldif
index 8f0c851..f1f179d 100644
--- a/opendj-sdk/opends/resource/schema/02-config.ldif
+++ b/opendj-sdk/opends/resource/schema/02-config.ldif
@@ -2453,6 +2453,11 @@
   NAME 'ds-cfg-ecl-include'
   SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
   X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.603
+  NAME 'ds-cfg-weight'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
+  SINGLE-VALUE
+  X-ORIGIN 'OpenDS Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
   NAME 'ds-cfg-access-control-handler'
   SUP top
@@ -3130,7 +3135,8 @@
         ds-cfg-replication-purge-delay $
         ds-cfg-group-id $
         ds-cfg-assured-timeout $
-        ds-cfg-degraded-status-threshold)
+        ds-cfg-degraded-status-threshold $
+        ds-cfg-weight)
   X-ORIGIN 'OpenDS Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.65
   NAME 'ds-backup-directory'
diff --git a/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml b/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
index 812bad9..6c91def 100644
--- a/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
+++ b/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
@@ -216,11 +216,11 @@
   </adm:property>
   <adm:property name="assured-timeout" mandatory="false">
     <adm:synopsis>
-      The timeout value when waiting for assured mode acknowledgements.
+      The timeout value when waiting for assured mode acknowledgments.
     </adm:synopsis>
     <adm:description>
       Defines the amount of milliseconds the replication server will wait for
-      assured acknowledgements (in either Safe Data or Safe Read assured sub
+      assured acknowledgments (in either Safe Data or Safe Read assured sub
       modes) before forgetting them and answer to the entity that sent an update
       and is waiting for acknowledgment.
     </adm:description>
@@ -265,4 +265,35 @@
       </ldap:attribute>
     </adm:profile>
   </adm:property>
+  <adm:property name="weight" mandatory="false">
+    <adm:synopsis>
+      The weight of the replication server.
+    </adm:synopsis>
+    <adm:description>
+      The weight affected to the replication server.
+      Each replication server of the topology has a weight. When combined
+      together, the weights of the replication servers of a same group can be
+      translated to a percentage that determines the quantity of directory
+      servers of the topology that should be connected to a replication server.
+      For instance imagine a topology with 3 replication servers (with the same
+      group id) with the following weights: RS1=1, RS2=1, RS3=2. This means that
+      RS1 should have 25% of the directory servers connected in the topology,
+      RS2 25%, and RS3 50%. This may be useful if the replication servers of the
+      topology have a different power and one wants to spread the load between
+      the replication servers according to their power.
+    </adm:description>
+    <adm:default-behavior>
+      <adm:defined>
+        <adm:value>1</adm:value>
+      </adm:defined>
+    </adm:default-behavior>
+    <adm:syntax>
+      <adm:integer lower-limit="0"></adm:integer>
+    </adm:syntax>
+    <adm:profile name="ldap">
+      <ldap:attribute>
+        <ldap:name>ds-cfg-weight</ldap:name>
+      </ldap:attribute>
+    </adm:profile>
+  </adm:property>
 </adm:managed-object>
diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index ba4ca5b..71e4264 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -73,7 +73,6 @@
  base dn : %s
 MILD_ERR_ERROR_SEARCHING_RUV_15=Error %s when searching for server state %s : \
  %s base dn : %s
-NOTICE_SERVER_DISCONNECT_16=%s has disconnected from this replication server %s
 NOTICE_NO_CHANGELOG_SERVER_LISTENING_17=There is no replication server \
  listening on %s
 NOTICE_FOUND_CHANGELOGS_WITH_MY_CHANGES_18=Found %d replication server(s) with \
@@ -175,12 +174,13 @@
 NOTICE_NOW_FOUND_SAME_GENERATION_CHANGELOG_62=Replication is up and running \
  for domain %s with replication server id %s %s - local server id is %s - data \
  generation is %s
-NOTICE_DISCONNECTED_FROM_CHANGELOG_63=The connection to Replication Server %s \
- %s has been dropped by the Replication Server for %s in local server id %s
+NOTICE_REPLICATION_SERVER_PROPERLY_DISCONNECTED_63=Replication server %s \
+ %s has properly disconnected for %s in local server id %s. Trying to reconnect \
+ to a suitable replication server
 SEVERE_ERR_CHANGELOG_ERROR_SENDING_ERROR_65=An unexpected error occurred \
  while sending an Error Message to %s. This connection is going to be closed \
  and reopened
-SEVERE_ERR_CHANGELOG_ERROR_SENDING_MSG_66=An unexpected error occurred  while \
+SEVERE_ERR_CHANGELOG_ERROR_SENDING_MSG_66=An unexpected error occurred while \
  sending a Message to %s. This connection is going to be closed and reopened
 MILD_ERR_ERROR_REPLAYING_OPERATION_67=Could not replay operation %s with \
  ChangeNumber %s error %s %s
@@ -409,7 +409,7 @@
 NOTICE_ERR_FRACTIONAL_FORBIDDEN_FULL_UPDATE_FRACTIONAL_172=The export of \
  domain %s from server %s to all other servers of the topology is forbidden as \
  the source server has some fractional configuration : only fractional servers \
- in a replicated topology does not makes sense
+ in a replicated topology does not make sense
 MILD_ERR_DRAFT_CHANGENUMBER_DATABASE_173=An error occurred when accessing the \
  database of the draft change number : %s
 SEVERE_ERR_INITIALIZATION_FAILED_NOCONN_174=The initialization failed because \
@@ -423,4 +423,15 @@
 NOTICE_ERR_LDIF_IMPORT_FRACTIONAL_DATA_SET_IS_FRACTIONAL_177=The LDIF import \
  for importing suffix %s data has been stopped due to fractional configuration \
  inconsistency : imported data set has some fractional configuration but not \
- local server
\ No newline at end of file
+ local server
+SEVERE_ERR_DS_DISCONNECTED_DURING_HANDSHAKE_178=Directory server %s was \
+ attempting to connect to replication server %s but has disconnected in \
+ handshake phase
+SEVERE_ERR_RS_DISCONNECTED_DURING_HANDSHAKE_179=Replication server %s was \
+ attempting to connect to replication server %s but has disconnected in \
+ handshake phase
+SEVERE_ERR_REPLICATION_SERVER_BADLY_DISCONNECTED_180=The connection to \
+ replication server %s %s has been unexpectedly dropped by the replication \
+ server for %s in local server id %s
+SEVERE_ERR_SERVER_BADLY_DISCONNECTED_181= %s has badly disconnected from this \
+ replication server %s
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index b950162..898b387 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -4472,7 +4472,7 @@
    * Verifies that the given string represents a valid source
    * from which this server can be initialized.
    * @param sourceString The string representing the source
-   * @return The source as a short value
+   * @return The source as a integer value
    * @throws DirectoryException if the string is not valid
    */
   public int decodeSource(String sourceString)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java
index fdc2197..52798ad 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java
@@ -41,7 +41,7 @@
    * Creates a message.
    *
    * @param serverID The sender server of this message.
-   * @param i The server or servers targetted by this message.
+   * @param i The server or servers targeted by this message.
    */
   public DoneMsg(int serverID, int i)
   {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
index a60b658..518435b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
@@ -69,6 +69,11 @@
    */
   private boolean shutdown = false;
 
+  /**
+   * Send StopMsg before session closure or not.
+   */
+  private boolean sendStopBeforeClose = false;
+
 
   /**
    * Create a heartbeat monitor thread.
@@ -76,13 +81,16 @@
    * @param session The session on which heartbeats are to be monitored.
    * @param heartbeatInterval The expected interval between heartbeats received
    * (in milliseconds).
+   * @param sendStopBeforeClose Should we send a StopMsg before closing the
+   *        session ?
    */
   public HeartbeatMonitor(String threadName, ProtocolSession session,
-                          long heartbeatInterval)
+                          long heartbeatInterval, boolean sendStopBeforeClose)
   {
     super(threadName);
     this.session = session;
     this.heartbeatInterval = heartbeatInterval;
+    this.sendStopBeforeClose = sendStopBeforeClose;
   }
 
   /**
@@ -117,6 +125,17 @@
           {
             // Heartbeat is well overdue so the server is assumed to be dead.
             logError(NOTE_HEARTBEAT_FAILURE.get(currentThread().getName()));
+            if (sendStopBeforeClose)
+            {
+              // V4 protocol introduces a StopMsg to properly end communications
+              try
+              {
+                session.publish(new StopMsg());
+              } catch(IOException ioe)
+              {
+                // Anyway, going to close session, so nothing to do
+              }
+            }
             session.close();
             break;
           }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
index 7b7e01d..e3f6e48 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -82,11 +82,6 @@
   SubTopoMonitorData data = new SubTopoMonitorData();
 
   /**
-   * The protocolVersion that should be used when serializing this message.
-   */
-  private final short protocolVersion;
-
-  /**
    * Creates a new MonitorMsg.
    *
    * @param sender The sender of this message.
@@ -95,25 +90,8 @@
   public MonitorMsg(int sender, int destination)
   {
     super(sender, destination);
-    protocolVersion = ProtocolVersion.getCurrentVersion();
   }
 
-
-  /**
-   * Creates a new MonitorMsg with a specific protocol version.
-   *
-   * @param sender                The sender of this message.
-   * @param destination           The destination of this message.
-   * @param replicationProtocol   The protocol version to use.
-   */
-  public MonitorMsg(int sender, int destination,
-      short replicationProtocol)
-  {
-    super(sender, destination);
-    protocolVersion = replicationProtocol;
-  }
-
-
   /**
    * Sets the state of the replication server.
    * @param state The state.
@@ -204,7 +182,6 @@
    */
   public MonitorMsg(byte[] in, short version) throws DataFormatException
   {
-    protocolVersion = ProtocolVersion.getCurrentVersion();
     ByteSequenceReader reader = ByteString.wrap(in).asReader();
 
     if (version == ProtocolVersion.REPLICATION_PROTOCOL_V1)
@@ -328,6 +305,17 @@
    */
   @Override
   public byte[] getBytes()
+  throws UnsupportedEncodingException
+  {
+    return getBytes(ProtocolVersion.getCurrentVersion());
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[] getBytes(short protocolVersion)
+     throws UnsupportedEncodingException
   {
     try
     {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
index 47c380e..db05464 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
@@ -42,7 +42,7 @@
    * Creates a message.
    *
    * @param serverID The sender server of this message.
-   * @param destination The server or servers targetted by this message.
+   * @param destination The server or servers targeted by this message.
    */
   public MonitorRequestMsg(int serverID, int destination)
   {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
index 4825445..ee31cb6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
@@ -116,6 +116,13 @@
    */
   public abstract String getRemoteAddress();
 
+  /**
+   * Retrieve the human readable address of the remote server.
+   *
+   * @return The human readable address of the remote server.
+   */
+  public abstract String getReadableRemoteAddress();
+
 
   /**
   * Set a timeout value.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
index 997267d..b2f095c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -54,9 +54,12 @@
   public static final short REPLICATION_PROTOCOL_V3 = 3;
 
   /**
-   * 4th version of the replication protocol.
-   * Add to the body of the ADD/MOD/MODDN/DEL msgs, a list of attribute for
-   * ECL entry attributes.
+   * The constant for the 4th version of the replication protocol.
+   * - Add to the body of the ADD/MOD/MODDN/DEL msgs, a list of attribute for
+   *   ECL entry attributes.
+   * - Modified algorithm for choosing a RS to connect to: introduction of a
+   *   ReplicationServerDSMsg message.
+   * - Introduction of a StopMsg for proper connections ending.
    */
   public static final short REPLICATION_PROTOCOL_V4 = 4;
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java
new file mode 100644
index 0000000..5c507ec
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java
@@ -0,0 +1,408 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2009 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.protocol;
+
+import java.io.UnsupportedEncodingException;
+import java.util.zip.DataFormatException;
+
+import org.opends.server.replication.common.ServerState;
+
+/**
+ * Message sent by a replication server to a directory server in reply to the
+ * ServerStartMsg.
+ */
+public class ReplServerStartDSMsg extends StartMsg
+{
+  private int serverId;
+  private String serverURL;
+  private String baseDn = null;
+  private int windowSize;
+  private ServerState serverState;
+
+  /**
+   * Whether to continue using SSL to encrypt messages after the start
+   * messages have been exchanged.
+   */
+  private boolean sslEncryption;
+
+  /**
+   * Threshold value used by the RS to determine if a DS must be put in
+   * degraded status because the number of pending changes for him has crossed
+   * this value. This field is only used by a DS.
+   */
+  private int degradedStatusThreshold = -1;
+
+  /**
+   * The weight affected to the replication server.
+   */
+  private int weight = -1;
+
+  /**
+   * Number of currently connected DS to the replication server.
+   */
+  private int connectedDSNumber = -1;
+
+  /**
+   * Create a ReplServerStartDSMsg.
+   *
+   * @param serverId replication server id
+   * @param serverURL replication server URL
+   * @param baseDn base DN for which the ReplServerStartDSMsg is created.
+   * @param windowSize The window size.
+   * @param serverState our ServerState for this baseDn.
+   * @param protocolVersion The replication protocol version of the creator.
+   * @param generationId The generationId for this server.
+   * @param sslEncryption Whether to continue using SSL to encrypt messages
+   *                      after the start messages have been exchanged.
+   * @param groupId The group id of the RS
+   * @param degradedStatusThreshold The degraded status threshold
+   * @param weight The weight affected to the replication server.
+   * @param connectedDSNumber Number of currently connected DS to the
+   *        replication server.
+   */
+  public ReplServerStartDSMsg(int serverId, String serverURL, String baseDn,
+                               int windowSize,
+                               ServerState serverState,
+                               short protocolVersion,
+                               long generationId,
+                               boolean sslEncryption,
+                               byte groupId,
+                               int degradedStatusThreshold,
+                               int weight,
+                               int connectedDSNumber)
+  {
+    super(protocolVersion, generationId);
+    this.serverId = serverId;
+    this.serverURL = serverURL;
+    if (baseDn != null)
+      this.baseDn = baseDn;
+    else
+      this.baseDn = null;
+    this.windowSize = windowSize;
+    this.serverState = serverState;
+    this.sslEncryption = sslEncryption;
+    this.groupId = groupId;
+    this.degradedStatusThreshold = degradedStatusThreshold;
+    this.weight = weight;
+    this.connectedDSNumber = connectedDSNumber;
+  }
+
+  /**
+   * Creates a new ReplServerStartDSMsg by decoding the provided byte array.
+   * @param in A byte array containing the encoded information for the
+   *             ReplServerStartDSMsg
+   * @throws DataFormatException If the in does not contain a properly
+   *                             encoded ReplServerStartDSMsg.
+   */
+  public ReplServerStartDSMsg(byte[] in) throws DataFormatException
+  {
+    byte[] allowedPduTypes = new byte[1];
+    allowedPduTypes[0] = MSG_TYPE_REPL_SERVER_START_DS;
+    headerLength = decodeHeader(allowedPduTypes, in);
+
+    try
+    {
+      /* The ReplServerStartDSMsg payload is stored in the form :
+       * <baseDn><serverId><serverURL><windowSize><sslEncryption>
+       * <degradedStatusThreshold><weight><connectedDSNumber>
+       * <serverState>
+       */
+
+      /* first bytes are the header */
+      int pos = headerLength;
+
+      /* read the dn
+       * first calculate the length then construct the string
+       */
+      int length = getNextLength(in, pos);
+      baseDn = new String(in, pos, length, "UTF-8");
+      pos += length +1;
+
+      /*
+       * read the ServerId
+       */
+      length = getNextLength(in, pos);
+      String serverIdString = new String(in, pos, length, "UTF-8");
+      serverId = Integer.valueOf(serverIdString);
+      pos += length +1;
+
+      /*
+       * read the ServerURL
+       */
+      length = getNextLength(in, pos);
+      serverURL = new String(in, pos, length, "UTF-8");
+      pos += length +1;
+
+      /*
+       * read the window size
+       */
+      length = getNextLength(in, pos);
+      windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
+      pos += length +1;
+
+      /*
+       * read the sslEncryption setting
+       */
+      length = getNextLength(in, pos);
+      sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
+      pos += length +1;
+
+      /**
+       * read the degraded status threshold
+       */
+      length = getNextLength(in, pos);
+      degradedStatusThreshold =
+        Integer.valueOf(new String(in, pos, length, "UTF-8"));
+      pos += length + 1;
+
+      /*
+       * read the weight
+       */
+      length = getNextLength(in, pos);
+      String weightString = new String(in, pos, length, "UTF-8");
+      weight = Integer.valueOf(weightString);
+      pos += length +1;
+
+      /*
+       * read the connected DS number
+       */
+      length = getNextLength(in, pos);
+      String connectedDSNumberString = new String(in, pos, length, "UTF-8");
+      connectedDSNumber = Integer.valueOf(connectedDSNumberString);
+      pos += length +1;
+
+      // Read the ServerState
+      // Caution: ServerState MUST be the last field. Because ServerState can
+      // contain null character (string termination of serverid string ..) it
+      // cannot be decoded using getNextLength() like the other fields. The
+      // only way is to rely on the end of the input buffer : and that forces
+      // the ServerState to be the last. This should be changed and we want to
+      // have more than one ServerState field.
+      serverState = new ServerState(in, pos, in.length - 1);
+    } catch (UnsupportedEncodingException e)
+    {
+      throw new DataFormatException("UTF-8 is not supported by this jvm.");
+    }
+  }
+
+  /**
+   * Get the Server Id.
+   * @return the server id
+   */
+  public int getServerId()
+  {
+    return this.serverId;
+  }
+
+  /**
+   * Get the server URL.
+   * @return the server URL
+   */
+  public String getServerURL()
+  {
+    return this.serverURL;
+  }
+
+  /**
+   * Get the base DN from this ReplServerStartDSMsg.
+   *
+   * @return the base DN from this ReplServerStartDSMsg.
+   */
+  public String getBaseDn()
+  {
+    return baseDn;
+  }
+
+  /**
+   * Get the serverState.
+   * @return Returns the serverState.
+   */
+  public ServerState getServerState()
+  {
+    return this.serverState;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[] getBytes()
+  throws UnsupportedEncodingException
+  {
+    return getBytes(ProtocolVersion.getCurrentVersion());
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[] getBytes(short protocolVersion)
+     throws UnsupportedEncodingException
+  {
+    /* The ReplServerStartDSMsg is stored in the form :
+     * <operation type><baseDn><serverId><serverURL><windowSize><sslEncryption>
+     * <degradedStatusThreshold><weight><connectedDSNumber>
+     * <serverState>
+     */
+
+    byte[] byteDn = baseDn.getBytes("UTF-8");
+    byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
+    byte[] byteServerUrl = serverURL.getBytes("UTF-8");
+    byte[] byteServerState = serverState.getBytes();
+    byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
+    byte[] byteSSLEncryption =
+      String.valueOf(sslEncryption).getBytes("UTF-8");
+    byte[] byteDegradedStatusThreshold =
+      String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
+    byte[] byteWeight =
+      String.valueOf(weight).getBytes("UTF-8");
+    byte[] byteConnectedDSNumber =
+      String.valueOf(connectedDSNumber).getBytes("UTF-8");
+
+    int length = byteDn.length + 1 + byteServerId.length + 1 +
+      byteServerUrl.length + 1 + byteWindowSize.length + 1 +
+      byteSSLEncryption.length + 1 + byteDegradedStatusThreshold.length + 1 +
+      byteWeight.length + 1 + byteConnectedDSNumber.length + 1 +
+      byteServerState.length + 1;
+
+    /* encode the header in a byte[] large enough */
+    byte resultByteArray[] =
+      encodeHeader(MSG_TYPE_REPL_SERVER_START_DS, length, protocolVersion);
+
+    int pos = headerLength;
+
+    /* put the baseDN and a terminating 0 */
+    pos = addByteArray(byteDn, resultByteArray, pos);
+
+    /* put the ServerId */
+    pos = addByteArray(byteServerId, resultByteArray, pos);
+
+    /* put the ServerURL */
+    pos = addByteArray(byteServerUrl, resultByteArray, pos);
+
+    /* put the window size */
+    pos = addByteArray(byteWindowSize, resultByteArray, pos);
+
+    /* put the SSL Encryption setting */
+    pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
+
+    /* put the degraded status threshold */
+    pos = addByteArray(byteDegradedStatusThreshold, resultByteArray, pos);
+
+    /* put the weight */
+    pos = addByteArray(byteWeight, resultByteArray, pos);
+
+    /* put the connected DS number */
+    pos = addByteArray(byteConnectedDSNumber, resultByteArray, pos);
+
+    /* put the ServerState */
+    pos = addByteArray(byteServerState, resultByteArray, pos);
+
+    return resultByteArray;
+  }
+
+  /**
+   * get the window size for the server that created this message.
+   *
+   * @return The window size for the server that created this message.
+   */
+  public int getWindowSize()
+  {
+    return windowSize;
+  }
+
+  /**
+   * Get the SSL encryption value for the server that created the
+   * message.
+   *
+   * @return The SSL encryption value for the server that created the
+   *         message.
+   */
+  public boolean getSSLEncryption()
+  {
+    return sslEncryption;
+  }
+
+  /**
+   * Get the degraded status threshold value.
+   * @return The degraded status threshold value.
+   */
+  public int getDegradedStatusThreshold()
+  {
+    return degradedStatusThreshold;
+  }
+
+  /**
+   * Set the degraded status threshold (For test purpose).
+   * @param degradedStatusThreshold The degraded status threshold to set.
+   */
+  public void setDegradedStatusThreshold(int degradedStatusThreshold)
+  {
+    this.degradedStatusThreshold = degradedStatusThreshold;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString()
+  {
+    return "ReplServerStartDSMsg content: " +
+      "\nprotocolVersion: " + protocolVersion +
+      "\ngenerationId: " + generationId +
+      "\nbaseDn: " + baseDn +
+      "\ngroupId: " + groupId +
+      "\nserverId: " + serverId +
+      "\nserverState: " + serverState +
+      "\nserverURL: " + serverURL +
+      "\nsslEncryption: " + sslEncryption +
+      "\ndegradedStatusThreshold: " + degradedStatusThreshold +
+      "\nwindowSize: " + windowSize +
+      "\nweight: " + weight +
+      "\nconnectedDSNumber: " + connectedDSNumber;
+  }
+
+  /**
+   * Gets the weight of the replication server.
+   * @return The weight of the replication server.
+   */
+  public int getWeight()
+  {
+    return weight;
+  }
+
+  /**
+   * Gets the number of directory servers connected to the replication server.
+   * @return The number of directory servers connected to the replication
+   * server.
+   */
+  public int getConnectedDSNumber()
+  {
+    return connectedDSNumber;
+  }
+
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
index 7c83505..6fce977 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
@@ -50,6 +50,14 @@
   private boolean sslEncryption;
 
   /**
+   * NOTE: Starting from protocol V4, we introduce a dedicated PDU for answering
+   * to the DS ServerStartMsg. This is the ReplServerStartDSMsg. So the
+   * degradedStatusThreshold value being used only by a DS, it could be removed
+   * from the ReplServerStartMsg PDU. However for a smoothly transition to V4
+   * protocol, we prefer to let this variable also in this PDU but the one
+   * really used is in the ReplServerStartDSMsg PDU. This prevents from having
+   * only RSv3 able to connect to RSv4 as connection initiator.
+   *
    * Threshold value used by the RS to determine if a DS must be put in
    * degraded status because the number of pending changes for him has crossed
    * this value. This field is only used by a DS.
@@ -108,6 +116,15 @@
     allowedPduTypes[1] = MSG_TYPE_REPL_SERVER_START_V1;
     headerLength = decodeHeader(allowedPduTypes, in);
 
+    // Protocol version has been read as part of the header:
+    // decode the body according to the protocol version read in the header
+    switch(protocolVersion)
+    {
+      case ProtocolVersion.REPLICATION_PROTOCOL_V1:
+        decodeBody_V1(in, headerLength);
+        return;
+    }
+
     try
     {
       /* The ReplServerStartMsg payload is stored in the form :
@@ -154,22 +171,85 @@
       sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
       pos += length +1;
 
-      // For easiness (no additional method), simply compare PDU type to
-      // know if we have to read new parameters of V2
-      if (in[0] == MSG_TYPE_REPL_SERVER_START)
-      {
-        /**
-         * read the degraded status threshold
-         */
-        length = getNextLength(in, pos);
-        degradedStatusThreshold =
-          Integer.valueOf(new String(in, pos, length, "UTF-8"));
-        pos += length + 1;
-      }
+      /**
+       * read the degraded status threshold
+       */
+      length = getNextLength(in, pos);
+      degradedStatusThreshold =
+        Integer.valueOf(new String(in, pos, length, "UTF-8"));
+      pos += length + 1;
 
       // Read the ServerState
       // Caution: ServerState MUST be the last field. Because ServerState can
-      // contain null character (string termination of sererid string ..) it
+      // contain null character (string termination of serverid string ..) it
+      // cannot be decoded using getNextLength() like the other fields. The
+      // only way is to rely on the end of the input buffer : and that forces
+      // the ServerState to be the last. This should be changed and we want to
+      // have more than one ServerState field.
+      serverState = new ServerState(in, pos, in.length - 1);
+    } catch (UnsupportedEncodingException e)
+    {
+      throw new DataFormatException("UTF-8 is not supported by this jvm.");
+    }
+  }
+
+  /**
+   * Decodes the body of a just received ReplServerStartMsg. The body is in the
+   * passed array, and starts at the provided location. This is for a PDU
+   * encoded in V1 protocol version.
+   * @param in A byte array containing the body for the ReplServerStartMsg
+   * @param pos The position in the array where the decoding should start
+   * @throws DataFormatException If the in does not contain a properly
+   *                             encoded ReplServerStartMsg.
+   */
+  public void decodeBody_V1(byte[] in, int pos) throws DataFormatException
+  {
+    try
+    {
+      /* The ReplServerStartMsg payload is stored in the form :
+       * <baseDn><serverId><serverURL><windowSize><sslEncryption>
+       * <serverState>
+       */
+
+      /* read the dn
+       * first calculate the length then construct the string
+       */
+      int length = getNextLength(in, pos);
+      baseDn = new String(in, pos, length, "UTF-8");
+      pos += length +1;
+
+      /*
+       * read the ServerId
+       */
+      length = getNextLength(in, pos);
+      String serverIdString = new String(in, pos, length, "UTF-8");
+      serverId = Integer.valueOf(serverIdString);
+      pos += length +1;
+
+      /*
+       * read the ServerURL
+       */
+      length = getNextLength(in, pos);
+      serverURL = new String(in, pos, length, "UTF-8");
+      pos += length +1;
+
+      /*
+       * read the window size
+       */
+      length = getNextLength(in, pos);
+      windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
+      pos += length +1;
+
+      /*
+       * read the sslEncryption setting
+       */
+      length = getNextLength(in, pos);
+      sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
+      pos += length +1;
+
+      // Read the ServerState
+      // Caution: ServerState MUST be the last field. Because ServerState can
+      // contain null character (string termination of serverid string ..) it
       // cannot be decoded using getNextLength() like the other fields. The
       // only way is to rely on the end of the input buffer : and that forces
       // the ServerState to be the last. This should be changed and we want to
@@ -235,8 +315,12 @@
   public byte[] getBytes(short protocolVersion)
      throws UnsupportedEncodingException
   {
-    if  (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
-      return getBytes_V1();
+    // If an older version requested, encode in the requested way
+    switch(protocolVersion)
+    {
+      case ProtocolVersion.REPLICATION_PROTOCOL_V1:
+        return getBytes_V1();
+    }
 
     /* The ReplServerStartMsg is stored in the form :
      * <operation type><baseDn><serverId><serverURL><windowSize><sslEncryption>
@@ -254,12 +338,12 @@
       String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
 
     int length = byteDn.length + 1 + byteServerId.length + 1 +
-    byteServerUrl.length + 1 + byteWindowSize.length + 1 +
-    byteSSLEncryption.length + 1 +
-    byteDegradedStatusThreshold.length + 1 +
-    byteServerState.length + 1;
+      byteServerUrl.length + 1 + byteWindowSize.length + 1 +
+      byteSSLEncryption.length + 1 +
+      byteDegradedStatusThreshold.length + 1 +
+      byteServerState.length + 1;
 
-    /* encode the header in a byte[] large enough to also contain the mods */
+    /* encode the header in a byte[] large enough */
     byte resultByteArray[] =
       encodeHeader(MSG_TYPE_REPL_SERVER_START, length, protocolVersion);
 
@@ -377,7 +461,7 @@
                    byteSSLEncryption.length + 1 +
                    byteServerState.length + 1;
 
-      /* encode the header in a byte[] large enough to also contain the mods */
+      /* encode the header in a byte[] large enough */
       byte resultByteArray[] = encodeHeader_V1(MSG_TYPE_REPL_SERVER_START_V1,
         length);
       int pos = headerLength;
@@ -407,5 +491,4 @@
       return null;
     }
   }
-
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
index 466d884..e2ba2e3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -71,12 +71,16 @@
   static final byte MSG_TYPE_CHANGE_STATUS = 28;
   static final byte MSG_TYPE_GENERIC_UPDATE = 29;
 
-  // Protocol version : 3
+  // Added for protocol version 3
   static final byte MSG_TYPE_START_ECL = 30;
   static final byte MSG_TYPE_START_ECL_SESSION = 31;
   static final byte MSG_TYPE_ECL_UPDATE = 32;
   static final byte MSG_TYPE_CT_HEARTBEAT = 33;
 
+  // Added for protocol version 4
+  static final byte MSG_TYPE_REPL_SERVER_START_DS = 34;
+  static final byte MSG_TYPE_STOP = 35;
+
   // Adding a new type of message here probably requires to
   // change accordingly generateMsg method below
 
@@ -238,6 +242,12 @@
       case MSG_TYPE_CT_HEARTBEAT:
         msg = new ChangeTimeHeartbeatMsg(buffer);
       break;
+      case MSG_TYPE_REPL_SERVER_START_DS:
+        msg = new ReplServerStartDSMsg(buffer);
+      break;
+      case MSG_TYPE_STOP:
+        msg = new StopMsg(buffer);
+      break;
       default:
         throw new DataFormatException("received message with unknown type");
     }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
index 0631618..18cf6d3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -232,6 +232,14 @@
   /**
    * {@inheritDoc}
    */
+  public String getReadableRemoteAddress()
+  {
+    return socket.getRemoteSocketAddress().toString();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
   public void setSoTimeout(int timeout) throws SocketException
   {
     socket.setSoTimeout(timeout);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StopMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StopMsg.java
new file mode 100644
index 0000000..e6c260a
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StopMsg.java
@@ -0,0 +1,70 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2009 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.protocol;
+
+import java.util.zip.DataFormatException;
+
+/**
+ * This message is part of the replication protocol.
+ * This message is sent by a server to tell a peer the communication will be
+ * terminated.
+ */
+public class StopMsg extends ReplicationMsg
+{
+  /**
+   * Creates a message.
+   */
+  public StopMsg()
+  {
+  }
+
+  /**
+   * Creates a new message by decoding the provided byte array.
+   * @param in A byte array containing the encoded information for the message,
+   * @throws DataFormatException If the in does not contain a properly,
+   *                             encoded message.
+   */
+  public StopMsg(byte[] in) throws DataFormatException
+  {
+    // First byte is the type
+    if (in[0] != MSG_TYPE_STOP)
+      throw new DataFormatException("input is not a valid Stop message: " +
+        in[0]);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[] getBytes()
+  {
+    return new byte[]
+      {
+        MSG_TYPE_STOP
+      };
+  }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
index 4b2fc59..c20d895 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -243,6 +243,14 @@
   /**
    * {@inheritDoc}
    */
+  public String getReadableRemoteAddress()
+  {
+    return plainSocket.getRemoteSocketAddress().toString();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
   public void setSoTimeout(int timeout) throws SocketException
   {
     plainSocket.setSoTimeout(timeout);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index 3cd91c4..40fe57d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -145,7 +145,21 @@
           try
           {
             if (session != null)
+            {
+              // V4 protocol introduces a StopMsg to properly close the
+              // connection between servers
+              if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+              {
+                try
+                {
+                  session.publish(new StopMsg());
+                } catch (IOException ioe)
+                {
+                  // Anyway, going to close session, so nothing to do
+                }
+              }
               session.close();
+            }
           } catch (IOException e)
           {
             // ignore
@@ -461,7 +475,7 @@
   {
     TopologyMsg outTopoMsg = replicationServerDomain.createTopologyMsgForDS(
         this.serverId);
-    session.publish(outTopoMsg);
+    session.publish(outTopoMsg, protocolVersion);
     return outTopoMsg;
   }
   /**
@@ -500,14 +514,13 @@
         return;
       }
 
-      //
-      ReplServerStartMsg outReplServerStartMsg = null;
+      StartMsg outStartMsg = null;
       try
       {
-        outReplServerStartMsg = sendStartToRemote(protocolVersion);
+        outStartMsg = sendStartToRemote(protocolVersion);
 
         // log
-        logStartHandshakeRCVandSND(inServerStartMsg, outReplServerStartMsg);
+        logStartHandshakeRCVandSND(inServerStartMsg, outStartMsg);
 
         // The session initiator decides whether to use SSL.
         // Until here session is encrypted then it depends on the negotiation
@@ -517,6 +530,13 @@
         // wait and process StartSessionMsg from remote RS
         StartSessionMsg inStartSessionMsg =
           waitAndProcessStartSessionFromRemoteDS();
+        if (inStartSessionMsg == null)
+        {
+          // DS wants to properly close the connection (DS sent a StopMsg)
+          logStopReceived();
+          abortStart(null);
+          return;
+        }
 
         // Send our own TopologyMsg to remote RS
         TopologyMsg outTopoMsg = sendTopoToRemoteDS();
@@ -525,18 +545,12 @@
       }
       catch(IOException e)
       {
-        // We do not want polluting error log if error is due to normal session
-        // aborted after handshake phase one from a DS that is searching for
-        // best suitable RS.
-
-        // don't log a polluting error when connection aborted
-        // from a DS that wanted only to perform handshake phase 1 in order
-        // to determine the best suitable RS:
-        // 1) -> ServerStartMsg
-        // 2) <- ReplServerStartMsg
-        // 3) connection closure
-
-        throw new DirectoryException(ResultCode.OTHER, null, null);
+        Message errMessage = ERR_DS_DISCONNECTED_DURING_HANDSHAKE.get(
+          Integer.toString(inServerStartMsg.getServerId()),
+          Integer.toString(replicationServerDomain.getReplicationServer().
+          getServerId()));
+        logError(errMessage);
+        throw new DirectoryException(ResultCode.OTHER, errMessage);
       }
       catch (NotSupportedOldVersionPDUException e)
       {
@@ -578,6 +592,65 @@
         replicationServerDomain.release();
     }
   }
+
+  /**
+   * Send the ReplServerStartDSMsg to the remote DS.
+   * @param requestedProtocolVersion The provided protocol version.
+   * @return The StartMsg sent.
+   * @throws IOException When an exception occurs.
+   */
+  private StartMsg sendStartToRemote(short requestedProtocolVersion)
+  throws IOException
+  {
+    // Before V4 protocol, we sent a ReplServerStartMsg
+    if (protocolVersion < ProtocolVersion.REPLICATION_PROTOCOL_V4)
+    {
+
+      // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
+      ReplServerStartMsg outReplServerStartMsg
+      = new ReplServerStartMsg(
+          replicationServerId,
+          replicationServerURL,
+          getServiceId(),
+          maxRcvWindow,
+          replicationServerDomain.getDbServerState(),
+          protocolVersion,
+          localGenerationId,
+          sslEncryption,
+          getLocalGroupId(),
+          replicationServerDomain.
+          getReplicationServer().getDegradedStatusThreshold());
+
+      session.publish(outReplServerStartMsg, requestedProtocolVersion);
+
+      return outReplServerStartMsg;
+    }
+    else
+    {
+      // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
+      ReplServerStartDSMsg outReplServerStartDSMsg
+      = new ReplServerStartDSMsg(
+          replicationServerId,
+          replicationServerURL,
+          getServiceId(),
+          maxRcvWindow,
+          replicationServerDomain.getDbServerState(),
+          protocolVersion,
+          localGenerationId,
+          sslEncryption,
+          getLocalGroupId(),
+          replicationServerDomain.
+          getReplicationServer().getDegradedStatusThreshold(),
+          replicationServer.getWeight(),
+          replicationServerDomain.getConnectedLDAPservers().size());
+
+
+      session.publish(outReplServerStartDSMsg);
+
+      return outReplServerStartDSMsg;
+    }
+  }
+
   /**
    * Creates a DSInfo structure representing this remote DS.
    * @return The DSInfo structure representing this remote DS
@@ -609,8 +682,10 @@
   }
 
   /**
-   * Wait receiving the StartSessionMsg from the remote DS and process it.
-   * @return the startSessionMsg received
+   * Wait receiving the StartSessionMsg from the remote DS and process it, or
+   * receiving a StopMsg to properly stop the handshake procedure.
+   * @return the startSessionMsg received or null DS sent a stop message to
+   *         not finish the handshake.
    * @throws DirectoryException
    * @throws IOException
    * @throws ClassNotFoundException
@@ -625,7 +700,12 @@
     ReplicationMsg msg = null;
     msg = session.receive();
 
-    if (!(msg instanceof StartSessionMsg))
+    if (msg instanceof StopMsg)
+    {
+      // DS wants to stop handshake (was just for handshake phase one for RS
+      // choice). Return null to make the session be terminated.
+      return null;
+    } else if (!(msg instanceof StartSessionMsg))
     {
       Message message = Message.raw(
           "Protocol error: StartSessionMsg required." + msg + " received.");
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 82aacd7..923e1b6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -331,6 +331,64 @@
   }
 
   /**
+   * Send the ReplServerStartDSMsg to the remote ECL server.
+   * @param requestedProtocolVersion The provided protocol version.
+   * @return The StartMsg sent.
+   * @throws IOException When an exception occurs.
+   */
+  private StartMsg sendStartToRemote(short requestedProtocolVersion)
+  throws IOException
+  {
+    // Before V4 protocol, we sent a ReplServerStartMsg
+    if (protocolVersion < ProtocolVersion.REPLICATION_PROTOCOL_V4)
+    {
+
+      // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
+      ReplServerStartMsg outReplServerStartMsg
+      = new ReplServerStartMsg(
+          replicationServerId,
+          replicationServerURL,
+          getServiceId(),
+          maxRcvWindow,
+          replicationServerDomain.getDbServerState(),
+          protocolVersion,
+          localGenerationId,
+          sslEncryption,
+          getLocalGroupId(),
+          replicationServerDomain.
+          getReplicationServer().getDegradedStatusThreshold());
+
+      session.publish(outReplServerStartMsg, requestedProtocolVersion);
+
+      return outReplServerStartMsg;
+    }
+    else
+    {
+      // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
+      ReplServerStartDSMsg outReplServerStartDSMsg
+      = new ReplServerStartDSMsg(
+          replicationServerId,
+          replicationServerURL,
+          getServiceId(),
+          maxRcvWindow,
+          replicationServerDomain.getDbServerState(),
+          protocolVersion,
+          localGenerationId,
+          sslEncryption,
+          getLocalGroupId(),
+          replicationServerDomain.
+          getReplicationServer().getDegradedStatusThreshold(),
+          replicationServer.getWeight(),
+          replicationServerDomain.getConnectedLDAPservers().size());
+
+
+      session.publish(outReplServerStartDSMsg);
+
+      return outReplServerStartDSMsg;
+    }
+  }
+
+  /**
    * Creates a new handler object to a remote replication server.
    * @param session The session with the remote RS.
    * @param queueSize The queue size to manage updates to that RS.
@@ -406,12 +464,14 @@
       // lock with timeout
       lockDomain(true);
 
+      this.localGenerationId = replicationServerDomain.getGenerationId();
+
       // send start to remote
-      ReplServerStartMsg outReplServerStartMsg =
+      StartMsg outStartMsg =
         sendStartToRemote(protocolVersion);
 
       // log
-      logStartHandshakeRCVandSND(inECLStartMsg, outReplServerStartMsg);
+      logStartHandshakeRCVandSND(inECLStartMsg, outStartMsg);
 
       // until here session is encrypted then it depends on the negociation
       // The session initiator decides whether to use SSL.
@@ -421,6 +481,14 @@
       // wait and process StartSessionMsg from remote RS
       StartECLSessionMsg inStartECLSessionMsg =
         waitAndProcessStartSessionECLFromRemoteServer();
+      if (inStartECLSessionMsg == null)
+        {
+          // client wants to properly close the connection (client sent a
+          // StopMsg)
+          logStopReceived();
+          abortStart(null);
+          return;
+        }
 
       logStartECLSessionHandshake(inStartECLSessionMsg);
 
@@ -462,7 +530,12 @@
     ReplicationMsg msg = null;
     msg = session.receive();
 
-    if (!(msg instanceof StartECLSessionMsg))
+    if (msg instanceof StopMsg)
+    {
+      // client wants to stop handshake (was just for handshake phase one for RS
+      // choice). Return null to make the session be terminated.
+      return null;
+    } else if (!(msg instanceof StartECLSessionMsg))
     {
       Message message = Message.raw(
           "Protocol error: StartECLSessionMsg required." + msg + " received.");
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
index 53efc16..2567751 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -169,7 +169,7 @@
     catch (SocketException e)
     {
       // Just ignore the exception and let the thread die as well
-      errMessage = NOTE_SERVER_DISCONNECT.get(handler.toString(),
+      errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
           "for operation " + handler.getOperationId());
       logError(errMessage);
     }
@@ -198,7 +198,7 @@
   }
 
   /**
-   * Loop geting changes from the domain and publishing them either to
+   * Loop getting changes from the domain and publishing them either to
    * the provided session or to the ECL session interface.
    * @throws IOException when raised (connection closure)
    * @throws InterruptedException when raised
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 8cd9f1a..c093c86 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -189,6 +189,21 @@
   final private Object domainMonitor = new Object();
 
   /**
+   * The weight affected to the replication server.
+   * Each replication server of the topology has a weight. When combined
+   * together, the weights of the replication servers of a same group can be
+   * translated to a percentage that determines the quantity of directory
+   * servers of the topology that should be connected to a replication server.
+   * For instance imagine a topology with 3 replication servers (with the same
+   * group id) with the following weights: RS1=1, RS2=1, RS3=2. This means that
+   * RS1 should have 25% of the directory servers connected in the topology,
+   * RS2 25%, and RS3 50%. This may be useful if the replication servers of the
+   * topology have a different power and one wants to spread the load between
+   * the replication servers according to their power.
+   */
+  private int weight = 1;
+
+  /**
    * Creates a new Replication server using the provided configuration entry.
    *
    * @param configuration The configuration of this replication server.
@@ -979,6 +994,13 @@
       }
     }
 
+    // Set a potential new weight
+    if (weight != configuration.getWeight())
+    {
+      weight = configuration.getWeight();
+      // TODO: send new TopologyMsg
+    }
+
     if ((configuration.getReplicationDBDirectory() != null) &&
         (!dbDirname.equals(configuration.getReplicationDBDirectory())))
     {
@@ -1786,4 +1808,13 @@
     }
     return result;
   }
+
+  /**
+   * Gets the weight.
+   * @return the weight
+   */
+  public int getWeight()
+  {
+    return weight;
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 8d3b4a7..a301fc3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1554,20 +1554,8 @@
         // in the topology.
         if (senderHandler.isDataServer())
         {
-          MonitorMsg returnMsg;
-
-          if (senderHandler.getProtocolVersion() >
-                             ProtocolVersion.REPLICATION_PROTOCOL_V1)
-          {
-           returnMsg =
+          MonitorMsg returnMsg =
             new MonitorMsg(msg.getDestination(), msg.getsenderID());
-          }
-          else
-          {
-            returnMsg =
-              new MonitorMsg(msg.getDestination(), msg.getsenderID(),
-                  ProtocolVersion.REPLICATION_PROTOCOL_V1);
-          }
 
           try
           {
@@ -1613,20 +1601,8 @@
           return;
         }
 
-        MonitorMsg monitorMsg;
-
-        if (senderHandler.getProtocolVersion() >
-                                  ProtocolVersion.REPLICATION_PROTOCOL_V1)
-        {
-          monitorMsg =
-            new MonitorMsg(msg.getDestination(), msg.getsenderID());
-        }
-        else
-        {
-          monitorMsg =
-            new MonitorMsg(msg.getDestination(), msg.getsenderID(),
-                ProtocolVersion.REPLICATION_PROTOCOL_V1);
-        }
+        MonitorMsg monitorMsg =
+          new MonitorMsg(msg.getDestination(), msg.getsenderID());
 
         // Populate for each connected LDAP Server
         // from the states stored in the serverHandler.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index 8693dd9..6ce4627 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -121,6 +121,34 @@
   }
 
   /**
+   * Send the ReplServerStartMsg to the remote RS.
+   * @param requestedProtocolVersion The provided protocol version.
+   * @return The ReplServerStartMsg sent.
+   * @throws IOException When an exception occurs.
+   */
+  private ReplServerStartMsg sendStartToRemote(short requestedProtocolVersion)
+  throws IOException
+  {
+    ReplServerStartMsg outReplServerStartMsg
+    = new ReplServerStartMsg(
+        replicationServerId,
+        replicationServerURL,
+        getServiceId(),
+        maxRcvWindow,
+        replicationServerDomain.getDbServerState(),
+        protocolVersion,
+        localGenerationId,
+        sslEncryption,
+        getLocalGroupId(),
+        replicationServerDomain.
+        getReplicationServer().getDegradedStatusThreshold());
+
+    session.publish(outReplServerStartMsg, requestedProtocolVersion);
+
+    return outReplServerStartMsg;
+  }
+
+  /**
    * Creates a new handler object to a remote replication server.
    * @param session The session with the remote RS.
    * @param queueSize The queue size to manage updates to that RS.
@@ -262,6 +290,7 @@
       // lock with timeout
       lockDomain(true);
 
+      this.localGenerationId = replicationServerDomain.getGenerationId();
       ReplServerStartMsg outReplServerStartMsg =
         sendStartToRemote(protocolVersion);
 
@@ -389,6 +418,14 @@
       super.finalizeStart();
 
     }
+    catch(IOException ioe) {
+      Message errMessage = ERR_RS_DISCONNECTED_DURING_HANDSHAKE.get(
+        Integer.toString(inReplServerStartMsg.getServerId()),
+        Integer.toString(replicationServerDomain.getReplicationServer().
+        getServerId()));
+      logError(errMessage);
+      abortStart(errMessage);
+    }
     catch(DirectoryException de)
     {
       abortStart(de.getMessageObject());
@@ -425,7 +462,7 @@
   throws IOException
   {
     TopologyMsg outTopoMsg = replicationServerDomain.createTopologyMsgForRS();
-    session.publish(outTopoMsg);
+    session.publish(outTopoMsg, protocolVersion);
     return outTopoMsg;
   }
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index ce95024..489dbae 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -50,13 +50,14 @@
 import org.opends.server.replication.protocol.AckMsg;
 import org.opends.server.replication.protocol.ErrorMsg;
 import org.opends.server.replication.protocol.HeartbeatThread;
+import org.opends.server.replication.protocol.MonitorMsg;
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ProtocolVersion;
-import org.opends.server.replication.protocol.ReplServerStartMsg;
 import org.opends.server.replication.protocol.RoutableMsg;
 import org.opends.server.replication.protocol.StartECLSessionMsg;
 import org.opends.server.replication.protocol.StartMsg;
 import org.opends.server.replication.protocol.StartSessionMsg;
+import org.opends.server.replication.protocol.StopMsg;
 import org.opends.server.replication.protocol.TopologyMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.protocol.WindowMsg;
@@ -94,17 +95,21 @@
     if (providedMsg != null)
     {
       if (debugEnabled())
-        TRACER.debugInfo("In "
-            + ((handler!=null)?handler.toString():"Replication Server")
-            + " closing session with err=" +
-            providedMsg.toString());
+        TRACER.debugInfo("In " +
+          ((handler != null) ? handler.toString() : "Replication Server") +
+          " closing session with err=" +
+          providedMsg.toString());
       logError(providedMsg);
     }
     try
     {
-      if (providedSession!=null)
+      if (providedSession != null)
+        // This method is only called when aborting a failing handshake and
+        // not StopMsg should be sent in such situation. StopMsg are only
+        // expected when full handshake has been performed, or at end of
+        // handshake phase 1, when DS was just gathering available RS info
         providedSession.close();
-    } catch (IOException ee)
+    } catch (IOException e)
     {
       // ignore
     }
@@ -174,7 +179,10 @@
   private int rcvWindow;
   private int rcvWindowSizeHalf;
 
-  private int maxRcvWindow;
+  /**
+   * The size of the receiving window.
+   */
+  protected int maxRcvWindow;
   /**
    * Semaphore that the writer uses to control the flow to the remote server.
    */
@@ -197,7 +205,7 @@
    */
   protected long localGenerationId = -1;
   /**
-   * The generation id before procesing a new start handshake.
+   * The generation id before processing a new start handshake.
    */
   protected long oldGenerationId = -1;
   /**
@@ -210,7 +218,7 @@
   protected boolean initSslEncryption;
 
   /**
-   * The SSL encryption after the negociation with the peer.
+   * The SSL encryption after the negotiation with the peer.
    */
   protected boolean sslEncryption;
   /**
@@ -275,17 +283,6 @@
     // be disturbed
     if (session!=null)
     {
-      try
-      {
-        session.publish(
-            new ErrorMsg(
-                replicationServerDomain.getReplicationServer().getServerId(),
-                serverId,
-                reason));
-      }
-      catch(Exception e)
-      {
-      }
       closeSession(session, reason, this);
     }
 
@@ -991,7 +988,14 @@
           replicationServerDomain.getReplicationServer().
           getMonitorInstanceName() + this +
           " publishes message:\n" + msg);
-    session.publish(msg);
+    // Currently only MonitorMsg has to support a backward compatibility
+    if (msg instanceof MonitorMsg)
+    {
+      session.publish(msg, protocolVersion);
+    } else
+    {
+      session.publish(msg);
+    }
   }
 
   /**
@@ -1017,35 +1021,6 @@
   }
 
   /**
-   * Send the ReplServerStartMsg to the remote server (RS or DS).
-   * @param requestedProtocolVersion The provided protocol version.
-   * @return The ReplServerStartMsg sent.
-   * @throws IOException When an exception occurs.
-   */
-  public ReplServerStartMsg sendStartToRemote(short requestedProtocolVersion)
-  throws IOException
-  {
-    this.localGenerationId = replicationServerDomain.getGenerationId();
-    ReplServerStartMsg outReplServerStartMsg
-    = new ReplServerStartMsg(
-        replicationServerId,
-        replicationServerURL,
-        getServiceId(),
-        maxRcvWindow,
-        replicationServerDomain.getDbServerState(),
-        protocolVersion,
-        localGenerationId,
-        sslEncryption,
-        getLocalGroupId(),
-        replicationServerDomain.
-        getReplicationServer().getDegradedStatusThreshold());
-
-    session.publish(outReplServerStartMsg, requestedProtocolVersion);
-
-    return outReplServerStartMsg;
-  }
-
-  /**
    * Sends the provided TopologyMsg to the peer server.
    *
    * @param topoMsg The TopologyMsg message to be sent.
@@ -1058,7 +1033,7 @@
     // V1 Rs do not support the TopologyMsg
     if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
     {
-      session.publish(topoMsg);
+      session.publish(topoMsg, protocolVersion);
     }
   }
 
@@ -1110,6 +1085,18 @@
 
     if (session != null)
     {
+      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+      {
+        // V4 protocol introduces a StopMsg to properly end
+        // communications
+        try
+        {
+          session.publish(new StopMsg());
+        } catch (IOException ioe)
+        {
+          // Anyway, going to close session, so nothing to do
+        }
+      }
       // Close session to end ServerReader or ServerWriter
       try
       {
@@ -1328,12 +1315,27 @@
           replicationServerDomain.getReplicationServer().
           getMonitorInstanceName() + ", " +
           this.getClass().getSimpleName() + " " + this + " :" +
-          "\nSH SESSION HANDSHAKE RECEIVED:\n" +
+          "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg.toString() +
           "\nAND REPLIED:\n" + outTopoMsg.toString());
     }
   }
 
   /**
+   * Log stop message has been received.
+   */
+  protected void logStopReceived()
+  {
+    if (debugEnabled())
+    {
+      TRACER.debugInfo("In " +
+          replicationServerDomain.getReplicationServer().
+          getMonitorInstanceName() + ", " +
+          this.getClass().getSimpleName() + " " + this + " :" +
+          "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
+    }
+  }
+
+  /**
    * Log the messages involved in the Topology/StartSession handshake.
    * @param inStartECLSessionMsg The message received first.
    */
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index c238e19..4b5037d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -272,6 +272,18 @@
             ChangeTimeHeartbeatMsg cthbMsg = (ChangeTimeHeartbeatMsg) msg;
             replicationServerDomain.processChangeTimeHeartbeatMsg(handler,
                 cthbMsg);
+          } else if (msg instanceof StopMsg)
+          {
+            // Peer server is properly disconnecting: go out of here to
+            // properly close the server handler going to finally block.
+            if (debugEnabled())
+            {
+              TRACER.debugInfo(handler.toString() + " has properly " +
+                "disconnected from this replication server " +
+                Integer.toString(replicationServerDomain.getReplicationServer().
+                getServerId()));
+            }
+            return;
           } else if (msg == null)
           {
             /*
@@ -308,7 +320,7 @@
           " reader IO EXCEPTION for serverID=" + serverId + " " +
           this + " " +
           stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage());
-      errMessage = NOTE_SERVER_DISCONNECT.get(handler.toString(),
+      errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
         Integer.toString(replicationServerDomain.
         getReplicationServer().getServerId()));
       logError(errMessage);
@@ -346,7 +358,7 @@
     finally
     {
       /*
-       * The thread only exit the loop above is some error condition
+       * The thread only exits the loop above if some error condition
        * happen.
        * Attempt to close the socket and stop the server handler.
        */
@@ -357,6 +369,19 @@
             "In RS " + replicationServerDomain.getReplicationServer().
             getMonitorInstanceName() +
             this + " is closing the session");
+        if (handler.getProtocolVersion() >=
+          ProtocolVersion.REPLICATION_PROTOCOL_V4)
+        {
+          // V4 protocol introduces a StopMsg to properly end
+          // communications
+          try
+          {
+            session.publish(new StopMsg());
+          } catch (IOException ioe)
+          {
+            // Anyway, going to close session, so nothing to do
+          }
+        }
         session.close();
       } catch (IOException e)
       {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index 37579ba..80ad900 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -41,6 +41,8 @@
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.StopMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
 
 
@@ -198,7 +200,7 @@
        * The remote host has disconnected and this particular Tree is going to
        * be removed, just ignore the exception and let the thread die as well
        */
-      errMessage = NOTE_SERVER_DISCONNECT.get(handler.toString(),
+      errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
         Integer.toString(replicationServerDomain.
         getReplicationServer().getServerId()));
       logError(errMessage);
@@ -209,7 +211,7 @@
        * The remote host has disconnected and this particular Tree is going to
        * be removed, just ignore the exception and let the thread die as well
        */
-      errMessage = NOTE_SERVER_DISCONNECT.get(handler.toString(),
+      errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
         Integer.toString(replicationServerDomain.
         getReplicationServer().getServerId()));
       logError(errMessage);
@@ -225,6 +227,18 @@
       logError(errMessage);
     }
     finally {
+      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+      {
+        // V4 protocol introduces a StopMsg to properly end
+        // communications
+        try
+        {
+          session.publish(new StopMsg());
+        } catch (IOException ioe)
+        {
+          // Anyway, going to close session, so nothing to do
+        }
+      }
       try
       {
         session.close();
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 9b39dc7..e058301 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -43,6 +43,7 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -61,6 +62,7 @@
 import org.opends.server.replication.protocol.HeartbeatMonitor;
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplServerStartDSMsg;
 import org.opends.server.replication.protocol.ReplServerStartMsg;
 import org.opends.server.replication.protocol.ReplSessionSecurity;
 import org.opends.server.replication.protocol.ReplicationMsg;
@@ -68,6 +70,7 @@
 import org.opends.server.replication.protocol.ServerStartMsg;
 import org.opends.server.replication.protocol.StartECLSessionMsg;
 import org.opends.server.replication.protocol.StartSessionMsg;
+import org.opends.server.replication.protocol.StopMsg;
 import org.opends.server.replication.protocol.TopologyMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.protocol.WindowMsg;
@@ -112,9 +115,6 @@
   // Our replication domain
   private ReplicationDomain domain = null;
 
-  // Trick for avoiding a inner class for many parameters return for
-  // performPhaseOneHandshake method.
-  private String tmpReadableServerName = null;
   /**
    * The expected duration in milliseconds between heartbeats received
    * from the replication server.  Zero means heartbeats are off.
@@ -183,7 +183,7 @@
    * @param groupId The group id of our domain.
    * @param changeTimeHeartbeatInterval The interval (in ms) between Change
    *        time  heartbeats are sent to the RS,
-   *        or zero if no CN heartbeat shoud be sent.
+   *        or zero if no CN heartbeat should be sent.
    */
   public ReplicationBroker(ReplicationDomain replicationDomain,
     ServerState state, String baseDn, int serverID2, int window,
@@ -290,23 +290,93 @@
 
   /**
    * Bag class for keeping info we get from a server in order to compute the
-   * best one to connect to.
+   * best one to connect to. This is in fact a wrapper to a
+   * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4).
    */
   public static class ServerInfo
   {
-
-    private ServerState serverState = null;
+    private short protocolVersion;
+    private long generationId;
     private byte groupId = (byte) -1;
+    private int serverId;
+    private String serverURL;
+    private String baseDn = null;
+    private int windowSize;
+    private ServerState serverState;
+    private boolean sslEncryption;
+    private int degradedStatusThreshold = -1;
+    // Keeps the -1 value if created with a ReplServerStartMsg
+    private int weight = -1;
+    // Keeps the -1 value if created with a ReplServerStartMsg
+    private int connectedDSNumber = -1;
 
     /**
-     * Constructor.
-     * @param serverState Server state of the RS
-     * @param groupId Group id of the RS
+     * Create a new instance of ServerInfo wrapping the passed message.
+     * @param msg Message to wrap.
+     * @return The new instance wrapping the passed message.
+     * @throws IllegalArgumentException If the passed message has an unexpected
+     *                                  type.
      */
-    public ServerInfo(ServerState serverState, byte groupId)
+    public static ServerInfo newServerInfo(
+      ReplicationMsg msg) throws IllegalArgumentException
     {
-      this.serverState = serverState;
-      this.groupId = groupId;
+      if (msg instanceof ReplServerStartMsg)
+      {
+        // This is a ReplServerStartMsg (RS uses protocol V3 or under)
+        ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg)msg;
+        return new ServerInfo(replServerStartMsg);
+      }
+      else if (msg instanceof ReplServerStartDSMsg)
+      {
+        // This is a ReplServerStartDSMsg (RS uses protocol V4 or higher)
+        ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg)msg;
+        return new ServerInfo(replServerStartDSMsg);
+      }
+
+      // Unsupported message type: should not happen
+      throw new IllegalArgumentException("Unexpected PDU type: " +
+        msg.getClass().getName() + " :\n" + msg.toString());
+    }
+
+    /**
+     * Constructs a ServerInfo object wrapping a ReplServerStartMsg.
+     * @param replServerStartMsg The ReplServerStartMsg this object will wrap.
+     */
+    private ServerInfo(ReplServerStartMsg replServerStartMsg)
+    {
+      this.protocolVersion = replServerStartMsg.getVersion();
+      this.generationId = replServerStartMsg.getGenerationId();
+      this.groupId = replServerStartMsg.getGroupId();
+      this.serverId = replServerStartMsg.getServerId();
+      this.serverURL = replServerStartMsg.getServerURL();
+      this.baseDn = replServerStartMsg.getBaseDn();
+      this.windowSize = replServerStartMsg.getWindowSize();
+      this.serverState = replServerStartMsg.getServerState();
+      this.sslEncryption = replServerStartMsg.getSSLEncryption();
+      this.degradedStatusThreshold =
+        replServerStartMsg.getDegradedStatusThreshold();
+    }
+
+    /**
+     * Constructs a ServerInfo object wrapping a ReplServerStartDSMsg.
+     * @param replServerStartDSMsg The ReplServerStartDSMsg this object will
+     * wrap.
+     */
+    private ServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
+    {
+      this.protocolVersion = replServerStartDSMsg.getVersion();
+      this.generationId = replServerStartDSMsg.getGenerationId();
+      this.groupId = replServerStartDSMsg.getGroupId();
+      this.serverId = replServerStartDSMsg.getServerId();
+      this.serverURL = replServerStartDSMsg.getServerURL();
+      this.baseDn = replServerStartDSMsg.getBaseDn();
+      this.windowSize = replServerStartDSMsg.getWindowSize();
+      this.serverState = replServerStartDSMsg.getServerState();
+      this.sslEncryption = replServerStartDSMsg.getSSLEncryption();
+      this.degradedStatusThreshold =
+        replServerStartDSMsg.getDegradedStatusThreshold();
+      this.weight = replServerStartDSMsg.getWeight();
+      this.connectedDSNumber = replServerStartDSMsg.getConnectedDSNumber();
     }
 
     /**
@@ -326,6 +396,98 @@
     {
       return groupId;
     }
+
+    /**
+     * Get the server protocol version.
+     * @return the protocolVersion
+     */
+    public short getProtocolVersion()
+    {
+      return protocolVersion;
+    }
+
+    /**
+     * Get the generation id.
+     * @return the generationId
+     */
+    public long getGenerationId()
+    {
+      return generationId;
+    }
+
+    /**
+     * Get the server id.
+     * @return the serverId
+     */
+    public int getServerId()
+    {
+      return serverId;
+    }
+
+    /**
+     * Get the server URL.
+     * @return the serverURL
+     */
+    public String getServerURL()
+    {
+      return serverURL;
+    }
+
+    /**
+     * Get the base dn.
+     * @return the baseDn
+     */
+    public String getBaseDn()
+    {
+      return baseDn;
+    }
+
+    /**
+     * Get the window size.
+     * @return the windowSize
+     */
+    public int getWindowSize()
+    {
+      return windowSize;
+    }
+
+    /**
+     * Get the ssl encryption.
+     * @return the sslEncryption
+     */
+    public boolean isSslEncryption()
+    {
+      return sslEncryption;
+    }
+
+    /**
+     * Get the degraded status threshold.
+     * @return the degradedStatusThreshold
+     */
+    public int getDegradedStatusThreshold()
+    {
+      return degradedStatusThreshold;
+    }
+
+    /**
+     * Get the weight.
+     * @return the weight. Null if this object is a wrapper for
+     * a ReplServerStartMsg.
+     */
+    public int getWeight()
+    {
+      return weight;
+    }
+
+    /**
+     * Get the connected DS number.
+     * @return the connectedDSNumber. Null if this object is a wrapper for
+     * a ReplServerStartMsg.
+     */
+    public int getConnectedDSNumber()
+    {
+      return connectedDSNumber;
+    }
   }
 
   private void connect()
@@ -342,10 +504,34 @@
   }
 
   /**
+   * Contacts all replication servers to get information from them and being
+   * able to choose the more suitable.
+   * @return the collected information.
+   */
+  private Map<String, ServerInfo> collectReplicationServersInfo() {
+
+    Map<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+
+    for (String server : servers)
+    {
+      // Connect to server and get info about it
+      ServerInfo serverInfo = performPhaseOneHandshake(server, false);
+
+      // Store server info in list
+      if (serverInfo != null)
+      {
+        rsInfos.put(server, serverInfo);
+      }
+    }
+
+    return rsInfos;
+  }
+
+  /**
    * Special aspects of connecting as ECL compared to connecting as data server
    * are :
    * - 1 single RS configured
-   * - so no choice of the prefered RS
+   * - so no choice of the preferred RS
    * - No same groupID polling
    * - ?? Heartbeat
    * - Start handshake is :
@@ -358,10 +544,10 @@
     // FIXME:ECL List of RS to connect is for now limited to one RS only
     String bestServer = this.servers.iterator().next();
 
-    ReplServerStartMsg inReplServerStartMsg
+    ReplServerStartDSMsg inReplServerStartDSMsg
       = performECLPhaseOneHandshake(bestServer, true);
 
-    if (inReplServerStartMsg!=null)
+    if (inReplServerStartDSMsg!=null)
       performECLPhaseTwoHandshake(bestServer);
   }
 
@@ -392,8 +578,6 @@
    */
   private void connectAsDataServer()
   {
-    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
-
     // May have created a broker with null replication domain for
     // unit test purpose.
     if (domain != null)
@@ -418,24 +602,12 @@
        */
       if (debugEnabled())
         TRACER.debugInfo("phase 1 : will perform PhaseOneH with each RS in " +
-            " order to elect the prefered one");
-      for (String server : servers)
-      {
-        // Connect to server and get reply message
-        ReplServerStartMsg replServerStartMsg =
-          performPhaseOneHandshake(server, false);
+            " order to elect the preferred one");
 
-        // Store reply message info in list
-        if (replServerStartMsg != null)
-        {
-          ServerInfo serverInfo =
-            new ServerInfo(replServerStartMsg.getServerState(),
-            replServerStartMsg.getGroupId());
-          rsInfos.put(server, serverInfo);
-        }
-      } // for servers
+      // Get info from every available replication servers
+      Map<String, ServerInfo> rsInfos = collectReplicationServersInfo();
 
-      ReplServerStartMsg replServerStartMsg = null;
+      ServerInfo serverInfo = null;
 
       if (rsInfos.size() > 0)
       {
@@ -446,19 +618,17 @@
         // Best found, now initialize connection to this one (handshake phase 1)
         if (debugEnabled())
           TRACER.debugInfo(
-              "phase 2 : will perform PhaseOneH with the prefered RS.");
-        replServerStartMsg = performPhaseOneHandshake(bestServer, true);
+              "phase 2 : will perform PhaseOneH with the preferred RS.");
+        serverInfo = performPhaseOneHandshake(bestServer, true);
 
-        if (replServerStartMsg != null) // Handshake phase 1 exchange went well
+        if (serverInfo != null) // Handshake phase 1 exchange went well
 
         {
-          ServerInfo bestServerInfo = rsInfos.get(bestServer);
-
           // Compute in which status we are starting the session to tell the RS
           ServerStatus initStatus =
-            computeInitialServerStatus(replServerStartMsg.getGenerationId(),
-            bestServerInfo.getServerState(),
-            replServerStartMsg.getDegradedStatusThreshold(),
+            computeInitialServerStatus(serverInfo.getGenerationId(),
+            serverInfo.getServerState(),
+            serverInfo.getDegradedStatusThreshold(),
             this.getGenerationID());
 
           // Perfom session start (handshake phase 2)
@@ -485,7 +655,7 @@
                * reconnection at that time to retrieve a server with our group
                * id.
                */
-              byte tmpRsGroupId = bestServerInfo.getGroupId();
+              byte tmpRsGroupId = serverInfo.getGroupId();
               boolean someServersWithSameGroupId =
                 hasSomeServerWithSameGroupId(topologyMsg.getRsList());
 
@@ -493,10 +663,10 @@
               if ((tmpRsGroupId == groupId) ||
                 ((tmpRsGroupId != groupId) && !someServersWithSameGroupId))
               {
-                replicationServer = tmpReadableServerName;
-                maxSendWindow = replServerStartMsg.getWindowSize();
-                rsGroupId = replServerStartMsg.getGroupId();
-                rsServerId = replServerStartMsg.getServerId();
+                replicationServer = session.getReadableRemoteAddress();
+                maxSendWindow = serverInfo.getWindowSize();
+                rsGroupId = serverInfo.getGroupId();
+                rsServerId = serverInfo.getServerId();
                 rsServerUrl = bestServer;
 
                 // May have created a broker with null replication domain for
@@ -504,8 +674,8 @@
                 if (domain != null)
                 {
                   domain.sessionInitiated(
-                      initStatus, replServerStartMsg.getServerState(),
-                      replServerStartMsg.getGenerationId(),
+                      initStatus, serverInfo.getServerState(),
+                      serverInfo.getGenerationId(),
                       session);
                 }
                 receiveTopo(topologyMsg);
@@ -524,7 +694,7 @@
                  startSameGroupIdPoller();
                 }
                 startRSHeartBeatMonitoring();
-                if (replServerStartMsg.getVersion()
+                if (serverInfo.getProtocolVersion()
                     >= ProtocolVersion.REPLICATION_PROTOCOL_V3)
                 {
                   startChangeTimeHeartBeatPublishing();
@@ -584,8 +754,8 @@
         rcvWindow = maxRcvWindow;
         connectPhaseLock.notify();
 
-        if ((replServerStartMsg.getGenerationId() == this.getGenerationID()) ||
-          (replServerStartMsg.getGenerationId() == -1))
+        if ((serverInfo.getGenerationId() == this.getGenerationID()) ||
+          (serverInfo.getGenerationId() == -1))
         {
           Message message =
             NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
@@ -602,7 +772,7 @@
             baseDn.toString(),
             replicationServer,
             Long.toString(this.getGenerationID()),
-            Long.toString(replServerStartMsg.getGenerationId()));
+            Long.toString(serverInfo.getGenerationId()));
           logError(message);
         }
       } else
@@ -709,19 +879,19 @@
   /**
    * Connect to the provided server performing the first phase handshake
    * (start messages exchange) and return the reply message from the replication
-   * server.
+   * server, wrapped in a ServerInfo object.
    *
    * @param server Server to connect to.
    * @param keepConnection Do we keep session opened or not after handshake.
    *        Use true if want to perform handshake phase 2 with the same session
    *        and keep the session to create as the current one.
-   * @return The ReplServerStartMsg the server replied. Null if could not
+   * @return The answer from the server . Null if could not
    *         get an answer.
    */
-  private ReplServerStartMsg performPhaseOneHandshake(String server,
+  private ServerInfo performPhaseOneHandshake(String server,
     boolean keepConnection)
   {
-    ReplServerStartMsg replServerStartMsg = null;
+    ServerInfo serverInfo = null;
 
     // Parse server string.
     int separator = server.lastIndexOf(':');
@@ -738,8 +908,6 @@
       int intPort = Integer.parseInt(port);
       InetSocketAddress serverAddr = new InetSocketAddress(
         InetAddress.getByName(hostname), intPort);
-      if (keepConnection)
-        tmpReadableServerName = serverAddr.toString();
       Socket socket = new Socket();
       socket.setReceiveBufferSize(1000000);
       socket.setTcpNoDelay(true);
@@ -759,19 +927,23 @@
       localSession.publish(serverStartMsg);
 
       /*
-       * Read the ReplServerStartMsg that should come back.
+       * Read the ReplServerStartMsg or ReplServerStartDSMsg that should come
+       * back.
        */
-      replServerStartMsg = (ReplServerStartMsg) localSession.receive();
+      ReplicationMsg msg = localSession.receive();
 
       if (debugEnabled())
-      {
-        TRACER.debugInfo("In RB for " + baseDn +
-          "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
-          "\nAND RECEIVED:\n" + replServerStartMsg.toString());
-      }
+        {
+          TRACER.debugInfo("In RB for " + baseDn +
+            "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
+            "\nAND RECEIVED:\n" + msg.toString());
+        }
+
+      // Wrap received message in a server info object
+      serverInfo = ServerInfo.newServerInfo(msg);
 
       // Sanity check
-      String repDn = replServerStartMsg.getBaseDn();
+      String repDn = serverInfo.getBaseDn();
       if (!(this.baseDn.equals(repDn)))
       {
         Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
@@ -786,7 +958,7 @@
        * if it is an old replication server).
        */
       protocolVersion = ProtocolVersion.minWithCurrent(
-          replServerStartMsg.getVersion());
+        serverInfo.getProtocolVersion());
       localSession.setProtocolVersion(protocolVersion);
 
 
@@ -839,10 +1011,25 @@
     {
       if (localSession != null)
       {
+        if (debugEnabled())
+          TRACER.debugInfo("In RB, closing session after phase 1");
+
+        if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+        {
+          // V4 protocol introduces a StopMsg to properly end communications
+          if (!error)
+          {
+            try
+            {
+              localSession.publish(new StopMsg());
+            } catch (IOException ioe)
+            {
+              // Anyway, going to close session, so nothing to do
+            }
+          }
+        }
         try
         {
-          if (debugEnabled())
-            TRACER.debugInfo("In RB, closing session after phase 1");
           localSession.close();
         } catch (IOException e)
         {
@@ -852,7 +1039,7 @@
       }
       if (error)
       {
-        replServerStartMsg = null;
+        serverInfo = null;
       } // Be sure to return null.
 
     }
@@ -864,7 +1051,7 @@
       session = localSession;
     }
 
-    return replServerStartMsg;
+    return serverInfo;
   }
 
   /**
@@ -876,13 +1063,13 @@
    * @param keepConnection Do we keep session opened or not after handshake.
    *        Use true if want to perform handshake phase 2 with the same session
    *        and keep the session to create as the current one.
-   * @return The ReplServerStartMsg the server replied. Null if could not
+   * @return The ReplServerStartDSMsg the server replied. Null if could not
    *         get an answer.
    */
-  private ReplServerStartMsg performECLPhaseOneHandshake(String server,
+  private ReplServerStartDSMsg performECLPhaseOneHandshake(String server,
     boolean keepConnection)
   {
-    ReplServerStartMsg replServerStartMsg = null;
+    ReplServerStartDSMsg replServerStartDSMsg = null;
 
     // Parse server string.
     int separator = server.lastIndexOf(':');
@@ -899,8 +1086,6 @@
       int intPort = Integer.parseInt(port);
       InetSocketAddress serverAddr = new InetSocketAddress(
         InetAddress.getByName(hostname), intPort);
-      if (keepConnection)
-        tmpReadableServerName = serverAddr.toString();
       Socket socket = new Socket();
       socket.setReceiveBufferSize(1000000);
       socket.setTcpNoDelay(true);
@@ -920,17 +1105,17 @@
       localSession.publish(serverStartECLMsg);
 
       // Read the ReplServerStartMsg that should come back.
-      replServerStartMsg = (ReplServerStartMsg) localSession.receive();
+      replServerStartDSMsg = (ReplServerStartDSMsg) localSession.receive();
 
       if (debugEnabled())
       {
         TRACER.debugInfo("In RB for " + baseDn +
           "\nRB HANDSHAKE SENT:\n" + serverStartECLMsg.toString() +
-          "\nAND RECEIVED:\n" + replServerStartMsg.toString());
+          "\nAND RECEIVED:\n" + replServerStartDSMsg.toString());
       }
 
       // Sanity check
-      String repDn = replServerStartMsg.getBaseDn();
+      String repDn = replServerStartDSMsg.getBaseDn();
       if (!(this.baseDn.equals(repDn)))
       {
         Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
@@ -946,7 +1131,7 @@
        */
       if (keepConnection)
         protocolVersion = ProtocolVersion.minWithCurrent(
-          replServerStartMsg.getVersion());
+          replServerStartDSMsg.getVersion());
       localSession.setProtocolVersion(protocolVersion);
 
       if (!isSslEncryption)
@@ -998,10 +1183,22 @@
     {
       if (localSession != null)
       {
+        if (debugEnabled())
+          TRACER.debugInfo("In RB, closing session after phase 1");
+
+        // V4 protocol introduces a StopMsg to properly end communications
+        if (!error)
+        {
+          try
+          {
+            localSession.publish(new StopMsg());
+          } catch (IOException ioe)
+          {
+            // Anyway, going to close session, so nothing to do
+          }
+        }
         try
         {
-          if (debugEnabled())
-            TRACER.debugInfo("In RB, closing session after phase 1");
           localSession.close();
         } catch (IOException e)
         {
@@ -1011,7 +1208,7 @@
       }
       if (error)
       {
-        replServerStartMsg = null;
+        replServerStartDSMsg = null;
       } // Be sure to return null.
 
     }
@@ -1023,7 +1220,7 @@
       session = localSession;
     }
 
-    return replServerStartMsg;
+    return replServerStartDSMsg;
   }
 
   /**
@@ -1184,8 +1381,7 @@
    * @return The computed best replication server.
    */
   public static String computeBestReplicationServer(ServerState myState,
-    HashMap<String, ServerInfo> rsInfos, int serverId2, String baseDn,
-    byte groupId)
+    Map<String, ServerInfo> rsInfos, int serverId2, String baseDn, byte groupId)
   {
     /*
      * Preference is given to servers with the requested group id:
@@ -1195,7 +1391,7 @@
      */
 
     // Filter for servers with same group id
-    HashMap<String, ServerInfo> sameGroupIdRsInfos =
+    Map<String, ServerInfo> sameGroupIdRsInfos =
       new HashMap<String, ServerInfo>();
 
     for (String repServer : rsInfos.keySet())
@@ -1231,7 +1427,7 @@
    * @return The computed best replication server.
    */
   private static String searchForBestReplicationServer(ServerState myState,
-    HashMap<String, ServerInfo> rsInfos, int serverId2, String baseDn)
+    Map<String, ServerInfo> rsInfos, int serverId2, String baseDn)
   {
     /*
      * Find replication servers who are up to date (or more up to date than us,
@@ -1266,7 +1462,7 @@
     HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
 
     /*
-     * Start loop to differenciate up to date servers from late ones.
+     * Start loop to differentiate up to date servers from late ones.
      */
     ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId2);
     if (myChangeNumber == null)
@@ -1321,6 +1517,7 @@
         if (ReplicationServer.isLocalReplicationServer(upServer))
         {
           localRS = true;
+          break;
         }
       }
       if (localRS)
@@ -1459,7 +1656,8 @@
         new HeartbeatMonitor("Replication Heartbeat Monitor on RS " +
         getReplicationServer() + " " + rsServerId + " for " + baseDn +
         " in DS " + serverId,
-        session, heartbeatInterval);
+        session, heartbeatInterval, (protocolVersion >=
+        ProtocolVersion.REPLICATION_PROTOCOL_V4));
       heartbeatMonitor.start();
     }
   }
@@ -1513,16 +1711,28 @@
    */
   public void reStart(ProtocolSession failingSession)
   {
-    try
+
+    if (failingSession != null)
     {
-      if (failingSession != null)
+      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+      {
+        // V4 protocol introduces a StopMsg to properly end communications
+        try
+        {
+          failingSession.publish(new StopMsg());
+        } catch (IOException ioe)
+        {
+          // Anyway, going to close session, so nothing to do
+        }
+      }
+      try
       {
         failingSession.close();
-        numLostConnections++;
+      } catch (IOException e1)
+      {
+        // ignore
       }
-    } catch (IOException e1)
-    {
-      // ignore
+      numLostConnections++;
     }
 
     if (failingSession == session)
@@ -1708,6 +1918,19 @@
           TopologyMsg topoMsg = (TopologyMsg)msg;
           receiveTopo(topoMsg);
         }
+        else if (msg instanceof StopMsg)
+        {
+          /*
+           * RS performs a proper disconnection
+           */
+          Message message =
+            NOTE_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(replicationServer,
+            Integer.toString(rsServerId), baseDn.toString(),
+            Integer.toString(serverId));
+          logError(message);
+          // Try to find a suitable RS
+          this.reStart(failingSession);
+        }
         else
         {
           return msg;
@@ -1723,10 +1946,10 @@
 
           {
             /*
-             * If we did not initiate the close on our side, log a message.
+             * We did not initiate the close on our side, log an error message.
              */
             Message message =
-              NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer,
+              ERR_REPLICATION_SERVER_BADLY_DISCONNECTED.get(replicationServer,
                   Integer.toString(rsServerId), baseDn.toString(),
                   Integer.toString(serverId));
             logError(message);
@@ -1783,14 +2006,26 @@
     rsGroupId = (byte) -1;
     rsServerId = -1;
     rsServerUrl = null;
-    try
+
+    if (session != null)
     {
-      if (session != null)
+      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+      {
+        // V4 protocol introduces a StopMsg to properly end communications
+        try
+        {
+          session.publish(new StopMsg());
+        } catch (IOException ioe)
+        {
+          // Anyway, going to close session, so nothing to do
+        }
+      }
+      try
       {
         session.close();
+      } catch (IOException e)
+      {
       }
-    } catch (IOException e)
-    {
     }
   }
 
@@ -1896,7 +2131,7 @@
       Collection<String> replicationServers, int window, long heartbeatInterval,
       byte groupId)
   {
-    // These parameters needs to be renegociated with the ReplicationServer
+    // These parameters needs to be renegotiated with the ReplicationServer
     // so if they have changed, that requires restarting the session with
     // the ReplicationServer.
     Boolean needToRestartSession = false;
@@ -1945,7 +2180,7 @@
 
   private boolean debugEnabled()
   {
-    return true;
+    return false;
   }
 
   private static final void debugInfo(String s)
@@ -2057,13 +2292,13 @@
                 continue;
 
               // Connect to server and get reply message
-              ReplServerStartMsg replServerStartMsg =
+              ServerInfo serverInfo =
                 performPhaseOneHandshake(server, false);
 
-              // Store reply message info in list
-              if (replServerStartMsg != null)
+              // Is it a server with our group id ?
+              if (serverInfo != null)
               {
-                if (groupId == replServerStartMsg.getGroupId())
+                if (groupId == serverInfo.getGroupId())
                 {
                   // Found one server with the same group id as us, disconnect
                   // session to force reconnection to a server with same group
@@ -2072,6 +2307,20 @@
                     Byte.toString(groupId), baseDn.toString(),
                     Integer.toString(serverId));
                   logError(message);
+
+                  if (protocolVersion >=
+                    ProtocolVersion.REPLICATION_PROTOCOL_V4)
+                  {
+                    // V4 protocol introduces a StopMsg to properly end
+                    // communications
+                    try
+                    {
+                      session.publish(new StopMsg());
+                    } catch (IOException ioe)
+                    {
+                      // Anyway, going to close session, so nothing to do
+                    }
+                  }
                   try
                   {
                     session.close();
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index db42214..9b8f67d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -1245,7 +1245,7 @@
    * from which this server can be initialized.
    *
    * @param targetString The string representing the source
-   * @return The source as a short value
+   * @return The source as a integer value
    * @throws DirectoryException if the string is not valid
    */
   public int decodeTarget(String targetString)
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
index cf503e4..77f0fce 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -3445,7 +3445,7 @@
       SortedSet<String> replServers = new TreeSet<String>();
       replServers.add("localhost:"+replicationServerPort);
       DomainFakeCfg domainConf =
-        new DomainFakeCfg(baseDn2, (short) 1702, replServers);
+        new DomainFakeCfg(baseDn2, 1702, replServers);
       SortedSet<String> includeAttributes = new TreeSet<String>();
       includeAttributes.add("sn");
       domainConf.setEclIncludes(includeAttributes);
@@ -3457,7 +3457,7 @@
           TEST_ROOT_DN_STRING3, TEST_BACKEND_ID3);
       DN baseDn3 = DN.decode(TEST_ROOT_DN_STRING3);
       domainConf =
-        new DomainFakeCfg(baseDn3, (short) 1703, replServers);
+        new DomainFakeCfg(baseDn3, 1703, replServers);
       includeAttributes = new TreeSet<String>();
       includeAttributes.add("objectclass");
       domainConf.setEclIncludes(includeAttributes);
@@ -3466,7 +3466,7 @@
       replicationPlugin3.completeSynchronizationProvider();
 
       domainConf =
-        new DomainFakeCfg(baseDn2, (short) 1704, replServers);
+        new DomainFakeCfg(baseDn2, 1704, replServers);
       includeAttributes = new TreeSet<String>();
       includeAttributes.add("cn");
       domainConf.setEclIncludes(includeAttributes);
@@ -3475,7 +3475,7 @@
       Set<String> attrList = new HashSet<String>();
       attrList.add(new String("cn"));
       ReplicationBroker server01 = openReplicationSession(
-          DN.decode(TEST_ROOT_DN_STRING2), (short) 1206,
+          DN.decode(TEST_ROOT_DN_STRING2), 1206,
           100, replicationServerPort,
           1000, true, -1 , domain21);
 
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
index 56abd87..7529f60 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -36,11 +36,8 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.StringTokenizer;
 import java.util.UUID;
-import java.util.concurrent.locks.Lock;
-import org.opends.server.types.ResultCode;
 import org.opends.messages.Category;
 import org.opends.messages.Message;
 import org.opends.messages.Severity;
@@ -69,13 +66,14 @@
 import org.opends.server.replication.common.ChangeNumberGenerator;
 import org.opends.server.replication.protocol.AckMsg;
 import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.StopMsg;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.AttributeValue;
 import org.testng.annotations.BeforeClass;
 import org.opends.server.types.ByteString;
 import org.opends.server.types.DN;
 import org.opends.server.types.Entry;
-import org.opends.server.types.LockManager;
 import org.opends.server.types.SearchResultEntry;
 import org.opends.server.types.SearchScope;
 import org.testng.annotations.DataProvider;
@@ -478,8 +476,14 @@
           session.stopEncryption();
         }
 
-        // Read start session
-        StartSessionMsg startSessionMsg = (StartSessionMsg) session.receive();
+        // Read start session or stop
+        ReplicationMsg msg = session.receive();
+        if (msg instanceof StopMsg){
+          // Disconnection of DS looking for best server
+          return false;
+        }
+
+        StartSessionMsg startSessionMsg = (StartSessionMsg)msg;
 
         // Sanity checking for assured parameters
         boolean receivedIsAssured = startSessionMsg.isAssured();
@@ -505,7 +509,8 @@
 
       } catch (IOException e)
       {
-        // Probably un-connection of DS looking for best server
+        fail("Unexpected io exception in fake replication server handshake " +
+          "processing: " + e);
         return false;
       } catch (Exception e)
       {
@@ -1364,9 +1369,9 @@
 //      assertFalse(ackMsg.hasTimeout());
 //      assertTrue(ackMsg.hasReplayError());
 //      assertFalse(ackMsg.hasWrongStatus());
-//      List<Short> failedServers = ackMsg.getFailedServers();
+//      List<Integer> failedServers = ackMsg.getFailedServers();
 //      assertEquals(failedServers.size(), 1);
-//      assertEquals((short)failedServers.get(0), (short)1);
+//      assertEquals((integer)failedServers.get(0), (integer)1);
     } finally
     {
       endTest();
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
index 825307a..349767e 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
@@ -40,6 +40,7 @@
 import org.opends.server.replication.ReplicationTestCase;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.protocol.ReplServerStartMsg;
 import org.testng.annotations.Test;
 
 /**
@@ -98,7 +99,10 @@
     aState.update(cn);
     cn = new ChangeNumber(0L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+    ReplServerStartMsg replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
 
     String bestServer =
       computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -143,7 +147,10 @@
     aState.update(cn);
     cn = new ChangeNumber(0L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+    ReplServerStartMsg replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
 
     String bestServer =
       computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -190,7 +197,10 @@
     aState.update(cn);
     cn = new ChangeNumber(0L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+    ReplServerStartMsg replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
 
     String bestServer =
       computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -237,7 +247,10 @@
     aState.update(cn);
     cn = new ChangeNumber(1L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+    ReplServerStartMsg replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
 
     String bestServer =
       computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -285,7 +298,10 @@
     aState.update(cn);
     cn = new ChangeNumber(1L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)1));
+    ReplServerStartMsg replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
 
     // State for server 2
     aState = new ServerState();
@@ -295,7 +311,10 @@
     aState.update(cn);
     cn = new ChangeNumber(1L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+    replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
 
     String bestServer =
       computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -345,7 +364,10 @@
     aState.update(cn);
     // This server has less changes than the other one but it has the same
     // group id as us so he should be the winner
-    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+    ReplServerStartMsg replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
 
     // State for server 2
     aState = new ServerState();
@@ -355,7 +377,10 @@
     aState.update(cn);
     cn = new ChangeNumber(1L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)2));
+    replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)2, 0);
+    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
 
     String bestServer =
       computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -403,7 +428,10 @@
     aState.update(cn);
     cn = new ChangeNumber(1L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)2));
+    ReplServerStartMsg replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)2, 0);
+    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
 
     // State for server 2
     aState = new ServerState();
@@ -413,7 +441,10 @@
     aState.update(cn);
     cn = new ChangeNumber(2L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(WINNER, new ServerInfo(aState, (byte)2));
+    replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)2, 0);
+    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
 
     String bestServer =
       computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -462,7 +493,10 @@
     aState.update(cn);
     cn = new ChangeNumber(1L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)1));
+    ReplServerStartMsg replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
 
     // State for server 2
     aState = new ServerState();
@@ -472,7 +506,10 @@
     aState.update(cn);
     cn = new ChangeNumber(3L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+    replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
 
     // State for server 3
     aState = new ServerState();
@@ -482,7 +519,10 @@
     aState.update(cn);
     cn = new ChangeNumber(1L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(LOOSER2, new ServerInfo(aState, (byte)1));
+    replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
 
     String bestServer =
       computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -531,7 +571,10 @@
     aState.update(cn);
     cn = new ChangeNumber(1L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)1));
+    ReplServerStartMsg replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
 
     // State for server 2
     aState = new ServerState();
@@ -541,7 +584,10 @@
     aState.update(cn);
     cn = new ChangeNumber(3L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(LOOSER2, new ServerInfo(aState, (byte)2));
+    replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)2, 0);
+    rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
 
     // State for server 3
     aState = new ServerState();
@@ -553,7 +599,10 @@
     aState.update(cn);
     // This server has less changes than looser2 but it has the same
     // group id as us so he should be the winner
-    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+    replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
 
     String bestServer =
       computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -600,7 +649,10 @@
     aState.update(cn);
     cn = new ChangeNumber(1L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+    ReplServerStartMsg replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
 
     String bestServer =
       computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -648,7 +700,10 @@
     aState.update(cn);
     cn = new ChangeNumber(10L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)1));
+    ReplServerStartMsg replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
 
     // State for server 2
     aState = new ServerState();
@@ -658,7 +713,10 @@
     aState.update(cn);
     cn = new ChangeNumber(0L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+    replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
 
     String bestServer =
       computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -707,7 +765,10 @@
     aState.update(cn);
     cn = new ChangeNumber(10L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)1));
+    ReplServerStartMsg replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
 
     // State for server 2
     aState = new ServerState();
@@ -717,7 +778,10 @@
     aState.update(cn);
     cn = new ChangeNumber(0L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+    replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
 
     // State for server 3
     aState = new ServerState();
@@ -727,7 +791,10 @@
     aState.update(cn);
     cn = new ChangeNumber(10L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(LOOSER2, new ServerInfo(aState, (byte)1));
+    replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
 
     String bestServer =
       computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -780,7 +847,10 @@
     aState.update(cn);
     cn = new ChangeNumber(10L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)1));
+    ReplServerStartMsg replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
 
     // State for server 2
     aState = new ServerState();
@@ -790,7 +860,10 @@
     aState.update(cn);
     cn = new ChangeNumber(5L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(LOOSER2, new ServerInfo(aState, (byte)1));
+    replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
 
     // State for server 3
     aState = new ServerState();
@@ -800,7 +873,10 @@
     aState.update(cn);
     cn = new ChangeNumber(10L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(LOOSER3, new ServerInfo(aState, (byte)1));
+    replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(LOOSER3, ServerInfo.newServerInfo(replServerStartMsg));
 
     // State for server 4
     aState = new ServerState();
@@ -810,7 +886,10 @@
     aState.update(cn);
     cn = new ChangeNumber(8L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+    replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
 
     // State for server 5 (null one for our serverid)
     aState = new ServerState();
@@ -818,7 +897,10 @@
     aState.update(cn);
     cn = new ChangeNumber(5L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(LOOSER4, new ServerInfo(aState, (byte)1));
+    replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(LOOSER4, ServerInfo.newServerInfo(replServerStartMsg));
 
     // State for server 6
     aState = new ServerState();
@@ -828,7 +910,10 @@
     aState.update(cn);
     cn = new ChangeNumber(6L, 0, myId3);
     aState.update(cn);
-    rsInfos.put(LOOSER5, new ServerInfo(aState, (byte)1));
+    replServerStartMsg =
+      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+      false, (byte)1, 0);
+    rsInfos.put(LOOSER5, ServerInfo.newServerInfo(replServerStartMsg));
 
     String bestServer =
       computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
index ce0b4ac..2744026 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -39,6 +39,7 @@
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.SortedSet;
@@ -849,7 +850,7 @@
     AssuredType assuredType = null;
     int assuredSdLevel = -100;
     SortedSet<String> refUrls = null;
-    SortedSet<String> attrs = new TreeSet<String>();
+    Set<String> eclIncludes = new HashSet<String>();
 
     switch (dsId)
       {
@@ -904,7 +905,7 @@
     }
 
     return new DSInfo(dsId, rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, status, assuredFlag, assMode,
-       (byte)assuredSdLevel, groupId, urls, attrs);
+       (byte)assuredSdLevel, groupId, urls, eclIncludes);
   }
 
   /**
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
index af8f66a..a293fbe 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -889,7 +889,7 @@
         {"1603303030303030303030303030303030313030303130303030303030300064633" +
          "d746573740066616b65756e69717565696400000200301f0a0102301a040b646573" +
          "6372697074696f6e310b04096e65772076616c756500",
-          ModifyMsg.class, new ChangeNumber(1, (short) 0, (short) 1), "dc=test" },
+          ModifyMsg.class, new ChangeNumber(1, 0, 1), "dc=test" },
         {"1803303030303031323366313238343132303030326430303030303037620064633" +
          "d636f6d00756e69717565696400000201",
             DeleteMsg.class, new ChangeNumber(0x123f1284120L,123,45), "dc=com"},
@@ -1092,11 +1092,11 @@
     dsList4.add(dsInfo2);
     dsList4.add(dsInfo1);
 
-    RSInfo rsInfo1 = new RSInfo((short)4527, (long)45316, (byte)103);
+    RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103);
 
-    RSInfo rsInfo2 = new RSInfo((short)4527, (long)0, (byte)0);
+    RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0);
 
-    RSInfo rsInfo3 = new RSInfo((short)0, (long)-21113, (byte)98);
+    RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98);
 
     List<RSInfo> rsList1 = new ArrayList<RSInfo>();
     rsList1.add(rsInfo1);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index 98a1e72..308a7df 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -886,6 +886,65 @@
       newMsg.getDegradedStatusThreshold());
   }
 
+  @DataProvider(name="createReplServerStartDSData")
+  public Object [][] createReplServerStartDSData() throws Exception
+  {
+    String baseDN = TEST_ROOT_DN_STRING;
+    ServerState state = new ServerState();
+    state.update(new ChangeNumber((long)0, 0, 0));
+    Object[] set1 = new Object[] {1, baseDN, 0, "localhost:8989", state, 0L, (byte)0, 0, 0, 0};
+
+    state = new ServerState();
+    state.update(new ChangeNumber((long)75, 5, 263));
+    Object[] set2 = new Object[] {16, baseDN, 100, "anotherHost:1025", state, 1245L, (byte)25, 3456, 3, 31512};
+
+    state = new ServerState();
+    state.update(new ChangeNumber((long)123, 5, 98));
+    Object[] set3 = new Object[] {36, baseDN, 100, "anotherHostAgain:8017", state, 6841L, (byte)32, 2496, 630, 9524};
+
+    return new Object [][] { set1, set2, set3 };
+  }
+
+  /**
+   * Test that ReplServerStartDSMsg encoding and decoding works
+   * by checking that : msg == new ReplServerStartMsg(msg.getBytes()).
+   */
+  @Test(dataProvider="createReplServerStartDSData")
+  public void replServerStartDSMsgTest(int serverId, String baseDN, int window,
+         String url, ServerState state, long genId, byte groupId, int degTh,
+         int weight, int connectedDSNumber) throws Exception
+  {
+    ReplServerStartDSMsg msg = new ReplServerStartDSMsg(serverId,
+        url, baseDN, window, state, ProtocolVersion.getCurrentVersion(), genId,
+        true, groupId, degTh, weight, connectedDSNumber);
+    ReplServerStartDSMsg newMsg = new ReplServerStartDSMsg(msg.getBytes());
+    assertEquals(msg.getServerId(), newMsg.getServerId());
+    assertEquals(msg.getServerURL(), newMsg.getServerURL());
+    assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
+    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
+    assertEquals(msg.getServerState().getMaxChangeNumber(1),
+        newMsg.getServerState().getMaxChangeNumber(1));
+    assertEquals(msg.getVersion(), newMsg.getVersion());
+    assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
+    assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption());
+    assertTrue(msg.getGroupId() == newMsg.getGroupId());
+    assertTrue(msg.getDegradedStatusThreshold() ==
+      newMsg.getDegradedStatusThreshold());
+    assertEquals(msg.getWeight(), newMsg.getWeight());
+    assertEquals(msg.getConnectedDSNumber(), newMsg.getConnectedDSNumber());
+  }
+
+  /**
+   * Test that StopMsg encoding and decoding works
+   * by checking that : msg == new StopMsg(msg.getBytes()).
+   */
+  @Test
+  public void stopMsgTest() throws Exception
+  {
+    StopMsg msg = new StopMsg();
+    StopMsg newMsg = new StopMsg(msg.getBytes());
+  }
+
   /**
    * Test that WindowMsg encoding and decoding works
    * by checking that : msg == new WindowMsg(msg.getBytes()).
@@ -1457,8 +1516,7 @@
       new HashMap<AttributeType,List<Attribute>>();
     opList.put(attr.getAttributeType(), operationalAttributes);
 
-    ChangeNumber cn = new ChangeNumber(TimeThread.getTime(),
-        (short) 123, (short) 45);
+    ChangeNumber cn = new ChangeNumber(TimeThread.getTime(), 123, 45);
     DN dn = DN.decode(rawDN);
 
     for (int i=1;i<perfRep;i++)
@@ -1538,8 +1596,7 @@
     long buildnew = 0;
     long t1,t2,t3,t31,t4,t5,t6 = 0;
 
-    ChangeNumber cn = new ChangeNumber(TimeThread.getTime(),
-        (short) 123, (short) 45);
+    ChangeNumber cn = new ChangeNumber(TimeThread.getTime(), 123, 45);
     DN dn = DN.decode(rawdn);
 
     for (int i=1;i<perfRep;i++)
@@ -1627,8 +1684,7 @@
       DeleteOperationBasis opBasis =
         new DeleteOperationBasis(connection, 1, 1,null, DN.decode(rawDN));
       LocalBackendDeleteOperation op = new LocalBackendDeleteOperation(opBasis);
-      ChangeNumber cn = new ChangeNumber(TimeThread.getTime(),
-        (short) 123, (short) 45);
+      ChangeNumber cn = new ChangeNumber(TimeThread.getTime(), 123, 45);
       op.setAttachment(SYNCHROCONTEXT, new DeleteContext(cn, "uniqueid"));
       t2 = System.nanoTime();
       createop += (t2 - t1);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
index a5f21ce..d277405 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2007-2008 Sun Microsystems, Inc.
+ *      Copyright 2007-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 
@@ -60,8 +60,11 @@
   // Threshold for status analyzers
   private int degradedStatusThreshold = 5000;
 
+  // The weight of the server
+  private int weight = 1;
+
   /**
-   * Constructor without assured info
+   * Constructor without goup id, assured info and weight
    */
   public ReplServerFakeConfiguration(
       int port, String dirName, int purgeDelay, int serverId,
@@ -103,7 +106,7 @@
   }
   
   /**
-   * Constructor with assured info
+   * Constructor with group id and assured info
    */
   public ReplServerFakeConfiguration(
       int port, String dirName, int purgeDelay, int serverId,
@@ -117,6 +120,19 @@
   }
 
   /**
+   * Constructor with group id, assured info and weight
+   */
+  public ReplServerFakeConfiguration(
+      int port, String dirName, int purgeDelay, int serverId,
+      int queueSize, int windowSize, SortedSet<String> servers,
+      int groupId, long assuredTimeout, int degradedStatusThreshold, int weight)
+  {
+    this(port, dirName, purgeDelay, serverId, queueSize, windowSize, servers,
+      groupId, assuredTimeout, degradedStatusThreshold);
+    this.weight = weight;
+  }
+
+  /**
    * {@inheritDoc}
    */
   public void addChangeListener(
@@ -233,4 +249,9 @@
     this.degradedStatusThreshold = degradedStatusThreshold;
   }
 
+  public int getWeight()
+  {
+    return weight;
+  }
+
 }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 0ed272c..d4e3302 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -73,6 +73,7 @@
 import org.opends.server.replication.protocol.ModifyMsg;
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplServerStartDSMsg;
 import org.opends.server.replication.protocol.ReplServerStartMsg;
 import org.opends.server.replication.protocol.ReplSessionSecurity;
 import org.opends.server.replication.protocol.ReplicationMsg;
@@ -1006,10 +1007,10 @@
 
       // Read the Replication Server state from the ReplServerStartMsg that
       // comes back.
-      ReplServerStartMsg replStartMsg =
-        (ReplServerStartMsg) session.receive();
-      int serverwindow = replStartMsg.getWindowSize();
-      ServerState replServerState = replStartMsg.getServerState();
+      ReplServerStartDSMsg replStartDSMsg =
+        (ReplServerStartDSMsg) session.receive();
+      int serverwindow = replStartDSMsg.getWindowSize();
+      ServerState replServerState = replStartDSMsg.getServerState();
 
       if (!sslEncryption)
       {
@@ -1052,9 +1053,9 @@
           sslEncryption, (byte)10);
       session.publish(msg);
 
-      // Read the ReplServerStartMsg that should come back.
+      // Read the ReplServerStartDSMsg that should come back.
       repMsg = session.receive();
-      assertTrue(repMsg instanceof ReplServerStartMsg);
+      assertTrue(repMsg instanceof ReplServerStartDSMsg);
 
       if (!sslEncryption)
       {
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
index 32dbfe0..6940355 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -464,15 +464,15 @@
       }
       String exportedData=exportedDataBuilder.toString();
       domain1 = new FakeReplicationDomain(
-          testService, (short) 1, servers1,
+          testService, 1, servers1,
           100, 0, exportedData, null, ENTRYCOUNT);
 
       StringBuilder importedData = new StringBuilder();
       domain2 = new FakeReplicationDomain(
-          testService, (short) 2, servers2, 100, 0,
+          testService, 2, servers2, 100, 0,
           null, importedData, 0);
 
-      domain2.initializeFromRemote((short)1);
+      domain2.initializeFromRemote(1);
 
       int count = 0;
       while ((importedData.length() < exportedData.length()) && (count < 500))

--
Gitblit v1.10.0