From 36fc2f63761b54cbfc48cf30d6f4b2b5f7fc90ba Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 20 Mar 2014 18:23:26 +0000
Subject: [PATCH] OPENDJ-1354: replication threads BLOCKED in pendingChanges queue
---
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 67 ++++++++++++++++++++++++++++-----
1 files changed, 57 insertions(+), 10 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index af28824..02d6849 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -50,6 +50,8 @@
import org.forgerock.opendj.ldap.ResultCode;
import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.replication.common.ServerStatus.*;
+import static org.opends.server.replication.common.StatusMachineEvent.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.util.StaticUtils.*;
@@ -2379,8 +2381,7 @@
localReplicationServer.getDegradedStatusThreshold();
if (degradedStatusThreshold > 0) // 0 means no status analyzer
{
- final StatusAnalyzer thread =
- new StatusAnalyzer(this, degradedStatusThreshold);
+ final StatusAnalyzer thread = new StatusAnalyzer(this);
if (statusAnalyzer.compareAndSet(null, thread))
{
thread.start();
@@ -2670,15 +2671,8 @@
{
// Requested to stop analyzers
stopStatusAnalyzer();
- return;
}
-
- final StatusAnalyzer saThread = statusAnalyzer.get();
- if (saThread != null) // it is running
- {
- saThread.setDegradedStatusThreshold(degradedStatusThreshold);
- }
- else if (connectedDSs.size() > 0)
+ else if (statusAnalyzer.get() == null && connectedDSs.size() > 0)
{
// Requested to start analyzers with provided threshold value
startStatusAnalyzer();
@@ -2754,4 +2748,57 @@
+ " and port=" + localReplicationServer.getReplicationPort()
+ ": " + message);
}
+
+
+
+ /**
+ * For each connected DS, compares its pending message count to the degraded
+ * status threshold and requests a status change when NORMAL/DEGRADED is stale.
+ */
+ void checkDSDegradedStatus()
+ {
+ final int degradedStatusThreshold = localReplicationServer
+ .getDegradedStatusThreshold();
+ // A threshold of 0 means no status analyzer (no degrading system). This
+ // should not happen here, since the status analyzer thread should not be
+ // created in that case; the test is kept purely as a sanity check, and
+ // the method does nothing when it fails.
+ if (degradedStatusThreshold > 0)
+ {
+ for (DataServerHandler serverHandler : getConnectedDSs().values())
+ {
+ // Number of pending changes (queued messages) for this server
+ final int nChanges = serverHandler.getRcvMsgQueueSize();
+ if (logger.isTraceEnabled())
+ {
+ logger.trace("In RS " + getLocalRSServerId() + ", for baseDN="
+ + getBaseDN() + ": " + "Status analyzer: DS "
+ + serverHandler.getServerId() + " has " + nChanges
+ + " message(s) in writer queue.");
+ }
+
+ // Check the status first to know whether a change is relevant, without
+ // taking the RSD lock. If the current status does not allow the change,
+ // the status-change method called below is expected to notice it
+ // (NOTE(review): confirm in changeStatus()). This way the lock is taken
+ // roughly only when needed rather than on every check.
+ if (nChanges >= degradedStatusThreshold)
+ {
+ if (serverHandler.getStatus() == NORMAL_STATUS
+ && changeStatus(serverHandler, TO_DEGRADED_STATUS_EVENT))
+ {
+ break; // Interrupted.
+ }
+ }
+ else
+ {
+ if (serverHandler.getStatus() == DEGRADED_STATUS
+ && changeStatus(serverHandler, TO_NORMAL_STATUS_EVENT))
+ {
+ break; // Interrupted.
+ }
+ }
+ }
+ }
+ }
}
--
Gitblit v1.10.0