mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

matthew_swift
18.51.2010 0a0ed252bb022406397ce9a87af99d9ac32c9bb7
Fix potential OOME when under heavy asynchronous search load by making work queue capacity finite so that clients are blocked when the queue is full
7 files modified
871 ■■■■ changed files
opends/resource/config/config.ldif 2 ●●● patch | view | raw | blame | history
opends/src/admin/defn/org/opends/server/admin/std/TraditionalWorkQueueConfiguration.xml 21 ●●●●● patch | view | raw | blame | history
opends/src/admin/messages/TraditionalWorkQueueCfgDefn.properties 3 ●●●● patch | view | raw | blame | history
opends/src/messages/messages/core.properties 3 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/config/ConfigConstants.java 17 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java 765 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java 60 ●●●● patch | view | raw | blame | history
opends/resource/config/config.ldif
@@ -2486,7 +2486,7 @@
objectClass: ds-cfg-traditional-work-queue
cn: Work Queue
ds-cfg-java-class: org.opends.server.extensions.TraditionalWorkQueue
ds-cfg-max-work-queue-capacity: 0
ds-cfg-max-work-queue-capacity: 1000
dn: cn=Administration Connector,cn=config
objectClass: top
opends/src/admin/defn/org/opends/server/admin/std/TraditionalWorkQueueConfiguration.xml
@@ -23,7 +23,7 @@
  ! CDDL HEADER END
  !
  !
  !      Copyright 2007-2009 Sun Microsystems, Inc.
  !      Copyright 2007-2010 Sun Microsystems, Inc.
  ! -->
<adm:managed-object name="traditional-work-queue"
  plural-name="traditional-work-queues" extends="work-queue"
@@ -95,23 +95,16 @@
    </adm:synopsis>
    <adm:description>
      If the work queue is already full and additional requests are
      received by the server, the requests are rejected.
      A value of zero indicates that there is no limit to the size
      of the queue.
      received by the server, then the server front end, and possibly the
      client, will be blocked until the work queue has available capacity.
    </adm:description>
    <adm:requires-admin-action>
      <adm:server-restart />
    </adm:requires-admin-action>
    <adm:default-behavior>
      <adm:alias>
        <adm:synopsis>
          The work queue does not impose any limit on the number of
          operations that can be enqueued at any one time.
        </adm:synopsis>
      </adm:alias>
      <adm:defined>
        <adm:value>1000</adm:value>
      </adm:defined>
    </adm:default-behavior>
    <adm:syntax>
      <adm:integer lower-limit="0" upper-limit="2147483647"/>
      <adm:integer lower-limit="1" upper-limit="2147483647"/>
    </adm:syntax>
    <adm:profile name="ldap">
      <ldap:attribute>
opends/src/admin/messages/TraditionalWorkQueueCfgDefn.properties
@@ -4,8 +4,7 @@
description=The traditional work queue is a FIFO queue serviced by a fixed number of worker threads. This fixed number of threads can be changed on the fly, with the change taking effect as soon as it is made. You can limit the size of the work queue to a specified number of operations. When this many operations are in the queue, waiting to be picked up by threads, any new requests are rejected with an error message.
property.java-class.synopsis=Specifies the fully-qualified name of the Java class that provides the Traditional Work Queue implementation.
property.max-work-queue-capacity.synopsis=Specifies the maximum number of queued operations that can be in the work queue at any given time.
property.max-work-queue-capacity.description=If the work queue is already full and additional requests are received by the server, the requests are rejected. A value of zero indicates that there is no limit to the size of the queue.
property.max-work-queue-capacity.default-behavior.alias.synopsis=The work queue does not impose any limit on the number of operations that can be enqueued at any one time.
property.max-work-queue-capacity.description=If the work queue is already full and additional requests are received by the server, then the server front end, and possibly the client, will be blocked until the work queue has available capacity.
property.num-worker-threads.synopsis=Specifies the number of worker threads to be used for processing operations placed in the queue.
property.num-worker-threads.description=If the value is increased, the additional worker threads are created immediately. If the value is reduced, the appropriate number of threads are destroyed as operations complete processing.
property.num-worker-threads.default-behavior.alias.synopsis=Let the server decide.
opends/src/messages/messages/core.properties
@@ -1848,3 +1848,6 @@
the maximum number of operations per interval (must be positive)
MILD_ERR_SUBENTRY_WRITE_INSUFFICIENT_PRIVILEGES_739=This operation involves \
 LDAP subentries which you do not have sufficient privileges to administer
