From b45a7bf251b59ef156cfd7f3235384ac8835fcd4 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 24 May 2007 13:14:06 +0000
Subject: [PATCH] [Issue 1085]  Synchronization protocol must be extensible

---
 opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java                                |   36 +++
 opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java                                |   76 +++++++++
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java              |   42 +++++
 opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java                         |   30 +-
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java             |   25 +-
 opends/src/server/org/opends/server/tasks/InitializeTask.java                                                |    5 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java |    6 
 opends/src/server/org/opends/server/replication/server/ServerHandler.java                                    |   27 +++
 opends/src/server/org/opends/server/tasks/InitializeTargetTask.java                                          |    3 
 opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java                             |   30 ++-
 opends/src/server/org/opends/server/replication/protocol/StartMessage.java                                   |  150 ++++++++++++++++++
 11 files changed, 376 insertions(+), 54 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index 913e039..a27dc96 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -33,10 +33,6 @@
 import static org.opends.server.messages.ReplicationMessages.*;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.TreeSet;
-import java.util.concurrent.Semaphore;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetAddress;
@@ -44,6 +40,10 @@
 import java.net.Socket;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.TreeSet;
+import java.util.concurrent.Semaphore;
 
 import org.opends.server.protocols.asn1.ASN1OctetString;
 import org.opends.server.protocols.internal.InternalClientConnection;
@@ -52,11 +52,12 @@
 import org.opends.server.protocols.ldap.LDAPFilter;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.protocol.ReplServerStartMessage;
 import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplServerStartMessage;
+import org.opends.server.replication.protocol.ReplicationMessage;
 import org.opends.server.replication.protocol.ServerStartMessage;
 import org.opends.server.replication.protocol.SocketSession;
-import org.opends.server.replication.protocol.ReplicationMessage;
 import org.opends.server.replication.protocol.UpdateMessage;
 import org.opends.server.replication.protocol.WindowMessage;
 import org.opends.server.types.DN;
@@ -99,6 +100,7 @@
   private int halfRcvWindow;
   private int maxRcvWindow;
   private int timeout = 0;
+  private short protocolVersion;
 
   /**
    * The time in milliseconds between heartbeats from the replication
@@ -155,6 +157,7 @@
     this.maxRcvWindow = window;
     this.halfRcvWindow = window/2;
     this.heartbeatInterval = heartbeatInterval;
+    this.protocolVersion = ProtocolVersion.currentVersion();
   }
 
   /**
@@ -230,7 +233,8 @@
            */
           ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
               maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
-              halfRcvWindow*2, heartbeatInterval, state);
+              halfRcvWindow*2, heartbeatInterval, state,
+              protocolVersion);
           session.publish(msg);
 
 
@@ -239,6 +243,14 @@
            */
           session.setSoTimeout(1000);
           startMsg = (ReplServerStartMessage) session.receive();
+
+          /*
+           * We have sent our own protocol version to the replication server.
+           * The replication server will use the same one (or an older one
+           * if it is an old replication server).
+           */
+          protocolVersion = ProtocolVersion.minWithCurrent(
+              startMsg.getVersion());
           session.setSoTimeout(timeout);
 
           /*
@@ -727,4 +739,14 @@
     // session with the replicationServer or renegociate the parameters that
     // were sent in the ServerStart message
   }
+
+  /**
+   * Get the version of the replication protocol.
+   * @return The version of the replication protocol.
+   */
+  public short getProtocolVersion()
+  {
+    return protocolVersion;
+  }
+
 }
