mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
12.41.2007 1c8b422d63f419d8c85a28b1f2276ac0f3e3632c
opends/src/server/org/opends/server/messages/ReplicationMessages.java
@@ -450,6 +450,27 @@
    CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 63;
  /**
   * An error happened to send a ReplServerInfoMessage to another
   * replication server.
   */
  public static final int MSGID_CHANGELOG_ERROR_SENDING_INFO =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 64;
  /**
   * An error happened to send an ErrorMessage to another
   * replication server.
   */
  public static final int MSGID_CHANGELOG_ERROR_SENDING_ERROR =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 65;
  /**
   * An error happened to send a Message (probably a RoutableMessage)
   * to another replication server.
   */
  public static final int MSGID_CHANGELOG_ERROR_SENDING_MSG =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 66;
  /**
   * Register the messages from this class in the core server.
   *
   */
@@ -530,8 +551,8 @@
        "An unexpected error happened handling connection with %s." +
        "This connection is going to be closed. ");
    registerMessage(MSGID_CHANGELOG_ERROR_SENDING_ACK,
        "An unexpected error happened sending an ack to %s." +
        "This connection is going to be closed. ");
        "An unexpected error occurred  while sending an ack to %s." +
        "This connection is going to be closed and reopened. ");
    registerMessage(
        MSGID_EXCEPTION_RECEIVING_REPLICATION_MESSAGE,
        "An Exception was caught while receiving replication message : %s");
@@ -617,5 +638,15 @@
    registerMessage(MSGID_DISCONNECTED_FROM_CHANGELOG,
        "The connection to Replication Server %s has been dropped by the "
        + "Replication Server");
    registerMessage(MSGID_CHANGELOG_ERROR_SENDING_INFO,
        "An unexpected error occurred  while sending a Server " +
        " Info message to %s. " +
        "This connection is going to be closed and reopened");
    registerMessage(MSGID_CHANGELOG_ERROR_SENDING_ERROR,
        "An unexpected error occurred  while sending an Error Message to %s. "+
        "This connection is going to be closed and reopened");
    registerMessage(MSGID_CHANGELOG_ERROR_SENDING_MSG,
        "An unexpected error occurred  while sending a Message to %s. "+
        "This connection is going to be closed and reopened");
  }
}
opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
New file
@@ -0,0 +1,142 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.DataFormatException;
/**
 *
 * This class defines a message that is sent by a replication server
 * to the other replication servers in the topology containing the list
 * of LDAP servers directly connected to it.
 * A replication server sends a ReplServerInfoMessage when an LDAP
 * server connects or disconnects.
 *
 * Exchanging these messages allows to have each replication server
 * knowing the complete list of LDAP servers in the topology and
 * their associated replication server and thus take the appropriate
 * decision to route a message to an LDAP server.
 *
 */
