mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
12.41.2007 1c8b422d63f419d8c85a28b1f2276ac0f3e3632c
Fix for 1895
Summary: Total update does not work with 3 servers that are also replication servers

Description: If the first replication server contacted is not connected to the target LDAP server, an error is raised even if the target LDAP server is connected to another replication server. The fix relies on a basic "routing table" (replication servers, and LDAP servers connected to each replication server). This table is also necessary for other fixes/features to come on the total update. This adds a new message in the replication protocol.

One unit tests exist with 2 Repl servers but does not fall in this issue. With this commit this test is made more complex with a 3rd Repl Server and one LDAP server or broker for each RS. The total update tests are still not enabled by default because they are still fragile on the tests execution order but they pass successfully when ran alone. Enabling these tests is an issue I will work on.

Some unit tests have been created to test the routing table elaboration and update.
1 files added
9 files modified
675 ■■■■ changed files
opends/src/server/org/opends/server/messages/ReplicationMessages.java 35 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java 142 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java 6 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationCache.java 136 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 82 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerReader.java 6 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java 233 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java 3 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java 30 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/messages/ReplicationMessages.java
@@ -450,6 +450,27 @@
    CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 63;
  /**
   * An error happened to send a ReplServerInfoMessage to another
   * replication server.
   */
  public static final int MSGID_CHANGELOG_ERROR_SENDING_INFO =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 64;
  /**
   * An error happened to send an ErrorMessage to another
   * replication server.
   */
  public static final int MSGID_CHANGELOG_ERROR_SENDING_ERROR =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 65;
  /**
   * An error happened to send a Message (probably a RoutableMessage)
   * to another replication server.
   */
  public static final int MSGID_CHANGELOG_ERROR_SENDING_MSG =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 66;
  /**
   * Register the messages from this class in the core server.
   *
   */
@@ -530,8 +551,8 @@
        "An unexpected error happened handling connection with %s." +
        "This connection is going to be closed. ");
    registerMessage(MSGID_CHANGELOG_ERROR_SENDING_ACK,
        "An unexpected error happened sending an ack to %s." +
        "This connection is going to be closed. ");
        "An unexpected error occurred  while sending an ack to %s." +
        "This connection is going to be closed and reopened. ");
    registerMessage(
        MSGID_EXCEPTION_RECEIVING_REPLICATION_MESSAGE,
        "An Exception was caught while receiving replication message : %s");
@@ -617,5 +638,15 @@
    registerMessage(MSGID_DISCONNECTED_FROM_CHANGELOG,
        "The connection to Replication Server %s has been dropped by the "
        + "Replication Server");
    registerMessage(MSGID_CHANGELOG_ERROR_SENDING_INFO,
        "An unexpected error occurred  while sending a Server " +
        " Info message to %s. " +
        "This connection is going to be closed and reopened");
    registerMessage(MSGID_CHANGELOG_ERROR_SENDING_ERROR,
        "An unexpected error occurred  while sending an Error Message to %s. "+
        "This connection is going to be closed and reopened");
    registerMessage(MSGID_CHANGELOG_ERROR_SENDING_MSG,
        "An unexpected error occurred  while sending a Message to %s. "+
        "This connection is going to be closed and reopened");
  }
}
opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
New file
@@ -0,0 +1,142 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.DataFormatException;
/**
 *
 * This class defines a message that is sent by a replication server
 * to the other replication servers in the topology containing the list
 * of LDAP servers directly connected to it.
 * A replication server sends a ReplServerInfoMessage when an LDAP
 * server connects or disconnects.
 *
 * Exchanging these messages allows to have each replication server
 * knowing the complete list of LDAP servers in the topology and
 * their associated replication server and thus take the appropriate
 * decision to route a message to an LDAP server.
 *
 */
