mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noël Rouvignac
10.54.2015 28215a00c6a6c49ab982f51dee97d501ce954ad3
Fixed random tests

*.java:
Used TestTimer
Extracted methods
Code cleanup

ReplicationTestCase.java:
In stop() and clearChangelogDB(), added null checks.

UpdateOperationTest.java:
In delEntry(), fixed an off-by-one error
4 files modified
1023 ■■■■ changed files
opendj-server-legacy/src/test/java/org/opends/server/replication/DependencyTest.java 326 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java 351 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/replication/SchemaReplicationTest.java 193 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/replication/UpdateOperationTest.java 153 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/replication/DependencyTest.java
@@ -29,9 +29,10 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ByteString;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.MemoryBackend;
@@ -47,14 +48,16 @@
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.types.AttributeType;
import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.Modification;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.core.DirectoryServer.*;
import static org.opends.server.util.CollectionUtils.*;
import static org.testng.Assert.*;
/**
@@ -105,7 +108,7 @@
    int brokerId = 2;
    int serverId = 1;
    int replServerId = 81;
    int AddSequenceLength = 30;
    int addSequenceLength = 30;
    cleanDB();
@@ -121,62 +124,36 @@
       * - Configure replication server
       * - check that the last entry has been correctly added
       */
      Entry entry = TestCaseUtils.entryFromLdifString(
          "dn:" + TEST_ROOT_DN_STRING + "\n"
          + "objectClass: top\n"
          + "objectClass: organization\n"
          + "entryuuid: " + stringUID(1) + "\n");
      String entryldif =
        "dn:" + TEST_ROOT_DN_STRING + "\n"
         + "objectClass: top\n"
         + "objectClass: organization\n"
         + "entryuuid: " + stringUID(1) + "\n";
      Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
      AttributeType uidType =
        DirectoryServer.getSchema().getAttributeType("entryuuid");
      int replServerPort = TestCaseUtils.findFreePort();
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(replServerPort, "dependencyTestAddModDelDependencyTestDb",
                                        replicationDbImplementation, 0, replServerId,
                                        0, AddSequenceLength*5+100, null);
      replServer = new ReplicationServer(conf);
      replServer = newReplicationServer(replServerId, addSequenceLength * 5 + 100, "dependencyTestAddModDelDependencyTestDb");
      ReplicationBroker broker = openReplicationSession(
          baseDN, brokerId, 1000, replServerPort, 1000, CLEAN_DB_GENERATION_ID);
          baseDN, brokerId, 1000, replServer.getReplicationPort(), 1000, CLEAN_DB_GENERATION_ID);
      Thread.sleep(2000);
      // send a sequence of add operation
      // send a sequence of add operation
      DN addDN = TEST_ROOT_DN;
      CSNGenerator gen = new CSNGenerator(brokerId, 0L);
      int sequence;
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      for (sequence = 1; sequence<=addSequenceLength; sequence ++)
      {
        entry.removeAttribute(uidType);
        entry.removeAttribute(getAttributeType("entryuuid"));
        entry.addAttribute(Attributes.create("entryuuid", stringUID(sequence+1)),
                           new LinkedList<ByteString>());
        addDN = DN.valueOf("dc=dependency" + sequence + "," + addDN);
        AddMsg addMsg =
          new AddMsg(gen.newCSN(), addDN, stringUID(sequence+1),
                     stringUID(sequence),
                     entry.getObjectClassAttribute(),
                     entry.getAttributes(), null );
        broker.publish(addMsg);
        ModifyMsg modifyMsg =
          new ModifyMsg(gen.newCSN(), addDN,
                        generatemods("description", "test"),
                        stringUID(sequence+1));
        broker.publish(modifyMsg);
        broker.publish(addMsg(addDN, entry, sequence + 1, sequence, gen));
        broker.publish(modifyMsg(addDN, sequence + 1, generatemods("description", "test"), gen));
      }
      // configure and start replication of TEST_ROOT_DN_STRING on the server
      SortedSet<String> replServers = new TreeSet<>();
      replServers.add("localhost:"+replServerPort);
      DomainFakeCfg domainConf = new DomainFakeCfg(baseDN, serverId, replServers);
      domainConf.setHeartbeatInterval(100000);
      domain = MultimasterReplication.createNewDomain(domainConf);
      domain.start();
      domain = startNewLDAPReplicationDomain(replServer, baseDN, serverId, 100000);
      // check that last entry in sequence got added.
      Entry lastEntry = getEntry(addDN, 30000, true);
@@ -186,12 +163,11 @@
      // Check that all the modify have been replayed
      // (all the entries should have a description).
      addDN = TEST_ROOT_DN;
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      for (sequence = 1; sequence<=addSequenceLength; sequence ++)
      {
        addDN = DN.valueOf("dc=dependency" + sequence + "," + addDN);
        boolean found = checkEntryHasAttribute(addDN, "description", "test", 10000, true);
        assertTrue(found, "The modification was not replayed on entry " + addDN);
        checkEntryHasAttributeValue(
            addDN, "description", "test", 10, "The modification was not replayed on entry " + addDN);
      }
      /*
@@ -206,14 +182,13 @@
       */
      domain.disable();
      Thread.sleep(2000);  // necesary because disable does not wait
      Thread.sleep(2000);  // necessary because disable does not wait
                           // for full termination of all threads. (issue 1571)
      DN deleteDN = addDN;
      while (sequence-->1)
      {
        DeleteMsg delMsg = new DeleteMsg(deleteDN, gen.newCSN(), stringUID(sequence + 1));
        broker.publish(delMsg);
        broker.publish(delMsg(deleteDN, sequence + 1, gen));
        deleteDN = deleteDN.parent();
      }
