mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
20.23.2014 36fc2f63761b54cbfc48cf30d6f4b2b5f7fc90ba
OPENDJ-1354: replication threads BLOCKED in pendingChanges queue

Initial refactoring work to reduce feature envy between StatusAnalyzer and ReplicationServerDomain.
2 files modified
192 ■■■■ changed files
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 67 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java 125 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -50,6 +50,8 @@
import org.forgerock.opendj.ldap.ResultCode;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachineEvent.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.util.StaticUtils.*;
@@ -2379,8 +2381,7 @@
        localReplicationServer.getDegradedStatusThreshold();
    if (degradedStatusThreshold > 0) // 0 means no status analyzer
    {
      final StatusAnalyzer thread =
          new StatusAnalyzer(this, degradedStatusThreshold);
      final StatusAnalyzer thread = new StatusAnalyzer(this);
      if (statusAnalyzer.compareAndSet(null, thread))
      {
        thread.start();
@@ -2670,15 +2671,8 @@
    {
      // Requested to stop analyzers
      stopStatusAnalyzer();
      return;
    }
    final StatusAnalyzer saThread = statusAnalyzer.get();
    if (saThread != null) // it is running
    {
      saThread.setDegradedStatusThreshold(degradedStatusThreshold);
    }
    else if (connectedDSs.size() > 0)
    else if (statusAnalyzer.get() == null && connectedDSs.size() > 0)
    {
      // Requested to start analyzers with provided threshold value
      startStatusAnalyzer();
@@ -2754,4 +2748,57 @@
        + " and port=" + localReplicationServer.getReplicationPort()
        + ": " + message);
  }
  /**
   * Go through each connected DS, get the number of pending changes we have for
   * it and change status accordingly if threshold value is crossed/uncrossed.
   */
  void checkDSDegradedStatus()
  {
    final int degradedStatusThreshold = localReplicationServer
        .getDegradedStatusThreshold();
    // Threshold value = 0 means no status analyzer (no degrading system)
    // we should not have that as the status analyzer thread should not be
    // created if this is the case, but for sanity purpose, we add this
    // test
    if (degradedStatusThreshold > 0)
    {
      for (DataServerHandler serverHandler : getConnectedDSs().values())
      {
        // Get number of pending changes for this server
        final int nChanges = serverHandler.getRcvMsgQueueSize();
        if (logger.isTraceEnabled())
        {
          logger.trace("In RS " + getLocalRSServerId() + ", for baseDN="
              + getBaseDN() + ": " + "Status analyzer: DS "
              + serverHandler.getServerId() + " has " + nChanges
              + " message(s) in writer queue.");
        }
        // Check status to know if it is relevant to change the status. Do not
        // take RSD lock to test. If we attempt to change the status whereas
        // the current status does allow it, this will be noticed by
        // the changeStatusFromStatusAnalyzer() method. This allows to take the
        // lock roughly only when needed versus every sleep time timeout.
        if (nChanges >= degradedStatusThreshold)
        {
          if (serverHandler.getStatus() == NORMAL_STATUS
              && changeStatus(serverHandler, TO_DEGRADED_STATUS_EVENT))
          {
            break; // Interrupted.
          }
        }
        else
        {
          if (serverHandler.getStatus() == DEGRADED_STATUS
              && changeStatus(serverHandler, TO_NORMAL_STATUS_EVENT))
          {
            break; // Interrupted.
          }
        }
      }
    }
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -26,12 +26,12 @@
 */
package org.opends.server.replication.server;
import org.opends.server.api.DirectoryThread;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.replication.common.StatusMachineEvent;
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachineEvent.*;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.api.DirectoryThread;
/**
 * This thread is in charge of periodically determining if the connected
@@ -45,29 +45,26 @@
 */
public class StatusAnalyzer extends DirectoryThread
{
  private volatile boolean shutdown = false;
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private final ReplicationServerDomain replicationServerDomain;
  private volatile int degradedStatusThreshold = -1;
  private static final LocalizedLogger logger = LocalizedLogger
      .getLoggerForThisClass();
  /** Sleep time for the thread, in ms. */
  private static final int STATUS_ANALYZER_SLEEP_TIME = 5000;
  private final ReplicationServerDomain replicationServerDomain;
  private final Object shutdownLock = new Object();
  private volatile boolean shutdown = false;
  private volatile boolean done = false;
  private final Object shutdownLock = new Object();
  /**
   * Create a StatusAnalyzer.
   * @param replicationServerDomain The ReplicationServerDomain the status
   *        analyzer is for.
   * @param degradedStatusThreshold The pending changes threshold value to be
   * used for putting a DS in DEGRADED_STATUS.
   *
   * @param replicationServerDomain
   *          The ReplicationServerDomain the status analyzer is for.
   */
  public StatusAnalyzer(ReplicationServerDomain replicationServerDomain,
    int degradedStatusThreshold)
  public StatusAnalyzer(ReplicationServerDomain replicationServerDomain)
  {
    super("Replication server RS("
        + replicationServerDomain.getLocalRSServerId()
@@ -75,9 +72,10 @@
        + "\"");
    this.replicationServerDomain = replicationServerDomain;
    this.degradedStatusThreshold = degradedStatusThreshold;
  }
  /**
   * Analyzes if servers are late or not, and change their status accordingly.
   */
@@ -86,8 +84,7 @@
  {
    if (logger.isTraceEnabled())
    {
      logger.trace(
          getMessage("Directory server status analyzer starting."));
      logger.trace(getMessage("Directory server status analyzer starting."));
    }
    while (!shutdown)
@@ -110,56 +107,20 @@
        }
      }
      // Go through each connected DS, get the number of pending changes we have
      // for it and change status accordingly if threshold value is
      // crossed/uncrossed
      for (DataServerHandler serverHandler :
        replicationServerDomain.getConnectedDSs().values())
      if (shutdown)
      {
        // Get number of pending changes for this server
        int nChanges = serverHandler.getRcvMsgQueueSize();
        if (logger.isTraceEnabled())
        {
          logger.trace(getMessage("Status analyzer: DS "
              + serverHandler.getServerId() + " has " + nChanges
              + " message(s) in writer queue."));
        }
        // Check status to know if it is relevant to change the status. Do not
        // take RSD lock to test. If we attempt to change the status whereas
        // the current status does allow it, this will be noticed by
        // the changeStatusFromStatusAnalyzer() method. This allows to take the
        // lock roughly only when needed versus every sleep time timeout.
        if (degradedStatusThreshold > 0)
          // Threshold value = 0 means no status analyzer (no degrading system)
          // we should not have that as the status analyzer thread should not be
          // created if this is the case, but for sanity purpose, we add this
          // test
        {
          if (nChanges >= degradedStatusThreshold)
          {
            if (serverHandler.getStatus() == NORMAL_STATUS
                && isInterrupted(serverHandler, TO_DEGRADED_STATUS_EVENT))
            {
              break;
            }
          }
          else
          {
            if (serverHandler.getStatus() == DEGRADED_STATUS
                && isInterrupted(serverHandler, TO_NORMAL_STATUS_EVENT))
            {
              break;
            }
          }
        }
        break;
      }
      replicationServerDomain.checkDSDegradedStatus();
    }
    done = true;
    logger.trace(getMessage("Status analyzer is terminated."));
  }
  private String getMessage(String message)
  {
    return "In RS " + replicationServerDomain.getLocalRSServerId()
@@ -167,18 +128,7 @@
        + message;
  }
  private boolean isInterrupted(DataServerHandler serverHandler,
      StatusMachineEvent event)
  {
    if (replicationServerDomain.changeStatus(serverHandler, event))
    {
      // Finish job and let thread die
      logger.trace(
          getMessage("Status analyzer has been interrupted and will die."));
      return true;
    }
    return false;
  }
  /**
   * Stops the thread.
@@ -197,9 +147,11 @@
    }
  }
  /**
   * Waits for analyzer death. If not finished within 2 seconds,
   * forces interruption
   * Waits for analyzer death. If not finished within 2 seconds, forces
   * interruption
   */
  public void waitForShutdown()
  {
@@ -217,25 +169,10 @@
          interrupt();
        }
      }
    } catch (InterruptedException e)
    }
    catch (InterruptedException e)
    {
      // exit the loop if this thread is interrupted.
    }
  }
  /**
   * Sets the threshold value.
   * @param degradedStatusThreshold The new threshold value.
   */
  public void setDegradedStatusThreshold(int degradedStatusThreshold)
  {
    if (logger.isTraceEnabled())
    {
      logger.trace(getMessage(
          "Directory server status analyzer changing threshold value to "
              + degradedStatusThreshold));
    }
    this.degradedStatusThreshold = degradedStatusThreshold;
  }
}