mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

matthew_swift
18.51.2010 0a0ed252bb022406397ce9a87af99d9ac32c9bb7
opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
@@ -28,13 +28,20 @@
import static org.opends.messages.ConfigMessages.*;
import static org.opends.messages.CoreMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationChangeListener;
@@ -44,19 +51,7 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.monitors.TraditionalWorkQueueMonitor;
import org.opends.server.types.AbstractOperation;
import org.opends.server.types.CancelRequest;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.InitializationException;
import org.opends.server.types.Operation;
import org.opends.server.types.ResultCode;
import static org.opends.messages.ConfigMessages.*;
import static org.opends.messages.CoreMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.types.*;
@@ -64,28 +59,23 @@
 * This class defines a data structure for storing and interacting with the
 * Directory Server work queue.
 */
public class TraditionalWorkQueue
       extends WorkQueue<TraditionalWorkQueueCfg>
       implements ConfigurationChangeListener<TraditionalWorkQueueCfg>
public class TraditionalWorkQueue extends WorkQueue<TraditionalWorkQueueCfg>
    implements ConfigurationChangeListener<TraditionalWorkQueueCfg>
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * The maximum number of times to retry getting the next operation from the
   * queue if an unexpected failure occurs.
   */
  private static final int MAX_RETRY_COUNT = 5;
  // The set of worker threads that will be used to process this work queue.
  private ArrayList<TraditionalWorkerThread> workerThreads;
  private final ArrayList<TraditionalWorkerThread> workerThreads =
    new ArrayList<TraditionalWorkerThread>();
  // The number of operations that have been submitted to the work queue for
  // processing.
@@ -100,7 +90,7 @@
  private boolean killThreads;
  // Indicates whether the Directory Server is shutting down.
  private volatile boolean shutdownRequested;
  private boolean shutdownRequested;
  // The thread number used for the last worker thread that was created.
  private int lastThreadNumber;
@@ -113,16 +103,35 @@
  // a configuration change has not been completely applied).
  private int numWorkerThreads;
  // The queue overflow policy: true indicates that operations will be blocked
  // until the queue has available capacity, otherwise operations will be
  // rejected.
  //
  // This is hard-coded to true for now because a reject on full policy does
  // not seem to have a valid use case.
  //
  private final boolean isBlocking = true;
  // The queue that will be used to actually hold the pending operations.
  private LinkedBlockingQueue<AbstractOperation> opQueue;
  // The lock used to provide threadsafe access for the queue.
  private Object queueLock;
  // The locks used to provide threadsafe access for the queue.
  // Used for non-config changes.
  private final ReadLock queueReadLock;
  // Used for config changes.
  private final WriteLock queueWriteLock;
  {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    queueReadLock = lock.readLock();
    queueWriteLock = lock.writeLock();
  }
  /**
   * Creates a new instance of this work queue.  All initialization should be
   * Creates a new instance of this work queue. All initialization should be
   * performed in the <CODE>initializeWorkQueue</CODE> method.
   */
  public TraditionalWorkQueue()