@@ -237,6 +212,27 @@
    }
  }
  private AddMsg addMsg(DN addDN, Entry entry, int uniqueId, int parentId, CSNGenerator gen)
  {
    return new AddMsg(gen.newCSN(), addDN, stringUID(uniqueId), stringUID(parentId),
        entry.getObjectClassAttribute(), entry.getAttributes(), null);
  }
  private ModifyMsg modifyMsg(DN dn, int entryUUID, List<Modification> mods, CSNGenerator gen)
  {
    return new ModifyMsg(gen.newCSN(), dn, mods, stringUID(entryUUID));
  }
  private DeleteMsg delMsg(DN delDN, int entryUUID, CSNGenerator gen)
  {
    return new DeleteMsg(delDN, gen.newCSN(), stringUID(entryUUID));
  }
  private ModifyDNMsg modDNMsg(DN dn, String newRDN, int entryUUID, int newSuperiorEntryUUID, CSNGenerator gen)
  {
    return new ModifyDNMsg(dn, gen.newCSN(), stringUID(entryUUID), stringUID(newSuperiorEntryUUID), true, null, newRDN);
  }
  /**
   * Check the dependency between moddn and delete operation
   * when an entry is renamed to a new dn and then deleted.
@@ -257,81 +253,48 @@
    try
    {
      // Create replication server, replication domain and broker.
      String entryldif = "dn:" + TEST_ROOT_DN_STRING + "\n"
      + "objectClass: top\n"
      + "objectClass: organization\n";
      Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
      AttributeType uidType =
        DirectoryServer.getSchema().getAttributeType("entryuuid");
      Entry entry = TestCaseUtils.entryFromLdifString(
          "dn:" + TEST_ROOT_DN_STRING + "\n"
          + "objectClass: top\n"
          + "objectClass: organization\n");
      CSNGenerator gen = new CSNGenerator(brokerId, 0L);
      int renamedEntryUuid = 100;
      int replServerPort = TestCaseUtils.findFreePort();
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(replServerPort, "dependencyTestModdnDelDependencyTestDb",
                                        replicationDbImplementation, 0, replServerId,
                                        0, 200, null);
      replServer = new ReplicationServer(conf);
      replServer = newReplicationServer(replServerId, 200, "dependencyTestModdnDelDependencyTestDb");
      // configure and start replication of TEST_ROOT_DN_STRING on the server
      SortedSet<String> replServers = new TreeSet<>();
      replServers.add("localhost:"+replServerPort);
      DomainFakeCfg domainConf = new DomainFakeCfg(baseDN, serverId, replServers);
      domainConf.setHeartbeatInterval(100000);
      Thread.sleep(2000);
      domain = MultimasterReplication.createNewDomain(domainConf);
      domain.start();
      domain = startNewLDAPReplicationDomain(replServer, baseDN, serverId, 100000);
      ReplicationBroker broker = openReplicationSession(
          baseDN, brokerId, 1000, replServerPort, 1000, CLEAN_DB_GENERATION_ID);
          baseDN, brokerId, 1000, replServer.getReplicationPort(), 1000, CLEAN_DB_GENERATION_ID);
      // add an entry to play with.
      entry.removeAttribute(uidType);
      entry.removeAttribute(getAttributeType("entryuuid"));
      entry.addAttribute(Attributes.create("entryuuid",
                         stringUID(renamedEntryUuid)),
                         new LinkedList<ByteString>());
      DN addDN = DN.valueOf("dc=moddndel" + "," + TEST_ROOT_DN_STRING);
      AddMsg addMsg =
          new AddMsg(gen.newCSN(), addDN, stringUID(renamedEntryUuid),
                   stringUID(1),
                   entry.getObjectClassAttribute(),
                   entry.getAttributes(), null );
      broker.publish(addMsg);
      broker.publish(addMsg(addDN, entry, renamedEntryUuid, 1, gen));
      // check that the entry was correctly added
      boolean found = checkEntryHasAttribute(addDN, "entryuuid", stringUID(renamedEntryUuid), 30000, true);
      assertTrue(found, "The initial entry add failed");
      checkEntryHasAttributeValue(addDN, "entryuuid", stringUID(renamedEntryUuid), 30, "The initial entry add failed");
      // disable the domain to make sure that the messages are
      // all sent in a row.
      // disable the domain to make sure that the messages are all sent in a row.
      domain.disable();
      // rename and delete the entry.
      ModifyDNMsg moddnMsg =
          new ModifyDNMsg(addDN, gen.newCSN(),
                        stringUID(renamedEntryUuid),
                        stringUID(1), true, null, "dc=new_name");
      broker.publish(moddnMsg);
      DeleteMsg delMsg =
        new DeleteMsg(DN.valueOf("dc=new_name" + "," + TEST_ROOT_DN_STRING),
                      gen.newCSN(), stringUID(renamedEntryUuid));
      broker.publish(delMsg);
      broker.publish(modDNMsg(addDN, "dc=new_name", renamedEntryUuid, 1, gen));
      DN delDN = DN.valueOf("dc=new_name" + "," + TEST_ROOT_DN_STRING);
      broker.publish(delMsg(delDN, renamedEntryUuid, gen));
      // enable back the domain to trigger message replay.
      domain.enable();
      // check that entry does not exist anymore.
      Thread.sleep(10000);
      found = checkEntryHasAttribute(DN.valueOf("dc=new_name" + "," + TEST_ROOT_DN_STRING),
                                     "entryuuid",
                                     stringUID(renamedEntryUuid),
                                     30000, false);
      assertFalse(found, "The delete dependencies was not correctly enforced");
      checkEntryHasNoSuchAttributeValue(DN.valueOf("dc=new_name" + "," + TEST_ROOT_DN_STRING), "entryuuid",
          stringUID(renamedEntryUuid), 30, "The delete dependencies was not correctly enforced");
    }
    finally
    {
@@ -356,18 +319,15 @@
    TestCaseUtils.initializeTestBackend(false);
    // Create top entry with uuid
    String baseentryldif =
      "dn:" + TEST_ROOT_DN_STRING + "\n"
       + "objectClass: top\n"
       + "objectClass: organization\n"
       + "o: test\n"
       + "entryuuid: " + stringUID(1) + "\n";
    Entry topEntry = TestCaseUtils.entryFromLdifString(baseentryldif);
    Entry topEntry = TestCaseUtils.entryFromLdifString(
        "dn:" + TEST_ROOT_DN_STRING + "\n"
         + "objectClass: top\n"
         + "objectClass: organization\n"
         + "o: test\n"
         + "entryuuid: " + stringUID(1) + "\n");
    MemoryBackend memoryBackend =
      (MemoryBackend) DirectoryServer.getBackend(TEST_BACKEND_ID);
    MemoryBackend memoryBackend = (MemoryBackend) DirectoryServer.getBackend(TEST_BACKEND_ID);
    memoryBackend.addEntry(topEntry, null);
  }
@@ -387,94 +347,58 @@
    int brokerId = 2;
    int serverId = 1;
    int replServerId = 83;
    int AddSequenceLength = 30;
    int addSequenceLength = 30;
    cleanDB();
    try
    {
      String entryldif = "dn:" + TEST_ROOT_DN_STRING + "\n"
      + "objectClass: top\n"
      + "objectClass: organization\n";
      Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
      AttributeType uidType =
        DirectoryServer.getSchema().getAttributeType("entryuuid");
      Entry entry = TestCaseUtils.entryFromLdifString(
          "dn:" + TEST_ROOT_DN_STRING + "\n"
          + "objectClass: top\n"
          + "objectClass: organization\n");
      int replServerPort = TestCaseUtils.findFreePort();
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(replServerPort, "dependencyTestAddDelAddDependencyTestDb", replicationDbImplementation,
                                        0, replServerId, 0, 5*AddSequenceLength+100, null);
      replServer = new ReplicationServer(conf);
      replServer = newReplicationServer(replServerId, 5 * addSequenceLength + 100, "dependencyTestAddDelAddDependencyTestDb");
      ReplicationBroker broker = openReplicationSession(
          baseDN, brokerId, 100, replServerPort, 1000, CLEAN_DB_GENERATION_ID);
          baseDN, brokerId, 100, replServer.getReplicationPort(), 1000, CLEAN_DB_GENERATION_ID);
      // send a sequence of add/del/add operations
      CSNGenerator gen = new CSNGenerator(brokerId, 0L);
      int sequence;
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      for (sequence = 1; sequence<=addSequenceLength; sequence ++)
      {
        // add the entry a first time
        entry.removeAttribute(uidType);
        entry.removeAttribute(getAttributeType("entryuuid"));
        entry.addAttribute(Attributes.create("entryuuid", stringUID(sequence+1)),
                           new LinkedList<ByteString>());
        DN addDN = DN.valueOf("dc=dependency" + sequence + "," + TEST_ROOT_DN_STRING);
        AddMsg addMsg =
          new AddMsg(gen.newCSN(), addDN, stringUID(sequence+1),
                     stringUID(1),
                     entry.getObjectClassAttribute(),
                     entry.getAttributes(), null );
        broker.publish(addMsg);
        // delete the entry
        DeleteMsg delMsg = new DeleteMsg(addDN, gen.newCSN(),
                                         stringUID(sequence+1));
        broker.publish(delMsg);
        broker.publish(addMsg(addDN, entry, sequence + 1, 1, gen));
        broker.publish(delMsg(addDN, sequence + 1, gen));
        // add again the entry with a new entryuuid.
        entry.removeAttribute(uidType);
        entry.removeAttribute(getAttributeType("entryuuid"));
        entry.addAttribute(Attributes.create("entryuuid", stringUID(sequence+1025)),
                           new LinkedList<ByteString>());
        addMsg =
          new AddMsg(gen.newCSN(), addDN, stringUID(sequence+1025),
                     stringUID(1),
                     entry.getObjectClassAttribute(),
                     entry.getAttributes(), null );
        broker.publish(addMsg);
        broker.publish(addMsg(addDN, entry, sequence + 1025, 1, gen));
      }
      // configure and start replication of TEST_ROOT_DN_STRING on the server
      SortedSet<String> replServers = new TreeSet<>();
      replServers.add("localhost:"+replServerPort);
      DomainFakeCfg domainConf = new DomainFakeCfg(baseDN, serverId, replServers);
      domain = MultimasterReplication.createNewDomain(domainConf);
      domain.start();
      domain = startNewLDAPReplicationDomain(replServer, baseDN, serverId, -1);
      // check that all entries have been deleted and added
      // again by checking that they do have the correct entryuuid
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      for (sequence = 1; sequence<=addSequenceLength; sequence ++)
      {
        String addDn = "dc=dependency" + sequence + "," + TEST_ROOT_DN_STRING;
        boolean found =
          checkEntryHasAttribute(DN.valueOf(addDn), "entryuuid",
                                 stringUID(sequence+1025),
                                 30000, true);
        if (!found)
        {
          fail("The second add was not replayed on entry " + addDn);
        }
        checkEntryHasAttributeValue(DN.valueOf(addDn), "entryuuid", stringUID(sequence + 1025), 30,
            "The second add was not replayed on entry " + addDn);
      }
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      for (sequence = 1; sequence<=addSequenceLength; sequence ++)
      {
        DN deleteDN = DN.valueOf("dc=dependency" + sequence + "," + TEST_ROOT_DN_STRING);
        DeleteMsg delMsg = new DeleteMsg(deleteDN,
                                         gen.newCSN(),
                                         stringUID(sequence + 1025));
        broker.publish(delMsg);
        broker.publish(delMsg(deleteDN, sequence + 1025, gen));
      }
      // check that the database was cleaned successfully
@@ -493,6 +417,28 @@
    }
  }
  private ReplicationServer newReplicationServer(int replServerId, int windowSize, String dirName) throws Exception
  {
    int replServerPort = TestCaseUtils.findFreePort();
    ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(
        replServerPort, dirName, replicationDbImplementation, 0, replServerId, 0, windowSize, null);
    return new ReplicationServer(conf);
  }
  private LDAPReplicationDomain startNewLDAPReplicationDomain(ReplicationServer replServer, DN baseDN, int serverId,
      int heartBeatInterval) throws ConfigException
  {
    SortedSet<String> replServers = newTreeSet("localhost:" + replServer.getReplicationPort());
    DomainFakeCfg domainConf = new DomainFakeCfg(baseDN, serverId, replServers);
    if (heartBeatInterval > 0)
    {
      domainConf.setHeartbeatInterval(heartBeatInterval);
    }
    LDAPReplicationDomain domain = MultimasterReplication.createNewDomain(domainConf);
    domain.start();
    return domain;
  }
  /**
   * Check that the dependency of moddn operation are working by
   * issuing a set of Add operation followed by a modrdn of the added entry.
@@ -506,29 +452,21 @@
    int brokerId = 2;
    int serverId = 1;
    int replServerId = 84;
    int AddSequenceLength = 30;
    int addSequenceLength = 30;
    cleanDB();
    try
    {
      String entryldif = "dn:" + TEST_ROOT_DN_STRING + "\n"
      + "objectClass: top\n"
      + "objectClass: organization\n";
      Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
      AttributeType uidType =
        DirectoryServer.getSchema().getAttributeType("entryuuid");
      Entry entry = TestCaseUtils.entryFromLdifString(
          "dn:" + TEST_ROOT_DN_STRING + "\n"
          + "objectClass: top\n"
          + "objectClass: organization\n");
      int replServerPort = TestCaseUtils.findFreePort();
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(replServerPort, "dependencyTestAddModdnDependencyTestDb", replicationDbImplementation,
                                        0, replServerId, 0, 5*AddSequenceLength+100, null);
      replServer = new ReplicationServer(conf);
      replServer = newReplicationServer(replServerId, 5 * addSequenceLength + 100, "dependencyTestAddModdnDependencyTestDb");
      ReplicationBroker broker = openReplicationSession(
          baseDN, brokerId, 100, replServerPort, 1000, CLEAN_DB_GENERATION_ID);
          baseDN, brokerId, 100, replServer.getReplicationPort(), 1000, CLEAN_DB_GENERATION_ID);
      DN addDN = TEST_ROOT_DN;
@@ -536,36 +474,24 @@
      // send a sequence of add/modrdn operations
      int sequence;
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      for (sequence = 1; sequence<=addSequenceLength; sequence ++)
      {
        // add the entry
        entry.removeAttribute(uidType);
        entry.removeAttribute(getAttributeType("entryuuid"));
        entry.addAttribute(Attributes.create("entryuuid", stringUID(sequence+1)),
                           new LinkedList<ByteString>());
        addDN = DN.valueOf("dc=dependency" + sequence + "," + TEST_ROOT_DN_STRING);
        AddMsg addMsg =
          new AddMsg(gen.newCSN(), addDN, stringUID(sequence+1),
                     stringUID(1),
                     entry.getObjectClassAttribute(),
                     entry.getAttributes(), null );
        broker.publish(addMsg);
        broker.publish(addMsg(addDN, entry, sequence + 1, 1, gen));
        // rename the entry
        ModifyDNMsg moddnMsg =
          new ModifyDNMsg(addDN, gen.newCSN(), stringUID(sequence+1),
                          stringUID(1), true, null, "dc=new_dep" + sequence);
        broker.publish(moddnMsg);
        broker.publish(modDNMsg(addDN, "dc=new_dep" + sequence, sequence + 1, 1, gen));
      }
      // configure and start replication of TEST_ROOT_DN_STRING on the server
      SortedSet<String> replServers = new TreeSet<>();
      replServers.add("localhost:"+replServerPort);
      DomainFakeCfg domainConf = new DomainFakeCfg(baseDN, serverId, replServers);
      domain = MultimasterReplication.createNewDomain(domainConf);
      domain.start();
      domain = startNewLDAPReplicationDomain(replServer, baseDN, serverId, -1);
      // check that all entries have been renamed
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      for (sequence = 1; sequence<=addSequenceLength; sequence ++)
      {
        addDN = DN.valueOf("dc=new_dep" + sequence + "," + TEST_ROOT_DN_STRING);
        Entry baseEntry = getEntry(addDN, 30000, true);
@@ -574,11 +500,10 @@
      }
      // delete the entries to clean the database.
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      for (sequence = 1; sequence<=addSequenceLength; sequence ++)
      {
        addDN = DN.valueOf("dc=new_dep" + sequence + "," + TEST_ROOT_DN_STRING);
        DeleteMsg delMsg = new DeleteMsg(addDN, gen.newCSN(), stringUID(sequence + 1));
        broker.publish(delMsg);
        broker.publish(delMsg(addDN, sequence + 1, gen));
      }
    }
    finally
@@ -601,5 +526,4 @@
  {
    return String.format("11111111-1111-1111-1111-%012x", i);
  }
}
opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java
@@ -32,13 +32,14 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.SoftAssertions;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ModificationType;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.SearchScope;
import org.opends.server.DirectoryServerTestCase;
@@ -71,12 +72,17 @@
import org.opends.server.types.Entry;
import org.opends.server.types.Modification;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.util.TestTimer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static java.util.concurrent.TimeUnit.*;
import static org.forgerock.opendj.ldap.ModificationType.*;
import static org.forgerock.opendj.ldap.ResultCode.*;
import static org.forgerock.opendj.ldap.SearchScope.*;
import static org.opends.server.backends.task.TaskState.*;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.protocols.internal.Requests.*;
import static org.opends.server.util.CollectionUtils.*;
@@ -248,33 +254,31 @@
  }
  /**
   * Check connection of the provided ds to the
   * replication server. Waits for connection to be ok up to secTimeout seconds
   * before failing.
   * Check connection of the provided ds to the replication server. Waits for connection to be ok up
   * to secTimeout seconds before failing.
   */
  protected void checkConnection(int secTimeout, ReplicationBroker rb, int rsPort) throws Exception
  protected void checkConnection(int secTimeout, final ReplicationBroker rb, int rsPort) throws Exception
  {
    int nSec = 0;
    // Go out of the loop only if connection is verified or if timeout occurs
    while (true)
    TestTimer timer = new TestTimer.Builder()
      .maxSleep(secTimeout, SECONDS)
      .sleepTimes(1, SECONDS)
      .toTimer();
    timer.repeatUntilSuccess(new Callable<Void>()
    {
      if (rb.isConnected())
      @Override
      public Void call() throws Exception
      {
        logger.trace("checkConnection: connection of broker "
          + rb.getServerId() + " to RS " + rb.getRsGroupId()
          + " obtained after " + nSec + " seconds.");
        return;
        if (rb.isConnected())
        {
          logger.trace("checkConnection: connection of broker " + rb.getServerId()
              + " to RS " + rb.getRsGroupId() + " obtained.");
          return null;
        }
        rb.start();
        return null;
      }
      Thread.sleep(1000);
      rb.start();
      nSec++;
      assertTrue(nSec <= secTimeout,
          "checkConnection: DS " + rb.getServerId() + " is not connected to "
              + "the RS port " + rsPort + " after " + secTimeout + " seconds.");
    }
    });
  }
  protected void deleteEntry(DN dn) throws Exception
