mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
15.11.2013 9ec005a5ccb24feedff497ff97075a0303af46fe
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -83,9 +83,9 @@
  private static final int server1ID = 1;
  private static final int server2ID = 2;
  private static final int server3ID = 3;
  private static final int changelog1ID = 11;
  private static final int changelog2ID = 12;
  private static final int changelog3ID = 13;
  private static final int replServerId1 = 11;
  private static final int replServerId2 = 12;
  private static final int replServerId3 = 13;
  private DN baseDN;
  private ReplicationBroker broker2;
@@ -96,8 +96,7 @@
  private boolean emptyOldChanges = true;
  private Entry taskInitRemoteS2;
  private String[] updatedEntries;
  private static int[] replServerPort = new int[20];
  private static int[] replServerPort;
  /**
   * A makeldif template used to create some test entries.
@@ -157,9 +156,6 @@
  /**
   * Set up the environment for performing the tests in this Class.
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  @Override
  @BeforeClass
@@ -167,6 +163,7 @@
  {
    super.setUp();
    replServerPort = TestCaseUtils.findFreePorts(3);
    baseDN = DN.decode(baseDnStr);
    updatedEntries = newLDIFEntries();
@@ -314,42 +311,40 @@
  /**
   * Creates a new replicationServer.
   * @param changelogId The serverID of the replicationServer to create.
   * @param replServerId The serverID of the replicationServer to create.
   * @param all         Specifies whether to connect the created replication
   *                    server to the other replication servers in the test.
   * @return The new created replication server.
   */
  private ReplicationServer createReplicationServer(int changelogId,
  private ReplicationServer createReplicationServer(int replServerId,
      boolean all, String testCase) throws Exception
  {
    SortedSet<String> servers = new TreeSet<String>();
    if (all)
    {
      if (changelogId != changelog1ID)
        servers.add("localhost:" + getChangelogPort(changelog1ID));
      if (changelogId != changelog2ID)
        servers.add("localhost:" + getChangelogPort(changelog2ID));
      if (changelogId != changelog3ID)
        servers.add("localhost:" + getChangelogPort(changelog3ID));
      if (replServerId != replServerId1)
        servers.add("localhost:" + getRSPort(replServerId1));
      if (replServerId != replServerId2)
        servers.add("localhost:" + getRSPort(replServerId2));
      if (replServerId != replServerId3)
        servers.add("localhost:" + getRSPort(replServerId3));
    }
    int chPort = getChangelogPort(changelogId);
    String chDir = "generationIdTest" + changelogId + testCase + "Db";
    int rsPort = getRSPort(replServerId);
    String rsDir = "generationIdTest" + replServerId + testCase + "Db";
    ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(chPort, chDir, 0, changelogId, 0, 100, servers);
        new ReplServerFakeConfiguration(rsPort, rsDir, 0, replServerId, 0, 100, servers);
    ReplicationServer replicationServer = new ReplicationServer(conf);
    Thread.sleep(1000);
    return replicationServer;
  }
  /**
   * Create a synchronized suffix in the current server providing the
   * replication Server ID.
   * @param changeLogID replication Server ID
   * replication Server.
   */
  private void connectServer1ToChangelog(int changeLogID) throws Exception
  private void connectServer1ToReplServer(ReplicationServer rs) throws Exception
  {
    // Connect DS to the replicationServer
    debugInfo("Connecting DS1 to replicate to RS" + getRSNumber(rs) + "(" + rs.getServerId() + ")");
    {
      // suffix synchronized
      String synchroServerLdif =
@@ -358,8 +353,7 @@
        + "objectClass: ds-cfg-replication-domain\n"
        + "cn: " + testName + "\n"
        + "ds-cfg-base-dn: " + baseDnStr + "\n"
        + "ds-cfg-replication-server: localhost:"
        + getChangelogPort(changeLogID)+"\n"
        + "ds-cfg-replication-server: localhost:" + rs.getReplicationPort() + "\n"
        + "ds-cfg-server-id: " + server1ID + "\n"
        + "ds-cfg-receive-status: true\n"
        + "ds-cfg-window-size: " + WINDOW_SIZE;
@@ -388,15 +382,35 @@
    }
  }
  /**
   * Disconnect DS from the replicationServer
   */
  private void disconnectFromReplServer(int changelogID) throws Exception
  private String getRSNumber(ReplicationServer rs)
  {
    if (rs == replServer1)
    {
      return "1";
    }
    else if (rs == replServer2)
    {
      return "2";
    }
    else if (rs == replServer3)
    {
      return "3";
    }
    fail("Unknown RS");
    return null;
  }
  /**
   * Disconnect DS from the provided replicationServer.
   *
   * @see #connectServer1ToReplServer(ReplicationServer) should have been called previously
   */
  private void disconnectFromReplServer(ReplicationServer rs) throws Exception
  {
    debugInfo("Disconnect DS1 from RS" + getRSNumber(rs) + "(" + rs.getServerId() + ")");
    {
      // suffix synchronized
      String synchroServerStringDN = "cn=" + testName + ", cn=domains," + SYNCHRO_PLUGIN_DN;
      // Must have called connectServer1ToChangelog previously
      assertNotNull(synchroServerEntry);
      DN synchroServerDN = DN.decode(synchroServerStringDN);
@@ -429,18 +443,14 @@
      catch (DirectoryException e)
      {
        // success
        debugInfo("disconnectFromReplServer:" + changelogID, e);
        debugInfo("disconnectFromReplServer:" + rs.getServerId(), e);
      }
    }
  }
  private int getChangelogPort(int changelogID) throws Exception
  private int getRSPort(int replServerId) throws Exception
  {
    if (replServerPort[changelogID] == 0)
    {
      replServerPort[changelogID] = TestCaseUtils.findFreePort();
    }
    return replServerPort[changelogID];
    return replServerPort[replServerId - 11];
  }
  private long readGenIdFromSuffixRootEntry() throws Exception
@@ -567,8 +577,6 @@
  /**
   * SingleRS tests basic features of generationID
   * with one single Replication Server.
   *
   * @throws Exception
   */
  @Test(enabled=false)
  public void testSingleRS() throws Exception
@@ -586,7 +594,7 @@
      long rsGenId;
      long dsGenId;
      replServer1 = createReplicationServer(changelog1ID, false, testCase);
      replServer1 = createReplicationServer(replServerId1, false, testCase);
      // To search the replication server db later in these tests, we need
      // to attach the search backend to the replication server just created.
@@ -597,8 +605,7 @@
      //===========================================================
      debugInfo(testCase + " ** TEST ** Empty backend");
      debugInfo(testCase + " Configuring DS1 to replicate to RS1(" + changelog1ID + ") on an empty backend");
      connectServer1ToChangelog(changelog1ID);
      connectServer1ToReplServer(replServer1);
      debugInfo(testCase + " Expect genId to be not retrievable from suffix root entry");
      dsGenId = readGenIdFromSuffixRootEntry();
@@ -610,8 +617,7 @@
      assertEquals(rsGenId, EMPTY_DN_GENID);
      // Clean for next test
      debugInfo(testCase + " Unconfiguring DS1 to replicate to RS1(" + changelog1ID + ")");
      disconnectFromReplServer(changelog1ID);
      disconnectFromReplServer(replServer1);
      //===========================================================
@@ -620,8 +626,7 @@
      debugInfo(testCase + " Adding test entries to DS");
      addTestEntriesToDB(updatedEntries);
      debugInfo(testCase + " Configuring DS1 to replicate to RS1(" + changelog1ID + ") on a non empty backend");
      connectServer1ToChangelog(changelog1ID);
      connectServer1ToReplServer(replServer1);
      debugInfo(testCase + " Test that the generationId is written in the DB in the root entry on DS1");
      dsGenId = readGenIdFromSuffixRootEntry();
@@ -636,12 +641,12 @@
      debugInfo(testCase + " ** TEST ** DS2 connection to RS1 with bad genID");
      broker2 = openReplicationSession(baseDN, server2ID, 100,
          getChangelogPort(changelog1ID), 1000, !emptyOldChanges, dsGenId+1);
          replServer1.getReplicationPort(), 1000, !emptyOldChanges, dsGenId+1);
      //===========================================================
      debugInfo(testCase + " ** TEST ** DS3 connection to RS1 with good genID");
      broker3 = openReplicationSession(baseDN, server3ID, 100,
          getChangelogPort(changelog1ID), 1000, !emptyOldChanges, dsGenId);
          replServer1.getReplicationPort(), 1000, !emptyOldChanges, dsGenId);
      //===========================================================
      debugInfo(testCase + " ** TEST ** DS2 (bad genID) changes must be ignored.");
