From 9cdfa916fa7bdc0d4b56b8eddc5d623f5d4e2a95 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 26 Sep 2013 10:30:43 +0000
Subject: [PATCH] Code cleanup.

---
 opends/tests/unit-tests-testng/src/server/org/opends/server/core/networkgroups/ResourceLimitsPolicyTest.java     |   93 ++---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java |  368 +++++++---------------
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java        |  428 +++++++++++++-------------
 opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java                                |   46 +-
 4 files changed, 383 insertions(+), 552 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index f8d1c64..e4ab506 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -41,7 +41,7 @@
 import org.opends.messages.MessageBuilder;
 import org.opends.messages.Severity;
 import org.opends.server.admin.server.ConfigurationChangeListener;
-import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
+import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.IsolationPolicy;
 import org.opends.server.admin.std.server.ExternalChangelogDomainCfg;
 import org.opends.server.admin.std.server.ReplicationDomainCfg;
 import org.opends.server.api.AlertGenerator;
@@ -62,7 +62,6 @@
 import org.opends.server.protocols.ldap.LDAPModification;
 import org.opends.server.replication.common.*;
 import org.opends.server.replication.protocol.*;
-import org.opends.server.replication.service.ReplicationBroker;
 import org.opends.server.replication.service.ReplicationDomain;
 import org.opends.server.tasks.PurgeConflictsHistoricalTask;
 import org.opends.server.tasks.TaskUtils;
@@ -71,7 +70,7 @@
 import org.opends.server.util.LDIFReader;
 import org.opends.server.util.TimeThread;
 import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
-import org.opends.server.workflowelement.localbackend.*;
+import org.opends.server.workflowelement.localbackend.LocalBackendModifyOperation;
 
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.messages.ToolMessages.*;
@@ -436,7 +435,7 @@
        */
       try
       {
-        if (buildAndPublishMissingChanges(startCSN, broker))
+        if (buildAndPublishMissingChanges(startCSN))
         {
           message = DEBUG_CHANGES_SENT.get();
           logError(message);
@@ -1262,8 +1261,8 @@
           break;
         }
       }
-      boolean attributeToBeFiltered = (fractionalExclusive && found)
-          || (!fractionalExclusive && !found);
+      boolean attributeToBeFiltered = fractionalExclusive && found
+          || !fractionalExclusive && !found;
       if (attributeToBeFiltered
           && !newRdn.hasAttributeType(attributeType)
           && !modifyDNOperation.deleteOldRDN())
@@ -1437,7 +1436,7 @@
    private static boolean isFractionalProhibited(AttributeType attrType)
    {
      String attributeName = attrType.getPrimaryName();
-     return (attributeName != null && isFractionalProhibitedAttr(attributeName))
+     return attributeName != null && isFractionalProhibitedAttr(attributeName)
          || isFractionalProhibitedAttr(attrType.getOID());
    }
 
@@ -1453,8 +1452,8 @@
     // Now remove the attribute or modification if:
     // - exclusive mode and attribute is in configuration
     // - inclusive mode and attribute is not in configuration
-    return (foundAttribute && fractionalExclusive)
-        || (!foundAttribute && !fractionalExclusive);
+    return foundAttribute && fractionalExclusive
+        || !foundAttribute && !fractionalExclusive;
   }
 
   private static boolean contains(Set<String> fractionalConcernedAttributes,
@@ -1857,16 +1856,8 @@
       // this policy imply that we always accept updates.
       return true;
     }
-    if (isolationPolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES))
-    {
-      // this isolation policy specifies that the updates are denied
-      // when the broker had problems during the connection phase
-      // Updates are still accepted if the broker is currently connecting..
-      return !hasConnectionError();
-    }
-    // we should never get there as the only possible policies are
-    // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES
-    return true;
+    return !isolationPolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES)
+        || !hasConnectionError();
   }
 
 
@@ -2473,7 +2464,7 @@
         op = msg.createOperation(conn);
         dependency = remotePendingChanges.checkDependencies(op, msg);
 