@@ -372,6 +376,11 @@
  protected void clearChangelogDB(ReplicationServer rs) throws Exception
  {
    if (rs == null)
    {
      return;
    }
    if (replicationDbImplementation == ReplicationDBImplementation.JE)
    {
      ((JEChangelogDB) rs.getChangelogDB()).clearDB();
@@ -418,6 +427,10 @@
  protected void stop(ReplicationBroker... brokers)
  {
    if (brokers == null)
    {
      return;
    }
    for (ReplicationBroker broker : brokers)
    {
      if (broker != null)
@@ -484,83 +497,91 @@
   * @return The monitor value
   * @throws Exception If an error occurs.
   */
  protected long getMonitorAttrValue(DN baseDN, String attr) throws Exception
  protected long getMonitorAttrValue(final DN baseDN, final String attr) throws Exception
  {
    String monitorFilter = "(&(cn=Directory server*)(domain-name=" + baseDN + "))";
    InternalSearchOperation op;
    int count = 0;
    do
    TestTimer timer = new TestTimer.Builder()
      .maxSleep(10, SECONDS)
      .sleepTimes(100, MILLISECONDS)
      .toTimer();
    return timer.repeatUntilSuccess(new Callable<Long>()
    {
      if (count++>0)
      @Override
      public Long call() throws Exception
      {
        Thread.sleep(100);
      }
      op = connection.processSearch(newSearchRequest("cn=replication,cn=monitor", WHOLE_SUBTREE, monitorFilter));
    }
    while (op.getSearchEntries().isEmpty() && count<100);
    assertFalse(op.getSearchEntries().isEmpty(), "Could not read monitoring information");
        String monitorFilter = "(&(cn=Directory server*)(domain-name=" + baseDN + "))";
        InternalSearchOperation op =
            connection.processSearch(newSearchRequest("cn=replication,cn=monitor", WHOLE_SUBTREE, monitorFilter));
        Assertions.assertThat(op.getSearchEntries()).as("Could not read monitoring information").isNotEmpty();
    SearchResultEntry entry = op.getSearchEntries().getFirst();
    return entry.parseAttribute(attr).asLong();
        SearchResultEntry entry = op.getSearchEntries().getFirst();
        return entry.parseAttribute(attr).asLong();
      }
    });
  }
  /**
   * Check that the entry with the given dn has the given valueString value
   * for the given attrTypeStr attribute type.
   */
  protected boolean checkEntryHasAttribute(DN dn, String attrTypeStr,
      String valueString, int timeout, boolean hasAttribute) throws Exception
  protected void checkEntryHasAttributeValue(final DN dn, final String attrTypeStr, final String valueString,
      int timeoutInSecs, String notFoundErrorMsg) throws Exception
  {
    boolean found = false;
    int count = timeout/100;
    if (count<1)
    {
      count=1;
    }
    checkEntryHasAttribute(dn, attrTypeStr, valueString, timeoutInSecs, true, notFoundErrorMsg);
  }
    do
    {
      final Entry newEntry = DirectoryServer.getEntry(dn);
      if (newEntry != null)
      {
        List<Attribute> tmpAttrList = newEntry.getAttribute(attrTypeStr);
        if (tmpAttrList != null && !tmpAttrList.isEmpty())
        {
          Attribute tmpAttr = tmpAttrList.get(0);
          found = tmpAttr.contains(ByteString.valueOf(valueString));
        }
      }
  protected void checkEntryHasNoSuchAttributeValue(final DN dn, final String attrTypeStr, final String valueString,
      int timeoutInSecs, String foundErrorMsg) throws Exception
  {
    checkEntryHasAttribute(dn, attrTypeStr, valueString, timeoutInSecs, false, foundErrorMsg);
  }
      if (found != hasAttribute)
  protected boolean checkEntryHasAttribute(final DN dn, final String attrTypeStr, final String valueString,
      int timeout, final boolean expectedAttributeValueFound) throws Exception
  {
    checkEntryHasAttribute(dn, attrTypeStr, valueString, timeout / 1000, expectedAttributeValueFound, null);
    return expectedAttributeValueFound;
  }
  private void checkEntryHasAttribute(final DN dn, final String attrTypeStr, final String valueString,
      int timeoutInSecs, final boolean expectedAttributeValueFound, final String foundMsg) throws Exception
  {
    TestTimer timer = new TestTimer.Builder()
      .maxSleep(timeoutInSecs, SECONDS)
      .sleepTimes(100, MILLISECONDS)
      .toTimer();
    timer.repeatUntilSuccess(new Callable<Void>()
    {
      @Override
      public Void call() throws Exception
      {
        Thread.sleep(100);
        final Entry newEntry = DirectoryServer.getEntry(dn);
        assertNotNull(newEntry);
        List<Attribute> attrList = newEntry.getAttribute(attrTypeStr);
        Assertions.assertThat(attrList).isNotEmpty();
        Attribute attr = attrList.get(0);
        boolean foundAttributeValue = attr.contains(ByteString.valueOf(valueString));
        assertEquals(foundAttributeValue, expectedAttributeValueFound, foundMsg);
        return null;
      }
    } while (--count > 0 && found != hasAttribute);
    return found;
    });
  }
  /**
   * Retrieves an entry from the local Directory Server.
   * @throws Exception When the entry cannot be locked.
   */
  protected Entry getEntry(DN dn, int timeout, boolean exist) throws Exception
  protected Entry getEntry(final DN dn, int timeoutInMillis, final boolean exist) throws Exception
  {
    int count = timeout/200;
    if (count<1)
    TestTimer timer = new TestTimer.Builder()
      .maxSleep(timeoutInMillis, MILLISECONDS)
      .sleepTimes(200, MILLISECONDS)
      .toTimer();
    timer.repeatUntilSuccess(new Callable<Void>()
    {
      count=1;
    }
    Thread.sleep(50);
    boolean found = DirectoryServer.entryExists(dn);
    while (count> 0 && found != exist)
    {
      Thread.sleep(200);
      found = DirectoryServer.entryExists(dn);
      count--;
    }
      @Override
      public Void call() throws Exception
      {
        assertEquals(DirectoryServer.entryExists(dn), exist);
        return null;
      }
    });
    Entry entry = DirectoryServer.getEntry(dn);
    if (entry != null)
    {
@@ -600,43 +621,34 @@
  protected List<Modification> generatemods(String attrName, String attrValue)
  {
    Attribute attr = Attributes.create(attrName, attrValue);
    List<Modification> mods = new ArrayList<>();
    Modification mod = new Modification(ModificationType.REPLACE, attr);
    mods.add(mod);
    return mods;
    return newArrayList(new Modification(REPLACE, attr));
  }
  /** Utility method to create, run a task and check its result. */
  protected void task(String task) throws Exception
  {
    Entry taskEntry = TestCaseUtils.addEntry(task);
    final Entry taskEntry = TestCaseUtils.addEntry(task);
    // Wait until the task completes.
    Entry resultEntry = null;
    String completionTime = null;
    long startMillisecs = System.currentTimeMillis();
    do
    TestTimer timer = new TestTimer.Builder()
      .maxSleep(30, SECONDS)
      .sleepTimes(20, MILLISECONDS)
      .toTimer();
    Entry resultEntry = timer.repeatUntilSuccess(new Callable<Entry>()
    {
      final SearchRequest request = newSearchRequest(taskEntry.getName(), SearchScope.BASE_OBJECT);
      InternalSearchOperation searchOperation = connection.processSearch(request);
      if (searchOperation.getSearchEntries().isEmpty())
      @Override
      public Entry call() throws Exception
      {
        continue;
        final SearchRequest request = newSearchRequest(taskEntry.getName(), SearchScope.BASE_OBJECT);
        InternalSearchOperation searchOperation = connection.processSearch(request);
        Assertions.assertThat(searchOperation.getSearchEntries()).isNotEmpty();
        Entry resultEntry = searchOperation.getSearchEntries().get(0);
        String completionTime = resultEntry.parseAttribute(
            ATTR_TASK_COMPLETION_TIME.toLowerCase()).asString();
        assertNotNull(completionTime, "The task has not completed");
        return resultEntry;
      }
      resultEntry = searchOperation.getSearchEntries().get(0);
      completionTime = resultEntry.parseAttribute(
          ATTR_TASK_COMPLETION_TIME.toLowerCase()).asString();
      if (completionTime == null)
      {
        if (System.currentTimeMillis() - startMillisecs > 1000*30)
        {
          break;
        }
        Thread.sleep(10);
      }
    } while (completionTime == null);
    assertNotNull(completionTime, "The task has not completed after 30 seconds.");
    });
    // Check that the task state is as expected.
    String stateString = resultEntry.parseAttribute(
@@ -647,14 +659,12 @@
  }
  /**
   * Create a new replication session security object that can be used in
   * unit tests.
   * Create a new replication session security object that can be used in unit tests.
   *
   * @return A new replication session security object.
   * @throws ConfigException If an error occurs.
   */
  protected static ReplSessionSecurity getReplSessionSecurity()
       throws ConfigException
  protected static ReplSessionSecurity getReplSessionSecurity() throws ConfigException
  {
    return new ReplSessionSecurity(null, null, null, true);
  }
@@ -704,37 +714,34 @@
    logger.trace("AddedTask/" + taskEntry.getName());
  }
  protected void waitTaskState(Entry taskEntry, TaskState expectedTaskState,
  protected void waitTaskState(final Entry taskEntry, final TaskState expectedTaskState,
      long maxWaitTimeInMillis, LocalizableMessage expectedMessage) throws Exception
  {
    long startTime = System.currentTimeMillis();
    Entry resultEntry = null;
    TaskState taskState = null;
    do
    TestTimer timer = new TestTimer.Builder()
      .maxSleep(maxWaitTimeInMillis, MILLISECONDS)
      .sleepTimes(100, MILLISECONDS)
      .toTimer();
    Entry resultEntry = timer.repeatUntilSuccess(new Callable<Entry>()
    {
      final SearchRequest request = newSearchRequest(taskEntry.getName(), SearchScope.BASE_OBJECT);
      InternalSearchOperation searchOperation = connection.processSearch(request);
      resultEntry = searchOperation.getSearchEntries().getFirst();
      @Override
      public Entry call() throws Exception
      {
        final SearchRequest request = newSearchRequest(taskEntry.getName(), SearchScope.BASE_OBJECT);
        InternalSearchOperation searchOperation = connection.processSearch(request);
        Entry resultEntry = searchOperation.getSearchEntries().getFirst();
      // Check that the task state is as expected.
      String stateString = resultEntry.parseAttribute(
          ATTR_TASK_STATE.toLowerCase()).asString();
      taskState = TaskState.fromString(stateString);
      Thread.sleep(100);
    }
    while (taskState != expectedTaskState
        && taskState != TaskState.STOPPED_BY_ERROR
        && taskState != TaskState.COMPLETED_SUCCESSFULLY
        && System.currentTimeMillis() - startTime < maxWaitTimeInMillis);
        TaskState taskState = getTaskState(resultEntry);
        Assertions.assertThat(taskState).isIn(expectedTaskState, STOPPED_BY_ERROR, COMPLETED_SUCCESSFULLY);
        return resultEntry;
      }
    });
    // Check that the task contains some log messages.
    Set<String> logMessages = resultEntry.parseAttribute(
        ATTR_TASK_LOG_MESSAGES.toLowerCase()).asSetOfString();
    if (taskState != TaskState.COMPLETED_SUCCESSFULLY
        && taskState != TaskState.RUNNING)
    TaskState taskState = getTaskState(resultEntry);
    if (taskState != COMPLETED_SUCCESSFULLY && taskState != RUNNING)
    {
      assertFalse(logMessages.isEmpty(),
          "No log messages were written to the task entry on a failed task");
@@ -750,8 +757,7 @@
      }
    }
    if (expectedTaskState == TaskState.RUNNING
        && taskState == TaskState.COMPLETED_SUCCESSFULLY)
    if (expectedTaskState == RUNNING && taskState == COMPLETED_SUCCESSFULLY)
    {
      // We usually wait the running state after adding the task
      // and if the task is fast enough then it may be already done
@@ -764,6 +770,12 @@
    }
  }
  private TaskState getTaskState(Entry resultEntry)
  {
    String stateString = resultEntry.parseAttribute(ATTR_TASK_STATE.toLowerCase()).asString();
    return TaskState.fromString(stateString);
  }
  /** Add to the current DB the entries necessary to the test. */
  protected void addTestEntriesToDB(String... ldifEntries) throws Exception
  {
@@ -789,28 +801,25 @@
   * @throws Exception if the entry does not exist or does not have
   *                   an entryUUID.
   */
  protected String getEntryUUID(DN dn) throws Exception
  protected String getEntryUUID(final DN dn) throws Exception
  {
    int count = 10;
    String found = null;
    while (count > 0 && found == null)
    TestTimer timer = new TestTimer.Builder()
      .maxSleep(1, SECONDS)
      .sleepTimes(100, MILLISECONDS)
      .toTimer();
    return timer.repeatUntilSuccess(new Callable<String>()
    {
      Thread.sleep(100);
      Entry newEntry = DirectoryServer.getEntry(dn);
      if (newEntry != null)
      @Override
      public String call() throws Exception
      {
        Entry newEntry = DirectoryServer.getEntry(dn);
        assertNotNull(newEntry);
        Attribute attribute = newEntry.getAttribute("entryuuid").get(0);
        for (ByteString val : attribute)
        {
          found = val.toString();
          break;
        }
        String found = attribute.iterator().next().toString();
        assertNotNull(found, "Entry: " + dn + " Could not be found.");
        return found;
      }
      count --;
    }
    assertNotNull(found, "Entry: " + dn + " Could not be found.");
    return found;
    });
  }
  /** Utility method : removes a domain deleting the passed config entry */
@@ -903,25 +912,27 @@
   * Performs an internal search, waiting for at most 3 seconds for expected result code and expected
   * number of entries.
   */
  protected InternalSearchOperation waitForSearchResult(String dn, SearchScope scope, String filter,
      ResultCode expectedResultCode, int expectedNbEntries) throws Exception
  protected InternalSearchOperation waitForSearchResult(final String dn, final SearchScope scope, final String filter,
      final ResultCode expectedResultCode, final int expectedNbEntries) throws Exception
  {
    InternalSearchOperation searchOp = null;
    int count = 0;
    do
    TestTimer timer = new TestTimer.Builder()
      .maxSleep(3, SECONDS)
      .sleepTimes(10, MILLISECONDS)
      .toTimer();
    return timer.repeatUntilSuccess(new Callable<InternalSearchOperation>()
    {
      Thread.sleep(10);
      final SearchRequest request = newSearchRequest(dn, scope, filter).addAttribute("*", "+");
      searchOp = connection.processSearch(request);
      count++;
    }
    while (count < 300
        && searchOp.getResultCode() != expectedResultCode
        && searchOp.getSearchEntries().size() != expectedNbEntries);
    final List<SearchResultEntry> entries = searchOp.getSearchEntries();
    Assertions.assertThat(entries).hasSize(expectedNbEntries);
    return searchOp;
      @Override
      public InternalSearchOperation call() throws Exception
      {
        final SearchRequest request = newSearchRequest(dn, scope, filter).addAttribute("*", "+");
        InternalSearchOperation searchOp = connection.processSearch(request);
        SoftAssertions softly = new SoftAssertions();
        softly.assertThat(searchOp.getResultCode()).isEqualTo(expectedResultCode);
        softly.assertThat(searchOp.getSearchEntries()).hasSize(expectedNbEntries);
        softly.assertAll();
        return searchOp;
      }
    });
  }
  protected static void setReplicationDBImplementation(ReplicationDBImplementation impl)
