From 582344d280d24dfec999b862d8255eb077995b99 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 26 Mar 2013 14:49:27 +0000
Subject: [PATCH] OPENDJ-832 Leverage the work queue for processing requests received on the HTTP connection handler 

---
 opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java |   50 ++++++++++++++++++++++++++++----------------------
 1 files changed, 28 insertions(+), 22 deletions(-)

diff --git a/opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java b/opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java
index a910bef..7cf3e35 100644
--- a/opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java
+++ b/opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
+ *      Portions copyright 2013 ForgeRock AS.
  */
 package org.opends.server.extensions;
 
@@ -83,31 +84,37 @@
 
 
 
-  // The set of worker threads that will be used to process this work queue.
+  /** The set of worker threads that will be used to process this work queue. */
   private ArrayList<ParallelWorkerThread> workerThreads;
 
-  // The number of operations that have been submitted to the work queue for
-  // processing.
+  /**
+   * The number of operations that have been submitted to the work queue for
+   * processing.
+   */
   private AtomicLong opsSubmitted;
 
-  // Indicates whether one or more of the worker threads needs to be killed at
-  // the next convenient opportunity.
+  /**
+   * Indicates whether one or more of the worker threads needs to be killed at
+   * the next convenient opportunity.
+   */
   private boolean killThreads;
 
-  // Indicates whether the Directory Server is shutting down.
+  /** Indicates whether the Directory Server is shutting down. */
   private boolean shutdownRequested;
 
-  // The thread number used for the last worker thread that was created.
+  /** The thread number used for the last worker thread that was created. */
   private int lastThreadNumber;
 
-  // The number of worker threads that should be active (or will be shortly if
-  // a configuration change has not been completely applied).
+  /**
+   * The number of worker threads that should be active (or will be shortly if a
+   * configuration change has not been completely applied).
+   */
   private int numWorkerThreads;
 
-  // The queue that will be used to actually hold the pending operations.
-  private ConcurrentLinkedQueue<AbstractOperation> opQueue;
+  /** The queue that will be used to actually hold the pending operations. */
+  private ConcurrentLinkedQueue<Operation> opQueue;
 
-  // The lock used to provide threadsafe access for the queue.
+  /** The lock used to provide threadsafe access for the queue. */
   private final Object queueLock = new Object();
 
 
@@ -143,7 +150,7 @@
     numWorkerThreads = getNumWorkerThreads(configuration);
 
     // Create the actual work queue.
-    opQueue = new ConcurrentLinkedQueue<AbstractOperation>();
+    opQueue = new ConcurrentLinkedQueue<Operation>();
 
     // Create the set of worker threads that should be used to service the
     // work queue.
@@ -266,8 +273,7 @@
    *                              at its maximum capacity).
    */
   @Override
-  public void submitOperation(AbstractOperation operation)
-         throws DirectoryException
+  public void submitOperation(Operation operation) throws DirectoryException
   {
     if (shutdownRequested)
     {
@@ -294,7 +300,7 @@
    *          if the server is shutting down and no more operations will be
    *          processed.
    */
-  public AbstractOperation nextOperation(ParallelWorkerThread workerThread)
+  public Operation nextOperation(ParallelWorkerThread workerThread)
   {
     return retryNextOperation(workerThread, 0);
   }
@@ -317,7 +323,7 @@
    *          if the server is shutting down and no more operations will be
    *          processed, or if there have been too many consecutive failures.
    */
-  private AbstractOperation retryNextOperation(
+  private Operation retryNextOperation(
                                        ParallelWorkerThread workerThread,
                                        int numFailures)
   {
@@ -333,8 +339,7 @@
           int currentThreads = workerThreads.size();
           if (currentThreads > numWorkerThreads)
           {
-            if (workerThreads.remove((ParallelWorkerThread)
-              Thread.currentThread()))
+            if (workerThreads.remove(Thread.currentThread()))
             {
               currentThreads--;
             }
@@ -374,7 +379,7 @@
     {
       while (true)
       {
-        AbstractOperation nextOperation = null;
+        Operation nextOperation = null;
         if (queueSemaphore.tryAcquire(5, TimeUnit.SECONDS)) {
           nextOperation = opQueue.poll();
         }
@@ -395,8 +400,7 @@
                 int currentThreads = workerThreads.size();
                 if (currentThreads > numWorkerThreads)
                 {
-                  if (workerThreads.remove((ParallelWorkerThread)
-                    Thread.currentThread()))
+                  if (workerThreads.remove(Thread.currentThread()))
                   {
                     currentThreads--;
                   }
@@ -494,6 +498,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public boolean isConfigurationChangeAcceptable(
                       ParallelWorkQueueCfg configuration,
                       List<Message> unacceptableReasons)
@@ -506,6 +511,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public ConfigChangeResult applyConfigurationChange(
                                  ParallelWorkQueueCfg configuration)
   {

--
Gitblit v1.10.0