-        while (!dependency && !replayDone && (retryCount-- > 0))
+        while (!dependency && !replayDone && retryCount-- > 0)
         {
           if (shutdown.get())
           {
@@ -2824,8 +2815,8 @@
       for (Modification mod : mods)
       {
         AttributeType modAttrType = mod.getAttribute().getAttributeType();
-        if ((mod.getModificationType() == ModificationType.DELETE
-            || mod.getModificationType() == ModificationType.REPLACE)
+        if (mod.getModificationType() == ModificationType.DELETE
+            || mod.getModificationType() == ModificationType.REPLACE
             && currentRDN.hasAttributeType(modAttrType))
         {
           if (currentRDN.hasAttributeType(modAttrType))
@@ -4429,14 +4420,11 @@
    *
    * @param startCSN
    *          The CSN where we need to start the search
-   * @param session
-   *          The session to use to publish the changes
    * @return A boolean indicating he success of the operation.
    * @throws Exception
    *           if an Exception happens during the search.
    */
-  public boolean buildAndPublishMissingChanges(CSN startCSN,
-      ReplicationBroker session) throws Exception
+  public boolean buildAndPublishMissingChanges(CSN startCSN) throws Exception
   {
     // Trim the changes in replayOperations that are older than the startCSN.
     synchronized (replayOperations)
@@ -4494,7 +4482,7 @@
 
       for (FakeOperation opToSend : opsToSend)
       {
-        session.publishRecovery(opToSend.generateMessage());
+        broker.publishRecovery(opToSend.generateMessage());
       }
       opsToSend.clear();
       if (lastRetrievedChange != null)
@@ -5205,8 +5193,8 @@
         return false;
 
       // Compare modes
-      if ((cfg1.isFractional() != cfg2.isFractional())
-          || (cfg1.isFractionalExclusive() != cfg2.isFractionalExclusive()))
+      if (cfg1.isFractional() != cfg2.isFractional()
+          || cfg1.isFractionalExclusive() != cfg2.isFractionalExclusive())
         return false;
 
       // Compare all classes attributes
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/core/networkgroups/ResourceLimitsPolicyTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/core/networkgroups/ResourceLimitsPolicyTest.java
index c506257..9b9cfd5 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/core/networkgroups/ResourceLimitsPolicyTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/core/networkgroups/ResourceLimitsPolicyTest.java
@@ -23,11 +23,13 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
+ *      Portions copyright 2013 ForgeRock AS
  */
 package org.opends.server.core.networkgroups;
 
 import java.util.ArrayList;
 import java.util.List;
+
 import org.opends.messages.Message;
 import org.opends.server.DirectoryServerTestCase;
 import org.opends.server.TestCaseUtils;
@@ -35,18 +37,18 @@
 import org.opends.server.protocols.internal.InternalSearchOperation;
 import org.opends.server.protocols.ldap.LDAPFilter;
 import org.opends.server.types.DN;
+import org.opends.server.types.SearchFilter;
 import org.opends.server.types.SearchScope;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
 
-
-/*
+/**
  * This set of tests test the resource limits.
  */
+@SuppressWarnings("javadoc")
 public class ResourceLimitsPolicyTest extends DirectoryServerTestCase {
   //===========================================================================
   //
@@ -60,8 +62,7 @@
    * @throws Exception if the environment could not be set up.
    */
   @BeforeClass
-  public void setUp()
-    throws Exception
+  public void setUp() throws Exception
   {
     // This test suite depends on having the schema available,
     // so we'll start the server.
@@ -119,13 +120,11 @@
   public void testMaxNumberOfConnections()
           throws Exception
   {
-    ArrayList<Message> messages = new ArrayList<Message>();
+    List<Message> messages = new ArrayList<Message>();
 
-    ResourceLimitsPolicyFactory factory =
-        new ResourceLimitsPolicyFactory();
+    ResourceLimitsPolicyFactory factory = new ResourceLimitsPolicyFactory();
     ResourceLimitsPolicy limits =
-        factory
-            .createQOSPolicy(new MockResourceLimitsQOSPolicyCfg()
+        factory.createQOSPolicy(new MockResourceLimitsQOSPolicyCfg()
               {
 
                 @Override
@@ -139,17 +138,14 @@
     InternalClientConnection conn1 = new InternalClientConnection(DN.NULL_DN);
     limits.addConnection(conn1);
 
-    boolean check = limits.isAllowed(conn1, null, true, messages);
-    assertTrue(check);
+    assertTrue(limits.isAllowed(conn1, null, true, messages));
 
     InternalClientConnection conn2 = new InternalClientConnection(DN.NULL_DN);
     limits.addConnection(conn2);
-    check = limits.isAllowed(conn2, null, true, messages);
-    assertFalse(check);
+    assertFalse(limits.isAllowed(conn2, null, true, messages));
 
     limits.removeConnection(conn1);
-    check = limits.isAllowed(conn2, null, true, messages);
-    assertTrue(check);
+    assertTrue(limits.isAllowed(conn2, null, true, messages));
 
     limits.removeConnection(conn2);
   }
@@ -162,13 +158,11 @@
   public void testMaxNumberOfConnectionsFromSameIp()
           throws Exception
   {
-    ArrayList<Message> messages = new ArrayList<Message>();
+    List<Message> messages = new ArrayList<Message>();
 
-    ResourceLimitsPolicyFactory factory =
-        new ResourceLimitsPolicyFactory();
+    ResourceLimitsPolicyFactory factory = new ResourceLimitsPolicyFactory();
     ResourceLimitsPolicy limits =
-        factory
-            .createQOSPolicy(new MockResourceLimitsQOSPolicyCfg()
+        factory.createQOSPolicy(new MockResourceLimitsQOSPolicyCfg()
               {
 
                 @Override
@@ -182,17 +176,14 @@
     InternalClientConnection conn1 = new InternalClientConnection(DN.NULL_DN);
     limits.addConnection(conn1);
 
-    boolean check = limits.isAllowed(conn1, null, true, messages);
-    assertTrue(check);
+    assertTrue(limits.isAllowed(conn1, null, true, messages));
 
     InternalClientConnection conn2 = new InternalClientConnection(DN.NULL_DN);
     limits.addConnection(conn2);
-    check = limits.isAllowed(conn2, null, true, messages);
-    assertFalse(check);
+    assertFalse(limits.isAllowed(conn2, null, true, messages));
 
     limits.removeConnection(conn1);
-    check = limits.isAllowed(conn2, null, true, messages);
-    assertTrue(check);
+    assertTrue(limits.isAllowed(conn2, null, true, messages));
 
     limits.removeConnection(conn2);
   }
@@ -213,11 +204,9 @@
   {
     List<Message> messages = new ArrayList<Message>();
 
-    ResourceLimitsPolicyFactory factory =
-        new ResourceLimitsPolicyFactory();
+    ResourceLimitsPolicyFactory factory = new ResourceLimitsPolicyFactory();
     ResourceLimitsPolicy limits =
-        factory
-            .createQOSPolicy(new MockResourceLimitsQOSPolicyCfg()
+        factory.createQOSPolicy(new MockResourceLimitsQOSPolicyCfg()
               {
 
                 @Override
@@ -236,12 +225,7 @@
         SearchScope.BASE_OBJECT,
         LDAPFilter.decode(searchFilter).toSearchFilter());
 
-    boolean check = limits.isAllowed(conn1, search, true, messages);
-    if (success) {
-      assertTrue(check);
-    } else {
-      assertFalse(check);
-    }
+    assertEquals(limits.isAllowed(conn1, search, true, messages), success);
     limits.removeConnection(conn1);
   }
 
@@ -254,7 +238,7 @@
   public void testMaxThroughput()
           throws Exception
   {
-    ArrayList<Message> messages = new ArrayList<Message>();
+    List<Message> messages = new ArrayList<Message>();
     final long interval = 1000; // Unit is milliseconds
 
     ResourceLimitsPolicyFactory factory = new ResourceLimitsPolicyFactory();
@@ -276,35 +260,26 @@
     InternalClientConnection conn = new InternalClientConnection(DN.NULL_DN);
     limits.addConnection(conn);
 
-    InternalSearchOperation search1 = conn.processSearch(
-      DN.decode("dc=example,dc=com"),
-      SearchScope.BASE_OBJECT,
-      LDAPFilter.decode("(objectclass=*)").toSearchFilter());
+    final DN dn = DN.decode("dc=example,dc=com");
+    final SearchFilter all = SearchFilter.createFilterFromString("(objectclass=*)");
 
     // First operation is allowed
-    boolean check = limits.isAllowed(conn, search1, true, messages);
-    assertTrue(check);
-
-    InternalSearchOperation search2 = conn.processSearch(
-      DN.decode("dc=example,dc=com"),
-      SearchScope.BASE_OBJECT,
-      LDAPFilter.decode("(objectclass=*)").toSearchFilter());
+    InternalSearchOperation search1 =
+        conn.processSearch(dn, SearchScope.BASE_OBJECT, all);
+    assertTrue(limits.isAllowed(conn, search1, true, messages));
 
     // Second operation in the same interval is refused
-    check = limits.isAllowed(conn, search2, true, messages);
-    assertFalse(check);
+    InternalSearchOperation search2 =
+        conn.processSearch(dn, SearchScope.BASE_OBJECT, all);
+    assertFalse(limits.isAllowed(conn, search2, true, messages));
 
     // Wait for the end of the interval => counters are reset
     Thread.sleep(interval);
 
-    InternalSearchOperation search3 = conn.processSearch(
-      DN.decode("dc=example,dc=com"),
-      SearchScope.BASE_OBJECT,
-      LDAPFilter.decode("(objectclass=*)").toSearchFilter());
-
     // The operation is allowed
-    check = limits.isAllowed(conn, search3, true, messages);
-    assertTrue(check);
+    InternalSearchOperation search3 =
+        conn.processSearch(dn, SearchScope.BASE_OBJECT, all);
+    assertTrue(limits.isAllowed(conn, search3, true, messages));
   }
 
 }
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
index c4d102e..b2d8ac5 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -46,7 +46,6 @@
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.protocols.internal.InternalClientConnection;
 import org.opends.server.protocols.internal.InternalSearchOperation;
-import org.opends.server.protocols.ldap.LDAPFilter;
 import org.opends.server.replication.ReplicationTestCase;
 import org.opends.server.replication.common.*;
 import org.opends.server.replication.protocol.*;
@@ -462,9 +461,8 @@
      * Handle the handshake processing with the connecting DS
      * returns true if handshake was performed without errors
      */
-    private boolean performHandshake()
+    private boolean performHandshake() throws Exception
     {
-      try
       {
         // Receive server start
         ServerStartMsg serverStartMsg = (ServerStartMsg) session.receive();
@@ -520,17 +518,6 @@
         TopologyMsg topologyMsg = new TopologyMsg(new ArrayList<DSInfo>(),
           rsList);
         session.publish(topologyMsg);
-
-      } catch (IOException e)
-      {
-        fail("Unexpected io exception in fake replication server handshake " +
-          "processing: " + e);
-        return false;
-      } catch (Exception e)
-      {
-        fail("Unexpected exception in fake replication server handshake " +
-          "processing: " + e);
-        return false;
       }
       return true;
     }
@@ -558,7 +545,7 @@
     /**
      * Handle client connection then call code specific to configured test
      */
-    private void handleClientConnection()
+    private void handleClientConnection() throws Exception
     {
       debugInfo("handleClientConnection " + testcase + " " + scenario);
       // Handle DS connection
@@ -610,13 +597,12 @@
       debugInfo("handleClientConnection " + testcase + " " + scenario + " done");
     }
 
-    /*
+    /**
      * Make the RS send an add message with the passed entry and return the ack
      * message it receives from the DS
      */
-    private AckMsg sendAssuredAddMsg(Entry entry, String parentUid) throws SocketTimeoutException
+    private AckMsg sendAssuredAddMsg(Entry entry, String parentUid) throws Exception
     {
-      try
       {
         AddMsg addMsg =
           new AddMsg(gen.newCSN(), entry.getDN(), UUID.randomUUID().toString(),
@@ -632,86 +618,51 @@
 
         // Read and return matching ack
         return (AckMsg)session.receive();
-
-      } catch(SocketTimeoutException e)
-      {
-        throw e;
-      } catch (Throwable t)
-      {
-        fail("Unexpected exception in fake replication server sendAddUpdate " +
-          "processing: " + t);
-        return null;
       }
     }
 
     /**
      * Read the coming update and check parameters are not assured
      */
-    private void executeNotAssuredScenario()
+    private void executeNotAssuredScenario() throws Exception
     {
+      UpdateMsg updateMsg = (UpdateMsg) session.receive();
+      checkUpdateAssuredParameters(updateMsg);
 
-      try
-      {
-        UpdateMsg updateMsg = (UpdateMsg) session.receive();
-        checkUpdateAssuredParameters(updateMsg);
-
-        scenarioExecuted = true;
-      } catch (Exception e)
-      {
-        fail("Unexpected exception in fake replication server executeNotAssuredScenario " +
-          "processing: " + e);
-      }
+      scenarioExecuted = true;
     }
 
     /**
      * Read the coming update and make the client time out by not sending back
      * the ack
      */
-    private void executeTimeoutScenario()
+    private void executeTimeoutScenario() throws Exception
     {
+      UpdateMsg updateMsg = (UpdateMsg) session.receive();
+      checkUpdateAssuredParameters(updateMsg);
 
-      try
-      {
-        UpdateMsg updateMsg = (UpdateMsg) session.receive();
-        checkUpdateAssuredParameters(updateMsg);
+      scenarioExecuted = true;
 
-        scenarioExecuted = true;
-
-        // We do not send back an ack and the client code is expected to be
-        // blocked at least for the programmed timeout time.
-
-      } catch (Exception e)
-      {
-        fail("Unexpected exception in fake replication server executeTimeoutScenario " +
-          "processing: " + e + " testcase= " + testcase +
-          " groupId=" + groupId);
-      }
+      // We do not send back an ack and the client code is expected to be
+      // blocked at least for the programmed timeout time.
     }
 
     /**
      * Read the coming update, sleep some time then send back an ack
      */
-    private void executeNoTimeoutScenario()
+    private void executeNoTimeoutScenario() throws Exception
     {
-      try
-      {
-        UpdateMsg updateMsg = (UpdateMsg) session.receive();
-        checkUpdateAssuredParameters(updateMsg);
+      UpdateMsg updateMsg = (UpdateMsg) session.receive();
+      checkUpdateAssuredParameters(updateMsg);
 
-        // Sleep before sending back the ack
-        sleep(NO_TIMEOUT_RS_SLEEP_TIME);
+      // Sleep before sending back the ack
+      sleep(NO_TIMEOUT_RS_SLEEP_TIME);
 
-        // Send the ack without errors
-        AckMsg ackMsg = new AckMsg(updateMsg.getCSN());
-        session.publish(ackMsg);
+      // Send the ack without errors
+      AckMsg ackMsg = new AckMsg(updateMsg.getCSN());
+      session.publish(ackMsg);
 
-        scenarioExecuted = true;
-
-      } catch (Exception e)
-      {
-        fail("Unexpected exception in fake replication server executeNoTimeoutScenario " +
-          "processing: " + e);
-      }
+      scenarioExecuted = true;
     }
 
     /**
@@ -733,9 +684,8 @@
     /**
      * Read the coming safe read mode updates and send back acks with errors
      */
-    private void executeSafeReadManyErrorsScenario()
+    private void executeSafeReadManyErrorsScenario() throws Exception
     {
-      try
       {
         // Read first update
         UpdateMsg updateMsg = (UpdateMsg) session.receive();
@@ -779,20 +729,14 @@
         // let timeout occur
 
         scenarioExecuted = true;
-
-      } catch (Exception e)
-      {
-        fail("Unexpected exception in fake replication server executeSafeReadManyErrorsScenario " +
-          "processing: " + e);
       }
     }
 
     /**
-     * Read the coming seaf data mode updates and send back acks with errors
+     * Read the coming safe data mode updates and send back acks with errors
      */
-    private void executeSafeDataManyErrorsScenario()
+    private void executeSafeDataManyErrorsScenario() throws Exception
     {
-      try
       {
         // Read first update
         UpdateMsg updateMsg = (UpdateMsg) session.receive();
@@ -830,32 +774,12 @@
         checkUpdateAssuredParameters(updateMsg);
 
         // let timeout occur
-
         scenarioExecuted = true;
-
-      } catch (Exception e)
-      {
-        fail("Unexpected exception in fake replication server executeSafeDataManyErrorsScenario " +
-          "processing: " + e);
       }
     }
   }
 
   /**
-   * Sleep a while
-   */
-  private void sleep(long time)
-  {
-    try
-    {
-      Thread.sleep(time);
-    } catch (InterruptedException ex)
-    {
-      fail("Error sleeping " + ex);
-    }
-  }
-
-  /**
    * Return various group id values
    */
   @DataProvider(name = "rsGroupIdProvider")
@@ -877,7 +801,6 @@
   @Test(dataProvider = "rsGroupIdProvider")
   public void testSafeDataModeTimeout(byte rsGroupId) throws Exception
   {
-
     int TIMEOUT = 5000;
     String testcase = "testSafeDataModeTimeout" + rsGroupId;
     try
@@ -894,8 +817,7 @@
       // Create a safe data assured domain
       if (rsGroupId == (byte)1)
       {
-        safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 1,
-        TIMEOUT);
+        safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 1, TIMEOUT);
         // Wait for connection of domain to RS
         waitForConnectionToRs(testcase, replicationServer);
 
@@ -926,15 +848,11 @@
       if (rsGroupId == (byte)1)
       {
         // RS has same group id as DS
-        // In this scenario, the fake RS will not send back an ack so we expect
-        // the add entry code (LDAP client code emulation) to be blocked for the
-        // timeout value at least. If the time we have slept is lower, timeout
-        // handling code is not working...
-        assertTrue((endTime - startTime) >= TIMEOUT);
+        assertBlockedLongerThanTimeout(startTime, endTime, TIMEOUT);
         assertTrue(replicationServer.isScenarioExecuted());
 
         // Check monitoring values
-        sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+        Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
         DN baseDN = DN.decode(SAFE_DATA_DN);
         new MonitorAssertions(baseDN)
           .assertValue("assured-sd-sent-updates", 1)
@@ -948,7 +866,7 @@
 
         // No error should be seen in monitoring and update should have not been
         // sent in assured mode
-        sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+        Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
         DN baseDN = DN.decode(NOT_ASSURED_DN);
         new MonitorAssertions(baseDN).assertRemainingValuesAreZero();
         assertNoServerErrors(baseDN);
@@ -992,7 +910,6 @@
   @Test(dataProvider = "rsGroupIdProvider")
   public void testSafeReadModeTimeout(byte rsGroupId) throws Exception
   {
-
     int TIMEOUT = 5000;
     String testcase = "testSafeReadModeTimeout" + rsGroupId;
     try
@@ -1010,8 +927,7 @@
       if (rsGroupId == (byte)1)
       {
         // Create a safe read assured domain
-        safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0,
-            TIMEOUT);
+        safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT);
         // Wait for connection of domain to RS
         waitForConnectionToRs(testcase, replicationServer);
 
@@ -1041,16 +957,11 @@
 
       if (rsGroupId == (byte)1)
       {
-        // RS has same group id as DS
-        // In this scenario, the fake RS will not send back an ack so we expect
-        // the add entry code (LDAP client code emulation) to be blocked for the
-        // timeout value at least. If the time we have slept is lower, timeout
-        // handling code is not working...
-        assertTrue((endTime - startTime) >= TIMEOUT);
+        assertBlockedLongerThanTimeout(startTime, endTime, TIMEOUT);
         assertTrue(replicationServer.isScenarioExecuted());
 
         // Check monitoring values
-        sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+        Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
         DN baseDN = DN.decode(SAFE_READ_DN);
         new MonitorAssertions(baseDN)
           .assertValue("assured-sr-sent-updates", 1)
@@ -1065,7 +976,7 @@
 
         // No error should be seen in monitoring and update should have not been
         // sent in assured mode
-        sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+        Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
         DN baseDN = DN.decode(NOT_ASSURED_DN);
         new MonitorAssertions(baseDN).assertRemainingValuesAreZero();
         assertNoServerErrors(baseDN);
@@ -1117,7 +1028,7 @@
    * Wait for connection to the fake replication server or times out with error
    * after some seconds
    */
-  private void waitForConnectionToRs(String testCase, FakeReplicationServer rs)
+  private void waitForConnectionToRs(String testCase, FakeReplicationServer rs) throws Exception
   {
     int nsec = -1;
     do
@@ -1125,7 +1036,7 @@
       nsec++;
       if (nsec == 10) // 10 seconds timeout
         fail(testCase + ": timeout waiting for domain connection to fake RS after " + nsec + " seconds.");
-      sleep(1000);
+      Thread.sleep(1000);
     } while (!rs.isHandshakeOk());
   }
 
@@ -1133,7 +1044,7 @@
    * Wait for the scenario to be executed by the fake replication server or
    * times out with error after some seconds
    */
-  private void waitForScenarioExecutedOnRs(String testCase, FakeReplicationServer rs)
+  private void waitForScenarioExecutedOnRs(String testCase, FakeReplicationServer rs) throws Exception
   {
     int nsec = -1;
     do
@@ -1141,7 +1052,7 @@
       nsec++;
       if (nsec == 10) // 10 seconds timeout
         fail(testCase + ": timeout waiting for scenario to be exectued on fake RS after " + nsec + " seconds.");
-      sleep(1000);
+      Thread.sleep(1000);
     } while (!rs.isScenarioExecuted());
   }
 
@@ -1163,7 +1074,6 @@
   @Test
   public void testSafeDataModeAck() throws Exception
   {
-
     int TIMEOUT = 5000;
     String testcase = "testSafeDataModeAck";
     try
@@ -1175,8 +1085,7 @@
       replicationServer.start(NO_TIMEOUT_SCENARIO);
 
       // Create a safe data assured domain
-      safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 2,
-        TIMEOUT);
+      safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 2, TIMEOUT);
       // Wait for connection of domain to RS
       waitForConnectionToRs(testcase, replicationServer);
 
@@ -1187,16 +1096,11 @@
         "objectClass: organizationalUnit\n";
       addEntry(TestCaseUtils.entryFromLdifString(entry));
 
-      // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME
-      // seconds, so we expect the add entry code (LDAP client code emulation) to be blocked
-      // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
-      long endTime = System.currentTimeMillis();
-      long callTime = endTime - startTime;
-      assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT));
+      assertBlockedForLessThanTimeout(startTime, TIMEOUT);
       assertTrue(replicationServer.isScenarioExecuted());
 
       // Check monitoring values
-      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+      Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
       DN baseDN = DN.decode(SAFE_DATA_DN);
       new MonitorAssertions(baseDN)
         .assertValue("assured-sd-sent-updates", 1)
@@ -1216,7 +1120,6 @@
   @Test
   public void testSafeReadModeAck() throws Exception
   {
-
     int TIMEOUT = 5000;
     String testcase = "testSafeReadModeAck";
     try
@@ -1227,8 +1130,7 @@
       replicationServer.start(NO_TIMEOUT_SCENARIO);
 
       // Create a safe read assured domain
-      safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0,
-        TIMEOUT);
+      safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT);
       // Wait for connection of domain to RS
       waitForConnectionToRs(testcase, replicationServer);
 
@@ -1239,16 +1141,11 @@
         "objectClass: organizationalUnit\n";
       addEntry(TestCaseUtils.entryFromLdifString(entry));
 
-      // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME
-      // seconds, so we expect the add entry code (LDAP client code emulation) to be blocked
-      // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
-      long endTime = System.currentTimeMillis();
-      long callTime = endTime - startTime;
-      assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT));
+      assertBlockedForLessThanTimeout(startTime, TIMEOUT);
       assertTrue(replicationServer.isScenarioExecuted());
 
       // Check monitoring values
-      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+      Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
       DN baseDN = DN.decode(SAFE_READ_DN);
       new MonitorAssertions(baseDN)
         .assertValue("assured-sr-sent-updates", 1)
@@ -1268,7 +1165,6 @@
   @Test(dataProvider = "rsGroupIdProvider", groups = "slow")
   public void testSafeReadModeReply(byte rsGroupId) throws Exception
   {
-
     int TIMEOUT = 5000;
     String testcase = "testSafeReadModeReply";
     try
@@ -1279,8 +1175,7 @@
       replicationServer.start(NO_READ);
 
       // Create a safe read assured domain
-      safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0,
-        TIMEOUT);
+      safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT);
       // Wait for connection of domain to RS
       waitForConnectionToRs(testcase, replicationServer);
 
@@ -1356,7 +1251,6 @@
   @Test(dataProvider = "rsGroupIdProvider", groups = "slow")
   public void testSafeDataModeReply(byte rsGroupId) throws Exception
   {
-
     int TIMEOUT = 5000;
     String testcase = "testSafeDataModeReply";
     try
@@ -1367,8 +1261,7 @@
       replicationServer.start(NO_READ);
 
       // Create a safe data assured domain
-      safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 4,
-        TIMEOUT);
+      safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 4, TIMEOUT);
       // Wait for connection of domain to RS
       waitForConnectionToRs(testcase, replicationServer);
 
@@ -1380,19 +1273,14 @@
       Entry entry = TestCaseUtils.entryFromLdifString(entryStr);
       String parentUid = getEntryUUID(DN.decode(SAFE_DATA_DN));
 
-      AckMsg ackMsg;
-      try
-      {
-        ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid);
-      } catch (SocketTimeoutException e)
-      {
-        // Expected
-        return;
-      }
-
-      fail("DS should not reply an ack in safe data mode, however, it replied: " +
-        ackMsg);
-    } finally
+      AckMsg ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid);
+      fail("DS should not reply an ack in safe data mode, however, it replied: " + ackMsg);
+    }
+    catch (SocketTimeoutException expected)
+    {
+      return;
+    }
+    finally
     {
       endTest(testcase);
     }
@@ -1405,7 +1293,6 @@
   @Test(groups = "slow")
   public void testSafeDataManyErrors() throws Exception
   {
-
     int TIMEOUT = 5000;
     String testcase = "testSafeDataManyErrors";
     try
@@ -1417,8 +1304,7 @@
       replicationServer.start(SAFE_DATA_MANY_ERRORS);
 
       // Create a safe data assured domain
-      safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 3,
-        TIMEOUT);
+      safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 3, TIMEOUT);
       // Wait for connection of domain to RS
       waitForConnectionToRs(testcase, replicationServer);
 
@@ -1430,18 +1316,13 @@
         "objectClass: organizationalUnit\n";
       addEntry(TestCaseUtils.entryFromLdifString(entry));
 
-      // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME
-      // seconds, so we expect the add entry code (LDAP client code emulation) to be blocked
-      // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
-      long endTime = System.currentTimeMillis();
-      long callTime = endTime - startTime;
-      assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT));
+      assertBlockedForLessThanTimeout(startTime, TIMEOUT);
 
       // Check monitoring values
       // The expected ack for the first update is:
       // - timeout error
       // - server 10 error