SEVERE_WARN_OP_REJECTED_BY_QUEUE_INTERRUPT_740=The request to process this \
 operation has been rejected because request handler thread was interrupted \
 while attempting to put the operation on the work queue
opends/src/server/org/opends/server/config/ConfigConstants.java
@@ -1365,23 +1365,6 @@
  /**
   * The name of the configuration attribute that indicates the maximum number
   * of pending operations that may be in the work queue at any given time.
   */
  public static final String ATTR_MAX_WORK_QUEUE_CAPACITY =
       "ds-cfg-max-work-queue-capacity";
  /**
   * The default maximum capacity that should be used for the work queue if none
   * is specified in the configuration.
   */
  public static final int DEFAULT_MAX_WORK_QUEUE_CAPACITY = 0;
  /**
   * The name of the configuration attribute that holds the fully-qualified name
   * for the monitor provider class.
   */
opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
@@ -28,13 +28,20 @@
import static org.opends.messages.ConfigMessages.*;
import static org.opends.messages.CoreMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationChangeListener;
@@ -44,19 +51,7 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.monitors.TraditionalWorkQueueMonitor;
import org.opends.server.types.AbstractOperation;
import org.opends.server.types.CancelRequest;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.InitializationException;
import org.opends.server.types.Operation;
import org.opends.server.types.ResultCode;
import static org.opends.messages.ConfigMessages.*;
import static org.opends.messages.CoreMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.types.*;
@@ -64,28 +59,23 @@
 * This class defines a data structure for storing and interacting with the
 * Directory Server work queue.
 */
public class TraditionalWorkQueue
       extends WorkQueue<TraditionalWorkQueueCfg>
       implements ConfigurationChangeListener<TraditionalWorkQueueCfg>
public class TraditionalWorkQueue extends WorkQueue<TraditionalWorkQueueCfg>
    implements ConfigurationChangeListener<TraditionalWorkQueueCfg>
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * The maximum number of times to retry getting the next operation from the
   * queue if an unexpected failure occurs.
   */
  private static final int MAX_RETRY_COUNT = 5;
  // The set of worker threads that will be used to process this work queue.
  private ArrayList<TraditionalWorkerThread> workerThreads;
  private final ArrayList<TraditionalWorkerThread> workerThreads =
    new ArrayList<TraditionalWorkerThread>();
  // The number of operations that have been submitted to the work queue for
  // processing.
@@ -100,7 +90,7 @@
  private boolean killThreads;
  // Indicates whether the Directory Server is shutting down.
  private volatile boolean shutdownRequested;
  private boolean shutdownRequested;
  // The thread number used for the last worker thread that was created.
  private int lastThreadNumber;
@@ -113,16 +103,35 @@
  // a configuration change has not been completely applied).
  private int numWorkerThreads;
  // The queue overflow policy: true indicates that operations will be blocked
  // until the queue has available capacity, otherwise operations will be
  // rejected.
  //
  // This is hard-coded to true for now because a reject on full policy does
  // not seem to have a valid use case.
  //
  private final boolean isBlocking = true;
  // The queue that will be used to actually hold the pending operations.
  private LinkedBlockingQueue<AbstractOperation> opQueue;
  // The lock used to provide threadsafe access for the queue.
  private Object queueLock;
  // The locks used to provide threadsafe access for the queue.
  // Used for non-config changes.
  private final ReadLock queueReadLock;
  // Used for config changes.
  private final WriteLock queueWriteLock;
  {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    queueReadLock = lock.readLock();
    queueWriteLock = lock.writeLock();
  }
  /**
   * Creates a new instance of this work queue.  All initialization should be
   * Creates a new instance of this work queue. All initialization should be
   * performed in the <CODE>initializeWorkQueue</CODE> method.
   */
  public TraditionalWorkQueue()
