From 7b84e53457bce1f0733afa87797afc9928568c52 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 09 Oct 2006 14:15:55 +0000
Subject: [PATCH] - Change the synchronization code so that the changelog server can now be used directly through the ChangelogBroker class as a client API.

---
 opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java                             |   81 +++
 opendj-sdk/opends/src/server/org/opends/server/synchronization/ServerState.java                                 |    2 
 opendj-sdk/opends/src/server/org/opends/server/changelog/ProtocolSession.java                                   |   15 
 opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java                       |   61 +-
 opendj-sdk/opends/src/server/org/opends/server/synchronization/DeleteMsg.java                                   |   14 
 opendj-sdk/opends/src/server/org/opends/server/synchronization/UpdateMessage.java                               |    2 
 opendj-sdk/opends/src/server/org/opends/server/synchronization/AddMsg.java                                      |    4 
 opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyDNMsg.java                                 |   26 +
 opendj-sdk/opends/src/server/org/opends/server/changelog/SerializingProtocolSession.java                        |    8 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java                       |   29 +
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java |  853 ++++++++++++++++++++++++++++++++------------
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/protocols/jmx/JmxConnectTest.java        |    4 
 opendj-sdk/opends/src/server/org/opends/server/changelog/SocketSession.java                                     |    9 
 13 files changed, 807 insertions(+), 301 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/changelog/ProtocolSession.java b/opendj-sdk/opends/src/server/org/opends/server/changelog/ProtocolSession.java
index 177b974..1448e47 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/changelog/ProtocolSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/changelog/ProtocolSession.java
@@ -27,6 +27,7 @@
 package org.opends.server.changelog;
 
 import java.io.IOException;
+import java.net.SocketException;
 import java.util.zip.DataFormatException;
 
 import org.opends.server.synchronization.SynchronizationMessage;
@@ -90,4 +91,18 @@
    * @return The IP address of the remote server.
    */
   public abstract String getRemoteAddress();
+
+
+  /**
+  * Set a timeout value.
+  * With this option set to a non-zero value, calls to the receive() method
+  * block for only this amount of time after which a
+  * java.net.SocketTimeoutException is raised.
+  * The Broker is valid and useable even after such an Exception is raised.
+  *
+  * @param timeout the specified timeout, in milliseconds.
+  * @throws SocketException if there is an error in the underlying protocol,
+  *         such as a TCP error.
+  */
+  public abstract void setSoTimeout(int timeout) throws SocketException;
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/changelog/SerializingProtocolSession.java b/opendj-sdk/opends/src/server/org/opends/server/changelog/SerializingProtocolSession.java
index 23d87c2..c2b0a5c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/changelog/SerializingProtocolSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/changelog/SerializingProtocolSession.java
@@ -111,4 +111,12 @@
   {
     return socket.getInetAddress().getHostAddress();
   }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void setSoTimeout(int timeout) throws SocketException
