mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
03.59.2013 cbf6bfd149ce305652be0aac68d210778b5cbba6
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -45,7 +45,6 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.GenerationIdChecksum;
import org.opends.server.replication.plugin.LDAPReplicationDomain;
@@ -68,7 +67,9 @@
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import static org.opends.server.protocols.internal.InternalClientConnection.*;
import static org.opends.server.types.ResultCode.*;
import static org.opends.server.types.SearchScope.*;
import static org.testng.Assert.*;
/**
@@ -100,14 +101,13 @@
  */
  protected InternalClientConnection connection;
  /**
   * Created entries that need to be deleted for cleanup
   */
  protected LinkedList<DN> entryList = new LinkedList<DN>();
  protected LinkedList<DN> configEntryList = new LinkedList<DN>();
  /** Created entries that will be deleted on class cleanup. */
  protected final Set<DN> entriesToCleanup = new HashSet<DN>();
  /** Created config entries that will be deleted on class cleanup. */
  protected final Set<DN> configEntriesToCleanup = new HashSet<DN>();
  /** Replicated suffix (replication domain). */
  protected Entry synchroServerEntry;
  protected Entry replServerEntry;
  /**
@@ -236,14 +236,18 @@
    ReplicationBroker broker = new ReplicationBroker(replicationDomain,
        state, baseDN, serverId, window_size,
        generationId, 100000, getReplSessionSecurity(), (byte)1, 500);
    List<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
    broker.start(servers);
    connect(broker, port, timeout);
    return broker;
  }
  private void connect(ReplicationBroker broker, int port, int timeout) throws Exception
  {
    broker.start(Collections.singletonList("localhost:" + port));
    // give some time to the broker to connect to the replicationServer.
    checkConnection(30, broker, port);
    if (timeout != 0)
      broker.setSoTimeout(timeout);
    checkConnection(30, broker, port); // give some time to the broker to connect
                                       // to the replicationServer.
    return broker;
  }
  /**
@@ -251,42 +255,28 @@
   * replication server. Waits for connection to be ok up to secTimeout seconds
   * before failing.
   */
  protected void checkConnection(int secTimeout, ReplicationBroker rb, int rsPort)
  protected void checkConnection(int secTimeout, ReplicationBroker rb, int rsPort) throws Exception
  {
    int nSec = 0;
    // Go out of the loop only if connection is verified or if timeout occurs
    while (true)
    {
      // Test connection
      boolean connected = rb.isConnected();
      if (connected)
      if (rb.isConnected())
      {
        // Connection verified
        TRACER.debugInfo("checkConnection: connection of broker "
          + rb.getServerId() + " to RS " + rb.getRsGroupId()
          + " obtained after " + nSec + " seconds.");
        return;
      }
      // Sleep 1 second
      try
      {
        Thread.sleep(1000);
        rb.start();
      } catch (InterruptedException ex)
      {
        fail("Error sleeping " + stackTraceToSingleLineString(ex));
      }
      Thread.sleep(1000);
      rb.start();
      nSec++;
      if (nSec > secTimeout)
      {
        // Timeout reached, end with error
        fail("checkConnection: DS " + rb.getServerId() + " is not connected to "
           + "the RS port " + rsPort + " after " + secTimeout + " seconds.");
      }
      assertTrue(nSec <= secTimeout,
          "checkConnection: DS " + rb.getServerId() + " is not connected to "
              + "the RS port " + rsPort + " after " + secTimeout + " seconds.");
    }
  }
