From 6945df4500d706d53a192c78d63e7d2c7e83258c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 22 Apr 2013 10:28:50 +0000
Subject: [PATCH] OPENDJ-832 Leverage the work queue for processing requests received on the HTTP connection handler

---
 opendj-sdk/opends/src/server/org/opends/server/protocols/jmx/JmxClientConnection.java                               |    9 
 opendj-sdk/opends/src/server/org/opends/server/api/ClientConnection.java                                            |   10 
 opendj-sdk/opends/src/server/org/opends/server/core/DirectoryServer.java                                            |   55 +++-
 opendj-sdk/opends/src/server/org/opends/server/protocols/http/SdkConnectionAdapter.java                             |   17 +
 opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/HTTPConnectionHandlerConfiguration.xml                 |   27 ++
 opendj-sdk/opends/src/admin/messages/HTTPConnectionHandlerCfgDefn.properties                                        |    3 
 opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java                                 |   69 +++-
 opendj-sdk/opends/src/server/org/opends/server/core/UnbindOperationWrapper.java                                     |   50 +++
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/core/networkgroups/MockClientConnection.java |   13 
 opendj-sdk/opends/src/server/org/opends/server/core/AbandonOperationWrapper.java                                    |   58 ++++
 opendj-sdk/opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java                                    |   51 +--
 opendj-sdk/opends/src/server/org/opends/server/core/BoundedWorkQueueStrategy.java                                   |  246 ++++++++++++++++++
 opendj-sdk/opends/src/server/org/opends/server/protocols/internal/InternalClientConnection.java                     |    8 
 opendj-sdk/opends/src/server/org/opends/server/core/ExtendedOperationWrapper.java                                   |   94 +++++++
 opendj-sdk/opends/src/server/org/opends/server/api/WorkQueue.java                                                   |   55 ++++
 opendj-sdk/opends/resource/schema/02-config.ldif                                                                    |    3 
 opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java                             |    7 
 17 files changed, 700 insertions(+), 75 deletions(-)

diff --git a/opendj-sdk/opends/resource/schema/02-config.ldif b/opendj-sdk/opends/resource/schema/02-config.ldif
index fbe5b04..d620358 100644
--- a/opendj-sdk/opends/resource/schema/02-config.ldif
+++ b/opendj-sdk/opends/resource/schema/02-config.ldif
@@ -3855,7 +3855,8 @@
         ds-cfg-max-blocked-write-time-limit $
         ds-cfg-buffer-size $
         ds-cfg-config-file $
-        ds-cfg-authentication-required )
+        ds-cfg-authentication-required $
+        ds-cfg-max-concurrent-ops-per-connection )
   X-ORIGIN 'OpenDJ Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.14
   NAME 'ds-cfg-entry-cache'
diff --git a/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/HTTPConnectionHandlerConfiguration.xml b/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/HTTPConnectionHandlerConfiguration.xml
index 5f2f9cd..ae4d7fe 100644
--- a/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/HTTPConnectionHandlerConfiguration.xml
+++ b/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/HTTPConnectionHandlerConfiguration.xml
@@ -481,4 +481,31 @@
       </ldap:attribute>
     </adm:profile>
   </adm:property>
+  <adm:property name="max-concurrent-ops-per-connection">
+    <adm:synopsis>
+      Specifies the maximum number of internal operations that each
+      HTTP client connection can execute concurrently.
+    </adm:synopsis>
+    <adm:description>
+      This property allow to limit the impact that each HTTP request can have on
+      the whole server by limiting the number of internal operations that each
+      HTTP request can execute concurrently.
+      A value of 0 means that no limit is enforced.
+    </adm:description>
+    <adm:default-behavior>
+      <adm:alias>
+        <adm:synopsis>
+          Let the server decide.
+        </adm:synopsis>
+      </adm:alias>
+    </adm:default-behavior>
+    <adm:syntax>
+      <adm:integer lower-limit="0"/>
+    </adm:syntax>
+    <adm:profile name="ldap">
+      <ldap:attribute>
+        <ldap:name>ds-cfg-max-concurrent-ops-per-connection</ldap:name>
+      </ldap:attribute>
+    </adm:profile>
+  </adm:property>
 </adm:managed-object>
