opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -340,6 +340,7 @@ { newSocket = listenSocket.accept(); newSocket.setReceiveBufferSize(1000000); newSocket.setTcpNoDelay(true); ServerHandler handler = new ServerHandler( new SocketSession(newSocket), queueSize); handler.start(null, serverId, serverURL, rcvWindow, this); @@ -414,11 +415,12 @@ InetAddress.getByName(hostname), Integer.parseInt(port)); Socket socket = new Socket(); socket.setReceiveBufferSize(1000000); socket.setTcpNoDelay(true); socket.connect(ServerAddr, 500); ServerHandler handler = new ServerHandler( new SocketSession(socket), queueSize); handler.start(baseDn, serverId, serverURL, rcvWindow, this); handler.start(baseDn, serverId, this.serverURL, rcvWindow, this); } catch (IOException e) { @@ -545,6 +547,7 @@ } dbEnv.shutdown(); DirectoryServer.deregisterConfigurableComponent(this); } opends/src/server/org/opends/server/synchronization/changelog/ChangelogDB.java
@@ -170,10 +170,6 @@ public ChangelogCursor openReadCursor(ChangeNumber changeNumber) throws DatabaseException, Exception { if (changeNumber == null) changeNumber = readFirstChange(); if (changeNumber == null) return null; return new ChangelogCursor(changeNumber); } @@ -319,13 +315,16 @@ { cursor = db.openCursor(txn, null); DatabaseEntry key = new ChangelogKey(startingChangeNumber); DatabaseEntry data = new DatabaseEntry(); if (cursor.getSearchKey(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) if (startingChangeNumber != null) { throw new Exception("ChangeNumber not available"); key = new ChangelogKey(startingChangeNumber); data = new DatabaseEntry(); if (cursor.getSearchKey(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { throw new Exception("ChangeNumber not available"); } } } @@ -373,7 +372,7 @@ } /** * Get the next ChangeNumber inthe database from this Cursor. * Get the next ChangeNumber in the database from this Cursor. * * @return The next ChangeNumber in the database from this cursor. * @throws DatabaseException In case of underlying database problem. opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
@@ -81,6 +81,7 @@ private boolean shutdown = false; private boolean done = false; private DirectoryThread thread = null; private Object flushLock = new Object(); /** * Creates a New dbHandler associated to a given LDAP server. @@ -204,6 +205,12 @@ public ChangelogIterator generateIterator(ChangeNumber changeNumber) throws DatabaseException, Exception { /* * make sure to flush some changes in the database so that * we don't create the iterator on an empty database when the * dbHandler has just been started. */ flush(); return new ChangelogIterator(serverId, db, changeNumber); } @@ -320,17 +327,22 @@ while ((size < 5000 ) && (!finished)) { ChangeNumber changeNumber = cursor.nextChangeNumber(); if ((changeNumber != null) && (!changeNumber.equals(lastChange)) && (changeNumber.older(trimDate))) if (changeNumber != null) { size++; cursor.delete(); if ((!changeNumber.equals(lastChange)) && (changeNumber.older(trimDate))) { size++; cursor.delete(); } else { firstChange = changeNumber; finished = true; } } else { firstChange = changeNumber; finished = true; } } cursor.close(); @@ -350,19 +362,21 @@ do { // get N messages to save in the DB List<UpdateMessage> changes = getChanges(500); synchronized(flushLock) { // get N messages to save in the DB List<UpdateMessage> changes = getChanges(500); // if no more changes to save exit immediately. if ((changes == null) || ((size = changes.size()) == 0)) return; // if no more changes to save exit immediately. if ((changes == null) || ((size = changes.size()) == 0)) return; // save the change to the stable storage. db.addEntries(changes); // save the change to the stable storage. db.addEntries(changes); // remove the changes from the list of changes to be saved. clear(changes.size()); // remove the changes from the list of changes to be saved. 
clear(changes.size()); } } while (size >=500); } @@ -387,19 +401,17 @@ attributes.add(new Attribute("changelog-database", String.valueOf(serverId))); attributes.add(new Attribute("base-dn", baseDn.toString())); ChangeNumber first = getFirstChange(); ChangeNumber last = getLastChange(); if (first != null) if (firstChange != null) { Date firstTime = new Date(first.getTime()); Date firstTime = new Date(firstChange.getTime()); attributes.add(new Attribute("first-change", first.toString() + " " + firstTime.toString())); firstChange.toString() + " " + firstTime.toString())); } if (last != null) if (lastChange != null) { Date lastTime = new Date(last.getTime()); Date lastTime = new Date(lastChange.getTime()); attributes.add(new Attribute("last-change", last.toString() + " " + lastTime.toString())); lastChange.toString() + " " + lastTime.toString())); } return attributes; opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -184,6 +184,7 @@ InetAddress.getByName(hostname), Integer.parseInt(port)); Socket socket = new Socket(); socket.setReceiveBufferSize(1000000); socket.setTcpNoDelay(true); socket.connect(ServerAddr, 500); session = new SocketSession(socket); opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
@@ -118,57 +118,18 @@ return; savedStatus = true; ArrayList<ASN1OctetString> values = this.toASN1ArrayList(); if (values.size() == 0) return; LDAPAttribute attr = new LDAPAttribute(SYNCHRONIZATION_STATE, values); LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr); ArrayList<LDAPModification> mods = new ArrayList<LDAPModification>(1); mods.add(mod); boolean done = false; while (!done) ResultCode resultCode = updateStateEntry(); if (resultCode != ResultCode.SUCCESS) { /* * Generate a modify operation on the Server State Entry : * cn=ffffffff-ffffffff-ffffffff-ffffffff, baseDn */ ModifyOperation op = new ModifyOperation(conn, InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(), new ArrayList<Control>(0), serverStateAsn1Dn, mods); op.setInternalOperation(true); op.setSynchronizationOperation(true); op.run(); ResultCode resultCode = op.getResultCode(); if (resultCode != ResultCode.SUCCESS) if (resultCode == ResultCode.NO_SUCH_OBJECT) { if (resultCode == ResultCode.NO_SUCH_OBJECT) { createStateEntry(); } else { savedStatus = false; int msgID = MSGID_ERROR_UPDATING_RUV; String message = getMessage(msgID, op.getResultCode().getResultCodeName(), op.toString(), op.getErrorMessage(), baseDn.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); break; } createStateEntry(); } else done = true; { savedStatus = false; } } } @@ -297,6 +258,51 @@ } /** * Save the current values of this PersistentState object * in the appropriate entry of the database. * * @return a ResultCode indicating if the method was successful. 
*/ private ResultCode updateStateEntry() { /* * Generate a modify operation on the Server State Entry : * cn=ffffffff-ffffffff-ffffffff-ffffffff, baseDn */ ArrayList<ASN1OctetString> values = this.toASN1ArrayList(); if (values.size() == 0) return ResultCode.SUCCESS; LDAPAttribute attr = new LDAPAttribute(SYNCHRONIZATION_STATE, values); LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr); ArrayList<LDAPModification> mods = new ArrayList<LDAPModification>(1); mods.add(mod); ModifyOperation op = new ModifyOperation(conn, InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(), new ArrayList<Control>(0), serverStateAsn1Dn, mods); op.setInternalOperation(true); op.setSynchronizationOperation(true); op.run(); ResultCode result = op.getResultCode(); if (result != ResultCode.SUCCESS) { int msgID = MSGID_ERROR_UPDATING_RUV; String message = getMessage(msgID, op.getResultCode().getResultCodeName(), op.toString(), op.getErrorMessage(), baseDn.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } return result; } /** * Get the Dn where the ServerState is stored. * @return Returns the serverStateDn. */ opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java
@@ -95,14 +95,14 @@ logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "Starting synchronization ProtocolWindowTest : saturateAndRestart" , 1); final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); ChangelogBroker broker = openChangelogSession(baseDn, (short) 13, WINDOW_SIZE, 8989, 1000); WINDOW_SIZE, 8989, 1000, true); try { /* Test that changelog monitor and synchro plugin monitor informations * publish the correct window size. * This allows both the check the monitoring code and to test that @@ -111,7 +111,7 @@ Thread.sleep(1500); assertTrue(checkWindows(WINDOW_SIZE)); assertTrue(checkChangelogQueueSize(CHANGELOG_QUEUE_SIZE)); // Create an Entry (add operation) that will be later used in the test. Entry tmp = personEntry.duplicate(); AddOperation addOp = new AddOperation(connection, @@ -138,7 +138,7 @@ "The received ADD synchronization message is not for the excepted DN"); // send (2 * window + changelog queue) modify operations // so that window + changelog queue get stuck in the changelog queue // so that window + changelog queue get stuck in the changelog queue int count = WINDOW_SIZE * 2 + CHANGELOG_QUEUE_SIZE; processModify(count); @@ -173,7 +173,7 @@ /** * Check that the Changelog queue size has correctly been configured * by reading the monitoring information. * @throws LDAPException * @throws LDAPException */ private boolean checkChangelogQueueSize(int changelog_queue_size) throws LDAPException @@ -188,7 +188,7 @@ /** * Check that the window configuration has been successfull * by reading the monitoring information and checking * by reading the monitoring information and checking * that we do have 2 entries with the configured max-rcv-window. */ private boolean checkWindows(int windowSize) throws LDAPException @@ -200,7 +200,7 @@ assertEquals(op.getResultCode(), ResultCode.SUCCESS); return (op.getEntriesSent() == 3); } /** * Search that the changelog has stopped sending changes after * having reach the limit of the window size. 
@@ -216,7 +216,7 @@ assertEquals(op.getResultCode(), ResultCode.SUCCESS); if (op.getEntriesSent() != 1) return false; op = connection.processSearch( new ASN1OctetString("cn=monitor"), SearchScope.WHOLE_SUBTREE, opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
@@ -89,7 +89,7 @@ private String changeLogStringDN; private BrokerReader reader = null; /** * A "person" entry */ @@ -106,13 +106,13 @@ logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "Starting Synchronization StressTest : fromServertoBroker" , 1); final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); final int TOTAL_MESSAGES = 1000; cleanEntries(); ChangelogBroker broker = openChangelogSession(baseDn, (short) 18, 100, 8989, 5000); openChangelogSession(baseDn, (short) 18, 100, 8989, 5000, true); Monitor monitor = new Monitor("stress test monitor"); DirectoryServer.registerMonitorProvider(monitor); @@ -212,7 +212,7 @@ // Create an internal connection connection = new InternalClientConnection(); // Disable schema check schemaCheck = DirectoryServer.checkSchema(); DirectoryServer.setCheckSchema(false); @@ -411,7 +411,7 @@ return count; } } private class Monitor extends MonitorProvider { protected Monitor(String threadName) @@ -445,7 +445,7 @@ public void updateMonitorData() { // nothing to do } @Override @@ -453,7 +453,7 @@ throws ConfigException, InitializationException { // nothing to do } @Override @@ -463,7 +463,7 @@ return 0; } } } opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -41,6 +41,7 @@ import org.opends.server.core.DeleteOperation; import org.opends.server.core.DirectoryServer; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.synchronization.common.ServerState; import org.opends.server.synchronization.plugin.ChangelogBroker; import org.opends.server.synchronization.plugin.MultimasterSynchronization; import org.opends.server.synchronization.plugin.PersistentServerState; @@ -52,7 +53,7 @@ import org.testng.annotations.BeforeClass; /** * An abstract class that all synchronization unit test should extend. * An abstract class that all synchronization unit test should extend. */ @Test(groups = { "precommit", "synchronization" }) public abstract class SynchronizationTestCase extends DirectoryServerTestCase @@ -91,7 +92,7 @@ /** * Set up the environment for performing the tests in this suite. * * * @throws Exception * If the environment could not be set up. */ @@ -100,21 +101,24 @@ { // This test suite depends on having the schema available. TestCaseUtils.startServer(); schemaCheck = DirectoryServer.checkSchema(); // Create an internal connection connection = new InternalClientConnection(); } /** * Open a changelog session to the local Changelog server. * */ protected ChangelogBroker openChangelogSession( final DN baseDn, short serverId, int window_size, int port, int timeout) final DN baseDn, short serverId, int window_size, int port, int timeout, boolean emptyOldChanges) throws Exception, SocketException { PersistentServerState state = new PersistentServerState(baseDn); state.loadState(); if (emptyOldChanges) state.loadState(); ChangelogBroker broker = new ChangelogBroker( state, baseDn, serverId, 0, 0, 0, 0, window_size); ArrayList<String> servers = new ArrayList<String>(1); @@ -122,22 +126,45 @@ broker.start(servers); if (timeout != 0) broker.setSoTimeout(timeout); /* * loop receiving update until there is nothing left * to make sure that message from previous tests have been consumed. 
*/ try if (emptyOldChanges) { while (true) /* * loop receiving update until there is nothing left * to make sure that message from previous tests have been consumed. */ try { broker.receive(); while (true) { broker.receive(); } } catch (Exception e) { } } catch (Exception e) { } return broker; } /** * Open a new session to the Changelog Server * starting with a given ServerState. */ protected ChangelogBroker openChangelogSession( final DN baseDn, short serverId, int window_size, int port, int timeout, ServerState state) throws Exception, SocketException { ChangelogBroker broker = new ChangelogBroker( state, baseDn, serverId, 0, 0, 0, 0, window_size); ArrayList<String> servers = new ArrayList<String>(1); servers.add("localhost:" + port); broker.start(servers); if (timeout != 0) broker.setSoTimeout(timeout); return broker; } /** * suppress all the entries created by the tests in this class */ @@ -149,11 +176,11 @@ { while (true) { DN dn = entryList.removeLast(); DN dn = entryList.removeLast(); op = new DeleteOperation(connection, InternalClientConnection .nextOperationID(), InternalClientConnection.nextMessageID(), null, dn); op.run();; } } @@ -172,7 +199,7 @@ public void classCleanUp() throws Exception { DirectoryServer.setCheckSchema(schemaCheck); // WORKAROUND FOR BUG #639 - BEGIN - if (mms != null) { @@ -180,7 +207,7 @@ mms.finalizeSynchronizationProvider(); } // WORKAROUND FOR BUG #639 - END - cleanEntries(); } @@ -196,7 +223,7 @@ assertNotNull(DirectoryServer.getConfigEntry(DN .decode(synchroPluginStringDN)), "Unable to add the Multimaster synchronization plugin"); // WORKAROUND FOR BUG #639 - BEGIN - DN dn = DN.decode(synchroPluginStringDN); ConfigEntry mmsConfigEntry = DirectoryServer.getConfigEntry(dn); @@ -212,14 +239,14 @@ } DirectoryServer.registerSynchronizationProvider(mms); // WORKAROUND FOR BUG #639 - END - // // Add the changelog server DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null); 
assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()), "Unable to add the changeLog server"); entryList.add(changeLogEntry.getDN()); // // We also have a replicated suffix (synchronization domain) DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null); opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -84,7 +84,7 @@ private String user1entrysecondUUID; private String user1entryUUID; /** * A "person" entry */ @@ -254,7 +254,7 @@ * This must use a serverId different from the LDAP server ID */ ChangelogBroker broker = openChangelogSession(baseDn, (short) 2, 100, 8989, 1000); openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true); /* * Create a Change number generator to generate new changenumbers @@ -660,7 +660,7 @@ cleanEntries(); ChangelogBroker broker = openChangelogSession(baseDn, (short) 27, 100, 8989, 1000); openChangelogSession(baseDn, (short) 27, 100, 8989, 1000, true); try { ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 27, 0); @@ -883,7 +883,7 @@ break; } } if (lock == null) { throw new Exception("could not lock entry " + dn); @@ -892,8 +892,8 @@ try { newEntry = DirectoryServer.getEntry(personWithUUIDEntry.getDN()); if (newEntry == null) fail("The entry " + personWithUUIDEntry.getDN() + " has incorrectly been deleted from the database."); @@ -903,13 +903,13 @@ AttributeType attrType = DirectoryServer.getAttributeType(attrTypeStr, true); found = tmpAttr.hasValue(new AttributeValue(attrType, valueString)); } finally { LockManager.unlock(dn, lock); } if (found != hasAttribute) Thread.sleep(100); } while ((--count > 0) && (found != hasAttribute)); @@ -918,7 +918,7 @@ /** * Get the entryUUID for a given DN. * * * @throws Exception if the entry does not exist or does not have * an entryUUID. 
*/ @@ -932,7 +932,7 @@ while ((count> 0) && (found == null)) { Thread.sleep(100); Lock lock = null; for (int i=0; i < 3; i++) { @@ -942,7 +942,7 @@ break; } } if (lock == null) { throw new Exception("could not lock entry " + dn); @@ -951,7 +951,7 @@ try { newEntry = DirectoryServer.getEntry(dn); if (newEntry != null) { List<Attribute> tmpAttrList = newEntry.getAttribute("entryuuid"); @@ -991,11 +991,11 @@ while ((count> 0) && (found != exist)) { Thread.sleep(200); found = DirectoryServer.entryExists(dn); count--; } Lock lock = null; for (int i=0; i < 3; i++) { @@ -1005,7 +1005,7 @@ break; } } if (lock == null) { throw new Exception("could not lock entry " + dn); @@ -1062,7 +1062,7 @@ Thread.sleep(2000); ChangelogBroker broker = openChangelogSession(baseDn, (short) 11, 100, 8989, 1000); openChangelogSession(baseDn, (short) 11, 100, 8989, 1000, true); try { ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 11, 0); opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -30,33 +30,56 @@ import org.opends.server.TestCaseUtils; import org.opends.server.config.ConfigEntry; import org.opends.server.core.DirectoryServer; import org.opends.server.synchronization.SynchronizationTestCase; import org.opends.server.synchronization.common.ChangeNumber; import org.opends.server.synchronization.common.ChangeNumberGenerator; import org.opends.server.synchronization.common.ServerState; import org.opends.server.synchronization.plugin.ChangelogBroker; import org.opends.server.synchronization.protocol.DeleteMsg; import org.opends.server.synchronization.protocol.SynchronizationMessage; import org.opends.server.types.DN; import org.opends.server.types.Entry; import org.opends.server.util.TimeThread; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.testng.Assert.*; /** * Tests for the chngelog service code. * Tests for the changelog service code. */ public class ChangelogTest extends SynchronizationTestCase { /** * Basic test of the changelog code. * Create a changelog server, connect 2 clients and exchange * messages between the clients. * The changelog server that will be used in this test. */ @Test() public void changelogBasic() throws Exception private Changelog changelog = null; /** * The port of the changelog server. */ private int changelogPort; private ChangeNumber firstChangeNumberServer1 = null; private ChangeNumber secondChangeNumberServer1 = null; private ChangeNumber firstChangeNumberServer2 = null; private ChangeNumber secondChangeNumberServer2 = null; /** * Before starting the tests, start the server and configure a * changelog server. 
*/ @BeforeClass() public void configure() throws Exception { // find a free port TestCaseUtils.startServer(); // find a free port for the changelog server ServerSocket socket = TestCaseUtils.bindFreePort(); int changelogPort = socket.getLocalPort(); changelogPort = socket.getLocalPort(); socket.close(); String changelogLdif = @@ -65,39 +88,363 @@ + "objectClass: ds-cfg-synchronization-changelog-server-config\n" + "cn: Changelog Server\n" + "ds-cfg-changelog-port: "+ changelogPort + "\n" + "ds-cfg-changelog-server-id: 1\n"; + "ds-cfg-changelog-server-id: 1\n" + "ds-cfg-window-size: 100"; Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif); ConfigEntry changelogConfig = new ConfigEntry(tmp, null); Changelog changelog = new Changelog(changelogConfig); changelog = new Changelog(changelogConfig); } ChangelogBroker broker1 = null; ChangelogBroker broker2 = null; /** * Basic test of the changelog code : * Connect 2 clients to the changelog server and exchange messages * between the clients. * * Note : Other tests in this file depends on this test and may need to * change if this test is modified. 
*/ @Test() public void changelogBasic() throws Exception { ChangelogBroker server1 = null; ChangelogBroker server2 = null; try { broker1 = openChangelogSession( DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort, 1000); broker2 = openChangelogSession( DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort, 1000); /* * Open a sender session and a receiver session to the changelog */ server1 = openChangelogSession( DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort, 1000, true); server2 = openChangelogSession( DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort, 1000, true); ChangeNumber cn = new ChangeNumber((long) 1, 1, (short)1); DeleteMsg msg = new DeleteMsg("o=test,dc=example,dc=com", cn, "uid"); broker1.publish(msg); SynchronizationMessage msg2 = broker2.receive(); /* * Create change numbers for the messages sent from server 1 * with current time sequence 1 and with current time + 2 sequence 2 */ long time = TimeThread.getTime(); firstChangeNumberServer1 = new ChangeNumber(time, 1, (short) 1); secondChangeNumberServer1 = new ChangeNumber(time + 2, 2, (short) 1); /* * Create change numbers for the messages sent from server 2 * with current time sequence 1 and with current time + 3 sequence 2 */ firstChangeNumberServer2 = new ChangeNumber(time+ 1, 1, (short) 2); secondChangeNumberServer2 = new ChangeNumber(time + 3, 2, (short) 2); /* * Send and receive a Delete Msg from server 1 to server 2 */ DeleteMsg msg = new DeleteMsg("o=test,dc=example,dc=com", firstChangeNumberServer1, "uid"); server1.publish(msg); SynchronizationMessage msg2 = server2.receive(); if (msg2 instanceof DeleteMsg) { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.toString().equals(msg2.toString())); assertTrue(del.toString().equals(msg.toString()), "Changelog basic : incorrect message body received."); } else fail("Changelog transmission failed"); fail("Changelog basic : incorrect message type received."); /* * Send and receive a second Delete Msg */ msg 
= new DeleteMsg("o=test", secondChangeNumberServer1, "uid"); server1.publish(msg); msg2 = server2.receive(); if (msg2 instanceof DeleteMsg) { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.toString().equals(msg.toString()), "Changelog basic : incorrect message body received."); } else fail("Changelog basic : incorrect message type received."); /* * Send and receive a Delete Msg from server 1 to server 2 */ msg = new DeleteMsg("o=test,dc=example,dc=com", firstChangeNumberServer2, "other-uid"); server2.publish(msg); msg2 = server1.receive(); if (msg2 instanceof DeleteMsg) { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.toString().equals(msg.toString()), "Changelog basic : incorrect message body received."); } else fail("Changelog basic : incorrect message type received."); /* * Send and receive a second Delete Msg */ msg = new DeleteMsg("o=test", secondChangeNumberServer2, "uid"); server2.publish(msg); msg2 = server1.receive(); if (msg2 instanceof DeleteMsg) { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.toString().equals(msg.toString()), "Changelog basic : incorrect message body received."); } else fail("Changelog basic : incorrect message type received."); } finally { if (server1 != null) server1.stop(); if (server2 != null) server2.stop(); } } /** * Test that a new client see the change that was sent in the * previous test. */ @Test(enabled=true, dependsOnMethods = { "changelogBasic" }) public void newClient() throws Exception { ChangelogBroker broker = null; try { broker = openChangelogSession(DN.decode("dc=example,dc=com"), (short) 3, 100, changelogPort, 1000, false); SynchronizationMessage msg2 = broker.receive(); if (!(msg2 instanceof DeleteMsg)) fail("Changelog basic transmission failed"); else { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.getChangeNumber().equals(firstChangeNumberServer1), "The first message received by a new client was the wrong one." 
+ del.getChangeNumber() + " " + firstChangeNumberServer1); } } finally { if (broker != null) broker.stop(); } } /** * Test that a client that has already seen some changes now receive * the correct next change. */ private void newClientWithChanges( ServerState state, ChangeNumber nextChangeNumber) throws Exception { ChangelogBroker broker = null; /* * Connect to the changelog server using the state created above. */ try { broker = openChangelogSession(DN.decode("dc=example,dc=com"), (short) 3, 100, changelogPort, 1000, state); SynchronizationMessage msg2 = broker.receive(); if (!(msg2 instanceof DeleteMsg)) fail("Changelog basic transmission failed"); else { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.getChangeNumber().equals(nextChangeNumber), "The second message received by a new client was the wrong one." + del.getChangeNumber() + " " + nextChangeNumber); } } finally { if (broker != null) broker.stop(); } } /** * Test that a client that has already seen the first change now see the * second change */ @Test(enabled=true, dependsOnMethods = { "changelogBasic" }) public void newClientWithFirstChanges() throws Exception { /* * Create a ServerState updated with the first changes from both servers * done in test changelogBasic. 
*/ ServerState state = new ServerState(); state.update(firstChangeNumberServer1); state.update(firstChangeNumberServer2); newClientWithChanges(state, secondChangeNumberServer1); } /** * Test that a client that has already seen the first change from server 1 * now sees the first change from server 2 */ @Test(enabled=true, dependsOnMethods = { "changelogBasic" }) public void newClientWithChangefromServer1() throws Exception { /* * Create a ServerState updated with the first change from server 1 */ ServerState state = new ServerState(); state.update(firstChangeNumberServer1); newClientWithChanges(state, firstChangeNumberServer2); } /** * Test that a client that has already seen the first change from server 2 * now sees the first change from server 1 */ @Test(enabled=true, dependsOnMethods = { "changelogBasic" }) public void newClientWithChangefromServer2() throws Exception { /* * Create a ServerState updated with the first change from server 2 */ ServerState state = new ServerState(); state.update(firstChangeNumberServer2); newClientWithChanges(state, firstChangeNumberServer1); } /** * Test that a client that has not seen the second change from server 1 * now receives it. */ @Test(enabled=true, dependsOnMethods = { "changelogBasic" }) public void newClientLateServer1() throws Exception { /* * State holds the second change of server 2 and the first of server 1 */ ServerState state = new ServerState(); state.update(secondChangeNumberServer2); state.update(firstChangeNumberServer1); newClientWithChanges(state, secondChangeNumberServer1); } /** * Test that newClient() and newClientWithFirstChanges() still work * after stopping and restarting the changelog server. 
*/ @Test(enabled=false, dependsOnMethods = { "changelogBasic" }) public void stopChangelog() throws Exception { changelog.shutdown(); configure(); newClient(); newClientWithFirstChanges(); newClientWithChangefromServer1(); newClientWithChangefromServer2(); } /** * Stress test from client using the ChangelogBroker API * to the changelog server. */ @Test(enabled=false, groups="slow") public void stressFromBrokertoChangelog() throws Exception { ChangelogBroker server = null; try { /* * Open a sender session */ server = openChangelogSession( DN.decode("dc=example,dc=com"), (short) 5, 100, changelogPort, 1000, true); BrokerReader reader = new BrokerReader(server); reader.start(); ChangeNumberGenerator gen = new ChangeNumberGenerator((short)5 , (long) 0); /* * Simple loop creating changes and sending them * to the changelog server. */ for (int i = 0; i< 100000; i++) { DeleteMsg msg = new DeleteMsg("o=test,dc=example,dc=com", gen.NewChangeNumber(), "uid"); server.publish(msg); } } finally { if (server != null) server.stop(); } } /** * After the tests stop the changelog server. */ @AfterClass() public void shutdown() throws Exception { if (changelog != null) changelog.shutdown(); if (broker1 != null) broker1.stop(); if (broker2 != null) broker2.stop(); } /** * Continuously reads messages from a changelog broker until there is nothing * left. Count the number of received messages. */ private class BrokerReader extends Thread { private ChangelogBroker broker; /** * Creates a new Stress Test Reader * @param broker */ public BrokerReader(ChangelogBroker broker) { this.broker = broker; } /** * {@inheritDoc} */ @Override public void run() { // loop receiving messages until either we get a timeout // because there is nothing left or an error condition happens. try { while (true) { SynchronizationMessage msg = broker.receive(); if (msg == null) break; } } catch (Exception e) { } } } }