-      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+      Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
       DN baseDN = DN.decode(SAFE_DATA_DN);
       new MonitorAssertions(baseDN)
         .assertValue("assured-sd-sent-updates", 1)
@@ -1453,18 +1334,13 @@
       startTime = System.currentTimeMillis(); // Time the update has been initiated
       deleteEntry(entryDn);
 
-      // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME
-      // seconds, so we expect the delete entry code (LDAP client code emulation) to be blocked
-      // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
-      endTime = System.currentTimeMillis();
-      callTime = endTime - startTime;
-      assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT));
+      assertBlockedForLessThanTimeout(startTime, TIMEOUT);
 
       // Check monitoring values
       // The expected ack for the second update is:
       // - timeout error
       // - server 10 error, server 20 error
-      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+      Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
       baseDN = DN.decode(SAFE_DATA_DN);
       new MonitorAssertions(baseDN)
         .assertValue("assured-sd-sent-updates", 2)
@@ -1476,17 +1352,12 @@
       startTime = System.currentTimeMillis(); // Time the update has been initiated
       addEntry(TestCaseUtils.entryFromLdifString(entry));
 
-      // In this scenario, the fake RS will not send back an ack so we expect
-      // the add entry code (LDAP client code emulation) to be blocked for the
-      // timeout value at least. If the time we have slept is lower, timeout
-      // handling code is not working...
-      endTime = System.currentTimeMillis();
-      assertTrue((endTime - startTime) >= TIMEOUT);
+      assertBlockedLongerThanTimeout(startTime, System.currentTimeMillis(), TIMEOUT);
       assertTrue(replicationServer.isScenarioExecuted());
 
       // Check monitoring values
       // No ack should have comen back, so timeout incremented (flag and error for rs)
