From 4c83513d36b8f0b29b0e6b4b7ed2bb8167a7b82a Mon Sep 17 00:00:00 2001
From: abobrov <abobrov@localhost>
Date: Fri, 09 Oct 2009 14:46:06 +0000
Subject: [PATCH] - EXPERIMENTAL Parallel Work Queue implementation.

---
 opends/resource/schema/02-config.ldif                                                |    8 
 opends/src/admin/defn/org/opends/server/admin/std/ParallelWorkQueueConfiguration.xml |   91 ++++
 opends/src/server/org/opends/server/monitors/ParallelWorkQueueMonitor.java           |  233 +++++++++++
 opends/src/server/org/opends/server/extensions/ParallelWorkerThread.java             |  314 ++++++++++++++
 opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java                |  605 ++++++++++++++++++++++++++++
 5 files changed, 1,250 insertions(+), 1 deletions(-)

diff --git a/opends/resource/schema/02-config.ldif b/opends/resource/schema/02-config.ldif
index 12f31ef..158ad3b 100644
--- a/opends/resource/schema/02-config.ldif
+++ b/opends/resource/schema/02-config.ldif
@@ -4031,7 +4031,7 @@
   NAME 'ds-cfg-qos-policy'
   SUP top
   STRUCTURAL
-  MUST ( cn $ 
+  MUST ( cn $
          ds-cfg-java-class)
   X-ORIGIN 'OpenDS Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.185
@@ -4132,4 +4132,10 @@
   SUP ds-cfg-plugin
   STRUCTURAL
   X-ORIGIN 'OpenDS Directory Server' )
+objectClasses: ( 1.3.6.1.4.1.26027.1.2.233
+  NAME 'ds-cfg-parallel-work-queue'
+  SUP ds-cfg-work-queue
+  STRUCTURAL
+  MAY ( ds-cfg-num-worker-threads )
+  X-ORIGIN 'OpenDS Directory Server' )
 
