From 36fc2f63761b54cbfc48cf30d6f4b2b5f7fc90ba Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 20 Mar 2014 18:23:26 +0000
Subject: [PATCH] OPENDJ-1354: replication threads BLOCKED in pendingChanges queue

---
 opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java |  125 ++++++++++-------------------------------
 1 files changed, 31 insertions(+), 94 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java
index d236211..9fc7120 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -26,12 +26,12 @@
  */
 package org.opends.server.replication.server;
 
-import org.opends.server.api.DirectoryThread;
-import org.forgerock.i18n.slf4j.LocalizedLogger;
-import org.opends.server.replication.common.StatusMachineEvent;
 
-import static org.opends.server.replication.common.ServerStatus.*;
-import static org.opends.server.replication.common.StatusMachineEvent.*;
+
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.opends.server.api.DirectoryThread;
+
+
 
 /**
  * This thread is in charge of periodically determining if the connected
@@ -45,29 +45,26 @@
  */
 public class StatusAnalyzer extends DirectoryThread
 {
-
-  private volatile boolean shutdown = false;
-  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
-
-  private final ReplicationServerDomain replicationServerDomain;
-  private volatile int degradedStatusThreshold = -1;
+  private static final LocalizedLogger logger = LocalizedLogger
+      .getLoggerForThisClass();
 
   /** Sleep time for the thread, in ms. */
   private static final int STATUS_ANALYZER_SLEEP_TIME = 5000;
 
+  private final ReplicationServerDomain replicationServerDomain;
+  private final Object shutdownLock = new Object();
+  private volatile boolean shutdown = false;
   private volatile boolean done = false;
 
-  private final Object shutdownLock = new Object();
+
 
   /**
    * Create a StatusAnalyzer.
-   * @param replicationServerDomain The ReplicationServerDomain the status
-   *        analyzer is for.
-   * @param degradedStatusThreshold The pending changes threshold value to be
-   * used for putting a DS in DEGRADED_STATUS.
+   *
+   * @param replicationServerDomain
+   *          The ReplicationServerDomain the status analyzer is for.
    */
-  public StatusAnalyzer(ReplicationServerDomain replicationServerDomain,
-    int degradedStatusThreshold)
+  public StatusAnalyzer(ReplicationServerDomain replicationServerDomain)
   {
     super("Replication server RS("
         + replicationServerDomain.getLocalRSServerId()
@@ -75,9 +72,10 @@
         + "\"");
 
     this.replicationServerDomain = replicationServerDomain;
-    this.degradedStatusThreshold = degradedStatusThreshold;
   }
 
+
+
   /**
    * Analyzes if servers are late or not, and change their status accordingly.
    */
@@ -86,8 +84,7 @@
   {
     if (logger.isTraceEnabled())
     {
-      logger.trace(
-          getMessage("Directory server status analyzer starting."));
+      logger.trace(getMessage("Directory server status analyzer starting."));
     }
 
     while (!shutdown)
@@ -110,56 +107,20 @@
         }
       }
 
-      // Go through each connected DS, get the number of pending changes we have
-      // for it and change status accordingly if threshold value is
-      // crossed/uncrossed
-      for (DataServerHandler serverHandler :
-        replicationServerDomain.getConnectedDSs().values())
+      if (shutdown)
       {
-        // Get number of pending changes for this server
-        int nChanges = serverHandler.getRcvMsgQueueSize();
-        if (logger.isTraceEnabled())
-        {
-          logger.trace(getMessage("Status analyzer: DS "
-              + serverHandler.getServerId() + " has " + nChanges
-              + " message(s) in writer queue."));
-        }
-
-        // Check status to know if it is relevant to change the status. Do not
-        // take RSD lock to test. If we attempt to change the status whereas
-        // the current status does allow it, this will be noticed by
-        // the changeStatusFromStatusAnalyzer() method. This allows to take the
-        // lock roughly only when needed versus every sleep time timeout.
-        if (degradedStatusThreshold > 0)
-          // Threshold value = 0 means no status analyzer (no degrading system)
-          // we should not have that as the status analyzer thread should not be
-          // created if this is the case, but for sanity purpose, we add this
-          // test
-        {
-          if (nChanges >= degradedStatusThreshold)
-          {
-            if (serverHandler.getStatus() == NORMAL_STATUS
-                && isInterrupted(serverHandler, TO_DEGRADED_STATUS_EVENT))
-            {
-              break;
-            }
-          }
-          else
-          {
-            if (serverHandler.getStatus() == DEGRADED_STATUS
-                && isInterrupted(serverHandler, TO_NORMAL_STATUS_EVENT))
-            {
-              break;
-            }
-          }
-        }
+        break;
       }
+
+      replicationServerDomain.checkDSDegradedStatus();
     }
 
     done = true;
     logger.trace(getMessage("Status analyzer is terminated."));
   }
 
+
+
   private String getMessage(String message)
   {
     return "In RS " + replicationServerDomain.getLocalRSServerId()
@@ -167,18 +128,7 @@
         + message;
   }
 
-  private boolean isInterrupted(DataServerHandler serverHandler,
-      StatusMachineEvent event)
-  {
-    if (replicationServerDomain.changeStatus(serverHandler, event))
-    {
-      // Finish job and let thread die
-      logger.trace(
-          getMessage("Status analyzer has been interrupted and will die."));
-      return true;
-    }
-    return false;
-  }
+
 
   /**
    * Stops the thread.
@@ -197,9 +147,11 @@
     }
   }
 
+
+
   /**
-   * Waits for analyzer death. If not finished within 2 seconds,
-   * forces interruption
+   * Waits for analyzer death. If not finished within 2 seconds, forces
+   * interruption
    */
   public void waitForShutdown()
   {
@@ -217,25 +169,10 @@
           interrupt();
         }
       }
-    } catch (InterruptedException e)
+    }
+    catch (InterruptedException e)
     {
       // exit the loop if this thread is interrupted.
     }
   }
-
-  /**
-   * Sets the threshold value.
-   * @param degradedStatusThreshold The new threshold value.
-   */
-  public void setDegradedStatusThreshold(int degradedStatusThreshold)
-  {
-    if (logger.isTraceEnabled())
-    {
-      logger.trace(getMessage(
-          "Directory server status analyzer changing threshold value to "
-              + degradedStatusThreshold));
-    }
-
-    this.degradedStatusThreshold = degradedStatusThreshold;
-  }
 }

--
Gitblit v1.10.0