From 1c8b422d63f419d8c85a28b1f2276ac0f3e3632c Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 12 Jul 2007 15:41:32 +0000
Subject: [PATCH] Fix for 1895 Summary: Total update does not work with 3 servers that are also replication servers

---
 opends/src/server/org/opends/server/messages/ReplicationMessages.java                                        |   35 ++
 opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java                             |    6 
 opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java                          |  142 +++++++++++
 opends/src/server/org/opends/server/replication/server/ReplicationCache.java                                 |  166 ++++++++++---
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java                                |    2 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java             |    3 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java |   30 +
 opends/src/server/org/opends/server/replication/server/ServerHandler.java                                    |   82 +++++-
 opends/src/server/org/opends/server/replication/server/ServerReader.java                                     |    6 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java                  |  247 +++++++++++++++-----
 10 files changed, 594 insertions(+), 125 deletions(-)

diff --git a/opends/src/server/org/opends/server/messages/ReplicationMessages.java b/opends/src/server/org/opends/server/messages/ReplicationMessages.java
index 72f4ad0..388a21b 100644
--- a/opends/src/server/org/opends/server/messages/ReplicationMessages.java
+++ b/opends/src/server/org/opends/server/messages/ReplicationMessages.java
@@ -450,6 +450,27 @@
     CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 63;
 
   /**
+   * An error happened to send a ReplServerInfoMessage to another
+   * replication server.
+   */
+  public static final int MSGID_CHANGELOG_ERROR_SENDING_INFO =
+    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 64;
+
+  /**
+   * An error happened to send an ErrorMessage to another
+   * replication server.
+   */
+  public static final int MSGID_CHANGELOG_ERROR_SENDING_ERROR =
+    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 65;
+
+  /**
+   * An error happened to send a Message (probably a RoutableMessage)
+   * to another replication server.
+   */
+  public static final int MSGID_CHANGELOG_ERROR_SENDING_MSG =
+    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 66;
+
+  /**
    * Register the messages from this class in the core server.
    *
    */
@@ -530,8 +551,8 @@
         "An unexpected error happened handling connection with %s." +
         "This connection is going to be closed. ");
     registerMessage(MSGID_CHANGELOG_ERROR_SENDING_ACK,
-        "An unexpected error happened sending an ack to %s." +
-        "This connection is going to be closed. ");
+        "An unexpected error occurred  while sending an ack to %s." +
+        "This connection is going to be closed and reopened. ");
     registerMessage(
         MSGID_EXCEPTION_RECEIVING_REPLICATION_MESSAGE,
         "An Exception was caught while receiving replication message : %s");
@@ -617,5 +638,15 @@
     registerMessage(MSGID_DISCONNECTED_FROM_CHANGELOG,
         "The connection to Replication Server %s has been dropped by the "
         + "Replication Server");
