opendj-sdk/opends/src/server/org/opends/server/api/WorkQueue.java
@@ -34,6 +34,8 @@ import org.opends.server.types.DirectoryException; import org.opends.server.types.InitializationException; import static org.opends.server.loggers.Debug.*; /** @@ -49,6 +51,14 @@ public abstract class WorkQueue { /** * The fully-qualified name of this class for debugging purposes. */ private static final String CLASS_NAME = "org.opends.server.api.WorkQueue"; /** * Initializes this work queue based on the information in the * provided configuration entry. * @@ -96,5 +106,70 @@ */ public abstract void submitOperation(Operation operation) throws DirectoryException; /** * Indicates whether the work queue is currently processing any * requests. Note that this is a point-in-time determination, and * if any component of the server wishes to depend on a quiescent * state then it should use some external mechanism to ensure that * no other requests are submitted to the queue. * * @return {@code true} if the work queue is currently idle, or * {@code false} if it is being used to process one or more * operations. */ public abstract boolean isIdle(); /** * Waits for the work queue to become idle before returning. Note * that this is a point-in-time determination, and if any component * of the server wishes to depend on a quiescent state then it * should use some external mechanism to ensure that no other * requests are submitted to the queue. * * @param timeLimit The maximum length of time in milliseconds * that this method should wait for the queue to * become idle before giving up. A time limit * that is less than or equal to zero indicates * that there should not be a time limit. * * @return {@code true} if the work queue is idle at the time that * this method returns, or {@code false} if the wait time * limit was reached before the server became idle. 
*/
  public boolean waitUntilIdle(long timeLimit)
  {
    assert debugEnter(CLASS_NAME, "waitUntilIdle",
                      String.valueOf(timeLimit));

    // Work out the wall-clock deadline.  A non-positive limit means "no
    // limit".  A positive limit so large that adding it to the current time
    // would overflow a long is also treated as "no limit"; otherwise the sum
    // would wrap to a negative deadline and the loop below would never run.
    long stopWaitingTime = Long.MAX_VALUE;
    if (timeLimit > 0)
    {
      long now = System.currentTimeMillis();
      if (timeLimit < (Long.MAX_VALUE - now))
      {
        stopWaitingTime = now + timeLimit;
      }
    }

    // Poll isIdle() roughly once per millisecond until the queue is idle or
    // the deadline passes.  An interrupt does not cut the wait short, but it
    // is remembered and re-asserted on the calling thread before returning so
    // that the caller can still observe it.
    boolean interrupted = false;
    try
    {
      while (System.currentTimeMillis() < stopWaitingTime)
      {
        if (isIdle())
        {
          return true;
        }

        try
        {
          Thread.sleep(1);
        }
        catch (InterruptedException ie)
        {
          // Thread.sleep cleared the interrupt status when it threw, so the
          // next sleep will block normally; just record that it happened.
          interrupted = true;
        }
      }

      // The time limit was reached without observing an idle queue.
      return false;
    }
    finally
    {
      if (interrupted)
      {
        Thread.currentThread().interrupt();
      }
    }
  }
}
opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
@@ -142,18 +142,7 @@ /** * Initializes this work queue based on the information in the provided * configuration entry. * * @param configEntry The configuration entry that contains the information * to use to initialize this work queue. * * @throws ConfigException If the provided configuration entry does not have * a valid work queue configuration. * * @throws InitializationException If a problem occurs during initialization * that is not related to the server * configuration. * {@inheritDoc} */ public void initializeWorkQueue(ConfigEntry configEntry) throws ConfigException, InitializationException @@ -315,13 +304,7 @@ /** * Performs any necessary finalization for this work queue, * including ensuring that all active operations are interrupted or * will be allowed to complete, and that all pending operations will * be cancelled. * * @param reason The human-readable reason that the work queue is being * shut down. * {@inheritDoc} */ public void finalizeWorkQueue(String reason) { @@ -1089,5 +1072,39 @@ return new ConfigChangeResult(ResultCode.SUCCESS, false, resultMessages); } /** * {@inheritDoc} */ public boolean isIdle() { assert debugEnter(CLASS_NAME, "isIdle"); if (opQueue.size() > 0) { return false; } queueLock.lock(); try { for (TraditionalWorkerThread t : workerThreads) { if (t.isActive()) { return false; } } return true; } finally { queueLock.unlock(); } } } opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkerThread.java
@@ -125,6 +125,24 @@
  /**
   * Reports whether this worker thread is busy with a request right now.  The
   * answer is only a point-in-time snapshot; a caller that needs a reliable
   * result must arrange by some external means that no new requests are
   * enqueued while it checks.
   *
   * @return {@code true} if this thread is alive and currently holds an
   *         operation, or {@code false} if it is idle.
   */
  public boolean isActive()
  {
    assert debugEnter(CLASS_NAME, "isActive");

    // A thread that is not alive cannot be processing anything.
    if (! isAlive())
    {
      return false;
    }

    // Alive: active exactly when an operation is currently assigned.
    return (operation != null);
  }



  /**
   * Operates in a loop, retrieving the next request from the work queue,
   * processing it, and then going back to the queue for more.
   */
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/extensions/TraditionalWorkQueueTestCase.java
@@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import org.testng.annotations.BeforeClass; @@ -39,13 +40,19 @@ import org.opends.server.config.ConfigEntry; import org.opends.server.core.DirectoryServer; import org.opends.server.core.ModifyOperation; import org.opends.server.core.SearchOperation; import org.opends.server.plugins.DelayPreOpPlugin; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.types.Attribute; import org.opends.server.types.Modification; import org.opends.server.types.ModificationType; import org.opends.server.types.Control; import org.opends.server.types.DereferencePolicy; import org.opends.server.types.DN; import org.opends.server.tools.LDAPSearch; import org.opends.server.types.Modification; import org.opends.server.types.ModificationType; import org.opends.server.types.ResultCode; import org.opends.server.types.SearchFilter; import org.opends.server.types.SearchScope; import static org.testng.Assert.*; @@ -128,5 +135,117 @@ assertEquals(LDAPSearch.mainSearch(args, false, null, System.err), 0); } } /** * Tests the {@code WorkQueue.waitUntilIdle()} method for a case in which the * work queue should already be idle. * * @throws Exception If an unexpected problem occurs. */ @Test(groups = { "slow" }) public void testWaitUntilIdleNoOpsInProgress() throws Exception { Thread.sleep(5000); long startTime = System.currentTimeMillis(); assertTrue(DirectoryServer.getWorkQueue().waitUntilIdle(10000)); long stopTime = System.currentTimeMillis(); assertTrue((stopTime - startTime) <= 1000); } /** * Tests the {@code WorkQueue.waitUntilIdle()} method for a case in which the * work queue should already be idle and no timeout is given. * * @throws Exception If an unexpected problem occurs. 
*/
  @Test(groups = { "slow" }, timeOut=10000)
  public void testWaitUntilIdleNoOpsInProgressNoTimeout()
         throws Exception
  {
    // Pause before measuring.
    Thread.sleep(5000);

    // A limit of zero asks waitUntilIdle to wait without a time limit; the
    // TestNG timeOut on this method bounds the test if it never returns.
    final long begin = System.currentTimeMillis();
    final boolean idle = DirectoryServer.getWorkQueue().waitUntilIdle(0);
    final long elapsed = System.currentTimeMillis() - begin;

    assertTrue(idle);
    assertTrue(elapsed <= 1000);
  }



  /**
   * Verifies that {@code WorkQueue.waitUntilIdle()} blocks while a search
   * carrying a five-second delay control is being processed, and then returns
   * {@code true} within the ten-second limit.
   *
   * @throws  Exception  If an unexpected problem occurs.
   */
  @Test(groups = { "slow" })
  public void testWaitUntilIdleSlowOpInProgress()
         throws Exception
  {
    TestCaseUtils.initializeTestBackend(true);

    // Build a base-level search on o=test that carries a 5000 ms delay control.
    List<Control> delayControls =
         DelayPreOpPlugin.createDelayControlList(5000);
    SearchFilter matchAll =
         SearchFilter.createFilterFromString("(objectClass=*)");
    LinkedHashSet<String> noAttrs = new LinkedHashSet<String>();

    InternalClientConnection conn =
         InternalClientConnection.getRootConnection();
    SearchOperation slowSearch =
         new SearchOperation(conn, conn.nextOperationID(),
                             conn.nextMessageID(), delayControls,
                             DN.decode("o=test"), SearchScope.BASE_OBJECT,
                             DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0,
                             false, matchAll, noAttrs);
    DirectoryServer.getWorkQueue().submitOperation(slowSearch);

    final long begin = System.currentTimeMillis();
    final boolean idle = DirectoryServer.getWorkQueue().waitUntilIdle(10000);
    final long elapsed = System.currentTimeMillis() - begin;

    assertTrue(idle);
    assertTrue(elapsed >= 4000);
  }



  /**
   * Tests the {@code WorkQueue.waitUntilIdle()} method for a case in which the
   * time limit expires while a slow operation is still in progress, so the
   * method is expected to give up and return {@code false}.
   *
   * @throws Exception If an unexpected problem occurs.
*/
  @Test(groups = { "slow" })
  public void testWaitUntilTimeoutWithIdleSlowOpInProgress()
         throws Exception
  {
    TestCaseUtils.initializeTestBackend(true);

    // Build a base-level search on o=test that carries a 5000 ms delay
    // control, which is longer than the one-second wait used below.
    List<Control> delayControls =
         DelayPreOpPlugin.createDelayControlList(5000);
    SearchFilter matchAll =
         SearchFilter.createFilterFromString("(objectClass=*)");
    LinkedHashSet<String> noAttrs = new LinkedHashSet<String>();

    InternalClientConnection conn =
         InternalClientConnection.getRootConnection();
    SearchOperation slowSearch =
         new SearchOperation(conn, conn.nextOperationID(),
                             conn.nextMessageID(), delayControls,
                             DN.decode("o=test"), SearchScope.BASE_OBJECT,
                             DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0,
                             false, matchAll, noAttrs);
    DirectoryServer.getWorkQueue().submitOperation(slowSearch);

    // The wait should give up after about one second and report "not idle".
    final long begin = System.currentTimeMillis();
    final boolean idle = DirectoryServer.getWorkQueue().waitUntilIdle(1000);
    final long elapsed = System.currentTimeMillis() - begin;

    assertFalse(idle);
    assertTrue(elapsed <= 2000);
  }
}