mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

neil_a_wilson
07.54.2006 f108de7fc4fddeaf70f73996fd4e9b62b2f692c0
opendj-sdk/opends/src/server/org/opends/server/api/WorkQueue.java
@@ -34,6 +34,8 @@
import org.opends.server.types.DirectoryException;
import org.opends.server.types.InitializationException;
import static org.opends.server.loggers.Debug.*;
/**
@@ -49,6 +51,14 @@
public abstract class WorkQueue
{
  /**
   * The fully-qualified name of this class for debugging purposes.
   */
  private static final String CLASS_NAME =
       "org.opends.server.api.WorkQueue";
  /**
   * Initializes this work queue based on the information in the
   * provided configuration entry.
   *
@@ -96,5 +106,70 @@
   */
  public abstract void submitOperation(Operation operation)
         throws DirectoryException;
  /**
   * Indicates whether the work queue is currently processing any
   * requests.  Note that this is a point-in-time determination, and
   * if any component of the server wishes to depend on a quiescent
   * state then it should use some external mechanism to ensure that
   * no other requests are submitted to the queue.
   *
   * @return  {@code true} if the work queue is currently idle, or
   *          {@code false} if it is being used to process one or more
   *          operations.
   */
  public abstract boolean isIdle();
  /**
   * Waits for the work queue to become idle before returning.  Note
   * that this is a point-in-time determination, and if any component
   * of the server wishes to depend on a quiescent state then it
   * should use some external mechanism to ensure that no other
   * requests are submitted to the queue.
   *
   * @param  timeLimit  The maximum length of time in milliseconds
   *                    that this method should wait for the queue to
   *                    become idle before giving up.  A time limit
   *                    that is less than or equal to zero indicates
   *                    that there should not be a time limit.
   *
   * @return  {@code true} if the work queue is idle at the time that
   *          this method returns, or {@code false} if the wait time
   *          limit was reached before the server became idle.
   */
  public boolean waitUntilIdle(long timeLimit)
  {
    assert debugEnter(CLASS_NAME, "waitUntilIdle",
                      String.valueOf(timeLimit));
    long stopWaitingTime;
    if (timeLimit <= 0)
    {
      stopWaitingTime = Long.MAX_VALUE;
    }
    else
    {
      stopWaitingTime = System.currentTimeMillis() + timeLimit;
    }
    while (System.currentTimeMillis() < stopWaitingTime)
    {
      if (isIdle())
      {
        return true;
      }
      try
      {
        Thread.sleep(1);
      } catch (InterruptedException ie) {}
    }
    return false;
  }
}
opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
@@ -142,18 +142,7 @@
  /**
   * Initializes this work queue based on the information in the provided
   * configuration entry.
   *
   * @param  configEntry  The configuration entry that contains the information
   *                      to use to initialize this work queue.
   *
   * @throws  ConfigException  If the provided configuration entry does not have
   *                           a valid work queue configuration.
   *
   * @throws  InitializationException  If a problem occurs during initialization
   *                                   that is not related to the server
   *                                   configuration.
   * {@inheritDoc}
   */
  public void initializeWorkQueue(ConfigEntry configEntry)
         throws ConfigException, InitializationException
