From 3253906b33605684c3e071a6e5c3af0f20c9e375 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 04 Nov 2013 11:06:40 +0000
Subject: [PATCH] OPENDJ-1190 (CR-2523) Under rare circumstances the DS replication recovery thread (RSUpdater) can spin
---
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 85 ++++++++++++++++++++++++++++++++++--------
1 files changed, 68 insertions(+), 17 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 821f7b8..57535f3 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -34,6 +34,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.DataFormatException;
import org.opends.messages.Category;
@@ -199,6 +200,8 @@
* is not updated too early.
*/
private final PendingChanges pendingChanges;
+ private final AtomicReference<RSUpdater> rsUpdater =
+ new AtomicReference<RSUpdater>(null);
/**
* It contain the updates that were done on other servers, transmitted
@@ -335,7 +338,7 @@
* The thread that periodically saves the ServerState of this
* LDAPReplicationDomain in the database.
*/
- private class ServerStateFlush extends DirectoryThread
+ private class ServerStateFlush extends DirectoryThread
{
protected ServerStateFlush()
{
@@ -351,7 +354,7 @@
{
done = false;
- while (!shutdown)
+ while (!isShutdownInitiated())
{
try
{
@@ -368,6 +371,7 @@
catch (InterruptedException e)
{
// Thread interrupted: check for shutdown.
+ Thread.currentThread().interrupt();
}
}
state.save();
@@ -383,6 +387,11 @@
private class RSUpdater extends DirectoryThread
{
private final CSN startCSN;
+ /**
+ * Used to communicate that the current thread computation needs to
+ * shutdown.
+ */
+ private AtomicBoolean shutdown = new AtomicBoolean(false);
protected RSUpdater(CSN replServerMaxCSN)
{
@@ -400,8 +409,7 @@
{
// Replication server is missing some of our changes: let's
// send them to him.
- Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
- logError(message);
+ logError(DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get());
/*
* Get all the changes that have not been seen by this
@@ -409,10 +417,9 @@
*/
try
{
- if (buildAndPublishMissingChanges(startCSN, broker))
+ if (buildAndPublishMissingChanges(startCSN, broker, shutdown))
{
- message = DEBUG_CHANGES_SENT.get();
- logError(message);
+ logError(DEBUG_CHANGES_SENT.get());
synchronized(replayOperations)
{
replayOperations.clear();
@@ -427,8 +434,7 @@
* Log an error for the repair tool
* that will need to re-synchronize the servers.
*/
- message = ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString());
- logError(message);
+ logError(ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString()));
}
} catch (Exception e)
{
@@ -439,14 +445,24 @@
* Log an error for the repair tool
* that will need to re-synchronize the servers.
*/
- message = ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString());
- logError(message);
+ logError(ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString()));
}
finally
{
broker.setRecoveryRequired(false);
+ // RSUpdater thread has finished its work, let's remove it from memory
+ // so another RSUpdater thread can be started if needed.
+ rsUpdater.compareAndSet(this, null);
}
}
+
+ /** {@inheritDoc} */
+ @Override
+ public void initiateShutdown()
+ {
+ this.shutdown.set(true);
+ super.initiateShutdown();
+ }
}
@@ -2372,10 +2388,16 @@
if (!shutdown)
{
shutdown = true;
+ final RSUpdater rsUpdater = this.rsUpdater.get();
+ if (rsUpdater != null)
+ {
+ rsUpdater.initiateShutdown();
+ }
// stop the thread in charge of flushing the ServerState.
if (flushThread != null)
{
+ flushThread.initiateShutdown();
synchronized (flushThread)
{
flushThread.notify();
@@ -4356,7 +4378,11 @@
{
pendingChanges.setRecovering(true);
broker.setRecoveryRequired(true);
- new RSUpdater(replServerMaxCSN).start();
+ final RSUpdater rsUpdater = new RSUpdater(replServerMaxCSN);
+ if (this.rsUpdater.compareAndSet(null, rsUpdater))
+ {
+ rsUpdater.start();
+ }
}
}
} catch (Exception e)
@@ -4375,12 +4401,14 @@
* The CSN where we need to start the search
* @param session
* The session to use to publish the changes
+ * @param shutdown
+ * whether the current run must be stopped
* @return A boolean indicating he success of the operation.
* @throws Exception
* if an Exception happens during the search.
*/
public boolean buildAndPublishMissingChanges(CSN startCSN,
- ReplicationBroker session) throws Exception
+ ReplicationBroker session, AtomicBoolean shutdown) throws Exception
{
// Trim the changes in replayOperations that are older than the startCSN.
synchronized (replayOperations)
@@ -4388,6 +4416,10 @@
Iterator<CSN> it = replayOperations.keySet().iterator();
while (it.hasNext())
{
+ if (shutdown.get())
+ {
+ return false;
+ }
if (it.next().isNewerThan(startCSN))
{
break;
@@ -4401,6 +4433,11 @@
CSN currentStartCSN = startCSN;
do
{
+ if (shutdown.get())
+ {
+ return false;
+ }
+
lastRetrievedChange = null;
// We can't do the search in one go because we need to store the results
// so that we are sure we send the operations in order and because the
@@ -4417,15 +4454,21 @@
// Publish and remove all the changes from the replayOperations list
// that are older than the endCSN.
- List<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
+ final List<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
synchronized (replayOperations)
{
Iterator<FakeOperation> itOp = replayOperations.values().iterator();
while (itOp.hasNext())
{
+ if (shutdown.get())
+ {
+ return false;
+ }
FakeOperation fakeOp = itOp.next();
if (fakeOp.getCSN().isNewerThan(endCSN) // sanity check
- || !state.cover(fakeOp.getCSN()))
+ || !state.cover(fakeOp.getCSN())
+ // do not look for replay operations in the future
+ || endCSN.isNewerThan(now()))
{
break;
}
@@ -4438,9 +4481,13 @@
for (FakeOperation opToSend : opsToSend)
{
+ if (shutdown.get())
+ {
+ return false;
+ }
session.publishRecovery(opToSend.generateMessage());
}
- opsToSend.clear();
+
if (lastRetrievedChange != null)
{
currentStartCSN = lastRetrievedChange;
@@ -4449,13 +4496,16 @@
{
currentStartCSN = endCSN;
}
-
} while (pendingChanges.recoveryUntil(lastRetrievedChange)
&& op.getResultCode().equals(ResultCode.SUCCESS));
return op.getResultCode().equals(ResultCode.SUCCESS);
}
+ private static CSN now()
+ {
+ return new CSN(TimeThread.getTime(), 0, 0);
+ }
/**
* Search for the changes that happened since fromCSN based on the historical
@@ -4589,6 +4639,7 @@
catch (InterruptedException e)
{
// Thread interrupted: check for shutdown.
+ Thread.currentThread().interrupt();
}
}
--
Gitblit v1.10.0