opendj-server-legacy/src/test/java/org/opends/server/replication/SchemaReplicationTest.java
@@ -28,15 +28,18 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.assertj.core.api.Assertions;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.ldap.ModificationType;
import org.forgerock.opendj.ldap.ResultCode;
import org.opends.server.TestCaseUtils;
import org.opends.server.admin.std.server.SynchronizationProviderCfg;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperation;
@@ -46,30 +49,34 @@
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.types.*;
import org.forgerock.opendj.ldap.ResultCode;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
import org.opends.server.types.Modification;
import org.opends.server.types.Operation;
import org.opends.server.types.RawModification;
import org.opends.server.util.TestTimer;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static java.util.concurrent.TimeUnit.*;
import static org.forgerock.opendj.ldap.ModificationType.*;
import static org.opends.server.core.DirectoryServer.*;
import static org.opends.server.util.CollectionUtils.*;
import static org.testng.Assert.*;
/**
 * Test for the schema replication.
 */
/** Test for the schema replication. */
@SuppressWarnings("javadoc")
public class SchemaReplicationTest extends ReplicationTestCase
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private List<Modification> rcvdMods;
  private int replServerPort;
  /**
   * Set up the environment for performing the tests in this Class.
   */
  /** Set up the environment for performing the tests in this Class. */
  @Override
  @BeforeClass
  public void setUp() throws Exception