@@ -315,13 +304,7 @@
  /**
   * Performs any necessary finalization for this work queue,
   * including ensuring that all active operations are interrupted or
   * will be allowed to complete, and that all pending operations will
   * be cancelled.
   *
   * @param  reason  The human-readable reason that the work queue is being
   *                 shut down.
   * {@inheritDoc}
   */
  public void finalizeWorkQueue(String reason)
  {
@@ -1089,5 +1072,39 @@
    return new ConfigChangeResult(ResultCode.SUCCESS, false, resultMessages);
  }
  /**
   * {@inheritDoc}
   */
  public boolean isIdle()
  {
    assert debugEnter(CLASS_NAME, "isIdle");
    if (opQueue.size() > 0)
    {
      return false;
    }
    queueLock.lock();
    try
    {
      for (TraditionalWorkerThread t : workerThreads)
      {
        if (t.isActive())
        {
          return false;
        }
      }
      return true;
    }
    finally
    {
      queueLock.unlock();
    }
  }
}
opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkerThread.java
@@ -125,6 +125,24 @@
  /**
   * Indicates whether this worker thread is actively processing a request.
   * Note that this is a point-in-time determination and if a reliable answer is
   * expected then the server should impose some external constraint to ensure
   * that no new requests are enqueued.
   *
   * @return  {@code true} if this worker thread is actively processing a
   *          request, or {@code false} if it is idle.
   */
  public boolean isActive()
  {
    assert debugEnter(CLASS_NAME, "isActive");
    return (isAlive() && (operation != null));
  }
  /**
   * Operates in a loop, retrieving the next request from the work queue,
   * processing it, and then going back to the queue for more.
   */
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/extensions/TraditionalWorkQueueTestCase.java
@@ -29,6 +29,7 @@
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import org.testng.annotations.BeforeClass;
@@ -39,13 +40,19 @@
import org.opends.server.config.ConfigEntry;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.SearchOperation;
import org.opends.server.plugins.DelayPreOpPlugin;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.types.Attribute;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.Control;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.DN;
import org.opends.server.tools.LDAPSearch;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchScope;
import static org.testng.Assert.*;
@@ -128,5 +135,117 @@
      assertEquals(LDAPSearch.mainSearch(args, false, null, System.err), 0);
    }
  }
  /**
   * Tests the {@code WorkQueue.waitUntilIdle()} method for a case in which the
   * work queue should already be idle.
   *
   * @throws  Exception  If an unexpected problem occurs.
   */
  @Test(groups = { "slow" })
  public void testWaitUntilIdleNoOpsInProgress()
         throws Exception
  {
    Thread.sleep(5000);
    long startTime = System.currentTimeMillis();
    assertTrue(DirectoryServer.getWorkQueue().waitUntilIdle(10000));
    long stopTime = System.currentTimeMillis();
    assertTrue((stopTime - startTime) <= 1000);
  }
  /**
   * Tests the {@code WorkQueue.waitUntilIdle()} method for a case in which the
   * work queue should already be idle and no timeout is given.
   *
   * @throws  Exception  If an unexpected problem occurs.
   */
  @Test(groups = { "slow" }, timeOut=10000)
  public void testWaitUntilIdleNoOpsInProgressNoTimeout()
         throws Exception
  {
    Thread.sleep(5000);
    long startTime = System.currentTimeMillis();
    assertTrue(DirectoryServer.getWorkQueue().waitUntilIdle(0));
    long stopTime = System.currentTimeMillis();
    assertTrue((stopTime - startTime) <= 1000);
  }
  /**
   * Tests the {@code WorkQueue.waitUntilIdle()} method for a case in which the
   * work queue should not be idle for several seconds.
   *
   * @throws  Exception  If an unexpected problem occurs.
   */
  @Test(groups = { "slow" })
  public void testWaitUntilIdleSlowOpInProgress()
         throws Exception
  {
    TestCaseUtils.initializeTestBackend(true);
    List<Control> requestControls =
         DelayPreOpPlugin.createDelayControlList(5000);
    SearchFilter filter =
         SearchFilter.createFilterFromString("(objectClass=*)");
    LinkedHashSet<String> attrs = new LinkedHashSet<String>();
    InternalClientConnection conn =
         InternalClientConnection.getRootConnection();
    SearchOperation searchOperation =
         new SearchOperation(conn, conn.nextOperationID(), conn.nextMessageID(),
                             requestControls, DN.decode("o=test"),
                             SearchScope.BASE_OBJECT,
                             DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0, false,
                             filter, attrs);
    DirectoryServer.getWorkQueue().submitOperation(searchOperation);
    long startTime = System.currentTimeMillis();
    assertTrue(DirectoryServer.getWorkQueue().waitUntilIdle(10000));
    long stopTime = System.currentTimeMillis();
    assertTrue((stopTime - startTime) >= 4000);
  }
  /**
   * Tests the {@code WorkQueue.waitUntilIdle()} method for a case in which the
   * work queue should not be idle for several seconds.
   *
   * @throws  Exception  If an unexpected problem occurs.
   */
  @Test(groups = { "slow" })
  public void testWaitUntilTimeoutWithIdleSlowOpInProgress()
         throws Exception
  {
    TestCaseUtils.initializeTestBackend(true);
    List<Control> requestControls =
         DelayPreOpPlugin.createDelayControlList(5000);
    SearchFilter filter =
         SearchFilter.createFilterFromString("(objectClass=*)");
    LinkedHashSet<String> attrs = new LinkedHashSet<String>();
    InternalClientConnection conn =
         InternalClientConnection.getRootConnection();
    SearchOperation searchOperation =
         new SearchOperation(conn, conn.nextOperationID(), conn.nextMessageID(),
                             requestControls, DN.decode("o=test"),
                             SearchScope.BASE_OBJECT,
                             DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0, false,
                             filter, attrs);
    DirectoryServer.getWorkQueue().submitOperation(searchOperation);
    long startTime = System.currentTimeMillis();
    assertFalse(DirectoryServer.getWorkQueue().waitUntilIdle(1000));
    long stopTime = System.currentTimeMillis();
    assertTrue((stopTime - startTime) <= 2000);
  }
}