mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

jcduff
23.04.2008 f73b655466092169abac34833fb628fce1fcdebe
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
@@ -49,13 +49,12 @@
import org.opends.server.TestCaseUtils;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.ReplicationDomain;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.SocketSession;
import org.opends.server.tools.LDAPSearch;
import org.opends.server.types.Attribute;
@@ -63,6 +62,7 @@
import org.opends.server.types.Entry;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.opends.server.TestCaseUtils.*;
/**
 * Tests for the replicationServer code.
@@ -73,18 +73,17 @@
  // The tracer object for the debug logger
  private static final DebugTracer TRACER = getTracer();
  private static final String baseDnStr = "dc=example,dc=com";
  private static final String baseSnStr = "genidcom";
  private static final String baseDnStr = TEST_ROOT_DN_STRING;
  private static final String testName = "monitorTest";
  private static final int   WINDOW_SIZE = 10;
  private static final int   CHANGELOG_QUEUE_SIZE = 100;
  private static final short server1ID = 1;
  private static final short server2ID = 2;
  private static final short server3ID = 3;
  private static final short server4ID = 4;
  private static final short changelog1ID = 11;
  private static final short changelog2ID = 12;
  private static final short changelog3ID = 13;
  private static final short changelog1ID = 21;
  private static final short changelog2ID = 22;
  private static final short changelog3ID = 23;
  private DN baseDn;
  private ReplicationBroker broker2 = null;
@@ -99,7 +98,7 @@
  boolean ssShutdownRequested = false;
  protected String[] updatedEntries;
  private static int[] replServerPort = new int[20];
  private static int[] replServerPort = new int[30];
  private void debugInfo(String s)
  {
@@ -123,47 +122,11 @@
  @BeforeClass
  public void setUp() throws Exception
  {
    //log("Starting generationIdTest setup: debugEnabled:" + debugEnabled());
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    super.setUp();
    baseDn = DN.decode(baseDnStr);
    updatedEntries = newLDIFEntries();
    // Create an internal connection in order to provide operations
    // to DS to populate the db -
    connection = InternalClientConnection.getRootConnection();
    // Synchro provider
    String synchroStringDN = "cn=Synchronization Providers,cn=config";
    // Synchro multi-master
    synchroPluginStringDN = "cn=Multimaster Synchronization, "
      + synchroStringDN;
    // Synchro suffix
    synchroServerEntry = null;
    // Add config entries to the current DS server based on :
    // Add the replication plugin: synchroPluginEntry & synchroPluginStringDN
    // Add synchroServerEntry
    // Add replServerEntry
    configureReplication();
    // Change log
    String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
    String changeLogLdif = "dn: " + changeLogStringDN + "\n"
    + "objectClass: top\n"
    + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
    + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8990\n"
    + "ds-cfg-changelog-server-id: 1\n"
    + "ds-cfg-window-size: " + WINDOW_SIZE + "\n"
    + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE;
    replServerEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
    replServerEntry = null;
  }
  /*
@@ -175,7 +138,7 @@
    {
        "dn: " + baseDn + "\n"
        + "objectClass: top\n"
        + "objectClass: domain\n"
        + "objectClass: organization\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111111\n"
        + "\n",
        "dn: ou=People," + baseDn + "\n"
@@ -213,7 +176,7 @@
  /**
   * Creates a new replicationServer.
   * @param changelogId The serverID of the replicationServer to create.
   * @param all         Specifies whether to coonect the created replication
   * @param all         Specifies whether to connect the created replication
   *                    server to the other replication servers in the test.
   * @return The new created replication server.
   */
