From 269a1d06ff820c287bb21a03fa76e3314110516a Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 22 Apr 2013 10:28:50 +0000
Subject: [PATCH] OPENDJ-832 Leverage the work queue for processing requests received on the HTTP connection handler
---
opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java | 69 +++++++++++++++++++++++-----------
1 files changed, 46 insertions(+), 23 deletions(-)
diff --git a/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java b/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
index 4f79854..5d5fecd 100644
--- a/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
+++ b/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
@@ -51,7 +51,13 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.monitors.TraditionalWorkQueueMonitor;
-import org.opends.server.types.*;
+import org.opends.server.types.CancelRequest;
+import org.opends.server.types.ConfigChangeResult;
+import org.opends.server.types.DebugLogLevel;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.InitializationException;
+import org.opends.server.types.Operation;
+import org.opends.server.types.ResultCode;
@@ -176,7 +182,8 @@
configuration.addTraditionalChangeListener(this);
// Get the necessary configuration from the provided entry.
- numWorkerThreads = getNumWorkerThreads(configuration);
+ numWorkerThreads =
+ computeNumWorkerThreads(configuration.getNumWorkerThreads());
maxCapacity = configuration.getMaxWorkQueueCapacity();
// Create the actual work queue.
@@ -334,6 +341,32 @@
@Override
public void submitOperation(Operation operation) throws DirectoryException
{
+ submitOperation(operation, isBlocking);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean trySubmitOperation(Operation operation)
+ throws DirectoryException
+ {
+ try
+ {
+ submitOperation(operation, false);
+ return true;
+ }
+ catch (DirectoryException e)
+ {
+ if (ResultCode.BUSY == e.getResultCode())
+ {
+ return false;
+ }
+ throw e;
+ }
+ }
+
+ private void submitOperation(Operation operation,
+ boolean blockEnqueuingWhenFull) throws DirectoryException
+ {
queueReadLock.lock();
try
{
@@ -343,7 +376,7 @@
throw new DirectoryException(ResultCode.UNAVAILABLE, message);
}
- if (isBlocking)
+ if (blockEnqueuingWhenFull)
{
try
{
@@ -659,7 +692,8 @@
TraditionalWorkQueueCfg configuration)
{
ArrayList<Message> resultMessages = new ArrayList<Message>();
- int newNumThreads = getNumWorkerThreads(configuration);
+ int newNumThreads =
+ computeNumWorkerThreads(configuration.getNumWorkerThreads());
int newMaxCapacity = configuration.getMaxWorkQueueCapacity();
// Apply a change to the number of worker threads if appropriate.
@@ -808,25 +842,14 @@
}
}
-
-
- // Determine the number of worker threads.
- private int getNumWorkerThreads(TraditionalWorkQueueCfg configuration)
+ /**
+ * Return the number of worker threads used by this WorkQueue.
+ *
+ * @return the number of worker threads used by this WorkQueue
+ */
+ @Override
+ public int getNumWorkerThreads()
{
- if (configuration.getNumWorkerThreads() == null)
- {
- // Automatically choose based on the number of processors.
- int cpus = Runtime.getRuntime().availableProcessors();
- int value = Math.max(24, cpus * 2);
-
- Message message = INFO_ERGONOMIC_SIZING_OF_WORKER_THREAD_POOL.get(value);
- logError(message);
-
- return value;
- }
- else
- {
- return configuration.getNumWorkerThreads();
- }
+ return this.numWorkerThreads;
}
}
--
Gitblit v1.10.0