mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

abobrov
09.46.2009 4c83513d36b8f0b29b0e6b4b7ed2bb8167a7b82a
- EXPERIMENTAL Parallel Work Queue implementation.
4 files added
1 files modified
1249 ■■■■■ changed files
opends/resource/schema/02-config.ldif 6 ●●●●● patch | view | raw | blame | history
opends/src/admin/defn/org/opends/server/admin/std/ParallelWorkQueueConfiguration.xml 91 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java 605 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/extensions/ParallelWorkerThread.java 314 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/monitors/ParallelWorkQueueMonitor.java 233 ●●●●● patch | view | raw | blame | history
opends/resource/schema/02-config.ldif
@@ -4131,5 +4131,11 @@
  NAME 'ds-cfg-fractional-ldif-import-plugin'
  SUP ds-cfg-plugin
  STRUCTURAL
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.233
  NAME 'ds-cfg-parallel-work-queue'
  SUP ds-cfg-work-queue
  STRUCTURAL
  MAY ( ds-cfg-num-worker-threads )
  X-ORIGIN 'OpenDS Directory Server' )
opends/src/admin/defn/org/opends/server/admin/std/ParallelWorkQueueConfiguration.xml
New file
@@ -0,0 +1,91 @@
<?xml version="1.0" encoding="utf-8"?>
<!--
  ! CDDL HEADER START
  !
  ! The contents of this file are subject to the terms of the
  ! Common Development and Distribution License, Version 1.0 only
  ! (the "License").  You may not use this file except in compliance
  ! with the License.
  !
  ! You can obtain a copy of the license at
  ! trunk/opends/resource/legal-notices/OpenDS.LICENSE
  ! or https://OpenDS.dev.java.net/OpenDS.LICENSE.
  ! See the License for the specific language governing permissions
  ! and limitations under the License.
  !
  ! When distributing Covered Code, include this CDDL HEADER in each
  ! file and include the License file at
  ! trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
  ! add the following below this CDDL HEADER, with the fields enclosed
  ! by brackets "[]" replaced with your own identifying information:
  !      Portions Copyright [yyyy] [name of copyright owner]
  !
  ! CDDL HEADER END
  !
  !
  !      Copyright 2007-2009 Sun Microsystems, Inc.
  ! -->
<adm:managed-object name="parallel-work-queue"
  plural-name="parallel-work-queues" extends="work-queue"
  package="org.opends.server.admin.std"
  xmlns:adm="http://www.opends.org/admin"
  xmlns:ldap="http://www.opends.org/admin-ldap">
  <adm:synopsis>
    The
    <adm:user-friendly-name />
    is a type of work queue that uses a number of worker threads that
    watch a queue and pick up an operation to process whenever one
    becomes available.
  </adm:synopsis>
  <adm:description>
    The parallel work queue is a FIFO queue serviced by a fixed
    number of worker threads. This fixed number of threads can be
    changed on the fly, with the change taking effect as soon as
    it is made. This work queue implementation is unbound ie it
    does not block after reaching certain queue size and as such
    should only be used on a very well tuned server configuration
    to avoid potential out of memory errors.
  </adm:description>
  <adm:profile name="ldap">
    <ldap:object-class>
      <ldap:name>ds-cfg-parallel-work-queue</ldap:name>
      <ldap:superior>ds-cfg-work-queue</ldap:superior>
    </ldap:object-class>
  </adm:profile>
  <adm:property-override name="java-class" advanced="true">
    <adm:default-behavior>
      <adm:defined>
        <adm:value>
          org.opends.server.extensions.ParallelWorkQueue
        </adm:value>
      </adm:defined>
    </adm:default-behavior>
  </adm:property-override>
  <adm:property name="num-worker-threads">
    <adm:synopsis>
      Specifies the number of worker threads to be used for processing
      operations placed in the queue.
  </adm:synopsis>
  <adm:description>
      If the value is increased,
      the additional worker threads are created immediately. If the
      value is reduced, the appropriate number of threads are destroyed
      as operations complete processing.
    </adm:description>
    <adm:default-behavior>
      <adm:alias>
        <adm:synopsis>
          Let the server decide.
        </adm:synopsis>
      </adm:alias>
    </adm:default-behavior>
    <adm:syntax>
      <adm:integer lower-limit="1" upper-limit="2147483647" />
    </adm:syntax>
    <adm:profile name="ldap">
      <ldap:attribute>
        <ldap:name>ds-cfg-num-worker-threads</ldap:name>
      </ldap:attribute>
    </adm:profile>
  </adm:property>
