From 4cbe378caee0ee7389b11e2177010a6f80c1ea9c Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 16 Oct 2006 13:02:50 +0000
Subject: [PATCH] Add a synchronization stress test : The test creates a number of modification on the LDAP server and checks that the correct number of modify Messages are received by another thread that has subscribed to the changelog service using the changelogbroker API. It is possible to use several threads to create the modifications.
---
opends/src/server/org/opends/server/changelog/Changelog.java | 3
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java | 598 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
opends/src/server/org/opends/server/synchronization/ChangelogBroker.java | 3
3 files changed, 603 insertions(+), 1 deletions(-)
diff --git a/opends/src/server/org/opends/server/changelog/Changelog.java b/opends/src/server/org/opends/server/changelog/Changelog.java
index dd7006a..6660b1b 100644
--- a/opends/src/server/org/opends/server/changelog/Changelog.java
+++ b/opends/src/server/org/opends/server/changelog/Changelog.java
@@ -169,6 +169,9 @@
*/
public Changelog(ConfigEntry config) throws ConfigException
{
+ shutdown = false;
+ runListen = true;
+
IntegerConfigAttribute changelogPortAttr =
(IntegerConfigAttribute) config.getConfigAttribute(changelogPortStub);
/* if there is no changelog port configured, this process must not be a
diff --git a/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java b/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
index b265bb3..c9a0605 100644
--- a/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
+++ b/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
@@ -179,9 +179,10 @@
/*
* Read the ChangelogStartMessage that should come back.
- * TODO : should have a timeout here.
*/
+ session.setSoTimeout(1000);
startMsg = (ChangelogStartMessage) session.receive();
+ session.setSoTimeout(0);
/*
* We must not publish changes to a changelog that has not
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
new file mode 100644
index 0000000..218cc0c
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
@@ -0,0 +1,598 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006 Sun Microsystems, Inc.
+ */
+
+package org.opends.server.synchronization;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.opends.server.TestCaseUtils;
+import org.opends.server.api.MonitorProvider;
+import org.opends.server.config.ConfigEntry;
+import org.opends.server.config.ConfigException;
+import org.opends.server.core.AddOperation;
+import org.opends.server.core.DeleteOperation;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.core.ModifyOperation;
+import org.opends.server.core.Operation;
+import org.opends.server.protocols.internal.InternalClientConnection;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.AttributeValue;
+import org.opends.server.types.DN;
+import org.opends.server.types.Entry;
+import org.opends.server.types.InitializationException;
+import org.opends.server.types.Modification;
+import org.opends.server.types.ModificationType;
+import org.opends.server.types.OperationType;
+import org.opends.server.types.ResultCode;
+import org.opends.server.util.TimeThread;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test the contructors, encoders and decoders of the synchronization AckMsg,
+ * ModifyMsg, ModifyDnMsg, AddMsg and Delete Msg
+ */
+public class StressTest extends MonitorProvider
+{
+ private static final String SYNCHRONIZATION_STRESS_TEST =
+ "Synchronization Stress Test";
+
+ public StressTest()
+ {
+ super("synchronization Stress Test");
+ }
+
+ /**
+ * The internal connection used for operation
+ */
+ private InternalClientConnection connection;
+
+ /**
+ * Created entries that need to be deleted for cleanup
+ */
+ private ArrayList<Entry> entryList = new ArrayList<Entry>();
+
+ /**
+ * The Synchronization config manager entry
+ */
+ private String synchroStringDN;
+
+ /**
+ * The synchronization plugin entry
+ */
+ private String synchroPluginStringDN;
+
+ private Entry synchroPluginEntry;
+
+ /**
+ * The Server synchro entry
+ */
+ private String synchroServerStringDN;
+
+ private Entry synchroServerEntry;
+
+ /**
+ * The Change log entry
+ */
+ private String changeLogStringDN;
+
+ private Entry changeLogEntry;
+
+ /**
+ * A "person" entry
+ */
+ private Entry personEntry;
+
+ /**
+ * schema check flag
+ */
+ private boolean schemaCheck;
+
+ // WORKAROUND FOR BUG #639 - BEGIN -
+ /**
+ *
+ */
+ MultimasterSynchronization mms;
+
+ private BrokerReader reader = null;
+
+ // WORKAROUND FOR BUG #639 - END -
+
+ /**
+ * Stress test from LDAP server to client using the ChangelogBroker API.
+ */
+ @Test(enabled=true, groups="slow")
+ public void fromServertoBroker() throws Exception
+ {
+
+ final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
+ final int TOTAL_MESSAGES = 1000;
+ cleanEntries();
+
+ ChangelogBroker broker = openChangelogSession(baseDn, (short) 3);
+ DirectoryServer.registerMonitorProvider(this);
+
+ try {
+ /*
+ * loop receiving update until there is nothing left
+ * to make sure that message from previous tests have been consumed.
+ */
+ try
+ {
+ while (true)
+ {
+ broker.receive();
+ }
+ }
+ catch (Exception e)
+ { }
+ /*
+ * Test that operations done on this server are sent to the
+ * changelog server and forwarded to our changelog broker session.
+ */
+
+ // Create an Entry (add operation) that will be later used in the test.
+ Entry tmp = personEntry.duplicate();
+ AddOperation addOp = new AddOperation(connection,
+ InternalClientConnection.nextOperationID(), InternalClientConnection
+ .nextMessageID(), null, tmp.getDN(),
+ tmp.getObjectClasses(), tmp.getUserAttributes(),
+ tmp.getOperationalAttributes());
+ addOp.run();
+ entryList.add(personEntry);
+ assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
+ "The Add Entry operation failed");
+
+ // Check if the client has received the msg
+ SynchronizationMessage msg = broker.receive();
+ assertTrue(msg instanceof AddMsg,
+ "The received synchronization message is not an ADD msg");
+ AddMsg addMsg = (AddMsg) msg;
+
+ Operation receivedOp = addMsg.createOperation(connection);
+ assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
+ "The received synchronization message is not an ADD msg");
+
+ assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
+ "The received ADD synchronization message is not for the excepted DN");
+
+ reader = new BrokerReader(broker);
+ reader.start();
+
+ long startTime = TimeThread.getTime();
+ int count = TOTAL_MESSAGES;
+
+ // Create a number of writer thread that will loop modifying the entry
+ List<Thread> writerThreadList = new LinkedList<Thread>();
+ for (int n = 0; n < 1; n++)
+ {
+ BrokerWriter writer = new BrokerWriter(count);
+ writerThreadList.add(writer);
+ }
+ for (Thread thread : writerThreadList)
+ {
+ thread.start();
+ }
+ // wait for all the threads to finish.
+ for (Thread thread : writerThreadList)
+ {
+ thread.join();
+ }
+
+ long afterSendTime = TimeThread.getTime();
+
+ int rcvCount = reader.getCount();
+ long afterReceiveTime = TimeThread.getTime();
+
+ if (rcvCount != TOTAL_MESSAGES)
+ {
+ fail("some messages were lost : expected : " +TOTAL_MESSAGES +
+ " received : " + rcvCount);
+ }
+ System.out.println("Sent " + TOTAL_MESSAGES + " in " +
+ (afterSendTime-startTime)/1000 + "seconds.\n");
+ System.out.println("Received " + TOTAL_MESSAGES + " in " +
+ (afterReceiveTime-afterSendTime)/1000 + "seconds.\n");
+
+ }
+ finally {
+ DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST);
+ broker.stop();
+ }
+ }
+
+ /**
+ * Set up the environment for performing the tests in this Class.
+ * synchronization
+ *
+ * @throws Exception
+ * If the environment could not be set up.
+ */
+ @BeforeClass
+ public void setUp() throws Exception
+ {
+ // This test suite depends on having the schema available.
+ TestCaseUtils.startServer();
+
+ // Disable schema check
+ schemaCheck = DirectoryServer.checkSchema();
+ DirectoryServer.setCheckSchema(false);
+
+ // Create an internal connection
+ connection = new InternalClientConnection();
+
+ // Create backend top level entries
+ String[] topEntries = new String[2];
+ topEntries[0] = "dn: dc=example,dc=com\n" + "objectClass: top\n"
+ + "objectClass: domain\n";
+ topEntries[1] = "dn: ou=People,dc=example,dc=com\n" + "objectClass: top\n"
+ + "objectClass: organizationalUnit\n"
+ + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
+ Entry entry;
+ for (int i = 0; i < topEntries.length; i++)
+ {
+ entry = TestCaseUtils.entryFromLdifString(topEntries[i]);
+ AddOperation addOp = new AddOperation(connection,
+ InternalClientConnection.nextOperationID(), InternalClientConnection
+ .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
+ entry.getUserAttributes(), entry.getOperationalAttributes());
+ addOp.setInternalOperation(true);
+ addOp.run();
+ entryList.add(entry);
+ }
+
+ // top level synchro provider
+ synchroStringDN = "cn=Synchronization Providers,cn=config";
+
+ // Multimaster Synchro plugin
+ synchroPluginStringDN = "cn=Multimaster Synchronization, "
+ + synchroStringDN;
+ String synchroPluginLdif = "dn: "
+ + synchroPluginStringDN
+ + "\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-synchronization-provider\n"
+ + "ds-cfg-synchronization-provider-enabled: true\n"
+ + "ds-cfg-synchronization-provider-class: org.opends.server.synchronization.MultimasterSynchronization\n";
+ synchroPluginEntry = TestCaseUtils.entryFromLdifString(synchroPluginLdif);
+
+ // Change log
+ changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
+ String changeLogLdif = "dn: " + changeLogStringDN + "\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
+ + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8989\n"
+ + "ds-cfg-changelog-server-id: 1\n";
+ changeLogEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
+
+ // suffix synchronized
+ synchroServerStringDN = "cn=example, " + synchroPluginStringDN;
+ String synchroServerLdif = "dn: " + synchroServerStringDN + "\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-synchronization-provider-config\n"
+ + "cn: example\n"
+ + "ds-cfg-synchronization-dn: ou=People,dc=example,dc=com\n"
+ + "ds-cfg-changelog-server: localhost:8989\n"
+ + "ds-cfg-directory-server-id: 1\n" + "ds-cfg-receive-status: true\n";
+ synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
+
+ String personLdif = "dn: uid=user.1,ou=People,dc=example,dc=com\n"
+ + "objectClass: top\n" + "objectClass: person\n"
+ + "objectClass: organizationalPerson\n"
+ + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
+ + "homePhone: 951-245-7634\n"
+ + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
+ + "mobile: 027-085-0537\n"
+ + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
+ + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n"
+ + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
+ + "street: 17984 Thirteenth Street\n"
+ + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
+ + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
+ + "userPassword: password\n" + "initials: AA\n";
+ personEntry = TestCaseUtils.entryFromLdifString(personLdif);
+
+ configureSynchronization();
+ }
+
+ /**
+ * Clean up the environment. return null;
+ *
+ * @throws Exception
+ * If the environment could not be set up.
+ */
+ @AfterClass
+ public void classCleanUp() throws Exception
+ {
+ DirectoryServer.setCheckSchema(schemaCheck);
+
+ // WORKAROUND FOR BUG #639 - BEGIN -
+ DirectoryServer.deregisterSynchronizationProvider(mms);
+ mms.finalizeSynchronizationProvider();
+ // WORKAROUND FOR BUG #639 - END -
+
+ cleanEntries();
+ }
+
+ /**
+ * suppress all the entries created by the tests in this class
+ */
+ private void cleanEntries()
+ {
+ DeleteOperation op;
+ // Delete entries
+ Entry entries[] = entryList.toArray(new Entry[0]);
+ for (int i = entries.length - 1; i != 0; i--)
+ {
+ try
+ {
+ op = new DeleteOperation(connection, InternalClientConnection
+ .nextOperationID(), InternalClientConnection.nextMessageID(), null,
+ entries[i].getDN());
+ op.run();
+ } catch (Exception e)
+ {
+ }
+ }
+ }
+
+ /**
+ * @return
+ */
+ private List<Modification> generatemods(String attrName, String attrValue)
+ {
+ AttributeType attrType =
+ DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
+ LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
+ values.add(new AttributeValue(attrType, attrValue));
+ Attribute attr = new Attribute(attrType, attrName, values);
+ List<Modification> mods = new ArrayList<Modification>();
+ Modification mod = new Modification(ModificationType.REPLACE, attr);
+ mods.add(mod);
+ return mods;
+ }
+
+ /**
+ * Open a changelog session to the local Changelog server.
+ *
+ */
+ private ChangelogBroker openChangelogSession(final DN baseDn, short serverId)
+ throws Exception, SocketException
+ {
+ ServerState state = new ServerState(baseDn);
+ state.loadState();
+ ChangelogBroker broker = new ChangelogBroker(state, baseDn,
+ serverId, 0, 0, 0, 0);
+ ArrayList<String> servers = new ArrayList<String>(1);
+ servers.add("localhost:8989");
+ broker.start(servers);
+ broker.setSoTimeout(5000);
+ return broker;
+ }
+
+ /**
+ * Configure the Synchronization for this test.
+ */
+ private void configureSynchronization() throws Exception
+ {
+ //
+ // Add the Multimaster synchronization plugin
+ DirectoryServer.getConfigHandler().addEntry(synchroPluginEntry, null);
+ entryList.add(synchroPluginEntry);
+ assertNotNull(DirectoryServer.getConfigEntry(DN
+ .decode(synchroPluginStringDN)),
+ "Unable to add the Multimaster synchronization plugin");
+
+ // WORKAROUND FOR BUG #639 - BEGIN -
+ DN dn = DN.decode(synchroPluginStringDN);
+ ConfigEntry mmsConfigEntry = DirectoryServer.getConfigEntry(dn);
+ mms = new MultimasterSynchronization();
+ try
+ {
+ mms.initializeSynchronizationProvider(mmsConfigEntry);
+ }
+ catch (ConfigException e)
+ {
+ assertTrue(false,
+ "Unable to initialize the Multimaster synchronization plugin");
+ }
+ DirectoryServer.registerSynchronizationProvider(mms);
+ // WORKAROUND FOR BUG #639 - END -
+
+ //
+ // Add the changelog server
+ DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
+ assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
+ "Unable to add the changeLog server");
+ entryList.add(changeLogEntry);
+
+ //
+ // We also have a replicated suffix (synchronization domain)
+ DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
+ assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
+ "Unable to add the syncrhonized server");
+ entryList.add(synchroServerEntry);
+ }
+
+ @Override
+ public List<Attribute> getMonitorData()
+ {
+ Attribute attr;
+ if (reader == null)
+ attr = new Attribute("received-messages", "not yet started");
+ else
+ attr = new Attribute("received-messages",
+ String.valueOf(reader.getCurrentCount()));
+ List<Attribute> list = new LinkedList<Attribute>();
+ list.add(attr);
+ attr = new Attribute("base-dn", "ou=People,dc=example,dc=com");
+ list.add(attr);
+ return list;
+ }
+
+ @Override
+ public String getMonitorInstanceName()
+ {
+ return SYNCHRONIZATION_STRESS_TEST;
+ }
+
+ @Override
+ public long getUpdateInterval()
+ {
+ // we don't wont to do polling on this monitor
+ return 0;
+ }
+
+ @Override
+ public void initializeMonitorProvider(ConfigEntry configEntry)
+ throws ConfigException, InitializationException
+ {
+ // nothing to do
+
+ }
+
+ @Override
+ public void updateMonitorData()
+ {
+ // nothing to do
+
+ }
+
+ private class BrokerWriter extends Thread
+ {
+ int count;
+
+ /**
+ * Creates a new Stress Test Reader
+ * @param broker
+ */
+ public BrokerWriter(int count)
+ {
+ this.count = count;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run()
+ {
+ while (count>0)
+ {
+ count--;
+ // must generate the mods for every operation because they are modified
+ // by processModify.
+ List<Modification> mods = generatemods("telephonenumber", "01 02 45");
+
+ ModifyOperation modOp =
+ connection.processModify(personEntry.getDN(), mods);
+ assertEquals(modOp.getResultCode(), ResultCode.SUCCESS);
+ }
+ }
+ }
+
+ /**
+ * Continuously reads messages from a changelog broker until there is nothing
+ * left. Count the number of received messages.
+ */
+ private class BrokerReader extends Thread
+ {
+ private ChangelogBroker broker;
+ private int count = 0;
+ private Boolean finished = false;
+
+ /**
+ * Creates a new Stress Test Reader
+ * @param broker
+ */
+ public BrokerReader(ChangelogBroker broker)
+ {
+ this.broker = broker;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run()
+ {
+ // loop receiving messages until either we get a timeout
+ // because there is nothing left or an error condition happens.
+ try
+ {
+ while (true)
+ {
+ broker.receive();
+ count ++;
+ }
+ } catch (Exception e) {
+ synchronized (this)
+ {
+ finished = true;
+ this.notify();
+ }
+ }
+ }
+
+ /**
+ * wait until the thread has finished its job then return the number of
+ * received messages.
+ */
+ public int getCount()
+ {
+ synchronized (this)
+ {
+ if (finished == true)
+ return count;
+ try
+ {
+ this.wait();
+ return count;
+ } catch (InterruptedException e)
+ {
+ return -1;
+ }
+ }
+ }
+
+ public int getCurrentCount()
+ {
+ return count;
+ }
+ }
+}
--
Gitblit v1.10.0