From b875ab3f7b327f797ec4532015e45da6ae3fff56 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Tue, 08 Apr 2014 09:09:25 +0000
Subject: [PATCH] Backport fix for OPENDJ-1354: replication threads BLOCKED in pendingChanges queue
---
opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java | 237 +++++++++++++++++++++++-----------------------------------
1 files changed, 94 insertions(+), 143 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java b/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
index 34185c6..721ec23 100644
--- a/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
+++ b/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -22,18 +22,19 @@
*
*
* Copyright 2008-2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.server;
+
+
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.StatusMachineEvent;
-import org.opends.server.types.DebugLogLevel;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.replication.common.ServerStatus.*;
-import static org.opends.server.replication.common.StatusMachineEvent.*;
+
/**
* This thread is in charge of periodically determining if the connected
@@ -44,46 +45,45 @@
* the threshold is uncrossed, the status analyzer must make the DS status
* change back to NORMAL_STATUS. To have meaning of status, please refer to
* ServerStatus class.
+ * <p>
+ * In addition, this thread is responsible for publishing any pending status
+ * messages.
*/
-public class StatusAnalyzer extends DirectoryThread
+class StatusAnalyzer extends DirectoryThread
{
-
- private volatile boolean shutdown = false;
-
/**
* The tracer object for the debug logger.
*/
private static final DebugTracer TRACER = getTracer();
- private final ReplicationServerDomain replicationServerDomain;
- private volatile int degradedStatusThreshold = -1;
-
/** Sleep time for the thread, in ms. */
private static final int STATUS_ANALYZER_SLEEP_TIME = 5000;
- private volatile boolean done = false;
+ private final ReplicationServerDomain replicationServerDomain;
+ private final Object eventMonitor = new Object(); // Lock + wake-up for run().
+ private boolean pendingStatusMessage = false; // Guarded by eventMonitor.
+ private long nextCheckDSDegradedStatusTime; // Epoch ms; used by run() only.
- private final Object shutdownLock = new Object();
+
/**
* Create a StatusAnalyzer.
- * @param replicationServerDomain The ReplicationServerDomain the status
- * analyzer is for.
- * @param degradedStatusThreshold The pending changes threshold value to be
- * used for putting a DS in DEGRADED_STATUS.
+ *
+ * @param replicationServerDomain
+ * The domain to analyze; its RS id and base DN name the thread.
*/
- public StatusAnalyzer(ReplicationServerDomain replicationServerDomain,
- int degradedStatusThreshold)
+ StatusAnalyzer(ReplicationServerDomain replicationServerDomain)
{
super("Replication server RS("
+ replicationServerDomain.getLocalRSServerId()
- + ") delay monitor for domain \"" + replicationServerDomain.getBaseDN()
+ + ") status monitor for domain \""
+ + replicationServerDomain.getBaseDN()
+ "\"");
-
this.replicationServerDomain = replicationServerDomain;
- this.degradedStatusThreshold = degradedStatusThreshold;
}
+
+
/**
* Analyzes if servers are late or not, and change their status accordingly.
*/
@@ -96,79 +96,55 @@
getMessage("Directory server status analyzer starting."));
}
- while (!shutdown)
+ try
{
- synchronized (shutdownLock)
+ while (true)
{
- if (!shutdown)
+ final boolean requestStatusBroadcastWasRequested;
+ synchronized (eventMonitor)
{
- try
+ if (!isShutdownInitiated() && !pendingStatusMessage)
{
- shutdownLock.wait(STATUS_ANALYZER_SLEEP_TIME);
+ eventMonitor.wait(STATUS_ANALYZER_SLEEP_TIME);
}
- catch (InterruptedException e)
- {
- // Server shutdown monitor may interrupt slow threads.
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- shutdown = true;
- break;
- }
- }
- }
-
- // Go through each connected DS, get the number of pending changes we have
- // for it and change status accordingly if threshold value is
- // crossed/uncrossed
- for (DataServerHandler serverHandler :
- replicationServerDomain.getConnectedDSs().values())
- {
- // Get number of pending changes for this server
- int nChanges = serverHandler.getRcvMsgQueueSize();
- if (debugEnabled())
- {
- TRACER.debugInfo(getMessage("Status analyzer: DS "
- + serverHandler.getServerId() + " has " + nChanges
- + " message(s) in writer queue."));
+ requestStatusBroadcastWasRequested = pendingStatusMessage;
+ pendingStatusMessage = false;
}
- // Check status to know if it is relevant to change the status. Do not
- // take RSD lock to test. If we attempt to change the status whereas
- // the current status does allow it, this will be noticed by
- // the changeStatusFromStatusAnalyzer() method. This allows to take the
- // lock roughly only when needed versus every sleep time timeout.
- if (degradedStatusThreshold > 0)
- // Threshold value = 0 means no status analyzer (no degrading system)
- // we should not have that as the status analyzer thread should not be
- // created if this is the case, but for sanity purpose, we add this
- // test
+ if (isShutdownInitiated())
{
- if (nChanges >= degradedStatusThreshold)
- {
- if (serverHandler.getStatus() == NORMAL_STATUS
- && isInterrupted(serverHandler, TO_DEGRADED_STATUS_EVENT))
- {
- break;
- }
- }
- else
- {
- if (serverHandler.getStatus() == DEGRADED_STATUS
- && isInterrupted(serverHandler, TO_NORMAL_STATUS_EVENT))
- {
- break;
- }
- }
+ break;
+ }
+
+ // Broadcast heartbeats, topology messages, etc if requested.
+ if (requestStatusBroadcastWasRequested)
+ {
+ replicationServerDomain.sendPendingStatusMessages();
+ }
+
+ /*
+ * Check the degraded status for connected DS instances only if
+ * sufficient time has passed. The current time is not cached because
+ * the call to checkDSDegradedStatus may take some time.
+ */
+ if (nextCheckDSDegradedStatusTime < System.currentTimeMillis())
+ {
+ replicationServerDomain.checkDSDegradedStatus();
+ nextCheckDSDegradedStatusTime = System.currentTimeMillis()
+ + STATUS_ANALYZER_SLEEP_TIME;
}
}
}
+ catch (InterruptedException e)
+ {
+ // Forcefully stopped.
+ }
- done = true;
TRACER.debugInfo(getMessage("Status analyzer is terminated."));
}
+
+
private String getMessage(String message)
{
return "In RS " + replicationServerDomain.getLocalRSServerId()
@@ -176,75 +152,50 @@
+ message;
}
- private boolean isInterrupted(DataServerHandler serverHandler,
- StatusMachineEvent event)
- {
- if (replicationServerDomain.changeStatus(serverHandler, event))
- {
- // Finish job and let thread die
- TRACER.debugInfo(
- getMessage("Status analyzer has been interrupted and will die."));
- return true;
- }
- return false;
- }
+
/**
* Stops the thread.
*/
- public void shutdown()
+ void shutdown()
{
- synchronized (shutdownLock)
- {
- shutdown = true;
- shutdownLock.notifyAll();
-
- if (debugEnabled())
- {
- TRACER.debugInfo(getMessage("Shutting down status analyzer."));
- }
- }
- }
-
- /**
- * Waits for analyzer death. If not finished within 2 seconds,
- * forces interruption
- */
- public void waitForShutdown()
- {
- try
- {
- int FACTOR = 40; // Wait for 2 seconds before interrupting the thread
- int n = 0;
- while (!done && this.isAlive())
- {
- Thread.sleep(50);
- n++;
- if (n >= FACTOR)
- {
- TRACER.debugInfo(getMessage("Interrupting status analyzer."));
- interrupt();
- }
- }
- } catch (InterruptedException e)
- {
- // exit the loop if this thread is interrupted.
- }
- }
-
- /**
- * Sets the threshold value.
- * @param degradedStatusThreshold The new threshold value.
- */
- public void setDegradedStatusThreshold(int degradedStatusThreshold)
- {
+ initiateShutdown();
if (debugEnabled())
{
- TRACER.debugInfo(getMessage(
- "Directory server status analyzer changing threshold value to "
- + degradedStatusThreshold));
+ TRACER.debugInfo(getMessage("Shutting down status analyzer."));
}
+ synchronized (eventMonitor)
+ {
+ eventMonitor.notifyAll();
+ }
+ try
+ {
+ join(2000);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt(); // Preserve the caller's interrupt.
+ }
+ if (isAlive())
+ {
+ // The join timed out or was interrupted so attempt to forcefully stop the
+ // analyzer.
+ interrupt();
+ }
+ }
- this.degradedStatusThreshold = degradedStatusThreshold;
+
+
+ /**
+ * Flags that status messages (DS heartbeats, topology information, etc) need
+ * broadcasting and wakes the analyzer thread so that run() sends them.
+ */
+ void notifyPendingStatusMessage()
+ {
+ synchronized (eventMonitor)
+ {
+ pendingStatusMessage = true;
+ eventMonitor.notifyAll();
+ }
}
}
--
Gitblit v1.10.0