mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
07.55.2009 99480fcbcb68be6a357f6218668feab697e1a93d
opends/src/server/org/opends/server/backends/SchemaBackend.java
@@ -122,6 +122,7 @@
  private static final String CONFIG_SCHEMA_ELEMENTS_FILE = "02-config.ldif";
  private static final String CORE_SCHEMA_ELEMENTS_FILE = "00-core.ldif";
@@ -4325,11 +4326,13 @@
    {
      String schemaFile = removeType.getSchemaFile();
      if ((schemaFile != null) &&
           (schemaFile.equals(CONFIG_SCHEMA_ELEMENTS_FILE)))
           ((schemaFile.equals(CONFIG_SCHEMA_ELEMENTS_FILE)) ||
            (schemaFile.equals(CORE_SCHEMA_ELEMENTS_FILE))) )
      {
        // Don't import the file containing the definitiong of the
        // Don't import the file containing the definitions of the
        // Schema elements used for configuration because these
        // definitions may vary between versions of OpenDS.
        // Also never delete anything from the core schema file.
        continue;
      }
      if (!oidList.contains(removeType.getOID()))
@@ -4447,7 +4450,7 @@
      if ((schemaFile != null) &&
          (schemaFile.equals(CONFIG_SCHEMA_ELEMENTS_FILE)))
      {
        // Don't import the file containing the definitiong of the
        // Don't import the file containing the definition of the
        // Schema elements used for configuration because these
        // definitions may vary between versions of OpenDS.
        continue;
opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
@@ -98,7 +98,9 @@
      length = in.length - pos - 1;
      byte[] encodedMsg = new byte[length];
      System.arraycopy(in, pos, encodedMsg, 0, length);
      ReplicationMsg rmsg = ReplicationMsg.generateMsg(encodedMsg);
      ReplicationMsg rmsg =
        ReplicationMsg.generateMsg(
            encodedMsg, ProtocolVersion.getCurrentVersion());
      this.updateMsg = (LDAPUpdateMsg)rmsg;
    }
    catch (UnsupportedEncodingException e)
opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
@@ -276,25 +276,10 @@
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
  {
    // Using current protocol version should normally not be done as we would
    // normally call the getBytes() method instead for that. So this check
    // for security
    if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
    {
    if (reqProtocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
      return getBytes_V1();
    else
      return getBytes();
    }
    switch (reqProtocolVersion)
    {
      case ProtocolVersion.REPLICATION_PROTOCOL_V1:
        return getBytes_V1();
      default:
        // Unsupported requested version
        throw new UnsupportedEncodingException(getClass().getSimpleName() +
          " PDU does not support requested protocol version serialization: " +
          reqProtocolVersion);
    }
  }
  /**
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -26,6 +26,7 @@
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -81,7 +82,12 @@
  SubTopoMonitorData data = new SubTopoMonitorData();
  /**
   * Creates a new EntryMessage.
   * The protocolVersion that should be used when serializing this message.
   */
  private final short protocolVersion;
  /**
   * Creates a new MonitorMsg.
   *
   * @param sender The sender of this message.
   * @param destination The destination of this message.
@@ -89,8 +95,25 @@
  public MonitorMsg(short sender, short destination)
  {
    super(sender, destination);
    protocolVersion = ProtocolVersion.getCurrentVersion();
  }
  /**
   * Creates a new MonitorMsg with a specific protocol version.
   *
   * @param sender                The sender of this message.
   * @param destination           The destination of this message.
   * @param replicationProtocol   The protocol version to use.
   */
  public MonitorMsg(short sender, short destination,
      short replicationProtocol)
  {
    super(sender, destination);
    protocolVersion = replicationProtocol;
  }
  /**
   * Sets the state of the replication server.
   * @param state The state.
@@ -174,24 +197,58 @@
  /**
   * Creates a new EntryMessage from its encoded form.
   *
   * @param in The byte array containing the encoded form of the message.
   * @param in       The byte array containing the encoded form of the message.
   * @param version  The version of the protocol to use to decode the msg.
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the ServerStartMessage.
   */
  public MonitorMsg(byte[] in) throws DataFormatException
  public MonitorMsg(byte[] in, short version) throws DataFormatException
  {
    protocolVersion = ProtocolVersion.getCurrentVersion();
    ByteSequenceReader reader = ByteString.wrap(in).asReader();
    /* first byte is the type */
    if (reader.get() != MSG_TYPE_REPL_SERVER_MONITOR)
      throw new DataFormatException("input is not a valid " +
          this.getClass().getCanonicalName());
    if (version == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      try
      {
        /* first byte is the type */
        if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR)
          throw new DataFormatException("input is not a valid " +
              this.getClass().getCanonicalName());
        int pos = 1;
    // sender
    this.senderID = reader.getShort();
        // sender
        int length = getNextLength(in, pos);
        String senderIDString = new String(in, pos, length, "UTF-8");
        this.senderID = Short.valueOf(senderIDString);
        pos += length +1;
    // destination
    this.destination = reader.getShort();
        // destination
        length = getNextLength(in, pos);
        String destinationString = new String(in, pos, length, "UTF-8");
        this.destination = Short.valueOf(destinationString);
        pos += length +1;
        reader.position(pos);
      }
      catch (UnsupportedEncodingException e)
      {
        throw new DataFormatException("UTF-8 is not supported by this jvm.");
      }
    }
    else
    {
      if (reader.get() != MSG_TYPE_REPL_SERVER_MONITOR)
        throw new DataFormatException("input is not a valid " +
            this.getClass().getCanonicalName());
      // sender
      this.senderID = reader.getShort();
      // destination
      this.destination = reader.getShort();
    }
    ASN1Reader asn1Reader = ASN1.getReader(reader);
    try
@@ -262,11 +319,14 @@
      ByteStringBuilder byteBuilder = new ByteStringBuilder();
      ASN1Writer writer = ASN1.getWriter(byteBuilder);
      /* put the type of the operation */
      byteBuilder.append(MSG_TYPE_REPL_SERVER_MONITOR);
      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
      {
        /* put the type of the operation */
        byteBuilder.append(MSG_TYPE_REPL_SERVER_MONITOR);
      byteBuilder.append(senderID);
      byteBuilder.append(destination);
        byteBuilder.append(senderID);
        byteBuilder.append(destination);
      }
      /* Put the serverStates ... */
      writer.writeStartSequence();
@@ -331,7 +391,31 @@
      writer.writeEndSequence();
      return byteBuilder.toByteArray();
      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
      {
        return byteBuilder.toByteArray();
      }
      else
      {
        byte[] temp = byteBuilder.toByteArray();
        byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
        byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
        int length = 1 +  1 + senderBytes.length +
        1 + destinationBytes.length + temp.length +1;
        byte[] resultByteArray = new byte[length];
        /* put the type of the operation */
        resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR;
        int pos = 1;
        pos = addByteArray(senderBytes, resultByteArray, pos);
        pos = addByteArray(destinationBytes, resultByteArray, pos);
        pos = addByteArray(temp, resultByteArray, pos);
        return resultByteArray;
      }
    }
    catch (Exception e)
    {
opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
@@ -156,4 +156,12 @@
   * on this ProtocolSession.
   */
  public abstract boolean closeInitiated();
  /**
   * This method is called at the establishment of the session and can
   * be used to record the version of the protocol that is currently used.
   *
   * @param version The version of the protocol that is currently used.
   */
  public abstract void setProtocolVersion(short version);
}
opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
@@ -223,59 +223,70 @@
   */
  @Override
  public byte[] getBytes()
  throws UnsupportedEncodingException
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes(short protocolVersion)
     throws UnsupportedEncodingException
  {
    if  (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
      return getBytes_V1();
    /* The ReplServerStartMsg is stored in the form :
     * <operation type><baseDn><serverId><serverURL><windowSize><sslEncryption>
     * <degradedStatusThreshold><serverState>
     */
    try {
      byte[] byteDn = baseDn.getBytes("UTF-8");
      byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
      byte[] byteServerUrl = serverURL.getBytes("UTF-8");
      byte[] byteServerState = serverState.getBytes();
      byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
      byte[] byteSSLEncryption =
                     String.valueOf(sslEncryption).getBytes("UTF-8");
      byte[] byteDegradedStatusThreshold =
        String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
      int length = byteDn.length + 1 + byteServerId.length + 1 +
                   byteServerUrl.length + 1 + byteWindowSize.length + 1 +
                   byteSSLEncryption.length + 1 +
                   byteDegradedStatusThreshold.length + 1 +
                   byteServerState.length + 1;
    byte[] byteDn = baseDn.getBytes("UTF-8");
    byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
    byte[] byteServerUrl = serverURL.getBytes("UTF-8");
    byte[] byteServerState = serverState.getBytes();
    byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
    byte[] byteSSLEncryption =
      String.valueOf(sslEncryption).getBytes("UTF-8");
    byte[] byteDegradedStatusThreshold =
      String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
      /* encode the header in a byte[] large enough to also contain the mods */
      byte resultByteArray[] = encodeHeader(MSG_TYPE_REPL_SERVER_START, length);
      int pos = headerLength;
    int length = byteDn.length + 1 + byteServerId.length + 1 +
    byteServerUrl.length + 1 + byteWindowSize.length + 1 +
    byteSSLEncryption.length + 1 +
    byteDegradedStatusThreshold.length + 1 +
    byteServerState.length + 1;
      /* put the baseDN and a terminating 0 */
      pos = addByteArray(byteDn, resultByteArray, pos);
    /* encode the header in a byte[] large enough to also contain the mods */
    byte resultByteArray[] =
      encodeHeader(MSG_TYPE_REPL_SERVER_START, length, protocolVersion);
      /* put the ServerId */
      pos = addByteArray(byteServerId, resultByteArray, pos);
    int pos = headerLength;
      /* put the ServerURL */
      pos = addByteArray(byteServerUrl, resultByteArray, pos);
    /* put the baseDN and a terminating 0 */
    pos = addByteArray(byteDn, resultByteArray, pos);
      /* put the window size */
      pos = addByteArray(byteWindowSize, resultByteArray, pos);
    /* put the ServerId */
    pos = addByteArray(byteServerId, resultByteArray, pos);
      /* put the SSL Encryption setting */
      pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
    /* put the ServerURL */
    pos = addByteArray(byteServerUrl, resultByteArray, pos);
      /* put the degraded status threshold */
      pos = addByteArray(byteDegradedStatusThreshold, resultByteArray, pos);
    /* put the window size */
    pos = addByteArray(byteWindowSize, resultByteArray, pos);
      /* put the ServerState */
      pos = addByteArray(byteServerState, resultByteArray, pos);
    /* put the SSL Encryption setting */
    pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
    /* put the degraded status threshold */
    pos = addByteArray(byteDegradedStatusThreshold, resultByteArray, pos);
    /* put the ServerState */
    pos = addByteArray(byteServerState, resultByteArray, pos);
    return resultByteArray;
  }
  /**
@@ -338,32 +349,6 @@
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
  {
    // Of course, always support current protocol version
    if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
    {
      return getBytes();
    }
    // Supported older protocol versions
    switch (reqProtocolVersion)
    {
      case ProtocolVersion.REPLICATION_PROTOCOL_V1:
        return getBytes_V1();
      default:
        // Unsupported requested version
        throw new UnsupportedEncodingException(getClass().getSimpleName() +
          " PDU does not support requested protocol version serialization: " +
          reqProtocolVersion);
    }
  }
  /**
   * Get the byte array representation of this Message. This uses the version
   * 1 of the replication protocol (used for compatibility purpose).
   *
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -125,14 +125,19 @@
   * is done taking into account the various supported replication protocol
   * versions.
   *
   * @param buffer The encode form of the ReplicationMsg.
   * @param buffer    The encode form of the ReplicationMsg.
   * @param version   The version to use to decode the msg.
   *
   * @return The generated SycnhronizationMessage.
   *
   * @throws DataFormatException If the encoded form was not a valid msg.
   * @throws UnsupportedEncodingException If UTF8 is not supported.
   * @throws NotSupportedOldVersionPDUException If the PDU is part of an old
   * protocol version and we do not support it.
   */
  public static ReplicationMsg generateMsg(byte[] buffer)
  public static ReplicationMsg generateMsg(
                byte[] buffer,
                short version)
                throws DataFormatException, UnsupportedEncodingException,
                NotSupportedOldVersionPDUException
  {
@@ -207,7 +212,7 @@
        msg = new MonitorRequestMsg(buffer);
      break;
      case MSG_TYPE_REPL_SERVER_MONITOR:
        msg = new MonitorMsg(buffer);
        msg = new MonitorMsg(buffer, version);
      break;
      case MSG_TYPE_START_SESSION:
        msg = new StartSessionMsg(buffer);
opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
@@ -291,7 +291,8 @@
                   byteServerState.length + 1;
      /* encode the header in a byte[] large enough to also contain the mods */
      byte resultByteArray[] = encodeHeader(MSG_TYPE_START_ECL, length);
      byte resultByteArray[] = encodeHeader(
          MSG_TYPE_START_ECL, length, ProtocolVersion.getCurrentVersion());
      int pos = headerLength;
      pos = addByteArray(byteServerUrl, resultByteArray, pos);
opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
@@ -327,7 +327,8 @@
                   byteServerState.length + 1;
      /* encode the header in a byte[] large enough to also contain the mods */
      byte resultByteArray[] = encodeHeader(MSG_TYPE_SERVER_START, length);
      byte resultByteArray[] = encodeHeader(
          MSG_TYPE_SERVER_START, length, ProtocolVersion.getCurrentVersion());
      int pos = headerLength;
      pos = addByteArray(byteDn, resultByteArray, pos);
opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -69,6 +69,7 @@
  private boolean closeInitiated = false;
  private short protocolVersion = ProtocolVersion.getCurrentVersion();
  /**
   * Creates a new SocketSession based on the provided socket.
@@ -175,7 +176,7 @@
      /* We do not want the heartbeat to close the session when */
      /* we are processing a message even a time consuming one. */
      lastReceiveTime=0;
      return ReplicationMsg.generateMsg(buffer);
      return ReplicationMsg.generateMsg(buffer, protocolVersion);
    }
    catch (OutOfMemoryError e)
    {
@@ -243,4 +244,12 @@
  {
    return closeInitiated;
  }
  /**
   * {@inheritDoc}
   */
  public void setProtocolVersion(short version)
  {
    protocolVersion = version;
  }
}
opends/src/server/org/opends/server/replication/protocol/StartMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -75,17 +75,21 @@
   * Encode the header for the start message.
   *
   * @param type The type of the message to create.
   * @param additionalLength additional length needed to encode the remaining
   * @param additionalLength Additional length needed to encode the remaining
   *                         part of the UpdateMessage.
   * @param protocolVersion  The version to use when encoding the header.
   * @return a byte array containing the common header and enough space to
   *         encode the remaining bytes of the UpdateMessage as was specified
   *         by the additionalLength.
   *         (byte array length = common header length + additionalLength)
   * @throws UnsupportedEncodingException if UTF-8 is not supported.
   */
  public byte[] encodeHeader(byte type, int additionalLength)
  public byte[] encodeHeader(
      byte type, int additionalLength,
      short protocolVersion)
  throws UnsupportedEncodingException
  {
    byte[] byteGenerationID =
      String.valueOf(generationId).getBytes("UTF-8");
@@ -101,7 +105,7 @@
    encodedMsg[0] = type;
    /* put the protocol version */
    encodedMsg[1] = (byte)ProtocolVersion.getCurrentVersion();
    encodedMsg[1] = (byte)protocolVersion;
    /* put the generationId */
    int pos = 2;
@@ -192,11 +196,11 @@
    {
      /* then read the version */
      short readVersion = (short)encodedMsg[1];
      if (readVersion != ProtocolVersion.getCurrentVersion())
      if (readVersion < ProtocolVersion.REPLICATION_PROTOCOL_V2)
        throw new DataFormatException("Not a valid message: type is " +
          encodedMsg[0] + " but protocol version byte is " + readVersion +
          " instead of " + ProtocolVersion.getCurrentVersion());
      protocolVersion = ProtocolVersion.getCurrentVersion();
      protocolVersion = readVersion;
      /* read the generationId */
      int pos = 2;
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -71,6 +71,7 @@
  private boolean closeInitiated = false;
  private short protocolVersion = ProtocolVersion.getCurrentVersion();
  /**
   * Creates a new TLSSocketSession.
@@ -185,7 +186,7 @@
      /* We do not want the heartbeat to close the session when */
      /* we are processing a message even a time consuming one. */
      lastReceiveTime=0;
      return ReplicationMsg.generateMsg(buffer);
      return ReplicationMsg.generateMsg(buffer, protocolVersion);
    }
    catch (OutOfMemoryError e)
    {
@@ -254,4 +255,12 @@
  {
    return closeInitiated;
  }
  /**
   * {@inheritDoc}
   */
  public void setProtocolVersion(short version)
  {
    protocolVersion = version;
  }
}
opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -194,17 +194,8 @@
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
  {
    // Of course, always support current protocol version
    if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
    {
      return getBytes();
    }
    else
    {
      throw new UnsupportedEncodingException(getClass().getSimpleName() +
          " PDU does not support requested protocol version serialization: " +
          reqProtocolVersion);
    }
    // There was no change since version 2.
    return getBytes();
  }
  /**
opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -435,13 +435,13 @@
   */
  public void registerIntoDomain()
  {
    // Alright, connected with new DS: store handler.
    // All-right, connected with new DS: store handler.
    Map<Short, DataServerHandler> connectedDSs =
      replicationServerDomain.getConnectedDSs();
    connectedDSs.put(serverId, this);
    // Tell peer DSs a new DS just connected to us
    // No need to resend topo msg to this just new DS so not null
    // No need to re-send TopologyMsg to this just new DS so not null
    // argument
    replicationServerDomain.buildAndSendTopoInfoToDSs(this);
    // Tell peer RSs a new DS just connected to us
@@ -499,13 +499,13 @@
      ReplServerStartMsg outReplServerStartMsg = null;
      try
      {
        outReplServerStartMsg = sendStartToRemote((short)-1);
        outReplServerStartMsg = sendStartToRemote(protocolVersion);
        // log
        logStartHandshakeRCVandSND(inServerStartMsg, outReplServerStartMsg);
        // The session initiator decides whether to use SSL.
        // Until here session is encrypted then it depends on the negociation
        // Until here session is encrypted then it depends on the negotiation
        if (!sessionInitiatorSSLEncryption)
          session.stopEncryption();
@@ -524,7 +524,7 @@
        // aborted after handshake phase one from a DS that is searching for
        // best suitable RS.
        // don't log a poluting error when connection aborted
        // don't log a polluting error when connection aborted
        // from a DS that wanted only to perform handshake phase 1 in order
        // to determine the best suitable RS:
        // 1) -> ServerStartMsg
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -299,7 +299,8 @@
      lockDomain(true);
      // send start to remote
      ReplServerStartMsg outReplServerStartMsg = sendStartToRemote((short)-1);
      ReplServerStartMsg outReplServerStartMsg =
        sendStartToRemote(protocolVersion);
      // log
      logStartHandshakeRCVandSND(inECLStartMsg, outReplServerStartMsg);
opends/src/server/org/opends/server/replication/server/ReplicationData.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
@@ -30,6 +30,7 @@
import com.sleepycat.je.DatabaseEntry;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -67,6 +68,7 @@
  public static UpdateMsg generateChange(byte[] data)
                                             throws Exception
  {
    return (UpdateMsg) ReplicationMsg.generateMsg(data);
    return (UpdateMsg) ReplicationMsg.generateMsg(
        data, ProtocolVersion.getCurrentVersion());
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -75,6 +75,7 @@
import org.opends.server.replication.protocol.ServerStartECLMsg;
import org.opends.server.replication.protocol.ServerStartMsg;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartMsg;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
@@ -299,18 +300,21 @@
        if (msg instanceof ServerStartMsg)
        {
          session.setProtocolVersion(((StartMsg)msg).getVersion());
          DataServerHandler handler = new DataServerHandler(session,
              queueSize,serverURL,serverId,this,rcvWindow);
          handler.startFromRemoteDS((ServerStartMsg)msg);
        }
        else if (msg instanceof ReplServerStartMsg)
        {
          session.setProtocolVersion(((StartMsg)msg).getVersion());
          ReplicationServerHandler handler = new ReplicationServerHandler(
              session,queueSize,serverURL,serverId,this,rcvWindow);
          handler.startFromRemoteRS((ReplServerStartMsg)msg);
        }
        else if (msg instanceof ServerStartECLMsg)
        {
          session.setProtocolVersion(((StartMsg)msg).getVersion());
          ECLServerHandler handler = new ECLServerHandler(
              session,queueSize,serverURL,serverId,this,rcvWindow);
          handler.startFromRemoteServer((ServerStartECLMsg)msg);
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1509,8 +1509,21 @@
        // in the topology.
        if (senderHandler.isDataServer())
        {
          MonitorMsg returnMsg =
          MonitorMsg returnMsg;
          if (senderHandler.getProtocolVersion() >
                             ProtocolVersion.REPLICATION_PROTOCOL_V1)
          {
           returnMsg =
            new MonitorMsg(msg.getDestination(), msg.getsenderID());
          }
          else
          {
            returnMsg =
              new MonitorMsg(msg.getDestination(), msg.getsenderID(),
                  ProtocolVersion.REPLICATION_PROTOCOL_V1);
          }
          try
          {
            returnMsg.setReplServerDbState(getDbServerState());
@@ -1555,13 +1568,20 @@
          return;
        }
        MonitorRequestMsg replServerMonitorRequestMsg =
          (MonitorRequestMsg) msg;
        MonitorMsg monitorMsg;
        MonitorMsg monitorMsg =
          new MonitorMsg(
          replServerMonitorRequestMsg.getDestination(),
          replServerMonitorRequestMsg.getsenderID());
        if (senderHandler.getProtocolVersion() >
                                  ProtocolVersion.REPLICATION_PROTOCOL_V1)
        {
          monitorMsg =
            new MonitorMsg(msg.getDestination(), msg.getsenderID());
        }
        else
        {
          monitorMsg =
            new MonitorMsg(msg.getDestination(), msg.getsenderID(),
                ProtocolVersion.REPLICATION_PROTOCOL_V1);
        }
        // Populate for each connected LDAP Server
        // from the states stored in the serverHandler.
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -85,6 +85,7 @@
    {
      protocolVersion = ProtocolVersion.minWithCurrent(
          inReplServerStartMsg.getVersion());
      session.setProtocolVersion(protocolVersion);
      generationId = inReplServerStartMsg.getGenerationId();
      serverId = inReplServerStartMsg.getServerId();
      serverURL = inReplServerStartMsg.getServerURL();
@@ -163,13 +164,14 @@
    try
    {
      //
      lockDomain(false); // notimeout
      lockDomain(false); // no timeout
      // we are the initiator and decides of the encryption
      boolean sessionInitiatorSSLEncryption = this.initSslEncryption;
      // Send start
      ReplServerStartMsg outReplServerStartMsg = sendStartToRemote((short)-1);
      ReplServerStartMsg outReplServerStartMsg =
        sendStartToRemote(ProtocolVersion.getCurrentVersion());
      // Wait answer
      ReplicationMsg msg = session.receive();
@@ -260,20 +262,13 @@
      // lock with timeout
      lockDomain(true);
      short reqVersion = -1;
      if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
      {
        // We support connection from a V1 RS, send PDU with V1 form
        reqVersion = ProtocolVersion.REPLICATION_PROTOCOL_V1;
      }
      // send start to remote
      ReplServerStartMsg outReplServerStartMsg = sendStartToRemote(reqVersion);
      ReplServerStartMsg outReplServerStartMsg =
        sendStartToRemote(protocolVersion);
      // log
      logStartHandshakeRCVandSND(inReplServerStartMsg, outReplServerStartMsg);
      // until here session is encrypted then it depends on the negociation
      // until here session is encrypted then it depends on the negotiation
      // The session initiator decides whether to use SSL.
      if (!sessionInitiatorSSLEncryption)
        session.stopEncryption();
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -1042,10 +1042,7 @@
        replicationServerDomain.
        getReplicationServer().getDegradedStatusThreshold());
    if (requestedProtocolVersion>0)
      session.publish(outReplServerStartMsg, requestedProtocolVersion);
    else
      session.publish(outReplServerStartMsg);
    session.publish(outReplServerStartMsg, requestedProtocolVersion);
    return outReplServerStartMsg;
  }
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -765,9 +765,10 @@
       * The replication server will use the same one (or an older one
       * if it is an old replication server).
       */
      if (keepConnection)
        protocolVersion = ProtocolVersion.minWithCurrent(
      protocolVersion = ProtocolVersion.minWithCurrent(
          replServerStartMsg.getVersion());
      localSession.setProtocolVersion(protocolVersion);
      if (!isSslEncryption)
      {
@@ -926,6 +927,7 @@
      if (keepConnection)
        protocolVersion = ProtocolVersion.minWithCurrent(
          replServerStartMsg.getVersion());
      localSession.setProtocolVersion(protocolVersion);
      if (!isSslEncryption)
      {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -236,7 +236,8 @@
    byte[] v1MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Un-serialize V1 message
    AddMsg newMsg = (AddMsg)ReplicationMsg.generateMsg(v1MsgBytes);
    AddMsg newMsg = (AddMsg)ReplicationMsg.generateMsg(
        v1MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Check original version of message
    assertEquals(newMsg.getVersion(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
@@ -273,7 +274,8 @@
    newMsg.setSafeDataLevel(safeDataLevel);
    // Serialize in VLAST msg
    AddMsg vlastMsg = (AddMsg)ReplicationMsg.generateMsg(newMsg.getBytes());
    AddMsg vlastMsg = (AddMsg)ReplicationMsg.generateMsg(
        newMsg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Check original version of message
    assertEquals(vlastMsg.getVersion(), REPLICATION_PROTOCOL_VLAST);
@@ -351,7 +353,8 @@
    byte[] v1MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Un-serialize V1 message
    DeleteMsg newMsg = (DeleteMsg)ReplicationMsg.generateMsg(v1MsgBytes);
    DeleteMsg newMsg = (DeleteMsg)ReplicationMsg.generateMsg(
        v1MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Check original version of message
    assertEquals(newMsg.getVersion(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
@@ -371,7 +374,8 @@
    newMsg.setSafeDataLevel(safeDataLevel);
    // Serialize in VLAST msg
    DeleteMsg vlastMsg = (DeleteMsg)ReplicationMsg.generateMsg(newMsg.getBytes());
    DeleteMsg vlastMsg = (DeleteMsg)ReplicationMsg.generateMsg(
        newMsg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Check original version of message
    assertEquals(vlastMsg.getVersion(), REPLICATION_PROTOCOL_VLAST);
@@ -484,7 +488,8 @@
    byte[] v1MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Un-serialize V1 message
    ModifyMsg newMsg = (ModifyMsg)ReplicationMsg.generateMsg(v1MsgBytes);
    ModifyMsg newMsg = (ModifyMsg)ReplicationMsg.generateMsg(
        v1MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Check original version of message
    assertEquals(newMsg.getVersion(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
@@ -518,7 +523,8 @@
    newMsg.setSafeDataLevel(safeDataLevel);
    // Serialize in VLAST msg
    ModifyMsg vlastMsg = (ModifyMsg)ReplicationMsg.generateMsg(newMsg.getBytes());
    ModifyMsg vlastMsg = (ModifyMsg)ReplicationMsg.generateMsg(
        newMsg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Check original version of message
    assertEquals(vlastMsg.getVersion(), REPLICATION_PROTOCOL_VLAST);
@@ -548,7 +554,7 @@
  @DataProvider(name = "createModifyDnData")
  public Object[][] createModifyDnData() {
    AttributeType type = DirectoryServer.getAttributeType("description");
    Attribute attr1 = Attributes.create("description", "new value");
@@ -579,7 +585,7 @@
      Modification mod = new Modification(ModificationType.ADD, attr);
      mods4.add(mod);
    }
    return new Object[][] {
        {"dc=test,dc=com", "dc=new", "11111111-1111-1111-1111-111111111111", "22222222-2222-2222-2222-222222222222", false, "dc=change", mods1, false, AssuredMode.SAFE_DATA_MODE, (byte)0},
        {"dc=test,dc=com", "dc=new", "33333333-3333-3333-3333-333333333333", "44444444-4444-4444-4444-444444444444", true, "dc=change", mods2, true, AssuredMode.SAFE_READ_MODE, (byte)1},
@@ -632,7 +638,8 @@
    byte[] v1MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Un-serialize V1 message
    ModifyDNMsg newMsg = (ModifyDNMsg)ReplicationMsg.generateMsg(v1MsgBytes);
    ModifyDNMsg newMsg = (ModifyDNMsg)ReplicationMsg.generateMsg(
        v1MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Check original version of message
    assertEquals(newMsg.getVersion(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
@@ -672,7 +679,8 @@
    newMsg.setMods(mods);
    // Serialize in VLAST msg
    ModifyDNMsg vlastMsg = (ModifyDNMsg)ReplicationMsg.generateMsg(newMsg.getBytes());
    ModifyDNMsg vlastMsg = (ModifyDNMsg)ReplicationMsg.generateMsg(
        newMsg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Check original version of message
    assertEquals(vlastMsg.getVersion(), REPLICATION_PROTOCOL_VLAST);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -177,8 +177,8 @@
    msg.setAssuredMode(assuredMode);
    msg.setSafeDataLevel(safeDataLevel);
    ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg
        .generateMsg(msg.getBytes());
    ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg.generateMsg(
        msg.getBytes(), ProtocolVersion.getCurrentVersion());
    // Test that generated attributes match original attributes.
    assertEquals(generatedMsg.isAssured(), isAssured);
@@ -234,8 +234,8 @@
    assertTrue(msg.getSafeDataLevel() == safeDataLevel);
    // Check equals
    ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg
        .generateMsg(msg.getBytes());
    ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg.generateMsg(
        msg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
    assertFalse(msg.equals(null));
    assertFalse(msg.equals(new Object()));
@@ -298,8 +298,8 @@
      (short) 123, (short) 45);
    op.setAttachment(SYNCHROCONTEXT, new DeleteContext(cn, "uniqueid"));
    DeleteMsg msg = new DeleteMsg(op);
    DeleteMsg generatedMsg = (DeleteMsg) ReplicationMsg
        .generateMsg(msg.getBytes());
    DeleteMsg generatedMsg = (DeleteMsg) ReplicationMsg.generateMsg(
        msg.getBytes(), ProtocolVersion.getCurrentVersion());
    assertEquals(msg.toString(), generatedMsg.toString());
@@ -393,7 +393,7 @@
    msg.setSafeDataLevel(safeDataLevel);
    ModifyDNMsg generatedMsg = (ModifyDNMsg) ReplicationMsg
        .generateMsg(msg.getBytes());
        .generateMsg(msg.getBytes(), ProtocolVersion.getCurrentVersion());
    // Test that generated attributes match original attributes.
    assertEquals(generatedMsg.isAssured(), isAssured);
@@ -466,7 +466,7 @@
    msg.setSafeDataLevel(safeDataLevel);
    AddMsg generatedMsg = (AddMsg) ReplicationMsg.generateMsg(msg
        .getBytes());
        .getBytes(), ProtocolVersion.getCurrentVersion());
    assertEquals(msg.getBytes(), generatedMsg.getBytes());
    assertEquals(msg.toString(), generatedMsg.toString());
@@ -611,7 +611,8 @@
    }
    // Check that retrieved CN is OK
    msg2 = (AckMsg) ReplicationMsg.generateMsg(msg1.getBytes());
    msg2 = (AckMsg) ReplicationMsg.generateMsg(
        msg1.getBytes(), ProtocolVersion.getCurrentVersion());
  }
  @Test()
@@ -1006,7 +1007,7 @@
    msg.setServerState(sid3, s3, now+3, false);
    byte[] b = msg.getBytes();
    MonitorMsg newMsg = new MonitorMsg(b);
    MonitorMsg newMsg = new MonitorMsg(b, ProtocolVersion.getCurrentVersion());
    assertEquals(rsState, msg.getReplServerDbState());
    assertEquals(newMsg.getReplServerDbState().toString(),