From 582344d280d24dfec999b862d8255eb077995b99 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 26 Mar 2013 14:49:27 +0000
Subject: [PATCH] OPENDJ-832 Leverage the work queue for processing requests received on the HTTP connection handler
---
opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java | 106 ++++++++++++++++++++++++++++++-----------------------
1 files changed, 60 insertions(+), 46 deletions(-)
diff --git a/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java b/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
index a3d04fc..4f79854 100644
--- a/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
+++ b/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package org.opends.server.extensions;
@@ -30,9 +31,8 @@
import static org.opends.messages.ConfigMessages.*;
import static org.opends.messages.CoreMessages.*;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
import java.util.ArrayList;
import java.util.List;
@@ -73,54 +73,70 @@
*/
private static final int MAX_RETRY_COUNT = 5;
- // The set of worker threads that will be used to process this work queue.
+ /** The set of worker threads that will be used to process this work queue. */
private final ArrayList<TraditionalWorkerThread> workerThreads =
new ArrayList<TraditionalWorkerThread>();
- // The number of operations that have been submitted to the work queue for
- // processing.
+ /**
+ * The number of operations that have been submitted to the work queue for
+ * processing.
+ */
private AtomicLong opsSubmitted;
- // The number of times that an attempt to submit a new request has been
- // rejected because the work queue is already at its maximum capacity.
+ /**
+ * The number of times that an attempt to submit a new request has been
+ * rejected because the work queue is already at its maximum capacity.
+ */
private AtomicLong queueFullRejects;
- // Indicates whether one or more of the worker threads needs to be killed at
- // the next convenient opportunity.
+ /**
+ * Indicates whether one or more of the worker threads needs to be killed at
+ * the next convenient opportunity.
+ */
private boolean killThreads;
- // Indicates whether the Directory Server is shutting down.
+ /** Indicates whether the Directory Server is shutting down. */
private boolean shutdownRequested;
- // The thread number used for the last worker thread that was created.
+ /** The thread number used for the last worker thread that was created. */
private int lastThreadNumber;
- // The maximum number of pending requests that this work queue will allow
- // before it will start rejecting them.
+ /**
+ * The maximum number of pending requests that this work queue will allow
+ * before it will start rejecting them.
+ */
private int maxCapacity;
- // The number of worker threads that should be active (or will be shortly if
- // a configuration change has not been completely applied).
+ /**
+ * The number of worker threads that should be active (or will be shortly if a
+ * configuration change has not been completely applied).
+ */
private int numWorkerThreads;
- // The queue overflow policy: true indicates that operations will be blocked
- // until the queue has available capacity, otherwise operations will be
- // rejected.
- //
- // This is hard-coded to true for now because a reject on full policy does
- // not seem to have a valid use case.
- //
+ /**
+ * The queue overflow policy: true indicates that operations will be blocked
+ * until the queue has available capacity, otherwise operations will be
+ * rejected.
+ * <p>
+ * This is hard-coded to true for now because a reject on full policy does not
+ * seem to have a valid use case.
+ * </p>
+ */
private final boolean isBlocking = true;
- // The queue that will be used to actually hold the pending operations.
- private LinkedBlockingQueue<AbstractOperation> opQueue;
+ /** The queue that will be used to actually hold the pending operations. */
+ private LinkedBlockingQueue<Operation> opQueue;
- // The locks used to provide threadsafe access for the queue.
-
- // Used for non-config changes.
+ /**
+ * The lock used to provide threadsafe access for the queue, used for
+ * non-config changes.
+ */
private final ReadLock queueReadLock;
- // Used for config changes.
+ /**
+ * The lock used to provide threadsafe access for the queue, used for config
+ * changes.
+ */
private final WriteLock queueWriteLock;
{
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -166,13 +182,13 @@
// Create the actual work queue.
if (maxCapacity > 0)
{
- opQueue = new LinkedBlockingQueue<AbstractOperation>(maxCapacity);
+ opQueue = new LinkedBlockingQueue<Operation>(maxCapacity);
}
else
{
// This will never be the case, since the configuration definition
// ensures that the capacity is always finite.
- opQueue = new LinkedBlockingQueue<AbstractOperation>();
+ opQueue = new LinkedBlockingQueue<Operation>();
}
// Create the set of worker threads that should be used to service the
@@ -316,8 +332,7 @@
* already at its maximum capacity).
*/
@Override
- public void submitOperation(AbstractOperation operation)
- throws DirectoryException
+ public void submitOperation(Operation operation) throws DirectoryException
{
queueReadLock.lock();
try
@@ -396,7 +411,7 @@
* if the server is shutting down and no more operations will be
* processed.
*/
- public AbstractOperation nextOperation(TraditionalWorkerThread workerThread)
+ public Operation nextOperation(TraditionalWorkerThread workerThread)
{
return retryNextOperation(workerThread, 0);
}
@@ -419,8 +434,8 @@
* if the server is shutting down and no more operations will be
* processed, or if there have been too many consecutive failures.
*/
- private AbstractOperation retryNextOperation(
- TraditionalWorkerThread workerThread, int numFailures)
+ private Operation retryNextOperation(TraditionalWorkerThread workerThread,
+ int numFailures)
{
// See if we should kill off this thread. This could be necessary if the
// number of worker threads has been decreased with the server online. If
@@ -449,7 +464,7 @@
while (true)
{
- AbstractOperation nextOperation = opQueue.poll(5, TimeUnit.SECONDS);
+ Operation nextOperation = opQueue.poll(5, TimeUnit.SECONDS);
if (nextOperation != null)
{
return nextOperation;
@@ -627,6 +642,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public boolean isConfigurationChangeAcceptable(
TraditionalWorkQueueCfg configuration, List<Message> unacceptableReasons)
{
@@ -638,6 +654,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public ConfigChangeResult applyConfigurationChange(
TraditionalWorkQueueCfg configuration)
{
@@ -695,18 +712,18 @@
{
// First switch the queue with the exclusive lock.
queueWriteLock.lock();
- LinkedBlockingQueue<AbstractOperation> oldOpQueue;
+ LinkedBlockingQueue<Operation> oldOpQueue;
try
{
- LinkedBlockingQueue<AbstractOperation> newOpQueue = null;
+ LinkedBlockingQueue<Operation> newOpQueue = null;
if (newMaxCapacity > 0)
{
- newOpQueue = new LinkedBlockingQueue<AbstractOperation>(
+ newOpQueue = new LinkedBlockingQueue<Operation>(
newMaxCapacity);
}
else
{
- newOpQueue = new LinkedBlockingQueue<AbstractOperation>();
+ newOpQueue = new LinkedBlockingQueue<Operation>();
}
oldOpQueue = opQueue;
@@ -720,7 +737,7 @@
}
// Now resubmit any pending requests - we'll need the shared lock.
- AbstractOperation pendingOperation = null;
+ Operation pendingOperation = null;
queueReadLock.lock();
try
{
@@ -747,10 +764,7 @@
}
while ((pendingOperation = oldOpQueue.poll()) != null)
{
- if (pendingOperation != null)
- {
- pendingOperation.abort(cancelRequest);
- }
+ pendingOperation.abort(cancelRequest);
}
}
finally
--
Gitblit v1.10.0