mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
26.49.2013 582344d280d24dfec999b862d8255eb077995b99
opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package org.opends.server.extensions;
@@ -83,31 +84,37 @@
  // The set of worker threads that will be used to process this work queue.
  /** The set of worker threads that will be used to process this work queue. */
  private ArrayList<ParallelWorkerThread> workerThreads;
  // The number of operations that have been submitted to the work queue for
  // processing.
  /**
   * The number of operations that have been submitted to the work queue for
   * processing.
   */
  private AtomicLong opsSubmitted;
  // Indicates whether one or more of the worker threads needs to be killed at
  // the next convenient opportunity.
  /**
   * Indicates whether one or more of the worker threads needs to be killed at
   * the next convenient opportunity.
   */
  private boolean killThreads;
  // Indicates whether the Directory Server is shutting down.
  /** Indicates whether the Directory Server is shutting down. */
  private boolean shutdownRequested;
  // The thread number used for the last worker thread that was created.
  /** The thread number used for the last worker thread that was created. */
  private int lastThreadNumber;
  // The number of worker threads that should be active (or will be shortly if
  // a configuration change has not been completely applied).
  /**
   * The number of worker threads that should be active (or will be shortly if a
   * configuration change has not been completely applied).
   */
  private int numWorkerThreads;
  // The queue that will be used to actually hold the pending operations.
  private ConcurrentLinkedQueue<AbstractOperation> opQueue;
  /** The queue that will be used to actually hold the pending operations. */
  private ConcurrentLinkedQueue<Operation> opQueue;
  // The lock used to provide threadsafe access for the queue.
  /** The lock used to provide threadsafe access for the queue. */
  private final Object queueLock = new Object();
@@ -143,7 +150,7 @@
    numWorkerThreads = getNumWorkerThreads(configuration);
    // Create the actual work queue.
    opQueue = new ConcurrentLinkedQueue<AbstractOperation>();
    opQueue = new ConcurrentLinkedQueue<Operation>();
    // Create the set of worker threads that should be used to service the
    // work queue.
@@ -266,8 +273,7 @@
   *                              at its maximum capacity).
   */
  @Override
  public void submitOperation(AbstractOperation operation)
         throws DirectoryException
  public void submitOperation(Operation operation) throws DirectoryException
  {
    if (shutdownRequested)
    {
@@ -294,7 +300,7 @@
   *          if the server is shutting down and no more operations will be
   *          processed.
   */
  public AbstractOperation nextOperation(ParallelWorkerThread workerThread)
  public Operation nextOperation(ParallelWorkerThread workerThread)
  {
    return retryNextOperation(workerThread, 0);
  }
@@ -317,7 +323,7 @@
   *          if the server is shutting down and no more operations will be
   *          processed, or if there have been too many consecutive failures.
   */
  private AbstractOperation retryNextOperation(
  private Operation retryNextOperation(
                                       ParallelWorkerThread workerThread,
                                       int numFailures)
  {
@@ -333,8 +339,7 @@
          int currentThreads = workerThreads.size();
          if (currentThreads > numWorkerThreads)
          {
            if (workerThreads.remove((ParallelWorkerThread)
              Thread.currentThread()))
            if (workerThreads.remove(Thread.currentThread()))
            {
              currentThreads--;
            }
@@ -374,7 +379,7 @@
    {
      while (true)
      {
        AbstractOperation nextOperation = null;
        Operation nextOperation = null;
        if (queueSemaphore.tryAcquire(5, TimeUnit.SECONDS)) {
          nextOperation = opQueue.poll();
        }
@@ -395,8 +400,7 @@
                int currentThreads = workerThreads.size();
                if (currentThreads > numWorkerThreads)
                {
                  if (workerThreads.remove((ParallelWorkerThread)
                    Thread.currentThread()))
                  if (workerThreads.remove(Thread.currentThread()))
                  {
                    currentThreads--;
                  }
@@ -494,6 +498,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public boolean isConfigurationChangeAcceptable(
                      ParallelWorkQueueCfg configuration,
                      List<Message> unacceptableReasons)
@@ -506,6 +511,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public ConfigChangeResult applyConfigurationChange(
                                 ParallelWorkQueueCfg configuration)
  {