+  {
+    socket.setSoTimeout(timeout);
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/changelog/SocketSession.java b/opendj-sdk/opends/src/server/org/opends/server/changelog/SocketSession.java
index b19f27d..4bb87d6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/changelog/SocketSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/changelog/SocketSession.java
@@ -30,6 +30,7 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
+import java.net.SocketException;
 import java.util.zip.DataFormatException;
 
 import org.opends.server.synchronization.SynchronizationMessage;
@@ -128,4 +129,12 @@
   {
     return socket.getInetAddress().getHostAddress();
   }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void setSoTimeout(int timeout) throws SocketException
+  {
+    socket.setSoTimeout(timeout);
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/AddMsg.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/AddMsg.java
index d177029..a69c8b9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/AddMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/AddMsg.java
@@ -55,8 +55,8 @@
 public class AddMsg extends UpdateMessage
 {
   private static final long serialVersionUID = -4905520652801395185L;
-  private byte[] encodedAttributes ;
-  private String parentUniqueId;
+  private final byte[] encodedAttributes;
+  private final String parentUniqueId;
 
   /**
    * Creates a new AddMessage.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
index 3a4d466..b265bb3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
@@ -39,6 +39,8 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
 
 import org.opends.server.changelog.ProtocolSession;
 import org.opends.server.changelog.SocketSession;
@@ -47,6 +49,7 @@
 import org.opends.server.protocols.internal.InternalSearchListener;
 import org.opends.server.protocols.internal.InternalSearchOperation;
 import org.opends.server.protocols.ldap.LDAPFilter;
+import org.opends.server.types.DN;
 import org.opends.server.types.DereferencePolicy;
 import org.opends.server.types.ErrorLogCategory;
 import org.opends.server.types.ErrorLogSeverity;
@@ -63,43 +66,64 @@
 {
   private boolean shutdown = false;
   private List<String> servers;
-  private Short identifier;
   private boolean connected = false;
-  private SynchronizationDomain domain;
   private final Object lock = new Object();
   private String changelogServer = "Not connected";
   private TreeSet<FakeOperation> replayOperations;
   private ProtocolSession session = null;
+  private final ServerState state;
+  private final DN baseDn;
+  private final short serverID;
+  private int maxSendDelay;
+  private int maxReceiveDelay;
+  private int maxSendQueue;
+  private int maxReceiveQueue;
 
   /**
    * Creates a new Changelog Broker for a particular SynchronizationDomain.
    *
-   * @param domain The SynchronizationDomain for which the borker is created.
+   * @param state The ServerState that should be used by this broker
+   *              when negociating the session with the changelog servers.
+   * @param baseDn The base DN that should be used by this broker
+   *              when negociating the session with the changelog servers.
+   * @param serverID The server ID that should be used by this broker
+   *              when negociating the session with the changelog servers.
+   * @param maxReceiveQueue The maximum size of the receive queue to use on
+   *                         the changelog server.
+   * @param maxReceiveDelay The maximum replication delay to use on the
+   *                        changelog server.
+   * @param maxSendQueue The maximum size of the send queue to use on
+   *                     the changelog server.
+   * @param maxSendDelay The maximum send delay to use on the changelog server.
    */
-  public ChangelogBroker(SynchronizationDomain domain)
+  public ChangelogBroker(ServerState state, DN baseDn, short serverID,
+      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
+      int maxSendDelay )
   {
-    this.domain = domain;
+    this.baseDn = baseDn;
+    this.serverID = serverID;
+    this.maxReceiveDelay = maxReceiveDelay;
+    this.maxSendDelay = maxSendDelay;
+    this.maxReceiveQueue = maxReceiveQueue;
+    this.maxSendQueue = maxSendQueue;
+    this.state = state;
     replayOperations =
       new TreeSet<FakeOperation>(new FakeOperationComparator());
   }
 
-
   /**
    * Start the ChangelogBroker.
    *
-   * @param identifier identifier of the changelog
    * @param servers list of servers used
    * @throws Exception : in case of errors
    */
-  public void start(Short identifier,
-                    List<String> servers)
+  public void start(List<String> servers)
                     throws Exception
   {
     /*
      * Open Socket to the Changelog
      * Send the Start message
      */
-    this.identifier = identifier;
     this.servers = servers;
     if (servers.size() < 1)
     {
@@ -147,7 +171,9 @@
           /*
            * Send our ServerStartMessage.
            */
-          ServerStartMessage msg = domain.newServerStartMessage();
+          ServerStartMessage msg = new ServerStartMessage(  serverID, baseDn,
+              maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
+              state);
           session.publish(msg);
 
 
@@ -167,10 +193,10 @@
            * those changes and send them again to any changelog server.
            */
           ChangeNumber changelogMaxChangeNumber =
-            startMsg.getServerState().getMaxChangeNumber(identifier);
+            startMsg.getServerState().getMaxChangeNumber(serverID);
           if (changelogMaxChangeNumber == null)
-            changelogMaxChangeNumber = new ChangeNumber(0, 0, identifier);
-          ChangeNumber ourMaxChangeNumber =  domain.getMaxChangeNumber();
+            changelogMaxChangeNumber = new ChangeNumber(0, 0, serverID);
+          ChangeNumber ourMaxChangeNumber =  state.getMaxChangeNumber(serverID);
           if ((ourMaxChangeNumber == null) ||
               (ourMaxChangeNumber.olderOrEqual(changelogMaxChangeNumber)))
           {
@@ -206,7 +232,7 @@
               LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
               attrs.add(Historical.HISTORICALATTRIBUTENAME);
               InternalSearchOperation op = conn.processSearch(
-                  new ASN1OctetString(domain.getBaseDN().toString()),
+                  new ASN1OctetString(baseDn.toString()),
                   SearchScope.WHOLE_SUBTREE,
                   DereferencePolicy.NEVER_DEREF_ALIASES,
                   0, 0, false, filter,
@@ -381,8 +407,10 @@
   /**
    * Receive a message.
    * @return the received message
+   * @throws SocketTimeoutException if the tiemout set by setSoTimeout
+   *         has expired
    */
-  public SynchronizationMessage receive()
+  public SynchronizationMessage receive() throws SocketTimeoutException
   {
     while (shutdown == false)
     {
@@ -392,6 +420,11 @@
         return session.receive();
       } catch (Exception e)
       {
+        if (e instanceof SocketTimeoutException)
+        {
+          SocketTimeoutException e1 = (SocketTimeoutException) e;
+          throw e1;
+        }
         if (shutdown == false)
         {
           synchronized (lock)
@@ -439,6 +472,22 @@
   }
 
   /**
+   * Set a timeout value.
+   * With this option set to a non-zero value, calls to the receive() method
+   * block for only this amount of time after which a
+   * java.net.SocketTimeoutException is raised.
+   * The Broker is valid and useable even after such an Exception is raised.
+   *
+   * @param timeout the specified timeout, in milliseconds.
+   * @throws SocketException if there is an error in the underlying protocol,
+   *         such as a TCP error.
+   */
+  public void setSoTimeout(int timeout) throws SocketException
+  {
+    session.setSoTimeout(timeout);
+  }
+
+  /**
    * Get the name of the changelog server to which this broker is currently
    * connected.
    *
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/DeleteMsg.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/DeleteMsg.java
index 3fec9b3..1d288e5 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/DeleteMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/DeleteMsg.java
@@ -45,6 +45,7 @@
 
   /**
    * Creates a new delete message.
+   *
    * @param op the Operation from which the message must be created.
    */
   public DeleteMsg(DeleteOperation op)
@@ -54,6 +55,19 @@
   }
 
   /**
+   * Creates a new delete message.
+   *
+   * @param dn The dn with which the message must be created.
+   * @param changeNumber The change number with which the message must be
+   *                     created.
+   * @param uid The unique id with which the message must be created.
+   */
+  public DeleteMsg(String dn, ChangeNumber changeNumber, String uid)
+  {
+    super(new DeleteContext(changeNumber, uid), dn);
+  }
+
+  /**
    * Creates a new Add message from a byte[].
    *
    * @param in The byte[] from which the operation must be read.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyDNMsg.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyDNMsg.java
index eb4a162..c794f4d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyDNMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyDNMsg.java
@@ -69,6 +69,32 @@
   }
 
   /**
+   * construct a new Modify DN message.
+   *
+   * @param dn The dn to use for building the message.
+   * @param changeNumber The changeNumberto use for building the message.
+   * @param uid The unique id to use for building the message.
+   * @param newParentUid The new parent unique id to use for building
+   *                     the message.
+   * @param deleteOldRdn boolean indicating if old rdn must be deleted to use
+   *                     for building the message.
+   * @param newSuperior The new Superior entry to use for building the message.
+   * @param newRDN The new Rdn to use for building the message.
+   */
+  public ModifyDNMsg(String dn, ChangeNumber changeNumber, String uid,
+                     String newParentUid, boolean deleteOldRdn,
+                     String newSuperior, String newRDN)
+  {
+    super(new ModifyDnContext(changeNumber, uid, newParentUid), dn);
+
+    newSuperiorId = newParentUid;
+
+    this.deleteOldRdn = deleteOldRdn;
+    this.newSuperior = newSuperior;
+    this.newRDN = newRDN;
+  }
+
+  /**
    * Creates a new ModifyDN message from a byte[].
    *
    * @param in The byte[] from which the operation must be read.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ServerState.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ServerState.java
index 336afb0..59560b3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ServerState.java
@@ -249,7 +249,7 @@
       for (Short key  : list.keySet())
       {
         ChangeNumber change = list.get(key);
-        str += change.toString();
+        str += " " + change.toString();
       }
 
       return str;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
index 7e71f2e..d593969 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
@@ -34,6 +34,7 @@
 import static org.opends.server.synchronization.OperationContext.SYNCHROCONTEXT;
 import static org.opends.server.synchronization.Historical.*;
 
+import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
@@ -290,10 +291,11 @@
      */
     try
     {
-      broker = new ChangelogBroker(this);
+      broker = new ChangelogBroker(state, baseDN, serverId, maxReceiveQueue,
+          maxReceiveDelay, maxSendQueue, maxSendDelay);
       synchronized (broker)
       {
-        broker.start(serverId, changelogServers);
+        broker.start(changelogServers);
         if (!receiveStatus)
           broker.suspendReceive();
       }
@@ -379,7 +381,7 @@
       {
         broker.stop();
         changelogServers = newChangelogServers;
-        broker.start(serverId, changelogServers);
+        broker.start(changelogServers);
       }
 
       /*
@@ -676,14 +678,22 @@
       UpdateMessage update = null;
       while (update == null)
       {
-        SynchronizationMessage msg = broker.receive();
-        if (msg == null)
+        SynchronizationMessage msg;
+        try
         {
-          // The server is in the shutdown process
-          return null;
+          msg = broker.receive();
+          if (msg == null)
+          {
+            // The server is in the shutdown process
+            return null;
+          }
+
+          update = msg.processReceive(this);
+        } catch (SocketTimeoutException e)
+        {
+          // just retry
         }
 
-        update = msg.processReceive(this);
       }
       return update;
     }
@@ -1001,28 +1011,6 @@
   }
 
   /**
-   * Get the largest ChangeNumber that has been processed locally.
-   *
-   * @return The largest ChangeNumber that has been processed locally.
-   */
-  public ChangeNumber getMaxChangeNumber()
-  {
-    return state.getMaxChangeNumber(serverId);
-  }
-
-  /**
-   * Create a new serverStartMessage suitable for this SynchronizationDomain.
-   *
-   * @return A new serverStartMessage suitable for this SynchronizationDomain.
-   */
-  public ServerStartMessage newServerStartMessage()
-  {
-    return new ServerStartMessage(serverId, baseDN, maxReceiveDelay,
-                                  maxReceiveQueue, maxSendDelay, maxSendQueue,
-                                  state);
-  }
-
-  /**
    * Create and replay a synchronized Operation from an UpdateMessage.
    *
    * @param msg The UpdateMessage to be replayed.
@@ -1074,6 +1062,13 @@
           {
             done = true;  // unknown type of operation ?!
           }
+          if (done)
+          {
+            // the update became a dummy update and the result
+            // of the conflict resolution phase is to do nothing.
+            // however we still need to push this change to the serverState
+            updateError(changeNumber);
+          }
         }
         else
         {
@@ -1377,8 +1372,8 @@
       }
       else
       {
-        RDN entryRdn = op.getEntryDN().getRDN();
-        msg.setDn(parentDn + "," + entryRdn);
+        RDN entryRdn = DN.decode(msg.getDn()).getRDN();
+        msg.setDn(entryRdn + "," + parentDn);
         return false;
       }
     }
@@ -1534,7 +1529,7 @@
    */
   private String generateConflictDn(String entryUid, String dn)
   {
-    return dn + "entryuuid=" + entryUid;
+    return "entryuuid=" + entryUid + "+" + dn;
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/UpdateMessage.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/UpdateMessage.java
index ddf53f7..b5df1ee 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/UpdateMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/UpdateMessage.java
@@ -102,7 +102,7 @@
   /**
    * Generates an Update Message which the provided information.
    *
-   * @param op The operation fo which the message must be created.
+   * @param op The operation for which the message must be created.
    * @param isAssured flag indicating if the operation is an assured operation.
    * @return The generated message.
    */
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
index 1c0cd69..90065bb 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -35,6 +35,7 @@
 import java.util.ArrayList;
 import java.net.ServerSocket;
 import java.net.InetSocketAddress;
+import java.net.SocketException;
 
 import org.opends.server.backends.MemoryBackend;
 import org.opends.server.config.ConfigException;
@@ -200,19 +201,13 @@
     ServerSocket serverJmxSocket   = null;
     ServerSocket serverLdapsSocket = null;
 
-    serverLdapSocket = new ServerSocket();
-    serverLdapSocket.setReuseAddress(true);
-    serverLdapSocket.bind(new InetSocketAddress("127.0.0.1", 0));
+    serverLdapSocket = bindFreePort();
     serverLdapPort = serverLdapSocket.getLocalPort();
 
-    serverJmxSocket = new ServerSocket();
-    serverJmxSocket.setReuseAddress(true);
-    serverJmxSocket.bind(new InetSocketAddress("127.0.0.1", 0));
+    serverJmxSocket = bindFreePort();
     serverJmxPort = serverJmxSocket.getLocalPort();
 
-    serverLdapsSocket = new ServerSocket();
-    serverLdapsSocket.setReuseAddress(true);
-    serverLdapsSocket.bind(new InetSocketAddress("127.0.0.1", 0));
+    serverLdapsSocket = bindFreePort();
     serverLdapsPort = serverLdapsSocket.getLocalPort();
 
     BufferedReader reader = new BufferedReader(new FileReader(
@@ -264,6 +259,22 @@
   }
 
   /**
+   * Find and binds to a free server socket port on the local host.
+   * @return the bounded Server socket.
+   *
+   * @throws IOException in case of underlying exception.
+   * @throws SocketExceptionin case of underlying exception.
+   */
+  public static ServerSocket bindFreePort() throws IOException, SocketException
+  {
+    ServerSocket serverLdapSocket;
+    serverLdapSocket = new ServerSocket();
+    serverLdapSocket.setReuseAddress(true);
+    serverLdapSocket.bind(new InetSocketAddress("127.0.0.1", 0));
+    return serverLdapSocket;
+  }
+
+  /**
    * Shut down the server, if it has been started.
    * @param reason The reason for the shutdown.
    */
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/protocols/jmx/JmxConnectTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/protocols/jmx/JmxConnectTest.java
index 1d4b710..f163395 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/protocols/jmx/JmxConnectTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/protocols/jmx/JmxConnectTest.java
@@ -297,9 +297,7 @@
     
     // change the configuration of the connection handler to use 
     // a free port
-    ServerSocket serverJmxSocket = new ServerSocket();
-    serverJmxSocket.setReuseAddress(true);
-    serverJmxSocket.bind(new InetSocketAddress("127.0.0.1", 0));
+    ServerSocket serverJmxSocket = TestCaseUtils.bindFreePort();
     int serverJmxPort = serverJmxSocket.getLocalPort();
     
     ConfigEntry config = new ConfigEntry(TestCaseUtils.makeEntry(
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
index 2b45b55..d90cab8 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -27,16 +27,13 @@
 
 package org.opends.server.synchronization;
 
+import static org.testng.Assert.*;
+
+import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
 import java.util.List;
 
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.*;
-
 import org.opends.server.TestCaseUtils;
 import org.opends.server.config.ConfigEntry;
 import org.opends.server.config.ConfigException;
@@ -46,26 +43,26 @@
 import org.opends.server.core.ModifyDNOperation;
 import org.opends.server.core.ModifyOperation;
 import org.opends.server.core.Operation;
-
 import org.opends.server.protocols.internal.InternalClientConnection;
-
 import org.opends.server.types.Attribute;
 import org.opends.server.types.AttributeType;
 import org.opends.server.types.AttributeValue;
 import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
 import org.opends.server.types.Entry;
 import org.opends.server.types.Modification;
 import org.opends.server.types.ModificationType;
 import org.opends.server.types.OperationType;
 import org.opends.server.types.RDN;
-import org.opends.server.types.ResultCode;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
 
 /**
- * Test the contructors, encoders and decoders of the synchronization
- * AckMsg, ModifyMsg, ModifyDnMsg, AddMsg and Delete Msg
+ * Test the contructors, encoders and decoders of the synchronization AckMsg,
+ * ModifyMsg, ModifyDnMsg, AddMsg and Delete Msg
  */
-public class UpdateOperationTest
-    extends SynchronizationTestCase
+public class UpdateOperationTest extends SynchronizationTestCase
 {
 
   /**
@@ -87,18 +84,21 @@
    * The synchronization plugin entry
    */
   private String synchroPluginStringDN;
+
   private Entry synchroPluginEntry;
 
   /**
    * The Server synchro entry
    */
   private String synchroServerStringDN;
+
   private Entry synchroServerEntry;
 
   /**
    * The Change log entry
    */
   private String changeLogStringDN;
+
   private Entry changeLogEntry;
 
   /**
@@ -107,26 +107,38 @@
   private Entry personEntry;
 
   /**
+   * An entry with a entryUUID
+   */
+  private Entry personWithUUIDEntry;
+  private Entry personWithSecondUniqueID;
+
+  /**
    * schema check flag
    */
   private boolean schemaCheck;
 
-
   // WORKAROUND FOR BUG #639 - BEGIN -
   /**
    *
    */
   MultimasterSynchronization mms;
 
-  // WORKAROUND FOR BUG #639 - END -
+  private String baseUUID;
 
+  private String user1dn;
+
+  private String user1entrysecondUUID;
+
+  private String user1entryUUID;
+
+  // WORKAROUND FOR BUG #639 - END -
 
   /**
    * Set up the environment for performing the tests in this Class.
    * synchronization
    *
    * @throws Exception
-   *         If the environment could not be set up.
+   *           If the environment could not be set up.
    */
   @BeforeClass
   @Override
@@ -146,21 +158,23 @@
     String[] topEntries = new String[2];
     topEntries[0] = "dn: dc=example,dc=com\n" + "objectClass: top\n"
         + "objectClass: domain\n";
-    topEntries[1] = "dn: ou=People,dc=example,dc=com\n"
-        + "objectClass: top\n" + "objectClass: organizationalUnit\n";
-    Entry entry ;
+    topEntries[1] = "dn: ou=People,dc=example,dc=com\n" + "objectClass: top\n"
+        + "objectClass: organizationalUnit\n"
+        + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
+    Entry entry;
     for (int i = 0; i < topEntries.length; i++)
     {
       entry = TestCaseUtils.entryFromLdifString(topEntries[i]);
       AddOperation addOp = new AddOperation(connection,
-          InternalClientConnection.nextOperationID(),
-          InternalClientConnection.nextMessageID(), null, entry.getDN(),
-          entry.getObjectClasses(), entry.getUserAttributes(), entry
-              .getOperationalAttributes());
+          InternalClientConnection.nextOperationID(), InternalClientConnection
+              .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
+          entry.getUserAttributes(), entry.getOperationalAttributes());
       addOp.setInternalOperation(true);
       addOp.run();
       entryList.add(entry);
     }
+    
+    baseUUID = getEntryUUID(DN.decode("ou=People,dc=example,dc=com"));
 
     // top level synchro provider
     synchroStringDN = "cn=Synchronization Providers,cn=config";
@@ -182,8 +196,7 @@
     String changeLogLdif = "dn: " + changeLogStringDN + "\n"
         + "objectClass: top\n"
         + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
-        + "cn: Changelog Server\n"
-        + "ds-cfg-changelog-port: 8989\n"
+        + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8989\n"
         + "ds-cfg-changelog-server-id: 1\n";
     changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
 
@@ -195,8 +208,7 @@
         + "cn: example\n"
         + "ds-cfg-synchronization-dn: ou=People,dc=example,dc=com\n"
         + "ds-cfg-changelog-server: localhost:8989\n"
-        + "ds-cfg-directory-server-id: 1\n"
-        + "ds-cfg-receive-status: true\n";
+        + "ds-cfg-directory-server-id: 1\n" + "ds-cfg-receive-status: true\n";
     synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
 
     String personLdif = "dn: uid=user.1,ou=People,dc=example,dc=com\n"
@@ -204,8 +216,8 @@
         + "objectClass: organizationalPerson\n"
         + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
         + "homePhone: 951-245-7634\n"
-        + "description: This is the description for Aaccf Amar.\n"
-        + "st: NC\n" + "mobile: 027-085-0537\n"
+        + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
+        + "mobile: 027-085-0537\n"
         + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
         + "$Rockford, NC  85762\n" + "mail: user.1@example.com\n"
         + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
@@ -214,49 +226,613 @@
         + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
         + "userPassword: password\n" + "initials: AA\n";
     personEntry = TestCaseUtils.entryFromLdifString(personLdif);
+
+    /*
+     * The 2 entries defined in the following code are used for the naming
+     * conflict resolution test (called namingConflicts)
+     * They must have the same DN but different entryUUID.
+     */
+    user1entryUUID = "33333333-3333-3333-3333-333333333333";
+    user1entrysecondUUID = "22222222-2222-2222-2222-222222222222";
+    user1dn = "uid=user1,ou=People,dc=example,dc=com";
+    String entryWithUUIDldif = "dn: "+ user1dn + "\n"
+      + "objectClass: top\n" + "objectClass: person\n"
+      + "objectClass: organizationalPerson\n"
+      + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
+      + "homePhone: 951-245-7634\n"
+      + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
+      + "mobile: 027-085-0537\n"
+      + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
+      + "$Rockford, NC  85762\n" + "mail: user.1@example.com\n"
+      + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
+      + "street: 17984 Thirteenth Street\n"
+      + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
+      + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
+      + "userPassword: password\n" + "initials: AA\n"
+      + "entryUUID: " + user1entryUUID + "\n";
+    personWithUUIDEntry = TestCaseUtils.entryFromLdifString(entryWithUUIDldif);
+
+    String entryWithSecondUUID = "dn: "+ user1dn + "\n"
+      + "objectClass: top\n" + "objectClass: person\n"
+      + "objectClass: organizationalPerson\n"
+      + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
+      + "homePhone: 951-245-7634\n"
+      + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
+      + "mobile: 027-085-0537\n"
+      + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
+      + "$Rockford, NC  85762\n" + "mail: user.1@example.com\n"
+      + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
+      + "street: 17984 Thirteenth Street\n"
+      + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
+      + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
+      + "userPassword: password\n" + "initials: AA\n"
+      + "entryUUID: "+ user1entrysecondUUID + "\n";
+    personWithSecondUniqueID =
+      TestCaseUtils.entryFromLdifString(entryWithSecondUUID);
+
+    configureSynchronization();
   }
 
   /**
    * Clean up the environment. return null;
    *
    * @throws Exception
-   *         If the environment could not be set up.
+   *           If the environment could not be set up.
    */
   @AfterClass
   public void classCleanUp() throws Exception
   {
 
     DirectoryServer.setCheckSchema(schemaCheck);
-    DeleteOperation op;
 
-    //  WORKAROUND FOR BUG #639 - BEGIN -
+    // WORKAROUND FOR BUG #639 - BEGIN -
     DirectoryServer.deregisterSynchronizationProvider(mms);
     mms.finalizeSynchronizationProvider();
-    //  WORKAROUND FOR BUG #639 - END -
+    // WORKAROUND FOR BUG #639 - END -
 
+    cleanEntries();
+  }
+
+  /**
+   * suppress all the entries created by the tests in this class
+   */
+  private void cleanEntries()
+  {
+    DeleteOperation op;
     // Delete entries
-    Entry entries[] = entryList.toArray(new Entry[0]) ;
-    for (int i = entries.length -1 ; i != 0 ; i--)
+    Entry entries[] = entryList.toArray(new Entry[0]);
+    for (int i = entries.length - 1; i != 0; i--)
     {
       try
       {
         op = new DeleteOperation(connection, InternalClientConnection
-            .nextOperationID(), InternalClientConnection.nextMessageID(),
-            null, entries[i].getDN());
+            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
+            entries[i].getDN());
         op.run();
-      }
-      catch (Exception e)
+      } catch (Exception e)
       {
       }
     }
   }
 
   /**
-   * Tests that performed operation will generate synchronization messages
+   * Tests the naming conflict resolution code.
+   * In this test, the local server act both as an LDAP server and
+   * a changelog server that are inter-connected.
+   *
+   * The test creates an other session to the changelog server using
+   * directly the ChangelogBroker API.
+   * It then uses this session to siomulate conflicts and therefore
+   * test the naming conflict resolution code.
    */
-  @Test()
+  @Test(enabled=true)
+  public void namingConflicts() throws Exception
+  {
+    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
+
+    /*
+     * Open a session to the changelog server using the Changelog broker API.
+     * This must use a serverId different from the LDAP server ID
+     */
+    ChangelogBroker broker = openChangelogSession(baseDn, (short) 2);
+
+    /*
+     * Create a Change number generator to generate new changenumbers
+     * when we need to send operations messages to the changelog server.
+     */
+    ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 2, 0);
+
+
+    /*
+     * Test that the conflict resolution code is able to find entries
+     * that have been renamed by an other master.
+     * To simulate this, create and entry with a given UUID and a given DN
+     * then send a modify operation using another DN but the same UUID.
+     * Finally check that the modify operation has been applied.
+     */
+    // create the entry with a given DN
+    AddMsg addMsg = new AddMsg(gen.NewChangeNumber(),
+        personWithUUIDEntry.getDN().toString(),
+        user1entryUUID,
+        baseUUID,
+        personWithUUIDEntry.getObjectClassAttribute(),
+        personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
+    broker.publish(addMsg);
+
+    // Check that the entry has been created in the local DS.
+    Entry resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000);
+    assertNotNull(resultEntry,
+        "The send ADD synchronization message was not applied");
+    entryList.add(resultEntry);
+
+    // send a modify operation with the correct unique ID but another DN
+    List<Modification> mods = generatemods("telephonenumber", "01 02 45");
+    ModifyMsg modMsg = new ModifyMsg(gen.NewChangeNumber(),
+        DN.decode("cn=something,ou=People,dc=example,dc=com"), mods,
+        user1entryUUID);
+    broker.publish(modMsg);
+
+    // check that the modify has been applied as if the entry had been renamed.
+    boolean found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
+                           "telephonenumber", "01 02 45", 1000);
+    if (found == false)
+     fail("The modification has not been correctly replayed.");
+
+    /*
+     * Test that the conflict resolution code is able to detect
+     * that and entry have been renamed and that a new entry has
+     * been created with the same DN but another entry UUID
+     * To simulate this, create and entry with a given UUID and a given DN
+     * then send a modify operation using the same DN but another UUID.
+     * Finally check that the modify operation has not been applied to the
+     * entry with the given DN.
+     */
+
+    //  create the entry with a given DN and unique ID
+    addMsg = new AddMsg(gen.NewChangeNumber(),
+        personWithUUIDEntry.getDN().toString(),
+        user1entryUUID, baseUUID,
+        personWithUUIDEntry.getObjectClassAttribute(),
+        personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
+    broker.publish(addMsg);
+
+    // Check that the entry has been created in the local DS.
+    resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000);
+    assertNotNull(resultEntry,
+        "The ADD synchronization message was not applied");
+    entryList.add(resultEntry);
+
+    // send a modify operation with a wrong unique ID but the same DN
+    mods = generatemods("telephonenumber", "02 01 03 05");
+    modMsg = new ModifyMsg(gen.NewChangeNumber(),
+        DN.decode("cn=something,ou=People,dc=example,dc=com"), mods,
+        "10000000-9abc-def0-1234-1234567890ab");
+    broker.publish(modMsg);
+
+    // check that the modify has not been applied
+    found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
+                           "telephonenumber", "02 01 03 05", 1000);
+    if (found == true)
+     fail("The modification has been replayed while it should not.");
+
+
+    /*
+     * Test that the conflict resolution code is able to find entries
+     * that have been renamed by an other master.
+     * To simulate this, send a delete operation using another DN but
+     * the same UUID has the entry that has been used in the tests above.
+     * Finally check that the delete operation has been applied.
+     */
+    // send a delete operation with a wong dn but the unique ID of the entry
+    // used above
+    DeleteMsg delMsg =
+      new DeleteMsg("cn=anotherdn,ou=People,dc=example,dc=com",
+          gen.NewChangeNumber(), user1entryUUID);
+    broker.publish(delMsg);
+    
+    // check that the delete operation has been applied
+    resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000);
+
+    assertNull(resultEntry,
+        "The DELETE synchronization message was not replayed");
+
+    /*
+     * Test that two adds with the same DN but a different unique ID result
+     * cause a conflict and result in the second entry to be renamed.
+     */
+
+    //  create an entry with a given DN and unique ID
+    addMsg = new AddMsg(gen.NewChangeNumber(),
+        personWithUUIDEntry.getDN().toString(),
+        user1entryUUID, baseUUID,
+        personWithUUIDEntry.getObjectClassAttribute(),
+        personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
+    broker.publish(addMsg);
+
+    //  Check that the entry has been created in the local DS.
+    resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000);
+    assertNotNull(resultEntry,
+        "The ADD synchronization message was not applied");
+    entryList.add(resultEntry);
+
+    //  create an entry with the same DN and another unique ID
+    addMsg = new AddMsg(gen.NewChangeNumber(),
+        personWithSecondUniqueID.getDN().toString(),
+        user1entrysecondUUID, baseUUID,
+        personWithSecondUniqueID.getObjectClassAttribute(),
+        personWithSecondUniqueID.getAttributes(), new ArrayList<Attribute>());
+    broker.publish(addMsg);
+
+    //  Check that the entry has been renamed and created in the local DS.
+    resultEntry = getEntry(
+        DN.decode("entryuuid=" + user1entrysecondUUID +" + " + user1dn), 1000);
+    assertNotNull(resultEntry,
+        "The ADD synchronization message was not applied");
+
+    //  delete the entries to clean the database.
+    delMsg =
+      new DeleteMsg(personWithUUIDEntry.getDN().toString(),
+          gen.NewChangeNumber(), user1entryUUID);
+    broker.publish(delMsg);
+    delMsg =
+      new DeleteMsg(personWithSecondUniqueID.getDN().toString(),
+          gen.NewChangeNumber(), user1entrysecondUUID);
+    broker.publish(delMsg);
+    resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000);
+
+    // check that the delete operation has been applied
+    assertNull(resultEntry,
+        "The DELETE synchronization message was not replayed");
+    /*
+     * Check that and added entry is correctly added below it's
+     * parent entry when this parent entry has been renamed.
+     *
+     * Simulate this by trying to add an entry below a DN that does not
+     * exist but with a parent ID that exist.
+     */
+    addMsg = new AddMsg(gen.NewChangeNumber(),
+        "uid=new person,o=nothere,o=below,ou=People,dc=example,dc=com",
+        user1entryUUID,
+        baseUUID,
+        personWithUUIDEntry.getObjectClassAttribute(),
+        personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
+    broker.publish(addMsg);
+
+    //  Check that the entry has been renamed and created in the local DS.
+    resultEntry = getEntry(
+        DN.decode("uid=new person,ou=People,dc=example,dc=com"), 1000);
+    assertNotNull(resultEntry,
+        "The ADD synchronization message was not applied");
+
+    /*
+     * Check that when replaying delete the naming conflict code
+     * verify that the unique ID op the replayed operation is
+     * the same as the unique ID of the entry with the given DN
+     *
+     * To achieve this send a delete operation with a correct DN
+     * but a wrong unique ID.
+     */
+
+    //  delete the entry to clean the database
+    delMsg =
+      new DeleteMsg("uid=new person,ou=People,dc=example,dc=com",
+          gen.NewChangeNumber(), "11111111-9abc-def0-1234-1234567890ab");
+    broker.publish(delMsg);
+    resultEntry = getEntry(
+          DN.decode("uid=new person,ou=People,dc=example,dc=com"), 1000);
+
+    // check that the delete operation has not been applied
+    assertNotNull(resultEntry,
+        "The DELETE synchronization message was replayed when it should not");
+
+    // delete the entry to clean the database
+    delMsg =
+      new DeleteMsg("uid=new person,ou=People,dc=example,dc=com",
+          gen.NewChangeNumber(), user1entryUUID);
+    broker.publish(delMsg);
+    resultEntry = getEntry(
+          DN.decode("uid=new person,ou=People,dc=example,dc=com"), 1000);
+
+    // check that the delete operation has been applied
+    assertNull(resultEntry,
+        "The DELETE synchronization message was not replayed");
+
+    broker.stop();
+  }
+
+  /**
+   * Tests done using directly the ChangelogBroker interface.
+   */
+  @Test(enabled=true)
   public void updateOperations() throws Exception
   {
+    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
+
+    ChangelogBroker broker = openChangelogSession(baseDn, (short) 3);
+    ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 3, 0);
+
+    /*
+     * Test that operations done on this server are sent to the
+     * changelog server and forwarded to our changelog broker session.
+     */
+
+    // Create an Entry (add operation)
+    AddOperation addOp = new AddOperation(connection,
+        InternalClientConnection.nextOperationID(), InternalClientConnection
+        .nextMessageID(), null, personEntry.getDN(), personEntry
+        .getObjectClasses(), personEntry.getUserAttributes(), personEntry
+        .getOperationalAttributes());
+    addOp.run();
+    entryList.add(personEntry);
+    assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
+      "The Add Entry operation failed");
+
+    // Check if the client has received the msg
+    SynchronizationMessage msg = broker.receive();
+    assertTrue(msg instanceof AddMsg,
+      "The received synchronization message is not an ADD msg");
+    AddMsg addMsg =  (AddMsg) msg;
+
+    Operation receivedOp = addMsg.createOperation(connection);
+    assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
+      "The received synchronization message is not an ADD msg");
+
+    assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
+      "The received ADD synchronization message is not for the excepted DN");
+
+    // Modify the entry
+    List<Modification> mods = generatemods("telephonenumber", "01 02 45");
+
+    ModifyOperation modOp = new ModifyOperation(connection,
+        InternalClientConnection.nextOperationID(), InternalClientConnection
+            .nextMessageID(), null, personEntry.getDN(), mods);
+    modOp.setInternalOperation(true);
+    modOp.run();
+
+    // See if the client has received the msg
+    msg = broker.receive();
+    assertTrue(msg instanceof ModifyMsg,
+        "The received synchronization message is not a MODIFY msg");
+    ModifyMsg modMsg = (ModifyMsg) msg;
+
+    receivedOp = modMsg.createOperation(connection);
+    assertTrue(DN.decode(modMsg.getDn()).compareTo(personEntry.getDN()) == 0,
+    "The received MODIFY synchronization message is not for the excepted DN");
+
+    // Modify the entry DN
+    DN newDN = DN.decode("uid= new person,ou=People,dc=example,dc=com") ;
+    ModifyDNOperation modDNOp = new ModifyDNOperation(connection,
+        InternalClientConnection.nextOperationID(), InternalClientConnection
+            .nextMessageID(), null, personEntry.getDN(), RDN
+            .decode("uid=new person"), true, DN
+            .decode("ou=People,dc=example,dc=com"));
+    modDNOp.run();
+    assertNotNull(DirectoryServer.getEntry(newDN),
+        "The MOD_DN operation didn't create the new person entry");
+    assertNull(DirectoryServer.getEntry(personEntry.getDN()),
+        "The MOD_DN operation didn't delete the old person entry");
+    entryList.add(DirectoryServer.getEntry(newDN));
+
+    // See if the client has received the msg
+    msg = broker.receive();
+    assertTrue(msg instanceof ModifyDNMsg,
+        "The received synchronization message is not a MODIFY DN msg");
+    ModifyDNMsg moddnMsg = (ModifyDNMsg) msg;
+    receivedOp = moddnMsg.createOperation(connection);
+
+    assertTrue(DN.decode(moddnMsg.getDn()).compareTo(personEntry.getDN()) == 0,
+        "The received MODIFY_DN message is not for the excepted DN");
+
+    // Delete the entry
+    Entry newPersonEntry = DirectoryServer.getEntry(newDN) ;
+    DeleteOperation delOp = new DeleteOperation(connection,
+        InternalClientConnection.nextOperationID(), InternalClientConnection
+            .nextMessageID(), null, DN
+            .decode("uid= new person,ou=People,dc=example,dc=com"));
+    delOp.run();
+    assertNull(DirectoryServer.getEntry(newDN),
+        "Unable to delete the new person Entry");
+    entryList.remove(newPersonEntry);
+
+    // See if the client has received the msg
+    msg = broker.receive();
+    assertTrue(msg instanceof DeleteMsg,
+        "The received synchronization message is not a MODIFY DN msg");
+    DeleteMsg delMsg = (DeleteMsg) msg;
+    receivedOp = delMsg.createOperation(connection);
+    assertTrue(DN.decode(delMsg.getDn()).compareTo(DN
+        .decode("uid= new person,ou=People,dc=example,dc=com")) == 0,
+        "The received DELETE message is not for the excepted DN");
+
+    /*
+     * Now check that when we send message to the Changelog server
+     * and that they are received and correctly replayed by the server.
+     *
+     * Start by testing the Add message reception
+     */
+    addMsg = new AddMsg(gen.NewChangeNumber(),
+        personWithUUIDEntry.getDN().toString(),
+        user1entryUUID, baseUUID,
+        personWithUUIDEntry.getObjectClassAttribute(),
+        personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
+    broker.publish(addMsg);
+
+    /*
+     * Check that the entry has been created in the local DS.
+     */
+    Entry resultEntry = getEntry(personWithUUIDEntry.getDN(), 1000);
+    assertNotNull(resultEntry,
+        "The send ADD synchronization message was not applied");
+    entryList.add(resultEntry);
+
+    /*
+     * Test the reception of Modify Msg
+     */
+    modMsg = new ModifyMsg(gen.NewChangeNumber(), personWithUUIDEntry.getDN(),
+                           mods, user1entryUUID);
+    broker.publish(modMsg);
+
+    boolean found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
+                           "telephonenumber", "01 02 45", 1000);
+
+    if (found == false)
+     fail("The modification has not been correctly replayed.");
+
+    /*
+     * Test the Reception of Modify Dn Msg
+     */
+    moddnMsg = new ModifyDNMsg(personWithUUIDEntry.getDN().toString(),
+                           gen.NewChangeNumber(),
+                           user1entryUUID, null,
+                           true, null, "uid= new person");
+    broker.publish(moddnMsg);
+
+    resultEntry = getEntry(
+        DN.decode("uid= new person,ou=People,dc=example,dc=com"), 1000);
+
+    assertNotNull(resultEntry,
+        "The modify DN synchronization message was not applied");
+
+    /*
+     * Test the Reception of Delete Msg
+     */
+    delMsg = new DeleteMsg("uid= new person,ou=People,dc=example,dc=com",
+                           gen.NewChangeNumber(), user1entryUUID);
+    broker.publish(delMsg);
+    resultEntry = getEntry(
+          DN.decode("uid= new person,ou=People,dc=example,dc=com"), 1000);
+
+    assertNull(resultEntry,
+        "The DELETE synchronization message was not replayed");
+
+    broker.stop();
+  }
+
+  /**
+   * @return
+   */
+  private List<Modification> generatemods(String attrName, String attrValue)
+  {
+    AttributeType attrType =
+      DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
+    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
+    values.add(new AttributeValue(attrType, attrValue));
+    Attribute attr = new Attribute(attrType, attrName, values);
+    List<Modification> mods = new ArrayList<Modification>();
+    Modification mod = new Modification(ModificationType.REPLACE, attr);
+    mods.add(mod);
+    return mods;
+  }
+
+  /**
+   * Open a changelog session to the local Changelog server.
+   *
+   */
+  private ChangelogBroker openChangelogSession(final DN baseDn, short serverId)
+          throws Exception, SocketException
+  {
+    ServerState state = new ServerState(baseDn);
+    state.loadState();
+    ChangelogBroker broker = new ChangelogBroker(state, baseDn,
+                                                 serverId, 0, 0, 0, 0);
+    ArrayList<String> servers = new ArrayList<String>(1);
+    servers.add("localhost:8989");
+    broker.start(servers);
+    broker.setSoTimeout(1000);
+    return broker;
+  }
+
+  /**
+   * Check that the entry with the given dn has the given valueString value
+   * for the given attrTypeStr attribute type.
+   */
+  private boolean checkEntryHasAttribute(DN dn, String attrTypeStr,
+      String valueString, int timeout) throws Exception
+  {
+    // Wait no more than 1 second (synchro operation has to be sent,
+    // received and replay)
+    int i = timeout/100;
+    if (i<1)
+      i=1;
+    boolean found = false;
+    while ((i> 0) && (!found))
+    {
+      Thread.sleep(100);
+      Entry newEntry = DirectoryServer.getEntry(personWithUUIDEntry.getDN());
+      if (newEntry == null)
+        fail("The entry " + personWithUUIDEntry.getDN() +
+             " has incorrectly been deleted from the database.");
+      List<Attribute> tmpAttrList = newEntry.getAttribute(attrTypeStr);
+      Attribute tmpAttr = tmpAttrList.get(0);
+
+      AttributeType attrType =
+        DirectoryServer.getAttributeType(attrTypeStr, true);
+      found = tmpAttr.hasValue(new AttributeValue(attrType, valueString));
+      i-- ;
+    }
+    return found;
+  }
+  
+  /**
+   * Check that the entry with the given dn has the given valueString value
+   * for the given attrTypeStr attribute type.
+   */
+  private String getEntryUUID(DN dn) throws Exception
+  {
+    // Wait no more than 1 second (synchro operation has to be sent,
+    // received and replay)
+    int i = 10;
+    if (i<1)
+      i=1;
+    String found = null;
+    while ((i> 0) && (found == null))
+    {
+      Thread.sleep(100);
+      Entry newEntry = DirectoryServer.getEntry(dn);
+      if (newEntry != null)
+      {
+        List<Attribute> tmpAttrList = newEntry.getAttribute("entryuuid");
+        Attribute tmpAttr = tmpAttrList.get(0);
+
+        LinkedHashSet<AttributeValue> vals = tmpAttr.getValues();
+        
+        for (AttributeValue val : vals)
+        {
+          found = val.getStringValue();
+          break;
+        }
+      }
+    }
+    return found;
+  }
+
+  /**
+   * Retrieves an entry from the local Directory Server.
+   *
+   * @throws InterruptedException
+   * @throws DirectoryException
+   */
+  private Entry getEntry(DN dn, int timeout)
+               throws InterruptedException, DirectoryException
+  {
+    Entry newEntry = null ;
+    int i = timeout/100;
+    if (i<1)
+      i=1;
+    while ((i> 0) && (newEntry == null))
+    {
+      Thread.sleep(100);
+      newEntry = DirectoryServer.getEntry(dn);
+      i--;
+    }
+    return newEntry;
+  }
+
+  /**
+   * Configure the Synchronization for this test.
+   */
+  private void configureSynchronization() throws Exception
+  {
     //
     // Add the Multimaster synchronization plugin
     DirectoryServer.getConfigHandler().addEntry(synchroPluginEntry, null);
@@ -294,200 +870,5 @@
     assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
         "Unable to add the syncrhonized server");
     entryList.add(synchroServerEntry);
-
-    //
-    // Check Server state
-    DN synchroServerDN = DN.decode("ou=People,dc=example,dc=com");
-    ServerState ss = MultimasterSynchronization
-        .getServerState(synchroServerDN);
-    // TODO Check server state field
-
-    //
-    // Be Client of the Change log
-    // We will check that the Chanlog server send us messages
-    // suffix synchronized
-    String synchroServerLdif2 = "dn: cn=example2, " + synchroPluginStringDN + "\n"
-        + "objectClass: top\n"
-        + "objectClass: ds-cfg-synchronization-provider-config\n"
-        + "cn: example\n"
-        + "ds-cfg-synchronization-dn: ou=People,dc=example,dc=com\n"
-        + "ds-cfg-changelog-server: localhost:8989\n"
-        + "ds-cfg-directory-server-id: 2\n"
-        + "ds-cfg-receive-status: true\n";
-    Entry synchroServerEntry2 = TestCaseUtils
-        .entryFromLdifString(synchroServerLdif2);
-    ConfigEntry newConfigEntry = new ConfigEntry(synchroServerEntry2,
-        DirectoryServer.getConfigEntry(DN.decode(synchroPluginStringDN)));
-    SynchronizationDomain syncDomain2 = new SynchronizationDomain(
-        newConfigEntry);
-
-    //
-    // Create an Entry (add operation)
-    AddOperation addOp = new AddOperation(connection,
-        InternalClientConnection.nextOperationID(), InternalClientConnection
-            .nextMessageID(), null, personEntry.getDN(), personEntry
-            .getObjectClasses(), personEntry.getUserAttributes(), personEntry
-            .getOperationalAttributes());
-    addOp.run();
-    entryList.add(personEntry);
-    assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
-        "The Add Entry operation fails");
-
-    // Check if the client has receive the msg
-    UpdateMessage msg = syncDomain2.receive() ;
-    Operation receivedOp = msg.createOperation(connection);
-    assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
-        "The received synchronization message is not an ADD msg");
-    assertTrue(DN.decode(msg.getDn()).compareTo(personEntry.getDN()) == 0,
-        "The received ADD synchronization message is not for the excepted DN");
-
-
-    // Modify the entry
-    String attrName = "telephoneNumber";
-    AttributeType attrType = DirectoryServer.getAttributeType(attrName, true);
-
-    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
-    values.add(new AttributeValue(attrType, "01 02 45"));
-    Attribute attr = new Attribute(attrType, attrName, values);
-    List<Modification> mods = new ArrayList<Modification>();
-    Modification mod = new Modification(ModificationType.REPLACE, attr);
-    mods.add(mod);
-
-    ModifyOperation modOp = new ModifyOperation(connection,
-        InternalClientConnection.nextOperationID(), InternalClientConnection
-            .nextMessageID(), null, personEntry.getDN(), mods);
-    modOp.setInternalOperation(true);
-    modOp.run();
-    // TODO Check the telephoneNumber attribute
-
-    //  See if the client has receive the msg
-    msg = syncDomain2.receive() ;
-    receivedOp = msg.createOperation(connection);
-    assertTrue(OperationType.MODIFY.compareTo(receivedOp.getOperationType()) == 0,
-        "The received synchronization message is not a MODIFY msg");
-    assertTrue(DN.decode(msg.getDn()).compareTo(personEntry.getDN()) == 0,
-    "The received MODIFY synchronization message is not for the excepted DN");
-
-    //
-    // Modify the entry DN
-    DN newDN = DN.decode("uid= new person,ou=People,dc=example,dc=com") ;
-    ModifyDNOperation modDNOp = new ModifyDNOperation(connection,
-        InternalClientConnection.nextOperationID(), InternalClientConnection
-            .nextMessageID(), null, personEntry.getDN(), RDN
-            .decode("uid=new person"), true, DN
-            .decode("ou=People,dc=example,dc=com"));
-    modDNOp.run();
-    assertNotNull(DirectoryServer.getEntry(newDN),
-        "The MOD_DN operation didn't create the new person entry");
-    assertNull(DirectoryServer.getEntry(personEntry.getDN()),
-        "The MOD_DN operation didn't delete the old person entry");
-    entryList.add(DirectoryServer.getEntry(newDN));
-
-    //  See if the client has receive the msg
-    msg = syncDomain2.receive() ;
-    receivedOp = msg.createOperation(connection);
-    assertTrue(OperationType.MODIFY_DN.compareTo(receivedOp.getOperationType()) == 0,
-        "The received synchronization message is not a MODIFY_DN msg");
-    assertTrue(DN.decode(msg.getDn()).compareTo(personEntry.getDN()) == 0,
-        "The received MODIFY_DN synchronization message is not for the excepted DN");
-
-
-    // Delete the entry
-    Entry newPersonEntry = DirectoryServer.getEntry(newDN) ;
-    DeleteOperation delOp = new DeleteOperation(connection,
-        InternalClientConnection.nextOperationID(), InternalClientConnection
-            .nextMessageID(), null, DN
-            .decode("uid= new person,ou=People,dc=example,dc=com"));
-    delOp.run();
-    assertNull(DirectoryServer.getEntry(newDN),
-        "Unable to delete the new person Entry");
-    entryList.remove(newPersonEntry);
-
-    //  See if the client has receive the msg
-    msg = syncDomain2.receive() ;
-    receivedOp = msg.createOperation(connection);
-    assertTrue(OperationType.DELETE.compareTo(receivedOp.getOperationType()) == 0,
-        "The received synchronization message is not a DELETE msg");
-    assertTrue(DN.decode(msg.getDn()).compareTo(DN
-        .decode("uid= new person,ou=People,dc=example,dc=com")) == 0,
-        "The received DELETE synchronization message is not for the excepted DN");
-
-    // Purge the client
-    syncDomain2.shutdown() ;
-
-    //
-    // Be new Client of the Change log
-    // We will check that when we send message to the Chanlog server
-    // the synchronization domain apply those changes.
-
-    // Create a new synchronization domain
-    String synchroServerLdif3 = "dn: cn=example3, " + synchroPluginStringDN + "\n"
-        + "objectClass: top\n"
-        + "objectClass: ds-cfg-synchronization-provider-config\n"
-        + "cn: example\n"
-        + "ds-cfg-synchronization-dn: ou=People,dc=example,dc=com\n"
-        + "ds-cfg-changelog-server: localhost:8989\n"
-        + "ds-cfg-directory-server-id: 3\n"
-        + "ds-cfg-receive-status: true\n";
-    Entry synchroServerEntry3 = TestCaseUtils
-        .entryFromLdifString(synchroServerLdif3);
-    newConfigEntry = new ConfigEntry(synchroServerEntry3, DirectoryServer
-        .getConfigEntry(DN.decode(synchroPluginStringDN)));
-    SynchronizationDomain syncDomain3 = new SynchronizationDomain(
-        newConfigEntry);
-
-    //
-    // Message to Create the Entry
-    addOp = new AddOperation(connection,
-        InternalClientConnection.nextOperationID(), InternalClientConnection
-            .nextMessageID(), null, personEntry.getDN(), personEntry
-            .getObjectClasses(), personEntry.getUserAttributes(), personEntry
-            .getOperationalAttributes());
-    syncDomain3.doPreOperation(addOp);
-    addOp.setResultCode(ResultCode.SUCCESS) ;
-    syncDomain3.synchronize(addOp);
-
-    // Wait no more than 1 second (synchro operation has to be sent,
-    // received and replay)
-    Entry newEntry = null ;
-    int i = 10 ;
-    while ((i> 0) && (newEntry == null))
-    {
-      Thread.sleep(100);
-      newEntry = DirectoryServer.getEntry(personEntry.getDN());
-      i-- ;
-    }
-    newEntry = DirectoryServer.getEntry(personEntry.getDN());
-    assertNotNull(newEntry,
-        "The send ADD synchronization message was not applied");
-    entryList.add(newEntry);
-
-    // Message to Modify the new created entry
-
-    // Unable to test it: the doPreOperation operation use
-    // modifyOperation.getModifiedEntry() which cannot be set the
-    // the public level.
-    // Otherwise, code will look like:
-    /*
-    modOp = new ModifyOperation(connection,
-        InternalClientConnection.nextOperationID(), InternalClientConnection
-            .nextMessageID(), null, personEntry.getDN(), mods);
-    modOp.setModifiedEntry(...);
-    mms.doPreOperation(modOp);
-    modOp.setResultCode(ResultCode.SUCCESS) ;
-    syncDomain3.synchronize(modOp);
-    //  TODO Check the telephoneNumber attribute
-     */
-
-    //  Purge the message sender
-    syncDomain3.shutdown() ;
-
-
-    // Check synchronization monitoring
-    SynchronizationDomain syncDomain = new SynchronizationDomain(
-        DirectoryServer.getConfigEntry(synchroServerEntry.getDN()));
-    SynchronizationMonitor mon = new SynchronizationMonitor(syncDomain);
-    ArrayList<Attribute> monData = mon.getMonitorData();
-    // TODO Check Monitoring Attributes
   }
 }

--
Gitblit v1.10.0