@@ -676,7 +681,7 @@
      replServer1 = null;
      debugInfo("Create again replServer1");
      replServer1 = createReplicationServer(changelog1ID, false, testCase);
      replServer1 = createReplicationServer(replServerId1, false, testCase);
      // To search the replication server db later in these tests, we need
      // to attach the search backend to the replication server just created.
@@ -702,21 +707,21 @@
          " spread a new gen ID on the topology, verify DS1 and RS1");
      debugInfo("Create again broker2");
      broker2 = openReplicationSession(baseDN,
          server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges, dsGenId);
          server2ID, 100, replServer1.getReplicationPort(), 1000, emptyOldChanges, dsGenId);
      assertTrue(broker2.isConnected(), "Broker2 failed to connect to replication server");
      debugInfo("Create again broker3");
      broker3 = openReplicationSession(baseDN,
          server3ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges, dsGenId);
          server3ID, 100, replServer1.getReplicationPort(), 1000, emptyOldChanges, dsGenId);
      assertTrue(broker3.isConnected(), "Broker3 failed to connect to replication server");
      debugInfo("Launch on-line import on DS1");
      long oldGenId = dsGenId;
      dsGenId=-1;
      disconnectFromReplServer(changelog1ID);
      disconnectFromReplServer(replServer1);
      performLdifImport();
      connectServer1ToChangelog(changelog1ID);
      connectServer1ToReplServer(replServer1);
      debugInfo("Create Reset task on DS1 to propagate the new gen ID as the reference");
      executeTask(createSetGenerationIdTask(dsGenId, ""), 20000);
