From 97c337fae8aa9247da4fd9ea3d7b9974887eb124 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 29 Jan 2007 17:21:53 +0000
Subject: [PATCH] - Fix issue 1160 : synchronization is flushing the msgQueue to the database too often The synchronization server is flushing all the queues of the messages received from a LDAP server each time a server needs to retrieve some changes that are not in memory anymore.
---
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java | 2
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java | 35 ++++
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java | 39 +++++
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java | 287 +++++++++++++++++++++++++++++++----------
4 files changed, 288 insertions(+), 75 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
index 4761d06..2dd214f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -328,7 +328,7 @@
List<String> unacceptableReasons)
{
// TODO NYI
- return false;
+ return true;
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
index 0fb35e6..90132ac 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
@@ -35,6 +35,7 @@
import java.util.Date;
import java.util.List;
import java.util.LinkedList;
+import java.util.NoSuchElementException;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
@@ -206,11 +207,37 @@
throws DatabaseException, Exception
{
/*
- * make sure to flush some changes in the database so that
- * we don't create the iterator on an empty database when the
- * dbHandler has just been started.
+ * When we create an iterator we need to make sure that we
+ * don't miss some changes because the iterator is created
+ * close to the limit of the changed that have not yet been
+ * flushed to the database.
+ * We detect this by comparing the date of the changeNumber where
+ * we want to start with the date of the first ChangeNumber
+ * of the msgQueue.
+ * If this is the case we flush the queue to the database.
*/
- flush();
+ ChangeNumber recentChangeNumber = null;
+
+ if (changeNumber == null)
+ flush();
+
+ synchronized (msgQueue)
+ {
+ try
+ {
+ UpdateMessage msg = msgQueue.getFirst();
+ recentChangeNumber = msg.getChangeNumber();
+ }
+ catch (NoSuchElementException e)
+ {}
+ }
+
+ if ( (recentChangeNumber != null) &&
+ (recentChangeNumber.getTimeSec() - changeNumber.getTimeSec() < 2))
+ {
+ flush();
+ }
+
return new ChangelogIterator(serverId, db, changeNumber);
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
index fec54b9..1c59158 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -164,6 +164,45 @@
return broker;
}
+
+ /**
+ * Open a changelog session with flow control to the local Changelog server.
+ *
+ */
+ protected ChangelogBroker openChangelogSession(
+ final DN baseDn, short serverId, int window_size,
+ int port, int timeout, int maxSendQueue, int maxRcvQueue,
+ boolean emptyOldChanges)
+ throws Exception, SocketException
+ {
+ PersistentServerState state = new PersistentServerState(baseDn);
+ if (emptyOldChanges)
+ state.loadState();
+ ChangelogBroker broker = new ChangelogBroker(
+ state, baseDn, serverId, maxRcvQueue, 0, maxSendQueue, 0, window_size);
+ ArrayList<String> servers = new ArrayList<String>(1);
+ servers.add("localhost:" + port);
+ broker.start(servers);
+ if (timeout != 0)
+ broker.setSoTimeout(timeout);
+ if (emptyOldChanges)
+ {
+ /*
+ * loop receiving update until there is nothing left
+ * to make sure that message from previous tests have been consumed.
+ */
+ try
+ {
+ while (true)
+ {
+ broker.receive();
+ }
+ }
+ catch (Exception e)
+ { }
+ }
+ return broker;
+ }
/**
* suppress all the entries created by the tests in this class
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
index 9639cea..743aa4a 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -26,10 +26,11 @@
*/
package org.opends.server.synchronization.changelog;
-import static org.opends.server.synchronization.protocol.OperationContext.SYNCHROCONTEXT;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import static org.opends.server.synchronization.protocol.OperationContext.*;
+
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
@@ -375,11 +376,25 @@
/**
* Stress test from client using the ChangelogBroker API
* to the changelog server.
+ * This test allow to investigate the behaviour of the
+ * Changelog server when it needs to distribute the load of
+ * updates from a single LDAP server to a number of LDAP servers.
+ *
+ * This test i sconfigured by a relatively low stress
+ * but can be changed using TOTAL_MSG and CLIENT_THREADS consts.
*/
- @Test(enabled=false, groups="slow")
- public void stressFromBrokertoChangelog() throws Exception
+ @Test(enabled=true, groups="slow")
+ public void oneWriterMultipleReader() throws Exception
{
ChangelogBroker server = null;
+ int TOTAL_MSG = 1000; // number of messages to send during the test
+ int CLIENT_THREADS = 2; // number of threads that will try to read
+ // the messages
+ ChangeNumberGenerator gen =
+ new ChangeNumberGenerator((short)5 , (long) 0);
+
+ BrokerReader client[] = new BrokerReader[CLIENT_THREADS];
+ ChangelogBroker clientBroker[] = new ChangelogBroker[CLIENT_THREADS];
try
{
@@ -388,24 +403,112 @@
*/
server = openChangelogSession(
DN.decode("dc=example,dc=com"), (short) 5, 100, changelogPort,
- 1000, true);
+ 1000, 1000, 0, true);
BrokerReader reader = new BrokerReader(server);
+
+ /*
+ * Start the client threads.
+ */
+ for (int i =0; i< CLIENT_THREADS; i++)
+ {
+ clientBroker[i] = openChangelogSession(
+ DN.decode("dc=example,dc=com"), (short) (100+i), 100, changelogPort,
+ 1000, true);
+ client[i] = new BrokerReader(clientBroker[i]);
+ }
+
+ for (int i =0; i< CLIENT_THREADS; i++)
+ {
+ client[i].start();
+ }
reader.start();
- ChangeNumberGenerator gen =
- new ChangeNumberGenerator((short)5 , (long) 0);
/*
* Simple loop creating changes and sending them
* to the changelog server.
*/
- for (int i = 0; i< 100000; i++)
+ for (int i = 0; i< TOTAL_MSG; i++)
{
DeleteMsg msg =
new DeleteMsg("o=test,dc=example,dc=com", gen.NewChangeNumber(),
"uid");
server.publish(msg);
}
+
+ for (int i =0; i< CLIENT_THREADS; i++)
+ {
+ client[i].join();
+ reader.join();
+ }
+ }
+ finally
+ {
+ if (server != null)
+ server.stop();
+ for (int i =0; i< CLIENT_THREADS; i++)
+ {
+ clientBroker[i].stop();
+ }
+ }
+ }
+
+ /**
+ * Stress test from client using the ChangelogBroker API
+ * to the changelog server.
+ *
+ * This test allow to investigate the behaviour of the
+ * Changelog server when it needs to distribute the load of
+ * updates from multiple LDAP server to a number of LDAP servers.
+ *
+ * This test is sconfigured for a relatively low stress
+ * but can be changed using TOTAL_MSG and THREADS consts.
+ */
+ @Test(enabled=false, groups="slow")
+ public void multipleWriterMultipleReader() throws Exception
+ {
+ ChangelogBroker server = null;
+ final int TOTAL_MSG = 1000; // number of messages to send during the test
+ final int THREADS = 2; // number of threads that will produce
+ // and read the messages.
+
+ BrokerWriter producer[] = new BrokerWriter[THREADS];
+ BrokerReader reader[] = new BrokerReader[THREADS];
+
+ try
+ {
+ /*
+ * Start the producer threads.
+ */
+ for (int i =0; i< THREADS; i++)
+ {
+ short serverId = (short) (10+i);
+ ChangeNumberGenerator gen =
+ new ChangeNumberGenerator(serverId , (long) 0);
+ ChangelogBroker broker =
+ openChangelogSession( DN.decode("dc=example,dc=com"), serverId,
+ 100, changelogPort, 1000, 1000, 0, true);
+
+ producer[i] = new BrokerWriter(broker, gen, TOTAL_MSG/THREADS);
+ reader[i] = new BrokerReader(broker);
+
+ }
+
+ for (int i =0; i< THREADS; i++)
+ {
+ producer[i].start();
+ }
+
+ for (int i =0; i< THREADS; i++)
+ {
+ reader[i].start();
+ }
+
+ for (int i =0; i< THREADS; i++)
+ {
+ producer[i].join();
+ reader[i].join();
+ }
}
finally
{
@@ -414,57 +517,11 @@
}
}
- /**
- * After the tests stop the changelog server.
- */
- @AfterClass()
- public void shutdown() throws Exception
- {
- if (changelog != null)
- changelog.shutdown();
- }
- /**
- * Continuously reads messages from a changelog broker until there is nothing
- * left. Count the number of received messages.
- */
- private class BrokerReader extends Thread
- {
- private ChangelogBroker broker;
- /**
- * Creates a new Stress Test Reader
- * @param broker
- */
- public BrokerReader(ChangelogBroker broker)
- {
- this.broker = broker;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void run()
- {
- // loop receiving messages until either we get a timeout
- // because there is nothing left or an error condition happens.
- try
- {
- while (true)
- {
- SynchronizationMessage msg = broker.receive();
- if (msg == null)
- break;
- }
- } catch (Exception e) {
- }
- }
- }
-
/**
* Chaining tests of the changelog code with 2 changelog servers involved
* 2 tests are done here (itest=0 or itest=1)
- *
+ *
* Test 1
* - Create changelog server 1
* - Create changelog server 2 connected with changelog server 1
@@ -472,7 +529,7 @@
* - Create and connect client 2 to changelog server 2
* - Make client1 publish changes
* - Check that client 2 receives the changes published by client 1
- *
+ *
* Test 2
* - Create changelog server 1
* - Create and connect client1 to changelog server 1
@@ -480,7 +537,7 @@
* - Create changelog server 2 connected with changelog server 1
* - Create and connect client 2 to changelog server 2
* - Check that client 2 receives the changes published by client 1
- *
+ *
*/
@Test(enabled=true)
public void changelogChaining() throws Exception
@@ -520,8 +577,8 @@
String changelogLdif = "dn: cn=Changelog Server\n"
+ "objectClass: top\n"
+ "objectClass: ds-cfg-synchronization-changelog-server-config\n"
- + "cn: Changelog Server\n"
- + "ds-cfg-changelog-port: " + changelogPorts[i] + "\n"
+ + "cn: Changelog Server\n"
+ + "ds-cfg-changelog-port: " + changelogPorts[i] + "\n"
+ "ds-cfg-changelog-server: localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]) + "\n"
+ "ds-cfg-changelog-server-id: " + changelogIds[0] + "\n"
+ "ds-cfg-window-size: 100" + "\n"
@@ -535,17 +592,17 @@
try
{
- // For itest=0, create and connect client1 to changelog1
+ // For itest=0, create and connect client1 to changelog1
// and client2 to changelog2
- // For itest=1, only create and connect client1 to changelog1
+ // For itest=1, only create and connect client1 to changelog1
// client2 will be created later
broker1 = openChangelogSession(DN.decode("dc=example,dc=com"),
- (short) brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges);
+ brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges);
if (itest == 0)
{
broker2 = openChangelogSession(DN.decode("dc=example,dc=com"),
- (short) brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges);
+ brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges);
}
// - Test messages between clients by publishing now
@@ -553,7 +610,7 @@
// - Delete
long time = TimeThread.getTime();
int ts = 1;
- ChangeNumber cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
+ ChangeNumber cn = new ChangeNumber(time, ts++, brokerIds[0]);
DeleteMsg delMsg = new DeleteMsg("o=test"+itest+",dc=example,dc=com", cn, "uid");
broker1.publish(delMsg);
@@ -566,7 +623,7 @@
+ "objectClass: top\n" + "objectClass: domain\n"
+ "entryUUID: 11111111-1111-1111-1111-111111111111\n");
Entry entry = TestCaseUtils.entryFromLdifString(lentry);
- cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
+ cn = new ChangeNumber(time, ts++, brokerIds[0]);
AddMsg addMsg = new AddMsg(cn, "o=test,dc=example,dc=com",
user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry
.getAttributes(), new ArrayList<Attribute>());
@@ -577,13 +634,13 @@
Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
List<Modification> mods = new ArrayList<Modification>();
mods.add(mod1);
- cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
+ cn = new ChangeNumber(time, ts++, brokerIds[0]);
ModifyMsg modMsg = new ModifyMsg(cn, DN
.decode("o=test,dc=example,dc=com"), mods, "fakeuniqueid");
broker1.publish(modMsg);
// - ModifyDN
- cn = new ChangeNumber((long) time, ts++, (short) brokerIds[0]);
+ cn = new ChangeNumber(time, ts++, brokerIds[0]);
ModifyDNOperation op = new ModifyDNOperation(connection, 1, 1, null, DN
.decode("o=test,dc=example,dc=com"), RDN.decode("o=test2"), true,
null);
@@ -598,9 +655,9 @@
String changelogLdif = "dn: cn=Changelog Server\n"
+ "objectClass: top\n"
+ "objectClass: ds-cfg-synchronization-changelog-server-config\n"
- + "cn: Changelog Server\n"
+ + "cn: Changelog Server\n"
+ "ds-cfg-changelog-port: " + changelogPorts[1] + "\n"
- + "ds-cfg-changelog-server: localhost:" + changelogPorts[0] + "\n"
+ + "ds-cfg-changelog-server: localhost:" + changelogPorts[0] + "\n"
+ "ds-cfg-changelog-server-id: " + changelogIds[1] + "\n";
Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
@@ -608,7 +665,7 @@
// Connect broker 2 to changelog2
broker2 = openChangelogSession(DN.decode("dc=example,dc=com"),
- (short) brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges);
+ brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges);
}
// - Check msg receives by broker, through changeLog2
@@ -658,7 +715,7 @@
}
}
// Check that everything expected has been received
- assertTrue(ts == 1, "Broker2 did not receive the complete set of"
+ assertTrue(ts == 1, "Broker2 did not receive the complete set of"
+ " expected messages: #msg received " + ts);
}
finally
@@ -674,4 +731,94 @@
}
}
}
+
+ /**
+ * After the tests stop the changelog server.
+ */
+ @AfterClass()
+ public void shutdown() throws Exception
+ {
+ if (changelog != null)
+ changelog.shutdown();
+ }
+
+ /**
+ * This class allows to creater reader thread.
+ * They continuously reads messages from a changelog broker until
+ * there is nothing left.
+ * They Count the number of received messages.
+ */
+ private class BrokerReader extends Thread
+ {
+ private ChangelogBroker broker;
+
+ /**
+ * Creates a new Stress Test Reader
+ * @param broker
+ */
+ public BrokerReader(ChangelogBroker broker)
+ {
+ this.broker = broker;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run()
+ {
+ // loop receiving messages until either we get a timeout
+ // because there is nothing left or an error condition happens.
+ try
+ {
+ while (true)
+ {
+ SynchronizationMessage msg = broker.receive();
+ if (msg == null)
+ break;
+ }
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ /**
+ * This class allows to create writer thread that can
+ * be used as producers for the Changelog stress tests.
+ */
+ private class BrokerWriter extends Thread
+ {
+ int count;
+ private ChangelogBroker broker;
+ ChangeNumberGenerator gen;
+
+ public BrokerWriter(ChangelogBroker broker, ChangeNumberGenerator gen,
+ int count)
+ {
+ this.broker = broker;
+ this.count = count;
+ this.gen = gen;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run()
+ {
+ /*
+ * Simple loop creating changes and sending them
+ * to the changelog server.
+ */
+ while (count>0)
+ {
+ count--;
+
+ DeleteMsg msg =
+ new DeleteMsg("o=test,dc=example,dc=com", gen.NewChangeNumber(),
+ "uid");
+ broker.publish(msg);
+ }
+ }
+ }
}
--
Gitblit v1.10.0