@@ -137,66 +146,70 @@
   */
  @Override()
  public void initializeWorkQueue(TraditionalWorkQueueCfg configuration)
         throws ConfigException, InitializationException
      throws ConfigException, InitializationException
  {
    shutdownRequested = false;
    killThreads       = false;
    opsSubmitted      = new AtomicLong(0);
    queueFullRejects  = new AtomicLong(0);
    queueLock         = new Object();
    // Register to be notified of any configuration changes.
    configuration.addTraditionalChangeListener(this);
    // Get the necessary configuration from the provided entry.
    numWorkerThreads = getNumWorkerThreads(configuration);
    maxCapacity      = configuration.getMaxWorkQueueCapacity();
    // Create the actual work queue.
    if (maxCapacity > 0)
    {
      opQueue = new LinkedBlockingQueue<AbstractOperation>(maxCapacity);
    }
    else
    {
      opQueue = new LinkedBlockingQueue<AbstractOperation>();
    }
    // Create the set of worker threads that should be used to service the
    // work queue.
    workerThreads = new ArrayList<TraditionalWorkerThread>(numWorkerThreads);
    for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads;
    lastThreadNumber++)
    {
      TraditionalWorkerThread t =
           new TraditionalWorkerThread(this, lastThreadNumber);
      t.start();
      workerThreads.add(t);
    }
    // Create and register a monitor provider for the work queue.
    queueWriteLock.lock();
    try
    {
      TraditionalWorkQueueMonitor monitor =
           new TraditionalWorkQueueMonitor(this);
      monitor.initializeMonitorProvider(null);
      DirectoryServer.registerMonitorProvider(monitor);
    }
    catch (Exception e)
    {
      if (debugEnabled())
      shutdownRequested = false;
      killThreads = false;
      opsSubmitted = new AtomicLong(0);
      queueFullRejects = new AtomicLong(0);
      // Register to be notified of any configuration changes.
      configuration.addTraditionalChangeListener(this);
      // Get the necessary configuration from the provided entry.
      numWorkerThreads = getNumWorkerThreads(configuration);
      maxCapacity = configuration.getMaxWorkQueueCapacity();
      // Create the actual work queue.
      if (maxCapacity > 0)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
        opQueue = new LinkedBlockingQueue<AbstractOperation>(maxCapacity);
      }
      else
      {
        // This will never be the case, since the configuration definition
        // ensures that the capacity is always finite.
        opQueue = new LinkedBlockingQueue<AbstractOperation>();
      }
      Message message = ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR.get(
          String.valueOf(TraditionalWorkQueueMonitor.class), String.valueOf(e));
      logError(message);
      // Create the set of worker threads that should be used to service the
      // work queue.
      for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads;
        lastThreadNumber++)
      {
        TraditionalWorkerThread t = new TraditionalWorkerThread(this,
            lastThreadNumber);
        t.start();
        workerThreads.add(t);
      }
      // Create and register a monitor provider for the work queue.
      try
      {
        TraditionalWorkQueueMonitor monitor = new TraditionalWorkQueueMonitor(
            this);
        monitor.initializeMonitorProvider(null);
        DirectoryServer.registerMonitorProvider(monitor);
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        Message message = ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR.get(
            String.valueOf(TraditionalWorkQueueMonitor.class),
            String.valueOf(e));
        logError(message);
      }
    }
    finally
    {
      queueWriteLock.unlock();
    }
  }
@@ -208,8 +221,17 @@
  @Override()
  public void finalizeWorkQueue(Message reason)
  {
    shutdownRequested = true;
    queueWriteLock.lock();
    try
    {
      shutdownRequested = true;
    }
    finally
    {
      queueWriteLock.unlock();
    }
    // From now on no more operations can be enqueued or dequeued.
    // Send responses to any operations in the pending queue to indicate that
    // they won't be processed because the server is shutting down.
@@ -222,7 +244,8 @@
      {
        // The operation has no chance of responding to the cancel
        // request so avoid waiting for a cancel response.
        if (o.getCancelResult() == null) {
        if (o.getCancelResult() == null)
        {
          o.abort(cancelRequest);
        }
      }
@@ -233,12 +256,11 @@
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        logError(WARN_QUEUE_UNABLE_TO_CANCEL.get(
            String.valueOf(o), String.valueOf(e)));
        logError(WARN_QUEUE_UNABLE_TO_CANCEL.get(String.valueOf(o),
            String.valueOf(e)));
      }
    }
    // Notify all the worker threads of the shutdown.
    for (TraditionalWorkerThread t : workerThreads)
    {
@@ -253,8 +275,8 @@
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        logError(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD.get(
            t.getName(), String.valueOf(e)));
        logError(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD.get(t.getName(),
            String.valueOf(e)));
      }
    }
  }
@@ -264,12 +286,20 @@
  /**
   * Indicates whether this work queue has received a request to shut down.
   *
   * @return  <CODE>true</CODE> if the work queue has recieved a request to shut
   *          down, or <CODE>false</CODE> if not.
   * @return <CODE>true</CODE> if the work queue has recieved a request to shut
   *         down, or <CODE>false</CODE> if not.
   */
  public boolean shutdownRequested()
  {
    return shutdownRequested;
    queueReadLock.lock();
    try
    {
      return shutdownRequested;
    }
    finally
    {
      queueReadLock.unlock();
    }
  }