public class ReplServerInfoMessage extends ReplicationMessage
{
  private List<String> connectedServers = null;
  /**
   * Creates a new changelogInfo message from its encoded form.
   *
   * @param in The byte array containing the encoded form of the message.
   * @throws java.util.zip.DataFormatException If the byte array does not
   * contain a valid encoded form of the message.
   */
  public ReplServerInfoMessage(byte[] in) throws DataFormatException
  {
    try
    {
      /* first byte is the type */
      if (in.length < 1 || in[0] != MSG_TYPE_REPL_SERVER_INFO)
        throw new DataFormatException(
        "Input is not a valid changelogInfo Message.");
      connectedServers = new ArrayList<String>();
      int pos = 1;
      while (pos < in.length)
      {
        /*
         * Read the next server ID
         * first calculate the length then construct the string
         */
        int length = getNextLength(in, pos);
        connectedServers.add(new String(in, pos, length, "UTF-8"));
        pos += length +1;
      }
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * Creates a new changelogInfo message from a list of the currently
   * connected servers.
   *
   * @param connectedServers The list of currently connected servers ID.
   */
  public ReplServerInfoMessage(List<String> connectedServers)
  {
    this.connectedServers = connectedServers;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    try
    {
      ByteArrayOutputStream oStream = new ByteArrayOutputStream();
      /* Put the message type */
      oStream.write(MSG_TYPE_REPL_SERVER_INFO);
      if (connectedServers.size() >= 1)
      {
        for (String server : connectedServers)
        {
          byte[] byteServerURL = server.getBytes("UTF-8");
          oStream.write(byteServerURL);
          oStream.write(0);
        }
      }
      return oStream.toByteArray();
    }
    catch (IOException e)
    {
      // never happens
      return null;
    }
  }
  /**
   * Get the list of servers currently connected to the Changelog server
   * that generated this message.
   *
   * @return A collection of the servers currently connected to the Changelog
   *         server that generated this message.
   */
  public List<String> getConnectedServers()
  {
    return connectedServers;
  }
}
opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
@@ -53,6 +53,7 @@
  static final byte MSG_TYPE_DONE = 13;
  static final byte MSG_TYPE_ERROR = 14;
  static final byte MSG_TYPE_WINDOW_PROBE = 15;
  static final byte MSG_TYPE_REPL_SERVER_INFO = 16;
  // Adding a new type of message here probably requires to
  // change accordingly generateMsg method below
@@ -73,6 +74,8 @@
   * MSG_TYPE_ENTRY
   * MSG_TYPE_DONE
   * MSG_TYPE_ERROR
   * MSG_TYPE_WINDOW_PROBE
   * MSG_TYPE_REPL_SERVER_INFO
   *
   * @return the byte[] representation of this message.
   * @throws UnsupportedEncodingException  When the encoding of the message
@@ -140,6 +143,9 @@
      case MSG_TYPE_WINDOW_PROBE:
        msg = new WindowProbe(buffer);
      break;
      case MSG_TYPE_REPL_SERVER_INFO:
        msg = new ReplServerInfoMessage(buffer);
      break;
      default:
        throw new DataFormatException("received message with unknown type");
    }
opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -43,9 +43,9 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.AckMessage;
import org.opends.server.replication.protocol.ErrorMessage;
import org.opends.server.replication.protocol.InitializeRequestMessage;
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
import org.opends.server.types.DN;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
@@ -95,6 +95,7 @@
   * We add new TreeSet in the HashMap when a new replication server register
   * to this replication server.
   */
  private Map<Short, ServerHandler> replicationServers =
    new ConcurrentHashMap<Short, ServerHandler>();
@@ -253,6 +254,11 @@
        return false;
      }
      connectedServers.put(handler.getServerId(), handler);
      // Update the remote replication servers with our list
      // of connected LDAP servers
      sendReplServerInfo();
      return true;
    }
  }
