From db44a71ecc09f913fdea8aa0686b82df7566c35b Mon Sep 17 00:00:00 2001
From: Jean-Noël Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 10 Sep 2015 13:54:09 +0000
Subject: [PATCH] Fixed random tests

---
 opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/SchemaReplicationTest.java |  193 +++++-----
 opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/DependencyTest.java        |  326 ++++++-----------
 opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java   |  351 ++++++++++---------
 opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/UpdateOperationTest.java   |  153 +++-----
 4 files changed, 463 insertions(+), 560 deletions(-)

diff --git a/opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/DependencyTest.java b/opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/DependencyTest.java
index e603ef5..d0ddddc 100644
--- a/opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/DependencyTest.java
+++ b/opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/DependencyTest.java
@@ -29,9 +29,10 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.SortedSet;
-import java.util.TreeSet;
 
+import org.forgerock.opendj.config.server.ConfigException;
 import org.forgerock.opendj.ldap.ByteString;
 import org.opends.server.TestCaseUtils;
 import org.opends.server.backends.MemoryBackend;
@@ -47,14 +48,16 @@
 import org.opends.server.replication.server.ReplServerFakeConfiguration;
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.service.ReplicationBroker;
-import org.opends.server.types.AttributeType;
 import org.opends.server.types.Attributes;
 import org.opends.server.types.DN;
 import org.opends.server.types.Entry;
+import org.opends.server.types.Modification;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.opends.server.TestCaseUtils.*;
+import static org.opends.server.core.DirectoryServer.*;
+import static org.opends.server.util.CollectionUtils.*;
 import static org.testng.Assert.*;
 
 /**
@@ -105,7 +108,7 @@
     int brokerId = 2;
     int serverId = 1;
     int replServerId = 81;
-    int AddSequenceLength = 30;
+    int addSequenceLength = 30;
 
 
     cleanDB();
@@ -121,62 +124,36 @@
        * - Configure replication server
        * - check that the last entry has been correctly added
        */
+      Entry entry = TestCaseUtils.entryFromLdifString(
+          "dn:" + TEST_ROOT_DN_STRING + "\n"
+          + "objectClass: top\n"
+          + "objectClass: organization\n"
+          + "entryuuid: " + stringUID(1) + "\n");
 
-      String entryldif =
-        "dn:" + TEST_ROOT_DN_STRING + "\n"
-         + "objectClass: top\n"
-         + "objectClass: organization\n"
-         + "entryuuid: " + stringUID(1) + "\n";
-      Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
-      AttributeType uidType =
-        DirectoryServer.getSchema().getAttributeType("entryuuid");
-
-      int replServerPort = TestCaseUtils.findFreePort();
-
-      ReplServerFakeConfiguration conf =
-        new ReplServerFakeConfiguration(replServerPort, "dependencyTestAddModDelDependencyTestDb",
-                                        replicationDbImplementation, 0, replServerId,
-                                        0, AddSequenceLength*5+100, null);
-      replServer = new ReplicationServer(conf);
+      replServer = newReplicationServer(replServerId, addSequenceLength * 5 + 100, "dependencyTestAddModDelDependencyTestDb");
 
       ReplicationBroker broker = openReplicationSession(
-          baseDN, brokerId, 1000, replServerPort, 1000, CLEAN_DB_GENERATION_ID);
+          baseDN, brokerId, 1000, replServer.getReplicationPort(), 1000, CLEAN_DB_GENERATION_ID);
 
       Thread.sleep(2000);
-      // send a sequence of add operation
 
+      // send a sequence of add operation
       DN addDN = TEST_ROOT_DN;
       CSNGenerator gen = new CSNGenerator(brokerId, 0L);
 
       int sequence;
-      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
+      for (sequence = 1; sequence<=addSequenceLength; sequence ++)
       {
-        entry.removeAttribute(uidType);
+        entry.removeAttribute(getAttributeType("entryuuid"));
         entry.addAttribute(Attributes.create("entryuuid", stringUID(sequence+1)),
                            new LinkedList<ByteString>());
         addDN = DN.valueOf("dc=dependency" + sequence + "," + addDN);
-        AddMsg addMsg =
-          new AddMsg(gen.newCSN(), addDN, stringUID(sequence+1),
-                     stringUID(sequence),
-                     entry.getObjectClassAttribute(),
-                     entry.getAttributes(), null );
-        broker.publish(addMsg);
-
-        ModifyMsg modifyMsg =
-          new ModifyMsg(gen.newCSN(), addDN,
-                        generatemods("description", "test"),
-                        stringUID(sequence+1));
-        broker.publish(modifyMsg);
+        broker.publish(addMsg(addDN, entry, sequence + 1, sequence, gen));
+        broker.publish(modifyMsg(addDN, sequence + 1, generatemods("description", "test"), gen));
       }
 
       // configure and start replication of TEST_ROOT_DN_STRING on the server
-      SortedSet<String> replServers = new TreeSet<>();
-      replServers.add("localhost:"+replServerPort);
-      DomainFakeCfg domainConf = new DomainFakeCfg(baseDN, serverId, replServers);
-      domainConf.setHeartbeatInterval(100000);
-
-      domain = MultimasterReplication.createNewDomain(domainConf);
-      domain.start();
+      domain = startNewLDAPReplicationDomain(replServer, baseDN, serverId, 100000);
 
       // check that last entry in sequence got added.
       Entry lastEntry = getEntry(addDN, 30000, true);
@@ -186,12 +163,11 @@
       // Check that all the modify have been replayed
       // (all the entries should have a description).
       addDN = TEST_ROOT_DN;
-      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
+      for (sequence = 1; sequence<=addSequenceLength; sequence ++)
       {
         addDN = DN.valueOf("dc=dependency" + sequence + "," + addDN);
-
-        boolean found = checkEntryHasAttribute(addDN, "description", "test", 10000, true);
-        assertTrue(found, "The modification was not replayed on entry " + addDN);
+        checkEntryHasAttributeValue(
+            addDN, "description", "test", 10, "The modification was not replayed on entry " + addDN);
       }
 
       /*
@@ -206,14 +182,13 @@
        */
 
       domain.disable();
-      Thread.sleep(2000);  // necesary because disable does not wait
+      Thread.sleep(2000);  // necessary because disable does not wait
                            // for full termination of all threads. (issue 1571)
 
       DN deleteDN = addDN;
       while (sequence-->1)
       {
-        DeleteMsg delMsg = new DeleteMsg(deleteDN, gen.newCSN(), stringUID(sequence + 1));
-        broker.publish(delMsg);
+        broker.publish(delMsg(deleteDN, sequence + 1, gen));
         deleteDN = deleteDN.parent();
       }
 
@@ -237,6 +212,27 @@
     }
   }
 