diff --git a/opends/src/admin/defn/org/opends/server/admin/std/ParallelWorkQueueConfiguration.xml b/opends/src/admin/defn/org/opends/server/admin/std/ParallelWorkQueueConfiguration.xml
new file mode 100644
index 0000000..ad915a8
--- /dev/null
+++ b/opends/src/admin/defn/org/opends/server/admin/std/ParallelWorkQueueConfiguration.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+  ! CDDL HEADER START
+  !
+  ! The contents of this file are subject to the terms of the
+  ! Common Development and Distribution License, Version 1.0 only
+  ! (the "License").  You may not use this file except in compliance
+  ! with the License.
+  !
+  ! You can obtain a copy of the license at
+  ! trunk/opends/resource/legal-notices/OpenDS.LICENSE
+  ! or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+  ! See the License for the specific language governing permissions
+  ! and limitations under the License.
+  !
+  ! When distributing Covered Code, include this CDDL HEADER in each
+  ! file and include the License file at
+  ! trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+  ! add the following below this CDDL HEADER, with the fields enclosed
+  ! by brackets "[]" replaced with your own identifying information:
+  !      Portions Copyright [yyyy] [name of copyright owner]
+  !
+  ! CDDL HEADER END
+  !
+  !
+  !      Copyright 2007-2009 Sun Microsystems, Inc.
+  ! -->
+<adm:managed-object name="parallel-work-queue"
+  plural-name="parallel-work-queues" extends="work-queue"
+  package="org.opends.server.admin.std"
+  xmlns:adm="http://www.opends.org/admin"
+  xmlns:ldap="http://www.opends.org/admin-ldap">
+  <adm:synopsis>
+    The
+    <adm:user-friendly-name />
+    is a type of work queue that uses a number of worker threads that
+    watch a queue and pick up an operation to process whenever one
+    becomes available.
+  </adm:synopsis>
+  <adm:description>
+    The parallel work queue is a FIFO queue serviced by a fixed
+    number of worker threads. This fixed number of threads can be
+    changed on the fly, with the change taking effect as soon as
+    it is made. This work queue implementation is unbound ie it
+    does not block after reaching certain queue size and as such
+    should only be used on a very well tuned server configuration
+    to avoid potential out of memory errors.
+  </adm:description>
+  <adm:profile name="ldap">
+    <ldap:object-class>
+      <ldap:name>ds-cfg-parallel-work-queue</ldap:name>
+      <ldap:superior>ds-cfg-work-queue</ldap:superior>
+    </ldap:object-class>
+  </adm:profile>
+  <adm:property-override name="java-class" advanced="true">
+    <adm:default-behavior>
+      <adm:defined>
+        <adm:value>
+          org.opends.server.extensions.ParallelWorkQueue
+        </adm:value>
+      </adm:defined>
+    </adm:default-behavior>
+  </adm:property-override>
+  <adm:property name="num-worker-threads">
+    <adm:synopsis>
+      Specifies the number of worker threads to be used for processing
+      operations placed in the queue.
+  </adm:synopsis>
+  <adm:description>
+      If the value is increased,
+      the additional worker threads are created immediately. If the
+      value is reduced, the appropriate number of threads are destroyed
+      as operations complete processing.
+    </adm:description>
+    <adm:default-behavior>
+      <adm:alias>
+        <adm:synopsis>
+          Let the server decide.
+        </adm:synopsis>
+      </adm:alias>
+    </adm:default-behavior>
+    <adm:syntax>
+      <adm:integer lower-limit="1" upper-limit="2147483647" />
+    </adm:syntax>
+    <adm:profile name="ldap">
+      <ldap:attribute>
+        <ldap:name>ds-cfg-num-worker-threads</ldap:name>
+      </ldap:attribute>
+    </adm:profile>
+  </adm:property>
+</adm:managed-object>
diff --git a/opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java b/opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java
new file mode 100644
index 0000000..d9b2e10
--- /dev/null
+++ b/opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java
@@ -0,0 +1,605 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
+ */
+package org.opends.server.extensions;
+
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.opends.messages.Message;
+import org.opends.server.admin.server.ConfigurationChangeListener;
+import org.opends.server.admin.std.server.ParallelWorkQueueCfg;
+import org.opends.server.api.WorkQueue;
+import org.opends.server.config.ConfigException;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.monitors.ParallelWorkQueueMonitor;
+import org.opends.server.types.AbstractOperation;
+import org.opends.server.types.CancelRequest;
+import org.opends.server.types.ConfigChangeResult;
+import org.opends.server.types.DebugLogLevel;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.InitializationException;
+import org.opends.server.types.Operation;
+import org.opends.server.types.ResultCode;
+
+import static org.opends.messages.ConfigMessages.*;
+import static org.opends.messages.CoreMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+
+
+
+/**
+ * This class defines a data structure for storing and interacting with the
+ * Directory Server work queue.
+ */
+public class ParallelWorkQueue
+       extends WorkQueue<ParallelWorkQueueCfg>
+       implements ConfigurationChangeListener<ParallelWorkQueueCfg>
+{
+  /**
+   * The tracer object for the debug logger.
+   */
+  private static final DebugTracer TRACER = getTracer();
+
+
+
+
+  /**
+   * The maximum number of times to retry getting the next operation from the
+   * queue if an unexpected failure occurs.
+   */
+  private static final int MAX_RETRY_COUNT = 5;
+
+
+
+  // The set of worker threads that will be used to process this work queue.
+  private ArrayList<ParallelWorkerThread> workerThreads;
+
+  // The number of operations that have been submitted to the work queue for
+  // processing.
+  private AtomicLong opsSubmitted;
+
+  // Indicates whether one or more of the worker threads needs to be killed at
+  // the next convenient opportunity.
+  private boolean killThreads;
+
+  // Indicates whether the Directory Server is shutting down.
+  private boolean shutdownRequested;
+
+  // The thread number used for the last worker thread that was created.
+  private int lastThreadNumber;
+
+  // The number of worker threads that should be active (or will be shortly if
+  // a configuration change has not been completely applied).
+  private int numWorkerThreads;
+
+  // The queue that will be used to actually hold the pending operations.
+  private ConcurrentLinkedQueue<AbstractOperation> opQueue;
+
+  // The lock used to provide threadsafe access for the queue.
+  private final Object queueLock = new Object();
+
+
+  private final Semaphore queueSemaphore = new Semaphore(0, false);
+
+
+  /**
+   * Creates a new instance of this work queue.  All initialization should be
+   * performed in the <CODE>initializeWorkQueue</CODE> method.
+   */
+  public ParallelWorkQueue()
+  {
+    // No implementation should be performed here.
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override()
+  public void initializeWorkQueue(ParallelWorkQueueCfg configuration)
+         throws ConfigException, InitializationException
+  {
+    shutdownRequested = false;
+    killThreads       = false;
+    opsSubmitted      = new AtomicLong(0);
+
+    // Register to be notified of any configuration changes.
+    configuration.addParallelChangeListener(this);
+
+    // Get the necessary configuration from the provided entry.
+    numWorkerThreads = getNumWorkerThreads(configuration);
+
+    // Create the actual work queue.
+    opQueue = new ConcurrentLinkedQueue<AbstractOperation>();
+
+    // Create the set of worker threads that should be used to service the
+    // work queue.
+    workerThreads = new ArrayList<ParallelWorkerThread>(numWorkerThreads);
+    for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads;
+    lastThreadNumber++)
+    {
+      ParallelWorkerThread t =
+           new ParallelWorkerThread(this, lastThreadNumber);
+      t.start();
+      workerThreads.add(t);
+    }
+
+
+    // Create and register a monitor provider for the work queue.
+    try
+    {
+      ParallelWorkQueueMonitor monitor =
+           new ParallelWorkQueueMonitor(this);
+      monitor.initializeMonitorProvider(null);
+      monitor.start();
+      DirectoryServer.registerMonitorProvider(monitor);
+    }
+    catch (Exception e)
+    {
+      if (debugEnabled())
+      {
+        TRACER.debugCaught(DebugLogLevel.ERROR, e);
+      }
+
+      Message message = ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR.get(
+          String.valueOf(ParallelWorkQueueMonitor.class), String.valueOf(e));
+      logError(message);
+    }
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override()
+  public void finalizeWorkQueue(Message reason)
+  {
+    shutdownRequested = true;
+
+
+    // Send responses to any operations in the pending queue to indicate that
+    // they won't be processed because the server is shutting down.
+    CancelRequest cancelRequest = new CancelRequest(true, reason);
+    ArrayList<Operation> pendingOperations = new ArrayList<Operation>();
+    opQueue.removeAll(pendingOperations);
+
+    for (Operation o : pendingOperations)
+    {
+      try
+      {
+        // The operation has no chance of responding to the cancel
+        // request so avoid waiting for a cancel response.
+        if (o.getCancelResult() == null) {
+          o.abort(cancelRequest);
+        }
+      }
+      catch (Exception e)
+      {
+        if (debugEnabled())
+        {
+          TRACER.debugCaught(DebugLogLevel.ERROR, e);
+        }
+
+        logError(WARN_QUEUE_UNABLE_TO_CANCEL.get(
+            String.valueOf(o), String.valueOf(e)));
+      }
+    }
+
+
+    // Notify all the worker threads of the shutdown.
+    for (ParallelWorkerThread t : workerThreads)
+    {
+      try
+      {
+        t.shutDown();
+      }
+      catch (Exception e)
+      {
+        if (debugEnabled())
+        {
+          TRACER.debugCaught(DebugLogLevel.ERROR, e);
+        }
+
+        logError(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD.get(
+            t.getName(), String.valueOf(e)));
+      }
+    }
+  }
+
+
+
+  /**
+   * Indicates whether this work queue has received a request to shut down.
+   *
+   * @return  <CODE>true</CODE> if the work queue has recieved a request to shut
+   *          down, or <CODE>false</CODE> if not.
+   */
+  public boolean shutdownRequested()
+  {
+    return shutdownRequested;
+  }
+
+
+
+  /**
+   * Submits an operation to be processed by one of the worker threads
+   * associated with this work queue.
+   *
+   * @param  operation  The operation to be processed.
+   *
+   * @throws  DirectoryException  If the provided operation is not accepted for
+   *                              some reason (e.g., if the server is shutting
+   *                              down or the pending operation queue is already
+   *                              at its maximum capacity).
+   */
+  @Override
+  public void submitOperation(AbstractOperation operation)
+         throws DirectoryException
+  {
+    if (shutdownRequested)
+    {
+      Message message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
+      throw new DirectoryException(ResultCode.UNAVAILABLE, message);
+    }
+
+    opQueue.add(operation);
+    queueSemaphore.release();
+
+    opsSubmitted.incrementAndGet();
+  }
+
+
+
+  /**
+   * Retrieves the next operation that should be processed by one of the worker
+   * threads, blocking if necessary until a new request arrives.  This method
+   * should only be called by a worker thread associated with this work queue.
+   *
+   * @param  workerThread  The worker thread that is requesting the operation.
+   *
+   * @return  The next operation that should be processed, or <CODE>null</CODE>
+   *          if the server is shutting down and no more operations will be
+   *          processed.
+   */
+  public AbstractOperation nextOperation(ParallelWorkerThread workerThread)
+  {
+    return retryNextOperation(workerThread, 0);
+  }
+
+
+
+  /**
+   * Retrieves the next operation that should be processed by one of the worker
+   * threads following a previous failure attempt.  A maximum of five
+   * consecutive failures will be allowed before returning <CODE>null</CODE>,
+   * which will cause the associated thread to exit.
+   *
+   * @param  workerThread  The worker thread that is requesting the operation.
+   * @param  numFailures   The number of consecutive failures that the worker
+   *                       thread has experienced so far.  If this gets too
+   *                       high, then this method will return <CODE>null</CODE>
+   *                       rather than retrying.
+   *
+   * @return  The next operation that should be processed, or <CODE>null</CODE>
+   *          if the server is shutting down and no more operations will be
+   *          processed, or if there have been too many consecutive failures.
+   */
+  private AbstractOperation retryNextOperation(
+                                       ParallelWorkerThread workerThread,
+                                       int numFailures)
+  {
+    // See if we should kill off this thread.  This could be necessary if the
+    // number of worker threads has been decreased with the server online. If
+    // so, then return null and the thread will exit.
+    if (killThreads)
+    {
+      synchronized (queueLock)
+      {
+        try
+        {
+          int currentThreads = workerThreads.size();
+          if (currentThreads > numWorkerThreads)
+          {
+            if (workerThreads.remove((ParallelWorkerThread)
+              Thread.currentThread()))
+            {
+              currentThreads--;
+            }
+
+            if (currentThreads <= numWorkerThreads)
+            {
+              killThreads = false;
+            }
+
+            workerThread.setStoppedByReducedThreadNumber();
+            return null;
+          }
+        }
+        catch (Exception e)
+        {
+          if (debugEnabled())
+          {
+            TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          }
+        }
+      }
+    }
+
+    if ((shutdownRequested) || (numFailures > MAX_RETRY_COUNT))
+    {
+      if (numFailures > MAX_RETRY_COUNT)
+      {
+        Message message = ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES.get(
+            Thread.currentThread().getName(), numFailures, MAX_RETRY_COUNT);
+        logError(message);
+      }
+
+      return null;
+    }
+
+    try
+    {
+      while (true)
+      {
+        AbstractOperation nextOperation = null;
+        if (queueSemaphore.tryAcquire(5, TimeUnit.SECONDS)) {
+          nextOperation = opQueue.poll();
+        }
+        if (nextOperation == null)
+        {
+          // There was no work to do in the specified length of time.  See if
+          // we should shutdown, and if not then just check again.
+          if (shutdownRequested)
+          {
+            return null;
+          }
+          else if (killThreads)
+          {
+            synchronized (queueLock)
+            {
+              try
+              {
+                int currentThreads = workerThreads.size();
+                if (currentThreads > numWorkerThreads)
+                {
+                  if (workerThreads.remove((ParallelWorkerThread)
+                    Thread.currentThread()))
+                  {
+                    currentThreads--;
+                  }
+
+                  if (currentThreads <= numWorkerThreads)
+                  {
+                    killThreads = false;
+                  }
+
+                  workerThread.setStoppedByReducedThreadNumber();
+                  return null;
+                }
+              }
+              catch (Exception e)
+              {
+                if (debugEnabled())
+                {
+                  TRACER.debugCaught(DebugLogLevel.ERROR, e);
+                }
+              }
+            }
+          }
+        }
+        else
+        {
+          return nextOperation;
+        }
+      }
+    }
+    catch (Exception e)
+    {
+      if (debugEnabled())
+      {
+        TRACER.debugCaught(DebugLogLevel.ERROR, e);
+      }
+
+      // This should not happen.  The only recourse we have is to log a message
+      // and try again.
+      logError(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION.get(
+          Thread.currentThread().getName(), String.valueOf(e)));
+      return retryNextOperation(workerThread, numFailures + 1);
+    }
+  }
+
+
+
+  /**
+   * Attempts to remove the specified operation from this queue if it has not
+   * yet been picked up for processing by one of the worker threads.
+   *
+   * @param  operation  The operation to remove from the queue.
+   *
+   * @return  <CODE>true</CODE> if the provided request was present in the queue
+   *          and was removed successfully, or <CODE>false</CODE> it not.
+   */
+  public boolean removeOperation(AbstractOperation operation)
+  {
+    return opQueue.remove(operation);
+  }
+
+
+
+  /**
+   * Retrieves the total number of operations that have been successfully
+   * submitted to this work queue for processing since server startup.  This
+   * does not include operations that have been rejected for some reason like
+   * the queue already at its maximum capacity.
+   *
+   * @return  The total number of operations that have been successfully
+   *          submitted to this work queue since startup.
+   */
+  public long getOpsSubmitted()
+  {
+    return opsSubmitted.longValue();
+  }
+
+
+
+  /**
+   * Retrieves the number of pending operations in the queue that have not yet
+   * been picked up for processing.  Note that this method is not a
+   * constant-time operation and can be relatively inefficient, so it should be
+   * used sparingly.
+   *
+   * @return  The number of pending operations in the queue that have not yet
+   *          been picked up for processing.
+   */
+  public int size()
+  {
+    return opQueue.size();
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  public boolean isConfigurationChangeAcceptable(
+                      ParallelWorkQueueCfg configuration,
+                      List<Message> unacceptableReasons)
+  {
+    // The provided configuration will always be acceptable.
+    return true;
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  public ConfigChangeResult applyConfigurationChange(
+                                 ParallelWorkQueueCfg configuration)
+  {
+    ArrayList<Message> resultMessages = new ArrayList<Message>();
+    int newNumThreads  = getNumWorkerThreads(configuration);
+
+    // Apply a change to the number of worker threads if appropriate.
+    int currentThreads = workerThreads.size();
+    if (newNumThreads != currentThreads)
+    {
+      synchronized (queueLock)
+      {
+        try
+        {
+          int threadsToAdd = newNumThreads - currentThreads;
+          if (threadsToAdd > 0)
+          {
+            for (int i = 0; i < threadsToAdd; i++)
+            {
+              ParallelWorkerThread t =
+                   new ParallelWorkerThread(this, lastThreadNumber++);
+              workerThreads.add(t);
+              t.start();
+            }
+
+            killThreads = false;
+          }
+          else
+          {
+            killThreads = true;
+          }
+
+          numWorkerThreads = newNumThreads;
+        }
+        catch (Exception e)
+        {
+          if (debugEnabled())
+          {
+            TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          }
+        }
+      }
+    }
+
+    return new ConfigChangeResult(ResultCode.SUCCESS, false, resultMessages);
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override()
+  public boolean isIdle()
+  {
+    if (opQueue.size() > 0) {
+      return false;
+    }
+
+    synchronized (queueLock)
+    {
+      for (ParallelWorkerThread t : workerThreads)
+      {
+        if (t.isActive())
+        {
+          return false;
+        }
+      }
+
+      return true;
+    }
+  }
+
+
+
+  // Determine the number of worker threads.
+  private int getNumWorkerThreads(ParallelWorkQueueCfg configuration)
+  {
+    if (configuration.getNumWorkerThreads() == null)
+    {
+      // Automatically choose based on the number of processors.
+      int cpus = Runtime.getRuntime().availableProcessors();
+      int value = Math.max(24, cpus * 2);
+
+      Message message = INFO_ERGONOMIC_SIZING_OF_WORKER_THREAD_POOL.get(value);
+      logError(message);
+
+      return value;
+    }
+    else
+    {
+      return configuration.getNumWorkerThreads();
+    }
+  }
+}
diff --git a/opends/src/server/org/opends/server/extensions/ParallelWorkerThread.java b/opends/src/server/org/opends/server/extensions/ParallelWorkerThread.java
new file mode 100644
index 0000000..8a3b61f
--- /dev/null
+++ b/opends/src/server/org/opends/server/extensions/ParallelWorkerThread.java
@@ -0,0 +1,314 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
+ */
+package org.opends.server.extensions;
+import org.opends.messages.Message;
+
+
+
+import java.util.Map;
+
+import org.opends.server.api.DirectoryThread;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.types.AbstractOperation;
+import org.opends.server.types.CancelRequest;
+import org.opends.server.types.DebugLogLevel;
+import org.opends.server.types.DisconnectReason;
+
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import org.opends.server.loggers.debug.DebugTracer;
+import static org.opends.messages.CoreMessages.*;
+import static org.opends.server.util.StaticUtils.*;
+
+
+/**
+ * This class defines a data structure for storing and interacting with a
+ * Directory Server worker thread.
+ */
+public class ParallelWorkerThread
+       extends DirectoryThread
+{
+  /**
+   * The tracer object for the debug logger.
+   */
+  private static final DebugTracer TRACER = getTracer();
+
+  // Indicates whether the Directory Server is shutting down and this thread
+  // should stop running.
+  private boolean shutdownRequested;
+
+  // Indicates whether this thread was stopped because the server threadnumber
+  // was reduced.
+  private boolean stoppedByReducedThreadNumber;
+
+  // Indicates whether this thread is currently waiting for work.
+  private boolean waitingForWork;
+
+  // The operation that this worker thread is currently processing.
+  private AbstractOperation operation;
+
+  // The handle to the actual thread for this worker thread.
+  private Thread workerThread;
+
+  // The work queue that this worker thread will service.
+  private ParallelWorkQueue workQueue;
+
+
+
+  /**
+   * Creates a new worker thread that will service the provided work queue and
+   * process any new requests that are submitted.
+   *
+   * @param  workQueue  The work queue with which this worker thread is
+   *                    associated.
+   * @param  threadID   The thread ID for this worker thread.
+   */
+  public ParallelWorkerThread(ParallelWorkQueue workQueue, int threadID)
+  {
+    super("Worker Thread " + threadID);
+
+
+    this.workQueue = workQueue;
+
+    stoppedByReducedThreadNumber = false;
+    shutdownRequested            = false;
+    waitingForWork               = false;
+    operation                    = null;
+    workerThread                 = null;
+  }
+
+
+
+  /**
+   * Indicates that this thread is about to be stopped because the Directory
+   * Server configuration has been updated to reduce the number of worker
+   * threads.
+   */
+  public void setStoppedByReducedThreadNumber()
+  {
+    stoppedByReducedThreadNumber = true;
+  }
+
+
+
+  /**
+   * Indicates whether this worker thread is actively processing a request.
+   * Note that this is a point-in-time determination and if a reliable answer is
+   * expected then the server should impose some external constraint to ensure
+   * that no new requests are enqueued.
+   *
+   * @return  {@code true} if this worker thread is actively processing a
+   *          request, or {@code false} if it is idle.
+   */
+  public boolean isActive()
+  {
+    return (isAlive() && (operation != null));
+  }
+
+
+
+  /**
+   * Operates in a loop, retrieving the next request from the work queue,
+   * processing it, and then going back to the queue for more.
+   */
+  @Override
+  public void run()
+  {
+    workerThread = currentThread();
+
+    while (! shutdownRequested)
+    {
+      try
+      {
+        waitingForWork = true;
+        operation = null;
+        operation = workQueue.nextOperation(this);
+        waitingForWork = false;
+
+
+        if (operation == null)
+        {
+          // The operation may be null if the server is shutting down.  If that
+          // is the case, then break out of the while loop.
+          break;
+        }
+        else
+        {
+          // The operation is not null, so process it.  Make sure that when
+          // processing is complete.
+          operation.run();
+          operation.operationCompleted();
+        }
+      }
+      catch (Throwable t)
+      {
+        if (debugEnabled())
+        {
+          TRACER.debugWarning(
+            "Uncaught exception in worker thread while processing " +
+                "operation %s: %s", String.valueOf(operation), t);
+
+          TRACER.debugCaught(DebugLogLevel.ERROR, t);
+        }
+
+        try
+        {
+          Message message = ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.
+              get(getName(), String.valueOf(operation),
+                  stackTraceToSingleLineString(t));
+          logError(message);
+
+          operation.setResultCode(DirectoryServer.getServerErrorResultCode());
+          operation.appendErrorMessage(message);
+          operation.getClientConnection().sendResponse(operation);
+        }
+        catch (Throwable t2)
+        {
+          if (debugEnabled())
+          {
+            TRACER.debugWarning(
+              "Exception in worker thread while trying to log a " +
+                  "message about an uncaught exception %s: %s", t, t2);
+
+            TRACER.debugCaught(DebugLogLevel.ERROR, t2);
+          }
+        }
+
+
+        try
+        {
+          Message message = ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get(getName(),
+                                      String.valueOf(operation),
+                                      stackTraceToSingleLineString(t));
+
+          operation.disconnectClient(DisconnectReason.SERVER_ERROR,
+                                     true, message);
+        }
+        catch (Throwable t2)
+        {
+          if (debugEnabled())
+          {
+            TRACER.debugCaught(DebugLogLevel.ERROR, t2);
+          }
+        }
+      }
+    }
+
+    // If we have gotten here, then we presume that the server thread is
+    // shutting down.  However, if that's not the case then that is a problem
+    // and we will want to log a message.
+    if (stoppedByReducedThreadNumber)
+    {
+      logError(INFO_WORKER_STOPPED_BY_REDUCED_THREADNUMBER.get(getName()));
+    }
+    else if (! workQueue.shutdownRequested())
+    {
+      logError(WARN_UNEXPECTED_WORKER_THREAD_EXIT.get(getName()));
+    }
+
+
+    if (debugEnabled())
+    {
+      TRACER.debugInfo(getName() + " exiting.");
+    }
+  }
+
+
+
+  /**
+   * Indicates that the Directory Server has received a request to stop running
+   * and that this thread should stop running as soon as possible.
+   */
+  public void shutDown()
+  {
+    if (debugEnabled())
+    {
+      TRACER.debugInfo(getName() + " being signaled to shut down.");
+    }
+
+    // Set a flag that indicates that the thread should stop running.
+    shutdownRequested = true;
+
+
+    // Check to see if the thread is waiting for work.  If so, then interrupt
+    // it.
+    if (waitingForWork)
+    {
+      try
+      {
+        workerThread.interrupt();
+      }
+      catch (Exception e)
+      {
+        if (debugEnabled())
+        {
+          TRACER.debugWarning(
+            "Caught an exception while trying to interrupt the worker " +
+                "thread waiting for work: %s", e);
+          TRACER.debugCaught(DebugLogLevel.ERROR, e);
+        }
+      }
+    }
+    else
+    {
+      try
+      {
+        CancelRequest cancelRequest =
+          new CancelRequest(true, INFO_CANCELED_BY_SHUTDOWN.get());
+        operation.cancel(cancelRequest);
+      }
+      catch (Exception e)
+      {
+        if (debugEnabled())
+        {
+          TRACER.debugWarning(
+            "Caught an exception while trying to abandon the " +
+                "operation in progress for the worker thread: %s", e);
+          TRACER.debugCaught(DebugLogLevel.ERROR, e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Retrieves any relevent debug information with which this tread is
+   * associated so they can be included in debug messages.
+   *
+   * @return debug information about this thread as a string.
+   */
+  @Override
+  public Map<String, String> getDebugProperties()
+  {
+    Map<String, String> properties = super.getDebugProperties();
+    properties.put("clientConnection",
+                   operation.getClientConnection().toString());
+    properties.put("operation", operation.toString());
+
+    return properties;
+  }
+}
+
diff --git a/opends/src/server/org/opends/server/monitors/ParallelWorkQueueMonitor.java b/opends/src/server/org/opends/server/monitors/ParallelWorkQueueMonitor.java
new file mode 100644
index 0000000..83066bb
--- /dev/null
+++ b/opends/src/server/org/opends/server/monitors/ParallelWorkQueueMonitor.java
@@ -0,0 +1,233 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
+ */
+package org.opends.server.monitors;
+
+
+
+import java.util.ArrayList;
+
+import org.opends.server.admin.std.server.MonitorProviderCfg;
+import org.opends.server.api.AttributeSyntax;
+import org.opends.server.api.MonitorProvider;
+import org.opends.server.config.ConfigException;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.extensions.ParallelWorkQueue;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.Attributes;
+import org.opends.server.types.InitializationException;
+
+
+
+
+/**
+ * This class defines a Directory Server monitor that can be used to provide
+ * information about the state of the work queue.
+ */
+public class ParallelWorkQueueMonitor
+       extends MonitorProvider<MonitorProviderCfg>
+{
+  /**
+   * The name to use for the monitor attribute that provides the current request
+   * backlog.
+   */
+  public static final String ATTR_CURRENT_BACKLOG = "currentRequestBacklog";
+
+
+
+  /**
+   * The name to use for the monitor attribute that provides the average request
+   * backlog.
+   */
+  public static final String ATTR_AVERAGE_BACKLOG = "averageRequestBacklog";
+
+
+
+  /**
+   * The name to use for the monitor attribute that provides the maximum
+   * observed request backlog.
+   */
+  public static final String ATTR_MAX_BACKLOG = "maxRequestBacklog";
+
+
+
+  /**
+   * The name to use for the monitor attribute that provides the total number of
+   * operations submitted.
+   */
+  public static final String ATTR_OPS_SUBMITTED = "requestsSubmitted";
+
+
+
+  // The maximum backlog observed by polling the queue.
+  private int maxBacklog;
+
+  // The total number of times the backlog has been polled.
+  private long numPolls;
+
+  // The total backlog observed from periodic polling.
+  private long totalBacklog;
+
+  // The parallel work queue instance with which this monitor is associated.
+  private ParallelWorkQueue workQueue;
+
+
+
+  /**
+   * Initializes this monitor provider.  Note that no initialization should be
+   * done here, since it should be performed in the
+   * <CODE>initializeMonitorProvider</CODE> class.
+   *
+   * @param  workQueue  The work queue with which this monitor is associated.
+   */
+  public ParallelWorkQueueMonitor(ParallelWorkQueue workQueue)
+  {
+    super("Work Queue Monitor Provider");
+
+
+    this.workQueue = workQueue;
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  public void initializeMonitorProvider(MonitorProviderCfg configuration)
+         throws ConfigException, InitializationException
+  {
+    maxBacklog   = 0;
+    totalBacklog = 0;
+    numPolls     = 0;
+  }
+
+
+
+  /**
+   * Retrieves the name of this monitor provider.  It should be unique among all
+   * monitor providers, including all instances of the same monitor provider.
+   *
+   * @return  The name of this monitor provider.
+   */
+  public String getMonitorInstanceName()
+  {
+    return "Work Queue";
+  }
+
+
+
+  /**
+   * Retrieves the length of time in milliseconds that should elapse between
+   * calls to the <CODE>updateMonitorData()</CODE> method.  A negative or zero
+   * return value indicates that the <CODE>updateMonitorData()</CODE> method
+   * should not be periodically invoked.
+   *
+   * @return  The length of time in milliseconds that should elapse between
+   *          calls to the <CODE>updateMonitorData()</CODE> method.
+   */
+  public long getUpdateInterval()
+  {
+    // We will poll the work queue every 10 seconds.
+    return 10000;
+  }
+
+
+
+  /**
+   * Performs any processing periodic processing that may be desired to update
+   * the information associated with this monitor.  Note that best-effort
+   * attempts will be made to ensure that calls to this method come
+   * <CODE>getUpdateInterval()</CODE> milliseconds apart, but no guarantees will
+   * be made.
+   */
+  public void updateMonitorData()
+  {
+    int backlog = workQueue.size();
+    totalBacklog += backlog;
+    numPolls++;
+
+    if (backlog > maxBacklog)
+    {
+      maxBacklog = backlog;
+    }
+  }
+
+
+
+  /**
+   * Retrieves a set of attributes containing monitor data that should be
+   * returned to the client if the corresponding monitor entry is requested.
+   *
+   * @return  A set of attributes containing monitor data that should be
+   *          returned to the client if the corresponding monitor entry is
+   *          requested.
+   */
+  public ArrayList<Attribute> getMonitorData()
+  {
+    int backlog = workQueue.size();
+    totalBacklog += backlog;
+    numPolls++;
+    if (backlog > maxBacklog)
+    {
+      maxBacklog = backlog;
+    }
+
+    long averageBacklog = (long) (1.0 * totalBacklog / numPolls);
+
+    long opsSubmitted = workQueue.getOpsSubmitted();
+
+    ArrayList<Attribute> monitorAttrs = new ArrayList<Attribute>();
+    AttributeSyntax<?> integerSyntax = DirectoryServer
+        .getDefaultIntegerSyntax();
+
+    // The current backlog.
+    AttributeType attrType = DirectoryServer.getDefaultAttributeType(
+        ATTR_CURRENT_BACKLOG, integerSyntax);
+    monitorAttrs
+        .add(Attributes.create(attrType, String.valueOf(backlog)));
+
+    // The average backlog.
+    attrType = DirectoryServer.getDefaultAttributeType(ATTR_AVERAGE_BACKLOG,
+        integerSyntax);
+    monitorAttrs.add(Attributes.create(attrType, String
+        .valueOf(averageBacklog)));
+
+    // The maximum backlog.
+    attrType = DirectoryServer.getDefaultAttributeType(ATTR_MAX_BACKLOG,
+        integerSyntax);
+    monitorAttrs.add(Attributes.create(attrType, String
+        .valueOf(maxBacklog)));
+
+    // The total number of operations submitted.
+    attrType = DirectoryServer.getDefaultAttributeType(ATTR_OPS_SUBMITTED,
+        integerSyntax);
+    monitorAttrs.add(Attributes.create(attrType, String
+        .valueOf(opsSubmitted)));
+
+    return monitorAttrs;
+  }
+}

--
Gitblit v1.10.0