opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -418,7 +418,7 @@ ServerHandler handler = new ServerHandler( new SocketSession(socket), queueSize); handler.start(baseDn, serverId, serverURL, rcvWindow, this); handler.start(baseDn, serverId, this.serverURL, rcvWindow, this); } catch (IOException e) { @@ -545,6 +545,7 @@ } dbEnv.shutdown(); DirectoryServer.deregisterConfigurableComponent(this); } opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogDB.java
@@ -170,10 +170,6 @@ public ChangelogCursor openReadCursor(ChangeNumber changeNumber) throws DatabaseException, Exception { if (changeNumber == null) changeNumber = readFirstChange(); if (changeNumber == null) return null; return new ChangelogCursor(changeNumber); } @@ -319,8 +315,10 @@ { cursor = db.openCursor(txn, null); DatabaseEntry key = new ChangelogKey(startingChangeNumber); DatabaseEntry data = new DatabaseEntry(); if (startingChangeNumber != null) { key = new ChangelogKey(startingChangeNumber); data = new DatabaseEntry(); if (cursor.getSearchKey(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) @@ -328,6 +326,7 @@ throw new Exception("ChangeNumber not available"); } } } private ChangelogCursor() throws DatabaseException { opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
@@ -81,6 +81,7 @@ private boolean shutdown = false; private boolean done = false; private DirectoryThread thread = null; private Object flushLock = new Object(); /** * Creates a New dbHandler associated to a given LDAP server. @@ -204,6 +205,12 @@ public ChangelogIterator generateIterator(ChangeNumber changeNumber) throws DatabaseException, Exception { /* * make sure to flush some changes in the database so that * we don't create the iterator on an empty database when the * dbHandler has just been started. */ flush(); return new ChangelogIterator(serverId, db, changeNumber); } @@ -320,7 +327,9 @@ while ((size < 5000 ) && (!finished)) { ChangeNumber changeNumber = cursor.nextChangeNumber(); if ((changeNumber != null) && (!changeNumber.equals(lastChange)) if (changeNumber != null) { if ((!changeNumber.equals(lastChange)) && (changeNumber.older(trimDate))) { size++; @@ -332,6 +341,9 @@ finished = true; } } else finished = true; } cursor.close(); } catch (DatabaseException e) @@ -350,6 +362,8 @@ do { synchronized(flushLock) { // get N messages to save in the DB List<UpdateMessage> changes = getChanges(500); @@ -362,7 +376,7 @@ // remove the changes from the list of changes to be saved. clear(changes.size()); } } while (size >=500); } @@ -387,19 +401,17 @@ attributes.add(new Attribute("changelog-database", String.valueOf(serverId))); attributes.add(new Attribute("base-dn", baseDn.toString())); ChangeNumber first = getFirstChange(); ChangeNumber last = getLastChange(); if (first != null) if (firstChange != null) { Date firstTime = new Date(first.getTime()); Date firstTime = new Date(firstChange.getTime()); attributes.add(new Attribute("first-change", first.toString() + " " + firstTime.toString())); firstChange.toString() + " " + firstTime.toString())); } if (last != null) if (lastChange != null) { Date lastTime = new Date(last.getTime()); Date lastTime = new Date(lastChange.getTime()); attributes.add(new Attribute("last-change", last.toString() + " " + lastTime.toString())); lastChange.toString() + " " + lastTime.toString())); } return attributes; opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/PersistentServerState.java
@@ -118,35 +118,7 @@ return; savedStatus = true; ArrayList<ASN1OctetString> values = this.toASN1ArrayList(); if (values.size() == 0) return; LDAPAttribute attr = new LDAPAttribute(SYNCHRONIZATION_STATE, values); LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr); ArrayList<LDAPModification> mods = new ArrayList<LDAPModification>(1); mods.add(mod); boolean done = false; while (!done) { /* * Generate a modify operation on the Server State Entry : * cn=ffffffff-ffffffff-ffffffff-ffffffff, baseDn */ ModifyOperation op = new ModifyOperation(conn, InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(), new ArrayList<Control>(0), serverStateAsn1Dn, mods); op.setInternalOperation(true); op.setSynchronizationOperation(true); op.run(); ResultCode resultCode = op.getResultCode(); ResultCode resultCode = updateStateEntry(); if (resultCode != ResultCode.SUCCESS) { if (resultCode == ResultCode.NO_SUCH_OBJECT) @@ -156,20 +128,9 @@ else { savedStatus = false; int msgID = MSGID_ERROR_UPDATING_RUV; String message = getMessage(msgID, op.getResultCode().getResultCodeName(), op.toString(), op.getErrorMessage(), baseDn.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); break; } } else done = true; } } /** @@ -297,6 +258,51 @@ } /** * Save the current values of this PersistentState object * in the appropiate entry of the database. * * @return a ResultCode indicating if the method was successfull. */ private ResultCode updateStateEntry() { /* * Generate a modify operation on the Server State Entry : * cn=ffffffff-ffffffff-ffffffff-ffffffff, baseDn */ ArrayList<ASN1OctetString> values = this.toASN1ArrayList(); if (values.size() == 0) return ResultCode.SUCCESS; LDAPAttribute attr = new LDAPAttribute(SYNCHRONIZATION_STATE, values); LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr); ArrayList<LDAPModification> mods = new ArrayList<LDAPModification>(1); mods.add(mod); ModifyOperation op = new ModifyOperation(conn, InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(), new ArrayList<Control>(0), serverStateAsn1Dn, mods); op.setInternalOperation(true); op.setSynchronizationOperation(true); op.run(); ResultCode result = op.getResultCode(); if (result != ResultCode.SUCCESS) { int msgID = MSGID_ERROR_UPDATING_RUV; String message = getMessage(msgID, op.getResultCode().getResultCodeName(), op.toString(), op.getErrorMessage(), baseDn.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } return result; } /** * Get the Dn where the ServerState is stored. * @return Returns the serverStateDn. */ opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ProtocolWindowTest.java
@@ -99,7 +99,7 @@ final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); ChangelogBroker broker = openChangelogSession(baseDn, (short) 13, WINDOW_SIZE, 8989, 1000); WINDOW_SIZE, 8989, 1000, true); try { opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/StressTest.java
@@ -112,7 +112,7 @@ cleanEntries(); ChangelogBroker broker = openChangelogSession(baseDn, (short) 18, 100, 8989, 5000); openChangelogSession(baseDn, (short) 18, 100, 8989, 5000, true); Monitor monitor = new Monitor("stress test monitor"); DirectoryServer.registerMonitorProvider(monitor); opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -41,6 +41,7 @@ import org.opends.server.core.DeleteOperation; import org.opends.server.core.DirectoryServer; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.synchronization.common.ServerState; import org.opends.server.synchronization.plugin.ChangelogBroker; import org.opends.server.synchronization.plugin.MultimasterSynchronization; import org.opends.server.synchronization.plugin.PersistentServerState; @@ -110,10 +111,12 @@ * */ protected ChangelogBroker openChangelogSession( final DN baseDn, short serverId, int window_size, int port, int timeout) final DN baseDn, short serverId, int window_size, int port, int timeout, boolean emptyOldChanges) throws Exception, SocketException { PersistentServerState state = new PersistentServerState(baseDn); if (emptyOldChanges) state.loadState(); ChangelogBroker broker = new ChangelogBroker( state, baseDn, serverId, 0, 0, 0, 0, window_size); @@ -122,6 +125,8 @@ broker.start(servers); if (timeout != 0) broker.setSoTimeout(timeout); if (emptyOldChanges) { /* * loop receiving update until there is nothing left * to make sure that message from previous tests have been consumed. @@ -135,6 +140,27 @@ } catch (Exception e) { } } return broker; } /** * Open a new session to the Changelog Server * starting with a given ServerState. */ protected ChangelogBroker openChangelogSession( final DN baseDn, short serverId, int window_size, int port, int timeout, ServerState state) throws Exception, SocketException { ChangelogBroker broker = new ChangelogBroker( state, baseDn, serverId, 0, 0, 0, 0, window_size); ArrayList<String> servers = new ArrayList<String>(1); servers.add("localhost:" + port); broker.start(servers); if (timeout != 0) broker.setSoTimeout(timeout); return broker; } opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -249,7 +249,7 @@ * This must use a serverId different from the LDAP server ID */ ChangelogBroker broker = openChangelogSession(baseDn, (short) 2, 100, 8989, 1000); openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true); /* * Create a Change number generator to generate new changenumbers @@ -562,7 +562,7 @@ cleanEntries(); ChangelogBroker broker = openChangelogSession(baseDn, (short) 27, 100, 8989, 1000); openChangelogSession(baseDn, (short) 27, 100, 8989, 1000, true); try { ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 27, 0); @@ -964,7 +964,7 @@ Thread.sleep(2000); ChangelogBroker broker = openChangelogSession(baseDn, (short) 11, 100, 8989, 1000); openChangelogSession(baseDn, (short) 11, 100, 8989, 1000, true); try { ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 11, 0); opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -32,31 +32,52 @@ import org.opends.server.config.ConfigEntry; import org.opends.server.synchronization.SynchronizationTestCase; import org.opends.server.synchronization.common.ChangeNumber; import org.opends.server.synchronization.common.ServerState; import org.opends.server.synchronization.plugin.ChangelogBroker; import org.opends.server.synchronization.protocol.DeleteMsg; import org.opends.server.synchronization.protocol.SynchronizationMessage; import org.opends.server.types.DN; import org.opends.server.types.Entry; import org.opends.server.util.TimeThread; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.testng.Assert.*; /** * Tests for the chngelog service code. * Tests for the changelog service code. */ public class ChangelogTest extends SynchronizationTestCase { /** * Basic test of the changelog code. * Create a changelog server, connect 2 clients and exchange * messages between the clients. * The changelog server that will be used in this test. */ @Test() public void changelogBasic() throws Exception private Changelog changelog = null; /** * The port of the changelog server. */ private int changelogPort; private ChangeNumber firstChangeNumberServer1 = null; private ChangeNumber secondChangeNumberServer1 = null; private ChangeNumber firstChangeNumberServer2 = null; private ChangeNumber secondChangeNumberServer2 = null; /** * Before starting the tests, start the server and configure a * changelog server. */ @BeforeClass() public void configure() throws Exception { // find a free port TestCaseUtils.startServer(); // find a free port for the changelog server ServerSocket socket = TestCaseUtils.bindFreePort(); int changelogPort = socket.getLocalPort(); changelogPort = socket.getLocalPort(); socket.close(); String changelogLdif = @@ -68,36 +89,280 @@ + "ds-cfg-changelog-server-id: 1\n"; Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif); ConfigEntry changelogConfig = new ConfigEntry(tmp, null); Changelog changelog = new Changelog(changelogConfig); changelog = new Changelog(changelogConfig); } ChangelogBroker broker1 = null; ChangelogBroker broker2 = null; /** * Basic test of the changelog code : * Connect 2 clients to the changelog server and exchange messages * between the clients. * * Note : Other tests in this file depends on this test and may need to * change if this test is modified. */ @Test() public void changelogBasic() throws Exception { ChangelogBroker server1 = null; ChangelogBroker server2 = null; try { broker1 = openChangelogSession( DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort, 1000); broker2 = openChangelogSession( DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort, 1000); /* * Open a sender session and a receiver session to the changelog */ server1 = openChangelogSession( DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort, 1000, true); server2 = openChangelogSession( DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort, 1000, true); ChangeNumber cn = new ChangeNumber((long) 1, 1, (short)1); DeleteMsg msg = new DeleteMsg("o=test,dc=example,dc=com", cn, "uid"); broker1.publish(msg); SynchronizationMessage msg2 = broker2.receive(); /* * Create change numbers for the messages sent from server 1 * with current time sequence 1 and with current time + 2 sequence 2 */ long time = TimeThread.getTime(); firstChangeNumberServer1 = new ChangeNumber(time, 1, (short) 1); secondChangeNumberServer1 = new ChangeNumber(time + 2, 2, (short) 1); /* * Create change numbers for the messages sent from server 2 * with current time sequence 1 and with current time + 3 sequence 2 */ firstChangeNumberServer2 = new ChangeNumber(time+ 1, 1, (short) 2); secondChangeNumberServer2 = new ChangeNumber(time + 3, 2, (short) 2); /* * Send and receive a Delete Msg from server 1 to server 2 */ DeleteMsg msg = new DeleteMsg("o=test,dc=example,dc=com", firstChangeNumberServer1, "uid"); server1.publish(msg); SynchronizationMessage msg2 = server2.receive(); if (msg2 instanceof DeleteMsg) { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.toString().equals(msg2.toString())); assertTrue(del.toString().equals(msg.toString()), "Changelog basic : incorrect message body received."); } else fail("Changelog transmission failed"); fail("Changelog basic : incorrect message type received."); /* * Send and receive a second Delete Msg */ msg = new DeleteMsg("o=test", secondChangeNumberServer1, "uid"); server1.publish(msg); msg2 = server2.receive(); if (msg2 instanceof DeleteMsg) { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.toString().equals(msg.toString()), "Changelog basic : incorrect message body received."); } else fail("Changelog basic : incorrect message type received."); /* * Send and receive a Delete Msg from server 1 to server 2 */ msg = new DeleteMsg("o=test,dc=example,dc=com", firstChangeNumberServer2, "other-uid"); server2.publish(msg); msg2 = server1.receive(); if (msg2 instanceof DeleteMsg) { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.toString().equals(msg.toString()), "Changelog basic : incorrect message body received."); } else fail("Changelog basic : incorrect message type received."); /* * Send and receive a second Delete Msg */ msg = new DeleteMsg("o=test", secondChangeNumberServer2, "uid"); server2.publish(msg); msg2 = server1.receive(); if (msg2 instanceof DeleteMsg) { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.toString().equals(msg.toString()), "Changelog basic : incorrect message body received."); } else fail("Changelog basic : incorrect message type received."); } finally { changelog.shutdown(); if (broker1 != null) broker1.stop(); if (broker2 != null) broker2.stop(); if (server1 != null) server1.stop(); if (server2 != null) server2.stop(); } } /** * Test that a new client see the change that was sent in the * previous test. */ @Test(dependsOnMethods = { "changelogBasic" }) public void newClient() throws Exception { ChangelogBroker broker = null; try { broker = openChangelogSession(DN.decode("dc=example,dc=com"), (short) 3, 100, changelogPort, 1000, false); SynchronizationMessage msg2 = broker.receive(); if (!(msg2 instanceof DeleteMsg)) fail("Changelog basic transmission failed"); else { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.getChangeNumber().equals(firstChangeNumberServer1), "The first message received by a new client was the wrong one." + del.getChangeNumber() + " " + firstChangeNumberServer1); } } finally { if (broker != null) broker.stop(); } } /** * Test that a client that has already seen some changes now receive * the correct next change. */ private void newClientWithChanges( ServerState state, ChangeNumber nextChangeNumber) throws Exception { ChangelogBroker broker = null; /* * Connect to the changelog server using the state created above. */ try { broker = openChangelogSession(DN.decode("dc=example,dc=com"), (short) 3, 100, changelogPort, 1000, state); SynchronizationMessage msg2 = broker.receive(); if (!(msg2 instanceof DeleteMsg)) fail("Changelog basic transmission failed"); else { DeleteMsg del = (DeleteMsg) msg2; assertTrue(del.getChangeNumber().equals(nextChangeNumber), "The second message received by a new client was the wrong one." + del.getChangeNumber() + " " + nextChangeNumber); } } finally { if (broker != null) broker.stop(); } } /** * Test that a client that has already seen the first change now see the * second change */ @Test(dependsOnMethods = { "changelogBasic" }) public void newClientWithFirstChanges() throws Exception { /* * Create a ServerState updated with the first changes from both servers * done in test changelogBasic. */ ServerState state = new ServerState(); state.update(firstChangeNumberServer1); state.update(firstChangeNumberServer2); newClientWithChanges(state, secondChangeNumberServer1); } /** * Test that a client that has already seen the first change from server 1 * now see the first change from server 2 */ @Test(dependsOnMethods = { "changelogBasic" }) public void newClientWithChangefromServer1() throws Exception { /* * Create a ServerState updated with the first change from server 1 */ ServerState state = new ServerState(); state.update(firstChangeNumberServer1); newClientWithChanges(state, firstChangeNumberServer2); } /** * Test that a client that has already seen the first chaneg from server 2 * now see the first change from server 1 */ @Test(dependsOnMethods = { "changelogBasic" }) public void newClientWithChangefromServer2() throws Exception { /* * Create a ServerState updated with the first change from server 1 */ ServerState state = new ServerState(); state.update(firstChangeNumberServer2); newClientWithChanges(state, firstChangeNumberServer1); } /** * Test that a client that has not seen the second change from server 1 * now receive it. */ @Test(dependsOnMethods = { "changelogBasic" }) public void newClientLateServer1() throws Exception { /* * Create a ServerState updated with the first change from server 1 */ ServerState state = new ServerState(); state.update(secondChangeNumberServer2); state.update(firstChangeNumberServer1); newClientWithChanges(state, secondChangeNumberServer1); } /** * Test that newClient() and newClientWithFirstChange() still works * after stopping and restarting the changelog server. */ @Test(dependsOnMethods = { "changelogBasic" }) public void stopChangelog() throws Exception { changelog.shutdown(); configure(); newClient(); newClientWithFirstChanges(); newClientWithChangefromServer1(); newClientWithChangefromServer2(); } /** * After the tests stop the changelog server. */ @AfterClass() public void shutdown() throws Exception { if (changelog != null) changelog.shutdown(); } }