+  private AddMsg addMsg(DN addDN, Entry entry, int uniqueId, int parentId, CSNGenerator gen)
+  {
+    return new AddMsg(gen.newCSN(), addDN, stringUID(uniqueId), stringUID(parentId),
+        entry.getObjectClassAttribute(), entry.getAttributes(), null);
+  }
+
+  private ModifyMsg modifyMsg(DN dn, int entryUUID, List<Modification> mods, CSNGenerator gen)
+  {
+    return new ModifyMsg(gen.newCSN(), dn, mods, stringUID(entryUUID));
+  }
+
+  private DeleteMsg delMsg(DN delDN, int entryUUID, CSNGenerator gen)
+  {
+    return new DeleteMsg(delDN, gen.newCSN(), stringUID(entryUUID));
+  }
+
+  private ModifyDNMsg modDNMsg(DN dn, String newRDN, int entryUUID, int newSuperiorEntryUUID, CSNGenerator gen)
+  {
+    return new ModifyDNMsg(dn, gen.newCSN(), stringUID(entryUUID), stringUID(newSuperiorEntryUUID), true, null, newRDN);
+  }
+
   /**
    * Check the dependency between moddn and delete operation
    * when an entry is renamed to a new dn and then deleted.
@@ -257,81 +253,48 @@
     try
     {
       // Create replication server, replication domain and broker.
-      String entryldif = "dn:" + TEST_ROOT_DN_STRING + "\n"
-      + "objectClass: top\n"
-      + "objectClass: organization\n";
-      Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
-      AttributeType uidType =
-        DirectoryServer.getSchema().getAttributeType("entryuuid");
+      Entry entry = TestCaseUtils.entryFromLdifString(
+          "dn:" + TEST_ROOT_DN_STRING + "\n"
+          + "objectClass: top\n"
+          + "objectClass: organization\n");
+
       CSNGenerator gen = new CSNGenerator(brokerId, 0L);
       int renamedEntryUuid = 100;
 
-      int replServerPort = TestCaseUtils.findFreePort();
-
-      ReplServerFakeConfiguration conf =
-        new ReplServerFakeConfiguration(replServerPort, "dependencyTestModdnDelDependencyTestDb",
-                                        replicationDbImplementation, 0, replServerId,
-                                        0, 200, null);
-      replServer = new ReplicationServer(conf);
+      replServer = newReplicationServer(replServerId, 200, "dependencyTestModdnDelDependencyTestDb");
 
       // configure and start replication of TEST_ROOT_DN_STRING on the server
-      SortedSet<String> replServers = new TreeSet<>();
-      replServers.add("localhost:"+replServerPort);
-      DomainFakeCfg domainConf = new DomainFakeCfg(baseDN, serverId, replServers);
-      domainConf.setHeartbeatInterval(100000);
-
       Thread.sleep(2000);
-      domain = MultimasterReplication.createNewDomain(domainConf);
-      domain.start();
+      domain = startNewLDAPReplicationDomain(replServer, baseDN, serverId, 100000);
 
       ReplicationBroker broker = openReplicationSession(
-          baseDN, brokerId, 1000, replServerPort, 1000, CLEAN_DB_GENERATION_ID);
+          baseDN, brokerId, 1000, replServer.getReplicationPort(), 1000, CLEAN_DB_GENERATION_ID);
 
       // add an entry to play with.
-      entry.removeAttribute(uidType);
+      entry.removeAttribute(getAttributeType("entryuuid"));
       entry.addAttribute(Attributes.create("entryuuid",
                          stringUID(renamedEntryUuid)),
                          new LinkedList<ByteString>());
       DN addDN = DN.valueOf("dc=moddndel" + "," + TEST_ROOT_DN_STRING);
-      AddMsg addMsg =
-          new AddMsg(gen.newCSN(), addDN, stringUID(renamedEntryUuid),
-                   stringUID(1),
-                   entry.getObjectClassAttribute(),
-                   entry.getAttributes(), null );
-
-      broker.publish(addMsg);
+      broker.publish(addMsg(addDN, entry, renamedEntryUuid, 1, gen));
 
       // check that the entry was correctly added
-      boolean found = checkEntryHasAttribute(addDN, "entryuuid", stringUID(renamedEntryUuid), 30000, true);
-      assertTrue(found, "The initial entry add failed");
+      checkEntryHasAttributeValue(addDN, "entryuuid", stringUID(renamedEntryUuid), 30, "The initial entry add failed");
 
-
-      // disable the domain to make sure that the messages are
-      // all sent in a row.
+      // disable the domain to make sure that the messages are all sent in a row.
       domain.disable();
 
       // rename and delete the entry.
-      ModifyDNMsg moddnMsg =
-          new ModifyDNMsg(addDN, gen.newCSN(),
-                        stringUID(renamedEntryUuid),
-                        stringUID(1), true, null, "dc=new_name");
-      broker.publish(moddnMsg);
-      DeleteMsg delMsg =
-        new DeleteMsg(DN.valueOf("dc=new_name" + "," + TEST_ROOT_DN_STRING),
-                      gen.newCSN(), stringUID(renamedEntryUuid));
-      broker.publish(delMsg);
+      broker.publish(modDNMsg(addDN, "dc=new_name", renamedEntryUuid, 1, gen));
+      DN delDN = DN.valueOf("dc=new_name" + "," + TEST_ROOT_DN_STRING);
+      broker.publish(delMsg(delDN, renamedEntryUuid, gen));
 
       // enable back the domain to trigger message replay.
       domain.enable();
 
       // check that entry does not exist anymore.
-      Thread.sleep(10000);
-      found = checkEntryHasAttribute(DN.valueOf("dc=new_name" + "," + TEST_ROOT_DN_STRING),
-                                     "entryuuid",
-                                     stringUID(renamedEntryUuid),
-                                     30000, false);
-
-      assertFalse(found, "The delete dependencies was not correctly enforced");
+      checkEntryHasNoSuchAttributeValue(DN.valueOf("dc=new_name" + "," + TEST_ROOT_DN_STRING), "entryuuid",
+          stringUID(renamedEntryUuid), 30, "The delete dependencies was not correctly enforced");
     }
     finally
     {
@@ -356,18 +319,15 @@
     TestCaseUtils.initializeTestBackend(false);
 
     // Create top entry with uuid
-    String baseentryldif =
-      "dn:" + TEST_ROOT_DN_STRING + "\n"
-       + "objectClass: top\n"
-       + "objectClass: organization\n"
-       + "o: test\n"
-       + "entryuuid: " + stringUID(1) + "\n";
-    Entry topEntry = TestCaseUtils.entryFromLdifString(baseentryldif);
+    Entry topEntry = TestCaseUtils.entryFromLdifString(
+        "dn:" + TEST_ROOT_DN_STRING + "\n"
+         + "objectClass: top\n"
+         + "objectClass: organization\n"
+         + "o: test\n"
+         + "entryuuid: " + stringUID(1) + "\n");
 
-    MemoryBackend memoryBackend =
-      (MemoryBackend) DirectoryServer.getBackend(TEST_BACKEND_ID);
+    MemoryBackend memoryBackend = (MemoryBackend) DirectoryServer.getBackend(TEST_BACKEND_ID);
     memoryBackend.addEntry(topEntry, null);
-
   }
 
 
@@ -387,94 +347,58 @@
     int brokerId = 2;
     int serverId = 1;
     int replServerId = 83;
-    int AddSequenceLength = 30;
+    int addSequenceLength = 30;
 
     cleanDB();
 
     try
     {
-      String entryldif = "dn:" + TEST_ROOT_DN_STRING + "\n"
-      + "objectClass: top\n"
-      + "objectClass: organization\n";
-      Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
-      AttributeType uidType =
-        DirectoryServer.getSchema().getAttributeType("entryuuid");
+      Entry entry = TestCaseUtils.entryFromLdifString(
+          "dn:" + TEST_ROOT_DN_STRING + "\n"
+          + "objectClass: top\n"
+          + "objectClass: organization\n");
 
-      int replServerPort = TestCaseUtils.findFreePort();
-
-      ReplServerFakeConfiguration conf =
-        new ReplServerFakeConfiguration(replServerPort, "dependencyTestAddDelAddDependencyTestDb", replicationDbImplementation,
-                                        0, replServerId, 0, 5*AddSequenceLength+100, null);
-      replServer = new ReplicationServer(conf);
+      replServer = newReplicationServer(replServerId, 5 * addSequenceLength + 100, "dependencyTestAddDelAddDependencyTestDb");
 
       ReplicationBroker broker = openReplicationSession(
-          baseDN, brokerId, 100, replServerPort, 1000, CLEAN_DB_GENERATION_ID);
+          baseDN, brokerId, 100, replServer.getReplicationPort(), 1000, CLEAN_DB_GENERATION_ID);
 
       // send a sequence of add/del/add operations
       CSNGenerator gen = new CSNGenerator(brokerId, 0L);
 
       int sequence;
-      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
+      for (sequence = 1; sequence<=addSequenceLength; sequence ++)
       {
         // add the entry a first time
-        entry.removeAttribute(uidType);
+        entry.removeAttribute(getAttributeType("entryuuid"));
         entry.addAttribute(Attributes.create("entryuuid", stringUID(sequence+1)),
                            new LinkedList<ByteString>());
         DN addDN = DN.valueOf("dc=dependency" + sequence + "," + TEST_ROOT_DN_STRING);
-        AddMsg addMsg =
-          new AddMsg(gen.newCSN(), addDN, stringUID(sequence+1),
-                     stringUID(1),
-                     entry.getObjectClassAttribute(),
-                     entry.getAttributes(), null );
-        broker.publish(addMsg);
-
-        // delete the entry
-        DeleteMsg delMsg = new DeleteMsg(addDN, gen.newCSN(),
-                                         stringUID(sequence+1));
-        broker.publish(delMsg);
+        broker.publish(addMsg(addDN, entry, sequence + 1, 1, gen));
+        broker.publish(delMsg(addDN, sequence + 1, gen));
 
         // add again the entry with a new entryuuid.
-        entry.removeAttribute(uidType);
+        entry.removeAttribute(getAttributeType("entryuuid"));
         entry.addAttribute(Attributes.create("entryuuid", stringUID(sequence+1025)),
                            new LinkedList<ByteString>());
-        addMsg =
-          new AddMsg(gen.newCSN(), addDN, stringUID(sequence+1025),
-                     stringUID(1),
-                     entry.getObjectClassAttribute(),
-                     entry.getAttributes(), null );
-        broker.publish(addMsg);
+        broker.publish(addMsg(addDN, entry, sequence + 1025, 1, gen));
       }
 
-      // configure and start replication of TEST_ROOT_DN_STRING on the server
-      SortedSet<String> replServers = new TreeSet<>();
-      replServers.add("localhost:"+replServerPort);
-      DomainFakeCfg domainConf = new DomainFakeCfg(baseDN, serverId, replServers);
-      domain = MultimasterReplication.createNewDomain(domainConf);
-      domain.start();
+      domain = startNewLDAPReplicationDomain(replServer, baseDN, serverId, -1);
 
       // check that all entries have been deleted and added
       // again by checking that they do have the correct entryuuid
-      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
+      for (sequence = 1; sequence<=addSequenceLength; sequence ++)
       {
         String addDn = "dc=dependency" + sequence + "," + TEST_ROOT_DN_STRING;
-
-        boolean found =
-          checkEntryHasAttribute(DN.valueOf(addDn), "entryuuid",
-                                 stringUID(sequence+1025),
-                                 30000, true);
-        if (!found)
-        {
-          fail("The second add was not replayed on entry " + addDn);
-        }
+        checkEntryHasAttributeValue(DN.valueOf(addDn), "entryuuid", stringUID(sequence + 1025), 30,
+            "The second add was not replayed on entry " + addDn);
       }
 
-      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
+      for (sequence = 1; sequence<=addSequenceLength; sequence ++)
       {
         DN deleteDN = DN.valueOf("dc=dependency" + sequence + "," + TEST_ROOT_DN_STRING);
-        DeleteMsg delMsg = new DeleteMsg(deleteDN,
-                                         gen.newCSN(),
-                                         stringUID(sequence + 1025));
-        broker.publish(delMsg);
+        broker.publish(delMsg(deleteDN, sequence + 1025, gen));
       }
 
       // check that the database was cleaned successfully
@@ -493,6 +417,28 @@
     }
   }
 
+  private ReplicationServer newReplicationServer(int replServerId, int windowSize, String dirName) throws Exception
+  {
+    int replServerPort = TestCaseUtils.findFreePort();
+    ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(
+        replServerPort, dirName, replicationDbImplementation, 0, replServerId, 0, windowSize, null);
+    return new ReplicationServer(conf);
+  }
+
+  private LDAPReplicationDomain startNewLDAPReplicationDomain(ReplicationServer replServer, DN baseDN, int serverId,
+      int heartBeatInterval) throws ConfigException
+  {
+    SortedSet<String> replServers = newTreeSet("localhost:" + replServer.getReplicationPort());
+    DomainFakeCfg domainConf = new DomainFakeCfg(baseDN, serverId, replServers);
+    if (heartBeatInterval > 0)
+    {
+      domainConf.setHeartbeatInterval(heartBeatInterval);
+    }
+    LDAPReplicationDomain domain = MultimasterReplication.createNewDomain(domainConf);
+    domain.start();
+    return domain;
+  }
+
   /**
    * Check that the dependency of moddn operation are working by
    * issuing a set of Add operation followed by a modrdn of the added entry.
@@ -506,29 +452,21 @@
     int brokerId = 2;
     int serverId = 1;
     int replServerId = 84;
-    int AddSequenceLength = 30;
+    int addSequenceLength = 30;
 
     cleanDB();
 
-
     try
     {
-      String entryldif = "dn:" + TEST_ROOT_DN_STRING + "\n"
-      + "objectClass: top\n"
-      + "objectClass: organization\n";
-      Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
-      AttributeType uidType =
-        DirectoryServer.getSchema().getAttributeType("entryuuid");
+      Entry entry = TestCaseUtils.entryFromLdifString(
+          "dn:" + TEST_ROOT_DN_STRING + "\n"
+          + "objectClass: top\n"
+          + "objectClass: organization\n");
 
-      int replServerPort = TestCaseUtils.findFreePort();
-
-      ReplServerFakeConfiguration conf =
-        new ReplServerFakeConfiguration(replServerPort, "dependencyTestAddModdnDependencyTestDb", replicationDbImplementation,
-                                        0, replServerId, 0, 5*AddSequenceLength+100, null);
-      replServer = new ReplicationServer(conf);
+      replServer = newReplicationServer(replServerId, 5 * addSequenceLength + 100, "dependencyTestAddModdnDependencyTestDb");
 
       ReplicationBroker broker = openReplicationSession(
-          baseDN, brokerId, 100, replServerPort, 1000, CLEAN_DB_GENERATION_ID);
+          baseDN, brokerId, 100, replServer.getReplicationPort(), 1000, CLEAN_DB_GENERATION_ID);
 
 
       DN addDN = TEST_ROOT_DN;
@@ -536,36 +474,24 @@
 
       // send a sequence of add/modrdn operations
       int sequence;
-      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
+      for (sequence = 1; sequence<=addSequenceLength; sequence ++)
       {
         // add the entry
-        entry.removeAttribute(uidType);
+        entry.removeAttribute(getAttributeType("entryuuid"));
         entry.addAttribute(Attributes.create("entryuuid", stringUID(sequence+1)),
                            new LinkedList<ByteString>());
         addDN = DN.valueOf("dc=dependency" + sequence + "," + TEST_ROOT_DN_STRING);
-        AddMsg addMsg =
-          new AddMsg(gen.newCSN(), addDN, stringUID(sequence+1),
-                     stringUID(1),
-                     entry.getObjectClassAttribute(),
-                     entry.getAttributes(), null );
-        broker.publish(addMsg);
+        broker.publish(addMsg(addDN, entry, sequence + 1, 1, gen));
 
         // rename the entry
-        ModifyDNMsg moddnMsg =
-          new ModifyDNMsg(addDN, gen.newCSN(), stringUID(sequence+1),
-                          stringUID(1), true, null, "dc=new_dep" + sequence);
-        broker.publish(moddnMsg);
+        broker.publish(modDNMsg(addDN, "dc=new_dep" + sequence, sequence + 1, 1, gen));
       }
 
       // configure and start replication of TEST_ROOT_DN_STRING on the server
-      SortedSet<String> replServers = new TreeSet<>();
-      replServers.add("localhost:"+replServerPort);
-      DomainFakeCfg domainConf = new DomainFakeCfg(baseDN, serverId, replServers);
-      domain = MultimasterReplication.createNewDomain(domainConf);
-      domain.start();
+      domain = startNewLDAPReplicationDomain(replServer, baseDN, serverId, -1);
 
       // check that all entries have been renamed
-      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
+      for (sequence = 1; sequence<=addSequenceLength; sequence ++)
       {
         addDN = DN.valueOf("dc=new_dep" + sequence + "," + TEST_ROOT_DN_STRING);
         Entry baseEntry = getEntry(addDN, 30000, true);
@@ -574,11 +500,10 @@
       }
 
       // delete the entries to clean the database.
-      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
+      for (sequence = 1; sequence<=addSequenceLength; sequence ++)
       {
         addDN = DN.valueOf("dc=new_dep" + sequence + "," + TEST_ROOT_DN_STRING);
-        DeleteMsg delMsg = new DeleteMsg(addDN, gen.newCSN(), stringUID(sequence + 1));
-        broker.publish(delMsg);
+        broker.publish(delMsg(addDN, sequence + 1, gen));
       }
     }
     finally
@@ -601,5 +526,4 @@
   {
     return String.format("11111111-1111-1111-1111-%012x", i);
   }
-
 }
diff --git a/opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java b/opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java
index 9ee3a1b..b4d219e 100644
--- a/opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java
+++ b/opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java
@@ -32,13 +32,14 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
 
 import org.assertj.core.api.Assertions;
+import org.assertj.core.api.SoftAssertions;
 import org.forgerock.i18n.LocalizableMessage;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
 import org.forgerock.opendj.config.server.ConfigException;
 import org.forgerock.opendj.ldap.ByteString;
-import org.forgerock.opendj.ldap.ModificationType;
 import org.forgerock.opendj.ldap.ResultCode;
 import org.forgerock.opendj.ldap.SearchScope;
 import org.opends.server.DirectoryServerTestCase;
@@ -71,12 +72,17 @@
 import org.opends.server.types.Entry;
 import org.opends.server.types.Modification;
 import org.opends.server.types.SearchResultEntry;
+import org.opends.server.util.TestTimer;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static java.util.concurrent.TimeUnit.*;
+
+import static org.forgerock.opendj.ldap.ModificationType.*;
 import static org.forgerock.opendj.ldap.ResultCode.*;
 import static org.forgerock.opendj.ldap.SearchScope.*;
+import static org.opends.server.backends.task.TaskState.*;
 import static org.opends.server.config.ConfigConstants.*;
 import static org.opends.server.protocols.internal.Requests.*;
 import static org.opends.server.util.CollectionUtils.*;
@@ -248,33 +254,31 @@
   }
 
   /**
-   * Check connection of the provided ds to the
-   * replication server. Waits for connection to be ok up to secTimeout seconds
-   * before failing.
+   * Check connection of the provided ds to the replication server. Waits for connection to be ok up
+   * to secTimeout seconds before failing.
    */
