From 36fc2f63761b54cbfc48cf30d6f4b2b5f7fc90ba Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 20 Mar 2014 18:23:26 +0000
Subject: [PATCH] OPENDJ-1354: replication threads BLOCKED in pendingChanges queue

---
 opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |   67 ++++++++++++++--
 opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java          |  125 +++++++-----------------------
 2 files changed, 88 insertions(+), 104 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index af28824..02d6849 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -50,6 +50,8 @@
 import org.forgerock.opendj.ldap.ResultCode;
 
 import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.replication.common.ServerStatus.*;
+import static org.opends.server.replication.common.StatusMachineEvent.*;
 import static org.opends.server.replication.protocol.ProtocolVersion.*;
 import static org.opends.server.util.StaticUtils.*;
 
@@ -2379,8 +2381,7 @@
         localReplicationServer.getDegradedStatusThreshold();
     if (degradedStatusThreshold > 0) // 0 means no status analyzer
     {
-      final StatusAnalyzer thread =
-          new StatusAnalyzer(this, degradedStatusThreshold);
+      final StatusAnalyzer thread = new StatusAnalyzer(this);
       if (statusAnalyzer.compareAndSet(null, thread))
       {
         thread.start();
@@ -2670,15 +2671,8 @@
     {
       // Requested to stop analyzers
       stopStatusAnalyzer();
-      return;
     }
-
-    final StatusAnalyzer saThread = statusAnalyzer.get();
-    if (saThread != null) // it is running
-    {
-      saThread.setDegradedStatusThreshold(degradedStatusThreshold);
-    }
-    else if (connectedDSs.size() > 0)
+    else if (statusAnalyzer.get() == null && connectedDSs.size() > 0)
     {
       // Requested to start analyzers with provided threshold value
       startStatusAnalyzer();
@@ -2754,4 +2748,57 @@
         + " and port=" + localReplicationServer.getReplicationPort()
         + ": " + message);
   }
+
+
+
+  /**
+   * Go through each connected DS, get the number of pending changes we have for
+   * it and change status accordingly if threshold value is crossed/uncrossed.
+   */
+  void checkDSDegradedStatus()
+  {
+    final int degradedStatusThreshold = localReplicationServer
+        .getDegradedStatusThreshold();
+    // Threshold value = 0 means no status analyzer (no degrading system)
+    // we should not have that as the status analyzer thread should not be
+    // created if this is the case, but for sanity purpose, we add this
+    // test
+    if (degradedStatusThreshold > 0)
+    {
+      for (DataServerHandler serverHandler : getConnectedDSs().values())
+      {
+        // Get number of pending changes for this server
+        final int nChanges = serverHandler.getRcvMsgQueueSize();
+        if (logger.isTraceEnabled())
+        {
+          logger.trace("In RS " + getLocalRSServerId() + ", for baseDN="
+              + getBaseDN() + ": " + "Status analyzer: DS "
+              + serverHandler.getServerId() + " has " + nChanges
+              + " message(s) in writer queue.");
+        }
+
+        // Check status to know if it is relevant to change the status. Do not
+        // take RSD lock to test. If we attempt to change the status whereas
+        // the current status does allow it, this will be noticed by
+        // the changeStatusFromStatusAnalyzer() method. This allows to take the
+        // lock roughly only when needed versus every sleep time timeout.
+        if (nChanges >= degradedStatusThreshold)
+        {
+          if (serverHandler.getStatus() == NORMAL_STATUS
+              && changeStatus(serverHandler, TO_DEGRADED_STATUS_EVENT))
+          {
+            break; // Interrupted.
+          }
+        }
+        else
+        {
+          if (serverHandler.getStatus() == DEGRADED_STATUS
+              && changeStatus(serverHandler, TO_NORMAL_STATUS_EVENT))
+          {
+            break; // Interrupted.
+          }
+        }
+      }
+    }
+  }
 }
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java
index d236211..9fc7120 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -26,12 +26,12 @@
  */
 package org.opends.server.replication.server;
 
-import org.opends.server.api.DirectoryThread;
-import org.forgerock.i18n.slf4j.LocalizedLogger;
-import org.opends.server.replication.common.StatusMachineEvent;
 