+    registerMessage(MSGID_CHANGELOG_ERROR_SENDING_INFO,
+        "An unexpected error occurred  while sending a Server " +
+        " Info message to %s. " +
+        "This connection is going to be closed and reopened");
+    registerMessage(MSGID_CHANGELOG_ERROR_SENDING_ERROR,
+        "An unexpected error occurred  while sending an Error Message to %s. "+
+        "This connection is going to be closed and reopened");
+    registerMessage(MSGID_CHANGELOG_ERROR_SENDING_MSG,
+        "An unexpected error occurred  while sending a Message to %s. "+
+        "This connection is going to be closed and reopened");
   }
 }
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java b/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
new file mode 100644
index 0000000..b048b8a
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
@@ -0,0 +1,142 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2007 Sun Microsystems, Inc.
+ */
+
+package org.opends.server.replication.protocol;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.DataFormatException;
+
+/**
+ *
+ * This class defines a message that is sent by a replication server
+ * to the other replication servers in the topology containing the list
+ * of LDAP servers directly connected to it.
+ * A replication server sends a ReplServerInfoMessage when an LDAP
+ * server connects or disconnects.
+ *
+ * Exchanging these messages allows to have each replication server
+ * knowing the complete list of LDAP servers in the topology and
+ * their associated replication server and thus take the appropriate
+ * decision to route a message to an LDAP server.
+ *
+ */
+public class ReplServerInfoMessage extends ReplicationMessage
+{
+  private List<String> connectedServers = null;
+
+  /**
+   * Creates a new changelogInfo message from its encoded form.
+   *
+   * @param in The byte array containing the encoded form of the message.
+   * @throws java.util.zip.DataFormatException If the byte array does not
+   * contain a valid encoded form of the message.
+   */
+  public ReplServerInfoMessage(byte[] in) throws DataFormatException
+  {
+    try
+    {
+      /* first byte is the type */
+      if (in.length < 1 || in[0] != MSG_TYPE_REPL_SERVER_INFO)
+        throw new DataFormatException(
+        "Input is not a valid changelogInfo Message.");
+
+      connectedServers = new ArrayList<String>();
+      int pos = 1;
+      while (pos < in.length)
+      {
+        /*
+         * Read the next server ID
+         * first calculate the length then construct the string
+         */
+        int length = getNextLength(in, pos);
+        connectedServers.add(new String(in, pos, length, "UTF-8"));
+        pos += length +1;
+      }
+    } catch (UnsupportedEncodingException e)
+    {
+      throw new DataFormatException("UTF-8 is not supported by this jvm.");
+    }
+  }
+
+
+  /**
+   * Creates a new changelogInfo message from a list of the currently
+   * connected servers.
+   *
+   * @param connectedServers The list of currently connected servers ID.
+   */
+  public ReplServerInfoMessage(List<String> connectedServers)
+  {
+    this.connectedServers = connectedServers;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[] getBytes()
+  {
+    try
+    {
+      ByteArrayOutputStream oStream = new ByteArrayOutputStream();
+
+      /* Put the message type */
+      oStream.write(MSG_TYPE_REPL_SERVER_INFO);
+      if (connectedServers.size() >= 1)
+      {
+        for (String server : connectedServers)
+        {
+          byte[] byteServerURL = server.getBytes("UTF-8");
+          oStream.write(byteServerURL);
+          oStream.write(0);
+        }
+      }
+      return oStream.toByteArray();
+    }
+    catch (IOException e)
+    {
+      // never happens
+      return null;
+    }
+  }
+
+  /**
+   * Get the list of servers currently connected to the Changelog server
+   * that generated this message.
+   *
+   * @return A collection of the servers currently connected to the Changelog
+   *         server that generated this message.
+   */
+  public List<String> getConnectedServers()
+  {
+    return connectedServers;
+  }
+}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java b/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
index 15847ed..4bed982 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
@@ -53,6 +53,7 @@
   static final byte MSG_TYPE_DONE = 13;
   static final byte MSG_TYPE_ERROR = 14;
   static final byte MSG_TYPE_WINDOW_PROBE = 15;
+  static final byte MSG_TYPE_REPL_SERVER_INFO = 16;
   // Adding a new type of message here probably requires to
   // change accordingly generateMsg method below
 
@@ -73,6 +74,8 @@
    * MSG_TYPE_ENTRY
    * MSG_TYPE_DONE
    * MSG_TYPE_ERROR
+   * MSG_TYPE_WINDOW_PROBE
+   * MSG_TYPE_REPL_SERVER_INFO
    *
    * @return the byte[] representation of this message.
    * @throws UnsupportedEncodingException  When the encoding of the message
@@ -140,6 +143,9 @@
       case MSG_TYPE_WINDOW_PROBE:
         msg = new WindowProbe(buffer);
       break;
+      case MSG_TYPE_REPL_SERVER_INFO:
+        msg = new ReplServerInfoMessage(buffer);
+      break;
       default:
         throw new DataFormatException("received message with unknown type");
     }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
index 2fbc4da..24a167f 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -43,9 +43,9 @@
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.AckMessage;
 import org.opends.server.replication.protocol.ErrorMessage;
-import org.opends.server.replication.protocol.InitializeRequestMessage;
 import org.opends.server.replication.protocol.RoutableMessage;
 import org.opends.server.replication.protocol.UpdateMessage;
+import org.opends.server.replication.protocol.ReplServerInfoMessage;
 import org.opends.server.types.DN;
 import org.opends.server.types.ErrorLogCategory;
 import org.opends.server.types.ErrorLogSeverity;
@@ -95,6 +95,7 @@
    * We add new TreeSet in the HashMap when a new replication server register
    * to this replication server.
    */
+
   private Map<Short, ServerHandler> replicationServers =
     new ConcurrentHashMap<Short, ServerHandler>();
 
@@ -253,6 +254,11 @@
         return false;
       }
       connectedServers.put(handler.getServerId(), handler);
+
+      // Update the remote replication servers with our list
+      // of connected LDAP servers
+      sendReplServerInfo();
+
       return true;
     }
   }
@@ -269,7 +275,13 @@
     if (handler.isReplicationServer())
       replicationServers.remove(handler.getServerId());
     else