-  protected void checkConnection(int secTimeout, ReplicationBroker rb, int rsPort) throws Exception
+  protected void checkConnection(int secTimeout, final ReplicationBroker rb, int rsPort) throws Exception
   {
-    int nSec = 0;
-
-    // Go out of the loop only if connection is verified or if timeout occurs
-    while (true)
+    TestTimer timer = new TestTimer.Builder()
+      .maxSleep(secTimeout, SECONDS)
+      .sleepTimes(1, SECONDS)
+      .toTimer();
+    timer.repeatUntilSuccess(new Callable<Void>()
     {
-      if (rb.isConnected())
+      @Override
+      public Void call() throws Exception
       {
-        logger.trace("checkConnection: connection of broker "
-          + rb.getServerId() + " to RS " + rb.getRsGroupId()
-          + " obtained after " + nSec + " seconds.");
-        return;
+        if (rb.isConnected())
+        {
+          logger.trace("checkConnection: connection of broker " + rb.getServerId()
+              + " to RS " + rb.getRsGroupId() + " obtained.");
+          return null;
+        }
+
+        rb.start();
+        return null;
       }
-
-      Thread.sleep(1000);
-      rb.start();
-      nSec++;
-
-      assertTrue(nSec <= secTimeout,
-          "checkConnection: DS " + rb.getServerId() + " is not connected to "
-              + "the RS port " + rsPort + " after " + secTimeout + " seconds.");
-    }
+    });
   }
 
   protected void deleteEntry(DN dn) throws Exception