public class ReplServerInfoMessage extends ReplicationMessage
{
  private List<String> connectedServers = null;
  /**
   * Creates a new changelogInfo message from its encoded form.
   *
   * @param in The byte array containing the encoded form of the message.
   * @throws java.util.zip.DataFormatException If the byte array does not
   * contain a valid encoded form of the message.
   */
  public ReplServerInfoMessage(byte[] in) throws DataFormatException
  {
    try
    {
      /* first byte is the type */
      if (in.length < 1 || in[0] != MSG_TYPE_REPL_SERVER_INFO)
        throw new DataFormatException(
        "Input is not a valid changelogInfo Message.");
      connectedServers = new ArrayList<String>();
      int pos = 1;
      while (pos < in.length)
      {
        /*
         * Read the next server ID
         * first calculate the length then construct the string
         */
        int length = getNextLength(in, pos);
        connectedServers.add(new String(in, pos, length, "UTF-8"));
        pos += length +1;
      }
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * Creates a new changelogInfo message from a list of the currently
   * connected servers.
   *
   * @param connectedServers The list of currently connected servers ID.
   */
  public ReplServerInfoMessage(List<String> connectedServers)
  {
    this.connectedServers = connectedServers;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    try
    {
      ByteArrayOutputStream oStream = new ByteArrayOutputStream();
      /* Put the message type */
      oStream.write(MSG_TYPE_REPL_SERVER_INFO);
      if (connectedServers.size() >= 1)
      {
        for (String server : connectedServers)
        {
          byte[] byteServerURL = server.getBytes("UTF-8");
          oStream.write(byteServerURL);
          oStream.write(0);
        }
      }
      return oStream.toByteArray();
    }
    catch (IOException e)
    {
      // never happens
      return null;
    }
  }
  /**
   * Get the list of servers currently connected to the Changelog server
   * that generated this message.
   *
   * @return A collection of the servers currently connected to the Changelog
   *         server that generated this message.
   */
  public List<String> getConnectedServers()
  {
    return connectedServers;
  }
}
opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
@@ -53,6 +53,7 @@
  static final byte MSG_TYPE_DONE = 13;
  static final byte MSG_TYPE_ERROR = 14;
  static final byte MSG_TYPE_WINDOW_PROBE = 15;
  static final byte MSG_TYPE_REPL_SERVER_INFO = 16;
  // Adding a new type of message here probably requires to
  // change accordingly generateMsg method below
@@ -73,6 +74,8 @@
   * MSG_TYPE_ENTRY
   * MSG_TYPE_DONE
   * MSG_TYPE_ERROR
   * MSG_TYPE_WINDOW_PROBE
   * MSG_TYPE_REPL_SERVER_INFO
   *
   * @return the byte[] representation of this message.
   * @throws UnsupportedEncodingException  When the encoding of the message
@@ -140,6 +143,9 @@
      case MSG_TYPE_WINDOW_PROBE:
        msg = new WindowProbe(buffer);
      break;
      case MSG_TYPE_REPL_SERVER_INFO:
        msg = new ReplServerInfoMessage(buffer);
      break;
      default:
        throw new DataFormatException("received message with unknown type");
    }
opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -43,9 +43,9 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.AckMessage;
import org.opends.server.replication.protocol.ErrorMessage;
import org.opends.server.replication.protocol.InitializeRequestMessage;
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
import org.opends.server.types.DN;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
@@ -95,6 +95,7 @@
   * We add new TreeSet in the HashMap when a new replication server register
   * to this replication server.
   */
  private Map<Short, ServerHandler> replicationServers =
    new ConcurrentHashMap<Short, ServerHandler>();
@@ -253,6 +254,11 @@
        return false;
      }
      connectedServers.put(handler.getServerId(), handler);
      // Update the remote replication servers with our list
      // of connected LDAP servers
      sendReplServerInfo();
      return true;
    }
  }
@@ -269,7 +275,13 @@
    if (handler.isReplicationServer())
      replicationServers.remove(handler.getServerId());
    else
    {
      connectedServers.remove(handler.getServerId());
      // Update the remote replication servers with our list
      // of connected LDAP servers
      sendReplServerInfo();
    }
  }
  /**
@@ -312,6 +324,12 @@
        return false;
      }
      replicationServers.put(handler.getServerId(), handler);
      // Update this server with the list of LDAP servers
      // already connected
      handler.sendInfo(
          new ReplServerInfoMessage(getConnectedLDAPservers()));
      return true;
    }
  }
@@ -376,6 +394,22 @@
    return sourceDbHandlers.keySet();
  }
  /**
   * Returns as a set of String the list of LDAP servers connected to us.
   * Each string is the serverID of a connected LDAP server.
   *
   * @return The set of connected LDAP servers
   */
  public List<String> getConnectedLDAPservers()
  {
    List<String> mySet = new ArrayList<String>(0);
    for (ServerHandler handler : connectedServers.values())
    {
      mySet.add(String.valueOf(handler.getServerId()));
    }
    return mySet;
  }
  /**
   * Creates and returns an iterator.
@@ -473,15 +507,9 @@
  protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
      ServerHandler senderHandler)
  {
    List<ServerHandler> servers =
      new ArrayList<ServerHandler>();
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "getDestinationServers"
        + " msgDest:" + msg.getDestination() , 1);
    if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
    {
      // TODO Import from the "closest server" to be implemented
@@ -497,7 +525,7 @@
        }
      }
      // Send to all connected LDAP servers
      // Sends to all connected LDAP servers
      for (ServerHandler destinationHandler : connectedServers.values())
      {
        // Don't loop on the sender
@@ -518,14 +546,20 @@
      else
      {
        // the targeted server is NOT connected
        // Let's search for THE changelog server that MAY
        // have the targeted server connected.
        if (senderHandler.isLDAPserver())
        {
          // let's forward to the other changelogs
          servers.addAll(replicationServers.values());
          for (ServerHandler h : replicationServers.values())
          {
            if (h.isRemoteLDAPServer(msg.getDestination()))
            {
              servers.add(h);
            }
          }
        }
      }
    }
    return servers;
  }
@@ -543,37 +577,53 @@
    if (servers.isEmpty())
    {
      if (!(msg instanceof InitializeRequestMessage))
      {
        // TODO A more elaborated policy is probably needed
      }
      else
      {
        ErrorMessage errMsg = new ErrorMessage(
            msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
            "serverID:" + msg.getDestination());
        try
        {
          senderHandler.send(errMsg);
        }
        catch(IOException ioe)
        {
          // TODO Handle error properly (sender timeout in addition)
        }
      }
      return;
    }
    for (ServerHandler targetHandler : servers)
    {
      ErrorMessage errMsg = new ErrorMessage(
          msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
          "serverID:" + msg.getDestination());
      try
      {
        targetHandler.send(msg);
        senderHandler.send(errMsg);
      }
      catch(IOException ioe)
      {
        // TODO Handle error properly (sender timeout in addition)
        /*
         * An error happened trying the send back an ack to this server.
         * Log an error and close the connection to this server.
         */
        int msgID = MSGID_CHANGELOG_ERROR_SENDING_ERROR;
        String message = getMessage(msgID, this.toString())
        + stackTraceToSingleLineString(ioe);
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.SEVERE_ERROR,
            message, msgID);
        senderHandler.shutdown();
      }
    }
    else
    {
      for (ServerHandler targetHandler : servers)
      {
        try
        {
          targetHandler.send(msg);
        }
        catch(IOException ioe)
        {
          /*
           * An error happened trying the send back an ack to this server.
           * Log an error and close the connection to this server.
           */
          int msgID = MSGID_CHANGELOG_ERROR_SENDING_MSG;
          String message = getMessage(msgID, this.toString())
          + stackTraceToSingleLineString(ioe) + " "
          + msg.getClass().getCanonicalName();
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.SEVERE_ERROR,
              message, msgID);
          senderHandler.shutdown();
          // TODO Handle error properly (sender timeout in addition)
        }
      }
    }