</adm:managed-object>
opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java
New file
@@ -0,0 +1,605 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.extensions;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.server.ParallelWorkQueueCfg;
import org.opends.server.api.WorkQueue;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.monitors.ParallelWorkQueueMonitor;
import org.opends.server.types.AbstractOperation;
import org.opends.server.types.CancelRequest;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.InitializationException;
import org.opends.server.types.Operation;
import org.opends.server.types.ResultCode;
import static org.opends.messages.ConfigMessages.*;
import static org.opends.messages.CoreMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
/**
 * This class defines a data structure for storing and interacting with the
 * Directory Server work queue.
 */
public class ParallelWorkQueue
       extends WorkQueue<ParallelWorkQueueCfg>
       implements ConfigurationChangeListener<ParallelWorkQueueCfg>
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * The maximum number of times to retry getting the next operation from the
   * queue if an unexpected failure occurs.
   */
  private static final int MAX_RETRY_COUNT = 5;
  // The set of worker threads that will be used to process this work queue.
  private ArrayList<ParallelWorkerThread> workerThreads;
  // The number of operations that have been submitted to the work queue for
  // processing.
  private AtomicLong opsSubmitted;
  // Indicates whether one or more of the worker threads needs to be killed at
  // the next convenient opportunity.
  private boolean killThreads;
  // Indicates whether the Directory Server is shutting down.
  private boolean shutdownRequested;
  // The thread number used for the last worker thread that was created.
  private int lastThreadNumber;
  // The number of worker threads that should be active (or will be shortly if
  // a configuration change has not been completely applied).
  private int numWorkerThreads;
  // The queue that will be used to actually hold the pending operations.
  private ConcurrentLinkedQueue<AbstractOperation> opQueue;
  // The lock used to provide threadsafe access for the queue.
  private final Object queueLock = new Object();
  private final Semaphore queueSemaphore = new Semaphore(0, false);
  /**
   * Creates a new instance of this work queue.  All initialization should be
   * performed in the <CODE>initializeWorkQueue</CODE> method.
   */
  public ParallelWorkQueue()
  {
    // No implementation should be performed here.
  }
  /**
   * {@inheritDoc}
   */
  @Override()
  public void initializeWorkQueue(ParallelWorkQueueCfg configuration)
         throws ConfigException, InitializationException
  {
    shutdownRequested = false;
    killThreads       = false;
    opsSubmitted      = new AtomicLong(0);
    // Register to be notified of any configuration changes.
    configuration.addParallelChangeListener(this);
    // Get the necessary configuration from the provided entry.
    numWorkerThreads = getNumWorkerThreads(configuration);
    // Create the actual work queue.
    opQueue = new ConcurrentLinkedQueue<AbstractOperation>();
    // Create the set of worker threads that should be used to service the
    // work queue.
    workerThreads = new ArrayList<ParallelWorkerThread>(numWorkerThreads);
    for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads;
    lastThreadNumber++)
    {
      ParallelWorkerThread t =
           new ParallelWorkerThread(this, lastThreadNumber);
      t.start();
      workerThreads.add(t);
    }
    // Create and register a monitor provider for the work queue.
    try
    {
      ParallelWorkQueueMonitor monitor =
           new ParallelWorkQueueMonitor(this);
      monitor.initializeMonitorProvider(null);
      monitor.start();
      DirectoryServer.registerMonitorProvider(monitor);
    }
    catch (Exception e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      Message message = ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR.get(
          String.valueOf(ParallelWorkQueueMonitor.class), String.valueOf(e));
      logError(message);
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override()
  public void finalizeWorkQueue(Message reason)
  {
    shutdownRequested = true;
    // Send responses to any operations in the pending queue to indicate that
    // they won't be processed because the server is shutting down.
    CancelRequest cancelRequest = new CancelRequest(true, reason);
    ArrayList<Operation> pendingOperations = new ArrayList<Operation>();
    opQueue.removeAll(pendingOperations);
    for (Operation o : pendingOperations)
    {
      try
      {
        // The operation has no chance of responding to the cancel
        // request so avoid waiting for a cancel response.
        if (o.getCancelResult() == null) {
          o.abort(cancelRequest);
        }
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        logError(WARN_QUEUE_UNABLE_TO_CANCEL.get(
            String.valueOf(o), String.valueOf(e)));
      }
    }
    // Notify all the worker threads of the shutdown.
    for (ParallelWorkerThread t : workerThreads)
    {
      try
      {
        t.shutDown();
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        logError(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD.get(
            t.getName(), String.valueOf(e)));
      }
    }
  }
  /**
   * Indicates whether this work queue has received a request to shut down.
   *
   * @return  <CODE>true</CODE> if the work queue has recieved a request to shut
   *          down, or <CODE>false</CODE> if not.
   */
  public boolean shutdownRequested()
  {
    return shutdownRequested;
  }
  /**
   * Submits an operation to be processed by one of the worker threads
   * associated with this work queue.
   *
   * @param  operation  The operation to be processed.
   *
   * @throws  DirectoryException  If the provided operation is not accepted for
   *                              some reason (e.g., if the server is shutting
   *                              down or the pending operation queue is already
   *                              at its maximum capacity).
   */
  @Override
  public void submitOperation(AbstractOperation operation)
         throws DirectoryException
  {
    if (shutdownRequested)
    {
      Message message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
      throw new DirectoryException(ResultCode.UNAVAILABLE, message);
    }
    opQueue.add(operation);
    queueSemaphore.release();
    opsSubmitted.incrementAndGet();
  }
  /**
   * Retrieves the next operation that should be processed by one of the worker
   * threads, blocking if necessary until a new request arrives.  This method
   * should only be called by a worker thread associated with this work queue.
   *
   * @param  workerThread  The worker thread that is requesting the operation.
   *
   * @return  The next operation that should be processed, or <CODE>null</CODE>
   *          if the server is shutting down and no more operations will be
   *          processed.
   */
  public AbstractOperation nextOperation(ParallelWorkerThread workerThread)
  {
    return retryNextOperation(workerThread, 0);
  }
  /**
   * Retrieves the next operation that should be processed by one of the worker
   * threads following a previous failure attempt.  A maximum of five
   * consecutive failures will be allowed before returning <CODE>null</CODE>,
   * which will cause the associated thread to exit.
   *
   * @param  workerThread  The worker thread that is requesting the operation.
   * @param  numFailures   The number of consecutive failures that the worker
   *                       thread has experienced so far.  If this gets too
   *                       high, then this method will return <CODE>null</CODE>
   *                       rather than retrying.
   *
   * @return  The next operation that should be processed, or <CODE>null</CODE>
   *          if the server is shutting down and no more operations will be
   *          processed, or if there have been too many consecutive failures.
   */
  private AbstractOperation retryNextOperation(
                                       ParallelWorkerThread workerThread,
                                       int numFailures)
  {
    // See if we should kill off this thread.  This could be necessary if the
    // number of worker threads has been decreased with the server online. If
    // so, then return null and the thread will exit.
    if (killThreads)
    {
      synchronized (queueLock)
      {
        try
        {
          int currentThreads = workerThreads.size();
          if (currentThreads > numWorkerThreads)
          {
            if (workerThreads.remove((ParallelWorkerThread)
              Thread.currentThread()))
            {
              currentThreads--;
            }
            if (currentThreads <= numWorkerThreads)
            {
              killThreads = false;
            }
            workerThread.setStoppedByReducedThreadNumber();
            return null;
          }
        }
        catch (Exception e)
        {
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
          }
        }
      }
    }
    if ((shutdownRequested) || (numFailures > MAX_RETRY_COUNT))
    {
      if (numFailures > MAX_RETRY_COUNT)
      {
        Message message = ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES.get(
            Thread.currentThread().getName(), numFailures, MAX_RETRY_COUNT);
        logError(message);
      }
      return null;
    }
    try
    {
      while (true)
      {
        AbstractOperation nextOperation = null;
        if (queueSemaphore.tryAcquire(5, TimeUnit.SECONDS)) {
          nextOperation = opQueue.poll();
        }
        if (nextOperation == null)
        {
          // There was no work to do in the specified length of time.  See if
          // we should shutdown, and if not then just check again.
          if (shutdownRequested)
          {
            return null;
          }
          else if (killThreads)
          {
            synchronized (queueLock)
            {
              try
              {
                int currentThreads = workerThreads.size();
                if (currentThreads > numWorkerThreads)
                {
                  if (workerThreads.remove((ParallelWorkerThread)
                    Thread.currentThread()))
                  {
                    currentThreads--;
                  }
                  if (currentThreads <= numWorkerThreads)
                  {
                    killThreads = false;
                  }
                  workerThread.setStoppedByReducedThreadNumber();
                  return null;
                }
              }
              catch (Exception e)
              {
                if (debugEnabled())
                {
                  TRACER.debugCaught(DebugLogLevel.ERROR, e);
                }
              }
            }
          }
        }
        else
        {
          return nextOperation;
        }
      }
    }
    catch (Exception e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      // This should not happen.  The only recourse we have is to log a message
      // and try again.
      logError(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION.get(
          Thread.currentThread().getName(), String.valueOf(e)));
      return retryNextOperation(workerThread, numFailures + 1);
    }
  }
  /**
   * Attempts to remove the specified operation from this queue if it has not
   * yet been picked up for processing by one of the worker threads.
   *
   * @param  operation  The operation to remove from the queue.
   *
   * @return  <CODE>true</CODE> if the provided request was present in the queue
   *          and was removed successfully, or <CODE>false</CODE> it not.
   */
  public boolean removeOperation(AbstractOperation operation)
  {
    return opQueue.remove(operation);
  }
  /**
   * Retrieves the total number of operations that have been successfully
   * submitted to this work queue for processing since server startup.  This
   * does not include operations that have been rejected for some reason like
   * the queue already at its maximum capacity.
   *
   * @return  The total number of operations that have been successfully
   *          submitted to this work queue since startup.
   */
  public long getOpsSubmitted()
  {
    return opsSubmitted.longValue();
  }
  /**
   * Retrieves the number of pending operations in the queue that have not yet
   * been picked up for processing.  Note that this method is not a
   * constant-time operation and can be relatively inefficient, so it should be
   * used sparingly.
   *
   * @return  The number of pending operations in the queue that have not yet
   *          been picked up for processing.
   */
  public int size()
  {
    return opQueue.size();
  }
  /**
   * {@inheritDoc}
   */
  public boolean isConfigurationChangeAcceptable(
                      ParallelWorkQueueCfg configuration,
                      List<Message> unacceptableReasons)
  {
    // The provided configuration will always be acceptable.
    return true;
  }
  /**
   * {@inheritDoc}
   */
  public ConfigChangeResult applyConfigurationChange(
                                 ParallelWorkQueueCfg configuration)
  {
    ArrayList<Message> resultMessages = new ArrayList<Message>();
    int newNumThreads  = getNumWorkerThreads(configuration);
    // Apply a change to the number of worker threads if appropriate.
    int currentThreads = workerThreads.size();
    if (newNumThreads != currentThreads)
    {
      synchronized (queueLock)
      {
        try
        {
          int threadsToAdd = newNumThreads - currentThreads;
          if (threadsToAdd > 0)
          {
            for (int i = 0; i < threadsToAdd; i++)
            {
              ParallelWorkerThread t =
                   new ParallelWorkerThread(this, lastThreadNumber++);
              workerThreads.add(t);
              t.start();
            }
            killThreads = false;
          }
          else
          {
            killThreads = true;
          }
          numWorkerThreads = newNumThreads;
        }
        catch (Exception e)
        {
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
          }
        }
      }
    }
    return new ConfigChangeResult(ResultCode.SUCCESS, false, resultMessages);
  }
  /**
   * {@inheritDoc}
   */
  @Override()
  public boolean isIdle()
  {
    if (opQueue.size() > 0) {
      return false;
    }
    synchronized (queueLock)
    {
      for (ParallelWorkerThread t : workerThreads)
      {
        if (t.isActive())
        {
          return false;
        }
      }
      return true;
    }
  }
  // Determine the number of worker threads.
  private int getNumWorkerThreads(ParallelWorkQueueCfg configuration)
  {
    if (configuration.getNumWorkerThreads() == null)
    {
      // Automatically choose based on the number of processors.
      int cpus = Runtime.getRuntime().availableProcessors();
      int value = Math.max(24, cpus * 2);
      Message message = INFO_ERGONOMIC_SIZING_OF_WORKER_THREAD_POOL.get(value);
      logError(message);
      return value;
    }
    else
    {
      return configuration.getNumWorkerThreads();
    }
  }
}
opends/src/server/org/opends/server/extensions/ParallelWorkerThread.java
New file
@@ -0,0 +1,314 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.extensions;
import org.opends.messages.Message;
import java.util.Map;
import org.opends.server.api.DirectoryThread;
import org.opends.server.core.DirectoryServer;
import org.opends.server.types.AbstractOperation;
import org.opends.server.types.CancelRequest;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DisconnectReason;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.messages.CoreMessages.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class defines a data structure for storing and interacting with a
 * Directory Server worker thread.
 */
