From 22196baccce4bcbbc7f4e2144ce085d9433842bc Mon Sep 17 00:00:00 2001
From: matthew_swift <matthew_swift@localhost>
Date: Fri, 17 Sep 2010 22:51:17 +0000
Subject: [PATCH] Fix potential OOME when under heavy asynchronous search load by making work queue capacity finite so that clients are blocked when the queue is full

---
 opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java |  765 ++++++++++++++++++++++++++++++++--------------------------
 1 files changed, 426 insertions(+), 339 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java b/opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
index c994266..a3d04fc 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
@@ -28,13 +28,20 @@
 
 
 
+import static org.opends.messages.ConfigMessages.*;
+import static org.opends.messages.CoreMessages.*;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+
 import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.opends.messages.Message;
 import org.opends.server.admin.server.ConfigurationChangeListener;
@@ -44,19 +51,7 @@
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.monitors.TraditionalWorkQueueMonitor;
-import org.opends.server.types.AbstractOperation;
-import org.opends.server.types.CancelRequest;
-import org.opends.server.types.ConfigChangeResult;
-import org.opends.server.types.DebugLogLevel;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.InitializationException;
-import org.opends.server.types.Operation;
-import org.opends.server.types.ResultCode;
-
-import static org.opends.messages.ConfigMessages.*;
-import static org.opends.messages.CoreMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
+import org.opends.server.types.*;
 
 
 
@@ -64,28 +59,23 @@
  * This class defines a data structure for storing and interacting with the
  * Directory Server work queue.
  */