@@ -315,57 +305,36 @@
    ReplicationBroker broker = new ReplicationBroker(null,
        state, baseDN, serverId, window_size, generationId,
        100000, getReplSessionSecurity(), (byte)1, 500);
    List<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
    broker.start(servers);
    checkConnection(30, broker, port);
    if (timeout != 0)
      broker.setSoTimeout(timeout);
    connect(broker, port, timeout);
    return broker;
  }
  protected void deleteEntry(DN dn)
  protected void deleteEntry(DN dn) throws Exception
  {
    try
    {
    if (dn.getParent().getRDN().toString().equalsIgnoreCase("cn=domains"))
      deleteEntry(DN.decode("cn=external changelog,"+dn.toString()));
    }
    catch(Exception e)
    {}
      deleteEntry(DN.decode("cn=external changelog," + dn));
    DeleteOperationBasis op = new DeleteOperationBasis(connection,
        InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(),
        null, dn);
    op.run();
    if ((op.getResultCode() != ResultCode.SUCCESS) &&
        (op.getResultCode() != ResultCode.NO_SUCH_OBJECT))
    {
      fail("Delete entry " + dn +
          " failed: " + op.getResultCode().getResultCodeName());
    }
    assertTrue(op.getResultCode() == SUCCESS || op.getResultCode() == NO_SUCH_OBJECT,
        "Delete entry " + dn + " failed: " + op.getResultCode().getResultCodeName());
  }
  /**
   * suppress all the config entries created by the tests in this class
   */
  protected void cleanConfigEntries()
  protected void cleanConfigEntries() throws Exception
  {
    logError(Message.raw(Category.SYNC, Severity.NOTICE,
    "ReplicationTestCase/Cleaning config entries"));
    logError(Message.raw(Category.SYNC, Severity.NOTICE, "ReplicationTestCase/Cleaning config entries"));
    try
    for (DN dn : configEntriesToCleanup)
    {
      while (true)
      {
        DN dn = configEntryList.removeLast();
        deleteEntry(dn);
      }
      deleteEntry(dn);
    }
    catch (NoSuchElementException e) {
      // done
    }
    configEntriesToCleanup.clear();
    synchroServerEntry = null;
    replServerEntry = null;
  }