@@ -107,10 +114,7 @@
    configureReplication(replServerLdif, domainLdif);
  }
  /**
   * Checks that changes done to the schema are pushed to the replicationServer
   * clients.
   */
  /** Checks that changes done to the schema are pushed to the replicationServer clients. */
  @Test
  public void pushSchemaChange() throws Exception
  {
@@ -128,45 +132,18 @@
      // Modify the schema
      Attribute attr = Attributes.create("attributetypes",
          "( 2.5.44.77.33 NAME 'dummy' )");
      List<Modification> mods = new ArrayList<>();
      Modification mod = new Modification(ModificationType.ADD, attr);
      mods.add(mod);
      ModifyOperation modOp = connection.processModify(baseDN, mods);
      assertEquals(modOp.getResultCode(), ResultCode.SUCCESS,
          "The original operation failed");
      Modification mod = new Modification(ADD, attr);
      processModify(baseDN, mod);
      // See if the client has received the msg
      ReplicationMsg msg = broker.receive();
      Assertions.assertThat(msg).isInstanceOf(ModifyMsg.class);
      ModifyMsg modMsg = (ModifyMsg) msg;
      ModifyMsg modMsg = receiveModifyMsg(broker);
      assertModReceived(mod, baseDN, modMsg);
      Operation receivedOp = modMsg.createOperation(connection);
      assertEquals(modMsg.getDN(), baseDN, "The received message is not for cn=schema");
      Assertions.assertThat(receivedOp).isInstanceOf(ModifyOperation.class);
      ModifyOperation receivedModifyOperation = (ModifyOperation) receivedOp;
      this.rcvdMods = new ArrayList<>();
      for (RawModification m : receivedModifyOperation.getRawModifications())
      {
        this.rcvdMods.add(m.toModification());
      }
      assertTrue(this.rcvdMods.contains(mod),
                 "The received mod does not contain the original change");
      /*
       * Now cleanup the schema for the next test
       */
      mod = new Modification(ModificationType.DELETE, attr);
      mods.clear();
      mods.add(mod);
      modOp = connection.processModify(baseDN, mods);
      assertEquals(modOp.getResultCode(), ResultCode.SUCCESS,
          "The original operation failed");
      /* Now cleanup the schema for the next test */
      processModify(baseDN, new Modification(DELETE, attr));
      // See if the client has received the msg
      msg = broker.receive();
      Assertions.assertThat(msg).isInstanceOf(ModifyMsg.class);
      receiveModifyMsg(broker);
    }
    finally
    {
@@ -174,6 +151,12 @@
    }
  }
  private void processModify(final DN baseDN, Modification mod)
  {
    ModifyOperation modOp = connection.processModify(baseDN, newArrayList(mod));
    assertEquals(modOp.getResultCode(), ResultCode.SUCCESS);
  }
  /**
   * Checks that changes to the schema pushed to the replicationServer
   * are received and correctly replayed by replication plugin.
@@ -198,10 +181,8 @@
          EntryHistorical.getEntryUUID(DirectoryServer.getEntry(baseDN)));
      broker.publish(modMsg);
      boolean found = checkEntryHasAttribute(baseDN, "attributetypes",
        "( 2.5.44.77.33 NAME 'dummy' )",
        10000, true);
      assertTrue(found, "The modification has not been correctly replayed.");
      checkEntryHasAttributeValue(baseDN, "attributetypes", "( 2.5.44.77.33 NAME 'dummy' )", 10,
          "The modification has not been correctly replayed.");
    }
    finally
    {
@@ -231,73 +212,91 @@
      // create a schema change Notification
      Attribute attr = Attributes.create("attributetypes",
        "( 2.5.44.76.35 NAME 'push' )");
      List<Modification> mods = new ArrayList<>();
      Modification mod = new Modification(ModificationType.ADD, attr);
      mods.add(mod);
      Modification mod = new Modification(ADD, attr);
      List<Modification> mods = newArrayList(mod);
      for (SynchronizationProvider<SynchronizationProviderCfg> provider : DirectoryServer.
        getSynchronizationProviders())
      for (SynchronizationProvider<?> provider : getSynchronizationProviders())
      {
        provider.processSchemaChange(mods);
      }
      // receive the message on the broker side.
      ReplicationMsg msg = broker.receive();
      Assertions.assertThat(msg).isInstanceOf(ModifyMsg.class);
      ModifyMsg modMsg = (ModifyMsg) msg;
      Operation receivedOp = modMsg.createOperation(connection);
      assertEquals(modMsg.getDN(), baseDN, "The received message is not for cn=schema");
      Assertions.assertThat(receivedOp).isInstanceOf(ModifyOperation.class);
      ModifyOperation receivedModifyOperation = (ModifyOperation) receivedOp;
      this.rcvdMods = new ArrayList<>();
      for (RawModification m : receivedModifyOperation.getRawModifications())
      {
        this.rcvdMods.add(m.toModification());
      }
      assertTrue(this.rcvdMods.contains(mod),
        "The received mod does not contain the original change");
      ModifyMsg modMsg = receiveModifyMsg(broker);
      assertModReceived(mod, baseDN, modMsg);
      // check that the schema files were updated with the new ServerState.
      // by checking that the CSN of msg we just received has been
      // added to the user schema file.
      // build the string to find in the schema file
      String stateStr = modMsg.getCSN().toString();
      final String stateStr = modMsg.getCSN().toString();
      // open the schema file
      String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
      String buildDir = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR,
              buildRoot + File.separator + "build");
      String path = buildDir + File.separator +
        "unit-tests" + File.separator + "package-instance" + File.separator +
        "config" + File.separator + "schema" + File.separator +
        "99-user.ldif";
      final File schemaFile = getSchemaFile();
      // it is necessary to loop on this check because the state is not
      // written immediately but only every so often.
      int count = 0;
      while (true)
      TestTimer timer = new TestTimer.Builder()
        .maxSleep(5, SECONDS)
        .sleepTimes(100, MILLISECONDS)
        .toTimer();
      timer.repeatUntilSuccess(new Callable<Void>()
      {
        File file = new File(path);
        FileInputStream input = new FileInputStream(file);
        byte[] bytes = new byte[input.available()];
        input.read(bytes);
        String fileStr = new String(bytes);
        if (fileStr.contains(stateStr))
        @Override
        public Void call() throws Exception
        {
          break;
          String fileStr = readAsString(schemaFile);
          assertTrue(fileStr.contains(stateStr), "The Schema persistentState (CSN:" + stateStr
              + ") has not been saved to " + schemaFile + " : " + fileStr);
          return null;
        }
        assertTrue(count++ <= 50, "The Schema persistentState (CSN:" + stateStr
            + ") has not been saved to " + path + " : " + fileStr);
        TestCaseUtils.sleep(100);
      }
      });
    } finally
    {
      broker.stop();
    }
    logger.error(LocalizableMessage.raw("Ending replication test : pushSchemaFilesChange "));
  }
  private File getSchemaFile()
  {
    String sep = File.separator;
    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
    String buildDir = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot + sep + "target");
    final String path = buildDir + sep
        + "unit-tests" + sep + "package-instance" + sep + "config" + sep + "schema" + sep + "99-user.ldif";
    return new File(path);
  }
  private ModifyMsg receiveModifyMsg(ReplicationBroker broker) throws SocketTimeoutException
  {
    ReplicationMsg msg = broker.receive();
    Assertions.assertThat(msg).isInstanceOf(ModifyMsg.class);
    return (ModifyMsg) msg;
  }
  private void assertModReceived(Modification mod, final DN baseDN, ModifyMsg modMsg) throws Exception
  {
    Operation receivedOp = modMsg.createOperation(connection);
    assertEquals(modMsg.getDN(), baseDN, "The received message is not for cn=schema");
    Assertions.assertThat(receivedOp).isInstanceOf(ModifyOperation.class);
    ModifyOperation receivedModifyOperation = (ModifyOperation) receivedOp;
    this.rcvdMods = new ArrayList<>();
    for (RawModification m : receivedModifyOperation.getRawModifications())
    {
      this.rcvdMods.add(m.toModification());
    }
    Assertions.assertThat(this.rcvdMods)
      .as("The received mod does not contain the original change")
      .contains(mod);
  }
  private String readAsString(File file) throws FileNotFoundException, IOException
  {
    FileInputStream input = new FileInputStream(file);
    byte[] bytes = new byte[input.available()];
    input.read(bytes);
    return new String(bytes);
  }
}
opendj-server-legacy/src/test/java/org/opends/server/replication/UpdateOperationTest.java
@@ -30,6 +30,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import org.assertj.core.api.Assertions;
import org.forgerock.i18n.LocalizableMessage;
@@ -45,17 +46,20 @@
import org.opends.server.plugins.ShortCircuitPlugin;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.types.*;
import org.opends.server.util.TestTimer;
import org.opends.server.util.TimeThread;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static java.util.concurrent.TimeUnit.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.protocols.internal.InternalClientConnection.*;
import static org.opends.server.replication.plugin.LDAPReplicationDomain.*;
import static org.opends.server.util.CollectionUtils.*;
import static org.testng.Assert.*;
@@ -66,12 +70,9 @@
@SuppressWarnings("javadoc")
public class UpdateOperationTest extends ReplicationTestCase
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  /**
   * An entry with a entryUUID.
   */
  /** An entry with a entryUUID. */
  private Entry personWithUUIDEntry;
  private Entry personWithSecondUniqueID;
