From 582344d280d24dfec999b862d8255eb077995b99 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 26 Mar 2013 14:49:27 +0000
Subject: [PATCH] OPENDJ-832 Leverage the work queue for processing requests received on the HTTP connection handler 

---
 opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java |  106 ++++++++++++++++++++++++++++++-----------------------
 1 files changed, 60 insertions(+), 46 deletions(-)

diff --git a/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java b/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
index a3d04fc..4f79854 100644
--- a/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
+++ b/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
+ *      Portions copyright 2013 ForgeRock AS.
  */
 package org.opends.server.extensions;
 
@@ -30,9 +31,8 @@
 
 import static org.opends.messages.ConfigMessages.*;
 import static org.opends.messages.CoreMessages.*;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -73,54 +73,70 @@
    */
   private static final int MAX_RETRY_COUNT = 5;
 
-  // The set of worker threads that will be used to process this work queue.
+  /** The set of worker threads that will be used to process this work queue. */
   private final ArrayList<TraditionalWorkerThread> workerThreads =
     new ArrayList<TraditionalWorkerThread>();
 
-  // The number of operations that have been submitted to the work queue for
-  // processing.
+  /**
+   * The number of operations that have been submitted to the work queue for
+   * processing.
+   */
   private AtomicLong opsSubmitted;
 
-  // The number of times that an attempt to submit a new request has been
-  // rejected because the work queue is already at its maximum capacity.
+  /**
+   * The number of times that an attempt to submit a new request has been
+   * rejected because the work queue is already at its maximum capacity.
+   */
   private AtomicLong queueFullRejects;
 
-  // Indicates whether one or more of the worker threads needs to be killed at
-  // the next convenient opportunity.
+  /**
+   * Indicates whether one or more of the worker threads needs to be killed at
+   * the next convenient opportunity.
+   */
   private boolean killThreads;
 
-  // Indicates whether the Directory Server is shutting down.
+  /** Indicates whether the Directory Server is shutting down. */
   private boolean shutdownRequested;
 
-  // The thread number used for the last worker thread that was created.
+  /** The thread number used for the last worker thread that was created. */
   private int lastThreadNumber;
 
-  // The maximum number of pending requests that this work queue will allow
-  // before it will start rejecting them.
+  /**
+   * The maximum number of pending requests that this work queue will allow
+   * before it will start rejecting them.
+   */
   private int maxCapacity;
 
-  // The number of worker threads that should be active (or will be shortly if
-  // a configuration change has not been completely applied).
+  /**
+   * The number of worker threads that should be active (or will be shortly if a
+   * configuration change has not been completely applied).
+   */
   private int numWorkerThreads;
 
-  // The queue overflow policy: true indicates that operations will be blocked
-  // until the queue has available capacity, otherwise operations will be
-  // rejected.
-  //
-  // This is hard-coded to true for now because a reject on full policy does
-  // not seem to have a valid use case.
-  //
+  /**
+   * The queue overflow policy: true indicates that operations will be blocked
+   * until the queue has available capacity, otherwise operations will be
+   * rejected.
+   * <p>
+   * This is hard-coded to true for now because a reject on full policy does not
+   * seem to have a valid use case.
+   * </p>
+   */
   private final boolean isBlocking = true;
 
-  // The queue that will be used to actually hold the pending operations.
-  private LinkedBlockingQueue<AbstractOperation> opQueue;
+  /** The queue that will be used to actually hold the pending operations. */
+  private LinkedBlockingQueue<Operation> opQueue;
 
-  // The locks used to provide threadsafe access for the queue.
-
-  // Used for non-config changes.
+  /**
+   * The lock used to provide threadsafe access for the queue, used for
+   * non-config changes.
+   */
   private final ReadLock queueReadLock;
 
-  // Used for config changes.
+  /**
+   * The lock used to provide threadsafe access for the queue, used for config
+   * changes.
+   */
   private final WriteLock queueWriteLock;
   {
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -166,13 +182,13 @@
       // Create the actual work queue.
       if (maxCapacity > 0)
       {
-        opQueue = new LinkedBlockingQueue<AbstractOperation>(maxCapacity);
+        opQueue = new LinkedBlockingQueue<Operation>(maxCapacity);
       }
       else
       {
         // This will never be the case, since the configuration definition
         // ensures that the capacity is always finite.
-        opQueue = new LinkedBlockingQueue<AbstractOperation>();
+        opQueue = new LinkedBlockingQueue<Operation>();
       }
 
       // Create the set of worker threads that should be used to service the
@@ -316,8 +332,7 @@
    *           already at its maximum capacity).
    */
   @Override
-  public void submitOperation(AbstractOperation operation)
-      throws DirectoryException
+  public void submitOperation(Operation operation) throws DirectoryException
   {
     queueReadLock.lock();
     try
@@ -396,7 +411,7 @@
    *         if the server is shutting down and no more operations will be
    *         processed.
    */
-  public AbstractOperation nextOperation(TraditionalWorkerThread workerThread)
+  public Operation nextOperation(TraditionalWorkerThread workerThread)
   {
     return retryNextOperation(workerThread, 0);
   }
@@ -419,8 +434,8 @@
    *         if the server is shutting down and no more operations will be
    *         processed, or if there have been too many consecutive failures.
    */
-  private AbstractOperation retryNextOperation(
-      TraditionalWorkerThread workerThread, int numFailures)
+  private Operation retryNextOperation(TraditionalWorkerThread workerThread,
+      int numFailures)
   {
     // See if we should kill off this thread. This could be necessary if the
     // number of worker threads has been decreased with the server online. If
@@ -449,7 +464,7 @@
 
       while (true)
       {
-        AbstractOperation nextOperation = opQueue.poll(5, TimeUnit.SECONDS);
+        Operation nextOperation = opQueue.poll(5, TimeUnit.SECONDS);
         if (nextOperation != null)
         {
           return nextOperation;
@@ -627,6 +642,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public boolean isConfigurationChangeAcceptable(
       TraditionalWorkQueueCfg configuration, List<Message> unacceptableReasons)
   {
@@ -638,6 +654,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public ConfigChangeResult applyConfigurationChange(
       TraditionalWorkQueueCfg configuration)
   {
@@ -695,18 +712,18 @@
     {
       // First switch the queue with the exclusive lock.
       queueWriteLock.lock();
-      LinkedBlockingQueue<AbstractOperation> oldOpQueue;
+      LinkedBlockingQueue<Operation> oldOpQueue;
       try
       {
-        LinkedBlockingQueue<AbstractOperation> newOpQueue = null;
+        LinkedBlockingQueue<Operation> newOpQueue = null;
         if (newMaxCapacity > 0)
         {
-          newOpQueue = new LinkedBlockingQueue<AbstractOperation>(
+          newOpQueue = new LinkedBlockingQueue<Operation>(
               newMaxCapacity);
         }
         else
         {
-          newOpQueue = new LinkedBlockingQueue<AbstractOperation>();
+          newOpQueue = new LinkedBlockingQueue<Operation>();
         }
 
         oldOpQueue = opQueue;
@@ -720,7 +737,7 @@
       }
 
       // Now resubmit any pending requests - we'll need the shared lock.
-      AbstractOperation pendingOperation = null;
+      Operation pendingOperation = null;
       queueReadLock.lock();
       try
       {
@@ -747,10 +764,7 @@
         }
         while ((pendingOperation = oldOpQueue.poll()) != null)
         {
-          if (pendingOperation != null)
-          {
-            pendingOperation.abort(cancelRequest);
-          }
+          pendingOperation.abort(cancelRequest);
         }
       }
       finally

--
Gitblit v1.10.0