-      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+      Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
       baseDN = DN.decode(SAFE_DATA_DN);
       new MonitorAssertions(baseDN)
         .assertValue("assured-sd-sent-updates", 3)
@@ -1494,7 +1365,6 @@
         .assertRemainingValuesAreZero();
       assertServerErrorsSafeDataMode(baseDN,
           entry(10, 2), entry(20, 1), entry(RS_SERVER_ID, 1));
-
     } finally
     {
       endTest(testcase);
@@ -1502,13 +1372,36 @@
   }
 
   /**
+   * In this scenario, the fake RS will not send back an ack so we expect the
+   * add entry code (LDAP client code emulation) to be blocked for the timeout
+   * value at least. If the time we have slept is lower, timeout handling code
+   * is not working...
+   */
+  private void assertBlockedLongerThanTimeout(long startTime, long endTime, int TIMEOUT)
+  {
+    assertTrue((endTime - startTime) >= TIMEOUT);
+  }
+
+  /**
+   * In this scenario, the fake RS will send back an ack after
+   * NO_TIMEOUT_RS_SLEEP_TIME seconds, so we expect the add/delete entry code
+   * (LDAP client code emulation) to be blocked for more than
+   * NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
+   */
+  private void assertBlockedForLessThanTimeout(long startTime, int TIMEOUT)
+  {
+    long endTime = System.currentTimeMillis();
+    long callTime = endTime - startTime;
+    assertTrue(NO_TIMEOUT_RS_SLEEP_TIME <= callTime && callTime <= TIMEOUT);
+  }
+
+  /**
    * DS performs many successive modifications in safe read mode and receives RS
    * acks with various errors. Check for monitoring right errors
    */
   @Test(groups = "slow")
   public void testSafeReadManyErrors() throws Exception
   {
-
     int TIMEOUT = 5000;
     String testcase = "testSafeReadManyErrors";
     try
@@ -1519,8 +1412,7 @@
       replicationServer.start(SAFE_READ_MANY_ERRORS);
 
       // Create a safe read assured domain
-      safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0,
-        TIMEOUT);
+      safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT);
       // Wait for connection of domain to RS
       waitForConnectionToRs(testcase, replicationServer);
 