-public class TraditionalWorkQueue
-       extends WorkQueue<TraditionalWorkQueueCfg>
-       implements ConfigurationChangeListener<TraditionalWorkQueueCfg>
+public class TraditionalWorkQueue extends WorkQueue<TraditionalWorkQueueCfg>
+    implements ConfigurationChangeListener<TraditionalWorkQueueCfg>
 {
   /**
    * The tracer object for the debug logger.
    */
   private static final DebugTracer TRACER = getTracer();
 
-
-
-
   /**
    * The maximum number of times to retry getting the next operation from the
    * queue if an unexpected failure occurs.
    */
   private static final int MAX_RETRY_COUNT = 5;
 
-
-
   // The set of worker threads that will be used to process this work queue.
-  private ArrayList<TraditionalWorkerThread> workerThreads;
+  private final ArrayList<TraditionalWorkerThread> workerThreads =
+    new ArrayList<TraditionalWorkerThread>();
 
   // The number of operations that have been submitted to the work queue for
   // processing.
@@ -100,7 +90,7 @@
   private boolean killThreads;
 
   // Indicates whether the Directory Server is shutting down.
-  private volatile boolean shutdownRequested;
+  private boolean shutdownRequested;
 
   // The thread number used for the last worker thread that was created.
   private int lastThreadNumber;
@@ -113,16 +103,35 @@
   // a configuration change has not been completely applied).
   private int numWorkerThreads;
 
+  // The queue overflow policy: true indicates that operations will be blocked
+  // until the queue has available capacity, otherwise operations will be
+  // rejected.
+  //
+  // This is hard-coded to true for now because a reject on full policy does
+  // not seem to have a valid use case.
+  //
+  private final boolean isBlocking = true;
+
   // The queue that will be used to actually hold the pending operations.
   private LinkedBlockingQueue<AbstractOperation> opQueue;
 
-  // The lock used to provide threadsafe access for the queue.
-  private Object queueLock;
+  // The locks used to provide threadsafe access for the queue.
+
+  // Used for non-config changes.
+  private final ReadLock queueReadLock;
+
+  // Used for config changes.
+  private final WriteLock queueWriteLock;
+  {
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    queueReadLock = lock.readLock();
+    queueWriteLock = lock.writeLock();
+  }
 
 
 
   /**
-   * Creates a new instance of this work queue.  All initialization should be
+   * Creates a new instance of this work queue. All initialization should be
    * performed in the <CODE>initializeWorkQueue</CODE> method.
    */
   public TraditionalWorkQueue()
@@ -137,66 +146,70 @@
    */
   @Override()
   public void initializeWorkQueue(TraditionalWorkQueueCfg configuration)
-         throws ConfigException, InitializationException
+      throws ConfigException, InitializationException
   {
-    shutdownRequested = false;
-    killThreads       = false;
-    opsSubmitted      = new AtomicLong(0);
-    queueFullRejects  = new AtomicLong(0);
-    queueLock         = new Object();
-
-
-    // Register to be notified of any configuration changes.
-    configuration.addTraditionalChangeListener(this);
-
-
-    // Get the necessary configuration from the provided entry.
-    numWorkerThreads = getNumWorkerThreads(configuration);
-    maxCapacity      = configuration.getMaxWorkQueueCapacity();
-
-
-    // Create the actual work queue.
-    if (maxCapacity > 0)
-    {
-      opQueue = new LinkedBlockingQueue<AbstractOperation>(maxCapacity);
-    }
-    else
-    {
-      opQueue = new LinkedBlockingQueue<AbstractOperation>();
-    }
-
-
-    // Create the set of worker threads that should be used to service the
-    // work queue.
-    workerThreads = new ArrayList<TraditionalWorkerThread>(numWorkerThreads);
-    for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads;
-    lastThreadNumber++)
-    {
-      TraditionalWorkerThread t =
-           new TraditionalWorkerThread(this, lastThreadNumber);
-      t.start();
-      workerThreads.add(t);
-    }
-
-
-    // Create and register a monitor provider for the work queue.
+    queueWriteLock.lock();
     try
     {
-      TraditionalWorkQueueMonitor monitor =
-           new TraditionalWorkQueueMonitor(this);
-      monitor.initializeMonitorProvider(null);
-      DirectoryServer.registerMonitorProvider(monitor);
-    }
-    catch (Exception e)
-    {
-      if (debugEnabled())
+      shutdownRequested = false;
+      killThreads = false;
+      opsSubmitted = new AtomicLong(0);
+      queueFullRejects = new AtomicLong(0);
+
+      // Register to be notified of any configuration changes.
+      configuration.addTraditionalChangeListener(this);
+
+      // Get the necessary configuration from the provided entry.
+      numWorkerThreads = getNumWorkerThreads(configuration);
+      maxCapacity = configuration.getMaxWorkQueueCapacity();
+
+      // Create the actual work queue.
+      if (maxCapacity > 0)
       {
-        TRACER.debugCaught(DebugLogLevel.ERROR, e);
+        opQueue = new LinkedBlockingQueue<AbstractOperation>(maxCapacity);
+      }
+      else
+      {
+        // This will never be the case, since the configuration definition
+        // ensures that the capacity is always finite.
+        opQueue = new LinkedBlockingQueue<AbstractOperation>();
       }
 
-      Message message = ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR.get(
-          String.valueOf(TraditionalWorkQueueMonitor.class), String.valueOf(e));
-      logError(message);
+      // Create the set of worker threads that should be used to service the
+      // work queue.
+      for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads;
+        lastThreadNumber++)
+      {
+        TraditionalWorkerThread t = new TraditionalWorkerThread(this,
+            lastThreadNumber);
+        t.start();
+        workerThreads.add(t);
+      }
+
+      // Create and register a monitor provider for the work queue.
+      try
+      {
+        TraditionalWorkQueueMonitor monitor = new TraditionalWorkQueueMonitor(
+            this);
+        monitor.initializeMonitorProvider(null);
+        DirectoryServer.registerMonitorProvider(monitor);
+      }
+      catch (Exception e)
+      {
+        if (debugEnabled())
+        {
+          TRACER.debugCaught(DebugLogLevel.ERROR, e);
+        }
+
+        Message message = ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR.get(
+            String.valueOf(TraditionalWorkQueueMonitor.class),
+            String.valueOf(e));
+        logError(message);
+      }
+    }
+    finally
+    {
+      queueWriteLock.unlock();
     }
   }
 
