mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
10.05.2006 26ff1f0755680cbce7b5bdb136750b2b1bc9e4ed
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
@@ -27,6 +27,7 @@
package org.opends.server.synchronization;
import static org.opends.server.loggers.Error.logError;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -53,6 +54,8 @@
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.InitializationException;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
@@ -139,96 +142,100 @@
  @Test(enabled=true, groups="slow")
  public void fromServertoBroker() throws Exception
  {
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
        "Starting Synchronization StressTest : fromServertoBroker" , 1);
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    final int TOTAL_MESSAGES = 1000;
    cleanEntries();
    ChangelogBroker broker = openChangelogSession(baseDn, (short) 3);
    ChangelogBroker broker = openChangelogSession(baseDn, (short) 18);
    DirectoryServer.registerMonitorProvider(this);
    try {
    /*
     * loop receiving update until there is nothing left
     * to make sure that message from previous tests have been consumed.
     */
    try
    {
      while (true)
      /*
       * loop receiving update until there is nothing left
       * to make sure that message from previous tests have been consumed.
       */
      try
      {
        broker.receive();
        while (true)
        {
          broker.receive();
        }
      }
     }
    catch (Exception e)
    { }
    /*
     * Test that operations done on this server are sent to the
     * changelog server and forwarded to our changelog broker session.
     */
      catch (Exception e)
      { }
      /*
       * Test that operations done on this server are sent to the
       * changelog server and forwarded to our changelog broker session.
       */
    // Create an Entry (add operation) that will be later used in the test.
    Entry tmp = personEntry.duplicate();
    AddOperation addOp = new AddOperation(connection,
        InternalClientConnection.nextOperationID(), InternalClientConnection
        .nextMessageID(), null, tmp.getDN(),
        tmp.getObjectClasses(), tmp.getUserAttributes(),
        tmp.getOperationalAttributes());
    addOp.run();
    entryList.add(personEntry);
    assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
      "The Add Entry operation failed");
      // Create an Entry (add operation) that will be later used in the test.
      Entry tmp = personEntry.duplicate();
      AddOperation addOp = new AddOperation(connection,
          InternalClientConnection.nextOperationID(), InternalClientConnection
          .nextMessageID(), null, tmp.getDN(),
          tmp.getObjectClasses(), tmp.getUserAttributes(),
          tmp.getOperationalAttributes());
      addOp.run();
      entryList.add(personEntry);
      assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
        "The Add Entry operation failed");
    // Check if the client has received the msg
    SynchronizationMessage msg = broker.receive();
    assertTrue(msg instanceof AddMsg,
      "The received synchronization message is not an ADD msg");
    AddMsg addMsg =  (AddMsg) msg;
      // Check if the client has received the msg
      SynchronizationMessage msg = broker.receive();
      assertTrue(msg instanceof AddMsg,
        "The received synchronization message is not an ADD msg");
      AddMsg addMsg =  (AddMsg) msg;
    Operation receivedOp = addMsg.createOperation(connection);
    assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
      "The received synchronization message is not an ADD msg");
      Operation receivedOp = addMsg.createOperation(connection);
      assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
        "The received synchronization message is not an ADD msg");
    assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
      "The received ADD synchronization message is not for the excepted DN");
      assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
        "The received ADD synchronization message is not for the excepted DN");
    reader = new BrokerReader(broker);
    reader.start();
      reader = new BrokerReader(broker);
      reader.start();
    long startTime = TimeThread.getTime();
    int count = TOTAL_MESSAGES;
      long startTime = TimeThread.getTime();
      int count = TOTAL_MESSAGES;
    // Create a number of writer thread that will loop modifying the entry
    List<Thread> writerThreadList = new LinkedList<Thread>();
    for (int n = 0; n < 1; n++)
    {
      BrokerWriter writer = new BrokerWriter(count);
      writerThreadList.add(writer);
    }
    for (Thread thread : writerThreadList)
    {
      thread.start();
    }
    // wait for all the threads to finish.
    for (Thread thread : writerThreadList)
    {
      thread.join();
    }
      // Create a number of writer thread that will loop modifying the entry
      List<Thread> writerThreadList = new LinkedList<Thread>();
      for (int n = 0; n < 1; n++)
      {
        BrokerWriter writer = new BrokerWriter(count);
        writerThreadList.add(writer);
      }
      for (Thread thread : writerThreadList)
      {
        thread.start();
      }
      // wait for all the threads to finish.
      for (Thread thread : writerThreadList)
      {
        thread.join();
      }
    long afterSendTime = TimeThread.getTime();
      long afterSendTime = TimeThread.getTime();
    int rcvCount = reader.getCount();
    long afterReceiveTime = TimeThread.getTime();
      int rcvCount = reader.getCount();
      long afterReceiveTime = TimeThread.getTime();
    if (rcvCount != TOTAL_MESSAGES)
    {
      fail("some messages were lost : expected : " +TOTAL_MESSAGES +
           " received : " + rcvCount);
    }
      if (rcvCount != TOTAL_MESSAGES)
      {
        fail("some messages were lost : expected : " +TOTAL_MESSAGES +
            " received : " + rcvCount);
      }
    }
    finally {
    DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST);
    broker.stop();
      DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST);
      broker.stop();
    }
  }
@@ -393,7 +400,7 @@
    ServerState state = new ServerState(baseDn);
    state.loadState();
    ChangelogBroker broker = new ChangelogBroker(state, baseDn,
                                                 serverId, 0, 0, 0, 0);
                                                 serverId, 0, 0, 0, 0, 100);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:8989");
    broker.start(servers);
@@ -441,7 +448,7 @@
    // We also have a replicated suffix (synchronization domain)
    DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the syncrhonized server");
        "Unable to add the synchronized server");
    entryList.add(synchroServerEntry);
  }
@@ -553,7 +560,9 @@
      {
        while (true)
        {
          broker.receive();
          SynchronizationMessage msg = broker.receive();
          if (msg == null)
            break;
          count ++;
        }
      } catch (Exception e) {
@@ -577,7 +586,7 @@
          return count;
        try
        {
          this.wait();
          this.wait(60);
          return count;
        } catch (InterruptedException e)
        {