From 36fc2f63761b54cbfc48cf30d6f4b2b5f7fc90ba Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 20 Mar 2014 18:23:26 +0000
Subject: [PATCH] OPENDJ-1354: replication threads BLOCKED in pendingChanges queue
---
opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java | 125 ++++++++++-------------------------------
1 files changed, 31 insertions(+), 94 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java
index d236211..9fc7120 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -26,12 +26,12 @@
*/
package org.opends.server.replication.server;
-import org.opends.server.api.DirectoryThread;
-import org.forgerock.i18n.slf4j.LocalizedLogger;
-import org.opends.server.replication.common.StatusMachineEvent;
-import static org.opends.server.replication.common.ServerStatus.*;
-import static org.opends.server.replication.common.StatusMachineEvent.*;
+
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.opends.server.api.DirectoryThread;
+
+
/**
* This thread is in charge of periodically determining if the connected
@@ -45,29 +45,26 @@
*/
public class StatusAnalyzer extends DirectoryThread
{
-
- private volatile boolean shutdown = false;
- private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
-
- private final ReplicationServerDomain replicationServerDomain;
- private volatile int degradedStatusThreshold = -1;
+ private static final LocalizedLogger logger = LocalizedLogger
+ .getLoggerForThisClass();
/** Sleep time for the thread, in ms. */
private static final int STATUS_ANALYZER_SLEEP_TIME = 5000;
+ private final ReplicationServerDomain replicationServerDomain;
+ private final Object shutdownLock = new Object();
+ private volatile boolean shutdown = false;
private volatile boolean done = false;
- private final Object shutdownLock = new Object();
+
/**
* Create a StatusAnalyzer.
- * @param replicationServerDomain The ReplicationServerDomain the status
- * analyzer is for.
- * @param degradedStatusThreshold The pending changes threshold value to be
- * used for putting a DS in DEGRADED_STATUS.
+ *
+ * @param replicationServerDomain
+ * The ReplicationServerDomain the status analyzer is for.
*/
- public StatusAnalyzer(ReplicationServerDomain replicationServerDomain,
- int degradedStatusThreshold)
+ public StatusAnalyzer(ReplicationServerDomain replicationServerDomain)
{
super("Replication server RS("
+ replicationServerDomain.getLocalRSServerId()
@@ -75,9 +72,10 @@
+ "\"");
this.replicationServerDomain = replicationServerDomain;
- this.degradedStatusThreshold = degradedStatusThreshold;
}
+
+
/**
* Analyzes if servers are late or not, and change their status accordingly.
*/
@@ -86,8 +84,7 @@
{
if (logger.isTraceEnabled())
{
- logger.trace(
- getMessage("Directory server status analyzer starting."));
+ logger.trace(getMessage("Directory server status analyzer starting."));
}
while (!shutdown)
@@ -110,56 +107,20 @@
}
}
- // Go through each connected DS, get the number of pending changes we have
- // for it and change status accordingly if threshold value is
- // crossed/uncrossed
- for (DataServerHandler serverHandler :
- replicationServerDomain.getConnectedDSs().values())
+ if (shutdown)
{
- // Get number of pending changes for this server
- int nChanges = serverHandler.getRcvMsgQueueSize();
- if (logger.isTraceEnabled())
- {
- logger.trace(getMessage("Status analyzer: DS "
- + serverHandler.getServerId() + " has " + nChanges
- + " message(s) in writer queue."));
- }
-
- // Check status to know if it is relevant to change the status. Do not
- // take RSD lock to test. If we attempt to change the status whereas
- // the current status does allow it, this will be noticed by
- // the changeStatusFromStatusAnalyzer() method. This allows to take the
- // lock roughly only when needed versus every sleep time timeout.
- if (degradedStatusThreshold > 0)
- // Threshold value = 0 means no status analyzer (no degrading system)
- // we should not have that as the status analyzer thread should not be
- // created if this is the case, but for sanity purpose, we add this
- // test
- {
- if (nChanges >= degradedStatusThreshold)
- {
- if (serverHandler.getStatus() == NORMAL_STATUS
- && isInterrupted(serverHandler, TO_DEGRADED_STATUS_EVENT))
- {
- break;
- }
- }
- else
- {
- if (serverHandler.getStatus() == DEGRADED_STATUS
- && isInterrupted(serverHandler, TO_NORMAL_STATUS_EVENT))
- {
- break;
- }
- }
- }
+ break;
}
+
+ replicationServerDomain.checkDSDegradedStatus();
}
done = true;
logger.trace(getMessage("Status analyzer is terminated."));
}
+
+
private String getMessage(String message)
{
return "In RS " + replicationServerDomain.getLocalRSServerId()
@@ -167,18 +128,7 @@
+ message;
}
- private boolean isInterrupted(DataServerHandler serverHandler,
- StatusMachineEvent event)
- {
- if (replicationServerDomain.changeStatus(serverHandler, event))
- {
- // Finish job and let thread die
- logger.trace(
- getMessage("Status analyzer has been interrupted and will die."));
- return true;
- }
- return false;
- }
+
/**
* Stops the thread.
@@ -197,9 +147,11 @@
}
}
+
+
/**
- * Waits for analyzer death. If not finished within 2 seconds,
- * forces interruption
+ * Waits for analyzer death. If not finished within 2 seconds, forces
+ * interruption
*/
public void waitForShutdown()
{
@@ -217,25 +169,10 @@
interrupt();
}
}
- } catch (InterruptedException e)
+ }
+ catch (InterruptedException e)
{
// exit the loop if this thread is interrupted.
}
}
-
- /**
- * Sets the threshold value.
- * @param degradedStatusThreshold The new threshold value.
- */
- public void setDegradedStatusThreshold(int degradedStatusThreshold)
- {
- if (logger.isTraceEnabled())
- {
- logger.trace(getMessage(
- "Directory server status analyzer changing threshold value to "
- + degradedStatusThreshold));
- }
-
- this.degradedStatusThreshold = degradedStatusThreshold;
- }
}
--
Gitblit v1.10.0