@@ -1532,18 +1424,13 @@
         "objectClass: organizationalUnit\n";
       addEntry(TestCaseUtils.entryFromLdifString(entry));
 
-      // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME
-      // seconds, so we expect the add entry code (LDAP client code emulation) to be blocked
-      // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
-      long endTime = System.currentTimeMillis();
-      long callTime = endTime - startTime;
-      assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT));
+      assertBlockedForLessThanTimeout(startTime, TIMEOUT);
 
       // Check monitoring values
       // The expected ack for the first update is:
       // - replay error
       // - server 10 error, server 20 error
-      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+      Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
       DN baseDN = DN.decode(SAFE_READ_DN);
       new MonitorAssertions(baseDN)
         .assertValue("assured-sr-sent-updates", 1)
@@ -1556,12 +1443,7 @@
       startTime = System.currentTimeMillis(); // Time the update has been initiated
       deleteEntry(entryDn);
 
-      // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME
-      // seconds, so we expect the delete entry code (LDAP client code emulation) to be blocked
-      // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
-      endTime = System.currentTimeMillis();
-      callTime = endTime - startTime;
-      assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT));
+      assertBlockedForLessThanTimeout(startTime, TIMEOUT);
 
       // Check monitoring values
       // The expected ack for the second update is:
@@ -1569,7 +1451,7 @@
       // - wrong status error
       // - replay error
       // - server 10 error, server 20 error, server 30 error
-      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+      Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
       new MonitorAssertions(baseDN)
         .assertValue("assured-sr-sent-updates", 2)
         .assertValue("assured-sr-not-acknowledged-updates", 2)
@@ -1584,17 +1466,12 @@
       startTime = System.currentTimeMillis(); // Time the update has been initiated
       addEntry(TestCaseUtils.entryFromLdifString(entry));
 
-      // In this scenario, the fake RS will not send back an ack so we expect
-      // the add entry code (LDAP client code emulation) to be blocked for the
-      // timeout value at least. If the time we have slept is lower, timeout
-      // handling code is not working...
-      endTime = System.currentTimeMillis();
-      assertTrue((endTime - startTime) >= TIMEOUT);
+      assertBlockedLongerThanTimeout(startTime, System.currentTimeMillis(), TIMEOUT);
       assertTrue(replicationServer.isScenarioExecuted());
 
       // Check monitoring values
       // No ack should have comen back, so timeout incremented (flag and error for rs)
-      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+      Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
       new MonitorAssertions(baseDN)
         .assertValue("assured-sr-sent-updates", 3)
         .assertValue("assured-sr-not-acknowledged-updates", 3)
@@ -1615,14 +1492,14 @@
    */
   private void deleteEntry(String dn) throws Exception
   {
-    DN realDn = DN.decode(dn);
+    DN realDN = DN.decode(dn);
     DeleteOperationBasis delOp = new DeleteOperationBasis(connection,
-      InternalClientConnection.nextOperationID(), InternalClientConnection.
-      nextMessageID(), null, realDn);
+      InternalClientConnection.nextOperationID(),
+      InternalClientConnection.nextMessageID(), null, realDN);
     delOp.setInternalOperation(true);
     delOp.run();
     waitOpResult(delOp, ResultCode.SUCCESS);