+    {
       connectedServers.remove(handler.getServerId());
+
+      // Update the remote replication servers with our list
+      // of connected LDAP servers
+      sendReplServerInfo();
+    }
   }
 
   /**
@@ -312,6 +324,12 @@
         return false;
       }
       replicationServers.put(handler.getServerId(), handler);
+
+      // Update this server with the list of LDAP servers
+      // already connected
+      handler.sendInfo(
+          new ReplServerInfoMessage(getConnectedLDAPservers()));
+
       return true;
     }
   }
@@ -376,6 +394,22 @@
     return sourceDbHandlers.keySet();
   }
 
+  /**
+   * Returns as a set of String the list of LDAP servers connected to us.
+   * Each string is the serverID of a connected LDAP server.
+   *
+   * @return The set of connected LDAP servers
+   */
+  public List<String> getConnectedLDAPservers()
+  {
+    List<String> mySet = new ArrayList<String>(0);
+
+    for (ServerHandler handler : connectedServers.values())
+    {
+      mySet.add(String.valueOf(handler.getServerId()));
+    }
+    return mySet;
+  }
 
   /**
    * Creates and returns an iterator.
@@ -473,15 +507,9 @@
   protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
       ServerHandler senderHandler)
   {
-
     List<ServerHandler> servers =
       new ArrayList<ServerHandler>();
 
-    logError(ErrorLogCategory.SYNCHRONIZATION,
-        ErrorLogSeverity.SEVERE_ERROR,
-        "getDestinationServers"
-        + " msgDest:" + msg.getDestination() , 1);
-
     if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
     {
       // TODO Import from the "closest server" to be implemented
@@ -497,7 +525,7 @@
         }
       }
 
-      // Send to all connected LDAP servers
+      // Sends to all connected LDAP servers
       for (ServerHandler destinationHandler : connectedServers.values())
       {
         // Don't loop on the sender
@@ -518,14 +546,20 @@
       else
       {
         // the targeted server is NOT connected
+        // Let's search for THE changelog server that MAY
+        // have the targeted server connected.
         if (senderHandler.isLDAPserver())
         {
-          // let's forward to the other changelogs
-          servers.addAll(replicationServers.values());
+          for (ServerHandler h : replicationServers.values())
+          {
+            if (h.isRemoteLDAPServer(msg.getDestination()))
+            {
+              servers.add(h);
+            }
+          }
         }
       }
     }
-
     return servers;
   }
 
@@ -543,37 +577,53 @@
 
     if (servers.isEmpty())
     {
-      if (!(msg instanceof InitializeRequestMessage))
-      {
-        // TODO A more elaborated policy is probably needed
-      }
-      else
-      {
-        ErrorMessage errMsg = new ErrorMessage(
-            msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
-            "serverID:" + msg.getDestination());
-
-        try
-        {
-          senderHandler.send(errMsg);
-        }
-        catch(IOException ioe)
-        {
-          // TODO Handle error properly (sender timeout in addition)
-        }
-      }
-      return;
-    }
-
-    for (ServerHandler targetHandler : servers)
-    {
+      ErrorMessage errMsg = new ErrorMessage(
+          msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
+          "serverID:" + msg.getDestination());
       try
       {
-        targetHandler.send(msg);
+        senderHandler.send(errMsg);
       }
       catch(IOException ioe)
       {
         // TODO Handle error properly (sender timeout in addition)
+        /*
+         * An error happened trying the send back an ack to this server.
+         * Log an error and close the connection to this server.
+         */
+        int msgID = MSGID_CHANGELOG_ERROR_SENDING_ERROR;
+        String message = getMessage(msgID, this.toString())
+        + stackTraceToSingleLineString(ioe);
+        logError(ErrorLogCategory.SYNCHRONIZATION,
+            ErrorLogSeverity.SEVERE_ERROR,
+            message, msgID);
+        senderHandler.shutdown();
+      }
+    }
+    else
+    {
+      for (ServerHandler targetHandler : servers)
+      {
+        try
+        {
+          targetHandler.send(msg);
+        }
+        catch(IOException ioe)
+        {
+          /*
+           * An error happened trying the send back an ack to this server.
+           * Log an error and close the connection to this server.
+           */
+          int msgID = MSGID_CHANGELOG_ERROR_SENDING_MSG;
+          String message = getMessage(msgID, this.toString())
+          + stackTraceToSingleLineString(ioe) + " "
+          + msg.getClass().getCanonicalName();
+          logError(ErrorLogCategory.SYNCHRONIZATION,
+              ErrorLogSeverity.SEVERE_ERROR,
+              message, msgID);
+          senderHandler.shutdown();
+          // TODO Handle error properly (sender timeout in addition)
+        }
       }
     }
 
@@ -722,4 +772,48 @@
       }
       return true;
     }
+
+    /**
+     * Send a ReplServerInfoMessage to all the connected replication servers
+     * in order to let them know our connected LDAP servers.
+     */
+    private void sendReplServerInfo()
+    {
+      ReplServerInfoMessage info =
+        new ReplServerInfoMessage(getConnectedLDAPservers());
+      for (ServerHandler handler : replicationServers.values())
+      {
+        try
+        {
+          handler.sendInfo(info);
+        }
+        catch (IOException e)
+        {
+          /*
+           * An error happened trying the send back an ack to this server.
+           * Log an error and close the connection to this server.
+           */
+          int    msgID   = MSGID_CHANGELOG_ERROR_SENDING_INFO;
+          String message = getMessage(msgID, this.toString())
+          + stackTraceToSingleLineString(e);
+          logError(ErrorLogCategory.SYNCHRONIZATION,
+              ErrorLogSeverity.SEVERE_ERROR,
+              message, msgID);
+          handler.shutdown();
+        }
+      }
+    }
+
+    /**
+     * Sets the replication server informations for the provided
+     * handler from the provided ReplServerInfoMessage.
+     *
+     * @param handler The server handler from which the info was received.
+     * @param infoMsg The information message that was received.
+     */
+    public void setReplServerInfo(
+        ServerHandler handler, ReplServerInfoMessage infoMsg)
+    {
+      handler.setReplServerInfo(infoMsg);
+    }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 8eb31b9..7027dda 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -357,7 +357,7 @@
    * @param baseDn The base Dn for which the ReplicationCache must be returned.
    * @return The ReplicationCache associated to the base DN given in parameter.
    */