-import static org.opends.server.replication.common.ServerStatus.*;
-import static org.opends.server.replication.common.StatusMachineEvent.*;
+
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.opends.server.api.DirectoryThread;
+
+
 
 /**
  * This thread is in charge of periodically determining if the connected
@@ -45,29 +45,26 @@
  */
 public class StatusAnalyzer extends DirectoryThread
 {
-
-  private volatile boolean shutdown = false;
-  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
-
-  private final ReplicationServerDomain replicationServerDomain;
-  private volatile int degradedStatusThreshold = -1;
+  private static final LocalizedLogger logger = LocalizedLogger
+      .getLoggerForThisClass();
 
   /** Sleep time for the thread, in ms. */
   private static final int STATUS_ANALYZER_SLEEP_TIME = 5000;
 
+  private final ReplicationServerDomain replicationServerDomain;
+  private final Object shutdownLock = new Object();
+  private volatile boolean shutdown = false;
   private volatile boolean done = false;
 
-  private final Object shutdownLock = new Object();
+
 
   /**
    * Create a StatusAnalyzer.
-   * @param replicationServerDomain The ReplicationServerDomain the status
-   *        analyzer is for.
-   * @param degradedStatusThreshold The pending changes threshold value to be
-   * used for putting a DS in DEGRADED_STATUS.
+   *
+   * @param replicationServerDomain
+   *          The ReplicationServerDomain the status analyzer is for.
    */
-  public StatusAnalyzer(ReplicationServerDomain replicationServerDomain,
-    int degradedStatusThreshold)
+  public StatusAnalyzer(ReplicationServerDomain replicationServerDomain)
   {
     super("Replication server RS("
         + replicationServerDomain.getLocalRSServerId()
@@ -75,9 +72,10 @@
         + "\"");
 
     this.replicationServerDomain = replicationServerDomain;
-    this.degradedStatusThreshold = degradedStatusThreshold;
   }
 
+
+
   /**
    * Analyzes if servers are late or not, and change their status accordingly.
    */
@@ -86,8 +84,7 @@
   {
     if (logger.isTraceEnabled())
     {
-      logger.trace(
-          getMessage("Directory server status analyzer starting."));
+      logger.trace(getMessage("Directory server status analyzer starting."));
     }
 
     while (!shutdown)
@@ -110,56 +107,20 @@
         }
       }
 
-      // Go through each connected DS, get the number of pending changes we have
-      // for it and change status accordingly if threshold value is
-      // crossed/uncrossed
-      for (DataServerHandler serverHandler :
-        replicationServerDomain.getConnectedDSs().values())
+      if (shutdown)
       {
-        // Get number of pending changes for this server
-        int nChanges = serverHandler.getRcvMsgQueueSize();
-        if (logger.isTraceEnabled())
-        {
-          logger.trace(getMessage("Status analyzer: DS "
-              + serverHandler.getServerId() + " has " + nChanges
-              + " message(s) in writer queue."));
-        }
-
-        // Check status to know if it is relevant to change the status. Do not
-        // take RSD lock to test. If we attempt to change the status whereas
-        // the current status does allow it, this will be noticed by
-        // the changeStatusFromStatusAnalyzer() method. This allows to take the
-        // lock roughly only when needed versus every sleep time timeout.
-        if (degradedStatusThreshold > 0)
-          // Threshold value = 0 means no status analyzer (no degrading system)
-          // we should not have that as the status analyzer thread should not be
-          // created if this is the case, but for sanity purpose, we add this
-          // test
-        {
-          if (nChanges >= degradedStatusThreshold)
-          {
-            if (serverHandler.getStatus() == NORMAL_STATUS
-                && isInterrupted(serverHandler, TO_DEGRADED_STATUS_EVENT))
-            {
-              break;
-            }
-          }
-          else
-          {
-            if (serverHandler.getStatus() == DEGRADED_STATUS
-                && isInterrupted(serverHandler, TO_NORMAL_STATUS_EVENT))
-            {
-              break;
-            }
-          }
-        }
+        break;
       }
+
+      replicationServerDomain.checkDSDegradedStatus();
     }
 
     done = true;
     logger.trace(getMessage("Status analyzer is terminated."));
   }
 
+
+
   private String getMessage(String message)
   {
     return "In RS " + replicationServerDomain.getLocalRSServerId()
@@ -167,18 +128,7 @@
         + message;
   }
 
-  private boolean isInterrupted(DataServerHandler serverHandler,
-      StatusMachineEvent event)
-  {
-    if (replicationServerDomain.changeStatus(serverHandler, event))
-    {
-      // Finish job and let thread die
-      logger.trace(
-          getMessage("Status analyzer has been interrupted and will die."));
-      return true;
-    }
-    return false;
-  }
+
 
   /**
    * Stops the thread.
@@ -197,9 +147,11 @@
     }
   }
 
+
+
   /**
-   * Waits for analyzer death. If not finished within 2 seconds,
-   * forces interruption
+   * Waits for analyzer death. If not finished within 2 seconds, forces
+   * interruption
    */
   public void waitForShutdown()
   {
@@ -217,25 +169,10 @@
           interrupt();
         }
       }
-    } catch (InterruptedException e)
+    }
+    catch (InterruptedException e)
     {
       // exit the loop if this thread is interrupted.
     }
   }
-
-  /**
-   * Sets the threshold value.
-   * @param degradedStatusThreshold The new threshold value.
-   */
-  public void setDegradedStatusThreshold(int degradedStatusThreshold)
-  {
-    if (logger.isTraceEnabled())
-    {
-      logger.trace(getMessage(
-          "Directory server status analyzer changing threshold value to "
-              + degradedStatusThreshold));
-    }
-
-    this.degradedStatusThreshold = degradedStatusThreshold;
-  }
 }

--
Gitblit v1.10.0