diff --git a/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java b/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
new file mode 100644
index 0000000..7d791c0
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -0,0 +1,76 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.protocol;
+
+
+/**
+ * The version utility class for the replication protocol.
+ */
+public class ProtocolVersion
+{
+  /**
+   * Get the version included in the Start Message mean the replication
+   * protocol version used by the server that created the message.
+   *
+   * @return The version used by the server that created the message.
+   */
+  static short CURRENT_VERSION = 1;
+
+  /**
+   * Specifies the current version of the replication protocol.
+   *
+   * @return The current version of the protocol.
+   */
+  public static short currentVersion()
+  {
+    return CURRENT_VERSION;
+  }
+
+  /**
+   * For test purpose.
+   * @param currentVersion The provided current version.
+   */
+  public static void setCurrentVersion(short currentVersion)
+  {
+    CURRENT_VERSION = currentVersion;
+  }
+
+  /**
+   * Specifies the oldest version of the protocol from the provided one
+   * and the current one.
+   *
+   * @param version The version to be compared to the current one.
+   * @return The minimal protocol version.
+   */
+  public static short minWithCurrent(short version)
+  {
+    Short sVersion = Short.valueOf(version);
+    Short newVersion = (sVersion<CURRENT_VERSION?sVersion:CURRENT_VERSION);
+    return newVersion;
+  }
+}
+
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java b/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java
index 4b0e8ff..013d4cd 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java
@@ -38,7 +38,7 @@
  * Message sent by a replication server to another replication server
  * at Startup.
  */
