mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
26.49.2013 582344d280d24dfec999b862d8255eb077995b99
opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package org.opends.server.extensions;
@@ -30,9 +31,8 @@
import static org.opends.messages.ConfigMessages.*;
import static org.opends.messages.CoreMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import java.util.ArrayList;
import java.util.List;
@@ -73,54 +73,70 @@
   */
  private static final int MAX_RETRY_COUNT = 5;
  // The set of worker threads that will be used to process this work queue.
  /** The set of worker threads that will be used to process this work queue. */
  private final ArrayList<TraditionalWorkerThread> workerThreads =
    new ArrayList<TraditionalWorkerThread>();
  // The number of operations that have been submitted to the work queue for
  // processing.
  /**
   * The number of operations that have been submitted to the work queue for
   * processing.
   */
  private AtomicLong opsSubmitted;
  // The number of times that an attempt to submit a new request has been
  // rejected because the work queue is already at its maximum capacity.
  /**
   * The number of times that an attempt to submit a new request has been
   * rejected because the work queue is already at its maximum capacity.
   */
  private AtomicLong queueFullRejects;
  // Indicates whether one or more of the worker threads needs to be killed at
  // the next convenient opportunity.
  /**
   * Indicates whether one or more of the worker threads needs to be killed at
   * the next convenient opportunity.
   */
  private boolean killThreads;
  // Indicates whether the Directory Server is shutting down.
  /** Indicates whether the Directory Server is shutting down. */
  private boolean shutdownRequested;
  // The thread number used for the last worker thread that was created.
  /** The thread number used for the last worker thread that was created. */
  private int lastThreadNumber;
  // The maximum number of pending requests that this work queue will allow
  // before it will start rejecting them.
  /**
   * The maximum number of pending requests that this work queue will allow
   * before it will start rejecting them.
   */
  private int maxCapacity;
  // The number of worker threads that should be active (or will be shortly if
  // a configuration change has not been completely applied).
  /**
   * The number of worker threads that should be active (or will be shortly if a
   * configuration change has not been completely applied).
   */
  private int numWorkerThreads;
  // The queue overflow policy: true indicates that operations will be blocked
  // until the queue has available capacity, otherwise operations will be
  // rejected.
  //
  // This is hard-coded to true for now because a reject on full policy does
  // not seem to have a valid use case.
  //
  /**
   * The queue overflow policy: true indicates that operations will be blocked
   * until the queue has available capacity, otherwise operations will be
   * rejected.
   * <p>
   * This is hard-coded to true for now because a reject on full policy does not
   * seem to have a valid use case.
   * </p>
   */
  private final boolean isBlocking = true;
  // The queue that will be used to actually hold the pending operations.
  private LinkedBlockingQueue<AbstractOperation> opQueue;
  /** The queue that will be used to actually hold the pending operations. */
  private LinkedBlockingQueue<Operation> opQueue;
  // The locks used to provide threadsafe access for the queue.
  // Used for non-config changes.
  /**
   * The lock used to provide threadsafe access for the queue, used for
   * non-config changes.
   */
  private final ReadLock queueReadLock;
  // Used for config changes.
  /**
   * The lock used to provide threadsafe access for the queue, used for config
   * changes.
   */
  private final WriteLock queueWriteLock;
  {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -166,13 +182,13 @@
      // Create the actual work queue.
      if (maxCapacity > 0)
      {
        opQueue = new LinkedBlockingQueue<AbstractOperation>(maxCapacity);
        opQueue = new LinkedBlockingQueue<Operation>(maxCapacity);
      }
      else
      {
        // This will never be the case, since the configuration definition
        // ensures that the capacity is always finite.
        opQueue = new LinkedBlockingQueue<AbstractOperation>();
        opQueue = new LinkedBlockingQueue<Operation>();
      }
      // Create the set of worker threads that should be used to service the
@@ -316,8 +332,7 @@
   *           already at its maximum capacity).
   */
  @Override
  public void submitOperation(AbstractOperation operation)
      throws DirectoryException
  public void submitOperation(Operation operation) throws DirectoryException
  {
    queueReadLock.lock();
    try
@@ -396,7 +411,7 @@
   *         if the server is shutting down and no more operations will be
   *         processed.
   */
  public AbstractOperation nextOperation(TraditionalWorkerThread workerThread)
  public Operation nextOperation(TraditionalWorkerThread workerThread)
  {
    return retryNextOperation(workerThread, 0);
  }
@@ -419,8 +434,8 @@
   *         if the server is shutting down and no more operations will be
   *         processed, or if there have been too many consecutive failures.
   */
  private AbstractOperation retryNextOperation(
      TraditionalWorkerThread workerThread, int numFailures)
  private Operation retryNextOperation(TraditionalWorkerThread workerThread,
      int numFailures)
  {
    // See if we should kill off this thread. This could be necessary if the
    // number of worker threads has been decreased with the server online. If
@@ -449,7 +464,7 @@
      while (true)
      {
        AbstractOperation nextOperation = opQueue.poll(5, TimeUnit.SECONDS);
        Operation nextOperation = opQueue.poll(5, TimeUnit.SECONDS);
        if (nextOperation != null)
        {
          return nextOperation;
@@ -627,6 +642,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public boolean isConfigurationChangeAcceptable(
      TraditionalWorkQueueCfg configuration, List<Message> unacceptableReasons)
  {
@@ -638,6 +654,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public ConfigChangeResult applyConfigurationChange(
      TraditionalWorkQueueCfg configuration)
  {
@@ -695,18 +712,18 @@
    {
      // First switch the queue with the exclusive lock.
      queueWriteLock.lock();
      LinkedBlockingQueue<AbstractOperation> oldOpQueue;
      LinkedBlockingQueue<Operation> oldOpQueue;
      try
      {
        LinkedBlockingQueue<AbstractOperation> newOpQueue = null;
        LinkedBlockingQueue<Operation> newOpQueue = null;
        if (newMaxCapacity > 0)
        {
          newOpQueue = new LinkedBlockingQueue<AbstractOperation>(
          newOpQueue = new LinkedBlockingQueue<Operation>(
              newMaxCapacity);
        }
        else
        {
          newOpQueue = new LinkedBlockingQueue<AbstractOperation>();
          newOpQueue = new LinkedBlockingQueue<Operation>();
        }
        oldOpQueue = opQueue;
@@ -720,7 +737,7 @@
      }
      // Now resubmit any pending requests - we'll need the shared lock.
      AbstractOperation pendingOperation = null;
      Operation pendingOperation = null;
      queueReadLock.lock();
      try
      {
@@ -747,10 +764,7 @@
        }
        while ((pendingOperation = oldOpQueue.poll()) != null)
        {
          if (pendingOperation != null)
          {
            pendingOperation.abort(cancelRequest);
          }
          pendingOperation.abort(cancelRequest);
        }
      }
      finally