From b875ab3f7b327f797ec4532015e45da6ae3fff56 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Tue, 08 Apr 2014 09:09:25 +0000
Subject: [PATCH] Backport fix for OPENDJ-1354: replication threads BLOCKED in pendingChanges queue

---
 opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java |  237 +++++++++++++++++++++++-----------------------------------
 1 files changed, 94 insertions(+), 143 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java b/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
index 34185c6..721ec23 100644
--- a/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
+++ b/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -22,18 +22,19 @@
  *
  *
  *      Copyright 2008-2009 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2013 ForgeRock AS
+ *      Portions Copyright 2011-2014 ForgeRock AS
  */
 package org.opends.server.replication.server;
 
+
+
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.StatusMachineEvent;
-import org.opends.server.types.DebugLogLevel;
 
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.replication.common.ServerStatus.*;
-import static org.opends.server.replication.common.StatusMachineEvent.*;
+
 
 /**
  * This thread is in charge of periodically determining if the connected
@@ -44,46 +45,45 @@
  * the threshold is uncrossed, the status analyzer must make the DS status
  * change back to NORMAL_STATUS. To have meaning of status, please refer to
  * ServerStatus class.
+ * <p>
+ * In addition, this thread is responsible for publishing any pending status
+ * messages.
  */
-public class StatusAnalyzer extends DirectoryThread
+class StatusAnalyzer extends DirectoryThread
 {
-
-  private volatile boolean shutdown = false;
-
   /**
    * The tracer object for the debug logger.
    */
   private static final DebugTracer TRACER = getTracer();
 
-  private final ReplicationServerDomain replicationServerDomain;
-  private volatile int degradedStatusThreshold = -1;
-
   /** Sleep time for the thread, in ms. */
   private static final int STATUS_ANALYZER_SLEEP_TIME = 5000;
 
-  private volatile boolean done = false;
+  private final ReplicationServerDomain replicationServerDomain;
+  private final Object eventMonitor = new Object();
+  private boolean pendingStatusMessage = false;
+  private long nextCheckDSDegradedStatusTime;
 
-  private final Object shutdownLock = new Object();
+
 
   /**
    * Create a StatusAnalyzer.
-   * @param replicationServerDomain The ReplicationServerDomain the status
-   *        analyzer is for.
-   * @param degradedStatusThreshold The pending changes threshold value to be
-   * used for putting a DS in DEGRADED_STATUS.
+   *
+   * @param replicationServerDomain
+   *          The ReplicationServerDomain the status analyzer is for.
    */
-  public StatusAnalyzer(ReplicationServerDomain replicationServerDomain,
-    int degradedStatusThreshold)
+  StatusAnalyzer(ReplicationServerDomain replicationServerDomain)
   {
     super("Replication server RS("
         + replicationServerDomain.getLocalRSServerId()
-        + ") delay monitor for domain \"" + replicationServerDomain.getBaseDN()
+        + ") status monitor for domain \""
+        + replicationServerDomain.getBaseDN()
         + "\"");
-
     this.replicationServerDomain = replicationServerDomain;
-    this.degradedStatusThreshold = degradedStatusThreshold;
   }
 
+
+
   /**
    * Analyzes if servers are late or not, and change their status accordingly.
    */
@@ -96,79 +96,55 @@
           getMessage("Directory server status analyzer starting."));
     }
 
