mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
06.11.2009 3a9e211d36ee94ff99941943b3b51e0f768624f5
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -47,7 +47,6 @@
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
@@ -65,9 +64,10 @@
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.plugin.PersistentServerState;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.schema.DirectoryStringSyntax;
@@ -260,34 +260,6 @@
      broker.setSoTimeout(timeout);
    checkConnection(30, broker, port); // give some time to the broker to connect
                                       // to the replicationServer.
    if (emptyOldChanges)
    {
      /*
       * loop receiving update until there is nothing left
       * to make sure that message from previous tests have been consumed.
       */
      try
      {
        while (true)
        {
          ReplicationMsg rMsg = broker.receive();
          if (rMsg instanceof ErrorMsg)
          {
            ErrorMsg eMsg = (ErrorMsg)rMsg;
            logError(new MessageBuilder(
                "ReplicationTestCase/openReplicationSession ").append(
                " received ErrorMessage when emptying old changes ").append(
                eMsg.getDetails()).toMessage());
          }
        }
      }
      catch (Exception e)
      {
        logError(new MessageBuilder(
            "ReplicationTestCase/openReplicationSession ").append(e.getMessage())
            .append(" when emptying old changes").toMessage());
      }
    }
    return broker;
  }
@@ -313,32 +285,6 @@
      broker.setSoTimeout(timeout);
    checkConnection(30, broker, port); // give some time to the broker to connect
                                       // to the replicationServer.
    if (emptyOldChanges)
    {
      // loop receiving update until there is nothing left
      // to make sure that message from previous tests have been consumed.
      try
      {
        while (true)
        {
          ReplicationMsg rMsg = broker.receive();
          if (rMsg instanceof ErrorMsg)
          {
            ErrorMsg eMsg = (ErrorMsg)rMsg;
            logError(new MessageBuilder(
                "ReplicationTestCase/openReplicationSession ").append(
                " received ErrorMessage when emptying old changes ").append(
                eMsg.getDetails()).toMessage());
          }
        }
      }
      catch (Exception e)
      {
        logError(new MessageBuilder(
            "ReplicationTestCase/openReplicationSession ").append(e.getMessage())
            .append(" when emptying old changes").toMessage());
      }
    }
    return broker;
  }
  */
@@ -435,17 +381,6 @@
      boolean emptyOldChanges)
      throws Exception, SocketException
  {
    return openReplicationSession(baseDn, serverId, window_size,
        port, timeout, maxSendQueue, maxRcvQueue, emptyOldChanges,
        getGenerationId(baseDn));
  }
  protected ReplicationBroker openReplicationSession(
      final DN baseDn, int serverId, int window_size,
        int port, int timeout, int maxSendQueue, int maxRcvQueue,
        boolean emptyOldChanges, long generationId)
            throws Exception, SocketException
  {
    ServerState state = new ServerState();
    if (emptyOldChanges)
@@ -453,37 +388,13 @@
    ReplicationBroker broker = new ReplicationBroker(null,
        state, baseDn.toNormalizedString(), serverId, window_size,
        generationId, 0, getReplSessionSecurity(), (byte)1, 500);
        getGenerationId(baseDn), 0, getReplSessionSecurity(), (byte)1, 500);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
    broker.start(servers);
    checkConnection(30, broker, port);
    if (timeout != 0)
      broker.setSoTimeout(timeout);
    if (emptyOldChanges)
    {
      /*
       * loop receiving update until there is nothing left
       * to make sure that message from previous tests have been consumed.
       */
      try
      {
        while (true)
        {
          ReplicationMsg rMsg = broker.receive();
          if (rMsg instanceof ErrorMsg)
          {
            ErrorMsg eMsg = (ErrorMsg)rMsg;
            logError(new MessageBuilder(
                "ReplicationTestCase/openReplicationSession ").append(
                " received ErrorMessage when emptying old changes ").append(
                eMsg.getDetails()).toMessage());
          }
        }
      }
      catch (Exception e)
      { }
    }
    return broker;
  }