@@ -372,6 +376,11 @@
 
   protected void clearChangelogDB(ReplicationServer rs) throws Exception
   {
+    if (rs == null)
+    {
+      return;
+    }
+
     if (replicationDbImplementation == ReplicationDBImplementation.JE)
     {
       ((JEChangelogDB) rs.getChangelogDB()).clearDB();
@@ -418,6 +427,10 @@
 
   protected void stop(ReplicationBroker... brokers)
   {
+    if (brokers == null)
+    {
+      return;
+    }
     for (ReplicationBroker broker : brokers)
     {
       if (broker != null)
@@ -484,83 +497,91 @@
    * @return The monitor value
    * @throws Exception If an error occurs.
    */
-  protected long getMonitorAttrValue(DN baseDN, String attr) throws Exception
+  protected long getMonitorAttrValue(final DN baseDN, final String attr) throws Exception
   {
-    String monitorFilter = "(&(cn=Directory server*)(domain-name=" + baseDN + "))";
-
-    InternalSearchOperation op;
-    int count = 0;
-    do
+    TestTimer timer = new TestTimer.Builder()
+      .maxSleep(10, SECONDS)
+      .sleepTimes(100, MILLISECONDS)
+      .toTimer();
+    return timer.repeatUntilSuccess(new Callable<Long>()
     {
-      if (count++>0)
+      @Override
+      public Long call() throws Exception
       {
-        Thread.sleep(100);
-      }
-      op = connection.processSearch(newSearchRequest("cn=replication,cn=monitor", WHOLE_SUBTREE, monitorFilter));
-    }
-    while (op.getSearchEntries().isEmpty() && count<100);
-    assertFalse(op.getSearchEntries().isEmpty(), "Could not read monitoring information");
+        String monitorFilter = "(&(cn=Directory server*)(domain-name=" + baseDN + "))";
+        InternalSearchOperation op =
+            connection.processSearch(newSearchRequest("cn=replication,cn=monitor", WHOLE_SUBTREE, monitorFilter));
+        Assertions.assertThat(op.getSearchEntries()).as("Could not read monitoring information").isNotEmpty();
 
-    SearchResultEntry entry = op.getSearchEntries().getFirst();
-    return entry.parseAttribute(attr).asLong();
+        SearchResultEntry entry = op.getSearchEntries().getFirst();
+        return entry.parseAttribute(attr).asLong();
+      }
+    });
   }
 
-  /**
-   * Check that the entry with the given dn has the given valueString value
-   * for the given attrTypeStr attribute type.
-   */
-  protected boolean checkEntryHasAttribute(DN dn, String attrTypeStr,
-      String valueString, int timeout, boolean hasAttribute) throws Exception
+  protected void checkEntryHasAttributeValue(final DN dn, final String attrTypeStr, final String valueString,
+      int timeoutInSecs, String notFoundErrorMsg) throws Exception
   {
-    boolean found = false;
-    int count = timeout/100;
-    if (count<1)
-    {
-      count=1;
-    }
+    checkEntryHasAttribute(dn, attrTypeStr, valueString, timeoutInSecs, true, notFoundErrorMsg);
+  }
 
-    do
-    {
-      final Entry newEntry = DirectoryServer.getEntry(dn);
-      if (newEntry != null)
-      {
-        List<Attribute> tmpAttrList = newEntry.getAttribute(attrTypeStr);
-        if (tmpAttrList != null && !tmpAttrList.isEmpty())
-        {
-          Attribute tmpAttr = tmpAttrList.get(0);
-          found = tmpAttr.contains(ByteString.valueOf(valueString));
-        }
-      }
+  protected void checkEntryHasNoSuchAttributeValue(final DN dn, final String attrTypeStr, final String valueString,
+      int timeoutInSecs, String foundErrorMsg) throws Exception
+  {
+    checkEntryHasAttribute(dn, attrTypeStr, valueString, timeoutInSecs, false, foundErrorMsg);
+  }
 
-      if (found != hasAttribute)
+  protected boolean checkEntryHasAttribute(final DN dn, final String attrTypeStr, final String valueString,
+      int timeout, final boolean expectedAttributeValueFound) throws Exception
+  {
+    checkEntryHasAttribute(dn, attrTypeStr, valueString, timeout / 1000, expectedAttributeValueFound, null);
+    return expectedAttributeValueFound;
+  }
+
+  private void checkEntryHasAttribute(final DN dn, final String attrTypeStr, final String valueString,
+      int timeoutInSecs, final boolean expectedAttributeValueFound, final String foundMsg) throws Exception
+  {
+    TestTimer timer = new TestTimer.Builder()
+      .maxSleep(timeoutInSecs, SECONDS)
+      .sleepTimes(100, MILLISECONDS)
+      .toTimer();
+    timer.repeatUntilSuccess(new Callable<Void>()
+    {
+      @Override
+      public Void call() throws Exception
       {
-        Thread.sleep(100);
+        final Entry newEntry = DirectoryServer.getEntry(dn);
+        assertNotNull(newEntry);
+        List<Attribute> attrList = newEntry.getAttribute(attrTypeStr);
+        Assertions.assertThat(attrList).isNotEmpty();
+        Attribute attr = attrList.get(0);
+        boolean foundAttributeValue = attr.contains(ByteString.valueOf(valueString));
+        assertEquals(foundAttributeValue, expectedAttributeValueFound, foundMsg);
+        return null;
       }
-    } while (--count > 0 && found != hasAttribute);
-    return found;
+    });
   }
 
   /**
    * Retrieves an entry from the local Directory Server.
    * @throws Exception When the entry cannot be locked.
    */
-  protected Entry getEntry(DN dn, int timeout, boolean exist) throws Exception
+  protected Entry getEntry(final DN dn, int timeoutInMillis, final boolean exist) throws Exception
   {
-    int count = timeout/200;
-    if (count<1)
+    TestTimer timer = new TestTimer.Builder()
+      .maxSleep(timeoutInMillis, MILLISECONDS)
+      .sleepTimes(200, MILLISECONDS)
+      .toTimer();
+    timer.repeatUntilSuccess(new Callable<Void>()
     {
-      count=1;
-    }
-    Thread.sleep(50);
-    boolean found = DirectoryServer.entryExists(dn);
-    while (count> 0 && found != exist)
-    {
-      Thread.sleep(200);
-
-      found = DirectoryServer.entryExists(dn);
-      count--;
-    }
-
+      @Override
+      public Void call() throws Exception
+      {
+        assertEquals(DirectoryServer.entryExists(dn), exist);
+        return null;
+      }
+    });
+    
     Entry entry = DirectoryServer.getEntry(dn);
     if (entry != null)
     {
@@ -600,43 +621,34 @@
   protected List<Modification> generatemods(String attrName, String attrValue)
   {
     Attribute attr = Attributes.create(attrName, attrValue);
-    List<Modification> mods = new ArrayList<>();
-    Modification mod = new Modification(ModificationType.REPLACE, attr);
-    mods.add(mod);
-    return mods;
+    return newArrayList(new Modification(REPLACE, attr));
   }
 
   /** Utility method to create, run a task and check its result. */
   protected void task(String task) throws Exception
   {
-    Entry taskEntry = TestCaseUtils.addEntry(task);
+    final Entry taskEntry = TestCaseUtils.addEntry(task);
 
     // Wait until the task completes.
-    Entry resultEntry = null;
-    String completionTime = null;
-    long startMillisecs = System.currentTimeMillis();
-    do
+    TestTimer timer = new TestTimer.Builder()
+      .maxSleep(30, SECONDS)
+      .sleepTimes(20, MILLISECONDS)
+      .toTimer();
+    Entry resultEntry = timer.repeatUntilSuccess(new Callable<Entry>()
     {
-      final SearchRequest request = newSearchRequest(taskEntry.getName(), SearchScope.BASE_OBJECT);
-      InternalSearchOperation searchOperation = connection.processSearch(request);
-      if (searchOperation.getSearchEntries().isEmpty())
+      @Override
+      public Entry call() throws Exception
       {
-        continue;
+        final SearchRequest request = newSearchRequest(taskEntry.getName(), SearchScope.BASE_OBJECT);
+        InternalSearchOperation searchOperation = connection.processSearch(request);
+        Assertions.assertThat(searchOperation.getSearchEntries()).isNotEmpty();
+        Entry resultEntry = searchOperation.getSearchEntries().get(0);
+        String completionTime = resultEntry.parseAttribute(
+            ATTR_TASK_COMPLETION_TIME.toLowerCase()).asString();
+        assertNotNull(completionTime, "The task has not completed");
+        return resultEntry;
       }
-      resultEntry = searchOperation.getSearchEntries().get(0);
-      completionTime = resultEntry.parseAttribute(
-          ATTR_TASK_COMPLETION_TIME.toLowerCase()).asString();
-      if (completionTime == null)
-      {
-        if (System.currentTimeMillis() - startMillisecs > 1000*30)
-        {
-          break;
-        }
-        Thread.sleep(10);
-      }
-    } while (completionTime == null);
-
-    assertNotNull(completionTime, "The task has not completed after 30 seconds.");
+    });
 
     // Check that the task state is as expected.
     String stateString = resultEntry.parseAttribute(
@@ -647,14 +659,12 @@
   }
 
   /**
-   * Create a new replication session security object that can be used in
-   * unit tests.
+   * Create a new replication session security object that can be used in unit tests.
    *
    * @return A new replication session security object.
    * @throws ConfigException If an error occurs.
    */
-  protected static ReplSessionSecurity getReplSessionSecurity()
-       throws ConfigException
+  protected static ReplSessionSecurity getReplSessionSecurity() throws ConfigException
   {
     return new ReplSessionSecurity(null, null, null, true);
   }
@@ -704,37 +714,34 @@
     logger.trace("AddedTask/" + taskEntry.getName());
   }
 
-  protected void waitTaskState(Entry taskEntry, TaskState expectedTaskState,
+  protected void waitTaskState(final Entry taskEntry, final TaskState expectedTaskState,
       long maxWaitTimeInMillis, LocalizableMessage expectedMessage) throws Exception
   {
-    long startTime = System.currentTimeMillis();
-
-    Entry resultEntry = null;
-    TaskState taskState = null;
-    do
+    TestTimer timer = new TestTimer.Builder()
+      .maxSleep(maxWaitTimeInMillis, MILLISECONDS)
+      .sleepTimes(100, MILLISECONDS)
+      .toTimer();
+    Entry resultEntry = timer.repeatUntilSuccess(new Callable<Entry>()
     {
-      final SearchRequest request = newSearchRequest(taskEntry.getName(), SearchScope.BASE_OBJECT);
-      InternalSearchOperation searchOperation = connection.processSearch(request);
-      resultEntry = searchOperation.getSearchEntries().getFirst();
+      @Override
+      public Entry call() throws Exception
+      {
+        final SearchRequest request = newSearchRequest(taskEntry.getName(), SearchScope.BASE_OBJECT);
+        InternalSearchOperation searchOperation = connection.processSearch(request);
+        Entry resultEntry = searchOperation.getSearchEntries().getFirst();
 
-      // Check that the task state is as expected.
-      String stateString = resultEntry.parseAttribute(
-          ATTR_TASK_STATE.toLowerCase()).asString();
-      taskState = TaskState.fromString(stateString);
-
-      Thread.sleep(100);
-    }
-    while (taskState != expectedTaskState
-        && taskState != TaskState.STOPPED_BY_ERROR
-        && taskState != TaskState.COMPLETED_SUCCESSFULLY
-        && System.currentTimeMillis() - startTime < maxWaitTimeInMillis);
+        TaskState taskState = getTaskState(resultEntry);
+        Assertions.assertThat(taskState).isIn(expectedTaskState, STOPPED_BY_ERROR, COMPLETED_SUCCESSFULLY);
+        return resultEntry;
+      }
+    });
 
     // Check that the task contains some log messages.
     Set<String> logMessages = resultEntry.parseAttribute(
         ATTR_TASK_LOG_MESSAGES.toLowerCase()).asSetOfString();
 
-    if (taskState != TaskState.COMPLETED_SUCCESSFULLY
-        && taskState != TaskState.RUNNING)
+    TaskState taskState = getTaskState(resultEntry);
+    if (taskState != COMPLETED_SUCCESSFULLY && taskState != RUNNING)
     {
       assertFalse(logMessages.isEmpty(),
           "No log messages were written to the task entry on a failed task");
@@ -750,8 +757,7 @@
       }
     }
 
-    if (expectedTaskState == TaskState.RUNNING
-        && taskState == TaskState.COMPLETED_SUCCESSFULLY)
+    if (expectedTaskState == RUNNING && taskState == COMPLETED_SUCCESSFULLY)
     {
       // We usually wait the running state after adding the task
       // and if the task is fast enough then it may be already done
@@ -764,6 +770,12 @@
     }
   }
 
+  private TaskState getTaskState(Entry resultEntry)
+  {
+    String stateString = resultEntry.parseAttribute(ATTR_TASK_STATE.toLowerCase()).asString();
+    return TaskState.fromString(stateString);
+  }
+
   /** Add to the current DB the entries necessary to the test. */
   protected void addTestEntriesToDB(String... ldifEntries) throws Exception
   {
@@ -789,28 +801,25 @@
    * @throws Exception if the entry does not exist or does not have
    *                   an entryUUID.
    */
-  protected String getEntryUUID(DN dn) throws Exception
+  protected String getEntryUUID(final DN dn) throws Exception
   {
-    int count = 10;
-    String found = null;
-    while (count > 0 && found == null)
+    TestTimer timer = new TestTimer.Builder()
+      .maxSleep(1, SECONDS)
+      .sleepTimes(100, MILLISECONDS)
+      .toTimer();
+    return timer.repeatUntilSuccess(new Callable<String>()
     {
-      Thread.sleep(100);
-
-      Entry newEntry = DirectoryServer.getEntry(dn);
-      if (newEntry != null)
+      @Override
+      public String call() throws Exception
       {
+        Entry newEntry = DirectoryServer.getEntry(dn);
+        assertNotNull(newEntry);
         Attribute attribute = newEntry.getAttribute("entryuuid").get(0);
-        for (ByteString val : attribute)
-        {
-          found = val.toString();
-          break;
-        }
+        String found = attribute.iterator().next().toString();
+        assertNotNull(found, "Entry: " + dn + " Could not be found.");
+        return found;
       }
-      count --;
-    }
-    assertNotNull(found, "Entry: " + dn + " Could not be found.");
-    return found;
+    });
   }
 
   /** Utility method : removes a domain deleting the passed config entry */
@@ -903,25 +912,27 @@
    * Performs an internal search, waiting for at most 3 seconds for expected result code and expected
    * number of entries.
    */
-  protected InternalSearchOperation waitForSearchResult(String dn, SearchScope scope, String filter,
-      ResultCode expectedResultCode, int expectedNbEntries) throws Exception
+  protected InternalSearchOperation waitForSearchResult(final String dn, final SearchScope scope, final String filter,
+      final ResultCode expectedResultCode, final int expectedNbEntries) throws Exception
   {
-    InternalSearchOperation searchOp = null;
-    int count = 0;
-    do
+    TestTimer timer = new TestTimer.Builder()
+      .maxSleep(3, SECONDS)
+      .sleepTimes(10, MILLISECONDS)
+      .toTimer();
+    return timer.repeatUntilSuccess(new Callable<InternalSearchOperation>()
     {
-      Thread.sleep(10);
-      final SearchRequest request = newSearchRequest(dn, scope, filter).addAttribute("*", "+");
-      searchOp = connection.processSearch(request);
-      count++;
-    }
-    while (count < 300
-        && searchOp.getResultCode() != expectedResultCode
-        && searchOp.getSearchEntries().size() != expectedNbEntries);
-
-    final List<SearchResultEntry> entries = searchOp.getSearchEntries();
-    Assertions.assertThat(entries).hasSize(expectedNbEntries);
-    return searchOp;
+      @Override
+      public InternalSearchOperation call() throws Exception
+      {
+        final SearchRequest request = newSearchRequest(dn, scope, filter).addAttribute("*", "+");
+        InternalSearchOperation searchOp = connection.processSearch(request);
+        SoftAssertions softly = new SoftAssertions();
+        softly.assertThat(searchOp.getResultCode()).isEqualTo(expectedResultCode);
+        softly.assertThat(searchOp.getSearchEntries()).hasSize(expectedNbEntries);
+        softly.assertAll();
+        return searchOp;
+      }
+    });
   }
 
   protected static void setReplicationDBImplementation(ReplicationDBImplementation impl)