@@ -722,4 +772,48 @@
      }
      return true;
    }
    /**
     * Send a ReplServerInfoMessage to all the connected replication servers
     * in order to let them know our connected LDAP servers.
     */
    private void sendReplServerInfo()
    {
      ReplServerInfoMessage info =
        new ReplServerInfoMessage(getConnectedLDAPservers());
      for (ServerHandler handler : replicationServers.values())
      {
        try
        {
          handler.sendInfo(info);
        }
        catch (IOException e)
        {
          /*
           * An error happened trying the send back an ack to this server.
           * Log an error and close the connection to this server.
           */
          int    msgID   = MSGID_CHANGELOG_ERROR_SENDING_INFO;
          String message = getMessage(msgID, this.toString())
          + stackTraceToSingleLineString(e);
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.SEVERE_ERROR,
              message, msgID);
          handler.shutdown();
        }
      }
    }
    /**
     * Sets the replication server informations for the provided
     * handler from the provided ReplServerInfoMessage.
     *
     * @param handler The server handler from which the info was received.
     * @param infoMsg The information message that was received.
     */
    public void setReplServerInfo(
        ServerHandler handler, ReplServerInfoMessage infoMsg)
    {
      handler.setReplServerInfo(infoMsg);
    }
}
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -357,7 +357,7 @@
   * @param baseDn The base Dn for which the ReplicationCache must be returned.
   * @return The ReplicationCache associated to the base DN given in parameter.
   */
  ReplicationCache getReplicationCache(DN baseDn)
  public ReplicationCache getReplicationCache(DN baseDn)
  {
    ReplicationCache replicationCache;
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -35,6 +35,7 @@
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -61,6 +62,7 @@
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
@@ -129,6 +131,14 @@
  private short protocolVersion;
  /**
   * When this Handler is connected to a changelog server this collection
   * will contain the list of LDAP servers connected to the remote changelog
   * server.
   */
  private List<String> remoteLDAPservers = new ArrayList<String>();
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
@@ -1342,18 +1352,70 @@
  public void process(RoutableMessage msg)
  {
    if (debugEnabled())
      TRACER.debugInfo("SH(" + replicationServerId + ") forwards " +
                 msg + " to " + serverId);
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "SH(" + replicationServerId + ") receives " + msg +
            " from " + serverId, 1);
      TRACER.debugInfo("SH(" + replicationServerId + ") receives " +
                 msg + " from " + serverId);
    replicationCache.process(msg, this);
  }
  /**
   * Sends the provided ReplServerInfoMessage.
   *
   * @param info The ReplServerInfoMessage message to be sent.
   * @throws IOException When it occurs while sending the message,
   *
   */
   public void sendInfo(ReplServerInfoMessage info)
   throws IOException
   {
     session.publish(info);
   }
   /**
    *
    * Sets the replication server from the message provided.
    *
    * @param infoMsg The information message.
    */
   public void setReplServerInfo(ReplServerInfoMessage infoMsg)
   {
     remoteLDAPservers = infoMsg.getConnectedServers();
   }
   /**
    * When this handler is connected to a replication server, specifies if
    * a wanted server is connected to this replication server.
    *
    * @param wantedServer The server we want to know if it is connected
    * to the replication server represented by this handler.
    * @return boolean True is the wanted server is connected to the server
    * represented by this handler.
    */
   public boolean isRemoteLDAPServer(short wantedServer)
   {
     for (String server : remoteLDAPservers)
     {
       if (wantedServer == Short.valueOf(server))
       {
         return true;
       }
     }
     return false;
   }
   /**
    * When the handler is connected to a replication server, specifies the
    * replication server has remote LDAP servers connected to it.
    *
    * @return boolean True is the replication server has remote LDAP servers
    * connected to it.
    */
   public List<String> getRemoteLDAPServers()
   {
     return remoteLDAPservers;
   }
  /**
   * Send an InitializeRequestMessage to the server connected through this
   * handler.
   *
@@ -1365,12 +1427,6 @@
    if (debugEnabled())
      TRACER.debugInfo("SH(" + replicationServerId + ") forwards " +
                 msg + " to " + serverId);
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "SH(" + replicationServerId + ") forwards " +
             msg + " to " + serverId, 1);
    session.publish(msg);
  }
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -46,6 +46,7 @@
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.loggers.debug.DebugTracer;
@@ -165,6 +166,11 @@
          WindowProbe windowProbeMsg = (WindowProbe) msg;
          handler.process(windowProbeMsg);
        }
        else if (msg instanceof ReplServerInfoMessage)
        {
          ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg;
          handler.setReplServerInfo(infoMsg);
        }
        else if (msg == null)
        {
          /*
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -45,7 +45,11 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import org.opends.server.TestCaseUtils;
@@ -103,7 +107,7 @@
 */