@@ -269,7 +275,13 @@
    if (handler.isReplicationServer())
      replicationServers.remove(handler.getServerId());
    else
    {
      connectedServers.remove(handler.getServerId());
      // Update the remote replication servers with our list
      // of connected LDAP servers
      sendReplServerInfo();
    }
  }
  /**
@@ -312,6 +324,12 @@
        return false;
      }
      replicationServers.put(handler.getServerId(), handler);
      // Update this server with the list of LDAP servers
      // already connected
      handler.sendInfo(
          new ReplServerInfoMessage(getConnectedLDAPservers()));
      return true;
    }
  }
@@ -376,6 +394,22 @@
    return sourceDbHandlers.keySet();
  }
  /**
   * Returns as a set of String the list of LDAP servers connected to us.
   * Each string is the serverID of a connected LDAP server.
   *
   * @return The set of connected LDAP servers
   */
  public List<String> getConnectedLDAPservers()
  {
    List<String> mySet = new ArrayList<String>(0);
    for (ServerHandler handler : connectedServers.values())
    {
      mySet.add(String.valueOf(handler.getServerId()));
    }
    return mySet;
  }
  /**
   * Creates and returns an iterator.
@@ -473,15 +507,9 @@
  protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
      ServerHandler senderHandler)
  {
    List<ServerHandler> servers =
      new ArrayList<ServerHandler>();
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "getDestinationServers"
        + " msgDest:" + msg.getDestination() , 1);
    if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
    {
      // TODO Import from the "closest server" to be implemented
@@ -497,7 +525,7 @@
        }
      }
      // Send to all connected LDAP servers
      // Sends to all connected LDAP servers
      for (ServerHandler destinationHandler : connectedServers.values())
      {
        // Don't loop on the sender
@@ -518,14 +546,20 @@
      else
      {
        // the targeted server is NOT connected
        // Let's search for THE changelog server that MAY
        // have the targeted server connected.
        if (senderHandler.isLDAPserver())
        {
          // let's forward to the other changelogs
          servers.addAll(replicationServers.values());
          for (ServerHandler h : replicationServers.values())
          {
            if (h.isRemoteLDAPServer(msg.getDestination()))
            {
              servers.add(h);
        }
      }
    }
      }
    }
    return servers;
  }
@@ -543,16 +577,9 @@
    if (servers.isEmpty())
    {
      if (!(msg instanceof InitializeRequestMessage))
      {
        // TODO A more elaborated policy is probably needed
      }
      else
      {
        ErrorMessage errMsg = new ErrorMessage(
            msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
            "serverID:" + msg.getDestination());
        try
        {
          senderHandler.send(errMsg);
@@ -560,11 +587,21 @@
        catch(IOException ioe)
        {
          // TODO Handle error properly (sender timeout in addition)
        /*
         * An error happened trying the send back an ack to this server.
         * Log an error and close the connection to this server.
         */
        int msgID = MSGID_CHANGELOG_ERROR_SENDING_ERROR;
        String message = getMessage(msgID, this.toString())
        + stackTraceToSingleLineString(ioe);
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.SEVERE_ERROR,
            message, msgID);
        senderHandler.shutdown();
        }
      }
      return;
    }
    else
    {
    for (ServerHandler targetHandler : servers)
    {
      try
@@ -573,9 +610,22 @@
      }
      catch(IOException ioe)
      {
          /*
           * An error happened trying the send back an ack to this server.
           * Log an error and close the connection to this server.
           */
          int msgID = MSGID_CHANGELOG_ERROR_SENDING_MSG;
          String message = getMessage(msgID, this.toString())
          + stackTraceToSingleLineString(ioe) + " "
          + msg.getClass().getCanonicalName();
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.SEVERE_ERROR,
              message, msgID);
          senderHandler.shutdown();
        // TODO Handle error properly (sender timeout in addition)
      }
    }
    }
  }
