From 99480fcbcb68be6a357f6218668feab697e1a93d Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Tue, 07 Jul 2009 14:55:26 +0000
Subject: [PATCH] Fix for 4096 MonitorMsg is not compatible with replication version

---
 opends/src/server/org/opends/server/replication/protocol/SocketSession.java                                     |   11 +
 opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java                            |   19 -
 opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java                                    |    3 
 opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java                                 |    3 
 opends/src/server/org/opends/server/replication/protocol/StartMsg.java                                          |   16 +
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java |   28 ++-
 opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java                                   |    8 +
 opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java                                     |   21 --
 opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java                                    |   11 +
 opends/src/server/org/opends/server/replication/server/ReplicationData.java                                     |    6 
 opends/src/server/org/opends/server/replication/server/DataServerHandler.java                                   |   10 
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java                                   |    4 
 opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java                                         |   15 -
 opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java                                        |  116 ++++++++++++--
 opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java                                      |    4 
 opends/src/server/org/opends/server/replication/server/ECLServerHandler.java                                    |    3 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java    |   21 +-
 opends/src/server/org/opends/server/replication/server/ServerHandler.java                                       |    5 
 opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java                                  |   11 +
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java                                  |    6 
 opends/src/server/org/opends/server/backends/SchemaBackend.java                                                 |    9 
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                             |   34 +++
 opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java                                |  113 ++++++--------
 23 files changed, 297 insertions(+), 180 deletions(-)

diff --git a/opends/src/server/org/opends/server/backends/SchemaBackend.java b/opends/src/server/org/opends/server/backends/SchemaBackend.java
index 429c36d..056da63 100644
--- a/opends/src/server/org/opends/server/backends/SchemaBackend.java
+++ b/opends/src/server/org/opends/server/backends/SchemaBackend.java
@@ -122,6 +122,7 @@
 
 
   private static final String CONFIG_SCHEMA_ELEMENTS_FILE = "02-config.ldif";
+  private static final String CORE_SCHEMA_ELEMENTS_FILE = "00-core.ldif";
 
 
 
@@ -4325,11 +4326,13 @@
     {
       String schemaFile = removeType.getSchemaFile();
       if ((schemaFile != null) &&
-           (schemaFile.equals(CONFIG_SCHEMA_ELEMENTS_FILE)))
+           ((schemaFile.equals(CONFIG_SCHEMA_ELEMENTS_FILE)) ||
+            (schemaFile.equals(CORE_SCHEMA_ELEMENTS_FILE))) )
       {
-        // Don't import the file containing the definitiong of the
+        // Don't import the file containing the definitions of the
         // Schema elements used for configuration because these
         // definitions may vary between versions of OpenDS.
+        // Also never delete anything from the core schema file.
         continue;
       }
       if (!oidList.contains(removeType.getOID()))
@@ -4447,7 +4450,7 @@
       if ((schemaFile != null) &&
           (schemaFile.equals(CONFIG_SCHEMA_ELEMENTS_FILE)))
       {
-        // Don't import the file containing the definitiong of the
+        // Don't import the file containing the definition of the
         // Schema elements used for configuration because these
         // definitions may vary between versions of OpenDS.
         continue;
diff --git a/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java b/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
index 0e8e8b5..590d83d 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
@@ -98,7 +98,9 @@
       length = in.length - pos - 1;
       byte[] encodedMsg = new byte[length];
       System.arraycopy(in, pos, encodedMsg, 0, length);
-      ReplicationMsg rmsg = ReplicationMsg.generateMsg(encodedMsg);
+      ReplicationMsg rmsg =
+        ReplicationMsg.generateMsg(
+            encodedMsg, ProtocolVersion.getCurrentVersion());
       this.updateMsg = (LDAPUpdateMsg)rmsg;
     }
     catch (UnsupportedEncodingException e)