-public class ReplServerStartMessage extends ReplicationMessage implements
+public class ReplServerStartMessage extends StartMessage implements
     Serializable
 {
   private static final long serialVersionUID = -5871385537169856856L;
@@ -58,11 +58,14 @@
    * @param baseDn base DN for which the ReplServerStartMessage is created.
    * @param windowSize The window size.
    * @param serverState our ServerState for this baseDn.
+   * @param protocolVersion The replication protocol version of the creator.
    */
   public ReplServerStartMessage(short serverId, String serverURL, DN baseDn,
                                int windowSize,
-                               ServerState serverState)
+                               ServerState serverState,
+                               short protocolVersion)
   {
+    super(protocolVersion);
     this.serverId = serverId;
     this.serverURL = serverURL;
     if (baseDn != null)
@@ -85,13 +88,12 @@
     /* The ReplServerStartMessage is encoded in the form :
      * <baseDn><ServerId><ServerUrl><windowsize><ServerState>
      */
+    super(MSG_TYPE_REPL_SERVER_START, in);
+
     try
     {
-      /* first byte is the type */
-      if (in[0] != MSG_TYPE_REPL_SERVER_START)
-        throw new DataFormatException(
-              "input is not a valid ReplServerStartMsg");
-      int pos = 1;
+      /* first bytes are the header */
+      int pos = headerLength;
 
       /* read the dn
        * first calculate the length then construct the string
@@ -193,15 +195,13 @@
       byte[] byteServerState = serverState.getBytes();
       byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
 
-      int length = 1 + byteDn.length + 1 + byteServerId.length + 1 +
-          byteServerUrl.length + 1 + byteWindowSize.length + 1 +
-          byteServerState.length + 1;
+      int length = byteDn.length + 1 + byteServerId.length + 1 +
+                   byteServerUrl.length + 1 + byteWindowSize.length + 1 +
+                   byteServerState.length + 1;
 
-      byte[] resultByteArray = new byte[length];
-
-      /* put the type of the operation */
-      resultByteArray[0] = MSG_TYPE_REPL_SERVER_START;
-      int pos = 1;
+      /* encode the header in a byte[] large enough to also contain the mods */
+      byte resultByteArray[] = encodeHeader(MSG_TYPE_REPL_SERVER_START, length);
+      int pos = headerLength;
 
       /* put the baseDN and a terminating 0 */
       pos = addByteArray(byteDn, resultByteArray, pos);
diff --git a/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java b/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
index cdd9df2..a046356 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
@@ -33,15 +33,15 @@
 import java.util.zip.DataFormatException;
 
 import org.opends.server.replication.common.ServerState;
-import org.opends.server.types.DirectoryException;
 import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
 
 /**
  * This message is used by LDAP server when they first connect.
  * to a replication server to let them know who they are and what is their state
  * (their RUV)
  */
-public class ServerStartMessage extends ReplicationMessage implements
+public class ServerStartMessage extends StartMessage implements
     Serializable
 {
   private static final long serialVersionUID = 8649393307038290287L;
@@ -75,13 +75,17 @@
    * @param windowSize   The window size used by this server.
    * @param heartbeatInterval The requested heartbeat interval.
    * @param serverState  The state of this server.
+   * @param protocolVersion The replication protocol version of the creator.
    */
   public ServerStartMessage(short serverId, DN baseDn, int maxReceiveDelay,
                             int maxReceiveQueue, int maxSendDelay,
                             int maxSendQueue, int windowSize,
                             long heartbeatInterval,
-                            ServerState serverState)
+                            ServerState serverState,
+                            short protocolVersion)
   {
+    super(protocolVersion);
+
     this.serverId = serverId;
     this.baseDn = baseDn.toString();
     this.maxReceiveDelay = maxReceiveDelay;
@@ -113,16 +117,16 @@
    */
   public ServerStartMessage(byte[] in) throws DataFormatException
   {
+    super(MSG_TYPE_SERVER_START, in);
+
     /* The ServerStartMessage is encoded in the form :
-     * <operation type><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
+     * <header><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
      * <maxSendDelay><maxSendQueue><window><heartbeatInterval><ServerState>
      */
     try
     {
-      /* first byte is the type */
-      if (in[0] != MSG_TYPE_SERVER_START)
-        throw new DataFormatException("input is not a valid ServerStart msg");
-      int pos = 1;
+      /* first bytes are the header */
+      int pos = headerLength;
 
       /*
        * read the dn
@@ -306,7 +310,7 @@
                      String.valueOf(heartbeatInterval).getBytes("UTF-8");
       byte[] byteServerState = serverState.getBytes();
 
-      int length = 1 + byteDn.length + 1 + byteServerId.length + 1 +
+      int length = byteDn.length + 1 + byteServerId.length + 1 +
                    byteServerUrl.length + 1 +
                    byteMaxRecvDelay.length + 1 +
                    byteMaxRecvQueue.length + 1 +
@@ -316,11 +320,9 @@
                    byteHeartbeatInterval.length + 1 +
                    byteServerState.length + 1;
 
-      byte[] resultByteArray = new byte[length];
-
-      /* put the type of the operation */
-      resultByteArray[0] = MSG_TYPE_SERVER_START;
-      int pos = 1;
+      /* encode the header in a byte[] large enough to also contain the mods */
+      byte resultByteArray[] = encodeHeader(MSG_TYPE_SERVER_START, length);
+      int pos = headerLength;
 
       pos = addByteArray(byteDn, resultByteArray, pos);
 
diff --git a/opends/src/server/org/opends/server/replication/protocol/StartMessage.java b/opends/src/server/org/opends/server/replication/protocol/StartMessage.java
new file mode 100644
index 0000000..150d3d9
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/protocol/StartMessage.java
@@ -0,0 +1,150 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.protocol;
+
+import java.io.UnsupportedEncodingException;
+import java.util.zip.DataFormatException;
+
+
+/**
+ * This abstract message class is the superclass for start messages used
+ * by LDAP servers and Replication servers to initiate their communications.
+ * This class specifies a message header that contains the Replication
+ * Protocol version.
+ */
+public abstract class StartMessage extends ReplicationMessage
+{
+  private short protocolVersion;
+
+  /**
+   * The length of the header of this message.
+   */
+  protected int headerLength;
+
+  /**
+   * Create a new StartMessage.
+   *
+   * @param protocolVersion The Replication Protocol version of the server
+   * for which the StartMessage is created.
+   */
+  public StartMessage(short protocolVersion)
+  {
+    this.protocolVersion = protocolVersion;
+  }
+
+  /**
+   * Creates a new ServerStartMessage from its encoded form.
+   *
+   * @param type The type of the message to create.
+   * @param encodedMsg The byte array containing the encoded form of the
+   *           StartMessage.
+   * @throws DataFormatException If the byte array does not contain a valid
+   *                             encoded form of the ServerStartMessage.
+   */
+  public StartMessage(byte type, byte [] encodedMsg) throws DataFormatException
+  {
+    headerLength = decodeHeader(type, encodedMsg);
+  }
+
+  /**
+   * Encode the header for the start message.
+   *
+   * @param type The type of the message to create.
+   * @param additionalLength additional length needed to encode the remaining
+   *                         part of the UpdateMessage.
+   * @return a byte array containing the common header and enough space to
+   *         encode the reamining bytes of the UpdateMessage as was specified
+   *         by the additionalLength.
+   *         (byte array length = common header length + additionalLength)
+   * @throws UnsupportedEncodingException if UTF-8 is not supported.
+   */
+  public byte[] encodeHeader(byte type, int additionalLength)
+  throws UnsupportedEncodingException
+  {
+    byte[] versionByte = Short.toString(protocolVersion).getBytes("UTF-8");
+
+    /* The message header is stored in the form :
+     * <message type><protocol version>
+     */
+    int length = 1 + versionByte.length + 1 +
+                     additionalLength;
+
+    byte[] encodedMsg = new byte[length];
+
+    /* put the type of the operation */
+    encodedMsg[0] = type;
+    int pos = 1;
+
+    /* put the protocol version */
+    headerLength = addByteArray(versionByte, encodedMsg, pos);
+
+    return encodedMsg;
+  }
+
+  /**
+   * Decode the Header part of this message, and check its type.
+   *
+   * @param type The type of this message.
+   * @param encodedMsg the encoded form of the message.
+   * @return the position at which the remaining part of the message starts.
+   * @throws DataFormatException if the encodedMsg does not contain a valid
+   *         common header.
+   */
+  public int decodeHeader(byte type, byte [] encodedMsg)
+  throws DataFormatException
+  {
+    /* first byte is the type */
+    if (encodedMsg[0] != type)
+      throw new DataFormatException("byte[] is not a valid msg");
+
+    try
+    {
+      /* then read the version */
+      int pos = 1;
+      int length = getNextLength(encodedMsg, pos);
+      protocolVersion = Short.valueOf(
+          new String(encodedMsg, pos, length, "UTF-8"));
+      pos += length + 1;
+      return pos;
+    } catch (UnsupportedEncodingException e)
+    {
+      throw new DataFormatException("UTF-8 is not supported by this jvm.");
+    }
+
+  }
+
+  /**
+   * Get the version included in the Start Message mean the replication
+   * protocol version used by the server that created the message.
+   *
+   * @return The version used by the server that created the message.
+   */
+  public short getVersion()
+  {
+    return protocolVersion;
+  }
+}
\ No newline at end of file
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 9a34d92..a098c4b 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -49,6 +49,7 @@
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.protocol.ProtocolVersion;
 import org.opends.server.replication.protocol.AckMessage;
 import org.opends.server.replication.protocol.ReplServerStartMessage;
 import org.opends.server.replication.protocol.HeartbeatThread;
@@ -118,6 +119,8 @@
   private int saturationCount = 0;
   private short replicationServerId;
 
+  private short protocolVersion;
+
   /**
    * The time in milliseconds between heartbeats from the replication
    * server.  Zero means heartbeats are off.
@@ -146,6 +149,7 @@
     super("Server Handler");
     this.session = session;
     this.maxQueueSize = queueSize;
+    this.protocolVersion = ProtocolVersion.currentVersion();
   }
 
   /**
@@ -175,20 +179,27 @@
     {
       if (baseDn != null)
       {
+        // This is an outgoing connection. Publish our start message.
         this.baseDn = baseDn;
         replicationCache = replicationServer.getReplicationCache(baseDn);
         ServerState localServerState = replicationCache.getDbServerState();
         ReplServerStartMessage msg =
           new ReplServerStartMessage(replicationServerId, replicationServerURL,
-                                    baseDn, windowSize, localServerState);
+                                    baseDn, windowSize, localServerState,
+                                    protocolVersion);
 
         session.publish(msg);
       }
 
+      // Wait and process ServerStart or ReplServerStart
       ReplicationMessage msg = session.receive();
       if (msg instanceof ServerStartMessage)
       {
+        // The remote server is an LDAP Server
         ServerStartMessage receivedMsg = (ServerStartMessage) msg;
+
+        protocolVersion = ProtocolVersion.minWithCurrent(
+            receivedMsg.getVersion());
         serverId = receivedMsg.getServerId();
         serverURL = receivedMsg.getServerURL();
         this.baseDn = receivedMsg.getBaseDn();
@@ -233,17 +244,22 @@
 
         serverIsLDAPserver = true;
 
+        // This an incoming connection. Publish our start message
         replicationCache = replicationServer.getReplicationCache(this.baseDn);
         ServerState localServerState = replicationCache.getDbServerState();
         ReplServerStartMessage myStartMsg =
           new ReplServerStartMessage(replicationServerId, replicationServerURL,
-                                    this.baseDn, windowSize, localServerState);
+                                    this.baseDn, windowSize, localServerState,
+                                    protocolVersion);
         session.publish(myStartMsg);
         sendWindowSize = receivedMsg.getWindowSize();
       }
       else if (msg instanceof ReplServerStartMessage)
       {
+        // The remote server is a replication server
         ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg;
+        protocolVersion = ProtocolVersion.minWithCurrent(
+            receivedMsg.getVersion());
         serverId = receivedMsg.getServerId();
         serverURL = receivedMsg.getServerURL();
         int separator = serverURL.lastIndexOf(':');
@@ -255,14 +271,19 @@
         {
           replicationCache = replicationServer.getReplicationCache(this.baseDn);
           ServerState serverState = replicationCache.getDbServerState();
+
+          // Publish our start message
           ReplServerStartMessage outMsg =
             new ReplServerStartMessage(replicationServerId,
                                        replicationServerURL,
-                                       this.baseDn, windowSize, serverState);
+                                       this.baseDn, windowSize, serverState,
+                                       protocolVersion);
           session.publish(outMsg);
         }
         else
+        {
           this.baseDn = baseDn;
+        }
         this.serverState = receivedMsg.getServerState();
         sendWindowSize = receivedMsg.getWindowSize();
       }
diff --git a/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java b/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
index e586f78..c4cf747 100644
--- a/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
+++ b/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
@@ -133,9 +133,10 @@
     }
     catch(DirectoryException de)
     {
+      // This log will go to the task log message
       logError(ErrorLogCategory.TASK,
           ErrorLogSeverity.SEVERE_ERROR,
-          "Initialize Task stopped by error", 1);
+          "Initialize Task stopped by error" + de.getErrorMessage(), 1);
 
       return TaskState.STOPPED_BY_ERROR;
     }
diff --git a/opends/src/server/org/opends/server/tasks/InitializeTask.java b/opends/src/server/org/opends/server/tasks/InitializeTask.java
index 2733b89..3f828c1 100644
--- a/opends/src/server/org/opends/server/tasks/InitializeTask.java
+++ b/opends/src/server/org/opends/server/tasks/InitializeTask.java
@@ -87,6 +87,10 @@
    */
   @Override public void initializeTask() throws DirectoryException
   {
+    if (TaskState.isDone(getTaskState()))
+    {
+      return;
+    }
 
     // FIXME -- Do we need any special authorization here?
     Entry taskEntry = getTaskEntry();
@@ -117,7 +121,6 @@
 
     domain=ReplicationDomain.retrievesReplicationDomain(domainDN);
 
-
     attrList = taskEntry.getAttribute(typeSourceScope);
     String sourceString = TaskUtils.getSingleValueString(attrList);
     source = domain.decodeSource(sourceString);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
index a6733f7..f6921f5 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -28,10 +28,13 @@
 package org.opends.server.replication;
 
 import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
+import java.io.IOException;
 import java.net.ServerSocket;
 import java.net.SocketTimeoutException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.opends.server.TestCaseUtils;
@@ -42,8 +45,10 @@
 import org.opends.server.protocols.internal.InternalClientConnection;
 import org.opends.server.protocols.internal.InternalSearchOperation;
 import org.opends.server.protocols.ldap.LDAPFilter;
+import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.plugin.ReplicationBroker;
 import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.ProtocolVersion;
 import org.opends.server.replication.protocol.ReplicationMessage;
 import org.opends.server.types.DN;
 import org.opends.server.types.Entry;
@@ -329,4 +334,39 @@
       assertEquals(modOp.getResultCode(), ResultCode.SUCCESS);
     }
   }
+  
+  @Test(enabled=true)
+  public void protocolVersion() throws Exception
+  {
+    logError(ErrorLogCategory.SYNCHRONIZATION,
+        ErrorLogSeverity.NOTICE,
+        "Starting Replication ProtocolWindowTest : protocolVersion" , 1);
+
+    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
+
+    // Test : Make a broker degrade its version when connecting to an old
+    // replication server.
+    ProtocolVersion.setCurrentVersion((short)2);
+
+    ReplicationBroker broker = new ReplicationBroker(
+        new ServerState(), 
+        baseDn, 
+        (short) 13, 0, 0, 0, 0, 1000, 0);
+ 
+
+    // Check broker hard-coded version
+    short pversion = broker.getProtocolVersion();
+    assertEquals(pversion, 2);
+    
+    // Connect the broker to the replication server
+    ProtocolVersion.setCurrentVersion((short)0);
+    ArrayList<String> servers = new ArrayList<String>(1);
+    servers.add("localhost:" + replServerPort);
+    broker.start(servers);
+    TestCaseUtils.sleep(100); // wait for connection established
+    
+    // Check broker negociated version
+    pversion = broker.getProtocolVersion();
+    assertEquals(pversion, 0);
+  }    
 }
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index dd19136..d37f9cc 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -340,18 +340,23 @@
       DN.decode(synchroPluginStringDN)),
       "Unable to add the Multimaster replication plugin");
 
-
-    // Add the replication server
-    DirectoryServer.getConfigHandler().addEntry(replServerEntry, null);
-    assertNotNull(DirectoryServer.getConfigEntry(replServerEntry.getDN()),
+    if (replServerEntry != null)
+    {
+      // Add the replication server
+      DirectoryServer.getConfigHandler().addEntry(replServerEntry, null);
+      assertNotNull(DirectoryServer.getConfigEntry(replServerEntry.getDN()),
        "Unable to add the replication server");
-    configEntryList.add(replServerEntry.getDN());
+      configEntryList.add(replServerEntry.getDN());
+    }
 
-    // We also have a replicated suffix (replication domain)
-    DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
-    assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
-        "Unable to add the synchronized server");
-    configEntryList.add(synchroServerEntry.getDN());
+    if (synchroServerEntry != null)
+    {
+      // We also have a replicated suffix (replication domain)
+      DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
+      assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
+          "Unable to add the synchronized server");
+      configEntryList.add(synchroServerEntry.getDN());
+    }
   }
 
   /**
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index 669271b..d3a5557 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -486,7 +486,7 @@
   {
     state.update(new ChangeNumber((long)1, 1,(short)1));
     ServerStartMessage msg = new ServerStartMessage(serverId, baseDN,
-        window, window, window, window, window, window, state);
+        window, window, window, window, window, window, state, (short)1);
     ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes());
     assertEquals(msg.getServerId(), newMsg.getServerId());
     assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
@@ -498,6 +498,7 @@
     assertEquals(msg.getHeartbeatInterval(), newMsg.getHeartbeatInterval());
     assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
         newMsg.getServerState().getMaxChangeNumber((short)1));
+    assertEquals(msg.getVersion(), newMsg.getVersion());
   }
 
   @DataProvider(name="changelogStart")
@@ -518,7 +519,7 @@
   {
     state.update(new ChangeNumber((long)1, 1,(short)1));
     ReplServerStartMessage msg = new ReplServerStartMessage(serverId,
-        url, baseDN, window, state);
+        url, baseDN, window, state, (short)1);
     ReplServerStartMessage newMsg = new ReplServerStartMessage(msg.getBytes());
     assertEquals(msg.getServerId(), newMsg.getServerId());
     assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
@@ -526,6 +527,7 @@
     assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
     assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
         newMsg.getServerState().getMaxChangeNumber((short)1));
+    assertEquals(msg.getVersion(), newMsg.getVersion());
   }
 
   /**

--
Gitblit v1.10.0