mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
08.02.2009 c02dd7f87e9ba574f06e5cc1eb36ebeb76b9f446
- Addition of ReplServerStartDSMsg now sent to a DS connecting to a RS 
in handshake phase instead of a ReplServerStartMsg. ReplServerStartDSMsg
contains same thing as ReplServerStartMsg but also contains

- replication server weight

- number of ciurrently connected DS on the RS

=> both will be used for future new RS choice algorithm



- Addition of a StopMsg sent:

- when any entity (DS,RS) is closing a connection (sent just before)
with a peer

- when DS finishes phase 1 of handshake (was gathering RS info for RS
choice so sent just after new ReplServerStartDSMsg is received)

=> both are used to distinguish between a proper connection closure
(no message) and an unexpected one (error log)



- Compatibility between protocol V4 and V3 (and before)

- changed MonitorMsg to never be created with a protocol version

- MonitorMsg now always sent with publish(msg, version) (publish
method without version was used so bug)

- TopologyMsg now always sent with publish(msg, version) (publish
method without version was used so bug)

2 files added
34 files modified
2002 ■■■■ changed files
opendj-sdk/opends/resource/schema/02-config.ldif 8 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml 35 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/messages/messages/replication.properties 23 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 2 ●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java 2 ●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java 21 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java 34 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java 2 ●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java 7 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java 9 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java 408 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java 127 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java 12 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java 8 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StopMsg.java 70 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java 8 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java 120 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 79 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java 4 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java 31 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 30 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java 39 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java 108 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java 29 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java 18 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 467 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 2 ●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java 8 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java 23 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java 141 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java 5 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java 8 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java 68 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java 27 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 13 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java 6 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/resource/schema/02-config.ldif
@@ -2453,6 +2453,11 @@
  NAME 'ds-cfg-ecl-include'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.603
  NAME 'ds-cfg-weight'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
  SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
  NAME 'ds-cfg-access-control-handler'
  SUP top
@@ -3130,7 +3135,8 @@
        ds-cfg-replication-purge-delay $
        ds-cfg-group-id $
        ds-cfg-assured-timeout $
        ds-cfg-degraded-status-threshold)
        ds-cfg-degraded-status-threshold $
        ds-cfg-weight)
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.65
  NAME 'ds-backup-directory'
opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
@@ -216,11 +216,11 @@
  </adm:property>
  <adm:property name="assured-timeout" mandatory="false">
    <adm:synopsis>
      The timeout value when waiting for assured mode acknowledgements.
      The timeout value when waiting for assured mode acknowledgments.
    </adm:synopsis>
    <adm:description>
      Defines the amount of milliseconds the replication server will wait for
      assured acknowledgements (in either Safe Data or Safe Read assured sub
      assured acknowledgments (in either Safe Data or Safe Read assured sub
      modes) before forgetting them and answer to the entity that sent an update
      and is waiting for acknowledgment.
    </adm:description>
@@ -265,4 +265,35 @@
      </ldap:attribute>
    </adm:profile>
  </adm:property>
  <adm:property name="weight" mandatory="false">
    <adm:synopsis>
      The weight of the replication server.
    </adm:synopsis>
    <adm:description>
      The weight affected to the replication server.
      Each replication server of the topology has a weight. When combined
      together, the weights of the replication servers of a same group can be
      translated to a percentage that determines the quantity of directory
      servers of the topology that should be connected to a replication server.
      For instance imagine a topology with 3 replication servers (with the same
      group id) with the following weights: RS1=1, RS2=1, RS3=2. This means that
      RS1 should have 25% of the directory servers connected in the topology,
      RS2 25%, and RS3 50%. This may be useful if the replication servers of the
      topology have a different power and one wants to spread the load between
      the replication servers according to their power.
    </adm:description>
    <adm:default-behavior>
      <adm:defined>
        <adm:value>1</adm:value>
      </adm:defined>
    </adm:default-behavior>
    <adm:syntax>
      <adm:integer lower-limit="0"></adm:integer>
    </adm:syntax>
    <adm:profile name="ldap">
      <ldap:attribute>
        <ldap:name>ds-cfg-weight</ldap:name>
      </ldap:attribute>
    </adm:profile>
  </adm:property>
</adm:managed-object>
opendj-sdk/opends/src/messages/messages/replication.properties
@@ -73,7 +73,6 @@
 base dn : %s
MILD_ERR_ERROR_SEARCHING_RUV_15=Error %s when searching for server state %s : \
 %s base dn : %s
NOTICE_SERVER_DISCONNECT_16=%s has disconnected from this replication server %s
NOTICE_NO_CHANGELOG_SERVER_LISTENING_17=There is no replication server \
 listening on %s
NOTICE_FOUND_CHANGELOGS_WITH_MY_CHANGES_18=Found %d replication server(s) with \
@@ -175,12 +174,13 @@
NOTICE_NOW_FOUND_SAME_GENERATION_CHANGELOG_62=Replication is up and running \
 for domain %s with replication server id %s %s - local server id is %s - data \
 generation is %s
NOTICE_DISCONNECTED_FROM_CHANGELOG_63=The connection to Replication Server %s \
 %s has been dropped by the Replication Server for %s in local server id %s
NOTICE_REPLICATION_SERVER_PROPERLY_DISCONNECTED_63=Replication server %s \
 %s has properly disconnected for %s in local server id %s. Trying to reconnect \
 to a suitable replication server
SEVERE_ERR_CHANGELOG_ERROR_SENDING_ERROR_65=An unexpected error occurred \
 while sending an Error Message to %s. This connection is going to be closed \
 and reopened
SEVERE_ERR_CHANGELOG_ERROR_SENDING_MSG_66=An unexpected error occurred  while \
SEVERE_ERR_CHANGELOG_ERROR_SENDING_MSG_66=An unexpected error occurred while \
 sending a Message to %s. This connection is going to be closed and reopened
MILD_ERR_ERROR_REPLAYING_OPERATION_67=Could not replay operation %s with \
 ChangeNumber %s error %s %s
@@ -409,7 +409,7 @@
NOTICE_ERR_FRACTIONAL_FORBIDDEN_FULL_UPDATE_FRACTIONAL_172=The export of \
 domain %s from server %s to all other servers of the topology is forbidden as \
 the source server has some fractional configuration : only fractional servers \
 in a replicated topology does not makes sense
 in a replicated topology does not make sense
MILD_ERR_DRAFT_CHANGENUMBER_DATABASE_173=An error occurred when accessing the \
 database of the draft change number : %s
SEVERE_ERR_INITIALIZATION_FAILED_NOCONN_174=The initialization failed because \
@@ -423,4 +423,15 @@
NOTICE_ERR_LDIF_IMPORT_FRACTIONAL_DATA_SET_IS_FRACTIONAL_177=The LDIF import \
 for importing suffix %s data has been stopped due to fractional configuration \
 inconsistency : imported data set has some fractional configuration but not \
 local server
 local server
SEVERE_ERR_DS_DISCONNECTED_DURING_HANDSHAKE_178=Directory server %s was \
 attempting to connect to replication server %s but has disconnected in \
 handshake phase
SEVERE_ERR_RS_DISCONNECTED_DURING_HANDSHAKE_179=Replication server %s was \
 attempting to connect to replication server %s but has disconnected in \
 handshake phase
SEVERE_ERR_REPLICATION_SERVER_BADLY_DISCONNECTED_180=The connection to \
 replication server %s %s has been unexpectedly dropped by the replication \
 server for %s in local server id %s
SEVERE_ERR_SERVER_BADLY_DISCONNECTED_181= %s has badly disconnected from this \
 replication server %s
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -4472,7 +4472,7 @@
   * Verifies that the given string represents a valid source
   * from which this server can be initialized.
   * @param sourceString The string representing the source
   * @return The source as a short value
   * @return The source as a integer value
   * @throws DirectoryException if the string is not valid
   */
  public int decodeSource(String sourceString)
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java
@@ -41,7 +41,7 @@
   * Creates a message.
   *
   * @param serverID The sender server of this message.
   * @param i The server or servers targetted by this message.
   * @param i The server or servers targeted by this message.
   */
  public DoneMsg(int serverID, int i)
  {
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
@@ -69,6 +69,11 @@
   */
  private boolean shutdown = false;
  /**
   * Send StopMsg before session closure or not.
   */
  private boolean sendStopBeforeClose = false;
  /**
   * Create a heartbeat monitor thread.
@@ -76,13 +81,16 @@
   * @param session The session on which heartbeats are to be monitored.
   * @param heartbeatInterval The expected interval between heartbeats received
   * (in milliseconds).
   * @param sendStopBeforeClose Should we send a StopMsg before closing the
   *        session ?
   */
  public HeartbeatMonitor(String threadName, ProtocolSession session,
                          long heartbeatInterval)
                          long heartbeatInterval, boolean sendStopBeforeClose)
  {
    super(threadName);
    this.session = session;
    this.heartbeatInterval = heartbeatInterval;
    this.sendStopBeforeClose = sendStopBeforeClose;
  }
  /**
@@ -117,6 +125,17 @@
          {
            // Heartbeat is well overdue so the server is assumed to be dead.
            logError(NOTE_HEARTBEAT_FAILURE.get(currentThread().getName()));
            if (sendStopBeforeClose)
            {
              // V4 protocol introduces a StopMsg to properly end communications
              try
              {
                session.publish(new StopMsg());
              } catch(IOException ioe)
              {
                // Anyway, going to close session, so nothing to do
              }
            }
            session.close();
            break;
          }
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -82,11 +82,6 @@
  SubTopoMonitorData data = new SubTopoMonitorData();
  /**
   * The protocolVersion that should be used when serializing this message.
   */
  private final short protocolVersion;
  /**
   * Creates a new MonitorMsg.
   *
   * @param sender The sender of this message.
@@ -95,25 +90,8 @@
  public MonitorMsg(int sender, int destination)
  {
    super(sender, destination);
    protocolVersion = ProtocolVersion.getCurrentVersion();
  }
  /**
   * Creates a new MonitorMsg with a specific protocol version.
   *
   * @param sender                The sender of this message.
   * @param destination           The destination of this message.
   * @param replicationProtocol   The protocol version to use.
   */
  public MonitorMsg(int sender, int destination,
      short replicationProtocol)
  {
    super(sender, destination);
    protocolVersion = replicationProtocol;
  }
  /**
   * Sets the state of the replication server.
   * @param state The state.
@@ -204,7 +182,6 @@
   */
  public MonitorMsg(byte[] in, short version) throws DataFormatException
  {
    protocolVersion = ProtocolVersion.getCurrentVersion();
    ByteSequenceReader reader = ByteString.wrap(in).asReader();
    if (version == ProtocolVersion.REPLICATION_PROTOCOL_V1)
@@ -328,6 +305,17 @@
   */
  @Override
  public byte[] getBytes()
  throws UnsupportedEncodingException
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes(short protocolVersion)
     throws UnsupportedEncodingException
  {
    try
    {
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
@@ -42,7 +42,7 @@
   * Creates a message.
   *
   * @param serverID The sender server of this message.
   * @param destination The server or servers targetted by this message.
   * @param destination The server or servers targeted by this message.
   */
  public MonitorRequestMsg(int serverID, int destination)
  {
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
@@ -116,6 +116,13 @@
   */
  public abstract String getRemoteAddress();
  /**
   * Retrieve the human readable address of the remote server.
   *
   * @return The human readable address of the remote server.
   */
  public abstract String getReadableRemoteAddress();
  /**
  * Set a timeout value.
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -54,9 +54,12 @@
  public static final short REPLICATION_PROTOCOL_V3 = 3;
  /**
   * 4th version of the replication protocol.
   * Add to the body of the ADD/MOD/MODDN/DEL msgs, a list of attribute for
   * ECL entry attributes.
   * The constant for the 4th version of the replication protocol.
   * - Add to the body of the ADD/MOD/MODDN/DEL msgs, a list of attribute for
   *   ECL entry attributes.
   * - Modified algorithm for choosing a RS to connect to: introduction of a
   *   ReplicationServerDSMsg message.
   * - Introduction of a StopMsg for proper connections ending.
   */
  public static final short REPLICATION_PROTOCOL_V4 = 4;
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java
New file
@@ -0,0 +1,408 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ServerState;
/**
 * Message sent by a replication server to a directory server in reply to the
 * ServerStartMsg.
 */
public class ReplServerStartDSMsg extends StartMsg
{
  private int serverId;
  private String serverURL;
  private String baseDn = null;
  private int windowSize;
  private ServerState serverState;
  /**
   * Whether to continue using SSL to encrypt messages after the start
   * messages have been exchanged.
   */
  private boolean sslEncryption;
  /**
   * Threshold value used by the RS to determine if a DS must be put in
   * degraded status because the number of pending changes for him has crossed
   * this value. This field is only used by a DS.
   */
  private int degradedStatusThreshold = -1;
  /**
   * The weight affected to the replication server.
   */
  private int weight = -1;
  /**
   * Number of currently connected DS to the replication server.
   */
  private int connectedDSNumber = -1;
  /**
   * Create a ReplServerStartDSMsg.
   *
   * @param serverId replication server id
   * @param serverURL replication server URL
   * @param baseDn base DN for which the ReplServerStartDSMsg is created.
   * @param windowSize The window size.
   * @param serverState our ServerState for this baseDn.
   * @param protocolVersion The replication protocol version of the creator.
   * @param generationId The generationId for this server.
   * @param sslEncryption Whether to continue using SSL to encrypt messages
   *                      after the start messages have been exchanged.
   * @param groupId The group id of the RS
   * @param degradedStatusThreshold The degraded status threshold
   * @param weight The weight affected to the replication server.
   * @param connectedDSNumber Number of currently connected DS to the
   *        replication server.
   */
  public ReplServerStartDSMsg(int serverId, String serverURL, String baseDn,
                               int windowSize,
                               ServerState serverState,
                               short protocolVersion,
                               long generationId,
                               boolean sslEncryption,
                               byte groupId,
                               int degradedStatusThreshold,
                               int weight,
                               int connectedDSNumber)
  {
    super(protocolVersion, generationId);
    this.serverId = serverId;
    this.serverURL = serverURL;
    if (baseDn != null)
      this.baseDn = baseDn;
    else
      this.baseDn = null;
    this.windowSize = windowSize;
    this.serverState = serverState;
    this.sslEncryption = sslEncryption;
    this.groupId = groupId;
    this.degradedStatusThreshold = degradedStatusThreshold;
    this.weight = weight;
    this.connectedDSNumber = connectedDSNumber;
  }
  /**
   * Creates a new ReplServerStartDSMsg by decoding the provided byte array.
   * @param in A byte array containing the encoded information for the
   *             ReplServerStartDSMsg
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded ReplServerStartDSMsg.
   */
  public ReplServerStartDSMsg(byte[] in) throws DataFormatException
  {
    byte[] allowedPduTypes = new byte[1];
    allowedPduTypes[0] = MSG_TYPE_REPL_SERVER_START_DS;
    headerLength = decodeHeader(allowedPduTypes, in);
    try
    {
      /* The ReplServerStartDSMsg payload is stored in the form :
       * <baseDn><serverId><serverURL><windowSize><sslEncryption>
       * <degradedStatusThreshold><weight><connectedDSNumber>
       * <serverState>
       */
      /* first bytes are the header */
      int pos = headerLength;
      /* read the dn
       * first calculate the length then construct the string
       */
      int length = getNextLength(in, pos);
      baseDn = new String(in, pos, length, "UTF-8");
      pos += length +1;
      /*
       * read the ServerId
       */
      length = getNextLength(in, pos);
      String serverIdString = new String(in, pos, length, "UTF-8");
      serverId = Integer.valueOf(serverIdString);
      pos += length +1;
      /*
       * read the ServerURL
       */
      length = getNextLength(in, pos);
      serverURL = new String(in, pos, length, "UTF-8");
      pos += length +1;
      /*
       * read the window size
       */
      length = getNextLength(in, pos);
      windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the sslEncryption setting
       */
      length = getNextLength(in, pos);
      sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /**
       * read the degraded status threshold
       */
      length = getNextLength(in, pos);
      degradedStatusThreshold =
        Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length + 1;
      /*
       * read the weight
       */
      length = getNextLength(in, pos);
      String weightString = new String(in, pos, length, "UTF-8");
      weight = Integer.valueOf(weightString);
      pos += length +1;
      /*
       * read the connected DS number
       */
      length = getNextLength(in, pos);
      String connectedDSNumberString = new String(in, pos, length, "UTF-8");
      connectedDSNumber = Integer.valueOf(connectedDSNumberString);
      pos += length +1;
      // Read the ServerState
      // Caution: ServerState MUST be the last field. Because ServerState can
      // contain null character (string termination of serverid string ..) it
      // cannot be decoded using getNextLength() like the other fields. The
      // only way is to rely on the end of the input buffer : and that forces
      // the ServerState to be the last. This should be changed and we want to
      // have more than one ServerState field.
      serverState = new ServerState(in, pos, in.length - 1);
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * Get the Server Id.
   * @return the server id
   */
  public int getServerId()
  {
    return this.serverId;
  }
  /**
   * Get the server URL.
   * @return the server URL
   */
  public String getServerURL()
  {
    return this.serverURL;
  }
  /**
   * Get the base DN from this ReplServerStartDSMsg.
   *
   * @return the base DN from this ReplServerStartDSMsg.
   */
  public String getBaseDn()
  {
    return baseDn;
  }
  /**
   * Get the serverState.
   * @return Returns the serverState.
   */
  public ServerState getServerState()
  {
    return this.serverState;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  throws UnsupportedEncodingException
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes(short protocolVersion)
     throws UnsupportedEncodingException
  {
    /* The ReplServerStartDSMsg is stored in the form :
     * <operation type><baseDn><serverId><serverURL><windowSize><sslEncryption>
     * <degradedStatusThreshold><weight><connectedDSNumber>
     * <serverState>
     */
    byte[] byteDn = baseDn.getBytes("UTF-8");
    byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
    byte[] byteServerUrl = serverURL.getBytes("UTF-8");
    byte[] byteServerState = serverState.getBytes();
    byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
    byte[] byteSSLEncryption =
      String.valueOf(sslEncryption).getBytes("UTF-8");
    byte[] byteDegradedStatusThreshold =
      String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
    byte[] byteWeight =
      String.valueOf(weight).getBytes("UTF-8");
    byte[] byteConnectedDSNumber =
      String.valueOf(connectedDSNumber).getBytes("UTF-8");
    int length = byteDn.length + 1 + byteServerId.length + 1 +
      byteServerUrl.length + 1 + byteWindowSize.length + 1 +
      byteSSLEncryption.length + 1 + byteDegradedStatusThreshold.length + 1 +
      byteWeight.length + 1 + byteConnectedDSNumber.length + 1 +
      byteServerState.length + 1;
    /* encode the header in a byte[] large enough */
    byte resultByteArray[] =
      encodeHeader(MSG_TYPE_REPL_SERVER_START_DS, length, protocolVersion);
    int pos = headerLength;
    /* put the baseDN and a terminating 0 */
    pos = addByteArray(byteDn, resultByteArray, pos);
    /* put the ServerId */
    pos = addByteArray(byteServerId, resultByteArray, pos);
    /* put the ServerURL */
    pos = addByteArray(byteServerUrl, resultByteArray, pos);
    /* put the window size */
    pos = addByteArray(byteWindowSize, resultByteArray, pos);
    /* put the SSL Encryption setting */
    pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
    /* put the degraded status threshold */
    pos = addByteArray(byteDegradedStatusThreshold, resultByteArray, pos);
    /* put the weight */
    pos = addByteArray(byteWeight, resultByteArray, pos);
    /* put the connected DS number */
    pos = addByteArray(byteConnectedDSNumber, resultByteArray, pos);
    /* put the ServerState */
    pos = addByteArray(byteServerState, resultByteArray, pos);
    return resultByteArray;
  }
  /**
   * get the window size for the server that created this message.
   *
   * @return The window size for the server that created this message.
   */
  public int getWindowSize()
  {
    return windowSize;
  }
  /**
   * Get the SSL encryption value for the server that created the
   * message.
   *
   * @return The SSL encryption value for the server that created the
   *         message.
   */
  public boolean getSSLEncryption()
  {
    return sslEncryption;
  }
  /**
   * Get the degraded status threshold value.
   * @return The degraded status threshold value.
   */
  public int getDegradedStatusThreshold()
  {
    return degradedStatusThreshold;
  }
  /**
   * Set the degraded status threshold (For test purpose).
   * @param degradedStatusThreshold The degraded status threshold to set.
   */
  public void setDegradedStatusThreshold(int degradedStatusThreshold)
  {
    this.degradedStatusThreshold = degradedStatusThreshold;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    return "ReplServerStartDSMsg content: " +
      "\nprotocolVersion: " + protocolVersion +
      "\ngenerationId: " + generationId +
      "\nbaseDn: " + baseDn +
      "\ngroupId: " + groupId +
      "\nserverId: " + serverId +
      "\nserverState: " + serverState +
      "\nserverURL: " + serverURL +
      "\nsslEncryption: " + sslEncryption +
      "\ndegradedStatusThreshold: " + degradedStatusThreshold +
      "\nwindowSize: " + windowSize +
      "\nweight: " + weight +
      "\nconnectedDSNumber: " + connectedDSNumber;
  }
  /**
   * Gets the weight of the replication server.
   * @return The weight of the replication server.
   */
  public int getWeight()
  {
    return weight;
  }
  /**
   * Gets the number of directory servers connected to the replication server.
   * @return The number of directory servers connected to the replication
   * server.
   */
  public int getConnectedDSNumber()
  {
    return connectedDSNumber;
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
@@ -50,6 +50,14 @@
  private boolean sslEncryption;
  /**
   * NOTE: Starting from protocol V4, we introduce a dedicated PDU for answering
   * to the DS ServerStartMsg. This is the ReplServerStartDSMsg. So the
   * degradedStatusThreshold value being used only by a DS, it could be removed
   * from the ReplServerStartMsg PDU. However for a smoothly transition to V4
   * protocol, we prefer to let this variable also in this PDU but the one
   * really used is in the ReplServerStartDSMsg PDU. This prevents from having
   * only RSv3 able to connect to RSv4 as connection initiator.
   *
   * Threshold value used by the RS to determine if a DS must be put in
   * degraded status because the number of pending changes for him has crossed
   * this value. This field is only used by a DS.
@@ -108,6 +116,15 @@
    allowedPduTypes[1] = MSG_TYPE_REPL_SERVER_START_V1;
    headerLength = decodeHeader(allowedPduTypes, in);
    // Protocol version has been read as part of the header:
    // decode the body according to the protocol version read in the header
    switch(protocolVersion)
    {
      case ProtocolVersion.REPLICATION_PROTOCOL_V1:
        decodeBody_V1(in, headerLength);
        return;
    }
    try
    {
      /* The ReplServerStartMsg payload is stored in the form :
@@ -154,22 +171,85 @@
      sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      // For easiness (no additional method), simply compare PDU type to
      // know if we have to read new parameters of V2
      if (in[0] == MSG_TYPE_REPL_SERVER_START)
      {
        /**
         * read the degraded status threshold
         */
        length = getNextLength(in, pos);
        degradedStatusThreshold =
          Integer.valueOf(new String(in, pos, length, "UTF-8"));
        pos += length + 1;
      }
      /**
       * read the degraded status threshold
       */
      length = getNextLength(in, pos);
      degradedStatusThreshold =
        Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length + 1;
      // Read the ServerState
      // Caution: ServerState MUST be the last field. Because ServerState can
      // contain null character (string termination of sererid string ..) it
      // contain null character (string termination of serverid string ..) it
      // cannot be decoded using getNextLength() like the other fields. The
      // only way is to rely on the end of the input buffer : and that forces
      // the ServerState to be the last. This should be changed and we want to
      // have more than one ServerState field.
      serverState = new ServerState(in, pos, in.length - 1);
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * Decodes the body of a just received ReplServerStartMsg. The body is in the
   * passed array, and starts at the provided location. This is for a PDU
   * encoded in V1 protocol version.
   * @param in A byte array containing the body for the ReplServerStartMsg
   * @param pos The position in the array where the decoding should start
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded ReplServerStartMsg.
   */
  public void decodeBody_V1(byte[] in, int pos) throws DataFormatException
  {
    try
    {
      /* The ReplServerStartMsg payload is stored in the form :
       * <baseDn><serverId><serverURL><windowSize><sslEncryption>
       * <serverState>
       */
      /* read the dn
       * first calculate the length then construct the string
       */
      int length = getNextLength(in, pos);
      baseDn = new String(in, pos, length, "UTF-8");
      pos += length +1;
      /*
       * read the ServerId
       */
      length = getNextLength(in, pos);
      String serverIdString = new String(in, pos, length, "UTF-8");
      serverId = Integer.valueOf(serverIdString);
      pos += length +1;
      /*
       * read the ServerURL
       */
      length = getNextLength(in, pos);
      serverURL = new String(in, pos, length, "UTF-8");
      pos += length +1;
      /*
       * read the window size
       */
      length = getNextLength(in, pos);
      windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the sslEncryption setting
       */
      length = getNextLength(in, pos);
      sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      // Read the ServerState
      // Caution: ServerState MUST be the last field. Because ServerState can
      // contain null character (string termination of serverid string ..) it
      // cannot be decoded using getNextLength() like the other fields. The
      // only way is to rely on the end of the input buffer : and that forces
      // the ServerState to be the last. This should be changed and we want to
@@ -235,8 +315,12 @@
  public byte[] getBytes(short protocolVersion)
     throws UnsupportedEncodingException
  {
    if  (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
      return getBytes_V1();
    // If an older version requested, encode in the requested way
    switch(protocolVersion)
    {
      case ProtocolVersion.REPLICATION_PROTOCOL_V1:
        return getBytes_V1();
    }
    /* The ReplServerStartMsg is stored in the form :
     * <operation type><baseDn><serverId><serverURL><windowSize><sslEncryption>
@@ -254,12 +338,12 @@
      String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
    int length = byteDn.length + 1 + byteServerId.length + 1 +
    byteServerUrl.length + 1 + byteWindowSize.length + 1 +
    byteSSLEncryption.length + 1 +
    byteDegradedStatusThreshold.length + 1 +
    byteServerState.length + 1;
      byteServerUrl.length + 1 + byteWindowSize.length + 1 +
      byteSSLEncryption.length + 1 +
      byteDegradedStatusThreshold.length + 1 +
      byteServerState.length + 1;
    /* encode the header in a byte[] large enough to also contain the mods */
    /* encode the header in a byte[] large enough */
    byte resultByteArray[] =
      encodeHeader(MSG_TYPE_REPL_SERVER_START, length, protocolVersion);
@@ -377,7 +461,7 @@
                   byteSSLEncryption.length + 1 +
                   byteServerState.length + 1;
      /* encode the header in a byte[] large enough to also contain the mods */
      /* encode the header in a byte[] large enough */
      byte resultByteArray[] = encodeHeader_V1(MSG_TYPE_REPL_SERVER_START_V1,
        length);
      int pos = headerLength;
@@ -407,5 +491,4 @@
      return null;
    }
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -71,12 +71,16 @@
  static final byte MSG_TYPE_CHANGE_STATUS = 28;
  static final byte MSG_TYPE_GENERIC_UPDATE = 29;
  // Protocol version : 3
  // Added for protocol version 3
  static final byte MSG_TYPE_START_ECL = 30;
  static final byte MSG_TYPE_START_ECL_SESSION = 31;
  static final byte MSG_TYPE_ECL_UPDATE = 32;
  static final byte MSG_TYPE_CT_HEARTBEAT = 33;
  // Added for protocol version 4
  static final byte MSG_TYPE_REPL_SERVER_START_DS = 34;
  static final byte MSG_TYPE_STOP = 35;
  // Adding a new type of message here probably requires to
  // change accordingly generateMsg method below
@@ -238,6 +242,12 @@
      case MSG_TYPE_CT_HEARTBEAT:
        msg = new ChangeTimeHeartbeatMsg(buffer);
      break;
      case MSG_TYPE_REPL_SERVER_START_DS:
        msg = new ReplServerStartDSMsg(buffer);
      break;
      case MSG_TYPE_STOP:
        msg = new StopMsg(buffer);
      break;
      default:
        throw new DataFormatException("received message with unknown type");
    }
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -232,6 +232,14 @@
  /**
   * {@inheritDoc}
   */
  public String getReadableRemoteAddress()
  {
    return socket.getRemoteSocketAddress().toString();
  }
  /**
   * {@inheritDoc}
   */
  public void setSoTimeout(int timeout) throws SocketException
  {
    socket.setSoTimeout(timeout);
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StopMsg.java
New file
@@ -0,0 +1,70 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
import java.util.zip.DataFormatException;
/**
 * This message is part of the replication protocol.
 * This message is sent by a server to tell a peer the communication will be
 * terminated.
 */
public class StopMsg extends ReplicationMsg
{
  /**
   * Creates a message.
   */
  public StopMsg()
  {
  }
  /**
   * Creates a new message by decoding the provided byte array.
   * @param in A byte array containing the encoded information for the message,
   * @throws DataFormatException If the in does not contain a properly,
   *                             encoded message.
   */
  public StopMsg(byte[] in) throws DataFormatException
  {
    // First byte is the type
    if (in[0] != MSG_TYPE_STOP)
      throw new DataFormatException("input is not a valid Stop message: " +
        in[0]);
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    return new byte[]
      {
        MSG_TYPE_STOP
      };
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -243,6 +243,14 @@
  /**
   * {@inheritDoc}
   */
  public String getReadableRemoteAddress()
  {
    return plainSocket.getRemoteSocketAddress().toString();
  }
  /**
   * {@inheritDoc}
   */
  public void setSoTimeout(int timeout) throws SocketException
  {
    plainSocket.setSoTimeout(timeout);
opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -145,7 +145,21 @@
          try
          {
            if (session != null)
            {
              // V4 protocol introduces a StopMsg to properly close the
              // connection between servers
              if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
              {
                try
                {
                  session.publish(new StopMsg());
                } catch (IOException ioe)
                {
                  // Anyway, going to close session, so nothing to do
                }
              }
              session.close();
            }
          } catch (IOException e)
          {
            // ignore
@@ -461,7 +475,7 @@
  {
    TopologyMsg outTopoMsg = replicationServerDomain.createTopologyMsgForDS(
        this.serverId);
    session.publish(outTopoMsg);
    session.publish(outTopoMsg, protocolVersion);
    return outTopoMsg;
  }
  /**
@@ -500,14 +514,13 @@
        return;
      }
      //
      ReplServerStartMsg outReplServerStartMsg = null;
      StartMsg outStartMsg = null;
      try
      {
        outReplServerStartMsg = sendStartToRemote(protocolVersion);
        outStartMsg = sendStartToRemote(protocolVersion);
        // log
        logStartHandshakeRCVandSND(inServerStartMsg, outReplServerStartMsg);
        logStartHandshakeRCVandSND(inServerStartMsg, outStartMsg);
        // The session initiator decides whether to use SSL.
        // Until here session is encrypted then it depends on the negotiation
@@ -517,6 +530,13 @@
        // wait and process StartSessionMsg from remote RS
        StartSessionMsg inStartSessionMsg =
          waitAndProcessStartSessionFromRemoteDS();
        if (inStartSessionMsg == null)
        {
          // DS wants to properly close the connection (DS sent a StopMsg)
          logStopReceived();
          abortStart(null);
          return;
        }
        // Send our own TopologyMsg to remote RS
        TopologyMsg outTopoMsg = sendTopoToRemoteDS();
@@ -525,18 +545,12 @@
      }
      catch(IOException e)
      {
        // We do not want polluting error log if error is due to normal session
        // aborted after handshake phase one from a DS that is searching for
        // best suitable RS.
        // don't log a polluting error when connection aborted
        // from a DS that wanted only to perform handshake phase 1 in order
        // to determine the best suitable RS:
        // 1) -> ServerStartMsg
        // 2) <- ReplServerStartMsg
        // 3) connection closure
        throw new DirectoryException(ResultCode.OTHER, null, null);
        Message errMessage = ERR_DS_DISCONNECTED_DURING_HANDSHAKE.get(
          Integer.toString(inServerStartMsg.getServerId()),
          Integer.toString(replicationServerDomain.getReplicationServer().
          getServerId()));
        logError(errMessage);
        throw new DirectoryException(ResultCode.OTHER, errMessage);
      }
      catch (NotSupportedOldVersionPDUException e)
      {
@@ -578,6 +592,65 @@
        replicationServerDomain.release();
    }
  }
  /**
   * Send the ReplServerStartDSMsg to the remote DS.
   * @param requestedProtocolVersion The provided protocol version.
   * @return The StartMsg sent.
   * @throws IOException When an exception occurs.
   */
  private StartMsg sendStartToRemote(short requestedProtocolVersion)
  throws IOException
  {
    // Before V4 protocol, we sent a ReplServerStartMsg
    if (protocolVersion < ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
      ReplServerStartMsg outReplServerStartMsg
      = new ReplServerStartMsg(
          replicationServerId,
          replicationServerURL,
          getServiceId(),
          maxRcvWindow,
          replicationServerDomain.getDbServerState(),
          protocolVersion,
          localGenerationId,
          sslEncryption,
          getLocalGroupId(),
          replicationServerDomain.
          getReplicationServer().getDegradedStatusThreshold());
      session.publish(outReplServerStartMsg, requestedProtocolVersion);
      return outReplServerStartMsg;
    }
    else
    {
      // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
      ReplServerStartDSMsg outReplServerStartDSMsg
      = new ReplServerStartDSMsg(
          replicationServerId,
          replicationServerURL,
          getServiceId(),
          maxRcvWindow,
          replicationServerDomain.getDbServerState(),
          protocolVersion,
          localGenerationId,
          sslEncryption,
          getLocalGroupId(),
          replicationServerDomain.
          getReplicationServer().getDegradedStatusThreshold(),
          replicationServer.getWeight(),
          replicationServerDomain.getConnectedLDAPservers().size());
      session.publish(outReplServerStartDSMsg);
      return outReplServerStartDSMsg;
    }
  }
  /**
   * Creates a DSInfo structure representing this remote DS.
   * @return The DSInfo structure representing this remote DS
@@ -609,8 +682,10 @@
  }
  /**
   * Wait receiving the StartSessionMsg from the remote DS and process it.
   * @return the startSessionMsg received
   * Wait receiving the StartSessionMsg from the remote DS and process it, or
   * receiving a StopMsg to properly stop the handshake procedure.
   * @return the startSessionMsg received or null DS sent a stop message to
   *         not finish the handshake.
   * @throws DirectoryException
   * @throws IOException
   * @throws ClassNotFoundException
@@ -625,7 +700,12 @@
    ReplicationMsg msg = null;
    msg = session.receive();
    if (!(msg instanceof StartSessionMsg))
    if (msg instanceof StopMsg)
    {
      // DS wants to stop handshake (was just for handshake phase one for RS
      // choice). Return null to make the session be terminated.
      return null;
    } else if (!(msg instanceof StartSessionMsg))
    {
      Message message = Message.raw(
          "Protocol error: StartSessionMsg required." + msg + " received.");
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -331,6 +331,64 @@
  }
  /**
   * Send the ReplServerStartDSMsg to the remote ECL server.
   * @param requestedProtocolVersion The provided protocol version.
   * @return The StartMsg sent.
   * @throws IOException When an exception occurs.
   */
  private StartMsg sendStartToRemote(short requestedProtocolVersion)
  throws IOException
  {
    // Before V4 protocol, we sent a ReplServerStartMsg
    if (protocolVersion < ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
      ReplServerStartMsg outReplServerStartMsg
      = new ReplServerStartMsg(
          replicationServerId,
          replicationServerURL,
          getServiceId(),
          maxRcvWindow,
          replicationServerDomain.getDbServerState(),
          protocolVersion,
          localGenerationId,
          sslEncryption,
          getLocalGroupId(),
          replicationServerDomain.
          getReplicationServer().getDegradedStatusThreshold());
      session.publish(outReplServerStartMsg, requestedProtocolVersion);
      return outReplServerStartMsg;
    }
    else
    {
      // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
      ReplServerStartDSMsg outReplServerStartDSMsg
      = new ReplServerStartDSMsg(
          replicationServerId,
          replicationServerURL,
          getServiceId(),
          maxRcvWindow,
          replicationServerDomain.getDbServerState(),
          protocolVersion,
          localGenerationId,
          sslEncryption,
          getLocalGroupId(),
          replicationServerDomain.
          getReplicationServer().getDegradedStatusThreshold(),
          replicationServer.getWeight(),
          replicationServerDomain.getConnectedLDAPservers().size());
      session.publish(outReplServerStartDSMsg);
      return outReplServerStartDSMsg;
    }
  }
  /**
   * Creates a new handler object to a remote replication server.
   * @param session The session with the remote RS.
   * @param queueSize The queue size to manage updates to that RS.
@@ -406,12 +464,14 @@
      // lock with timeout
      lockDomain(true);
      this.localGenerationId = replicationServerDomain.getGenerationId();
      // send start to remote
      ReplServerStartMsg outReplServerStartMsg =
      StartMsg outStartMsg =
        sendStartToRemote(protocolVersion);
      // log
      logStartHandshakeRCVandSND(inECLStartMsg, outReplServerStartMsg);
      logStartHandshakeRCVandSND(inECLStartMsg, outStartMsg);
      // until here session is encrypted then it depends on the negociation
      // The session initiator decides whether to use SSL.
@@ -421,6 +481,14 @@
      // wait and process StartSessionMsg from remote RS
      StartECLSessionMsg inStartECLSessionMsg =
        waitAndProcessStartSessionECLFromRemoteServer();
      if (inStartECLSessionMsg == null)
        {
          // client wants to properly close the connection (client sent a
          // StopMsg)
          logStopReceived();
          abortStart(null);
          return;
        }
      logStartECLSessionHandshake(inStartECLSessionMsg);
@@ -462,7 +530,12 @@
    ReplicationMsg msg = null;
    msg = session.receive();
    if (!(msg instanceof StartECLSessionMsg))
    if (msg instanceof StopMsg)
    {
      // client wants to stop handshake (was just for handshake phase one for RS
      // choice). Return null to make the session be terminated.
      return null;
    } else if (!(msg instanceof StartECLSessionMsg))
    {
      Message message = Message.raw(
          "Protocol error: StartECLSessionMsg required." + msg + " received.");
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -169,7 +169,7 @@
    catch (SocketException e)
    {
      // Just ignore the exception and let the thread die as well
      errMessage = NOTE_SERVER_DISCONNECT.get(handler.toString(),
      errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
          "for operation " + handler.getOperationId());
      logError(errMessage);
    }
@@ -198,7 +198,7 @@
  }
  /**
   * Loop geting changes from the domain and publishing them either to
   * Loop getting changes from the domain and publishing them either to
   * the provided session or to the ECL session interface.
   * @throws IOException when raised (connection closure)
   * @throws InterruptedException when raised
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -189,6 +189,21 @@
  final private Object domainMonitor = new Object();
  /**
   * The weight affected to the replication server.
   * Each replication server of the topology has a weight. When combined
   * together, the weights of the replication servers of a same group can be
   * translated to a percentage that determines the quantity of directory
   * servers of the topology that should be connected to a replication server.
   * For instance imagine a topology with 3 replication servers (with the same
   * group id) with the following weights: RS1=1, RS2=1, RS3=2. This means that
   * RS1 should have 25% of the directory servers connected in the topology,
   * RS2 25%, and RS3 50%. This may be useful if the replication servers of the
   * topology have a different power and one wants to spread the load between
   * the replication servers according to their power.
   */
  private int weight = 1;
  /**
   * Creates a new Replication server using the provided configuration entry.
   *
   * @param configuration The configuration of this replication server.
@@ -979,6 +994,13 @@
      }
    }
    // Set a potential new weight
    if (weight != configuration.getWeight())
    {
      weight = configuration.getWeight();
      // TODO: send new TopologyMsg
    }
    if ((configuration.getReplicationDBDirectory() != null) &&
        (!dbDirname.equals(configuration.getReplicationDBDirectory())))
    {
@@ -1786,4 +1808,13 @@
    }
    return result;
  }
  /**
   * Gets the weight.
   * @return the weight
   */
  public int getWeight()
  {
    return weight;
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1554,20 +1554,8 @@
        // in the topology.
        if (senderHandler.isDataServer())
        {
          MonitorMsg returnMsg;
          if (senderHandler.getProtocolVersion() >
                             ProtocolVersion.REPLICATION_PROTOCOL_V1)
          {
           returnMsg =
          MonitorMsg returnMsg =
            new MonitorMsg(msg.getDestination(), msg.getsenderID());
          }
          else
          {
            returnMsg =
              new MonitorMsg(msg.getDestination(), msg.getsenderID(),
                  ProtocolVersion.REPLICATION_PROTOCOL_V1);
          }
          try
          {
@@ -1613,20 +1601,8 @@
          return;
        }
        MonitorMsg monitorMsg;
        if (senderHandler.getProtocolVersion() >
                                  ProtocolVersion.REPLICATION_PROTOCOL_V1)
        {
          monitorMsg =
            new MonitorMsg(msg.getDestination(), msg.getsenderID());
        }
        else
        {
          monitorMsg =
            new MonitorMsg(msg.getDestination(), msg.getsenderID(),
                ProtocolVersion.REPLICATION_PROTOCOL_V1);
        }
        MonitorMsg monitorMsg =
          new MonitorMsg(msg.getDestination(), msg.getsenderID());
        // Populate for each connected LDAP Server
        // from the states stored in the serverHandler.
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -121,6 +121,34 @@
  }
  /**
   * Send the ReplServerStartMsg to the remote RS.
   * @param requestedProtocolVersion The provided protocol version.
   * @return The ReplServerStartMsg sent.
   * @throws IOException When an exception occurs.
   */
  private ReplServerStartMsg sendStartToRemote(short requestedProtocolVersion)
  throws IOException
  {
    ReplServerStartMsg outReplServerStartMsg
    = new ReplServerStartMsg(
        replicationServerId,
        replicationServerURL,
        getServiceId(),
        maxRcvWindow,
        replicationServerDomain.getDbServerState(),
        protocolVersion,
        localGenerationId,
        sslEncryption,
        getLocalGroupId(),
        replicationServerDomain.
        getReplicationServer().getDegradedStatusThreshold());
    session.publish(outReplServerStartMsg, requestedProtocolVersion);
    return outReplServerStartMsg;
  }
  /**
   * Creates a new handler object to a remote replication server.
   * @param session The session with the remote RS.
   * @param queueSize The queue size to manage updates to that RS.
@@ -262,6 +290,7 @@
      // lock with timeout
      lockDomain(true);
      this.localGenerationId = replicationServerDomain.getGenerationId();
      ReplServerStartMsg outReplServerStartMsg =
        sendStartToRemote(protocolVersion);
@@ -389,6 +418,14 @@
      super.finalizeStart();
    }
    catch(IOException ioe) {
      Message errMessage = ERR_RS_DISCONNECTED_DURING_HANDSHAKE.get(
        Integer.toString(inReplServerStartMsg.getServerId()),
        Integer.toString(replicationServerDomain.getReplicationServer().
        getServerId()));
      logError(errMessage);
      abortStart(errMessage);
    }
    catch(DirectoryException de)
    {
      abortStart(de.getMessageObject());
@@ -425,7 +462,7 @@
  throws IOException
  {
    TopologyMsg outTopoMsg = replicationServerDomain.createTopologyMsgForRS();
    session.publish(outTopoMsg);
    session.publish(outTopoMsg, protocolVersion);
    return outTopoMsg;
  }
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -50,13 +50,14 @@
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.HeartbeatThread;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
@@ -94,17 +95,21 @@
    if (providedMsg != null)
    {
      if (debugEnabled())
        TRACER.debugInfo("In "
            + ((handler!=null)?handler.toString():"Replication Server")
            + " closing session with err=" +
            providedMsg.toString());
        TRACER.debugInfo("In " +
          ((handler != null) ? handler.toString() : "Replication Server") +
          " closing session with err=" +
          providedMsg.toString());
      logError(providedMsg);
    }
    try
    {
      if (providedSession!=null)
      if (providedSession != null)
        // This method is only called when aborting a failing handshake and
        // not StopMsg should be sent in such situation. StopMsg are only
        // expected when full handshake has been performed, or at end of
        // handshake phase 1, when DS was just gathering available RS info
        providedSession.close();
    } catch (IOException ee)
    } catch (IOException e)
    {
      // ignore
    }
@@ -174,7 +179,10 @@
  private int rcvWindow;
  private int rcvWindowSizeHalf;
  private int maxRcvWindow;
  /**
   * The size of the receiving window.
   */
  protected int maxRcvWindow;
  /**
   * Semaphore that the writer uses to control the flow to the remote server.
   */
@@ -197,7 +205,7 @@
   */
  protected long localGenerationId = -1;
  /**
   * The generation id before procesing a new start handshake.
   * The generation id before processing a new start handshake.
   */
  protected long oldGenerationId = -1;
  /**
@@ -210,7 +218,7 @@
  protected boolean initSslEncryption;
  /**
   * The SSL encryption after the negociation with the peer.
   * The SSL encryption after the negotiation with the peer.
   */
  protected boolean sslEncryption;
  /**
@@ -275,17 +283,6 @@
    // be disturbed
    if (session!=null)
    {
      try
      {
        session.publish(
            new ErrorMsg(
                replicationServerDomain.getReplicationServer().getServerId(),
                serverId,
                reason));
      }
      catch(Exception e)
      {
      }
      closeSession(session, reason, this);
    }
@@ -991,7 +988,14 @@
          replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + this +
          " publishes message:\n" + msg);
    session.publish(msg);
    // Currently only MonitorMsg has to support a backward compatibility
    if (msg instanceof MonitorMsg)
    {
      session.publish(msg, protocolVersion);
    } else
    {
      session.publish(msg);
    }
  }
  /**
@@ -1017,35 +1021,6 @@
  }
  /**
   * Send the ReplServerStartMsg to the remote server (RS or DS).
   * @param requestedProtocolVersion The provided protocol version.
   * @return The ReplServerStartMsg sent.
   * @throws IOException When an exception occurs.
   */
  public ReplServerStartMsg sendStartToRemote(short requestedProtocolVersion)
  throws IOException
  {
    this.localGenerationId = replicationServerDomain.getGenerationId();
    ReplServerStartMsg outReplServerStartMsg
    = new ReplServerStartMsg(
        replicationServerId,
        replicationServerURL,
        getServiceId(),
        maxRcvWindow,
        replicationServerDomain.getDbServerState(),
        protocolVersion,
        localGenerationId,
        sslEncryption,
        getLocalGroupId(),
        replicationServerDomain.
        getReplicationServer().getDegradedStatusThreshold());
    session.publish(outReplServerStartMsg, requestedProtocolVersion);
    return outReplServerStartMsg;
  }
  /**
   * Sends the provided TopologyMsg to the peer server.
   *
   * @param topoMsg The TopologyMsg message to be sent.
@@ -1058,7 +1033,7 @@
    // V1 Rs do not support the TopologyMsg
    if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      session.publish(topoMsg);
      session.publish(topoMsg, protocolVersion);
    }
  }
@@ -1110,6 +1085,18 @@
    if (session != null)
    {
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // V4 protocol introduces a StopMsg to properly end
        // communications
        try
        {
          session.publish(new StopMsg());
        } catch (IOException ioe)
        {
          // Anyway, going to close session, so nothing to do
        }
      }
      // Close session to end ServerReader or ServerWriter
      try
      {
@@ -1328,12 +1315,27 @@
          replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + " :" +
          "\nSH SESSION HANDSHAKE RECEIVED:\n" +
          "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg.toString() +
          "\nAND REPLIED:\n" + outTopoMsg.toString());
    }
  }
  /**
   * Log stop message has been received.
   */
  protected void logStopReceived()
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + " :" +
          "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
    }
  }
  /**
   * Log the messages involved in the Topology/StartSession handshake.
   * @param inStartECLSessionMsg The message received first.
   */
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -272,6 +272,18 @@
            ChangeTimeHeartbeatMsg cthbMsg = (ChangeTimeHeartbeatMsg) msg;
            replicationServerDomain.processChangeTimeHeartbeatMsg(handler,
                cthbMsg);
          } else if (msg instanceof StopMsg)
          {
            // Peer server is properly disconnecting: go out of here to
            // properly close the server handler going to finally block.
            if (debugEnabled())
            {
              TRACER.debugInfo(handler.toString() + " has properly " +
                "disconnected from this replication server " +
                Integer.toString(replicationServerDomain.getReplicationServer().
                getServerId()));
            }
            return;
          } else if (msg == null)
          {
            /*
@@ -308,7 +320,7 @@
          " reader IO EXCEPTION for serverID=" + serverId + " " +
          this + " " +
          stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage());
      errMessage = NOTE_SERVER_DISCONNECT.get(handler.toString(),
      errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
        Integer.toString(replicationServerDomain.
        getReplicationServer().getServerId()));
      logError(errMessage);
@@ -346,7 +358,7 @@
    finally
    {
      /*
       * The thread only exit the loop above is some error condition
       * The thread only exits the loop above if some error condition
       * happen.
       * Attempt to close the socket and stop the server handler.
       */
@@ -357,6 +369,19 @@
            "In RS " + replicationServerDomain.getReplicationServer().
            getMonitorInstanceName() +
            this + " is closing the session");
        if (handler.getProtocolVersion() >=
          ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          // V4 protocol introduces a StopMsg to properly end
          // communications
          try
          {
            session.publish(new StopMsg());
          } catch (IOException ioe)
          {
            // Anyway, going to close session, so nothing to do
          }
        }
        session.close();
      } catch (IOException e)
      {
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -41,6 +41,8 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -198,7 +200,7 @@
       * The remote host has disconnected and this particular Tree is going to
       * be removed, just ignore the exception and let the thread die as well
       */
      errMessage = NOTE_SERVER_DISCONNECT.get(handler.toString(),
      errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
        Integer.toString(replicationServerDomain.
        getReplicationServer().getServerId()));
      logError(errMessage);
@@ -209,7 +211,7 @@
       * The remote host has disconnected and this particular Tree is going to
       * be removed, just ignore the exception and let the thread die as well
       */
      errMessage = NOTE_SERVER_DISCONNECT.get(handler.toString(),
      errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
        Integer.toString(replicationServerDomain.
        getReplicationServer().getServerId()));
      logError(errMessage);
@@ -225,6 +227,18 @@
      logError(errMessage);
    }
    finally {
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // V4 protocol introduces a StopMsg to properly end
        // communications
        try
        {
          session.publish(new StopMsg());
        } catch (IOException ioe)
        {
          // Anyway, going to close session, so nothing to do
        }
      }
      try
      {
        session.close();
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -43,6 +43,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -61,6 +62,7 @@
import org.opends.server.replication.protocol.HeartbeatMonitor;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartDSMsg;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -68,6 +70,7 @@
import org.opends.server.replication.protocol.ServerStartMsg;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
@@ -112,9 +115,6 @@
  // Our replication domain
  private ReplicationDomain domain = null;
  // Trick for avoiding a inner class for many parameters return for
  // performPhaseOneHandshake method.
  private String tmpReadableServerName = null;
  /**
   * The expected duration in milliseconds between heartbeats received
   * from the replication server.  Zero means heartbeats are off.
@@ -183,7 +183,7 @@
   * @param groupId The group id of our domain.
   * @param changeTimeHeartbeatInterval The interval (in ms) between Change
   *        time  heartbeats are sent to the RS,
   *        or zero if no CN heartbeat shoud be sent.
   *        or zero if no CN heartbeat should be sent.
   */
  public ReplicationBroker(ReplicationDomain replicationDomain,
    ServerState state, String baseDn, int serverID2, int window,
@@ -290,23 +290,93 @@
  /**
   * Bag class for keeping info we get from a server in order to compute the
   * best one to connect to.
   * best one to connect to. This is in fact a wrapper to a
   * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4).
   */
  public static class ServerInfo
  {
    private ServerState serverState = null;
    private short protocolVersion;
    private long generationId;
    private byte groupId = (byte) -1;
    private int serverId;
    private String serverURL;
    private String baseDn = null;
    private int windowSize;
    private ServerState serverState;
    private boolean sslEncryption;
    private int degradedStatusThreshold = -1;
    // Keeps the -1 value if created with a ReplServerStartMsg
    private int weight = -1;
    // Keeps the -1 value if created with a ReplServerStartMsg
    private int connectedDSNumber = -1;
    /**
     * Constructor.
     * @param serverState Server state of the RS
     * @param groupId Group id of the RS
     * Create a new instance of ServerInfo wrapping the passed message.
     * @param msg Message to wrap.
     * @return The new instance wrapping the passed message.
     * @throws IllegalArgumentException If the passed message has an unexpected
     *                                  type.
     */
    public ServerInfo(ServerState serverState, byte groupId)
    public static ServerInfo newServerInfo(
      ReplicationMsg msg) throws IllegalArgumentException
    {
      this.serverState = serverState;
      this.groupId = groupId;
      if (msg instanceof ReplServerStartMsg)
      {
        // This is a ReplServerStartMsg (RS uses protocol V3 or under)
        ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg)msg;
        return new ServerInfo(replServerStartMsg);
      }
      else if (msg instanceof ReplServerStartDSMsg)
      {
        // This is a ReplServerStartDSMsg (RS uses protocol V4 or higher)
        ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg)msg;
        return new ServerInfo(replServerStartDSMsg);
      }
      // Unsupported message type: should not happen
      throw new IllegalArgumentException("Unexpected PDU type: " +
        msg.getClass().getName() + " :\n" + msg.toString());
    }
    /**
     * Constructs a ServerInfo object wrapping a ReplServerStartMsg.
     * @param replServerStartMsg The ReplServerStartMsg this object will wrap.
     */
    private ServerInfo(ReplServerStartMsg replServerStartMsg)
    {
      this.protocolVersion = replServerStartMsg.getVersion();
      this.generationId = replServerStartMsg.getGenerationId();
      this.groupId = replServerStartMsg.getGroupId();
      this.serverId = replServerStartMsg.getServerId();
      this.serverURL = replServerStartMsg.getServerURL();
      this.baseDn = replServerStartMsg.getBaseDn();
      this.windowSize = replServerStartMsg.getWindowSize();
      this.serverState = replServerStartMsg.getServerState();
      this.sslEncryption = replServerStartMsg.getSSLEncryption();
      this.degradedStatusThreshold =
        replServerStartMsg.getDegradedStatusThreshold();
    }
    /**
     * Constructs a ServerInfo object wrapping a ReplServerStartDSMsg.
     * @param replServerStartDSMsg The ReplServerStartDSMsg this object will
     * wrap.
     */
    private ServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
    {
      this.protocolVersion = replServerStartDSMsg.getVersion();
      this.generationId = replServerStartDSMsg.getGenerationId();
      this.groupId = replServerStartDSMsg.getGroupId();
      this.serverId = replServerStartDSMsg.getServerId();
      this.serverURL = replServerStartDSMsg.getServerURL();
      this.baseDn = replServerStartDSMsg.getBaseDn();
      this.windowSize = replServerStartDSMsg.getWindowSize();
      this.serverState = replServerStartDSMsg.getServerState();
      this.sslEncryption = replServerStartDSMsg.getSSLEncryption();
      this.degradedStatusThreshold =
        replServerStartDSMsg.getDegradedStatusThreshold();
      this.weight = replServerStartDSMsg.getWeight();
      this.connectedDSNumber = replServerStartDSMsg.getConnectedDSNumber();
    }
    /**
@@ -326,6 +396,98 @@
    {
      return groupId;
    }
    /**
     * Get the server protocol version.
     * @return the protocolVersion
     */
    public short getProtocolVersion()
    {
      return protocolVersion;
    }
    /**
     * Get the generation id.
     * @return the generationId
     */
    public long getGenerationId()
    {
      return generationId;
    }
    /**
     * Get the server id.
     * @return the serverId
     */
    public int getServerId()
    {
      return serverId;
    }
    /**
     * Get the server URL.
     * @return the serverURL
     */
    public String getServerURL()
    {
      return serverURL;
    }
    /**
     * Get the base dn.
     * @return the baseDn
     */
    public String getBaseDn()
    {
      return baseDn;
    }
    /**
     * Get the window size.
     * @return the windowSize
     */
    public int getWindowSize()
    {
      return windowSize;
    }
    /**
     * Get the ssl encryption.
     * @return the sslEncryption
     */
    public boolean isSslEncryption()
    {
      return sslEncryption;
    }
    /**
     * Get the degraded status threshold.
     * @return the degradedStatusThreshold
     */
    public int getDegradedStatusThreshold()
    {
      return degradedStatusThreshold;
    }
    /**
     * Get the weight.
     * @return the weight. Null if this object is a wrapper for
     * a ReplServerStartMsg.
     */
    public int getWeight()
    {
      return weight;
    }
    /**
     * Get the connected DS number.
     * @return the connectedDSNumber. Null if this object is a wrapper for
     * a ReplServerStartMsg.
     */
    public int getConnectedDSNumber()
    {
      return connectedDSNumber;
    }
  }
  private void connect()
@@ -342,10 +504,34 @@
  }
  /**
   * Contacts all replication servers to get information from them and being
   * able to choose the more suitable.
   * @return the collected information.
   */
  private Map<String, ServerInfo> collectReplicationServersInfo() {
    Map<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    for (String server : servers)
    {
      // Connect to server and get info about it
      ServerInfo serverInfo = performPhaseOneHandshake(server, false);
      // Store server info in list
      if (serverInfo != null)
      {
        rsInfos.put(server, serverInfo);
      }
    }
    return rsInfos;
  }
  /**
   * Special aspects of connecting as ECL compared to connecting as data server
   * are :
   * - 1 single RS configured
   * - so no choice of the prefered RS
   * - so no choice of the preferred RS
   * - No same groupID polling
   * - ?? Heartbeat
   * - Start handshake is :
@@ -358,10 +544,10 @@
    // FIXME:ECL List of RS to connect is for now limited to one RS only
    String bestServer = this.servers.iterator().next();
    ReplServerStartMsg inReplServerStartMsg
    ReplServerStartDSMsg inReplServerStartDSMsg
      = performECLPhaseOneHandshake(bestServer, true);
    if (inReplServerStartMsg!=null)
    if (inReplServerStartDSMsg!=null)
      performECLPhaseTwoHandshake(bestServer);
  }
@@ -392,8 +578,6 @@
   */
  private void connectAsDataServer()
  {
    HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
    // May have created a broker with null replication domain for
    // unit test purpose.
    if (domain != null)
@@ -418,24 +602,12 @@
       */
      if (debugEnabled())
        TRACER.debugInfo("phase 1 : will perform PhaseOneH with each RS in " +
            " order to elect the prefered one");
      for (String server : servers)
      {
        // Connect to server and get reply message
        ReplServerStartMsg replServerStartMsg =
          performPhaseOneHandshake(server, false);
            " order to elect the preferred one");
        // Store reply message info in list
        if (replServerStartMsg != null)
        {
          ServerInfo serverInfo =
            new ServerInfo(replServerStartMsg.getServerState(),
            replServerStartMsg.getGroupId());
          rsInfos.put(server, serverInfo);
        }
      } // for servers
      // Get info from every available replication servers
      Map<String, ServerInfo> rsInfos = collectReplicationServersInfo();
      ReplServerStartMsg replServerStartMsg = null;
      ServerInfo serverInfo = null;
      if (rsInfos.size() > 0)
      {
@@ -446,19 +618,17 @@
        // Best found, now initialize connection to this one (handshake phase 1)
        if (debugEnabled())
          TRACER.debugInfo(
              "phase 2 : will perform PhaseOneH with the prefered RS.");
        replServerStartMsg = performPhaseOneHandshake(bestServer, true);
              "phase 2 : will perform PhaseOneH with the preferred RS.");
        serverInfo = performPhaseOneHandshake(bestServer, true);
        if (replServerStartMsg != null) // Handshake phase 1 exchange went well
        if (serverInfo != null) // Handshake phase 1 exchange went well
        {
          ServerInfo bestServerInfo = rsInfos.get(bestServer);
          // Compute in which status we are starting the session to tell the RS
          ServerStatus initStatus =
            computeInitialServerStatus(replServerStartMsg.getGenerationId(),
            bestServerInfo.getServerState(),
            replServerStartMsg.getDegradedStatusThreshold(),
            computeInitialServerStatus(serverInfo.getGenerationId(),
            serverInfo.getServerState(),
            serverInfo.getDegradedStatusThreshold(),
            this.getGenerationID());
          // Perfom session start (handshake phase 2)
@@ -485,7 +655,7 @@
               * reconnection at that time to retrieve a server with our group
               * id.
               */
              byte tmpRsGroupId = bestServerInfo.getGroupId();
              byte tmpRsGroupId = serverInfo.getGroupId();
              boolean someServersWithSameGroupId =
                hasSomeServerWithSameGroupId(topologyMsg.getRsList());
@@ -493,10 +663,10 @@
              if ((tmpRsGroupId == groupId) ||
                ((tmpRsGroupId != groupId) && !someServersWithSameGroupId))
              {
                replicationServer = tmpReadableServerName;
                maxSendWindow = replServerStartMsg.getWindowSize();
                rsGroupId = replServerStartMsg.getGroupId();
                rsServerId = replServerStartMsg.getServerId();
                replicationServer = session.getReadableRemoteAddress();
                maxSendWindow = serverInfo.getWindowSize();
                rsGroupId = serverInfo.getGroupId();
                rsServerId = serverInfo.getServerId();
                rsServerUrl = bestServer;
                // May have created a broker with null replication domain for
@@ -504,8 +674,8 @@
                if (domain != null)
                {
                  domain.sessionInitiated(
                      initStatus, replServerStartMsg.getServerState(),
                      replServerStartMsg.getGenerationId(),
                      initStatus, serverInfo.getServerState(),
                      serverInfo.getGenerationId(),
                      session);
                }
                receiveTopo(topologyMsg);
@@ -524,7 +694,7 @@
                 startSameGroupIdPoller();
                }
                startRSHeartBeatMonitoring();
                if (replServerStartMsg.getVersion()
                if (serverInfo.getProtocolVersion()
                    >= ProtocolVersion.REPLICATION_PROTOCOL_V3)
                {
                  startChangeTimeHeartBeatPublishing();
@@ -584,8 +754,8 @@
        rcvWindow = maxRcvWindow;
        connectPhaseLock.notify();
        if ((replServerStartMsg.getGenerationId() == this.getGenerationID()) ||
          (replServerStartMsg.getGenerationId() == -1))
        if ((serverInfo.getGenerationId() == this.getGenerationID()) ||
          (serverInfo.getGenerationId() == -1))
        {
          Message message =
            NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
@@ -602,7 +772,7 @@
            baseDn.toString(),
            replicationServer,
            Long.toString(this.getGenerationID()),
            Long.toString(replServerStartMsg.getGenerationId()));
            Long.toString(serverInfo.getGenerationId()));
          logError(message);
        }
      } else
@@ -709,19 +879,19 @@
  /**
   * Connect to the provided server performing the first phase handshake
   * (start messages exchange) and return the reply message from the replication
   * server.
   * server, wrapped in a ServerInfo object.
   *
   * @param server Server to connect to.
   * @param keepConnection Do we keep session opened or not after handshake.
   *        Use true if want to perform handshake phase 2 with the same session
   *        and keep the session to create as the current one.
   * @return The ReplServerStartMsg the server replied. Null if could not
   * @return The answer from the server . Null if could not
   *         get an answer.
   */
  private ReplServerStartMsg performPhaseOneHandshake(String server,
  private ServerInfo performPhaseOneHandshake(String server,
    boolean keepConnection)
  {
    ReplServerStartMsg replServerStartMsg = null;
    ServerInfo serverInfo = null;
    // Parse server string.
    int separator = server.lastIndexOf(':');
@@ -738,8 +908,6 @@
      int intPort = Integer.parseInt(port);
      InetSocketAddress serverAddr = new InetSocketAddress(
        InetAddress.getByName(hostname), intPort);
      if (keepConnection)
        tmpReadableServerName = serverAddr.toString();
      Socket socket = new Socket();
      socket.setReceiveBufferSize(1000000);
      socket.setTcpNoDelay(true);
@@ -759,19 +927,23 @@
      localSession.publish(serverStartMsg);
      /*
       * Read the ReplServerStartMsg that should come back.
       * Read the ReplServerStartMsg or ReplServerStartDSMsg that should come
       * back.
       */
      replServerStartMsg = (ReplServerStartMsg) localSession.receive();
      ReplicationMsg msg = localSession.receive();
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + baseDn +
          "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
          "\nAND RECEIVED:\n" + replServerStartMsg.toString());
      }
        {
          TRACER.debugInfo("In RB for " + baseDn +
            "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
            "\nAND RECEIVED:\n" + msg.toString());
        }
      // Wrap received message in a server info object
      serverInfo = ServerInfo.newServerInfo(msg);
      // Sanity check
      String repDn = replServerStartMsg.getBaseDn();
      String repDn = serverInfo.getBaseDn();
      if (!(this.baseDn.equals(repDn)))
      {
        Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
@@ -786,7 +958,7 @@
       * if it is an old replication server).
       */
      protocolVersion = ProtocolVersion.minWithCurrent(
          replServerStartMsg.getVersion());
        serverInfo.getProtocolVersion());
      localSession.setProtocolVersion(protocolVersion);
@@ -839,10 +1011,25 @@
    {
      if (localSession != null)
      {
        if (debugEnabled())
          TRACER.debugInfo("In RB, closing session after phase 1");
        if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          // V4 protocol introduces a StopMsg to properly end communications
          if (!error)
          {
            try
            {
              localSession.publish(new StopMsg());
            } catch (IOException ioe)
            {
              // Anyway, going to close session, so nothing to do
            }
          }
        }
        try
        {
          if (debugEnabled())
            TRACER.debugInfo("In RB, closing session after phase 1");
          localSession.close();
        } catch (IOException e)
        {
@@ -852,7 +1039,7 @@
      }
      if (error)
      {
        replServerStartMsg = null;
        serverInfo = null;
      } // Be sure to return null.
    }
@@ -864,7 +1051,7 @@
      session = localSession;
    }
    return replServerStartMsg;
    return serverInfo;
  }
  /**
@@ -876,13 +1063,13 @@
   * @param keepConnection Do we keep session opened or not after handshake.
   *        Use true if want to perform handshake phase 2 with the same session
   *        and keep the session to create as the current one.
   * @return The ReplServerStartMsg the server replied. Null if could not
   * @return The ReplServerStartDSMsg the server replied. Null if could not
   *         get an answer.
   */
  private ReplServerStartMsg performECLPhaseOneHandshake(String server,
  private ReplServerStartDSMsg performECLPhaseOneHandshake(String server,
    boolean keepConnection)
  {
    ReplServerStartMsg replServerStartMsg = null;
    ReplServerStartDSMsg replServerStartDSMsg = null;
    // Parse server string.
    int separator = server.lastIndexOf(':');
@@ -899,8 +1086,6 @@
      int intPort = Integer.parseInt(port);
      InetSocketAddress serverAddr = new InetSocketAddress(
        InetAddress.getByName(hostname), intPort);
      if (keepConnection)
        tmpReadableServerName = serverAddr.toString();
      Socket socket = new Socket();
      socket.setReceiveBufferSize(1000000);
      socket.setTcpNoDelay(true);
@@ -920,17 +1105,17 @@
      localSession.publish(serverStartECLMsg);
      // Read the ReplServerStartMsg that should come back.
      replServerStartMsg = (ReplServerStartMsg) localSession.receive();
      replServerStartDSMsg = (ReplServerStartDSMsg) localSession.receive();
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + baseDn +
          "\nRB HANDSHAKE SENT:\n" + serverStartECLMsg.toString() +
          "\nAND RECEIVED:\n" + replServerStartMsg.toString());
          "\nAND RECEIVED:\n" + replServerStartDSMsg.toString());
      }
      // Sanity check
      String repDn = replServerStartMsg.getBaseDn();
      String repDn = replServerStartDSMsg.getBaseDn();
      if (!(this.baseDn.equals(repDn)))
      {
        Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
@@ -946,7 +1131,7 @@
       */
      if (keepConnection)
        protocolVersion = ProtocolVersion.minWithCurrent(
          replServerStartMsg.getVersion());
          replServerStartDSMsg.getVersion());
      localSession.setProtocolVersion(protocolVersion);
      if (!isSslEncryption)
@@ -998,10 +1183,22 @@
    {
      if (localSession != null)
      {
        if (debugEnabled())
          TRACER.debugInfo("In RB, closing session after phase 1");
        // V4 protocol introduces a StopMsg to properly end communications
        if (!error)
        {
          try
          {
            localSession.publish(new StopMsg());
          } catch (IOException ioe)
          {
            // Anyway, going to close session, so nothing to do
          }
        }
        try
        {
          if (debugEnabled())
            TRACER.debugInfo("In RB, closing session after phase 1");
          localSession.close();
        } catch (IOException e)
        {
@@ -1011,7 +1208,7 @@
      }
      if (error)
      {
        replServerStartMsg = null;
        replServerStartDSMsg = null;
      } // Be sure to return null.
    }
@@ -1023,7 +1220,7 @@
      session = localSession;
    }
    return replServerStartMsg;
    return replServerStartDSMsg;
  }
  /**
@@ -1184,8 +1381,7 @@
   * @return The computed best replication server.
   */
  public static String computeBestReplicationServer(ServerState myState,
    HashMap<String, ServerInfo> rsInfos, int serverId2, String baseDn,
    byte groupId)
    Map<String, ServerInfo> rsInfos, int serverId2, String baseDn, byte groupId)
  {
    /*
     * Preference is given to servers with the requested group id:
@@ -1195,7 +1391,7 @@
     */
    // Filter for servers with same group id
    HashMap<String, ServerInfo> sameGroupIdRsInfos =
    Map<String, ServerInfo> sameGroupIdRsInfos =
      new HashMap<String, ServerInfo>();
    for (String repServer : rsInfos.keySet())
@@ -1231,7 +1427,7 @@
   * @return The computed best replication server.
   */
  private static String searchForBestReplicationServer(ServerState myState,
    HashMap<String, ServerInfo> rsInfos, int serverId2, String baseDn)
    Map<String, ServerInfo> rsInfos, int serverId2, String baseDn)
  {
    /*
     * Find replication servers who are up to date (or more up to date than us,
@@ -1266,7 +1462,7 @@
    HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
    /*
     * Start loop to differenciate up to date servers from late ones.
     * Start loop to differentiate up to date servers from late ones.
     */
    ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId2);
    if (myChangeNumber == null)
@@ -1321,6 +1517,7 @@
        if (ReplicationServer.isLocalReplicationServer(upServer))
        {
          localRS = true;
          break;
        }
      }
      if (localRS)
@@ -1459,7 +1656,8 @@
        new HeartbeatMonitor("Replication Heartbeat Monitor on RS " +
        getReplicationServer() + " " + rsServerId + " for " + baseDn +
        " in DS " + serverId,
        session, heartbeatInterval);
        session, heartbeatInterval, (protocolVersion >=
        ProtocolVersion.REPLICATION_PROTOCOL_V4));
      heartbeatMonitor.start();
    }
  }
@@ -1513,16 +1711,28 @@
   */
  public void reStart(ProtocolSession failingSession)
  {
    try
    if (failingSession != null)
    {
      if (failingSession != null)
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // V4 protocol introduces a StopMsg to properly end communications
        try
        {
          failingSession.publish(new StopMsg());
        } catch (IOException ioe)
        {
          // Anyway, going to close session, so nothing to do
        }
      }
      try
      {
        failingSession.close();
        numLostConnections++;
      } catch (IOException e1)
      {
        // ignore
      }
    } catch (IOException e1)
    {
      // ignore
      numLostConnections++;
    }
    if (failingSession == session)
@@ -1708,6 +1918,19 @@
          TopologyMsg topoMsg = (TopologyMsg)msg;
          receiveTopo(topoMsg);
        }
        else if (msg instanceof StopMsg)
        {
          /*
           * RS performs a proper disconnection
           */
          Message message =
            NOTE_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(replicationServer,
            Integer.toString(rsServerId), baseDn.toString(),
            Integer.toString(serverId));
          logError(message);
          // Try to find a suitable RS
          this.reStart(failingSession);
        }
        else
        {
          return msg;
@@ -1723,10 +1946,10 @@
          {
            /*
             * If we did not initiate the close on our side, log a message.
             * We did not initiate the close on our side, log an error message.
             */
            Message message =
              NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer,
              ERR_REPLICATION_SERVER_BADLY_DISCONNECTED.get(replicationServer,
                  Integer.toString(rsServerId), baseDn.toString(),
                  Integer.toString(serverId));
            logError(message);
@@ -1783,14 +2006,26 @@
    rsGroupId = (byte) -1;
    rsServerId = -1;
    rsServerUrl = null;
    try
    if (session != null)
    {
      if (session != null)
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // V4 protocol introduces a StopMsg to properly end communications
        try
        {
          session.publish(new StopMsg());
        } catch (IOException ioe)
        {
          // Anyway, going to close session, so nothing to do
        }
      }
      try
      {
        session.close();
      } catch (IOException e)
      {
      }
    } catch (IOException e)
    {
    }
  }
@@ -1896,7 +2131,7 @@
      Collection<String> replicationServers, int window, long heartbeatInterval,
      byte groupId)
  {
    // These parameters needs to be renegociated with the ReplicationServer
    // These parameters needs to be renegotiated with the ReplicationServer
    // so if they have changed, that requires restarting the session with
    // the ReplicationServer.
    Boolean needToRestartSession = false;
@@ -1945,7 +2180,7 @@
  private boolean debugEnabled()
  {
    return true;
    return false;
  }
  private static final void debugInfo(String s)
@@ -2057,13 +2292,13 @@
                continue;
              // Connect to server and get reply message
              ReplServerStartMsg replServerStartMsg =
              ServerInfo serverInfo =
                performPhaseOneHandshake(server, false);
              // Store reply message info in list
              if (replServerStartMsg != null)
              // Is it a server with our group id ?
              if (serverInfo != null)
              {
                if (groupId == replServerStartMsg.getGroupId())
                if (groupId == serverInfo.getGroupId())
                {
                  // Found one server with the same group id as us, disconnect
                  // session to force reconnection to a server with same group
@@ -2072,6 +2307,20 @@
                    Byte.toString(groupId), baseDn.toString(),
                    Integer.toString(serverId));
                  logError(message);
                  if (protocolVersion >=
                    ProtocolVersion.REPLICATION_PROTOCOL_V4)
                  {
                    // V4 protocol introduces a StopMsg to properly end
                    // communications
                    try
                    {
                      session.publish(new StopMsg());
                    } catch (IOException ioe)
                    {
                      // Anyway, going to close session, so nothing to do
                    }
                  }
                  try
                  {
                    session.close();
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -1245,7 +1245,7 @@
   * from which this server can be initialized.
   *
   * @param targetString The string representing the source
   * @return The source as a short value
   * @return The source as a integer value
   * @throws DirectoryException if the string is not valid
   */
  public int decodeTarget(String targetString)
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -3445,7 +3445,7 @@
      SortedSet<String> replServers = new TreeSet<String>();
      replServers.add("localhost:"+replicationServerPort);
      DomainFakeCfg domainConf =
        new DomainFakeCfg(baseDn2, (short) 1702, replServers);
        new DomainFakeCfg(baseDn2, 1702, replServers);
      SortedSet<String> includeAttributes = new TreeSet<String>();
      includeAttributes.add("sn");
      domainConf.setEclIncludes(includeAttributes);
@@ -3457,7 +3457,7 @@
          TEST_ROOT_DN_STRING3, TEST_BACKEND_ID3);
      DN baseDn3 = DN.decode(TEST_ROOT_DN_STRING3);
      domainConf =
        new DomainFakeCfg(baseDn3, (short) 1703, replServers);
        new DomainFakeCfg(baseDn3, 1703, replServers);
      includeAttributes = new TreeSet<String>();
      includeAttributes.add("objectclass");
      domainConf.setEclIncludes(includeAttributes);
@@ -3466,7 +3466,7 @@
      replicationPlugin3.completeSynchronizationProvider();
      domainConf =
        new DomainFakeCfg(baseDn2, (short) 1704, replServers);
        new DomainFakeCfg(baseDn2, 1704, replServers);
      includeAttributes = new TreeSet<String>();
      includeAttributes.add("cn");
      domainConf.setEclIncludes(includeAttributes);
@@ -3475,7 +3475,7 @@
      Set<String> attrList = new HashSet<String>();
      attrList.add(new String("cn"));
      ReplicationBroker server01 = openReplicationSession(
          DN.decode(TEST_ROOT_DN_STRING2), (short) 1206,
          DN.decode(TEST_ROOT_DN_STRING2), 1206,
          100, replicationServerPort,
          1000, true, -1 , domain21);
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -36,11 +36,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.StringTokenizer;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import org.opends.server.types.ResultCode;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
@@ -69,13 +66,14 @@
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeValue;
import org.testng.annotations.BeforeClass;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.LockManager;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchScope;
import org.testng.annotations.DataProvider;
@@ -478,8 +476,14 @@
          session.stopEncryption();
        }
        // Read start session
        StartSessionMsg startSessionMsg = (StartSessionMsg) session.receive();
        // Read start session or stop
        ReplicationMsg msg = session.receive();
        if (msg instanceof StopMsg){
          // Disconnection of DS looking for best server
          return false;
        }
        StartSessionMsg startSessionMsg = (StartSessionMsg)msg;
        // Sanity checking for assured parameters
        boolean receivedIsAssured = startSessionMsg.isAssured();
@@ -505,7 +509,8 @@
      } catch (IOException e)
      {
        // Probably un-connection of DS looking for best server
        fail("Unexpected io exception in fake replication server handshake " +
          "processing: " + e);
        return false;
      } catch (Exception e)
      {
@@ -1364,9 +1369,9 @@
//      assertFalse(ackMsg.hasTimeout());
//      assertTrue(ackMsg.hasReplayError());
//      assertFalse(ackMsg.hasWrongStatus());
//      List<Short> failedServers = ackMsg.getFailedServers();
//      List<Integer> failedServers = ackMsg.getFailedServers();
//      assertEquals(failedServers.size(), 1);
//      assertEquals((short)failedServers.get(0), (short)1);
//      assertEquals((integer)failedServers.get(0), (integer)1);
    } finally
    {
      endTest();
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
@@ -40,6 +40,7 @@
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.testng.annotations.Test;
/**
@@ -98,7 +99,10 @@
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -143,7 +147,10 @@
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -190,7 +197,10 @@
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -237,7 +247,10 @@
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -285,7 +298,10 @@
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)1));
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 2
    aState = new ServerState();
@@ -295,7 +311,10 @@
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -345,7 +364,10 @@
    aState.update(cn);
    // This server has less changes than the other one but it has the same
    // group id as us so he should be the winner
    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 2
    aState = new ServerState();
@@ -355,7 +377,10 @@
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)2));
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)2, 0);
    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -403,7 +428,10 @@
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)2));
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)2, 0);
    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 2
    aState = new ServerState();
@@ -413,7 +441,10 @@
    aState.update(cn);
    cn = new ChangeNumber(2L, 0, myId3);
    aState.update(cn);
    rsInfos.put(WINNER, new ServerInfo(aState, (byte)2));
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)2, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -462,7 +493,10 @@
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)1));
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 2
    aState = new ServerState();
@@ -472,7 +506,10 @@
    aState.update(cn);
    cn = new ChangeNumber(3L, 0, myId3);
    aState.update(cn);
    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 3
    aState = new ServerState();
@@ -482,7 +519,10 @@
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsInfos.put(LOOSER2, new ServerInfo(aState, (byte)1));
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -531,7 +571,10 @@
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)1));
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 2
    aState = new ServerState();
@@ -541,7 +584,10 @@
    aState.update(cn);
    cn = new ChangeNumber(3L, 0, myId3);
    aState.update(cn);
    rsInfos.put(LOOSER2, new ServerInfo(aState, (byte)2));
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)2, 0);
    rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 3
    aState = new ServerState();
@@ -553,7 +599,10 @@
    aState.update(cn);
    // This server has less changes than looser2 but it has the same
    // group id as us so he should be the winner
    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -600,7 +649,10 @@
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -648,7 +700,10 @@
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)1));
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 2
    aState = new ServerState();
@@ -658,7 +713,10 @@
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -707,7 +765,10 @@
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)1));
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 2
    aState = new ServerState();
@@ -717,7 +778,10 @@
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 3
    aState = new ServerState();
@@ -727,7 +791,10 @@
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    rsInfos.put(LOOSER2, new ServerInfo(aState, (byte)1));
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -780,7 +847,10 @@
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)1));
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 2
    aState = new ServerState();
@@ -790,7 +860,10 @@
    aState.update(cn);
    cn = new ChangeNumber(5L, 0, myId3);
    aState.update(cn);
    rsInfos.put(LOOSER2, new ServerInfo(aState, (byte)1));
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 3
    aState = new ServerState();
@@ -800,7 +873,10 @@
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    rsInfos.put(LOOSER3, new ServerInfo(aState, (byte)1));
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER3, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 4
    aState = new ServerState();
@@ -810,7 +886,10 @@
    aState.update(cn);
    cn = new ChangeNumber(8L, 0, myId3);
    aState.update(cn);
    rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 5 (null one for our serverid)
    aState = new ServerState();
@@ -818,7 +897,10 @@
    aState.update(cn);
    cn = new ChangeNumber(5L, 0, myId3);
    aState.update(cn);
    rsInfos.put(LOOSER4, new ServerInfo(aState, (byte)1));
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER4, ServerInfo.newServerInfo(replServerStartMsg));
    // State for server 6
    aState = new ServerState();
@@ -828,7 +910,10 @@
    aState.update(cn);
    cn = new ChangeNumber(6L, 0, myId3);
    aState.update(cn);
    rsInfos.put(LOOSER5, new ServerInfo(aState, (byte)1));
    replServerStartMsg =
      new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
      false, (byte)1, 0);
    rsInfos.put(LOOSER5, ServerInfo.newServerInfo(replServerStartMsg));
    String bestServer =
      computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -39,6 +39,7 @@
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
@@ -849,7 +850,7 @@
    AssuredType assuredType = null;
    int assuredSdLevel = -100;
    SortedSet<String> refUrls = null;
    SortedSet<String> attrs = new TreeSet<String>();
    Set<String> eclIncludes = new HashSet<String>();
    switch (dsId)
      {
@@ -904,7 +905,7 @@
    }
    return new DSInfo(dsId, rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, status, assuredFlag, assMode,
       (byte)assuredSdLevel, groupId, urls, attrs);
       (byte)assuredSdLevel, groupId, urls, eclIncludes);
  }
  /**
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -889,7 +889,7 @@
        {"1603303030303030303030303030303030313030303130303030303030300064633" +
         "d746573740066616b65756e69717565696400000200301f0a0102301a040b646573" +
         "6372697074696f6e310b04096e65772076616c756500",
          ModifyMsg.class, new ChangeNumber(1, (short) 0, (short) 1), "dc=test" },
          ModifyMsg.class, new ChangeNumber(1, 0, 1), "dc=test" },
        {"1803303030303031323366313238343132303030326430303030303037620064633" +
         "d636f6d00756e69717565696400000201",
            DeleteMsg.class, new ChangeNumber(0x123f1284120L,123,45), "dc=com"},
@@ -1092,11 +1092,11 @@
    dsList4.add(dsInfo2);
    dsList4.add(dsInfo1);
    RSInfo rsInfo1 = new RSInfo((short)4527, (long)45316, (byte)103);
    RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103);
    RSInfo rsInfo2 = new RSInfo((short)4527, (long)0, (byte)0);
    RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0);
    RSInfo rsInfo3 = new RSInfo((short)0, (long)-21113, (byte)98);
    RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98);
    List<RSInfo> rsList1 = new ArrayList<RSInfo>();
    rsList1.add(rsInfo1);
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -886,6 +886,65 @@
      newMsg.getDegradedStatusThreshold());
  }
  @DataProvider(name="createReplServerStartDSData")
  public Object [][] createReplServerStartDSData() throws Exception
  {
    String baseDN = TEST_ROOT_DN_STRING;
    ServerState state = new ServerState();
    state.update(new ChangeNumber((long)0, 0, 0));
    Object[] set1 = new Object[] {1, baseDN, 0, "localhost:8989", state, 0L, (byte)0, 0, 0, 0};
    state = new ServerState();
    state.update(new ChangeNumber((long)75, 5, 263));
    Object[] set2 = new Object[] {16, baseDN, 100, "anotherHost:1025", state, 1245L, (byte)25, 3456, 3, 31512};
    state = new ServerState();
    state.update(new ChangeNumber((long)123, 5, 98));
    Object[] set3 = new Object[] {36, baseDN, 100, "anotherHostAgain:8017", state, 6841L, (byte)32, 2496, 630, 9524};
    return new Object [][] { set1, set2, set3 };
  }
  /**
   * Test that ReplServerStartDSMsg encoding and decoding works
   * by checking that : msg == new ReplServerStartMsg(msg.getBytes()).
   */
  @Test(dataProvider="createReplServerStartDSData")
  public void replServerStartDSMsgTest(int serverId, String baseDN, int window,
         String url, ServerState state, long genId, byte groupId, int degTh,
         int weight, int connectedDSNumber) throws Exception
  {
    ReplServerStartDSMsg msg = new ReplServerStartDSMsg(serverId,
        url, baseDN, window, state, ProtocolVersion.getCurrentVersion(), genId,
        true, groupId, degTh, weight, connectedDSNumber);
    ReplServerStartDSMsg newMsg = new ReplServerStartDSMsg(msg.getBytes());
    assertEquals(msg.getServerId(), newMsg.getServerId());
    assertEquals(msg.getServerURL(), newMsg.getServerURL());
    assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
    assertEquals(msg.getServerState().getMaxChangeNumber(1),
        newMsg.getServerState().getMaxChangeNumber(1));
    assertEquals(msg.getVersion(), newMsg.getVersion());
    assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
    assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption());
    assertTrue(msg.getGroupId() == newMsg.getGroupId());
    assertTrue(msg.getDegradedStatusThreshold() ==
      newMsg.getDegradedStatusThreshold());
    assertEquals(msg.getWeight(), newMsg.getWeight());
    assertEquals(msg.getConnectedDSNumber(), newMsg.getConnectedDSNumber());
  }
  /**
   * Test that StopMsg encoding and decoding works
   * by checking that : msg == new StopMsg(msg.getBytes()).
   */
  @Test
  public void stopMsgTest() throws Exception
  {
    StopMsg msg = new StopMsg();
    StopMsg newMsg = new StopMsg(msg.getBytes());
  }
  /**
   * Test that WindowMsg encoding and decoding works
   * by checking that : msg == new WindowMsg(msg.getBytes()).
@@ -1457,8 +1516,7 @@
      new HashMap<AttributeType,List<Attribute>>();
    opList.put(attr.getAttributeType(), operationalAttributes);
    ChangeNumber cn = new ChangeNumber(TimeThread.getTime(),
        (short) 123, (short) 45);
    ChangeNumber cn = new ChangeNumber(TimeThread.getTime(), 123, 45);
    DN dn = DN.decode(rawDN);
    for (int i=1;i<perfRep;i++)
@@ -1538,8 +1596,7 @@
    long buildnew = 0;
    long t1,t2,t3,t31,t4,t5,t6 = 0;
    ChangeNumber cn = new ChangeNumber(TimeThread.getTime(),
        (short) 123, (short) 45);
    ChangeNumber cn = new ChangeNumber(TimeThread.getTime(), 123, 45);
    DN dn = DN.decode(rawdn);
    for (int i=1;i<perfRep;i++)
@@ -1627,8 +1684,7 @@
      DeleteOperationBasis opBasis =
        new DeleteOperationBasis(connection, 1, 1,null, DN.decode(rawDN));
      LocalBackendDeleteOperation op = new LocalBackendDeleteOperation(opBasis);
      ChangeNumber cn = new ChangeNumber(TimeThread.getTime(),
        (short) 123, (short) 45);
      ChangeNumber cn = new ChangeNumber(TimeThread.getTime(), 123, 45);
      op.setAttachment(SYNCHROCONTEXT, new DeleteContext(cn, "uniqueid"));
      t2 = System.nanoTime();
      createop += (t2 - t1);
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2007-2008 Sun Microsystems, Inc.
 *      Copyright 2007-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
@@ -60,8 +60,11 @@
  // Threshold for status analyzers
  private int degradedStatusThreshold = 5000;
  // The weight of the server
  private int weight = 1;
  /**
   * Constructor without assured info
   * Constructor without goup id, assured info and weight
   */
  public ReplServerFakeConfiguration(
      int port, String dirName, int purgeDelay, int serverId,
@@ -103,7 +106,7 @@
  }
  
  /**
   * Constructor with assured info
   * Constructor with group id and assured info
   */
  public ReplServerFakeConfiguration(
      int port, String dirName, int purgeDelay, int serverId,
@@ -117,6 +120,19 @@
  }
  /**
   * Constructor with group id, assured info and weight
   */
  public ReplServerFakeConfiguration(
      int port, String dirName, int purgeDelay, int serverId,
      int queueSize, int windowSize, SortedSet<String> servers,
      int groupId, long assuredTimeout, int degradedStatusThreshold, int weight)
  {
    this(port, dirName, purgeDelay, serverId, queueSize, windowSize, servers,
      groupId, assuredTimeout, degradedStatusThreshold);
    this.weight = weight;
  }
  /**
   * {@inheritDoc}
   */
  public void addChangeListener(
@@ -233,4 +249,9 @@
    this.degradedStatusThreshold = degradedStatusThreshold;
  }
  public int getWeight()
  {
    return weight;
  }
}
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -73,6 +73,7 @@
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartDSMsg;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -1006,10 +1007,10 @@
      // Read the Replication Server state from the ReplServerStartMsg that
      // comes back.
      ReplServerStartMsg replStartMsg =
        (ReplServerStartMsg) session.receive();
      int serverwindow = replStartMsg.getWindowSize();
      ServerState replServerState = replStartMsg.getServerState();
      ReplServerStartDSMsg replStartDSMsg =
        (ReplServerStartDSMsg) session.receive();
      int serverwindow = replStartDSMsg.getWindowSize();
      ServerState replServerState = replStartDSMsg.getServerState();
      if (!sslEncryption)
      {
@@ -1052,9 +1053,9 @@
          sslEncryption, (byte)10);
      session.publish(msg);
      // Read the ReplServerStartMsg that should come back.
      // Read the ReplServerStartDSMsg that should come back.
      repMsg = session.receive();
      assertTrue(repMsg instanceof ReplServerStartMsg);
      assertTrue(repMsg instanceof ReplServerStartDSMsg);
      if (!sslEncryption)
      {
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -464,15 +464,15 @@
      }
      String exportedData=exportedDataBuilder.toString();
      domain1 = new FakeReplicationDomain(
          testService, (short) 1, servers1,
          testService, 1, servers1,
          100, 0, exportedData, null, ENTRYCOUNT);
      StringBuilder importedData = new StringBuilder();
      domain2 = new FakeReplicationDomain(
          testService, (short) 2, servers2, 100, 0,
          testService, 2, servers2, 100, 0,
          null, importedData, 0);
      domain2.initializeFromRemote((short)1);
      domain2.initializeFromRemote(1);
      int count = 0;
      while ((importedData.length() < exportedData.length()) && (count < 500))