-    while (!shutdown)
+    try
     {
-      synchronized (shutdownLock)
+      while (true)
       {
-        if (!shutdown)
+        final boolean requestStatusBroadcastWasRequested;
+        synchronized (eventMonitor)
         {
-          try
+          if (!isShutdownInitiated() && !pendingStatusMessage)
           {
-            shutdownLock.wait(STATUS_ANALYZER_SLEEP_TIME);
+            eventMonitor.wait(STATUS_ANALYZER_SLEEP_TIME);
           }
-          catch (InterruptedException e)
-          {
-            // Server shutdown monitor may interrupt slow threads.
-            if (debugEnabled())
-            {
-              TRACER.debugCaught(DebugLogLevel.ERROR, e);
-            }
-            shutdown = true;
-            break;
-          }
-        }
-      }
-
-      // Go through each connected DS, get the number of pending changes we have
-      // for it and change status accordingly if threshold value is
-      // crossed/uncrossed
-      for (DataServerHandler serverHandler :
-        replicationServerDomain.getConnectedDSs().values())
-      {
-        // Get number of pending changes for this server
-        int nChanges = serverHandler.getRcvMsgQueueSize();
-        if (debugEnabled())
-        {
-          TRACER.debugInfo(getMessage("Status analyzer: DS "
-              + serverHandler.getServerId() + " has " + nChanges
-              + " message(s) in writer queue."));
+          requestStatusBroadcastWasRequested = pendingStatusMessage;
+          pendingStatusMessage = false;
         }
 
-        // Check status to know if it is relevant to change the status. Do not
-        // take RSD lock to test. If we attempt to change the status whereas
-        // the current status does allow it, this will be noticed by
-        // the changeStatusFromStatusAnalyzer() method. This allows to take the
-        // lock roughly only when needed versus every sleep time timeout.
-        if (degradedStatusThreshold > 0)
-          // Threshold value = 0 means no status analyzer (no degrading system)
-          // we should not have that as the status analyzer thread should not be
-          // created if this is the case, but for sanity purpose, we add this
-          // test
+        if (isShutdownInitiated())
         {
-          if (nChanges >= degradedStatusThreshold)
-          {
-            if (serverHandler.getStatus() == NORMAL_STATUS
-                && isInterrupted(serverHandler, TO_DEGRADED_STATUS_EVENT))
-            {
-              break;
-            }
-          }
-          else
-          {
-            if (serverHandler.getStatus() == DEGRADED_STATUS
-                && isInterrupted(serverHandler, TO_NORMAL_STATUS_EVENT))
-            {
-              break;
-            }
-          }
+          break;
+        }
+
+        // Broadcast heartbeats, topology messages, etc. if requested.
+        if (requestStatusBroadcastWasRequested)
+        {
+          replicationServerDomain.sendPendingStatusMessages();
+        }
+
+        /*
+         * Check the degraded status for connected DS instances only if
+         * sufficient time has passed. The current time is not cached because
+         * the call to checkDSDegradedStatus may take some time.
+         */
+        if (nextCheckDSDegradedStatusTime < System.currentTimeMillis())
+        {
+          replicationServerDomain.checkDSDegradedStatus();
+          nextCheckDSDegradedStatusTime = System.currentTimeMillis()
+              + STATUS_ANALYZER_SLEEP_TIME;
         }
       }
     }
+    catch (InterruptedException e)
+    {
+      // Forcefully stopped.
+    }
 
-    done = true;
     TRACER.debugInfo(getMessage("Status analyzer is terminated."));
   }
 
+
+
   private String getMessage(String message)
   {
     return "In RS " + replicationServerDomain.getLocalRSServerId()
@@ -176,75 +152,50 @@
         + message;
   }
 
-  private boolean isInterrupted(DataServerHandler serverHandler,
-      StatusMachineEvent event)
-  {
-    if (replicationServerDomain.changeStatus(serverHandler, event))
-    {
-      // Finish job and let thread die
-      TRACER.debugInfo(
-          getMessage("Status analyzer has been interrupted and will die."));
-      return true;
-    }
-    return false;
-  }
+
 
   /**
    * Stops the thread.
    */
-  public void shutdown()
+  void shutdown()
   {
-    synchronized (shutdownLock)
-    {
-      shutdown = true;
-      shutdownLock.notifyAll();
-
-      if (debugEnabled())
-      {
-        TRACER.debugInfo(getMessage("Shutting down status analyzer."));
-      }
-    }
-  }
-
-  /**
-   * Waits for analyzer death. If not finished within 2 seconds,
-   * forces interruption
-   */
-  public void waitForShutdown()
-  {
-    try
-    {
-      int FACTOR = 40; // Wait for 2 seconds before interrupting the thread
-      int n = 0;
-      while (!done && this.isAlive())
-      {
-        Thread.sleep(50);
-        n++;
-        if (n >= FACTOR)
-        {
-          TRACER.debugInfo(getMessage("Interrupting status analyzer."));
-          interrupt();
-        }
-      }
-    } catch (InterruptedException e)
-    {
-      // exit the loop if this thread is interrupted.
-    }
-  }
-
-  /**
-   * Sets the threshold value.
-   * @param degradedStatusThreshold The new threshold value.
-   */
-  public void setDegradedStatusThreshold(int degradedStatusThreshold)
-  {
+    initiateShutdown();
     if (debugEnabled())
     {
-      TRACER.debugInfo(getMessage(
-          "Directory server status analyzer changing threshold value to "
-              + degradedStatusThreshold));
+      TRACER.debugInfo(getMessage("Shutting down status analyzer."));
     }
+    synchronized (eventMonitor)
+    {
+      eventMonitor.notifyAll();
+    }
+    try
+    {
+      join(2000);
+    }
+    catch (InterruptedException e)
+    {
+      Thread.currentThread().interrupt(); // Keep caller's interrupt status.
+    }
+    if (isAlive())
+    {
+      // The join timed out or was interrupted so attempt to forcefully stop the
+      // analyzer.
+      interrupt();
+    }
+  }
 
-    this.degradedStatusThreshold = degradedStatusThreshold;
+
+
+  /**
+   * Requests that pending status messages (DS heartbeats, topology info, etc.)
+   * be broadcast to the topology by waking up the analyzer thread.
+   */
+  void notifyPendingStatusMessage()
+  {
+    synchronized (eventMonitor)
+    {
+      pendingStatusMessage = true;
+      eventMonitor.notifyAll();
+    }
+  }
 }

--
Gitblit v1.10.0