From 9cdfa916fa7bdc0d4b56b8eddc5d623f5d4e2a95 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 26 Sep 2013 10:30:43 +0000
Subject: [PATCH] Code cleanup.
---
opends/tests/unit-tests-testng/src/server/org/opends/server/core/networkgroups/ResourceLimitsPolicyTest.java | 93 ++---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java | 368 +++++++---------------
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java | 428 +++++++++++++-------------
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 46 +-
4 files changed, 383 insertions(+), 552 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index f8d1c64..e4ab506 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -41,7 +41,7 @@
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
import org.opends.server.admin.server.ConfigurationChangeListener;
-import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
+import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.IsolationPolicy;
import org.opends.server.admin.std.server.ExternalChangelogDomainCfg;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.api.AlertGenerator;
@@ -62,7 +62,6 @@
import org.opends.server.protocols.ldap.LDAPModification;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
-import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.tasks.PurgeConflictsHistoricalTask;
import org.opends.server.tasks.TaskUtils;
@@ -71,7 +70,7 @@
import org.opends.server.util.LDIFReader;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
-import org.opends.server.workflowelement.localbackend.*;
+import org.opends.server.workflowelement.localbackend.LocalBackendModifyOperation;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.messages.ToolMessages.*;
@@ -436,7 +435,7 @@
*/
try
{
- if (buildAndPublishMissingChanges(startCSN, broker))
+ if (buildAndPublishMissingChanges(startCSN))
{
message = DEBUG_CHANGES_SENT.get();
logError(message);
@@ -1262,8 +1261,8 @@
break;
}
}
- boolean attributeToBeFiltered = (fractionalExclusive && found)
- || (!fractionalExclusive && !found);
+ boolean attributeToBeFiltered = fractionalExclusive && found
+ || !fractionalExclusive && !found;
if (attributeToBeFiltered
&& !newRdn.hasAttributeType(attributeType)
&& !modifyDNOperation.deleteOldRDN())
@@ -1437,7 +1436,7 @@
private static boolean isFractionalProhibited(AttributeType attrType)
{
String attributeName = attrType.getPrimaryName();
- return (attributeName != null && isFractionalProhibitedAttr(attributeName))
+ return attributeName != null && isFractionalProhibitedAttr(attributeName)
|| isFractionalProhibitedAttr(attrType.getOID());
}
@@ -1453,8 +1452,8 @@
// Now remove the attribute or modification if:
// - exclusive mode and attribute is in configuration
// - inclusive mode and attribute is not in configuration
- return (foundAttribute && fractionalExclusive)
- || (!foundAttribute && !fractionalExclusive);
+ return foundAttribute && fractionalExclusive
+ || !foundAttribute && !fractionalExclusive;
}
private static boolean contains(Set<String> fractionalConcernedAttributes,
@@ -1857,16 +1856,8 @@
// this policy imply that we always accept updates.
return true;
}
- if (isolationPolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES))
- {
- // this isolation policy specifies that the updates are denied
- // when the broker had problems during the connection phase
- // Updates are still accepted if the broker is currently connecting..
- return !hasConnectionError();
- }
- // we should never get there as the only possible policies are
- // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES
- return true;
+ return !isolationPolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES)
+ || !hasConnectionError();
}
@@ -2473,7 +2464,7 @@
op = msg.createOperation(conn);
dependency = remotePendingChanges.checkDependencies(op, msg);
- while (!dependency && !replayDone && (retryCount-- > 0))
+ while (!dependency && !replayDone && retryCount-- > 0)
{
if (shutdown.get())
{
@@ -2824,8 +2815,8 @@
for (Modification mod : mods)
{
AttributeType modAttrType = mod.getAttribute().getAttributeType();
- if ((mod.getModificationType() == ModificationType.DELETE
- || mod.getModificationType() == ModificationType.REPLACE)
+ if (mod.getModificationType() == ModificationType.DELETE
+ || mod.getModificationType() == ModificationType.REPLACE
&& currentRDN.hasAttributeType(modAttrType))
{
if (currentRDN.hasAttributeType(modAttrType))
@@ -4429,14 +4420,11 @@
*
* @param startCSN
* The CSN where we need to start the search
- * @param session
- * The session to use to publish the changes
* @return A boolean indicating he success of the operation.
* @throws Exception
* if an Exception happens during the search.
*/
- public boolean buildAndPublishMissingChanges(CSN startCSN,
- ReplicationBroker session) throws Exception
+ public boolean buildAndPublishMissingChanges(CSN startCSN) throws Exception
{
// Trim the changes in replayOperations that are older than the startCSN.
synchronized (replayOperations)
@@ -4494,7 +4482,7 @@
for (FakeOperation opToSend : opsToSend)
{
- session.publishRecovery(opToSend.generateMessage());
+ broker.publishRecovery(opToSend.generateMessage());
}
opsToSend.clear();
if (lastRetrievedChange != null)
@@ -5205,8 +5193,8 @@
return false;
// Compare modes
- if ((cfg1.isFractional() != cfg2.isFractional())
- || (cfg1.isFractionalExclusive() != cfg2.isFractionalExclusive()))
+ if (cfg1.isFractional() != cfg2.isFractional()
+ || cfg1.isFractionalExclusive() != cfg2.isFractionalExclusive())
return false;
// Compare all classes attributes
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/core/networkgroups/ResourceLimitsPolicyTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/core/networkgroups/ResourceLimitsPolicyTest.java
index c506257..9b9cfd5 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/core/networkgroups/ResourceLimitsPolicyTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/core/networkgroups/ResourceLimitsPolicyTest.java
@@ -23,11 +23,13 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS
*/
package org.opends.server.core.networkgroups;
import java.util.ArrayList;
import java.util.List;
+
import org.opends.messages.Message;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
@@ -35,18 +37,18 @@
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.types.DN;
+import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchScope;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
-
-/*
+/**
* This set of tests test the resource limits.
*/
+@SuppressWarnings("javadoc")
public class ResourceLimitsPolicyTest extends DirectoryServerTestCase {
//===========================================================================
//
@@ -60,8 +62,7 @@
* @throws Exception if the environment could not be set up.
*/
@BeforeClass
- public void setUp()
- throws Exception
+ public void setUp() throws Exception
{
// This test suite depends on having the schema available,
// so we'll start the server.
@@ -119,13 +120,11 @@
public void testMaxNumberOfConnections()
throws Exception
{
- ArrayList<Message> messages = new ArrayList<Message>();
+ List<Message> messages = new ArrayList<Message>();
- ResourceLimitsPolicyFactory factory =
- new ResourceLimitsPolicyFactory();
+ ResourceLimitsPolicyFactory factory = new ResourceLimitsPolicyFactory();
ResourceLimitsPolicy limits =
- factory
- .createQOSPolicy(new MockResourceLimitsQOSPolicyCfg()
+ factory.createQOSPolicy(new MockResourceLimitsQOSPolicyCfg()
{
@Override
@@ -139,17 +138,14 @@
InternalClientConnection conn1 = new InternalClientConnection(DN.NULL_DN);
limits.addConnection(conn1);
- boolean check = limits.isAllowed(conn1, null, true, messages);
- assertTrue(check);
+ assertTrue(limits.isAllowed(conn1, null, true, messages));
InternalClientConnection conn2 = new InternalClientConnection(DN.NULL_DN);
limits.addConnection(conn2);
- check = limits.isAllowed(conn2, null, true, messages);
- assertFalse(check);
+ assertFalse(limits.isAllowed(conn2, null, true, messages));
limits.removeConnection(conn1);
- check = limits.isAllowed(conn2, null, true, messages);
- assertTrue(check);
+ assertTrue(limits.isAllowed(conn2, null, true, messages));
limits.removeConnection(conn2);
}
@@ -162,13 +158,11 @@
public void testMaxNumberOfConnectionsFromSameIp()
throws Exception
{
- ArrayList<Message> messages = new ArrayList<Message>();
+ List<Message> messages = new ArrayList<Message>();
- ResourceLimitsPolicyFactory factory =
- new ResourceLimitsPolicyFactory();
+ ResourceLimitsPolicyFactory factory = new ResourceLimitsPolicyFactory();
ResourceLimitsPolicy limits =
- factory
- .createQOSPolicy(new MockResourceLimitsQOSPolicyCfg()
+ factory.createQOSPolicy(new MockResourceLimitsQOSPolicyCfg()
{
@Override
@@ -182,17 +176,14 @@
InternalClientConnection conn1 = new InternalClientConnection(DN.NULL_DN);
limits.addConnection(conn1);
- boolean check = limits.isAllowed(conn1, null, true, messages);
- assertTrue(check);
+ assertTrue(limits.isAllowed(conn1, null, true, messages));
InternalClientConnection conn2 = new InternalClientConnection(DN.NULL_DN);
limits.addConnection(conn2);
- check = limits.isAllowed(conn2, null, true, messages);
- assertFalse(check);
+ assertFalse(limits.isAllowed(conn2, null, true, messages));
limits.removeConnection(conn1);
- check = limits.isAllowed(conn2, null, true, messages);
- assertTrue(check);
+ assertTrue(limits.isAllowed(conn2, null, true, messages));
limits.removeConnection(conn2);
}
@@ -213,11 +204,9 @@
{
List<Message> messages = new ArrayList<Message>();
- ResourceLimitsPolicyFactory factory =
- new ResourceLimitsPolicyFactory();
+ ResourceLimitsPolicyFactory factory = new ResourceLimitsPolicyFactory();
ResourceLimitsPolicy limits =
- factory
- .createQOSPolicy(new MockResourceLimitsQOSPolicyCfg()
+ factory.createQOSPolicy(new MockResourceLimitsQOSPolicyCfg()
{
@Override
@@ -236,12 +225,7 @@
SearchScope.BASE_OBJECT,
LDAPFilter.decode(searchFilter).toSearchFilter());
- boolean check = limits.isAllowed(conn1, search, true, messages);
- if (success) {
- assertTrue(check);
- } else {
- assertFalse(check);
- }
+ assertEquals(limits.isAllowed(conn1, search, true, messages), success);
limits.removeConnection(conn1);
}
@@ -254,7 +238,7 @@
public void testMaxThroughput()
throws Exception
{
- ArrayList<Message> messages = new ArrayList<Message>();
+ List<Message> messages = new ArrayList<Message>();
final long interval = 1000; // Unit is milliseconds
ResourceLimitsPolicyFactory factory = new ResourceLimitsPolicyFactory();
@@ -276,35 +260,26 @@
InternalClientConnection conn = new InternalClientConnection(DN.NULL_DN);
limits.addConnection(conn);
- InternalSearchOperation search1 = conn.processSearch(
- DN.decode("dc=example,dc=com"),
- SearchScope.BASE_OBJECT,
- LDAPFilter.decode("(objectclass=*)").toSearchFilter());
+ final DN dn = DN.decode("dc=example,dc=com");
+ final SearchFilter all = SearchFilter.createFilterFromString("(objectclass=*)");
// First operation is allowed
- boolean check = limits.isAllowed(conn, search1, true, messages);
- assertTrue(check);
-
- InternalSearchOperation search2 = conn.processSearch(
- DN.decode("dc=example,dc=com"),
- SearchScope.BASE_OBJECT,
- LDAPFilter.decode("(objectclass=*)").toSearchFilter());
+ InternalSearchOperation search1 =
+ conn.processSearch(dn, SearchScope.BASE_OBJECT, all);
+ assertTrue(limits.isAllowed(conn, search1, true, messages));
// Second operation in the same interval is refused
- check = limits.isAllowed(conn, search2, true, messages);
- assertFalse(check);
+ InternalSearchOperation search2 =
+ conn.processSearch(dn, SearchScope.BASE_OBJECT, all);
+ assertFalse(limits.isAllowed(conn, search2, true, messages));
// Wait for the end of the interval => counters are reset
Thread.sleep(interval);
- InternalSearchOperation search3 = conn.processSearch(
- DN.decode("dc=example,dc=com"),
- SearchScope.BASE_OBJECT,
- LDAPFilter.decode("(objectclass=*)").toSearchFilter());
-
// The operation is allowed
- check = limits.isAllowed(conn, search3, true, messages);
- assertTrue(check);
+ InternalSearchOperation search3 =
+ conn.processSearch(dn, SearchScope.BASE_OBJECT, all);
+ assertTrue(limits.isAllowed(conn, search3, true, messages));
}
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
index c4d102e..b2d8ac5 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -46,7 +46,6 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
-import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
@@ -462,9 +461,8 @@
* Handle the handshake processing with the connecting DS
* returns true if handshake was performed without errors
*/
- private boolean performHandshake()
+ private boolean performHandshake() throws Exception
{
- try
{
// Receive server start
ServerStartMsg serverStartMsg = (ServerStartMsg) session.receive();
@@ -520,17 +518,6 @@
TopologyMsg topologyMsg = new TopologyMsg(new ArrayList<DSInfo>(),
rsList);
session.publish(topologyMsg);
-
- } catch (IOException e)
- {
- fail("Unexpected io exception in fake replication server handshake " +
- "processing: " + e);
- return false;
- } catch (Exception e)
- {
- fail("Unexpected exception in fake replication server handshake " +
- "processing: " + e);
- return false;
}
return true;
}
@@ -558,7 +545,7 @@
/**
* Handle client connection then call code specific to configured test
*/
- private void handleClientConnection()
+ private void handleClientConnection() throws Exception
{
debugInfo("handleClientConnection " + testcase + " " + scenario);
// Handle DS connection
@@ -610,13 +597,12 @@
debugInfo("handleClientConnection " + testcase + " " + scenario + " done");
}
- /*
+ /**
* Make the RS send an add message with the passed entry and return the ack
* message it receives from the DS
*/
- private AckMsg sendAssuredAddMsg(Entry entry, String parentUid) throws SocketTimeoutException
+ private AckMsg sendAssuredAddMsg(Entry entry, String parentUid) throws Exception
{
- try
{
AddMsg addMsg =
new AddMsg(gen.newCSN(), entry.getDN(), UUID.randomUUID().toString(),
@@ -632,86 +618,51 @@
// Read and return matching ack
return (AckMsg)session.receive();
-
- } catch(SocketTimeoutException e)
- {
- throw e;
- } catch (Throwable t)
- {
- fail("Unexpected exception in fake replication server sendAddUpdate " +
- "processing: " + t);
- return null;
}
}
/**
* Read the coming update and check parameters are not assured
*/
- private void executeNotAssuredScenario()
+ private void executeNotAssuredScenario() throws Exception
{
+ UpdateMsg updateMsg = (UpdateMsg) session.receive();
+ checkUpdateAssuredParameters(updateMsg);
- try
- {
- UpdateMsg updateMsg = (UpdateMsg) session.receive();
- checkUpdateAssuredParameters(updateMsg);
-
- scenarioExecuted = true;
- } catch (Exception e)
- {
- fail("Unexpected exception in fake replication server executeNotAssuredScenario " +
- "processing: " + e);
- }
+ scenarioExecuted = true;
}
/**
* Read the coming update and make the client time out by not sending back
* the ack
*/
- private void executeTimeoutScenario()
+ private void executeTimeoutScenario() throws Exception
{
+ UpdateMsg updateMsg = (UpdateMsg) session.receive();
+ checkUpdateAssuredParameters(updateMsg);
- try
- {
- UpdateMsg updateMsg = (UpdateMsg) session.receive();
- checkUpdateAssuredParameters(updateMsg);
+ scenarioExecuted = true;
- scenarioExecuted = true;
-
- // We do not send back an ack and the client code is expected to be
- // blocked at least for the programmed timeout time.
-
- } catch (Exception e)
- {
- fail("Unexpected exception in fake replication server executeTimeoutScenario " +
- "processing: " + e + " testcase= " + testcase +
- " groupId=" + groupId);
- }
+ // We do not send back an ack and the client code is expected to be
+ // blocked at least for the programmed timeout time.
}
/**
* Read the coming update, sleep some time then send back an ack
*/
- private void executeNoTimeoutScenario()
+ private void executeNoTimeoutScenario() throws Exception
{
- try
- {
- UpdateMsg updateMsg = (UpdateMsg) session.receive();
- checkUpdateAssuredParameters(updateMsg);
+ UpdateMsg updateMsg = (UpdateMsg) session.receive();
+ checkUpdateAssuredParameters(updateMsg);
- // Sleep before sending back the ack
- sleep(NO_TIMEOUT_RS_SLEEP_TIME);
+ // Sleep before sending back the ack
+ sleep(NO_TIMEOUT_RS_SLEEP_TIME);
- // Send the ack without errors
- AckMsg ackMsg = new AckMsg(updateMsg.getCSN());
- session.publish(ackMsg);
+ // Send the ack without errors
+ AckMsg ackMsg = new AckMsg(updateMsg.getCSN());
+ session.publish(ackMsg);
- scenarioExecuted = true;
-
- } catch (Exception e)
- {
- fail("Unexpected exception in fake replication server executeNoTimeoutScenario " +
- "processing: " + e);
- }
+ scenarioExecuted = true;
}
/**
@@ -733,9 +684,8 @@
/**
* Read the coming safe read mode updates and send back acks with errors
*/
- private void executeSafeReadManyErrorsScenario()
+ private void executeSafeReadManyErrorsScenario() throws Exception
{
- try
{
// Read first update
UpdateMsg updateMsg = (UpdateMsg) session.receive();
@@ -779,20 +729,14 @@
// let timeout occur
scenarioExecuted = true;
-
- } catch (Exception e)
- {
- fail("Unexpected exception in fake replication server executeSafeReadManyErrorsScenario " +
- "processing: " + e);
}
}
/**
- * Read the coming seaf data mode updates and send back acks with errors
+ * Read the coming safe data mode updates and send back acks with errors
*/
- private void executeSafeDataManyErrorsScenario()
+ private void executeSafeDataManyErrorsScenario() throws Exception
{
- try
{
// Read first update
UpdateMsg updateMsg = (UpdateMsg) session.receive();
@@ -830,32 +774,12 @@
checkUpdateAssuredParameters(updateMsg);
// let timeout occur
-
scenarioExecuted = true;
-
- } catch (Exception e)
- {
- fail("Unexpected exception in fake replication server executeSafeDataManyErrorsScenario " +
- "processing: " + e);
}
}
}
/**
- * Sleep a while
- */
- private void sleep(long time)
- {
- try
- {
- Thread.sleep(time);
- } catch (InterruptedException ex)
- {
- fail("Error sleeping " + ex);
- }
- }
-
- /**
* Return various group id values
*/
@DataProvider(name = "rsGroupIdProvider")
@@ -877,7 +801,6 @@
@Test(dataProvider = "rsGroupIdProvider")
public void testSafeDataModeTimeout(byte rsGroupId) throws Exception
{
-
int TIMEOUT = 5000;
String testcase = "testSafeDataModeTimeout" + rsGroupId;
try
@@ -894,8 +817,7 @@
// Create a safe data assured domain
if (rsGroupId == (byte)1)
{
- safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 1,
- TIMEOUT);
+ safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 1, TIMEOUT);
// Wait for connection of domain to RS
waitForConnectionToRs(testcase, replicationServer);
@@ -926,15 +848,11 @@
if (rsGroupId == (byte)1)
{
// RS has same group id as DS
- // In this scenario, the fake RS will not send back an ack so we expect
- // the add entry code (LDAP client code emulation) to be blocked for the
- // timeout value at least. If the time we have slept is lower, timeout
- // handling code is not working...
- assertTrue((endTime - startTime) >= TIMEOUT);
+ assertBlockedLongerThanTimeout(startTime, endTime, TIMEOUT);
assertTrue(replicationServer.isScenarioExecuted());
// Check monitoring values
- sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+ Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
DN baseDN = DN.decode(SAFE_DATA_DN);
new MonitorAssertions(baseDN)
.assertValue("assured-sd-sent-updates", 1)
@@ -948,7 +866,7 @@
// No error should be seen in monitoring and update should have not been
// sent in assured mode
- sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+ Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
DN baseDN = DN.decode(NOT_ASSURED_DN);
new MonitorAssertions(baseDN).assertRemainingValuesAreZero();
assertNoServerErrors(baseDN);
@@ -992,7 +910,6 @@
@Test(dataProvider = "rsGroupIdProvider")
public void testSafeReadModeTimeout(byte rsGroupId) throws Exception
{
-
int TIMEOUT = 5000;
String testcase = "testSafeReadModeTimeout" + rsGroupId;
try
@@ -1010,8 +927,7 @@
if (rsGroupId == (byte)1)
{
// Create a safe read assured domain
- safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0,
- TIMEOUT);
+ safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT);
// Wait for connection of domain to RS
waitForConnectionToRs(testcase, replicationServer);
@@ -1041,16 +957,11 @@
if (rsGroupId == (byte)1)
{
- // RS has same group id as DS
- // In this scenario, the fake RS will not send back an ack so we expect
- // the add entry code (LDAP client code emulation) to be blocked for the
- // timeout value at least. If the time we have slept is lower, timeout
- // handling code is not working...
- assertTrue((endTime - startTime) >= TIMEOUT);
+ assertBlockedLongerThanTimeout(startTime, endTime, TIMEOUT);
assertTrue(replicationServer.isScenarioExecuted());
// Check monitoring values
- sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+ Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
DN baseDN = DN.decode(SAFE_READ_DN);
new MonitorAssertions(baseDN)
.assertValue("assured-sr-sent-updates", 1)
@@ -1065,7 +976,7 @@
// No error should be seen in monitoring and update should have not been
// sent in assured mode
- sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+ Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
DN baseDN = DN.decode(NOT_ASSURED_DN);
new MonitorAssertions(baseDN).assertRemainingValuesAreZero();
assertNoServerErrors(baseDN);
@@ -1117,7 +1028,7 @@
* Wait for connection to the fake replication server or times out with error
* after some seconds
*/
- private void waitForConnectionToRs(String testCase, FakeReplicationServer rs)
+ private void waitForConnectionToRs(String testCase, FakeReplicationServer rs) throws Exception
{
int nsec = -1;
do
@@ -1125,7 +1036,7 @@
nsec++;
if (nsec == 10) // 10 seconds timeout
fail(testCase + ": timeout waiting for domain connection to fake RS after " + nsec + " seconds.");
- sleep(1000);
+ Thread.sleep(1000);
} while (!rs.isHandshakeOk());
}
@@ -1133,7 +1044,7 @@
* Wait for the scenario to be executed by the fake replication server or
* times out with error after some seconds
*/
- private void waitForScenarioExecutedOnRs(String testCase, FakeReplicationServer rs)
+ private void waitForScenarioExecutedOnRs(String testCase, FakeReplicationServer rs) throws Exception
{
int nsec = -1;
do
@@ -1141,7 +1052,7 @@
nsec++;
if (nsec == 10) // 10 seconds timeout
fail(testCase + ": timeout waiting for scenario to be exectued on fake RS after " + nsec + " seconds.");
- sleep(1000);
+ Thread.sleep(1000);
} while (!rs.isScenarioExecuted());
}
@@ -1163,7 +1074,6 @@
@Test
public void testSafeDataModeAck() throws Exception
{
-
int TIMEOUT = 5000;
String testcase = "testSafeDataModeAck";
try
@@ -1175,8 +1085,7 @@
replicationServer.start(NO_TIMEOUT_SCENARIO);
// Create a safe data assured domain
- safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 2,
- TIMEOUT);
+ safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 2, TIMEOUT);
// Wait for connection of domain to RS
waitForConnectionToRs(testcase, replicationServer);
@@ -1187,16 +1096,11 @@
"objectClass: organizationalUnit\n";
addEntry(TestCaseUtils.entryFromLdifString(entry));
- // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME
- // seconds, so we expect the add entry code (LDAP client code emulation) to be blocked
- // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
- long endTime = System.currentTimeMillis();
- long callTime = endTime - startTime;
- assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT));
+ assertBlockedForLessThanTimeout(startTime, TIMEOUT);
assertTrue(replicationServer.isScenarioExecuted());
// Check monitoring values
- sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+ Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
DN baseDN = DN.decode(SAFE_DATA_DN);
new MonitorAssertions(baseDN)
.assertValue("assured-sd-sent-updates", 1)
@@ -1216,7 +1120,6 @@
@Test
public void testSafeReadModeAck() throws Exception
{
-
int TIMEOUT = 5000;
String testcase = "testSafeReadModeAck";
try
@@ -1227,8 +1130,7 @@
replicationServer.start(NO_TIMEOUT_SCENARIO);
// Create a safe read assured domain
- safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0,
- TIMEOUT);
+ safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT);
// Wait for connection of domain to RS
waitForConnectionToRs(testcase, replicationServer);
@@ -1239,16 +1141,11 @@
"objectClass: organizationalUnit\n";
addEntry(TestCaseUtils.entryFromLdifString(entry));
- // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME
- // seconds, so we expect the add entry code (LDAP client code emulation) to be blocked
- // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
- long endTime = System.currentTimeMillis();
- long callTime = endTime - startTime;
- assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT));
+ assertBlockedForLessThanTimeout(startTime, TIMEOUT);
assertTrue(replicationServer.isScenarioExecuted());
// Check monitoring values
- sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+ Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
DN baseDN = DN.decode(SAFE_READ_DN);
new MonitorAssertions(baseDN)
.assertValue("assured-sr-sent-updates", 1)
@@ -1268,7 +1165,6 @@
@Test(dataProvider = "rsGroupIdProvider", groups = "slow")
public void testSafeReadModeReply(byte rsGroupId) throws Exception
{
-
int TIMEOUT = 5000;
String testcase = "testSafeReadModeReply";
try
@@ -1279,8 +1175,7 @@
replicationServer.start(NO_READ);
// Create a safe read assured domain
- safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0,
- TIMEOUT);
+ safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT);
// Wait for connection of domain to RS
waitForConnectionToRs(testcase, replicationServer);
@@ -1356,7 +1251,6 @@
@Test(dataProvider = "rsGroupIdProvider", groups = "slow")
public void testSafeDataModeReply(byte rsGroupId) throws Exception
{
-
int TIMEOUT = 5000;
String testcase = "testSafeDataModeReply";
try
@@ -1367,8 +1261,7 @@
replicationServer.start(NO_READ);
// Create a safe data assured domain
- safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 4,
- TIMEOUT);
+ safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 4, TIMEOUT);
// Wait for connection of domain to RS
waitForConnectionToRs(testcase, replicationServer);
@@ -1380,19 +1273,14 @@
Entry entry = TestCaseUtils.entryFromLdifString(entryStr);
String parentUid = getEntryUUID(DN.decode(SAFE_DATA_DN));
- AckMsg ackMsg;
- try
- {
- ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid);
- } catch (SocketTimeoutException e)
- {
- // Expected
- return;
- }
-
- fail("DS should not reply an ack in safe data mode, however, it replied: " +
- ackMsg);
- } finally
+ AckMsg ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid);
+ fail("DS should not reply an ack in safe data mode, however, it replied: " + ackMsg);
+ }
+ catch (SocketTimeoutException expected)
+ {
+ return;
+ }
+ finally
{
endTest(testcase);
}
@@ -1405,7 +1293,6 @@
@Test(groups = "slow")
public void testSafeDataManyErrors() throws Exception
{
-
int TIMEOUT = 5000;
String testcase = "testSafeDataManyErrors";
try
@@ -1417,8 +1304,7 @@
replicationServer.start(SAFE_DATA_MANY_ERRORS);
// Create a safe data assured domain
- safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 3,
- TIMEOUT);
+ safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 3, TIMEOUT);
// Wait for connection of domain to RS
waitForConnectionToRs(testcase, replicationServer);
@@ -1430,18 +1316,13 @@
"objectClass: organizationalUnit\n";
addEntry(TestCaseUtils.entryFromLdifString(entry));
- // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME
- // seconds, so we expect the add entry code (LDAP client code emulation) to be blocked
- // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
- long endTime = System.currentTimeMillis();
- long callTime = endTime - startTime;
- assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT));
+ assertBlockedForLessThanTimeout(startTime, TIMEOUT);
// Check monitoring values
// The expected ack for the first update is:
// - timeout error
// - server 10 error
- sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+ Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
DN baseDN = DN.decode(SAFE_DATA_DN);
new MonitorAssertions(baseDN)
.assertValue("assured-sd-sent-updates", 1)
@@ -1453,18 +1334,13 @@
startTime = System.currentTimeMillis(); // Time the update has been initiated
deleteEntry(entryDn);
- // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME
- // seconds, so we expect the delete entry code (LDAP client code emulation) to be blocked
- // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
- endTime = System.currentTimeMillis();
- callTime = endTime - startTime;
- assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT));
+ assertBlockedForLessThanTimeout(startTime, TIMEOUT);
// Check monitoring values
// The expected ack for the second update is:
// - timeout error
// - server 10 error, server 20 error
- sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+ Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
baseDN = DN.decode(SAFE_DATA_DN);
new MonitorAssertions(baseDN)
.assertValue("assured-sd-sent-updates", 2)
@@ -1476,17 +1352,12 @@
startTime = System.currentTimeMillis(); // Time the update has been initiated
addEntry(TestCaseUtils.entryFromLdifString(entry));
- // In this scenario, the fake RS will not send back an ack so we expect
- // the add entry code (LDAP client code emulation) to be blocked for the
- // timeout value at least. If the time we have slept is lower, timeout
- // handling code is not working...
- endTime = System.currentTimeMillis();
- assertTrue((endTime - startTime) >= TIMEOUT);
+ assertBlockedLongerThanTimeout(startTime, System.currentTimeMillis(), TIMEOUT);
assertTrue(replicationServer.isScenarioExecuted());
// Check monitoring values
// No ack should have comen back, so timeout incremented (flag and error for rs)
- sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+ Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
baseDN = DN.decode(SAFE_DATA_DN);
new MonitorAssertions(baseDN)
.assertValue("assured-sd-sent-updates", 3)
@@ -1494,7 +1365,6 @@
.assertRemainingValuesAreZero();
assertServerErrorsSafeDataMode(baseDN,
entry(10, 2), entry(20, 1), entry(RS_SERVER_ID, 1));
-
} finally
{
endTest(testcase);
@@ -1502,13 +1372,36 @@
}
/**
+ * In this scenario, the fake RS will not send back an ack so we expect the
+ * add entry code (LDAP client code emulation) to be blocked for the timeout
+ * value at least. If the time we have slept is lower, timeout handling code
+ * is not working...
+ */
+ private void assertBlockedLongerThanTimeout(long startTime, long endTime, int TIMEOUT)
+ {
+ assertTrue((endTime - startTime) >= TIMEOUT);
+ }
+
+ /**
+ * In this scenario, the fake RS will send back an ack after
+ * NO_TIMEOUT_RS_SLEEP_TIME seconds, so we expect the add/delete entry code
+ * (LDAP client code emulation) to be blocked for more than
+ * NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
+ */
+ private void assertBlockedForLessThanTimeout(long startTime, int TIMEOUT)
+ {
+ long endTime = System.currentTimeMillis();
+ long callTime = endTime - startTime;
+ assertTrue(NO_TIMEOUT_RS_SLEEP_TIME <= callTime && callTime <= TIMEOUT);
+ }
+
+ /**
* DS performs many successive modifications in safe read mode and receives RS
* acks with various errors. Check for monitoring right errors
*/
@Test(groups = "slow")
public void testSafeReadManyErrors() throws Exception
{
-
int TIMEOUT = 5000;
String testcase = "testSafeReadManyErrors";
try
@@ -1519,8 +1412,7 @@
replicationServer.start(SAFE_READ_MANY_ERRORS);
// Create a safe read assured domain
- safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0,
- TIMEOUT);
+ safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT);
// Wait for connection of domain to RS
waitForConnectionToRs(testcase, replicationServer);
@@ -1532,18 +1424,13 @@
"objectClass: organizationalUnit\n";
addEntry(TestCaseUtils.entryFromLdifString(entry));
- // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME
- // seconds, so we expect the add entry code (LDAP client code emulation) to be blocked
- // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
- long endTime = System.currentTimeMillis();
- long callTime = endTime - startTime;
- assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT));
+ assertBlockedForLessThanTimeout(startTime, TIMEOUT);
// Check monitoring values
// The expected ack for the first update is:
// - replay error
// - server 10 error, server 20 error
- sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+ Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
DN baseDN = DN.decode(SAFE_READ_DN);
new MonitorAssertions(baseDN)
.assertValue("assured-sr-sent-updates", 1)
@@ -1556,12 +1443,7 @@
startTime = System.currentTimeMillis(); // Time the update has been initiated
deleteEntry(entryDn);
- // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME
- // seconds, so we expect the delete entry code (LDAP client code emulation) to be blocked
- // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
- endTime = System.currentTimeMillis();
- callTime = endTime - startTime;
- assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT));
+ assertBlockedForLessThanTimeout(startTime, TIMEOUT);
// Check monitoring values
// The expected ack for the second update is:
@@ -1569,7 +1451,7 @@
// - wrong status error
// - replay error
// - server 10 error, server 20 error, server 30 error
- sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+ Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
new MonitorAssertions(baseDN)
.assertValue("assured-sr-sent-updates", 2)
.assertValue("assured-sr-not-acknowledged-updates", 2)
@@ -1584,17 +1466,12 @@
startTime = System.currentTimeMillis(); // Time the update has been initiated
addEntry(TestCaseUtils.entryFromLdifString(entry));
- // In this scenario, the fake RS will not send back an ack so we expect
- // the add entry code (LDAP client code emulation) to be blocked for the
- // timeout value at least. If the time we have slept is lower, timeout
- // handling code is not working...
- endTime = System.currentTimeMillis();
- assertTrue((endTime - startTime) >= TIMEOUT);
+ assertBlockedLongerThanTimeout(startTime, System.currentTimeMillis(), TIMEOUT);
assertTrue(replicationServer.isScenarioExecuted());
// Check monitoring values
// No ack should have comen back, so timeout incremented (flag and error for rs)
- sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+ Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
new MonitorAssertions(baseDN)
.assertValue("assured-sr-sent-updates", 3)
.assertValue("assured-sr-not-acknowledged-updates", 3)
@@ -1615,14 +1492,14 @@
*/
private void deleteEntry(String dn) throws Exception
{
- DN realDn = DN.decode(dn);
+ DN realDN = DN.decode(dn);
DeleteOperationBasis delOp = new DeleteOperationBasis(connection,
- InternalClientConnection.nextOperationID(), InternalClientConnection.
- nextMessageID(), null, realDn);
+ InternalClientConnection.nextOperationID(),
+ InternalClientConnection.nextMessageID(), null, realDN);
delOp.setInternalOperation(true);
delOp.run();
waitOpResult(delOp, ResultCode.SUCCESS);
- assertNull(DirectoryServer.getEntry(realDn));
+ assertNull(DirectoryServer.getEntry(realDN));
}
/**
@@ -1636,8 +1513,9 @@
AssuredMode assuredMode) throws Exception
{
// Find monitoring entry for requested base DN
- String monitorFilter =
- "(&(cn=Directory server*)(domain-name=" + baseDN + "))";
+ SearchFilter monitorFilter = SearchFilter.createFilterFromString(
+ "(&(cn=Directory server*)(domain-name=" + baseDN + "))");
+ DN dn = DN.decode("cn=replication,cn=monitor");
InternalSearchOperation op;
int count = 0;
@@ -1645,19 +1523,14 @@
{
if (count++>0)
Thread.sleep(100);
- op = connection.processSearch(
- ByteString.valueOf("cn=replication,cn=monitor"),
- SearchScope.WHOLE_SUBTREE,
- LDAPFilter.decode(monitorFilter));
+ op = connection.processSearch(dn, SearchScope.WHOLE_SUBTREE, monitorFilter);
}
- while (op.getSearchEntries().isEmpty() && (count<100));
- if (op.getSearchEntries().isEmpty())
- throw new Exception("Could not read monitoring information");
+ while (op.getSearchEntries().isEmpty() && count < 100);
+
+ Assertions.assertThat(op.getSearchEntries()).isNotEmpty();
SearchResultEntry entry = op.getSearchEntries().getFirst();
-
- if (entry == null)
- throw new Exception("Could not find monitoring entry");
+ assertNotNull(entry);
/*
* Find the multi valued attribute matching the requested assured mode
@@ -1676,37 +1549,33 @@
}
List<Attribute> attrs = entry.getAttribute(assuredAttr);
+ if (attrs == null || attrs.isEmpty())
+ return Collections.emptyMap();
- Map<Integer,Integer> resultMap = new HashMap<Integer,Integer>();
- if ( (attrs == null) || (attrs.isEmpty()) )
- return resultMap; // Empty map
-
- Attribute attr = attrs.get(0);
// Parse and store values
- for (AttributeValue val : attr) {
- String srvStr = val.toString();
- StringTokenizer strtok = new StringTokenizer(srvStr, ":");
- String token = strtok.nextToken();
- if (token != null) {
- int serverId = Integer.valueOf(token);
- token = strtok.nextToken();
- if (token != null) {
- Integer nerrors = Integer.valueOf(token);
- resultMap.put(serverId, nerrors);
+ Map<Integer,Integer> resultMap = new HashMap<Integer,Integer>();
+ for (AttributeValue val : attrs.get(0))
+ {
+ StringTokenizer strtok = new StringTokenizer(val.toString(), ":");
+
+ String serverId = strtok.nextToken();
+ if (serverId != null) {
+ String nbErrors = strtok.nextToken();
+ if (nbErrors != null) {
+ resultMap.put(Integer.valueOf(serverId), Integer.valueOf(nbErrors));
}
}
}
-
return resultMap;
}
- private void waitOpResult(Operation operation, ResultCode expectedResult)
+ private void waitOpResult(Operation operation, ResultCode expectedResult) throws Exception
{
int ii=0;
while((operation.getResultCode()==ResultCode.UNDEFINED) ||
(operation.getResultCode()!=expectedResult))
{
- sleep(50);
+ Thread.sleep(50);
ii++;
if (ii>10)
assertEquals(operation.getResultCode(), expectedResult,
@@ -1714,4 +1583,3 @@
}
}
}
-
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 36a4d91..87cdf24 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -35,6 +35,7 @@
import java.net.SocketTimeoutException;
import java.util.*;
+import org.assertj.core.api.Assertions;
import org.opends.server.TestCaseUtils;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.backends.task.TaskState;
@@ -646,33 +647,23 @@
}
/**
- * Chaining tests of the replication Server code with 2 replication servers involved
- * 2 tests are done here (itest=0 or itest=1)
- *
- * Test 1
- * - Create replication server 1
- * - Create replication server 2 connected with replication server 1
- * - Create and connect client 1 to replication server 1
- * - Create and connect client 2 to replication server 2
- * - Make client1 publish changes
- * - Check that client 2 receives the changes published by client 1
- *
- * Test 2
- * - Create replication server 1
- * - Create and connect client1 to replication server 1
- * - Make client1 publish changes
- * - Create replication server 2 connected with replication server 1
- * - Create and connect client 2 to replication server 2
- * - Check that client 2 receives the changes published by client 1
+ * <ol>
+ * <li>Create replication server 1</li>
+ * <li>Create replication server 2 connected with replication server 1</li>
+ * <li>Create and connect client 1 to replication server 1</li>
+ * <li>Create and connect client 2 to replication server 2</li>
+ * <li>Make client1 publish changes</li>
+ * <li>Check that client 2 receives the changes published by client 1</li>
+ * </ol>
*/
@Test(enabled=true, dependsOnMethods = { "searchBackend"})
- public void changelogChaining() throws Exception
+ public void changelogChaining0() throws Exception
{
- debugInfo("Starting changelogChaining");
+ final String tn = "changelogChaining0";
+ debugInfo("Starting " + tn);
replicationServer.clearDb();
TestCaseUtils.initializeTestBackend(true);
- for (int itest = 0; itest <2; itest++)
{
ReplicationBroker broker2 = null;
boolean emptyOldChanges = true;
@@ -683,13 +674,11 @@
int[] changelogIds = new int[] { 80, 81 };
int[] brokerIds = new int[] { 100, 101 };
- for (int i = 0; i < ((itest == 0) ? 2 : 1); i++)
+ for (int i = 0; i < 2; i++)
{
changelogs[i] = null;
- // for itest=0, create the 2 connected replicationServer
- // for itest=1, create the 1rst replicationServer, the second
- // one will be created later
+ // create the 2 connected replicationServer
SortedSet<String> servers = new TreeSet<String>();
servers.add(
"localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]));
@@ -703,30 +692,118 @@
try
{
- // For itest=0, create and connect client1 to changelog1
- // and client2 to changelog2
- // For itest=1, only create and connect client1 to changelog1
- // client2 will be created later
+ // create and connect client1 to changelog1 and client2 to changelog2
broker1 = openReplicationSession(TEST_ROOT_DN,
brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges);
assertTrue(broker1.isConnected());
-
- if (itest == 0)
- {
- broker2 = openReplicationSession(TEST_ROOT_DN,
- brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges);
- assertTrue(broker2.isConnected());
- }
+ broker2 = openReplicationSession(TEST_ROOT_DN, brokerIds[1], 100,
+ changelogPorts[0], 1000, !emptyOldChanges);
+ assertTrue(broker2.isConnected());
// - Test messages between clients by publishing now
// - Delete
- long time = TimeThread.getTime();
- int ts = 1;
- CSN csn = new CSN(time, ts++, brokerIds[0]);
+ CSNGenerator csnGen = new CSNGenerator(brokerIds[0], TimeThread.getTime());
+ DN dn = DN.decode("o=example" + 0 + "," + TEST_ROOT_DN_STRING);
+ DeleteMsg delMsg = new DeleteMsg(dn, csnGen.newCSN(), "uid");
+ broker1.publish(delMsg);
- DN dn = DN.decode("o=example" + itest + "," + TEST_ROOT_DN_STRING);
- DeleteMsg delMsg = new DeleteMsg(dn, csn, "uid");
+ String user1entryUUID = "33333333-3333-3333-3333-333333333333";
+ String baseUUID = "22222222-2222-2222-2222-222222222222";
+
+ // - Add
+ Entry entry = TestCaseUtils.entryFromLdifString(
+ "dn: o=example," + TEST_ROOT_DN_STRING + "\n"
+ + "objectClass: top\n" + "objectClass: domain\n"
+ + "entryUUID: 11111111-1111-1111-1111-111111111111\n");
+ AddMsg addMsg = new AddMsg(csnGen.newCSN(), EXAMPLE_DN,
+ user1entryUUID, baseUUID, entry.getObjectClassAttribute(),
+ entry.getAttributes(), new ArrayList<Attribute>());
+ broker1.publish(addMsg);
+
+ // - Modify
+ Attribute attr1 = Attributes.create("description", "new value");
+ Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
+ List<Modification> mods = new ArrayList<Modification>();
+ mods.add(mod1);
+ ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
+ broker1.publish(modMsg);
+
+ // - ModifyDN
+ ModifyDNOperationBasis op = new ModifyDNOperationBasis(connection, 1, 1, null,
+ EXAMPLE_DN, RDN.decode("o=example2"), true, null);
+ op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csnGen.newCSN(), "uniqueid", "newparentId"));
+ LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op);
+ ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp);
+ broker1.publish(modDNMsg);
+
+ // - Check msg receives by broker, through changeLog2
+ List<ReplicationMsg> msgs = receiveReplicationMsgs(broker2, 4);
+ Assertions.assertThat(msgs).containsExactly(delMsg, addMsg, modMsg, modDNMsg);
+ debugInfo("Ending " + tn);
+ }
+ finally
+ {
+ remove(changelogs);
+ stop(broker1, broker2);
+ }
+ }
+ }
+
+ /**
+ * <ol>
+ * <li>Create replication server 1</li>
+ * <li>Create and connect client1 to replication server 1</li>
+ * <li>Make client1 publish changes</li>
+ * <li>Create replication server 2 connected with replication server 1</li>
+ * <li>Create and connect client 2 to replication server 2</li>
+ * <li>Check that client 2 receives the changes published by client 1</li>
+ * <ol>
+ */
+ @Test(enabled=true, dependsOnMethods = { "searchBackend"})
+ public void changelogChaining1() throws Exception
+ {
+ final String tn = "changelogChaining1";
+ debugInfo("Starting " + tn);
+ replicationServer.clearDb();
+ TestCaseUtils.initializeTestBackend(true);
+
+ {
+ ReplicationBroker broker2 = null;
+ boolean emptyOldChanges = true;
+
+ // - Create 2 connected replicationServer
+ ReplicationServer[] changelogs = new ReplicationServer[2];
+ int[] changelogPorts = TestCaseUtils.findFreePorts(2);
+ int[] changelogIds = new int[] { 80, 81 };
+ int[] brokerIds = new int[] { 100, 101 };
+
+ {
+ // create the 1rst replicationServer, the second one will be created later
+ SortedSet<String> servers = new TreeSet<String>();
+ servers.add("localhost:" + changelogPorts[1]);
+ ReplServerFakeConfiguration conf =
+ new ReplServerFakeConfiguration(changelogPorts[0], "replicationServerTestChangelogChainingDb"+0, 0,
+ changelogIds[0], 0, 100, servers);
+ changelogs[0] = new ReplicationServer(conf);
+ }
+
+ ReplicationBroker broker1 = null;
+
+ try
+ {
+ // only create and connect client1 to changelog1 client2 will be created later
+ broker1 = openReplicationSession(TEST_ROOT_DN,
+ brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges);
+ assertTrue(broker1.isConnected());
+
+ // - Test messages between clients by publishing now
+
+ // - Delete
+ CSNGenerator csnGen = new CSNGenerator(brokerIds[0], TimeThread.getTime());
+
+ DN dn = DN.decode("o=example" + 1 + "," + TEST_ROOT_DN_STRING);
+ DeleteMsg delMsg = new DeleteMsg(dn, csnGen.newCSN(), "uid");
broker1.publish(delMsg);
String user1entryUUID = "33333333-3333-3333-3333-333333333333";
@@ -737,8 +814,7 @@
+ "objectClass: top\n" + "objectClass: domain\n"
+ "entryUUID: 11111111-1111-1111-1111-111111111111\n";
Entry entry = TestCaseUtils.entryFromLdifString(lentry);
- csn = new CSN(time, ts++, brokerIds[0]);
- AddMsg addMsg = new AddMsg(csn, EXAMPLE_DN,
+ AddMsg addMsg = new AddMsg(csnGen.newCSN(), EXAMPLE_DN,
user1entryUUID, baseUUID, entry.getObjectClassAttribute(),
entry.getAttributes(), new ArrayList<Attribute>());
broker1.publish(addMsg);
@@ -748,88 +824,32 @@
Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
List<Modification> mods = new ArrayList<Modification>();
mods.add(mod1);
- csn = new CSN(time, ts++, brokerIds[0]);
- ModifyMsg modMsg = new ModifyMsg(csn, EXAMPLE_DN, mods, "fakeuniqueid");
+ ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
broker1.publish(modMsg);
// - ModifyDN
- csn = new CSN(time, ts++, brokerIds[0]);
ModifyDNOperationBasis op = new ModifyDNOperationBasis(connection, 1, 1, null,
EXAMPLE_DN, RDN.decode("o=example2"), true, null);
- op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csn, "uniqueid",
- "newparentId"));
- LocalBackendModifyDNOperation localOp =
- new LocalBackendModifyDNOperation(op);
+ op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csnGen.newCSN(), "uniqueid", "newparentId"));
+ LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op);
ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp);
broker1.publish(modDNMsg);
- if (itest > 0)
- {
- SortedSet<String> servers = new TreeSet<String>();
- servers.add("localhost:"+changelogPorts[0]);
- ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(changelogPorts[1], null, 0,
- changelogIds[1], 0, 100, null);
- changelogs[1] = new ReplicationServer(conf);
+ SortedSet<String> servers = new TreeSet<String>();
+ servers.add("localhost:"+changelogPorts[0]);
+ ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(
+ changelogPorts[1], null, 0, changelogIds[1], 0, 100, null);
+ changelogs[1] = new ReplicationServer(conf);
- // Connect broker 2 to changelog2
- broker2 = openReplicationSession(TEST_ROOT_DN,
- brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges);
- assertTrue(broker2.isConnected());
- }
+ // Connect broker 2 to changelog2
+ broker2 = openReplicationSession(TEST_ROOT_DN,
+ brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges);
+ assertTrue(broker2.isConnected());
// - Check msg receives by broker, through changeLog2
- while (ts > 1)
- {
- ReplicationMsg msg2;
- try
- {
- msg2 = broker2.receive();
- if (msg2 == null)
- break;
- broker2.updateWindowAfterReplay();
- }
- catch (Exception e)
- {
- fail("Broker receive failed: " + e.getMessage() + "#Msg:" + ts + "#itest:" + itest);
- break;
- }
-
- if (msg2 instanceof DeleteMsg)
- {
- if (delMsg.equals(msg2))
- ts--;
- }
- else if (msg2 instanceof AddMsg)
- {
- if (addMsg.equals(msg2))
- ts--;
- }
- else if (msg2 instanceof ModifyMsg)
- {
- if (modMsg.equals(msg2))
- ts--;
- }
- else if (msg2 instanceof ModifyDNMsg)
- {
- if (modDNMsg.equals(msg2))
- ts--;
- }
- else if (msg2 instanceof TopologyMsg)
- {
- // Nothing to test here.
- }
- else
- {
- fail("ReplicationServer transmission failed: no expected message" +
- " class: " + msg2);
- break;
- }
- }
- // Check that everything expected has been received
- assertEquals(ts, 1, "Broker2 did not receive the complete set of"
- + " expected messages: #msg received " + ts);
- debugInfo("Ending changelogChaining");
+ List<ReplicationMsg> msgs = receiveReplicationMsgs(broker2, 4);
+ Assertions.assertThat(msgs).containsExactly(delMsg, addMsg, modMsg, modDNMsg);
+ debugInfo("Ending " + tn);
}
finally
{
@@ -839,6 +859,30 @@
}
}
+ private List<ReplicationMsg> receiveReplicationMsgs(ReplicationBroker broker2, int nbMessagesExpected)
+ {
+ List<ReplicationMsg> msgs = new ArrayList<ReplicationMsg>(nbMessagesExpected);
+ for (int i = 0; i < nbMessagesExpected; i++)
+ {
+ try
+ {
+ ReplicationMsg msg = broker2.receive();
+ if (msg == null)
+ break;
+ if (msg instanceof TopologyMsg)
+ continue; // ignore
+ msgs.add(msg);
+
+ broker2.updateWindowAfterReplay();
+ }
+ catch (SocketTimeoutException e)
+ {
+ fail("Broker receive failed: " + e.getMessage() + "#Msg:" + i);
+ }
+ }
+ return msgs;
+ }
+
/**
* Test that the Replication sends back correctly WindowsUpdate
* when we send a WindowProbeMsg.
@@ -859,7 +903,7 @@
* Some other tests may have been running before and therefore
* may have pushed some changes to the Replication Server
* When a new session is opened, the Replication Server is therefore
- * going to send all thoses old changes to this Replication Server.
+ * going to send all these old changes to this Replication Server.
* To avoid this, this test open a first session, save the
* ServerState from the ReplicationServer, close the session
* and re-open a new connection providing the ServerState it just
@@ -1207,28 +1251,26 @@
private List<UpdateMsg> createChanges(String suffix, int serverId) throws Exception
{
List<UpdateMsg> l = new ArrayList<UpdateMsg>();
- long time = TimeThread.getTime();
- int ts = 1;
{
String user1entryUUID = "33333333-3333-3333-3333-333333333333";
String baseUUID = "22222222-2222-2222-2222-222222222222";
// - Add
- String lentry = "dn: "+suffix+"\n"
+ Entry entry = TestCaseUtils.entryFromLdifString(
+ "dn: "+suffix+"\n"
+ "objectClass: top\n"
+ "objectClass: domain\n"
- + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
- Entry entry = TestCaseUtils.entryFromLdifString(lentry);
- CSN csn = new CSN(time, ts++, serverId);
+ + "entryUUID: 11111111-1111-1111-1111-111111111111\n");
+ CSNGenerator csnGen = new CSNGenerator(serverId, TimeThread.getTime());
DN exampleSuffixDN = DN.decode("o=example," + suffix);
- AddMsg addMsg = new AddMsg(csn, exampleSuffixDN,
+ AddMsg addMsg = new AddMsg(csnGen.newCSN(), exampleSuffixDN,
user1entryUUID, baseUUID, entry.getObjectClassAttribute(),
entry.getAttributes(), new ArrayList<Attribute>());
l.add(addMsg);
// - Add
- String luentry =
+ Entry uentry = TestCaseUtils.entryFromLdifString(
"dn: cn=Fiona Jensen,ou=People,"+suffix+"\n"
+ "objectClass: top\n"
+ "objectclass: person\n"
@@ -1239,12 +1281,10 @@
+ "givenName: fjensen\n"
+ "telephonenumber: +1 408 555 1212\n"
+ "entryUUID: " + user1entryUUID +"\n"
- + "userpassword: fjen$$en" + "\n";
- Entry uentry = TestCaseUtils.entryFromLdifString(luentry);
- csn = new CSN(time, ts++, serverId);
+ + "userpassword: fjen$$en" + "\n");
DN newPersonDN = DN.decode("uid=new person,ou=People,"+suffix);
AddMsg addMsg2 = new AddMsg(
- csn,
+ csnGen.newCSN(),
newPersonDN,
user1entryUUID,
baseUUID,
@@ -1263,22 +1303,18 @@
List<Modification> mods = Arrays.asList(mod1, mod2, mod3);
- csn = new CSN(time, ts++, serverId);
DN dn = exampleSuffixDN;
- ModifyMsg modMsg = new ModifyMsg(csn, dn,
- mods, "fakeuniqueid");
+ ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), dn, mods, "fakeuniqueid");
l.add(modMsg);
// Modify DN
- csn = new CSN(time, ts++, serverId);
- ModifyDNMsg modDnMsg = new ModifyDNMsg(newPersonDN, csn,
+ ModifyDNMsg modDnMsg = new ModifyDNMsg(newPersonDN, csnGen.newCSN(),
user1entryUUID, baseUUID, false,
"uid=wrong, ou=people,"+suffix, "uid=newrdn");
l.add(modDnMsg);
// Del
- csn = new CSN(time, ts++, serverId);
- DeleteMsg delMsg = new DeleteMsg(exampleSuffixDN, csn, "uid");
+ DeleteMsg delMsg = new DeleteMsg(exampleSuffixDN, csnGen.newCSN(), "uid");
l.add(delMsg);
}
return l;
@@ -1567,8 +1603,8 @@
changelogs[i] = new ReplicationServer(conf);
}
- try
- {
+ try
+ {
// Create and connect client1 to changelog1
// and client2 to changelog2
broker1 = openReplicationSession(TEST_ROOT_DN,
@@ -1580,9 +1616,7 @@
assertTrue(broker2.isConnected());
// - Test messages between clients by publishing now
- long time = TimeThread.getTime();
- int ts = 1;
- CSN csn;
+ CSNGenerator csnGen = new CSNGenerator(brokerIds[0], TimeThread.getTime());
String user1entryUUID = "33333333-3333-3333-3333-333333333333";
String baseUUID = "22222222-2222-2222-2222-222222222222";
@@ -1591,10 +1625,9 @@
+ "objectClass: top\n" + "objectClass: domain\n"
+ "entryUUID: " + user1entryUUID + "\n";
Entry entry = TestCaseUtils.entryFromLdifString(lentry);
- csn = new CSN(time, ts++, brokerIds[0]);
- AddMsg addMsg = new AddMsg(csn, EXAMPLE_DN,
- user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry
- .getAttributes(), new ArrayList<Attribute>());
+ AddMsg addMsg = new AddMsg(csnGen.newCSN(), EXAMPLE_DN,
+ user1entryUUID, baseUUID, entry.getObjectClassAttribute(),
+ entry.getAttributes(), new ArrayList<Attribute>());
broker1.publish(addMsg);
// - Modify
@@ -1602,47 +1635,12 @@
Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
List<Modification> mods = new ArrayList<Modification>();
mods.add(mod1);
- csn = new CSN(time, ts++, brokerIds[0]);
- ModifyMsg modMsg = new ModifyMsg(csn, EXAMPLE_DN, mods, "fakeuniqueid");
+ ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
broker1.publish(modMsg);
// - Check msg received by broker, through changeLog2
-
- while (ts > 1)
- {
- ReplicationMsg msg2;
- try
- {
- msg2 = broker2.receive();
- if (msg2 == null)
- break;
- broker2.updateWindowAfterReplay();
- }
- catch (Exception e)
- {
- fail("Broker receive failed: " + e.getMessage() + "#Msg: " + ts);
- break;
- }
-
- if (msg2 instanceof AddMsg)
- {
- if (addMsg.equals(msg2))
- ts--;
- }
- else if (msg2 instanceof ModifyMsg)
- {
- if (modMsg.equals(msg2))
- ts--;
- }
- else
- {
- fail("ReplicationServer transmission failed: did not expect message of class: " + msg2);
- break;
- }
- }
- // Check that everything expected has been received
- assertEquals(ts, 1, "Broker2 did not receive the complete set of"
- + " expected messages: #msg received " + ts);
+ List<ReplicationMsg> msgs = receiveReplicationMsgs(broker2, 2);
+ Assertions.assertThat(msgs).containsExactly(addMsg, modMsg);
// Then change the config to remove replicationServer[1] from
// the configuration of replicationServer[0]
@@ -1657,41 +1655,43 @@
// The link between RS[0] & RS[1] should be destroyed by the new configuration.
// So we expect a timeout exception when calling receive on RS[1].
// Send an update and check that RS[1] does not receive the message after the timeout
- try
- {
- // - Del
- csn = new CSN(time, ts++, brokerIds[0]);
- DeleteMsg delMsg = new DeleteMsg(EXAMPLE_DN, csn, user1entryUUID);
- broker1.publish(delMsg);
- // Should receive some TopologyMsg messages for disconnection
- // between the 2 RSs
- ReplicationMsg msg = null;
- while (true)
- {
- msg = broker2.receive();
- if (msg instanceof TopologyMsg)
- {
- debugInfo("Broker 2 received: " + msg);
- } else
- {
- fail("Broker: receive successed when it should fail. " +
- "This broker was disconnected by configuration." +
- " Received: " + msg);
- }
- }
- }
- catch (SocketTimeoutException soExc)
- {
- // the receive fail as expected
- debugInfo("Ending replicationServerConnected");
- return;
- }
- }
- finally
- {
+
+ // - Del
+ DeleteMsg delMsg = new DeleteMsg(EXAMPLE_DN, csnGen.newCSN(), user1entryUUID);
+ broker1.publish(delMsg);
+ // Should receive some TopologyMsg messages for disconnection between the 2 RSs
+ assertOnlyTopologyMsgsReceived(broker2);
+ }
+ finally
+ {
remove(changelogs);
stop(broker1, broker2);
- }
+ }
+ }
+
+ private void assertOnlyTopologyMsgsReceived(ReplicationBroker broker2)
+ {
+ try
+ {
+ while (true)
+ {
+ ReplicationMsg msg = broker2.receive();
+ if (msg instanceof TopologyMsg)
+ {
+ debugInfo("Broker 2 received: " + msg);
+ }
+ else
+ {
+ fail("Broker: receive successed when it should fail. "
+ + "This broker was disconnected by configuration."
+ + " Received: " + msg);
+ }
+ }
+ }
+ catch (SocketTimeoutException expected)
+ {
+ debugInfo("Ending replicationServerConnected");
+ }
}
}
--
Gitblit v1.10.0