diff --git a/opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/SchemaReplicationTest.java b/opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/SchemaReplicationTest.java
index f3122be..70c23a9 100644
--- a/opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/SchemaReplicationTest.java
+++ b/opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/SchemaReplicationTest.java
@@ -28,15 +28,18 @@
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
 
 import org.assertj.core.api.Assertions;
 import org.forgerock.i18n.LocalizableMessage;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
-import org.forgerock.opendj.ldap.ModificationType;
+import org.forgerock.opendj.ldap.ResultCode;
 import org.opends.server.TestCaseUtils;
-import org.opends.server.admin.std.server.SynchronizationProviderCfg;
 import org.opends.server.api.SynchronizationProvider;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.core.ModifyOperation;
@@ -46,30 +49,34 @@
 import org.opends.server.replication.protocol.ModifyMsg;
 import org.opends.server.replication.protocol.ReplicationMsg;
 import org.opends.server.replication.service.ReplicationBroker;
-import org.opends.server.types.*;
-import org.forgerock.opendj.ldap.ResultCode;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.Attributes;
+import org.opends.server.types.DN;
+import org.opends.server.types.Modification;
+import org.opends.server.types.Operation;
+import org.opends.server.types.RawModification;
+import org.opends.server.util.TestTimer;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static java.util.concurrent.TimeUnit.*;
+
+import static org.forgerock.opendj.ldap.ModificationType.*;
+import static org.opends.server.core.DirectoryServer.*;
+import static org.opends.server.util.CollectionUtils.*;
 import static org.testng.Assert.*;
 
