mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

jcduff
23.04.2008 f73b655466092169abac34833fb628fce1fcdebe
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -62,13 +62,13 @@
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.ReplicationDomain;
import org.opends.server.replication.protocol.DoneMessage;
import org.opends.server.replication.protocol.EntryMessage;
import org.opends.server.replication.protocol.ErrorMessage;
import org.opends.server.replication.protocol.InitializeRequestMessage;
import org.opends.server.replication.protocol.InitializeTargetMessage;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.DoneMsg;
import org.opends.server.replication.protocol.EntryMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.SocketSession;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
@@ -80,6 +80,7 @@
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchScope;
import org.opends.server.util.Base64;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -132,8 +133,10 @@
  private static final short changelog2ID =  9;
  private static final short changelog3ID = 10;
  private static final String EXAMPLE_DN = "dc=example,dc=com";
  private static int[] replServerPort = new int[20];
  private DN baseDn;
  ReplicationBroker server2 = null;
  ReplicationBroker server3 = null;
@@ -145,7 +148,7 @@
  private void log(String s)
  {
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        "InitOnLineTests/" + s));
    if (debugEnabled())
    {
@@ -166,11 +169,19 @@
  @BeforeClass
  public void setUp() throws Exception
  {
    super.setUp();
    log("Setup: debugEnabled:" + debugEnabled());
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    baseDn = DN.decode("dc=example,dc=com");
    baseDn = DN.decode(EXAMPLE_DN);
    // This test uses import tasks which do not work with memory backend
    // (like the test backend we use in every tests): backend is disabled then
    // re-enabled and this clears the backend reference and thus the underlying
    // data. So for this particular test, we use a classical backend. Let's
    // clear it.
    ReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
    updatedEntries = newLDIFEntries();
@@ -178,21 +189,8 @@
    // to DS to populate the db -
    connection = InternalClientConnection.getRootConnection();
    // Synchro provider
    String synchroStringDN = "cn=Synchronization Providers,cn=config";
    // Synchro multi-master
    synchroPluginStringDN = "cn=Multimaster Synchronization, "
      + synchroStringDN;
    // Synchro suffix
    synchroServerEntry = null;
    // Add config entries to the current DS server based on :
    // Add the replication plugin: synchroPluginEntry & synchroPluginStringDN
    // Add synchroServerEntry
    // Add replServerEntry
    configureReplication();
    replServerEntry = null;
    taskInitFromS2 = TestCaseUtils.makeEntry(
        "dn: ds-task-id=" + UUID.randomUUID() +
@@ -201,7 +199,7 @@
        "objectclass: ds-task",
        "objectclass: ds-task-initialize-from-remote-replica",
        "ds-task-class-name: org.opends.server.tasks.InitializeTask",
        "ds-task-initialize-domain-dn: dc=example,dc=com",
        "ds-task-initialize-domain-dn: " + EXAMPLE_DN,
        "ds-task-initialize-replica-server-id: " + server2ID);
    taskInitTargetS2 = TestCaseUtils.makeEntry(
@@ -211,7 +209,7 @@
        "objectclass: ds-task",
        "objectclass: ds-task-initialize-remote-replica",
        "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask",
        "ds-task-initialize-domain-dn: dc=example,dc=com",
        "ds-task-initialize-domain-dn: " + EXAMPLE_DN,
        "ds-task-initialize-replica-server-id: " + server2ID);
    taskInitTargetAll = TestCaseUtils.makeEntry(
@@ -221,11 +219,8 @@
        "objectclass: ds-task",
        "objectclass: ds-task-initialize-remote-replica",
        "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask",
        "ds-task-initialize-domain-dn: dc=example,dc=com",
        "ds-task-initialize-domain-dn: " + EXAMPLE_DN,
        "ds-task-initialize-replica-server-id: all");
    replServerEntry = null;
  }
  // Tests that entries have been written in the db
@@ -238,8 +233,8 @@
    {
      int dns = entry.indexOf("dn: ");
      int dne = entry.indexOf("dc=com");
      String dn = entry.substring(dns+4,dne+6);
      int dne = entry.indexOf(EXAMPLE_DN);
      String dn = entry.substring(dns+4,dne+EXAMPLE_DN.length());
      log("Search Entry: " + dn);
@@ -281,9 +276,9 @@
   * Wait a task to be completed and check the expected state and expected
   * stats.
   * @param taskEntry The task to process.
   * @param expectedState The expected state fot this task.
   * @param expectedState The expected state for this task.
   * @param expectedLeft The expected number of entries still to be processed.
   * @param expectedDone The expected numner of entries to be processed.
   * @param expectedDone The expected number of entries to be processed.
   */
  private void waitTaskCompleted(Entry taskEntry, TaskState expectedState,
      long expectedLeft, long expectedDone)
@@ -394,7 +389,7 @@
  }
  /**
   * Add to the current DB the entries necessary to the test
   * Add to the current DB the entries necessary to the test.
   */
  private void addTestEntriesToDB()
  {
@@ -437,19 +432,19 @@
    String[] entries =
    {
        "dn: dc=example,dc=com\n"
        "dn: " + EXAMPLE_DN + "\n"
        + "objectClass: top\n"
        + "objectClass: domain\n"
        + "dc: example\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111111\n"
        + "\n",
          "dn: ou=People,dc=example,dc=com\n"
          "dn: ou=People," + EXAMPLE_DN + "\n"
        + "objectClass: top\n"
        + "objectClass: organizationalUnit\n"
        + "ou: People\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111112\n"
        + "\n",
          "dn: cn=Fiona Jensen,ou=people,dc=example,dc=com\n"
          "dn: cn=Fiona Jensen,ou=people," + EXAMPLE_DN + "\n"
        + "objectclass: top\n"
        + "objectclass: person\n"
        + "objectclass: organizationalPerson\n"
@@ -461,7 +456,7 @@
            new String(bigAttributeValue).getBytes())+"\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111113\n"
        + "\n",
          "dn: cn=Robert Langman,ou=people,dc=example,dc=com\n"
          "dn: cn=Robert Langman,ou=people," + EXAMPLE_DN + "\n"
        + "objectclass: top\n"
        + "objectclass: person\n"
        + "objectclass: organizationalPerson\n"
@@ -490,7 +485,7 @@
    // Send entries
    try
    {
      RoutableMessage initTargetMessage = new InitializeTargetMessage(
      RoutableMsg initTargetMessage = new InitializeTargetMsg(
          baseDn, server2ID, destinationServerID, requestorID, updatedEntries.length);
      broker.publish(initTargetMessage);
@@ -498,12 +493,12 @@
      {
        log("Broker will pusblish 1 entry: bytes:"+ entry.length());
        EntryMessage entryMsg = new EntryMessage(senderID, destinationServerID,
        EntryMsg entryMsg = new EntryMsg(senderID, destinationServerID,
            entry.getBytes());
        broker.publish(entryMsg);
      }
      DoneMessage doneMsg = new DoneMessage(senderID, destinationServerID);
      DoneMsg doneMsg = new DoneMsg(senderID, destinationServerID);
      broker.publish(doneMsg);
      log("Broker " + senderID + " published entries");
@@ -520,7 +515,7 @@
      String[] updatedEntries)
  {
    // Expect the broker to receive the entries
    ReplicationMessage msg;
    ReplicationMsg msg;
    short entriesReceived = 0;
    while (true)
    {
@@ -532,25 +527,25 @@
        if (msg == null)
          break;
        if (msg instanceof InitializeTargetMessage)
        if (msg instanceof InitializeTargetMsg)
        {
          log("Broker " + serverID + " receives InitializeTargetMessage ");
          entriesReceived = 0;
        }
        else if (msg instanceof EntryMessage)
        else if (msg instanceof EntryMsg)
        {
          EntryMessage em = (EntryMessage)msg;
          EntryMsg em = (EntryMsg)msg;
          log("Broker " + serverID + " receives entry " + new String(em.getEntryBytes()));
          entriesReceived++;
        }
        else if (msg instanceof DoneMessage)
        else if (msg instanceof DoneMsg)
        {
          log("Broker " + serverID + "  receives done ");
          break;
        }
        else if (msg instanceof ErrorMessage)
        else if (msg instanceof ErrorMsg)
        {
          ErrorMessage em = (ErrorMessage)msg;
          ErrorMsg em = (ErrorMsg)msg;
          log("Broker " + serverID + "  receives ERROR "
              + " " + em.getDetails());
          break;
@@ -581,38 +576,26 @@
   * @param changelogId The serverID of the replicationServer to create.
   * @return The new replicationServer.
   */
  private ReplicationServer createChangelogServer(short changelogId)
  private ReplicationServer createChangelogServer(short changelogId, String testCase)
  {
    SortedSet<String> servers = null;
    servers = new TreeSet<String>();
    try
    {
      if (changelogId==changelog1ID)
      {
        if (changelog1!=null)
          return changelog1;
      }
      else if (changelogId==changelog2ID)
      {
        if (changelog2!=null)
          return changelog2;
      }
      else if (changelogId==changelog3ID)
      {
        if (changelog3!=null)
          return changelog3;
      }
      servers.add("localhost:" + getChangelogPort(changelog1ID));
      servers.add("localhost:" + getChangelogPort(changelog2ID));
      servers.add("localhost:" + getChangelogPort(changelog3ID));
      if (changelogId != changelog1ID)
          servers.add("localhost:" + getChangelogPort(changelog1ID));
      if (changelogId != changelog2ID)
          servers.add("localhost:" + getChangelogPort(changelog2ID));
      if (changelogId != changelog3ID)
          servers.add("localhost:" + getChangelogPort(changelog3ID));
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(
            getChangelogPort(changelogId),
            "rsdbdirname" + getChangelogPort(changelogId),
            0,
            changelogId,
            0,
            getChangelogPort(changelogId),
            "initOnlineTest" + getChangelogPort(changelogId) + testCase + "Db",
            0,
            changelogId,
            0,
            100,
            servers);
      ReplicationServer replicationServer = new ReplicationServer(conf);
@@ -638,14 +621,14 @@
    try
    {
      // suffix synchronized
      String synchroServerStringDN = synchroPluginStringDN;
      String testName = "initOnLineTest";
      String synchroServerLdif =
        "dn: cn=example, cn=domains," + synchroServerStringDN + "\n"
        "dn: cn=" + testName + ", cn=domains," + SYNCHRO_PLUGIN_DN + "\n"
      + "objectClass: top\n"
      + "objectClass: ds-cfg-synchronization-provider\n"
      + "objectC7lass: ds-cfg-synchronization-provider\n"
      + "objectClass: ds-cfg-replication-domain\n"
      + "cn: example\n"
      + "ds-cfg-base-dn: dc=example,dc=com\n"
      + "cn: " + testName + "\n"
      + "ds-cfg-base-dn: " + EXAMPLE_DN + "\n"
      + "ds-cfg-replication-server: localhost:"
      + getChangelogPort(changelogID)+"\n"
      + "ds-cfg-server-id: " + server1ID + "\n"
@@ -653,27 +636,20 @@
//    + "ds-cfg-heartbeat-interval: 0 ms\n"
      + "ds-cfg-window-size: " + WINDOW_SIZE;
      if (synchroServerEntry == null)
      {
        synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
        DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
        assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
      // Clear the backend
      ReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
      synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
      DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
      assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the synchronized server");
        super.configEntryList.add(synchroServerEntry.getDN());
      configEntryList.add(synchroServerEntry.getDN());
        replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn);
      replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn);
        // Clear the backend
        ReplicationDomain.clearJEBackend(false,
            replDomain.getBackend().getBackendID(),
            baseDn.toNormalizedString());
      }
      if (replDomain != null)
      {
         assertTrue(!replDomain.ieRunning(),
           "ReplicationDomain: Import/Export is not expected to be running");
      }
      assertTrue(!replDomain.ieRunning(),
        "ReplicationDomain: Import/Export is not expected to be running");
    }
    catch(Exception e)
    {
@@ -708,19 +684,19 @@
  @Test(enabled=true, groups="slow")
  public void initializeImport() throws Exception
  {
    String testCase = "InitializeImport";
    String testCase = "initializeImport";
    log("Starting "+testCase);
    try
    {
      changelog1 = createChangelogServer(changelog1ID);
      changelog1 = createChangelogServer(changelog1ID, testCase);
      // Connect DS to the replicationServer
      connectServer1ToChangelog(changelog1ID);
      if (server2 == null)
        server2 = openReplicationSession(DN.decode("dc=example,dc=com"),
        server2 = openReplicationSession(DN.decode(EXAMPLE_DN),
          server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
      Thread.sleep(2000);
@@ -729,13 +705,13 @@
      addTask(taskInitFromS2, ResultCode.SUCCESS, null);
      // S2 should receive init msg
      ReplicationMessage msg;
      ReplicationMsg msg;
      msg = server2.receive();
      if (!(msg instanceof InitializeRequestMessage))
      if (!(msg instanceof InitializeRequestMsg))
      {
        fail(testCase + " Message received by S2 is of unexpected class" + msg);
      }
      InitializeRequestMessage initMsg = (InitializeRequestMessage)msg;
      InitializeRequestMsg initMsg = (InitializeRequestMsg)msg;
      // S2 publishes entries to S1
      makeBrokerPublishEntries(server2, server2ID, initMsg.getsenderID(),
@@ -748,13 +724,14 @@
      // Test import result in S1
      testEntriesInDb();
      afterTest();
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest();
    }
  }
@@ -764,32 +741,36 @@
  @Test(enabled=true, groups="slow")
  public void initializeExport() throws Exception
  {
    String testCase = "Replication/InitializeExport";
    String testCase = "initializeExport";
    log("Starting "+testCase);
    try
    {
      changelog1 = createChangelogServer(changelog1ID, testCase);
    changelog1 = createChangelogServer(changelog1ID);
      // Connect DS to the replicationServer
      connectServer1ToChangelog(changelog1ID);
    // Connect DS to the replicationServer
    connectServer1ToChangelog(changelog1ID);
      addTestEntriesToDB();
    addTestEntriesToDB();
      if (server2 == null)
        server2 = openReplicationSession(DN.decode(EXAMPLE_DN),
          server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
    if (server2 == null)
      server2 = openReplicationSession(DN.decode("dc=example,dc=com"),
        server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
      Thread.sleep(3000);
    Thread.sleep(3000);
    InitializeRequestMessage initMsg = new InitializeRequestMessage(baseDn,
      InitializeRequestMsg initMsg = new InitializeRequestMsg(baseDn,
        server2ID, server1ID);
    server2.publish(initMsg);
      server2.publish(initMsg);
    receiveUpdatedEntries(server2, server2ID, updatedEntries);
      receiveUpdatedEntries(server2, server2ID, updatedEntries);
    afterTest();
    log("Successfully ending "+testCase);
      log("Successfully ending " + testCase);
    } finally
    {
      afterTest();
    }
}
  /**
@@ -798,38 +779,42 @@
  @Test(enabled=true, groups="slow")
  public void initializeTargetExport() throws Exception
  {
    String testCase = "Replication/InitializeTargetExport";
    String testCase = "initializeTargetExport";
    log("Starting " + testCase);
    try
    {
    changelog1 = createChangelogServer(changelog1ID);
      changelog1 = createChangelogServer(changelog1ID, testCase);
    // Creates config to synchronize suffix
    connectServer1ToChangelog(changelog1ID);
      // Creates config to synchronize suffix
      connectServer1ToChangelog(changelog1ID);
    // Add in S1 the entries to be exported
    addTestEntriesToDB();
      // Add in S1 the entries to be exported
      addTestEntriesToDB();
    // S1 is the server we are running in, S2 is simulated by a broker
    if (server2 == null)
      server2 = openReplicationSession(DN.decode("dc=example,dc=com"),
        server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
      // S1 is the server we are running in, S2 is simulated by a broker
      if (server2 == null)
        server2 = openReplicationSession(DN.decode(EXAMPLE_DN),
          server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
    Thread.sleep(1000);
      Thread.sleep(1000);
    // Launch in S1 the task that will initialize S2
    addTask(taskInitTargetS2, ResultCode.SUCCESS, null);
      // Launch in S1 the task that will initialize S2
      addTask(taskInitTargetS2, ResultCode.SUCCESS, null);
    // Wait for task completion
    waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, null);
      // Wait for task completion
      waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, null);
    // Tests that entries have been received by S2
    receiveUpdatedEntries(server2, server2ID, updatedEntries);
      // Tests that entries have been received by S2
      receiveUpdatedEntries(server2, server2ID, updatedEntries);
    afterTest();
    log("Successfully ending " + testCase);
      log("Successfully ending " + testCase);
    } finally
    {
      afterTest();
    }
  }
  /**
@@ -838,43 +823,46 @@
  @Test(enabled=true, groups="slow")
  public void initializeTargetExportAll() throws Exception
  {
    String testCase = "Replication/InitializeTargetExportAll";
    String testCase = "initializeTargetExportAll";
    log("Starting " + testCase);
    changelog1 = createChangelogServer(changelog1ID);
    try
    {
      changelog1 = createChangelogServer(changelog1ID, testCase);
    // Creates config to synchronize suffix
    connectServer1ToChangelog(changelog1ID);
      // Creates config to synchronize suffix
      connectServer1ToChangelog(changelog1ID);
    // Add in S1 the entries to be exported
    addTestEntriesToDB();
      // Add in S1 the entries to be exported
      addTestEntriesToDB();
    // S1 is the server we are running in, S2 and S3 are simulated by brokers
    if (server2==null)
      server2 = openReplicationSession(DN.decode("dc=example,dc=com"),
        server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
      // S1 is the server we are running in, S2 and S3 are simulated by brokers
      if (server2 == null)
        server2 = openReplicationSession(DN.decode(EXAMPLE_DN),
          server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
    if (server3==null)
    server3 = openReplicationSession(DN.decode("dc=example,dc=com"),
        server3ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
      if (server3 == null)
        server3 = openReplicationSession(DN.decode(EXAMPLE_DN),
          server3ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
    Thread.sleep(1000);
      Thread.sleep(1000);
    // Launch in S1 the task that will initialize S2
    addTask(taskInitTargetAll, ResultCode.SUCCESS, null);
      // Launch in S1 the task that will initialize S2
      addTask(taskInitTargetAll, ResultCode.SUCCESS, null);
    // Wait for task completion
    waitTaskState(taskInitTargetAll, TaskState.COMPLETED_SUCCESSFULLY, null);
      // Wait for task completion
      waitTaskState(taskInitTargetAll, TaskState.COMPLETED_SUCCESSFULLY, null);
    // Tests that entries have been received by S2
    receiveUpdatedEntries(server2, server2ID, updatedEntries);
    receiveUpdatedEntries(server3, server3ID, updatedEntries);
      // Tests that entries have been received by S2
      receiveUpdatedEntries(server2, server2ID, updatedEntries);
      receiveUpdatedEntries(server3, server3ID, updatedEntries);
    afterTest();
    log("Successfully ending " + testCase);
      log("Successfully ending " + testCase);
    } finally
    {
      afterTest();
    }
  }
 /**
@@ -883,18 +871,18 @@
  @Test(enabled=true, groups="slow")
  public void initializeTargetImport() throws Exception
  {
    String testCase = "InitializeTargetImport";
    String testCase = "initializeTargetImport";
    try
    {
      log("Starting " + testCase + " debugEnabled:" + debugEnabled());
      // Start SS
      changelog1 = createChangelogServer(changelog1ID);
      changelog1 = createChangelogServer(changelog1ID, testCase);
      // S1 is the server we are running in, S2 is simulated by a broker
      if (server2==null)
        server2 = openReplicationSession(DN.decode("dc=example,dc=com"),
        server2 = openReplicationSession(DN.decode(EXAMPLE_DN),
          server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
      // Creates config to synchronize suffix
@@ -908,13 +896,14 @@
      // Test that entries have been imported in S1
      testEntriesInDb();
      afterTest();
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest();
    }
  }
@@ -962,13 +951,14 @@
      // Scope containing a serverID absent from the domain
      // createTask(taskInitTargetS2);
      afterTest();
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest();
    }
  }
@@ -978,14 +968,14 @@
  @Test(enabled=true)
  public void initializeConfigErrors() throws Exception
  {
    String testCase = "InitializeConfigErrors";
    String testCase = "initializeConfigErrors";
    try
    {
      log("Starting " + testCase);
      // Start SS
      changelog1 = createChangelogServer(changelog1ID);
      changelog1 = createChangelogServer(changelog1ID, testCase);
      // Creates config to synchronize suffix
      connectServer1ToChangelog(changelog1ID);
@@ -1031,13 +1021,14 @@
      // Scope containing a serverID absent from the domain
      // createTask(taskInitTargetS2);
      afterTest();
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest();
    }
  }
@@ -1066,73 +1057,72 @@
  @Test(enabled=true, groups="slow")
  public void testReplServerInfos() throws Exception
  {
    String testCase = "Replication/TestReplServerInfos";
    String testCase = "testReplServerInfos";
    log("Starting " + testCase);
    // Create the Repl Servers
    changelog1 = createChangelogServer(changelog1ID);
    changelog2 = createChangelogServer(changelog2ID);
    changelog3 = createChangelogServer(changelog3ID);
    ReplicationBroker broker2 = null;
    ReplicationBroker broker3 = null;
    try
    {
      // Create the Repl Servers
      changelog1 = createChangelogServer(changelog1ID, testCase);
      changelog2 = createChangelogServer(changelog2ID, testCase);
      changelog3 = createChangelogServer(changelog3ID, testCase);
    // Connects lDAP1 to replServer1
    connectServer1ToChangelog(changelog1ID);
    // Connects lDAP2 to replServer2
    ReplicationBroker broker2 =
      openReplicationSession(DN.decode("dc=example,dc=com"),
      // Connects lDAP1 to replServer1
      connectServer1ToChangelog(changelog1ID);
      // Connects lDAP2 to replServer2
      broker2 = openReplicationSession(DN.decode(EXAMPLE_DN),
        server2ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
    // Connects lDAP3 to replServer2
    ReplicationBroker broker3 =
      openReplicationSession(DN.decode("dc=example,dc=com"),
      // Connects lDAP3 to replServer2
      broker3 = openReplicationSession(DN.decode(EXAMPLE_DN),
        server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
    // Check that the list of connected LDAP servers is correct
    // in each replication servers
    List<String> l1 = changelog1.getReplicationServerDomain(baseDn, false).
      getConnectedLDAPservers();
    assertEquals(l1.size(), 1);
    assertEquals(l1.get(0), String.valueOf(server1ID));
    List<String> l2;
      // Check that the list of connected LDAP servers is correct
      // in each replication servers
      List<String> l1 = changelog1.getReplicationServerDomain(baseDn, false).
        getConnectedLDAPservers();
      assertEquals(l1.size(), 1);
      assertEquals(l1.get(0), String.valueOf(server1ID));
      List<String> l2;
    l2 = changelog2.getReplicationServerDomain(baseDn, false).getConnectedLDAPservers();
    assertEquals(l2.size(), 2);
    assertTrue(l2.contains(String.valueOf(server2ID)));
    assertTrue(l2.contains(String.valueOf(server3ID)));
    List<String> l3;
      assertEquals(l2.size(), 2);
      assertTrue(l2.contains(String.valueOf(server2ID)));
      assertTrue(l2.contains(String.valueOf(server3ID)));
      List<String> l3;
    l3 = changelog3.getReplicationServerDomain(baseDn, false).getConnectedLDAPservers();
    assertEquals(l3.size(), 0);
      assertEquals(l3.size(), 0);
    // Test updates
    broker3.stop();
    Thread.sleep(1000);
      // Test updates
      broker3.stop();
      Thread.sleep(1000);
    l2 = changelog2.getReplicationServerDomain(baseDn, false).getConnectedLDAPservers();
    assertEquals(l2.size(), 1);
    assertEquals(l2.get(0), String.valueOf(server2ID));
      assertEquals(l2.size(), 1);
      assertEquals(l2.get(0), String.valueOf(server2ID));
    broker3 = openReplicationSession(DN.decode("dc=example,dc=com"),
      broker3 = openReplicationSession(DN.decode(EXAMPLE_DN),
        server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
    broker2.stop();
    Thread.sleep(1000);
      broker2.stop();
      Thread.sleep(1000);
    l2 = changelog2.getReplicationServerDomain(baseDn, false).getConnectedLDAPservers();
    assertEquals(l2.size(), 1);
    assertEquals(l2.get(0), String.valueOf(server3ID));
      assertEquals(l2.size(), 1);
      assertEquals(l2.get(0), String.valueOf(server3ID));
    // TODO Test ReplicationServerDomain.getDestinationServers method.
    broker2.stop();
    broker3.stop();
    afterTest();
    changelog3.shutdown();
    changelog3 = null;
    changelog2.shutdown();
    changelog2 = null;
    changelog1.shutdown();
    changelog1 = null;
    } finally
    {
      if (broker2 != null)
        broker2.stop();
      if (broker3 != null)
        broker3.stop();
      afterTest();
    }
  }
  
  @Test(enabled=true, groups="slow")
@@ -1140,14 +1130,14 @@
  {
    try
    {
      String testCase = "Replication/InitializeTargetExportMultiSS";
      String testCase = "initializeTargetExportMultiSS";
      log("Starting " + testCase);
      // Create 2 changelogs
      changelog1 = createChangelogServer(changelog1ID);
      changelog1 = createChangelogServer(changelog1ID, testCase);
      changelog2 = createChangelogServer(changelog2ID);
      changelog2 = createChangelogServer(changelog2ID, testCase);
      // Creates config to synchronize suffix
      connectServer1ToChangelog(changelog1ID);
@@ -1159,7 +1149,7 @@
      // connected to changelog2
      if (server2 == null)
      {
        server2 = openReplicationSession(DN.decode("dc=example,dc=com"),
        server2 = openReplicationSession(DN.decode(EXAMPLE_DN),
            server2ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
      }
@@ -1179,178 +1169,183 @@
    finally
    {
      afterTest();
      changelog2.shutdown();
      changelog2 = null;
    }
  }
  @Test(enabled=true, groups="slow")
  public void initializeExportMultiSS() throws Exception
  {
    String testCase = "Replication/InitializeExportMultiSS";
    String testCase = "initializeExportMultiSS";
    log("Starting "+testCase);
    // Create 2 changelogs
    changelog1 = createChangelogServer(changelog1ID);
    Thread.sleep(1000);
    changelog2 = createChangelogServer(changelog2ID);
    Thread.sleep(1000);
    // Connect DS to the replicationServer 1
    connectServer1ToChangelog(changelog1ID);
    // Put entries in DB
    log(testCase + " Will add entries");
    addTestEntriesToDB();
    // Connect a broker acting as server 2 to Repl Server 2
    if (server2 == null)
    try
    {
      log(testCase + " Will connect server 2 to " + changelog2ID);
      server2 = openReplicationSession(DN.decode("dc=example,dc=com"),
        server2ID, 100, getChangelogPort(changelog2ID),
        1000, emptyOldChanges, changelog1.getGenerationId(baseDn));
    }
      // Create 2 changelogs
      changelog1 = createChangelogServer(changelog1ID, testCase);
      Thread.sleep(1000);
    // Connect a broker acting as server 3 to Repl Server 3
    log(testCase + " Will create replServer " + changelog3ID);
    changelog3 = createChangelogServer(changelog3ID);
    Thread.sleep(500);
    if (server3 == null)
    {
      log(testCase + " Will connect server 3 to " + changelog3ID);
      server3 = openReplicationSession(DN.decode("dc=example,dc=com"),
        server3ID, 100, getChangelogPort(changelog3ID),
        1000, emptyOldChanges, changelog1.getGenerationId(baseDn));
    }
      changelog2 = createChangelogServer(changelog2ID, testCase);
      Thread.sleep(1000);
    Thread.sleep(500);
      // Connect DS to the replicationServer 1
      connectServer1ToChangelog(changelog1ID);
    // S3 sends init request
    log(testCase + " server 3 Will send reqinit to " + server1ID);
    InitializeRequestMessage initMsg =
      new InitializeRequestMessage(baseDn, server3ID, server1ID);
    server3.publish(initMsg);
      // Put entries in DB
      log(testCase + " Will add entries");
      addTestEntriesToDB();
    // S3 should receive target, entries & done
    log(testCase + " Will verify server 3 has received expected entries");
    receiveUpdatedEntries(server3, server3ID, updatedEntries);
    while(true)
    {
      try
      // Connect a broker acting as server 2 to Repl Server 2
      if (server2 == null)
      {
        ReplicationMessage msg = server3.receive();
        fail("Receive unexpected message " + msg);
        log(testCase + " Will connect server 2 to " + changelog2ID);
        server2 = openReplicationSession(DN.decode(EXAMPLE_DN),
          server2ID, 100, getChangelogPort(changelog2ID),
          1000, emptyOldChanges, changelog1.getGenerationId(baseDn));
      }
      catch(SocketTimeoutException e)
      // Connect a broker acting as server 3 to Repl Server 3
      log(testCase + " Will create replServer " + changelog3ID);
      changelog3 = createChangelogServer(changelog3ID, testCase);
      Thread.sleep(500);
      if (server3 == null)
      {
        // Test is a success
        break;
        log(testCase + " Will connect server 3 to " + changelog3ID);
        server3 = openReplicationSession(DN.decode(EXAMPLE_DN),
          server3ID, 100, getChangelogPort(changelog3ID),
          1000, emptyOldChanges, changelog1.getGenerationId(baseDn));
      }
      Thread.sleep(500);
      // S3 sends init request
      log(testCase + " server 3 Will send reqinit to " + server1ID);
      InitializeRequestMsg initMsg =
        new InitializeRequestMsg(baseDn, server3ID, server1ID);
      server3.publish(initMsg);
      // S3 should receive target, entries & done
      log(testCase + " Will verify server 3 has received expected entries");
      receiveUpdatedEntries(server3, server3ID, updatedEntries);
      while (true)
      {
        try
        {
          ReplicationMsg msg = server3.receive();
          fail("Receive unexpected message " + msg);
        } catch (SocketTimeoutException e)
        {
          // Test is a success
          break;
        }
      }
      log("Successfully ending " + testCase);
    } finally
    {
      afterTest();
    }
    afterTest();
    changelog3.shutdown();
    changelog3 = null;
    changelog2.shutdown();
    changelog2 = null;
    log("Successfully ending "+testCase);
  }
  @Test(enabled=false)
  public void initializeNoSource() throws Exception
  {
    String testCase = "InitializeNoSource";
    String testCase = "initializeNoSource";
    log("Starting "+testCase);
    // Start Replication Server
    changelog1 = createChangelogServer(changelog1ID);
    try
    {
      // Start Replication Server
      changelog1 = createChangelogServer(changelog1ID, testCase);
    // Creates config to synchronize suffix
    connectServer1ToChangelog(changelog1ID);
      // Creates config to synchronize suffix
      connectServer1ToChangelog(changelog1ID);
    // Test 1
    Entry taskInit = TestCaseUtils.makeEntry(
      // Test 1
      Entry taskInit = TestCaseUtils.makeEntry(
        "dn: ds-task-id=" + UUID.randomUUID() +
        ",cn=Scheduled Tasks,cn=Tasks",
        "objectclass: top",
        "objectclass: ds-task",
        "objectclass: ds-task-initialize-from-remote-replica",
        "ds-task-class-name: org.opends.server.tasks.InitializeTask",
        "ds-task-initialize-domain-dn: "+baseDn,
        "ds-task-initialize-domain-dn: " + baseDn,
        "ds-task-initialize-replica-server-id: " + 20);
    addTask(taskInit, ResultCode.SUCCESS, null);
      addTask(taskInit, ResultCode.SUCCESS, null);
    waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
      waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
        ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
    // Test 2
    taskInit = TestCaseUtils.makeEntry(
      // Test 2
      taskInit = TestCaseUtils.makeEntry(
        "dn: ds-task-id=" + UUID.randomUUID() +
        ",cn=Scheduled Tasks,cn=Tasks",
        "objectclass: top",
        "objectclass: ds-task",
        "objectclass: ds-task-initialize-from-remote-replica",
        "ds-task-class-name: org.opends.server.tasks.InitializeTask",
        "ds-task-initialize-domain-dn: "+baseDn,
        "ds-task-initialize-domain-dn: " + baseDn,
        "ds-task-initialize-replica-server-id: " + server1ID);
    addTask(taskInit, ResultCode.OTHER, ERR_INVALID_IMPORT_SOURCE.get());
      addTask(taskInit, ResultCode.OTHER, ERR_INVALID_IMPORT_SOURCE.get());
    if (replDomain != null)
      if (replDomain != null)
      {
        assertTrue(!replDomain.ieRunning(),
          "ReplicationDomain: Import/Export is not expected to be running");
      }
      log("Successfully ending " + testCase);
    } finally
    {
       assertTrue(!replDomain.ieRunning(),
         "ReplicationDomain: Import/Export is not expected to be running");
      afterTest();
    }
    log("Successfully ending "+testCase);
  }
  @Test(enabled=false)
  public void initializeTargetNoTarget() throws Exception
  {
    String testCase = "InitializeTargetNoTarget"  + baseDn;
    String testCase = "initializeTargetNoTarget"  + baseDn;
    log("Starting "+testCase);
    // Start SS
    changelog1 = createChangelogServer(changelog1ID);
    try
    {
      // Start SS
      changelog1 = createChangelogServer(changelog1ID, testCase);
    // Creates config to synchronize suffix
    connectServer1ToChangelog(changelog1ID);
      // Creates config to synchronize suffix
      connectServer1ToChangelog(changelog1ID);
    // Put entries in DB
    addTestEntriesToDB();
      // Put entries in DB
      addTestEntriesToDB();
    Entry taskInit = TestCaseUtils.makeEntry(
      Entry taskInit = TestCaseUtils.makeEntry(
        "dn: ds-task-id=" + UUID.randomUUID() +
        ",cn=Scheduled Tasks,cn=Tasks",
        "objectclass: top",
        "objectclass: ds-task",
        "objectclass: ds-task-initialize-remote-replica",
        "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask",
        "ds-task-initialize-domain-dn: "+baseDn,
        "ds-task-initialize-domain-dn: " + baseDn,
        "ds-task-initialize-replica-server-id: " + 0);
    addTask(taskInit, ResultCode.SUCCESS, null);
      addTask(taskInit, ResultCode.SUCCESS, null);
    waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
      waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
        ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
    if (replDomain != null)
    {
       assertTrue(!replDomain.ieRunning(),
         "ReplicationDomain: Import/Export is not expected to be running");
    }
      if (replDomain != null)
      {
        assertTrue(!replDomain.ieRunning(),
          "ReplicationDomain: Import/Export is not expected to be running");
      }
    log("Successfully ending "+testCase);
      log("Successfully ending " + testCase);
    } finally
    {
      afterTest();
    }
  }
  @Test(enabled=false)
@@ -1378,73 +1373,76 @@
    fail(testCase + " NYI");
  }
  @Test(enabled=false)
  @Test(enabled = false)
  public void initializeSimultaneous() throws Exception
  {
    String testCase = "InitializeSimultaneous";
    String testCase = "initializeSimultaneous";
    // Start SS
    changelog1 = createChangelogServer(changelog1ID);
    // Connect a broker acting as server 2 to changelog2
    if (server2 == null)
    try
    {
      server2 = openReplicationSession(DN.decode("dc=example,dc=com"),
        server2ID, 100, getChangelogPort(changelog1ID),
        1000, emptyOldChanges);
    }
      // Start SS
      changelog1 = createChangelogServer(changelog1ID, testCase);
    // Creates config to synchronize suffix
    connectServer1ToChangelog(changelog1ID);
      // Connect a broker acting as server 2 to changelog2
      if (server2 == null)
      {
        server2 = openReplicationSession(DN.decode(EXAMPLE_DN),
          server2ID, 100, getChangelogPort(changelog1ID),
          1000, emptyOldChanges);
      }
    Entry taskInit = TestCaseUtils.makeEntry(
      // Creates config to synchronize suffix
      connectServer1ToChangelog(changelog1ID);
      Entry taskInit = TestCaseUtils.makeEntry(
        "dn: ds-task-id=" + UUID.randomUUID() +
        ",cn=Scheduled Tasks,cn=Tasks",
        "objectclass: top",
        "objectclass: ds-task",
        "objectclass: ds-task-initialize-from-remote-replica",
        "ds-task-class-name: org.opends.server.tasks.InitializeTask",
        "ds-task-initialize-domain-dn: "+baseDn,
        "ds-task-initialize-domain-dn: " + baseDn,
        "ds-task-initialize-replica-server-id: " + server2ID);
    addTask(taskInit, ResultCode.SUCCESS, null);
      addTask(taskInit, ResultCode.SUCCESS, null);
    Thread.sleep(3000);
      Thread.sleep(3000);
    Entry taskInit2 = TestCaseUtils.makeEntry(
      Entry taskInit2 = TestCaseUtils.makeEntry(
        "dn: ds-task-id=" + UUID.randomUUID() +
        ",cn=Scheduled Tasks,cn=Tasks",
        "objectclass: top",
        "objectclass: ds-task",
        "objectclass: ds-task-initialize-from-remote-replica",
        "ds-task-class-name: org.opends.server.tasks.InitializeTask",
        "ds-task-initialize-domain-dn: "+baseDn,
        "ds-task-initialize-domain-dn: " + baseDn,
        "ds-task-initialize-replica-server-id: " + server2ID);
    // Second task is expected to be rejected
    addTask(taskInit2, ResultCode.SUCCESS, null);
      // Second task is expected to be rejected
      addTask(taskInit2, ResultCode.SUCCESS, null);
    waitTaskState(taskInit2, TaskState.STOPPED_BY_ERROR,
      waitTaskState(taskInit2, TaskState.STOPPED_BY_ERROR,
        ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get());
    // First task is stilll running
    waitTaskState(taskInit, TaskState.RUNNING, null);
      // First task is stilll running
      waitTaskState(taskInit, TaskState.RUNNING, null);
    // External request is supposed to be rejected
      // External request is supposed to be rejected
    // Now tests error in the middle of an import
    // S2 sends init request
    ErrorMessage msg =
      new ErrorMessage(server1ID, (short) 1, Message.EMPTY);
    server2.publish(msg);
      // Now tests error in the middle of an import
      // S2 sends init request
      ErrorMsg msg =
        new ErrorMsg(server1ID, (short) 1, Message.EMPTY);
      server2.publish(msg);
    waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
      waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
        null);
    afterTest();
    log("Successfully ending "+testCase);
      log("Successfully ending " + testCase);
    } finally
    {
      afterTest();
    }
  }
  /**
@@ -1453,10 +1451,10 @@
  protected void afterTest()
  {
    // Check that the domain hsa completed the import/export task.
    // Check that the domain has completed the import/export task.
    if (replDomain != null)
    {
      // race condition could cause the main thread to reach
      // race condition could cause the main thread to reach
      // this code before the task is fully completed.
      // in those cases, loop for a while waiting for completion.
      for (int i = 0; i< 10; i++)
@@ -1477,6 +1475,9 @@
       assertTrue(!replDomain.ieRunning(),
         "ReplicationDomain: Import/Export is not expected to be running");
    }
    // Remove domain config
    super.cleanConfigEntries();
    replDomain = null;
    // Clean brokers
    if (server2 != null)
@@ -1494,14 +1495,50 @@
      server3 = null;
    }
    super.cleanRealEntries();
    if (changelog1 != null)
      changelog1.clearDb();
    {
        changelog1.clearDb();
        changelog1.remove();
        changelog1 = null;
    }
    if (changelog2 != null)
      changelog2.clearDb();
    {
        changelog2.clearDb();
        changelog2.remove();
        changelog2 = null;
    }
    if (changelog3 != null)
      changelog3.clearDb();
    {
        changelog3.clearDb();
        changelog3.remove();
        changelog3 = null;
    }
    // Clean replication server ports
    for (int i = 0; i < replServerPort.length; i++)
    {
      replServerPort[i] = 0;
    }
  }
    /**
   * Clean up the environment.
   *
   * @throws Exception If the environment could not be set up.
   */
  @AfterClass
  @Override
  public void classCleanUp() throws Exception
  {
    callParanoiaCheck = false;
    super.classCleanUp();
    // Clear the backend
    ReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
    paranoiaCheck();
  }
}