@@ -224,28 +187,15 @@
    servers = new TreeSet<String>();
    try
    {
      if (changelogId==changelog1ID)
      {
        if (replServer1!=null)
          return replServer1;
      }
      else if (changelogId==changelog2ID)
      {
        if (replServer2!=null)
          return replServer2;
      }
      else if (changelogId==changelog3ID)
      {
        if (replServer3!=null)
          return replServer3;
      }
      if (all)
      {
        servers.add("localhost:" + getChangelogPort(changelog1ID));
        servers.add("localhost:" + getChangelogPort(changelog2ID));
        if (changelogId != changelog1ID)
          servers.add("localhost:" + getChangelogPort(changelog1ID));
        if (changelogId != changelog2ID)
          servers.add("localhost:" + getChangelogPort(changelog2ID));
      }
      int chPort = getChangelogPort(changelogId);
      String chDir = "genid"+changelogId+suffix+"Db";
      String chDir = "monitorTest"+changelogId+suffix+"Db";
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(chPort, chDir, 0, changelogId, 0, 100,
            servers);
@@ -253,7 +203,6 @@
      Thread.sleep(1000);
      return replicationServer;
    }
    catch (Exception e)
    {
@@ -273,12 +222,11 @@
    try
    {
      // suffix synchronized
      String synchroServerStringDN = synchroPluginStringDN;
      String synchroServerLdif =
        "dn: cn=" + baseSnStr + ", cn=domains," + synchroServerStringDN + "\n"
        "dn: cn=" + testName + ", cn=domains," + SYNCHRO_PLUGIN_DN + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-replication-domain\n"
        + "cn: " + baseSnStr + "\n"
        + "cn: " + testName + "\n"
        + "ds-cfg-base-dn: " + baseDnStr + "\n"
        + "ds-cfg-replication-server: localhost:"
        + getChangelogPort(changelogID)+"\n"
@@ -286,26 +234,24 @@
        + "ds-cfg-receive-status: true\n"
        + "ds-cfg-window-size: " + WINDOW_SIZE;
      if (synchroServerEntry == null)
      {
        synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
        DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
        assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
      synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
      DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
      assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the synchronized server");
        configEntryList.add(synchroServerEntry.getDN());
      configEntryList.add(synchroServerEntry.getDN());
        replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn);
      replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn);
      }
      if (replDomain != null)
      {
        debugInfo("ReplicationDomain: Import/Export is running ? " + replDomain.ieRunning());
        debugInfo("ReplicationDomain: Import/Export is running ? " +
          replDomain.ieRunning());
      }
    }
    catch(Exception e)
    {
      debugInfo("connectToReplServer", e);
      fail("connectToReplServer", e);
      fail("connectToReplServer: " + e.getMessage() + " : " + e.getStackTrace(), e);
    }
  }
