mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
14.22.2013 5d1c7d62d688c64af2627ceb8b1556ef313954ec
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -27,23 +27,17 @@
 */
package org.opends.server.replication;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.messages.TaskMessages.*;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
import java.io.File;
import java.net.SocketTimeoutException;
import java.util.*;
import org.assertj.core.api.Assertions;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.TaskState;
import org.opends.server.core.AddOperation;
import org.opends.server.core.AddOperationBasis;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
@@ -63,6 +57,14 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.messages.TaskMessages.*;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
/**
 * Tests contained here:
 *
@@ -202,7 +204,7 @@
        "ds-task-initialize-replica-server-id: all");
  }
  // Tests that entries have been written in the db
  /** Tests that entries have been written in the db */
  private void testEntriesInDb()
  {
    log("TestEntriesInDb");
@@ -260,62 +262,19 @@
   * @param expectedDone The expected number of entries to be processed.
   */
  private void waitTaskCompleted(Entry taskEntry, TaskState expectedState,
      long expectedLeft, long expectedDone)
      long expectedLeft, long expectedDone) throws Exception
  {
    log("waitTaskCompleted " + taskEntry.toLDIFString());
    try
    {
      // FIXME - Factorize with TasksTestCase
      // Wait until the task completes.
      int timeout = 2000;
      AttributeType completionTimeType = DirectoryServer.getAttributeType(
          ATTR_TASK_COMPLETION_TIME.toLowerCase());
      SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
      Entry resultEntry = null;
      String completionTime = null;
      long startMillisecs = System.currentTimeMillis();
      do
      {
        InternalSearchOperation searchOperation =
          connection.processSearch(taskEntry.getDN(),
              SearchScope.BASE_OBJECT,
              filter);
        try
        {
          resultEntry = searchOperation.getSearchEntries().getFirst();
        } catch (Exception e)
        {
          // FIXME How is this possible?  Must be issue 858.
          fail("Task entry was not returned from the search.");
          continue;
        }
        completionTime =
          resultEntry.getAttributeValue(completionTimeType,
              DirectoryStringSyntax.DECODER);
        if (completionTime == null)
        {
          if (System.currentTimeMillis() - startMillisecs > 1000*timeout)
          {
            break;
          }
          Thread.sleep(100);
        }
      } while (completionTime == null);
      if (completionTime == null)
      {
        fail("The task had not completed after " + timeout + " seconds.");
      }
      Entry resultEntry = getCompletionTime(taskEntry);
      // Check that the task state is as expected.
      AttributeType taskStateType =
        DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
          DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
      String stateString =
        resultEntry.getAttributeValue(taskStateType,
            DirectoryStringSyntax.DECODER);
          resultEntry.getAttributeValue(taskStateType,
              DirectoryStringSyntax.DECODER);
      TaskState taskState = TaskState.fromString(stateString);
      assertEquals(taskState, expectedState,
          "The task completed in an unexpected state");
@@ -323,7 +282,7 @@
      // Check that the task contains some log messages.
      AttributeType logMessagesType = DirectoryServer.getAttributeType(
          ATTR_TASK_LOG_MESSAGES.toLowerCase());
      ArrayList<String> logMessages = new ArrayList<String>();
      List<String> logMessages = new ArrayList<String>();
      resultEntry.getAttributeValues(logMessagesType,
          DirectoryStringSyntax.DECODER,
          logMessages);
@@ -333,88 +292,94 @@
        fail("No log messages were written to the task entry on a failed task");
      }
      try
      {
        // Check that the task state is as expected.
        taskStateType =
          DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_LEFT, true);
        stateString =
          resultEntry.getAttributeValue(taskStateType,
              DirectoryStringSyntax.DECODER);
      // Check that the task state is as expected.
      assertAttributeValue(resultEntry, ATTR_TASK_INITIALIZE_LEFT,
          expectedLeft, "The number of entries to process is not correct.");
        assertEquals(Long.decode(stateString).longValue(),expectedLeft,
            "The number of entries to process is not correct.");
        // Check that the task state is as expected.
        taskStateType =
          DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_DONE, true);
        stateString =
          resultEntry.getAttributeValue(taskStateType,
              DirectoryStringSyntax.DECODER);
        assertEquals(Long.decode(stateString).longValue(),expectedDone,
            "The number of entries processed is not correct.");
      }
      catch(Exception e)
      {
        fail("Exception"+ e.getMessage()+e.getStackTrace());
      }
      // Check that the task state is as expected.
      assertAttributeValue(resultEntry, ATTR_TASK_INITIALIZE_DONE,
          expectedDone, "The number of entries processed is not correct.");
    }
    catch(Exception e)
  }
  private Entry getCompletionTime(Entry taskEntry) throws Exception
  {
    // FIXME - Factorize with TasksTestCase
    // Wait until the task completes.
    int timeout = 2000;
    AttributeType completionTimeType = DirectoryServer.getAttributeType(
        ATTR_TASK_COMPLETION_TIME.toLowerCase());
    SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
    long startMillisecs = System.currentTimeMillis();
    do
    {
      fail("Exception"+ e.getMessage()+e.getStackTrace());
      InternalSearchOperation searchOperation = connection.processSearch(
          taskEntry.getDN(), SearchScope.BASE_OBJECT, filter);
      Entry resultEntry = searchOperation.getSearchEntries().getFirst();
      String completionTime = resultEntry.getAttributeValue(
          completionTimeType, DirectoryStringSyntax.DECODER);
      if (completionTime != null)
      {
        return resultEntry;
      }
      if (System.currentTimeMillis() - startMillisecs > 1000 * timeout)
      {
        fail("The task had not completed after " + timeout + " seconds.");
      }
      Thread.sleep(100);
    }
    while (true);
  }
  private void assertAttributeValue(Entry resultEntry, String lowerAttrName,
      long expected, String message) throws DirectoryException
  {
    AttributeType type = DirectoryServer.getAttributeType(lowerAttrName, true);
    String value = resultEntry.getAttributeValue(type, DirectoryStringSyntax.DECODER);
    assertEquals(Long.decode(value).longValue(), expected, message);
  }
  /**
   * Add to the current DB the entries necessary to the test.
   */
  private void addTestEntriesToDB()
  private void addTestEntriesToDB() throws Exception
  {
    try
    for (String ldifEntry : updatedEntries)
    {
      for (String ldifEntry : updatedEntries)
      {
        Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
        addTestEntryToDB(entry);
        // They will be removed at the end of the test
        entryList.addLast(entry.getDN());
      }
      log("addTestEntriesToDB : " + updatedEntries.length + " successfully added to DB");
      Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
      addTestEntryToDB(entry);
      // They will be removed at the end of the test
      entryList.addLast(entry.getDN());
    }
    catch(Exception e)
    {
      fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
    log("addTestEntriesToDB : " + updatedEntries.length
        + " successfully added to DB");
  }
  private void addTestEntryToDB(Entry entry)
  {
    try
    AddOperation addOp =
        new AddOperationBasis(connection, InternalClientConnection
            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
            entry.getDN(), entry.getObjectClasses(), entry.getUserAttributes(),
            entry.getOperationalAttributes());
    addOp.setInternalOperation(true);
    addOp.run();
    if (addOp.getResultCode() != ResultCode.SUCCESS)
    {
        AddOperationBasis addOp = new AddOperationBasis(connection,
            InternalClientConnection.nextOperationID(), InternalClientConnection
            .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
            entry.getUserAttributes(), entry.getOperationalAttributes());
        addOp.setInternalOperation(true);
        addOp.run();
        if (addOp.getResultCode() != ResultCode.SUCCESS)
        {
          log("addEntry: Failed" + addOp.getResultCode());
        }
      log("addEntry: Failed" + addOp.getResultCode());
    }
        // They will be removed at the end of the test
        entryList.addLast(entry.getDN());
    }
    catch(Exception e)
    {
      fail("addTestEntryToDB Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
    // They will be removed at the end of the test
    entryList.addLast(entry.getDN());
  }
  /*
  /**
   * Creates entries necessary to the test.
   */
  private String[] newLDIFEntries(int entriesCnt)
@@ -426,8 +391,6 @@
      bigAttributeValue[i] = Integer.toString(i).charAt(0);
    String[] entries = new String[entriesCnt + 2];
    String filler = "000000000000000000000000000000000000";
    entries[0] = "dn: " + EXAMPLE_DN + "\n"
                 + "objectClass: top\n"
                 + "objectClass: domain\n"
@@ -441,6 +404,7 @@
               + "entryUUID: 21111111-1111-1111-1111-111111111112\n"
               + "\n";
    String filler = "000000000000000000000000000000000000";
    for (int i=0; i<entriesCnt; i++)
    {
      String useri="0000"+i;
@@ -472,36 +436,25 @@
  private void makeBrokerPublishEntries(ReplicationBroker broker,
      int senderID, int destinationServerID, int requestorID)
  {
    // Send entries
    try
    RoutableMsg initTargetMessage =
        new InitializeTargetMsg(EXAMPLE_DN, server2ID, destinationServerID,
            requestorID, updatedEntries.length, initWindow);
    broker.publish(initTargetMessage);
    int cnt = 0;
    for (String entry : updatedEntries)
    {
      RoutableMsg initTargetMessage =
        new InitializeTargetMsg(
          EXAMPLE_DN, server2ID, destinationServerID, requestorID,
          updatedEntries.length, initWindow);
      broker.publish(initTargetMessage);
      log("Broker will publish 1 entry: bytes:" + entry.length());
      int cnt = 0;
      for (String entry : updatedEntries)
      {
        log("Broker will publish 1 entry: bytes:"+ entry.length());
        EntryMsg entryMsg = new EntryMsg(senderID, destinationServerID,
            entry.getBytes(), ++cnt);
        broker.publish(entryMsg);
      }
      DoneMsg doneMsg = new DoneMsg(senderID, destinationServerID);
      broker.publish(doneMsg);
      log("Broker " + senderID + " published entries");
      EntryMsg entryMsg =
          new EntryMsg(senderID, destinationServerID, entry.getBytes(), ++cnt);
      broker.publish(entryMsg);
    }
    catch(Exception e)
    {
      fail("makeBrokerPublishEntries Exception:"+ e.getMessage() + " "
          + stackTraceToSingleLineString(e));
    }
    DoneMsg doneMsg = new DoneMsg(senderID, destinationServerID);
    broker.publish(doneMsg);
    log("Broker " + senderID + " published entries");
  }
  void receiveUpdatedEntries(ReplicationBroker broker, int serverID,
@@ -565,8 +518,7 @@
    broker.setGenerationID(EMPTY_DN_GENID);
    broker.reStart(true);
    try { Thread.sleep(500); } catch(Exception e) {}
    sleep(500);
  }
  /**
@@ -598,19 +550,18 @@
   * @param changelogId The serverID of the replicationServer to create.
   * @return The new replicationServer.
   */
  private ReplicationServer createChangelogServer(int changelogId, String testCase)
  private ReplicationServer createChangelogServer(int changelogId,
      String testCase) throws Exception
  {
    SortedSet<String> servers = new TreeSet<String>();
    try
    {
      if (changelogId != changelog1ID)
          servers.add("localhost:" + getChangelogPort(changelog1ID));
      if (changelogId != changelog2ID)
          servers.add("localhost:" + getChangelogPort(changelog2ID));
      if (changelogId != changelog3ID)
          servers.add("localhost:" + getChangelogPort(changelog3ID));
    if (changelogId != changelog1ID)
      servers.add("localhost:" + getChangelogPort(changelog1ID));
    if (changelogId != changelog2ID)
      servers.add("localhost:" + getChangelogPort(changelog2ID));
    if (changelogId != changelog3ID)
      servers.add("localhost:" + getChangelogPort(changelog3ID));
      ReplServerFakeConfiguration conf =
    ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(
            getChangelogPort(changelogId),
            "initOnlineTest" + getChangelogPort(changelogId) + testCase + "Db",
@@ -619,16 +570,10 @@
            0,
            100,
            servers);
      ReplicationServer replicationServer = new ReplicationServer(conf);
      Thread.sleep(1000);
    ReplicationServer replicationServer = new ReplicationServer(conf);
    Thread.sleep(1000);
      return replicationServer;
    }
    catch (Exception e)
    {
      fail("createChangelog" + stackTraceToSingleLineString(e));
    }
    return null;
    return replicationServer;
  }
  /**
@@ -636,52 +581,42 @@
   * replication Server ID.
   * @param changelogID
   */
  private void connectServer1ToChangelog(int changelogID)
  private void connectServer1ToChangelog(int changelogID) throws Exception
  {
    connectServer1ToChangelog(changelogID, 0);
  }
  private void connectServer1ToChangelog(int changelogID, int heartbeat)
  private void connectServer1ToChangelog(int changelogID, int heartbeat) throws Exception
  {
    // Connect DS to the replicationServer
    try
    {
      // suffix synchronized
      String testName = "initOnLineTest";
      String synchroServerLdif =
        "dn: cn=" + testName + ", cn=domains," + SYNCHRO_PLUGIN_DN + "\n"
      + "objectClass: top\n"
      + "objectClass: ds-cfg-synchronization-provider\n"
      + "objectClass: ds-cfg-replication-domain\n"
      + "cn: " + testName + "\n"
      + "ds-cfg-base-dn: " + EXAMPLE_DN + "\n"
      + "ds-cfg-replication-server: localhost:"
      + getChangelogPort(changelogID)+"\n"
      + "ds-cfg-server-id: " + server1ID + "\n"
      + "ds-cfg-receive-status: true\n"
      + (heartbeat>0?"ds-cfg-heartbeat-interval: "+heartbeat+" ms\n":"")
      + "ds-cfg-window-size: " + WINDOW_SIZE;
    // suffix synchronized
    String testName = "initOnLineTest";
    String synchroServerLdif =
      "dn: cn=" + testName + ", cn=domains," + SYNCHRO_PLUGIN_DN + "\n"
    + "objectClass: top\n"
    + "objectClass: ds-cfg-synchronization-provider\n"
    + "objectClass: ds-cfg-replication-domain\n"
    + "cn: " + testName + "\n"
    + "ds-cfg-base-dn: " + EXAMPLE_DN + "\n"
    + "ds-cfg-replication-server: localhost:"
    + getChangelogPort(changelogID)+"\n"
    + "ds-cfg-server-id: " + server1ID + "\n"
    + "ds-cfg-receive-status: true\n"
    + (heartbeat>0?"ds-cfg-heartbeat-interval: "+heartbeat+" ms\n":"")
    + "ds-cfg-window-size: " + WINDOW_SIZE;
    // Clear the backend
    LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
      // Clear the backend
      LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
      synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
      DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
      assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
    DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the synchronized server");
      configEntryList.add(synchroServerEntry.getDN());
    configEntryList.add(synchroServerEntry.getDN());
      replDomain = LDAPReplicationDomain.retrievesReplicationDomain(baseDn);
    replDomain = LDAPReplicationDomain.retrievesReplicationDomain(baseDn);
      assertTrue(!replDomain.ieRunning(),
    assertTrue(!replDomain.ieRunning(),
        "ReplicationDomain: Import/Export is not expected to be running");
    }
    catch(Exception e)
    {
      log("connectServer1ToChangelog", e);
      fail("connectServer1ToChangelog", e);
    }
  }
  private int getChangelogPort(int changelogID) throws Exception
@@ -748,10 +683,6 @@
      testEntriesInDb();
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest(testCase);
@@ -967,10 +898,6 @@
      testEntriesInDb();
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest(testCase);
@@ -1022,10 +949,6 @@
      // createTask(taskInitTargetS2);
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest(testCase);
@@ -1094,10 +1017,6 @@
      // createTask(taskInitTargetS2);
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest(testCase);
@@ -1155,40 +1074,32 @@
      // Check that the list of connected LDAP servers is correct
      // in each replication servers
      List<String> l1 = changelog1.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).
        getConnectedLDAPservers();
      assertEquals(l1.size(), 1);
      assertEquals(l1.get(0), String.valueOf(server1ID));
      Set<Integer> l1 = changelog1.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l1).containsExactly(server1ID);
      List<String> l2;
    l2 = changelog2.getReplicationServerDomain(
        baseDn.toNormalizedString(), false).getConnectedLDAPservers();
      assertEquals(l2.size(), 2);
      assertTrue(l2.contains(String.valueOf(server2ID)));
      assertTrue(l2.contains(String.valueOf(server3ID)));
      Set<Integer> l2 = changelog2.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l2).containsExactly(server2ID, server3ID);
      List<String> l3;
    l3 = changelog3.getReplicationServerDomain(
        baseDn.toNormalizedString(), false).getConnectedLDAPservers();
      assertEquals(l3.size(), 0);
      Set<Integer> l3 = changelog3.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l3).isEmpty();
      // Test updates
      broker3.stop();
      Thread.sleep(1000);
    l2 = changelog2.getReplicationServerDomain(
        baseDn.toNormalizedString(), false).getConnectedLDAPservers();
      assertEquals(l2.size(), 1);
      assertEquals(l2.get(0), String.valueOf(server2ID));
      l2 = changelog2.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l2).containsExactly(server2ID);
      broker3 = openReplicationSession(DN.decode(EXAMPLE_DN),
        server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
      broker2.stop();
      Thread.sleep(1000);
    l2 = changelog2.getReplicationServerDomain(
        baseDn.toNormalizedString(), false).getConnectedLDAPservers();
      assertEquals(l2.size(), 1);
      assertEquals(l2.get(0), String.valueOf(server3ID));
      l2 = changelog2.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l2).containsExactly(server3ID);
    // TODO Test ReplicationServerDomain.getDestinationServers method.
