opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -418,7 +418,7 @@ ServerHandler handler = new ServerHandler( new SocketSession(socket), queueSize); handler.start(baseDn, serverId, serverURL, rcvWindow, this); handler.start(baseDn, serverId, this.serverURL, rcvWindow, this); } catch (IOException e) { @@ -545,6 +545,7 @@ } dbEnv.shutdown(); DirectoryServer.deregisterConfigurableComponent(this); } opends/src/server/org/opends/server/synchronization/changelog/ChangelogDB.java
@@ -170,10 +170,6 @@ public ChangelogCursor openReadCursor(ChangeNumber changeNumber) throws DatabaseException, Exception { if (changeNumber == null) changeNumber = readFirstChange(); if (changeNumber == null) return null; return new ChangelogCursor(changeNumber); } @@ -319,13 +315,16 @@ { cursor = db.openCursor(txn, null); DatabaseEntry key = new ChangelogKey(startingChangeNumber); DatabaseEntry data = new DatabaseEntry(); if (cursor.getSearchKey(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) if (startingChangeNumber != null) { throw new Exception("ChangeNumber not available"); key = new ChangelogKey(startingChangeNumber); data = new DatabaseEntry(); if (cursor.getSearchKey(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { throw new Exception("ChangeNumber not available"); } } } @@ -373,7 +372,7 @@ } /** * Get the next ChangeNumber inthe database from this Cursor. * Get the next ChangeNumber in the database from this Cursor. * * @return The next ChangeNumber in the database from this cursor. * @throws DatabaseException In case of underlying database problem. opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
@@ -81,6 +81,7 @@ private boolean shutdown = false; private boolean done = false; private DirectoryThread thread = null; private Object flushLock = new Object(); /** * Creates a New dbHandler associated to a given LDAP server. @@ -204,6 +205,12 @@ public ChangelogIterator generateIterator(ChangeNumber changeNumber) throws DatabaseException, Exception { /* * make sure to flush some changes in the database so that * we don't create the iterator on an empty database when the * dbHandler has just been started. */ flush(); return new ChangelogIterator(serverId, db, changeNumber); } @@ -320,17 +327,22 @@ while ((size < 5000 ) && (!finished)) { ChangeNumber changeNumber = cursor.nextChangeNumber(); if ((changeNumber != null) && (!changeNumber.equals(lastChange)) && (changeNumber.older(trimDate))) if (changeNumber != null) { size++; cursor.delete(); if ((!changeNumber.equals(lastChange)) && (changeNumber.older(trimDate))) { size++; cursor.delete(); } else { firstChange = changeNumber; finished = true; } } else { firstChange = changeNumber; finished = true; } } cursor.close(); @@ -350,19 +362,21 @@ do { // get N messages to save in the DB List<UpdateMessage> changes = getChanges(500); synchronized(flushLock) { // get N messages to save in the DB List<UpdateMessage> changes = getChanges(500); // if no more changes to save exit immediately. if ((changes == null) || ((size = changes.size()) == 0)) return; // if no more changes to save exit immediately. if ((changes == null) || ((size = changes.size()) == 0)) return; // save the change to the stable storage. db.addEntries(changes); // save the change to the stable storage. db.addEntries(changes); // remove the changes from the list of changes to be saved. clear(changes.size()); // remove the changes from the list of changes to be saved. 
clear(changes.size()); } } while (size >=500); } @@ -387,19 +401,17 @@ attributes.add(new Attribute("changelog-database", String.valueOf(serverId))); attributes.add(new Attribute("base-dn", baseDn.toString())); ChangeNumber first = getFirstChange(); ChangeNumber last = getLastChange(); if (first != null) if (firstChange != null) { Date firstTime = new Date(first.getTime()); Date firstTime = new Date(firstChange.getTime()); attributes.add(new Attribute("first-change", first.toString() + " " + firstTime.toString())); firstChange.toString() + " " + firstTime.toString())); } if (last != null) if (lastChange != null) { Date lastTime = new Date(last.getTime()); Date lastTime = new Date(lastChange.getTime()); attributes.add(new Attribute("last-change", last.toString() + " " + lastTime.toString())); lastChange.toString() + " " + lastTime.toString())); } return attributes; opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
@@ -118,57 +118,18 @@ return; savedStatus = true; ArrayList<ASN1OctetString> values = this.toASN1ArrayList(); if (values.size() == 0) return; LDAPAttribute attr = new LDAPAttribute(SYNCHRONIZATION_STATE, values); LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr); ArrayList<LDAPModification> mods = new ArrayList<LDAPModification>(1); mods.add(mod); boolean done = false; while (!done) ResultCode resultCode = updateStateEntry(); if (resultCode != ResultCode.SUCCESS) { /* * Generate a modify operation on the Server State Entry : * cn=ffffffff-ffffffff-ffffffff-ffffffff, baseDn */ ModifyOperation op = new ModifyOperation(conn, InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(), new ArrayList<Control>(0), serverStateAsn1Dn, mods); op.setInternalOperation(true); op.setSynchronizationOperation(true); op.run(); ResultCode resultCode = op.getResultCode(); if (resultCode != ResultCode.SUCCESS) if (resultCode == ResultCode.NO_SUCH_OBJECT) { if (resultCode == ResultCode.NO_SUCH_OBJECT) { createStateEntry(); } else { savedStatus = false; int msgID = MSGID_ERROR_UPDATING_RUV; String message = getMessage(msgID, op.getResultCode().getResultCodeName(), op.toString(), op.getErrorMessage(), baseDn.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); break; } createStateEntry(); } else done = true; { savedStatus = false; } } } @@ -297,6 +258,51 @@ } /** * Save the current values of this PersistentState object * in the appropriate entry of the database. * * @return a ResultCode indicating if the method was successful. 
*/ private ResultCode updateStateEntry() { /* * Generate a modify operation on the Server State Entry : * cn=ffffffff-ffffffff-ffffffff-ffffffff, baseDn */ ArrayList<ASN1OctetString> values = this.toASN1ArrayList(); if (values.size() == 0) return ResultCode.SUCCESS; LDAPAttribute attr = new LDAPAttribute(SYNCHRONIZATION_STATE, values); LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr); ArrayList<LDAPModification> mods = new ArrayList<LDAPModification>(1); mods.add(mod); ModifyOperation op = new ModifyOperation(conn, InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(), new ArrayList<Control>(0), serverStateAsn1Dn, mods); op.setInternalOperation(true); op.setSynchronizationOperation(true); op.run(); ResultCode result = op.getResultCode(); if (result != ResultCode.SUCCESS) { int msgID = MSGID_ERROR_UPDATING_RUV; String message = getMessage(msgID, op.getResultCode().getResultCodeName(), op.toString(), op.getErrorMessage(), baseDn.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } return result; } /** * Get the Dn where the ServerState is stored. * @return Returns the serverStateDn. */ opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java
@@ -99,7 +99,7 @@ final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); ChangelogBroker broker = openChangelogSession(baseDn, (short) 13, WINDOW_SIZE, 8989, 1000); WINDOW_SIZE, 8989, 1000, true); try { opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
@@ -112,7 +112,7 @@ cleanEntries(); ChangelogBroker broker = openChangelogSession(baseDn, (short) 18, 100, 8989, 5000); openChangelogSession(baseDn, (short) 18, 100, 8989, 5000, true); Monitor monitor = new Monitor("stress test monitor"); DirectoryServer.registerMonitorProvider(monitor); opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -41,6 +41,7 @@ import org.opends.server.core.DeleteOperation; import org.opends.server.core.DirectoryServer; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.synchronization.common.ServerState; import org.opends.server.synchronization.plugin.ChangelogBroker; import org.opends.server.synchronization.plugin.MultimasterSynchronization; import org.opends.server.synchronization.plugin.PersistentServerState; @@ -52,7 +53,7 @@ import org.testng.annotations.BeforeClass; /** * An abstract class that all synchronization unit test should extend. * An abstract class that all synchronization unit test should extend. */ @Test(groups = { "precommit", "synchronization" }) public abstract class SynchronizationTestCase extends DirectoryServerTestCase @@ -91,7 +92,7 @@ /** * Set up the environment for performing the tests in this suite. * * * @throws Exception * If the environment could not be set up. */ @@ -100,21 +101,23 @@ { // This test suite depends on having the schema available. TestCaseUtils.startServer(); // Create an internal connection connection = new InternalClientConnection(); } /** * Open a changelog session to the local Changelog server. * */ protected ChangelogBroker openChangelogSession( final DN baseDn, short serverId, int window_size, int port, int timeout) final DN baseDn, short serverId, int window_size, int port, int timeout, boolean emptyOldChanges) throws Exception, SocketException { PersistentServerState state = new PersistentServerState(baseDn); state.loadState(); if (emptyOldChanges) state.loadState(); ChangelogBroker broker = new ChangelogBroker( state, baseDn, serverId, 0, 0, 0, 0, window_size); ArrayList<String> servers = new ArrayList<String>(1); @@ -122,23 +125,46 @@ broker.start(servers); if (timeout != 0) broker.setSoTimeout(timeout); /* * loop receiving update until there is nothing left * to make sure that message from previous tests have been consumed. 
*/ try if (emptyOldChanges) { while (true) /* * loop receiving update until there is nothing left * to make sure that message from previous tests have been consumed. */ try { broker.receive(); while (true) { broker.receive(); } } catch (Exception e) { } } catch (Exception e) { } return broker; } /** * Open a new session to the Changelog Server * starting with a given ServerState. */ protected ChangelogBroker openChangelogSession( final DN baseDn, short serverId, int window_size, int port, int timeout, ServerState state) throws Exception, SocketException { ChangelogBroker broker = new ChangelogBroker( state, baseDn, serverId, 0, 0, 0, 0, window_size); ArrayList<String> servers = new ArrayList<String>(1); servers.add("localhost:" + port); broker.start(servers); if (timeout != 0) broker.setSoTimeout(timeout); return broker; } /** * suppress all the entries created by the tests in this class */ protected void cleanEntries() @@ -149,11 +175,11 @@ { while (true) { DN dn = entryList.removeLast(); DN dn = entryList.removeLast(); op = new DeleteOperation(connection, InternalClientConnection .nextOperationID(), InternalClientConnection.nextMessageID(), null, dn); op.run();; } } @@ -172,7 +198,7 @@ public void classCleanUp() throws Exception { DirectoryServer.setCheckSchema(schemaCheck); // WORKAROUND FOR BUG #639 - BEGIN - if (mms != null) { @@ -180,7 +206,7 @@ mms.finalizeSynchronizationProvider(); } // WORKAROUND FOR BUG #639 - END - cleanEntries(); } @@ -196,7 +222,7 @@ assertNotNull(DirectoryServer.getConfigEntry(DN .decode(synchroPluginStringDN)), "Unable to add the Multimaster synchronization plugin"); // WORKAROUND FOR BUG #639 - BEGIN - DN dn = DN.decode(synchroPluginStringDN); ConfigEntry mmsConfigEntry = DirectoryServer.getConfigEntry(dn); @@ -212,14 +238,14 @@ } DirectoryServer.registerSynchronizationProvider(mms); // WORKAROUND FOR BUG #639 - END - // // Add the changelog server DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null); 
assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()), "Unable to add the changeLog server"); entryList.add(changeLogEntry.getDN()); // // We also have a replicated suffix (synchronization domain) DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null); opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -249,7 +249,7 @@ * This must use a serverId different from the LDAP server ID */ ChangelogBroker broker = openChangelogSession(baseDn, (short) 2, 100, 8989, 1000); openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true); /* * Create a Change number generator to generate new changenumbers @@ -562,7 +562,7 @@ cleanEntries(); ChangelogBroker broker = openChangelogSession(baseDn, (short) 27, 100, 8989, 1000); openChangelogSession(baseDn, (short) 27, 100, 8989, 1000, true); try { ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 27, 0); @@ -964,7 +964,7 @@ Thread.sleep(2000); ChangelogBroker broker = openChangelogSession(baseDn, (short) 11, 100, 8989, 1000); openChangelogSession(baseDn, (short) 11, 100, 8989, 1000, true); try { ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 11, 0); opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -32,31 +32,52 @@ import org.opends.server.config.ConfigEntry; import org.opends.server.synchronization.SynchronizationTestCase; import org.opends.server.synchronization.common.ChangeNumber; import org.opends.server.synchronization.common.ServerState; import org.opends.server.synchronization.plugin.ChangelogBroker; import org.opends.server.synchronization.protocol.DeleteMsg; import org.opends.server.synchronization.protocol.SynchronizationMessage; import org.opends.server.types.DN; import org.opends.server.types.Entry; import org.opends.server.util.TimeThread; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.testng.Assert.*; /** * Tests for the chngelog service code. * Tests for the changelog service code. */ public class ChangelogTest extends SynchronizationTestCase { /** * Basic test of the changelog code. * Create a changelog server, connect 2 clients and exchange * messages between the clients. * The changelog server that will be used in this test. */ @Test() public void changelogBasic() throws Exception private Changelog changelog = null; /** * The port of the changelog server. */ private int changelogPort; private ChangeNumber firstChangeNumberServer1 = null; private ChangeNumber secondChangeNumberServer1 = null; private ChangeNumber firstChangeNumberServer2 = null; private ChangeNumber secondChangeNumberServer2 = null; /** * Before starting the tests, start the server and configure a * changelog server. 
*/ @BeforeClass() public void configure() throws Exception { // find a free port TestCaseUtils.startServer(); // find a free port for the changelog server ServerSocket socket = TestCaseUtils.bindFreePort(); int changelogPort = socket.getLocalPort(); changelogPort = socket.getLocalPort(); socket.close(); String changelogLdif = @@ -68,36 +89,280 @@ + "ds-cfg-changelog-server-id: 1\n"; Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif); ConfigEntry changelogConfig = new ConfigEntry(tmp, null); Changelog changelog = new Changelog(changelogConfig); changelog = new Changelog(changelogConfig); } ChangelogBroker broker1 = null; ChangelogBroker broker2 = null; /** * Basic test of the changelog code : * Connect 2 clients to the changelog server and exchange messages * between the clients. * * Note : Other tests in this file depends on this test and may need to * change if this test is modified. */ @Test() public void changelogBasic() throws Exception { ChangelogBroker server1 = null; ChangelogBroker server2 = null; try { broker1 = openChangelogSession( DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort, 1000); broker2 = openChangelogSession( DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort, 1000); ChangeNumber cn = new ChangeNumber((long) 1, 1, (short)1); DeleteMsg msg = new DeleteMsg("o=test,dc=example,dc=com", cn, "uid"); broker1.publish(msg); SynchronizationMessage msg2 = broker2.receive(); /* * Open a sender session and a receiver session to the changelog */ server1 = openChangelogSession( DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort, 1000, true); server2 = openChangelogSession( DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort, 1000, true); /* * Create change numbers for the messages sent from server 1 * with current time sequence 1 and with current time + 2 sequence 2 */ long time = TimeThread.getTime(); firstChangeNumberServer1 = new ChangeNumber(time, 1, (short) 1); secondChangeNumberServer1 = new 
ChangeNumber(time + 2, 2, (short) 1); /* * Create change numbers for the messages sent from server 2 * with current time + 1 sequence 1 and with current time + 3 sequence 2 */ firstChangeNumberServer2 = new ChangeNumber(time+ 1, 1, (short) 2); secondChangeNumberServer2 = new ChangeNumber(time + 3, 2, (short) 2); /* * Send and receive a Delete Msg from server 1 to server 2 */ DeleteMsg msg = new DeleteMsg("o=test,dc=example,dc=com", firstChangeNumberServer1, "uid"); server1.publish(msg); SynchronizationMessage msg2 = server2.receive(); if (msg2 instanceof DeleteMsg) { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.toString().equals(msg2.toString())); assertTrue(del.toString().equals(msg.toString()), "Changelog basic : incorrect message body received."); } else fail("Changelog transmission failed"); fail("Changelog basic : incorrect message type received."); /* * Send and receive a second Delete Msg */ msg = new DeleteMsg("o=test", secondChangeNumberServer1, "uid"); server1.publish(msg); msg2 = server2.receive(); if (msg2 instanceof DeleteMsg) { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.toString().equals(msg.toString()), "Changelog basic : incorrect message body received."); } else fail("Changelog basic : incorrect message type received."); /* * Send and receive a Delete Msg from server 2 to server 1 */ msg = new DeleteMsg("o=test,dc=example,dc=com", firstChangeNumberServer2, "other-uid"); server2.publish(msg); msg2 = server1.receive(); if (msg2 instanceof DeleteMsg) { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.toString().equals(msg.toString()), "Changelog basic : incorrect message body received."); } else fail("Changelog basic : incorrect message type received."); /* * Send and receive a second Delete Msg */ msg = new DeleteMsg("o=test", secondChangeNumberServer2, "uid"); server2.publish(msg); msg2 = server1.receive(); if (msg2 instanceof DeleteMsg) { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.toString().equals(msg.toString()), "Changelog basic : 
incorrect message body received."); } else fail("Changelog basic : incorrect message type received."); } finally { changelog.shutdown(); if (broker1 != null) broker1.stop(); if (broker2 != null) broker2.stop(); if (server1 != null) server1.stop(); if (server2 != null) server2.stop(); } } /** * Test that a new client see the change that was sent in the * previous test. */ @Test(dependsOnMethods = { "changelogBasic" }) public void newClient() throws Exception { ChangelogBroker broker = null; try { broker = openChangelogSession(DN.decode("dc=example,dc=com"), (short) 3, 100, changelogPort, 1000, false); SynchronizationMessage msg2 = broker.receive(); if (!(msg2 instanceof DeleteMsg)) fail("Changelog basic transmission failed"); else { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.getChangeNumber().equals(firstChangeNumberServer1), "The first message received by a new client was the wrong one." + del.getChangeNumber() + " " + firstChangeNumberServer1); } } finally { if (broker != null) broker.stop(); } } /** * Test that a client that has already seen some changes now receive * the correct next change. */ private void newClientWithChanges( ServerState state, ChangeNumber nextChangeNumber) throws Exception { ChangelogBroker broker = null; /* * Connect to the changelog server using the state created above. */ try { broker = openChangelogSession(DN.decode("dc=example,dc=com"), (short) 3, 100, changelogPort, 1000, state); SynchronizationMessage msg2 = broker.receive(); if (!(msg2 instanceof DeleteMsg)) fail("Changelog basic transmission failed"); else { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.getChangeNumber().equals(nextChangeNumber), "The second message received by a new client was the wrong one." 
+ del.getChangeNumber() + " " + nextChangeNumber); } } finally { if (broker != null) broker.stop(); } } /** * Test that a client that has already seen the first change now sees the * second change */ @Test(dependsOnMethods = { "changelogBasic" }) public void newClientWithFirstChanges() throws Exception { /* * Create a ServerState updated with the first changes from both servers * done in test changelogBasic. */ ServerState state = new ServerState(); state.update(firstChangeNumberServer1); state.update(firstChangeNumberServer2); newClientWithChanges(state, secondChangeNumberServer1); } /** * Test that a client that has already seen the first change from server 1 * now sees the first change from server 2 */ @Test(dependsOnMethods = { "changelogBasic" }) public void newClientWithChangefromServer1() throws Exception { /* * Create a ServerState updated with the first change from server 1 */ ServerState state = new ServerState(); state.update(firstChangeNumberServer1); newClientWithChanges(state, firstChangeNumberServer2); } /** * Test that a client that has already seen the first change from server 2 * now sees the first change from server 1 */ @Test(dependsOnMethods = { "changelogBasic" }) public void newClientWithChangefromServer2() throws Exception { /* * Create a ServerState updated with the first change from server 2 */ ServerState state = new ServerState(); state.update(firstChangeNumberServer2); newClientWithChanges(state, firstChangeNumberServer1); } /** * Test that a client that has not seen the second change from server 1 * now receives it. 
*/ @Test(dependsOnMethods = { "changelogBasic" }) public void newClientLateServer1() throws Exception { /* * Create a ServerState updated with the second change from server 2 * and the first change from server 1 */ ServerState state = new ServerState(); state.update(secondChangeNumberServer2); state.update(firstChangeNumberServer1); newClientWithChanges(state, secondChangeNumberServer1); } /** * Test that newClient() and newClientWithFirstChanges() still work * after stopping and restarting the changelog server. */ @Test(dependsOnMethods = { "changelogBasic" }) public void stopChangelog() throws Exception { changelog.shutdown(); configure(); newClient(); newClientWithFirstChanges(); newClientWithChangefromServer1(); newClientWithChangefromServer2(); } /** * After the tests, stop the changelog server. */ @AfterClass() public void shutdown() throws Exception { if (changelog != null) changelog.shutdown(); } }