@@ -208,8 +221,17 @@
   @Override()
   public void finalizeWorkQueue(Message reason)
   {
-    shutdownRequested = true;
+    queueWriteLock.lock();
+    try
+    {
+      shutdownRequested = true;
+    }
+    finally
+    {
+      queueWriteLock.unlock();
+    }
 
+    // From now on no more operations can be enqueued or dequeued.
 
     // Send responses to any operations in the pending queue to indicate that
     // they won't be processed because the server is shutting down.
@@ -222,7 +244,8 @@
       {
         // The operation has no chance of responding to the cancel
         // request so avoid waiting for a cancel response.
-        if (o.getCancelResult() == null) {
+        if (o.getCancelResult() == null)
+        {
           o.abort(cancelRequest);
         }
       }
@@ -233,12 +256,11 @@
           TRACER.debugCaught(DebugLogLevel.ERROR, e);
         }
 
-        logError(WARN_QUEUE_UNABLE_TO_CANCEL.get(
-            String.valueOf(o), String.valueOf(e)));
+        logError(WARN_QUEUE_UNABLE_TO_CANCEL.get(String.valueOf(o),
+            String.valueOf(e)));
       }
     }
 
-
     // Notify all the worker threads of the shutdown.
     for (TraditionalWorkerThread t : workerThreads)
     {
@@ -253,8 +275,8 @@
           TRACER.debugCaught(DebugLogLevel.ERROR, e);
         }
 
-        logError(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD.get(
-            t.getName(), String.valueOf(e)));
+        logError(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD.get(t.getName(),
+            String.valueOf(e)));
       }
     }
   }
@@ -264,12 +286,20 @@
   /**
    * Indicates whether this work queue has received a request to shut down.
    *
-   * @return  <CODE>true</CODE> if the work queue has recieved a request to shut
-   *          down, or <CODE>false</CODE> if not.
+   * @return <CODE>true</CODE> if the work queue has recieved a request to shut
+   *         down, or <CODE>false</CODE> if not.
    */
   public boolean shutdownRequested()
   {
-    return shutdownRequested;
+    queueReadLock.lock();
+    try
+    {
+      return shutdownRequested;
+    }
+    finally
+    {
+      queueReadLock.unlock();
+    }
   }
 
 
@@ -278,46 +308,93 @@
    * Submits an operation to be processed by one of the worker threads
    * associated with this work queue.
    *
-   * @param  operation  The operation to be processed.
-   *
-   * @throws  DirectoryException  If the provided operation is not accepted for
-   *                              some reason (e.g., if the server is shutting
-   *                              down or the pending operation queue is already
-   *                              at its maximum capacity).
+   * @param operation
+   *          The operation to be processed.
+   * @throws DirectoryException
+   *           If the provided operation is not accepted for some reason (e.g.,
+   *           if the server is shutting down or the pending operation queue is
+   *           already at its maximum capacity).
    */
   @Override
   public void submitOperation(AbstractOperation operation)
