mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
24.14.2007 b45a7bf251b59ef156cfd7f3235384ac8835fcd4
[Issue 1085]  Synchronization protocol must be extensible
2 files added
9 files modified
430 ■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java 36 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java 76 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java 30 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java 30 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/StartMessage.java 150 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 27 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/tasks/InitializeTargetTask.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/tasks/InitializeTask.java 5 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java 42 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java 25 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java 6 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -33,10 +33,6 @@
import static org.opends.server.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
@@ -44,6 +40,10 @@
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
@@ -52,11 +52,12 @@
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.ReplServerStartMessage;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartMessage;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.ServerStartMessage;
import org.opends.server.replication.protocol.SocketSession;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.types.DN;
@@ -99,6 +100,7 @@
  private int halfRcvWindow;
  private int maxRcvWindow;
  private int timeout = 0;
  private short protocolVersion;
  /**
   * The time in milliseconds between heartbeats from the replication
@@ -155,6 +157,7 @@
    this.maxRcvWindow = window;
    this.halfRcvWindow = window/2;
    this.heartbeatInterval = heartbeatInterval;
    this.protocolVersion = ProtocolVersion.currentVersion();
  }
  /**
@@ -230,7 +233,8 @@
           */
          ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
              maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
              halfRcvWindow*2, heartbeatInterval, state);
              halfRcvWindow*2, heartbeatInterval, state,
              protocolVersion);
          session.publish(msg);
@@ -239,6 +243,14 @@
           */
          session.setSoTimeout(1000);
          startMsg = (ReplServerStartMessage) session.receive();
          /*
           * We have sent our own protocol version to the replication server.
           * The replication server will use the same one (or an older one
           * if it is an old replication server).
           */
          protocolVersion = ProtocolVersion.minWithCurrent(
              startMsg.getVersion());
          session.setSoTimeout(timeout);
          /*
@@ -727,4 +739,14 @@
    // session with the replicationServer or renegociate the parameters that
    // were sent in the ServerStart message
  }
  /**
   * Get the version of the replication protocol.
   * @return The version of the replication protocol.
   */
  public short getProtocolVersion()
  {
    return protocolVersion;
  }
}
opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
New file
@@ -0,0 +1,76 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
/**
 * The version utility class for the replication protocol.
 */
public class ProtocolVersion
{
  /**
   * Get the version included in the Start Message mean the replication
   * protocol version used by the server that created the message.
   *
   * @return The version used by the server that created the message.
   */
  static short CURRENT_VERSION = 1;
  /**
   * Specifies the current version of the replication protocol.
   *
   * @return The current version of the protocol.
   */
  public static short currentVersion()
  {
    return CURRENT_VERSION;
  }
  /**
   * For test purpose.
   * @param currentVersion The provided current version.
   */
  public static void setCurrentVersion(short currentVersion)
  {
    CURRENT_VERSION = currentVersion;
  }
  /**
   * Specifies the oldest version of the protocol from the provided one
   * and the current one.
   *
   * @param version The version to be compared to the current one.
   * @return The minimal protocol version.
   */
  public static short minWithCurrent(short version)
  {
    Short sVersion = Short.valueOf(version);
    Short newVersion = (sVersion<CURRENT_VERSION?sVersion:CURRENT_VERSION);
    return newVersion;
  }
}
opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java
@@ -38,7 +38,7 @@
 * Message sent by a replication server to another replication server
 * at Startup.
 */
public class ReplServerStartMessage extends ReplicationMessage implements
public class ReplServerStartMessage extends StartMessage implements
    Serializable
{
  private static final long serialVersionUID = -5871385537169856856L;
@@ -58,11 +58,14 @@
   * @param baseDn base DN for which the ReplServerStartMessage is created.
   * @param windowSize The window size.
   * @param serverState our ServerState for this baseDn.
   * @param protocolVersion The replication protocol version of the creator.
   */
  public ReplServerStartMessage(short serverId, String serverURL, DN baseDn,
                               int windowSize,
                               ServerState serverState)
                               ServerState serverState,
                               short protocolVersion)
  {
    super(protocolVersion);
    this.serverId = serverId;
    this.serverURL = serverURL;
    if (baseDn != null)
@@ -85,13 +88,12 @@
    /* The ReplServerStartMessage is encoded in the form :
     * <baseDn><ServerId><ServerUrl><windowsize><ServerState>
     */
    super(MSG_TYPE_REPL_SERVER_START, in);
    try
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_REPL_SERVER_START)
        throw new DataFormatException(
              "input is not a valid ReplServerStartMsg");
      int pos = 1;
      /* first bytes are the header */
      int pos = headerLength;
      /* read the dn
       * first calculate the length then construct the string
@@ -193,15 +195,13 @@
      byte[] byteServerState = serverState.getBytes();
      byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
      int length = 1 + byteDn.length + 1 + byteServerId.length + 1 +
          byteServerUrl.length + 1 + byteWindowSize.length + 1 +
          byteServerState.length + 1;
      int length = byteDn.length + 1 + byteServerId.length + 1 +
                   byteServerUrl.length + 1 + byteWindowSize.length + 1 +
                   byteServerState.length + 1;
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_REPL_SERVER_START;
      int pos = 1;
      /* encode the header in a byte[] large enough to also contain the mods */
      byte resultByteArray[] = encodeHeader(MSG_TYPE_REPL_SERVER_START, length);
      int pos = headerLength;
      /* put the baseDN and a terminating 0 */
      pos = addByteArray(byteDn, resultByteArray, pos);
opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
@@ -33,15 +33,15 @@
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
/**
 * This message is used by LDAP server when they first connect.
 * to a replication server to let them know who they are and what is their state
 * (their RUV)
 */
public class ServerStartMessage extends ReplicationMessage implements
public class ServerStartMessage extends StartMessage implements
    Serializable
{
  private static final long serialVersionUID = 8649393307038290287L;
@@ -75,13 +75,17 @@
   * @param windowSize   The window size used by this server.
   * @param heartbeatInterval The requested heartbeat interval.
   * @param serverState  The state of this server.
   * @param protocolVersion The replication protocol version of the creator.
   */
  public ServerStartMessage(short serverId, DN baseDn, int maxReceiveDelay,
                            int maxReceiveQueue, int maxSendDelay,
                            int maxSendQueue, int windowSize,
                            long heartbeatInterval,
                            ServerState serverState)
                            ServerState serverState,
                            short protocolVersion)
  {
    super(protocolVersion);
    this.serverId = serverId;
    this.baseDn = baseDn.toString();
    this.maxReceiveDelay = maxReceiveDelay;
@@ -113,16 +117,16 @@
   */
  public ServerStartMessage(byte[] in) throws DataFormatException
  {
    super(MSG_TYPE_SERVER_START, in);
    /* The ServerStartMessage is encoded in the form :
     * <operation type><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
     * <header><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
     * <maxSendDelay><maxSendQueue><window><heartbeatInterval><ServerState>
     */
    try
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_SERVER_START)
        throw new DataFormatException("input is not a valid ServerStart msg");
      int pos = 1;
      /* first bytes are the header */
      int pos = headerLength;
      /*
       * read the dn
@@ -306,7 +310,7 @@
                     String.valueOf(heartbeatInterval).getBytes("UTF-8");
      byte[] byteServerState = serverState.getBytes();
      int length = 1 + byteDn.length + 1 + byteServerId.length + 1 +
      int length = byteDn.length + 1 + byteServerId.length + 1 +
                   byteServerUrl.length + 1 +
                   byteMaxRecvDelay.length + 1 +
                   byteMaxRecvQueue.length + 1 +
@@ -316,11 +320,9 @@
                   byteHeartbeatInterval.length + 1 +
                   byteServerState.length + 1;
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_SERVER_START;
      int pos = 1;
      /* encode the header in a byte[] large enough to also contain the mods */
      byte resultByteArray[] = encodeHeader(MSG_TYPE_SERVER_START, length);
      int pos = headerLength;
      pos = addByteArray(byteDn, resultByteArray, pos);
opends/src/server/org/opends/server/replication/protocol/StartMessage.java
New file
@@ -0,0 +1,150 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
 * This abstract message class is the superclass for start messages used
 * by LDAP servers and Replication servers to initiate their communications.
 * This class specifies a message header that contains the Replication
 * Protocol version.
 */
public abstract class StartMessage extends ReplicationMessage
{
  private short protocolVersion;
  /**
   * The length of the header of this message.
   */
  protected int headerLength;
  /**
   * Create a new StartMessage.
   *
   * @param protocolVersion The Replication Protocol version of the server
   * for which the StartMessage is created.
   */
  public StartMessage(short protocolVersion)
  {
    this.protocolVersion = protocolVersion;
  }
  /**
   * Creates a new ServerStartMessage from its encoded form.
   *
   * @param type The type of the message to create.
   * @param encodedMsg The byte array containing the encoded form of the
   *           StartMessage.
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the ServerStartMessage.
   */
  public StartMessage(byte type, byte [] encodedMsg) throws DataFormatException
  {
    headerLength = decodeHeader(type, encodedMsg);
  }
  /**
   * Encode the header for the start message.
   *
   * @param type The type of the message to create.
   * @param additionalLength additional length needed to encode the remaining
   *                         part of the UpdateMessage.
   * @return a byte array containing the common header and enough space to
   *         encode the reamining bytes of the UpdateMessage as was specified
   *         by the additionalLength.
   *         (byte array length = common header length + additionalLength)
   * @throws UnsupportedEncodingException if UTF-8 is not supported.
   */
  public byte[] encodeHeader(byte type, int additionalLength)
  throws UnsupportedEncodingException
  {
    byte[] versionByte = Short.toString(protocolVersion).getBytes("UTF-8");
    /* The message header is stored in the form :
     * <message type><protocol version>
     */
    int length = 1 + versionByte.length + 1 +
                     additionalLength;
    byte[] encodedMsg = new byte[length];
    /* put the type of the operation */
    encodedMsg[0] = type;
    int pos = 1;
    /* put the protocol version */
    headerLength = addByteArray(versionByte, encodedMsg, pos);
    return encodedMsg;
  }
  /**
   * Decode the Header part of this message, and check its type.
   *
   * @param type The type of this message.
   * @param encodedMsg the encoded form of the message.
   * @return the position at which the remaining part of the message starts.
   * @throws DataFormatException if the encodedMsg does not contain a valid
   *         common header.
   */
  public int decodeHeader(byte type, byte [] encodedMsg)
  throws DataFormatException
  {
    /* first byte is the type */
    if (encodedMsg[0] != type)
      throw new DataFormatException("byte[] is not a valid msg");
    try
    {
      /* then read the version */
      int pos = 1;
      int length = getNextLength(encodedMsg, pos);
      protocolVersion = Short.valueOf(
          new String(encodedMsg, pos, length, "UTF-8"));
      pos += length + 1;
      return pos;
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * Get the version included in the Start Message mean the replication
   * protocol version used by the server that created the message.
   *
   * @return The version used by the server that created the message.
   */
  public short getVersion()
  {
    return protocolVersion;
  }
}
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -49,6 +49,7 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.AckMessage;
import org.opends.server.replication.protocol.ReplServerStartMessage;
import org.opends.server.replication.protocol.HeartbeatThread;
@@ -118,6 +119,8 @@
  private int saturationCount = 0;
  private short replicationServerId;
  private short protocolVersion;
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
@@ -146,6 +149,7 @@
    super("Server Handler");
    this.session = session;
    this.maxQueueSize = queueSize;
    this.protocolVersion = ProtocolVersion.currentVersion();
  }
  /**
@@ -175,20 +179,27 @@
    {
      if (baseDn != null)
      {
        // This is an outgoing connection. Publish our start message.
        this.baseDn = baseDn;
        replicationCache = replicationServer.getReplicationCache(baseDn);
        ServerState localServerState = replicationCache.getDbServerState();
        ReplServerStartMessage msg =
          new ReplServerStartMessage(replicationServerId, replicationServerURL,
                                    baseDn, windowSize, localServerState);
                                    baseDn, windowSize, localServerState,
                                    protocolVersion);
        session.publish(msg);
      }
      // Wait and process ServerStart or ReplServerStart
      ReplicationMessage msg = session.receive();
      if (msg instanceof ServerStartMessage)
      {
        // The remote server is an LDAP Server
        ServerStartMessage receivedMsg = (ServerStartMessage) msg;
        protocolVersion = ProtocolVersion.minWithCurrent(
            receivedMsg.getVersion());
        serverId = receivedMsg.getServerId();
        serverURL = receivedMsg.getServerURL();
        this.baseDn = receivedMsg.getBaseDn();
@@ -233,17 +244,22 @@
        serverIsLDAPserver = true;
        // This an incoming connection. Publish our start message
        replicationCache = replicationServer.getReplicationCache(this.baseDn);
        ServerState localServerState = replicationCache.getDbServerState();
        ReplServerStartMessage myStartMsg =
          new ReplServerStartMessage(replicationServerId, replicationServerURL,
                                    this.baseDn, windowSize, localServerState);
                                    this.baseDn, windowSize, localServerState,
                                    protocolVersion);
        session.publish(myStartMsg);
        sendWindowSize = receivedMsg.getWindowSize();
      }
      else if (msg instanceof ReplServerStartMessage)
      {
        // The remote server is a replication server
        ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg;
        protocolVersion = ProtocolVersion.minWithCurrent(
            receivedMsg.getVersion());
        serverId = receivedMsg.getServerId();
        serverURL = receivedMsg.getServerURL();
        int separator = serverURL.lastIndexOf(':');
@@ -255,14 +271,19 @@
        {
          replicationCache = replicationServer.getReplicationCache(this.baseDn);
          ServerState serverState = replicationCache.getDbServerState();
          // Publish our start message
          ReplServerStartMessage outMsg =
            new ReplServerStartMessage(replicationServerId,
                                       replicationServerURL,
                                       this.baseDn, windowSize, serverState);
                                       this.baseDn, windowSize, serverState,
                                       protocolVersion);
          session.publish(outMsg);
        }
        else
        {
          this.baseDn = baseDn;
        }
        this.serverState = receivedMsg.getServerState();
        sendWindowSize = receivedMsg.getWindowSize();
      }
opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
@@ -133,9 +133,10 @@
    }
    catch(DirectoryException de)
    {
      // This log will go to the task log message
      logError(ErrorLogCategory.TASK,
          ErrorLogSeverity.SEVERE_ERROR,
          "Initialize Task stopped by error", 1);
          "Initialize Task stopped by error" + de.getErrorMessage(), 1);
      return TaskState.STOPPED_BY_ERROR;
    }
opends/src/server/org/opends/server/tasks/InitializeTask.java
@@ -87,6 +87,10 @@
   */
  @Override public void initializeTask() throws DirectoryException
  {
    if (TaskState.isDone(getTaskState()))
    {
      return;
    }
    // FIXME -- Do we need any special authorization here?
    Entry taskEntry = getTaskEntry();
@@ -117,7 +121,6 @@
    domain=ReplicationDomain.retrievesReplicationDomain(domainDN);
    attrList = taskEntry.getAttribute(typeSourceScope);
    String sourceString = TaskUtils.getSingleValueString(attrList);
    source = domain.decodeSource(sourceString);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -28,10 +28,13 @@
package org.opends.server.replication;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.testng.Assert.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import org.opends.server.TestCaseUtils;
@@ -42,8 +45,10 @@
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
@@ -329,4 +334,39 @@
      assertEquals(modOp.getResultCode(), ResultCode.SUCCESS);
    }
  }
  @Test(enabled=true)
  public void protocolVersion() throws Exception
  {
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
        "Starting Replication ProtocolWindowTest : protocolVersion" , 1);
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    // Test : Make a broker degrade its version when connecting to an old
    // replication server.
    ProtocolVersion.setCurrentVersion((short)2);
    ReplicationBroker broker = new ReplicationBroker(
        new ServerState(),
        baseDn,
        (short) 13, 0, 0, 0, 0, 1000, 0);
    // Check broker hard-coded version
    short pversion = broker.getProtocolVersion();
    assertEquals(pversion, 2);
    // Connect the broker to the replication server
    ProtocolVersion.setCurrentVersion((short)0);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + replServerPort);
    broker.start(servers);
    TestCaseUtils.sleep(100); // wait for connection established
    // Check broker negociated version
    pversion = broker.getProtocolVersion();
    assertEquals(pversion, 0);
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -340,18 +340,23 @@
      DN.decode(synchroPluginStringDN)),
      "Unable to add the Multimaster replication plugin");
    // Add the replication server
    DirectoryServer.getConfigHandler().addEntry(replServerEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(replServerEntry.getDN()),
    if (replServerEntry != null)
    {
      // Add the replication server
      DirectoryServer.getConfigHandler().addEntry(replServerEntry, null);
      assertNotNull(DirectoryServer.getConfigEntry(replServerEntry.getDN()),
       "Unable to add the replication server");
    configEntryList.add(replServerEntry.getDN());
      configEntryList.add(replServerEntry.getDN());
    }
    // We also have a replicated suffix (replication domain)
    DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the synchronized server");
    configEntryList.add(synchroServerEntry.getDN());
    if (synchroServerEntry != null)
    {
      // We also have a replicated suffix (replication domain)
      DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
      assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
          "Unable to add the synchronized server");
      configEntryList.add(synchroServerEntry.getDN());
    }
  }
  /**
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -486,7 +486,7 @@
  {
    state.update(new ChangeNumber((long)1, 1,(short)1));
    ServerStartMessage msg = new ServerStartMessage(serverId, baseDN,
        window, window, window, window, window, window, state);
        window, window, window, window, window, window, state, (short)1);
    ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes());
    assertEquals(msg.getServerId(), newMsg.getServerId());
    assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
@@ -498,6 +498,7 @@
    assertEquals(msg.getHeartbeatInterval(), newMsg.getHeartbeatInterval());
    assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
        newMsg.getServerState().getMaxChangeNumber((short)1));
    assertEquals(msg.getVersion(), newMsg.getVersion());
  }
  @DataProvider(name="changelogStart")
@@ -518,7 +519,7 @@
  {
    state.update(new ChangeNumber((long)1, 1,(short)1));
    ReplServerStartMessage msg = new ReplServerStartMessage(serverId,
        url, baseDN, window, state);
        url, baseDN, window, state, (short)1);
    ReplServerStartMessage newMsg = new ReplServerStartMessage(msg.getBytes());
    assertEquals(msg.getServerId(), newMsg.getServerId());
    assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
@@ -526,6 +527,7 @@
    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
    assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
        newMsg.getServerState().getMaxChangeNumber((short)1));
    assertEquals(msg.getVersion(), newMsg.getVersion());
  }
  /**