-    assertNull(DirectoryServer.getEntry(realDn));
+    assertNull(DirectoryServer.getEntry(realDN));
   }
 
   /**
@@ -1636,8 +1513,9 @@
     AssuredMode assuredMode) throws Exception
   {
     // Find monitoring entry for requested base DN
-    String monitorFilter =
-         "(&(cn=Directory server*)(domain-name=" + baseDN + "))";
+    SearchFilter monitorFilter = SearchFilter.createFilterFromString(
+        "(&(cn=Directory server*)(domain-name=" + baseDN + "))");
+    DN dn = DN.decode("cn=replication,cn=monitor");
 
     InternalSearchOperation op;
     int count = 0;
@@ -1645,19 +1523,14 @@
     {
       if (count++>0)
         Thread.sleep(100);
-      op = connection.processSearch(
-                                    ByteString.valueOf("cn=replication,cn=monitor"),
-                                    SearchScope.WHOLE_SUBTREE,
-                                    LDAPFilter.decode(monitorFilter));
+      op = connection.processSearch(dn, SearchScope.WHOLE_SUBTREE, monitorFilter);
     }
-    while (op.getSearchEntries().isEmpty() && (count<100));
-    if (op.getSearchEntries().isEmpty())
-      throw new Exception("Could not read monitoring information");
+    while (op.getSearchEntries().isEmpty() && count < 100);
+
+    Assertions.assertThat(op.getSearchEntries()).isNotEmpty();
 
     SearchResultEntry entry = op.getSearchEntries().getFirst();
-
-    if (entry == null)
-      throw new Exception("Could not find monitoring entry");
+    assertNotNull(entry);
 
     /*
      * Find the multi valued attribute matching the requested assured mode
@@ -1676,37 +1549,33 @@
     }
 
     List<Attribute> attrs = entry.getAttribute(assuredAttr);
+    if (attrs == null || attrs.isEmpty())
+      return Collections.emptyMap();
 
-    Map<Integer,Integer> resultMap = new HashMap<Integer,Integer>();
-    if ( (attrs == null) || (attrs.isEmpty()) )
-      return resultMap; // Empty map
-
-    Attribute attr = attrs.get(0);
     // Parse and store values
-    for (AttributeValue val : attr) {
-      String srvStr = val.toString();
-      StringTokenizer strtok = new StringTokenizer(srvStr, ":");
-      String token = strtok.nextToken();
-      if (token != null) {
-        int serverId = Integer.valueOf(token);
-        token = strtok.nextToken();
-        if (token != null) {
-          Integer nerrors = Integer.valueOf(token);
-          resultMap.put(serverId, nerrors);
+    Map<Integer,Integer> resultMap = new HashMap<Integer,Integer>();
+    for (AttributeValue val : attrs.get(0))
+    {
+      StringTokenizer strtok = new StringTokenizer(val.toString(), ":");
+
+      String serverId = strtok.nextToken();
+      if (serverId != null) {
+        String nbErrors = strtok.nextToken();
+        if (nbErrors != null) {
+          resultMap.put(Integer.valueOf(serverId), Integer.valueOf(nbErrors));
         }
       }
     }
-
     return resultMap;
   }
 
-  private void waitOpResult(Operation operation, ResultCode expectedResult)
+  private void waitOpResult(Operation operation, ResultCode expectedResult) throws Exception
   {
     int ii=0;
     while((operation.getResultCode()==ResultCode.UNDEFINED) ||
         (operation.getResultCode()!=expectedResult))
     {
-      sleep(50);
+      Thread.sleep(50);
       ii++;
       if (ii>10)
         assertEquals(operation.getResultCode(), expectedResult,
@@ -1714,4 +1583,3 @@
     }
   }
 }
-
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 36a4d91..87cdf24 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -35,6 +35,7 @@
 import java.net.SocketTimeoutException;
 import java.util.*;
 
+import org.assertj.core.api.Assertions;
 import org.opends.server.TestCaseUtils;
 import org.opends.server.api.SynchronizationProvider;
 import org.opends.server.backends.task.TaskState;
@@ -646,33 +647,23 @@
   }
 
   /**
-   * Chaining tests of the replication Server code with 2 replication servers involved
-   * 2 tests are done here (itest=0 or itest=1)
-   *
-   * Test 1
-   * - Create replication server 1
-   * - Create replication server 2 connected with replication server 1
-   * - Create and connect client 1 to replication server 1
-   * - Create and connect client 2 to replication server 2
-   * - Make client1 publish changes
-   * - Check that client 2 receives the changes published by client 1
-   *
-   * Test 2
-   * - Create replication server 1
-   * - Create and connect client1 to replication server 1
-   * - Make client1 publish changes
-   * - Create replication server 2 connected with replication server 1
-   * - Create and connect client 2 to replication server 2
-   * - Check that client 2 receives the changes published by client 1
+   * <ol>
+   * <li>Create replication server 1</li>
+   * <li>Create replication server 2 connected with replication server 1</li>
+   * <li>Create and connect client 1 to replication server 1</li>
+   * <li>Create and connect client 2 to replication server 2</li>
+   * <li>Make client1 publish changes</li>
+   * <li>Check that client 2 receives the changes published by client 1</li>
+   * </ol>
    */
   @Test(enabled=true, dependsOnMethods = { "searchBackend"})
-  public void changelogChaining() throws Exception
+  public void changelogChaining0() throws Exception
   {
-    debugInfo("Starting changelogChaining");
+    final String tn = "changelogChaining0";
+    debugInfo("Starting " + tn);
     replicationServer.clearDb();
     TestCaseUtils.initializeTestBackend(true);
 
-    for (int itest = 0; itest <2; itest++)
     {
       ReplicationBroker broker2 = null;
       boolean emptyOldChanges = true;
@@ -683,13 +674,11 @@
       int[] changelogIds = new int[] { 80, 81 };
       int[] brokerIds = new int[] { 100, 101 };
 
-      for (int i = 0; i < ((itest == 0) ? 2 : 1); i++)
+      for (int i = 0; i < 2; i++)
       {
         changelogs[i] = null;
 
-        // for itest=0, create the 2 connected replicationServer
-        // for itest=1, create the 1rst replicationServer, the second
-        // one will be created later
+        // create the 2 connected replicationServer
         SortedSet<String> servers = new TreeSet<String>();
         servers.add(
           "localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]));
@@ -703,30 +692,118 @@
 
       try
       {
-        // For itest=0, create and connect client1 to changelog1
-        //              and client2 to changelog2
-        // For itest=1, only create and connect client1 to changelog1
-        //              client2 will be created later
+        // create and connect client1 to changelog1 and client2 to changelog2
         broker1 = openReplicationSession(TEST_ROOT_DN,
              brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges);
         assertTrue(broker1.isConnected());
-
-        if (itest == 0)
-        {
-          broker2 = openReplicationSession(TEST_ROOT_DN,
-             brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges);
-          assertTrue(broker2.isConnected());
-        }
+        broker2 = openReplicationSession(TEST_ROOT_DN, brokerIds[1], 100,
+             changelogPorts[0], 1000, !emptyOldChanges);
+        assertTrue(broker2.isConnected());
 
         // - Test messages between clients by publishing now
 
         // - Delete
-        long time = TimeThread.getTime();
-        int ts = 1;
-        CSN csn = new CSN(time, ts++, brokerIds[0]);
+        CSNGenerator csnGen = new CSNGenerator(brokerIds[0], TimeThread.getTime());
+        DN dn = DN.decode("o=example" + 0 + "," + TEST_ROOT_DN_STRING);
+        DeleteMsg delMsg = new DeleteMsg(dn, csnGen.newCSN(), "uid");
+        broker1.publish(delMsg);
 
-        DN dn = DN.decode("o=example" + itest + "," + TEST_ROOT_DN_STRING);
-        DeleteMsg delMsg = new DeleteMsg(dn, csn, "uid");
+        String user1entryUUID = "33333333-3333-3333-3333-333333333333";
+        String baseUUID = "22222222-2222-2222-2222-222222222222";
+
+        // - Add
+        Entry entry = TestCaseUtils.entryFromLdifString(
+        "dn: o=example," + TEST_ROOT_DN_STRING + "\n"
+            + "objectClass: top\n" + "objectClass: domain\n"
+            + "entryUUID: 11111111-1111-1111-1111-111111111111\n");
+        AddMsg addMsg = new AddMsg(csnGen.newCSN(), EXAMPLE_DN,
+            user1entryUUID, baseUUID, entry.getObjectClassAttribute(),
+            entry.getAttributes(), new ArrayList<Attribute>());
+        broker1.publish(addMsg);
+
+        // - Modify
+        Attribute attr1 = Attributes.create("description", "new value");
+        Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
+        List<Modification> mods = new ArrayList<Modification>();
+        mods.add(mod1);
+        ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
+        broker1.publish(modMsg);
+
+        // - ModifyDN
+        ModifyDNOperationBasis op = new ModifyDNOperationBasis(connection, 1, 1, null,
+            EXAMPLE_DN, RDN.decode("o=example2"), true, null);
+        op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csnGen.newCSN(), "uniqueid", "newparentId"));
+        LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op);
+        ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp);
+        broker1.publish(modDNMsg);
+
+        // - Check msg receives by broker, through changeLog2
+        List<ReplicationMsg> msgs = receiveReplicationMsgs(broker2, 4);
+        Assertions.assertThat(msgs).containsExactly(delMsg, addMsg, modMsg, modDNMsg);
+        debugInfo("Ending " + tn);
+      }
+      finally
+      {
+        remove(changelogs);
+        stop(broker1, broker2);
+      }
+    }
+  }
+
+  /**
+   * <ol>
+   * <li>Create replication server 1</li>
+   * <li>Create and connect client1 to replication server 1</li>
+   * <li>Make client1 publish changes</li>
+   * <li>Create replication server 2 connected with replication server 1</li>
+   * <li>Create and connect client 2 to replication server 2</li>
+   * <li>Check that client 2 receives the changes published by client 1</li>
+   * <ol>
+   */
+  @Test(enabled=true, dependsOnMethods = { "searchBackend"})
+  public void changelogChaining1() throws Exception
+  {
+    final String tn = "changelogChaining1";
+    debugInfo("Starting " + tn);
+    replicationServer.clearDb();
+    TestCaseUtils.initializeTestBackend(true);
+
+    {
+      ReplicationBroker broker2 = null;
+      boolean emptyOldChanges = true;
+
+      // - Create 2 connected replicationServer
+      ReplicationServer[] changelogs = new ReplicationServer[2];
+      int[] changelogPorts = TestCaseUtils.findFreePorts(2);
+      int[] changelogIds = new int[] { 80, 81 };
+      int[] brokerIds = new int[] { 100, 101 };
+
+      {
+        // create the 1rst replicationServer, the second one will be created later
+        SortedSet<String> servers = new TreeSet<String>();
+        servers.add("localhost:" + changelogPorts[1]);
+        ReplServerFakeConfiguration conf =
+          new ReplServerFakeConfiguration(changelogPorts[0], "replicationServerTestChangelogChainingDb"+0, 0,
+                                         changelogIds[0], 0, 100, servers);
+        changelogs[0] = new ReplicationServer(conf);
+      }
+
+      ReplicationBroker broker1 = null;
+
+      try
+      {
+        // only create and connect client1 to changelog1 client2 will be created later
+        broker1 = openReplicationSession(TEST_ROOT_DN,
+             brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges);
+        assertTrue(broker1.isConnected());
+
+        // - Test messages between clients by publishing now
+
+        // - Delete
+        CSNGenerator csnGen = new CSNGenerator(brokerIds[0], TimeThread.getTime());
+
+        DN dn = DN.decode("o=example" + 1 + "," + TEST_ROOT_DN_STRING);
+        DeleteMsg delMsg = new DeleteMsg(dn, csnGen.newCSN(), "uid");
         broker1.publish(delMsg);
 
         String user1entryUUID = "33333333-3333-3333-3333-333333333333";
@@ -737,8 +814,7 @@
             + "objectClass: top\n" + "objectClass: domain\n"
             + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
         Entry entry = TestCaseUtils.entryFromLdifString(lentry);
-        csn = new CSN(time, ts++, brokerIds[0]);
-        AddMsg addMsg = new AddMsg(csn, EXAMPLE_DN,
+        AddMsg addMsg = new AddMsg(csnGen.newCSN(), EXAMPLE_DN,
             user1entryUUID, baseUUID, entry.getObjectClassAttribute(),
             entry.getAttributes(), new ArrayList<Attribute>());
         broker1.publish(addMsg);
@@ -748,88 +824,32 @@
         Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
         List<Modification> mods = new ArrayList<Modification>();
         mods.add(mod1);
-        csn = new CSN(time, ts++, brokerIds[0]);
-        ModifyMsg modMsg = new ModifyMsg(csn, EXAMPLE_DN, mods, "fakeuniqueid");
+        ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
         broker1.publish(modMsg);
 
         // - ModifyDN
-        csn = new CSN(time, ts++, brokerIds[0]);
         ModifyDNOperationBasis op = new ModifyDNOperationBasis(connection, 1, 1, null,
             EXAMPLE_DN, RDN.decode("o=example2"), true, null);
-        op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csn, "uniqueid",
-        "newparentId"));
-        LocalBackendModifyDNOperation localOp =
-          new LocalBackendModifyDNOperation(op);
+        op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csnGen.newCSN(), "uniqueid", "newparentId"));
+        LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op);
         ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp);
         broker1.publish(modDNMsg);
 