-         throws DirectoryException
+      throws DirectoryException
   {
-    if (shutdownRequested)
+    queueReadLock.lock();
+    try
     {
-      Message message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
-      throw new DirectoryException(ResultCode.UNAVAILABLE, message);
-    }
+      if (shutdownRequested)
+      {
+        Message message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
+        throw new DirectoryException(ResultCode.UNAVAILABLE, message);
+      }
 
-    if (! opQueue.offer(operation))
+      if (isBlocking)
+      {
+        try
+        {
+          // If the queue is full and there is an administrative change taking
+          // place then starvation could arise: this thread will hold the read
+          // lock, the admin thread will be waiting on the write lock, and the
+          // worker threads may be queued behind the admin thread. Since the
+          // worker threads cannot run, the queue will never empty and allow
+          // this thread to proceed. To help things out we can periodically
+          // yield the read lock when the queue is full.
+          while (!opQueue.offer(operation, 1, TimeUnit.SECONDS))
+          {
+            queueReadLock.unlock();
+            Thread.yield();
+            queueReadLock.lock();
+
+            if (shutdownRequested)
+            {
+              Message message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
+              throw new DirectoryException(ResultCode.UNAVAILABLE, message);
+            }
+          }
+        }
+        catch (InterruptedException e)
+        {
+          // We cannot handle the interruption here. Reject the request and
+          // re-interrupt this thread.
+          Thread.currentThread().interrupt();
+
+          queueFullRejects.incrementAndGet();
+
+          Message message = WARN_OP_REJECTED_BY_QUEUE_INTERRUPT.get();
+          throw new DirectoryException(ResultCode.BUSY, message);
+        }
+      }
+      else
+      {
+        if (!opQueue.offer(operation))
+        {
+          queueFullRejects.incrementAndGet();
+
+          Message message = WARN_OP_REJECTED_BY_QUEUE_FULL.get(maxCapacity);
+          throw new DirectoryException(ResultCode.BUSY, message);
+        }
+      }
+
+      opsSubmitted.incrementAndGet();
+    }
+    finally
     {
-      queueFullRejects.incrementAndGet();
-
-      Message message = WARN_OP_REJECTED_BY_QUEUE_FULL.get(maxCapacity);
-      throw new DirectoryException(ResultCode.UNAVAILABLE, message);
+      queueReadLock.unlock();
     }
