From b96394fab56fb19ed17ddcff418862c466df2f38 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 02 Jan 2014 10:14:22 +0000
Subject: [PATCH] Added tests for BoundedWorkQueueStrategy (finally).

---
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/core/BoundedWorkQueueStrategyTest.java |  181 +++++++++++++++++++++++++++++++++++++++++++++
 opendj-sdk/opends/src/server/org/opends/server/core/BoundedWorkQueueStrategy.java                             |   38 ++++++++-
 2 files changed, 213 insertions(+), 6 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/core/BoundedWorkQueueStrategy.java b/opendj-sdk/opends/src/server/org/opends/server/core/BoundedWorkQueueStrategy.java
index 573c854..12e7403 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/core/BoundedWorkQueueStrategy.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/core/BoundedWorkQueueStrategy.java
@@ -21,7 +21,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2013 ForgeRock AS
+ *      Copyright 2013-2014 ForgeRock AS
  */
 package org.opends.server.core;
 
@@ -63,13 +63,23 @@
     else
     {
       int cpus = Runtime.getRuntime().availableProcessors();
-      int numWorkerThreads =
-          DirectoryServer.getWorkQueue().getNumWorkerThreads();
       this.maxNbConcurrentOperations =
-          Math.max(cpus, numWorkerThreads * 25 / 100);
+          Math.max(cpus, getNumWorkerThreads() * 25 / 100);
     }
   }
 
+  /**
+   * Return the maximum number of worker threads that can be used by the
+   * WorkQueue (The WorkQueue could have a thread pool which adjusts its size).
+   *
+   * @return the maximum number of worker threads that can be used by the
+   *         WorkQueue
+   */
+  protected int getNumWorkerThreads()
+  {
+    return DirectoryServer.getWorkQueue().getNumWorkerThreads();
+  }
+
   /** {@inheritDoc} */
   @Override
   public void enqueueRequest(final Operation operation)
@@ -83,13 +93,13 @@
 
     if (maxNbConcurrentOperations == 0)
     { // unlimited concurrent operations
-      if (!DirectoryServer.tryEnqueueRequest(operation))
+      if (!tryEnqueueRequest(operation))
       { // avoid potential deadlocks by running in the current thread
         operation.run();
       }
     }
     else if (nbRunningOperations.getAndIncrement() > maxNbConcurrentOperations
-        || !DirectoryServer.tryEnqueueRequest(wrap(operation)))
+        || !tryEnqueueRequest(wrap(operation)))
     { // avoid potential deadlocks by running in the current thread
       try
       {
@@ -104,6 +114,22 @@
     }
   }
 