-        if (itest > 0)
-        {
-          SortedSet<String> servers = new TreeSet<String>();
-          servers.add("localhost:"+changelogPorts[0]);
-          ReplServerFakeConfiguration conf =
-            new ReplServerFakeConfiguration(changelogPorts[1], null, 0,
-                                           changelogIds[1], 0, 100, null);
-          changelogs[1] = new ReplicationServer(conf);
+        SortedSet<String> servers = new TreeSet<String>();
+        servers.add("localhost:"+changelogPorts[0]);
+        ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(
+            changelogPorts[1], null, 0, changelogIds[1], 0, 100, null);
+        changelogs[1] = new ReplicationServer(conf);
 
-          // Connect broker 2 to changelog2
-          broker2 = openReplicationSession(TEST_ROOT_DN,
-              brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges);
-          assertTrue(broker2.isConnected());
-        }
+        // Connect broker 2 to changelog2
+        broker2 = openReplicationSession(TEST_ROOT_DN,
+            brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges);
+        assertTrue(broker2.isConnected());
 
         // - Check msg receives by broker, through changeLog2
-        while (ts > 1)
-        {
-          ReplicationMsg msg2;
-          try
-          {
-            msg2 = broker2.receive();
-            if (msg2 == null)
-              break;
-            broker2.updateWindowAfterReplay();
-          }
-          catch (Exception e)
-          {
-            fail("Broker receive failed: " + e.getMessage() + "#Msg:" + ts + "#itest:" + itest);
-            break;
-          }
-
-          if (msg2 instanceof DeleteMsg)
-          {
-            if (delMsg.equals(msg2))
-              ts--;
-          }
-          else if (msg2 instanceof AddMsg)
-          {
-            if (addMsg.equals(msg2))
-              ts--;
-          }
-          else if (msg2 instanceof ModifyMsg)
-          {
-            if (modMsg.equals(msg2))
-              ts--;
-          }
-          else if (msg2 instanceof ModifyDNMsg)
-          {
-            if (modDNMsg.equals(msg2))
-              ts--;
-          }
-          else if (msg2 instanceof TopologyMsg)
-          {
-            // Nothing to test here.
-          }
-          else
-          {
-            fail("ReplicationServer transmission failed: no expected message" +
-              " class: " + msg2);
-            break;
-          }
-        }
-        // Check that everything expected has been received
-        assertEquals(ts, 1, "Broker2 did not receive the complete set of"
-            + " expected messages: #msg received " + ts);
-        debugInfo("Ending changelogChaining");
+        List<ReplicationMsg> msgs = receiveReplicationMsgs(broker2, 4);
+        Assertions.assertThat(msgs).containsExactly(delMsg, addMsg, modMsg, modDNMsg);
+        debugInfo("Ending " + tn);
       }
       finally
       {
@@ -839,6 +859,30 @@
     }
   }
 