@@ -722,4 +772,48 @@
      }
      return true;
    }
    /**
     * Send a ReplServerInfoMessage to all the connected replication servers
     * in order to let them know our connected LDAP servers.
     */
    private void sendReplServerInfo()
    {
      ReplServerInfoMessage info =
        new ReplServerInfoMessage(getConnectedLDAPservers());
      for (ServerHandler handler : replicationServers.values())
      {
        try
        {
          handler.sendInfo(info);
        }
        catch (IOException e)
        {
          /*
           * An error happened trying the send back an ack to this server.
           * Log an error and close the connection to this server.
           */
          int    msgID   = MSGID_CHANGELOG_ERROR_SENDING_INFO;
          String message = getMessage(msgID, this.toString())
          + stackTraceToSingleLineString(e);
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.SEVERE_ERROR,
              message, msgID);
          handler.shutdown();
        }
      }
    }
    /**
     * Sets the replication server informations for the provided
     * handler from the provided ReplServerInfoMessage.
     *
     * @param handler The server handler from which the info was received.
     * @param infoMsg The information message that was received.
     */
    public void setReplServerInfo(
        ServerHandler handler, ReplServerInfoMessage infoMsg)
    {
      handler.setReplServerInfo(infoMsg);
    }
}
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -357,7 +357,7 @@
   * @param baseDn The base Dn for which the ReplicationCache must be returned.
   * @return The ReplicationCache associated to the base DN given in parameter.
   */
  ReplicationCache getReplicationCache(DN baseDn)
  public ReplicationCache getReplicationCache(DN baseDn)
  {
    ReplicationCache replicationCache;
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -35,6 +35,7 @@
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -61,6 +62,7 @@
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
@@ -129,6 +131,14 @@
  private short protocolVersion;
  /**
   * When this Handler is connected to a changelog server this collection
   * will contain the list of LDAP servers connected to the remote changelog
   * server.
   */
  private List<String> remoteLDAPservers = new ArrayList<String>();
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
@@ -1342,18 +1352,70 @@
  public void process(RoutableMessage msg)
  {
    if (debugEnabled())
      TRACER.debugInfo("SH(" + replicationServerId + ") forwards " +
                 msg + " to " + serverId);
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "SH(" + replicationServerId + ") receives " + msg +
            " from " + serverId, 1);
      TRACER.debugInfo("SH(" + replicationServerId + ") receives " +
                 msg + " from " + serverId);
    replicationCache.process(msg, this);
  }
  /**
   * Sends the provided ReplServerInfoMessage.
   *
   * @param info The ReplServerInfoMessage message to be sent.
   * @throws IOException When it occurs while sending the message,
   *
   */
   public void sendInfo(ReplServerInfoMessage info)
   throws IOException
   {
     session.publish(info);
   }
   /**
    *
    * Sets the replication server from the message provided.
    *
    * @param infoMsg The information message.
    */
   public void setReplServerInfo(ReplServerInfoMessage infoMsg)
   {
     remoteLDAPservers = infoMsg.getConnectedServers();
   }
   /**
    * When this handler is connected to a replication server, specifies if
    * a wanted server is connected to this replication server.
    *
    * @param wantedServer The server we want to know if it is connected
    * to the replication server represented by this handler.
    * @return boolean True is the wanted server is connected to the server
    * represented by this handler.
    */
   public boolean isRemoteLDAPServer(short wantedServer)
   {
     for (String server : remoteLDAPservers)
     {
       if (wantedServer == Short.valueOf(server))
       {
         return true;
       }
     }
     return false;
   }
   /**
    * When the handler is connected to a replication server, specifies the
    * replication server has remote LDAP servers connected to it.
    *
    * @return boolean True is the replication server has remote LDAP servers
    * connected to it.
    */
   public List<String> getRemoteLDAPServers()
   {
     return remoteLDAPservers;
   }
  /**
   * Send an InitializeRequestMessage to the server connected through this
   * handler.
   *
@@ -1365,12 +1427,6 @@
    if (debugEnabled())
      TRACER.debugInfo("SH(" + replicationServerId + ") forwards " +
                 msg + " to " + serverId);
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "SH(" + replicationServerId + ") forwards " +
             msg + " to " + serverId, 1);
    session.publish(msg);
  }
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -46,6 +46,7 @@
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.loggers.debug.DebugTracer;
@@ -165,6 +166,11 @@
          WindowProbe windowProbeMsg = (WindowProbe) msg;
          handler.process(windowProbeMsg);
        }
        else if (msg instanceof ReplServerInfoMessage)
        {
          ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg;
          handler.setReplServerInfo(infoMsg);
        }
        else if (msg == null)
        {
          /*
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -45,7 +45,11 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import org.opends.server.TestCaseUtils;
@@ -124,17 +128,21 @@
  boolean ssShutdownRequested = false;
  protected String[] updatedEntries;
  boolean externalDS = false;
  short server1ID = 1;
  short server2ID = 2;
  short server3ID = 3;
  short changelog1ID = 12;
  short changelog2ID = 13;
  int changelogPort = 8989;
  private static final short server1ID = 11;
  private static final short server2ID = 21;
  private static final short server3ID = 31;
  private static final short changelog1ID = 1;
  private static final short changelog2ID = 2;
  private static final short changelog3ID = 3;
  private static int[] replServerPort = new int[4];
  private DN baseDn;
  ReplicationBroker server2 = null;
  ReplicationBroker server3 = null;
  ReplicationServer changelog1 = null;
  ReplicationServer changelog2 = null;
  ReplicationServer changelog3 = null;
  boolean emptyOldChanges = true;
  ReplicationDomain sd = null;
@@ -221,16 +229,6 @@
        "ds-task-initialize-domain-dn: dc=example,dc=com",
        "ds-task-initialize-replica-server-id: all");
    // Change log
    String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
    String changeLogLdif = "dn: " + changeLogStringDN + "\n"
    + "objectClass: top\n"
    + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
    + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8990\n"
    + "ds-cfg-changelog-server-id: 1\n"
    + "ds-cfg-window-size: " + WINDOW_SIZE + "\n"
    + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE;
    replServerEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
    replServerEntry = null;
  }
@@ -604,17 +602,20 @@
        "dn: dc=example,dc=com\n"
        + "objectClass: top\n"
        + "objectClass: domain\n"
        + "dc: example\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111111\n"
        + "\n",
          "dn: ou=People,dc=example,dc=com\n"
        + "objectClass: top\n"
        + "objectClass: organizationalUnit\n"
        + "ou: People\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111112\n"
        + "\n",
          "dn: cn=Fiona Jensen,ou=people,dc=example,dc=com\n"
        + "objectclass: top\n"
        + "objectclass: person\n"
        + "objectclass: organizationalPerson\n"
        + "objectclass: inetOrgPerson\n"
        + "cn: Fiona Jensen\n"
        + "sn: Jensen\n"
        + "uid: fiona\n"
@@ -625,6 +626,7 @@
        + "objectclass: top\n"
        + "objectclass: person\n"
        + "objectclass: organizationalPerson\n"
        + "objectclass: inetOrgPerson\n"
        + "cn: Robert Langman\n"
        + "sn: Langman\n"
        + "uid: robert\n"
@@ -738,26 +740,38 @@
   */
  private ReplicationServer createChangelogServer(short changelogId)
  {
    SortedSet<String> servers = null;
    servers = new TreeSet<String>();
    try
    {
      if ((changelogId==changelog1ID)&&(changelog1!=null))
        return changelog1;
      if ((changelogId==changelog2ID)&&(changelog2!=null))
        return changelog2;
      if (changelogId==changelog1ID)
      {
        int chPort = getChangelogPort(changelogId);
        if (changelog1!=null)
          return changelog1;
      }
      else if (changelogId==changelog2ID)
      {
        if (changelog2!=null)
          return changelog2;
      }
      else if (changelogId==changelog3ID)
      {
        if (changelog3!=null)
          return changelog3;
      }
      servers.add("localhost:" + getChangelogPort(changelog1ID));
      servers.add("localhost:" + getChangelogPort(changelog2ID));
      servers.add("localhost:" + getChangelogPort(changelog3ID));
      int chPort = getChangelogPort(changelogId);
        ReplServerFakeConfiguration conf =
          new ReplServerFakeConfiguration(chPort, null, 0, changelogId, 0, 100,
                                         null);
            servers);
        ReplicationServer replicationServer = new ReplicationServer(conf);
        Thread.sleep(1000);
        return replicationServer;
      }
    }
    catch (Exception e)
    {
      fail("createChangelog" + stackTraceToSingleLineString(e));
@@ -796,7 +810,6 @@
        DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
        assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the synchronized server");
        entryList.add(synchroServerEntry.getDN());
        sd = ReplicationDomain.retrievesReplicationDomain(baseDn);
@@ -820,14 +833,29 @@
  private int getChangelogPort(short changelogID)
  {
    return (changelogPort+changelogID);
    if (replServerPort[changelogID] == 0)
    {
      try
      {
        // Find  a free port for the replicationServer
        ServerSocket socket = TestCaseUtils.bindFreePort();
        replServerPort[changelogID] = socket.getLocalPort();
        socket.close();
      }
      catch(Exception e)
      {
        fail("Cannot retrieve a free port for replication server."
          + e.getMessage());
      }
    }
    return replServerPort[changelogID];
  }
  /**
   * Tests the import side of the Initialize task
   */
  @Test(enabled=false)
  public void InitializeImport() throws Exception
  public void initializeImport() throws Exception
  {
    String testCase = "InitializeImport";
@@ -883,7 +911,7 @@
   * Tests the export side of the Initialize task
   */
  @Test(enabled=false)
  public void InitializeExport() throws Exception
  public void initializeExport() throws Exception
  {
    String testCase = "Replication/InitializeExport";
@@ -917,7 +945,7 @@
   * Tests the import side of the InitializeTarget task
   */
  @Test(enabled=false)
  public void InitializeTargetExport() throws Exception
  public void initializeTargetExport() throws Exception
  {
    String testCase = "Replication/InitializeTargetExport";
@@ -957,7 +985,7 @@
   * Tests the import side of the InitializeTarget task
   */
  @Test(enabled=false)
  public void InitializeTargetExportAll() throws Exception
  public void initializeTargetExportAll() throws Exception
  {
    String testCase = "Replication/InitializeTargetExportAll";
@@ -1001,7 +1029,7 @@
   * Tests the import side of the InitializeTarget task
   */
  @Test(enabled=false)
  public void InitializeTargetImport() throws Exception
  public void initializeTargetImport() throws Exception
  {
    String testCase = "InitializeTargetImport";
@@ -1042,7 +1070,7 @@
   * Tests the import side of the InitializeTarget task
   */
  @Test(enabled=false)
  public void InitializeTargetConfigErrors() throws Exception
  public void initializeTargetConfigErrors() throws Exception
  {
    String testCase = "InitializeTargetConfigErrors";
@@ -1096,7 +1124,7 @@
   * Tests the import side of the InitializeTarget task
   */
  @Test(enabled=false)
  public void InitializeConfigErrors() throws Exception
  public void initializeConfigErrors() throws Exception
  {
    String testCase = "InitializeConfigErrors";
@@ -1116,10 +1144,10 @@
          ",cn=Scheduled Tasks,cn=Tasks",
          "objectclass: top",
          "objectclass: ds-task",
          "objectclass: ds-task-initialize",
          "objectclass: ds-task-initialize-from-remote-replica",
          "ds-task-class-name: org.opends.server.tasks.InitializeTask",
          "ds-task-initialize-domain-dn: foo",
          "ds-task-initialize-source: " + server2ID);
          "ds-task-initialize-replica-server-id: " + server2ID);
      addTask(taskInit, ResultCode.INVALID_DN_SYNTAX,
          TaskMessages.MSGID_TASK_INITIALIZE_INVALID_DN);
@@ -1129,10 +1157,10 @@
          ",cn=Scheduled Tasks,cn=Tasks",
          "objectclass: top",
          "objectclass: ds-task",
          "objectclass: ds-task-initialize",
          "objectclass: ds-task-initialize-from-remote-replica",
          "ds-task-class-name: org.opends.server.tasks.InitializeTask",
          "ds-task-initialize-domain-dn: dc=foo",
          "ds-task-initialize-source: " + server2ID);
          "ds-task-initialize-replica-server-id: " + server2ID);
      addTask(taskInit, ResultCode.OTHER, MSGID_NO_MATCHING_DOMAIN);
      // Invalid Source
@@ -1141,10 +1169,10 @@
          ",cn=Scheduled Tasks,cn=Tasks",
          "objectclass: top",
          "objectclass: ds-task",
          "objectclass: ds-task-initialize",
          "objectclass: ds-task-initialize-from-remote-replica",
          "ds-task-class-name: org.opends.server.tasks.InitializeTask",
          "ds-task-initialize-domain-dn: " + baseDn,
          "ds-task-initialize-source: -3");
          "ds-task-initialize-replica-server-id: -3");
      addTask(taskInit, ResultCode.OTHER,
          MSGID_INVALID_IMPORT_SOURCE);
@@ -1162,21 +1190,101 @@
  }
  @Test(enabled=false)
  public void InitializeTargetBroken() throws Exception
  public void initializeTargetBroken() throws Exception
  {
    String testCase = "InitializeTargetBroken";
    fail(testCase + " NYI");
  }
  @Test(enabled=false)
  public void InitializeBroken() throws Exception
  public void initializeBroken() throws Exception
  {
    String testCase = "InitializeBroken";
    fail(testCase + " NYI");
  }
  /*
   * TestReplServerInfos tests that in a topology with more
   * than one replication server, in each replication server
   * is stored the list of LDAP servers connected to each
   * replication server of the topology, thanks to the
   * ReplServerInfoMessage(s) exchanged by the replication
   * servers.
   */
  @Test(enabled=false)
  public void InitializeTargetExportMultiSS() throws Exception
  public void testReplServerInfos() throws Exception
  {
    String testCase = "Replication/TestReplServerInfos";
    log("Starting " + testCase);
    // Create the Repl Servers
    changelog1 = createChangelogServer(changelog1ID);
    changelog2 = createChangelogServer(changelog2ID);
    changelog3 = createChangelogServer(changelog3ID);
    // Connects lDAP1 to replServer1
    connectServer1ToChangelog(changelog1ID);
    // Connects lDAP2 to replServer2
    ReplicationBroker broker2 =
      openReplicationSession(DN.decode("dc=example,dc=com"),
        server2ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
    // Connects lDAP3 to replServer2
    ReplicationBroker broker3 =
      openReplicationSession(DN.decode("dc=example,dc=com"),
        server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
    // Check that the list of connected LDAP servers is correct
    // in each replication servers
    List<String> l1 = changelog1.getReplicationCache(baseDn).
      getConnectedLDAPservers();
    assertEquals(l1.size(), 1);
    assertEquals(l1.get(0), String.valueOf(server1ID));
    List<String> l2;
    l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
    assertEquals(l2.size(), 2);
    assertEquals(l2.get(0), String.valueOf(server3ID));
    assertEquals(l2.get(1), String.valueOf(server2ID));
    List<String> l3;
    l3 = changelog3.getReplicationCache(baseDn).getConnectedLDAPservers();
    assertEquals(l3.size(), 0);
    // Test updates
    broker3.stop();
    Thread.sleep(1000);
    l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
    assertEquals(l2.size(), 1);
    assertEquals(l2.get(0), String.valueOf(server2ID));
    broker3 = openReplicationSession(DN.decode("dc=example,dc=com"),
        server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
    broker2.stop();
    Thread.sleep(1000);
    l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
    assertEquals(l2.size(), 1);
    assertEquals(l2.get(0), String.valueOf(server3ID));
    // TODO Test ReplicationCache.getDestinationServers method.
    broker2.stop();
    broker3.stop();
    cleanEntries();
    changelog3.shutdown();
    changelog3 = null;
    changelog2.shutdown();
    changelog2 = null;
    changelog1.shutdown();
    changelog1 = null;
  }
  @Test(enabled=false)
  public void initializeTargetExportMultiSS() throws Exception
  {
    String testCase = "Replication/InitializeTargetExportMultiSS";
@@ -1222,17 +1330,20 @@
  }
  @Test(enabled=false)
  public void InitializeExportMultiSS() throws Exception
  public void initializeExportMultiSS() throws Exception
  {
    String testCase = "Replication/InitializeExportMultiSS";
    log("Starting "+testCase);
    // Create 2 changelogs
    changelog1 = createChangelogServer(changelog1ID);
    Thread.sleep(3000);
    Thread.sleep(1000);
    changelog2 = createChangelogServer(changelog2ID);
    Thread.sleep(3000);
    Thread.sleep(1000);
    changelog3 = createChangelogServer(changelog3ID);
    Thread.sleep(1000);
    // Connect DS to the replicationServer 1
    connectServer1ToChangelog(changelog1ID);
@@ -1240,7 +1351,7 @@
    // Put entries in DB
    addTestEntriesToDB();
    // Connect a broker acting as server 2 to changelog2
    // Connect a broker acting as server 2 to Repl Server 2
    if (server2 == null)
    {
      server2 = openReplicationSession(DN.decode("dc=example,dc=com"),
@@ -1248,6 +1359,14 @@
        1000, emptyOldChanges);
    }
    // Connect a broker acting as server 3 to Repl Server 3
    if (server3 == null)
    {
      server3 = openReplicationSession(DN.decode("dc=example,dc=com"),
        server3ID, 100, getChangelogPort(changelog3ID),
        1000, emptyOldChanges);
    }
    Thread.sleep(3000);
    // S2 sends init request
@@ -1267,7 +1386,7 @@
  }
  @Test(enabled=false)
  public void InitializeNoSource() throws Exception
  public void initializeNoSource() throws Exception
  {
    String testCase = "InitializeNoSource";
    log("Starting "+testCase);
@@ -1317,7 +1436,7 @@
  }
  @Test(enabled=false)
  public void InitializeTargetNoTarget() throws Exception
  public void initializeTargetNoTarget() throws Exception
  {
    String testCase = "InitializeTargetNoTarget"  + baseDn;
    log("Starting "+testCase);
@@ -1336,10 +1455,10 @@
        ",cn=Scheduled Tasks,cn=Tasks",
        "objectclass: top",
        "objectclass: ds-task",
        "objectclass: ds-task-initialize-target",
        "objectclass: ds-task-initialize-remote-replica",
        "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask",
        "ds-task-initialize-target-domain-dn: "+baseDn,
        "ds-task-initialize-target-scope: " + 10);
        "ds-task-initialize-domain-dn: "+baseDn,
        "ds-task-initialize-replica-server-id: " + 0);
    addTask(taskInit, ResultCode.SUCCESS, 0);
@@ -1355,32 +1474,32 @@
  }
  @Test(enabled=false)
  public void InitializeStopped() throws Exception
  public void initializeStopped() throws Exception
  {
    String testCase = "InitializeStopped";
    fail(testCase + " NYI");
  }
  @Test(enabled=false)
  public void InitializeTargetStopped() throws Exception
  public void initializeTargetStopped() throws Exception
  {
    String testCase = "InitializeTargetStopped";
    fail(testCase + " NYI");
  }
  @Test(enabled=false)
  public void InitializeCompressed() throws Exception
  public void initializeCompressed() throws Exception
  {
    String testCase = "InitializeStopped";
    fail(testCase + " NYI");
  }
  @Test(enabled=false)
  public void InitializeTargetEncrypted() throws Exception
  public void initializeTargetEncrypted() throws Exception
  {
    String testCase = "InitializeTargetCompressed";
    fail(testCase + " NYI");
  }
  @Test(enabled=false)
  public void InitializeSimultaneous() throws Exception
  public void initializeSimultaneous() throws Exception
  {
    String testCase = "InitializeSimultaneous";
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -173,7 +173,8 @@
      {
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE,
            "ReplicationTestCase/openChangelogSession" + e.getMessage(), 1);
            "ReplicationTestCase/openChangelogSession " + e.getMessage()
            + " when emptying old changes", 1);
      }
    }
    return broker;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -483,7 +483,7 @@
   * by checking that : msg == new ServerStartMessage(msg.getBytes()).
   */
  @Test(dataProvider="serverStart")
  public void ServerStartMessageTest(short serverId, DN baseDN, int window,
  public void serverStartMessageTest(short serverId, DN baseDN, int window,
         ServerState state) throws Exception
  {
    state.update(new ChangeNumber((long)1, 1,(short)1));
@@ -516,7 +516,7 @@
   * by checking that : msg == new ReplServerStartMessage(msg.getBytes()).
   */
  @Test(dataProvider="changelogStart")
  public void ChangelogStartMessageTest(short serverId, DN baseDN, int window,
  public void replserverStartMessageTest(short serverId, DN baseDN, int window,
         String url, ServerState state) throws Exception
  {
    state.update(new ChangeNumber((long)1, 1,(short)1));
@@ -537,7 +537,7 @@
   * by checking that : msg == new WindowMessage(msg.getBytes()).
   */
  @Test()
  public void WindowMessageTest() throws Exception
  public void windowMessageTest() throws Exception
  {
    WindowMessage msg = new WindowMessage(123);
    WindowMessage newMsg = new WindowMessage(msg.getBytes());
@@ -557,11 +557,25 @@
  }
  /**
   * Test ReplServerInfoMessage encoding and decoding.
   */
  @Test()
  public void replServerInfoMessageTest() throws Exception
  {
    List<String> connectedServers = new ArrayList<String>(0);
    connectedServers.add("s1");
    connectedServers.add("s2");
    ReplServerInfoMessage msg = new ReplServerInfoMessage(connectedServers);
    ReplServerInfoMessage newMsg = new ReplServerInfoMessage(msg.getBytes());
    assertEquals(msg.getConnectedServers(), newMsg.getConnectedServers());
  }
  /**
   * Test that EntryMessage encoding and decoding works
   * by checking that : msg == new EntryMessageTest(msg.getBytes()).
   */
  @Test()
  public void EntryMessageTest() throws Exception
  public void entryMessageTest() throws Exception
  {
    String taskInitFromS2 = new String(
        "dn: ds-task-id=" + UUID.randomUUID() +
@@ -586,7 +600,7 @@
   * Test that InitializeRequestMessage encoding and decoding works
   */
  @Test()
  public void InitializeRequestMessageTest() throws Exception
  public void initializeRequestMessageTest() throws Exception
  {
    short sender = 1;
    short target = 2;
@@ -602,7 +616,7 @@
   * Test that InitializeTargetMessage encoding and decoding works
   */
  @Test()
  public void InitializeTargetMessageTest() throws Exception
  public void initializeTargetMessageTest() throws Exception
  {
    short senderID = 1;
    short targetID = 2;
@@ -631,7 +645,7 @@
   * Test that DoneMessage encoding and decoding works
   */
  @Test()
  public void DoneMessage() throws Exception
  public void doneMessageTest() throws Exception
  {
    DoneMessage msg = new DoneMessage((short)1, (short)2);
    DoneMessage newMsg = new DoneMessage(msg.getBytes());
@@ -643,7 +657,7 @@
   * Test that ErrorMessage encoding and decoding works
   */
  @Test()
  public void ErrorMessage() throws Exception
  public void errorMessageTest() throws Exception
  {
    ErrorMessage msg = new ErrorMessage((short)1, (short)2, 12, "details");
    ErrorMessage newMsg = new ErrorMessage(msg.getBytes());