mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
19.32.2013 c63e1f305327734be21f5ce0e21bdd2f7a4d143b
opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -29,11 +29,12 @@
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.types.DebugLogLevel;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachineEvent.*;
/**
 * This thread is in charge of periodically determining if the connected
@@ -85,7 +86,6 @@
  }
  /**
   * Run method for the StatusAnalyzer.
   * Analyzes if servers are late or not, and change their status accordingly.
   */
  @Override
@@ -93,13 +93,11 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("Directory server status analyzer starting for dn " +
        replicationServerDomain.getBaseDn());
      TRACER.debugInfo(
          getMessage("Directory server status analyzer starting."));
    }
    final int localRsId = replicationServerDomain.getLocalRSServerId();
    boolean interrupted = false;
    while (!shutdown && !interrupted)
    while (!shutdown)
    {
      synchronized (shutdownLock)
      {
@@ -126,22 +124,21 @@
      // for it and change status accordingly if threshold value is
      // crossed/uncrossed
      for (DataServerHandler serverHandler :
        replicationServerDomain.getConnectedDSs(). values())
        replicationServerDomain.getConnectedDSs().values())
      {
        // Get number of pending changes for this server
        int nChanges = serverHandler.getRcvMsgQueueSize();
        if (debugEnabled())
        {
          TRACER.debugInfo("Status analyzer for dn "
              + replicationServerDomain.getBaseDn() + " DS "
          TRACER.debugInfo(getMessage("Status analyzer: DS "
              + serverHandler.getServerId() + " has " + nChanges
              + " message(s) in writer queue. This is in RS " + localRsId);
              + " message(s) in writer queue."));
        }
        // Check status to know if it is relevant to change the status. Do not
        // take RSD lock to test. If we attempt to change the status whereas
        // we are in a status that do not allows that, this will be noticed by
        // the changeStatusFromStatusAnalyzer method. This allows to take the
        // the current status does allow it, this will be noticed by
        // the changeStatusFromStatusAnalyzer() method. This allows to take the
        // lock roughly only when needed versus every sleep time timeout.
        if (degradedStatusThreshold > 0)
          // Threshold value = 0 means no status analyzer (no degrading system)
@@ -151,39 +148,18 @@
        {
          if (nChanges >= degradedStatusThreshold)
          {
            if (serverHandler.getStatus() == ServerStatus.NORMAL_STATUS)
            if (serverHandler.getStatus() == NORMAL_STATUS
                && isInterrupted(serverHandler, TO_DEGRADED_STATUS_EVENT))
            {
              interrupted =
                replicationServerDomain.changeStatusFromStatusAnalyzer(
                serverHandler,
                StatusMachineEvent.TO_DEGRADED_STATUS_EVENT);
              if (interrupted)
              {
                // Finish job and let thread die
                TRACER.debugInfo("Status analyzer for dn "
                    + replicationServerDomain.getBaseDn()
                    + " has been interrupted and will die. This is in RS "
                    + localRsId);
                break;
              }
              break;
            }
          } else
          }
          else
          {
            if (serverHandler.getStatus() == ServerStatus.DEGRADED_STATUS)
            if (serverHandler.getStatus() == DEGRADED_STATUS
                && isInterrupted(serverHandler, TO_NORMAL_STATUS_EVENT))
            {
              interrupted =
                replicationServerDomain.changeStatusFromStatusAnalyzer(
                serverHandler,
                StatusMachineEvent.TO_NORMAL_STATUS_EVENT);
              if (interrupted)
              {
                // Finish job and let thread die
                TRACER.debugInfo("Status analyzer for dn "
                    + replicationServerDomain.getBaseDn()
                    + " has been interrupted and will die. This is in RS "
                    + localRsId);
                break;
              }
              break;
            }
          }
        }
@@ -191,9 +167,28 @@
    }
    done = true;
    TRACER.debugInfo("Status analyzer for dn "
        + replicationServerDomain.getBaseDn() + " is terminated."
        + " This is in RS " + localRsId);
    TRACER.debugInfo(getMessage("Status analyzer is terminated."));
  }
  private String getMessage(String message)
  {
    return "In RS " + replicationServerDomain.getLocalRSServerId()
        + ", for base dn " + replicationServerDomain.getBaseDn() + ": "
        + message;
  }
  private boolean isInterrupted(DataServerHandler serverHandler,
      StatusMachineEvent event)
  {
    if (replicationServerDomain.changeStatusFromStatusAnalyzer(serverHandler,
        event))
    {
      // Finish job and let thread die
      TRACER.debugInfo(
          getMessage("Status analyzer has been interrupted and will die."));
      return true;
    }
    return false;
  }
  /**
@@ -208,9 +203,7 @@
      if (debugEnabled())
      {
        TRACER.debugInfo("Shutting down status analyzer for dn "
            + replicationServerDomain.getBaseDn()
            + " in RS " + replicationServerDomain.getLocalRSServerId());
        TRACER.debugInfo(getMessage("Shutting down status analyzer."));
      }
    }
  }
@@ -231,9 +224,7 @@
        n++;
        if (n >= FACTOR)
        {
          TRACER.debugInfo("Interrupting status analyzer for dn " +
              replicationServerDomain.getBaseDn() + " in RS " +
              replicationServerDomain.getLocalRSServerId());
          TRACER.debugInfo(getMessage("Interrupting status analyzer."));
          interrupt();
        }
      }
@@ -251,9 +242,9 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("Directory server status analyzer for dn " +
        replicationServerDomain.getBaseDn() + " changing threshold value to " +
        degradedStatusThreshold);
      TRACER.debugInfo(getMessage(
          "Directory server status analyzer changing threshold value to "
              + degradedStatusThreshold));
    }
    this.degradedStatusThreshold = degradedStatusThreshold;