mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
16.02.2006 4cbe378caee0ee7389b11e2177010a6f80c1ea9c
Add a synchronization stress test :
The test creates a number of modification on the LDAP server and checks that the correct number
of modify Messages are received by another thread that has subscribed to the changelog
service using the changelogbroker API.

It is possible to use several threads to create the modifications.

This test can take some time and is therefore tagegd as a slow test and will
only be run when the testall ant target is invoked.
1 files added
2 files modified
604 ■■■■■ changed files
opends/src/server/org/opends/server/changelog/Changelog.java 3 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/ChangelogBroker.java 3 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java 598 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/changelog/Changelog.java
@@ -169,6 +169,9 @@
   */
  public Changelog(ConfigEntry config) throws ConfigException
  {
    shutdown = false;
    runListen = true;
    IntegerConfigAttribute changelogPortAttr =
      (IntegerConfigAttribute) config.getConfigAttribute(changelogPortStub);
    /* if there is no changelog port configured, this process must not be a
opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
@@ -179,9 +179,10 @@
          /*
           * Read the ChangelogStartMessage that should come back.
           * TODO : should have a timeout here.
           */
          session.setSoTimeout(1000);
          startMsg = (ChangelogStartMessage) session.receive();
          session.setSoTimeout(0);
          /*
           * We must not publish changes to a changelog that has not
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
New file
@@ -0,0 +1,598 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import org.opends.server.TestCaseUtils;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.InitializationException;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.OperationType;
import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
 * Test the contructors, encoders and decoders of the synchronization AckMsg,
 * ModifyMsg, ModifyDnMsg, AddMsg and Delete Msg
 */
public class StressTest extends MonitorProvider
{
  private static final String SYNCHRONIZATION_STRESS_TEST =
    "Synchronization Stress Test";
  public StressTest()
  {
    super("synchronization Stress Test");
  }
  /**
   * The internal connection used for operation
   */
  private InternalClientConnection connection;
  /**
   * Created entries that need to be deleted for cleanup
   */
  private ArrayList<Entry> entryList = new ArrayList<Entry>();
  /**
   * The Synchronization config manager entry
   */
  private String synchroStringDN;
  /**
   * The synchronization plugin entry
   */
  private String synchroPluginStringDN;
  private Entry synchroPluginEntry;
  /**
   * The Server synchro entry
   */
  private String synchroServerStringDN;
  private Entry synchroServerEntry;
  /**
   * The Change log entry
   */
  private String changeLogStringDN;
  private Entry changeLogEntry;
  /**
   * A "person" entry
   */
  private Entry personEntry;
  /**
   * schema check flag
   */
  private boolean schemaCheck;
  // WORKAROUND FOR BUG #639 - BEGIN -
  /**
   *
   */
  MultimasterSynchronization mms;
  private BrokerReader reader = null;
  // WORKAROUND FOR BUG #639 - END -
  /**
   * Stress test from LDAP server to client using the ChangelogBroker API.
   */
  @Test(enabled=true, groups="slow")
  public void fromServertoBroker() throws Exception
  {
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    final int TOTAL_MESSAGES = 1000;
    cleanEntries();
    ChangelogBroker broker = openChangelogSession(baseDn, (short) 3);
    DirectoryServer.registerMonitorProvider(this);
    try {
    /*
     * loop receiving update until there is nothing left
     * to make sure that message from previous tests have been consumed.
     */
    try
    {
      while (true)
      {
        broker.receive();
      }
     }
    catch (Exception e)
    { }
    /*
     * Test that operations done on this server are sent to the
     * changelog server and forwarded to our changelog broker session.
     */
    // Create an Entry (add operation) that will be later used in the test.
    Entry tmp = personEntry.duplicate();
    AddOperation addOp = new AddOperation(connection,
        InternalClientConnection.nextOperationID(), InternalClientConnection
        .nextMessageID(), null, tmp.getDN(),
        tmp.getObjectClasses(), tmp.getUserAttributes(),
        tmp.getOperationalAttributes());
    addOp.run();
    entryList.add(personEntry);
    assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
      "The Add Entry operation failed");
    // Check if the client has received the msg
    SynchronizationMessage msg = broker.receive();
    assertTrue(msg instanceof AddMsg,
      "The received synchronization message is not an ADD msg");
    AddMsg addMsg =  (AddMsg) msg;
    Operation receivedOp = addMsg.createOperation(connection);
    assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
      "The received synchronization message is not an ADD msg");
    assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
      "The received ADD synchronization message is not for the excepted DN");
    reader = new BrokerReader(broker);
    reader.start();
    long startTime = TimeThread.getTime();
    int count = TOTAL_MESSAGES;
    // Create a number of writer thread that will loop modifying the entry
    List<Thread> writerThreadList = new LinkedList<Thread>();
    for (int n = 0; n < 1; n++)
    {
      BrokerWriter writer = new BrokerWriter(count);
      writerThreadList.add(writer);
    }
    for (Thread thread : writerThreadList)
    {
      thread.start();
    }
    // wait for all the threads to finish.
    for (Thread thread : writerThreadList)
    {
      thread.join();
    }
    long afterSendTime = TimeThread.getTime();
    int rcvCount = reader.getCount();
    long afterReceiveTime = TimeThread.getTime();
    if (rcvCount != TOTAL_MESSAGES)
    {
      fail("some messages were lost : expected : " +TOTAL_MESSAGES +
           " received : " + rcvCount);
    }
    System.out.println("Sent " + TOTAL_MESSAGES + " in " +
                       (afterSendTime-startTime)/1000 + "seconds.\n");
    System.out.println("Received " + TOTAL_MESSAGES + " in " +
                       (afterReceiveTime-afterSendTime)/1000 + "seconds.\n");
    }
    finally {
    DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST);
    broker.stop();
    }
  }
  /**
   * Set up the environment for performing the tests in this Class.
   * synchronization
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  @BeforeClass
  public void setUp() throws Exception
  {
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    // Disable schema check
    schemaCheck = DirectoryServer.checkSchema();
    DirectoryServer.setCheckSchema(false);
    // Create an internal connection
    connection = new InternalClientConnection();
    // Create backend top level entries
    String[] topEntries = new String[2];
    topEntries[0] = "dn: dc=example,dc=com\n" + "objectClass: top\n"
        + "objectClass: domain\n";
    topEntries[1] = "dn: ou=People,dc=example,dc=com\n" + "objectClass: top\n"
        + "objectClass: organizationalUnit\n"
        + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
    Entry entry;
    for (int i = 0; i < topEntries.length; i++)
    {
      entry = TestCaseUtils.entryFromLdifString(topEntries[i]);
      AddOperation addOp = new AddOperation(connection,
          InternalClientConnection.nextOperationID(), InternalClientConnection
              .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
          entry.getUserAttributes(), entry.getOperationalAttributes());
      addOp.setInternalOperation(true);
      addOp.run();
      entryList.add(entry);
    }
    // top level synchro provider
    synchroStringDN = "cn=Synchronization Providers,cn=config";
    // Multimaster Synchro plugin
    synchroPluginStringDN = "cn=Multimaster Synchronization, "
        + synchroStringDN;
    String synchroPluginLdif = "dn: "
        + synchroPluginStringDN
        + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-synchronization-provider\n"
        + "ds-cfg-synchronization-provider-enabled: true\n"
        + "ds-cfg-synchronization-provider-class: org.opends.server.synchronization.MultimasterSynchronization\n";
    synchroPluginEntry = TestCaseUtils.entryFromLdifString(synchroPluginLdif);
    // Change log
    changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
    String changeLogLdif = "dn: " + changeLogStringDN + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
        + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8989\n"
        + "ds-cfg-changelog-server-id: 1\n";
    changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
    // suffix synchronized
    synchroServerStringDN = "cn=example, " + synchroPluginStringDN;
    String synchroServerLdif = "dn: " + synchroServerStringDN + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-synchronization-provider-config\n"
        + "cn: example\n"
        + "ds-cfg-synchronization-dn: ou=People,dc=example,dc=com\n"
        + "ds-cfg-changelog-server: localhost:8989\n"
        + "ds-cfg-directory-server-id: 1\n" + "ds-cfg-receive-status: true\n";
    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
    String personLdif = "dn: uid=user.1,ou=People,dc=example,dc=com\n"
        + "objectClass: top\n" + "objectClass: person\n"
        + "objectClass: organizationalPerson\n"
        + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
        + "homePhone: 951-245-7634\n"
        + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
        + "mobile: 027-085-0537\n"
        + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
        + "$Rockford, NC  85762\n" + "mail: user.1@example.com\n"
        + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
        + "street: 17984 Thirteenth Street\n"
        + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
        + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
        + "userPassword: password\n" + "initials: AA\n";
    personEntry = TestCaseUtils.entryFromLdifString(personLdif);
    configureSynchronization();
  }
  /**
   * Clean up the environment. return null;
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  @AfterClass
  public void classCleanUp() throws Exception
  {
    DirectoryServer.setCheckSchema(schemaCheck);
    // WORKAROUND FOR BUG #639 - BEGIN -
    DirectoryServer.deregisterSynchronizationProvider(mms);
    mms.finalizeSynchronizationProvider();
    // WORKAROUND FOR BUG #639 - END -
    cleanEntries();
  }
  /**
   * suppress all the entries created by the tests in this class
   */
  private void cleanEntries()
  {
    DeleteOperation op;
    // Delete entries
    Entry entries[] = entryList.toArray(new Entry[0]);
    for (int i = entries.length - 1; i != 0; i--)
    {
      try
      {
        op = new DeleteOperation(connection, InternalClientConnection
            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
            entries[i].getDN());
        op.run();
      } catch (Exception e)
      {
      }
    }
  }
  /**
   * @return
   */
  private List<Modification> generatemods(String attrName, String attrValue)
  {
    AttributeType attrType =
      DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
    values.add(new AttributeValue(attrType, attrValue));
    Attribute attr = new Attribute(attrType, attrName, values);
    List<Modification> mods = new ArrayList<Modification>();
    Modification mod = new Modification(ModificationType.REPLACE, attr);
    mods.add(mod);
    return mods;
  }
  /**
   * Open a changelog session to the local Changelog server.
   *
   */
  private ChangelogBroker openChangelogSession(final DN baseDn, short serverId)
          throws Exception, SocketException
  {
    ServerState state = new ServerState(baseDn);
    state.loadState();
    ChangelogBroker broker = new ChangelogBroker(state, baseDn,
                                                 serverId, 0, 0, 0, 0);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:8989");
    broker.start(servers);
    broker.setSoTimeout(5000);
    return broker;
  }
  /**
   * Configure the Synchronization for this test.
   */
  private void configureSynchronization() throws Exception
  {
    //
    // Add the Multimaster synchronization plugin
    DirectoryServer.getConfigHandler().addEntry(synchroPluginEntry, null);
    entryList.add(synchroPluginEntry);
    assertNotNull(DirectoryServer.getConfigEntry(DN
        .decode(synchroPluginStringDN)),
        "Unable to add the Multimaster synchronization plugin");
    // WORKAROUND FOR BUG #639 - BEGIN -
    DN dn = DN.decode(synchroPluginStringDN);
    ConfigEntry mmsConfigEntry = DirectoryServer.getConfigEntry(dn);
    mms = new MultimasterSynchronization();
    try
    {
      mms.initializeSynchronizationProvider(mmsConfigEntry);
    }
    catch (ConfigException e)
    {
      assertTrue(false,
          "Unable to initialize the Multimaster synchronization plugin");
    }
    DirectoryServer.registerSynchronizationProvider(mms);
    // WORKAROUND FOR BUG #639 - END -
    //
    // Add the changelog server
    DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
        "Unable to add the changeLog server");
    entryList.add(changeLogEntry);
    //
    // We also have a replicated suffix (synchronization domain)
    DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the syncrhonized server");
    entryList.add(synchroServerEntry);
  }
  @Override
  public List<Attribute> getMonitorData()
  {
    Attribute attr;
    if (reader == null)
      attr = new Attribute("received-messages", "not yet started");
    else
      attr = new Attribute("received-messages",
                           String.valueOf(reader.getCurrentCount()));
    List<Attribute>  list = new LinkedList<Attribute>();
    list.add(attr);
    attr = new Attribute("base-dn", "ou=People,dc=example,dc=com");
    list.add(attr);
    return list;
  }
  @Override
  public String getMonitorInstanceName()
  {
    return SYNCHRONIZATION_STRESS_TEST;
  }
  @Override
  public long getUpdateInterval()
  {
    // we don't wont to do polling on this monitor
    return 0;
  }
  @Override
  public void initializeMonitorProvider(ConfigEntry configEntry)
  throws ConfigException, InitializationException
  {
    // nothing to do
  }
  @Override
  public void updateMonitorData()
  {
    // nothing to do
  }
  private class BrokerWriter extends Thread
  {
    int count;
    /**
     * Creates a new Stress Test Reader
     * @param broker
     */
    public BrokerWriter(int count)
    {
      this.count = count;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void run()
    {
      while (count>0)
      {
        count--;
        // must generate the mods for every operation because they are modified
        // by processModify.
        List<Modification> mods = generatemods("telephonenumber", "01 02 45");
        ModifyOperation modOp =
          connection.processModify(personEntry.getDN(), mods);
        assertEquals(modOp.getResultCode(), ResultCode.SUCCESS);
      }
    }
  }
  /**
   * Continuously reads messages from a changelog broker until there is nothing
   * left. Count the number of received messages.
   */
  private class BrokerReader extends Thread
  {
    private ChangelogBroker broker;
    private int count = 0;
    private Boolean finished = false;
    /**
     * Creates a new Stress Test Reader
     * @param broker
     */
    public BrokerReader(ChangelogBroker broker)
    {
      this.broker = broker;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void run()
    {
      // loop receiving messages until either we get a timeout
      // because there is nothing left or an error condition happens.
      try
      {
        while (true)
        {
          broker.receive();
          count ++;
        }
      } catch (Exception e) {
        synchronized (this)
        {
          finished = true;
          this.notify();
        }
      }
    }
    /**
     * wait until the thread has finished its job then return the number of
     * received messages.
     */
    public int getCount()
    {
      synchronized (this)
      {
        if (finished == true)
          return count;
        try
        {
          this.wait();
          return count;
        } catch (InterruptedException e)
        {
          return -1;
        }
      }
    }
    public int getCurrentCount()
    {
      return count;
    }
  }
}