opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -41,7 +41,7 @@ import org.opends.messages.MessageBuilder; import org.opends.messages.Severity; import org.opends.server.admin.server.ConfigurationChangeListener; import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*; import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.IsolationPolicy; import org.opends.server.admin.std.server.ExternalChangelogDomainCfg; import org.opends.server.admin.std.server.ReplicationDomainCfg; import org.opends.server.api.AlertGenerator; @@ -62,7 +62,6 @@ import org.opends.server.protocols.ldap.LDAPModification; import org.opends.server.replication.common.*; import org.opends.server.replication.protocol.*; import org.opends.server.replication.service.ReplicationBroker; import org.opends.server.replication.service.ReplicationDomain; import org.opends.server.tasks.PurgeConflictsHistoricalTask; import org.opends.server.tasks.TaskUtils; @@ -71,7 +70,7 @@ import org.opends.server.util.LDIFReader; import org.opends.server.util.TimeThread; import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; import org.opends.server.workflowelement.localbackend.*; import org.opends.server.workflowelement.localbackend.LocalBackendModifyOperation; import static org.opends.messages.ReplicationMessages.*; import static org.opends.messages.ToolMessages.*; @@ -436,7 +435,7 @@ */ try { if (buildAndPublishMissingChanges(startCSN, broker)) if (buildAndPublishMissingChanges(startCSN)) { message = DEBUG_CHANGES_SENT.get(); logError(message); @@ -1262,8 +1261,8 @@ break; } } boolean attributeToBeFiltered = (fractionalExclusive && found) || (!fractionalExclusive && !found); boolean attributeToBeFiltered = fractionalExclusive && found || !fractionalExclusive && !found; if (attributeToBeFiltered && !newRdn.hasAttributeType(attributeType) && !modifyDNOperation.deleteOldRDN()) @@ -1437,7 +1436,7 @@ private static boolean isFractionalProhibited(AttributeType attrType) { String attributeName = attrType.getPrimaryName(); return (attributeName != null && isFractionalProhibitedAttr(attributeName)) return attributeName != null && isFractionalProhibitedAttr(attributeName) || isFractionalProhibitedAttr(attrType.getOID()); } @@ -1453,8 +1452,8 @@ // Now remove the attribute or modification if: // - exclusive mode and attribute is in configuration // - inclusive mode and attribute is not in configuration return (foundAttribute && fractionalExclusive) || (!foundAttribute && !fractionalExclusive); return foundAttribute && fractionalExclusive || !foundAttribute && !fractionalExclusive; } private static boolean contains(Set<String> fractionalConcernedAttributes, @@ -1857,16 +1856,8 @@ // this policy imply that we always accept updates. return true; } if (isolationPolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES)) { // this isolation policy specifies that the updates are denied // when the broker had problems during the connection phase // Updates are still accepted if the broker is currently connecting.. return !hasConnectionError(); } // we should never get there as the only possible policies are // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES return true; return !isolationPolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES) || !hasConnectionError(); } @@ -2473,7 +2464,7 @@ op = msg.createOperation(conn); dependency = remotePendingChanges.checkDependencies(op, msg); while (!dependency && !replayDone && (retryCount-- > 0)) while (!dependency && !replayDone && retryCount-- > 0) { if (shutdown.get()) { @@ -2824,8 +2815,8 @@ for (Modification mod : mods) { AttributeType modAttrType = mod.getAttribute().getAttributeType(); if ((mod.getModificationType() == ModificationType.DELETE || mod.getModificationType() == ModificationType.REPLACE) if (mod.getModificationType() == ModificationType.DELETE || mod.getModificationType() == ModificationType.REPLACE && currentRDN.hasAttributeType(modAttrType)) { if (currentRDN.hasAttributeType(modAttrType)) @@ -4429,14 +4420,11 @@ * * @param startCSN * The CSN where we need to start the search * @param session * The session to use to publish the changes * @return A boolean indicating he success of the operation. * @throws Exception * if an Exception happens during the search. */ public boolean buildAndPublishMissingChanges(CSN startCSN, ReplicationBroker session) throws Exception public boolean buildAndPublishMissingChanges(CSN startCSN) throws Exception { // Trim the changes in replayOperations that are older than the startCSN. synchronized (replayOperations) @@ -4494,7 +4482,7 @@ for (FakeOperation opToSend : opsToSend) { session.publishRecovery(opToSend.generateMessage()); broker.publishRecovery(opToSend.generateMessage()); } opsToSend.clear(); if (lastRetrievedChange != null) @@ -5205,8 +5193,8 @@ return false; // Compare modes if ((cfg1.isFractional() != cfg2.isFractional()) || (cfg1.isFractionalExclusive() != cfg2.isFractionalExclusive())) if (cfg1.isFractional() != cfg2.isFractional() || cfg1.isFractionalExclusive() != cfg2.isFractionalExclusive()) return false; // Compare all classes attributes opends/tests/unit-tests-testng/src/server/org/opends/server/core/networkgroups/ResourceLimitsPolicyTest.java
@@ -23,11 +23,13 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions copyright 2013 ForgeRock AS */ package org.opends.server.core.networkgroups; import java.util.ArrayList; import java.util.List; import org.opends.messages.Message; import org.opends.server.DirectoryServerTestCase; import org.opends.server.TestCaseUtils; @@ -35,18 +37,18 @@ import org.opends.server.protocols.internal.InternalSearchOperation; import org.opends.server.protocols.ldap.LDAPFilter; import org.opends.server.types.DN; import org.opends.server.types.SearchFilter; import org.opends.server.types.SearchScope; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.*; /* /** * This set of tests test the resource limits. */ @SuppressWarnings("javadoc") public class ResourceLimitsPolicyTest extends DirectoryServerTestCase { //=========================================================================== // @@ -60,8 +62,7 @@ * @throws Exception if the environment could not be set up. */ @BeforeClass public void setUp() throws Exception public void setUp() throws Exception { // This test suite depends on having the schema available, // so we'll start the server. @@ -119,13 +120,11 @@ public void testMaxNumberOfConnections() throws Exception { ArrayList<Message> messages = new ArrayList<Message>(); List<Message> messages = new ArrayList<Message>(); ResourceLimitsPolicyFactory factory = new ResourceLimitsPolicyFactory(); ResourceLimitsPolicyFactory factory = new ResourceLimitsPolicyFactory(); ResourceLimitsPolicy limits = factory .createQOSPolicy(new MockResourceLimitsQOSPolicyCfg() factory.createQOSPolicy(new MockResourceLimitsQOSPolicyCfg() { @Override @@ -139,17 +138,14 @@ InternalClientConnection conn1 = new InternalClientConnection(DN.NULL_DN); limits.addConnection(conn1); boolean check = limits.isAllowed(conn1, null, true, messages); assertTrue(check); assertTrue(limits.isAllowed(conn1, null, true, messages)); InternalClientConnection conn2 = new InternalClientConnection(DN.NULL_DN); limits.addConnection(conn2); check = limits.isAllowed(conn2, null, true, messages); assertFalse(check); assertFalse(limits.isAllowed(conn2, null, true, messages)); limits.removeConnection(conn1); check = limits.isAllowed(conn2, null, true, messages); assertTrue(check); assertTrue(limits.isAllowed(conn2, null, true, messages)); limits.removeConnection(conn2); } @@ -162,13 +158,11 @@ public void testMaxNumberOfConnectionsFromSameIp() throws Exception { ArrayList<Message> messages = new ArrayList<Message>(); List<Message> messages = new ArrayList<Message>(); ResourceLimitsPolicyFactory factory = new ResourceLimitsPolicyFactory(); ResourceLimitsPolicyFactory factory = new ResourceLimitsPolicyFactory(); ResourceLimitsPolicy limits = factory .createQOSPolicy(new MockResourceLimitsQOSPolicyCfg() factory.createQOSPolicy(new MockResourceLimitsQOSPolicyCfg() { @Override @@ -182,17 +176,14 @@ InternalClientConnection conn1 = new InternalClientConnection(DN.NULL_DN); limits.addConnection(conn1); boolean check = limits.isAllowed(conn1, null, true, messages); assertTrue(check); assertTrue(limits.isAllowed(conn1, null, true, messages)); InternalClientConnection conn2 = new InternalClientConnection(DN.NULL_DN); limits.addConnection(conn2); check = limits.isAllowed(conn2, null, true, messages); assertFalse(check); assertFalse(limits.isAllowed(conn2, null, true, messages)); limits.removeConnection(conn1); check = limits.isAllowed(conn2, null, true, messages); assertTrue(check); assertTrue(limits.isAllowed(conn2, null, true, messages)); limits.removeConnection(conn2); } @@ -213,11 +204,9 @@ { List<Message> messages = new ArrayList<Message>(); ResourceLimitsPolicyFactory factory = new ResourceLimitsPolicyFactory(); ResourceLimitsPolicyFactory factory = new ResourceLimitsPolicyFactory(); ResourceLimitsPolicy limits = factory .createQOSPolicy(new MockResourceLimitsQOSPolicyCfg() factory.createQOSPolicy(new MockResourceLimitsQOSPolicyCfg() { @Override @@ -236,12 +225,7 @@ SearchScope.BASE_OBJECT, LDAPFilter.decode(searchFilter).toSearchFilter()); boolean check = limits.isAllowed(conn1, search, true, messages); if (success) { assertTrue(check); } else { assertFalse(check); } assertEquals(limits.isAllowed(conn1, search, true, messages), success); limits.removeConnection(conn1); } @@ -254,7 +238,7 @@ public void testMaxThroughput() throws Exception { ArrayList<Message> messages = new ArrayList<Message>(); List<Message> messages = new ArrayList<Message>(); final long interval = 1000; // Unit is milliseconds ResourceLimitsPolicyFactory factory = new ResourceLimitsPolicyFactory(); @@ -276,35 +260,26 @@ InternalClientConnection conn = new InternalClientConnection(DN.NULL_DN); limits.addConnection(conn); InternalSearchOperation search1 = conn.processSearch( DN.decode("dc=example,dc=com"), SearchScope.BASE_OBJECT, LDAPFilter.decode("(objectclass=*)").toSearchFilter()); final DN dn = DN.decode("dc=example,dc=com"); final SearchFilter all = SearchFilter.createFilterFromString("(objectclass=*)"); // First operation is allowed boolean check = limits.isAllowed(conn, search1, true, messages); assertTrue(check); InternalSearchOperation search2 = conn.processSearch( DN.decode("dc=example,dc=com"), SearchScope.BASE_OBJECT, LDAPFilter.decode("(objectclass=*)").toSearchFilter()); InternalSearchOperation search1 = conn.processSearch(dn, SearchScope.BASE_OBJECT, all); assertTrue(limits.isAllowed(conn, search1, true, messages)); // Second operation in the same interval is refused check = limits.isAllowed(conn, search2, true, messages); assertFalse(check); InternalSearchOperation search2 = conn.processSearch(dn, SearchScope.BASE_OBJECT, all); assertFalse(limits.isAllowed(conn, search2, true, messages)); // Wait for the end of the interval => counters are reset Thread.sleep(interval); InternalSearchOperation search3 = conn.processSearch( DN.decode("dc=example,dc=com"), SearchScope.BASE_OBJECT, LDAPFilter.decode("(objectclass=*)").toSearchFilter()); // The operation is allowed check = limits.isAllowed(conn, search3, true, messages); assertTrue(check); InternalSearchOperation search3 = conn.processSearch(dn, SearchScope.BASE_OBJECT, all); assertTrue(limits.isAllowed(conn, search3, true, messages)); } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -46,7 +46,6 @@ import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.protocols.internal.InternalSearchOperation; import org.opends.server.protocols.ldap.LDAPFilter; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.common.*; import org.opends.server.replication.protocol.*; @@ -462,9 +461,8 @@ * Handle the handshake processing with the connecting DS * returns true if handshake was performed without errors */ private boolean performHandshake() private boolean performHandshake() throws Exception { try { // Receive server start ServerStartMsg serverStartMsg = (ServerStartMsg) session.receive(); @@ -520,17 +518,6 @@ TopologyMsg topologyMsg = new TopologyMsg(new ArrayList<DSInfo>(), rsList); session.publish(topologyMsg); } catch (IOException e) { fail("Unexpected io exception in fake replication server handshake " + "processing: " + e); return false; } catch (Exception e) { fail("Unexpected exception in fake replication server handshake " + "processing: " + e); return false; } return true; } @@ -558,7 +545,7 @@ /** * Handle client connection then call code specific to configured test */ private void handleClientConnection() private void handleClientConnection() throws Exception { debugInfo("handleClientConnection " + testcase + " " + scenario); // Handle DS connection @@ -610,13 +597,12 @@ debugInfo("handleClientConnection " + testcase + " " + scenario + " done"); } /* /** * Make the RS send an add message with the passed entry and return the ack * message it receives from the DS */ private AckMsg sendAssuredAddMsg(Entry entry, String parentUid) throws SocketTimeoutException private AckMsg sendAssuredAddMsg(Entry entry, String parentUid) throws Exception { try { AddMsg addMsg = new AddMsg(gen.newCSN(), entry.getDN(), UUID.randomUUID().toString(), @@ -632,86 +618,51 @@ // Read and return matching ack return (AckMsg)session.receive(); } catch(SocketTimeoutException e) { throw e; } catch (Throwable t) { fail("Unexpected exception in fake replication server sendAddUpdate " + "processing: " + t); return null; } } /** * Read the coming update and check parameters are not assured */ private void executeNotAssuredScenario() private void executeNotAssuredScenario() throws Exception { UpdateMsg updateMsg = (UpdateMsg) session.receive(); checkUpdateAssuredParameters(updateMsg); try { UpdateMsg updateMsg = (UpdateMsg) session.receive(); checkUpdateAssuredParameters(updateMsg); scenarioExecuted = true; } catch (Exception e) { fail("Unexpected exception in fake replication server executeNotAssuredScenario " + "processing: " + e); } scenarioExecuted = true; } /** * Read the coming update and make the client time out by not sending back * the ack */ private void executeTimeoutScenario() private void executeTimeoutScenario() throws Exception { UpdateMsg updateMsg = (UpdateMsg) session.receive(); checkUpdateAssuredParameters(updateMsg); try { UpdateMsg updateMsg = (UpdateMsg) session.receive(); checkUpdateAssuredParameters(updateMsg); scenarioExecuted = true; scenarioExecuted = true; // We do not send back an ack and the client code is expected to be // blocked at least for the programmed timeout time. } catch (Exception e) { fail("Unexpected exception in fake replication server executeTimeoutScenario " + "processing: " + e + " testcase= " + testcase + " groupId=" + groupId); } // We do not send back an ack and the client code is expected to be // blocked at least for the programmed timeout time. } /** * Read the coming update, sleep some time then send back an ack */ private void executeNoTimeoutScenario() private void executeNoTimeoutScenario() throws Exception { try { UpdateMsg updateMsg = (UpdateMsg) session.receive(); checkUpdateAssuredParameters(updateMsg); UpdateMsg updateMsg = (UpdateMsg) session.receive(); checkUpdateAssuredParameters(updateMsg); // Sleep before sending back the ack sleep(NO_TIMEOUT_RS_SLEEP_TIME); // Sleep before sending back the ack sleep(NO_TIMEOUT_RS_SLEEP_TIME); // Send the ack without errors AckMsg ackMsg = new AckMsg(updateMsg.getCSN()); session.publish(ackMsg); // Send the ack without errors AckMsg ackMsg = new AckMsg(updateMsg.getCSN()); session.publish(ackMsg); scenarioExecuted = true; } catch (Exception e) { fail("Unexpected exception in fake replication server executeNoTimeoutScenario " + "processing: " + e); } scenarioExecuted = true; } /** @@ -733,9 +684,8 @@ /** * Read the coming safe read mode updates and send back acks with errors */ private void executeSafeReadManyErrorsScenario() private void executeSafeReadManyErrorsScenario() throws Exception { try { // Read first update UpdateMsg updateMsg = (UpdateMsg) session.receive(); @@ -779,20 +729,14 @@ // let timeout occur scenarioExecuted = true; } catch (Exception e) { fail("Unexpected exception in fake replication server executeSafeReadManyErrorsScenario " + "processing: " + e); } } /** * Read the coming seaf data mode updates and send back acks with errors * Read the coming safe data mode updates and send back acks with errors */ private void executeSafeDataManyErrorsScenario() private void executeSafeDataManyErrorsScenario() throws Exception { try { // Read first update UpdateMsg updateMsg = (UpdateMsg) session.receive(); @@ -830,32 +774,12 @@ checkUpdateAssuredParameters(updateMsg); // let timeout occur scenarioExecuted = true; } catch (Exception e) { fail("Unexpected exception in fake replication server executeSafeDataManyErrorsScenario " + "processing: " + e); } } } /** * Sleep a while */ private void sleep(long time) { try { Thread.sleep(time); } catch (InterruptedException ex) { fail("Error sleeping " + ex); } } /** * Return various group id values */ @DataProvider(name = "rsGroupIdProvider") @@ -877,7 +801,6 @@ @Test(dataProvider = "rsGroupIdProvider") public void testSafeDataModeTimeout(byte rsGroupId) throws Exception { int TIMEOUT = 5000; String testcase = "testSafeDataModeTimeout" + rsGroupId; try @@ -894,8 +817,7 @@ // Create a safe data assured domain if (rsGroupId == (byte)1) { safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 1, TIMEOUT); safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 1, TIMEOUT); // Wait for connection of domain to RS waitForConnectionToRs(testcase, replicationServer); @@ -926,15 +848,11 @@ if (rsGroupId == (byte)1) { // RS has same group id as DS // In this scenario, the fake RS will not send back an ack so we expect // the add entry code (LDAP client code emulation) to be blocked for the // timeout value at least. If the time we have slept is lower, timeout // handling code is not working... assertTrue((endTime - startTime) >= TIMEOUT); assertBlockedLongerThanTimeout(startTime, endTime, TIMEOUT); assertTrue(replicationServer.isScenarioExecuted()); // Check monitoring values sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked DN baseDN = DN.decode(SAFE_DATA_DN); new MonitorAssertions(baseDN) .assertValue("assured-sd-sent-updates", 1) @@ -948,7 +866,7 @@ // No error should be seen in monitoring and update should have not been // sent in assured mode sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked DN baseDN = DN.decode(NOT_ASSURED_DN); new MonitorAssertions(baseDN).assertRemainingValuesAreZero(); assertNoServerErrors(baseDN); @@ -992,7 +910,6 @@ @Test(dataProvider = "rsGroupIdProvider") public void testSafeReadModeTimeout(byte rsGroupId) throws Exception { int TIMEOUT = 5000; String testcase = "testSafeReadModeTimeout" + rsGroupId; try @@ -1010,8 +927,7 @@ if (rsGroupId == (byte)1) { // Create a safe read assured domain safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT); safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT); // Wait for connection of domain to RS waitForConnectionToRs(testcase, replicationServer); @@ -1041,16 +957,11 @@ if (rsGroupId == (byte)1) { // RS has same group id as DS // In this scenario, the fake RS will not send back an ack so we expect // the add entry code (LDAP client code emulation) to be blocked for the // timeout value at least. If the time we have slept is lower, timeout // handling code is not working... assertTrue((endTime - startTime) >= TIMEOUT); assertBlockedLongerThanTimeout(startTime, endTime, TIMEOUT); assertTrue(replicationServer.isScenarioExecuted()); // Check monitoring values sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked DN baseDN = DN.decode(SAFE_READ_DN); new MonitorAssertions(baseDN) .assertValue("assured-sr-sent-updates", 1) @@ -1065,7 +976,7 @@ // No error should be seen in monitoring and update should have not been // sent in assured mode sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked DN baseDN = DN.decode(NOT_ASSURED_DN); new MonitorAssertions(baseDN).assertRemainingValuesAreZero(); assertNoServerErrors(baseDN); @@ -1117,7 +1028,7 @@ * Wait for connection to the fake replication server or times out with error * after some seconds */ private void waitForConnectionToRs(String testCase, FakeReplicationServer rs) private void waitForConnectionToRs(String testCase, FakeReplicationServer rs) throws Exception { int nsec = -1; do @@ -1125,7 +1036,7 @@ nsec++; if (nsec == 10) // 10 seconds timeout fail(testCase + ": timeout waiting for domain connection to fake RS after " + nsec + " seconds."); sleep(1000); Thread.sleep(1000); } while (!rs.isHandshakeOk()); } @@ -1133,7 +1044,7 @@ * Wait for the scenario to be executed by the fake replication server or * times out with error after some seconds */ private void waitForScenarioExecutedOnRs(String testCase, FakeReplicationServer rs) private void waitForScenarioExecutedOnRs(String testCase, FakeReplicationServer rs) throws Exception { int nsec = -1; do @@ -1141,7 +1052,7 @@ nsec++; if (nsec == 10) // 10 seconds timeout fail(testCase + ": timeout waiting for scenario to be exectued on fake RS after " + nsec + " seconds."); sleep(1000); Thread.sleep(1000); } while (!rs.isScenarioExecuted()); } @@ -1163,7 +1074,6 @@ @Test public void testSafeDataModeAck() throws Exception { int TIMEOUT = 5000; String testcase = "testSafeDataModeAck"; try @@ -1175,8 +1085,7 @@ replicationServer.start(NO_TIMEOUT_SCENARIO); // Create a safe data assured domain safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 2, TIMEOUT); safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 2, TIMEOUT); // Wait for connection of domain to RS waitForConnectionToRs(testcase, replicationServer); @@ -1187,16 +1096,11 @@ "objectClass: organizationalUnit\n"; addEntry(TestCaseUtils.entryFromLdifString(entry)); // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME // seconds, so we expect the add entry code (LDAP client code emulation) to be blocked // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value. long endTime = System.currentTimeMillis(); long callTime = endTime - startTime; assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT)); assertBlockedForLessThanTimeout(startTime, TIMEOUT); assertTrue(replicationServer.isScenarioExecuted()); // Check monitoring values sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked DN baseDN = DN.decode(SAFE_DATA_DN); new MonitorAssertions(baseDN) .assertValue("assured-sd-sent-updates", 1) @@ -1216,7 +1120,6 @@ @Test public void testSafeReadModeAck() throws Exception { int TIMEOUT = 5000; String testcase = "testSafeReadModeAck"; try @@ -1227,8 +1130,7 @@ replicationServer.start(NO_TIMEOUT_SCENARIO); // Create a safe read assured domain safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT); safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT); // Wait for connection of domain to RS waitForConnectionToRs(testcase, replicationServer); @@ -1239,16 +1141,11 @@ "objectClass: organizationalUnit\n"; addEntry(TestCaseUtils.entryFromLdifString(entry)); // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME // seconds, so we expect the add entry code (LDAP client code emulation) to be blocked // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value. long endTime = System.currentTimeMillis(); long callTime = endTime - startTime; assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT)); assertBlockedForLessThanTimeout(startTime, TIMEOUT); assertTrue(replicationServer.isScenarioExecuted()); // Check monitoring values sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked DN baseDN = DN.decode(SAFE_READ_DN); new MonitorAssertions(baseDN) .assertValue("assured-sr-sent-updates", 1) @@ -1268,7 +1165,6 @@ @Test(dataProvider = "rsGroupIdProvider", groups = "slow") public void testSafeReadModeReply(byte rsGroupId) throws Exception { int TIMEOUT = 5000; String testcase = "testSafeReadModeReply"; try @@ -1279,8 +1175,7 @@ replicationServer.start(NO_READ); // Create a safe read assured domain safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT); safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT); // Wait for connection of domain to RS waitForConnectionToRs(testcase, replicationServer); @@ -1356,7 +1251,6 @@ @Test(dataProvider = "rsGroupIdProvider", groups = "slow") public void testSafeDataModeReply(byte rsGroupId) throws Exception { int TIMEOUT = 5000; String testcase = "testSafeDataModeReply"; try @@ -1367,8 +1261,7 @@ replicationServer.start(NO_READ); // Create a safe data assured domain safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 4, TIMEOUT); safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 4, TIMEOUT); // Wait for connection of domain to RS waitForConnectionToRs(testcase, replicationServer); @@ -1380,19 +1273,14 @@ Entry entry = TestCaseUtils.entryFromLdifString(entryStr); String parentUid = getEntryUUID(DN.decode(SAFE_DATA_DN)); AckMsg ackMsg; try { ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid); } catch (SocketTimeoutException e) { // Expected return; } fail("DS should not reply an ack in safe data mode, however, it replied: " + ackMsg); } finally AckMsg ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid); fail("DS should not reply an ack in safe data mode, however, it replied: " + ackMsg); } catch (SocketTimeoutException expected) { return; } finally { endTest(testcase); } @@ -1405,7 +1293,6 @@ @Test(groups = "slow") public void testSafeDataManyErrors() throws Exception { int TIMEOUT = 5000; String testcase = "testSafeDataManyErrors"; try @@ -1417,8 +1304,7 @@ replicationServer.start(SAFE_DATA_MANY_ERRORS); // Create a safe data assured domain safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 3, TIMEOUT); safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 3, TIMEOUT); // Wait for connection of domain to RS waitForConnectionToRs(testcase, replicationServer); @@ -1430,18 +1316,13 @@ "objectClass: organizationalUnit\n"; addEntry(TestCaseUtils.entryFromLdifString(entry)); // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME // seconds, so we expect the add entry code (LDAP client code emulation) to be blocked // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value. long endTime = System.currentTimeMillis(); long callTime = endTime - startTime; assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT)); assertBlockedForLessThanTimeout(startTime, TIMEOUT); // Check monitoring values // The expected ack for the first update is: // - timeout error // - server 10 error sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked DN baseDN = DN.decode(SAFE_DATA_DN); new MonitorAssertions(baseDN) .assertValue("assured-sd-sent-updates", 1) @@ -1453,18 +1334,13 @@ startTime = System.currentTimeMillis(); // Time the update has been initiated deleteEntry(entryDn); // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME // seconds, so we expect the delete entry code (LDAP client code emulation) to be blocked // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value. endTime = System.currentTimeMillis(); callTime = endTime - startTime; assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT)); assertBlockedForLessThanTimeout(startTime, TIMEOUT); // Check monitoring values // The expected ack for the second update is: // - timeout error // - server 10 error, server 20 error sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked baseDN = DN.decode(SAFE_DATA_DN); new MonitorAssertions(baseDN) .assertValue("assured-sd-sent-updates", 2) @@ -1476,17 +1352,12 @@ startTime = System.currentTimeMillis(); // Time the update has been initiated addEntry(TestCaseUtils.entryFromLdifString(entry)); // In this scenario, the fake RS will not send back an ack so we expect // the add entry code (LDAP client code emulation) to be blocked for the // timeout value at least. If the time we have slept is lower, timeout // handling code is not working... endTime = System.currentTimeMillis(); assertTrue((endTime - startTime) >= TIMEOUT); assertBlockedLongerThanTimeout(startTime, System.currentTimeMillis(), TIMEOUT); assertTrue(replicationServer.isScenarioExecuted()); // Check monitoring values // No ack should have comen back, so timeout incremented (flag and error for rs) sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked baseDN = DN.decode(SAFE_DATA_DN); new MonitorAssertions(baseDN) .assertValue("assured-sd-sent-updates", 3) @@ -1494,7 +1365,6 @@ .assertRemainingValuesAreZero(); assertServerErrorsSafeDataMode(baseDN, entry(10, 2), entry(20, 1), entry(RS_SERVER_ID, 1)); } finally { endTest(testcase); @@ -1502,13 +1372,36 @@ } /** * In this scenario, the fake RS will not send back an ack so we expect the * add entry code (LDAP client code emulation) to be blocked for the timeout * value at least. If the time we have slept is lower, timeout handling code * is not working... */ private void assertBlockedLongerThanTimeout(long startTime, long endTime, int TIMEOUT) { assertTrue((endTime - startTime) >= TIMEOUT); } /** * In this scenario, the fake RS will send back an ack after * NO_TIMEOUT_RS_SLEEP_TIME seconds, so we expect the add/delete entry code * (LDAP client code emulation) to be blocked for more than * NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value. */ private void assertBlockedForLessThanTimeout(long startTime, int TIMEOUT) { long endTime = System.currentTimeMillis(); long callTime = endTime - startTime; assertTrue(NO_TIMEOUT_RS_SLEEP_TIME <= callTime && callTime <= TIMEOUT); } /** * DS performs many successive modifications in safe read mode and receives RS * acks with various errors. Check for monitoring right errors */ @Test(groups = "slow") public void testSafeReadManyErrors() throws Exception { int TIMEOUT = 5000; String testcase = "testSafeReadManyErrors"; try @@ -1519,8 +1412,7 @@ replicationServer.start(SAFE_READ_MANY_ERRORS); // Create a safe read assured domain safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT); safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT); // Wait for connection of domain to RS waitForConnectionToRs(testcase, replicationServer); @@ -1532,18 +1424,13 @@ "objectClass: organizationalUnit\n"; addEntry(TestCaseUtils.entryFromLdifString(entry)); // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME // seconds, so we expect the add entry code (LDAP client code emulation) to be blocked // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value. long endTime = System.currentTimeMillis(); long callTime = endTime - startTime; assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT)); assertBlockedForLessThanTimeout(startTime, TIMEOUT); // Check monitoring values // The expected ack for the first update is: // - replay error // - server 10 error, server 20 error sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked DN baseDN = DN.decode(SAFE_READ_DN); new MonitorAssertions(baseDN) .assertValue("assured-sr-sent-updates", 1) @@ -1556,12 +1443,7 @@ startTime = System.currentTimeMillis(); // Time the update has been initiated deleteEntry(entryDn); // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME // seconds, so we expect the delete entry code (LDAP client code emulation) to be blocked // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value. endTime = System.currentTimeMillis(); callTime = endTime - startTime; assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT)); assertBlockedForLessThanTimeout(startTime, TIMEOUT); // Check monitoring values // The expected ack for the second update is: @@ -1569,7 +1451,7 @@ // - wrong status error // - replay error // - server 10 error, server 20 error, server 30 error sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked new MonitorAssertions(baseDN) .assertValue("assured-sr-sent-updates", 2) .assertValue("assured-sr-not-acknowledged-updates", 2) @@ -1584,17 +1466,12 @@ startTime = System.currentTimeMillis(); // Time the update has been initiated addEntry(TestCaseUtils.entryFromLdifString(entry)); // In this scenario, the fake RS will not send back an ack so we expect // the add entry code (LDAP client code emulation) to be blocked for the // timeout value at least. If the time we have slept is lower, timeout // handling code is not working... endTime = System.currentTimeMillis(); assertTrue((endTime - startTime) >= TIMEOUT); assertBlockedLongerThanTimeout(startTime, System.currentTimeMillis(), TIMEOUT); assertTrue(replicationServer.isScenarioExecuted()); // Check monitoring values // No ack should have comen back, so timeout incremented (flag and error for rs) sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked new MonitorAssertions(baseDN) .assertValue("assured-sr-sent-updates", 3) .assertValue("assured-sr-not-acknowledged-updates", 3) @@ -1615,14 +1492,14 @@ */ private void deleteEntry(String dn) throws Exception { DN realDn = DN.decode(dn); DN realDN = DN.decode(dn); DeleteOperationBasis delOp = new DeleteOperationBasis(connection, InternalClientConnection.nextOperationID(), InternalClientConnection. nextMessageID(), null, realDn); InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(), null, realDN); delOp.setInternalOperation(true); delOp.run(); waitOpResult(delOp, ResultCode.SUCCESS); assertNull(DirectoryServer.getEntry(realDn)); assertNull(DirectoryServer.getEntry(realDN)); } /** @@ -1636,8 +1513,9 @@ AssuredMode assuredMode) throws Exception { // Find monitoring entry for requested base DN String monitorFilter = "(&(cn=Directory server*)(domain-name=" + baseDN + "))"; SearchFilter monitorFilter = SearchFilter.createFilterFromString( "(&(cn=Directory server*)(domain-name=" + baseDN + "))"); DN dn = DN.decode("cn=replication,cn=monitor"); InternalSearchOperation op; int count = 0; @@ -1645,19 +1523,14 @@ { if (count++>0) Thread.sleep(100); op = connection.processSearch( ByteString.valueOf("cn=replication,cn=monitor"), SearchScope.WHOLE_SUBTREE, LDAPFilter.decode(monitorFilter)); op = connection.processSearch(dn, SearchScope.WHOLE_SUBTREE, monitorFilter); } while (op.getSearchEntries().isEmpty() && (count<100)); if (op.getSearchEntries().isEmpty()) throw new Exception("Could not read monitoring information"); while (op.getSearchEntries().isEmpty() && count < 100); Assertions.assertThat(op.getSearchEntries()).isNotEmpty(); SearchResultEntry entry = op.getSearchEntries().getFirst(); if (entry == null) throw new Exception("Could not find monitoring entry"); assertNotNull(entry); /* * Find the multi valued attribute matching the requested assured mode @@ -1676,37 +1549,33 @@ } List<Attribute> attrs = entry.getAttribute(assuredAttr); if (attrs == null || attrs.isEmpty()) return Collections.emptyMap(); Map<Integer,Integer> resultMap = new HashMap<Integer,Integer>(); if ( (attrs == null) || (attrs.isEmpty()) ) return resultMap; // Empty map Attribute attr = attrs.get(0); // Parse and store values for (AttributeValue val : attr) { String srvStr = val.toString(); StringTokenizer strtok = new StringTokenizer(srvStr, ":"); String token = strtok.nextToken(); if (token != null) { int serverId = Integer.valueOf(token); token = strtok.nextToken(); if (token != null) { Integer nerrors = Integer.valueOf(token); resultMap.put(serverId, nerrors); Map<Integer,Integer> resultMap = new HashMap<Integer,Integer>(); for (AttributeValue val : attrs.get(0)) { StringTokenizer strtok = new StringTokenizer(val.toString(), ":"); String serverId = strtok.nextToken(); if (serverId != null) { String nbErrors = strtok.nextToken(); if (nbErrors != null) { resultMap.put(Integer.valueOf(serverId), Integer.valueOf(nbErrors)); } } } return resultMap; } private void waitOpResult(Operation operation, ResultCode expectedResult) private void waitOpResult(Operation operation, ResultCode expectedResult) throws Exception { int ii=0; while((operation.getResultCode()==ResultCode.UNDEFINED) || (operation.getResultCode()!=expectedResult)) { sleep(50); Thread.sleep(50); ii++; if (ii>10) assertEquals(operation.getResultCode(), expectedResult, @@ -1714,4 +1583,3 @@ } } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -35,6 +35,7 @@ import java.net.SocketTimeoutException; import java.util.*; import org.assertj.core.api.Assertions; import org.opends.server.TestCaseUtils; import org.opends.server.api.SynchronizationProvider; import org.opends.server.backends.task.TaskState; @@ -646,33 +647,23 @@ } /** * Chaining tests of the replication Server code with 2 replication servers involved * 2 tests are done here (itest=0 or itest=1) * * Test 1 * - Create replication server 1 * - Create replication server 2 connected with replication server 1 * - Create and connect client 1 to replication server 1 * - Create and connect client 2 to replication server 2 * - Make client1 publish changes * - Check that client 2 receives the changes published by client 1 * * Test 2 * - Create replication server 1 * - Create and connect client1 to replication server 1 * - Make client1 publish changes * - Create replication server 2 connected with replication server 1 * - Create and connect client 2 to replication server 2 * - Check that client 2 receives the changes published by client 1 * <ol> * <li>Create replication server 1</li> * <li>Create replication server 2 connected with replication server 1</li> * <li>Create and connect client 1 to replication server 1</li> * <li>Create and connect client 2 to replication server 2</li> * <li>Make client1 publish changes</li> * <li>Check that client 2 receives the changes published by client 1</li> * </ol> */ @Test(enabled=true, dependsOnMethods = { "searchBackend"}) public void changelogChaining() throws Exception public void changelogChaining0() throws Exception { debugInfo("Starting changelogChaining"); final String tn = "changelogChaining0"; debugInfo("Starting " + tn); replicationServer.clearDb(); TestCaseUtils.initializeTestBackend(true); for (int itest = 0; itest <2; itest++) { ReplicationBroker broker2 = null; boolean emptyOldChanges = true; @@ -683,13 +674,11 @@ int[] changelogIds = new int[] { 80, 81 }; int[] brokerIds = new int[] { 100, 101 }; for (int i = 0; i < ((itest == 0) ? 2 : 1); i++) for (int i = 0; i < 2; i++) { changelogs[i] = null; // for itest=0, create the 2 connected replicationServer // for itest=1, create the 1rst replicationServer, the second // one will be created later // create the 2 connected replicationServer SortedSet<String> servers = new TreeSet<String>(); servers.add( "localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0])); @@ -703,30 +692,118 @@ try { // For itest=0, create and connect client1 to changelog1 // and client2 to changelog2 // For itest=1, only create and connect client1 to changelog1 // client2 will be created later // create and connect client1 to changelog1 and client2 to changelog2 broker1 = openReplicationSession(TEST_ROOT_DN, brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges); assertTrue(broker1.isConnected()); if (itest == 0) { broker2 = openReplicationSession(TEST_ROOT_DN, brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges); assertTrue(broker2.isConnected()); } broker2 = openReplicationSession(TEST_ROOT_DN, brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges); assertTrue(broker2.isConnected()); // - Test messages between clients by publishing now // - Delete long time = TimeThread.getTime(); int ts = 1; CSN csn = new CSN(time, ts++, brokerIds[0]); CSNGenerator csnGen = new CSNGenerator(brokerIds[0], TimeThread.getTime()); DN dn = DN.decode("o=example" + 0 + "," + TEST_ROOT_DN_STRING); DeleteMsg delMsg = new DeleteMsg(dn, csnGen.newCSN(), "uid"); broker1.publish(delMsg); DN dn = DN.decode("o=example" + itest + "," + TEST_ROOT_DN_STRING); DeleteMsg delMsg = new DeleteMsg(dn, csn, "uid"); String user1entryUUID = "33333333-3333-3333-3333-333333333333"; String baseUUID = "22222222-2222-2222-2222-222222222222"; // - Add Entry entry = TestCaseUtils.entryFromLdifString( "dn: o=example," + TEST_ROOT_DN_STRING + "\n" + "objectClass: top\n" + "objectClass: domain\n" + "entryUUID: 11111111-1111-1111-1111-111111111111\n"); AddMsg addMsg = new AddMsg(csnGen.newCSN(), EXAMPLE_DN, user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry.getAttributes(), new ArrayList<Attribute>()); broker1.publish(addMsg); // - Modify Attribute attr1 = Attributes.create("description", "new value"); Modification mod1 = new Modification(ModificationType.REPLACE, attr1); List<Modification> mods = new ArrayList<Modification>(); mods.add(mod1); ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid"); broker1.publish(modMsg); // - ModifyDN ModifyDNOperationBasis op = new ModifyDNOperationBasis(connection, 1, 1, null, EXAMPLE_DN, RDN.decode("o=example2"), true, null); op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csnGen.newCSN(), "uniqueid", "newparentId")); LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op); ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp); broker1.publish(modDNMsg); // - Check msg receives by broker, through changeLog2 List<ReplicationMsg> msgs = receiveReplicationMsgs(broker2, 4); Assertions.assertThat(msgs).containsExactly(delMsg, addMsg, modMsg, modDNMsg); debugInfo("Ending " + tn); } finally { remove(changelogs); stop(broker1, broker2); } } } /** * <ol> * <li>Create replication server 1</li> * <li>Create and connect client1 to replication server 1</li> * <li>Make client1 publish changes</li> * <li>Create replication server 2 connected with replication server 1</li> * <li>Create and connect client 2 to replication server 2</li> * <li>Check that client 2 receives the changes published by client 1</li> * <ol> */ @Test(enabled=true, dependsOnMethods = { "searchBackend"}) public void changelogChaining1() throws Exception { final String tn = "changelogChaining1"; debugInfo("Starting " + tn); replicationServer.clearDb(); TestCaseUtils.initializeTestBackend(true); { ReplicationBroker broker2 = null; boolean emptyOldChanges = true; // - Create 2 connected replicationServer ReplicationServer[] changelogs = new ReplicationServer[2]; int[] changelogPorts = TestCaseUtils.findFreePorts(2); int[] changelogIds = new int[] { 80, 81 }; int[] brokerIds = new int[] { 100, 101 }; { // create the 1rst replicationServer, the second one will be created later SortedSet<String> servers = new TreeSet<String>(); servers.add("localhost:" + changelogPorts[1]); ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(changelogPorts[0], "replicationServerTestChangelogChainingDb"+0, 0, changelogIds[0], 0, 100, servers); changelogs[0] = new ReplicationServer(conf); } ReplicationBroker broker1 = null; try { // only create and connect client1 to changelog1 client2 will be created later broker1 = openReplicationSession(TEST_ROOT_DN, brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges); assertTrue(broker1.isConnected()); // - Test messages between clients by publishing now // - Delete CSNGenerator csnGen = new CSNGenerator(brokerIds[0], TimeThread.getTime()); DN dn = DN.decode("o=example" + 1 + "," + TEST_ROOT_DN_STRING); DeleteMsg delMsg = new DeleteMsg(dn, csnGen.newCSN(), "uid"); broker1.publish(delMsg); String user1entryUUID = "33333333-3333-3333-3333-333333333333"; @@ -737,8 +814,7 @@ + "objectClass: top\n" + "objectClass: domain\n" + "entryUUID: 11111111-1111-1111-1111-111111111111\n"; Entry entry = TestCaseUtils.entryFromLdifString(lentry); csn = new CSN(time, ts++, brokerIds[0]); AddMsg addMsg = new AddMsg(csn, EXAMPLE_DN, AddMsg addMsg = new AddMsg(csnGen.newCSN(), EXAMPLE_DN, user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry.getAttributes(), new ArrayList<Attribute>()); broker1.publish(addMsg); @@ -748,88 +824,32 @@ Modification mod1 = new Modification(ModificationType.REPLACE, attr1); List<Modification> mods = new ArrayList<Modification>(); mods.add(mod1); csn = new CSN(time, ts++, brokerIds[0]); ModifyMsg modMsg = new ModifyMsg(csn, EXAMPLE_DN, mods, "fakeuniqueid"); ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid"); broker1.publish(modMsg); // - ModifyDN csn = new CSN(time, ts++, brokerIds[0]); ModifyDNOperationBasis op = new ModifyDNOperationBasis(connection, 1, 1, null, EXAMPLE_DN, RDN.decode("o=example2"), true, null); op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csn, "uniqueid", "newparentId")); LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op); op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csnGen.newCSN(), "uniqueid", "newparentId")); LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op); ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp); broker1.publish(modDNMsg); if (itest > 0) { SortedSet<String> servers = new TreeSet<String>(); servers.add("localhost:"+changelogPorts[0]); ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(changelogPorts[1], null, 0, changelogIds[1], 0, 100, null); changelogs[1] = new ReplicationServer(conf); SortedSet<String> servers = new TreeSet<String>(); servers.add("localhost:"+changelogPorts[0]); ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration( changelogPorts[1], null, 0, changelogIds[1], 0, 100, null); changelogs[1] = new ReplicationServer(conf); // Connect broker 2 to changelog2 broker2 = openReplicationSession(TEST_ROOT_DN, brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges); assertTrue(broker2.isConnected()); } // Connect broker 2 to changelog2 broker2 = openReplicationSession(TEST_ROOT_DN, brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges); assertTrue(broker2.isConnected()); // - Check msg receives by broker, through changeLog2 while (ts > 1) { ReplicationMsg msg2; try { msg2 = broker2.receive(); if (msg2 == null) break; broker2.updateWindowAfterReplay(); } catch (Exception e) { fail("Broker receive failed: " + e.getMessage() + "#Msg:" + ts + "#itest:" + itest); break; } if (msg2 instanceof DeleteMsg) { if (delMsg.equals(msg2)) ts--; } else if (msg2 instanceof AddMsg) { if (addMsg.equals(msg2)) ts--; } else if (msg2 instanceof ModifyMsg) { if (modMsg.equals(msg2)) ts--; } else if (msg2 instanceof ModifyDNMsg) { if (modDNMsg.equals(msg2)) ts--; } else if (msg2 instanceof TopologyMsg) { // Nothing to test here. } else { fail("ReplicationServer transmission failed: no expected message" + " class: " + msg2); break; } } // Check that everything expected has been received assertEquals(ts, 1, "Broker2 did not receive the complete set of" + " expected messages: #msg received " + ts); debugInfo("Ending changelogChaining"); List<ReplicationMsg> msgs = receiveReplicationMsgs(broker2, 4); Assertions.assertThat(msgs).containsExactly(delMsg, addMsg, modMsg, modDNMsg); debugInfo("Ending " + tn); } finally { @@ -839,6 +859,30 @@ } } private List<ReplicationMsg> receiveReplicationMsgs(ReplicationBroker broker2, int nbMessagesExpected) { List<ReplicationMsg> msgs = new ArrayList<ReplicationMsg>(nbMessagesExpected); for (int i = 0; i < nbMessagesExpected; i++) { try { ReplicationMsg msg = broker2.receive(); if (msg == null) break; if (msg instanceof TopologyMsg) continue; // ignore msgs.add(msg); broker2.updateWindowAfterReplay(); } catch (SocketTimeoutException e) { fail("Broker receive failed: " + e.getMessage() + "#Msg:" + i); } } return msgs; } /** * Test that the Replication sends back correctly WindowsUpdate * when we send a WindowProbeMsg. @@ -859,7 +903,7 @@ * Some other tests may have been running before and therefore * may have pushed some changes to the Replication Server * When a new session is opened, the Replication Server is therefore * going to send all thoses old changes to this Replication Server. * going to send all these old changes to this Replication Server. * To avoid this, this test open a first session, save the * ServerState from the ReplicationServer, close the session * and re-open a new connection providing the ServerState it just @@ -1207,28 +1251,26 @@ private List<UpdateMsg> createChanges(String suffix, int serverId) throws Exception { List<UpdateMsg> l = new ArrayList<UpdateMsg>(); long time = TimeThread.getTime(); int ts = 1; { String user1entryUUID = "33333333-3333-3333-3333-333333333333"; String baseUUID = "22222222-2222-2222-2222-222222222222"; // - Add String lentry = "dn: "+suffix+"\n" Entry entry = TestCaseUtils.entryFromLdifString( "dn: "+suffix+"\n" + "objectClass: top\n" + "objectClass: domain\n" + "entryUUID: 11111111-1111-1111-1111-111111111111\n"; Entry entry = TestCaseUtils.entryFromLdifString(lentry); CSN csn = new CSN(time, ts++, serverId); + "entryUUID: 11111111-1111-1111-1111-111111111111\n"); CSNGenerator csnGen = new CSNGenerator(serverId, TimeThread.getTime()); DN exampleSuffixDN = DN.decode("o=example," + suffix); AddMsg addMsg = new AddMsg(csn, exampleSuffixDN, AddMsg addMsg = new AddMsg(csnGen.newCSN(), exampleSuffixDN, user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry.getAttributes(), new ArrayList<Attribute>()); l.add(addMsg); // - Add String luentry = Entry uentry = TestCaseUtils.entryFromLdifString( "dn: cn=Fiona Jensen,ou=People,"+suffix+"\n" + "objectClass: top\n" + "objectclass: person\n" @@ -1239,12 +1281,10 @@ + "givenName: fjensen\n" + "telephonenumber: +1 408 555 1212\n" + "entryUUID: " + user1entryUUID +"\n" + "userpassword: fjen$$en" + "\n"; Entry uentry = TestCaseUtils.entryFromLdifString(luentry); csn = new CSN(time, ts++, serverId); + "userpassword: fjen$$en" + "\n"); DN newPersonDN = DN.decode("uid=new person,ou=People,"+suffix); AddMsg addMsg2 = new AddMsg( csn, csnGen.newCSN(), newPersonDN, user1entryUUID, baseUUID, @@ -1263,22 +1303,18 @@ List<Modification> mods = Arrays.asList(mod1, mod2, mod3); csn = new CSN(time, ts++, serverId); DN dn = exampleSuffixDN; ModifyMsg modMsg = new ModifyMsg(csn, dn, mods, "fakeuniqueid"); ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), dn, mods, "fakeuniqueid"); l.add(modMsg); // Modify DN csn = new CSN(time, ts++, serverId); ModifyDNMsg modDnMsg = new ModifyDNMsg(newPersonDN, csn, ModifyDNMsg modDnMsg = new ModifyDNMsg(newPersonDN, csnGen.newCSN(), user1entryUUID, baseUUID, false, "uid=wrong, ou=people,"+suffix, "uid=newrdn"); l.add(modDnMsg); // Del csn = new CSN(time, ts++, serverId); DeleteMsg delMsg = new DeleteMsg(exampleSuffixDN, csn, "uid"); DeleteMsg delMsg = new DeleteMsg(exampleSuffixDN, csnGen.newCSN(), "uid"); l.add(delMsg); } return l; @@ -1567,8 +1603,8 @@ changelogs[i] = new ReplicationServer(conf); } try { try { // Create and connect client1 to changelog1 // and client2 to changelog2 broker1 = openReplicationSession(TEST_ROOT_DN, @@ -1580,9 +1616,7 @@ assertTrue(broker2.isConnected()); // - Test messages between clients by publishing now long time = TimeThread.getTime(); int ts = 1; CSN csn; CSNGenerator csnGen = new CSNGenerator(brokerIds[0], TimeThread.getTime()); String user1entryUUID = "33333333-3333-3333-3333-333333333333"; String baseUUID = "22222222-2222-2222-2222-222222222222"; @@ -1591,10 +1625,9 @@ + "objectClass: top\n" + "objectClass: domain\n" + "entryUUID: " + user1entryUUID + "\n"; Entry entry = TestCaseUtils.entryFromLdifString(lentry); csn = new CSN(time, ts++, brokerIds[0]); AddMsg addMsg = new AddMsg(csn, EXAMPLE_DN, user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry .getAttributes(), new ArrayList<Attribute>()); AddMsg addMsg = new AddMsg(csnGen.newCSN(), EXAMPLE_DN, user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry.getAttributes(), new ArrayList<Attribute>()); broker1.publish(addMsg); // - Modify @@ -1602,47 +1635,12 @@ Modification mod1 = new Modification(ModificationType.REPLACE, attr1); List<Modification> mods = new ArrayList<Modification>(); mods.add(mod1); csn = new CSN(time, ts++, brokerIds[0]); ModifyMsg modMsg = new ModifyMsg(csn, EXAMPLE_DN, mods, "fakeuniqueid"); ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid"); broker1.publish(modMsg); // - Check msg received by broker, through changeLog2 while (ts > 1) { ReplicationMsg msg2; try { msg2 = broker2.receive(); if (msg2 == null) break; broker2.updateWindowAfterReplay(); } catch (Exception e) { fail("Broker receive failed: " + e.getMessage() + "#Msg: " + ts); break; } if (msg2 instanceof AddMsg) { if (addMsg.equals(msg2)) ts--; } else if (msg2 instanceof ModifyMsg) { if (modMsg.equals(msg2)) ts--; } else { fail("ReplicationServer transmission failed: did not expect message of class: " + msg2); break; } } // Check that everything expected has been received assertEquals(ts, 1, "Broker2 did not receive the complete set of" + " expected messages: #msg received " + ts); List<ReplicationMsg> msgs = receiveReplicationMsgs(broker2, 2); Assertions.assertThat(msgs).containsExactly(addMsg, modMsg); // Then change the config to remove replicationServer[1] from // the configuration of replicationServer[0] @@ -1657,41 +1655,43 @@ // The link between RS[0] & RS[1] should be destroyed by the new configuration. // So we expect a timeout exception when calling receive on RS[1]. // Send an update and check that RS[1] does not receive the message after the timeout try { // - Del csn = new CSN(time, ts++, brokerIds[0]); DeleteMsg delMsg = new DeleteMsg(EXAMPLE_DN, csn, user1entryUUID); broker1.publish(delMsg); // Should receive some TopologyMsg messages for disconnection // between the 2 RSs ReplicationMsg msg = null; while (true) { msg = broker2.receive(); if (msg instanceof TopologyMsg) { debugInfo("Broker 2 received: " + msg); } else { fail("Broker: receive successed when it should fail. " + "This broker was disconnected by configuration." + " Received: " + msg); } } } catch (SocketTimeoutException soExc) { // the receive fail as expected debugInfo("Ending replicationServerConnected"); return; } } finally { // - Del DeleteMsg delMsg = new DeleteMsg(EXAMPLE_DN, csnGen.newCSN(), user1entryUUID); broker1.publish(delMsg); // Should receive some TopologyMsg messages for disconnection between the 2 RSs assertOnlyTopologyMsgsReceived(broker2); } finally { remove(changelogs); stop(broker1, broker2); } } } private void assertOnlyTopologyMsgsReceived(ReplicationBroker broker2) { try { while (true) { ReplicationMsg msg = broker2.receive(); if (msg instanceof TopologyMsg) { debugInfo("Broker 2 received: " + msg); } else { fail("Broker: receive successed when it should fail. " + "This broker was disconnected by configuration." + " Received: " + msg); } } } catch (SocketTimeoutException expected) { debugInfo("Ending replicationServerConnected"); } } }