@@ -373,23 +342,15 @@
  /**
   * suppress all the real entries created by the tests in this class
   */
  protected void cleanRealEntries()
  protected void cleanRealEntries() throws Exception
  {
    logError(Message.raw(Category.SYNC, Severity.NOTICE,
    "ReplicationTestCase/Cleaning entries"));
    logError(Message.raw(Category.SYNC, Severity.NOTICE, "ReplicationTestCase/Cleaning entries"));
    // Delete entries
    try
    for (DN dn : entriesToCleanup)
    {
      while (true)
      {
        DN dn = entryList.removeLast();
        deleteEntry(dn);
      }
      deleteEntry(dn);
    }
    catch (NoSuchElementException e) {
      // done
    }
    entriesToCleanup.clear();
  }
  /**
@@ -406,10 +367,7 @@
    removeReplicationServerDB();
    cleanConfigEntries();
    configEntryList = new LinkedList<DN>();
    cleanRealEntries();
    entryList = new LinkedList<DN>();
    // Clear the test backend (TestCaseUtils.TEST_ROOT_DN_STRING)
    // (in case our test created some entries in it)
@@ -429,7 +387,7 @@
   * - replication changes backend object
   * This method checks for existence of anything of that type.
   */
  protected void paranoiaCheck()
  protected void paranoiaCheck() throws Exception
  {
    logError(Message.raw(Category.SYNC, Severity.NOTICE,
      "Performing paranoia check"));
@@ -440,7 +398,7 @@
    // Be sure that no replication server instance is left
    List<ReplicationServer> allRSInstances = ReplicationServer.getAllInstances();
    assertTrue(allRSInstances.size() == 0, "Some replication servers left: " + allRSInstances);
    assertEquals(allRSInstances.size(), 0, "Some replication servers left: " + allRSInstances);
    // Check for config entries for replication domain
    assertNoConfigEntriesWithFilter("(objectclass=ds-cfg-replication-domain)",
@@ -521,56 +479,49 @@
   * @param errorMsg The error message to display if a config entry is found
   */
  private void assertNoConfigEntriesWithFilter(String filter, String errorMsg)
      throws Exception
  {
    try
    {
      // Search for matching entries in config backend
      InternalSearchOperation op = connection.processSearch(
        ByteString.valueOf("cn=config"),
        SearchScope.WHOLE_SUBTREE,
        LDAPFilter.decode(filter));
    // Search for matching entries in config backend
    InternalSearchOperation op = connection.processSearch("cn=config", WHOLE_SUBTREE, filter);
    assertEquals(op.getResultCode(), ResultCode.SUCCESS, op.getErrorMessage() .toString());
      assertEquals(op.getResultCode(), ResultCode.SUCCESS,
        op.getErrorMessage().toString());
      // Check that no entries have been found
      LinkedList<SearchResultEntry> entries = op.getSearchEntries();
      assertNotNull(entries);
      StringBuilder sb = new StringBuilder();
      for (SearchResultEntry entry : entries)
      {
        sb.append(entry.toLDIFString());
        sb.append(' ');
      }
      assertEquals(entries.size(), 0, errorMsg + ":\n" + sb);
    } catch (Exception e)
    // Check that no entries have been found
    List<SearchResultEntry> entries = op.getSearchEntries();
    assertNotNull(entries);
    StringBuilder sb = new StringBuilder();
    for (SearchResultEntry entry : entries)
    {
      fail("assertNoConfigEntriesWithFilter: could not search config backend" +
        "with filter: " + filter + ": " + e.getMessage());
      sb.append(entry.toLDIFString());
      sb.append(' ');
    }
    assertEquals(entries.size(), 0, errorMsg + ":\n" + sb);
  }
  /**
   * Configure the replication for this test.
   */
  protected void configureReplication() throws Exception
  protected void configureReplication(String replServerEntryLdif,
      String synchroServerEntryLdif) throws Exception
  {
    if (replServerEntry != null)
    {
      // Add the replication server
      DirectoryServer.getConfigHandler().addEntry(replServerEntry, null);
      assertNotNull(DirectoryServer.getConfigEntry(replServerEntry.getDN()),
       "Unable to add the replication server");
      configEntryList.add(replServerEntry.getDN());
    }
    replServerEntry = TestCaseUtils.entryFromLdifString(replServerEntryLdif);
    addConfigEntry(replServerEntry, "Unable to add the replication server");
    addSynchroServerEntry(synchroServerEntryLdif);
  }
    if (synchroServerEntry != null)
  protected void addSynchroServerEntry(String synchroServerEntryLdif)
      throws Exception
  {
    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerEntryLdif);
    addConfigEntry(synchroServerEntry, "Unable to add the synchronized server");
  }
  private void addConfigEntry(Entry configEntry, String errorMessage) throws Exception
  {
    if (configEntry != null)
    {
      // We also have a replicated suffix (replication domain)
      DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
      assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
          "Unable to add the synchronized server");
      configEntryList.add(synchroServerEntry.getDN());
      DirectoryServer.getConfigHandler().addEntry(configEntry, null);
      assertNotNull(DirectoryServer.getConfigEntry(configEntry.getDN()), errorMessage);
      configEntriesToCleanup.add(configEntry.getDN());
    }
  }
@@ -591,18 +542,13 @@
    {
      if (count++>0)
        Thread.sleep(100);
      op = connection.processSearch(
          ByteString.valueOf("cn=replication,cn=monitor"),
                                    SearchScope.WHOLE_SUBTREE,
                                    LDAPFilter.decode(monitorFilter));
      op = connection.processSearch("cn=replication,cn=monitor", WHOLE_SUBTREE, monitorFilter);
    }
    while (op.getSearchEntries().isEmpty() && (count<100));
    if (op.getSearchEntries().isEmpty())
      throw new Exception("Could not read monitoring information");
    assertFalse(op.getSearchEntries().isEmpty(), "Could not read monitoring information");
    SearchResultEntry entry = op.getSearchEntries().getFirst();
    AttributeType attrType =
         DirectoryServer.getDefaultAttributeType(attr);
    AttributeType attrType = DirectoryServer.getDefaultAttributeType(attr);
    return entry.getAttributeValue(attrType, IntegerSyntax.DECODER).longValue();
  }