@@ -85,9 +86,7 @@
  private String user1entrysecondUUID;
  private String user1entryUUID;
  /**
   * A "person" entry.
   */
  /** A "person" entry. */
  private Entry personEntry;
  private int replServerPort;
  private String domain1uid;
@@ -103,9 +102,7 @@
  private int domainSid = 55;
  private DN baseDN;
  /**
   * Set up the environment for performing the tests in this Class.
   */
  /** Set up the environment for performing the tests in this Class. */
  @BeforeClass
  @Override
  public void setUp() throws Exception
@@ -272,9 +269,7 @@
        "dc:domain3");
  }
  /**
   * Add an entry in the database.
   */
  /** Add an entry in the database. */
  private CSN addEntry(Entry entry) throws Exception
  {
    AddOperation addOp = connection.processAdd(entry);
@@ -283,13 +278,11 @@
    return OperationContext.getCSN(addOp);
  }
  /**
   * Delete an entry in the database.
   */
  /** Delete an entry in the database. */
  private void delEntry(DN dn) throws Exception
  {
    connection.processDelete(dn);
    assertNull(getEntry(dn, 1000, true));
    assertNull(getEntry(dn, 1000, false));
  }
  /**
@@ -389,9 +382,8 @@
      broker.publish(modMsg);
      // Check that the modify has been replayed.
      boolean found = checkEntryHasAttribute(personWithUUIDEntry.getName(),
          "telephonenumber", "01 02 45", 10000, true);
      assertTrue(found, "The first modification was not replayed.");
      checkEntryHasAttributeValue(personWithUUIDEntry.getName(), "telephonenumber", "01 02 45", 10,
          "The first modification was not replayed.");
      // Simulate loss of heartbeats.
      HeartbeatThread.setHeartbeatsDisabled(true);
@@ -405,9 +397,8 @@
      broker.publish(modMsg);
      // Check that the modify has been replayed.
      found = checkEntryHasAttribute(personWithUUIDEntry.getName(),
          "description", "Description was changed", 10000, true);
      assertTrue(found, "The second modification was not replayed.");
      checkEntryHasAttributeValue(personWithUUIDEntry.getName(), "description", "Description was changed", 10,
          "The second modification was not replayed.");
      // Delete the entries to clean the database.
      broker.publish(
@@ -546,7 +537,6 @@
    }
  }
  /**
   * Tests the naming conflict resolution code.
   * In this test, the local server act both as an LDAP server and
@@ -600,9 +590,8 @@
    broker.publish(modMsg);
    // check that the modify has been applied as if the entry had been renamed.
    boolean found = checkEntryHasAttribute(personWithUUIDEntry.getName(),
                           "telephonenumber", "01 02 45", 10000, true);
      assertTrue(found, "The modification has not been correctly replayed.");
      checkEntryHasAttributeValue(personWithUUIDEntry.getName(), "telephonenumber", "01 02 45", 10,
          "The modification has not been correctly replayed.");
    assertEquals(getMonitorDelta(), 1);
      assertConflictAutomaticallyResolved(alertCount);
@@ -623,9 +612,8 @@
    broker.publish(modMsg);
    // check that the modify has been applied.
    found = checkEntryHasAttribute(personWithUUIDEntry.getName(),
                           "uid", "AnotherUid", 10000, true);
      assertTrue(found, "The modification has not been correctly replayed.");
      checkEntryHasAttributeValue(personWithUUIDEntry.getName(), "uid", "AnotherUid", 10,
          "The modification has not been correctly replayed.");
    assertEquals(getMonitorDelta(), 1);
    /*
@@ -655,9 +643,7 @@
    // check that the modify has not been applied
    Thread.sleep(2000);
    found = checkEntryHasAttribute(personWithUUIDEntry.getName(),
                           "telephonenumber", "02 01 03 05", 10000, false);
      assertFalse(found,
      checkEntryHasNoSuchAttributeValue(personWithUUIDEntry.getName(), "telephonenumber", "02 01 03 05", 10,
          "The modification has been replayed while it should not.");
    assertEquals(getMonitorDelta(), 1);
      assertConflictAutomaticallyResolved(alertCount);
@@ -787,10 +773,7 @@
      assertEquals(getMonitorDelta(), 1);
      assertConflictAutomaticallyResolved(alertCount);
    /*
     * same test but by giving a bad entry DN
     */
      /* same test but by giving a bad entry DN */
      DN modDN = DN.valueOf("uid=wrong," + baseDN);
    modDnMsg = new ModifyDNMsg(modDN, gen.newCSN(),
        user1entryUUID, null, false, null, "uid=reallynewrdn");