@@ -796,12 +801,12 @@
      // Simulates the broker restart at the end of the import
      broker2.stop();
      broker2 = openReplicationSession(baseDN,
          server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges, dsGenId);
          server2ID, 100, replServer1.getReplicationPort(), 1000, emptyOldChanges, dsGenId);
      // Simulates the broker restart at the end of the import
      broker3.stop();
      broker3 = openReplicationSession(baseDN,
          server3ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges, dsGenId);
          server3ID, 100, replServer1.getReplicationPort(), 1000, emptyOldChanges, dsGenId);
      debugInfo("Adding reset task to DS1");
      executeTask(createSetGenerationIdTask(null, ""), 20000);
@@ -844,7 +849,7 @@
      debugInfo(testCase + " ** TEST ** General cleaning");
      debugInfo("Disconnect DS from replServer1 (required in order to DEL entries).");
      disconnectFromReplServer(changelog1ID);
      disconnectFromReplServer(replServer1);
      debugInfo("Successfully ending " + testCase);
    } finally
@@ -902,18 +907,14 @@
      TestCaseUtils.initializeTestBackend(false);
      debugInfo("Creating 3 RSs");
      replServer1 = createReplicationServer(changelog1ID, true, testCase);
      replServer2 = createReplicationServer(changelog2ID, true, testCase);
      replServer3 = createReplicationServer(changelog3ID, true, testCase);
      replServer1 = createReplicationServer(replServerId1, true, testCase);
      replServer2 = createReplicationServer(replServerId2, true, testCase);
      replServer3 = createReplicationServer(replServerId3, true, testCase);
      debugInfo("Connecting DS to replServer1");
      connectServer1ToChangelog(changelog1ID);
      connectServer1ToReplServer(replServer1);
      debugInfo("Expect genId are set in all replServers.");
      waitForStableGenerationId(EMPTY_DN_GENID);
      debugInfo("Disconnect DS from replServer1.");
      disconnectFromReplServer(changelog1ID);
      disconnectFromReplServer(replServer1);
      debugInfo("Expect genIds to be resetted in all servers to -1 as no more DS in topo - after 10 sec");
      waitForStableGenerationId(-1);