+  /**
+   * Tries to add the provided operation to the work queue if not full so that
+   * it will be processed by one of the worker threads.
+   *
+   * @param op
+   *          The operation to be added to the work queue.
+   * @return true if the operation could be enqueued, false otherwise
+   * @throws DirectoryException
+   *           If a problem prevents the operation from being added to the queue
+   *           (e.g., the queue is full).
+   */
+  protected boolean tryEnqueueRequest(Operation op) throws DirectoryException
+  {
+    return DirectoryServer.tryEnqueueRequest(op);
+  }
+
   private Operation wrap(final Operation operation)
   {
     if (operation instanceof AbandonOperation)
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/core/BoundedWorkQueueStrategyTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/core/BoundedWorkQueueStrategyTest.java
new file mode 100644
index 0000000..1767719
--- /dev/null
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/core/BoundedWorkQueueStrategyTest.java
@@ -0,0 +1,181 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.core;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.api.ClientConnection;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.Operation;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+@SuppressWarnings("javadoc")
+public class BoundedWorkQueueStrategyTest extends DirectoryServerTestCase
+{
+
+  @SuppressWarnings("unchecked")
+  private static final List<Class<? extends Operation>> ALL_OPERATION_CLASSES =
+      Arrays.asList(AbandonOperation.class, AddOperation.class,
+          BindOperation.class, CompareOperation.class, DeleteOperation.class,
+          ExtendedOperation.class, ModifyDNOperation.class,
+          ModifyOperation.class, SearchOperation.class, UnbindOperation.class);
+
+  /** Overrides the use of undesired static methods. */
+  private final class BoundedWorkQueueStrategyForTest extends
+      BoundedWorkQueueStrategy
+  {
+
+    private boolean enqueueRequestSucceeds;
+
+    private BoundedWorkQueueStrategyForTest(Integer maxNbConcurrentOperations,
+        boolean enqueueRequestSucceeds)
+    {
+      super(maxNbConcurrentOperations);
+      this.enqueueRequestSucceeds = enqueueRequestSucceeds;
+    }
+
+    @Override
+    protected boolean tryEnqueueRequest(Operation op) throws DirectoryException
+    {
+      return enqueueRequestSucceeds;
+    }
+
+    @Override
+    protected int getNumWorkerThreads()
+    {
+      return 1;
+    }
+  }
+
+  private Operation getMockedOperation(
+      Class<? extends Operation> operationClass, boolean isConnectionValid)
+  {
+    final Operation operation = mock(operationClass);
+    final ClientConnection connection = mock(ClientConnection.class);
+    when(operation.getClientConnection()).thenReturn(connection);
+    when(connection.isConnectionValid()).thenReturn(isConnectionValid);
+    return operation;
+  }
+
+  @Test
+  public void doNotEnqueueOperationWithInvalidConnection() throws Exception
+  {
+    final BoundedWorkQueueStrategy strategy =
+        new BoundedWorkQueueStrategyForTest(1, true);
+    final Operation operation =
+        getMockedOperation(SearchOperation.class, false);
+    strategy.enqueueRequest(operation);
+
+    verify(operation, times(1)).getClientConnection();
+    verifyNoMoreInteractions(operation);
+  }
+
+  @DataProvider
+  public Object[][] allOperationClasses() throws Exception
+  {
+    final Object[][] results = new Object[ALL_OPERATION_CLASSES.size() * 2][];
+    for (int i = 0; i < ALL_OPERATION_CLASSES.size(); i++)
+    {
+      Class<? extends Operation> operationClass = ALL_OPERATION_CLASSES.get(i);
+      results[i * 2] = new Object[] { operationClass, true };
+      results[i * 2 + 1] = new Object[] { operationClass, false };
+    }
+    return results;
+  }
+
+  @Test(expectedExceptions = RuntimeException.class)
+  // FIXME use expectedExceptionsMessageRegExp = "Not implemented for.*"
+  public void enqueueUnhandledOperationClass() throws Exception
+  {
+    enqueueRequestWithConcurrency(Operation.class, false);
+  }
+
+  @Test(dataProvider = "allOperationClasses")
+  public void enqueueRequestWithConcurrency(
+      Class<? extends Operation> operationClass, boolean enqueueRequestSucceeds)
+      throws Exception
+  {
+    final BoundedWorkQueueStrategy strategy =
+        new BoundedWorkQueueStrategyForTest(1, enqueueRequestSucceeds);
+    final Operation operation = getMockedOperation(operationClass, true);
+    strategy.enqueueRequest(operation);
+
+    verify(operation, times(1)).getClientConnection();
+    if (!enqueueRequestSucceeds)
+    { // run in current thread
+      verify(operation, times(1)).run();
+    }
+    verifyNoMoreInteractions(operation);
+  }
+
+  @Test
+  public void enqueueRequestWithLimitedConcurrency()
+      throws Exception
+  {
+    final Class<? extends Operation> operationClass = SearchOperation.class;
+    final BoundedWorkQueueStrategy strategy =
+        new BoundedWorkQueueStrategyForTest(1, true);
+    final Operation operation1 = getMockedOperation(operationClass, true);
+    final Operation operation2 = getMockedOperation(operationClass, true);
+    final Operation operation3 = getMockedOperation(operationClass, true);
+    strategy.enqueueRequest(operation1);
+    strategy.enqueueRequest(operation2);
+    strategy.enqueueRequest(operation3);
+
+    verify(operation1, times(1)).getClientConnection();
+    verifyNoMoreInteractions(operation1);
+    verify(operation2, times(1)).getClientConnection();
+    verifyNoMoreInteractions(operation2);
+
+    verify(operation3, times(1)).getClientConnection();
+    // to many concurrent operations => run in current thread
+    verify(operation3, times(1)).run();
+    verifyNoMoreInteractions(operation3);
+  }
+
+  @Test(dataProvider = "allOperationClasses")
+  public void enqueueRequestNoConcurrency(
+      Class<? extends Operation> operationClass, boolean enqueueRequestSucceeds)
+      throws Exception
+  {
+    final BoundedWorkQueueStrategy strategy =
+        new BoundedWorkQueueStrategyForTest(0, enqueueRequestSucceeds);
+    final Operation operation = getMockedOperation(operationClass, true);
+    strategy.enqueueRequest(operation);
+
+    verify(operation, times(1)).getClientConnection();
+    if (!enqueueRequestSucceeds)
+    { // run in current thread
+      verify(operation, times(1)).run();
+    }
+    verifyNoMoreInteractions(operation);
+  }
+
+}

--
Gitblit v1.10.0