public class ParallelWorkerThread
       extends DirectoryThread
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  // Indicates whether the Directory Server is shutting down and this thread
  // should stop running.
  private boolean shutdownRequested;
  // Indicates whether this thread was stopped because the server threadnumber
  // was reduced.
  private boolean stoppedByReducedThreadNumber;
  // Indicates whether this thread is currently waiting for work.
  private boolean waitingForWork;
  // The operation that this worker thread is currently processing.
  private AbstractOperation operation;
  // The handle to the actual thread for this worker thread.
  private Thread workerThread;
  // The work queue that this worker thread will service.
  private ParallelWorkQueue workQueue;
  /**
   * Creates a new worker thread that will service the provided work queue and
   * process any new requests that are submitted.
   *
   * @param  workQueue  The work queue with which this worker thread is
   *                    associated.
   * @param  threadID   The thread ID for this worker thread.
   */
  public ParallelWorkerThread(ParallelWorkQueue workQueue, int threadID)
  {
    super("Worker Thread " + threadID);
    this.workQueue = workQueue;
    stoppedByReducedThreadNumber = false;
    shutdownRequested            = false;
    waitingForWork               = false;
    operation                    = null;
    workerThread                 = null;
  }
  /**
   * Indicates that this thread is about to be stopped because the Directory
   * Server configuration has been updated to reduce the number of worker
   * threads.
   */
  public void setStoppedByReducedThreadNumber()
  {
    stoppedByReducedThreadNumber = true;
  }
  /**
   * Indicates whether this worker thread is actively processing a request.
   * Note that this is a point-in-time determination and if a reliable answer is
   * expected then the server should impose some external constraint to ensure
   * that no new requests are enqueued.
   *
   * @return  {@code true} if this worker thread is actively processing a
   *          request, or {@code false} if it is idle.
   */
  public boolean isActive()
  {
    return (isAlive() && (operation != null));
  }
  /**
   * Operates in a loop, retrieving the next request from the work queue,
   * processing it, and then going back to the queue for more.
   */
  @Override
  public void run()
  {
    workerThread = currentThread();
    while (! shutdownRequested)
    {
      try
      {
        waitingForWork = true;
        operation = null;
        operation = workQueue.nextOperation(this);
        waitingForWork = false;
        if (operation == null)
        {
          // The operation may be null if the server is shutting down.  If that
          // is the case, then break out of the while loop.
          break;
        }
        else
        {
          // The operation is not null, so process it.  Make sure that when
          // processing is complete.
          operation.run();
          operation.operationCompleted();
        }
      }
      catch (Throwable t)
      {
        if (debugEnabled())
        {
          TRACER.debugWarning(
            "Uncaught exception in worker thread while processing " +
                "operation %s: %s", String.valueOf(operation), t);
          TRACER.debugCaught(DebugLogLevel.ERROR, t);
        }
        try
        {
          Message message = ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.
              get(getName(), String.valueOf(operation),
                  stackTraceToSingleLineString(t));
          logError(message);
          operation.setResultCode(DirectoryServer.getServerErrorResultCode());
          operation.appendErrorMessage(message);
          operation.getClientConnection().sendResponse(operation);
        }
        catch (Throwable t2)
        {
          if (debugEnabled())
          {
            TRACER.debugWarning(
              "Exception in worker thread while trying to log a " +
                  "message about an uncaught exception %s: %s", t, t2);
            TRACER.debugCaught(DebugLogLevel.ERROR, t2);
          }
        }
        try
        {
          Message message = ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get(getName(),
                                      String.valueOf(operation),
                                      stackTraceToSingleLineString(t));
          operation.disconnectClient(DisconnectReason.SERVER_ERROR,
                                     true, message);
        }
        catch (Throwable t2)
        {
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, t2);
          }
        }
      }
    }
    // If we have gotten here, then we presume that the server thread is
    // shutting down.  However, if that's not the case then that is a problem
    // and we will want to log a message.
    if (stoppedByReducedThreadNumber)
    {
      logError(INFO_WORKER_STOPPED_BY_REDUCED_THREADNUMBER.get(getName()));
    }
    else if (! workQueue.shutdownRequested())
    {
      logError(WARN_UNEXPECTED_WORKER_THREAD_EXIT.get(getName()));
    }
    if (debugEnabled())
    {
      TRACER.debugInfo(getName() + " exiting.");
    }
  }
  /**
   * Indicates that the Directory Server has received a request to stop running
   * and that this thread should stop running as soon as possible.
   */
  public void shutDown()
  {
    if (debugEnabled())
    {
      TRACER.debugInfo(getName() + " being signaled to shut down.");
    }
    // Set a flag that indicates that the thread should stop running.
    shutdownRequested = true;
    // Check to see if the thread is waiting for work.  If so, then interrupt
    // it.
    if (waitingForWork)
    {
      try
      {
        workerThread.interrupt();
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugWarning(
            "Caught an exception while trying to interrupt the worker " +
                "thread waiting for work: %s", e);
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
    }
    else
    {
      try
      {
        CancelRequest cancelRequest =
          new CancelRequest(true, INFO_CANCELED_BY_SHUTDOWN.get());
        operation.cancel(cancelRequest);
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugWarning(
            "Caught an exception while trying to abandon the " +
                "operation in progress for the worker thread: %s", e);
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
    }
  }
  /**
   * Retrieves any relevent debug information with which this tread is
   * associated so they can be included in debug messages.
   *
   * @return debug information about this thread as a string.
   */
  @Override
  public Map<String, String> getDebugProperties()
  {
    Map<String, String> properties = super.getDebugProperties();
    properties.put("clientConnection",
                   operation.getClientConnection().toString());
    properties.put("operation", operation.toString());
    return properties;
  }
}
opends/src/server/org/opends/server/monitors/ParallelWorkQueueMonitor.java
New file
@@ -0,0 +1,233 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.monitors;
import java.util.ArrayList;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.AttributeSyntax;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.extensions.ParallelWorkQueue;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.Attributes;
import org.opends.server.types.InitializationException;
/**
 * This class defines a Directory Server monitor that can be used to provide
 * information about the state of the work queue.
 */
public class ParallelWorkQueueMonitor
       extends MonitorProvider<MonitorProviderCfg>
{
  /**
   * The name to use for the monitor attribute that provides the current request
   * backlog.
   */
  public static final String ATTR_CURRENT_BACKLOG = "currentRequestBacklog";
  /**
   * The name to use for the monitor attribute that provides the average request
   * backlog.
   */
  public static final String ATTR_AVERAGE_BACKLOG = "averageRequestBacklog";
  /**
   * The name to use for the monitor attribute that provides the maximum
   * observed request backlog.
   */
  public static final String ATTR_MAX_BACKLOG = "maxRequestBacklog";
  /**
   * The name to use for the monitor attribute that provides the total number of
   * operations submitted.
   */
  public static final String ATTR_OPS_SUBMITTED = "requestsSubmitted";
  // The maximum backlog observed by polling the queue.
  private int maxBacklog;
  // The total number of times the backlog has been polled.
  private long numPolls;
  // The total backlog observed from periodic polling.
  private long totalBacklog;
  // The parallel work queue instance with which this monitor is associated.
  private ParallelWorkQueue workQueue;
  /**
   * Initializes this monitor provider.  Note that no initialization should be
   * done here, since it should be performed in the
   * <CODE>initializeMonitorProvider</CODE> class.
   *
   * @param  workQueue  The work queue with which this monitor is associated.
   */
  public ParallelWorkQueueMonitor(ParallelWorkQueue workQueue)
  {
    super("Work Queue Monitor Provider");
    this.workQueue = workQueue;
  }
  /**
   * {@inheritDoc}
   */
  public void initializeMonitorProvider(MonitorProviderCfg configuration)
         throws ConfigException, InitializationException
  {
    maxBacklog   = 0;
    totalBacklog = 0;
    numPolls     = 0;
  }
  /**
   * Retrieves the name of this monitor provider.  It should be unique among all
   * monitor providers, including all instances of the same monitor provider.
   *
   * @return  The name of this monitor provider.
   */
  public String getMonitorInstanceName()
  {
    return "Work Queue";
  }
  /**
   * Retrieves the length of time in milliseconds that should elapse between
   * calls to the <CODE>updateMonitorData()</CODE> method.  A negative or zero
   * return value indicates that the <CODE>updateMonitorData()</CODE> method
   * should not be periodically invoked.
   *
   * @return  The length of time in milliseconds that should elapse between
   *          calls to the <CODE>updateMonitorData()</CODE> method.
   */
  public long getUpdateInterval()
  {
    // We will poll the work queue every 10 seconds.
    return 10000;
  }
  /**
   * Performs any processing periodic processing that may be desired to update
   * the information associated with this monitor.  Note that best-effort
   * attempts will be made to ensure that calls to this method come
   * <CODE>getUpdateInterval()</CODE> milliseconds apart, but no guarantees will
   * be made.
   */
  public void updateMonitorData()
  {
    int backlog = workQueue.size();
    totalBacklog += backlog;
    numPolls++;
    if (backlog > maxBacklog)
    {
      maxBacklog = backlog;
    }
  }
  /**
   * Retrieves a set of attributes containing monitor data that should be
   * returned to the client if the corresponding monitor entry is requested.
   *
   * @return  A set of attributes containing monitor data that should be
   *          returned to the client if the corresponding monitor entry is
   *          requested.
   */
  public ArrayList<Attribute> getMonitorData()
  {
    int backlog = workQueue.size();
    totalBacklog += backlog;
    numPolls++;
    if (backlog > maxBacklog)
    {
      maxBacklog = backlog;
    }
    long averageBacklog = (long) (1.0 * totalBacklog / numPolls);
    long opsSubmitted = workQueue.getOpsSubmitted();
    ArrayList<Attribute> monitorAttrs = new ArrayList<Attribute>();
    AttributeSyntax<?> integerSyntax = DirectoryServer
        .getDefaultIntegerSyntax();
    // The current backlog.
    AttributeType attrType = DirectoryServer.getDefaultAttributeType(
        ATTR_CURRENT_BACKLOG, integerSyntax);
    monitorAttrs
        .add(Attributes.create(attrType, String.valueOf(backlog)));
    // The average backlog.
    attrType = DirectoryServer.getDefaultAttributeType(ATTR_AVERAGE_BACKLOG,
        integerSyntax);
    monitorAttrs.add(Attributes.create(attrType, String
        .valueOf(averageBacklog)));
    // The maximum backlog.
    attrType = DirectoryServer.getDefaultAttributeType(ATTR_MAX_BACKLOG,
        integerSyntax);
    monitorAttrs.add(Attributes.create(attrType, String
        .valueOf(maxBacklog)));
    // The total number of operations submitted.
    attrType = DirectoryServer.getDefaultAttributeType(ATTR_OPS_SUBMITTED,
        integerSyntax);
    monitorAttrs.add(Attributes.create(attrType, String
        .valueOf(opsSubmitted)));
    return monitorAttrs;
  }
}