-  ReplicationCache getReplicationCache(DN baseDn)
+  public ReplicationCache getReplicationCache(DN baseDn)
   {
     ReplicationCache replicationCache;
 
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 7a0f72c..9dc3848 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -35,6 +35,7 @@
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -61,6 +62,7 @@
 import org.opends.server.replication.protocol.UpdateMessage;
 import org.opends.server.replication.protocol.WindowMessage;
 import org.opends.server.replication.protocol.WindowProbe;
+import org.opends.server.replication.protocol.ReplServerInfoMessage;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.AttributeType;
 import org.opends.server.types.AttributeValue;
@@ -129,6 +131,14 @@
 
   private short protocolVersion;
 
+
+  /**
+   * When this Handler is connected to a changelog server this collection
+   * will contain the list of LDAP servers connected to the remote changelog
+   * server.
+   */
+  private List<String> remoteLDAPservers = new ArrayList<String>();
+
   /**
    * The time in milliseconds between heartbeats from the replication
    * server.  Zero means heartbeats are off.
@@ -1342,18 +1352,70 @@
   public void process(RoutableMessage msg)
   {
     if (debugEnabled())
-      TRACER.debugInfo("SH(" + replicationServerId + ") forwards " +
-                 msg + " to " + serverId);
-
-    logError(ErrorLogCategory.SYNCHRONIZATION,
-        ErrorLogSeverity.SEVERE_ERROR,
-        "SH(" + replicationServerId + ") receives " + msg +
-            " from " + serverId, 1);
+      TRACER.debugInfo("SH(" + replicationServerId + ") receives " +
+                 msg + " from " + serverId);
 
     replicationCache.process(msg, this);
   }
 
   /**
+   * Sends the provided ReplServerInfoMessage.
+   *
+   * @param info The ReplServerInfoMessage message to be sent.
+   * @throws IOException When it occurs while sending the message,
+   *
+   */
+   public void sendInfo(ReplServerInfoMessage info)
+   throws IOException
+   {
+     session.publish(info);
+   }
+
+   /**
+    *
+    * Sets the replication server from the message provided.
+    *
+    * @param infoMsg The information message.
+    */
+   public void setReplServerInfo(ReplServerInfoMessage infoMsg)
+   {
+     remoteLDAPservers = infoMsg.getConnectedServers();
+   }
+
+   /**
+    * When this handler is connected to a replication server, specifies if
+    * a wanted server is connected to this replication server.
+    *
+    * @param wantedServer The server we want to know if it is connected
+    * to the replication server represented by this handler.
+    * @return boolean True is the wanted server is connected to the server
+    * represented by this handler.
+    */
+   public boolean isRemoteLDAPServer(short wantedServer)
+   {
+     for (String server : remoteLDAPservers)
+     {
+       if (wantedServer == Short.valueOf(server))
+       {
+         return true;
+       }
+     }
+     return false;
+   }
+
+   /**
+    * When the handler is connected to a replication server, specifies the
+    * replication server has remote LDAP servers connected to it.
+    *
+    * @return boolean True is the replication server has remote LDAP servers
+    * connected to it.
+    */
+   public List<String> getRemoteLDAPServers()
+   {
+     return remoteLDAPservers;
+   }
+
+  /**
    * Send an InitializeRequestMessage to the server connected through this
    * handler.
    *
@@ -1365,12 +1427,6 @@
     if (debugEnabled())
       TRACER.debugInfo("SH(" + replicationServerId + ") forwards " +
                  msg + " to " + serverId);
-
-    logError(ErrorLogCategory.SYNCHRONIZATION,
-        ErrorLogSeverity.SEVERE_ERROR,
-        "SH(" + replicationServerId + ") forwards " +
-             msg + " to " + serverId, 1);
-
     session.publish(msg);
   }
 
diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 16373fc..0c0875d 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -46,6 +46,7 @@
 import org.opends.server.replication.protocol.UpdateMessage;
 import org.opends.server.replication.protocol.WindowMessage;
 import org.opends.server.replication.protocol.WindowProbe;
+import org.opends.server.replication.protocol.ReplServerInfoMessage;
 import org.opends.server.types.ErrorLogCategory;
 import org.opends.server.types.ErrorLogSeverity;
 import org.opends.server.loggers.debug.DebugTracer;
@@ -165,6 +166,11 @@
           WindowProbe windowProbeMsg = (WindowProbe) msg;
           handler.process(windowProbeMsg);
         }
+        else if (msg instanceof ReplServerInfoMessage)
+        {
+          ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg;
+          handler.setReplServerInfo(infoMsg);
+        }
         else if (msg == null)
         {
           /*
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
index 2f6f0c8..dd84def 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -45,7 +45,11 @@
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import java.net.ServerSocket;
 import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 
 import org.opends.server.TestCaseUtils;
@@ -103,7 +107,7 @@
  */
 
 public class InitOnLineTest extends ReplicationTestCase
-{
+ {
   /**
    * The tracer object for the debug logger
    */
@@ -124,17 +128,21 @@
   boolean ssShutdownRequested = false;
   protected String[] updatedEntries;
   boolean externalDS = false;
-  short server1ID = 1;
-  short server2ID = 2;
-  short server3ID = 3;
-  short changelog1ID = 12;
-  short changelog2ID = 13;
-  int changelogPort = 8989;
+  private static final short server1ID = 11;
+  private static final short server2ID = 21;
+  private static final short server3ID = 31;
+  private static final short changelog1ID = 1;
+  private static final short changelog2ID = 2;
+  private static final short changelog3ID = 3;
 
+  private static int[] replServerPort = new int[4];
+  
   private DN baseDn;
   ReplicationBroker server2 = null;
+  ReplicationBroker server3 = null;
   ReplicationServer changelog1 = null;
   ReplicationServer changelog2 = null;
+  ReplicationServer changelog3 = null;
   boolean emptyOldChanges = true;
   ReplicationDomain sd = null;
 
@@ -221,16 +229,6 @@
         "ds-task-initialize-domain-dn: dc=example,dc=com",
         "ds-task-initialize-replica-server-id: all");
 
-    // Change log
-    String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
-    String changeLogLdif = "dn: " + changeLogStringDN + "\n"
-    + "objectClass: top\n"
-    + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
-    + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8990\n"
-    + "ds-cfg-changelog-server-id: 1\n"
-    + "ds-cfg-window-size: " + WINDOW_SIZE + "\n"
-    + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE;
-    replServerEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
     replServerEntry = null;
 
   }
@@ -604,17 +602,20 @@
         "dn: dc=example,dc=com\n"
         + "objectClass: top\n"
         + "objectClass: domain\n"
+        + "dc: example\n"
         + "entryUUID: 21111111-1111-1111-1111-111111111111\n"
         + "\n",
           "dn: ou=People,dc=example,dc=com\n"
         + "objectClass: top\n"
         + "objectClass: organizationalUnit\n"
+        + "ou: People\n"
         + "entryUUID: 21111111-1111-1111-1111-111111111112\n"
         + "\n",
           "dn: cn=Fiona Jensen,ou=people,dc=example,dc=com\n"
         + "objectclass: top\n"
         + "objectclass: person\n"
         + "objectclass: organizationalPerson\n"
+        + "objectclass: inetOrgPerson\n"
         + "cn: Fiona Jensen\n"
         + "sn: Jensen\n"
         + "uid: fiona\n"
@@ -625,6 +626,7 @@
         + "objectclass: top\n"
         + "objectclass: person\n"
         + "objectclass: organizationalPerson\n"
+        + "objectclass: inetOrgPerson\n"
         + "cn: Robert Langman\n"
         + "sn: Langman\n"
         + "uid: robert\n"
@@ -738,25 +740,37 @@
    */
   private ReplicationServer createChangelogServer(short changelogId)
   {
+    SortedSet<String> servers = null;
+    servers = new TreeSet<String>();
     try
     {
-      if ((changelogId==changelog1ID)&&(changelog1!=null))
-        return changelog1;
-
-      if ((changelogId==changelog2ID)&&(changelog2!=null))
-        return changelog2;
-
+      if (changelogId==changelog1ID)
       {
-        int chPort = getChangelogPort(changelogId);
-
-        ReplServerFakeConfiguration conf =
-          new ReplServerFakeConfiguration(chPort, null, 0, changelogId, 0, 100,
-                                         null);
-        ReplicationServer replicationServer = new ReplicationServer(conf);
-        Thread.sleep(1000);
-
-        return replicationServer;
+        if (changelog1!=null)
+          return changelog1;
       }
+      else if (changelogId==changelog2ID)
+      {
+        if (changelog2!=null)
+          return changelog2;
+      }
+      else if (changelogId==changelog3ID)
+      {
+        if (changelog3!=null)
+          return changelog3;
+      }
+      servers.add("localhost:" + getChangelogPort(changelog1ID));
+      servers.add("localhost:" + getChangelogPort(changelog2ID));
+      servers.add("localhost:" + getChangelogPort(changelog3ID));
+
+      int chPort = getChangelogPort(changelogId);
+      ReplServerFakeConfiguration conf =
+        new ReplServerFakeConfiguration(chPort, null, 0, changelogId, 0, 100,
+            servers);
+      ReplicationServer replicationServer = new ReplicationServer(conf);
+      Thread.sleep(1000);
+
+      return replicationServer;
     }
     catch (Exception e)
     {
@@ -796,7 +810,6 @@
         DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
         assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
         "Unable to add the synchronized server");
-        entryList.add(synchroServerEntry.getDN());
 
         sd = ReplicationDomain.retrievesReplicationDomain(baseDn);
 
@@ -820,14 +833,29 @@
 
   private int getChangelogPort(short changelogID)
   {
-    return (changelogPort+changelogID);
+    if (replServerPort[changelogID] == 0)
+    {
+      try
+      {
+        // Find  a free port for the replicationServer
+        ServerSocket socket = TestCaseUtils.bindFreePort();
+        replServerPort[changelogID] = socket.getLocalPort();
+        socket.close();
+      }
+      catch(Exception e)
+      {
+        fail("Cannot retrieve a free port for replication server."
+          + e.getMessage());
+      }
+    }
+    return replServerPort[changelogID];
   }
 
   /**
    * Tests the import side of the Initialize task
    */
   @Test(enabled=false)
-  public void InitializeImport() throws Exception
+  public void initializeImport() throws Exception
   {
     String testCase = "InitializeImport";
 
@@ -883,7 +911,7 @@
    * Tests the export side of the Initialize task
    */
   @Test(enabled=false)
-  public void InitializeExport() throws Exception
+  public void initializeExport() throws Exception
   {
     String testCase = "Replication/InitializeExport";
 
@@ -917,7 +945,7 @@
    * Tests the import side of the InitializeTarget task
    */
   @Test(enabled=false)
-  public void InitializeTargetExport() throws Exception
+  public void initializeTargetExport() throws Exception
   {
     String testCase = "Replication/InitializeTargetExport";
 
@@ -957,7 +985,7 @@
    * Tests the import side of the InitializeTarget task
    */
   @Test(enabled=false)
-  public void InitializeTargetExportAll() throws Exception
+  public void initializeTargetExportAll() throws Exception
   {
     String testCase = "Replication/InitializeTargetExportAll";
 
@@ -1001,7 +1029,7 @@
    * Tests the import side of the InitializeTarget task
    */
   @Test(enabled=false)
-  public void InitializeTargetImport() throws Exception
+  public void initializeTargetImport() throws Exception
   {
     String testCase = "InitializeTargetImport";
 
@@ -1042,7 +1070,7 @@
    * Tests the import side of the InitializeTarget task
    */
   @Test(enabled=false)
-  public void InitializeTargetConfigErrors() throws Exception
+  public void initializeTargetConfigErrors() throws Exception
   {
     String testCase = "InitializeTargetConfigErrors";
 
@@ -1096,7 +1124,7 @@
    * Tests the import side of the InitializeTarget task
    */
   @Test(enabled=false)
-  public void InitializeConfigErrors() throws Exception
+  public void initializeConfigErrors() throws Exception
   {
     String testCase = "InitializeConfigErrors";
 
@@ -1116,10 +1144,10 @@
           ",cn=Scheduled Tasks,cn=Tasks",
           "objectclass: top",
           "objectclass: ds-task",
-          "objectclass: ds-task-initialize",
+          "objectclass: ds-task-initialize-from-remote-replica",
           "ds-task-class-name: org.opends.server.tasks.InitializeTask",
           "ds-task-initialize-domain-dn: foo",
-          "ds-task-initialize-source: " + server2ID);
+          "ds-task-initialize-replica-server-id: " + server2ID);
       addTask(taskInit, ResultCode.INVALID_DN_SYNTAX,
           TaskMessages.MSGID_TASK_INITIALIZE_INVALID_DN);
 
@@ -1129,10 +1157,10 @@
           ",cn=Scheduled Tasks,cn=Tasks",
           "objectclass: top",
           "objectclass: ds-task",
-          "objectclass: ds-task-initialize",
+          "objectclass: ds-task-initialize-from-remote-replica",
           "ds-task-class-name: org.opends.server.tasks.InitializeTask",
           "ds-task-initialize-domain-dn: dc=foo",
-          "ds-task-initialize-source: " + server2ID);
+          "ds-task-initialize-replica-server-id: " + server2ID);
       addTask(taskInit, ResultCode.OTHER, MSGID_NO_MATCHING_DOMAIN);
 
       // Invalid Source
@@ -1141,10 +1169,10 @@
           ",cn=Scheduled Tasks,cn=Tasks",
           "objectclass: top",
           "objectclass: ds-task",
-          "objectclass: ds-task-initialize",
+          "objectclass: ds-task-initialize-from-remote-replica",
           "ds-task-class-name: org.opends.server.tasks.InitializeTask",
           "ds-task-initialize-domain-dn: " + baseDn,
-          "ds-task-initialize-source: -3");
+          "ds-task-initialize-replica-server-id: -3");
       addTask(taskInit, ResultCode.OTHER,
           MSGID_INVALID_IMPORT_SOURCE);
 
@@ -1162,21 +1190,101 @@
   }
 
   @Test(enabled=false)
-  public void InitializeTargetBroken() throws Exception
+  public void initializeTargetBroken() throws Exception
   {
     String testCase = "InitializeTargetBroken";
     fail(testCase + " NYI");
   }
 
   @Test(enabled=false)
-  public void InitializeBroken() throws Exception
+  public void initializeBroken() throws Exception
   {
     String testCase = "InitializeBroken";
     fail(testCase + " NYI");
   }
 
+  /*
+   * TestReplServerInfos tests that in a topology with more
+   * than one replication server, in each replication server
+   * is stored the list of LDAP servers connected to each
+   * replication server of the topology, thanks to the
+   * ReplServerInfoMessage(s) exchanged by the replication
+   * servers.
+   */
   @Test(enabled=false)
-  public void InitializeTargetExportMultiSS() throws Exception
+  public void testReplServerInfos() throws Exception
+  {
+    String testCase = "Replication/TestReplServerInfos";
+
+    log("Starting " + testCase);
+
+    // Create the Repl Servers
+    changelog1 = createChangelogServer(changelog1ID);
+    changelog2 = createChangelogServer(changelog2ID);
+    changelog3 = createChangelogServer(changelog3ID);
+
+    // Connects lDAP1 to replServer1
+    connectServer1ToChangelog(changelog1ID);
+
+    // Connects lDAP2 to replServer2
+    ReplicationBroker broker2 = 
+      openReplicationSession(DN.decode("dc=example,dc=com"),
+        server2ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
+
+    // Connects lDAP3 to replServer2
+    ReplicationBroker broker3 =
+      openReplicationSession(DN.decode("dc=example,dc=com"),
+        server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
+
+    // Check that the list of connected LDAP servers is correct
+    // in each replication servers
+    List<String> l1 = changelog1.getReplicationCache(baseDn).
+      getConnectedLDAPservers();
+    assertEquals(l1.size(), 1);
+    assertEquals(l1.get(0), String.valueOf(server1ID));
+    
+    List<String> l2;
+    l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
+    assertEquals(l2.size(), 2);
+    assertEquals(l2.get(0), String.valueOf(server3ID));
+    assertEquals(l2.get(1), String.valueOf(server2ID));
+        
+    List<String> l3;
+    l3 = changelog3.getReplicationCache(baseDn).getConnectedLDAPservers();
+    assertEquals(l3.size(), 0);
+
+    // Test updates
+    broker3.stop();
+    Thread.sleep(1000);
+    l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
+    assertEquals(l2.size(), 1);
+    assertEquals(l2.get(0), String.valueOf(server2ID));
+
+    broker3 = openReplicationSession(DN.decode("dc=example,dc=com"),
+        server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
+    broker2.stop();
+    Thread.sleep(1000);
+    l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
+    assertEquals(l2.size(), 1);
+    assertEquals(l2.get(0), String.valueOf(server3ID));
+
+    // TODO Test ReplicationCache.getDestinationServers method.
+
+    broker2.stop();
+    broker3.stop();
+
+    cleanEntries();
+
+    changelog3.shutdown();
+    changelog3 = null;
+    changelog2.shutdown();
+    changelog2 = null;
+    changelog1.shutdown();
+    changelog1 = null;
+  }
+  
+  @Test(enabled=false)
+  public void initializeTargetExportMultiSS() throws Exception
   {
     String testCase = "Replication/InitializeTargetExportMultiSS";
 
@@ -1222,17 +1330,20 @@
   }
 
   @Test(enabled=false)
-  public void InitializeExportMultiSS() throws Exception
+  public void initializeExportMultiSS() throws Exception
   {
     String testCase = "Replication/InitializeExportMultiSS";
     log("Starting "+testCase);
 
     // Create 2 changelogs
     changelog1 = createChangelogServer(changelog1ID);
-    Thread.sleep(3000);
+    Thread.sleep(1000);
 
     changelog2 = createChangelogServer(changelog2ID);
-    Thread.sleep(3000);
+    Thread.sleep(1000);
+
+    changelog3 = createChangelogServer(changelog3ID);
+    Thread.sleep(1000);
 
     // Connect DS to the replicationServer 1
     connectServer1ToChangelog(changelog1ID);
@@ -1240,7 +1351,7 @@
     // Put entries in DB
     addTestEntriesToDB();
 
-    // Connect a broker acting as server 2 to changelog2
+    // Connect a broker acting as server 2 to Repl Server 2
     if (server2 == null)
     {
       server2 = openReplicationSession(DN.decode("dc=example,dc=com"),
@@ -1248,6 +1359,14 @@
         1000, emptyOldChanges);
     }
 
+    // Connect a broker acting as server 3 to Repl Server 3
+    if (server3 == null)
+    {
+      server3 = openReplicationSession(DN.decode("dc=example,dc=com"),
+        server3ID, 100, getChangelogPort(changelog3ID),
+        1000, emptyOldChanges);
+    }
+
     Thread.sleep(3000);
 
     // S2 sends init request
@@ -1267,7 +1386,7 @@
   }
 
   @Test(enabled=false)
-  public void InitializeNoSource() throws Exception
+  public void initializeNoSource() throws Exception
   {
     String testCase = "InitializeNoSource";
     log("Starting "+testCase);
@@ -1317,7 +1436,7 @@
   }
 
   @Test(enabled=false)
-  public void InitializeTargetNoTarget() throws Exception
+  public void initializeTargetNoTarget() throws Exception
   {
     String testCase = "InitializeTargetNoTarget"  + baseDn;
     log("Starting "+testCase);
@@ -1336,10 +1455,10 @@
         ",cn=Scheduled Tasks,cn=Tasks",
         "objectclass: top",
         "objectclass: ds-task",
-        "objectclass: ds-task-initialize-target",
+        "objectclass: ds-task-initialize-remote-replica",
         "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask",
-        "ds-task-initialize-target-domain-dn: "+baseDn,
-        "ds-task-initialize-target-scope: " + 10);
+        "ds-task-initialize-domain-dn: "+baseDn,
+        "ds-task-initialize-replica-server-id: " + 0);
 
     addTask(taskInit, ResultCode.SUCCESS, 0);
 
@@ -1355,32 +1474,32 @@
   }
 
   @Test(enabled=false)
-  public void InitializeStopped() throws Exception
+  public void initializeStopped() throws Exception
   {
     String testCase = "InitializeStopped";
     fail(testCase + " NYI");
   }
   @Test(enabled=false)
-  public void InitializeTargetStopped() throws Exception
+  public void initializeTargetStopped() throws Exception
   {
     String testCase = "InitializeTargetStopped";
     fail(testCase + " NYI");
   }
   @Test(enabled=false)
-  public void InitializeCompressed() throws Exception
+  public void initializeCompressed() throws Exception
   {
     String testCase = "InitializeStopped";
     fail(testCase + " NYI");
   }
   @Test(enabled=false)
-  public void InitializeTargetEncrypted() throws Exception
+  public void initializeTargetEncrypted() throws Exception
   {
     String testCase = "InitializeTargetCompressed";
     fail(testCase + " NYI");
   }
 
   @Test(enabled=false)
-  public void InitializeSimultaneous() throws Exception
+  public void initializeSimultaneous() throws Exception
   {
     String testCase = "InitializeSimultaneous";
 
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index 7ab93ba..86a13f3 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -173,7 +173,8 @@
       {
         logError(ErrorLogCategory.SYNCHRONIZATION,
             ErrorLogSeverity.NOTICE,
-            "ReplicationTestCase/openChangelogSession" + e.getMessage(), 1);
+            "ReplicationTestCase/openChangelogSession " + e.getMessage()
+            + " when emptying old changes", 1);
       }
     }
     return broker;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index f112a94..68de891 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -483,7 +483,7 @@
    * by checking that : msg == new ServerStartMessage(msg.getBytes()).
    */
   @Test(dataProvider="serverStart")
-  public void ServerStartMessageTest(short serverId, DN baseDN, int window,
+  public void serverStartMessageTest(short serverId, DN baseDN, int window,
          ServerState state) throws Exception
   {
     state.update(new ChangeNumber((long)1, 1,(short)1));
@@ -516,7 +516,7 @@
    * by checking that : msg == new ReplServerStartMessage(msg.getBytes()).
    */
   @Test(dataProvider="changelogStart")
-  public void ChangelogStartMessageTest(short serverId, DN baseDN, int window,
+  public void replserverStartMessageTest(short serverId, DN baseDN, int window,
          String url, ServerState state) throws Exception
   {
     state.update(new ChangeNumber((long)1, 1,(short)1));
@@ -537,7 +537,7 @@
    * by checking that : msg == new WindowMessage(msg.getBytes()).
    */
   @Test()
-  public void WindowMessageTest() throws Exception
+  public void windowMessageTest() throws Exception
   {
     WindowMessage msg = new WindowMessage(123);
     WindowMessage newMsg = new WindowMessage(msg.getBytes());
@@ -557,11 +557,25 @@
   }
 
   /**
+   * Test ReplServerInfoMessage encoding and decoding.
+   */
+  @Test()
+  public void replServerInfoMessageTest() throws Exception
+  {
+    List<String> connectedServers = new ArrayList<String>(0);
+    connectedServers.add("s1");
+    connectedServers.add("s2");
+    ReplServerInfoMessage msg = new ReplServerInfoMessage(connectedServers);
+    ReplServerInfoMessage newMsg = new ReplServerInfoMessage(msg.getBytes());
+    assertEquals(msg.getConnectedServers(), newMsg.getConnectedServers());
+  }
+
+  /**
    * Test that EntryMessage encoding and decoding works
    * by checking that : msg == new EntryMessageTest(msg.getBytes()).
    */
   @Test()
-  public void EntryMessageTest() throws Exception
+  public void entryMessageTest() throws Exception
   {
     String taskInitFromS2 = new String(
         "dn: ds-task-id=" + UUID.randomUUID() +
@@ -586,7 +600,7 @@
    * Test that InitializeRequestMessage encoding and decoding works
    */
   @Test()
-  public void InitializeRequestMessageTest() throws Exception
+  public void initializeRequestMessageTest() throws Exception
   {
     short sender = 1;
     short target = 2;
@@ -602,7 +616,7 @@
    * Test that InitializeTargetMessage encoding and decoding works
    */
   @Test()
-  public void InitializeTargetMessageTest() throws Exception
+  public void initializeTargetMessageTest() throws Exception
   {
     short senderID = 1;
     short targetID = 2;
@@ -631,7 +645,7 @@
    * Test that DoneMessage encoding and decoding works
    */
   @Test()
-  public void DoneMessage() throws Exception
+  public void doneMessageTest() throws Exception
   {
     DoneMessage msg = new DoneMessage((short)1, (short)2);
     DoneMessage newMsg = new DoneMessage(msg.getBytes());
@@ -643,7 +657,7 @@
    * Test that ErrorMessage encoding and decoding works
    */
   @Test()
-  public void ErrorMessage() throws Exception
+  public void errorMessageTest() throws Exception
   {
     ErrorMessage msg = new ErrorMessage((short)1, (short)2, 12, "details");
     ErrorMessage newMsg = new ErrorMessage(msg.getBytes());

--
Gitblit v1.10.0