@@ -575,11 +486,14 @@
    logError(Message.raw(Category.SYNC, Severity.NOTICE,
      " ##### Calling ReplicationTestCase.classCleanUp ##### "));
    // Clean RS databases
    cleanUpReplicationServersDB();
    cleanConfigEntries();
    configEntryList = null;
    configEntryList = new LinkedList<DN>();
    cleanRealEntries();
    entryList = null;
    entryList = new LinkedList<DN>();
    // Clear the test backend (TestCaseUtils.TEST_ROOT_DN_STRING)
    // (in case our test created some emtries in it)
@@ -631,6 +545,10 @@
    assertNoConfigEntriesWithFilter("(objectclass=ds-cfg-replication-server)",
      "Found unexpected replication server config left");
    // Be sure that no replication server instance is left
    List<ReplicationServer> allRSInstances = ReplicationServer.getAllInstances();
    assertTrue(allRSInstances.size() == 0, "Some replication servers left: " + allRSInstances);
    // Check for config entries for replication domain
    assertNoConfigEntriesWithFilter("(objectclass=ds-cfg-replication-domain)",
      "Found unexpected replication domain config left");
@@ -648,6 +566,17 @@
  }
  /**
   * Cleanup databases of the currently instantiated replication servers in the
   * VM
   */
  protected void cleanUpReplicationServersDB() {
    for (ReplicationServer rs : ReplicationServer.getAllInstances()) {
      rs.clearDb();
    }
  }
  /**
   * Performs a search on the config backend with the specified filter.
   * Fails if a config entry is found.
   * @param filter The filter to apply for the search
@@ -1266,4 +1195,90 @@
      // done
    }
  }
  /**
   * Wait for the arrival of a specific message type on the provided session
   * before going in timeout and failing.
   * @param session Session from which we should receive the message.
   * @param msgType Class of the message we are waiting for.
   * @return The expected message if it comes in time or fails (assertion).
   */
  protected static ReplicationMsg waitForSpecificMsg(ProtocolSession session, String msgType) {
    ReplicationMsg replMsg = null;
    int timeOut = 5000; // 5 seconds max to wait for the desired message
    long startTime = System.currentTimeMillis();
    long curTime = startTime;
    int nMsg = 0;
    while ((curTime - startTime) <= timeOut)
    {
      try
      {
        replMsg = session.receive();
      } catch (Exception ex)
      {
        fail("Exception waiting for " + msgType + " message : " +
          ex.getClass().getName()  + " : " + ex.getMessage());
      }
      // Get message type
      String rcvMsgType = replMsg.getClass().getName();
      if (rcvMsgType.equals(msgType))
      {
        // Ok, got it, let's return the expected message
        return replMsg;
      }
      TRACER.debugInfo("waitForSpecificMsg received : " + replMsg);
      nMsg++;
      curTime = System.currentTimeMillis();
    }
    // Timeout
    fail("Failed to receive an expected " + msgType +
      " message after 5 seconds : also received " + nMsg +
      " other messages during wait time.");
    return null;
  }
  /**
   * Wait for the arrival of a specific message type on the provided broker
   * before going in timeout and failing.
   * @param broker Broker from which we should receive the message.
   * @param msgType Class of the message we are waiting for.
   * @return The expected message if it comes in time or fails (assertion).
   */
  protected static ReplicationMsg waitForSpecificMsg(ReplicationBroker broker, String msgType) {
    ReplicationMsg replMsg = null;
    int timeOut = 5000; // 5 seconds max to wait for the desired message
    long startTime = System.currentTimeMillis();
    long curTime = startTime;
    int nMsg = 0;
    while ((curTime - startTime) <= timeOut)
    {
      try
      {
        replMsg = broker.receive();
      } catch (Exception ex)
      {
        fail("Exception waiting for " + msgType + " message : " +
          ex.getClass().getName()  + " : " + ex.getMessage());
      }
      // Get message type
      String rcvMsgType = replMsg.getClass().getName();
      if (rcvMsgType.equals(msgType))
      {
        // Ok, got it, let's return the expected message
        return replMsg;
      }
      TRACER.debugInfo("waitForSpecificMsg received : " + replMsg);
      nMsg++;
      curTime = System.currentTimeMillis();
    }
    // Timeout
    fail("Failed to receive an expected " + msgType +
      " message after 5 seconds : also received " + nMsg +
      " other messages during wait time.");
    return null;
  }
}