From 21af6610b07617ecbf1b826310a2f244deb4d348 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Tue, 25 Mar 2014 15:02:51 +0000
Subject: [PATCH] Fix OPENDJ-1354 - replication threads BLOCKED in pendingChanges queue
---
opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java | 129 ++++++++++++++++++++++++------------------
1 files changed, 73 insertions(+), 56 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java
index 9fc7120..2bcf574 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -42,8 +42,11 @@
* the threshold is uncrossed, the status analyzer must make the DS status
* change back to NORMAL_STATUS. To have meaning of status, please refer to
* ServerStatus class.
+ * <p>
+ * In addition, this thread is responsible for publishing any pending status
+ * messages.
*/
-public class StatusAnalyzer extends DirectoryThread
+class StatusAnalyzer extends DirectoryThread
{
private static final LocalizedLogger logger = LocalizedLogger
.getLoggerForThisClass();
@@ -52,9 +55,9 @@
private static final int STATUS_ANALYZER_SLEEP_TIME = 5000;
private final ReplicationServerDomain replicationServerDomain;
- private final Object shutdownLock = new Object();
- private volatile boolean shutdown = false;
- private volatile boolean done = false;
+ private final Object eventMonitor = new Object();
+ private boolean pendingStatusMessage = false;
+ private long nextCheckDSDegradedStatusTime;
@@ -64,13 +67,13 @@
* @param replicationServerDomain
* The ReplicationServerDomain the status analyzer is for.
*/
- public StatusAnalyzer(ReplicationServerDomain replicationServerDomain)
+ StatusAnalyzer(ReplicationServerDomain replicationServerDomain)
{
super("Replication server RS("
+ replicationServerDomain.getLocalRSServerId()
- + ") delay monitor for domain \"" + replicationServerDomain.getBaseDN()
+ + ") status monitor for domain \""
+ + replicationServerDomain.getBaseDN()
+ "\"");
-
this.replicationServerDomain = replicationServerDomain;
}
@@ -87,35 +90,50 @@
logger.trace(getMessage("Directory server status analyzer starting."));
}
- while (!shutdown)
+ try
{
- synchronized (shutdownLock)
+ while (true)
{
- if (!shutdown)
+ final boolean requestStatusBroadcastWasRequested;
+ synchronized (eventMonitor)
{
- try
+ if (!isShutdownInitiated() && !pendingStatusMessage)
{
- shutdownLock.wait(STATUS_ANALYZER_SLEEP_TIME);
+ eventMonitor.wait(STATUS_ANALYZER_SLEEP_TIME);
}
- catch (InterruptedException e)
- {
- // Server shutdown monitor may interrupt slow threads.
- logger.traceException(e);
- shutdown = true;
- break;
- }
+ requestStatusBroadcastWasRequested = pendingStatusMessage;
+ pendingStatusMessage = false;
+ }
+
+ if (isShutdownInitiated())
+ {
+ break;
+ }
+
+ // Broadcast heartbeats, topology messages, etc., if requested.
+ if (requestStatusBroadcastWasRequested)
+ {
+ replicationServerDomain.sendPendingStatusMessages();
+ }
+
+ /*
+ * Check the degraded status for connected DS instances only if
+ * sufficient time has passed. The current time is not cached because
+ * the call to checkDSDegradedStatus may take some time.
+ */
+ if (nextCheckDSDegradedStatusTime < System.currentTimeMillis())
+ {
+ replicationServerDomain.checkDSDegradedStatus();
+ nextCheckDSDegradedStatusTime = System.currentTimeMillis()
+ + STATUS_ANALYZER_SLEEP_TIME;
}
}
-
- if (shutdown)
- {
- break;
- }
-
- replicationServerDomain.checkDSDegradedStatus();
+ }
+ catch (InterruptedException e)
+ {
+ // Forcefully stopped: the thread was interrupted, so leave the loop.
}
- done = true;
logger.trace(getMessage("Status analyzer is terminated."));
}
@@ -133,46 +151,45 @@
/**
* Stops the thread.
*/
- public void shutdown()
+ void shutdown()
{
- synchronized (shutdownLock)
+ initiateShutdown();
+ if (logger.isTraceEnabled())
{
- shutdown = true;
- shutdownLock.notifyAll();
-
- if (logger.isTraceEnabled())
- {
- logger.trace(getMessage("Shutting down status analyzer."));
- }
+ logger.trace(getMessage("Shutting down status analyzer."));
+ }
+ synchronized (eventMonitor)
+ {
+ eventMonitor.notifyAll();
+ }
+ try
+ {
+ join(2000);
+ }
+ catch (InterruptedException e)
+ {
+ // Interrupted while joining: fall through and forcefully stop the thread.
+ }
+ if (isAlive())
+ {
+ // The join timed out or was interrupted so attempt to forcefully stop the
+ // analyzer.
+ interrupt();
}
}
/**
- * Waits for analyzer death. If not finished within 2 seconds, forces
- * interruption
+ * Requests that a topology state related message be broadcast to the rest of
+ * the topology. Messages include DS heartbeats, topology information, etc.
*/
- public void waitForShutdown()
+ void notifyPendingStatusMessage()
{
- try
+ synchronized (eventMonitor)
{
- int FACTOR = 40; // Wait for 2 seconds before interrupting the thread
- int n = 0;
- while (!done && this.isAlive())
- {
- Thread.sleep(50);
- n++;
- if (n >= FACTOR)
- {
- logger.trace(getMessage("Interrupting status analyzer."));
- interrupt();
- }
- }
- }
- catch (InterruptedException e)
- {
- // exit the loop if this thread is interrupted.
+ pendingStatusMessage = true;
+ eventMonitor.notifyAll();
}
}
}
--
Gitblit v1.10.0