@@ -137,66 +146,70 @@
   */
  @Override()
  public void initializeWorkQueue(TraditionalWorkQueueCfg configuration)
         throws ConfigException, InitializationException
      throws ConfigException, InitializationException
  {
    shutdownRequested = false;
    killThreads       = false;
    opsSubmitted      = new AtomicLong(0);
    queueFullRejects  = new AtomicLong(0);
    queueLock         = new Object();
    // Register to be notified of any configuration changes.
    configuration.addTraditionalChangeListener(this);
    // Get the necessary configuration from the provided entry.
    numWorkerThreads = getNumWorkerThreads(configuration);
    maxCapacity      = configuration.getMaxWorkQueueCapacity();
    // Create the actual work queue.
    if (maxCapacity > 0)
    {
      opQueue = new LinkedBlockingQueue<AbstractOperation>(maxCapacity);
    }
    else
    {
      opQueue = new LinkedBlockingQueue<AbstractOperation>();
    }
    // Create the set of worker threads that should be used to service the
    // work queue.
    workerThreads = new ArrayList<TraditionalWorkerThread>(numWorkerThreads);
    for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads;
    lastThreadNumber++)
    {
      TraditionalWorkerThread t =
           new TraditionalWorkerThread(this, lastThreadNumber);
      t.start();
      workerThreads.add(t);
    }
    // Create and register a monitor provider for the work queue.
    queueWriteLock.lock();
    try
    {
      TraditionalWorkQueueMonitor monitor =
           new TraditionalWorkQueueMonitor(this);
      monitor.initializeMonitorProvider(null);
      DirectoryServer.registerMonitorProvider(monitor);
    }
    catch (Exception e)
    {
      if (debugEnabled())
      shutdownRequested = false;
      killThreads = false;
      opsSubmitted = new AtomicLong(0);
      queueFullRejects = new AtomicLong(0);
      // Register to be notified of any configuration changes.
      configuration.addTraditionalChangeListener(this);
      // Get the necessary configuration from the provided entry.
      numWorkerThreads = getNumWorkerThreads(configuration);
      maxCapacity = configuration.getMaxWorkQueueCapacity();
      // Create the actual work queue.
      if (maxCapacity > 0)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
        opQueue = new LinkedBlockingQueue<AbstractOperation>(maxCapacity);
      }
      else
      {
        // This will never be the case, since the configuration definition
        // ensures that the capacity is always finite.
        opQueue = new LinkedBlockingQueue<AbstractOperation>();
      }
      Message message = ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR.get(
          String.valueOf(TraditionalWorkQueueMonitor.class), String.valueOf(e));
      logError(message);
      // Create the set of worker threads that should be used to service the
      // work queue.
      for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads;
        lastThreadNumber++)
      {
        TraditionalWorkerThread t = new TraditionalWorkerThread(this,
            lastThreadNumber);
        t.start();
        workerThreads.add(t);
      }
      // Create and register a monitor provider for the work queue.
      try
      {
        TraditionalWorkQueueMonitor monitor = new TraditionalWorkQueueMonitor(
            this);
        monitor.initializeMonitorProvider(null);
        DirectoryServer.registerMonitorProvider(monitor);
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        Message message = ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR.get(
            String.valueOf(TraditionalWorkQueueMonitor.class),
            String.valueOf(e));
        logError(message);
      }
    }
    finally
    {
      queueWriteLock.unlock();
    }
  }
@@ -208,8 +221,17 @@
  @Override()
  public void finalizeWorkQueue(Message reason)
  {
    shutdownRequested = true;
    queueWriteLock.lock();
    try
    {
      shutdownRequested = true;
    }
    finally
    {
      queueWriteLock.unlock();
    }
    // From now on no more operations can be enqueued or dequeued.
    // Send responses to any operations in the pending queue to indicate that
    // they won't be processed because the server is shutting down.
@@ -222,7 +244,8 @@
      {
        // The operation has no chance of responding to the cancel
        // request so avoid waiting for a cancel response.
        if (o.getCancelResult() == null) {
        if (o.getCancelResult() == null)
        {
          o.abort(cancelRequest);
        }
      }
@@ -233,12 +256,11 @@
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        logError(WARN_QUEUE_UNABLE_TO_CANCEL.get(
            String.valueOf(o), String.valueOf(e)));
        logError(WARN_QUEUE_UNABLE_TO_CANCEL.get(String.valueOf(o),
            String.valueOf(e)));
      }
    }
    // Notify all the worker threads of the shutdown.
    for (TraditionalWorkerThread t : workerThreads)
    {
@@ -253,8 +275,8 @@
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        logError(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD.get(
            t.getName(), String.valueOf(e)));
        logError(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD.get(t.getName(),
            String.valueOf(e)));
      }
    }
  }