@@ -948,10 +931,8 @@
        "The conflicting entries were not created");
    // check that the 2 conflicting entries have been correctly marked
    assertTrue(checkEntryHasAttribute(conflictDomain2dn,
        LDAPReplicationDomain.DS_SYNC_CONFLICT, domain2dn.toString(), 1000, true));
    assertTrue(checkEntryHasAttribute(conflictDomain3dn,
        LDAPReplicationDomain.DS_SYNC_CONFLICT, domain3dn.toString(), 1000, true));
      checkEntryHasAttributeValue(conflictDomain2dn, DS_SYNC_CONFLICT, domain2dn.toString(), 1, null);
      checkEntryHasAttributeValue(conflictDomain3dn, DS_SYNC_CONFLICT, domain3dn.toString(), 1, null);
    // check that unresolved conflict count has been incremented
    assertEquals(getMonitorDelta(), 1);
@@ -1005,8 +986,7 @@
      "The conflicting entries were not created");
    // check that the entry have been correctly marked as conflicting.
    assertTrue(checkEntryHasAttribute(conflictDomain2dn,
        LDAPReplicationDomain.DS_SYNC_CONFLICT, domain2dn.toString(), 1000, true));
      checkEntryHasAttributeValue(conflictDomain2dn, DS_SYNC_CONFLICT, domain2dn.toString(), 1, null);
    // check that unresolved conflict count has been incremented
    assertEquals(getMonitorDelta(), 1);
