From 22196baccce4bcbbc7f4e2144ce085d9433842bc Mon Sep 17 00:00:00 2001
From: matthew_swift <matthew_swift@localhost>
Date: Fri, 17 Sep 2010 22:51:17 +0000
Subject: [PATCH] Fix potential OOME when under heavy asynchronous search load by making work queue capacity finite so that clients are blocked when the queue is full
---
opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java | 765 ++++++++++++++++++++++++++++++++--------------------------
1 files changed, 426 insertions(+), 339 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java b/opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
index c994266..a3d04fc 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
@@ -28,13 +28,20 @@
+import static org.opends.messages.ConfigMessages.*;
+import static org.opends.messages.CoreMessages.*;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+
import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationChangeListener;
@@ -44,19 +51,7 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.monitors.TraditionalWorkQueueMonitor;
-import org.opends.server.types.AbstractOperation;
-import org.opends.server.types.CancelRequest;
-import org.opends.server.types.ConfigChangeResult;
-import org.opends.server.types.DebugLogLevel;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.InitializationException;
-import org.opends.server.types.Operation;
-import org.opends.server.types.ResultCode;
-
-import static org.opends.messages.ConfigMessages.*;
-import static org.opends.messages.CoreMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
+import org.opends.server.types.*;
@@ -64,28 +59,23 @@
* This class defines a data structure for storing and interacting with the
* Directory Server work queue.
*/
-public class TraditionalWorkQueue
- extends WorkQueue<TraditionalWorkQueueCfg>
- implements ConfigurationChangeListener<TraditionalWorkQueueCfg>
+public class TraditionalWorkQueue extends WorkQueue<TraditionalWorkQueueCfg>
+ implements ConfigurationChangeListener<TraditionalWorkQueueCfg>
{
/**
* The tracer object for the debug logger.
*/
private static final DebugTracer TRACER = getTracer();
-
-
-
/**
* The maximum number of times to retry getting the next operation from the
* queue if an unexpected failure occurs.
*/
private static final int MAX_RETRY_COUNT = 5;
-
-
// The set of worker threads that will be used to process this work queue.
- private ArrayList<TraditionalWorkerThread> workerThreads;
+ private final ArrayList<TraditionalWorkerThread> workerThreads =
+ new ArrayList<TraditionalWorkerThread>();
// The number of operations that have been submitted to the work queue for
// processing.
@@ -100,7 +90,7 @@
private boolean killThreads;
// Indicates whether the Directory Server is shutting down.
- private volatile boolean shutdownRequested;
+ private boolean shutdownRequested;
// The thread number used for the last worker thread that was created.
private int lastThreadNumber;
@@ -113,16 +103,35 @@
// a configuration change has not been completely applied).
private int numWorkerThreads;
+ // The queue overflow policy: true indicates that operations will be blocked
+ // until the queue has available capacity, otherwise operations will be
+ // rejected.
+ //
+ // This is hard-coded to true for now because a reject on full policy does
+ // not seem to have a valid use case.
+ //
+ private final boolean isBlocking = true;
+
// The queue that will be used to actually hold the pending operations.
private LinkedBlockingQueue<AbstractOperation> opQueue;
- // The lock used to provide threadsafe access for the queue.
- private Object queueLock;
+ // The locks used to provide threadsafe access for the queue.
+
+ // Used for non-config changes.
+ private final ReadLock queueReadLock;
+
+ // Used for config changes.
+ private final WriteLock queueWriteLock;
+ {
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ queueReadLock = lock.readLock();
+ queueWriteLock = lock.writeLock();
+ }
/**
- * Creates a new instance of this work queue. All initialization should be
+ * Creates a new instance of this work queue. All initialization should be
* performed in the <CODE>initializeWorkQueue</CODE> method.
*/
public TraditionalWorkQueue()
@@ -137,66 +146,70 @@
*/
@Override()
public void initializeWorkQueue(TraditionalWorkQueueCfg configuration)
- throws ConfigException, InitializationException
+ throws ConfigException, InitializationException
{
- shutdownRequested = false;
- killThreads = false;
- opsSubmitted = new AtomicLong(0);
- queueFullRejects = new AtomicLong(0);
- queueLock = new Object();
-
-
- // Register to be notified of any configuration changes.
- configuration.addTraditionalChangeListener(this);
-
-
- // Get the necessary configuration from the provided entry.
- numWorkerThreads = getNumWorkerThreads(configuration);
- maxCapacity = configuration.getMaxWorkQueueCapacity();
-
-
- // Create the actual work queue.
- if (maxCapacity > 0)
- {
- opQueue = new LinkedBlockingQueue<AbstractOperation>(maxCapacity);
- }
- else
- {
- opQueue = new LinkedBlockingQueue<AbstractOperation>();
- }
-
-
- // Create the set of worker threads that should be used to service the
- // work queue.
- workerThreads = new ArrayList<TraditionalWorkerThread>(numWorkerThreads);
- for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads;
- lastThreadNumber++)
- {
- TraditionalWorkerThread t =
- new TraditionalWorkerThread(this, lastThreadNumber);
- t.start();
- workerThreads.add(t);
- }
-
-
- // Create and register a monitor provider for the work queue.
+ queueWriteLock.lock();
try
{
- TraditionalWorkQueueMonitor monitor =
- new TraditionalWorkQueueMonitor(this);
- monitor.initializeMonitorProvider(null);
- DirectoryServer.registerMonitorProvider(monitor);
- }
- catch (Exception e)
- {
- if (debugEnabled())
+ shutdownRequested = false;
+ killThreads = false;
+ opsSubmitted = new AtomicLong(0);
+ queueFullRejects = new AtomicLong(0);
+
+ // Register to be notified of any configuration changes.
+ configuration.addTraditionalChangeListener(this);
+
+ // Get the necessary configuration from the provided entry.
+ numWorkerThreads = getNumWorkerThreads(configuration);
+ maxCapacity = configuration.getMaxWorkQueueCapacity();
+
+ // Create the actual work queue.
+ if (maxCapacity > 0)
{
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ opQueue = new LinkedBlockingQueue<AbstractOperation>(maxCapacity);
+ }
+ else
+ {
+ // This will never be the case, since the configuration definition
+ // ensures that the capacity is always finite.
+ opQueue = new LinkedBlockingQueue<AbstractOperation>();
}
- Message message = ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR.get(
- String.valueOf(TraditionalWorkQueueMonitor.class), String.valueOf(e));
- logError(message);
+ // Create the set of worker threads that should be used to service the
+ // work queue.
+ for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads;
+ lastThreadNumber++)
+ {
+ TraditionalWorkerThread t = new TraditionalWorkerThread(this,
+ lastThreadNumber);
+ t.start();
+ workerThreads.add(t);
+ }
+
+ // Create and register a monitor provider for the work queue.
+ try
+ {
+ TraditionalWorkQueueMonitor monitor = new TraditionalWorkQueueMonitor(
+ this);
+ monitor.initializeMonitorProvider(null);
+ DirectoryServer.registerMonitorProvider(monitor);
+ }
+ catch (Exception e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+
+ Message message = ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR.get(
+ String.valueOf(TraditionalWorkQueueMonitor.class),
+ String.valueOf(e));
+ logError(message);
+ }
+ }
+ finally
+ {
+ queueWriteLock.unlock();
}
}
@@ -208,8 +221,17 @@
@Override()
public void finalizeWorkQueue(Message reason)
{
- shutdownRequested = true;
+ queueWriteLock.lock();
+ try
+ {
+ shutdownRequested = true;
+ }
+ finally
+ {
+ queueWriteLock.unlock();
+ }
+ // From now on no more operations can be enqueued or dequeued.
// Send responses to any operations in the pending queue to indicate that
// they won't be processed because the server is shutting down.
@@ -222,7 +244,8 @@
{
// The operation has no chance of responding to the cancel
// request so avoid waiting for a cancel response.
- if (o.getCancelResult() == null) {
+ if (o.getCancelResult() == null)
+ {
o.abort(cancelRequest);
}
}
@@ -233,12 +256,11 @@
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
- logError(WARN_QUEUE_UNABLE_TO_CANCEL.get(
- String.valueOf(o), String.valueOf(e)));
+ logError(WARN_QUEUE_UNABLE_TO_CANCEL.get(String.valueOf(o),
+ String.valueOf(e)));
}
}
-
// Notify all the worker threads of the shutdown.
for (TraditionalWorkerThread t : workerThreads)
{
@@ -253,8 +275,8 @@
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
- logError(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD.get(
- t.getName(), String.valueOf(e)));
+ logError(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD.get(t.getName(),
+ String.valueOf(e)));
}
}
}
@@ -264,12 +286,20 @@
/**
* Indicates whether this work queue has received a request to shut down.
*
- * @return <CODE>true</CODE> if the work queue has recieved a request to shut
- * down, or <CODE>false</CODE> if not.
+ * @return <CODE>true</CODE> if the work queue has received a request to shut
+ * down, or <CODE>false</CODE> if not.
*/
public boolean shutdownRequested()
{
- return shutdownRequested;
+ queueReadLock.lock();
+ try
+ {
+ return shutdownRequested;
+ }
+ finally
+ {
+ queueReadLock.unlock();
+ }
}
@@ -278,46 +308,93 @@
* Submits an operation to be processed by one of the worker threads
* associated with this work queue.
*
- * @param operation The operation to be processed.
- *
- * @throws DirectoryException If the provided operation is not accepted for
- * some reason (e.g., if the server is shutting
- * down or the pending operation queue is already
- * at its maximum capacity).
+ * @param operation
+ * The operation to be processed.
+ * @throws DirectoryException
+ * If the provided operation is not accepted for some reason (e.g.,
+ * if the server is shutting down or the pending operation queue is
+ * already at its maximum capacity).
*/
@Override
public void submitOperation(AbstractOperation operation)
- throws DirectoryException
+ throws DirectoryException
{
- if (shutdownRequested)
+ queueReadLock.lock();
+ try
{
- Message message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
- throw new DirectoryException(ResultCode.UNAVAILABLE, message);
- }
+ if (shutdownRequested)
+ {
+ Message message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
+ throw new DirectoryException(ResultCode.UNAVAILABLE, message);
+ }
- if (! opQueue.offer(operation))
+ if (isBlocking)
+ {
+ try
+ {
+ // If the queue is full and there is an administrative change taking
+ // place then starvation could arise: this thread will hold the read
+ // lock, the admin thread will be waiting on the write lock, and the
+ // worker threads may be queued behind the admin thread. Since the
+ // worker threads cannot run, the queue will never empty and allow
+ // this thread to proceed. To help things out we can periodically
+ // yield the read lock when the queue is full.
+ while (!opQueue.offer(operation, 1, TimeUnit.SECONDS))
+ {
+ queueReadLock.unlock();
+ Thread.yield();
+ queueReadLock.lock();
+
+ if (shutdownRequested)
+ {
+ Message message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
+ throw new DirectoryException(ResultCode.UNAVAILABLE, message);
+ }
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // We cannot handle the interruption here. Reject the request and
+ // re-interrupt this thread.
+ Thread.currentThread().interrupt();
+
+ queueFullRejects.incrementAndGet();
+
+ Message message = WARN_OP_REJECTED_BY_QUEUE_INTERRUPT.get();
+ throw new DirectoryException(ResultCode.BUSY, message);
+ }
+ }
+ else
+ {
+ if (!opQueue.offer(operation))
+ {
+ queueFullRejects.incrementAndGet();
+
+ Message message = WARN_OP_REJECTED_BY_QUEUE_FULL.get(maxCapacity);
+ throw new DirectoryException(ResultCode.BUSY, message);
+ }
+ }
+
+ opsSubmitted.incrementAndGet();
+ }
+ finally
{
- queueFullRejects.incrementAndGet();
-
- Message message = WARN_OP_REJECTED_BY_QUEUE_FULL.get(maxCapacity);
- throw new DirectoryException(ResultCode.UNAVAILABLE, message);
+ queueReadLock.unlock();
}
-
- opsSubmitted.incrementAndGet();
}
/**
* Retrieves the next operation that should be processed by one of the worker
- * threads, blocking if necessary until a new request arrives. This method
+ * threads, blocking if necessary until a new request arrives. This method
* should only be called by a worker thread associated with this work queue.
*
- * @param workerThread The worker thread that is requesting the operation.
- *
- * @return The next operation that should be processed, or <CODE>null</CODE>
- * if the server is shutting down and no more operations will be
- * processed.
+ * @param workerThread
+ * The worker thread that is requesting the operation.
+ * @return The next operation that should be processed, or <CODE>null</CODE>
+ * if the server is shutting down and no more operations will be
+ * processed.
*/
public AbstractOperation nextOperation(TraditionalWorkerThread workerThread)
{
@@ -328,131 +405,81 @@
/**
* Retrieves the next operation that should be processed by one of the worker
- * threads following a previous failure attempt. A maximum of five
- * consecutive failures will be allowed before returning <CODE>null</CODE>,
- * which will cause the associated thread to exit.
+ * threads following a previous failure attempt. A maximum of five consecutive
+ * failures will be allowed before returning <CODE>null</CODE>, which will
+ * cause the associated thread to exit.
*
- * @param workerThread The worker thread that is requesting the operation.
- * @param numFailures The number of consecutive failures that the worker
- * thread has experienced so far. If this gets too
- * high, then this method will return <CODE>null</CODE>
- * rather than retrying.
- *
- * @return The next operation that should be processed, or <CODE>null</CODE>
- * if the server is shutting down and no more operations will be
- * processed, or if there have been too many consecutive failures.
+ * @param workerThread
+ * The worker thread that is requesting the operation.
+ * @param numFailures
+ * The number of consecutive failures that the worker thread has
+ * experienced so far. If this gets too high, then this method will
+ * return <CODE>null</CODE> rather than retrying.
+ * @return The next operation that should be processed, or <CODE>null</CODE>
+ * if the server is shutting down and no more operations will be
+ * processed, or if there have been too many consecutive failures.
*/
private AbstractOperation retryNextOperation(
- TraditionalWorkerThread workerThread,
- int numFailures)
+ TraditionalWorkerThread workerThread, int numFailures)
{
- // See if we should kill off this thread. This could be necessary if the
- // number of worker threads has been decreased with the server online. If
+ // See if we should kill off this thread. This could be necessary if the
+ // number of worker threads has been decreased with the server online. If
// so, then return null and the thread will exit.
- if (killThreads)
- {
- synchronized (queueLock)
- {
- try
- {
- int currentThreads = workerThreads.size();
- if (currentThreads > numWorkerThreads)
- {
- if (workerThreads.remove(Thread.currentThread()))
- {
- currentThreads--;
- }
-
- if (currentThreads <= numWorkerThreads)
- {
- killThreads = false;
- }
-
- workerThread.setStoppedByReducedThreadNumber();
- return null;
- }
- }
- catch (Exception e)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- }
- }
- }
-
- if ((shutdownRequested) || (numFailures > MAX_RETRY_COUNT))
- {
- if (numFailures > MAX_RETRY_COUNT)
- {
- Message message = ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES.get(
- Thread.currentThread().getName(), numFailures, MAX_RETRY_COUNT);
- logError(message);
- }
-
- return null;
- }
-
+ queueReadLock.lock();
try
{
+ if (shutdownRequested)
+ {
+ return null;
+ }
+
+ if (killThreads && tryKillThisWorkerThread(workerThread))
+ {
+ return null;
+ }
+
+ if (numFailures > MAX_RETRY_COUNT)
+ {
+ Message message = ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES.get(Thread
+ .currentThread().getName(), numFailures, MAX_RETRY_COUNT);
+ logError(message);
+
+ return null;
+ }
+
while (true)
{
AbstractOperation nextOperation = opQueue.poll(5, TimeUnit.SECONDS);
- if (nextOperation == null)
- {
- // There was no work to do in the specified length of time. See if
- // we should shutdown, and if not then just check again.
- if (shutdownRequested)
- {
- return null;
- }
- else if (killThreads)
- {
- synchronized (queueLock)
- {
- try
- {
- int currentThreads = workerThreads.size();
- if (currentThreads > numWorkerThreads)
- {
- if (workerThreads.remove(Thread.currentThread()))
- {
- currentThreads--;
- }
-
- if (currentThreads <= numWorkerThreads)
- {
- killThreads = false;
- }
-
- workerThread.setStoppedByReducedThreadNumber();
- return null;
- }
- }
- catch (Exception e)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- }
- }
- }
- }
- else
+ if (nextOperation != null)
{
return nextOperation;
}
+
+ // There was no work to do in the specified length of time. Release the
+ // read lock allowing shutdown or config changes to proceed and then see
+ // if we should give up or check again.
+ queueReadLock.unlock();
+ Thread.yield();
+ queueReadLock.lock();
+
+ if (shutdownRequested)
+ {
+ return null;
+ }
+
+ if (killThreads && tryKillThisWorkerThread(workerThread))
+ {
+ return null;
+ }
}
}
catch (InterruptedException ie)
{
// This is somewhat expected so don't log.
- // assert debugException(CLASS_NAME, "retryNextOperation", ie);
+ // assert debugException(CLASS_NAME, "retryNextOperation", ie);
// If this occurs, then the worker thread must have been interrupted for
- // some reason. This could be because the Directory Server is shutting
+ // some reason. This could be because the Directory Server is shutting
// down, in which case we should return null.
if (shutdownRequested)
{
@@ -460,10 +487,9 @@
}
// If we've gotten here, then the worker thread was interrupted for some
- // other reason. This should not happen, and we need to log a message.
- logError(WARN_WORKER_INTERRUPTED_WITHOUT_SHUTDOWN.get(
- Thread.currentThread().getName(), String.valueOf(ie)));
- return retryNextOperation(workerThread, numFailures+1);
+ // other reason. This should not happen, and we need to log a message.
+ logError(WARN_WORKER_INTERRUPTED_WITHOUT_SHUTDOWN.get(Thread
+ .currentThread().getName(), String.valueOf(ie)));
}
catch (Exception e)
{
@@ -472,40 +498,86 @@
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
- // This should not happen. The only recourse we have is to log a message
+ // This should not happen. The only recourse we have is to log a message
// and try again.
- logError(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION.get(
- Thread.currentThread().getName(), String.valueOf(e)));
- return retryNextOperation(workerThread, numFailures + 1);
+ logError(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION.get(Thread
+ .currentThread().getName(), String.valueOf(e)));
}
+ finally
+ {
+ queueReadLock.unlock();
+ }
+
+ // An exception has occurred - retry.
+ return retryNextOperation(workerThread, numFailures + 1);
}
/**
- * Attempts to remove the specified operation from this queue if it has not
- * yet been picked up for processing by one of the worker threads.
+ * Kills this worker thread if needed. This method assumes that the read lock
+ * is already held on entry and ensures that it is held again on exit.
*
- * @param operation The operation to remove from the queue.
- *
- * @return <CODE>true</CODE> if the provided request was present in the queue
- * and was removed successfully, or <CODE>false</CODE> it not.
+ * @param workerThread
+ * The worker thread associated with this thread.
+ * @return {@code true} if this thread must exit, either because it was
+ * removed to reduce the thread count or because shutdown was requested.
*/
- public boolean removeOperation(AbstractOperation operation)
+ private boolean tryKillThisWorkerThread(TraditionalWorkerThread workerThread)
{
- return opQueue.remove(operation);
+ queueReadLock.unlock();
+ queueWriteLock.lock();
+ try
+ {
+ if (shutdownRequested)
+ {
+ // Shutdown may have been requested between unlock/lock. This thread is
+ // about to shutdown anyway, so return true.
+ return true;
+ }
+
+ int currentThreads = workerThreads.size();
+ if (currentThreads > numWorkerThreads)
+ {
+ if (workerThreads.remove(Thread.currentThread()))
+ {
+ currentThreads--;
+ }
+
+ if (currentThreads <= numWorkerThreads)
+ {
+ killThreads = false;
+ }
+
+ workerThread.setStoppedByReducedThreadNumber();
+ return true;
+ }
+ }
+ finally
+ {
+ queueWriteLock.unlock();
+ queueReadLock.lock();
+
+ if (shutdownRequested)
+ {
+ // Shutdown may have been requested between unlock/lock. This thread is
+ // about to shutdown anyway, so return true.
+ return true;
+ }
+ }
+ return false;
}
/**
* Retrieves the total number of operations that have been successfully
- * submitted to this work queue for processing since server startup. This
- * does not include operations that have been rejected for some reason like
- * the queue already at its maximum capacity.
+ * submitted to this work queue for processing since server startup. This does
+ * not include operations that have been rejected for some reason like the
+ * queue already at its maximum capacity.
*
- * @return The total number of operations that have been successfully
- * submitted to this work queue since startup.
+ * @return The total number of operations that have been successfully
+ * submitted to this work queue since startup.
*/
public long getOpsSubmitted()
{
@@ -518,8 +590,8 @@
* Retrieves the total number of operations that have been rejected because
* the work queue was already at its maximum capacity.
*
- * @return The total number of operations that have been rejected because the
- * work queue was already at its maximum capacity.
+ * @return The total number of operations that have been rejected because the
+ * work queue was already at its maximum capacity.
*/
public long getOpsRejectedDueToQueueFull()
{
@@ -530,16 +602,24 @@
/**
* Retrieves the number of pending operations in the queue that have not yet
- * been picked up for processing. Note that this method is not a
- * constant-time operation and can be relatively inefficient, so it should be
- * used sparingly.
+ * been picked up for processing. Note that this method is not a constant-time
+ * operation and can be relatively inefficient, so it should be used
+ * sparingly.
*
- * @return The number of pending operations in the queue that have not yet
- * been picked up for processing.
+ * @return The number of pending operations in the queue that have not yet
+ * been picked up for processing.
*/
public int size()
{
- return opQueue.size();
+ queueReadLock.lock();
+ try
+ {
+ return opQueue.size();
+ }
+ finally
+ {
+ queueReadLock.unlock();
+ }
}
@@ -548,8 +628,7 @@
* {@inheritDoc}
*/
public boolean isConfigurationChangeAcceptable(
- TraditionalWorkQueueCfg configuration,
- List<Message> unacceptableReasons)
+ TraditionalWorkQueueCfg configuration, List<Message> unacceptableReasons)
{
return true;
}
@@ -560,122 +639,126 @@
* {@inheritDoc}
*/
public ConfigChangeResult applyConfigurationChange(
- TraditionalWorkQueueCfg configuration)
+ TraditionalWorkQueueCfg configuration)
{
ArrayList<Message> resultMessages = new ArrayList<Message>();
- int newNumThreads = getNumWorkerThreads(configuration);
+ int newNumThreads = getNumWorkerThreads(configuration);
int newMaxCapacity = configuration.getMaxWorkQueueCapacity();
-
// Apply a change to the number of worker threads if appropriate.
int currentThreads = workerThreads.size();
if (newNumThreads != currentThreads)
{
- synchronized (queueLock)
+ queueWriteLock.lock();
+ try
{
- try
+ int threadsToAdd = newNumThreads - currentThreads;
+ if (threadsToAdd > 0)
{
- int threadsToAdd = newNumThreads - currentThreads;
- if (threadsToAdd > 0)
+ for (int i = 0; i < threadsToAdd; i++)
{
- for (int i=0; i < threadsToAdd; i++)
- {
- TraditionalWorkerThread t =
- new TraditionalWorkerThread(this, lastThreadNumber++);
- workerThreads.add(t);
- t.start();
- }
-
- killThreads = false;
- }
- else
- {
- killThreads = true;
+ TraditionalWorkerThread t = new TraditionalWorkerThread(this,
+ lastThreadNumber++);
+ workerThreads.add(t);
+ t.start();
}
- numWorkerThreads = newNumThreads;
+ killThreads = false;
}
- catch (Exception e)
+ else
{
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
+ killThreads = true;
}
+
+ numWorkerThreads = newNumThreads;
+ }
+ catch (Exception e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+ finally
+ {
+ queueWriteLock.unlock();
}
}
- // Apply a change to the maximum capacity if appropriate. Since we can't
+ // Apply a change to the maximum capacity if appropriate. Since we can't
// change capacity on the fly, then we'll have to create a new queue and
- // transfer any remaining items into it. Any thread that is waiting on the
+ // transfer any remaining items into it. Any thread that is waiting on the
// original queue will time out after at most a few seconds and further
// checks will be against the new queue.
if (newMaxCapacity != maxCapacity)
{
- synchronized (queueLock)
+ // First switch the queue with the exclusive lock.
+ queueWriteLock.lock();
+ LinkedBlockingQueue<AbstractOperation> oldOpQueue;
+ try
{
- try
+ LinkedBlockingQueue<AbstractOperation> newOpQueue = null;
+ if (newMaxCapacity > 0)
{
- LinkedBlockingQueue<AbstractOperation> newOpQueue;
- if (newMaxCapacity > 0)
- {
- newOpQueue =
- new LinkedBlockingQueue<AbstractOperation>(newMaxCapacity);
- }
- else
- {
- newOpQueue = new LinkedBlockingQueue<AbstractOperation>();
- }
-
- LinkedBlockingQueue<AbstractOperation> oldOpQueue = opQueue;
- opQueue = newOpQueue;
-
- LinkedList<AbstractOperation> pendingOps =
- new LinkedList<AbstractOperation>();
- oldOpQueue.drainTo(pendingOps);
-
-
- // We have to be careful when adding any existing pending operations
- // because the new capacity could be less than what was already
- // backlogged in the previous queue. If that happens, we may have to
- // loop a few times to get everything in there.
- while (! pendingOps.isEmpty())
- {
- Iterator<AbstractOperation> iterator = pendingOps.iterator();
- while (iterator.hasNext())
- {
- AbstractOperation o = iterator.next();
- try
- {
- if (newOpQueue.offer(o, 1000, TimeUnit.MILLISECONDS))
- {
- iterator.remove();
- }
- }
- catch (InterruptedException ie)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, ie);
- }
- }
- }
- }
-
- maxCapacity = newMaxCapacity;
+ newOpQueue = new LinkedBlockingQueue<AbstractOperation>(
+ newMaxCapacity);
}
- catch (Exception e)
+ else
{
- if (debugEnabled())
+ newOpQueue = new LinkedBlockingQueue<AbstractOperation>();
+ }
+
+ oldOpQueue = opQueue;
+ opQueue = newOpQueue;
+
+ maxCapacity = newMaxCapacity;
+ }
+ finally
+ {
+ queueWriteLock.unlock();
+ }
+
+ // Now resubmit any pending requests - we'll need the shared lock.
+ AbstractOperation pendingOperation = null;
+ queueReadLock.lock();
+ try
+ {
+ // We have to be careful when adding any existing pending operations
+ // because the new capacity could be less than what was already
+ // backlogged in the previous queue. If that happens, put() blocks until
+ // the new queue has room for the next pending operation.
+ while ((pendingOperation = oldOpQueue.poll()) != null)
+ {
+ opQueue.put(pendingOperation);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // We cannot handle the interruption here. Cancel pending requests and
+ // re-interrupt this thread.
+ Thread.currentThread().interrupt();
+
+ Message message = WARN_OP_REJECTED_BY_QUEUE_INTERRUPT.get();
+ CancelRequest cancelRequest = new CancelRequest(true, message);
+ if (pendingOperation != null)
+ {
+ pendingOperation.abort(cancelRequest);
+ }
+ while ((pendingOperation = oldOpQueue.poll()) != null)
+ {
+ if (pendingOperation != null)
{
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ pendingOperation.abort(cancelRequest);
}
}
}
+ finally
+ {
+ queueReadLock.unlock();
+ }
}
-
return new ConfigChangeResult(ResultCode.SUCCESS, false, resultMessages);
}
@@ -687,13 +770,14 @@
@Override()
public boolean isIdle()
{
- if (opQueue.size() > 0)
+ queueReadLock.lock();
+ try
{
- return false;
- }
+ if (opQueue.size() > 0)
+ {
+ return false;
+ }
- synchronized (queueLock)
- {
for (TraditionalWorkerThread t : workerThreads)
{
if (t.isActive())
@@ -704,6 +788,10 @@
return true;
}
+ finally
+ {
+ queueReadLock.unlock();
+ }
}
@@ -728,4 +816,3 @@
}
}
}
-
--
Gitblit v1.10.0