mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
25.02.2014 21af6610b07617ecbf1b826310a2f244deb4d348
opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -42,8 +42,11 @@
 * the threshold is uncrossed, the status analyzer must make the DS status
 * change back to NORMAL_STATUS. To have meaning of status, please refer to
 * ServerStatus class.
 * <p>
 * In addition, this thread is responsible for publishing any pending status
 * messages.
 */
public class StatusAnalyzer extends DirectoryThread
class StatusAnalyzer extends DirectoryThread
{
  private static final LocalizedLogger logger = LocalizedLogger
      .getLoggerForThisClass();
@@ -52,9 +55,9 @@
  private static final int STATUS_ANALYZER_SLEEP_TIME = 5000;
  private final ReplicationServerDomain replicationServerDomain;
  private final Object shutdownLock = new Object();
  private volatile boolean shutdown = false;
  private volatile boolean done = false;
  private final Object eventMonitor = new Object();
  private boolean pendingStatusMessage = false;
  private long nextCheckDSDegradedStatusTime;
@@ -64,13 +67,13 @@
   * @param replicationServerDomain
   *          The ReplicationServerDomain the status analyzer is for.
   */
  public StatusAnalyzer(ReplicationServerDomain replicationServerDomain)
  StatusAnalyzer(ReplicationServerDomain replicationServerDomain)
  {
    super("Replication server RS("
        + replicationServerDomain.getLocalRSServerId()
        + ") delay monitor for domain \"" + replicationServerDomain.getBaseDN()
        + ") status monitor for domain \""
        + replicationServerDomain.getBaseDN()
        + "\"");
    this.replicationServerDomain = replicationServerDomain;
  }
@@ -87,35 +90,50 @@
      logger.trace(getMessage("Directory server status analyzer starting."));
    }
    while (!shutdown)
    try
    {
      synchronized (shutdownLock)
      while (true)
      {
        if (!shutdown)
        final boolean requestStatusBroadcastWasRequested;
        synchronized (eventMonitor)
        {
          try
          if (!isShutdownInitiated() && !pendingStatusMessage)
          {
            shutdownLock.wait(STATUS_ANALYZER_SLEEP_TIME);
            eventMonitor.wait(STATUS_ANALYZER_SLEEP_TIME);
          }
          catch (InterruptedException e)
          {
            // Server shutdown monitor may interrupt slow threads.
            logger.traceException(e);
            shutdown = true;
            break;
          }
          requestStatusBroadcastWasRequested = pendingStatusMessage;
          pendingStatusMessage = false;
        }
        if (isShutdownInitiated())
        {
          break;
        }
        // Broadcast heartbeats, topology messages, etc if requested.
        if (requestStatusBroadcastWasRequested)
        {
          replicationServerDomain.sendPendingStatusMessages();
        }
        /*
         * Check the degraded status for connected DS instances only if
         * sufficient time has passed. The current time is not cached because
         * the call to checkDSDegradedStatus may take some time.
         */
        if (nextCheckDSDegradedStatusTime < System.currentTimeMillis())
        {
          replicationServerDomain.checkDSDegradedStatus();
          nextCheckDSDegradedStatusTime = System.currentTimeMillis()
              + STATUS_ANALYZER_SLEEP_TIME;
        }
      }
      if (shutdown)
      {
        break;
      }
      replicationServerDomain.checkDSDegradedStatus();
    }
    catch (InterruptedException e)
    {
      // Forcefully stopped.
    }
    done = true;
    logger.trace(getMessage("Status analyzer is terminated."));
  }
@@ -133,46 +151,45 @@
  /**
   * Stops the thread.
   */
  public void shutdown()
  void shutdown()
  {
    synchronized (shutdownLock)
    initiateShutdown();
    if (logger.isTraceEnabled())
    {
      shutdown = true;
      shutdownLock.notifyAll();
      if (logger.isTraceEnabled())
      {
        logger.trace(getMessage("Shutting down status analyzer."));
      }
      logger.trace(getMessage("Shutting down status analyzer."));
    }
    synchronized (eventMonitor)
    {
      eventMonitor.notifyAll();
    }
    try
    {
      join(2000);
    }
    catch (InterruptedException e)
    {
      // Trapped: forcefully stop the thread.
    }
    if (isAlive())
    {
      // The join timed out or was interrupted so attempt to forcefully stop the
      // analyzer.
      interrupt();
    }
  }
  /**
   * Waits for analyzer death. If not finished within 2 seconds, forces
   * interruption
   * Requests that a topology state related message be broadcast to the rest of
   * the topology. Messages include DS heartbeats, topology information, etc.
   */
  public void waitForShutdown()
  void notifyPendingStatusMessage()
  {
    try
    synchronized (eventMonitor)
    {
      int FACTOR = 40; // Wait for 2 seconds before interrupting the thread
      int n = 0;
      while (!done && this.isAlive())
      {
        Thread.sleep(50);
        n++;
        if (n >= FACTOR)
        {
          logger.trace(getMessage("Interrupting status analyzer."));
          interrupt();
        }
      }
    }
    catch (InterruptedException e)
    {
      // exit the loop if this thread is interrupted.
      pendingStatusMessage = true;
      eventMonitor.notifyAll();
    }
  }
}