mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
20.23.2014 36fc2f63761b54cbfc48cf30d6f4b2b5f7fc90ba
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -50,6 +50,8 @@
import org.forgerock.opendj.ldap.ResultCode;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachineEvent.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.util.StaticUtils.*;
@@ -2379,8 +2381,7 @@
        localReplicationServer.getDegradedStatusThreshold();
    if (degradedStatusThreshold > 0) // 0 means no status analyzer
    {
      final StatusAnalyzer thread =
          new StatusAnalyzer(this, degradedStatusThreshold);
      final StatusAnalyzer thread = new StatusAnalyzer(this);
      if (statusAnalyzer.compareAndSet(null, thread))
      {
        thread.start();
@@ -2670,15 +2671,8 @@
    {
      // Requested to stop analyzers
      stopStatusAnalyzer();
      return;
    }
    final StatusAnalyzer saThread = statusAnalyzer.get();
    if (saThread != null) // it is running
    {
      saThread.setDegradedStatusThreshold(degradedStatusThreshold);
    }
    else if (connectedDSs.size() > 0)
    else if (statusAnalyzer.get() == null && connectedDSs.size() > 0)
    {
      // Requested to start analyzers with provided threshold value
      startStatusAnalyzer();
@@ -2754,4 +2748,57 @@
        + " and port=" + localReplicationServer.getReplicationPort()
        + ": " + message);
  }
  /**
   * Go through each connected DS, get the number of pending changes we have for
   * it and change status accordingly if threshold value is crossed/uncrossed.
   */
  void checkDSDegradedStatus()
  {
    final int degradedStatusThreshold = localReplicationServer
        .getDegradedStatusThreshold();
    // Threshold value = 0 means no status analyzer (no degrading system)
    // we should not have that as the status analyzer thread should not be
    // created if this is the case, but for sanity purpose, we add this
    // test
    if (degradedStatusThreshold > 0)
    {
      for (DataServerHandler serverHandler : getConnectedDSs().values())
      {
        // Get number of pending changes for this server
        final int nChanges = serverHandler.getRcvMsgQueueSize();
        if (logger.isTraceEnabled())
        {
          logger.trace("In RS " + getLocalRSServerId() + ", for baseDN="
              + getBaseDN() + ": " + "Status analyzer: DS "
              + serverHandler.getServerId() + " has " + nChanges
              + " message(s) in writer queue.");
        }
        // Check status to know if it is relevant to change the status. Do not
        // take RSD lock to test. If we attempt to change the status whereas
        // the current status does allow it, this will be noticed by
        // the changeStatusFromStatusAnalyzer() method. This allows to take the
        // lock roughly only when needed versus every sleep time timeout.
        if (nChanges >= degradedStatusThreshold)
        {
          if (serverHandler.getStatus() == NORMAL_STATUS
              && changeStatus(serverHandler, TO_DEGRADED_STATUS_EVENT))
          {
            break; // Interrupted.
          }
        }
        else
        {
          if (serverHandler.getStatus() == DEGRADED_STATUS
              && changeStatus(serverHandler, TO_NORMAL_STATUS_EVENT))
          {
            break; // Interrupted.
          }
        }
      }
    }
  }
}