diff --git a/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java b/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
index 66624aa..3927cfd 100644
--- a/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
@@ -276,25 +276,10 @@
   public byte[] getBytes(short reqProtocolVersion)
     throws UnsupportedEncodingException
   {
-
-    // Using current protocol version should normally not be done as we would
-    // normally call the getBytes() method instead for that. So this check
-    // for security
-    if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
-    {
+    if (reqProtocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
+      return getBytes_V1();
+    else
       return getBytes();
-    }
-
-    switch (reqProtocolVersion)
-    {
-      case ProtocolVersion.REPLICATION_PROTOCOL_V1:
-        return getBytes_V1();
-      default:
-        // Unsupported requested version
-        throw new UnsupportedEncodingException(getClass().getSimpleName() +
-          " PDU does not support requested protocol version serialization: " +
-          reqProtocolVersion);
-    }
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
index 94161aa..a70b5b2 100644
--- a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -26,6 +26,7 @@
  */
 package org.opends.server.replication.protocol;
 
+import java.io.UnsupportedEncodingException;
 import java.util.zip.DataFormatException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -81,7 +82,12 @@
   SubTopoMonitorData data = new SubTopoMonitorData();
 
   /**
-   * Creates a new EntryMessage.
+   * The protocolVersion that should be used when serializing this message.
+   */
+  private final short protocolVersion;
+
+  /**
+   * Creates a new MonitorMsg.
    *
    * @param sender The sender of this message.
    * @param destination The destination of this message.
@@ -89,8 +95,25 @@
   public MonitorMsg(short sender, short destination)
   {
     super(sender, destination);
+    protocolVersion = ProtocolVersion.getCurrentVersion();
   }
 
+
+  /**
+   * Creates a new MonitorMsg with a specific protocol version.
+   *
+   * @param sender                The sender of this message.
+   * @param destination           The destination of this message.
+   * @param replicationProtocol   The protocol version to use.
+   */
+  public MonitorMsg(short sender, short destination,
+      short replicationProtocol)
+  {
+    super(sender, destination);
+    protocolVersion = replicationProtocol;
+  }
+
+
   /**
    * Sets the state of the replication server.
    * @param state The state.
@@ -174,24 +197,58 @@
   /**
    * Creates a new EntryMessage from its encoded form.
    *
-   * @param in The byte array containing the encoded form of the message.
+   * @param in       The byte array containing the encoded form of the message.
+   * @param version  The version of the protocol to use to decode the msg.
    * @throws DataFormatException If the byte array does not contain a valid
    *                             encoded form of the ServerStartMessage.
    */
-  public MonitorMsg(byte[] in) throws DataFormatException
+  public MonitorMsg(byte[] in, short version) throws DataFormatException
   {
+    protocolVersion = ProtocolVersion.getCurrentVersion();
     ByteSequenceReader reader = ByteString.wrap(in).asReader();
 
-    /* first byte is the type */
-    if (reader.get() != MSG_TYPE_REPL_SERVER_MONITOR)
-      throw new DataFormatException("input is not a valid " +
-          this.getClass().getCanonicalName());
+    if (version == ProtocolVersion.REPLICATION_PROTOCOL_V1)
+    {
+      try
+      {
+        /* first byte is the type */
+        if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR)
+          throw new DataFormatException("input is not a valid " +
+              this.getClass().getCanonicalName());
+        int pos = 1;
 
-    // sender
-    this.senderID = reader.getShort();
+        // sender
+        int length = getNextLength(in, pos);
+        String senderIDString = new String(in, pos, length, "UTF-8");
+        this.senderID = Short.valueOf(senderIDString);
+        pos += length +1;
 
-    // destination
-    this.destination = reader.getShort();
+        // destination
+        length = getNextLength(in, pos);
+        String destinationString = new String(in, pos, length, "UTF-8");
+        this.destination = Short.valueOf(destinationString);
+        pos += length +1;
+
+        reader.position(pos);
+      }
+      catch (UnsupportedEncodingException e)
+      {
+        throw new DataFormatException("UTF-8 is not supported by this jvm.");
+      }
+    }
+    else
+    {
+      if (reader.get() != MSG_TYPE_REPL_SERVER_MONITOR)
+        throw new DataFormatException("input is not a valid " +
+            this.getClass().getCanonicalName());
+
+      // sender
+      this.senderID = reader.getShort();
+
+      // destination
+      this.destination = reader.getShort();
+    }
+
 
     ASN1Reader asn1Reader = ASN1.getReader(reader);
     try
@@ -262,11 +319,14 @@
       ByteStringBuilder byteBuilder = new ByteStringBuilder();
       ASN1Writer writer = ASN1.getWriter(byteBuilder);
 
-      /* put the type of the operation */
-      byteBuilder.append(MSG_TYPE_REPL_SERVER_MONITOR);
+      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
+      {
+        /* put the type of the operation */
+        byteBuilder.append(MSG_TYPE_REPL_SERVER_MONITOR);
 
-      byteBuilder.append(senderID);
-      byteBuilder.append(destination);
+        byteBuilder.append(senderID);
+        byteBuilder.append(destination);
+      }
 
       /* Put the serverStates ... */
       writer.writeStartSequence();
@@ -331,7 +391,31 @@
 
       writer.writeEndSequence();
 
-      return byteBuilder.toByteArray();
+      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
+      {
+        return byteBuilder.toByteArray();
+      }
+      else
+      {
+        byte[] temp = byteBuilder.toByteArray();
+
+        byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
+        byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
+
+        int length = 1 +  1 + senderBytes.length +
+        1 + destinationBytes.length + temp.length +1;
+        byte[] resultByteArray = new byte[length];
+
+        /* put the type of the operation */
+        resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR;
+        int pos = 1;
+
+        pos = addByteArray(senderBytes, resultByteArray, pos);
+        pos = addByteArray(destinationBytes, resultByteArray, pos);
+        pos = addByteArray(temp, resultByteArray, pos);
+
+        return resultByteArray;
+      }
     }
     catch (Exception e)
     {
diff --git a/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java b/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
index 6aace54..4825445 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
@@ -156,4 +156,12 @@
    * on this ProtocolSession.
    */
   public abstract boolean closeInitiated();
+
+  /**
+   * This method is called at the establishment of the session and can
+   * be used to record the version of the protocol that is currently used.
+   *
+   * @param version The version of the protocol that is currently used.
+   */
+  public abstract void setProtocolVersion(short version);
 }
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java b/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
index 0201146..7127bd5 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
@@ -223,59 +223,70 @@
    */
   @Override
   public byte[] getBytes()
+  throws UnsupportedEncodingException
   {
+    return getBytes(ProtocolVersion.getCurrentVersion());
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[] getBytes(short protocolVersion)
+     throws UnsupportedEncodingException
+  {
+    if  (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
+      return getBytes_V1();
+
     /* The ReplServerStartMsg is stored in the form :
      * <operation type><baseDn><serverId><serverURL><windowSize><sslEncryption>
      * <degradedStatusThreshold><serverState>
      */
-    try {
-      byte[] byteDn = baseDn.getBytes("UTF-8");
-      byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
-      byte[] byteServerUrl = serverURL.getBytes("UTF-8");
-      byte[] byteServerState = serverState.getBytes();
-      byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
-      byte[] byteSSLEncryption =
-                     String.valueOf(sslEncryption).getBytes("UTF-8");
-      byte[] byteDegradedStatusThreshold =
-        String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
 
-      int length = byteDn.length + 1 + byteServerId.length + 1 +
-                   byteServerUrl.length + 1 + byteWindowSize.length + 1 +
-                   byteSSLEncryption.length + 1 +
-                   byteDegradedStatusThreshold.length + 1 +
-                   byteServerState.length + 1;
+    byte[] byteDn = baseDn.getBytes("UTF-8");
+    byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
+    byte[] byteServerUrl = serverURL.getBytes("UTF-8");
+    byte[] byteServerState = serverState.getBytes();
+    byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
+    byte[] byteSSLEncryption =
+      String.valueOf(sslEncryption).getBytes("UTF-8");
+    byte[] byteDegradedStatusThreshold =
+      String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
 
-      /* encode the header in a byte[] large enough to also contain the mods */
-      byte resultByteArray[] = encodeHeader(MSG_TYPE_REPL_SERVER_START, length);
-      int pos = headerLength;
+    int length = byteDn.length + 1 + byteServerId.length + 1 +
+    byteServerUrl.length + 1 + byteWindowSize.length + 1 +
+    byteSSLEncryption.length + 1 +
+    byteDegradedStatusThreshold.length + 1 +
+    byteServerState.length + 1;
 
-      /* put the baseDN and a terminating 0 */
-      pos = addByteArray(byteDn, resultByteArray, pos);
+    /* encode the header in a byte[] large enough to also contain the mods */
+    byte resultByteArray[] =
+      encodeHeader(MSG_TYPE_REPL_SERVER_START, length, protocolVersion);
 
-      /* put the ServerId */
-      pos = addByteArray(byteServerId, resultByteArray, pos);
+    int pos = headerLength;
 
-      /* put the ServerURL */
-      pos = addByteArray(byteServerUrl, resultByteArray, pos);
+    /* put the baseDN and a terminating 0 */
+    pos = addByteArray(byteDn, resultByteArray, pos);
 
-      /* put the window size */
-      pos = addByteArray(byteWindowSize, resultByteArray, pos);
+    /* put the ServerId */
+    pos = addByteArray(byteServerId, resultByteArray, pos);
 
-      /* put the SSL Encryption setting */
-      pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
+    /* put the ServerURL */
+    pos = addByteArray(byteServerUrl, resultByteArray, pos);
 
-      /* put the degraded status threshold */
-      pos = addByteArray(byteDegradedStatusThreshold, resultByteArray, pos);
+    /* put the window size */
+    pos = addByteArray(byteWindowSize, resultByteArray, pos);
 
-      /* put the ServerState */
-      pos = addByteArray(byteServerState, resultByteArray, pos);
+    /* put the SSL Encryption setting */
+    pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
 
-      return resultByteArray;
-    }
-    catch (UnsupportedEncodingException e)
-    {
-      return null;
-    }
+    /* put the degraded status threshold */
+    pos = addByteArray(byteDegradedStatusThreshold, resultByteArray, pos);
+
+    /* put the ServerState */
+    pos = addByteArray(byteServerState, resultByteArray, pos);
+
+    return resultByteArray;
   }
 
   /**
@@ -338,32 +349,6 @@
   }
 
   /**
-   * {@inheritDoc}
-   */
-  @Override
-  public byte[] getBytes(short reqProtocolVersion)
-    throws UnsupportedEncodingException
-  {
-    // Of course, always support current protocol version
-    if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
-    {
-      return getBytes();
-    }
-
-    // Supported older protocol versions
-    switch (reqProtocolVersion)
-    {
-      case ProtocolVersion.REPLICATION_PROTOCOL_V1:
-        return getBytes_V1();
-      default:
-        // Unsupported requested version
-        throw new UnsupportedEncodingException(getClass().getSimpleName() +
-          " PDU does not support requested protocol version serialization: " +
-          reqProtocolVersion);
-    }
-  }
-
-  /**
    * Get the byte array representation of this Message. This uses the version
    * 1 of the replication protocol (used for compatibility purpose).
    *
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java b/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
index 3f70856..5ef9135 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -125,14 +125,19 @@
    * is done taking into account the various supported replication protocol
    * versions.
    *
-   * @param buffer The encode form of the ReplicationMsg.
+   * @param buffer    The encode form of the ReplicationMsg.
+   * @param version   The version to use to decode the msg.
+   *
    * @return The generated SycnhronizationMessage.
+   *
    * @throws DataFormatException If the encoded form was not a valid msg.
    * @throws UnsupportedEncodingException If UTF8 is not supported.
    * @throws NotSupportedOldVersionPDUException If the PDU is part of an old
    * protocol version and we do not support it.
    */
-  public static ReplicationMsg generateMsg(byte[] buffer)
+  public static ReplicationMsg generateMsg(
+                byte[] buffer,
+                short version)
                 throws DataFormatException, UnsupportedEncodingException,
                 NotSupportedOldVersionPDUException
   {
@@ -207,7 +212,7 @@
         msg = new MonitorRequestMsg(buffer);
       break;
       case MSG_TYPE_REPL_SERVER_MONITOR:
-        msg = new MonitorMsg(buffer);
+        msg = new MonitorMsg(buffer, version);
       break;
       case MSG_TYPE_START_SESSION:
         msg = new StartSessionMsg(buffer);
diff --git a/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java b/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
index 366ff2d..3081c12 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
@@ -291,7 +291,8 @@
                    byteServerState.length + 1;
 
       /* encode the header in a byte[] large enough to also contain the mods */
-      byte resultByteArray[] = encodeHeader(MSG_TYPE_START_ECL, length);
+      byte resultByteArray[] = encodeHeader(
+          MSG_TYPE_START_ECL, length, ProtocolVersion.getCurrentVersion());
       int pos = headerLength;
 
       pos = addByteArray(byteServerUrl, resultByteArray, pos);
diff --git a/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java b/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
index 16a325b..e4991cb 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
@@ -327,7 +327,8 @@
                    byteServerState.length + 1;
 
       /* encode the header in a byte[] large enough to also contain the mods */
-      byte resultByteArray[] = encodeHeader(MSG_TYPE_SERVER_START, length);
+      byte resultByteArray[] = encodeHeader(
+          MSG_TYPE_SERVER_START, length, ProtocolVersion.getCurrentVersion());
       int pos = headerLength;
 
       pos = addByteArray(byteDn, resultByteArray, pos);
diff --git a/opends/src/server/org/opends/server/replication/protocol/SocketSession.java b/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
index e20bffa..0631618 100644
--- a/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -69,6 +69,7 @@
 
   private boolean closeInitiated = false;
 
+  private short protocolVersion = ProtocolVersion.getCurrentVersion();
 
   /**
    * Creates a new SocketSession based on the provided socket.
@@ -175,7 +176,7 @@
       /* We do not want the heartbeat to close the session when */
       /* we are processing a message even a time consuming one. */
       lastReceiveTime=0;
-      return ReplicationMsg.generateMsg(buffer);
+      return ReplicationMsg.generateMsg(buffer, protocolVersion);
     }
     catch (OutOfMemoryError e)
     {
@@ -243,4 +244,12 @@
   {
     return closeInitiated;
   }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void setProtocolVersion(short version)
+  {
+    protocolVersion = version;
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/protocol/StartMsg.java b/opends/src/server/org/opends/server/replication/protocol/StartMsg.java
index aa352cb..2ec19b7 100644
--- a/opends/src/server/org/opends/server/replication/protocol/StartMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/StartMsg.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.protocol;
 
@@ -75,17 +75,21 @@
    * Encode the header for the start message.
    *
    * @param type The type of the message to create.
-   * @param additionalLength additional length needed to encode the remaining
+   * @param additionalLength Additional length needed to encode the remaining
    *                         part of the UpdateMessage.
+   * @param protocolVersion  The version to use when encoding the header.
    * @return a byte array containing the common header and enough space to
    *         encode the remaining bytes of the UpdateMessage as was specified
    *         by the additionalLength.
    *         (byte array length = common header length + additionalLength)
    * @throws UnsupportedEncodingException if UTF-8 is not supported.
    */
-  public byte[] encodeHeader(byte type, int additionalLength)
+  public byte[] encodeHeader(
+      byte type, int additionalLength,
+      short protocolVersion)
   throws UnsupportedEncodingException
   {
+
     byte[] byteGenerationID =
       String.valueOf(generationId).getBytes("UTF-8");
 
@@ -101,7 +105,7 @@
     encodedMsg[0] = type;
 
     /* put the protocol version */
-    encodedMsg[1] = (byte)ProtocolVersion.getCurrentVersion();
+    encodedMsg[1] = (byte)protocolVersion;
 
     /* put the generationId */
     int pos = 2;
@@ -192,11 +196,11 @@
     {
       /* then read the version */
       short readVersion = (short)encodedMsg[1];
-      if (readVersion != ProtocolVersion.getCurrentVersion())
+      if (readVersion < ProtocolVersion.REPLICATION_PROTOCOL_V2)
         throw new DataFormatException("Not a valid message: type is " +
           encodedMsg[0] + " but protocol version byte is " + readVersion +
           " instead of " + ProtocolVersion.getCurrentVersion());
-      protocolVersion = ProtocolVersion.getCurrentVersion();
+      protocolVersion = readVersion;
 
       /* read the generationId */
       int pos = 2;
diff --git a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
index 4eda8e5..4b2fc59 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -71,6 +71,7 @@
 
   private boolean closeInitiated = false;
 
+  private short protocolVersion = ProtocolVersion.getCurrentVersion();
 
   /**
    * Creates a new TLSSocketSession.
@@ -185,7 +186,7 @@
       /* We do not want the heartbeat to close the session when */
       /* we are processing a message even a time consuming one. */
       lastReceiveTime=0;
-      return ReplicationMsg.generateMsg(buffer);
+      return ReplicationMsg.generateMsg(buffer, protocolVersion);
     }
     catch (OutOfMemoryError e)
     {
@@ -254,4 +255,12 @@
   {
     return closeInitiated;
   }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void setProtocolVersion(short version)
+  {
+    protocolVersion = version;
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java b/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
index ca50458..5152c9a 100644
--- a/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.protocol;
 
@@ -194,17 +194,8 @@
   public byte[] getBytes(short reqProtocolVersion)
     throws UnsupportedEncodingException
   {
-    // Of course, always support current protocol version
-    if (reqProtocolVersion == ProtocolVersion.getCurrentVersion())
-    {
-      return getBytes();
-    }
-    else
-    {
-      throw new UnsupportedEncodingException(getClass().getSimpleName() +
-          " PDU does not support requested protocol version serialization: " +
-          reqProtocolVersion);
-    }
+    // There was no change since version 2.
+    return getBytes();
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index 8494127..8a2fe99 100644
--- a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -435,13 +435,13 @@
    */
   public void registerIntoDomain()
   {
-    // Alright, connected with new DS: store handler.
+    // All-right, connected with new DS: store handler.
     Map<Short, DataServerHandler> connectedDSs =
       replicationServerDomain.getConnectedDSs();
     connectedDSs.put(serverId, this);
 
     // Tell peer DSs a new DS just connected to us
-    // No need to resend topo msg to this just new DS so not null
+    // No need to re-send TopologyMsg to this just new DS so not null
     // argument
     replicationServerDomain.buildAndSendTopoInfoToDSs(this);
     // Tell peer RSs a new DS just connected to us
@@ -499,13 +499,13 @@
       ReplServerStartMsg outReplServerStartMsg = null;
       try
       {
-        outReplServerStartMsg = sendStartToRemote((short)-1);
+        outReplServerStartMsg = sendStartToRemote(protocolVersion);
 
         // log
         logStartHandshakeRCVandSND(inServerStartMsg, outReplServerStartMsg);
 
         // The session initiator decides whether to use SSL.
-        // Until here session is encrypted then it depends on the negociation
+        // Until here session is encrypted then it depends on the negotiation
         if (!sessionInitiatorSSLEncryption)
           session.stopEncryption();
 
@@ -524,7 +524,7 @@
         // aborted after handshake phase one from a DS that is searching for
         // best suitable RS.
 
-        // don't log a poluting error when connection aborted
+        // don't log a polluting error when connection aborted
         // from a DS that wanted only to perform handshake phase 1 in order
         // to determine the best suitable RS:
         // 1) -> ServerStartMsg
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 1bc0d14..d0565d6 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -299,7 +299,8 @@
       lockDomain(true);
 
       // send start to remote
-      ReplServerStartMsg outReplServerStartMsg = sendStartToRemote((short)-1);
+      ReplServerStartMsg outReplServerStartMsg =
+        sendStartToRemote(protocolVersion);
 
       // log
       logStartHandshakeRCVandSND(inECLStartMsg, outReplServerStartMsg);
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationData.java b/opends/src/server/org/opends/server/replication/server/ReplicationData.java
index d3bf417..4ac650b 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationData.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationData.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 
@@ -30,6 +30,7 @@
 
 import com.sleepycat.je.DatabaseEntry;
 
+import org.opends.server.replication.protocol.ProtocolVersion;
 import org.opends.server.replication.protocol.ReplicationMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
 
@@ -67,6 +68,7 @@
   public static UpdateMsg generateChange(byte[] data)
                                              throws Exception
   {
-    return (UpdateMsg) ReplicationMsg.generateMsg(data);
+    return (UpdateMsg) ReplicationMsg.generateMsg(
+        data, ProtocolVersion.getCurrentVersion());
   }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index e72f3f7..77d068f 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -75,6 +75,7 @@
 import org.opends.server.replication.protocol.ServerStartECLMsg;
 import org.opends.server.replication.protocol.ServerStartMsg;
 import org.opends.server.replication.protocol.StartECLSessionMsg;
+import org.opends.server.replication.protocol.StartMsg;
 import org.opends.server.types.BackupConfig;
 import org.opends.server.types.ConfigChangeResult;
 import org.opends.server.types.DN;
@@ -299,18 +300,21 @@
 
         if (msg instanceof ServerStartMsg)
         {
+          session.setProtocolVersion(((StartMsg)msg).getVersion());
           DataServerHandler handler = new DataServerHandler(session,
               queueSize,serverURL,serverId,this,rcvWindow);
           handler.startFromRemoteDS((ServerStartMsg)msg);
         }
         else if (msg instanceof ReplServerStartMsg)
         {
+          session.setProtocolVersion(((StartMsg)msg).getVersion());
           ReplicationServerHandler handler = new ReplicationServerHandler(
               session,queueSize,serverURL,serverId,this,rcvWindow);
           handler.startFromRemoteRS((ReplServerStartMsg)msg);
         }
         else if (msg instanceof ServerStartECLMsg)
         {
+          session.setProtocolVersion(((StartMsg)msg).getVersion());
           ECLServerHandler handler = new ECLServerHandler(
               session,queueSize,serverURL,serverId,this,rcvWindow);
           handler.startFromRemoteServer((ServerStartECLMsg)msg);
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index baa4737..3bae821 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1509,8 +1509,21 @@
         // in the topology.
         if (senderHandler.isDataServer())
         {
-          MonitorMsg returnMsg =
+          MonitorMsg returnMsg;
+
+          if (senderHandler.getProtocolVersion() >
+                             ProtocolVersion.REPLICATION_PROTOCOL_V1)
+          {
+           returnMsg =
             new MonitorMsg(msg.getDestination(), msg.getsenderID());
+          }
+          else
+          {
+            returnMsg =
+              new MonitorMsg(msg.getDestination(), msg.getsenderID(),
+                  ProtocolVersion.REPLICATION_PROTOCOL_V1);
+          }
+
           try
           {
             returnMsg.setReplServerDbState(getDbServerState());
@@ -1555,13 +1568,20 @@
           return;
         }
 
-        MonitorRequestMsg replServerMonitorRequestMsg =
-          (MonitorRequestMsg) msg;
+        MonitorMsg monitorMsg;
 
-        MonitorMsg monitorMsg =
-          new MonitorMsg(
-          replServerMonitorRequestMsg.getDestination(),
-          replServerMonitorRequestMsg.getsenderID());
+        if (senderHandler.getProtocolVersion() >
+                                  ProtocolVersion.REPLICATION_PROTOCOL_V1)
+        {
+          monitorMsg =
+            new MonitorMsg(msg.getDestination(), msg.getsenderID());
+        }
+        else
+        {
+          monitorMsg =
+            new MonitorMsg(msg.getDestination(), msg.getsenderID(),
+                ProtocolVersion.REPLICATION_PROTOCOL_V1);
+        }
 
         // Populate for each connected LDAP Server
         // from the states stored in the serverHandler.
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index 8d5d9f6..019cbf5 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -85,6 +85,7 @@
     {
       protocolVersion = ProtocolVersion.minWithCurrent(
           inReplServerStartMsg.getVersion());
+      session.setProtocolVersion(protocolVersion);
       generationId = inReplServerStartMsg.getGenerationId();
       serverId = inReplServerStartMsg.getServerId();
       serverURL = inReplServerStartMsg.getServerURL();
@@ -163,13 +164,14 @@
     try
     {
       //
-      lockDomain(false); // notimeout
+      lockDomain(false); // no timeout
 
       // we are the initiator and decides of the encryption
       boolean sessionInitiatorSSLEncryption = this.initSslEncryption;
 
       // Send start
-      ReplServerStartMsg outReplServerStartMsg = sendStartToRemote((short)-1);
+      ReplServerStartMsg outReplServerStartMsg =
+        sendStartToRemote(ProtocolVersion.getCurrentVersion());
 
       // Wait answer
       ReplicationMsg msg = session.receive();
@@ -260,20 +262,13 @@
       // lock with timeout
       lockDomain(true);
 
-      short reqVersion = -1;
-      if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
-      {
-        // We support connection from a V1 RS, send PDU with V1 form
-        reqVersion = ProtocolVersion.REPLICATION_PROTOCOL_V1;
-      }
-
-      // send start to remote
-      ReplServerStartMsg outReplServerStartMsg = sendStartToRemote(reqVersion);
+      ReplServerStartMsg outReplServerStartMsg =
+        sendStartToRemote(protocolVersion);
 
       // log
       logStartHandshakeRCVandSND(inReplServerStartMsg, outReplServerStartMsg);
 
-      // until here session is encrypted then it depends on the negociation
+      // until here session is encrypted then it depends on the negotiation
       // The session initiator decides whether to use SSL.
       if (!sessionInitiatorSSLEncryption)
         session.stopEncryption();
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 24dd0a4..384dcae 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -1042,10 +1042,7 @@
         replicationServerDomain.
         getReplicationServer().getDegradedStatusThreshold());
 
-    if (requestedProtocolVersion>0)
-      session.publish(outReplServerStartMsg, requestedProtocolVersion);
-    else
-      session.publish(outReplServerStartMsg);
+    session.publish(outReplServerStartMsg, requestedProtocolVersion);
 
     return outReplServerStartMsg;
   }
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 9bab16c..e904311 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -765,9 +765,10 @@
        * The replication server will use the same one (or an older one
        * if it is an old replication server).
        */
-      if (keepConnection)
-        protocolVersion = ProtocolVersion.minWithCurrent(
+      protocolVersion = ProtocolVersion.minWithCurrent(
           replServerStartMsg.getVersion());
+      localSession.setProtocolVersion(protocolVersion);
+
 
       if (!isSslEncryption)
       {
@@ -926,6 +927,7 @@
       if (keepConnection)
         protocolVersion = ProtocolVersion.minWithCurrent(
           replServerStartMsg.getVersion());
+      localSession.setProtocolVersion(protocolVersion);
 
       if (!isSslEncryption)
       {
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
index 1f2aad5..1bae3dd 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -236,7 +236,8 @@
     byte[] v1MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
 
     // Un-serialize V1 message
-    AddMsg newMsg = (AddMsg)ReplicationMsg.generateMsg(v1MsgBytes);
+    AddMsg newMsg = (AddMsg)ReplicationMsg.generateMsg(
+        v1MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V1);
 
     // Check original version of message
     assertEquals(newMsg.getVersion(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
@@ -273,7 +274,8 @@
     newMsg.setSafeDataLevel(safeDataLevel);
 
     // Serialize in VLAST msg
-    AddMsg vlastMsg = (AddMsg)ReplicationMsg.generateMsg(newMsg.getBytes());
+    AddMsg vlastMsg = (AddMsg)ReplicationMsg.generateMsg(
+        newMsg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
 
     // Check original version of message
     assertEquals(vlastMsg.getVersion(), REPLICATION_PROTOCOL_VLAST);
@@ -351,7 +353,8 @@
     byte[] v1MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
 
     // Un-serialize V1 message
-    DeleteMsg newMsg = (DeleteMsg)ReplicationMsg.generateMsg(v1MsgBytes);
+    DeleteMsg newMsg = (DeleteMsg)ReplicationMsg.generateMsg(
+        v1MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V1);
 
     // Check original version of message
     assertEquals(newMsg.getVersion(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
@@ -371,7 +374,8 @@
     newMsg.setSafeDataLevel(safeDataLevel);
 
     // Serialize in VLAST msg
-    DeleteMsg vlastMsg = (DeleteMsg)ReplicationMsg.generateMsg(newMsg.getBytes());
+    DeleteMsg vlastMsg = (DeleteMsg)ReplicationMsg.generateMsg(
+        newMsg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
 
     // Check original version of message
     assertEquals(vlastMsg.getVersion(), REPLICATION_PROTOCOL_VLAST);
@@ -484,7 +488,8 @@
     byte[] v1MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
 
     // Un-serialize V1 message
-    ModifyMsg newMsg = (ModifyMsg)ReplicationMsg.generateMsg(v1MsgBytes);
+    ModifyMsg newMsg = (ModifyMsg)ReplicationMsg.generateMsg(
+        v1MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V1);
 
     // Check original version of message
     assertEquals(newMsg.getVersion(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
@@ -518,7 +523,8 @@
     newMsg.setSafeDataLevel(safeDataLevel);
 
     // Serialize in VLAST msg
-    ModifyMsg vlastMsg = (ModifyMsg)ReplicationMsg.generateMsg(newMsg.getBytes());
+    ModifyMsg vlastMsg = (ModifyMsg)ReplicationMsg.generateMsg(
+        newMsg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
 
     // Check original version of message
     assertEquals(vlastMsg.getVersion(), REPLICATION_PROTOCOL_VLAST);
@@ -548,7 +554,7 @@
 
   @DataProvider(name = "createModifyDnData")
   public Object[][] createModifyDnData() {
-    
+
     AttributeType type = DirectoryServer.getAttributeType("description");
 
     Attribute attr1 = Attributes.create("description", "new value");
@@ -579,7 +585,7 @@
       Modification mod = new Modification(ModificationType.ADD, attr);
       mods4.add(mod);
     }
-    
+
     return new Object[][] {
         {"dc=test,dc=com", "dc=new", "11111111-1111-1111-1111-111111111111", "22222222-2222-2222-2222-222222222222", false, "dc=change", mods1, false, AssuredMode.SAFE_DATA_MODE, (byte)0},
         {"dc=test,dc=com", "dc=new", "33333333-3333-3333-3333-333333333333", "44444444-4444-4444-4444-444444444444", true, "dc=change", mods2, true, AssuredMode.SAFE_READ_MODE, (byte)1},
@@ -632,7 +638,8 @@
     byte[] v1MsgBytes = msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
 
     // Un-serialize V1 message
-    ModifyDNMsg newMsg = (ModifyDNMsg)ReplicationMsg.generateMsg(v1MsgBytes);
+    ModifyDNMsg newMsg = (ModifyDNMsg)ReplicationMsg.generateMsg(
+        v1MsgBytes, ProtocolVersion.REPLICATION_PROTOCOL_V1);
 
     // Check original version of message
     assertEquals(newMsg.getVersion(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
@@ -672,7 +679,8 @@
     newMsg.setMods(mods);
 
     // Serialize in VLAST msg
-    ModifyDNMsg vlastMsg = (ModifyDNMsg)ReplicationMsg.generateMsg(newMsg.getBytes());
+    ModifyDNMsg vlastMsg = (ModifyDNMsg)ReplicationMsg.generateMsg(
+        newMsg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
 
     // Check original version of message
     assertEquals(vlastMsg.getVersion(), REPLICATION_PROTOCOL_VLAST);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index de2be1e..ea14331 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -177,8 +177,8 @@
     msg.setAssuredMode(assuredMode);
     msg.setSafeDataLevel(safeDataLevel);
 
-    ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg
-        .generateMsg(msg.getBytes());
+    ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg.generateMsg(
+        msg.getBytes(), ProtocolVersion.getCurrentVersion());
 
     // Test that generated attributes match original attributes.
     assertEquals(generatedMsg.isAssured(), isAssured);
@@ -234,8 +234,8 @@
     assertTrue(msg.getSafeDataLevel() == safeDataLevel);
 
     // Check equals
-    ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg
-        .generateMsg(msg.getBytes());
+    ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg.generateMsg(
+        msg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
     assertFalse(msg.equals(null));
     assertFalse(msg.equals(new Object()));
 
@@ -298,8 +298,8 @@
       (short) 123, (short) 45);
     op.setAttachment(SYNCHROCONTEXT, new DeleteContext(cn, "uniqueid"));
     DeleteMsg msg = new DeleteMsg(op);
-    DeleteMsg generatedMsg = (DeleteMsg) ReplicationMsg
-        .generateMsg(msg.getBytes());
+    DeleteMsg generatedMsg = (DeleteMsg) ReplicationMsg.generateMsg(
+        msg.getBytes(), ProtocolVersion.getCurrentVersion());
 
     assertEquals(msg.toString(), generatedMsg.toString());
 
@@ -393,7 +393,7 @@
     msg.setSafeDataLevel(safeDataLevel);
 
     ModifyDNMsg generatedMsg = (ModifyDNMsg) ReplicationMsg
-        .generateMsg(msg.getBytes());
+        .generateMsg(msg.getBytes(), ProtocolVersion.getCurrentVersion());
 
     // Test that generated attributes match original attributes.
     assertEquals(generatedMsg.isAssured(), isAssured);
@@ -466,7 +466,7 @@
     msg.setSafeDataLevel(safeDataLevel);
 
     AddMsg generatedMsg = (AddMsg) ReplicationMsg.generateMsg(msg
-        .getBytes());
+        .getBytes(), ProtocolVersion.getCurrentVersion());
     assertEquals(msg.getBytes(), generatedMsg.getBytes());
     assertEquals(msg.toString(), generatedMsg.toString());
 
@@ -611,7 +611,8 @@
     }
 
     // Check that retrieved CN is OK
-    msg2 = (AckMsg) ReplicationMsg.generateMsg(msg1.getBytes());
+    msg2 = (AckMsg) ReplicationMsg.generateMsg(
+        msg1.getBytes(), ProtocolVersion.getCurrentVersion());
   }
 
   @Test()
@@ -1006,7 +1007,7 @@
     msg.setServerState(sid3, s3, now+3, false);
 
     byte[] b = msg.getBytes();
-    MonitorMsg newMsg = new MonitorMsg(b);
+    MonitorMsg newMsg = new MonitorMsg(b, ProtocolVersion.getCurrentVersion());
 
     assertEquals(rsState, msg.getReplServerDbState());
     assertEquals(newMsg.getReplServerDbState().toString(),

--
Gitblit v1.10.0