mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
07.55.2009 99480fcbcb68be6a357f6218668feab697e1a93d
Fix for 4096 MonitorMsg is not compatible with replication version

This fix restore the compatibility by using either format depending
on the protocol version.
After this fix the trunk is now compatible with 1.0 and 2.0
but not with 1.2 since there is no way to make the difference between the
2 versions of the replication protocol (1.2 and 2.0 both use version 2
of the replication protocol but don't encode MonitorMsg the same way)

This fix also add a trick in the schema backend so that the schema
from 00-core is not replicated.

Gilles
23 files modified
477 ■■■■■ changed files
opends/src/server/org/opends/server/backends/SchemaBackend.java 9 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java 4 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java 21 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java 116 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java 8 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java 113 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java 11 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/SocketSession.java 11 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/StartMsg.java 16 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java 11 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java 15 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DataServerHandler.java 10 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationData.java 6 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 34 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java 19 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 5 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 6 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java 28 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java 21 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/SchemaBackend.java
@@ -122,6 +122,7 @@
  private static final String CONFIG_SCHEMA_ELEMENTS_FILE = "02-config.ldif";
  private static final String CORE_SCHEMA_ELEMENTS_FILE = "00-core.ldif";
@@ -4325,11 +4326,13 @@
    {
      String schemaFile = removeType.getSchemaFile();
      if ((schemaFile != null) &&
           (schemaFile.equals(CONFIG_SCHEMA_ELEMENTS_FILE)))
           ((schemaFile.equals(CONFIG_SCHEMA_ELEMENTS_FILE)) ||
            (schemaFile.equals(CORE_SCHEMA_ELEMENTS_FILE))) )
      {
        // Don't import the file containing the definitiong of the
        // Don't import the file containing the definitions of the
        // Schema elements used for configuration because these
        // definitions may vary between versions of OpenDS.
        // Also never delete anything from the core schema file.
        continue;
      }
      if (!oidList.contains(removeType.getOID()))
@@ -4447,7 +4450,7 @@
      if ((schemaFile != null) &&
          (schemaFile.equals(CONFIG_SCHEMA_ELEMENTS_FILE)))
      {
        // Don't import the file containing the definitiong of the
        // Don't import the file containing the definition of the
        // Schema elements used for configuration because these
        // definitions may vary between versions of OpenDS.
        continue;
opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
@@ -98,7 +98,9 @@
      length = in.length - pos - 1;
      byte[] encodedMsg = new byte[length];
      System.arraycopy(in, pos, encodedMsg, 0, length);
      ReplicationMsg rmsg = ReplicationMsg.generateMsg(encodedMsg);
      ReplicationMsg rmsg =
        ReplicationMsg.generateMsg(
            encodedMsg, ProtocolVersion.getCurrentVersion());
      this.updateMsg = (LDAPUpdateMsg)rmsg;
    }
    catch (UnsupportedEncodingException e)
opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
@@ -276,25 +276,10 @@
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
  {
    // Using current protocol version should normally not be done as we would
    // normally call the getBytes() method instead for that. So this check
    // for security
    if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
    {
    if (reqProtocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
      return getBytes_V1();
    else
      return getBytes();
    }
    switch (reqProtocolVersion)
    {
      case ProtocolVersion.REPLICATION_PROTOCOL_V1:
        return getBytes_V1();
      default:
        // Unsupported requested version
        throw new UnsupportedEncodingException(getClass().getSimpleName() +
          " PDU does not support requested protocol version serialization: " +
          reqProtocolVersion);
    }
  }
  /**
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -26,6 +26,7 @@
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -81,7 +82,12 @@
  SubTopoMonitorData data = new SubTopoMonitorData();
  /**
   * Creates a new EntryMessage.
   * The protocolVersion that should be used when serializing this message.
   */
  private final short protocolVersion;
  /**
   * Creates a new MonitorMsg.
   *
   * @param sender The sender of this message.
   * @param destination The destination of this message.
@@ -89,8 +95,25 @@
  public MonitorMsg(short sender, short destination)
  {
    super(sender, destination);
    protocolVersion = ProtocolVersion.getCurrentVersion();
  }
  /**
   * Creates a new MonitorMsg with a specific protocol version.
   *
   * @param sender                The sender of this message.
   * @param destination           The destination of this message.
   * @param replicationProtocol   The protocol version to use.
   */
  public MonitorMsg(short sender, short destination,
      short replicationProtocol)
  {
    super(sender, destination);
    protocolVersion = replicationProtocol;
  }
  /**
   * Sets the state of the replication server.
   * @param state The state.
@@ -174,24 +197,58 @@
  /**
   * Creates a new EntryMessage from its encoded form.
   *
   * @param in The byte array containing the encoded form of the message.
   * @param in       The byte array containing the encoded form of the message.
   * @param version  The version of the protocol to use to decode the msg.
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the ServerStartMessage.
   */
  public MonitorMsg(byte[] in) throws DataFormatException
  public MonitorMsg(byte[] in, short version) throws DataFormatException
  {
    protocolVersion = ProtocolVersion.getCurrentVersion();
    ByteSequenceReader reader = ByteString.wrap(in).asReader();
    /* first byte is the type */
    if (reader.get() != MSG_TYPE_REPL_SERVER_MONITOR)
      throw new DataFormatException("input is not a valid " +
          this.getClass().getCanonicalName());
    if (version == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      try
      {
        /* first byte is the type */
        if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR)
          throw new DataFormatException("input is not a valid " +
              this.getClass().getCanonicalName());
        int pos = 1;
    // sender
    this.senderID = reader.getShort();
        // sender
        int length = getNextLength(in, pos);
        String senderIDString = new String(in, pos, length, "UTF-8");
        this.senderID = Short.valueOf(senderIDString);
        pos += length +1;
    // destination
    this.destination = reader.getShort();
        // destination
        length = getNextLength(in, pos);
        String destinationString = new String(in, pos, length, "UTF-8");
        this.destination = Short.valueOf(destinationString);
        pos += length +1;
        reader.position(pos);
      }
      catch (UnsupportedEncodingException e)
      {
        throw new DataFormatException("UTF-8 is not supported by this jvm.");
      }
    }
    else
    {
      if (reader.get() != MSG_TYPE_REPL_SERVER_MONITOR)
        throw new DataFormatException("input is not a valid " +
            this.getClass().getCanonicalName());
      // sender
      this.senderID = reader.getShort();
      // destination
      this.destination = reader.getShort();
    }
    ASN1Reader asn1Reader = ASN1.getReader(reader);
    try
@@ -262,11 +319,14 @@
      ByteStringBuilder byteBuilder = new ByteStringBuilder();
      ASN1Writer writer = ASN1.getWriter(byteBuilder);
      /* put the type of the operation */
      byteBuilder.append(MSG_TYPE_REPL_SERVER_MONITOR);
      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
      {
        /* put the type of the operation */
        byteBuilder.append(MSG_TYPE_REPL_SERVER_MONITOR);
      byteBuilder.append(senderID);
      byteBuilder.append(destination);
        byteBuilder.append(senderID);
        byteBuilder.append(destination);
      }
      /* Put the serverStates ... */
      writer.writeStartSequence();
@@ -331,7 +391,31 @@
      writer.writeEndSequence();
      return byteBuilder.toByteArray();
      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
      {
        return byteBuilder.toByteArray();
      }
      else
      {
        byte[] temp = byteBuilder.toByteArray();
        byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
        byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
        int length = 1 +  1 + senderBytes.length +
        1 + destinationBytes.length + temp.length +1;
        byte[] resultByteArray = new byte[length];
        /* put the type of the operation */
        resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR;
        int pos = 1;
        pos = addByteArray(senderBytes, resultByteArray, pos);
        pos = addByteArray(destinationBytes, resultByteArray, pos);
        pos = addByteArray(temp, resultByteArray, pos);
        return resultByteArray;
      }
    }
    catch (Exception e)
    {
opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
@@ -156,4 +156,12 @@
   * on this ProtocolSession.
   */
  public abstract boolean closeInitiated();
  /**
   * This method is called at the establishment of the session and can
   * be used to record the version of the protocol that is currently used.
   *
   * @param version The version of the protocol that is currently used.
   */
  public abstract void setProtocolVersion(short version);
}
opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
@@ -223,59 +223,70 @@
   */
  @Override
  public byte[] getBytes()
  throws UnsupportedEncodingException
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes(short protocolVersion)
     throws UnsupportedEncodingException
  {
    if  (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
      return getBytes_V1();
    /* The ReplServerStartMsg is stored in the form :
     * <operation type><baseDn><serverId><serverURL><windowSize><sslEncryption>
     * <degradedStatusThreshold><serverState>
     */
    try {
      byte[] byteDn = baseDn.getBytes("UTF-8");
      byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
      byte[] byteServerUrl = serverURL.getBytes("UTF-8");
      byte[] byteServerState = serverState.getBytes();
      byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
      byte[] byteSSLEncryption =
                     String.valueOf(sslEncryption).getBytes("UTF-8");
      byte[] byteDegradedStatusThreshold =
        String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
      int length = byteDn.length + 1 + byteServerId.length + 1 +
                   byteServerUrl.length + 1 + byteWindowSize.length + 1 +
                   byteSSLEncryption.length + 1 +
                   byteDegradedStatusThreshold.length + 1 +
                   byteServerState.length + 1;
    byte[] byteDn = baseDn.getBytes("UTF-8");
    byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
    byte[] byteServerUrl = serverURL.getBytes("UTF-8");
    byte[] byteServerState = serverState.getBytes();
    byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
    byte[] byteSSLEncryption =
      String.valueOf(sslEncryption).getBytes("UTF-8");
    byte[] byteDegradedStatusThreshold =
      String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
      /* encode the header in a byte[] large enough to also contain the mods */
      byte resultByteArray[] = encodeHeader(MSG_TYPE_REPL_SERVER_START, length);
      int pos = headerLength;
    int length = byteDn.length + 1 + byteServerId.length + 1 +
    byteServerUrl.length + 1 + byteWindowSize.length + 1 +
    byteSSLEncryption.length + 1 +
    byteDegradedStatusThreshold.length + 1 +
    byteServerState.length + 1;
      /* put the baseDN and a terminating 0 */
      pos = addByteArray(byteDn, resultByteArray, pos);
    /* encode the header in a byte[] large enough to also contain the mods */
    byte resultByteArray[] =
      encodeHeader(MSG_TYPE_REPL_SERVER_START, length, protocolVersion);
      /* put the ServerId */
      pos = addByteArray(byteServerId, resultByteArray, pos);
    int pos = headerLength;
      /* put the ServerURL */
      pos = addByteArray(byteServerUrl, resultByteArray, pos);
    /* put the baseDN and a terminating 0 */
    pos = addByteArray(byteDn, resultByteArray, pos);
      /* put the window size */
      pos = addByteArray(byteWindowSize, resultByteArray, pos);
    /* put the ServerId */
    pos = addByteArray(byteServerId, resultByteArray, pos);
      /* put the SSL Encryption setting */
      pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
    /* put the ServerURL */
    pos = addByteArray(byteServerUrl, resultByteArray, pos);
      /* put the degraded status threshold */
      pos = addByteArray(byteDegradedStatusThreshold, resultByteArray, pos);
    /* put the window size */
    pos = addByteArray(byteWindowSize, resultByteArray, pos);
      /* put the ServerState */
      pos = addByteArray(byteServerState, resultByteArray, pos);
    /* put the SSL Encryption setting */
    pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
    /* put the degraded status threshold */
    pos = addByteArray(byteDegradedStatusThreshold, resultByteArray, pos);
    /* put the ServerState */
    pos = addByteArray(byteServerState, resultByteArray, pos);
    return resultByteArray;
  }
  /**
@@ -338,32 +349,6 @@
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
  {
    // Of course, always support current protocol version
    if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
    {
      return getBytes();
    }
    // Supported older protocol versions
    switch (reqProtocolVersion)
    {
      case ProtocolVersion.REPLICATION_PROTOCOL_V1:
        return getBytes_V1();
      default:
        // Unsupported requested version
        throw new UnsupportedEncodingException(getClass().getSimpleName() +
          " PDU does not support requested protocol version serialization: " +
          reqProtocolVersion);
    }
  }
  /**
   * Get the byte array representation of this Message. This uses the version
   * 1 of the replication protocol (used for compatibility purpose).
   *
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -125,14 +125,19 @@
   * is done taking into account the various supported replication protocol
   * versions.
   *
   * @param buffer The encode form of the ReplicationMsg.
   * @param buffer    The encode form of the ReplicationMsg.
   * @param version   The version to use to decode the msg.
   *
   * @return The generated SycnhronizationMessage.
   *
   * @throws DataFormatException If the encoded form was not a valid msg.
   * @throws UnsupportedEncodingException If UTF8 is not supported.
   * @throws NotSupportedOldVersionPDUException If the PDU is part of an old
   * protocol version and we do not support it.
   */
  public static ReplicationMsg generateMsg(byte[] buffer)
  public static ReplicationMsg generateMsg(
                byte[] buffer,
                short version)
                throws DataFormatException, UnsupportedEncodingException,
                NotSupportedOldVersionPDUException
  {
@@ -207,7 +212,7 @@
        msg = new MonitorRequestMsg(buffer);
      break;
      case MSG_TYPE_REPL_SERVER_MONITOR:
        msg = new MonitorMsg(buffer);
        msg = new MonitorMsg(buffer, version);
      break;
      case MSG_TYPE_START_SESSION:
        msg = new StartSessionMsg(buffer);
opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
@@ -291,7 +291,8 @@
                   byteServerState.length + 1;
      /* encode the header in a byte[] large enough to also contain the mods */
      byte resultByteArray[] = encodeHeader(MSG_TYPE_START_ECL, length);
      byte resultByteArray[] = encodeHeader(
          MSG_TYPE_START_ECL, length, ProtocolVersion.getCurrentVersion());
      int pos = headerLength;
      pos = addByteArray(byteServerUrl, resultByteArray, pos);
opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
@@ -327,7 +327,8 @@
                   byteServerState.length + 1;
      /* encode the header in a byte[] large enough to also contain the mods */
      byte resultByteArray[] = encodeHeader(MSG_TYPE_SERVER_START, length);
      byte resultByteArray[] = encodeHeader(
          MSG_TYPE_SERVER_START, length, ProtocolVersion.getCurrentVersion());
      int pos = headerLength;
      pos = addByteArray(byteDn, resultByteArray, pos);
opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -69,6 +69,7 @@
  private boolean closeInitiated = false;
  private short protocolVersion = ProtocolVersion.getCurrentVersion();
  /**
   * Creates a new SocketSession based on the provided socket.
@@ -175,7 +176,7 @@
      /* We do not want the heartbeat to close the session when */
      /* we are processing a message even a time consuming one. */
      lastReceiveTime=0;
      return ReplicationMsg.generateMsg(buffer);
      return ReplicationMsg.generateMsg(buffer, protocolVersion);
    }
    catch (OutOfMemoryError e)
    {
@@ -243,4 +244,12 @@
  {
    return closeInitiated;
  }
  /**
   * {@inheritDoc}
   */
  public void setProtocolVersion(short version)
  {
    protocolVersion = version;
  }
}
opends/src/server/org/opends/server/replication/protocol/StartMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -75,17 +75,21 @@
   * Encode the header for the start message.
   *
   * @param type The type of the message to create.
   * @param additionalLength additional length needed to encode the remaining
   * @param additionalLength Additional length needed to encode the remaining
   *                         part of the UpdateMessage.
   * @param protocolVersion  The version to use when encoding the header.
   * @return a byte array containing the common header and enough space to
   *         encode the remaining bytes of the UpdateMessage as was specified
   *         by the additionalLength.
   *         (byte array length = common header length + additionalLength)
   * @throws UnsupportedEncodingException if UTF-8 is not supported.
   */
  public byte[] encodeHeader(byte type, int additionalLength)
  public byte[] encodeHeader(
      byte type, int additionalLength,
      short protocolVersion)
  throws UnsupportedEncodingException
  {
    byte[] byteGenerationID =
      String.valueOf(generationId).getBytes("UTF-8");
@@ -101,7 +105,7 @@
    encodedMsg[0] = type;
    /* put the protocol version */
    encodedMsg[1] = (byte)ProtocolVersion.getCurrentVersion();
    encodedMsg[1] = (byte)protocolVersion;
    /* put the generationId */
    int pos = 2;
@@ -192,11 +196,11 @@
    {
      /* then read the version */
      short readVersion = (short)encodedMsg[1];
      if (readVersion != ProtocolVersion.getCurrentVersion())
      if (readVersion < ProtocolVersion.REPLICATION_PROTOCOL_V2)
        throw new DataFormatException("Not a valid message: type is " +
          encodedMsg[0] + " but protocol version byte is " + readVersion +
          " instead of " + ProtocolVersion.getCurrentVersion());
      protocolVersion = ProtocolVersion.getCurrentVersion();
      protocolVersion = readVersion;
      /* read the generationId */
      int pos = 2;
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -71,6 +71,7 @@
  private boolean closeInitiated = false;
  private short protocolVersion = ProtocolVersion.getCurrentVersion();
  /**
   * Creates a new TLSSocketSession.
@@ -185,7 +186,7 @@
      /* We do not want the heartbeat to close the session when */
      /* we are processing a message even a time consuming one. */
      lastReceiveTime=0;
      return ReplicationMsg.generateMsg(buffer);
      return ReplicationMsg.generateMsg(buffer, protocolVersion);
    }
    catch (OutOfMemoryError e)
    {
@@ -254,4 +255,12 @@
  {
    return closeInitiated;
  }
  /**
   * {@inheritDoc}
   */
  public void setProtocolVersion(short version)
  {
    protocolVersion = version;
  }
}
opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -194,17 +194,8 @@
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
  {
    // Of course, always support current protocol version
    if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
    {
      return getBytes();
    }
    else
    {
      throw new UnsupportedEncodingException(getClass().getSimpleName() +
          " PDU does not support requested protocol version serialization: " +
          reqProtocolVersion);
    }
    // There was no change since version 2.
    return getBytes();
  }
  /**
opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -435,13 +435,13 @@
   */
  public void registerIntoDomain()
  {
    // Alright, connected with new DS: store handler.
    // All-right, connected with new DS: store handler.
    Map<Short, DataServerHandler> connectedDSs =
      replicationServerDomain.getConnectedDSs();
    connectedDSs.put(serverId, this);
    // Tell peer DSs a new DS just connected to us
    // No need to resend topo msg to this just new DS so not null
    // No need to re-send TopologyMsg to this just new DS so not null
    // argument
    replicationServerDomain.buildAndSendTopoInfoToDSs(this);
    // Tell peer RSs a new DS just connected to us
@@ -499,13 +499,13 @@
      ReplServerStartMsg outReplServerStartMsg = null;
      try
      {
        outReplServerStartMsg = sendStartToRemote((short)-1);
        outReplServerStartMsg = sendStartToRemote(protocolVersion);
        // log
        logStartHandshakeRCVandSND(inServerStartMsg, outReplServerStartMsg);
        // The session initiator decides whether to use SSL.
        // Until here session is encrypted then it depends on the negociation
        // Until here session is encrypted then it depends on the negotiation
        if (!sessionInitiatorSSLEncryption)
          session.stopEncryption();
@@ -524,7 +524,7 @@
        // aborted after handshake phase one from a DS that is searching for
        // best suitable RS.
        // don't log a poluting error when connection aborted
        // don't log a polluting error when connection aborted
        // from a DS that wanted only to perform handshake phase 1 in order
        // to determine the best suitable RS:
        // 1) -> ServerStartMsg
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -299,7 +299,8 @@
      lockDomain(true);
      // send start to remote
      ReplServerStartMsg outReplServerStartMsg = sendStartToRemote((short)-1);
      ReplServerStartMsg outReplServerStartMsg =
        sendStartToRemote(protocolVersion);
      // log
      logStartHandshakeRCVandSND(inECLStartMsg, outReplServerStartMsg);
opends/src/server/org/opends/server/replication/server/ReplicationData.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
@@ -30,6 +30,7 @@
import com.sleepycat.je.DatabaseEntry;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -67,6 +68,7 @@
  public static UpdateMsg generateChange(byte[] data)
                                             throws Exception
  {
    return (UpdateMsg) ReplicationMsg.generateMsg(data);
    return (UpdateMsg) ReplicationMsg.generateMsg(
        data, ProtocolVersion.getCurrentVersion());
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -75,6 +75,7 @@
import org.opends.server.replication.protocol.ServerStartECLMsg;
import org.opends.server.replication.protocol.ServerStartMsg;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartMsg;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
@@ -299,18 +300,21 @@
        if (msg instanceof ServerStartMsg)
        {
          session.setProtocolVersion(((StartMsg)msg).getVersion());
          DataServerHandler handler = new DataServerHandler(session,
              queueSize,serverURL,serverId,this,rcvWindow);
          handler.startFromRemoteDS((ServerStartMsg)msg);
        }
        else if (msg instanceof ReplServerStartMsg)
        {
          session.setProtocolVersion(((StartMsg)msg).getVersion());
          ReplicationServerHandler handler = new ReplicationServerHandler(
              session,queueSize,serverURL,serverId,this,rcvWindow);
          handler.startFromRemoteRS((ReplServerStartMsg)msg);
        }
        else if (msg instanceof ServerStartECLMsg)
        {
          session.setProtocolVersion(((StartMsg)msg).getVersion());
          ECLServerHandler handler = new ECLServerHandler(
              session,queueSize,serverURL,serverId,this,rcvWindow);
          handler.startFromRemoteServer((ServerStartECLMsg)msg);
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1509,8 +1509,21 @@
        // in the topology.
        if (senderHandler.isDataServer())
        {
          MonitorMsg returnMsg =
          MonitorMsg returnMsg;
          if (senderHandler.getProtocolVersion() >
                             ProtocolVersion.REPLICATION_PROTOCOL_V1)
          {
           returnMsg =
            new MonitorMsg(msg.getDestination(), msg.getsenderID());
          }
          else
          {
            returnMsg =
              new MonitorMsg(msg.getDestination(), msg.getsenderID(),
                  ProtocolVersion.REPLICATION_PROTOCOL_V1);
          }
          try
          {
            returnMsg.setReplServerDbState(getDbServerState());
@@ -1555,13 +1568,20 @@
          return;
        }
        MonitorRequestMsg replServerMonitorRequestMsg =
          (MonitorRequestMsg) msg;
        MonitorMsg monitorMsg;
        MonitorMsg monitorMsg =
          new MonitorMsg(
          replServerMonitorRequestMsg.getDestination(),
          replServerMonitorRequestMsg.getsenderID());
        if (senderHandler.getProtocolVersion() >
                                  ProtocolVersion.REPLICATION_PROTOCOL_V1)
        {
          monitorMsg =
            new MonitorMsg(msg.getDestination(), msg.getsenderID());
        }
        else
        {
          monitorMsg =
            new MonitorMsg(msg.getDestination(), msg.getsenderID(),
                ProtocolVersion.REPLICATION_PROTOCOL_V1);
        }
        // Populate for each connected LDAP Server
        // from the states stored in the serverHandler.
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -85,6 +85,7 @@
    {
      protocolVersion = ProtocolVersion.minWithCurrent(
          inReplServerStartMsg.getVersion());
      session.setProtocolVersion(protocolVersion);
      generationId = inReplServerStartMsg.getGenerationId();
      serverId = inReplServerStartMsg.getServerId();
      serverURL = inReplServerStartMsg.getServerURL();
@@ -163,13 +164,14 @@
    try
    {
      //
      lockDomain(false); // notimeout
      lockDomain(false); // no timeout
      // we are the initiator and decides of the encryption
      boolean sessionInitiatorSSLEncryption = this.initSslEncryption;
      // Send start
      ReplServerStartMsg outReplServerStartMsg = sendStartToRemote((short)-1);
      ReplServerStartMsg outReplServerStartMsg =
        sendStartToRemote(ProtocolVersion.getCurrentVersion());
      // Wait answer
      ReplicationMsg msg = session.receive();
@@ -260,20 +262,13 @@
      // lock with timeout
      lockDomain(true);
      short reqVersion = -1;
      if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
      {
        // We support connection from a V1 RS, send PDU with V1 form
        reqVersion = ProtocolVersion.REPLICATION_PROTOCOL_V1;
      }
      // send start to remote
      ReplServerStartMsg outReplServerStartMsg = sendStartToRemote(reqVersion);
      ReplServerStartMsg outReplServerStartMsg =
        sendStartToRemote(protocolVersion);
      // log
      logStartHandshakeRCVandSND(inReplServerStartMsg, outReplServerStartMsg);
      // until here session is encrypted then it depends on the negociation
      // until here session is encrypted then it depends on the negotiation
      // The session initiator decides whether to use SSL.
      if (!sessionInitiatorSSLEncryption)
        session.stopEncryption();
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -1042,10 +1042,7 @@
        replicationServerDomain.
        getReplicationServer().getDegradedStatusThreshold());
    if (requestedProtocolVersion>0)
      session.publish(outReplServerStartMsg, requestedProtocolVersion);
    else
      session.publish(outReplServerStartMsg);
    session.publish(outReplServerStartMsg, requestedProtocolVersion);
    return outReplServerStartMsg;
  }
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -765,9 +765,10 @@
       * The replication server will use the same one (or an older one
       * if it is an old replication server).
       */
      if (keepConnection)
        protocolVersion = ProtocolVersion.minWithCurrent(
      protocolVersion = ProtocolVersion.minWithCurrent(
          replServerStartMsg.getVersion());
      localSession.setProtocolVersion(protocolVersion);
      if (!isSslEncryption)
      {
@@ -926,6 +927,7 @@
      if (keepConnection)
        protocolVersion = ProtocolVersion.minWithCurrent(
          replServerStartMsg.getVersion());
      localSession.setProtocolVersion(protocolVersion);
      if (!isSslEncryption)
      {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -236,7 +236,8 @@
    byte[] v1MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Un-serialize V1 message
    AddMsg newMsg = (AddMsg)ReplicationMsg.generateMsg(v1MsgBytes);
    AddMsg newMsg = (AddMsg)ReplicationMsg.generateMsg(
        v1MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Check original version of message
    assertEquals(newMsg.getVersion(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
@@ -273,7 +274,8 @@
    newMsg.setSafeDataLevel(safeDataLevel);
    // Serialize in VLAST msg
    AddMsg vlastMsg = (AddMsg)ReplicationMsg.generateMsg(newMsg.getBytes());
    AddMsg vlastMsg = (AddMsg)ReplicationMsg.generateMsg(
        newMsg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Check original version of message
    assertEquals(vlastMsg.getVersion(), REPLICATION_PROTOCOL_VLAST);
@@ -351,7 +353,8 @@
    byte[] v1MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Un-serialize V1 message
    DeleteMsg newMsg = (DeleteMsg)ReplicationMsg.generateMsg(v1MsgBytes);
    DeleteMsg newMsg = (DeleteMsg)ReplicationMsg.generateMsg(
        v1MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Check original version of message
    assertEquals(newMsg.getVersion(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
@@ -371,7 +374,8 @@
    newMsg.setSafeDataLevel(safeDataLevel);
    // Serialize in VLAST msg
    DeleteMsg vlastMsg = (DeleteMsg)ReplicationMsg.generateMsg(newMsg.getBytes());
    DeleteMsg vlastMsg = (DeleteMsg)ReplicationMsg.generateMsg(
        newMsg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Check original version of message
    assertEquals(vlastMsg.getVersion(), REPLICATION_PROTOCOL_VLAST);
@@ -484,7 +488,8 @@
    byte[] v1MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Un-serialize V1 message
    ModifyMsg newMsg = (ModifyMsg)ReplicationMsg.generateMsg(v1MsgBytes);
    ModifyMsg newMsg = (ModifyMsg)ReplicationMsg.generateMsg(
        v1MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Check original version of message
    assertEquals(newMsg.getVersion(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
@@ -518,7 +523,8 @@
    newMsg.setSafeDataLevel(safeDataLevel);
    // Serialize in VLAST msg
    ModifyMsg vlastMsg = (ModifyMsg)ReplicationMsg.generateMsg(newMsg.getBytes());
    ModifyMsg vlastMsg = (ModifyMsg)ReplicationMsg.generateMsg(
        newMsg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Check original version of message
    assertEquals(vlastMsg.getVersion(), REPLICATION_PROTOCOL_VLAST);
@@ -548,7 +554,7 @@
  @DataProvider(name = "createModifyDnData")
  public Object[][] createModifyDnData() {
    AttributeType type = DirectoryServer.getAttributeType("description");
    Attribute attr1 = Attributes.create("description", "new value");
@@ -579,7 +585,7 @@
      Modification mod = new Modification(ModificationType.ADD, attr);
      mods4.add(mod);
    }
    return new Object[][] {
        {"dc=test,dc=com", "dc=new", "11111111-1111-1111-1111-111111111111", "22222222-2222-2222-2222-222222222222", false, "dc=change", mods1, false, AssuredMode.SAFE_DATA_MODE, (byte)0},
        {"dc=test,dc=com", "dc=new", "33333333-3333-3333-3333-333333333333", "44444444-4444-4444-4444-444444444444", true, "dc=change", mods2, true, AssuredMode.SAFE_READ_MODE, (byte)1},
@@ -632,7 +638,8 @@
    byte[] v1MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Un-serialize V1 message
    ModifyDNMsg newMsg = (ModifyDNMsg)ReplicationMsg.generateMsg(v1MsgBytes);
    ModifyDNMsg newMsg = (ModifyDNMsg)ReplicationMsg.generateMsg(
        v1MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Check original version of message
    assertEquals(newMsg.getVersion(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
@@ -672,7 +679,8 @@
    newMsg.setMods(mods);
    // Serialize in VLAST msg
    ModifyDNMsg vlastMsg = (ModifyDNMsg)ReplicationMsg.generateMsg(newMsg.getBytes());
    ModifyDNMsg vlastMsg = (ModifyDNMsg)ReplicationMsg.generateMsg(
        newMsg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
    // Check original version of message
    assertEquals(vlastMsg.getVersion(), REPLICATION_PROTOCOL_VLAST);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -177,8 +177,8 @@
    msg.setAssuredMode(assuredMode);
    msg.setSafeDataLevel(safeDataLevel);
    ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg
        .generateMsg(msg.getBytes());
    ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg.generateMsg(
        msg.getBytes(), ProtocolVersion.getCurrentVersion());
    // Test that generated attributes match original attributes.
    assertEquals(generatedMsg.isAssured(), isAssured);
@@ -234,8 +234,8 @@
    assertTrue(msg.getSafeDataLevel() == safeDataLevel);
    // Check equals
    ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg
        .generateMsg(msg.getBytes());
    ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg.generateMsg(
        msg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
    assertFalse(msg.equals(null));
    assertFalse(msg.equals(new Object()));
@@ -298,8 +298,8 @@
      (short) 123, (short) 45);
    op.setAttachment(SYNCHROCONTEXT, new DeleteContext(cn, "uniqueid"));
    DeleteMsg msg = new DeleteMsg(op);
    DeleteMsg generatedMsg = (DeleteMsg) ReplicationMsg
        .generateMsg(msg.getBytes());
    DeleteMsg generatedMsg = (DeleteMsg) ReplicationMsg.generateMsg(
        msg.getBytes(), ProtocolVersion.getCurrentVersion());
    assertEquals(msg.toString(), generatedMsg.toString());
@@ -393,7 +393,7 @@
    msg.setSafeDataLevel(safeDataLevel);
    ModifyDNMsg generatedMsg = (ModifyDNMsg) ReplicationMsg
        .generateMsg(msg.getBytes());
        .generateMsg(msg.getBytes(), ProtocolVersion.getCurrentVersion());
    // Test that generated attributes match original attributes.
    assertEquals(generatedMsg.isAssured(), isAssured);
@@ -466,7 +466,7 @@
    msg.setSafeDataLevel(safeDataLevel);
    AddMsg generatedMsg = (AddMsg) ReplicationMsg.generateMsg(msg
        .getBytes());
        .getBytes(), ProtocolVersion.getCurrentVersion());
    assertEquals(msg.getBytes(), generatedMsg.getBytes());
    assertEquals(msg.toString(), generatedMsg.toString());
@@ -611,7 +611,8 @@
    }
    // Check that retrieved CN is OK
    msg2 = (AckMsg) ReplicationMsg.generateMsg(msg1.getBytes());
    msg2 = (AckMsg) ReplicationMsg.generateMsg(
        msg1.getBytes(), ProtocolVersion.getCurrentVersion());
  }
  @Test()
@@ -1006,7 +1007,7 @@
    msg.setServerState(sid3, s3, now+3, false);
    byte[] b = msg.getBytes();
    MonitorMsg newMsg = new MonitorMsg(b);
    MonitorMsg newMsg = new MonitorMsg(b, ProtocolVersion.getCurrentVersion());
    assertEquals(rsState, msg.getReplServerDbState());
    assertEquals(newMsg.getReplServerDbState().toString(),