mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
08.09.2014 b875ab3f7b327f797ec4532015e45da6ae3fff56
opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -22,18 +22,19 @@
 *
 *
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.types.DebugLogLevel;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachineEvent.*;
/**
 * This thread is in charge of periodically determining if the connected
@@ -44,46 +45,45 @@
 * the threshold is uncrossed, the status analyzer must make the DS status
 * change back to NORMAL_STATUS. To have meaning of status, please refer to
 * ServerStatus class.
 * <p>
 * In addition, this thread is responsible for publishing any pending status
 * messages.
 */
public class StatusAnalyzer extends DirectoryThread
class StatusAnalyzer extends DirectoryThread
{
  private volatile boolean shutdown = false;
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private final ReplicationServerDomain replicationServerDomain;
  private volatile int degradedStatusThreshold = -1;
  /** Sleep time for the thread, in ms. */
  private static final int STATUS_ANALYZER_SLEEP_TIME = 5000;
  private volatile boolean done = false;
  private final ReplicationServerDomain replicationServerDomain;
  private final Object eventMonitor = new Object();
  private boolean pendingStatusMessage = false;
  private long nextCheckDSDegradedStatusTime;
  private final Object shutdownLock = new Object();
  /**
   * Create a StatusAnalyzer.
   * @param replicationServerDomain The ReplicationServerDomain the status
   *        analyzer is for.
   * @param degradedStatusThreshold The pending changes threshold value to be
   * used for putting a DS in DEGRADED_STATUS.
   *
   * @param replicationServerDomain
   *          The ReplicationServerDomain the status analyzer is for.
   */
  public StatusAnalyzer(ReplicationServerDomain replicationServerDomain,
    int degradedStatusThreshold)
  StatusAnalyzer(ReplicationServerDomain replicationServerDomain)
  {
    super("Replication server RS("
        + replicationServerDomain.getLocalRSServerId()
        + ") delay monitor for domain \"" + replicationServerDomain.getBaseDN()
        + ") status monitor for domain \""
        + replicationServerDomain.getBaseDN()
        + "\"");
    this.replicationServerDomain = replicationServerDomain;
    this.degradedStatusThreshold = degradedStatusThreshold;
  }
  /**
   * Analyzes if servers are late or not, and change their status accordingly.
   */
@@ -96,79 +96,55 @@
          getMessage("Directory server status analyzer starting."));
    }
    while (!shutdown)
    try
    {
      synchronized (shutdownLock)
      while (true)
      {
        if (!shutdown)
        final boolean requestStatusBroadcastWasRequested;
        synchronized (eventMonitor)
        {
          try
          if (!isShutdownInitiated() && !pendingStatusMessage)
          {
            shutdownLock.wait(STATUS_ANALYZER_SLEEP_TIME);
            eventMonitor.wait(STATUS_ANALYZER_SLEEP_TIME);
          }
          catch (InterruptedException e)
          {
            // Server shutdown monitor may interrupt slow threads.
            if (debugEnabled())
            {
              TRACER.debugCaught(DebugLogLevel.ERROR, e);
            }
            shutdown = true;
            break;
          }
        }
      }
      // Go through each connected DS, get the number of pending changes we have
      // for it and change status accordingly if threshold value is
      // crossed/uncrossed
      for (DataServerHandler serverHandler :
        replicationServerDomain.getConnectedDSs().values())
      {
        // Get number of pending changes for this server
        int nChanges = serverHandler.getRcvMsgQueueSize();
        if (debugEnabled())
        {
          TRACER.debugInfo(getMessage("Status analyzer: DS "
              + serverHandler.getServerId() + " has " + nChanges
              + " message(s) in writer queue."));
          requestStatusBroadcastWasRequested = pendingStatusMessage;
          pendingStatusMessage = false;
        }
        // Check status to know if it is relevant to change the status. Do not
        // take RSD lock to test. If we attempt to change the status whereas
        // the current status does allow it, this will be noticed by
        // the changeStatusFromStatusAnalyzer() method. This allows to take the
        // lock roughly only when needed versus every sleep time timeout.
        if (degradedStatusThreshold > 0)
          // Threshold value = 0 means no status analyzer (no degrading system)
          // we should not have that as the status analyzer thread should not be
          // created if this is the case, but for sanity purpose, we add this
          // test
        if (isShutdownInitiated())
        {
          if (nChanges >= degradedStatusThreshold)
          {
            if (serverHandler.getStatus() == NORMAL_STATUS
                && isInterrupted(serverHandler, TO_DEGRADED_STATUS_EVENT))
            {
              break;
            }
          }
          else
          {
            if (serverHandler.getStatus() == DEGRADED_STATUS
                && isInterrupted(serverHandler, TO_NORMAL_STATUS_EVENT))
            {
              break;
            }
          }
          break;
        }
        // Broadcast heartbeats, topology messages, etc if requested.
        if (requestStatusBroadcastWasRequested)
        {
          replicationServerDomain.sendPendingStatusMessages();
        }
        /*
         * Check the degraded status for connected DS instances only if
         * sufficient time has passed. The current time is not cached because
         * the call to checkDSDegradedStatus may take some time.
         */
        if (nextCheckDSDegradedStatusTime < System.currentTimeMillis())
        {
          replicationServerDomain.checkDSDegradedStatus();
          nextCheckDSDegradedStatusTime = System.currentTimeMillis()
              + STATUS_ANALYZER_SLEEP_TIME;
        }
      }
    }
    catch (InterruptedException e)
    {
      // Forcefully stopped.
    }
    done = true;
    TRACER.debugInfo(getMessage("Status analyzer is terminated."));
  }
  private String getMessage(String message)
  {
    return "In RS " + replicationServerDomain.getLocalRSServerId()
@@ -176,75 +152,50 @@
        + message;
  }
  private boolean isInterrupted(DataServerHandler serverHandler,
      StatusMachineEvent event)
  {
    if (replicationServerDomain.changeStatus(serverHandler, event))
    {
      // Finish job and let thread die
      TRACER.debugInfo(
          getMessage("Status analyzer has been interrupted and will die."));
      return true;
    }
    return false;
  }
  /**
   * Stops the thread.
   */
  public void shutdown()
  void shutdown()
  {
    synchronized (shutdownLock)
    {
      shutdown = true;
      shutdownLock.notifyAll();
      if (debugEnabled())
      {
        TRACER.debugInfo(getMessage("Shutting down status analyzer."));
      }
    }
  }
  /**
   * Waits for analyzer death. If not finished within 2 seconds,
   * forces interruption
   */
  public void waitForShutdown()
  {
    try
    {
      int FACTOR = 40; // Wait for 2 seconds before interrupting the thread
      int n = 0;
      while (!done && this.isAlive())
      {
        Thread.sleep(50);
        n++;
        if (n >= FACTOR)
        {
          TRACER.debugInfo(getMessage("Interrupting status analyzer."));
          interrupt();
        }
      }
    } catch (InterruptedException e)
    {
      // exit the loop if this thread is interrupted.
    }
  }
  /**
   * Sets the threshold value.
   * @param degradedStatusThreshold The new threshold value.
   */
  public void setDegradedStatusThreshold(int degradedStatusThreshold)
  {
    initiateShutdown();
    if (debugEnabled())
    {
      TRACER.debugInfo(getMessage(
          "Directory server status analyzer changing threshold value to "
              + degradedStatusThreshold));
      TRACER.debugInfo(getMessage("Shutting down status analyzer."));
    }
    synchronized (eventMonitor)
    {
      eventMonitor.notifyAll();
    }
    try
    {
      join(2000);
    }
    catch (InterruptedException e)
    {
      // Trapped: forcefully stop the thread.
    }
    if (isAlive())
    {
      // The join timed out or was interrupted so attempt to forcefully stop the
      // analyzer.
      interrupt();
    }
  }
    this.degradedStatusThreshold = degradedStatusThreshold;
  /**
   * Requests that a topology state related message be broadcast to the rest of
   * the topology. Messages include DS heartbeats, topology information, etc.
   */
  void notifyPendingStatusMessage()
  {
    synchronized (eventMonitor)
    {
      pendingStatusMessage = true;
      eventMonitor.notifyAll();
    }
  }
}