diff --git a/opendj-sdk/opends/src/admin/messages/HTTPConnectionHandlerCfgDefn.properties b/opendj-sdk/opends/src/admin/messages/HTTPConnectionHandlerCfgDefn.properties
index b3babda..65f6e25 100644
--- a/opendj-sdk/opends/src/admin/messages/HTTPConnectionHandlerCfgDefn.properties
+++ b/opendj-sdk/opends/src/admin/messages/HTTPConnectionHandlerCfgDefn.properties
@@ -33,6 +33,9 @@
 property.listen-port.description=Only a single port number may be provided.
 property.max-blocked-write-time-limit.synopsis=Specifies the maximum length of time that attempts to write data to HTTP clients should be allowed to block.
 property.max-blocked-write-time-limit.description=If an attempt to write data to a client takes longer than this length of time, then the client connection is terminated.
+property.max-concurrent-ops-per-connection.synopsis=Specifies the maximum number of internal operations that each HTTP client connection can execute concurrently.
+property.max-concurrent-ops-per-connection.description=This property allow to limit the impact that each HTTP request can have on the whole server by limiting the number of internal operations that each HTTP request can execute concurrently. A value of 0 means that no limit is enforced.
+property.max-concurrent-ops-per-connection.default-behavior.alias.synopsis=Let the server decide.
 property.max-request-size.synopsis=Specifies the size in bytes of the largest HTTP request message that will be allowed by the HTTP Connection Handler.
 property.max-request-size.description=This can help prevent denial-of-service attacks by clients that indicate they send extremely large requests to the server causing it to attempt to allocate large amounts of memory.
 property.ssl-cert-nickname.synopsis=Specifies the nickname (also called the alias) of the certificate that the HTTP Connection Handler should use when performing SSL communication.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/api/ClientConnection.java b/opendj-sdk/opends/src/server/org/opends/server/api/ClientConnection.java
index cbc65f4..917cf59 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/api/ClientConnection.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/api/ClientConnection.java
@@ -55,10 +55,10 @@
 import org.opends.server.types.AuthenticationInfo;
 import org.opends.server.types.CancelRequest;
 import org.opends.server.types.CancelResult;
+import org.opends.server.types.DN;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.DisconnectReason;
-import org.opends.server.types.DN;
 import org.opends.server.types.Entry;
 import org.opends.server.types.IntermediateResponse;
 import org.opends.server.types.Operation;
@@ -419,7 +419,13 @@
    */
   public abstract InetAddress getLocalAddress();
 
-
+  /**
+   * Returns whether the Directory Server believes this connection to be valid
+   * and available for communication.
+   *
+   * @return true if the connection is valid, false otherwise
+   */
+  public abstract boolean isConnectionValid();
 
   /**
    * Indicates whether this client connection is currently using a
diff --git a/opendj-sdk/opends/src/server/org/opends/server/api/WorkQueue.java b/opendj-sdk/opends/src/server/org/opends/server/api/WorkQueue.java
index b9fddd5..78d6be1 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/api/WorkQueue.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/api/WorkQueue.java
@@ -27,6 +27,9 @@
  */
 package org.opends.server.api;
 
+import static org.opends.messages.CoreMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+
 import org.opends.messages.Message;
 import org.opends.server.admin.std.server.WorkQueueCfg;
 import org.opends.server.config.ConfigException;