@@ -921,8 +922,7 @@
      debugInfo("Add entries to DS");
      addTestEntriesToDB(updatedEntries);
      debugInfo("Connecting DS to replServer2");
      connectServer1ToChangelog(changelog2ID);
      connectServer1ToReplServer(replServer2);
      debugInfo("Expect genIds to be set in all servers based on the added entries.");
      genId = readGenIdFromSuffixRootEntry();
@@ -931,14 +931,13 @@
      debugInfo("Connecting broker2 to replServer3 with a good genId");
      broker2 = openReplicationSession(baseDN, server2ID, 100,
          getChangelogPort(changelog3ID), 1000, !emptyOldChanges, genId);
          replServer3.getReplicationPort(), 1000, !emptyOldChanges, genId);
      Thread.sleep(1000);
      debugInfo("Expecting that broker2 is not in bad gen id since it has a correct genId");
      assertFalse(isDegradedDueToGenerationId(replServer1, server2ID));
      debugInfo("Disconnecting DS from replServer1");
      disconnectFromReplServer(changelog1ID);
      disconnectFromReplServer(replServer1);
      debugInfo("Verifying that all replservers genIds have been reset.");
@@ -948,15 +947,14 @@
      debugInfo("Connecting broker3 to replServer1 with a bad genId");
      long badGenId = 1;
      broker3 = openReplicationSession(baseDN, server3ID, 100,
          getChangelogPort(changelog1ID), 1000, !emptyOldChanges, badGenId);
          replServer1.getReplicationPort(), 1000, !emptyOldChanges, badGenId);
      Thread.sleep(1000);
      debugInfo("Expecting that broker3 is in bad gen id since it has a bad genId");
      assertTrue(isDegradedDueToGenerationId(replServer1, server3ID));
      assertEquals(countUpdatedEntriesInDb(), updatedEntries.length);
      debugInfo("Connecting DS to replServer1.");
      connectServer1ToChangelog(changelog1ID);
      connectServer1ToReplServer(replServer1);
      debugInfo("Adding reset task to DS.");
@@ -964,28 +962,16 @@
      debugInfo("Verifying that all replservers genIds have been reset.");
      genId = readGenIdFromSuffixRootEntry();
      assertGenIdEquals(genId, "");
      assertGenIdEquals(genId);
      debugInfo("Adding reset task to DS." + genId);
      executeTask(createSetGenerationIdTask(null, "ds-task-reset-generation-id-new-value: -1"), 20000);
      debugInfo("Verifying that all replservers genIds have been reset.");
      int waitRes = 0;
      while(waitRes<100)
      {
        readGenIdFromSuffixRootEntry();
        if (replServer1.getGenerationId(baseDN) == -1
            && replServer2.getGenerationId(baseDN) == -1
            && replServer3.getGenerationId(baseDN) == -1)
          break;
        waitRes++;
        Thread.sleep(100);
      }
      assertGenIdEquals(-1, "test" + i);
      waitForStableGenerationId(-1);
      debugInfo(
        "Disconnect DS from replServer1 (required in order to DEL entries).");
      disconnectFromReplServer(changelog1ID);
      debugInfo("Disconnect DS from replServer1 (required in order to DEL entries).");
      disconnectFromReplServer(replServer1);
      debugInfo("Successfully ending " + testCase);
    } finally
