mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
23.00.2006 23f9030daa0f0d1585cca5550c52b8ed387ef151
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
@@ -29,11 +29,9 @@
import static org.opends.server.loggers.Error.logError;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
@@ -44,14 +42,11 @@
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.synchronization.plugin.ChangelogBroker;
import org.opends.server.synchronization.plugin.MultimasterSynchronization;
import org.opends.server.synchronization.plugin.PersistentServerState;
import org.opends.server.synchronization.protocol.AddMsg;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.types.Attribute;
@@ -66,8 +61,6 @@
import org.opends.server.types.ModificationType;
import org.opends.server.types.OperationType;
import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -81,58 +74,26 @@
    "Synchronization Stress Test";
  /**
   * The internal connection used for operation
   */
  private InternalClientConnection connection;
  /**
   * Created entries that need to be deleted for cleanup
   */
  private ArrayList<Entry> entryList = new ArrayList<Entry>();
  /**
   * The Synchronization config manager entry
   */
  private String synchroStringDN;
  /**
   * The synchronization plugin entry
   */
  private String synchroPluginStringDN;
  private Entry synchroPluginEntry;
  /**
   * The Server synchro entry
   */
  private String synchroServerStringDN;
  private Entry synchroServerEntry;
  /**
   * The Change log entry
   */
  private String changeLogStringDN;
  private Entry changeLogEntry;
  private BrokerReader reader = null;
  /**
   * A "person" entry
   */
  private Entry personEntry;
  /**
   * schema check flag
   */
  private boolean schemaCheck;
  // WORKAROUND FOR BUG #639 - BEGIN -
  /**
   *
   */
  MultimasterSynchronization mms;
  private BrokerReader reader = null;
  protected Entry personEntry;
  // WORKAROUND FOR BUG #639 - END -
@@ -150,7 +111,7 @@
    final int TOTAL_MESSAGES = 1000;
    cleanEntries();
    ChangelogBroker broker = openChangelogSession(baseDn, (short) 18);
    ChangelogBroker broker = openChangelogSession(baseDn, (short) 18, 100);
    Monitor monitor = new Monitor("stress test monitor");
    DirectoryServer.registerMonitorProvider(monitor);
@@ -181,7 +142,7 @@
          tmp.getObjectClasses(), tmp.getUserAttributes(),
          tmp.getOperationalAttributes());
      addOp.run();
      entryList.add(personEntry);
      entryList.add(personEntry.getDN());
      assertTrue(DirectoryServer.entryExists(personEntry.getDN()),
        "The Add Entry operation failed");
@@ -201,7 +162,6 @@
      reader = new BrokerReader(broker);
      reader.start();
      long startTime = TimeThread.getTime();
      int count = TOTAL_MESSAGES;
      // Create a number of writer thread that will loop modifying the entry
@@ -221,11 +181,7 @@
        thread.join();
      }
      long afterSendTime = TimeThread.getTime();
      int rcvCount = reader.getCount();
      long afterReceiveTime = TimeThread.getTime();
      if (rcvCount != TOTAL_MESSAGES)
      {
@@ -253,13 +209,13 @@
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    // Create an internal connection
    connection = new InternalClientConnection();
    // Disable schema check
    schemaCheck = DirectoryServer.checkSchema();
    DirectoryServer.setCheckSchema(false);
    // Create an internal connection
    connection = new InternalClientConnection();
    // Create backend top level entries
    String[] topEntries = new String[2];
    topEntries[0] = "dn: dc=example,dc=com\n" + "objectClass: top\n"
@@ -277,7 +233,7 @@
          entry.getUserAttributes(), entry.getOperationalAttributes());
      addOp.setInternalOperation(true);
      addOp.run();
      entryList.add(entry);
      entryList.add(entry.getDN());
    }
    // top level synchro provider
@@ -335,47 +291,6 @@
  }
  /**
   * Clean up the environment. return null;
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  @AfterClass
  public void classCleanUp() throws Exception
  {
    DirectoryServer.setCheckSchema(schemaCheck);
    // WORKAROUND FOR BUG #639 - BEGIN -
    DirectoryServer.deregisterSynchronizationProvider(mms);
    mms.finalizeSynchronizationProvider();
    // WORKAROUND FOR BUG #639 - END -
    cleanEntries();
  }
  /**
   * suppress all the entries created by the tests in this class
   */
  private void cleanEntries()
  {
    DeleteOperation op;
    // Delete entries
    Entry entries[] = entryList.toArray(new Entry[0]);
    for (int i = entries.length - 1; i != 0; i--)
    {
      try
      {
        op = new DeleteOperation(connection, InternalClientConnection
            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
            entries[i].getDN());
        op.run();
      } catch (Exception e)
      {
      }
    }
  }
  /**
   * @return
   */
  private List<Modification> generatemods(String attrName, String attrValue)
@@ -391,68 +306,6 @@
    return mods;
  }
  /**
   * Open a changelog session to the local Changelog server.
   *
   */
  private ChangelogBroker openChangelogSession(final DN baseDn, short serverId)
          throws Exception, SocketException
  {
    PersistentServerState state = new PersistentServerState(baseDn);
    state.loadState();
    ChangelogBroker broker = new ChangelogBroker(state, baseDn,
                                                 serverId, 0, 0, 0, 0, 100);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:8989");
    broker.start(servers);
    broker.setSoTimeout(5000);
    return broker;
  }
  /**
   * Configure the Synchronization for this test.
   */
  private void configureSynchronization() throws Exception
  {
    //
    // Add the Multimaster synchronization plugin
    DirectoryServer.getConfigHandler().addEntry(synchroPluginEntry, null);
    entryList.add(synchroPluginEntry);
    assertNotNull(DirectoryServer.getConfigEntry(DN
        .decode(synchroPluginStringDN)),
        "Unable to add the Multimaster synchronization plugin");
    // WORKAROUND FOR BUG #639 - BEGIN -
    DN dn = DN.decode(synchroPluginStringDN);
    ConfigEntry mmsConfigEntry = DirectoryServer.getConfigEntry(dn);
    mms = new MultimasterSynchronization();
    try
    {
      mms.initializeSynchronizationProvider(mmsConfigEntry);
    }
    catch (ConfigException e)
    {
      assertTrue(false,
          "Unable to initialize the Multimaster synchronization plugin");
    }
    DirectoryServer.registerSynchronizationProvider(mms);
    // WORKAROUND FOR BUG #639 - END -
    //
    // Add the changelog server
    DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
        "Unable to add the changeLog server");
    entryList.add(changeLogEntry);
    //
    // We also have a replicated suffix (synchronization domain)
    DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the synchronized server");
    entryList.add(synchroServerEntry);
  }
  private class BrokerWriter extends Thread
  {
    int count;