@@ -317,18 +263,20 @@
    try
    {
      // suffix synchronized
      String synchroServerStringDN = "cn=" + baseSnStr + ", cn=domains," +
      synchroPluginStringDN;
      if (synchroServerEntry != null)
      {
        DN synchroServerDN = DN.decode(synchroServerStringDN);
        DirectoryServer.getConfigHandler().deleteEntry(synchroServerDN,null);
        assertTrue(DirectoryServer.getConfigEntry(synchroServerEntry.getDN())==null,
        "Unable to delete the synchronized domain");
        synchroServerEntry = null;
      String synchroServerStringDN = "cn=" + testName + ", cn=domains," +
      SYNCHRO_PLUGIN_DN;
      // Must have called connectServer1ToChangelog previously
      assertTrue(synchroServerEntry != null);
        configEntryList.remove(configEntryList.indexOf(synchroServerDN));
      }
      DN synchroServerDN = DN.decode(synchroServerStringDN);
      DirectoryServer.getConfigHandler().deleteEntry(synchroServerDN, null);
      assertTrue(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()) ==
        null,
        "Unable to delete the synchronized domain");
      synchroServerEntry = null;
      configEntryList.remove(configEntryList.indexOf(synchroServerDN));
    }
    catch(Exception e)
    {
@@ -376,7 +324,7 @@
        + "userPassword: password\n" + "initials: AA\n");
  }
  static protected ReplicationMessage createAddMsg(short serverId)
  static protected ReplicationMsg createAddMsg(short serverId)
  {
    Entry personWithUUIDEntry = null;
    String user1entryUUID;
@@ -427,89 +375,93 @@
    return addMsg;
  }
  @Test(enabled=false)
  @Test(enabled=true)
  public void testMultiRS() throws Exception
  {
    String testCase = "testMultiRS";
    debugInfo("Starting " + testCase);
    ReplicationDomain.clearJEBackend(false,
        "userRoot",
        baseDn.toNormalizedString());
    debugInfo ("Creating 2 RS");
    replServer1 = createReplicationServer(changelog1ID, true, testCase);
    replServer2 = createReplicationServer(changelog2ID, true, testCase);
    replServer3 = createReplicationServer(changelog3ID, true, testCase);
    Thread.sleep(500);
    debugInfo("Connecting DS to replServer1");
    connectServer1ToChangelog(changelog1ID);
    Thread.sleep(1500);
    try
    {
      debugInfo("Connecting broker2 to replServer1");
      broker2 = openReplicationSession(baseDn,
      TestCaseUtils.initializeTestBackend(false);
      debugInfo("Creating 2 RS");
      replServer1 = createReplicationServer(changelog1ID, true, testCase);
      replServer2 = createReplicationServer(changelog2ID, true, testCase);
      replServer3 = createReplicationServer(changelog3ID, true, testCase);
      Thread.sleep(500);
      debugInfo("Connecting DS to replServer1");
      connectServer1ToChangelog(changelog1ID);
      Thread.sleep(1500);
      try
      {
        debugInfo("Connecting broker2 to replServer1");
        broker2 = openReplicationSession(baseDn,
          server2ID, 100, getChangelogPort(changelog1ID),
          1000, !emptyOldChanges);
      Thread.sleep(1000);
    }
    catch(SocketException se)
    {
      fail("Broker connection is expected to be accepted.");
    }
        Thread.sleep(1000);
      } catch (SocketException se)
      {
        fail("Broker connection is expected to be accepted.");
      }
    try
    {
      debugInfo("Connecting broker3 to replServer2");
      broker3 = openReplicationSession(baseDn,
      try
      {
        debugInfo("Connecting broker3 to replServer2");
        broker3 = openReplicationSession(baseDn,
          server3ID, 100, getChangelogPort(changelog2ID),
          1000, !emptyOldChanges);
      Thread.sleep(1000);
    }
    catch(SocketException se)
    {
      fail("Broker connection is expected to be accepted.");
    }
        Thread.sleep(1000);
      } catch (SocketException se)
      {
        fail("Broker connection is expected to be accepted.");
      }
    try
    {
      debugInfo("Connecting broker4 to replServer2");
      broker4 = openReplicationSession(baseDn,
      try
      {
        debugInfo("Connecting broker4 to replServer2");
        broker4 = openReplicationSession(baseDn,
          server4ID, 100, getChangelogPort(changelog2ID),
          1000, !emptyOldChanges);
      Thread.sleep(1000);
    }
    catch(SocketException se)
        Thread.sleep(1000);
      } catch (SocketException se)
      {
        fail("Broker connection is expected to be accepted.");
      }
      // Do a bunch of change
      updatedEntries = newLDIFEntries();
      this.addTestEntriesToDB(updatedEntries);
      for (int i = 0; i < 200; i++)
      {
        String ent1[] =
        {
          createEntry(UUID.randomUUID())
        };
        this.addTestEntriesToDB(ent1);
      }
      for (int i = 0; i < 10; i++)
      {
        broker3.publish(createAddMsg(server3ID));
      }
      searchMonitor();
      debugInfo(
        "Disconnect DS from replServer1 (required in order to DEL entries).");
      disconnectFromReplServer(changelog1ID);
      debugInfo("Successfully ending " + testCase);
    } finally
    {
      fail("Broker connection is expected to be accepted.");
      debugInfo("Cleaning entries");
      postTest();
    }
    // Do a bunch of change
    updatedEntries = newLDIFEntries();
    this.addTestEntriesToDB(updatedEntries);
    for (int i = 0; i<200; i++)
    {
      String ent1[] = { createEntry(UUID.randomUUID()) };
      this.addTestEntriesToDB(ent1);
    }
    for (int i=0; i<10; i++)
    {
      broker3.publish(createAddMsg(server3ID));
    }
    searchMonitor();
    debugInfo("Disconnect DS from replServer1 (required in order to DEL entries).");
    disconnectFromReplServer(changelog1ID);
    debugInfo("Cleaning entries");
    postTest();
    debugInfo("Successfully ending " + testCase);
  }
  /**
@@ -532,26 +484,39 @@
    broker4 = null;
    if (replServer1 != null)
    {
      replServer1.clearDb();
      replServer1.remove();
      replServer1 = null;
    }
    if (replServer2 != null)
    {
      replServer2.clearDb();
      replServer2.remove();
      replServer2 = null;
    }
    if (replServer3 != null)
    {
      replServer3.clearDb();
      replServer3.remove();
    replServer1 = null;
    replServer2 = null;
    replServer3 = null;
      replServer3 = null;
    }
    super.cleanRealEntries();
    // Clean replication server ports
    for (int i = 0; i < replServerPort.length; i++)
    {
      replServerPort[i] = 0;
    }
    try
    {
      ReplicationDomain.clearJEBackend(false,
        replDomain.getBackend().getBackendID(),
        baseDn.toNormalizedString());
      TestCaseUtils.initializeTestBackend(false);
    }
    catch (Exception e) {}
  }
  private static final ByteArrayOutputStream oStream =
    new ByteArrayOutputStream();
  private static final ByteArrayOutputStream eStream =
@@ -563,7 +528,8 @@
    String[] args3 =
    {
      "-h", "127.0.0.1",
      "-p", String.valueOf(TestCaseUtils.getServerLdapPort()),
      "-p", String.valueOf(TestCaseUtils.getServerAdminPort()),
      "-Z", "-X",
      "-D", "cn=Directory Manager",
      "-w", "password",
      "-b", "cn=monitor",