From b96394fab56fb19ed17ddcff418862c466df2f38 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 02 Jan 2014 10:14:22 +0000
Subject: [PATCH] Added tests for BoundedWorkQueueStrategy (finally).
---
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/core/BoundedWorkQueueStrategyTest.java | 181 +++++++++++++++++++++++++++++++++++++++++++++
opendj-sdk/opends/src/server/org/opends/server/core/BoundedWorkQueueStrategy.java | 38 ++++++++-
2 files changed, 213 insertions(+), 6 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/core/BoundedWorkQueueStrategy.java b/opendj-sdk/opends/src/server/org/opends/server/core/BoundedWorkQueueStrategy.java
index 573c854..12e7403 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/core/BoundedWorkQueueStrategy.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/core/BoundedWorkQueueStrategy.java
@@ -21,7 +21,7 @@
* CDDL HEADER END
*
*
- * Copyright 2013 ForgeRock AS
+ * Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.core;
@@ -63,13 +63,23 @@
else
{
int cpus = Runtime.getRuntime().availableProcessors();
- int numWorkerThreads =
- DirectoryServer.getWorkQueue().getNumWorkerThreads();
this.maxNbConcurrentOperations =
- Math.max(cpus, numWorkerThreads * 25 / 100);
+ Math.max(cpus, getNumWorkerThreads() * 25 / 100);
}
}
+ /**
+ * Return the maximum number of worker threads that can be used by the
+ * WorkQueue (The WorkQueue could have a thread pool which adjusts its size).
+ *
+ * @return the maximum number of worker threads that can be used by the
+ * WorkQueue
+ */
+ protected int getNumWorkerThreads()
+ {
+ return DirectoryServer.getWorkQueue().getNumWorkerThreads();
+ }
+
/** {@inheritDoc} */
@Override
public void enqueueRequest(final Operation operation)
@@ -83,13 +93,13 @@
if (maxNbConcurrentOperations == 0)
{ // unlimited concurrent operations
- if (!DirectoryServer.tryEnqueueRequest(operation))
+ if (!tryEnqueueRequest(operation))
{ // avoid potential deadlocks by running in the current thread
operation.run();
}
}
else if (nbRunningOperations.getAndIncrement() > maxNbConcurrentOperations
- || !DirectoryServer.tryEnqueueRequest(wrap(operation)))
+ || !tryEnqueueRequest(wrap(operation)))
{ // avoid potential deadlocks by running in the current thread
try
{
@@ -104,6 +114,22 @@
}
}
+ /**
+ * Tries to add the provided operation to the work queue if not full so that
+ * it will be processed by one of the worker threads.
+ *
+ * @param op
+ * The operation to be added to the work queue.
+ * @return true if the operation could be enqueued, false otherwise
+ * @throws DirectoryException
+ * If a problem prevents the operation from being added to the queue
+ * (e.g., the queue is full).
+ */
+ protected boolean tryEnqueueRequest(Operation op) throws DirectoryException
+ {
+ return DirectoryServer.tryEnqueueRequest(op);
+ }
+
private Operation wrap(final Operation operation)
{
if (operation instanceof AbandonOperation)
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/core/BoundedWorkQueueStrategyTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/core/BoundedWorkQueueStrategyTest.java
new file mode 100644
index 0000000..1767719
--- /dev/null
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/core/BoundedWorkQueueStrategyTest.java
@@ -0,0 +1,181 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.core;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.api.ClientConnection;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.Operation;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+@SuppressWarnings("javadoc")
+public class BoundedWorkQueueStrategyTest extends DirectoryServerTestCase
+{
+
+ @SuppressWarnings("unchecked")
+ private static final List<Class<? extends Operation>> ALL_OPERATION_CLASSES =
+ Arrays.asList(AbandonOperation.class, AddOperation.class,
+ BindOperation.class, CompareOperation.class, DeleteOperation.class,
+ ExtendedOperation.class, ModifyDNOperation.class,
+ ModifyOperation.class, SearchOperation.class, UnbindOperation.class);
+
+ /** Overrides the use of undesired static methods. */
+ private final class BoundedWorkQueueStrategyForTest extends
+ BoundedWorkQueueStrategy
+ {
+
+ private boolean enqueueRequestSucceeds;
+
+ private BoundedWorkQueueStrategyForTest(Integer maxNbConcurrentOperations,
+ boolean enqueueRequestSucceeds)
+ {
+ super(maxNbConcurrentOperations);
+ this.enqueueRequestSucceeds = enqueueRequestSucceeds;
+ }
+
+ @Override
+ protected boolean tryEnqueueRequest(Operation op) throws DirectoryException
+ {
+ return enqueueRequestSucceeds;
+ }
+
+ @Override
+ protected int getNumWorkerThreads()
+ {
+ return 1;
+ }
+ }
+
+ private Operation getMockedOperation(
+ Class<? extends Operation> operationClass, boolean isConnectionValid)
+ {
+ final Operation operation = mock(operationClass);
+ final ClientConnection connection = mock(ClientConnection.class);
+ when(operation.getClientConnection()).thenReturn(connection);
+ when(connection.isConnectionValid()).thenReturn(isConnectionValid);
+ return operation;
+ }
+
+ @Test
+ public void doNotEnqueueOperationWithInvalidConnection() throws Exception
+ {
+ final BoundedWorkQueueStrategy strategy =
+ new BoundedWorkQueueStrategyForTest(1, true);
+ final Operation operation =
+ getMockedOperation(SearchOperation.class, false);
+ strategy.enqueueRequest(operation);
+
+ verify(operation, times(1)).getClientConnection();
+ verifyNoMoreInteractions(operation);
+ }
+
+ @DataProvider
+ public Object[][] allOperationClasses() throws Exception
+ {
+ final Object[][] results = new Object[ALL_OPERATION_CLASSES.size() * 2][];
+ for (int i = 0; i < ALL_OPERATION_CLASSES.size(); i++)
+ {
+ Class<? extends Operation> operationClass = ALL_OPERATION_CLASSES.get(i);
+ results[i * 2] = new Object[] { operationClass, true };
+ results[i * 2 + 1] = new Object[] { operationClass, false };
+ }
+ return results;
+ }
+
+ @Test(expectedExceptions = RuntimeException.class)
+ // FIXME use expectedExceptionsMessageRegExp = "Not implemented for.*"
+ public void enqueueUnhandledOperationClass() throws Exception
+ {
+ enqueueRequestWithConcurrency(Operation.class, false);
+ }
+
+ @Test(dataProvider = "allOperationClasses")
+ public void enqueueRequestWithConcurrency(
+ Class<? extends Operation> operationClass, boolean enqueueRequestSucceeds)
+ throws Exception
+ {
+ final BoundedWorkQueueStrategy strategy =
+ new BoundedWorkQueueStrategyForTest(1, enqueueRequestSucceeds);
+ final Operation operation = getMockedOperation(operationClass, true);
+ strategy.enqueueRequest(operation);
+
+ verify(operation, times(1)).getClientConnection();
+ if (!enqueueRequestSucceeds)
+ { // run in current thread
+ verify(operation, times(1)).run();
+ }
+ verifyNoMoreInteractions(operation);
+ }
+
+ @Test
+ public void enqueueRequestWithLimitedConcurrency()
+ throws Exception
+ {
+ final Class<? extends Operation> operationClass = SearchOperation.class;
+ final BoundedWorkQueueStrategy strategy =
+ new BoundedWorkQueueStrategyForTest(1, true);
+ final Operation operation1 = getMockedOperation(operationClass, true);
+ final Operation operation2 = getMockedOperation(operationClass, true);
+ final Operation operation3 = getMockedOperation(operationClass, true);
+ strategy.enqueueRequest(operation1);
+ strategy.enqueueRequest(operation2);
+ strategy.enqueueRequest(operation3);
+
+ verify(operation1, times(1)).getClientConnection();
+ verifyNoMoreInteractions(operation1);
+ verify(operation2, times(1)).getClientConnection();
+ verifyNoMoreInteractions(operation2);
+
+ verify(operation3, times(1)).getClientConnection();
+ // to many concurrent operations => run in current thread
+ verify(operation3, times(1)).run();
+ verifyNoMoreInteractions(operation3);
+ }
+
+ @Test(dataProvider = "allOperationClasses")
+ public void enqueueRequestNoConcurrency(
+ Class<? extends Operation> operationClass, boolean enqueueRequestSucceeds)
+ throws Exception
+ {
+ final BoundedWorkQueueStrategy strategy =
+ new BoundedWorkQueueStrategyForTest(0, enqueueRequestSucceeds);
+ final Operation operation = getMockedOperation(operationClass, true);
+ strategy.enqueueRequest(operation);
+
+ verify(operation, times(1)).getClientConnection();
+ if (!enqueueRequestSucceeds)
+ { // run in current thread
+ verify(operation, times(1)).run();
+ }
+ verifyNoMoreInteractions(operation);
+ }
+
+}
--
Gitblit v1.10.0