-
-    opsSubmitted.incrementAndGet();
   }
 
 
 
   /**
    * Retrieves the next operation that should be processed by one of the worker
-   * threads, blocking if necessary until a new request arrives.  This method
+   * threads, blocking if necessary until a new request arrives. This method
    * should only be called by a worker thread associated with this work queue.
    *
-   * @param  workerThread  The worker thread that is requesting the operation.
-   *
-   * @return  The next operation that should be processed, or <CODE>null</CODE>
-   *          if the server is shutting down and no more operations will be
-   *          processed.
+   * @param workerThread
+   *          The worker thread that is requesting the operation.
+   * @return The next operation that should be processed, or <CODE>null</CODE>
+   *         if the server is shutting down and no more operations will be
+   *         processed.
    */
   public AbstractOperation nextOperation(TraditionalWorkerThread workerThread)
   {
@@ -328,131 +405,81 @@
 
   /**
    * Retrieves the next operation that should be processed by one of the worker
-   * threads following a previous failure attempt.  A maximum of five
-   * consecutive failures will be allowed before returning <CODE>null</CODE>,
-   * which will cause the associated thread to exit.
+   * threads following a previous failure attempt. A maximum of five consecutive
+   * failures will be allowed before returning <CODE>null</CODE>, which will
+   * cause the associated thread to exit.
    *
-   * @param  workerThread  The worker thread that is requesting the operation.
-   * @param  numFailures   The number of consecutive failures that the worker
-   *                       thread has experienced so far.  If this gets too
-   *                       high, then this method will return <CODE>null</CODE>
-   *                       rather than retrying.
-   *
-   * @return  The next operation that should be processed, or <CODE>null</CODE>
-   *          if the server is shutting down and no more operations will be
-   *          processed, or if there have been too many consecutive failures.
+   * @param workerThread
+   *          The worker thread that is requesting the operation.
+   * @param numFailures
+   *          The number of consecutive failures that the worker thread has
+   *          experienced so far. If this gets too high, then this method will
+   *          return <CODE>null</CODE> rather than retrying.
+   * @return The next operation that should be processed, or <CODE>null</CODE>
+   *         if the server is shutting down and no more operations will be
+   *         processed, or if there have been too many consecutive failures.
    */
   private AbstractOperation retryNextOperation(
-                                       TraditionalWorkerThread workerThread,
-                                       int numFailures)
+      TraditionalWorkerThread workerThread, int numFailures)
   {
-    // See if we should kill off this thread.  This could be necessary if the
-    // number of worker threads has been decreased with the server online.  If
+    // See if we should kill off this thread. This could be necessary if the
+    // number of worker threads has been decreased with the server online. If
     // so, then return null and the thread will exit.
-    if (killThreads)
-    {
-      synchronized (queueLock)
-      {
-        try
-        {
-          int currentThreads = workerThreads.size();
-          if (currentThreads > numWorkerThreads)
-          {
-            if (workerThreads.remove(Thread.currentThread()))
-            {
-              currentThreads--;
-            }
-
-            if (currentThreads <= numWorkerThreads)
-            {
-              killThreads = false;
-            }
-
-            workerThread.setStoppedByReducedThreadNumber();
-            return null;
-          }
-        }
-        catch (Exception e)
-        {
-          if (debugEnabled())
-          {
-            TRACER.debugCaught(DebugLogLevel.ERROR, e);
-          }
-        }
-      }
-    }
-
-    if ((shutdownRequested) || (numFailures > MAX_RETRY_COUNT))
-    {
-      if (numFailures > MAX_RETRY_COUNT)
-      {
-        Message message = ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES.get(
-            Thread.currentThread().getName(), numFailures, MAX_RETRY_COUNT);
-        logError(message);
-      }
-
-      return null;
-    }
-
+    queueReadLock.lock();
     try
     {
+      if (shutdownRequested)
+      {
+        return null;
+      }
+
+      if (killThreads && tryKillThisWorkerThread(workerThread))
+      {
+        return null;
+      }
+
+      if (numFailures > MAX_RETRY_COUNT)
+      {
+        Message message = ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES.get(Thread
+            .currentThread().getName(), numFailures, MAX_RETRY_COUNT);
+        logError(message);
+
+        return null;
+      }
+
       while (true)
       {
         AbstractOperation nextOperation = opQueue.poll(5, TimeUnit.SECONDS);
-        if (nextOperation == null)
-        {
-          // There was no work to do in the specified length of time.  See if
-          // we should shutdown, and if not then just check again.
-          if (shutdownRequested)
-          {
-            return null;
-          }
-          else if (killThreads)
-          {
-            synchronized (queueLock)
-            {
-              try
-              {
-                int currentThreads = workerThreads.size();
-                if (currentThreads > numWorkerThreads)
-                {
-                  if (workerThreads.remove(Thread.currentThread()))
-                  {
-                    currentThreads--;
-                  }
-
-                  if (currentThreads <= numWorkerThreads)
-                  {
-                    killThreads = false;
-                  }
-
-                  workerThread.setStoppedByReducedThreadNumber();
-                  return null;
-                }
-              }
-              catch (Exception e)
-              {
-                if (debugEnabled())
-                {
-                  TRACER.debugCaught(DebugLogLevel.ERROR, e);
-                }
-              }
-            }
-          }
-        }
-        else
+        if (nextOperation != null)
         {
           return nextOperation;
         }
+
+        // There was no work to do in the specified length of time. Release the
+        // read lock allowing shutdown or config changes to proceed and then see
+        // if we should give up or check again.
+        queueReadLock.unlock();
+        Thread.yield();
+        queueReadLock.lock();
+
+        if (shutdownRequested)
+        {
+          return null;
+        }
+
+        if (killThreads && tryKillThisWorkerThread(workerThread))
+        {
+          return null;
+        }
       }
     }
     catch (InterruptedException ie)
     {
       // This is somewhat expected so don't log.
-      //      assert debugException(CLASS_NAME, "retryNextOperation", ie);
+      // assert debugException(CLASS_NAME, "retryNextOperation", ie);
 
       // If this occurs, then the worker thread must have been interrupted for
-      // some reason.  This could be because the Directory Server is shutting
+      // some reason. This could be because the Directory Server is shutting
       // down, in which case we should return null.
       if (shutdownRequested)
       {
@@ -460,10 +487,9 @@
       }
 
       // If we've gotten here, then the worker thread was interrupted for some
-      // other reason.  This should not happen, and we need to log a message.
-      logError(WARN_WORKER_INTERRUPTED_WITHOUT_SHUTDOWN.get(
-          Thread.currentThread().getName(), String.valueOf(ie)));
-      return retryNextOperation(workerThread, numFailures+1);
+      // other reason. This should not happen, and we need to log a message.
+      logError(WARN_WORKER_INTERRUPTED_WITHOUT_SHUTDOWN.get(Thread
+          .currentThread().getName(), String.valueOf(ie)));
     }
     catch (Exception e)
     {
@@ -472,40 +498,86 @@
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
       }
 
-      // This should not happen.  The only recourse we have is to log a message
+      // This should not happen. The only recourse we have is to log a message
       // and try again.
-      logError(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION.get(
-          Thread.currentThread().getName(), String.valueOf(e)));
-      return retryNextOperation(workerThread, numFailures + 1);
+      logError(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION.get(Thread
+          .currentThread().getName(), String.valueOf(e)));
     }