@@ -1262,10 +1173,6 @@
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      log(testCase + e.getLocalizedMessage());
    }
    finally
    {
      afterTest(testCase);
@@ -1567,18 +1474,11 @@
      // in those cases, loop for a while waiting for completion.
      for (int i = 0; i< 10; i++)
      {
        if (replDomain.ieRunning())
        {
          try
          {
            Thread.sleep(500);
          } catch (InterruptedException e)
          { }
        }
        else
        if (!replDomain.ieRunning())
        {
          break;
        }
        sleep(500);
      }
       assertTrue(!replDomain.ieRunning(),
         "ReplicationDomain: Import/Export is not expected to be running");
@@ -1591,14 +1491,14 @@
    if (server2 != null)
    {
      server2.stop();
      TestCaseUtils.sleep(100); // give some time to the broker to disconnect
      sleep(100); // give some time to the broker to disconnect
      // from the replicationServer.
      server2 = null;
    }
    if (server3 != null)
    {
      server3.stop();
      TestCaseUtils.sleep(100); // give some time to the broker to disconnect
      sleep(100); // give some time to the broker to disconnect
      // from the replicationServer.
      server3 = null;
    }
@@ -1639,7 +1539,7 @@
    log("Successfully cleaned " + testCase);
  }
    /**
  /**
   * Clean up the environment.
   *
   * @throws Exception If the environment could not be set up.