@@ -1031,8 +1011,7 @@
      assertConflictAutomaticallyResolved(alertCount);
    /*
     * Check that a conflict is detected when an entry is
     * moved below an entry that does not exist.
     * Check that a conflict is detected when an entry is moved below an entry that does not exist.
     */
    updateMonitorCount(baseDN, unresolvedMonitorAttr);
      alertCount = DummyAlertHandler.getAlertCount();
@@ -1047,10 +1026,8 @@
      waitForNonZeroMonitorDelta();
      // check that the entry have been correctly marked as conflicting.
      assertTrue(checkEntryHasAttribute(
          DN.valueOf("uid=new person," + baseDN2),
          LDAPReplicationDomain.DS_SYNC_CONFLICT,
          "uid=newrdn," + baseDN2, 1000, true));
      checkEntryHasAttributeValue(
          DN.valueOf("uid=new person," + baseDN2), DS_SYNC_CONFLICT, "uid=newrdn," + baseDN2, 1, null);
    }
    finally
    {
@@ -1060,17 +1037,21 @@
  private void waitForNonZeroMonitorDelta() throws Exception, InterruptedException
  {
    int count = 0;
    while (count < 2000 && getMonitorDelta() == 0)
    {
      // it is possible that the update has not yet been applied
      // wait a short time and try again.
      Thread.sleep(100);
      count++;
    }
    // if the monitor counter did not get incremented after 200sec
    // then something got wrong.
    Assertions.assertThat(count).isLessThan(200);
    TestTimer timer = new TestTimer.Builder()
      .maxSleep(200, SECONDS)
      .sleepTimes(100, MILLISECONDS)
      .toTimer();
    timer.repeatUntilSuccess(new Callable<Void>()
    {
      @Override
      public Void call() throws Exception
      {
        assertNotEquals(getMonitorDelta() , 0);
        return null;
      }
    });
  }
  /**
@@ -1117,9 +1098,7 @@
    setUp();
  }
  /**
   * Tests done using directly the ReplicationBroker interface.
   */
  /** Tests done using directly the ReplicationBroker interface. */
  @Test(enabled=true, dataProvider="assured")
  public void updateOperations(boolean assured) throws Exception
  {
@@ -1194,9 +1173,8 @@
      modMsg.setAssured(assured);
      broker.publish(modMsg);
      boolean found = checkEntryHasAttribute(personWithUUIDEntry.getName(),
          "telephonenumber", "01 02 45", 10000, true);
      assertTrue(found, "The modification has not been correctly replayed.");
      checkEntryHasAttributeValue(personWithUUIDEntry.getName(), "telephonenumber", "01 02 45", 10,
          "The modification has not been correctly replayed.");
      // Test that replication is able to add attribute that do
      // not exist in the schema.
@@ -1206,9 +1184,8 @@
      modMsg.setAssured(assured);
      broker.publish(modMsg);
      found = checkEntryHasAttribute(
          personWithUUIDEntry.getName(), "badattribute", "value", 10000, true);
      assertTrue(found, "The modification has not been correctly replayed.");
      checkEntryHasAttributeValue(personWithUUIDEntry.getName(), "badattribute", "value", 10,
          "The modification has not been correctly replayed.");
      /*
       * Test the Reception of Modify Dn Msg
@@ -1274,10 +1251,7 @@
    throw new RuntimeException("Unhandled type: " + msg.getClass());
  }
  /**
   * Test case for
   * [Issue 635] NullPointerException when trying to access non existing entry.
   */
  /** Test case for [Issue 635] NullPointerException when trying to access non existing entry. */
  @Test(enabled=true)
  public void deleteNoSuchObject() throws Exception
  {
@@ -1288,11 +1262,7 @@
    assertEquals(op.getResultCode(), ResultCode.NO_SUCH_OBJECT);
  }
  /**
   * Test case for
   * [Issue 798]  break infinite loop when problems with naming resolution
   *              conflict.
   */
  /** Test case for [Issue 798] break infinite loop when problems with naming resolution conflict. */
  @Test(enabled=true)
  public void infiniteReplayLoop() throws Exception
  {
@@ -1332,7 +1302,7 @@
          "userPassword: password",
          "initials: AA");
      long initialCount = getMonitorAttrValue(baseDN, "replayed-updates");
      final long initialCount = getMonitorAttrValue(baseDN, "replayed-updates");
      // Get the UUID of the test entry.
      Entry resultEntry = getEntry(tmp.getName(), 1, true);
@@ -1344,28 +1314,27 @@
      try
      {
        // Publish a delete message for this test entry.
        DeleteMsg delMsg = new DeleteMsg(tmp.getName(), gen.newCSN(), uuid);
        broker.publish(delMsg);
        broker.publish(new DeleteMsg(tmp.getName(), gen.newCSN(), uuid));
        // Wait for the operation to be replayed.
        long endTime = System.currentTimeMillis() + 5000;
        while (getMonitorAttrValue(baseDN, "replayed-updates") == initialCount &&
             System.currentTimeMillis() < endTime)
        TestTimer timer = new TestTimer.Builder()
          .maxSleep(5, SECONDS)
          .sleepTimes(100, MILLISECONDS)
          .toTimer();
        timer.repeatUntilSuccess(new Callable<Void>()
        {
          Thread.sleep(100);
        }
          @Override
          public Void call() throws Exception
          {
            assertNotEquals(getMonitorAttrValue(baseDN, "replayed-updates"), initialCount);
            return null;
          }
        });
      }
      finally
      {
        ShortCircuitPlugin.deregisterShortCircuit(OperationType.DELETE, "PreParse");
      }
      // If the replication replay loop was detected and broken then the
      // counter will still be updated even though the replay was unsuccessful.
      if (getMonitorAttrValue(baseDN, "replayed-updates") == initialCount)
      {
        fail("Operation was not replayed");
      }
    }
    finally
    {