@@ -278,46 +308,93 @@
   * Submits an operation to be processed by one of the worker threads
   * associated with this work queue.
   *
   * @param  operation  The operation to be processed.
   *
   * @throws  DirectoryException  If the provided operation is not accepted for
   *                              some reason (e.g., if the server is shutting
   *                              down or the pending operation queue is already
   *                              at its maximum capacity).
   * @param operation
   *          The operation to be processed.
   * @throws DirectoryException
   *           If the provided operation is not accepted for some reason (e.g.,
   *           if the server is shutting down or the pending operation queue is
   *           already at its maximum capacity).
   */
  @Override
  public void submitOperation(AbstractOperation operation)
         throws DirectoryException
      throws DirectoryException
  {
    if (shutdownRequested)
    queueReadLock.lock();
    try
    {
      Message message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
      throw new DirectoryException(ResultCode.UNAVAILABLE, message);
    }
      if (shutdownRequested)
      {
        Message message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
        throw new DirectoryException(ResultCode.UNAVAILABLE, message);
      }
    if (! opQueue.offer(operation))
      if (isBlocking)
      {
        try
        {
          // If the queue is full and there is an administrative change taking
          // place then starvation could arise: this thread will hold the read
          // lock, the admin thread will be waiting on the write lock, and the
          // worker threads may be queued behind the admin thread. Since the
          // worker threads cannot run, the queue will never empty and allow
          // this thread to proceed. To help things out we can periodically
          // yield the read lock when the queue is full.
          while (!opQueue.offer(operation, 1, TimeUnit.SECONDS))
          {
            queueReadLock.unlock();
            Thread.yield();
            queueReadLock.lock();
            if (shutdownRequested)
            {
              Message message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
              throw new DirectoryException(ResultCode.UNAVAILABLE, message);
            }
          }
        }
        catch (InterruptedException e)
        {
          // We cannot handle the interruption here. Reject the request and
          // re-interrupt this thread.
          Thread.currentThread().interrupt();
          queueFullRejects.incrementAndGet();
          Message message = WARN_OP_REJECTED_BY_QUEUE_INTERRUPT.get();
          throw new DirectoryException(ResultCode.BUSY, message);
        }
      }
      else
      {
        if (!opQueue.offer(operation))
        {
          queueFullRejects.incrementAndGet();
          Message message = WARN_OP_REJECTED_BY_QUEUE_FULL.get(maxCapacity);
          throw new DirectoryException(ResultCode.BUSY, message);
        }
      }
      opsSubmitted.incrementAndGet();
    }
    finally
    {
      queueFullRejects.incrementAndGet();
      Message message = WARN_OP_REJECTED_BY_QUEUE_FULL.get(maxCapacity);
      throw new DirectoryException(ResultCode.UNAVAILABLE, message);
      queueReadLock.unlock();
    }
    opsSubmitted.incrementAndGet();
  }
  /**
   * Retrieves the next operation that should be processed by one of the worker
   * threads, blocking if necessary until a new request arrives.  This method
   * threads, blocking if necessary until a new request arrives. This method
   * should only be called by a worker thread associated with this work queue.
   *
   * @param  workerThread  The worker thread that is requesting the operation.
   *
   * @return  The next operation that should be processed, or <CODE>null</CODE>
   *          if the server is shutting down and no more operations will be
   *          processed.
   * @param workerThread
   *          The worker thread that is requesting the operation.
   * @return The next operation that should be processed, or <CODE>null</CODE>
   *         if the server is shutting down and no more operations will be
   *         processed.
   */
  public AbstractOperation nextOperation(TraditionalWorkerThread workerThread)
  {
@@ -328,131 +405,81 @@
  /**
   * Retrieves the next operation that should be processed by one of the worker
   * threads following a previous failure attempt.  A maximum of five
   * consecutive failures will be allowed before returning <CODE>null</CODE>,
   * which will cause the associated thread to exit.
   * threads following a previous failure attempt. A maximum of five consecutive
   * failures will be allowed before returning <CODE>null</CODE>, which will
   * cause the associated thread to exit.
   *
   * @param  workerThread  The worker thread that is requesting the operation.
   * @param  numFailures   The number of consecutive failures that the worker
   *                       thread has experienced so far.  If this gets too
   *                       high, then this method will return <CODE>null</CODE>
   *                       rather than retrying.
   *
   * @return  The next operation that should be processed, or <CODE>null</CODE>
   *          if the server is shutting down and no more operations will be
   *          processed, or if there have been too many consecutive failures.
   * @param workerThread
   *          The worker thread that is requesting the operation.
   * @param numFailures
   *          The number of consecutive failures that the worker thread has
   *          experienced so far. If this gets too high, then this method will
   *          return <CODE>null</CODE> rather than retrying.
   * @return The next operation that should be processed, or <CODE>null</CODE>
   *         if the server is shutting down and no more operations will be
   *         processed, or if there have been too many consecutive failures.
   */
  private AbstractOperation retryNextOperation(
                                       TraditionalWorkerThread workerThread,
                                       int numFailures)
      TraditionalWorkerThread workerThread, int numFailures)
  {
    // See if we should kill off this thread.  This could be necessary if the
    // number of worker threads has been decreased with the server online.  If
    // See if we should kill off this thread. This could be necessary if the
    // number of worker threads has been decreased with the server online. If
    // so, then return null and the thread will exit.
    if (killThreads)
    {
      synchronized (queueLock)
      {
        try
        {
          int currentThreads = workerThreads.size();
          if (currentThreads > numWorkerThreads)
          {
            if (workerThreads.remove(Thread.currentThread()))
            {
              currentThreads--;
            }
            if (currentThreads <= numWorkerThreads)
            {
              killThreads = false;
            }
            workerThread.setStoppedByReducedThreadNumber();
            return null;
          }
        }
        catch (Exception e)
        {
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
          }
        }
      }
    }
    if ((shutdownRequested) || (numFailures > MAX_RETRY_COUNT))
    {
      if (numFailures > MAX_RETRY_COUNT)
      {
        Message message = ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES.get(
            Thread.currentThread().getName(), numFailures, MAX_RETRY_COUNT);
        logError(message);
      }
      return null;
    }
    queueReadLock.lock();
    try
    {
      if (shutdownRequested)
      {
        return null;
      }
      if (killThreads && tryKillThisWorkerThread(workerThread))
      {
        return null;
      }
      if (numFailures > MAX_RETRY_COUNT)
      {
        Message message = ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES.get(Thread
            .currentThread().getName(), numFailures, MAX_RETRY_COUNT);
        logError(message);
        return null;
      }
      while (true)
      {
        AbstractOperation nextOperation = opQueue.poll(5, TimeUnit.SECONDS);
        if (nextOperation == null)
        {
          // There was no work to do in the specified length of time.  See if
          // we should shutdown, and if not then just check again.
          if (shutdownRequested)
          {
            return null;
          }
          else if (killThreads)
          {
            synchronized (queueLock)
            {
              try
              {
                int currentThreads = workerThreads.size();
                if (currentThreads > numWorkerThreads)
                {
                  if (workerThreads.remove(Thread.currentThread()))
                  {
                    currentThreads--;
                  }
                  if (currentThreads <= numWorkerThreads)
                  {
                    killThreads = false;
                  }
                  workerThread.setStoppedByReducedThreadNumber();
                  return null;
                }
              }
              catch (Exception e)
              {
                if (debugEnabled())
                {
                  TRACER.debugCaught(DebugLogLevel.ERROR, e);
                }
              }
            }
          }
        }
        else
        if (nextOperation != null)
        {
          return nextOperation;
        }
        // There was no work to do in the specified length of time. Release the
        // read lock allowing shutdown or config changes to proceed and then see
        // if we should give up or check again.
        queueReadLock.unlock();
        Thread.yield();
        queueReadLock.lock();
        if (shutdownRequested)
        {
          return null;
        }
        if (killThreads && tryKillThisWorkerThread(workerThread))
        {
          return null;
        }
      }
    }
    catch (InterruptedException ie)
    {
      // This is somewhat expected so don't log.
      //      assert debugException(CLASS_NAME, "retryNextOperation", ie);
      // assert debugException(CLASS_NAME, "retryNextOperation", ie);
      // If this occurs, then the worker thread must have been interrupted for
      // some reason.  This could be because the Directory Server is shutting
      // some reason. This could be because the Directory Server is shutting
      // down, in which case we should return null.
      if (shutdownRequested)
      {
@@ -460,10 +487,9 @@
      }
      // If we've gotten here, then the worker thread was interrupted for some
      // other reason.  This should not happen, and we need to log a message.
      logError(WARN_WORKER_INTERRUPTED_WITHOUT_SHUTDOWN.get(
          Thread.currentThread().getName(), String.valueOf(ie)));
      return retryNextOperation(workerThread, numFailures+1);
      // other reason. This should not happen, and we need to log a message.
      logError(WARN_WORKER_INTERRUPTED_WITHOUT_SHUTDOWN.get(Thread
          .currentThread().getName(), String.valueOf(ie)));
    }
    catch (Exception e)
    {
@@ -472,40 +498,86 @@
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      // This should not happen.  The only recourse we have is to log a message
      // This should not happen. The only recourse we have is to log a message
      // and try again.
      logError(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION.get(
          Thread.currentThread().getName(), String.valueOf(e)));
      return retryNextOperation(workerThread, numFailures + 1);
      logError(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION.get(Thread
          .currentThread().getName(), String.valueOf(e)));
    }
    finally
    {
      queueReadLock.unlock();
    }
    // An exception has occurred - retry.
    return retryNextOperation(workerThread, numFailures + 1);
  }
  /**
   * Attempts to remove the specified operation from this queue if it has not
   * yet been picked up for processing by one of the worker threads.
   * Kills this worker thread if needed. This method assumes that the read lock
   * is already taken and ensure that it is taken on exit.
   *
   * @param  operation  The operation to remove from the queue.
   *
   * @return  <CODE>true</CODE> if the provided request was present in the queue
   *          and was removed successfully, or <CODE>false</CODE> it not.
   * @param workerThread
   *          The worker thread associated with this thread.
   * @return {@code true} if this thread was killed or is about to be killed as
   *         a result of shutdown.
   */
  public boolean removeOperation(AbstractOperation operation)
  private boolean tryKillThisWorkerThread(TraditionalWorkerThread workerThread)
  {
    return opQueue.remove(operation);
    queueReadLock.unlock();
    queueWriteLock.lock();
    try
    {
      if (shutdownRequested)
      {
        // Shutdown may have been requested between unlock/lock. This thread is
        // about to shutdown anyway, so return true.
        return true;
      }
      int currentThreads = workerThreads.size();
      if (currentThreads > numWorkerThreads)
      {
        if (workerThreads.remove(Thread.currentThread()))
        {
          currentThreads--;
        }
        if (currentThreads <= numWorkerThreads)
        {
          killThreads = false;
        }
        workerThread.setStoppedByReducedThreadNumber();
        return true;
      }
    }
    finally
    {
      queueWriteLock.unlock();
      queueReadLock.lock();
      if (shutdownRequested)
      {
        // Shutdown may have been requested between unlock/lock. This thread is
        // about to shutdown anyway, so return true.
        return true;
      }
    }
    return false;
  }
  /**
   * Retrieves the total number of operations that have been successfully
   * submitted to this work queue for processing since server startup.  This
   * does not include operations that have been rejected for some reason like
   * the queue already at its maximum capacity.
   * submitted to this work queue for processing since server startup. This does
   * not include operations that have been rejected for some reason like the
   * queue already at its maximum capacity.
   *
   * @return  The total number of operations that have been successfully
   *          submitted to this work queue since startup.
   * @return The total number of operations that have been successfully
   *         submitted to this work queue since startup.
   */
  public long getOpsSubmitted()
  {
@@ -518,8 +590,8 @@
   * Retrieves the total number of operations that have been rejected because
   * the work queue was already at its maximum capacity.
   *
   * @return  The total number of operations that have been rejected because the
   *          work queue was already at its maximum capacity.
   * @return The total number of operations that have been rejected because the
   *         work queue was already at its maximum capacity.
   */
  public long getOpsRejectedDueToQueueFull()
  {
@@ -530,16 +602,24 @@
  /**
   * Retrieves the number of pending operations in the queue that have not yet
   * been picked up for processing.  Note that this method is not a
   * constant-time operation and can be relatively inefficient, so it should be
   * used sparingly.
   * been picked up for processing. Note that this method is not a constant-time
   * operation and can be relatively inefficient, so it should be used
   * sparingly.
   *
   * @return  The number of pending operations in the queue that have not yet
   *          been picked up for processing.
   * @return The number of pending operations in the queue that have not yet
   *         been picked up for processing.
   */
  public int size()
  {
    return opQueue.size();
    queueReadLock.lock();
    try
    {
      return opQueue.size();
    }
    finally
    {
      queueReadLock.unlock();
    }
  }
@@ -548,8 +628,7 @@
   * {@inheritDoc}
   */
  public boolean isConfigurationChangeAcceptable(
                      TraditionalWorkQueueCfg configuration,
                      List<Message> unacceptableReasons)
      TraditionalWorkQueueCfg configuration, List<Message> unacceptableReasons)
  {
    return true;
  }
@@ -560,122 +639,126 @@
   * {@inheritDoc}
   */
  public ConfigChangeResult applyConfigurationChange(
                                 TraditionalWorkQueueCfg configuration)
      TraditionalWorkQueueCfg configuration)
  {
    ArrayList<Message> resultMessages = new ArrayList<Message>();
    int newNumThreads  = getNumWorkerThreads(configuration);
    int newNumThreads = getNumWorkerThreads(configuration);
    int newMaxCapacity = configuration.getMaxWorkQueueCapacity();
    // Apply a change to the number of worker threads if appropriate.
    int currentThreads = workerThreads.size();
    if (newNumThreads != currentThreads)
    {
      synchronized (queueLock)
      queueWriteLock.lock();
      try
      {
        try
        int threadsToAdd = newNumThreads - currentThreads;
        if (threadsToAdd > 0)
        {
          int threadsToAdd = newNumThreads - currentThreads;
          if (threadsToAdd > 0)
          for (int i = 0; i < threadsToAdd; i++)
          {
            for (int i=0; i < threadsToAdd; i++)
            {
              TraditionalWorkerThread t =
                   new TraditionalWorkerThread(this, lastThreadNumber++);
              workerThreads.add(t);
              t.start();
            }
            killThreads = false;
          }
          else
          {
            killThreads = true;
            TraditionalWorkerThread t = new TraditionalWorkerThread(this,
                lastThreadNumber++);
            workerThreads.add(t);
            t.start();
          }
          numWorkerThreads = newNumThreads;
          killThreads = false;
        }
        catch (Exception e)
        else
        {
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
          }
          killThreads = true;
        }
        numWorkerThreads = newNumThreads;
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
      finally
      {
        queueWriteLock.unlock();
      }
    }
    // Apply a change to the maximum capacity if appropriate.  Since we can't
    // Apply a change to the maximum capacity if appropriate. Since we can't
    // change capacity on the fly, then we'll have to create a new queue and
    // transfer any remaining items into it.  Any thread that is waiting on the
    // transfer any remaining items into it. Any thread that is waiting on the
    // original queue will time out after at most a few seconds and further
    // checks will be against the new queue.
    if (newMaxCapacity != maxCapacity)
    {
      synchronized (queueLock)
      // First switch the queue with the exclusive lock.
      queueWriteLock.lock();
      LinkedBlockingQueue<AbstractOperation> oldOpQueue;
      try
      {
        try
        LinkedBlockingQueue<AbstractOperation> newOpQueue = null;
        if (newMaxCapacity > 0)
        {
          LinkedBlockingQueue<AbstractOperation> newOpQueue;
          if (newMaxCapacity > 0)
          {
            newOpQueue =
              new LinkedBlockingQueue<AbstractOperation>(newMaxCapacity);
          }
          else
          {
            newOpQueue = new LinkedBlockingQueue<AbstractOperation>();
          }
          LinkedBlockingQueue<AbstractOperation> oldOpQueue = opQueue;
          opQueue = newOpQueue;
          LinkedList<AbstractOperation> pendingOps =
            new LinkedList<AbstractOperation>();
          oldOpQueue.drainTo(pendingOps);
          // We have to be careful when adding any existing pending operations
          // because the new capacity could be less than what was already
          // backlogged in the previous queue.  If that happens, we may have to
          // loop a few times to get everything in there.
          while (! pendingOps.isEmpty())
          {
            Iterator<AbstractOperation> iterator = pendingOps.iterator();
            while (iterator.hasNext())
            {
              AbstractOperation o = iterator.next();
              try
              {
                if (newOpQueue.offer(o, 1000, TimeUnit.MILLISECONDS))
                {
                  iterator.remove();
                }
              }
              catch (InterruptedException ie)
              {
                if (debugEnabled())
                {
                  TRACER.debugCaught(DebugLogLevel.ERROR, ie);
                }
              }
            }
          }
          maxCapacity = newMaxCapacity;
          newOpQueue = new LinkedBlockingQueue<AbstractOperation>(
              newMaxCapacity);
        }
        catch (Exception e)
        else
        {
          if (debugEnabled())
          newOpQueue = new LinkedBlockingQueue<AbstractOperation>();
        }
        oldOpQueue = opQueue;
        opQueue = newOpQueue;
        maxCapacity = newMaxCapacity;
      }
      finally
      {
        queueWriteLock.unlock();
      }
      // Now resubmit any pending requests - we'll need the shared lock.
      AbstractOperation pendingOperation = null;
      queueReadLock.lock();
      try
      {
        // We have to be careful when adding any existing pending operations
        // because the new capacity could be less than what was already
        // backlogged in the previous queue. If that happens, we may have to
        // loop a few times to get everything in there.
        while ((pendingOperation = oldOpQueue.poll()) != null)
        {
          opQueue.put(pendingOperation);
        }
      }
      catch (InterruptedException e)
      {
        // We cannot handle the interruption here. Cancel pending requests and
        // re-interrupt this thread.
        Thread.currentThread().interrupt();
        Message message = WARN_OP_REJECTED_BY_QUEUE_INTERRUPT.get();
        CancelRequest cancelRequest = new CancelRequest(true, message);
        if (pendingOperation != null)
        {
          pendingOperation.abort(cancelRequest);
        }
        while ((pendingOperation = oldOpQueue.poll()) != null)
        {
          if (pendingOperation != null)
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
            pendingOperation.abort(cancelRequest);
          }
        }
      }
      finally
      {
        queueReadLock.unlock();
      }
    }
    return new ConfigChangeResult(ResultCode.SUCCESS, false, resultMessages);
  }
