From cbe451be8db8a35dce815148d04dc47d49ef4742 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 15 Oct 2013 07:30:03 +0000
Subject: [PATCH] Fixing UpdateOperationTest.csnGeneratorAdjust() test failure in Continuous Integration.
---
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 80 +++++++++++--------
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java | 133 +++++++++++++++++---------------
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java | 6 +
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 9 ++
4 files changed, 131 insertions(+), 97 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
index 1ae8b2c..4e46086 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
@@ -293,4 +293,10 @@
return sslEncryption;
}
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + " " + (sslEncryption ? "with SSL" : "");
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 8bd15cb..e4bfe14 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -82,7 +82,7 @@
*/
public final static String NO_CONNECTED_SERVER = "Not connected";
private volatile String replicationServer = NO_CONNECTED_SERVER;
- private volatile Session session = null;
+ private volatile Session session;
private final ServerState state;
private final DN baseDN;
private final int serverId;
@@ -1284,7 +1284,8 @@
// Send our Start Session
StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
startECLSessionMsg.setOperationId("-1");
- session.publish(startECLSessionMsg);
+ final Session localSession = session;
+ localSession.publish(startECLSessionMsg);
// FIXME ECL In the handshake phase two, should RS send back a topo msg ?
if (debugEnabled())
@@ -1294,7 +1295,7 @@
}
// Alright set the timeout to the desired value
- session.setSoTimeout(timeout);
+ localSession.setSoTimeout(timeout);
connected = true;
} catch (Exception e)
{
@@ -1319,8 +1320,6 @@
private TopologyMsg performPhaseTwoHandshake(String server,
ServerStatus initStatus)
{
- TopologyMsg topologyMsg;
-
try
{
/*
@@ -1347,12 +1346,13 @@
startSessionMsg =
new StartSessionMsg(initStatus, new ArrayList<String>());
}
- session.publish(startSessionMsg);
+ final Session localSession = session;
+ localSession.publish(startSessionMsg);
/*
* Read the TopologyMsg that should come back.
*/
- topologyMsg = (TopologyMsg) session.receive();
+ TopologyMsg topologyMsg = (TopologyMsg) localSession.receive();
if (debugEnabled())
{
@@ -1361,8 +1361,8 @@
}
// Alright set the timeout to the desired value
- session.setSoTimeout(timeout);
-
+ localSession.setSoTimeout(timeout);
+ return topologyMsg;
} catch (Exception e)
{
Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
@@ -1372,9 +1372,8 @@
setSession(null);
// Be sure to return null.
- topologyMsg = null;
+ return null;
}
- return topologyMsg;
}
/**
@@ -1423,6 +1422,7 @@
* local DS
* - replication server in the same VM as local DS one
*/
+ // TODO JNR log why an RS was evicted as best server
Map<Integer, ReplicationServerInfo> bestServers = rsInfos;
/*
The list of best replication servers is filtered with each criteria. At
@@ -2225,10 +2225,10 @@
Check the session. If it has changed, some disconnection or
reconnection happened and we need to restart from scratch.
*/
-
- if (session != null && session == currentSession)
+ final Session localSession = session;
+ if (localSession != null && session == currentSession)
{
- session.publish(msg);
+ localSession.publish(msg);
done = true;
}
}
@@ -2243,8 +2243,10 @@
window update message was lost somehow...
then loop to check again if connection was closed.
*/
- if (session != null) {
- session.publish(new WindowProbeMsg());
+ Session localSession = session;
+ if (localSession != null)
+ {
+ localSession.publish(new WindowProbeMsg());
}
}
}
@@ -2330,8 +2332,8 @@
// Save session information for later in case we need it for log messages
// after the session has been closed and/or failed.
- final Session savedSession = session;
- if (savedSession == null)
+ final Session localSession = session;
+ if (localSession == null)
{
// Must be shutting down.
break;
@@ -2340,7 +2342,7 @@
final int previousRsServerID = rsServerId;
try
{
- ReplicationMsg msg = savedSession.receive();
+ ReplicationMsg msg = localSession.receive();
if (msg instanceof UpdateMsg)
{
synchronized (this)
@@ -2372,12 +2374,12 @@
{
// RS performs a proper disconnection
Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(
- previousRsServerID, savedSession.getReadableRemoteAddress(),
+ previousRsServerID, localSession.getReadableRemoteAddress(),
serverId, baseDN.toNormalizedString());
logError(message);
// Try to find a suitable RS
- reStart(savedSession, true);
+ reStart(localSession, true);
}
else if (msg instanceof MonitorMsg)
{
@@ -2436,14 +2438,15 @@
{
message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
serverId, previousRsServerID,
- savedSession.getReadableRemoteAddress(),
+ localSession.getReadableRemoteAddress(),
baseDN.toNormalizedString());
}
else
{
+ // TODO JNR log why an RS was evicted as best server
message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
serverId, previousRsServerID,
- savedSession.getReadableRemoteAddress(),
+ localSession.getReadableRemoteAddress(),
bestServerInfo.getServerId(),
baseDN.toNormalizedString());
}
@@ -2480,13 +2483,13 @@
// We did not initiate the close on our side, log an error message.
Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
serverId, baseDN.toNormalizedString(), previousRsServerID,
- savedSession.getReadableRemoteAddress());
+ localSession.getReadableRemoteAddress());
logError(message);
}
if (reconnectOnFailure)
{
- reStart(savedSession, true);
+ reStart(localSession, true);
}
else
{
@@ -2546,9 +2549,10 @@
try
{
updateDoneCount++;
- if ((updateDoneCount >= halfRcvWindow) && (session != null))
+ final Session localSession = session;
+ if (updateDoneCount >= halfRcvWindow && localSession != null)
{
- session.publish(new WindowMsg(updateDoneCount));
+ localSession.publish(new WindowMsg(updateDoneCount));
rcvWindow += updateDoneCount;
updateDoneCount = 0;
}
@@ -2598,9 +2602,10 @@
public void setSoTimeout(int timeout) throws SocketException
{
this.timeout = timeout;
- if (session != null)
+ final Session localSession = session;
+ if (localSession != null)
{
- session.setSoTimeout(timeout);
+ localSession.setSoTimeout(timeout);
}
}
@@ -2905,14 +2910,14 @@
// Start a CSN heartbeat thread.
if (changeTimeHeartbeatSendInterval > 0)
{
- String threadName = "Replica DS(" + getServerId()
+ final Session localSession = session;
+ final String threadName = "Replica DS(" + getServerId()
+ ") change time heartbeat publisher for domain \""
- + this.baseDN + "\" to RS(" + getRsServerId()
- + ") at " + session.getReadableRemoteAddress();
+ + baseDN + "\" to RS(" + getRsServerId()
+ + ") at " + localSession.getReadableRemoteAddress();
ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread(
- threadName, session, changeTimeHeartbeatSendInterval,
- serverId);
+ threadName, localSession, changeTimeHeartbeatSendInterval, serverId);
ctHeartbeatPublisherThread.start();
} else
{
@@ -3030,4 +3035,11 @@
DirectoryServer.deregisterMonitorProvider(monitor);
}
}
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + " " + baseDN + " " + serverId;
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 5db7c3b..4cb07d3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -69,7 +69,7 @@
* should read the list of replication servers from the configuration,
* instantiate a {@link ServerState} then start the publish service
* by calling
- * {@link #startPublishService(Collection, int, long, long)}.
+ * {@link #startPublishService(Set, int, long, long)}.
* At this point it can start calling the {@link #publish(UpdateMsg)}
* method if needed.
* <p>
@@ -3675,4 +3675,11 @@
{
return state.getCSN(serverID);
}
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + " " + this.baseDN + " " + this.serverID;
+ }
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
index e65deab..64e72ed 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -27,6 +27,7 @@
*/
package org.opends.server.replication;
+import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -375,14 +376,12 @@
// Check that the modify has been replayed.
found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
- "description", "Description was changed",
- 10000, true);
+ "description", "Description was changed", 10000, true);
assertTrue(found, "The second modification was not replayed.");
// Delete the entries to clean the database.
- DeleteMsg delMsg = new DeleteMsg(personWithUUIDEntry.getDN(), gen.newCSN(), user1entryUUID);
- broker.publish(delMsg);
-
+ broker.publish(
+ new DeleteMsg(personWithUUIDEntry.getDN(), gen.newCSN(), user1entryUUID));
assertNull(getEntry(personWithUUIDEntry.getDN(), 10000, false),
"The DELETE replication message was not replayed");
}
@@ -552,16 +551,12 @@
* Open a session to the replicationServer using the ReplicationServer broker API.
* This must use a serverId different from the LDAP server ID
*/
+ final int serverId = 2;
ReplicationBroker broker =
- openReplicationSession(baseDN, 2, 100, replServerPort, 1000, true);
-
+ openReplicationSession(baseDN, serverId, 100, replServerPort, 1000, true);
try
{
- /*
- * Create a CSN generator to generate new CSNs when we need to send
- * operations messages to the replicationServer.
- */
- CSNGenerator gen = new CSNGenerator(2, 0);
+ CSNGenerator gen = new CSNGenerator(serverId, 0);
/*
* Test that the conflict resolution code is able to find entries
@@ -656,18 +651,17 @@
* the same UUID has the entry that has been used in the tests above.
* Finally check that the delete operation has been applied.
*/
- // send a delete operation with a wrong dn but the unique ID of the entry
- // used above
- DN delDN = DN.decode("cn=anotherdn," + baseDN);
- DeleteMsg delMsg = new DeleteMsg(delDN, gen.newCSN(), user1entryUUID);
- updateMonitorCount(baseDN, resolvedMonitorAttr);
+ // send a delete operation with a wrong dn but the unique ID of the entry
+ // used above
+ updateMonitorCount(baseDN, resolvedMonitorAttr);
alertCount = DummyAlertHandler.getAlertCount();
- broker.publish(delMsg);
+ DN delDN = DN.decode("cn=anotherdn," + baseDN);
+ broker.publish(new DeleteMsg(delDN, gen.newCSN(), user1entryUUID));
- // check that the delete operation has been applied
- assertNull(getEntry(personWithUUIDEntry.getDN(), 10000, false),
- "The DELETE replication message was not replayed");
- assertEquals(getMonitorDelta(), 1);
+ // check that the delete operation has been applied
+ assertNull(getEntry(personWithUUIDEntry.getDN(), 10000, false),
+ "The DELETE replication message was not replayed");
+ assertEquals(getMonitorDelta(), 1);
assertConflictAutomaticallyResolved(alertCount);
/*
@@ -739,15 +733,15 @@
* To achieve this send a delete operation with a correct DN
* but a wrong unique ID.
*/
- delMsg = new DeleteMsg(newPersonDN, gen.newCSN(), "11111111-9abc-def0-1234-1234567890ab");
- updateMonitorCount(baseDN, resolvedMonitorAttr);
+ updateMonitorCount(baseDN, resolvedMonitorAttr);
alertCount = DummyAlertHandler.getAlertCount();
- broker.publish(delMsg);
+ broker.publish(
+ new DeleteMsg(newPersonDN, gen.newCSN(), "11111111-9abc-def0-1234-1234567890ab"));
- // check that the delete operation has not been applied
+ // check that the delete operation has not been applied
assertNotNull(getEntry(newPersonDN, 10000, true),
"The DELETE replication message was replayed when it should not");
- assertEquals(getMonitorDelta(), 1);
+ assertEquals(getMonitorDelta(), 1);
assertConflictAutomaticallyResolved(alertCount);
@@ -823,20 +817,14 @@
assertNewAlertsGenerated(alertCount, 1);
- // delete the entries to clean the database
- DN delDN2 = DN.decode("entryUUID = " + user1entrysecondUUID + "+"
- + user1dn.getRDN() + "," + baseDN);
- delMsg = new DeleteMsg(delDN2, gen.newCSN(), user1entrysecondUUID);
- broker.publish(delMsg);
-
- // check that the delete operation has been applied
+ // delete the entries to clean the database
+ DN delDN2 = DN.decode(
+ "entryUUID = " + user1entrysecondUUID + "+" + user1dn.getRDN() + "," + baseDN);
+ broker.publish(new DeleteMsg(delDN2, gen.newCSN(), user1entrysecondUUID));
assertNull(getEntry(delDN2, 10000, false),
- "The DELETE replication message was not replayed");
+ "The DELETE replication message was not replayed");
- delMsg = new DeleteMsg(reallyNewDN, gen.newCSN(), user1entryUUID);
- broker.publish(delMsg);
-
- // check that the delete operation has been applied
+ broker.publish(new DeleteMsg(reallyNewDN, gen.newCSN(), user1entryUUID));
assertNull(getEntry(reallyNewDN, 10000, false),
"The DELETE replication message was not replayed");
@@ -859,7 +847,7 @@
DN baseDN1 = DN.decode("ou=baseDn1," + baseDN);
DN baseDN2 = DN.decode("ou=baseDn2," + baseDN);
- // - create parent entry 1 with baseDn1
+ // - create parent entry 1 with baseDn1
connection.processAdd(TestCaseUtils.entryFromLdifString(
"dn: " + baseDN1 + "\n"
+ "objectClass: top\n"
@@ -926,12 +914,11 @@
DN conflictDomain3dn = DN.decode(
"entryUUID = " + domain3uid + "+dc=domain3," + baseDN);
- updateMonitorCount(baseDN, unresolvedMonitorAttr);
+ updateMonitorCount(baseDN, unresolvedMonitorAttr);
alertCount = DummyAlertHandler.getAlertCount();
- // delete domain1
- delMsg = new DeleteMsg(domain1dn, olderCSN, domain1uid);
- broker.publish(delMsg);
+ // delete domain1
+ broker.publish(new DeleteMsg(domain1dn, olderCSN, domain1uid));
// check that the domain1 has correctly been deleted
assertNull(getEntry(domain1dn, 10000, false),
@@ -971,12 +958,11 @@
gen.adjust(addCSN);
domain3uid = getEntryUUID(domain3dn);
- updateMonitorCount(baseDN, unresolvedMonitorAttr);
+ updateMonitorCount(baseDN, unresolvedMonitorAttr);
alertCount = DummyAlertHandler.getAlertCount();
- // delete domain1
- delMsg = new DeleteMsg(domain1dn, gen.newCSN(), domain1uid);
- broker.publish(delMsg);
+ // delete domain1
+ broker.publish(new DeleteMsg(domain1dn, gen.newCSN(), domain1uid));
// check that the domain1 has correctly been deleted
assertNull(getEntry(domain1dn, 10000, false),
@@ -1131,11 +1117,11 @@
// Cleanup from previous run
cleanupTest();
+ final int serverId = 27;
ReplicationBroker broker =
- openReplicationSession(baseDN, 27, 100, replServerPort, 2000, true);
-
+ openReplicationSession(baseDN, serverId, 100, replServerPort, 2000, true);
try {
- CSNGenerator gen = new CSNGenerator( 27, 0);
+ CSNGenerator gen = new CSNGenerator(serverId, 0);
/*
* Test that operations done on this server are sent to the
@@ -1303,12 +1289,12 @@
logError(Message.raw(Category.SYNC, Severity.INFORMATION,
"Starting replication test : infiniteReplayLoop"));
- Thread.sleep(2000);
+ int serverId = 11;
ReplicationBroker broker =
- openReplicationSession(baseDN, 11, 100, replServerPort, 1000, true);
+ openReplicationSession(baseDN, serverId, 100, replServerPort, 1000, true);
try
{
- CSNGenerator gen = new CSNGenerator( 11, 0);
+ CSNGenerator gen = new CSNGenerator(serverId, 0);
// Create a test entry.
Entry tmp = TestCaseUtils.entryFromLdifString(
@@ -1400,7 +1386,6 @@
public void csnGeneratorAdjust() throws Exception
{
testSetUp("csnGeneratorAdjust");
- int serverId = 88;
logError(Message.raw(Category.SYNC, Severity.INFORMATION,
"Starting synchronization test : CSNGeneratorAdjust"));
@@ -1408,16 +1393,13 @@
* Open a session to the replicationServer using the broker API.
* This must use a different serverId to that of the directory server.
*/
+ final int serverId = 88;
ReplicationBroker broker =
- openReplicationSession(baseDN, serverId, 100, replServerPort, 1000, true);
-
+ openReplicationSession(baseDN, serverId, 100, replServerPort, 1000, true);
+ consumeAllMessages(broker); // clean leftover messages from lostHeartbeatFailover()
try
{
- /*
- * Create a CSN generator to generate new CSNs
- * when we need to send operation messages to the replicationServer.
- */
- long inTheFuture = System.currentTimeMillis() + (3600 * 1000);
+ final long inTheFuture = System.currentTimeMillis() + (3600 * 1000);
CSNGenerator gen = new CSNGenerator(serverId, inTheFuture);
// Create and publish an update message to add an entry.
@@ -1453,4 +1435,31 @@
broker.stop();
}
}
+
+ /**
+ * Consumes all the messages sent to this broker. This is useful at the start
+ * of a test to avoid leftover messages from previous test runs.
+ */
+ private void consumeAllMessages(ReplicationBroker broker)
+ {
+ final List<ReplicationMsg> msgs = new ArrayList<ReplicationMsg>();
+ try
+ {
+ while (true)
+ {
+ msgs.add(broker.receive());
+ }
+ }
+ catch (SocketTimeoutException expectedAtSomeStage)
+ {
+ // this is expected to happen when there will not be any more messages to
+ // consume from the socket
+ }
+
+ if (!msgs.isEmpty())
+ {
+ logError(Message.raw(Category.SYNC, Severity.SEVERE_ERROR,
+ "Leftover messages from previous test runs " + msgs));
+ }
+ }
}
--
Gitblit v1.10.0