From 26ff1f0755680cbce7b5bdb136750b2b1bc9e4ed Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 10 Nov 2006 08:05:56 +0000
Subject: [PATCH] issue 508 These changes implement a window mechanism in the sycnhronization protocol.
---
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java | 153 +++++++++++++++++++++++++++------------------------
1 files changed, 81 insertions(+), 72 deletions(-)
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
index d553104..f73ca4b 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
@@ -27,6 +27,7 @@
package org.opends.server.synchronization;
+import static org.opends.server.loggers.Error.logError;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -53,6 +54,8 @@
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
+import org.opends.server.types.ErrorLogCategory;
+import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.InitializationException;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
@@ -139,96 +142,100 @@
@Test(enabled=true, groups="slow")
public void fromServertoBroker() throws Exception
{
-
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "Starting Synchronization StressTest : fromServertoBroker" , 1);
+
final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
final int TOTAL_MESSAGES = 1000;
cleanEntries();
- ChangelogBroker broker = openChangelogSession(baseDn, (short) 3);
+ ChangelogBroker broker = openChangelogSession(baseDn, (short) 18);
DirectoryServer.registerMonitorProvider(this);
try {
- /*
- * loop receiving update until there is nothing left
- * to make sure that message from previous tests have been consumed.
- */
- try
- {
- while (true)
+ /*
+ * loop receiving update until there is nothing left
+ * to make sure that message from previous tests have been consumed.
+ */
+ try
{
- broker.receive();
+ while (true)
+ {
+ broker.receive();
+ }
}
- }
- catch (Exception e)
- { }
- /*
- * Test that operations done on this server are sent to the
- * changelog server and forwarded to our changelog broker session.
- */
+ catch (Exception e)
+ { }
+ /*
+ * Test that operations done on this server are sent to the
+ * changelog server and forwarded to our changelog broker session.
+ */
- // Create an Entry (add operation) that will be later used in the test.
- Entry tmp = personEntry.duplicate();
- AddOperation addOp = new AddOperation(connection,
- InternalClientConnection.nextOperationID(), InternalClientConnection
- .nextMessageID(), null, tmp.getDN(),
- tmp.getObjectClasses(), tmp.getUserAttributes(),
- tmp.getOperationalAttributes());
- addOp.run();
- entryList.add(personEntry);
- assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
- "The Add Entry operation failed");
+ // Create an Entry (add operation) that will be later used in the test.
+ Entry tmp = personEntry.duplicate();
+ AddOperation addOp = new AddOperation(connection,
+ InternalClientConnection.nextOperationID(), InternalClientConnection
+ .nextMessageID(), null, tmp.getDN(),
+ tmp.getObjectClasses(), tmp.getUserAttributes(),
+ tmp.getOperationalAttributes());
+ addOp.run();
+ entryList.add(personEntry);
+ assertNotNull(DirectoryServer.getEntry(personEntry.getDN()),
+ "The Add Entry operation failed");
- // Check if the client has received the msg
- SynchronizationMessage msg = broker.receive();
- assertTrue(msg instanceof AddMsg,
- "The received synchronization message is not an ADD msg");
- AddMsg addMsg = (AddMsg) msg;
+ // Check if the client has received the msg
+ SynchronizationMessage msg = broker.receive();
+ assertTrue(msg instanceof AddMsg,
+ "The received synchronization message is not an ADD msg");
+ AddMsg addMsg = (AddMsg) msg;
- Operation receivedOp = addMsg.createOperation(connection);
- assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
- "The received synchronization message is not an ADD msg");
+ Operation receivedOp = addMsg.createOperation(connection);
+ assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
+ "The received synchronization message is not an ADD msg");
- assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
- "The received ADD synchronization message is not for the excepted DN");
+ assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
+ "The received ADD synchronization message is not for the excepted DN");
- reader = new BrokerReader(broker);
- reader.start();
+ reader = new BrokerReader(broker);
+ reader.start();
- long startTime = TimeThread.getTime();
- int count = TOTAL_MESSAGES;
+ long startTime = TimeThread.getTime();
+ int count = TOTAL_MESSAGES;
- // Create a number of writer thread that will loop modifying the entry
- List<Thread> writerThreadList = new LinkedList<Thread>();
- for (int n = 0; n < 1; n++)
- {
- BrokerWriter writer = new BrokerWriter(count);
- writerThreadList.add(writer);
- }
- for (Thread thread : writerThreadList)
- {
- thread.start();
- }
- // wait for all the threads to finish.
- for (Thread thread : writerThreadList)
- {
- thread.join();
- }
+ // Create a number of writer thread that will loop modifying the entry
+ List<Thread> writerThreadList = new LinkedList<Thread>();
+ for (int n = 0; n < 1; n++)
+ {
+ BrokerWriter writer = new BrokerWriter(count);
+ writerThreadList.add(writer);
+ }
+ for (Thread thread : writerThreadList)
+ {
+ thread.start();
+ }
+ // wait for all the threads to finish.
+ for (Thread thread : writerThreadList)
+ {
+ thread.join();
+ }
- long afterSendTime = TimeThread.getTime();
+ long afterSendTime = TimeThread.getTime();
- int rcvCount = reader.getCount();
- long afterReceiveTime = TimeThread.getTime();
+ int rcvCount = reader.getCount();
+
+ long afterReceiveTime = TimeThread.getTime();
- if (rcvCount != TOTAL_MESSAGES)
- {
- fail("some messages were lost : expected : " +TOTAL_MESSAGES +
- " received : " + rcvCount);
- }
+ if (rcvCount != TOTAL_MESSAGES)
+ {
+ fail("some messages were lost : expected : " +TOTAL_MESSAGES +
+ " received : " + rcvCount);
+ }
}
finally {
- DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST);
- broker.stop();
+ DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST);
+ broker.stop();
}
}
@@ -393,7 +400,7 @@
ServerState state = new ServerState(baseDn);
state.loadState();
ChangelogBroker broker = new ChangelogBroker(state, baseDn,
- serverId, 0, 0, 0, 0);
+ serverId, 0, 0, 0, 0, 100);
ArrayList<String> servers = new ArrayList<String>(1);
servers.add("localhost:8989");
broker.start(servers);
@@ -441,7 +448,7 @@
// We also have a replicated suffix (synchronization domain)
DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
- "Unable to add the syncrhonized server");
+ "Unable to add the synchronized server");
entryList.add(synchroServerEntry);
}
@@ -553,7 +560,9 @@
{
while (true)
{
- broker.receive();
+ SynchronizationMessage msg = broker.receive();
+ if (msg == null)
+ break;
count ++;
}
} catch (Exception e) {
@@ -577,7 +586,7 @@
return count;
try
{
- this.wait();
+ this.wait(60);
return count;
} catch (InterruptedException e)
{
--
Gitblit v1.10.0