mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
02.58.2007 b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -48,6 +48,7 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.net.SocketTimeoutException;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.TaskState;
@@ -124,14 +125,14 @@
  boolean ssShutdownRequested = false;
  protected String[] updatedEntries;
  boolean externalDS = false;
  private static final short server1ID = 11;
  private static final short server2ID = 21;
  private static final short server3ID = 31;
  private static final short changelog1ID = 1;
  private static final short changelog2ID = 2;
  private static final short changelog3ID = 3;
  private static final short server1ID = 1;
  private static final short server2ID = 2;
  private static final short server3ID = 3;
  private static final short changelog1ID =  8;
  private static final short changelog2ID =  9;
  private static final short changelog3ID = 10;
  private static int[] replServerPort = new int[4];
  private static int[] replServerPort = new int[20];
  
  private DN baseDn;
  ReplicationBroker server2 = null;
@@ -140,7 +141,7 @@
  ReplicationServer changelog2 = null;
  ReplicationServer changelog3 = null;
  boolean emptyOldChanges = true;
  ReplicationDomain sd = null;
  ReplicationDomain replDomain = null;
  private void log(String s)
  {
@@ -169,7 +170,6 @@
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    baseDn = DN.decode("dc=example,dc=com");
    updatedEntries = newLDIFEntries();
@@ -756,9 +756,14 @@
      servers.add("localhost:" + getChangelogPort(changelog2ID));
      servers.add("localhost:" + getChangelogPort(changelog3ID));
      int chPort = getChangelogPort(changelogId);
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(chPort, null, 0, changelogId, 0, 100,
        new ReplServerFakeConfiguration(
            getChangelogPort(changelogId),
            "rsdbdirname" + getChangelogPort(changelogId),
            0,
            changelogId,
            0,
            100,
            servers);
      ReplicationServer replicationServer = new ReplicationServer(conf);
      Thread.sleep(1000);
@@ -804,17 +809,18 @@
        assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the synchronized server");
        sd = ReplicationDomain.retrievesReplicationDomain(baseDn);
        replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn);
        // Clear the backend
        ReplicationDomain.clearJEBackend(false,
            sd.getBackend().getBackendID(),
            replDomain.getBackend().getBackendID(),
            baseDn.toNormalizedString());
      }
      if (sd != null)
      if (replDomain != null)
      {
         log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning());
         assertTrue(!replDomain.ieRunning(),
           "ReplicationDomain: Import/Export is not expected to be running");
      }
    }
    catch(Exception e)
@@ -890,7 +896,7 @@
      // Test import result in S1
      testEntriesInDb();
      cleanEntries();
      afterTest();
      log("Successfully ending " + testCase);
    }
@@ -929,7 +935,7 @@
    receiveUpdatedEntries(server2, server2ID, updatedEntries);
    cleanEntries();
    afterTest();
    log("Successfully ending "+testCase);
}
@@ -968,7 +974,7 @@
    // Tests that entries have been received by S2
    receiveUpdatedEntries(server2, server2ID, updatedEntries);
    cleanEntries();
    afterTest();
    log("Successfully ending " + testCase);
@@ -1012,7 +1018,7 @@
    receiveUpdatedEntries(server2, server2ID, updatedEntries);
    receiveUpdatedEntries(server3, server3ID, updatedEntries);
    cleanEntries();
    afterTest();
    log("Successfully ending " + testCase);
@@ -1049,7 +1055,7 @@
      // Test that entries have been imported in S1
      testEntriesInDb();
      cleanEntries();
      afterTest();
      log("Successfully ending " + testCase);
    }
@@ -1103,7 +1109,7 @@
      // Scope containing a serverID absent from the domain
      // createTask(taskInitTargetS2);
      cleanEntries();
      afterTest();
      log("Successfully ending " + testCase);
    }
@@ -1172,7 +1178,7 @@
      // Scope containing a serverID absent from the domain
      // createTask(taskInitTargetS2);
      cleanEntries();
      afterTest();
      log("Successfully ending " + testCase);
    }
@@ -1231,25 +1237,25 @@
    // Check that the list of connected LDAP servers is correct
    // in each replication servers
    List<String> l1 = changelog1.getReplicationCache(baseDn).
    List<String> l1 = changelog1.getReplicationCache(baseDn, false).
      getConnectedLDAPservers();
    assertEquals(l1.size(), 1);
    assertEquals(l1.get(0), String.valueOf(server1ID));
    
    List<String> l2;
    l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
    l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers();
    assertEquals(l2.size(), 2);
    assertEquals(l2.get(0), String.valueOf(server3ID));
    assertEquals(l2.get(1), String.valueOf(server2ID));
        
    List<String> l3;
    l3 = changelog3.getReplicationCache(baseDn).getConnectedLDAPservers();
    l3 = changelog3.getReplicationCache(baseDn, false).getConnectedLDAPservers();
    assertEquals(l3.size(), 0);
    // Test updates
    broker3.stop();
    Thread.sleep(1000);
    l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
    l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers();
    assertEquals(l2.size(), 1);
    assertEquals(l2.get(0), String.valueOf(server2ID));
