From eeb2e4f0a3ddfdf3ae9ab8fe3be4f824566f0e8d Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 23 Dec 2013 15:45:05 +0000
Subject: [PATCH] Fix feature envy between ListenerThread and ReplicationDomain
---
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 114 +++++++++++++++++++++++++++++++++++++-------------------
1 files changed, 75 insertions(+), 39 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index ae0a284..b01feaa 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -34,7 +34,6 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.opends.messages.Category;
@@ -79,7 +78,7 @@
* and which can start receiving updates.
* <p>
* When updates are received the Replication Service calls the
- * {@link #processUpdate(UpdateMsg, AtomicBoolean)} method.
+ * {@link #processUpdate(UpdateMsg)} method.
* ReplicationDomain implementation should implement the appropriate code
* for replaying the update on the local repository.
* When fully done the subclass must call the
@@ -156,7 +155,7 @@
* them to the global incoming update message queue for later processing by
* replay threads.
*/
- private ListenerThread listenerThread;
+ private volatile DirectoryThread listenerThread = null;
/**
* A Map used to store all the ReplicationDomains created on this server.
@@ -740,7 +739,7 @@
* Also responsible for updating the list of pending changes
* @return the received message - null if none
*/
- UpdateMsg receive()
+ private UpdateMsg receive()
{
UpdateMsg update = null;
@@ -2715,25 +2714,6 @@
}
/**
- * This method is called when the ReplicationDomain has completed the
- * processing of a received update synchronously.
- * In such cases the processUpdateDone () is called and the state
- * is updated automatically.
- *
- * @param msg The UpdateMessage that was processed.
- */
- void processUpdateDoneSynchronous(UpdateMsg msg)
- {
- /*
- Warning: in synchronous mode, no way to tell the replay of an update went
- wrong Just put null in processUpdateDone so that if assured replication
- is used the ack is sent without error at replay flag.
- */
- processUpdateDone(msg, null);
- state.update(msg.getCSN());
- }
-
- /**
* Check if the domain is connected to a ReplicationServer.
*
* @return true if the server is connected, false if not.
@@ -3000,7 +2980,7 @@
* Starts the receiver side of the Replication Service.
* <p>
* After this method has been called, the Replication Service will start
- * calling the {@link #processUpdate(UpdateMsg, AtomicBoolean)}.
+ * calling the {@link #processUpdate(UpdateMsg)}.
* <p>
* This method must be called once and must be called after the
* {@link #startPublishService(ReplicationDomainCfg)}.
@@ -3009,8 +2989,48 @@
{
synchronized (sessionLock)
{
- // Create the listener thread
- listenerThread = new ListenerThread(this);
+ final String threadName = "Replica DS(" + getServerId()
+ + ") listener for domain \"" + getBaseDNString() + "\"";
+
+ listenerThread = new DirectoryThread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("Replication Listener thread starting.");
+ }
+
+ // Loop processing any incoming update messages.
+ while (!listenerThread.isShutdownInitiated())
+ {
+ final UpdateMsg updateMsg = receive();
+ if (updateMsg == null)
+ {
+ // The server is shutting down.
+ listenerThread.initiateShutdown();
+ }
+ else if (processUpdate(updateMsg))
+ {
+ /*
+ * Warning: in synchronous mode, no way to tell the replay of an
+ * update went wrong Just put null in processUpdateDone so that if
+ * assured replication is used the ack is sent without error at
+ * replay flag.
+ */
+ processUpdateDone(updateMsg, null);
+ state.update(updateMsg.getCSN());
+ }
+ }
+
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("Replication Listener thread stopping.");
+ }
+ }
+ }, threadName);
+
listenerThread.start();
}
}
@@ -3041,14 +3061,34 @@
// Stop the listener thread
if (listenerThread != null)
{
- listenerThread.shutdown();
- listenerThread.waitForShutdown();
+ listenerThread.initiateShutdown();
+ try
+ {
+ listenerThread.join();
+ }
+ catch (InterruptedException e)
+ {
+ // Give up waiting.
+ }
listenerThread = null;
}
}
}
/**
+ * Returns {@code true} if the listener thread is shutting down or has
+ * shutdown.
+ *
+ * @return {@code true} if the listener thread is shutting down or has
+ * shutdown.
+ */
+ protected final boolean isListenerShuttingDown()
+ {
+ final DirectoryThread tmp = listenerThread;
+ return tmp == null || tmp.isShutdownInitiated();
+ }
+
+ /**
* Restart the Replication service after a {@link #disableService()}.
* <p>
* The Replication Service will restart from the point indicated by the
@@ -3065,10 +3105,7 @@
synchronized (sessionLock)
{
broker.start();
-
- // Create the listener thread
- listenerThread = new ListenerThread(this);
- listenerThread.start();
+ startListenService();
}
}
@@ -3156,6 +3193,8 @@
*/
public abstract long countEntries() throws DirectoryException;
+
+
/**
* This method should handle the processing of {@link UpdateMsg} receive from
* remote replication entities.
@@ -3165,20 +3204,17 @@
*
* @param updateMsg
* The {@link UpdateMsg} that was received.
- * @param shutdown
- * whether the server initiated shutdown
* @return A boolean indicating if the processing is completed at return time.
* If <code> true </code> is returned, no further processing is
* necessary. If <code> false </code> is returned, the subclass should
* call the method {@link #processUpdateDone(UpdateMsg, String)} and
* update the ServerState When this processing is complete.
*/
- public abstract boolean processUpdate(UpdateMsg updateMsg,
- AtomicBoolean shutdown);
+ public abstract boolean processUpdate(UpdateMsg updateMsg);
/**
* This method must be called after each call to
- * {@link #processUpdate(UpdateMsg, AtomicBoolean)} when the processing of the
+ * {@link #processUpdate(UpdateMsg)} when the processing of the
* update is completed.
* <p>
* It is useful for implementation needing to process the update in an
@@ -3192,7 +3228,7 @@
* this update, and this is the matching human readable message
* describing the problem.
*/
- public void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
+ protected void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
{
broker.updateWindowAfterReplay();
@@ -3401,7 +3437,7 @@
* The Replication Service will handle the delivery of this {@link UpdateMsg}
* to all the participants of this Replication Domain. These members will be
* receive this {@link UpdateMsg} through a call of the
- * {@link #processUpdate(UpdateMsg, AtomicBoolean)} message.
+ * {@link #processUpdate(UpdateMsg)} message.
*
* @param msg The UpdateMsg that should be pushed.
*/
--
Gitblit v1.10.0