public class InitOnLineTest extends ReplicationTestCase
{
 {
  /**
   * The tracer object for the debug logger
   */
@@ -124,17 +128,21 @@
  boolean ssShutdownRequested = false;
  protected String[] updatedEntries;
  boolean externalDS = false;
  short server1ID = 1;
  short server2ID = 2;
  short server3ID = 3;
  short changelog1ID = 12;
  short changelog2ID = 13;
  int changelogPort = 8989;
  private static final short server1ID = 11;
  private static final short server2ID = 21;
  private static final short server3ID = 31;
  private static final short changelog1ID = 1;
  private static final short changelog2ID = 2;
  private static final short changelog3ID = 3;
  private static int[] replServerPort = new int[4];
  private DN baseDn;
  ReplicationBroker server2 = null;
  ReplicationBroker server3 = null;
  ReplicationServer changelog1 = null;
  ReplicationServer changelog2 = null;
  ReplicationServer changelog3 = null;
  boolean emptyOldChanges = true;
  ReplicationDomain sd = null;
@@ -221,16 +229,6 @@
        "ds-task-initialize-domain-dn: dc=example,dc=com",
        "ds-task-initialize-replica-server-id: all");
    // Change log
    String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
    String changeLogLdif = "dn: " + changeLogStringDN + "\n"
    + "objectClass: top\n"
    + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
    + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8990\n"
    + "ds-cfg-changelog-server-id: 1\n"
    + "ds-cfg-window-size: " + WINDOW_SIZE + "\n"
    + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE;
    replServerEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
    replServerEntry = null;
  }