@@ -687,13 +770,14 @@
  @Override()
  public boolean isIdle()
  {
    if (opQueue.size() > 0)
    queueReadLock.lock();
    try
    {
      return false;
    }
      if (opQueue.size() > 0)
      {
        return false;
      }
    synchronized (queueLock)
    {
      for (TraditionalWorkerThread t : workerThreads)
      {
        if (t.isActive())
@@ -704,6 +788,10 @@
      return true;
    }
    finally
    {
      queueReadLock.unlock();
    }
  }
@@ -728,4 +816,3 @@
    }
  }
}
opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
@@ -1194,16 +1194,16 @@
    // We need to grab a lock to ensure that no one else can add
    // operations to the queue while we are performing some preliminary
    // checks.
    synchronized (opsInProgressLock)
    try
    {
      try
      synchronized (opsInProgressLock)
      {
        // If we're already in the process of disconnecting the client,
        // then reject the operation.
        if (disconnectRequested)
        {
          Message message =
              WARN_LDAP_CLIENT_DISCONNECT_IN_PROGRESS.get();
            WARN_LDAP_CLIENT_DISCONNECT_IN_PROGRESS.get();
          throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
              message);
        }
@@ -1217,41 +1217,41 @@
        if (op != null)
        {
          Message message =
              WARN_LDAP_CLIENT_DUPLICATE_MESSAGE_ID.get(messageID);
            WARN_LDAP_CLIENT_DUPLICATE_MESSAGE_ID.get(messageID);
          throw new DirectoryException(ResultCode.PROTOCOL_ERROR,
              message);
        }
        // Try to add the operation to the work queue,
        // or run it synchronously (typically for the administration
        // connector)
        connectionHandler.getQueueingStrategy().enqueueRequest(
            operation);
      }
      catch (DirectoryException de)
      // Try to add the operation to the work queue,
      // or run it synchronously (typically for the administration
      // connector)
      connectionHandler.getQueueingStrategy().enqueueRequest(
          operation);
    }
    catch (DirectoryException de)
    {
      if (debugEnabled())
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, de);
        }
        operationsInProgress.remove(messageID);
        lastCompletionTime.set(TimeThread.getTime());
        throw de;
        TRACER.debugCaught(DebugLogLevel.ERROR, de);
      }
      catch (Exception e)
      operationsInProgress.remove(messageID);
      lastCompletionTime.set(TimeThread.getTime());
      throw de;
    }
    catch (Exception e)
    {
      if (debugEnabled())
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        Message message =
            WARN_LDAP_CLIENT_CANNOT_ENQUEUE.get(getExceptionMessage(e));
        throw new DirectoryException(DirectoryServer
            .getServerErrorResultCode(), message, e);
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      Message message =
        WARN_LDAP_CLIENT_CANNOT_ENQUEUE.get(getExceptionMessage(e));
      throw new DirectoryException(DirectoryServer
          .getServerErrorResultCode(), message, e);
    }
  }