+  private List<ReplicationMsg> receiveReplicationMsgs(ReplicationBroker broker2, int nbMessagesExpected)
+  {
+    List<ReplicationMsg> msgs = new ArrayList<ReplicationMsg>(nbMessagesExpected);
+    for (int i = 0; i < nbMessagesExpected; i++)
+    {
+      try
+      {
+        ReplicationMsg msg = broker2.receive();
+        if (msg == null)
+          break;
+        if (msg instanceof TopologyMsg)
+          continue; // ignore
+        msgs.add(msg);
+
+        broker2.updateWindowAfterReplay();
+      }
+      catch (SocketTimeoutException e)
+      {
+        fail("Broker receive failed: " + e.getMessage() + "#Msg:" + i);
+      }
+    }
+    return msgs;
+  }
+
   /**
    * Test that the Replication sends back correctly WindowsUpdate
    * when we send a WindowProbeMsg.
@@ -859,7 +903,7 @@
      * Some other tests may have been running before and therefore
      * may have pushed some changes to the Replication Server
      * When a new session is opened, the Replication Server is therefore
-     * going to send all thoses old changes to this Replication Server.
+     * going to send all these old changes to this Replication Server.
      * To avoid this, this test open a first session, save the
      * ServerState from the ReplicationServer, close the session
      * and re-open a new connection providing the ServerState it just
@@ -1207,28 +1251,26 @@
    private List<UpdateMsg> createChanges(String suffix, int serverId) throws Exception
    {
      List<UpdateMsg> l = new ArrayList<UpdateMsg>();
-     long time = TimeThread.getTime();
-     int ts = 1;
 
      {
        String user1entryUUID = "33333333-3333-3333-3333-333333333333";
        String baseUUID       = "22222222-2222-2222-2222-222222222222";
 
        // - Add
-       String lentry = "dn: "+suffix+"\n"
+       Entry entry = TestCaseUtils.entryFromLdifString(
+       "dn: "+suffix+"\n"
            + "objectClass: top\n"
            + "objectClass: domain\n"
-           + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
-       Entry entry = TestCaseUtils.entryFromLdifString(lentry);
-       CSN csn = new CSN(time, ts++, serverId);
+           + "entryUUID: 11111111-1111-1111-1111-111111111111\n");
+       CSNGenerator csnGen = new CSNGenerator(serverId, TimeThread.getTime());
        DN exampleSuffixDN = DN.decode("o=example," + suffix);
-       AddMsg addMsg = new AddMsg(csn, exampleSuffixDN,
+       AddMsg addMsg = new AddMsg(csnGen.newCSN(), exampleSuffixDN,
            user1entryUUID, baseUUID, entry.getObjectClassAttribute(),
            entry.getAttributes(), new ArrayList<Attribute>());
        l.add(addMsg);
 
        // - Add
-       String luentry =
+       Entry uentry = TestCaseUtils.entryFromLdifString(
              "dn: cn=Fiona Jensen,ou=People,"+suffix+"\n"
            + "objectClass: top\n"
            + "objectclass: person\n"
@@ -1239,12 +1281,10 @@
            + "givenName: fjensen\n"
            + "telephonenumber: +1 408 555 1212\n"
            + "entryUUID: " + user1entryUUID +"\n"
-           + "userpassword: fjen$$en" + "\n";
-       Entry uentry = TestCaseUtils.entryFromLdifString(luentry);
-       csn = new CSN(time, ts++, serverId);
+           + "userpassword: fjen$$en" + "\n");
        DN newPersonDN = DN.decode("uid=new person,ou=People,"+suffix);
        AddMsg addMsg2 = new AddMsg(
-           csn,
+           csnGen.newCSN(),
            newPersonDN,
            user1entryUUID,
            baseUUID,
@@ -1263,22 +1303,18 @@
 
        List<Modification> mods = Arrays.asList(mod1, mod2, mod3);
 
-       csn = new CSN(time, ts++, serverId);
        DN dn = exampleSuffixDN;
-       ModifyMsg modMsg = new ModifyMsg(csn, dn,
-           mods, "fakeuniqueid");
+       ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), dn, mods, "fakeuniqueid");
        l.add(modMsg);
 
        // Modify DN
-       csn = new CSN(time, ts++, serverId);
-       ModifyDNMsg  modDnMsg = new ModifyDNMsg(newPersonDN, csn,
+       ModifyDNMsg  modDnMsg = new ModifyDNMsg(newPersonDN, csnGen.newCSN(),
            user1entryUUID, baseUUID, false,
            "uid=wrong, ou=people,"+suffix, "uid=newrdn");
        l.add(modDnMsg);
 
        // Del
-       csn = new CSN(time, ts++, serverId);
-       DeleteMsg delMsg = new DeleteMsg(exampleSuffixDN, csn, "uid");
+       DeleteMsg delMsg = new DeleteMsg(exampleSuffixDN, csnGen.newCSN(), "uid");
        l.add(delMsg);
      }
      return l;
@@ -1567,8 +1603,8 @@
          changelogs[i] = new ReplicationServer(conf);
        }
 
-       try
-       {
+    try
+    {
          // Create and connect client1 to changelog1
          // and client2 to changelog2
          broker1 = openReplicationSession(TEST_ROOT_DN,
@@ -1580,9 +1616,7 @@
          assertTrue(broker2.isConnected());
 
          // - Test messages between clients by publishing now
-         long time = TimeThread.getTime();
-         int ts = 1;
-         CSN csn;
+         CSNGenerator csnGen = new CSNGenerator(brokerIds[0], TimeThread.getTime());
          String user1entryUUID = "33333333-3333-3333-3333-333333333333";
          String baseUUID  = "22222222-2222-2222-2222-222222222222";
 
@@ -1591,10 +1625,9 @@
              + "objectClass: top\n" + "objectClass: domain\n"
              + "entryUUID: " + user1entryUUID + "\n";
          Entry entry = TestCaseUtils.entryFromLdifString(lentry);
-         csn = new CSN(time, ts++, brokerIds[0]);
-         AddMsg addMsg = new AddMsg(csn, EXAMPLE_DN,
-             user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry
-             .getAttributes(), new ArrayList<Attribute>());
+         AddMsg addMsg = new AddMsg(csnGen.newCSN(), EXAMPLE_DN,
+             user1entryUUID, baseUUID, entry.getObjectClassAttribute(),
+             entry.getAttributes(), new ArrayList<Attribute>());
          broker1.publish(addMsg);
 
          // - Modify
@@ -1602,47 +1635,12 @@
          Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
          List<Modification> mods = new ArrayList<Modification>();
          mods.add(mod1);
-         csn = new CSN(time, ts++, brokerIds[0]);
-         ModifyMsg modMsg = new ModifyMsg(csn, EXAMPLE_DN, mods, "fakeuniqueid");
+         ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
          broker1.publish(modMsg);
 
          // - Check msg received by broker, through changeLog2
-
-         while (ts > 1)
-         {
-           ReplicationMsg msg2;
-           try
-           {
-             msg2 = broker2.receive();
-             if (msg2 == null)
-               break;
-             broker2.updateWindowAfterReplay();
-           }
-           catch (Exception e)
-           {
-             fail("Broker receive failed: " + e.getMessage() + "#Msg: " + ts);
-             break;
-           }
-
-           if (msg2 instanceof AddMsg)
-           {
-             if (addMsg.equals(msg2))
-               ts--;
-           }
-           else if (msg2 instanceof ModifyMsg)
-           {
-             if (modMsg.equals(msg2))
-               ts--;
-           }
-           else
-           {
-          fail("ReplicationServer transmission failed: did not expect message of class: " + msg2);
-             break;
-           }
-         }
-         // Check that everything expected has been received
-         assertEquals(ts, 1, "Broker2 did not receive the complete set of"
-             + " expected messages: #msg received " + ts);
+         List<ReplicationMsg> msgs = receiveReplicationMsgs(broker2, 2);
+         Assertions.assertThat(msgs).containsExactly(addMsg, modMsg);
 
          // Then change the config to remove replicationServer[1] from
          // the configuration of replicationServer[0]
@@ -1657,41 +1655,43 @@
          // The link between RS[0] & RS[1] should be destroyed by the new configuration.
          // So we expect a timeout exception when calling receive on RS[1].
          // Send an update and check that RS[1] does not receive the message after the timeout
-         try
-         {
-           // - Del
-           csn = new CSN(time, ts++, brokerIds[0]);
-           DeleteMsg delMsg = new DeleteMsg(EXAMPLE_DN, csn, user1entryUUID);
-           broker1.publish(delMsg);
-           // Should receive some TopologyMsg messages for disconnection
-           // between the 2 RSs
-           ReplicationMsg msg = null;
-           while (true)
-           {
-             msg = broker2.receive();
-             if (msg instanceof TopologyMsg)
-             {
-               debugInfo("Broker 2 received: " + msg);
-             } else
-             {
-               fail("Broker: receive successed when it should fail. " +
-                 "This broker was disconnected by configuration." +
-                 " Received: " + msg);
-             }
-           }
-         }
-         catch (SocketTimeoutException soExc)
-         {
-         // the receive fail as expected
-         debugInfo("Ending replicationServerConnected");
-         return;
-         }
-       }
-       finally
-       {
+
+      // - Del
+      DeleteMsg delMsg = new DeleteMsg(EXAMPLE_DN, csnGen.newCSN(), user1entryUUID);
+      broker1.publish(delMsg);
+      // Should receive some TopologyMsg messages for disconnection between the 2 RSs
+      assertOnlyTopologyMsgsReceived(broker2);
+    }
+    finally
+    {
       remove(changelogs);
       stop(broker1, broker2);
-       }
+    }
+  }
+
+  private void assertOnlyTopologyMsgsReceived(ReplicationBroker broker2)
+  {
+    try
+    {
+      while (true)
+      {
+        ReplicationMsg msg = broker2.receive();
+        if (msg instanceof TopologyMsg)
+        {
+          debugInfo("Broker 2 received: " + msg);
+        }
+        else
+        {
+          fail("Broker: receive successed when it should fail. "
+              + "This broker was disconnected by configuration."
+              + " Received: " + msg);
+        }
+      }
+    }
+    catch (SocketTimeoutException expected)
+    {
+      debugInfo("Ending replicationServerConnected");
+    }
   }
 
 }

--
Gitblit v1.10.0