@@ -604,17 +602,20 @@
        "dn: dc=example,dc=com\n"
        + "objectClass: top\n"
        + "objectClass: domain\n"
        + "dc: example\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111111\n"
        + "\n",
          "dn: ou=People,dc=example,dc=com\n"
        + "objectClass: top\n"
        + "objectClass: organizationalUnit\n"
        + "ou: People\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111112\n"
        + "\n",
          "dn: cn=Fiona Jensen,ou=people,dc=example,dc=com\n"
        + "objectclass: top\n"
        + "objectclass: person\n"
        + "objectclass: organizationalPerson\n"
        + "objectclass: inetOrgPerson\n"
        + "cn: Fiona Jensen\n"
        + "sn: Jensen\n"
        + "uid: fiona\n"
@@ -625,6 +626,7 @@
        + "objectclass: top\n"
        + "objectclass: person\n"
        + "objectclass: organizationalPerson\n"
        + "objectclass: inetOrgPerson\n"
        + "cn: Robert Langman\n"
        + "sn: Langman\n"
        + "uid: robert\n"
@@ -738,25 +740,37 @@
   */
  private ReplicationServer createChangelogServer(short changelogId)
  {
    SortedSet<String> servers = null;
    servers = new TreeSet<String>();
    try
    {
      if ((changelogId==changelog1ID)&&(changelog1!=null))
        return changelog1;
      if ((changelogId==changelog2ID)&&(changelog2!=null))
        return changelog2;
      if (changelogId==changelog1ID)
      {
        int chPort = getChangelogPort(changelogId);
        ReplServerFakeConfiguration conf =
          new ReplServerFakeConfiguration(chPort, null, 0, changelogId, 0, 100,
                                         null);
        ReplicationServer replicationServer = new ReplicationServer(conf);
        Thread.sleep(1000);
        return replicationServer;
        if (changelog1!=null)
          return changelog1;
      }
      else if (changelogId==changelog2ID)
      {
        if (changelog2!=null)
          return changelog2;
      }
      else if (changelogId==changelog3ID)
      {
        if (changelog3!=null)
          return changelog3;
      }
      servers.add("localhost:" + getChangelogPort(changelog1ID));
      servers.add("localhost:" + getChangelogPort(changelog2ID));
      servers.add("localhost:" + getChangelogPort(changelog3ID));
      int chPort = getChangelogPort(changelogId);
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(chPort, null, 0, changelogId, 0, 100,
            servers);
      ReplicationServer replicationServer = new ReplicationServer(conf);
      Thread.sleep(1000);
      return replicationServer;
    }
    catch (Exception e)
    {
@@ -796,7 +810,6 @@
        DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
        assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the synchronized server");
        entryList.add(synchroServerEntry.getDN());
        sd = ReplicationDomain.retrievesReplicationDomain(baseDn);
@@ -820,14 +833,29 @@
  private int getChangelogPort(short changelogID)
  {
    return (changelogPort+changelogID);
    if (replServerPort[changelogID] == 0)
    {
      try
      {
        // Find  a free port for the replicationServer
        ServerSocket socket = TestCaseUtils.bindFreePort();
        replServerPort[changelogID] = socket.getLocalPort();
        socket.close();
      }
      catch(Exception e)
      {
        fail("Cannot retrieve a free port for replication server."
          + e.getMessage());
      }
    }
    return replServerPort[changelogID];
  }
  /**
   * Tests the import side of the Initialize task
   */
  @Test(enabled=false)
  public void InitializeImport() throws Exception
  public void initializeImport() throws Exception
  {
    String testCase = "InitializeImport";
@@ -883,7 +911,7 @@
   * Tests the export side of the Initialize task
   */
  @Test(enabled=false)
  public void InitializeExport() throws Exception
  public void initializeExport() throws Exception
  {
    String testCase = "Replication/InitializeExport";
@@ -917,7 +945,7 @@
   * Tests the import side of the InitializeTarget task
   */
  @Test(enabled=false)
  public void InitializeTargetExport() throws Exception
  public void initializeTargetExport() throws Exception
  {
    String testCase = "Replication/InitializeTargetExport";
@@ -957,7 +985,7 @@
   * Tests the import side of the InitializeTarget task
   */
  @Test(enabled=false)
  public void InitializeTargetExportAll() throws Exception
  public void initializeTargetExportAll() throws Exception
  {
    String testCase = "Replication/InitializeTargetExportAll";
@@ -1001,7 +1029,7 @@
   * Tests the import side of the InitializeTarget task
   */
  @Test(enabled=false)
  public void InitializeTargetImport() throws Exception
  public void initializeTargetImport() throws Exception
  {
    String testCase = "InitializeTargetImport";
@@ -1042,7 +1070,7 @@
   * Tests the import side of the InitializeTarget task
   */
  @Test(enabled=false)
  public void InitializeTargetConfigErrors() throws Exception
  public void initializeTargetConfigErrors() throws Exception
  {
    String testCase = "InitializeTargetConfigErrors";
@@ -1096,7 +1124,7 @@
   * Tests the import side of the InitializeTarget task
   */
  @Test(enabled=false)
  public void InitializeConfigErrors() throws Exception
  public void initializeConfigErrors() throws Exception
  {
    String testCase = "InitializeConfigErrors";
@@ -1116,10 +1144,10 @@
          ",cn=Scheduled Tasks,cn=Tasks",
          "objectclass: top",
          "objectclass: ds-task",
          "objectclass: ds-task-initialize",
          "objectclass: ds-task-initialize-from-remote-replica",
          "ds-task-class-name: org.opends.server.tasks.InitializeTask",
          "ds-task-initialize-domain-dn: foo",
          "ds-task-initialize-source: " + server2ID);
          "ds-task-initialize-replica-server-id: " + server2ID);
      addTask(taskInit, ResultCode.INVALID_DN_SYNTAX,
          TaskMessages.MSGID_TASK_INITIALIZE_INVALID_DN);
@@ -1129,10 +1157,10 @@
          ",cn=Scheduled Tasks,cn=Tasks",
          "objectclass: top",
          "objectclass: ds-task",
          "objectclass: ds-task-initialize",
          "objectclass: ds-task-initialize-from-remote-replica",
          "ds-task-class-name: org.opends.server.tasks.InitializeTask",
          "ds-task-initialize-domain-dn: dc=foo",
          "ds-task-initialize-source: " + server2ID);
          "ds-task-initialize-replica-server-id: " + server2ID);
      addTask(taskInit, ResultCode.OTHER, MSGID_NO_MATCHING_DOMAIN);
      // Invalid Source
@@ -1141,10 +1169,10 @@
          ",cn=Scheduled Tasks,cn=Tasks",
          "objectclass: top",
          "objectclass: ds-task",
          "objectclass: ds-task-initialize",
          "objectclass: ds-task-initialize-from-remote-replica",
          "ds-task-class-name: org.opends.server.tasks.InitializeTask",
          "ds-task-initialize-domain-dn: " + baseDn,
          "ds-task-initialize-source: -3");
          "ds-task-initialize-replica-server-id: -3");
      addTask(taskInit, ResultCode.OTHER,
          MSGID_INVALID_IMPORT_SOURCE);
@@ -1162,21 +1190,101 @@
  }
  @Test(enabled=false)
  public void InitializeTargetBroken() throws Exception
  public void initializeTargetBroken() throws Exception
  {
    String testCase = "InitializeTargetBroken";
    fail(testCase + " NYI");
  }
  @Test(enabled=false)
  public void InitializeBroken() throws Exception
  public void initializeBroken() throws Exception
  {
    String testCase = "InitializeBroken";
    fail(testCase + " NYI");
  }
  /*
   * TestReplServerInfos tests that in a topology with more
   * than one replication server, in each replication server
   * is stored the list of LDAP servers connected to each
   * replication server of the topology, thanks to the
   * ReplServerInfoMessage(s) exchanged by the replication
   * servers.
   */
  @Test(enabled=false)
  public void InitializeTargetExportMultiSS() throws Exception
  public void testReplServerInfos() throws Exception
  {
    String testCase = "Replication/TestReplServerInfos";
    log("Starting " + testCase);
    // Create the Repl Servers
    changelog1 = createChangelogServer(changelog1ID);
    changelog2 = createChangelogServer(changelog2ID);
    changelog3 = createChangelogServer(changelog3ID);
    // Connects lDAP1 to replServer1
    connectServer1ToChangelog(changelog1ID);
    // Connects lDAP2 to replServer2
    ReplicationBroker broker2 =
      openReplicationSession(DN.decode("dc=example,dc=com"),
        server2ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
    // Connects lDAP3 to replServer2
    ReplicationBroker broker3 =
      openReplicationSession(DN.decode("dc=example,dc=com"),
        server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
    // Check that the list of connected LDAP servers is correct
    // in each replication servers
    List<String> l1 = changelog1.getReplicationCache(baseDn).
      getConnectedLDAPservers();
    assertEquals(l1.size(), 1);
    assertEquals(l1.get(0), String.valueOf(server1ID));
    List<String> l2;
    l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
    assertEquals(l2.size(), 2);
    assertEquals(l2.get(0), String.valueOf(server3ID));
    assertEquals(l2.get(1), String.valueOf(server2ID));
    List<String> l3;
    l3 = changelog3.getReplicationCache(baseDn).getConnectedLDAPservers();
    assertEquals(l3.size(), 0);
    // Test updates
    broker3.stop();
    Thread.sleep(1000);
    l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
    assertEquals(l2.size(), 1);
    assertEquals(l2.get(0), String.valueOf(server2ID));
    broker3 = openReplicationSession(DN.decode("dc=example,dc=com"),
        server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
    broker2.stop();
    Thread.sleep(1000);
    l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
    assertEquals(l2.size(), 1);
    assertEquals(l2.get(0), String.valueOf(server3ID));
    // TODO Test ReplicationCache.getDestinationServers method.
    broker2.stop();
    broker3.stop();
    cleanEntries();
    changelog3.shutdown();
    changelog3 = null;
    changelog2.shutdown();
    changelog2 = null;
    changelog1.shutdown();
    changelog1 = null;
  }
  @Test(enabled=false)
  public void initializeTargetExportMultiSS() throws Exception
  {
    String testCase = "Replication/InitializeTargetExportMultiSS";
@@ -1222,17 +1330,20 @@
  }
  @Test(enabled=false)
  public void InitializeExportMultiSS() throws Exception
  public void initializeExportMultiSS() throws Exception
  {
    String testCase = "Replication/InitializeExportMultiSS";
    log("Starting "+testCase);
    // Create 2 changelogs
    changelog1 = createChangelogServer(changelog1ID);
    Thread.sleep(3000);
    Thread.sleep(1000);
    changelog2 = createChangelogServer(changelog2ID);
    Thread.sleep(3000);
    Thread.sleep(1000);
    changelog3 = createChangelogServer(changelog3ID);
    Thread.sleep(1000);
    // Connect DS to the replicationServer 1
    connectServer1ToChangelog(changelog1ID);
@@ -1240,7 +1351,7 @@
    // Put entries in DB
    addTestEntriesToDB();
    // Connect a broker acting as server 2 to changelog2
    // Connect a broker acting as server 2 to Repl Server 2
    if (server2 == null)
    {
      server2 = openReplicationSession(DN.decode("dc=example,dc=com"),
@@ -1248,6 +1359,14 @@
        1000, emptyOldChanges);
    }
    // Connect a broker acting as server 3 to Repl Server 3
    if (server3 == null)
    {
      server3 = openReplicationSession(DN.decode("dc=example,dc=com"),
        server3ID, 100, getChangelogPort(changelog3ID),
        1000, emptyOldChanges);
    }
    Thread.sleep(3000);
    // S2 sends init request
@@ -1267,7 +1386,7 @@
  }
  @Test(enabled=false)
  public void InitializeNoSource() throws Exception
  public void initializeNoSource() throws Exception
  {
    String testCase = "InitializeNoSource";
    log("Starting "+testCase);
@@ -1317,7 +1436,7 @@
  }
  @Test(enabled=false)
  public void InitializeTargetNoTarget() throws Exception
  public void initializeTargetNoTarget() throws Exception
  {
    String testCase = "InitializeTargetNoTarget"  + baseDn;
    log("Starting "+testCase);
@@ -1336,10 +1455,10 @@
        ",cn=Scheduled Tasks,cn=Tasks",
        "objectclass: top",
        "objectclass: ds-task",
        "objectclass: ds-task-initialize-target",
        "objectclass: ds-task-initialize-remote-replica",
        "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask",
        "ds-task-initialize-target-domain-dn: "+baseDn,
        "ds-task-initialize-target-scope: " + 10);
        "ds-task-initialize-domain-dn: "+baseDn,
        "ds-task-initialize-replica-server-id: " + 0);
    addTask(taskInit, ResultCode.SUCCESS, 0);
@@ -1355,32 +1474,32 @@
  }
  @Test(enabled=false)
  public void InitializeStopped() throws Exception
  public void initializeStopped() throws Exception
  {
    String testCase = "InitializeStopped";
    fail(testCase + " NYI");
  }
  @Test(enabled=false)
  public void InitializeTargetStopped() throws Exception
  public void initializeTargetStopped() throws Exception
  {
    String testCase = "InitializeTargetStopped";
    fail(testCase + " NYI");
  }
  @Test(enabled=false)
  public void InitializeCompressed() throws Exception
  public void initializeCompressed() throws Exception
  {
    String testCase = "InitializeStopped";
    fail(testCase + " NYI");
  }
  @Test(enabled=false)
  public void InitializeTargetEncrypted() throws Exception
  public void initializeTargetEncrypted() throws Exception
  {
    String testCase = "InitializeTargetCompressed";
    fail(testCase + " NYI");
  }
  @Test(enabled=false)
  public void InitializeSimultaneous() throws Exception
  public void initializeSimultaneous() throws Exception
  {
    String testCase = "InitializeSimultaneous";
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -173,7 +173,8 @@
      {
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE,
            "ReplicationTestCase/openChangelogSession" + e.getMessage(), 1);
            "ReplicationTestCase/openChangelogSession " + e.getMessage()
            + " when emptying old changes", 1);
      }
    }
    return broker;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -483,7 +483,7 @@
   * by checking that : msg == new ServerStartMessage(msg.getBytes()).
   */
  @Test(dataProvider="serverStart")
  public void ServerStartMessageTest(short serverId, DN baseDN, int window,
  public void serverStartMessageTest(short serverId, DN baseDN, int window,
         ServerState state) throws Exception
  {
    state.update(new ChangeNumber((long)1, 1,(short)1));
@@ -516,7 +516,7 @@
   * by checking that : msg == new ReplServerStartMessage(msg.getBytes()).
   */
  @Test(dataProvider="changelogStart")
  public void ChangelogStartMessageTest(short serverId, DN baseDN, int window,
  public void replserverStartMessageTest(short serverId, DN baseDN, int window,
         String url, ServerState state) throws Exception
  {
    state.update(new ChangeNumber((long)1, 1,(short)1));
@@ -537,7 +537,7 @@
   * by checking that : msg == new WindowMessage(msg.getBytes()).
   */
  @Test()
  public void WindowMessageTest() throws Exception
  public void windowMessageTest() throws Exception
  {
    WindowMessage msg = new WindowMessage(123);
    WindowMessage newMsg = new WindowMessage(msg.getBytes());
@@ -557,11 +557,25 @@
  }
  /**
   * Test ReplServerInfoMessage encoding and decoding.
   */
  @Test()
  public void replServerInfoMessageTest() throws Exception
  {
    List<String> connectedServers = new ArrayList<String>(0);
    connectedServers.add("s1");
    connectedServers.add("s2");
    ReplServerInfoMessage msg = new ReplServerInfoMessage(connectedServers);
    ReplServerInfoMessage newMsg = new ReplServerInfoMessage(msg.getBytes());
    assertEquals(msg.getConnectedServers(), newMsg.getConnectedServers());
  }
  /**
   * Test that EntryMessage encoding and decoding works
   * by checking that : msg == new EntryMessageTest(msg.getBytes()).
   */
  @Test()
  public void EntryMessageTest() throws Exception
  public void entryMessageTest() throws Exception
  {
    String taskInitFromS2 = new String(
        "dn: ds-task-id=" + UUID.randomUUID() +
@@ -586,7 +600,7 @@
   * Test that InitializeRequestMessage encoding and decoding works
   */
  @Test()
  public void InitializeRequestMessageTest() throws Exception
  public void initializeRequestMessageTest() throws Exception
  {
    short sender = 1;
    short target = 2;
@@ -602,7 +616,7 @@
   * Test that InitializeTargetMessage encoding and decoding works
   */
  @Test()
  public void InitializeTargetMessageTest() throws Exception
  public void initializeTargetMessageTest() throws Exception
  {
    short senderID = 1;
    short targetID = 2;
@@ -631,7 +645,7 @@
   * Test that DoneMessage encoding and decoding works
   */
  @Test()
  public void DoneMessage() throws Exception
  public void doneMessageTest() throws Exception
  {
    DoneMessage msg = new DoneMessage((short)1, (short)2);
    DoneMessage newMsg = new DoneMessage(msg.getBytes());
@@ -643,7 +657,7 @@
   * Test that ErrorMessage encoding and decoding works
   */
  @Test()
  public void ErrorMessage() throws Exception
  public void errorMessageTest() throws Exception
  {
    ErrorMessage msg = new ErrorMessage((short)1, (short)2, 12, "details");
    ErrorMessage newMsg = new ErrorMessage(msg.getBytes());