@@ -1257,7 +1263,7 @@
        server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
    broker2.stop();
    Thread.sleep(1000);
    l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
    l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers();
    assertEquals(l2.size(), 1);
    assertEquals(l2.get(0), String.valueOf(server3ID));
@@ -1266,7 +1272,7 @@
    broker2.stop();
    broker3.stop();
    cleanEntries();
    afterTest();
    changelog3.shutdown();
    changelog3 = null;
@@ -1313,7 +1319,7 @@
    // Tests that entries have been received by S2
    receiveUpdatedEntries(server2, server2ID, updatedEntries);
    cleanEntries();
    afterTest();
    changelog2.shutdown();
    changelog2 = null;
@@ -1335,42 +1341,64 @@
    changelog2 = createChangelogServer(changelog2ID);
    Thread.sleep(1000);
    changelog3 = createChangelogServer(changelog3ID);
    Thread.sleep(1000);
    // Connect DS to the replicationServer 1
    connectServer1ToChangelog(changelog1ID);
    // Put entries in DB
    log(testCase + " Will add entries");
    addTestEntriesToDB();
    // Connect a broker acting as server 2 to Repl Server 2
    if (server2 == null)
    {
      log(testCase + " Will connect server 2 to " + changelog2ID);
      server2 = openReplicationSession(DN.decode("dc=example,dc=com"),
        server2ID, 100, getChangelogPort(changelog2ID),
        1000, emptyOldChanges);
        1000, emptyOldChanges, changelog1.getGenerationId(baseDn));
    }
    // Connect a broker acting as server 3 to Repl Server 3
    log(testCase + " Will create replServer " + changelog3ID);
    changelog3 = createChangelogServer(changelog3ID);
    Thread.sleep(500);
    if (server3 == null)
    {
      log(testCase + " Will connect server 3 to " + changelog3ID);
      server3 = openReplicationSession(DN.decode("dc=example,dc=com"),
        server3ID, 100, getChangelogPort(changelog3ID),
        1000, emptyOldChanges);
        1000, emptyOldChanges, changelog1.getGenerationId(baseDn));
    }
    Thread.sleep(3000);
    Thread.sleep(500);
    // S2 sends init request
    // S3 sends init request
    log(testCase + " server 3 Will send reqinit to " + server1ID);
    InitializeRequestMessage initMsg =
      new InitializeRequestMessage(baseDn, server2ID, server1ID);
    server2.publish(initMsg);
      new InitializeRequestMessage(baseDn, server3ID, server1ID);
    server3.publish(initMsg);
    // S2 should receive target, entries & done
    receiveUpdatedEntries(server2, server2ID, updatedEntries);
    // S3 should receive target, entries & done
    log(testCase + " Will verify server 3 has received expected entries");
    receiveUpdatedEntries(server3, server3ID, updatedEntries);
    cleanEntries();
    while(true)
    {
      try
      {
        ReplicationMessage msg = server3.receive();
        fail("Receive unexpected message " + msg);
      }
      catch(SocketTimeoutException e)
      {
        // Test is a success
        break;
      }
    }
    afterTest();
    changelog3.shutdown();
    changelog3 = null;
    changelog2.shutdown();
    changelog2 = null;
@@ -1419,9 +1447,10 @@
    addTask(taskInit, ResultCode.OTHER, ERR_INVALID_IMPORT_SOURCE.get());
    if (sd != null)
    if (replDomain != null)
    {
       log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning());
       assertTrue(!replDomain.ieRunning(),
         "ReplicationDomain: Import/Export is not expected to be running");
    }
    log("Successfully ending "+testCase);
@@ -1458,9 +1487,10 @@
    waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
        ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
    if (sd != null)
    if (replDomain != null)
    {
       log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning());
       assertTrue(!replDomain.ieRunning(),
         "ReplicationDomain: Import/Export is not expected to be running");
    }
    log("Successfully ending "+testCase);
@@ -1554,7 +1584,7 @@
    waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
        null);
    cleanEntries();
    afterTest();
    log("Successfully ending "+testCase);
@@ -1563,23 +1593,30 @@
  /**
   * Disconnect broker and remove entries from the local DB
   */
  protected void cleanEntries()
  protected void afterTest()
  {
    if (sd != null)
    if (replDomain != null)
    {
       log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning());
       assertTrue(!replDomain.ieRunning(),
         "ReplicationDomain: Import/Export is not expected to be running");
    }
    // Clean brokers
    if (server2 != null)
    {
      server2.stop();
      TestCaseUtils.sleep(100); // give some time to the broker to disconnect
      // from the replicationServer.
      server2 = null;
    }
    if (server3 != null)
    {
      server3.stop();
      TestCaseUtils.sleep(100); // give some time to the broker to disconnect
      // from the replicationServer.
      server3 = null;
    }
    super.cleanRealEntries();
  }
}