@@ -741,15 +687,10 @@
  {
    Entry taskEntry = TestCaseUtils.makeEntry(task);
    InternalClientConnection connection =
         InternalClientConnection.getRootConnection();
    InternalClientConnection connection = getRootConnection();
    // Add the task.
    AddOperation addOperation =
         connection.processAdd(taskEntry.getDN(),
                               taskEntry.getObjectClasses(),
                               taskEntry.getUserAttributes(),
                               taskEntry.getOperationalAttributes());
    AddOperation addOperation = connection.processAdd(taskEntry);
    assertEquals(addOperation.getResultCode(), ResultCode.SUCCESS,
                 "Add of the task definition was not successful");
@@ -822,23 +763,13 @@
   * result code is not SUCCESS
   */
  protected void addTask(Entry taskEntry, ResultCode expectedResult,
      Message errorMessage)
      Message errorMessage) throws Exception
  {
    try
    {
      TRACER.debugInfo("AddTask/" + taskEntry);
      // Change config of DS to launch the total update task
      InternalClientConnection connection =
        InternalClientConnection.getRootConnection();
      // Add the task.
      AddOperation addOperation =
        connection.processAdd(taskEntry.getDN(),
            taskEntry.getObjectClasses(),
            taskEntry.getUserAttributes(),
            taskEntry.getOperationalAttributes());
      AddOperation addOperation = getRootConnection().processAdd(taskEntry);
      assertEquals(addOperation.getResultCode(), expectedResult,
          "Result of ADD operation of the task is: "
@@ -849,15 +780,10 @@
      if (expectedResult != ResultCode.SUCCESS)
      {
        assertTrue(addOperation.getErrorMessage().toString().
            startsWith(errorMessage.toString()),
            "Error MsgID of the task <"
            + addOperation.getErrorMessage()
            + "> equals <"
            + errorMessage + ">");
        assertTrue(addOperation.getErrorMessage().toString().startsWith(errorMessage.toString()),
            "Error MsgID of the task <" + addOperation.getErrorMessage() + "> equals <" + errorMessage + ">");
        TRACER.debugInfo("Create config task: <"+ errorMessage.getDescriptor().getId()
                + addOperation.getErrorMessage() + ">");
      }
      else
      {
@@ -865,14 +791,10 @@
      }
      // Entry will be removed at the end of the test
      entryList.addLast(taskEntry.getDN());
      entriesToCleanup.add(taskEntry.getDN());
      TRACER.debugInfo("AddedTask/" + taskEntry.getDN());
    }
    catch(Exception e)
    {
      fail("Exception when adding task:"+ e.getMessage());
    }
  }
  protected void waitTaskState(Entry taskEntry, TaskState expectedTaskState,
@@ -913,10 +835,8 @@
    if ((taskState != TaskState.COMPLETED_SUCCESSFULLY)
        && (taskState != TaskState.RUNNING))
    {
      if (logMessages.size() == 0)
      {
        fail("No log messages were written to the task entry on a failed task");
      }
      assertTrue(logMessages.size() != 0,
          "No log messages were written to the task entry on a failed task");
    }
    if (logMessages.size() != 0)
    {
@@ -945,9 +865,8 @@
  /**
   * Add to the current DB the entries necessary to the test
   */
  protected void addTestEntriesToDB(String... ldifEntries)
  protected void addTestEntriesToDB(String... ldifEntries) throws Exception
  {
    try
    {
      // Change config of DS to launch the total update task
      InternalClientConnection connection =
@@ -979,10 +898,6 @@
        }
      }
    }
    catch(Exception e)
    {
      fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  /**
@@ -1010,8 +925,8 @@
        Entry newEntry = DirectoryServer.getEntry(dn);
        if (newEntry != null)
        {
          List<Attribute> tmpAttrList = newEntry.getAttribute("entryuuid");
          for (AttributeValue val : tmpAttrList.get(0))
          Attribute attribute = newEntry.getAttribute("entryuuid").get(0);
          for (AttributeValue val : attribute)
          {
            found = val.getValue().toString();
            break;
@@ -1032,7 +947,7 @@
  /**
   * Utility method : removes a domain deleting the passed config entry
   */
  protected void removeDomain(Entry... domainCfgEntries)
  protected void removeDomain(Entry... domainCfgEntries) throws Exception
  {
    for (Entry entry : domainCfgEntries)
    {
@@ -1050,38 +965,8 @@
   * @param msgType Class of the message we are waiting for.
   * @return The expected message if it comes in time or fails (assertion).
   */
  protected static ReplicationMsg waitForSpecificMsg(Session session, String msgType) {
    int timeOut = 5000; // 5 seconds max to wait for the desired message
    long startTime = System.currentTimeMillis();
    long curTime = startTime;
    int nMsg = 0;
    while ((curTime - startTime) <= timeOut)
    {
      ReplicationMsg replMsg = null;
      try
      {
        replMsg = session.receive();
      } catch (Exception ex)
      {
        fail("Exception waiting for " + msgType + " message : " +
          ex.getClass().getName()  + " : " + ex.getMessage());
      }
      // Get message type
      String rcvMsgType = replMsg.getClass().getName();
      if (rcvMsgType.equals(msgType))
      {
        // Ok, got it, let's return the expected message
        return replMsg;
      }
      TRACER.debugInfo("waitForSpecificMsg received : " + replMsg);
      nMsg++;
      curTime = System.currentTimeMillis();
    }
    // Timeout
    fail("Failed to receive an expected " + msgType +
      " message after 5 seconds : also received " + nMsg +
      " other messages during wait time.");
    return null;
  protected static <T extends ReplicationMsg> T waitForSpecificMsg(Session session, Class<T> msgType) {
    return waitForSpecificMsg(session, null, msgType);
  }
  /**
@@ -1091,7 +976,15 @@
   * @param msgType Class of the message we are waiting for.
   * @return The expected message if it comes in time or fails (assertion).
   */
  protected static ReplicationMsg waitForSpecificMsg(ReplicationBroker broker, String msgType) {
  protected static <T extends ReplicationMsg> T waitForSpecificMsg(ReplicationBroker broker, Class<T> msgType) {
    return waitForSpecificMsg(null, broker, msgType);
  }
  protected static <T extends ReplicationMsg> T waitForSpecificMsg(Session session, ReplicationBroker broker, Class<T> msgType)
  {
    assertTrue(session != null || broker != null, "One of Session or ReplicationBroker parameter must not be null");
    assertTrue(session == null || broker == null, "Only one of Session or ReplicationBroker parameter must not be null");
    int timeOut = 5000; // 5 seconds max to wait for the desired message
    long startTime = System.currentTimeMillis();
    long curTime = startTime;
@@ -1101,27 +994,34 @@
      ReplicationMsg replMsg = null;
      try
      {
        replMsg = broker.receive();
      } catch (Exception ex)
      {
        fail("Exception waiting for " + msgType + " message : " +
          ex.getClass().getName()  + " : " + ex.getMessage());
        if (session != null)
        {
          replMsg = session.receive();
        }
        else if (broker != null)
        {
          replMsg = broker.receive();
        }
      }
      // Get message type
      String rcvMsgType = replMsg.getClass().getName();
      if (rcvMsgType.equals(msgType))
      catch (Exception ex)
      {
        fail("Exception waiting for " + msgType + " message : "
            + ex.getClass().getName() + " : " + ex.getMessage());
      }
      if (replMsg.equals(msgType.getClass()))
      {
        // Ok, got it, let's return the expected message
        return replMsg;
        return (T) replMsg;
      }
      TRACER.debugInfo("waitForSpecificMsg received : " + replMsg);
      nMsg++;
      curTime = System.currentTimeMillis();
    }
    // Timeout
    fail("Failed to receive an expected " + msgType +
      " message after 5 seconds : also received " + nMsg +
      " other messages during wait time.");
    fail("Failed to receive an expected " + msgType
        + " message after 5 seconds : also received " + nMsg
        + " other messages during wait time.");
    return null;
  }
}