From 21af6610b07617ecbf1b826310a2f244deb4d348 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Tue, 25 Mar 2014 15:02:51 +0000
Subject: [PATCH] Fix OPENDJ-1354 - replication threads BLOCKED in pendingChanges queue

---
 opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java |  129 ++++++++++++++++++++++++------------------
 1 files changed, 73 insertions(+), 56 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java
index 9fc7120..2bcf574 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -42,8 +42,11 @@
  * the threshold is uncrossed, the status analyzer must make the DS status
  * change back to NORMAL_STATUS. To have meaning of status, please refer to
  * ServerStatus class.
+ * <p>
+ * In addition, this thread is responsible for publishing any pending status
+ * messages.
  */
-public class StatusAnalyzer extends DirectoryThread
+class StatusAnalyzer extends DirectoryThread
 {
   private static final LocalizedLogger logger = LocalizedLogger
       .getLoggerForThisClass();
@@ -52,9 +55,9 @@
   private static final int STATUS_ANALYZER_SLEEP_TIME = 5000;
 
   private final ReplicationServerDomain replicationServerDomain;
-  private final Object shutdownLock = new Object();
-  private volatile boolean shutdown = false;
-  private volatile boolean done = false;
+  private final Object eventMonitor = new Object();
+  private boolean pendingStatusMessage = false;
+  private long nextCheckDSDegradedStatusTime;
 
 
 
@@ -64,13 +67,13 @@
    * @param replicationServerDomain
    *          The ReplicationServerDomain the status analyzer is for.
    */
-  public StatusAnalyzer(ReplicationServerDomain replicationServerDomain)
+  StatusAnalyzer(ReplicationServerDomain replicationServerDomain)
   {
     super("Replication server RS("
         + replicationServerDomain.getLocalRSServerId()
-        + ") delay monitor for domain \"" + replicationServerDomain.getBaseDN()
+        + ") status monitor for domain \""
+        + replicationServerDomain.getBaseDN()
         + "\"");
-
     this.replicationServerDomain = replicationServerDomain;
   }
 
@@ -87,35 +90,50 @@
       logger.trace(getMessage("Directory server status analyzer starting."));
     }
 
-    while (!shutdown)
+    try
     {
-      synchronized (shutdownLock)
+      while (true)
       {
-        if (!shutdown)
+        final boolean requestStatusBroadcastWasRequested;
+        synchronized (eventMonitor)
         {
-          try
+          if (!isShutdownInitiated() && !pendingStatusMessage)
           {
-            shutdownLock.wait(STATUS_ANALYZER_SLEEP_TIME);
+            eventMonitor.wait(STATUS_ANALYZER_SLEEP_TIME);
           }
-          catch (InterruptedException e)
-          {
-            // Server shutdown monitor may interrupt slow threads.
-            logger.traceException(e);
-            shutdown = true;
-            break;
-          }
+          requestStatusBroadcastWasRequested = pendingStatusMessage;
+          pendingStatusMessage = false;
+        }
+
+        if (isShutdownInitiated())
+        {
+          break;
+        }
+
+        // Broadcast heartbeats, topology messages, etc if requested.
+        if (requestStatusBroadcastWasRequested)
+        {
+          replicationServerDomain.sendPendingStatusMessages();
+        }
+
+        /*
+         * Check the degraded status for connected DS instances only if
+         * sufficient time has passed. The current time is not cached because
+         * the call to checkDSDegradedStatus may take some time.
+         */
+        if (nextCheckDSDegradedStatusTime < System.currentTimeMillis())
+        {
+          replicationServerDomain.checkDSDegradedStatus();
+          nextCheckDSDegradedStatusTime = System.currentTimeMillis()
+              + STATUS_ANALYZER_SLEEP_TIME;
         }
       }
-
-      if (shutdown)
-      {
-        break;
-      }
-
-      replicationServerDomain.checkDSDegradedStatus();
+    }
+    catch (InterruptedException e)
+    {
+      // Forcefully stopped.
     }
 
-    done = true;
     logger.trace(getMessage("Status analyzer is terminated."));
   }
 
@@ -133,46 +151,45 @@
   /**
    * Stops the thread.
    */
-  public void shutdown()
+  void shutdown()
   {
-    synchronized (shutdownLock)
+    initiateShutdown();
+    if (logger.isTraceEnabled())
     {
-      shutdown = true;
-      shutdownLock.notifyAll();
-
-      if (logger.isTraceEnabled())
-      {
-        logger.trace(getMessage("Shutting down status analyzer."));
-      }
+      logger.trace(getMessage("Shutting down status analyzer."));
+    }
+    synchronized (eventMonitor)
+    {
+      eventMonitor.notifyAll();
+    }
+    try
+    {
+      join(2000);
+    }
+    catch (InterruptedException e)
+    {
+      // Trapped: forcefully stop the thread.
+    }
+    if (isAlive())
+    {
+      // The join timed out or was interrupted so attempt to forcefully stop the
+      // analyzer.
+      interrupt();
     }
   }
 
 
 
   /**
-   * Waits for analyzer death. If not finished within 2 seconds, forces
-   * interruption
+   * Requests that a topology state related message be broadcast to the rest of
+   * the topology. Messages include DS heartbeats, topology information, etc.
    */
-  public void waitForShutdown()
+  void notifyPendingStatusMessage()
   {
-    try
+    synchronized (eventMonitor)
     {
-      int FACTOR = 40; // Wait for 2 seconds before interrupting the thread
-      int n = 0;
-      while (!done && this.isAlive())
-      {
-        Thread.sleep(50);
-        n++;
-        if (n >= FACTOR)
-        {
-          logger.trace(getMessage("Interrupting status analyzer."));
-          interrupt();
-        }
-      }
-    }
-    catch (InterruptedException e)
-    {
-      // exit the loop if this thread is interrupted.
+      pendingStatusMessage = true;
+      eventMonitor.notifyAll();
     }
   }
 }

--
Gitblit v1.10.0