From 067771ecaf547a6a25a52f4c8a38575d0dbd8015 Mon Sep 17 00:00:00 2001
From: neil_a_wilson <neil_a_wilson@localhost>
Date: Tue, 16 Jan 2007 20:13:21 +0000
Subject: [PATCH] Revert the changes made in revision 980 because they appear to be responsible for a number of test case failures and a potential deadlock.
---
opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java | 100 ++++-----
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java | 6
opends/src/server/org/opends/server/synchronization/changelog/ChangelogDB.java | 21 +-
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java | 2
opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java | 62 ++---
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java | 2
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java | 3
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java | 321 ++-----------------------------
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java | 70 ++----
9 files changed, 139 insertions(+), 448 deletions(-)
diff --git a/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java b/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
index fd6be9a..2a74fb9 100644
--- a/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
+++ b/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -418,7 +418,7 @@
ServerHandler handler = new ServerHandler(
new SocketSession(socket), queueSize);
- handler.start(baseDn, serverId, this.serverURL, rcvWindow, this);
+ handler.start(baseDn, serverId, serverURL, rcvWindow, this);
}
catch (IOException e)
{
@@ -545,7 +545,6 @@
}
dbEnv.shutdown();
- DirectoryServer.deregisterConfigurableComponent(this);
}
diff --git a/opends/src/server/org/opends/server/synchronization/changelog/ChangelogDB.java b/opends/src/server/org/opends/server/synchronization/changelog/ChangelogDB.java
index 29e92ba..fc94e70 100644
--- a/opends/src/server/org/opends/server/synchronization/changelog/ChangelogDB.java
+++ b/opends/src/server/org/opends/server/synchronization/changelog/ChangelogDB.java
@@ -170,6 +170,10 @@
public ChangelogCursor openReadCursor(ChangeNumber changeNumber)
throws DatabaseException, Exception
{
+ if (changeNumber == null)
+ changeNumber = readFirstChange();
+ if (changeNumber == null)
+ return null;
return new ChangelogCursor(changeNumber);
}
@@ -315,16 +319,13 @@
{
cursor = db.openCursor(txn, null);
- if (startingChangeNumber != null)
- {
- key = new ChangelogKey(startingChangeNumber);
- data = new DatabaseEntry();
+ DatabaseEntry key = new ChangelogKey(startingChangeNumber);
+ DatabaseEntry data = new DatabaseEntry();
- if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
- OperationStatus.SUCCESS)
- {
- throw new Exception("ChangeNumber not available");
- }
+ if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
+ OperationStatus.SUCCESS)
+ {
+ throw new Exception("ChangeNumber not available");
}
}
@@ -372,7 +373,7 @@
}
/**
- * Get the next ChangeNumber in the database from this Cursor.
+ * Get the next ChangeNumber inthe database from this Cursor.
*
* @return The next ChangeNumber in the database from this cursor.
* @throws DatabaseException In case of underlying database problem.
diff --git a/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java b/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
index 64bb929..ac53f2b 100644
--- a/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
+++ b/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
@@ -81,7 +81,6 @@
private boolean shutdown = false;
private boolean done = false;
private DirectoryThread thread = null;
- private Object flushLock = new Object();
/**
* Creates a New dbHandler associated to a given LDAP server.
@@ -205,12 +204,6 @@
public ChangelogIterator generateIterator(ChangeNumber changeNumber)
throws DatabaseException, Exception
{
- /*
- * make sure to flush some changes in the database so that
- * we don't create the iterator on an empty database when the
- * dbHandler has just been started.
- */
- flush();
return new ChangelogIterator(serverId, db, changeNumber);
}
@@ -327,22 +320,17 @@
while ((size < 5000 ) && (!finished))
{
ChangeNumber changeNumber = cursor.nextChangeNumber();
- if (changeNumber != null)
+ if ((changeNumber != null) && (!changeNumber.equals(lastChange))
+ && (changeNumber.older(trimDate)))
{
- if ((!changeNumber.equals(lastChange))
- && (changeNumber.older(trimDate)))
- {
- size++;
- cursor.delete();
- }
- else
- {
- firstChange = changeNumber;
- finished = true;
- }
+ size++;
+ cursor.delete();
}
else
+ {
+ firstChange = changeNumber;
finished = true;
+ }
}
cursor.close();
@@ -362,21 +350,19 @@
do
{
- synchronized(flushLock)
- {
- // get N messages to save in the DB
- List<UpdateMessage> changes = getChanges(500);
+ // get N messages to save in the DB
+ List<UpdateMessage> changes = getChanges(500);
- // if no more changes to save exit immediately.
- if ((changes == null) || ((size = changes.size()) == 0))
- return;
+ // if no more changes to save exit immediately.
+ if ((changes == null) || ((size = changes.size()) == 0))
+ return;
- // save the change to the stable storage.
- db.addEntries(changes);
+ // save the change to the stable storage.
+ db.addEntries(changes);
- // remove the changes from the list of changes to be saved.
- clear(changes.size());
- }
+ // remove the changes from the list of changes to be saved.
+ clear(changes.size());
+
} while (size >=500);
}
@@ -401,17 +387,19 @@
attributes.add(new Attribute("changelog-database",
String.valueOf(serverId)));
attributes.add(new Attribute("base-dn", baseDn.toString()));
- if (firstChange != null)
+ ChangeNumber first = getFirstChange();
+ ChangeNumber last = getLastChange();
+ if (first != null)
{
- Date firstTime = new Date(firstChange.getTime());
+ Date firstTime = new Date(first.getTime());
attributes.add(new Attribute("first-change",
- firstChange.toString() + " " + firstTime.toString()));
+ first.toString() + " " + firstTime.toString()));
}
- if (lastChange != null)
+ if (last != null)
{
- Date lastTime = new Date(lastChange.getTime());
+ Date lastTime = new Date(last.getTime());
attributes.add(new Attribute("last-change",
- lastChange.toString() + " " + lastTime.toString()));
+ last.toString() + " " + lastTime.toString()));
}
return attributes;
diff --git a/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java b/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
index fd1f3d1..f2c2d70 100644
--- a/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
+++ b/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
@@ -118,18 +118,57 @@
return;
savedStatus = true;
- ResultCode resultCode = updateStateEntry();
- if (resultCode != ResultCode.SUCCESS)
+
+ ArrayList<ASN1OctetString> values = this.toASN1ArrayList();
+
+ if (values.size() == 0)
+ return;
+
+ LDAPAttribute attr =
+ new LDAPAttribute(SYNCHRONIZATION_STATE, values);
+ LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
+ ArrayList<LDAPModification> mods = new ArrayList<LDAPModification>(1);
+ mods.add(mod);
+
+ boolean done = false;
+ while (!done)
{
- if (resultCode == ResultCode.NO_SUCH_OBJECT)
+ /*
+ * Generate a modify operation on the Server State Entry :
+ * cn=ffffffff-ffffffff-ffffffff-ffffffff, baseDn
+ */
+ ModifyOperation op =
+ new ModifyOperation(conn, InternalClientConnection.nextOperationID(),
+ InternalClientConnection.nextMessageID(),
+ new ArrayList<Control>(0), serverStateAsn1Dn,
+ mods);
+ op.setInternalOperation(true);
+ op.setSynchronizationOperation(true);
+
+ op.run();
+ ResultCode resultCode = op.getResultCode();
+ if (resultCode != ResultCode.SUCCESS)
{
- createStateEntry();
+ if (resultCode == ResultCode.NO_SUCH_OBJECT)
+ {
+ createStateEntry();
+ }
+ else
+ {
+ savedStatus = false;
+ int msgID = MSGID_ERROR_UPDATING_RUV;
+ String message = getMessage(msgID,
+ op.getResultCode().getResultCodeName(),
+ op.toString(), op.getErrorMessage(),
+ baseDn.toString());
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ break;
+ }
}
else
- {
- savedStatus = false;
-
- }
+ done = true;
}
}
@@ -258,51 +297,6 @@
}
/**
- * Save the current values of this PersistentState object
- * in the appropiate entry of the database.
- *
- * @return a ResultCode indicating if the method was successfull.
- */
- private ResultCode updateStateEntry()
- {
- /*
- * Generate a modify operation on the Server State Entry :
- * cn=ffffffff-ffffffff-ffffffff-ffffffff, baseDn
- */
- ArrayList<ASN1OctetString> values = this.toASN1ArrayList();
-
- if (values.size() == 0)
- return ResultCode.SUCCESS;
-
- LDAPAttribute attr =
- new LDAPAttribute(SYNCHRONIZATION_STATE, values);
- LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
- ArrayList<LDAPModification> mods = new ArrayList<LDAPModification>(1);
- mods.add(mod);
-
- ModifyOperation op =
- new ModifyOperation(conn, InternalClientConnection.nextOperationID(),
- InternalClientConnection.nextMessageID(),
- new ArrayList<Control>(0), serverStateAsn1Dn,
- mods);
- op.setInternalOperation(true);
- op.setSynchronizationOperation(true);
-
- op.run();
-
- ResultCode result = op.getResultCode();
- if (result != ResultCode.SUCCESS)
- {
- int msgID = MSGID_ERROR_UPDATING_RUV;
- String message = getMessage(msgID, op.getResultCode().getResultCodeName(),
- op.toString(), op.getErrorMessage(), baseDn.toString());
- logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- }
- return result;
- }
-
- /**
* Get the Dn where the ServerState is stored.
* @return Returns the serverStateDn.
*/
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java
index 1646289..e4922e4 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java
@@ -99,7 +99,7 @@
final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
ChangelogBroker broker = openChangelogSession(baseDn, (short) 13,
- WINDOW_SIZE, 8989, 1000, true);
+ WINDOW_SIZE, 8989, 1000);
try {
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
index 3c8426c..a0d621f 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
@@ -112,7 +112,7 @@
cleanEntries();
ChangelogBroker broker =
- openChangelogSession(baseDn, (short) 18, 100, 8989, 5000, true);
+ openChangelogSession(baseDn, (short) 18, 100, 8989, 5000);
Monitor monitor = new Monitor("stress test monitor");
DirectoryServer.registerMonitorProvider(monitor);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
index 0c7dc49..70308cb 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -41,7 +41,6 @@
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.protocols.internal.InternalClientConnection;
-import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.plugin.ChangelogBroker;
import org.opends.server.synchronization.plugin.MultimasterSynchronization;
import org.opends.server.synchronization.plugin.PersistentServerState;
@@ -53,7 +52,7 @@
import org.testng.annotations.BeforeClass;
/**
- * An abstract class that all synchronization unit test should extend.
+ * An abstract class that all synchronization unit test should extend.
*/
@Test(groups = { "precommit", "synchronization" })
public abstract class SynchronizationTestCase extends DirectoryServerTestCase
@@ -92,7 +91,7 @@
/**
* Set up the environment for performing the tests in this suite.
- *
+ *
* @throws Exception
* If the environment could not be set up.
*/
@@ -101,23 +100,21 @@
{
// This test suite depends on having the schema available.
TestCaseUtils.startServer();
-
+
// Create an internal connection
connection = new InternalClientConnection();
}
-
+
/**
* Open a changelog session to the local Changelog server.
*
*/
protected ChangelogBroker openChangelogSession(
- final DN baseDn, short serverId, int window_size,
- int port, int timeout, boolean emptyOldChanges)
+ final DN baseDn, short serverId, int window_size, int port, int timeout)
throws Exception, SocketException
{
PersistentServerState state = new PersistentServerState(baseDn);
- if (emptyOldChanges)
- state.loadState();
+ state.loadState();
ChangelogBroker broker = new ChangelogBroker(
state, baseDn, serverId, 0, 0, 0, 0, window_size);
ArrayList<String> servers = new ArrayList<String>(1);
@@ -125,46 +122,23 @@
broker.start(servers);
if (timeout != 0)
broker.setSoTimeout(timeout);
- if (emptyOldChanges)
+ /*
+ * loop receiving update until there is nothing left
+ * to make sure that message from previous tests have been consumed.
+ */
+ try
{
- /*
- * loop receiving update until there is nothing left
- * to make sure that message from previous tests have been consumed.
- */
- try
+ while (true)
{
- while (true)
- {
- broker.receive();
- }
+ broker.receive();
}
- catch (Exception e)
- { }
}
+ catch (Exception e)
+ { }
return broker;
}
/**
- * Open a new session to the Changelog Server
- * starting with a given ServerState.
- */
- protected ChangelogBroker openChangelogSession(
- final DN baseDn, short serverId, int window_size,
- int port, int timeout, ServerState state)
- throws Exception, SocketException
- {
- ChangelogBroker broker = new ChangelogBroker(
- state, baseDn, serverId, 0, 0, 0, 0, window_size);
- ArrayList<String> servers = new ArrayList<String>(1);
- servers.add("localhost:" + port);
- broker.start(servers);
- if (timeout != 0)
- broker.setSoTimeout(timeout);
-
- return broker;
- }
-
- /**
* suppress all the entries created by the tests in this class
*/
protected void cleanEntries()
@@ -175,11 +149,11 @@
{
while (true)
{
- DN dn = entryList.removeLast();
+ DN dn = entryList.removeLast();
op = new DeleteOperation(connection, InternalClientConnection
.nextOperationID(), InternalClientConnection.nextMessageID(), null,
dn);
-
+
op.run();;
}
}
@@ -198,7 +172,7 @@
public void classCleanUp() throws Exception
{
DirectoryServer.setCheckSchema(schemaCheck);
-
+
// WORKAROUND FOR BUG #639 - BEGIN -
if (mms != null)
{
@@ -206,7 +180,7 @@
mms.finalizeSynchronizationProvider();
}
// WORKAROUND FOR BUG #639 - END -
-
+
cleanEntries();
}
@@ -222,7 +196,7 @@
assertNotNull(DirectoryServer.getConfigEntry(DN
.decode(synchroPluginStringDN)),
"Unable to add the Multimaster synchronization plugin");
-
+
// WORKAROUND FOR BUG #639 - BEGIN -
DN dn = DN.decode(synchroPluginStringDN);
ConfigEntry mmsConfigEntry = DirectoryServer.getConfigEntry(dn);
@@ -238,14 +212,14 @@
}
DirectoryServer.registerSynchronizationProvider(mms);
// WORKAROUND FOR BUG #639 - END -
-
+
//
// Add the changelog server
DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
"Unable to add the changeLog server");
entryList.add(changeLogEntry.getDN());
-
+
//
// We also have a replicated suffix (synchronization domain)
DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
index 04e5ba5..dffa201 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -249,7 +249,7 @@
* This must use a serverId different from the LDAP server ID
*/
ChangelogBroker broker =
- openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true);
+ openChangelogSession(baseDn, (short) 2, 100, 8989, 1000);
/*
* Create a Change number generator to generate new changenumbers
@@ -562,7 +562,7 @@
cleanEntries();
ChangelogBroker broker =
- openChangelogSession(baseDn, (short) 27, 100, 8989, 1000, true);
+ openChangelogSession(baseDn, (short) 27, 100, 8989, 1000);
try {
ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 27, 0);
@@ -964,7 +964,7 @@
Thread.sleep(2000);
ChangelogBroker broker =
- openChangelogSession(baseDn, (short) 11, 100, 8989, 1000, true);
+ openChangelogSession(baseDn, (short) 11, 100, 8989, 1000);
try
{
ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 11, 0);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
index aa12952..02a2521 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -32,52 +32,31 @@
import org.opends.server.config.ConfigEntry;
import org.opends.server.synchronization.SynchronizationTestCase;
import org.opends.server.synchronization.common.ChangeNumber;
-import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.plugin.ChangelogBroker;
import org.opends.server.synchronization.protocol.DeleteMsg;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
-import org.opends.server.util.TimeThread;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
/**
- * Tests for the changelog service code.
+ * Tests for the chngelog service code.
*/
public class ChangelogTest extends SynchronizationTestCase
{
/**
- * The changelog server that will be used in this test.
+ * Basic test of the changelog code.
+ * Create a changelog server, connect 2 clients and exchange
+ * messages between the clients.
*/
- private Changelog changelog = null;
-
- /**
- * The port of the changelog server.
- */
- private int changelogPort;
-
- private ChangeNumber firstChangeNumberServer1 = null;
- private ChangeNumber secondChangeNumberServer1 = null;
- private ChangeNumber firstChangeNumberServer2 = null;
- private ChangeNumber secondChangeNumberServer2 = null;
-
-
- /**
- * Before starting the tests, start the server and configure a
- * changelog server.
- */
- @BeforeClass()
- public void configure() throws Exception
+ @Test()
+ public void changelogBasic() throws Exception
{
- TestCaseUtils.startServer();
-
- // find a free port for the changelog server
+ // find a free port
ServerSocket socket = TestCaseUtils.bindFreePort();
- changelogPort = socket.getLocalPort();
+ int changelogPort = socket.getLocalPort();
socket.close();
String changelogLdif =
@@ -89,280 +68,36 @@
+ "ds-cfg-changelog-server-id: 1\n";
Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
- changelog = new Changelog(changelogConfig);
- }
+ Changelog changelog = new Changelog(changelogConfig);
- /**
- * Basic test of the changelog code :
- * Connect 2 clients to the changelog server and exchange messages
- * between the clients.
- *
- * Note : Other tests in this file depends on this test and may need to
- * change if this test is modified.
- */
- @Test()
- public void changelogBasic() throws Exception
- {
- ChangelogBroker server1 = null;
- ChangelogBroker server2 = null;
-
+ ChangelogBroker broker1 = null;
+ ChangelogBroker broker2 = null;
+
try {
- /*
- * Open a sender session and a receiver session to the changelog
- */
- server1 = openChangelogSession(
- DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort,
- 1000, true);
- server2 = openChangelogSession(
- DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort,
- 1000, true);
-
- /*
- * Create change numbers for the messages sent from server 1
- * with current time sequence 1 and with current time + 2 sequence 2
- */
- long time = TimeThread.getTime();
- firstChangeNumberServer1 = new ChangeNumber(time, 1, (short) 1);
- secondChangeNumberServer1 = new ChangeNumber(time + 2, 2, (short) 1);
-
- /*
- * Create change numbers for the messages sent from server 2
- * with current time sequence 1 and with current time + 3 sequence 2
- */
- firstChangeNumberServer2 = new ChangeNumber(time+ 1, 1, (short) 2);
- secondChangeNumberServer2 = new ChangeNumber(time + 3, 2, (short) 2);
-
- /*
- * Send and receive a Delete Msg from server 1 to server 2
- */
- DeleteMsg msg =
- new DeleteMsg("o=test,dc=example,dc=com", firstChangeNumberServer1,
- "uid");
- server1.publish(msg);
- SynchronizationMessage msg2 = server2.receive();
+ broker1 = openChangelogSession(
+ DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort, 1000);
+ broker2 = openChangelogSession(
+ DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort, 1000);
+
+ ChangeNumber cn = new ChangeNumber((long) 1, 1, (short)1);
+ DeleteMsg msg = new DeleteMsg("o=test,dc=example,dc=com", cn, "uid");
+ broker1.publish(msg);
+ SynchronizationMessage msg2 = broker2.receive();
if (msg2 instanceof DeleteMsg)
{
DeleteMsg del = (DeleteMsg) msg2;
- assertTrue(del.toString().equals(msg.toString()),
- "Changelog basic : incorrect message body received.");
+ assertTrue(del.toString().equals(msg2.toString()));
}
else
- fail("Changelog basic : incorrect message type received.");
-
- /*
- * Send and receive a second Delete Msg
- */
- msg = new DeleteMsg("o=test", secondChangeNumberServer1, "uid");
- server1.publish(msg);
- msg2 = server2.receive();
- if (msg2 instanceof DeleteMsg)
- {
- DeleteMsg del = (DeleteMsg) msg2;
- assertTrue(del.toString().equals(msg.toString()),
- "Changelog basic : incorrect message body received.");
- }
- else
- fail("Changelog basic : incorrect message type received.");
-
- /*
- * Send and receive a Delete Msg from server 1 to server 2
- */
- msg =
- new DeleteMsg("o=test,dc=example,dc=com", firstChangeNumberServer2,
- "other-uid");
- server2.publish(msg);
- msg2 = server1.receive();
- if (msg2 instanceof DeleteMsg)
- {
- DeleteMsg del = (DeleteMsg) msg2;
- assertTrue(del.toString().equals(msg.toString()),
- "Changelog basic : incorrect message body received.");
- }
- else
- fail("Changelog basic : incorrect message type received.");
-
- /*
- * Send and receive a second Delete Msg
- */
- msg = new DeleteMsg("o=test", secondChangeNumberServer2, "uid");
- server2.publish(msg);
- msg2 = server1.receive();
- if (msg2 instanceof DeleteMsg)
- {
- DeleteMsg del = (DeleteMsg) msg2;
- assertTrue(del.toString().equals(msg.toString()),
- "Changelog basic : incorrect message body received.");
- }
- else
- fail("Changelog basic : incorrect message type received.");
+ fail("Changelog transmission failed");
}
finally
{
- if (server1 != null)
- server1.stop();
- if (server2 != null)
- server2.stop();
- }
- }
-
- /**
- * Test that a new client see the change that was sent in the
- * previous test.
- */
- @Test(dependsOnMethods = { "changelogBasic" })
- public void newClient() throws Exception
- {
- ChangelogBroker broker = null;
-
- try {
- broker =
- openChangelogSession(DN.decode("dc=example,dc=com"), (short) 3,
- 100, changelogPort, 1000, false);
-
- SynchronizationMessage msg2 = broker.receive();
- if (!(msg2 instanceof DeleteMsg))
- fail("Changelog basic transmission failed");
- else
- {
- DeleteMsg del = (DeleteMsg) msg2;
- assertTrue(del.getChangeNumber().equals(firstChangeNumberServer1),
- "The first message received by a new client was the wrong one."
- + del.getChangeNumber() + " " + firstChangeNumberServer1);
- }
- }
- finally
- {
- if (broker != null)
- broker.stop();
- }
- }
-
-
-
- /**
- * Test that a client that has already seen some changes now receive
- * the correct next change.
- */
- private void newClientWithChanges(
- ServerState state, ChangeNumber nextChangeNumber) throws Exception
- {
- ChangelogBroker broker = null;
-
- /*
- * Connect to the changelog server using the state created above.
- */
- try {
- broker =
- openChangelogSession(DN.decode("dc=example,dc=com"), (short) 3,
- 100, changelogPort, 1000, state);
-
- SynchronizationMessage msg2 = broker.receive();
- if (!(msg2 instanceof DeleteMsg))
- fail("Changelog basic transmission failed");
- else
- {
- DeleteMsg del = (DeleteMsg) msg2;
- assertTrue(del.getChangeNumber().equals(nextChangeNumber),
- "The second message received by a new client was the wrong one."
- + del.getChangeNumber() + " " + nextChangeNumber);
- }
- }
- finally
- {
- if (broker != null)
- broker.stop();
- }
- }
-
- /**
- * Test that a client that has already seen the first change now see the
- * second change
- */
- @Test(dependsOnMethods = { "changelogBasic" })
- public void newClientWithFirstChanges() throws Exception
- {
- /*
- * Create a ServerState updated with the first changes from both servers
- * done in test changelogBasic.
- */
- ServerState state = new ServerState();
- state.update(firstChangeNumberServer1);
- state.update(firstChangeNumberServer2);
-
- newClientWithChanges(state, secondChangeNumberServer1);
- }
-
- /**
- * Test that a client that has already seen the first change from server 1
- * now see the first change from server 2
- */
- @Test(dependsOnMethods = { "changelogBasic" })
- public void newClientWithChangefromServer1() throws Exception
- {
- /*
- * Create a ServerState updated with the first change from server 1
- */
- ServerState state = new ServerState();
- state.update(firstChangeNumberServer1);
-
- newClientWithChanges(state, firstChangeNumberServer2);
- }
-
- /**
- * Test that a client that has already seen the first chaneg from server 2
- * now see the first change from server 1
- */
- @Test(dependsOnMethods = { "changelogBasic" })
- public void newClientWithChangefromServer2() throws Exception
- {
- /*
- * Create a ServerState updated with the first change from server 1
- */
- ServerState state = new ServerState();
- state.update(firstChangeNumberServer2);
-
- newClientWithChanges(state, firstChangeNumberServer1);
- }
-
- /**
- * Test that a client that has not seen the second change from server 1
- * now receive it.
- */
- @Test(dependsOnMethods = { "changelogBasic" })
- public void newClientLateServer1() throws Exception
- {
- /*
- * Create a ServerState updated with the first change from server 1
- */
- ServerState state = new ServerState();
- state.update(secondChangeNumberServer2);
- state.update(firstChangeNumberServer1);
-
- newClientWithChanges(state, secondChangeNumberServer1);
- }
-
- /**
- * Test that newClient() and newClientWithFirstChange() still works
- * after stopping and restarting the changelog server.
- */
- @Test(dependsOnMethods = { "changelogBasic" })
- public void stopChangelog() throws Exception
- {
- changelog.shutdown();
- configure();
- newClient();
- newClientWithFirstChanges();
- newClientWithChangefromServer1();
- newClientWithChangefromServer2();
- }
-
- /**
- * After the tests stop the changelog server.
- */
- @AfterClass()
- public void shutdown() throws Exception
- {
- if (changelog != null)
changelog.shutdown();
+ if (broker1 != null)
+ broker1.stop();
+ if (broker2 != null)
+ broker2.stop();
+ }
}
}
--
Gitblit v1.10.0