opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -114,6 +114,7 @@ import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.RoutableMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.service.ReplicationBroker; import org.opends.server.replication.service.ReplicationDomain; import org.opends.server.replication.service.ReplicationMonitor; import org.opends.server.tasks.TaskUtils; @@ -174,9 +175,62 @@ */ public class LDAPReplicationDomain extends ReplicationDomain implements ConfigurationChangeListener<ReplicationDomainCfg>, AlertGenerator, InternalSearchListener AlertGenerator { /** * This class is used in the session establishment phase * when no Replication Server with all the local changes has been found * and we therefore need to recover them. * A search is then performed on the database using this * internalSearchListener. */ private class ScanSearchListener implements InternalSearchListener { private ChangeNumber startingChangeNumber = null; private ChangeNumber endChangeNumber = null; public ScanSearchListener( ChangeNumber startingChangeNumber, ChangeNumber endChangeNumber) { this.startingChangeNumber = startingChangeNumber; this.endChangeNumber = endChangeNumber; } @Override public void handleInternalSearchEntry( InternalSearchOperation searchOperation, SearchResultEntry searchEntry) throws DirectoryException { // Build the list of Operations that happened on this entry // after startingChangeNumber and before endChangeNumber and // add them to the replayOperations list Iterable<FakeOperation> updates = Historical.generateFakeOperations(searchEntry); for (FakeOperation op : updates) { ChangeNumber cn = op.getChangeNumber(); if ((cn.newer(startingChangeNumber)) && (cn.older(endChangeNumber))) { synchronized (replayOperations) { replayOperations.put(cn, op); } } } } @Override public void handleInternalSearchReference( InternalSearchOperation searchOperation, SearchResultReference searchReference) throws DirectoryException { // Nothing to do. } } /** * The fully-qualified name of this class. */ private static final String CLASS_NAME = @@ -398,6 +452,80 @@ } /** * The thread that is responsible to update the RS to which this domain is * connected in case it is late and there is no RS which is up to date. */ private class RSUpdater extends DirectoryThread { private ChangeNumber startChangeNumber; protected RSUpdater(ChangeNumber replServerMaxChangeNumber) { super("Replication Server Updater for server id " + serverId + " and domain " + baseDn.toString()); this.startChangeNumber = replServerMaxChangeNumber; } /** * {@inheritDoc} */ @Override public void run() { // Replication server is missing some of our changes: let's // send them to him. Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get(); logError(message); /* * Get all the changes that have not been seen by this * replication server and publish them. */ try { if (buildAndPublishMissingChanges(startChangeNumber, broker)) { message = DEBUG_CHANGES_SENT.get(); logError(message); synchronized(replayOperations) { replayOperations.clear(); } } else { /* * An error happened trying to search for the updates * This server will start accepting again new updates but * some inconsistencies will stay between servers. * Log an error for the repair tool * that will need to re-synchronize the servers. */ message = ERR_CANNOT_RECOVER_CHANGES.get( baseDn.toNormalizedString()); logError(message); } } catch (Exception e) { /* * An error happened trying to search for the updates * This server will start accepting again new updates but * some inconsistencies will stay between servers. * Log an error for the repair tool * that will need to re-synchronize the servers. */ message = ERR_CANNOT_RECOVER_CHANGES.get( baseDn.toNormalizedString()); logError(message); } finally { broker.setRecoveryRequired(false); } } } /** * Creates a new ReplicationDomain using configuration from configEntry. * * @param configuration The configuration of this ReplicationDomain. @@ -490,9 +618,6 @@ saveGenerationId(generationId); } startPublishService(replicationServers, window, heartbeatInterval, configuration.getChangetimeHeartbeatInterval()); /* * ChangeNumberGenerator is used to create new unique ChangeNumbers * for each operation done on this replication domain. @@ -505,6 +630,9 @@ pendingChanges = new PendingChanges(generator, this); startPublishService(replicationServers, window, heartbeatInterval, configuration.getChangetimeHeartbeatInterval()); remotePendingChanges = new RemotePendingChanges(getServerState()); // listen for changes on the configuration @@ -4356,74 +4484,9 @@ if ((ourMaxChangeNumber != null) && (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) { // Replication server is missing some of our changes: let's // send them to him. Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get(); logError(message); /* * Get all the changes that have not been seen by this * replication server and populate the replayOperations * list. */ InternalSearchOperation op = searchForChangedEntries( baseDn, replServerMaxChangeNumber, this); if (op.getResultCode() != ResultCode.SUCCESS) { /* * An error happened trying to search for the updates * This server will start accepting again new updates but * some inconsistencies will stay between servers. * Log an error for the repair tool * that will need to re-synchronize the servers. */ message = ERR_CANNOT_RECOVER_CHANGES.get( baseDn.toNormalizedString()); logError(message); } else { for (FakeOperation replayOp : replayOperations.tailMap(replServerMaxChangeNumber).values()) { ChangeNumber cn = replayOp.getChangeNumber(); /* * Because the entry returned by the search operation * can contain old historical information, it is * possible that some of the FakeOperation are * actually older than the last ChangeNumber known by * the Replication Server. * In such case don't send the operation. */ if (!cn.newer(replServerMaxChangeNumber)) { continue; } /* * Check if the DeleteOperation has been abandoned before * being processed. This is necessary because the replayOperation * */ if (replayOp instanceof FakeDelOperation) { FakeDelOperation delOp = (FakeDelOperation) replayOp; if (findEntryDN(delOp.getUUID()) != null) { continue; } } message = DEBUG_SENDING_CHANGE.get( replayOp.getChangeNumber().toString()); logError(message); session.publish(replayOp.generateMessage()); } message = DEBUG_CHANGES_SENT.get(); logError(message); } replayOperations.clear(); pendingChanges.setRecovering(true); broker.setRecoveryRequired(true); new RSUpdater(replServerMaxChangeNumber).start(); } } } catch (Exception e) @@ -4437,19 +4500,124 @@ } /** * Build the list of changes that have been processed by this server * after the ChangeNumber given as a parameter and publish them * using the given session. * * @param startingChangeNumber The ChangeNumber whe we need to start the * search * @param session The session to use to publish the changes * * @return A boolean indicating he success of the * operation. * @throws Exception if an Exception happens during the search. */ public boolean buildAndPublishMissingChanges( ChangeNumber startingChangeNumber, ReplicationBroker session) throws Exception { // Trim the changes in replayOperations that are older than // the startingChangeNumber. synchronized (replayOperations) { Iterator<ChangeNumber> it = replayOperations.keySet().iterator(); while (it.hasNext()) { if (it.next().olderOrEqual(startingChangeNumber)) { it.remove(); } else { break; } } } ChangeNumber lastRetrievedChange = null; long missingChangesDelta; InternalSearchOperation op; ChangeNumber currentStartChangeNumber = startingChangeNumber; do { lastRetrievedChange = null; // We can't do the search in one go because we need to // store the results so that we are sure we send the operations // in order and because the list might be large // So we search by interval of 10 seconds // and store the results in the replayOperations list // so that they are sorted before sending them. missingChangesDelta = currentStartChangeNumber.getTime() + 10000; ChangeNumber endChangeNumber = new ChangeNumber( missingChangesDelta, 0xffffffff, serverId); ScanSearchListener listener = new ScanSearchListener(currentStartChangeNumber, endChangeNumber); op = searchForChangedEntries( baseDn, currentStartChangeNumber, endChangeNumber, listener); // Publish and remove all the changes from the replayOperations list // that are older than the endChangeNumber. LinkedList<FakeOperation> opsToSend = new LinkedList<FakeOperation>(); synchronized (replayOperations) { Iterator<FakeOperation> itOp = replayOperations.values().iterator(); while (itOp.hasNext()) { FakeOperation fakeOp = itOp.next(); if ((fakeOp.getChangeNumber().olderOrEqual(endChangeNumber)) && state.cover(fakeOp.getChangeNumber())) { lastRetrievedChange = fakeOp.getChangeNumber(); opsToSend.add(fakeOp); itOp.remove(); } else { break; } } } for (FakeOperation opToSend : opsToSend) { session.publishRecovery(opToSend.generateMessage()); } opsToSend.clear(); if (lastRetrievedChange != null) { currentStartChangeNumber = lastRetrievedChange; } else { currentStartChangeNumber = endChangeNumber; } } while (pendingChanges.RecoveryUntil(lastRetrievedChange) && (op.getResultCode().equals(ResultCode.SUCCESS))); return op.getResultCode().equals(ResultCode.SUCCESS); } /** * Search for the changes that happened since fromChangeNumber * based on the historical attribute. The only changes that will * be send will be the one generated on the serverId provided in * fromChangeNumber. * @param baseDn the base DN * @param fromChangeNumber The change number from which we want the changes * @param resultListener that will process the entries returned. * @param fromChangeNumber The ChangeNumber from which we want the changes * @param lastChangeNumber The max ChangeNumber that the search should return * @param resultListener The listener that will process the entries returned * @return the internal search operation * @throws Exception when raised. */ public static InternalSearchOperation searchForChangedEntries( DN baseDn, ChangeNumber fromChangeNumber, ChangeNumber lastChangeNumber, InternalSearchListener resultListener) throws Exception { @@ -4457,8 +4625,16 @@ InternalClientConnection.getRootConnection(); Integer serverId = fromChangeNumber.getServerId(); String maxValueForId = "ffffffffffffffff" + String.format("%04x", serverId) + "ffffffff"; String maxValueForId; if (lastChangeNumber == null) { maxValueForId = "ffffffffffffffff" + String.format("%04x", serverId) + "ffffffff"; } else { maxValueForId = lastChangeNumber.toString(); } LDAPFilter filter = LDAPFilter.decode( "(&(" + Historical.HISTORICALATTRIBUTENAME + ">=dummy:" @@ -4479,36 +4655,24 @@ } /** * {@inheritDoc} * Search for the changes that happened since fromChangeNumber * based on the historical attribute. The only changes that will * be send will be the one generated on the serverId provided in * fromChangeNumber. * @param baseDn the base DN * @param fromChangeNumber The change number from which we want the changes * @param resultListener that will process the entries returned. * @return the internal search operation * @throws Exception when raised. */ public void handleInternalSearchEntry( InternalSearchOperation searchOperation, SearchResultEntry searchEntry) public static InternalSearchOperation searchForChangedEntries( DN baseDn, ChangeNumber fromChangeNumber, InternalSearchListener resultListener) throws Exception { /* * This call back is called at session establishment phase * for each entry that has been changed by this server and the changes * have not been sent to any Replication Server. * The role of this method is to build equivalent operation from * the historical information and add them in the replayOperations * table. */ Iterable<FakeOperation> updates = Historical.generateFakeOperations(searchEntry); for (FakeOperation op : updates) { replayOperations.put(op.getChangeNumber(), op); } } /** * {@inheritDoc} */ public void handleInternalSearchReference( InternalSearchOperation searchOperation, SearchResultReference searchReference) { // TODO to be implemented return searchForChangedEntries( baseDn, fromChangeNumber, null, resultListener); } opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -212,7 +212,14 @@ { numSentUpdates++; LDAPUpdateMsg updateMsg = firstChange.getMsg(); domain.publish(updateMsg); if (!recoveringOldChanges) { domain.publish(updateMsg); } else { domain.getServerState().update(updateMsg.getChangeNumber()); } } pendingChanges.remove(firstChangeNumber); @@ -248,4 +255,47 @@ _commit(changeNumber, msg); return _pushCommittedChanges(); } private boolean recoveringOldChanges = false; /** * Set the PendingChangesList structure in a mode where it is * waiting for the RS to receive all the previous changes to * be sent before starting to process the changes normally. * In this mode, The Domain does not publish the changes from * the pendingChanges because there are older changes that * need to be published before. * * @param b The recovering status that must be set. */ public void setRecovering(boolean b) { recoveringOldChanges = b; } /** * Allows to update the recovery situation by comparing the ChangeNumber of * the last change that was sent to the ReplicationServer with the * ChangeNumber of the last operation that was taken out of the * PendingChanges list. * If he two match then the recovery is completed and normal procedure can * restart. Otherwise the RSUpdate thread must continue to look for * older changes and no changes can be committed from the pendingChanges list. * * @param recovered The ChangeNumber of the last change that was published * to the ReplicationServer. * * @return A boolean indicating if the recovery is completed (false) * or must continue (true). */ public synchronized boolean RecoveryUntil(ChangeNumber recovered) { ChangeNumber lastLocalChange = domain.getLastLocalChange(); if ((recovered != null) && (recovered.newerOrEquals(lastLocalChange))) { recoveringOldChanges = false; } return recoveringOldChanges; } } opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
@@ -123,6 +123,18 @@ } /** * Checks that the ChangeNumber given as a parameter is in this ServerState. * * @param covered The ChangeNumber that should be checked. * @return A boolean indicating if this ServerState contains the ChangeNumber * given in parameter. */ public boolean cover(ChangeNumber covered) { return state.cover(covered); } /** * Update the Server State with a ChangeNumber. * All operations with smaller CSN and the same serverID must be committed * before calling this method. opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -188,6 +188,7 @@ private long generationID; private int updateDoneCount = 0; private boolean connectRequiresRecovery = false; /** * Creates a new ReplicationServer Broker for a particular ReplicationDomain. @@ -694,6 +695,21 @@ rsServerId = serverInfo.getServerId(); rsServerUrl = bestServer; receiveTopo(topologyMsg); // Log a message to let the administrator know that the failure // was resolved. // Wakeup all the thread that were waiting on the window // on the previous connection. connectionError = false; if (sendWindow != null) { sendWindow.release(Integer.MAX_VALUE); } sendWindow = new Semaphore(maxSendWindow); rcvWindow = maxRcvWindow; connected = true; // May have created a broker with null replication domain for // unit test purpose. if (domain != null) @@ -703,8 +719,7 @@ serverInfo.getGenerationId(), session); } receiveTopo(topologyMsg); connected = true; if (getRsGroupId() != groupId) { // Connected to replication server with wrong group id: @@ -766,17 +781,6 @@ if (connected) { // Log a message to let the administrator know that the failure was // resolved. // Wakeup all the thread that were waiting on the window // on the previous connection. connectionError = false; if (sendWindow != null) { sendWindow.release(Integer.MAX_VALUE); } sendWindow = new Semaphore(maxSendWindow); rcvWindow = maxRcvWindow; connectPhaseLock.notify(); if ((serverInfo.getGenerationId() == this.getGenerationID()) || @@ -1786,6 +1790,25 @@ */ public void publish(ReplicationMsg msg) { _publish(msg, false); } /** * Publish a recovery message to the other servers. * @param msg the message to publish */ public void publishRecovery(ReplicationMsg msg) { _publish(msg, true); } /** * Publish a message to the other servers. * @param msg the message to publish * @param recoveryMsg the message is a recovery Message */ void _publish(ReplicationMsg msg, boolean recoveryMsg) { boolean done = false; while (!done && !shutdown) @@ -1825,6 +1848,15 @@ currentWindowSemaphore = sendWindow; } // If the Replication domain has decided that there is a need to // recover some changes then it is not allowed to send this // change but it will be the responsibility of the recovery thread to // do it. if (!recoveryMsg & connectRequiresRecovery) { return; } if (msg instanceof UpdateMsg) { // Acquiring the window credit must be done outside of the @@ -2548,4 +2580,18 @@ ctHeartbeatPublisherThread = null; } } /** * Set the connectRequiresRecovery to the provided value. * This flag is used to indicate if a recovery of Update is necessary * after a reconnection to a RS. * It is the responsibility of the ReplicationDomain to set it during the * sessionInitiated phase. * * @param b the new value of the connectRequiresRecovery. */ public void setRecoveryRequired(boolean b) { connectRequiresRecovery = b; } } opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -2909,4 +2909,16 @@ { return eClIncludes; } /** * Returns the ChangeNUmber of the last Change that was fully processed * by this ReplicationDomain. * * @return The ChangeNUmber of the last Change that was fully processed * by this ReplicationDomain. */ public ChangeNumber getLastLocalChange() { return state.getMaxChangeNumber(serverID); } } opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -1231,17 +1231,31 @@ * Deletes the provided entry from the Directory Server using an * internal operation. * * @param entry The entry to be added. * @param entry The entry to be deleted. * * @throws Exception If an unexpected problem occurs. */ public static void deleteEntry(Entry entry) throws Exception { deleteEntry(entry.getDN()); } /** * Deletes the provided entry from the Directory Server using an * internal operation. * * @param dn The dn of entry to be deleted * * @throws Exception If an unexpected problem occurs. */ public static void deleteEntry(DN dn) throws Exception { InternalClientConnection conn = InternalClientConnection.getRootConnection(); DeleteOperation deleteOperation = conn.processDelete(entry.getDN()); DeleteOperation deleteOperation = conn.processDelete(dn); assertEquals(deleteOperation.getResultCode(), ResultCode.SUCCESS); } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -26,127 +26,71 @@ */ package org.opends.server.replication.plugin; import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.io.IOException; import java.net.ServerSocket; import java.util.LinkedList; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.Severity; import org.opends.server.TestCaseUtils; import org.opends.server.core.AddOperationBasis; import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType; import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.protocols.internal.InternalSearchOperation; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.protocol.AddMsg; import org.opends.server.replication.protocol.DeleteMsg; import org.opends.server.replication.protocol.LDAPUpdateMsg; import org.opends.server.replication.protocol.ModifyMsg; import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.service.ReplicationBroker; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeValue; import org.opends.server.types.ByteString; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.types.ResultCode; import org.opends.server.types.SearchResultEntry; import org.testng.annotations.BeforeClass; import org.opends.server.util.TimeThread; import org.testng.annotations.Test; import static org.opends.server.TestCaseUtils.*; /** * Test the usage of the historical data of the replication. */ public class HistoricalCsnOrderingTest extends ReplicationTestCase extends ReplicationTestCase { /** * A "person" entry */ protected Entry personEntry; private int replServerPort; final int serverId = 123; /** * Set up the environment for performing the tests in this Class. * * @throws Exception * If the environment could not be set up. */ @BeforeClass @Override public void setUp() throws Exception public class TestBroker extends ReplicationBroker { super.setUp(); LinkedList<ReplicationMsg> list = null; // Create necessary backend top level entry String topEntry = "dn: ou=People," + TEST_ROOT_DN_STRING + "\n" + "objectClass: top\n" + "objectClass: organizationalUnit\n" + "entryUUID: 11111111-1111-1111-1111-111111111111\n"; addEntry(TestCaseUtils.entryFromLdifString(topEntry)); public TestBroker(LinkedList<ReplicationMsg> list) { super(null, null, null, 0, 0, (long) 0, (long) 0, null, (byte) 0, (long) 0); this.list = list; } // find a free port for the replicationServer ServerSocket socket = TestCaseUtils.bindFreePort(); replServerPort = socket.getLocalPort(); socket.close(); public void publishRecovery(ReplicationMsg msg) { list.add(msg); } // replication server String replServerLdif = "dn: cn=Replication Server, " + SYNCHRO_PLUGIN_DN + "\n" + "objectClass: top\n" + "objectClass: ds-cfg-replication-server\n" + "cn: Replication Server\n" + "ds-cfg-replication-port: " + replServerPort + "\n" + "ds-cfg-replication-db-directory: HistoricalCsnOrderingTestDb\n" + "ds-cfg-replication-server-id: 101\n"; replServerEntry = TestCaseUtils.entryFromLdifString(replServerLdif); // suffix synchronized String testName = "historicalCsnOrderingTest"; String synchroServerLdif = "dn: cn=" + testName + ", cn=domains, " + SYNCHRO_PLUGIN_DN + "\n" + "objectClass: top\n" + "objectClass: ds-cfg-replication-domain\n" + "cn: " + testName + "\n" + "ds-cfg-base-dn: ou=People," + TEST_ROOT_DN_STRING + "\n" + "ds-cfg-replication-server: localhost:" + replServerPort + "\n" + "ds-cfg-server-id: 1\n" + "ds-cfg-receive-status: true\n"; synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif); String personLdif = "dn: uid=user.1,ou=People," + TEST_ROOT_DN_STRING + "\n" + "objectClass: top\n" + "objectClass: person\n" + "objectClass: organizationalPerson\n" + "objectClass: inetOrgPerson\n" + "uid: user.1\n" + "homePhone: 951-245-7634\n" + "description: This is the description for Aaccf Amar.\n" + "st: NC\n" + "mobile: 027-085-0537\n" + "postalAddress: Aaccf Amar$17984 Thirteenth Street" + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n" + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n" + "street: 17984 Thirteenth Street\n" + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n" + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n" + "userPassword: password\n" + "initials: AA\n"; personEntry = TestCaseUtils.entryFromLdifString(personLdif); configureReplication(); } /** * Add an entry in the database * */ private void addEntry(Entry entry) throws Exception { AddOperationBasis addOp = new AddOperationBasis(connection, InternalClientConnection.nextOperationID(), InternalClientConnection .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(), entry.getUserAttributes(), entry.getOperationalAttributes()); addOp.setInternalOperation(true); addOp.run(); assertNotNull(getEntry(entry.getDN(), 1000, true)); } /** @@ -182,10 +126,19 @@ * informations. */ @Test() public void changesCmpTest() public void buildAndPublishMissingChangesOneEntryTest() throws Exception { final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING); final int serverId = 123; final DN baseDn = DN.decode(TEST_ROOT_DN_STRING); TestCaseUtils.initializeTestBackend(true); ReplicationServer rs = createReplicationServer(); // Create Replication Server and Domain LDAPReplicationDomain rd1 = createReplicationDomain(serverId); try { long startTime = TimeThread.getTime(); final DN dn1 = DN.decode("cn=test1," + baseDn.toString()); final AttributeType histType = DirectoryServer.getAttributeType(Historical.HISTORICALATTRIBUTENAME); @@ -246,39 +199,183 @@ "Second historical value:" + av.getValue().toString())); } LinkedList<ReplicationMsg> opList = new LinkedList<ReplicationMsg>(); TestBroker session = new TestBroker(opList); boolean result = rd1.buildAndPublishMissingChanges( new ChangeNumber(startTime, 0, serverId), session); assertTrue(result, "buildAndPublishMissingChanges has failed"); assertEquals(opList.size(), 3, "buildAndPublishMissingChanges should return 3 operations"); assertTrue(opList.getFirst().getClass().equals(AddMsg.class)); // Build a change number from the first modification String hv[] = histValue.split(":"); logError(Message.raw(Category.SYNC, Severity.INFORMATION, hv[1])); ChangeNumber fromChangeNumber = new ChangeNumber(hv[1]); logError(Message.raw(Category.SYNC, Severity.INFORMATION, hv[1])); ChangeNumber fromChangeNumber = new ChangeNumber(hv[1]); // Retrieves the entries that have changed since the first modification InternalSearchOperation op = LDAPReplicationDomain.searchForChangedEntries( baseDn, fromChangeNumber, null); opList = new LinkedList<ReplicationMsg>(); session = new TestBroker(opList); // The expected result is one entry .. the one previously modified assertEquals(op.getResultCode(), ResultCode.SUCCESS); assertEquals(op.getSearchEntries().size(), 1); // From the historical of this entry, rebuild operations // Since there have been 2 modifications and 1 add, there should be 3 // operations rebuild from this state. int updatesCnt = 0; for (SearchResultEntry searchEntry : op.getSearchEntries()) { logError(Message.raw(Category.SYNC, Severity.INFORMATION, searchEntry.toString())); Iterable<FakeOperation> updates = Historical.generateFakeOperations(searchEntry); for (FakeOperation fop : updates) { logError(Message.raw(Category.SYNC, Severity.INFORMATION, fop.generateMessage().toString())); updatesCnt++; } result = rd1.buildAndPublishMissingChanges( fromChangeNumber, session); assertTrue(result, "buildAndPublishMissingChanges has failed"); assertEquals(opList.size(), 1, "buildAndPublishMissingChanges should return 1 operation"); assertTrue(opList.getFirst().getClass().equals(ModifyMsg.class)); } assertTrue(updatesCnt == 3); finally { MultimasterReplication.deleteDomain(baseDn); rs.remove(); } } /** * Test that we can retrieve the entries that were missed by * a replication server and can re-build operations from the historical * informations. */ @Test() public void buildAndPublishMissingChangesSeveralEntriesTest() throws Exception { final DN baseDn = DN.decode(TEST_ROOT_DN_STRING); TestCaseUtils.initializeTestBackend(true); ReplicationServer rs = createReplicationServer(); // Create Replication Server and Domain LDAPReplicationDomain rd1 = createReplicationDomain(serverId); long startTime = TimeThread.getTime(); try { logError(Message.raw(Category.SYNC, Severity.INFORMATION, "Starting replication test : changesCmpTest")); // Add 3 entries. String dnTest1 = "cn=test1," + baseDn.toString(); String dnTest2 = "cn=test2," + baseDn.toString(); String dnTest3 = "cn=test3," + baseDn.toString(); TestCaseUtils.addEntry( "dn: " + dnTest3, "displayname: Test1", "objectClass: top", "objectClass: person", "objectClass: organizationalPerson", "objectClass: inetOrgPerson", "cn: test1", "sn: test" ); TestCaseUtils.addEntry( "dn: " + dnTest1, "displayname: Test1", "objectClass: top", "objectClass: person", "objectClass: organizationalPerson", "objectClass: inetOrgPerson", "cn: test1", "sn: test" ); TestCaseUtils.deleteEntry(DN.decode(dnTest3)); TestCaseUtils.addEntry( "dn: " + dnTest2, "displayname: Test1", "objectClass: top", "objectClass: person", "objectClass: organizationalPerson", "objectClass: inetOrgPerson", "cn: test1", "sn: test" ); // Perform modifications on the 2 entries int resultCode = TestCaseUtils.applyModifications(false, "dn: cn=test2," + baseDn.toString(), "changetype: modify", "add: description", "description: foo"); resultCode = TestCaseUtils.applyModifications(false, "dn: cn=test1," + baseDn.toString(), "changetype: modify", "add: description", "description: foo"); assertEquals(resultCode, 0); LinkedList<ReplicationMsg> opList = new LinkedList<ReplicationMsg>(); TestBroker session = new TestBroker(opList); // Call the buildAndPublishMissingChanges and check that this method // correctly generates the 4 operations in the correct order. boolean result = rd1.buildAndPublishMissingChanges( new ChangeNumber(startTime, 0, serverId), session); assertTrue(result, "buildAndPublishMissingChanges has failed"); assertEquals(opList.size(), 5, "buildAndPublishMissingChanges should return 5 operations"); ReplicationMsg msg = opList.removeFirst(); assertTrue(msg.getClass().equals(AddMsg.class)); assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest1); msg = opList.removeFirst(); assertTrue(msg.getClass().equals(DeleteMsg.class)); assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest3); msg = opList.removeFirst(); assertTrue(msg.getClass().equals(AddMsg.class)); assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest2); msg = opList.removeFirst(); assertTrue(msg.getClass().equals(ModifyMsg.class)); assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest2); msg = opList.removeFirst(); assertTrue(msg.getClass().equals(ModifyMsg.class)); assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest1); } finally { MultimasterReplication.deleteDomain(baseDn); rs.remove(); } } SortedSet<String> replServers = new TreeSet<String>(); private ReplicationServer createReplicationServer() throws ConfigException { int rsPort; try { ServerSocket socket1 = TestCaseUtils.bindFreePort(); rsPort = socket1.getLocalPort(); socket1.close(); replServers.add("localhost:" + rsPort); ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(rsPort, "HistoricalCsnOrdering", 0, 1, 0, 100, replServers, 1, 1000, 5000); ReplicationServer replicationServer = new ReplicationServer(conf); replicationServer.clearDb(); return replicationServer; } catch (IOException e) { fail("Unable to determinate some free ports " + stackTraceToSingleLineString(e)); return null; } } private LDAPReplicationDomain createReplicationDomain(int dsId) throws DirectoryException, ConfigException { DN baseDn = DN.decode(TEST_ROOT_DN_STRING); DomainFakeCfg domainConf = new DomainFakeCfg(baseDn, dsId, replServers, AssuredType.NOT_ASSURED, 2, 1, 0, null); LDAPReplicationDomain replicationDomain = MultimasterReplication.createNewDomain(domainConf); replicationDomain.start(); return replicationDomain; } }