@@ -1006,14 +992,14 @@
      wait++;
      Thread.sleep(100);
    }
    assertGenIdEquals(expectedGenId, "");
    assertGenIdEquals(expectedGenId);
  }
  private void assertGenIdEquals(long expectedGenId, String reason)
  private void assertGenIdEquals(long expectedGenId)
  {
    assertEquals(replServer1.getGenerationId(baseDN), expectedGenId, " in replServer1: " + reason);
    assertEquals(replServer2.getGenerationId(baseDN), expectedGenId, " in replServer2: " + reason);
    assertEquals(replServer3.getGenerationId(baseDN), expectedGenId, " in replServer3: " + reason);
    assertEquals(replServer1.getGenerationId(baseDN), expectedGenId, " in replServer1");
    assertEquals(replServer2.getGenerationId(baseDN), expectedGenId, " in replServer2");
    assertEquals(replServer3.getGenerationId(baseDN), expectedGenId, " in replServer3");
  }
  private boolean isDegradedDueToGenerationId(ReplicationServer rs, int serverId)
@@ -1036,7 +1022,7 @@
    super.cleanRealEntries();
    Arrays.fill(replServerPort, 0);
    replServerPort = TestCaseUtils.findFreePorts(3);
    debugInfo("Clearing DJ backend");
    TestCaseUtils.initializeTestBackend(false);
@@ -1045,7 +1031,6 @@
  /**
   * Test generationID saving when the root entry does not exist
   * at the moment when the replication is enabled.
   * @throws Exception
   */
  @Test(enabled=false, groups="slow")
  public void testServerStop() throws Exception
@@ -1060,27 +1045,21 @@
    try
    {
      replServer1 = createReplicationServer(changelog1ID, false, testCase);
      replServer1 = createReplicationServer(replServerId1, false, testCase);
      /*
       * Test  : empty replicated backend
       * Check : nothing is broken - no generationId generated
       */
      // Connect DS to RS with no data
      // Read generationId - should be not retrievable since no entry
      debugInfo(testCase + " Connecting DS1 to replServer1(" + changelog1ID + ")");
      connectServer1ToChangelog(changelog1ID);
      debugInfo(testCase + " Expect genId attribute to be not retrievable");
      assertEquals(readGenIdFromSuffixRootEntry(), -1);
      connectServer1ToReplServer(replServer1);
      assertEquals(readGenIdFromSuffixRootEntry(), -1,
          "genId attribute should not be retrievable since there are NO entry in the backend");
      addTestEntriesToDB(updatedEntries);
      assertEquals(readGenIdFromSuffixRootEntry(), EMPTY_DN_GENID,
          "genId attribute should be retrievable since there IS one entry in the backend");
      debugInfo(testCase + " Expect genId attribute to be retrievable");
      assertEquals(readGenIdFromSuffixRootEntry(), EMPTY_DN_GENID);
      disconnectFromReplServer(changelog1ID);
      disconnectFromReplServer(replServer1);
    }
    finally
    {
@@ -1098,13 +1077,13 @@
  {
    String testCase = "testLoop";
    debugInfo("Starting "+ testCase + " debugEnabled:" + debugEnabled());
    long rgenId;
    long rsGenId;
    // Special test were we want to test with an empty backend. So free it
    // for this special test.
    TestCaseUtils.initializeTestBackend(false);
    replServer1 = createReplicationServer(changelog1ID, false, testCase);
    replServer1 = createReplicationServer(replServerId1, false, testCase);
    clearChangelogDB(replServer1);
    ReplicationBroker broker = null;
@@ -1114,11 +1093,11 @@
      {
        long generationId = 1000+i;
        broker = openReplicationSession(baseDN, server2ID, 100,
            getChangelogPort(changelog1ID), 1000, !emptyOldChanges, generationId);
            replServer1.getReplicationPort(), 1000, !emptyOldChanges, generationId);
        debugInfo(testCase + " Expect genId to be set in memory on the replication " +
          " server side even if not wrote on disk/db since no change occurred.");
        rgenId = replServer1.getGenerationId(baseDN);
        assertEquals(rgenId, generationId);
        rsGenId = replServer1.getGenerationId(baseDN);
        assertEquals(rsGenId, generationId);
        broker.stop();
        broker = null;
        Thread.sleep(2000); // Let time to RS to clear info about previous connection