From 582344d280d24dfec999b862d8255eb077995b99 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 26 Mar 2013 14:49:27 +0000
Subject: [PATCH] OPENDJ-832 Leverage the work queue for processing requests received on the HTTP connection handler
---
opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java | 50 ++++++++++++++++++++++++++++----------------------
1 files changed, 28 insertions(+), 22 deletions(-)
diff --git a/opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java b/opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java
index a910bef..7cf3e35 100644
--- a/opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java
+++ b/opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package org.opends.server.extensions;
@@ -83,31 +84,37 @@
- // The set of worker threads that will be used to process this work queue.
+ /** The set of worker threads that will be used to process this work queue. */
private ArrayList<ParallelWorkerThread> workerThreads;
- // The number of operations that have been submitted to the work queue for
- // processing.
+ /**
+ * The number of operations that have been submitted to the work queue for
+ * processing.
+ */
private AtomicLong opsSubmitted;
- // Indicates whether one or more of the worker threads needs to be killed at
- // the next convenient opportunity.
+ /**
+ * Indicates whether one or more of the worker threads needs to be killed at
+ * the next convenient opportunity.
+ */
private boolean killThreads;
- // Indicates whether the Directory Server is shutting down.
+ /** Indicates whether the Directory Server is shutting down. */
private boolean shutdownRequested;
- // The thread number used for the last worker thread that was created.
+ /** The thread number used for the last worker thread that was created. */
private int lastThreadNumber;
- // The number of worker threads that should be active (or will be shortly if
- // a configuration change has not been completely applied).
+ /**
+ * The number of worker threads that should be active (or will be shortly if a
+ * configuration change has not been completely applied).
+ */
private int numWorkerThreads;
- // The queue that will be used to actually hold the pending operations.
- private ConcurrentLinkedQueue<AbstractOperation> opQueue;
+ /** The queue that will be used to actually hold the pending operations. */
+ private ConcurrentLinkedQueue<Operation> opQueue;
- // The lock used to provide threadsafe access for the queue.
+ /** The lock used to provide threadsafe access for the queue. */
private final Object queueLock = new Object();
@@ -143,7 +150,7 @@
numWorkerThreads = getNumWorkerThreads(configuration);
// Create the actual work queue.
- opQueue = new ConcurrentLinkedQueue<AbstractOperation>();
+ opQueue = new ConcurrentLinkedQueue<Operation>();
// Create the set of worker threads that should be used to service the
// work queue.
@@ -266,8 +273,7 @@
* at its maximum capacity).
*/
@Override
- public void submitOperation(AbstractOperation operation)
- throws DirectoryException
+ public void submitOperation(Operation operation) throws DirectoryException
{
if (shutdownRequested)
{
@@ -294,7 +300,7 @@
* if the server is shutting down and no more operations will be
* processed.
*/
- public AbstractOperation nextOperation(ParallelWorkerThread workerThread)
+ public Operation nextOperation(ParallelWorkerThread workerThread)
{
return retryNextOperation(workerThread, 0);
}
@@ -317,7 +323,7 @@
* if the server is shutting down and no more operations will be
* processed, or if there have been too many consecutive failures.
*/
- private AbstractOperation retryNextOperation(
+ private Operation retryNextOperation(
ParallelWorkerThread workerThread,
int numFailures)
{
@@ -333,8 +339,7 @@
int currentThreads = workerThreads.size();
if (currentThreads > numWorkerThreads)
{
- if (workerThreads.remove((ParallelWorkerThread)
- Thread.currentThread()))
+ if (workerThreads.remove(Thread.currentThread()))
{
currentThreads--;
}
@@ -374,7 +379,7 @@
{
while (true)
{
- AbstractOperation nextOperation = null;
+ Operation nextOperation = null;
if (queueSemaphore.tryAcquire(5, TimeUnit.SECONDS)) {
nextOperation = opQueue.poll();
}
@@ -395,8 +400,7 @@
int currentThreads = workerThreads.size();
if (currentThreads > numWorkerThreads)
{
- if (workerThreads.remove((ParallelWorkerThread)
- Thread.currentThread()))
+ if (workerThreads.remove(Thread.currentThread()))
{
currentThreads--;
}
@@ -494,6 +498,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public boolean isConfigurationChangeAcceptable(
ParallelWorkQueueCfg configuration,
List<Message> unacceptableReasons)
@@ -506,6 +511,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public ConfigChangeResult applyConfigurationChange(
ParallelWorkQueueCfg configuration)
{
--
Gitblit v1.10.0