@@ -106,6 +109,22 @@
 
 
   /**
+   * Tries to submit an operation to be processed in the server, without
+   * blocking.
+   *
+   * @param operation
+   *          The operation to be processed.
+   * @return true if the operation could be submitted to the queue, false if the
+   *         queue was full
+   * @throws DirectoryException
+   *           If the provided operation is not accepted for some reason (e.g.,
+   *           if the server is shutting down).
+   */
+  public abstract boolean trySubmitOperation(Operation operation)
+      throws DirectoryException;
+
+
+  /**
    * Indicates whether the work queue is currently processing any
    * requests.  Note that this is a point-in-time determination, and
    * if any component of the server wishes to depend on a quiescent
@@ -119,6 +138,42 @@
   public abstract boolean isIdle();
 
 
+  /**
+   * Return the maximum number of worker threads that can be used by this
+   * WorkQueue (The WorkQueue could have a thread pool which adjusts its size).
+   *
+   * @return the maximum number of worker threads that can be used by this
+   *         WorkQueue
+   */
+  public abstract int getNumWorkerThreads();
+
+
+  /**
+   * Computes the number of worker threads to use by the working queue based on
+   * the configured number.
+   *
+   * @param configuredNumWorkerThreads
+   *          the configured number of worker threads to use
+   * @return the number of worker threads to use
+   */
+  protected int computeNumWorkerThreads(Integer configuredNumWorkerThreads)
+  {
+    if (configuredNumWorkerThreads != null)
+    {
+      return configuredNumWorkerThreads;
+    }
+    else
+    {
+      // Automatically choose based on the number of processors.
+      int cpus = Runtime.getRuntime().availableProcessors();
+      int value = Math.max(24, cpus * 2);
+
+      Message message = INFO_ERGONOMIC_SIZING_OF_WORKER_THREAD_POOL.get(value);
+      logError(message);
+
+      return value;
+    }
+  }
 
   /**
    * Waits for the work queue to become idle before returning.  Note
diff --git a/opendj-sdk/opends/src/server/org/opends/server/core/AbandonOperationWrapper.java b/opendj-sdk/opends/src/server/org/opends/server/core/AbandonOperationWrapper.java
new file mode 100644
index 0000000..b0ea99e
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/core/AbandonOperationWrapper.java
@@ -0,0 +1,58 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.core;
+
+
+/**
+ * This abstract class wraps/decorates a given abandon operation. This class
+ * will be extended by sub-classes to enhance the functionality of the
+ * AbandonOperationBasis.
+ */
+public abstract class AbandonOperationWrapper extends
+    OperationWrapper<AbandonOperation> implements AbandonOperation
+{
+
+  /**
+   * Creates a new abandon operation wrapper based on the provided abandon
+   * operation.
+   *
+   * @param abandon
+   *          The abandon operation to wrap
+   */
+  public AbandonOperationWrapper(AbandonOperation abandon)
+  {
+    super(abandon);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int getIDToAbandon()
+  {
+    return getOperation().getIDToAbandon();
+  }
+
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/core/BoundedWorkQueueStrategy.java b/opendj-sdk/opends/src/server/org/opends/server/core/BoundedWorkQueueStrategy.java
new file mode 100644
index 0000000..1b80946
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/core/BoundedWorkQueueStrategy.java
@@ -0,0 +1,246 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.core;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.Operation;
+
+/**
+ * A QueueingStrategy that concurrently enqueues a bounded number of operations
+ * to the DirectoryServer work queue. If the maximum number of concurrently
+ * enqueued operations has been reached or if the work queue if full, then the
+ * operation will be executed on the current thread.
+ */
+public class BoundedWorkQueueStrategy implements QueueingStrategy
+{
+
+  /**
+   * The number of concurrently running operations for this
+   * BoundedWorkQueueStrategy.
+   */
+  private final AtomicInteger nbRunningOperations = new AtomicInteger(0);
+  /** Maximum number of concurrent operations. 0 means "unlimited". */
+  private final int maxNbConcurrentOperations;
+
+  /**
+   * Constructor for BoundedWorkQueueStrategy.
+   *
+   * @param maxNbConcurrentOperations
+   *          the maximum number of operations that can be concurrently enqueued
+   *          to the DirectoryServer work queue
+   */
+  public BoundedWorkQueueStrategy(Integer maxNbConcurrentOperations)
+  {
+    if (maxNbConcurrentOperations != null)
+    {
+      this.maxNbConcurrentOperations = maxNbConcurrentOperations;
+    }
+    else
+    {
+      int cpus = Runtime.getRuntime().availableProcessors();
+      int numWorkerThreads =
+          DirectoryServer.getWorkQueue().getNumWorkerThreads();
+      this.maxNbConcurrentOperations =
+          Math.max(cpus, numWorkerThreads * 25 / 100);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void enqueueRequest(final Operation operation)
+      throws DirectoryException
+  {
+    if (!operation.getClientConnection().isConnectionValid())
+    {
+      // do not bother enqueueing
+      return;
+    }
+
+    if (maxNbConcurrentOperations == 0)
+    { // unlimited concurrent operations
+      if (!DirectoryServer.tryEnqueueRequest(operation))
+      { // avoid potential deadlocks by running in the current thread
+        operation.run();
+      }
+    }
+    else if (nbRunningOperations.getAndIncrement() > maxNbConcurrentOperations
+        || !DirectoryServer.tryEnqueueRequest(wrap(operation)))
+    { // avoid potential deadlocks by running in the current thread
+      try
+      {
+        operation.run();
+      }
+      finally
+      {
+        // only decrement when the operation is run synchronously.
+        // Otherwise it'll be decremented twice (once more in the wrapper).
+        nbRunningOperations.decrementAndGet();
+      }
+    }
+  }
+
+  private Operation wrap(final Operation operation)
+  {
+    if (operation instanceof AbandonOperation)
+    {
+      return new AbandonOperationWrapper((AbandonOperation) operation)
+      {
+        @Override
+        public void run()
+        {
+          runWrapped(operation);
+        }
+      };
+    }
+    else if (operation instanceof AddOperation)
+    {
+      return new AddOperationWrapper((AddOperation) operation)
+      {
+        @Override
+        public void run()
+        {
+          runWrapped(operation);
+        }
+      };
+    }
+    else if (operation instanceof BindOperation)
+    {
+      return new BindOperationWrapper((BindOperation) operation)
+      {
+        @Override
+        public void run()
+        {
+          runWrapped(operation);
+        }
+      };
+    }
+    else if (operation instanceof CompareOperation)
+    {
+      return new CompareOperationWrapper((CompareOperation) operation)
+      {
+        @Override
+        public void run()
+        {
+          runWrapped(operation);
+        }
+      };
+    }
+    else if (operation instanceof DeleteOperation)
+    {
+      return new DeleteOperationWrapper((DeleteOperation) operation)
+      {
+        @Override
+        public void run()
+        {
+          runWrapped(operation);
+        }
+      };
+    }
+    else if (operation instanceof ExtendedOperation)
+    {
+      return new ExtendedOperationWrapper((ExtendedOperation) operation)
+      {
+        @Override
+        public void run()
+        {
+          runWrapped(operation);
+        }
+      };
+    }
+    else if (operation instanceof ModifyDNOperation)
+    {
+      return new ModifyDNOperationWrapper((ModifyDNOperation) operation)
+      {
+        @Override
+        public void run()
+        {
+          runWrapped(operation);
+        }
+      };
+    }
+    else if (operation instanceof ModifyOperation)
+    {
+      return new ModifyOperationWrapper((ModifyOperation) operation)
+      {
+        @Override
+        public void run()
+        {
+          runWrapped(operation);
+        }
+      };
+    }
+    else if (operation instanceof SearchOperation)
+    {
+      return new SearchOperationWrapper((SearchOperation) operation)
+      {
+        @Override
+        public void run()
+        {
+          runWrapped(operation);
+        }
+      };
+    }
+    else if (operation instanceof UnbindOperation)
+    {
+      return new UnbindOperationWrapper((UnbindOperation) operation)
+      {
+        @Override
+        public void run()
+        {
+          runWrapped(operation);
+        }
+      };
+    }
+    else
+    {
+      throw new RuntimeException(
+          "Not implemented for " + operation == null ? null : operation
+              .getClass().getName());
+    }
+  }
+
+  /**
+   * Execute the provided operation and decrement the number of currently
+   * running operations after it has finished executing.
+   *
+   * @param the
+   *          operation to execute
+   */
+  private void runWrapped(final Operation operation)
+  {
+    try
+    {
+      operation.run();
+    }
+    finally
+    {
+      nbRunningOperations.decrementAndGet();
+    }
+  }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/core/DirectoryServer.java b/opendj-sdk/opends/src/server/org/opends/server/core/DirectoryServer.java
index b6047f4..385f641 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/core/DirectoryServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/core/DirectoryServer.java
@@ -37,10 +37,8 @@
 import static org.opends.server.schema.SchemaConstants.*;
 import static org.opends.server.util.DynamicConstants.*;
 import static org.opends.server.util.ServerConstants.*;
-import static org.opends.server.util.StaticUtils.getExceptionMessage;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-import static org.opends.server.util.StaticUtils.toLowerCase;
-import static org.opends.server.util.Validator.ensureNotNull;
+import static org.opends.server.util.StaticUtils.*;
+import static org.opends.server.util.Validator.*;
 
 import java.io.*;
 import java.lang.management.ManagementFactory;
@@ -7287,15 +7285,16 @@
 
 
   /**
-   * Adds the provided operation to the work queue so that it will be processed
-   * by one of the worker threads.
+   * Runs all the necessary checks prior to adding an operation to the work
+   * queue. It throws a DirectoryException if one of the check fails.
    *
-   * @param  operation  The operation to be added to the work queue.
-   *
-   * @throws  DirectoryException  If a problem prevents the operation from being
-   *                              added to the queue (e.g., the queue is full).
+   * @param operation
+   *          The operation to be added to the work queue.
+   * @throws DirectoryException
+   *           If a check failed preventing the operation from being added to
+   *           the queue
    */
-  public static void enqueueRequest(Operation operation)
+  private static void checkCanEnqueueRequest(Operation operation)
          throws DirectoryException
   {
     ClientConnection clientConnection = operation.getClientConnection();
@@ -7348,7 +7347,6 @@
          break;
 
       }
-
     }
 
 
@@ -7420,12 +7418,41 @@
           // determination up to the modify operation itself.
       }
     }
+  }
 
-
-
+  /**
+   * Adds the provided operation to the work queue so that it will be processed
+   * by one of the worker threads.
+   *
+   * @param  operation  The operation to be added to the work queue.
+   *
+   * @throws  DirectoryException  If a problem prevents the operation from being
+   *                              added to the queue (e.g., the queue is full).
+   */
+  public static void enqueueRequest(Operation operation)
+      throws DirectoryException
+  {
+    checkCanEnqueueRequest(operation);
     directoryServer.workQueue.submitOperation(operation);
   }
 
+  /**
+   * Tries to add the provided operation to the work queue if not full so that
+   * it will be processed by one of the worker threads.
+   *
+   * @param operation
+   *          The operation to be added to the work queue.
+   * @return true if the operation could be enqueued, false otherwise
+   * @throws DirectoryException
+   *           If a problem prevents the operation from being added to the queue
+   *           (e.g., the queue is full).
+   */
+  public static boolean tryEnqueueRequest(Operation operation)
+      throws DirectoryException
+  {
+    checkCanEnqueueRequest(operation);
+    return directoryServer.workQueue.trySubmitOperation(operation);
+  }
 
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/core/ExtendedOperationWrapper.java b/opendj-sdk/opends/src/server/org/opends/server/core/ExtendedOperationWrapper.java
new file mode 100644
index 0000000..75871ac
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/core/ExtendedOperationWrapper.java
@@ -0,0 +1,94 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.core;
+
+import org.opends.server.types.ByteString;
+
+/**
+ * This abstract class wraps/decorates a given extended operation. This class
+ * will be extended by sub-classes to enhance the functionality of the
+ * ExtendedOperationBasis.
+ */
+public abstract class ExtendedOperationWrapper extends
+    OperationWrapper<ExtendedOperation> implements ExtendedOperation
+{
+
+  /**
+   * Creates a new extended operation wrapper based on the provided extended
+   * operation.
+   *
+   * @param extended
+   *          The extended operation to wrap
+   */
+  public ExtendedOperationWrapper(ExtendedOperation extended)
+  {
+    super(extended);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String getRequestOID()
+  {
+    return getOperation().getRequestOID();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String getResponseOID()
+  {
+    return getOperation().getResponseOID();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public ByteString getRequestValue()
+  {
+    return getOperation().getRequestValue();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public ByteString getResponseValue()
+  {
+    return getOperation().getResponseValue();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void setResponseOID(String responseOID)
+  {
+    getOperation().setResponseOID(responseOID);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void setResponseValue(ByteString responseValue)
+  {
+    getOperation().setResponseValue(responseValue);
+  }
+
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/core/UnbindOperationWrapper.java b/opendj-sdk/opends/src/server/org/opends/server/core/UnbindOperationWrapper.java
new file mode 100644
index 0000000..c56bf82
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/core/UnbindOperationWrapper.java
@@ -0,0 +1,50 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.core;
+
+/**
+ * This abstract class wraps/decorates a given unbind operation. This class will
+ * be extended by sub-classes to enhance the functionality of the
+ * UnbindOperationBasis.
+ */
+public abstract class UnbindOperationWrapper extends
+    OperationWrapper<UnbindOperation> implements UnbindOperation
+{
+
+  /**
+   * Creates a new unbind operation wrapper based on the provided unbind
+   * operation.
+   *
+   * @param unbind
+   *          The unbind operation to wrap
+   */
+  public UnbindOperationWrapper(UnbindOperation unbind)
+  {
+    super(unbind);
+  }
+
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java b/opendj-sdk/opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java
index 5214b64..eaefb78 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java
@@ -29,6 +29,11 @@
 
 
 
+import static org.opends.messages.ConfigMessages.*;
+import static org.opends.messages.CoreMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -52,11 +57,6 @@
 import org.opends.server.types.Operation;
 import org.opends.server.types.ResultCode;
 
-import static org.opends.messages.ConfigMessages.*;
-import static org.opends.messages.CoreMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-
 
 
 /**
@@ -146,7 +146,8 @@
     configuration.addParallelChangeListener(this);
 
     // Get the necessary configuration from the provided entry.
-    numWorkerThreads = getNumWorkerThreads(configuration);
+    numWorkerThreads =
+        computeNumWorkerThreads(configuration.getNumWorkerThreads());
 
     // Create the actual work queue.
     opQueue = new ConcurrentLinkedQueue<Operation>();
@@ -286,6 +287,14 @@
     opsSubmitted.incrementAndGet();
   }
 
+  /** {@inheritDoc} */
+  @Override
+  public boolean trySubmitOperation(Operation operation)
+      throws DirectoryException
+  {
+    submitOperation(operation);
+    return true;
+  }
 
 
   /**
@@ -515,7 +524,8 @@
                                  ParallelWorkQueueCfg configuration)
   {
     ArrayList<Message> resultMessages = new ArrayList<Message>();
-    int newNumThreads  = getNumWorkerThreads(configuration);
+    int newNumThreads =
+        computeNumWorkerThreads(configuration.getNumWorkerThreads());
 
     // Apply a change to the number of worker threads if appropriate.
     int currentThreads = workerThreads.size();
@@ -583,25 +593,14 @@
     }
   }
 
-
-
-  // Determine the number of worker threads.
-  private int getNumWorkerThreads(ParallelWorkQueueCfg configuration)
+  /**
+   * Return the number of worker threads used by this WorkQueue.
+   *
+   * @return the number of worker threads used by this WorkQueue
+   */
+  @Override
+  public int getNumWorkerThreads()
   {
-    if (configuration.getNumWorkerThreads() == null)
-    {
-      // Automatically choose based on the number of processors.
-      int cpus = Runtime.getRuntime().availableProcessors();
-      int value = Math.max(24, cpus * 2);
-
-      Message message = INFO_ERGONOMIC_SIZING_OF_WORKER_THREAD_POOL.get(value);
-      logError(message);
-
-      return value;
-    }
-    else
-    {
-      return configuration.getNumWorkerThreads();
-    }
+    return this.numWorkerThreads;
   }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java b/opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
index 4f79854..5d5fecd 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
@@ -51,7 +51,13 @@
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.monitors.TraditionalWorkQueueMonitor;
-import org.opends.server.types.*;
+import org.opends.server.types.CancelRequest;
+import org.opends.server.types.ConfigChangeResult;
+import org.opends.server.types.DebugLogLevel;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.InitializationException;
+import org.opends.server.types.Operation;
+import org.opends.server.types.ResultCode;
 
 
 
@@ -176,7 +182,8 @@
       configuration.addTraditionalChangeListener(this);
 
       // Get the necessary configuration from the provided entry.
-      numWorkerThreads = getNumWorkerThreads(configuration);
+      numWorkerThreads =
+          computeNumWorkerThreads(configuration.getNumWorkerThreads());
       maxCapacity = configuration.getMaxWorkQueueCapacity();
 
       // Create the actual work queue.
@@ -334,6 +341,32 @@
   @Override
   public void submitOperation(Operation operation) throws DirectoryException
   {
+    submitOperation(operation, isBlocking);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean trySubmitOperation(Operation operation)
+      throws DirectoryException
+  {
+    try
+    {
+      submitOperation(operation, false);
+      return true;
+    }
+    catch (DirectoryException e)
+    {
+      if (ResultCode.BUSY == e.getResultCode())
+      {
+        return false;
+      }
+      throw e;
+    }
+  }
+
+  private void submitOperation(Operation operation,
+      boolean blockEnqueuingWhenFull) throws DirectoryException
+  {
     queueReadLock.lock();
     try
     {
@@ -343,7 +376,7 @@
         throw new DirectoryException(ResultCode.UNAVAILABLE, message);
       }
 
-      if (isBlocking)
+      if (blockEnqueuingWhenFull)
       {
         try
         {
@@ -659,7 +692,8 @@
       TraditionalWorkQueueCfg configuration)
   {
     ArrayList<Message> resultMessages = new ArrayList<Message>();
-    int newNumThreads = getNumWorkerThreads(configuration);
+    int newNumThreads =
+        computeNumWorkerThreads(configuration.getNumWorkerThreads());
     int newMaxCapacity = configuration.getMaxWorkQueueCapacity();
 
     // Apply a change to the number of worker threads if appropriate.
@@ -808,25 +842,14 @@
     }
   }
 
-
-
-  // Determine the number of worker threads.
-  private int getNumWorkerThreads(TraditionalWorkQueueCfg configuration)
+  /**
+   * Return the number of worker threads used by this WorkQueue.
+   *
+   * @return the number of worker threads used by this WorkQueue
+   */
+  @Override
+  public int getNumWorkerThreads()
   {
-    if (configuration.getNumWorkerThreads() == null)
-    {
-      // Automatically choose based on the number of processors.
-      int cpus = Runtime.getRuntime().availableProcessors();
-      int value = Math.max(24, cpus * 2);
-
-      Message message = INFO_ERGONOMIC_SIZING_OF_WORKER_THREAD_POOL.get(value);
-      logError(message);
-
-      return value;
-    }
-    else
-    {
-      return configuration.getNumWorkerThreads();
-    }
+    return this.numWorkerThreads;
   }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/protocols/http/SdkConnectionAdapter.java b/opendj-sdk/opends/src/server/org/opends/server/protocols/http/SdkConnectionAdapter.java
index e878683..02f16b5 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/protocols/http/SdkConnectionAdapter.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/protocols/http/SdkConnectionAdapter.java
@@ -59,6 +59,7 @@
 import org.opends.server.core.AbandonOperationBasis;
 import org.opends.server.core.AddOperationBasis;
 import org.opends.server.core.BindOperationBasis;
+import org.opends.server.core.BoundedWorkQueueStrategy;
 import org.opends.server.core.CompareOperationBasis;
 import org.opends.server.core.DeleteOperationBasis;
 import org.opends.server.core.ExtendedOperationBasis;
@@ -67,7 +68,6 @@
 import org.opends.server.core.QueueingStrategy;
 import org.opends.server.core.SearchOperationBasis;
 import org.opends.server.core.UnbindOperationBasis;
-import org.opends.server.core.WorkQueueStrategy;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.types.AuthenticationInfo;
 import org.opends.server.types.ByteString;
@@ -98,7 +98,7 @@
   private AtomicInteger nextMessageID = new AtomicInteger(0);
 
   /** The queueing strategy used for this connection. */
-  private QueueingStrategy queueingStrategy = new WorkQueueStrategy();
+  private final QueueingStrategy queueingStrategy;
 
   /**
    * Whether this connection has been closed by calling {@link #close()} or
@@ -115,6 +115,9 @@
   public SdkConnectionAdapter(HTTPClientConnection clientConnection)
   {
     this.clientConnection = clientConnection;
+    this.queueingStrategy =
+        new BoundedWorkQueueStrategy(clientConnection.getConnectionHandler()
+            .getCurrentConfig().getMaxConcurrentOpsPerConnection());
   }
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -269,6 +272,16 @@
     return enqueueOperation(operation, resultHandler);
   }
 
+  /**
+   * Return the queueing strategy used by this connection.
+   *
+   * @return The queueing strategy used by this connection
+   */
+  public QueueingStrategy getQueueingStrategy()
+  {
+    return queueingStrategy;
+  }
+
   /** {@inheritDoc} */
   @Override
   public boolean isClosed()
diff --git a/opendj-sdk/opends/src/server/org/opends/server/protocols/internal/InternalClientConnection.java b/opendj-sdk/opends/src/server/org/opends/server/protocols/internal/InternalClientConnection.java
index 4a424e3..4414f35 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/protocols/internal/InternalClientConnection.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/protocols/internal/InternalClientConnection.java
@@ -617,7 +617,13 @@
     // time limit for internal client connections.
   }
 
-
+  /** {@inheritDoc} */
+  @Override
+  public boolean isConnectionValid()
+  {
+    // This connection is always valid
+    return true;
+  }
 
   /**
    * Indicates whether this client connection is currently using a
diff --git a/opendj-sdk/opends/src/server/org/opends/server/protocols/jmx/JmxClientConnection.java b/opendj-sdk/opends/src/server/org/opends/server/protocols/jmx/JmxClientConnection.java
index 875d03c..3fc3fa7 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/protocols/jmx/JmxClientConnection.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/protocols/jmx/JmxClientConnection.java
@@ -88,7 +88,7 @@
   /**
    * Indicate that the disconnect process is started.
    */
-  private Boolean disconnectStarted = false;
+  private boolean disconnectStarted = false;
 
 
   /**
@@ -357,7 +357,12 @@
     return null;
   }
 
-
+  /** {@inheritDoc} */
+  @Override
+  public boolean isConnectionValid()
+  {
+    return !disconnectStarted;
+  }
 
   /**
    * Indicates whether this client connection is currently using a secure
diff --git a/opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java b/opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
index b88a2c8..b3c78cd 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
@@ -664,7 +664,12 @@
     return clientChannel.socket().getLocalAddress();
   }
 
-
+  /** {@inheritDoc} */
+  @Override
+  public boolean isConnectionValid()
+  {
+    return this.connectionValid;
+  }
 
   /**
    * Indicates whether this client connection is currently using a
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/core/networkgroups/MockClientConnection.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/core/networkgroups/MockClientConnection.java
index a338b40..ce39b4b 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/core/networkgroups/MockClientConnection.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/core/networkgroups/MockClientConnection.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2009 Sun Microsystems, Inc.
+ *      Portions Copyright 2013 ForgeRock AS
  */
 package org.opends.server.core.networkgroups;
 
@@ -56,6 +57,7 @@
 /**
  * A mock connection for connection criteria testing.
  */
+@SuppressWarnings("javadoc")
 public final class MockClientConnection extends ClientConnection
 {
   private final int clientPort;
@@ -68,7 +70,7 @@
 
   /**
    * Creates a new mock client connection.
-   *
+   * 
    * @param clientPort
    *          The client port.
    * @param isSecure
@@ -76,7 +78,7 @@
    * @param bindDN
    *          The client bind DN.
    * @param authMethod
-   *          The client authentication mathod.
+   *          The client authentication method.
    * @throws Exception
    *           If an unexpected exception occurred.
    */
@@ -271,7 +273,12 @@
     return 0;
   }
 
-
+  @Override
+  public boolean isConnectionValid()
+  {
+    // This connection is always valid
+    return true;
+  }
 
   @Override
   public boolean isSecure()

--
Gitblit v1.10.0