mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
19.36.2013 41bef7c0b619c7bc925326451a56071b5736580a
Fix OPENDJ-986: Exception when reading messages from Replication server RS

* make protocol versioning the responsibility of the ProtocolSession object. Previously, it was split between various classes making it very confusing and risk-prone. Now only one class (TLSSocketSession) "owns" the protocol version
* removed ReplicationMsg.getBytes() and require that all message impls implement ReplicationMsg.getBytes(short protocolVersion). Messages which have never evolved since the first protocol version can simply ignore the passed in protocol version parameter
* simplified the handshake process, ensuring that we always set the protocol version once the handshake has completed. Part of this change was to remove the protocol version from the start message constructors, since it is implied during encoding (this removes some ambiguity in these classes)
* many other minor code clean ups and bug fixes.
47 files modified
1152 ■■■■■ changed files
opends/src/server/org/opends/server/replication/protocol/AckMsg.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ChangeStatusMsg.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java 11 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/DoneMsg.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java 7 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/EntryMsg.java 10 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java 10 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/HeartbeatMsg.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/InitializeRcvAckMsg.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java 10 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java 10 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java 15 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java 9 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java 36 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java 22 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java 23 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java 263 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ResetGenerationIdMsg.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java 12 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java 11 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/StartMsg.java 9 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java 64 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/StopMsg.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java 25 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java 10 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java 41 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/WindowMsg.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/WindowProbeMsg.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DataServerHandler.java 83 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 78 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java 7 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java 9 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 3 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 18 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java 73 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 95 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerWriter.java 7 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 7 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java 46 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java 24 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java 65 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/AckMsg.java
@@ -214,7 +214,7 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  public byte[] getBytes(short protocolVersion)
  {
    try
    {
opends/src/server/org/opends/server/replication/protocol/ChangeStatusMsg.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
@@ -91,7 +92,7 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  public byte[] getBytes(short protocolVersion)
  {
    /*
     * The message is stored in the form:
opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
@@ -130,17 +130,6 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
opends/src/server/org/opends/server/replication/protocol/DoneMsg.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
@@ -86,7 +87,7 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  public byte[] getBytes(short protocolVersion)
  {
    try
    {
opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.protocol;
@@ -179,13 +179,14 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes() throws UnsupportedEncodingException
  public byte[] getBytes(short protocolVersion)
      throws UnsupportedEncodingException
  {
    byte[] byteCookie    = String.valueOf(cookie).getBytes("UTF-8");
    byte[] byteServiceId = String.valueOf(serviceId).getBytes("UTF-8");
    byte[] byteDraftChangeNumber =
      Integer.toString(draftChangeNumber).getBytes("UTF-8");
    byte[] byteUpdateMsg = updateMsg.getBytes();
    byte[] byteUpdateMsg = updateMsg.getBytes(protocolVersion);
    int length = 1 + byteCookie.length +
                 1 + byteServiceId.length +
opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
@@ -150,16 +150,6 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  throws UnsupportedEncodingException
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes(short version)
  {
    try {
opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import org.opends.messages.Message;
@@ -174,15 +175,6 @@
  // ============
  // Msg encoding
  // ============
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  throws UnsupportedEncodingException
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
opends/src/server/org/opends/server/replication/protocol/HeartbeatMsg.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
@@ -67,7 +68,7 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  public byte[] getBytes(short protocolVersion)
  {
    /*
     * The heartbeat message contains:
opends/src/server/org/opends/server/replication/protocol/InitializeRcvAckMsg.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2010 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
@@ -103,7 +104,7 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  public byte[] getBytes(short protocolVersion)
  {
    try {
      byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
@@ -130,15 +131,6 @@
  // ============
  // Msg encoding
  // ============
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  throws UnsupportedEncodingException
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
@@ -176,15 +177,6 @@
  // ============
  // Msg encoding
  // ============
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  throws UnsupportedEncodingException
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
@@ -82,7 +82,7 @@
  /**
   * Creates a new UpdateMsg.
   */
  public LDAPUpdateMsg()
  protected LDAPUpdateMsg()
  {
  }
@@ -226,7 +226,7 @@
   */
  public void encode() throws UnsupportedEncodingException
  {
    bytes = getBytes();
    bytes = getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
@@ -347,17 +347,6 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  throws UnsupportedEncodingException
  {
    // Encode in the current protocol version
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
  {
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -330,15 +330,6 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    try
opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
@@ -88,7 +89,7 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  public byte[] getBytes(short protocolVersion)
  {
    try
    {
opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
@@ -49,31 +49,14 @@
  public abstract void close();
  /**
   * This method is called when a ReplicationMsg must be sent to
   * the remote entity. The PDU is send using serialization of the current
   * protocol version.
   * Sends a replication message to the remote peer.
   *
   * It can be called by several threads and must implement appropriate
   * replication (typically, this method or a part of it should be
   * synchronized).
   *
   * @param msg The ReplicationMsg that must be sent.
   * @throws IOException If an IO error happen during the publish process.
   * @param msg
   *          The message to be sent.
   * @throws IOException
   *           If an IO error occurred.
   */
  public abstract void publish(ReplicationMsg msg)
                  throws IOException;
  /**
   * Same as publish(ReplicationMsg msg), but forcing the usage of a particular
   * protocol version for the PDU serialization.
   *
   * @param msg The ReplicationMsg that must be sent.
   * @param reqProtocolVersion The protocol version to use for serialization.
   * The version should normally be older than the current one.
   * @throws IOException If an IO error happen during the publish process.
   */
  public abstract void publish(ReplicationMsg msg, short reqProtocolVersion)
                  throws IOException;
  public abstract void publish(ReplicationMsg msg) throws IOException;
  /**
   * Attempt to receive a ReplicationMsg.
@@ -177,4 +160,11 @@
   * @param version The version of the protocol that is currently used.
   */
  public abstract void setProtocolVersion(short version);
  /**
   * Returns the version of the protocol that is currently used.
   *
   * @return The version of the protocol that is currently used.
   */
  public abstract short getProtocolVersion();
}
opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -108,7 +108,7 @@
   * @param version The version to be compared to the current one.
   * @return The minimal protocol version.
   */
  public static short minWithCurrent(short version)
  public static short getCompatibleVersion(short version)
  {
    return (version < CURRENT_VERSION ? version : CURRENT_VERSION);
  }
opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
@@ -74,7 +75,6 @@
   * @param baseDn base DN for which the ReplServerStartDSMsg is created.
   * @param windowSize The window size.
   * @param serverState our ServerState for this baseDn.
   * @param protocolVersion The replication protocol version of the creator.
   * @param generationId The generationId for this server.
   * @param sslEncryption Whether to continue using SSL to encrypt messages
   *                      after the start messages have been exchanged.
@@ -87,7 +87,6 @@
  public ReplServerStartDSMsg(int serverId, String serverURL, String baseDn,
                               int windowSize,
                               ServerState serverState,
                               short protocolVersion,
                               long generationId,
                               boolean sslEncryption,
                               byte groupId,
@@ -95,7 +94,7 @@
                               int weight,
                               int connectedDSNumber)
  {
    super(protocolVersion, generationId);
    super((short) -1 /* version set when sending */, generationId);
    this.serverId = serverId;
    this.serverURL = serverURL;
    if (baseDn != null)
@@ -250,17 +249,7 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  throws UnsupportedEncodingException
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes(short protocolVersion)
  public byte[] getBytes(short sessionProtocolVersion)
     throws UnsupportedEncodingException
  {
    /* The ReplServerStartDSMsg is stored in the form :
@@ -268,7 +257,6 @@
     * <degradedStatusThreshold><weight><connectedDSNumber>
     * <serverState>
     */
    byte[] byteDn = baseDn.getBytes("UTF-8");
    byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
    byte[] byteServerUrl = serverURL.getBytes("UTF-8");
@@ -290,8 +278,8 @@
      byteServerState.length + 1;
    /* encode the header in a byte[] large enough */
    byte resultByteArray[] =
      encodeHeader(MSG_TYPE_REPL_SERVER_START_DS, length, protocolVersion);
    byte resultByteArray[] = encodeHeader(MSG_TYPE_REPL_SERVER_START_DS,
        length, sessionProtocolVersion);
    int pos = headerLength;
opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
@@ -72,7 +73,6 @@
   * @param baseDn base DN for which the ReplServerStartMsg is created.
   * @param windowSize The window size.
   * @param serverState our ServerState for this baseDn.
   * @param protocolVersion The replication protocol version of the creator.
   * @param generationId The generationId for this server.
   * @param sslEncryption Whether to continue using SSL to encrypt messages
   *                      after the start messages have been exchanged.
@@ -82,13 +82,12 @@
  public ReplServerStartMsg(int serverId, String serverURL, String baseDn,
                               int windowSize,
                               ServerState serverState,
                               short protocolVersion,
                               long generationId,
                               boolean sslEncryption,
                               byte groupId,
                               int degradedStatusThreshold)
  {
    super(protocolVersion, generationId);
    super((short) -1 /* version set when sending */, generationId);
    this.serverId = serverId;
    this.serverURL = serverURL;
    if (baseDn != null)
@@ -302,21 +301,11 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  throws UnsupportedEncodingException
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes(short protocolVersion)
  public byte[] getBytes(short sessionProtocolVersion)
     throws UnsupportedEncodingException
  {
    // If an older version requested, encode in the requested way
    switch(protocolVersion)
    switch(sessionProtocolVersion)
    {
      case ProtocolVersion.REPLICATION_PROTOCOL_V1:
        return getBytes_V1();
@@ -344,8 +333,8 @@
      byteServerState.length + 1;
    /* encode the header in a byte[] large enough */
    byte resultByteArray[] =
      encodeHeader(MSG_TYPE_REPL_SERVER_START, length, protocolVersion);
    byte resultByteArray[] = encodeHeader(MSG_TYPE_REPL_SERVER_START, length,
        sessionProtocolVersion);
    int pos = headerLength;
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -91,176 +91,129 @@
  // change accordingly generateMsg method below
  /**
   * Return the byte[] representation of this message.
   * Depending on the message type, the first byte of the byte[] must be one of
   * the MSG_TYPE* definitions. The serialization is done using the current
   * protocol version. For a serialization using a particular protocol version,
   * call the getBytes(byte protocolVersion) method that should be available
   * for the subclasses (PDUs) that allow such a translation.
   *
   * @return the byte[] representation of this message.
   * @throws UnsupportedEncodingException  When the encoding of the message
   *         failed because the UTF-8 encoding is not supported.
   * Protected constructor.
   */
  public abstract byte[] getBytes() throws UnsupportedEncodingException;
  protected ReplicationMsg()
  {
    // Nothing to do.
  }
  /**
   * Serializes the PDU using the provided replication protocol version.
   * WARNING: should be overwritten by a PDU (sub class) we want to support
   * older protocol version serialization for.
   * @param reqProtocolVersion The protocol version to use for serialization.
   * The version should normally be older than the current one.
   *
   * @param protocolVersion
   *          The protocol version to use for serialization. The version should
   *          normally be older than the current one.
   * @return The encoded PDU.
   * @throws UnsupportedEncodingException  When the encoding of the message
   *         failed because the UTF-8 encoding is not supported or the
   *         requested protocol version to use is not supported by this PDU.
   * @throws UnsupportedEncodingException
   *           When the encoding of the message failed because the UTF-8
   *           encoding is not supported or the requested protocol version to
   *           use is not supported by this PDU.
   */
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
  {
    // Of course, always support current protocol version
    if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
    {
      return getBytes();
    }
  public abstract byte[] getBytes(short protocolVersion)
      throws UnsupportedEncodingException;
    // Unsupported requested version
    // Any PDU that support older protocol version serialization should
    // overwrite this method for that.
    throw new UnsupportedEncodingException(getClass().getSimpleName() +
      " PDU does not support requested protocol version serialization: " +
      reqProtocolVersion);
  }
  /**
   * Generates a ReplicationMsg from its encoded form. This un-serialization
   * is done taking into account the various supported replication protocol
   * Generates a ReplicationMsg from its encoded form. This un-serialization is
   * done taking into account the various supported replication protocol
   * versions.
   *
   * @param buffer    The encode form of the ReplicationMsg.
   * @param version   The version to use to decode the msg.
   *
   * @param buffer
   *          The encode form of the ReplicationMsg.
   * @param protocolVersion
   *          The version to use to decode the msg.
   * @return The generated SynchronizationMessage.
   *
   * @throws DataFormatException If the encoded form was not a valid msg.
   * @throws UnsupportedEncodingException If UTF8 is not supported.
   * @throws NotSupportedOldVersionPDUException If the PDU is part of an old
   * protocol version and we do not support it.
   * @throws DataFormatException
   *           If the encoded form was not a valid msg.
   * @throws UnsupportedEncodingException
   *           If UTF8 is not supported.
   * @throws NotSupportedOldVersionPDUException
   *           If the PDU is part of an old protocol version and we do not
   *           support it.
   */
  public static ReplicationMsg generateMsg(
                byte[] buffer,
                short version)
                throws DataFormatException, UnsupportedEncodingException,
                NotSupportedOldVersionPDUException
  public static ReplicationMsg generateMsg(byte[] buffer, short protocolVersion)
      throws DataFormatException, UnsupportedEncodingException,
      NotSupportedOldVersionPDUException
  {
    ReplicationMsg msg;
    switch (buffer[0])
    {
      case MSG_TYPE_SERVER_START_V1:
        throw new NotSupportedOldVersionPDUException("Server Start",
    case MSG_TYPE_SERVER_START_V1:
      throw new NotSupportedOldVersionPDUException("Server Start",
          ProtocolVersion.REPLICATION_PROTOCOL_V1, buffer[0]);
      case MSG_TYPE_REPL_SERVER_INFO_V1:
        throw new NotSupportedOldVersionPDUException("Replication Server Info",
    case MSG_TYPE_REPL_SERVER_INFO_V1:
      throw new NotSupportedOldVersionPDUException("Replication Server Info",
          ProtocolVersion.REPLICATION_PROTOCOL_V1, buffer[0]);
      case MSG_TYPE_MODIFY:
        msg = new ModifyMsg(buffer);
      break;
      case MSG_TYPE_MODIFY_V1:
          msg = ModifyMsg.createV1(buffer);
      break;
      case MSG_TYPE_ADD:
      case MSG_TYPE_ADD_V1:
          msg = new AddMsg(buffer);
      break;
      case MSG_TYPE_DELETE:
      case MSG_TYPE_DELETE_V1:
          msg = new DeleteMsg(buffer);
      break;
      case MSG_TYPE_MODIFYDN:
      case MSG_TYPE_MODIFYDN_V1:
          msg = new ModifyDNMsg(buffer);
      break;
      case MSG_TYPE_ACK:
        msg = new AckMsg(buffer);
      break;
      case MSG_TYPE_SERVER_START:
        msg = new ServerStartMsg(buffer);
      break;
      case MSG_TYPE_REPL_SERVER_START:
      case MSG_TYPE_REPL_SERVER_START_V1:
        msg = new ReplServerStartMsg(buffer);
      break;
      case MSG_TYPE_WINDOW:
        msg = new WindowMsg(buffer);
      break;
      case MSG_TYPE_HEARTBEAT:
        msg = new HeartbeatMsg(buffer);
      break;
      case MSG_TYPE_INITIALIZE_REQUEST:
        msg = new InitializeRequestMsg(buffer, version);
      break;
      case MSG_TYPE_INITIALIZE_TARGET:
        msg = new InitializeTargetMsg(buffer, version);
      break;
      case MSG_TYPE_ENTRY:
        msg = new EntryMsg(buffer, version);
      break;
      case MSG_TYPE_DONE:
        msg = new DoneMsg(buffer);
      break;
      case MSG_TYPE_ERROR:
        msg = new ErrorMsg(buffer, version);
      break;
      case MSG_TYPE_RESET_GENERATION_ID:
        msg = new ResetGenerationIdMsg(buffer);
      break;
      case MSG_TYPE_WINDOW_PROBE:
        msg = new WindowProbeMsg(buffer);
      break;
      case MSG_TYPE_TOPOLOGY:
        msg = new TopologyMsg(buffer, version);
      break;
      case MSG_TYPE_REPL_SERVER_MONITOR_REQUEST:
        msg = new MonitorRequestMsg(buffer);
      break;
      case MSG_TYPE_REPL_SERVER_MONITOR:
        msg = new MonitorMsg(buffer, version);
      break;
      case MSG_TYPE_START_SESSION:
        msg = new StartSessionMsg(buffer, version);
      break;
      case MSG_TYPE_CHANGE_STATUS:
        msg = new ChangeStatusMsg(buffer);
      break;
      case MSG_TYPE_GENERIC_UPDATE:
        msg = new UpdateMsg(buffer);
      break;
      case MSG_TYPE_START_ECL:
        msg = new ServerStartECLMsg(buffer);
      break;
      case MSG_TYPE_START_ECL_SESSION:
        msg = new StartECLSessionMsg(buffer);
      break;
      case MSG_TYPE_ECL_UPDATE:
        msg = new ECLUpdateMsg(buffer);
      break;
      case MSG_TYPE_CT_HEARTBEAT:
        msg = new ChangeTimeHeartbeatMsg(buffer, version);
      break;
      case MSG_TYPE_REPL_SERVER_START_DS:
        msg = new ReplServerStartDSMsg(buffer);
      break;
      case MSG_TYPE_STOP:
        msg = new StopMsg(buffer);
      break;
      case MSG_TYPE_INITIALIZE_RCV_ACK:
        msg = new InitializeRcvAckMsg(buffer);
      break;
      default:
        throw new DataFormatException("received message with unknown type");
    case MSG_TYPE_MODIFY:
      return new ModifyMsg(buffer);
    case MSG_TYPE_MODIFY_V1:
      return ModifyMsg.createV1(buffer);
    case MSG_TYPE_ADD:
    case MSG_TYPE_ADD_V1:
      return new AddMsg(buffer);
    case MSG_TYPE_DELETE:
    case MSG_TYPE_DELETE_V1:
      return new DeleteMsg(buffer);
    case MSG_TYPE_MODIFYDN:
    case MSG_TYPE_MODIFYDN_V1:
      return new ModifyDNMsg(buffer);
    case MSG_TYPE_ACK:
      return new AckMsg(buffer);
    case MSG_TYPE_SERVER_START:
      return new ServerStartMsg(buffer);
    case MSG_TYPE_REPL_SERVER_START:
    case MSG_TYPE_REPL_SERVER_START_V1:
      return new ReplServerStartMsg(buffer);
    case MSG_TYPE_WINDOW:
      return new WindowMsg(buffer);
    case MSG_TYPE_HEARTBEAT:
      return new HeartbeatMsg(buffer);
    case MSG_TYPE_INITIALIZE_REQUEST:
      return new InitializeRequestMsg(buffer, protocolVersion);
    case MSG_TYPE_INITIALIZE_TARGET:
      return new InitializeTargetMsg(buffer, protocolVersion);
    case MSG_TYPE_ENTRY:
      return new EntryMsg(buffer, protocolVersion);
    case MSG_TYPE_DONE:
      return new DoneMsg(buffer);
    case MSG_TYPE_ERROR:
      return new ErrorMsg(buffer, protocolVersion);
    case MSG_TYPE_RESET_GENERATION_ID:
      return new ResetGenerationIdMsg(buffer);
    case MSG_TYPE_WINDOW_PROBE:
      return new WindowProbeMsg(buffer);
    case MSG_TYPE_TOPOLOGY:
      return new TopologyMsg(buffer, protocolVersion);
    case MSG_TYPE_REPL_SERVER_MONITOR_REQUEST:
      return new MonitorRequestMsg(buffer);
    case MSG_TYPE_REPL_SERVER_MONITOR:
      return new MonitorMsg(buffer, protocolVersion);
    case MSG_TYPE_START_SESSION:
      return new StartSessionMsg(buffer, protocolVersion);
    case MSG_TYPE_CHANGE_STATUS:
      return new ChangeStatusMsg(buffer);
    case MSG_TYPE_GENERIC_UPDATE:
      return new UpdateMsg(buffer);
    case MSG_TYPE_START_ECL:
      return new ServerStartECLMsg(buffer);
    case MSG_TYPE_START_ECL_SESSION:
      return new StartECLSessionMsg(buffer);
    case MSG_TYPE_ECL_UPDATE:
      return new ECLUpdateMsg(buffer);
    case MSG_TYPE_CT_HEARTBEAT:
      return new ChangeTimeHeartbeatMsg(buffer, protocolVersion);
    case MSG_TYPE_REPL_SERVER_START_DS:
      return new ReplServerStartDSMsg(buffer);
    case MSG_TYPE_STOP:
      return new StopMsg(buffer);
    case MSG_TYPE_INITIALIZE_RCV_ACK:
      return new InitializeRcvAckMsg(buffer);
    default:
      throw new DataFormatException("received message with unknown type");
    }
    return msg;
  }
  /**
@@ -283,15 +236,21 @@
    return pos;
  }
  /**
   * Get the length of the next String encoded in the in byte array.
   *
   * @param in the byte array where to calculate the string.
   * @param pos the position where to start from in the byte array.
   * @param in
   *          the byte array where to calculate the string.
   * @param pos
   *          the position where to start from in the byte array.
   * @return the length of the next string.
   * @throws DataFormatException If the byte array does not end with null.
   * @throws DataFormatException
   *           If the byte array does not end with null.
   */
  protected int getNextLength(byte[] in, int pos) throws DataFormatException
  protected static int getNextLength(byte[] in, int pos)
      throws DataFormatException
  {
    int offset = pos;
    int length = 0;
opends/src/server/org/opends/server/replication/protocol/ResetGenerationIdMsg.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
@@ -83,7 +84,7 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  public byte[] getBytes(short protocolVersion)
  {
    try
    {
opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
@@ -73,10 +73,9 @@
   * @param windowSize   The window size used by this server.
   * @param heartbeatInterval The requested heartbeat interval.
   * @param serverState  The state of this server.
   * @param protocolVersion The replication protocol version of the creator.
   * @param generationId The generationId for this server.
   * @param sslEncryption Whether to continue using SSL to encrypt messages
*                      after the start messages have been exchanged.
   *                      after the start messages have been exchanged.
   * @param groupId The group id of the DS for this DN
   */
  public ServerStartECLMsg(String serverURL, int maxReceiveDelay,
@@ -84,12 +83,11 @@
                           int maxSendQueue, int windowSize,
                           long heartbeatInterval,
                           ServerState serverState,
                           short protocolVersion,
                           long generationId,
                           boolean sslEncryption,
                           byte groupId)
  {
    super(protocolVersion, generationId);
    super((short) -1 /* version set when sending */, generationId);
    this.serverURL = serverURL;
    this.maxReceiveDelay = maxReceiveDelay;
@@ -251,7 +249,7 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  public byte[] getBytes(short sessionProtocolVersion)
  {
    try {
      byte[] byteServerUrl = serverURL.getBytes("UTF-8");
@@ -282,8 +280,8 @@
                   byteServerState.length + 1;
      /* encode the header in a byte[] large enough to also contain the mods */
      byte resultByteArray[] = encodeHeader(
          MSG_TYPE_START_ECL, length, ProtocolVersion.getCurrentVersion());
      byte resultByteArray[] = encodeHeader(MSG_TYPE_START_ECL, length,
          sessionProtocolVersion);
      int pos = headerLength;
      pos = addByteArray(byteServerUrl, resultByteArray, pos);
opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
@@ -73,7 +73,6 @@
   * @param windowSize   The window size used by this server.
   * @param heartbeatInterval The requested heartbeat interval.
   * @param serverState  The state of this server.
   * @param protocolVersion The replication protocol version of the creator.
   * @param generationId The generationId for this server.
   * @param sslEncryption Whether to continue using SSL to encrypt messages
   *                      after the start messages have been exchanged.
@@ -81,10 +80,10 @@
   */
  public ServerStartMsg(int serverId2, String serverURL, String baseDn,
      int windowSize, long heartbeatInterval, ServerState serverState,
      short protocolVersion, long generationId, boolean sslEncryption,
      long generationId, boolean sslEncryption,
      byte groupId)
  {
    super(protocolVersion, generationId);
    super((short) -1 /* version set when sending */, generationId);
    this.serverId = serverId2;
    this.serverURL = serverURL;
@@ -282,7 +281,7 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  public byte[] getBytes(short sessionProtocolVersion)
  {
    try {
      byte[] byteDn = baseDn.getBytes("UTF-8");
@@ -316,8 +315,8 @@
                   byteServerState.length + 1;
      /* encode the header in a byte[] large enough to also contain the mods */
      byte resultByteArray[] = encodeHeader(
          MSG_TYPE_SERVER_START, length, ProtocolVersion.getCurrentVersion());
      byte resultByteArray[] = encodeHeader(MSG_TYPE_SERVER_START, length,
          sessionProtocolVersion);
      int pos = headerLength;
      pos = addByteArray(byteDn, resultByteArray, pos);
opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
@@ -212,7 +212,7 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  public byte[] getBytes(short protocolVersion)
  {
    String excludedSIDsString = "";
    for (String excludedServiceID : excludedServiceIDs)
opends/src/server/org/opends/server/replication/protocol/StartMsg.java
@@ -54,8 +54,9 @@
  /**
   * Create a new StartMsg.
   */
  public StartMsg()
  protected StartMsg()
  {
    // Nothing to do.
  }
  /**
@@ -78,7 +79,7 @@
   * @param type The type of the message to create.
   * @param additionalLength Additional length needed to encode the remaining
   *                         part of the UpdateMessage.
   * @param protocolVersion  The version to use when encoding the header.
   * @param sessionProtocolVersion  The version to use when encoding the header.
   * @return a byte array containing the common header and enough space to
   *         encode the remaining bytes of the UpdateMessage as was specified
   *         by the additionalLength.
@@ -87,7 +88,7 @@
   */
  public byte[] encodeHeader(
      byte type, int additionalLength,
      short protocolVersion)
      short sessionProtocolVersion)
  throws UnsupportedEncodingException
  {
@@ -106,7 +107,7 @@
    encodedMsg[0] = type;
    /* put the protocol version */
    encodedMsg[1] = (byte)protocolVersion;
    encodedMsg[1] = (byte)sessionProtocolVersion;
    /* put the generationId */
    int pos = 2;
opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
@@ -77,11 +77,6 @@
  private Set<String> eclIncludesForDeletes = new HashSet<String>();
  /**
   * The protocolVersion that should be used when serializing this message.
   */
  private final short protocolVersion;
  /**
   * Creates a new StartSessionMsg message from its encoded form.
   *
   * @param in The byte array containing the encoded form of the message.
@@ -91,7 +86,6 @@
   */
  public StartSessionMsg(byte[] in, short version) throws DataFormatException
  {
    protocolVersion = ProtocolVersion.getCurrentVersion();
    if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
    {
      decode_V23(in);
@@ -103,29 +97,6 @@
  }
  /**
   * Creates a new StartSessionMsg message from its encoded form.
   *
   * Creates a new  message with the given required parameters.
   * @param status Status we are starting with
   * @param referralsURLs Referrals URLs to be used by peer DSs
   * @param assuredFlag If assured mode is enabled or not
   * @param assuredMode Assured type
   * @param safeDataLevel Assured mode safe data level
   * @param replicationProtocol   The protocol version to use.
   */
  public StartSessionMsg(ServerStatus status, List<String> referralsURLs,
      boolean assuredFlag, AssuredMode assuredMode, byte safeDataLevel,
      short replicationProtocol)
  {
    this.referralsURLs = referralsURLs;
    this.status = status;
    this.assuredFlag = assuredFlag;
    this.assuredMode = assuredMode;
    this.safeDataLevel = safeDataLevel;
    this.protocolVersion = replicationProtocol;
  }
  /**
   * Creates a new  message with the given required parameters.
   * @param status Status we are starting with
   * @param referralsURLs Referrals URLs to be used by peer DSs
@@ -141,7 +112,6 @@
    this.assuredFlag = assuredFlag;
    this.assuredMode = assuredMode;
    this.safeDataLevel = safeDataLevel;
    this.protocolVersion = ProtocolVersion.getCurrentVersion();
  }
  /**
@@ -155,23 +125,6 @@
    this.referralsURLs = referralsURLs;
    this.status = status;
    this.assuredFlag = false;
    this.protocolVersion = ProtocolVersion.getCurrentVersion();
  }
  /**
   * Creates a new message with the given required parameters.
   * Assured mode is false.
   * @param status Status we are starting with
   * @param referralsURLs Referrals URLs to be used by peer DSs
   * @param replicationProtocol The requested protocol version.
   */
  public StartSessionMsg(ServerStatus status, List<String> referralsURLs,
    short replicationProtocol)
  {
    this.referralsURLs = referralsURLs;
    this.status = status;
    this.assuredFlag = false;
    this.protocolVersion = replicationProtocol;
  }
  // ============
@@ -182,23 +135,6 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  throws UnsupportedEncodingException
  {
    if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
    {
      return getBytes_V23();
    }
    else
    {
      return getBytes_V45(protocolVersion);
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
  {
opends/src/server/org/opends/server/replication/protocol/StopMsg.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
@@ -60,7 +61,7 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  public byte[] getBytes(short protocolVersion)
  {
    return new byte[]
      {
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -312,19 +312,7 @@
  @Override
  public void publish(final ReplicationMsg msg) throws IOException
  {
    publish(msg, ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public void publish(final ReplicationMsg msg,
      final short reqProtocolVersion) throws IOException
  {
    final byte[] buffer = msg.getBytes(reqProtocolVersion);
    final byte[] buffer = msg.getBytes(protocolVersion);
    final String str = String.format("%08x", buffer.length);
    final byte[] sendLengthBuf = str.getBytes();
@@ -460,6 +448,17 @@
   * {@inheritDoc}
   */
  @Override
  public short getProtocolVersion()
  {
    return protocolVersion;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public void setSoTimeout(final int timeout) throws SocketException
  {
    plainSocket.setSoTimeout(timeout);
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -298,16 +298,6 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  throws UnsupportedEncodingException
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes(short version)
  throws UnsupportedEncodingException
  {
opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -109,6 +109,8 @@
  /**
   * Creates a new UpdateMsg with the given informations.
   * <p>
   * This constructor is only used for testing.
   *
   * @param changeNumber  The ChangeNumber associated with the change
   *                      encoded in this message.
@@ -179,18 +181,6 @@
    return changeNumber.compareTo(msg.getChangeNumber());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
  {
    // There was no change since version 2.
    return getBytes();
  }
  /**
   * Get the assured mode in this message.
   * @return The assured mode in this message
@@ -355,20 +345,37 @@
  }
  /**
   * Returns the encoded representation of this update message using the current
   * protocol version.
   *
   * @return The encoded representation of this update message.
   * @throws UnsupportedEncodingException
   *           If the message could not be encoded.
   */
  public byte[] getBytes() throws UnsupportedEncodingException
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * This implementation is only called during unit testing, so we are free to
   * force the protocol version. Underlying implementations override this method
   * in order to provide version specific encodings.
   *
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes() throws UnsupportedEncodingException
  public byte[] getBytes(short protocolVersion)
      throws UnsupportedEncodingException
  {
    /* Encode the header in a byte[] large enough to also contain the payload */
    byte [] resultByteArray =
      encodeHeader(MSG_TYPE_GENERIC_UPDATE, payload.length,
                   ProtocolVersion.getCurrentVersion());
    byte[] resultByteArray = encodeHeader(MSG_TYPE_GENERIC_UPDATE,
        payload.length, ProtocolVersion.getCurrentVersion());
    int pos = resultByteArray.length - payload.length;
    /* Add the payload */
    for (int i=0; i<payload.length; i++,pos++)
    for (int i = 0; i < payload.length; i++, pos++)
    {
      resultByteArray[pos] = payload[i];
    }
opends/src/server/org/opends/server/replication/protocol/WindowMsg.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
@@ -93,7 +94,7 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  public byte[] getBytes(short protocolVersion)
  {
    /*
     * WindowMsg contains.
opends/src/server/org/opends/server/replication/protocol/WindowProbeMsg.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
@@ -68,7 +69,7 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  public byte[] getBytes(short protocolVersion)
  {
    // WindowProbeMsg Message only contains its type.
opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -31,6 +31,7 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.replication.common.StatusMachine.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import java.io.IOException;
import java.util.ArrayList;
@@ -147,7 +148,7 @@
          {
            // V4 protocol introduces a StopMsg to properly close the
            // connection between servers
            if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
            if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
            {
              try
              {
@@ -411,9 +412,9 @@
  public boolean processStartFromRemote(ServerStartMsg serverStartMsg)
  throws DirectoryException
  {
    session
        .setProtocolVersion(getCompatibleVersion(serverStartMsg.getVersion()));
    tmpGenerationId = serverStartMsg.getGenerationId();
    protocolVersion = ProtocolVersion.minWithCurrent(
        serverStartMsg.getVersion());
    serverId = serverStartMsg.getServerId();
    serverURL = serverStartMsg.getServerURL();
    groupId = serverStartMsg.getGroupId();
@@ -451,14 +452,14 @@
  }
  // Send our own TopologyMsg to DS
  private TopologyMsg sendTopoToRemoteDS()
  throws IOException
  private TopologyMsg sendTopoToRemoteDS() throws IOException
  {
    TopologyMsg outTopoMsg = replicationServerDomain.createTopologyMsgForDS(
        this.serverId);
    session.publish(outTopoMsg, protocolVersion);
    TopologyMsg outTopoMsg = replicationServerDomain
        .createTopologyMsgForDS(this.serverId);
    sendTopoInfo(outTopoMsg);
    return outTopoMsg;
  }
  /**
   * Starts the handler from a remote ServerStart message received from
   * the remote data server.
@@ -512,7 +513,7 @@
      try
      {
        StartMsg outStartMsg = sendStartToRemote(protocolVersion);
        StartMsg outStartMsg = sendStartToRemote();
        // log
        logStartHandshakeRCVandSND(inServerStartMsg, outStartMsg);
@@ -597,61 +598,41 @@
  }
  /**
   * Send the ReplServerStartDSMsg to the remote DS.
   * @param requestedProtocolVersion The provided protocol version.
   * Sends a start message to the remote DS.
   *
   * @return The StartMsg sent.
   * @throws IOException When an exception occurs.
   * @throws IOException
   *           When an exception occurs.
   */
  private StartMsg sendStartToRemote(short requestedProtocolVersion)
  throws IOException
  private StartMsg sendStartToRemote() throws IOException
  {
    final StartMsg startMsg;
    // Before V4 protocol, we sent a ReplServerStartMsg
    if (protocolVersion < ProtocolVersion.REPLICATION_PROTOCOL_V4)
    if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
      ReplServerStartMsg outReplServerStartMsg
      = new ReplServerStartMsg(
          replicationServerId,
          replicationServerURL,
          getServiceId(),
          maxRcvWindow,
      startMsg = new ReplServerStartMsg(replicationServerId,
          replicationServerURL, getServiceId(), maxRcvWindow,
          replicationServerDomain.getDbServerState(),
          protocolVersion,
          localGenerationId,
          sslEncryption,
          getLocalGroupId(),
          replicationServerDomain.
          getReplicationServer().getDegradedStatusThreshold());
      session.publish(outReplServerStartMsg, requestedProtocolVersion);
      return outReplServerStartMsg;
          localGenerationId, sslEncryption, getLocalGroupId(),
          replicationServerDomain.getReplicationServer()
              .getDegradedStatusThreshold());
    }
    else
    {
      // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
      ReplServerStartDSMsg outReplServerStartDSMsg
      = new ReplServerStartDSMsg(
          replicationServerId,
          replicationServerURL,
          getServiceId(),
          maxRcvWindow,
      startMsg = new ReplServerStartDSMsg(replicationServerId,
          replicationServerURL, getServiceId(), maxRcvWindow,
          replicationServerDomain.getDbServerState(),
          protocolVersion,
          localGenerationId,
          sslEncryption,
          getLocalGroupId(),
          replicationServerDomain.
          getReplicationServer().getDegradedStatusThreshold(),
          replicationServer.getWeight(),
          localGenerationId, sslEncryption, getLocalGroupId(),
          replicationServerDomain.getReplicationServer()
              .getDegradedStatusThreshold(), replicationServer.getWeight(),
          replicationServerDomain.getConnectedLDAPservers().size());
      session.publish(outReplServerStartDSMsg);
      return outReplServerStartDSMsg;
    }
    send(startMsg);
    return startMsg;
  }
  /**
@@ -662,7 +643,7 @@
  {
    return new DSInfo(serverId, serverURL, replicationServerId, generationId,
      status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls,
      eclIncludes, eclIncludesForDeletes, protocolVersion);
      eclIncludes, eclIncludesForDeletes, getProtocolVersion());
  }
  /**
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -30,6 +30,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import java.io.IOException;
import java.util.ArrayList;
@@ -325,13 +326,12 @@
  {
    try
    {
      protocolVersion = ProtocolVersion.minWithCurrent(
          inECLStartMsg.getVersion());
      generationId = inECLStartMsg.getGenerationId();
      session.setProtocolVersion(getCompatibleVersion(inECLStartMsg
          .getVersion()));
      serverURL = inECLStartMsg.getServerURL();
      setInitialServerState(inECLStartMsg.getServerState());
      setSendWindowSize(inECLStartMsg.getWindowSize());
      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
      if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1)
      {
        // We support connection from a V1 RS
        // Only V2 protocol has the group id in repl server start message
@@ -347,59 +347,38 @@
  }
  /**
   * Send the ReplServerStartDSMsg to the remote ECL server.
   * @param requestedProtocolVersion The provided protocol version.
   * Sends a start message to the remote ECL server.
   *
   * @return The StartMsg sent.
   * @throws IOException When an exception occurs.
   * @throws IOException
   *           When an exception occurs.
   */
  private StartMsg sendStartToRemote(short requestedProtocolVersion)
  throws IOException
  private StartMsg sendStartToRemote() throws IOException
  {
    final StartMsg startMsg;
    // Before V4 protocol, we sent a ReplServerStartMsg
    if (protocolVersion < ProtocolVersion.REPLICATION_PROTOCOL_V4)
    if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
      ReplServerStartMsg outReplServerStartMsg
      = new ReplServerStartMsg(
          replicationServerId,
          replicationServerURL,
          getServiceId(),
          maxRcvWindow,
      startMsg = new ReplServerStartMsg(replicationServerId,
          replicationServerURL, getServiceId(), maxRcvWindow,
          replicationServerDomain.getDbServerState(),
          protocolVersion,
          localGenerationId,
          sslEncryption,
          getLocalGroupId(),
          replicationServerDomain.
          getReplicationServer().getDegradedStatusThreshold());
      session.publish(outReplServerStartMsg, requestedProtocolVersion);
      return outReplServerStartMsg;
          localGenerationId, sslEncryption, getLocalGroupId(),
          replicationServerDomain.getReplicationServer()
              .getDegradedStatusThreshold());
    }
    else
    {
      // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
      ReplServerStartDSMsg outReplServerStartDSMsg
      = new ReplServerStartDSMsg(
          replicationServerId,
          replicationServerURL,
          getServiceId(),
          maxRcvWindow,
          new ServerState(),
          protocolVersion,
          localGenerationId,
          sslEncryption,
          getLocalGroupId(),
          0,
          replicationServer.getWeight(),
          0);
      session.publish(outReplServerStartDSMsg);
      return outReplServerStartDSMsg;
      startMsg = new ReplServerStartDSMsg(replicationServerId,
          replicationServerURL, getServiceId(), maxRcvWindow,
          new ServerState(), localGenerationId, sslEncryption,
          getLocalGroupId(), 0, replicationServer.getWeight(), 0);
    }
    send(startMsg);
    return startMsg;
  }
  /**
@@ -476,14 +455,15 @@
        processStartFromRemote(inECLStartMsg);
      // lock with timeout
      if (this.replicationServerDomain != null)
      if (replicationServerDomain != null)
      {
        lockDomain(true);
      }
      this.localGenerationId = -1;
      localGenerationId = -1;
      // send start to remote
      StartMsg outStartMsg =
        sendStartToRemote(protocolVersion);
      StartMsg outStartMsg = sendStartToRemote();
      // log
      logStartHandshakeRCVandSND(inECLStartMsg, outStartMsg);
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -64,7 +64,6 @@
  private ProtocolSession session;
  private ECLServerHandler handler;
  private ReplicationServerDomain replicationServerDomain;
  private short protocolVersion = -1;
  private boolean suspended;
  private boolean shutdown;
  private PersistentSearch mypsearch;
@@ -88,8 +87,6 @@
    this.session = session;
    this.handler = handler;
    this.replicationServerDomain = replicationServerDomain;
    // Keep protocol version locally for efficiency
    this.protocolVersion = handler.getProtocolVersion();
    this.suspended = false;
    this.shutdown = false;
@@ -249,7 +246,7 @@
            // Done is used to end phase 1
            session.publish(new DoneMsg(
                handler.getReplicationServerId(),
                handler.getServerId()), protocolVersion);
                handler.getServerId()));
          }
        }
@@ -295,7 +292,7 @@
    if (session!=null)
    {
      session.publish(msg, protocolVersion);
      session.publish(msg);
    }
    else
    {
opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java
@@ -322,15 +322,6 @@
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes() throws UnsupportedEncodingException
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public AssuredMode getAssuredMode()
  {
    return realUpdateMsg.getAssuredMode();
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -330,21 +330,18 @@
        if (msg instanceof ServerStartMsg)
        {
          session.setProtocolVersion(((StartMsg)msg).getVersion());
          DataServerHandler handler = new DataServerHandler(session,
              queueSize,serverURL,serverId,this,rcvWindow);
          handler.startFromRemoteDS((ServerStartMsg)msg);
        }
        else if (msg instanceof ReplServerStartMsg)
        {
          session.setProtocolVersion(((StartMsg)msg).getVersion());
          ReplicationServerHandler handler = new ReplicationServerHandler(
              session,queueSize,serverURL,serverId,this,rcvWindow);
          handler.startFromRemoteRS((ReplServerStartMsg)msg);
        }
        else if (msg instanceof ServerStartECLMsg)
        {
          session.setProtocolVersion(((StartMsg)msg).getVersion());
          ECLServerHandler handler = new ECLServerHandler(
              session,queueSize,serverURL,serverId,this,rcvWindow);
          handler.startFromRemoteServer((ServerStartECLMsg)msg);
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -617,8 +617,7 @@
    if (preparedAssuredInfo.expectedServers == null)
    {
      // No eligible servers found, send the ack immediately
      AckMsg ack = new AckMsg(cn);
      sourceHandler.sendAck(ack);
      sourceHandler.send(new AckMsg(cn));
    }
    return preparedAssuredInfo;
@@ -672,8 +671,7 @@
             * mode with safe data level 1, coming from a DS. No need to wait
             * for more acks
             */
            AckMsg ack = new AckMsg(cn);
            sourceHandler.sendAck(ack);
            sourceHandler.send(new AckMsg(cn));
          } else
          {
            /**
@@ -700,8 +698,7 @@
          } else
          {
            // level > 1, so Ack this message to originator RS
            AckMsg ack = new AckMsg(cn);
            sourceHandler.sendAck(ack);
            sourceHandler.send(new AckMsg(cn));
          }
        }
      }
@@ -753,8 +750,7 @@
      {
        // level > 1 and source is a DS but no eligible servers found, send the
        // ack immediately
        AckMsg ack = new AckMsg(cn);
        sourceHandler.sendAck(ack);
        sourceHandler.send(new AckMsg(cn));
      }
    }
@@ -798,7 +794,7 @@
          ServerHandler origServer = expectedAcksInfo.getRequesterServer();
          try
          {
            origServer.sendAck(finalAck);
            origServer.send(finalAck);
          } catch (IOException e)
          {
            /**
@@ -877,7 +873,7 @@
              Integer.toString(origServer.getServerId()));
          try
          {
            origServer.sendAck(finalAck);
            origServer.send(finalAck);
          } catch (IOException e)
          {
            /**
@@ -2489,7 +2485,7 @@
            getReplicationServer().getServerId(),
            handler.getServerId(),
            message);
        handler.sendError(errorMsg);
        handler.send(errorMsg);
      }
      /*
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -30,6 +30,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import java.io.IOException;
import java.util.ArrayList;
@@ -76,8 +77,8 @@
  {
    try
    {
      protocolVersion = ProtocolVersion.minWithCurrent(
          inReplServerStartMsg.getVersion());
      short protocolVersion = getCompatibleVersion(inReplServerStartMsg
          .getVersion());
      session.setProtocolVersion(protocolVersion);
      generationId = inReplServerStartMsg.getGenerationId();
      serverId = inReplServerStartMsg.getServerId();
@@ -107,30 +108,21 @@
  }
  /**
   * Send the ReplServerStartMsg to the remote RS.
   * @param requestedProtocolVersion The provided protocol version.
   * Sends a start message to the remote RS.
   *
   * @return The ReplServerStartMsg sent.
   * @throws IOException When an exception occurs.
   * @throws IOException
   *           When an exception occurs.
   */
  private ReplServerStartMsg sendStartToRemote(short requestedProtocolVersion)
  throws IOException
  private ReplServerStartMsg sendStartToRemote() throws IOException
  {
    ReplServerStartMsg outReplServerStartMsg
    = new ReplServerStartMsg(
        replicationServerId,
        replicationServerURL,
        getServiceId(),
        maxRcvWindow,
        replicationServerDomain.getDbServerState(),
        protocolVersion,
        localGenerationId,
        sslEncryption,
        getLocalGroupId(),
        replicationServerDomain.
        getReplicationServer().getDegradedStatusThreshold());
    session.publish(outReplServerStartMsg, requestedProtocolVersion);
    ReplServerStartMsg outReplServerStartMsg = new ReplServerStartMsg(
        replicationServerId, replicationServerURL, getServiceId(),
        maxRcvWindow, replicationServerDomain.getDbServerState(),
        localGenerationId, sslEncryption,
        getLocalGroupId(), replicationServerDomain.getReplicationServer()
            .getDegradedStatusThreshold());
    send(outReplServerStartMsg);
    return outReplServerStartMsg;
  }
@@ -178,8 +170,7 @@
      lockDomain(false); // no timeout
      // Send start
      ReplServerStartMsg outReplServerStartMsg =
        sendStartToRemote(ProtocolVersion.getCurrentVersion());
      ReplServerStartMsg outReplServerStartMsg = sendStartToRemote();
      // Wait answer
      ReplicationMsg msg = session.receive();
@@ -203,7 +194,7 @@
        }
      }
      // Process hello from remote
      // Process hello from remote.
      processStartFromRemote((ReplServerStartMsg)msg);
      // Duplicate server ?
@@ -233,7 +224,7 @@
      if (!this.sslEncryption)
        session.stopEncryption();
      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
      if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1)
      {
        /*
        Only protocol version above V1 has a phase 2 handshake
@@ -242,7 +233,9 @@
        Send our own TopologyMsg to remote RS
        */
        TopologyMsg outTopoMsg = sendTopoToRemoteRS();
        TopologyMsg outTopoMsg =
            replicationServerDomain.createTopologyMsgForRS();
        sendTopoInfo(outTopoMsg);
        // wait and process Topo from remote RS
        TopologyMsg inTopoMsg = waitAndProcessTopoFromRemoteRS();
@@ -341,8 +334,7 @@
      }
      this.localGenerationId = replicationServerDomain.getGenerationId();
      ReplServerStartMsg outReplServerStartMsg =
        sendStartToRemote(protocolVersion);
      ReplServerStartMsg outReplServerStartMsg = sendStartToRemote();
      // log
      logStartHandshakeRCVandSND(inReplServerStartMsg, outReplServerStartMsg);
@@ -355,7 +347,7 @@
        session.stopEncryption();
      TopologyMsg inTopoMsg = null;
      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
      if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1)
      {
        /*
        Only protocol version above V1 has a phase 2 handshake
@@ -372,7 +364,9 @@
        }
        // send our own TopologyMsg to remote RS
        TopologyMsg outTopoMsg = sendTopoToRemoteRS();
        TopologyMsg outTopoMsg = replicationServerDomain
            .createTopologyMsgForRS();
        sendTopoInfo(outTopoMsg);
        // log
        logTopoHandshakeRCVandSND(inTopoMsg, outTopoMsg);
@@ -476,18 +470,6 @@
  }
  /**
   * Create and send the topologyMsg to the remote replication server.
   * @return the topologyMsg sent.
   */
  private TopologyMsg sendTopoToRemoteRS()
  throws IOException
  {
    TopologyMsg outTopoMsg = replicationServerDomain.createTopologyMsgForRS();
    session.publish(outTopoMsg, protocolVersion);
    return outTopoMsg;
  }
  /**
   * Wait receiving the TopologyMsg from the remote RS and process it.
   * @return the topologyMsg received or {@code null} if stop was received.
   * @throws DirectoryException
@@ -528,12 +510,13 @@
    /* Store remote RS weight if it has one.
     * For protocol version < 4, use default value of 1 for weight
     */
    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      // List should only contain RS info for sender
      RSInfo rsInfo = inTopoMsg.getRsList().get(0);
      weight = rsInfo.getWeight();
    }
    /*
    if the remote RS and the local RS have the same genID
    then it's ok and nothing else to do
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -49,12 +49,7 @@
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.EntryMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.HeartbeatThread;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -193,11 +188,6 @@
   * The initial size of the sending window.
   */
  int sendWindowSize;
  /**
   * The protocol version established with the remote server.
   */
  protected short protocolVersion = -1;
  /**
   * remote generation id.
   */
@@ -267,7 +257,6 @@
    super(queueSize, replicationServerURL,
        replicationServerId, replicationServer);
    this.session = session;
    this.protocolVersion = ProtocolVersion.getCurrentVersion();
    this.rcvWindowSizeHalf = rcvWindowSize / 2;
    this.maxRcvWindow = rcvWindowSize;
    this.rcvWindow = rcvWindowSize;
@@ -399,13 +388,24 @@
  /**
   * Sends a message.
   * @param  msg         The message to be sent.
   * @throws IOException When it occurs while sending the message,
   *
   * @param msg
   *          The message to be sent.
   * @throws IOException
   *           When it occurs while sending the message,
   */
  public void send(ReplicationMsg msg)
  throws IOException
  public void send(ReplicationMsg msg) throws IOException
  {
    /*
     * Some unit tests include a null domain, so avoid logging anything in that
     * case.
     */
    if (debugEnabled() && replicationServerDomain != null)
    {
      TRACER.debugInfo("In "
          + replicationServerDomain.getReplicationServer()
              .getMonitorInstanceName() + this + " publishes message:\n" + msg);
    }
    session.publish(msg);
  }
@@ -653,7 +653,7 @@
   */
  public short getProtocolVersion()
  {
    return protocolVersion;
    return session.getProtocolVersion();
  }
  /**
@@ -950,68 +950,19 @@
  }
  /**
   * Send an InitializeRequestMessage to the server connected through this
   * handler.
   *
   * @param msg The message to be processed
   * @throws IOException when raised by the underlying session
   */
  public void send(RoutableMsg msg) throws IOException
  {
    if (debugEnabled())
      TRACER.debugInfo("In " +
          replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + this +
          " publishes message:\n" + msg);
    // Currently only MonitorMsg has to support a backward compatibility
    if ((msg instanceof MonitorMsg) || (msg instanceof ErrorMsg) ||
        (msg instanceof EntryMsg) || (msg instanceof InitializeRequestMsg) ||
        (msg instanceof InitializeTargetMsg))
    {
      session.publish(msg, protocolVersion);
    } else
    {
      session.publish(msg);
    }
  }
  /**
   * Sends an ack message to the server represented by this object.
   *
   * @param ack The ack message to be sent.
   * @throws IOException In case of Exception thrown sending the ack.
   */
  public void sendAck(AckMsg ack) throws IOException
  {
    session.publish(ack);
  }
  /**
   * Send an ErrorMsg to the peer.
   *
   * @param errorMsg The message to be sent
   * @throws IOException when raised by the underlying session
   */
  public void sendError(ErrorMsg errorMsg) throws IOException
  {
    session.publish(errorMsg);
  }
  /**
   * Sends the provided TopologyMsg to the peer server.
   *
   * @param topoMsg The TopologyMsg message to be sent.
   * @throws IOException When it occurs while sending the message,
   *
   * @param topoMsg
   *          The TopologyMsg message to be sent.
   * @throws IOException
   *           When it occurs while sending the message,
   */
  public void sendTopoInfo(TopologyMsg topoMsg)
  throws IOException
  public void sendTopoInfo(TopologyMsg topoMsg) throws IOException
  {
    // V1 Rs do not support the TopologyMsg
    if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
    if (getProtocolVersion() > ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      session.publish(topoMsg, protocolVersion);
      send(topoMsg);
    }
  }
opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.server;
import org.opends.messages.Message;
@@ -58,7 +58,6 @@
  private final ProtocolSession session;
  private final ServerHandler handler;
  private final ReplicationServerDomain replicationServerDomain;
  private final short protocolVersion;
@@ -88,8 +87,6 @@
    this.session = session;
    this.handler = handler;
    this.replicationServerDomain = replicationServerDomain;
    // Keep protocol version locally for efficiency
    this.protocolVersion = handler.getProtocolVersion();
  }
  /**
@@ -194,7 +191,7 @@
        // Publish the update to the remote server using a protocol version he
        // it supports
        session.publish(update, protocolVersion);
        session.publish(update);
      }
    }
    catch (NoSuchElementException e)
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -31,6 +31,7 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.replication.server.ReplicationServer.*;
import static org.opends.server.util.StaticUtils.*;
@@ -1259,14 +1260,12 @@
      {
        serverStartMsg = new ServerStartMsg(serverId, url, baseDn,
            maxRcvWindow, heartbeatInterval, state,
            ProtocolVersion.getCurrentVersion(),
            this.getGenerationID(), isSslEncryption, groupId);
      }
      else
      {
        serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0,
            maxRcvWindow, heartbeatInterval, state,
            ProtocolVersion.getCurrentVersion(),
            this.getGenerationID(), isSslEncryption, groupId);
      }
      localSession.publish(serverStartMsg);
@@ -1299,8 +1298,8 @@
       * replication server will use the same one (or an older one if it is an
       * old replication server).
       */
      final short localProtocolVersion = ProtocolVersion
          .minWithCurrent(replServerInfo.getProtocolVersion());
      final short localProtocolVersion = getCompatibleVersion(replServerInfo
          .getProtocolVersion());
      if (keepConnection)
      {
        protocolVersion = localProtocolVersion;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -473,7 +473,7 @@
        String serverURL = ("localhost:" + port);
        ReplServerStartMsg replServerStartMsg = new ReplServerStartMsg(serverId,
          serverURL, baseDn, windowSize, serverState,
          ProtocolVersion.getCurrentVersion(), generationId, sslEncryption,
          generationId, sslEncryption,
          groupId, degradedStatusThreshold);
        session.publish(replServerStartMsg);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
@@ -108,7 +108,7 @@
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, WINNER, null, 0, aState, 0L,
      false, (byte)1, 0);
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -158,7 +158,7 @@
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, WINNER, null, 0, aState, 0L,
      false, (byte)1, 0);
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -210,7 +210,7 @@
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, WINNER, null, 0, aState, 0L,
      false, (byte)1, 0);
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -262,7 +262,7 @@
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, WINNER, null, 0, aState, 0L,
      false, (byte)1, 0);
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -315,7 +315,7 @@
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, LOOSER1, null, 0, aState, 0L,
      false, (byte)1, 0);
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -328,7 +328,7 @@
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(12, WINNER, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(12, WINNER, null, 0, aState, 0L,
      false, (byte)1, 0);
    rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -383,7 +383,7 @@
    // This server has less changes than the other one but it has the same
    // group id as us so he should be the winner
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, WINNER, null, 0, aState, 0L,
      false, (byte)1, 0);
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -396,7 +396,7 @@
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(12, LOOSER1, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(12, LOOSER1, null, 0, aState, 0L,
      false, (byte)2, 0);
    rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -449,7 +449,7 @@
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, LOOSER1, null, 0, aState, 0L,
      false, (byte)2, 0);
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -462,7 +462,7 @@
    cn = new ChangeNumber(2L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(12, WINNER, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(12, WINNER, null, 0, aState, 0L,
      false, (byte)2, 0);
    rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -516,7 +516,7 @@
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, LOOSER1, null, 0, aState, 0L,
      false, (byte)1, 0);
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -529,7 +529,7 @@
    cn = new ChangeNumber(4L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(12, LOOSER2, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(12, LOOSER2, null, 0, aState, 0L,
      false, (byte)1, 0);
    rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -542,7 +542,7 @@
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(13, WINNER, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(13, WINNER, null, 0, aState, 0L,
      false, (byte)1, 0);
    rsInfos.put(13, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -596,7 +596,7 @@
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, LOOSER1, null, 0, aState, 0L,
      false, (byte)1, 0);
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -609,7 +609,7 @@
    cn = new ChangeNumber(3L, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(12, LOOSER2, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(12, LOOSER2, null, 0, aState, 0L,
      false, (byte)2, 0);
    rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -624,7 +624,7 @@
    // This server has less changes than looser2 but it has the same
    // group id as us so he should be the winner
    replServerStartMsg =
      new ReplServerStartMsg(13, WINNER, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(13, WINNER, null, 0, aState, 0L,
      false, (byte)1, 0);
    rsInfos.put(13, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -676,7 +676,7 @@
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(11, WINNER, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, WINNER, null, 0, aState, 0L,
      false, (byte)1, 0);
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
@@ -766,7 +766,7 @@
    cn = new ChangeNumber(looser1T3, 0, myId3);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(11, LOOSER1, null, 0, aState, 0L,
      false, (byte)1, 0);
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
    if (looser1IsLocal)
@@ -781,7 +781,7 @@
    cn = new ChangeNumber(winnerT3, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(12, WINNER, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(12, WINNER, null, 0, aState, 0L,
      false, (byte)1, 0);
    rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
    if (winnerIsLocal)
@@ -796,7 +796,7 @@
    cn = new ChangeNumber(looser2T3, 0, myId3);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(13, LOOSER2, null, 0, aState, (short)0, 0L,
      new ReplServerStartMsg(13, LOOSER2, null, 0, aState, 0L,
      false, (byte)1, 0);
    rsInfos.put(13, ReplicationServerInfo.newInstance(replServerStartMsg));
    if (looser2IsLocal)
@@ -867,7 +867,7 @@
    cn = new ChangeNumber(looser1T1, 0, myId1);
    aState.update(cn);
    ReplServerStartMsg replServerStartMsg =
      new ReplServerStartMsg(11, LOOSER1, null, 0, aState, (short)0, looser1GenId,
      new ReplServerStartMsg(11, LOOSER1, null, 0, aState, looser1GenId,
      false, looser1GroupId, 0);
    rsInfos.put(11, ReplicationServerInfo.newInstance(replServerStartMsg));
    if (looser1IsLocal)
@@ -878,7 +878,7 @@
    cn = new ChangeNumber(winnerT1, 0, myId1);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(12, WINNER, null, 0, aState, (short)0, winnerGenId,
      new ReplServerStartMsg(12, WINNER, null, 0, aState, winnerGenId,
      false, winnerGroupId, 0);
    rsInfos.put(12, ReplicationServerInfo.newInstance(replServerStartMsg));
    if (winnerIsLocal)
@@ -889,7 +889,7 @@
    cn = new ChangeNumber(looser2T1, 0, myId1);
    aState.update(cn);
    replServerStartMsg =
      new ReplServerStartMsg(13, LOOSER2, null, 0, aState, (short)0, looser2GenId,
      new ReplServerStartMsg(13, LOOSER2, null, 0, aState, looser2GenId,
      false, looser2GroupId, 0);
    rsInfos.put(13, ReplicationServerInfo.newInstance(replServerStartMsg));
    if (looser2IsLocal)
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -65,6 +65,7 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.opends.server.replication.protocol.OperationContext.SYNCHROCONTEXT;
import static org.opends.server.replication.protocol.ProtocolVersion.getCurrentVersion;
import static org.opends.server.util.StaticUtils.byteToHex;
import static org.opends.messages.ReplicationMessages.*;
@@ -134,12 +135,9 @@
  public void replServerStartMsgTestVLASTV1(int serverId, String baseDN, int window,
        String url, ServerState state, long genId, byte groupId, int degTh) throws Exception
  {
    // Create VLAST message
    // Create message with no version.
    ReplServerStartMsg msg = new ReplServerStartMsg(serverId,
        url, baseDN, window, state, ProtocolVersion.getCurrentVersion(), genId, true, groupId, degTh);
    // Check version of message
    assertEquals(msg.getVersion(), REPLICATION_PROTOCOL_VLAST);
        url, baseDN, window, state, genId, true, groupId, degTh);
    // Serialize in V1
    byte[] v1MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
@@ -169,7 +167,7 @@
    newMsg.setDegradedStatusThreshold(degTh);
    // Serialize in VLAST msg
    ReplServerStartMsg vlastMsg = new ReplServerStartMsg(newMsg.getBytes());
    ReplServerStartMsg vlastMsg = new ReplServerStartMsg(newMsg.getBytes(getCurrentVersion()));
    // Check original version of message
    assertEquals(vlastMsg.getVersion(), REPLICATION_PROTOCOL_VLAST);
@@ -962,7 +960,7 @@
    assertEquals(msg.getServerId(), serverId);
    assertEquals(msg.getBaseDn(), dn);
    assertEquals(msg.getGroupId(), groupId);
    BigInteger bi = new BigInteger(msg.getBytes());
    BigInteger bi = new BigInteger(msg.getBytes(getCurrentVersion()));
    assertEquals(bi.toString(16), oldPdu);
  }
@@ -1187,7 +1185,7 @@
    newMsg.setMsgId(msgId);
    // Serialize in VLAST
    EntryMsg vlastMsg = new EntryMsg(newMsg.getBytes(),REPLICATION_PROTOCOL_VLAST);
    EntryMsg vlastMsg = new EntryMsg(newMsg.getBytes(getCurrentVersion()),REPLICATION_PROTOCOL_VLAST);
    // Check we retrieve original VLAST message (VLAST fields)
    assertEquals(msg.getSenderID(), vlastMsg.getSenderID());
@@ -1233,7 +1231,7 @@
    newMsg.setCreationTime(creatTime);
    // Serialize in VLAST
    ErrorMsg vlastMsg = new ErrorMsg(newMsg.getBytes(),
    ErrorMsg vlastMsg = new ErrorMsg(newMsg.getBytes(getCurrentVersion()),
        REPLICATION_PROTOCOL_VLAST);
    // Check we retrieve original VLAST message (VLAST fields)
@@ -1284,8 +1282,8 @@
    newMsg.setInitWindow(initWindow);
    // Serialize in VLAST
    InitializeRequestMsg vlastMsg = new InitializeRequestMsg(newMsg.getBytes(),
        REPLICATION_PROTOCOL_VLAST);
    InitializeRequestMsg vlastMsg = new InitializeRequestMsg(
        newMsg.getBytes(getCurrentVersion()), REPLICATION_PROTOCOL_VLAST);
    // Check we retrieve original VLAST message (VLAST fields)
    assertEquals(msg.getSenderID(), vlastMsg.getSenderID());
@@ -1341,8 +1339,8 @@
    newMsg.setInitWindow(initWindow);
    // Serialize in VLAST
    InitializeTargetMsg vlastMsg = new InitializeTargetMsg(newMsg.getBytes(),
        REPLICATION_PROTOCOL_VLAST);
    InitializeTargetMsg vlastMsg = new InitializeTargetMsg(
        newMsg.getBytes(getCurrentVersion()), REPLICATION_PROTOCOL_VLAST);
    // Check we retrieve original VLAST message (VLAST fields)
    assertEquals(msg.getSenderID(), vlastMsg.getSenderID());
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -29,6 +29,7 @@
import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING;
import static org.opends.server.replication.protocol.OperationContext.SYNCHROCONTEXT;
import static org.opends.server.replication.protocol.ProtocolVersion.getCurrentVersion;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@@ -735,7 +736,7 @@
    assertEquals(msg1.getFailedServers(), failedServers);
    // Constructor test (with byte[])
    msg2 = new  AckMsg(msg1.getBytes());
    msg2 = new  AckMsg(msg1.getBytes(getCurrentVersion()));
    assertEquals(msg2.getChangeNumber().compareTo(cn), 0);
    assertTrue(msg1.hasTimeout() == msg2.hasTimeout());
    assertTrue(msg1.hasWrongStatus() == msg2.hasWrongStatus());
@@ -743,7 +744,7 @@
    assertEquals(msg1.getFailedServers(), msg2.getFailedServers());
    // Check invalid bytes for constructor
    byte[] b = msg1.getBytes();
    byte[] b = msg1.getBytes(getCurrentVersion());
    b[0] = ReplicationMsg.MSG_TYPE_ADD;
    try
    {
@@ -758,7 +759,7 @@
    // Check that retrieved CN is OK
    msg2 = (AckMsg) ReplicationMsg.generateMsg(
        msg1.getBytes(), ProtocolVersion.getCurrentVersion());
        msg1.getBytes(getCurrentVersion()), getCurrentVersion());
  }
  @Test(enabled=true)
@@ -793,7 +794,7 @@
    assertTrue(delmsg.compareTo(delmsg2)==0);
    // Constructor test (with byte[])
    ECLUpdateMsg msg2 = new ECLUpdateMsg(msg1.getBytes());
    ECLUpdateMsg msg2 = new ECLUpdateMsg(msg1.getBytes(getCurrentVersion()));
    assertTrue(msg2.getCookie().equalsTo(msg2.getCookie()));
    assertTrue(msg2.getCookie().equalsTo(cookie));
    assertTrue(msg2.getServiceId().equalsIgnoreCase(msg1.getServiceId()));
@@ -836,8 +837,8 @@
  {
    ServerStartMsg msg = new ServerStartMsg(
        serverId, "localhost:1234", baseDN, window, window, state,
        ProtocolVersion.getCurrentVersion(), genId, sslEncryption, groupId);
    ServerStartMsg newMsg = new ServerStartMsg(msg.getBytes());
        genId, sslEncryption, groupId);
    ServerStartMsg newMsg = new ServerStartMsg(msg.getBytes(getCurrentVersion()));
    assertEquals(msg.getServerId(), newMsg.getServerId());
    assertEquals(msg.getServerURL(), newMsg.getServerURL());
    assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
@@ -846,7 +847,7 @@
    assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption());
    assertEquals(msg.getServerState().getMaxChangeNumber(1),
        newMsg.getServerState().getMaxChangeNumber(1));
    assertEquals(msg.getVersion(), newMsg.getVersion());
    assertEquals(newMsg.getVersion(), getCurrentVersion());
    assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
    assertTrue(msg.getGroupId() == newMsg.getGroupId());
  }
@@ -879,16 +880,16 @@
         String url, ServerState state, long genId, byte groupId, int degTh) throws Exception
  {
    ReplServerStartMsg msg = new ReplServerStartMsg(serverId,
        url, baseDN, window, state, ProtocolVersion.getCurrentVersion(), genId,
        url, baseDN, window, state, genId,
        true, groupId, degTh);
    ReplServerStartMsg newMsg = new ReplServerStartMsg(msg.getBytes());
    ReplServerStartMsg newMsg = new ReplServerStartMsg(msg.getBytes(getCurrentVersion()));
    assertEquals(msg.getServerId(), newMsg.getServerId());
    assertEquals(msg.getServerURL(), newMsg.getServerURL());
    assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
    assertEquals(msg.getServerState().getMaxChangeNumber(1),
        newMsg.getServerState().getMaxChangeNumber(1));
    assertEquals(msg.getVersion(), newMsg.getVersion());
    assertEquals(newMsg.getVersion(), getCurrentVersion());
    assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
    assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption());
    assertTrue(msg.getGroupId() == newMsg.getGroupId());
@@ -925,16 +926,16 @@
         int weight, int connectedDSNumber) throws Exception
  {
    ReplServerStartDSMsg msg = new ReplServerStartDSMsg(serverId,
        url, baseDN, window, state, ProtocolVersion.getCurrentVersion(), genId,
        url, baseDN, window, state, genId,
        true, groupId, degTh, weight, connectedDSNumber);
    ReplServerStartDSMsg newMsg = new ReplServerStartDSMsg(msg.getBytes());
    ReplServerStartDSMsg newMsg = new ReplServerStartDSMsg(msg.getBytes(getCurrentVersion()));
    assertEquals(msg.getServerId(), newMsg.getServerId());
    assertEquals(msg.getServerURL(), newMsg.getServerURL());
    assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
    assertEquals(msg.getServerState().getMaxChangeNumber(1),
        newMsg.getServerState().getMaxChangeNumber(1));
    assertEquals(msg.getVersion(), newMsg.getVersion());
    assertEquals(newMsg.getVersion(), getCurrentVersion());
    assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
    assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption());
    assertTrue(msg.getGroupId() == newMsg.getGroupId());
@@ -952,7 +953,7 @@
  public void stopMsgTest() throws Exception
  {
    StopMsg msg = new StopMsg();
    StopMsg newMsg = new StopMsg(msg.getBytes());
    StopMsg newMsg = new StopMsg(msg.getBytes(getCurrentVersion()));
  }
  /**
@@ -963,7 +964,7 @@
  public void windowMsgTest() throws Exception
  {
    WindowMsg msg = new WindowMsg(123);
    WindowMsg newMsg = new WindowMsg(msg.getBytes());
    WindowMsg newMsg = new WindowMsg(msg.getBytes(getCurrentVersion()));
    assertEquals(msg.getNumAck(), newMsg.getNumAck());
  }
@@ -976,7 +977,7 @@
  public void windowProbeMsgTest() throws Exception
  {
    WindowProbeMsg msg = new WindowProbeMsg();
    new WindowProbeMsg(msg.getBytes());
    new WindowProbeMsg(msg.getBytes(getCurrentVersion()));
  }
  @DataProvider(name="createTopologyData")
@@ -1071,7 +1072,7 @@
    throws Exception
  {
    TopologyMsg msg = new TopologyMsg(dsList, rsList);
    TopologyMsg newMsg = new TopologyMsg(msg.getBytes(),
    TopologyMsg newMsg = new TopologyMsg(msg.getBytes(getCurrentVersion()),
        ProtocolVersion.getCurrentVersion());
    assertEquals(msg.getDsList(), newMsg.getDsList());
    assertEquals(msg.getRsList(), newMsg.getRsList());
@@ -1139,7 +1140,7 @@
      assuredMode, safedataLevel);
    msg.setEclIncludes(attrs, attrs);
    StartSessionMsg newMsg =
      new StartSessionMsg(msg.getBytes(),ProtocolVersion.getCurrentVersion());
      new StartSessionMsg(msg.getBytes(getCurrentVersion()),getCurrentVersion());
    assertEquals(msg.getStatus(), newMsg.getStatus());
    assertTrue(msg.isAssured() == newMsg.isAssured());
    assertEquals(msg.getAssuredMode(), newMsg.getAssuredMode());
@@ -1170,7 +1171,7 @@
    throws Exception
  {
    ChangeStatusMsg msg = new ChangeStatusMsg(reqStatus, newStatus);
    ChangeStatusMsg newMsg = new ChangeStatusMsg(msg.getBytes());
    ChangeStatusMsg newMsg = new ChangeStatusMsg(msg.getBytes(getCurrentVersion()));
    assertEquals(msg.getRequestedStatus(), newMsg.getRequestedStatus());
    assertEquals(msg.getNewStatus(), newMsg.getNewStatus());
  }
@@ -1182,7 +1183,7 @@
  public void heartbeatMsgTest() throws Exception
  {
    HeartbeatMsg msg = new HeartbeatMsg();
    HeartbeatMsg newMsg = new HeartbeatMsg(msg.getBytes());
    HeartbeatMsg newMsg = new HeartbeatMsg(msg.getBytes(getCurrentVersion()));
    assertNotNull(newMsg);
  }
@@ -1193,7 +1194,7 @@
  public void resetGenerationIdMsgTest() throws Exception
  {
    ResetGenerationIdMsg msg = new ResetGenerationIdMsg(23657);
    ResetGenerationIdMsg newMsg = new ResetGenerationIdMsg(msg.getBytes());
    ResetGenerationIdMsg newMsg = new ResetGenerationIdMsg(msg.getBytes(getCurrentVersion()));
    assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
  }
@@ -1204,7 +1205,7 @@
  public void monitorRequestMsgTest() throws Exception
  {
    MonitorRequestMsg msg = new MonitorRequestMsg(1,2);
    MonitorRequestMsg newMsg = new MonitorRequestMsg(msg.getBytes());
    MonitorRequestMsg newMsg = new MonitorRequestMsg(msg.getBytes(getCurrentVersion()));
    assertEquals(newMsg.getDestination(), 2);
    assertEquals(newMsg.getSenderID(), 1);
  }
@@ -1251,7 +1252,7 @@
    msg.setServerState(sid2, s2, now+2, true);
    msg.setServerState(sid3, s3, now+3, false);
    byte[] b = msg.getBytes();
    byte[] b = msg.getBytes(getCurrentVersion());
    MonitorMsg newMsg = new MonitorMsg(b, ProtocolVersion.getCurrentVersion());
    assertEquals(rsState, msg.getReplServerDbState());
@@ -1319,7 +1320,7 @@
    int target = 45678;
    byte[] entry = taskInitFromS2.getBytes();
    EntryMsg msg = new EntryMsg(sender, target, entry, 1);
    EntryMsg newMsg = new EntryMsg(msg.getBytes(),ProtocolVersion.getCurrentVersion());
    EntryMsg newMsg = new EntryMsg(msg.getBytes(getCurrentVersion()),getCurrentVersion());
    assertEquals(msg.getSenderID(), newMsg.getSenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getEntryBytes(), newMsg.getEntryBytes());
@@ -1335,7 +1336,7 @@
    int target = 56789;
    InitializeRequestMsg msg = new InitializeRequestMsg(
        TEST_ROOT_DN_STRING, sender, target, 100);
    InitializeRequestMsg newMsg = new InitializeRequestMsg(msg.getBytes(),ProtocolVersion.getCurrentVersion());
    InitializeRequestMsg newMsg = new InitializeRequestMsg(msg.getBytes(getCurrentVersion()),getCurrentVersion());
    assertEquals(msg.getSenderID(), newMsg.getSenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertTrue(msg.getBaseDn().equals(newMsg.getBaseDn()));
@@ -1355,7 +1356,7 @@
    InitializeTargetMsg msg = new InitializeTargetMsg(
        TEST_ROOT_DN_STRING, senderID, targetID, requestorID, entryCount, initWindow);
    InitializeTargetMsg newMsg = new InitializeTargetMsg(msg.getBytes(),ProtocolVersion.getCurrentVersion());
    InitializeTargetMsg newMsg = new InitializeTargetMsg(msg.getBytes(getCurrentVersion()),getCurrentVersion());
    assertEquals(msg.getSenderID(), newMsg.getSenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getInitiatorID(), newMsg.getInitiatorID());
@@ -1377,7 +1378,7 @@
  public void doneMsgTest() throws Exception
  {
    DoneMsg msg = new DoneMsg(1, 2);
    DoneMsg newMsg = new DoneMsg(msg.getBytes());
    DoneMsg newMsg = new DoneMsg(msg.getBytes(getCurrentVersion()));
    assertEquals(msg.getSenderID(), newMsg.getSenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
  }
@@ -1389,7 +1390,7 @@
  public void errorMsgTest() throws Exception
  {
    ErrorMsg msg = new ErrorMsg(1, 2, Message.raw("details"));
    ErrorMsg newMsg = new ErrorMsg(msg.getBytes(),ProtocolVersion.getCurrentVersion());
    ErrorMsg newMsg = new ErrorMsg(msg.getBytes(getCurrentVersion()),getCurrentVersion());
    assertEquals(msg.getSenderID(), newMsg.getSenderID());
    assertEquals(msg.getDestination(), newMsg.getDestination());
    assertEquals(msg.getMsgID(), newMsg.getMsgID());
@@ -1421,8 +1422,8 @@
  {
    ServerStartECLMsg msg = new ServerStartECLMsg(
        "localhost:1234", window, window, window, window, window, window, state,
        ProtocolVersion.getCurrentVersion(), genId, sslEncryption, groupId);
    ServerStartECLMsg newMsg = new ServerStartECLMsg(msg.getBytes());
        genId, sslEncryption, groupId);
    ServerStartECLMsg newMsg = new ServerStartECLMsg(msg.getBytes(getCurrentVersion()));
    assertEquals(msg.getServerURL(), newMsg.getServerURL());
    assertEquals(msg.getMaxReceiveDelay(), newMsg.getMaxReceiveDelay());
    assertEquals(msg.getMaxReceiveQueue(), newMsg.getMaxReceiveQueue());
@@ -1433,7 +1434,7 @@
    assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption());
    assertEquals(msg.getServerState().getMaxChangeNumber(1),
        newMsg.getServerState().getMaxChangeNumber(1));
    assertEquals(msg.getVersion(), newMsg.getVersion());
    assertEquals(newMsg.getVersion(), getCurrentVersion());
    assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
    assertTrue(msg.getGroupId() == newMsg.getGroupId());
  }
@@ -1469,7 +1470,7 @@
    dns.add(dn2);
    msg.setExcludedDNs(dns);
    // create copy
    StartECLSessionMsg newMsg = new StartECLSessionMsg(msg.getBytes());
    StartECLSessionMsg newMsg = new StartECLSessionMsg(msg.getBytes(getCurrentVersion()));
    // test equality between the two copies
    assertEquals(msg.getChangeNumber(), newMsg.getChangeNumber());
    assertTrue(msg.isPersistent() == newMsg.isPersistent());
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -918,7 +918,7 @@
        // Send our repl server start msg
        ReplServerStartMsg replServerStartMsg = new ReplServerStartMsg(serverId,
          fakeUrl, baseDn, 100, serverState,
          ProtocolVersion.getCurrentVersion(), generationId, sslEncryption,
          generationId, sslEncryption,
          groupId, 5000);
        session.publish(replServerStartMsg);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -990,7 +990,7 @@
      ServerStartMsg msg =
        new ServerStartMsg( 1723, url, TEST_ROOT_DN_STRING,
            WINDOW, 5000, new ServerState(),
            ProtocolVersion.getCurrentVersion(), 0, sslEncryption, (byte)-1);
            0, sslEncryption, (byte)-1);
      session.publish(msg);
      // Read the Replication Server state from the ReplServerStartDSMsg that