From 36fc2f63761b54cbfc48cf30d6f4b2b5f7fc90ba Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 20 Mar 2014 18:23:26 +0000
Subject: [PATCH] OPENDJ-1354: replication threads BLOCKED in pendingChanges queue

---
 opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |   67 ++++++++++++++++++++++++++++-----
 1 files changed, 57 insertions(+), 10 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index af28824..02d6849 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -50,6 +50,8 @@
 import org.forgerock.opendj.ldap.ResultCode;
 
 import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.replication.common.ServerStatus.*;
+import static org.opends.server.replication.common.StatusMachineEvent.*;
 import static org.opends.server.replication.protocol.ProtocolVersion.*;
 import static org.opends.server.util.StaticUtils.*;
 
@@ -2379,8 +2381,7 @@
         localReplicationServer.getDegradedStatusThreshold();
     if (degradedStatusThreshold > 0) // 0 means no status analyzer
     {
-      final StatusAnalyzer thread =
-          new StatusAnalyzer(this, degradedStatusThreshold);
+      final StatusAnalyzer thread = new StatusAnalyzer(this);
       if (statusAnalyzer.compareAndSet(null, thread))
       {
         thread.start();
@@ -2670,15 +2671,8 @@
     {
       // Requested to stop analyzers
       stopStatusAnalyzer();
-      return;
     }
-
-    final StatusAnalyzer saThread = statusAnalyzer.get();
-    if (saThread != null) // it is running
-    {
-      saThread.setDegradedStatusThreshold(degradedStatusThreshold);
-    }
-    else if (connectedDSs.size() > 0)
+    else if (statusAnalyzer.get() == null && connectedDSs.size() > 0)
     {
       // Requested to start analyzers with provided threshold value
       startStatusAnalyzer();
@@ -2754,4 +2748,57 @@
         + " and port=" + localReplicationServer.getReplicationPort()
         + ": " + message);
   }
+
+
+
+  /**
+   * Goes through each connected DS, gets the number of pending changes we have
+   * for it and changes its status when the threshold is crossed/uncrossed.
+   */
+  void checkDSDegradedStatus()
+  {
+    final int degradedStatusThreshold = localReplicationServer
+        .getDegradedStatusThreshold();
+    // A threshold of 0 means no status analyzer (no degrading system).
+    // The status analyzer thread should not have been created in that
+    // case, so this should not happen; the test is kept purely as a
+    // sanity check.
+    if (degradedStatusThreshold > 0)
+    {
+      for (DataServerHandler serverHandler : getConnectedDSs().values())
+      {
+        // Get the number of pending changes for this server
+        final int nChanges = serverHandler.getRcvMsgQueueSize();
+        if (logger.isTraceEnabled())
+        {
+          logger.trace("In RS " + getLocalRSServerId() + ", for baseDN="
+              + getBaseDN() + ": " + "Status analyzer: DS "
+              + serverHandler.getServerId() + " has " + nChanges
+              + " message(s) in writer queue.");
+        }
+
+        // Check the current status first to see whether a change is relevant,
+        // without taking the RSD lock. If we attempt a change that the current
+        // status does not allow, changeStatus() is expected to notice it
+        // (NOTE(review): confirm in changeStatus()). This way the lock is
+        // taken roughly only when needed rather than at every sleep timeout.
+        if (nChanges >= degradedStatusThreshold)
+        {
+          if (serverHandler.getStatus() == NORMAL_STATUS
+              && changeStatus(serverHandler, TO_DEGRADED_STATUS_EVENT))
+          {
+            break; // Interrupted: changeStatus() returned true, stop the scan.
+          }
+        }
+        else
+        {
+          if (serverHandler.getStatus() == DEGRADED_STATUS
+              && changeStatus(serverHandler, TO_NORMAL_STATUS_EVENT))
+          {
+            break; // Interrupted: changeStatus() returned true, stop the scan.
+          }
+        }
+      }
+    }
+  }
 }

--
Gitblit v1.10.0