-/**
- * Test for the schema replication.
- */
+/** Test for the schema replication. */
 @SuppressWarnings("javadoc")
 public class SchemaReplicationTest extends ReplicationTestCase
 {
-
   private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
 
-
   private List<Modification> rcvdMods;
 
   private int replServerPort;
 
-  /**
-   * Set up the environment for performing the tests in this Class.
-   */
+  /** Set up the environment for performing the tests in this Class. */
   @Override
   @BeforeClass
   public void setUp() throws Exception
@@ -107,10 +114,7 @@
     configureReplication(replServerLdif, domainLdif);
   }
 
-  /**
-   * Checks that changes done to the schema are pushed to the replicationServer
-   * clients.
-   */
+  /** Checks that changes done to the schema are pushed to the replicationServer clients. */
   @Test
   public void pushSchemaChange() throws Exception
   {
@@ -128,45 +132,18 @@
       // Modify the schema
       Attribute attr = Attributes.create("attributetypes",
           "( 2.5.44.77.33 NAME 'dummy' )");
-      List<Modification> mods = new ArrayList<>();
-      Modification mod = new Modification(ModificationType.ADD, attr);
-      mods.add(mod);
-      ModifyOperation modOp = connection.processModify(baseDN, mods);
-      assertEquals(modOp.getResultCode(), ResultCode.SUCCESS,
-          "The original operation failed");
+      Modification mod = new Modification(ADD, attr);
+      processModify(baseDN, mod);
 
       // See if the client has received the msg
-      ReplicationMsg msg = broker.receive();
-      Assertions.assertThat(msg).isInstanceOf(ModifyMsg.class);
-      ModifyMsg modMsg = (ModifyMsg) msg;
+      ModifyMsg modMsg = receiveModifyMsg(broker);
+      assertModReceived(mod, baseDN, modMsg);
 
-      Operation receivedOp = modMsg.createOperation(connection);
-      assertEquals(modMsg.getDN(), baseDN, "The received message is not for cn=schema");
-      Assertions.assertThat(receivedOp).isInstanceOf(ModifyOperation.class);
-      ModifyOperation receivedModifyOperation = (ModifyOperation) receivedOp;
-
-      this.rcvdMods = new ArrayList<>();
-      for (RawModification m : receivedModifyOperation.getRawModifications())
-      {
-        this.rcvdMods.add(m.toModification());
-      }
-
-      assertTrue(this.rcvdMods.contains(mod),
-                 "The received mod does not contain the original change");
-
-      /*
-       * Now cleanup the schema for the next test
-       */
-      mod = new Modification(ModificationType.DELETE, attr);
-      mods.clear();
-      mods.add(mod);
-      modOp = connection.processModify(baseDN, mods);
-      assertEquals(modOp.getResultCode(), ResultCode.SUCCESS,
-          "The original operation failed");
+      /* Now cleanup the schema for the next test */
+      processModify(baseDN, new Modification(DELETE, attr));
 
       // See if the client has received the msg
-      msg = broker.receive();
-      Assertions.assertThat(msg).isInstanceOf(ModifyMsg.class);
+      receiveModifyMsg(broker);
     }
     finally
     {
@@ -174,6 +151,12 @@
     }
   }
 