+    finally
+    {
+      queueReadLock.unlock();
+    }
+
+    // An exception has occurred - retry.
+    return retryNextOperation(workerThread, numFailures + 1);
   }
 
 
 
   /**
-   * Attempts to remove the specified operation from this queue if it has not
-   * yet been picked up for processing by one of the worker threads.
+   * Kills this worker thread if needed. This method assumes that the read lock
+   * is already taken and ensure that it is taken on exit.
    *
-   * @param  operation  The operation to remove from the queue.
-   *
-   * @return  <CODE>true</CODE> if the provided request was present in the queue
-   *          and was removed successfully, or <CODE>false</CODE> it not.
+   * @param workerThread
+   *          The worker thread associated with this thread.
+   * @return {@code true} if this thread was killed or is about to be killed as
+   *         a result of shutdown.
    */
-  public boolean removeOperation(AbstractOperation operation)
+  private boolean tryKillThisWorkerThread(TraditionalWorkerThread workerThread)
   {
-    return opQueue.remove(operation);
+    queueReadLock.unlock();
+    queueWriteLock.lock();
+    try
+    {
+      if (shutdownRequested)
+      {
+        // Shutdown may have been requested between unlock/lock. This thread is
+        // about to shutdown anyway, so return true.
+        return true;
+      }
+
+      int currentThreads = workerThreads.size();
+      if (currentThreads > numWorkerThreads)
+      {
+        if (workerThreads.remove(Thread.currentThread()))
+        {
+          currentThreads--;
+        }
+
+        if (currentThreads <= numWorkerThreads)
+        {
+          killThreads = false;
+        }
+
+        workerThread.setStoppedByReducedThreadNumber();
+        return true;
+      }
+    }
+    finally
+    {
+      queueWriteLock.unlock();
+      queueReadLock.lock();
+
+      if (shutdownRequested)
+      {
+        // Shutdown may have been requested between unlock/lock. This thread is
+        // about to shutdown anyway, so return true.
+        return true;
+      }
+    }
+    return false;
   }
 
 
 
   /**
    * Retrieves the total number of operations that have been successfully
-   * submitted to this work queue for processing since server startup.  This
-   * does not include operations that have been rejected for some reason like
-   * the queue already at its maximum capacity.
+   * submitted to this work queue for processing since server startup. This does
+   * not include operations that have been rejected for some reason like the
+   * queue already at its maximum capacity.
    *
-   * @return  The total number of operations that have been successfully
-   *          submitted to this work queue since startup.
+   * @return The total number of operations that have been successfully
+   *         submitted to this work queue since startup.
    */
   public long getOpsSubmitted()
   {
@@ -518,8 +590,8 @@
    * Retrieves the total number of operations that have been rejected because
    * the work queue was already at its maximum capacity.
    *
-   * @return  The total number of operations that have been rejected because the
-   *          work queue was already at its maximum capacity.
+   * @return The total number of operations that have been rejected because the
+   *         work queue was already at its maximum capacity.
    */
   public long getOpsRejectedDueToQueueFull()
   {
@@ -530,16 +602,24 @@
 
   /**
    * Retrieves the number of pending operations in the queue that have not yet
-   * been picked up for processing.  Note that this method is not a
-   * constant-time operation and can be relatively inefficient, so it should be
-   * used sparingly.
+   * been picked up for processing. Note that this method is not a constant-time
+   * operation and can be relatively inefficient, so it should be used
+   * sparingly.
    *
-   * @return  The number of pending operations in the queue that have not yet
-   *          been picked up for processing.
+   * @return The number of pending operations in the queue that have not yet
+   *         been picked up for processing.
    */
   public int size()
   {
-    return opQueue.size();
+    queueReadLock.lock();
+    try
+    {
+      return opQueue.size();
+    }
+    finally
+    {
+      queueReadLock.unlock();
+    }
   }
 
 
@@ -548,8 +628,7 @@
    * {@inheritDoc}
    */
   public boolean isConfigurationChangeAcceptable(
-                      TraditionalWorkQueueCfg configuration,
-                      List<Message> unacceptableReasons)
+      TraditionalWorkQueueCfg configuration, List<Message> unacceptableReasons)
   {
     return true;
   }
@@ -560,122 +639,126 @@
    * {@inheritDoc}
    */
   public ConfigChangeResult applyConfigurationChange(
-                                 TraditionalWorkQueueCfg configuration)
+      TraditionalWorkQueueCfg configuration)
   {
     ArrayList<Message> resultMessages = new ArrayList<Message>();
-    int newNumThreads  = getNumWorkerThreads(configuration);
+    int newNumThreads = getNumWorkerThreads(configuration);
     int newMaxCapacity = configuration.getMaxWorkQueueCapacity();
 
-
     // Apply a change to the number of worker threads if appropriate.
     int currentThreads = workerThreads.size();
     if (newNumThreads != currentThreads)
     {
-      synchronized (queueLock)
+      queueWriteLock.lock();
+      try
       {
-        try
+        int threadsToAdd = newNumThreads - currentThreads;
+        if (threadsToAdd > 0)
         {
-          int threadsToAdd = newNumThreads - currentThreads;
-          if (threadsToAdd > 0)
+          for (int i = 0; i < threadsToAdd; i++)
           {
-            for (int i=0; i < threadsToAdd; i++)
-            {
-              TraditionalWorkerThread t =
-                   new TraditionalWorkerThread(this, lastThreadNumber++);
-              workerThreads.add(t);
-              t.start();
-            }
-
-            killThreads = false;
-          }
-          else
-          {
-            killThreads = true;
+            TraditionalWorkerThread t = new TraditionalWorkerThread(this,
+                lastThreadNumber++);
+            workerThreads.add(t);
+            t.start();
           }
 
-          numWorkerThreads = newNumThreads;
+          killThreads = false;
         }
-        catch (Exception e)
+        else
         {
-          if (debugEnabled())
-          {
-            TRACER.debugCaught(DebugLogLevel.ERROR, e);
-          }
+          killThreads = true;
         }
+
+        numWorkerThreads = newNumThreads;
+      }
+      catch (Exception e)
+      {
+        if (debugEnabled())
+        {
+          TRACER.debugCaught(DebugLogLevel.ERROR, e);
+        }
+      }
+      finally
+      {
+        queueWriteLock.unlock();
       }
     }
 
 
-    // Apply a change to the maximum capacity if appropriate.  Since we can't
+    // Apply a change to the maximum capacity if appropriate. Since we can't
     // change capacity on the fly, then we'll have to create a new queue and
-    // transfer any remaining items into it.  Any thread that is waiting on the
+    // transfer any remaining items into it. Any thread that is waiting on the
     // original queue will time out after at most a few seconds and further
     // checks will be against the new queue.
     if (newMaxCapacity != maxCapacity)
     {
-      synchronized (queueLock)
+      // First switch the queue with the exclusive lock.
+      queueWriteLock.lock();
+      LinkedBlockingQueue<AbstractOperation> oldOpQueue;
+      try
       {
-        try
+        LinkedBlockingQueue<AbstractOperation> newOpQueue = null;
+        if (newMaxCapacity > 0)
         {
-          LinkedBlockingQueue<AbstractOperation> newOpQueue;
-          if (newMaxCapacity > 0)
-          {
-            newOpQueue =
-              new LinkedBlockingQueue<AbstractOperation>(newMaxCapacity);
-          }
-          else
-          {
-            newOpQueue = new LinkedBlockingQueue<AbstractOperation>();
-          }
-
-          LinkedBlockingQueue<AbstractOperation> oldOpQueue = opQueue;
-          opQueue = newOpQueue;
-
-          LinkedList<AbstractOperation> pendingOps =
-            new LinkedList<AbstractOperation>();
-          oldOpQueue.drainTo(pendingOps);
-
-
-          // We have to be careful when adding any existing pending operations
-          // because the new capacity could be less than what was already
-          // backlogged in the previous queue.  If that happens, we may have to
-          // loop a few times to get everything in there.
-          while (! pendingOps.isEmpty())
-          {
-            Iterator<AbstractOperation> iterator = pendingOps.iterator();
-            while (iterator.hasNext())
-            {
-              AbstractOperation o = iterator.next();
-              try
-              {
-                if (newOpQueue.offer(o, 1000, TimeUnit.MILLISECONDS))
-                {
-                  iterator.remove();
-                }
-              }
-              catch (InterruptedException ie)
-              {
-                if (debugEnabled())
-                {
-                  TRACER.debugCaught(DebugLogLevel.ERROR, ie);
-                }
-              }
-            }
-          }
-
-          maxCapacity = newMaxCapacity;
+          newOpQueue = new LinkedBlockingQueue<AbstractOperation>(
+              newMaxCapacity);
         }
-        catch (Exception e)
+        else
         {
-          if (debugEnabled())
+          newOpQueue = new LinkedBlockingQueue<AbstractOperation>();
+        }
+
+        oldOpQueue = opQueue;
+        opQueue = newOpQueue;
+
+        maxCapacity = newMaxCapacity;
+      }
+      finally
+      {
+        queueWriteLock.unlock();
+      }
+
+      // Now resubmit any pending requests - we'll need the shared lock.
+      AbstractOperation pendingOperation = null;
+      queueReadLock.lock();
+      try
+      {
+        // We have to be careful when adding any existing pending operations
+        // because the new capacity could be less than what was already
+        // backlogged in the previous queue. If that happens, we may have to
+        // loop a few times to get everything in there.
+        while ((pendingOperation = oldOpQueue.poll()) != null)
+        {
+          opQueue.put(pendingOperation);
+        }
+      }
+      catch (InterruptedException e)
+      {
+        // We cannot handle the interruption here. Cancel pending requests and
+        // re-interrupt this thread.
+        Thread.currentThread().interrupt();
+
+        Message message = WARN_OP_REJECTED_BY_QUEUE_INTERRUPT.get();
+        CancelRequest cancelRequest = new CancelRequest(true, message);
+        if (pendingOperation != null)
+        {
+          pendingOperation.abort(cancelRequest);
+        }
+        while ((pendingOperation = oldOpQueue.poll()) != null)
+        {
+          if (pendingOperation != null)
           {
-            TRACER.debugCaught(DebugLogLevel.ERROR, e);
+            pendingOperation.abort(cancelRequest);
           }
         }
       }
+      finally
+      {
+        queueReadLock.unlock();
+      }
     }
 
-
     return new ConfigChangeResult(ResultCode.SUCCESS, false, resultMessages);
   }
 
@@ -687,13 +770,14 @@
   @Override()
   public boolean isIdle()
   {
-    if (opQueue.size() > 0)
+    queueReadLock.lock();
+    try
     {
-      return false;
-    }
+      if (opQueue.size() > 0)
+      {
+        return false;
+      }
 
-    synchronized (queueLock)
-    {
       for (TraditionalWorkerThread t : workerThreads)
       {
         if (t.isActive())
@@ -704,6 +788,10 @@
 
       return true;
     }
+    finally
+    {
+      queueReadLock.unlock();
+    }
   }
 
 
@@ -728,4 +816,3 @@
     }
   }
 }
-

--
Gitblit v1.10.0