From 90856376b31a5d0071a17b86c2f0c92950873612 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 04 Nov 2013 11:06:40 +0000
Subject: [PATCH] OPENDJ-1190 (CR-2523) Under rare circumstances the DS replication recovery thread (RSUpdater) can spin
---
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 85 ++++++++++++++++++++++++++++++++++--------
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java | 34 +++++++----------
2 files changed, 82 insertions(+), 37 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 821f7b8..57535f3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -34,6 +34,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.DataFormatException;
import org.opends.messages.Category;
@@ -199,6 +200,8 @@
* is not updated too early.
*/
private final PendingChanges pendingChanges;
+ private final AtomicReference<RSUpdater> rsUpdater =
+ new AtomicReference<RSUpdater>(null);
/**
* It contain the updates that were done on other servers, transmitted
@@ -335,7 +338,7 @@
* The thread that periodically saves the ServerState of this
* LDAPReplicationDomain in the database.
*/
- private class ServerStateFlush extends DirectoryThread
+ private class ServerStateFlush extends DirectoryThread
{
protected ServerStateFlush()
{
@@ -351,7 +354,7 @@
{
done = false;
- while (!shutdown)
+ while (!isShutdownInitiated())
{
try
{
@@ -368,6 +371,7 @@
catch (InterruptedException e)
{
// Thread interrupted: check for shutdown.
+ Thread.currentThread().interrupt();
}
}
state.save();
@@ -383,6 +387,11 @@
private class RSUpdater extends DirectoryThread
{
private final CSN startCSN;
+ /**
+ * Used to communicate that the current thread computation needs to
+ * shutdown.
+ */
+ private AtomicBoolean shutdown = new AtomicBoolean(false);
protected RSUpdater(CSN replServerMaxCSN)
{
@@ -400,8 +409,7 @@
{
// Replication server is missing some of our changes: let's
// send them to him.
- Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
- logError(message);
+ logError(DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get());
/*
* Get all the changes that have not been seen by this
@@ -409,10 +417,9 @@
*/
try
{
- if (buildAndPublishMissingChanges(startCSN, broker))
+ if (buildAndPublishMissingChanges(startCSN, broker, shutdown))
{
- message = DEBUG_CHANGES_SENT.get();
- logError(message);
+ logError(DEBUG_CHANGES_SENT.get());
synchronized(replayOperations)
{
replayOperations.clear();
@@ -427,8 +434,7 @@
* Log an error for the repair tool
* that will need to re-synchronize the servers.
*/
- message = ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString());
- logError(message);
+ logError(ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString()));
}
} catch (Exception e)
{
@@ -439,14 +445,24 @@
* Log an error for the repair tool
* that will need to re-synchronize the servers.
*/
- message = ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString());
- logError(message);
+ logError(ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString()));
}
finally
{
broker.setRecoveryRequired(false);
+ // RSUpdater thread has finished its work, let's remove it from memory
+ // so another RSUpdater thread can be started if needed.
+ rsUpdater.compareAndSet(this, null);
}
}
+
+ /** {@inheritDoc} */
+ @Override
+ public void initiateShutdown()
+ {
+ this.shutdown.set(true);
+ super.initiateShutdown();
+ }
}
@@ -2372,10 +2388,16 @@
if (!shutdown)
{
shutdown = true;
+ final RSUpdater rsUpdater = this.rsUpdater.get();
+ if (rsUpdater != null)
+ {
+ rsUpdater.initiateShutdown();
+ }
// stop the thread in charge of flushing the ServerState.
if (flushThread != null)
{
+ flushThread.initiateShutdown();
synchronized (flushThread)
{
flushThread.notify();
@@ -4356,7 +4378,11 @@
{
pendingChanges.setRecovering(true);
broker.setRecoveryRequired(true);
- new RSUpdater(replServerMaxCSN).start();
+ final RSUpdater rsUpdater = new RSUpdater(replServerMaxCSN);
+ if (this.rsUpdater.compareAndSet(null, rsUpdater))
+ {
+ rsUpdater.start();
+ }
}
}
} catch (Exception e)
@@ -4375,12 +4401,14 @@
* The CSN where we need to start the search
* @param session
* The session to use to publish the changes
+ * @param shutdown
+ * whether the current run must be stopped
* @return A boolean indicating he success of the operation.
* @throws Exception
* if an Exception happens during the search.
*/
public boolean buildAndPublishMissingChanges(CSN startCSN,
- ReplicationBroker session) throws Exception
+ ReplicationBroker session, AtomicBoolean shutdown) throws Exception
{
// Trim the changes in replayOperations that are older than the startCSN.
synchronized (replayOperations)
@@ -4388,6 +4416,10 @@
Iterator<CSN> it = replayOperations.keySet().iterator();
while (it.hasNext())
{
+ if (shutdown.get())
+ {
+ return false;
+ }
if (it.next().isNewerThan(startCSN))
{
break;
@@ -4401,6 +4433,11 @@
CSN currentStartCSN = startCSN;
do
{
+ if (shutdown.get())
+ {
+ return false;
+ }
+
lastRetrievedChange = null;
// We can't do the search in one go because we need to store the results
// so that we are sure we send the operations in order and because the
@@ -4417,15 +4454,21 @@
// Publish and remove all the changes from the replayOperations list
// that are older than the endCSN.
- List<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
+ final List<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
synchronized (replayOperations)
{
Iterator<FakeOperation> itOp = replayOperations.values().iterator();
while (itOp.hasNext())
{
+ if (shutdown.get())
+ {
+ return false;
+ }
FakeOperation fakeOp = itOp.next();
if (fakeOp.getCSN().isNewerThan(endCSN) // sanity check
- || !state.cover(fakeOp.getCSN()))
+ || !state.cover(fakeOp.getCSN())
+ // do not look for replay operations in the future
+ || endCSN.isNewerThan(now()))
{
break;
}
@@ -4438,9 +4481,13 @@
for (FakeOperation opToSend : opsToSend)
{
+ if (shutdown.get())
+ {
+ return false;
+ }
session.publishRecovery(opToSend.generateMessage());
}
- opsToSend.clear();
+
if (lastRetrievedChange != null)
{
currentStartCSN = lastRetrievedChange;
@@ -4449,13 +4496,16 @@
{
currentStartCSN = endCSN;
}
-
} while (pendingChanges.recoveryUntil(lastRetrievedChange)
&& op.getResultCode().equals(ResultCode.SUCCESS));
return op.getResultCode().equals(ResultCode.SUCCESS);
}
+ private static CSN now()
+ {
+ return new CSN(TimeThread.getTime(), 0, 0);
+ }
/**
* Search for the changes that happened since fromCSN based on the historical
@@ -4589,6 +4639,7 @@
catch (InterruptedException e)
{
// Thread interrupted: check for shutdown.
+ Thread.currentThread().interrupt();
}
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
index acdf3c2..a3e55ad 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -31,6 +31,7 @@
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.assertj.core.api.Assertions;
import org.opends.messages.Category;
@@ -187,10 +188,8 @@
LinkedList<ReplicationMsg> opList = new LinkedList<ReplicationMsg>();
TestBroker session = new TestBroker(opList);
- boolean result =
- rd1.buildAndPublishMissingChanges(
- new CSN(startTime, 0, serverId),
- session);
+ CSN csn = new CSN(startTime, 0, serverId);
+ boolean result = rd1.buildAndPublishMissingChanges(csn, session, new AtomicBoolean());
assertTrue(result, "buildAndPublishMissingChanges has failed");
assertEquals(opList.size(), 3, "buildAndPublishMissingChanges should return 3 operations");
assertTrue(opList.getFirst().getClass().equals(AddMsg.class));
@@ -204,7 +203,7 @@
opList = new LinkedList<ReplicationMsg>();
session = new TestBroker(opList);
- result = rd1.buildAndPublishMissingChanges(fromCSN, session);
+ result = rd1.buildAndPublishMissingChanges(fromCSN, session, new AtomicBoolean());
assertTrue(result, "buildAndPublishMissingChanges has failed");
assertEquals(opList.size(), 1, "buildAndPublishMissingChanges should return 1 operation");
assertTrue(opList.getFirst().getClass().equals(ModifyMsg.class));
@@ -278,23 +277,21 @@
"dn: cn=test2," + baseDN,
"changetype: modify",
"add: description",
- "description: foo");
+ "description: foo");
resultCode = TestCaseUtils.applyModifications(false,
"dn: cn=test1," + baseDN,
"changetype: modify",
"add: description",
- "description: foo");
+ "description: foo");
assertEquals(resultCode, 0);
LinkedList<ReplicationMsg> opList = new LinkedList<ReplicationMsg>();
TestBroker session = new TestBroker(opList);
- // Call the buildAndPublishMissingChanges and check that this method
- // correctly generates the 4 operations in the correct order.
- boolean result =
- rd1.buildAndPublishMissingChanges(
- new CSN(startTime, 0, serverId),
- session);
+ // Call the buildAndPublishMissingChanges and check that this method
+ // correctly generates the 4 operations in the correct order.
+ CSN csn = new CSN(startTime, 0, serverId);
+ boolean result = rd1.buildAndPublishMissingChanges(csn, session, new AtomicBoolean());
assertTrue(result, "buildAndPublishMissingChanges has failed");
assertEquals(opList.size(), 5, "buildAndPublishMissingChanges should return 5 operations");
ReplicationMsg msg = opList.removeFirst();
@@ -336,14 +333,11 @@
private LDAPReplicationDomain createReplicationDomain(int dsId)
throws DirectoryException, ConfigException
{
- DN baseDN = DN.decode(TEST_ROOT_DN_STRING);
- DomainFakeCfg domainConf =
- new DomainFakeCfg(baseDN, dsId, replServers, AssuredType.NOT_ASSURED,
- 2, 1, 0, null);
- LDAPReplicationDomain replicationDomain =
- MultimasterReplication.createNewDomain(domainConf);
+ final DN baseDN = DN.decode(TEST_ROOT_DN_STRING);
+ final DomainFakeCfg domainConf = new DomainFakeCfg(
+ baseDN, dsId, replServers, AssuredType.NOT_ASSURED, 2, 1, 0, null);
+ LDAPReplicationDomain replicationDomain = MultimasterReplication.createNewDomain(domainConf);
replicationDomain.start();
-
return replicationDomain;
}
}
--
Gitblit v1.10.0