+  private void processModify(final DN baseDN, Modification mod)
+  {
+    ModifyOperation modOp = connection.processModify(baseDN, newArrayList(mod));
+    assertEquals(modOp.getResultCode(), ResultCode.SUCCESS);
+  }
+
   /**
    * Checks that changes to the schema pushed to the replicationServer
    * are received and correctly replayed by replication plugin.
@@ -198,10 +181,8 @@
           EntryHistorical.getEntryUUID(DirectoryServer.getEntry(baseDN)));
       broker.publish(modMsg);
 
-      boolean found = checkEntryHasAttribute(baseDN, "attributetypes",
-        "( 2.5.44.77.33 NAME 'dummy' )",
-        10000, true);
-      assertTrue(found, "The modification has not been correctly replayed.");
+      checkEntryHasAttributeValue(baseDN, "attributetypes", "( 2.5.44.77.33 NAME 'dummy' )", 10,
+          "The modification has not been correctly replayed.");
     }
     finally
     {
@@ -231,73 +212,91 @@
       // create a schema change Notification
       Attribute attr = Attributes.create("attributetypes",
         "( 2.5.44.76.35 NAME 'push' )");
-      List<Modification> mods = new ArrayList<>();
-      Modification mod = new Modification(ModificationType.ADD, attr);
-      mods.add(mod);
+      Modification mod = new Modification(ADD, attr);
+      List<Modification> mods = newArrayList(mod);
 
-      for (SynchronizationProvider<SynchronizationProviderCfg> provider : DirectoryServer.
-        getSynchronizationProviders())
+      for (SynchronizationProvider<?> provider : getSynchronizationProviders())
       {
         provider.processSchemaChange(mods);
       }
 
       // receive the message on the broker side.
-      ReplicationMsg msg = broker.receive();
-      Assertions.assertThat(msg).isInstanceOf(ModifyMsg.class);
-      ModifyMsg modMsg = (ModifyMsg) msg;
-
-      Operation receivedOp = modMsg.createOperation(connection);
-      assertEquals(modMsg.getDN(), baseDN, "The received message is not for cn=schema");
-      Assertions.assertThat(receivedOp).isInstanceOf(ModifyOperation.class);
-      ModifyOperation receivedModifyOperation = (ModifyOperation) receivedOp;
-
-      this.rcvdMods = new ArrayList<>();
-      for (RawModification m : receivedModifyOperation.getRawModifications())
-      {
-        this.rcvdMods.add(m.toModification());
-      }
-
-      assertTrue(this.rcvdMods.contains(mod),
-        "The received mod does not contain the original change");
+      ModifyMsg modMsg = receiveModifyMsg(broker);
+      assertModReceived(mod, baseDN, modMsg);
 
       // check that the schema files were updated with the new ServerState.
       // by checking that the CSN of msg we just received has been
       // added to the user schema file.
 
       // build the string to find in the schema file
-      String stateStr = modMsg.getCSN().toString();
+      final String stateStr = modMsg.getCSN().toString();
 
       // open the schema file
-      String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
-      String buildDir = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR,
-              buildRoot + File.separator + "build");
-      String path = buildDir + File.separator +
-        "unit-tests" + File.separator + "package-instance" + File.separator +
-        "config" + File.separator + "schema" + File.separator +
-        "99-user.ldif";
+      final File schemaFile = getSchemaFile();
 
       // it is necessary to loop on this check because the state is not
       // written immediately but only every so often.
-      int count = 0;
-      while (true)
+      TestTimer timer = new TestTimer.Builder()
+        .maxSleep(5, SECONDS)
+        .sleepTimes(100, MILLISECONDS)
+        .toTimer();
+      timer.repeatUntilSuccess(new Callable<Void>()
       {
-        File file = new File(path);
-        FileInputStream input = new FileInputStream(file);
-        byte[] bytes = new byte[input.available()];
-        input.read(bytes);
-        String fileStr = new String(bytes);
-        if (fileStr.contains(stateStr))
+        @Override
+        public Void call() throws Exception
         {
-          break;
+          String fileStr = readAsString(schemaFile);
+          assertTrue(fileStr.contains(stateStr), "The Schema persistentState (CSN:" + stateStr
+              + ") has not been saved to " + schemaFile + " : " + fileStr);
+          return null;
         }
-        assertTrue(count++ <= 50, "The Schema persistentState (CSN:" + stateStr
-            + ") has not been saved to " + path + " : " + fileStr);
-        TestCaseUtils.sleep(100);
-      }
+      });
     } finally
     {
       broker.stop();
     }
     logger.error(LocalizableMessage.raw("Ending replication test : pushSchemaFilesChange "));
   }
+
+  private File getSchemaFile()
+  {
+    String sep = File.separator;
+    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
+    String buildDir = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot + sep + "target");
+    final String path = buildDir + sep
+        + "unit-tests" + sep + "package-instance" + sep + "config" + sep + "schema" + sep + "99-user.ldif";
+    return new File(path);
+  }
+
+  private ModifyMsg receiveModifyMsg(ReplicationBroker broker) throws SocketTimeoutException
+  {
+    ReplicationMsg msg = broker.receive();
+    Assertions.assertThat(msg).isInstanceOf(ModifyMsg.class);
+    return (ModifyMsg) msg;
+  }
+
+  private void assertModReceived(Modification mod, final DN baseDN, ModifyMsg modMsg) throws Exception
+  {
+    Operation receivedOp = modMsg.createOperation(connection);
+    assertEquals(modMsg.getDN(), baseDN, "The received message is not for cn=schema");
+    Assertions.assertThat(receivedOp).isInstanceOf(ModifyOperation.class);
+    ModifyOperation receivedModifyOperation = (ModifyOperation) receivedOp;
+
+    this.rcvdMods = new ArrayList<>();
+    for (RawModification m : receivedModifyOperation.getRawModifications())
+    {
+      this.rcvdMods.add(m.toModification());
+    }
+    Assertions.assertThat(this.rcvdMods)
+      .as("The received mod does not contain the original change")
+      .contains(mod);
+  }
+
+  private String readAsString(File file) throws FileNotFoundException, IOException
+  {
+    FileInputStream input = new FileInputStream(file);
+    byte[] bytes = new byte[input.available()];
+    input.read(bytes);
+    return new String(bytes);
+  }
 }
diff --git a/opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/UpdateOperationTest.java b/opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/UpdateOperationTest.java
index f9ad618..fa6c12a 100644
--- a/opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/UpdateOperationTest.java
+++ b/opendj-sdk/opendj-server-legacy/src/test/java/org/opends/server/replication/UpdateOperationTest.java
@@ -30,6 +30,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.Callable;
 
 import org.assertj.core.api.Assertions;
 import org.forgerock.i18n.LocalizableMessage;
@@ -45,17 +46,20 @@
 import org.opends.server.plugins.ShortCircuitPlugin;
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.CSNGenerator;
-import org.opends.server.replication.plugin.LDAPReplicationDomain;
 import org.opends.server.replication.protocol.*;
 import org.opends.server.replication.service.ReplicationBroker;
 import org.opends.server.types.*;
+import org.opends.server.util.TestTimer;
 import org.opends.server.util.TimeThread;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import static java.util.concurrent.TimeUnit.*;
+
 import static org.opends.server.TestCaseUtils.*;
 import static org.opends.server.protocols.internal.InternalClientConnection.*;
+import static org.opends.server.replication.plugin.LDAPReplicationDomain.*;
 import static org.opends.server.util.CollectionUtils.*;
 import static org.testng.Assert.*;
 
@@ -66,12 +70,9 @@
 @SuppressWarnings("javadoc")
 public class UpdateOperationTest extends ReplicationTestCase
 {
-
   private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
 
-  /**
-   * An entry with a entryUUID.
-   */
+  /** An entry with a entryUUID. */
   private Entry personWithUUIDEntry;
   private Entry personWithSecondUniqueID;
 
@@ -85,9 +86,7 @@
   private String user1entrysecondUUID;
   private String user1entryUUID;
 
-  /**
-   * A "person" entry.
-   */
+  /** A "person" entry. */
   private Entry personEntry;
   private int replServerPort;
   private String domain1uid;
@@ -103,9 +102,7 @@
   private int domainSid = 55;
   private DN baseDN;
 
-  /**
-   * Set up the environment for performing the tests in this Class.
-   */
+  /** Set up the environment for performing the tests in this Class. */
   @BeforeClass
   @Override
   public void setUp() throws Exception
@@ -272,9 +269,7 @@
         "dc:domain3");
   }
 
-  /**
-   * Add an entry in the database.
-   */
+  /** Add an entry in the database. */
   private CSN addEntry(Entry entry) throws Exception
   {
     AddOperation addOp = connection.processAdd(entry);
@@ -283,13 +278,11 @@
     return OperationContext.getCSN(addOp);
   }
 
-  /**
-   * Delete an entry in the database.
-   */
+  /** Delete an entry in the database. */
   private void delEntry(DN dn) throws Exception
   {
     connection.processDelete(dn);
-    assertNull(getEntry(dn, 1000, true));
+    assertNull(getEntry(dn, 1000, false));
   }
 
   /**
@@ -389,9 +382,8 @@
       broker.publish(modMsg);
 
       // Check that the modify has been replayed.
-      boolean found = checkEntryHasAttribute(personWithUUIDEntry.getName(),
-          "telephonenumber", "01 02 45", 10000, true);
-      assertTrue(found, "The first modification was not replayed.");
+      checkEntryHasAttributeValue(personWithUUIDEntry.getName(), "telephonenumber", "01 02 45", 10,
+          "The first modification was not replayed.");
 
       // Simulate loss of heartbeats.
       HeartbeatThread.setHeartbeatsDisabled(true);
@@ -405,9 +397,8 @@
       broker.publish(modMsg);
 
       // Check that the modify has been replayed.
-      found = checkEntryHasAttribute(personWithUUIDEntry.getName(),
-          "description", "Description was changed", 10000, true);
-      assertTrue(found, "The second modification was not replayed.");
+      checkEntryHasAttributeValue(personWithUUIDEntry.getName(), "description", "Description was changed", 10,
+          "The second modification was not replayed.");
 
       // Delete the entries to clean the database.
       broker.publish(
@@ -546,7 +537,6 @@
     }
   }
 
-
   /**
    * Tests the naming conflict resolution code.
    * In this test, the local server act both as an LDAP server and
@@ -600,9 +590,8 @@
     broker.publish(modMsg);
 
     // check that the modify has been applied as if the entry had been renamed.
-    boolean found = checkEntryHasAttribute(personWithUUIDEntry.getName(),
-                           "telephonenumber", "01 02 45", 10000, true);
-      assertTrue(found, "The modification has not been correctly replayed.");
+      checkEntryHasAttributeValue(personWithUUIDEntry.getName(), "telephonenumber", "01 02 45", 10,
+          "The modification has not been correctly replayed.");
     assertEquals(getMonitorDelta(), 1);
       assertConflictAutomaticallyResolved(alertCount);
 
@@ -623,9 +612,8 @@
     broker.publish(modMsg);
 
     // check that the modify has been applied.
-    found = checkEntryHasAttribute(personWithUUIDEntry.getName(),
-                           "uid", "AnotherUid", 10000, true);
-      assertTrue(found, "The modification has not been correctly replayed.");
+      checkEntryHasAttributeValue(personWithUUIDEntry.getName(), "uid", "AnotherUid", 10,
+          "The modification has not been correctly replayed.");
     assertEquals(getMonitorDelta(), 1);
 
     /*
@@ -655,9 +643,7 @@
 
     // check that the modify has not been applied
     Thread.sleep(2000);
-    found = checkEntryHasAttribute(personWithUUIDEntry.getName(),
-                           "telephonenumber", "02 01 03 05", 10000, false);
-      assertFalse(found,
+      checkEntryHasNoSuchAttributeValue(personWithUUIDEntry.getName(), "telephonenumber", "02 01 03 05", 10,
           "The modification has been replayed while it should not.");
     assertEquals(getMonitorDelta(), 1);
       assertConflictAutomaticallyResolved(alertCount);
@@ -787,10 +773,7 @@
       assertEquals(getMonitorDelta(), 1);
       assertConflictAutomaticallyResolved(alertCount);
 
-
-    /*
-     * same test but by giving a bad entry DN
-     */
+      /* same test but by giving a bad entry DN */
       DN modDN = DN.valueOf("uid=wrong," + baseDN);
     modDnMsg = new ModifyDNMsg(modDN, gen.newCSN(),
         user1entryUUID, null, false, null, "uid=reallynewrdn");