@@ -264,12 +286,20 @@
  /**
   * Indicates whether this work queue has received a request to shut down.
   *
   * @return  <CODE>true</CODE> if the work queue has recieved a request to shut
   *          down, or <CODE>false</CODE> if not.
   * @return <CODE>true</CODE> if the work queue has recieved a request to shut
   *         down, or <CODE>false</CODE> if not.
   */
  public boolean shutdownRequested()
  {
    return shutdownRequested;
    queueReadLock.lock();
    try
    {
      return shutdownRequested;
    }
    finally
    {
      queueReadLock.unlock();
    }
  }
@@ -278,46 +308,93 @@
   * Submits an operation to be processed by one of the worker threads
   * associated with this work queue.
   *
   * @param  operation  The operation to be processed.
   *
   * @throws  DirectoryException  If the provided operation is not accepted for
   *                              some reason (e.g., if the server is shutting
   *                              down or the pending operation queue is already
   *                              at its maximum capacity).
   * @param operation
   *          The operation to be processed.
   * @throws DirectoryException
   *           If the provided operation is not accepted for some reason (e.g.,
   *           if the server is shutting down or the pending operation queue is
   *           already at its maximum capacity).
   */
  @Override
  public void submitOperation(AbstractOperation operation)
         throws DirectoryException
      throws DirectoryException
  {
    if (shutdownRequested)
    queueReadLock.lock();
    try
    {
      Message message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
      throw new DirectoryException(ResultCode.UNAVAILABLE, message);
    }
      if (shutdownRequested)
      {
        Message message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
        throw new DirectoryException(ResultCode.UNAVAILABLE, message);
      }
    if (! opQueue.offer(operation))
      if (isBlocking)
      {
        try
        {
          // If the queue is full and there is an administrative change taking
          // place then starvation could arise: this thread will hold the read
          // lock, the admin thread will be waiting on the write lock, and the
          // worker threads may be queued behind the admin thread. Since the
          // worker threads cannot run, the queue will never empty and allow
          // this thread to proceed. To help things out we can periodically
          // yield the read lock when the queue is full.
          while (!opQueue.offer(operation, 1, TimeUnit.SECONDS))
          {
            queueReadLock.unlock();
            Thread.yield();
            queueReadLock.lock();
            if (shutdownRequested)
            {
              Message message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
              throw new DirectoryException(ResultCode.UNAVAILABLE, message);
            }
          }
        }
        catch (InterruptedException e)
        {
          // We cannot handle the interruption here. Reject the request and
          // re-interrupt this thread.
          Thread.currentThread().interrupt();
          queueFullRejects.incrementAndGet();
          Message message = WARN_OP_REJECTED_BY_QUEUE_INTERRUPT.get();
          throw new DirectoryException(ResultCode.BUSY, message);
        }
      }
      else
      {
        if (!opQueue.offer(operation))
        {
          queueFullRejects.incrementAndGet();
          Message message = WARN_OP_REJECTED_BY_QUEUE_FULL.get(maxCapacity);
          throw new DirectoryException(ResultCode.BUSY, message);
        }
      }
      opsSubmitted.incrementAndGet();
    }
    finally
    {
      queueFullRejects.incrementAndGet();
      Message message = WARN_OP_REJECTED_BY_QUEUE_FULL.get(maxCapacity);
      throw new DirectoryException(ResultCode.UNAVAILABLE, message);
      queueReadLock.unlock();
    }
    opsSubmitted.incrementAndGet();
  }
  /**
   * Retrieves the next operation that should be processed by one of the worker
   * threads, blocking if necessary until a new request arrives.  This method
   * threads, blocking if necessary until a new request arrives. This method
   * should only be called by a worker thread associated with this work queue.
   *
   * @param  workerThread  The worker thread that is requesting the operation.
   *
   * @return  The next operation that should be processed, or <CODE>null</CODE>
   *          if the server is shutting down and no more operations will be
   *          processed.
   * @param workerThread
   *          The worker thread that is requesting the operation.
   * @return The next operation that should be processed, or <CODE>null</CODE>
   *         if the server is shutting down and no more operations will be
   *         processed.
   */
  public AbstractOperation nextOperation(TraditionalWorkerThread workerThread)
  {
@@ -328,131 +405,81 @@
  /**
   * Retrieves the next operation that should be processed by one of the worker
   * threads following a previous failure attempt.  A maximum of five
   * consecutive failures will be allowed before returning <CODE>null</CODE>,
   * which will cause the associated thread to exit.
   * threads following a previous failure attempt. A maximum of five consecutive
   * failures will be allowed before returning <CODE>null</CODE>, which will
   * cause the associated thread to exit.
   *
   * @param  workerThread  The worker thread that is requesting the operation.
   * @param  numFailures   The number of consecutive failures that the worker
   *                       thread has experienced so far.  If this gets too
   *                       high, then this method will return <CODE>null</CODE>
   *                       rather than retrying.
   *
   * @return  The next operation that should be processed, or <CODE>null</CODE>
   *          if the server is shutting down and no more operations will be
   *          processed, or if there have been too many consecutive failures.
   * @param workerThread
   *          The worker thread that is requesting the operation.
   * @param numFailures
   *          The number of consecutive failures that the worker thread has
   *          experienced so far. If this gets too high, then this method will
   *          return <CODE>null</CODE> rather than retrying.
   * @return The next operation that should be processed, or <CODE>null</CODE>
   *         if the server is shutting down and no more operations will be
   *         processed, or if there have been too many consecutive failures.
   */
  private AbstractOperation retryNextOperation(
                                       TraditionalWorkerThread workerThread,
                                       int numFailures)
      TraditionalWorkerThread workerThread, int numFailures)
  {
    // See if we should kill off this thread.  This could be necessary if the
    // number of worker threads has been decreased with the server online.  If
    // See if we should kill off this thread. This could be necessary if the
    // number of worker threads has been decreased with the server online. If
    // so, then return null and the thread will exit.
    if (killThreads)
    {
      synchronized (queueLock)
      {
        try
        {
          int currentThreads = workerThreads.size();
          if (currentThreads > numWorkerThreads)
          {
            if (workerThreads.remove(Thread.currentThread()))
            {
              currentThreads--;
            }
            if (currentThreads <= numWorkerThreads)
            {
              killThreads = false;
            }
            workerThread.setStoppedByReducedThreadNumber();
            return null;
          }
        }
        catch (Exception e)
        {
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
          }
        }
      }
    }
    if ((shutdownRequested) || (numFailures > MAX_RETRY_COUNT))
    {
      if (numFailures > MAX_RETRY_COUNT)
      {
        Message message = ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES.get(
            Thread.currentThread().getName(), numFailures, MAX_RETRY_COUNT);
        logError(message);
      }
      return null;
    }
    queueReadLock.lock();
    try
    {
      if (shutdownRequested)
      {
        return null;
      }
      if (killThreads && tryKillThisWorkerThread(workerThread))
      {
        return null;
      }
      if (numFailures > MAX_RETRY_COUNT)
      {
        Message message = ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES.get(Thread
            .currentThread().getName(), numFailures, MAX_RETRY_COUNT);
        logError(message);
        return null;
      }
      while (true)
      {
        AbstractOperation nextOperation = opQueue.poll(5, TimeUnit.SECONDS);
        if (nextOperation == null)
        {
          // There was no work to do in the specified length of time.  See if
          // we should shutdown, and if not then just check again.
          if (shutdownRequested)
          {
            return null;
          }
          else if (killThreads)
          {
            synchronized (queueLock)
            {
              try
              {
                int currentThreads = workerThreads.size();
                if (currentThreads > numWorkerThreads)
                {
                  if (workerThreads.remove(Thread.currentThread()))
                  {
                    currentThreads--;
                  }
                  if (currentThreads <= numWorkerThreads)
                  {
                    killThreads = false;
                  }
                  workerThread.setStoppedByReducedThreadNumber();
                  return null;
                }
              }
              catch (Exception e)
              {
                if (debugEnabled())
                {
                  TRACER.debugCaught(DebugLogLevel.ERROR, e);
                }
              }
            }
          }
        }
        else
        if (nextOperation != null)
        {
          return nextOperation;
        }
        // There was no work to do in the specified length of time. Release the
        // read lock allowing shutdown or config changes to proceed and then see
        // if we should give up or check again.
        queueReadLock.unlock();
        Thread.yield();
        queueReadLock.lock();
        if (shutdownRequested)
        {
          return null;
        }
        if (killThreads && tryKillThisWorkerThread(workerThread))
        {
          return null;
        }
      }
    }
    catch (InterruptedException ie)
    {
      // This is somewhat expected so don't log.
      //      assert debugException(CLASS_NAME, "retryNextOperation", ie);
      // assert debugException(CLASS_NAME, "retryNextOperation", ie);
      // If this occurs, then the worker thread must have been interrupted for
      // some reason.  This could be because the Directory Server is shutting
      // some reason. This could be because the Directory Server is shutting
      // down, in which case we should return null.
      if (shutdownRequested)
      {
@@ -460,10 +487,9 @@
      }
      // If we've gotten here, then the worker thread was interrupted for some
      // other reason.  This should not happen, and we need to log a message.
      logError(WARN_WORKER_INTERRUPTED_WITHOUT_SHUTDOWN.get(
          Thread.currentThread().getName(), String.valueOf(ie)));
      return retryNextOperation(workerThread, numFailures+1);
      // other reason. This should not happen, and we need to log a message.
      logError(WARN_WORKER_INTERRUPTED_WITHOUT_SHUTDOWN.get(Thread
          .currentThread().getName(), String.valueOf(ie)));
    }
    catch (Exception e)
    {
@@ -472,40 +498,86 @@
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      // This should not happen.  The only recourse we have is to log a message
      // This should not happen. The only recourse we have is to log a message
      // and try again.
      logError(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION.get(
          Thread.currentThread().getName(), String.valueOf(e)));
      return retryNextOperation(workerThread, numFailures + 1);
      logError(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION.get(Thread
          .currentThread().getName(), String.valueOf(e)));
    }
    finally
    {
      queueReadLock.unlock();
    }
    // An exception has occurred - retry.
    return retryNextOperation(workerThread, numFailures + 1);
  }
  /**
   * Attempts to remove the specified operation from this queue if it has not
   * yet been picked up for processing by one of the worker threads.
   * Kills this worker thread if needed. This method assumes that the read lock
   * is already taken and ensure that it is taken on exit.
   *
   * @param  operation  The operation to remove from the queue.
   *
   * @return  <CODE>true</CODE> if the provided request was present in the queue
   *          and was removed successfully, or <CODE>false</CODE> it not.
   * @param workerThread
   *          The worker thread associated with this thread.
   * @return {@code true} if this thread was killed or is about to be killed as
   *         a result of shutdown.
   */
  public boolean removeOperation(AbstractOperation operation)
  private boolean tryKillThisWorkerThread(TraditionalWorkerThread workerThread)
  {
    return opQueue.remove(operation);
    queueReadLock.unlock();
    queueWriteLock.lock();
    try
    {
      if (shutdownRequested)
      {
        // Shutdown may have been requested between unlock/lock. This thread is
        // about to shutdown anyway, so return true.
        return true;
      }
      int currentThreads = workerThreads.size();
      if (currentThreads > numWorkerThreads)
      {
        if (workerThreads.remove(Thread.currentThread()))
        {
          currentThreads--;
        }
        if (currentThreads <= numWorkerThreads)
        {
          killThreads = false;
        }
        workerThread.setStoppedByReducedThreadNumber();
        return true;
      }
    }
    finally
    {
      queueWriteLock.unlock();
      queueReadLock.lock();
      if (shutdownRequested)
      {
        // Shutdown may have been requested between unlock/lock. This thread is
        // about to shutdown anyway, so return true.
        return true;
      }
    }
    return false;
  }
  /**
   * Retrieves the total number of operations that have been successfully
   * submitted to this work queue for processing since server startup.  This
   * does not include operations that have been rejected for some reason like
   * the queue already at its maximum capacity.
   * submitted to this work queue for processing since server startup. This does
   * not include operations that have been rejected for some reason like the
   * queue already at its maximum capacity.
   *
   * @return  The total number of operations that have been successfully
   *          submitted to this work queue since startup.
   * @return The total number of operations that have been successfully
   *         submitted to this work queue since startup.
   */
  public long getOpsSubmitted()
  {
@@ -518,8 +590,8 @@
   * Retrieves the total number of operations that have been rejected because
   * the work queue was already at its maximum capacity.
   *
   * @return  The total number of operations that have been rejected because the
   *          work queue was already at its maximum capacity.
   * @return The total number of operations that have been rejected because the
   *         work queue was already at its maximum capacity.
   */
  public long getOpsRejectedDueToQueueFull()
  {
@@ -530,16 +602,24 @@
  /**
   * Retrieves the number of pending operations in the queue that have not yet
   * been picked up for processing.  Note that this method is not a
   * constant-time operation and can be relatively inefficient, so it should be
   * used sparingly.
   * been picked up for processing. Note that this method is not a constant-time
   * operation and can be relatively inefficient, so it should be used
   * sparingly.
   *
   * @return  The number of pending operations in the queue that have not yet
   *          been picked up for processing.
   * @return The number of pending operations in the queue that have not yet
   *         been picked up for processing.
   */
  public int size()
  {
    return opQueue.size();
    queueReadLock.lock();
    try
    {
      return opQueue.size();
    }
    finally
    {
      queueReadLock.unlock();
    }
  }
@@ -548,8 +628,7 @@
   * {@inheritDoc}
   */
  public boolean isConfigurationChangeAcceptable(
                      TraditionalWorkQueueCfg configuration,
                      List<Message> unacceptableReasons)
      TraditionalWorkQueueCfg configuration, List<Message> unacceptableReasons)
  {
    return true;
  }
@@ -560,122 +639,126 @@
   * {@inheritDoc}
   */
  public ConfigChangeResult applyConfigurationChange(
                                 TraditionalWorkQueueCfg configuration)
      TraditionalWorkQueueCfg configuration)
  {
    ArrayList<Message> resultMessages = new ArrayList<Message>();
    int newNumThreads  = getNumWorkerThreads(configuration);
    int newNumThreads = getNumWorkerThreads(configuration);
    int newMaxCapacity = configuration.getMaxWorkQueueCapacity();
    // Apply a change to the number of worker threads if appropriate.
    int currentThreads = workerThreads.size();
    if (newNumThreads != currentThreads)
    {
      synchronized (queueLock)
      queueWriteLock.lock();
      try
      {
        try
        int threadsToAdd = newNumThreads - currentThreads;
        if (threadsToAdd > 0)
        {
          int threadsToAdd = newNumThreads - currentThreads;
          if (threadsToAdd > 0)
          for (int i = 0; i < threadsToAdd; i++)
          {
            for (int i=0; i < threadsToAdd; i++)
            {
              TraditionalWorkerThread t =
                   new TraditionalWorkerThread(this, lastThreadNumber++);
              workerThreads.add(t);
              t.start();
            }
            killThreads = false;
          }
          else
          {
            killThreads = true;
            TraditionalWorkerThread t = new TraditionalWorkerThread(this,
                lastThreadNumber++);
            workerThreads.add(t);
            t.start();
          }
          numWorkerThreads = newNumThreads;
          killThreads = false;
        }
        catch (Exception e)
        else
        {
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
          }
          killThreads = true;
        }
        numWorkerThreads = newNumThreads;
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
      finally
      {
        queueWriteLock.unlock();
      }
    }
    // Apply a change to the maximum capacity if appropriate.  Since we can't
    // Apply a change to the maximum capacity if appropriate. Since we can't
    // change capacity on the fly, then we'll have to create a new queue and
    // transfer any remaining items into it.  Any thread that is waiting on the
    // transfer any remaining items into it. Any thread that is waiting on the
    // original queue will time out after at most a few seconds and further
    // checks will be against the new queue.
    if (newMaxCapacity != maxCapacity)
    {
      synchronized (queueLock)
      // First switch the queue with the exclusive lock.
      queueWriteLock.lock();
      LinkedBlockingQueue<AbstractOperation> oldOpQueue;
      try
      {
        try
        LinkedBlockingQueue<AbstractOperation> newOpQueue = null;
        if (newMaxCapacity > 0)
        {
          LinkedBlockingQueue<AbstractOperation> newOpQueue;
          if (newMaxCapacity > 0)
          {
            newOpQueue =
              new LinkedBlockingQueue<AbstractOperation>(newMaxCapacity);
          }
          else
          {
            newOpQueue = new LinkedBlockingQueue<AbstractOperation>();
          }
          LinkedBlockingQueue<AbstractOperation> oldOpQueue = opQueue;
          opQueue = newOpQueue;
          LinkedList<AbstractOperation> pendingOps =
            new LinkedList<AbstractOperation>();
          oldOpQueue.drainTo(pendingOps);
          // We have to be careful when adding any existing pending operations
          // because the new capacity could be less than what was already
          // backlogged in the previous queue.  If that happens, we may have to
          // loop a few times to get everything in there.
          while (! pendingOps.isEmpty())
          {
            Iterator<AbstractOperation> iterator = pendingOps.iterator();
            while (iterator.hasNext())
            {
              AbstractOperation o = iterator.next();
              try
              {
                if (newOpQueue.offer(o, 1000, TimeUnit.MILLISECONDS))
                {
                  iterator.remove();
                }
              }
              catch (InterruptedException ie)
              {
                if (debugEnabled())
                {
                  TRACER.debugCaught(DebugLogLevel.ERROR, ie);
                }
              }
            }
          }
          maxCapacity = newMaxCapacity;
          newOpQueue = new LinkedBlockingQueue<AbstractOperation>(
              newMaxCapacity);
        }
        catch (Exception e)
        else
        {
          if (debugEnabled())
          newOpQueue = new LinkedBlockingQueue<AbstractOperation>();
        }
        oldOpQueue = opQueue;
        opQueue = newOpQueue;
        maxCapacity = newMaxCapacity;
      }
      finally
      {
        queueWriteLock.unlock();
      }
      // Now resubmit any pending requests - we'll need the shared lock.
      AbstractOperation pendingOperation = null;
      queueReadLock.lock();
      try
      {
        // We have to be careful when adding any existing pending operations
        // because the new capacity could be less than what was already
        // backlogged in the previous queue. If that happens, we may have to
        // loop a few times to get everything in there.
        while ((pendingOperation = oldOpQueue.poll()) != null)
        {
          opQueue.put(pendingOperation);
        }
      }
      catch (InterruptedException e)
      {
        // We cannot handle the interruption here. Cancel pending requests and
        // re-interrupt this thread.
        Thread.currentThread().interrupt();
        Message message = WARN_OP_REJECTED_BY_QUEUE_INTERRUPT.get();
        CancelRequest cancelRequest = new CancelRequest(true, message);
        if (pendingOperation != null)
        {
          pendingOperation.abort(cancelRequest);
        }
        while ((pendingOperation = oldOpQueue.poll()) != null)
        {
          if (pendingOperation != null)
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
            pendingOperation.abort(cancelRequest);
          }
        }
      }
      finally
      {
        queueReadLock.unlock();
      }
    }
    return new ConfigChangeResult(ResultCode.SUCCESS, false, resultMessages);
  }
@@ -687,13 +770,14 @@
  @Override()
  public boolean isIdle()
  {
    if (opQueue.size() > 0)
    queueReadLock.lock();
    try
    {
      return false;
    }
      if (opQueue.size() > 0)
      {
        return false;
      }
    synchronized (queueLock)
    {
      for (TraditionalWorkerThread t : workerThreads)
      {
        if (t.isActive())
@@ -704,6 +788,10 @@
      return true;
    }
    finally
    {
      queueReadLock.unlock();
    }
  }
@@ -728,4 +816,3 @@
    }
  }
}