@@ -948,10 +931,8 @@
         "The conflicting entries were not created");
 
     // check that the 2 conflicting entries have been correctly marked
-    assertTrue(checkEntryHasAttribute(conflictDomain2dn,
-        LDAPReplicationDomain.DS_SYNC_CONFLICT, domain2dn.toString(), 1000, true));
-    assertTrue(checkEntryHasAttribute(conflictDomain3dn,
-        LDAPReplicationDomain.DS_SYNC_CONFLICT, domain3dn.toString(), 1000, true));
+      checkEntryHasAttributeValue(conflictDomain2dn, DS_SYNC_CONFLICT, domain2dn.toString(), 1, null);
+      checkEntryHasAttributeValue(conflictDomain3dn, DS_SYNC_CONFLICT, domain3dn.toString(), 1, null);
 
     // check that unresolved conflict count has been incremented
     assertEquals(getMonitorDelta(), 1);
@@ -1005,8 +986,7 @@
       "The conflicting entries were not created");
 
     // check that the entry have been correctly marked as conflicting.
-    assertTrue(checkEntryHasAttribute(conflictDomain2dn,
-        LDAPReplicationDomain.DS_SYNC_CONFLICT, domain2dn.toString(), 1000, true));
+      checkEntryHasAttributeValue(conflictDomain2dn, DS_SYNC_CONFLICT, domain2dn.toString(), 1, null);
 
     // check that unresolved conflict count has been incremented
     assertEquals(getMonitorDelta(), 1);
@@ -1031,8 +1011,7 @@
       assertConflictAutomaticallyResolved(alertCount);
 
     /*
-     * Check that a conflict is detected when an entry is
-     * moved below an entry that does not exist.
+     * Check that a conflict is detected when an entry is moved below an entry that does not exist.
      */
     updateMonitorCount(baseDN, unresolvedMonitorAttr);
       alertCount = DummyAlertHandler.getAlertCount();
@@ -1047,10 +1026,8 @@
       waitForNonZeroMonitorDelta();
 
       // check that the entry have been correctly marked as conflicting.
-      assertTrue(checkEntryHasAttribute(
-          DN.valueOf("uid=new person," + baseDN2),
-          LDAPReplicationDomain.DS_SYNC_CONFLICT,
-          "uid=newrdn," + baseDN2, 1000, true));
+      checkEntryHasAttributeValue(
+          DN.valueOf("uid=new person," + baseDN2), DS_SYNC_CONFLICT, "uid=newrdn," + baseDN2, 1, null);
     }
     finally
     {
@@ -1060,17 +1037,21 @@
 
   private void waitForNonZeroMonitorDelta() throws Exception, InterruptedException
   {
-    int count = 0;
-    while (count < 2000 && getMonitorDelta() == 0)
-    {
-      // it is possible that the update has not yet been applied
-      // wait a short time and try again.
-      Thread.sleep(100);
-      count++;
-    }
     // if the monitor counter did not get incremented after 200sec
     // then something got wrong.
-    Assertions.assertThat(count).isLessThan(200);
+    TestTimer timer = new TestTimer.Builder()
+      .maxSleep(200, SECONDS)
+      .sleepTimes(100, MILLISECONDS)
+      .toTimer();
+    timer.repeatUntilSuccess(new Callable<Void>()
+    {
+      @Override
+      public Void call() throws Exception
+      {
+        assertNotEquals(getMonitorDelta() , 0);
+        return null;
+      }
+    });
   }
 
   /**
@@ -1117,9 +1098,7 @@
     setUp();
   }
 
-  /**
-   * Tests done using directly the ReplicationBroker interface.
-   */
+  /** Tests done using directly the ReplicationBroker interface. */
   @Test(enabled=true, dataProvider="assured")
   public void updateOperations(boolean assured) throws Exception
   {
@@ -1194,9 +1173,8 @@
       modMsg.setAssured(assured);
       broker.publish(modMsg);
 
-      boolean found = checkEntryHasAttribute(personWithUUIDEntry.getName(),
-          "telephonenumber", "01 02 45", 10000, true);
-      assertTrue(found, "The modification has not been correctly replayed.");
+      checkEntryHasAttributeValue(personWithUUIDEntry.getName(), "telephonenumber", "01 02 45", 10,
+          "The modification has not been correctly replayed.");
 
       // Test that replication is able to add attribute that do
       // not exist in the schema.
@@ -1206,9 +1184,8 @@
       modMsg.setAssured(assured);
       broker.publish(modMsg);
 
-      found = checkEntryHasAttribute(
-          personWithUUIDEntry.getName(), "badattribute", "value", 10000, true);
-      assertTrue(found, "The modification has not been correctly replayed.");
+      checkEntryHasAttributeValue(personWithUUIDEntry.getName(), "badattribute", "value", 10,
+          "The modification has not been correctly replayed.");
 
       /*
        * Test the Reception of Modify Dn Msg
@@ -1274,10 +1251,7 @@
     throw new RuntimeException("Unhandled type: " + msg.getClass());
   }
 
-  /**
-   * Test case for
-   * [Issue 635] NullPointerException when trying to access non existing entry.
-   */
+  /** Test case for [Issue 635] NullPointerException when trying to access non existing entry. */
   @Test(enabled=true)
   public void deleteNoSuchObject() throws Exception
   {
@@ -1288,11 +1262,7 @@
     assertEquals(op.getResultCode(), ResultCode.NO_SUCH_OBJECT);
   }
 
-  /**
-   * Test case for
-   * [Issue 798]  break infinite loop when problems with naming resolution
-   *              conflict.
-   */
+  /** Test case for [Issue 798] break infinite loop when problems with naming resolution conflict. */
   @Test(enabled=true)
   public void infiniteReplayLoop() throws Exception
   {
@@ -1332,7 +1302,7 @@
           "userPassword: password",
           "initials: AA");
 
-      long initialCount = getMonitorAttrValue(baseDN, "replayed-updates");
+      final long initialCount = getMonitorAttrValue(baseDN, "replayed-updates");
 
       // Get the UUID of the test entry.
       Entry resultEntry = getEntry(tmp.getName(), 1, true);
@@ -1344,28 +1314,27 @@
       try
       {
         // Publish a delete message for this test entry.
-        DeleteMsg delMsg = new DeleteMsg(tmp.getName(), gen.newCSN(), uuid);
-        broker.publish(delMsg);
+        broker.publish(new DeleteMsg(tmp.getName(), gen.newCSN(), uuid));
 
         // Wait for the operation to be replayed.
-        long endTime = System.currentTimeMillis() + 5000;
-        while (getMonitorAttrValue(baseDN, "replayed-updates") == initialCount &&
-             System.currentTimeMillis() < endTime)
+        TestTimer timer = new TestTimer.Builder()
+          .maxSleep(5, SECONDS)
+          .sleepTimes(100, MILLISECONDS)
+          .toTimer();
+        timer.repeatUntilSuccess(new Callable<Void>()
         {
-          Thread.sleep(100);
-        }
+          @Override
+          public Void call() throws Exception
+          {
+            assertNotEquals(getMonitorAttrValue(baseDN, "replayed-updates"), initialCount);
+            return null;
+          }
+        });
       }
       finally
       {
         ShortCircuitPlugin.deregisterShortCircuit(OperationType.DELETE, "PreParse");
       }
-
-      // If the replication replay loop was detected and broken then the
-      // counter will still be updated even though the replay was unsuccessful.
-      if (getMonitorAttrValue(baseDN, "replayed-updates") == initialCount)
-      {
-        fail("Operation was not replayed");
-      }
     }
     finally
     {

--
Gitblit v1.10.0