From 4c83513d36b8f0b29b0e6b4b7ed2bb8167a7b82a Mon Sep 17 00:00:00 2001
From: abobrov <abobrov@localhost>
Date: Fri, 09 Oct 2009 14:46:06 +0000
Subject: [PATCH] - EXPERIMENTAL Parallel Work Queue implementation.
---
opends/resource/schema/02-config.ldif | 8
opends/src/admin/defn/org/opends/server/admin/std/ParallelWorkQueueConfiguration.xml | 91 ++++
opends/src/server/org/opends/server/monitors/ParallelWorkQueueMonitor.java | 233 +++++++++++
opends/src/server/org/opends/server/extensions/ParallelWorkerThread.java | 314 ++++++++++++++
opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java | 605 ++++++++++++++++++++++++++++
5 files changed, 1,250 insertions(+), 1 deletions(-)
diff --git a/opends/resource/schema/02-config.ldif b/opends/resource/schema/02-config.ldif
index 12f31ef..158ad3b 100644
--- a/opends/resource/schema/02-config.ldif
+++ b/opends/resource/schema/02-config.ldif
@@ -4031,7 +4031,7 @@
NAME 'ds-cfg-qos-policy'
SUP top
STRUCTURAL
- MUST ( cn $
+ MUST ( cn $
ds-cfg-java-class)
X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.185
@@ -4132,4 +4132,10 @@
SUP ds-cfg-plugin
STRUCTURAL
X-ORIGIN 'OpenDS Directory Server' )
+objectClasses: ( 1.3.6.1.4.1.26027.1.2.233
+ NAME 'ds-cfg-parallel-work-queue'
+ SUP ds-cfg-work-queue
+ STRUCTURAL
+ MAY ( ds-cfg-num-worker-threads )
+ X-ORIGIN 'OpenDS Directory Server' )
diff --git a/opends/src/admin/defn/org/opends/server/admin/std/ParallelWorkQueueConfiguration.xml b/opends/src/admin/defn/org/opends/server/admin/std/ParallelWorkQueueConfiguration.xml
new file mode 100644
index 0000000..ad915a8
--- /dev/null
+++ b/opends/src/admin/defn/org/opends/server/admin/std/ParallelWorkQueueConfiguration.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+ ! CDDL HEADER START
+ !
+ ! The contents of this file are subject to the terms of the
+ ! Common Development and Distribution License, Version 1.0 only
+ ! (the "License"). You may not use this file except in compliance
+ ! with the License.
+ !
+ ! You can obtain a copy of the license at
+ ! trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ ! or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ ! See the License for the specific language governing permissions
+ ! and limitations under the License.
+ !
+ ! When distributing Covered Code, include this CDDL HEADER in each
+ ! file and include the License file at
+ ! trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ ! add the following below this CDDL HEADER, with the fields enclosed
+ ! by brackets "[]" replaced with your own identifying information:
+ ! Portions Copyright [yyyy] [name of copyright owner]
+ !
+ ! CDDL HEADER END
+ !
+ !
+ ! Copyright 2007-2009 Sun Microsystems, Inc.
+ ! -->
+<adm:managed-object name="parallel-work-queue"
+ plural-name="parallel-work-queues" extends="work-queue"
+ package="org.opends.server.admin.std"
+ xmlns:adm="http://www.opends.org/admin"
+ xmlns:ldap="http://www.opends.org/admin-ldap">
+ <adm:synopsis>
+ The
+ <adm:user-friendly-name />
+ is a type of work queue that uses a number of worker threads that
+ watch a queue and pick up an operation to process whenever one
+ becomes available.
+ </adm:synopsis>
+ <adm:description>
+ The parallel work queue is a FIFO queue serviced by a fixed
+ number of worker threads. This fixed number of threads can be
+ changed on the fly, with the change taking effect as soon as
+ it is made. This work queue implementation is unbound ie it
+ does not block after reaching certain queue size and as such
+ should only be used on a very well tuned server configuration
+ to avoid potential out of memory errors.
+ </adm:description>
+ <adm:profile name="ldap">
+ <ldap:object-class>
+ <ldap:name>ds-cfg-parallel-work-queue</ldap:name>
+ <ldap:superior>ds-cfg-work-queue</ldap:superior>
+ </ldap:object-class>
+ </adm:profile>
+ <adm:property-override name="java-class" advanced="true">
+ <adm:default-behavior>
+ <adm:defined>
+ <adm:value>
+ org.opends.server.extensions.ParallelWorkQueue
+ </adm:value>
+ </adm:defined>
+ </adm:default-behavior>
+ </adm:property-override>
+ <adm:property name="num-worker-threads">
+ <adm:synopsis>
+ Specifies the number of worker threads to be used for processing
+ operations placed in the queue.
+ </adm:synopsis>
+ <adm:description>
+ If the value is increased,
+ the additional worker threads are created immediately. If the
+ value is reduced, the appropriate number of threads are destroyed
+ as operations complete processing.
+ </adm:description>
+ <adm:default-behavior>
+ <adm:alias>
+ <adm:synopsis>
+ Let the server decide.
+ </adm:synopsis>
+ </adm:alias>
+ </adm:default-behavior>
+ <adm:syntax>
+ <adm:integer lower-limit="1" upper-limit="2147483647" />
+ </adm:syntax>
+ <adm:profile name="ldap">
+ <ldap:attribute>
+ <ldap:name>ds-cfg-num-worker-threads</ldap:name>
+ </ldap:attribute>
+ </adm:profile>
+ </adm:property>
+</adm:managed-object>
diff --git a/opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java b/opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java
new file mode 100644
index 0000000..d9b2e10
--- /dev/null
+++ b/opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java
@@ -0,0 +1,605 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2006-2009 Sun Microsystems, Inc.
+ */
+package org.opends.server.extensions;
+
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.opends.messages.Message;
+import org.opends.server.admin.server.ConfigurationChangeListener;
+import org.opends.server.admin.std.server.ParallelWorkQueueCfg;
+import org.opends.server.api.WorkQueue;
+import org.opends.server.config.ConfigException;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.monitors.ParallelWorkQueueMonitor;
+import org.opends.server.types.AbstractOperation;
+import org.opends.server.types.CancelRequest;
+import org.opends.server.types.ConfigChangeResult;
+import org.opends.server.types.DebugLogLevel;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.InitializationException;
+import org.opends.server.types.Operation;
+import org.opends.server.types.ResultCode;
+
+import static org.opends.messages.ConfigMessages.*;
+import static org.opends.messages.CoreMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+
+
+
+/**
+ * This class defines a data structure for storing and interacting with the
+ * Directory Server work queue.
+ */
+public class ParallelWorkQueue
+ extends WorkQueue<ParallelWorkQueueCfg>
+ implements ConfigurationChangeListener<ParallelWorkQueueCfg>
+{
+ /**
+ * The tracer object for the debug logger.
+ */
+ private static final DebugTracer TRACER = getTracer();
+
+
+
+
+ /**
+ * The maximum number of times to retry getting the next operation from the
+ * queue if an unexpected failure occurs.
+ */
+ private static final int MAX_RETRY_COUNT = 5;
+
+
+
+ // The set of worker threads that will be used to process this work queue.
+ private ArrayList<ParallelWorkerThread> workerThreads;
+
+ // The number of operations that have been submitted to the work queue for
+ // processing.
+ private AtomicLong opsSubmitted;
+
+ // Indicates whether one or more of the worker threads needs to be killed at
+ // the next convenient opportunity.
+ private boolean killThreads;
+
+ // Indicates whether the Directory Server is shutting down.
+ private boolean shutdownRequested;
+
+ // The thread number used for the last worker thread that was created.
+ private int lastThreadNumber;
+
+ // The number of worker threads that should be active (or will be shortly if
+ // a configuration change has not been completely applied).
+ private int numWorkerThreads;
+
+ // The queue that will be used to actually hold the pending operations.
+ private ConcurrentLinkedQueue<AbstractOperation> opQueue;
+
+ // The lock used to provide threadsafe access for the queue.
+ private final Object queueLock = new Object();
+
+
+ private final Semaphore queueSemaphore = new Semaphore(0, false);
+
+
+ /**
+ * Creates a new instance of this work queue. All initialization should be
+ * performed in the <CODE>initializeWorkQueue</CODE> method.
+ */
+ public ParallelWorkQueue()
+ {
+ // No implementation should be performed here.
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override()
+ public void initializeWorkQueue(ParallelWorkQueueCfg configuration)
+ throws ConfigException, InitializationException
+ {
+ shutdownRequested = false;
+ killThreads = false;
+ opsSubmitted = new AtomicLong(0);
+
+ // Register to be notified of any configuration changes.
+ configuration.addParallelChangeListener(this);
+
+ // Get the necessary configuration from the provided entry.
+ numWorkerThreads = getNumWorkerThreads(configuration);
+
+ // Create the actual work queue.
+ opQueue = new ConcurrentLinkedQueue<AbstractOperation>();
+
+ // Create the set of worker threads that should be used to service the
+ // work queue.
+ workerThreads = new ArrayList<ParallelWorkerThread>(numWorkerThreads);
+ for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads;
+ lastThreadNumber++)
+ {
+ ParallelWorkerThread t =
+ new ParallelWorkerThread(this, lastThreadNumber);
+ t.start();
+ workerThreads.add(t);
+ }
+
+
+ // Create and register a monitor provider for the work queue.
+ try
+ {
+ ParallelWorkQueueMonitor monitor =
+ new ParallelWorkQueueMonitor(this);
+ monitor.initializeMonitorProvider(null);
+ monitor.start();
+ DirectoryServer.registerMonitorProvider(monitor);
+ }
+ catch (Exception e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+
+ Message message = ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR.get(
+ String.valueOf(ParallelWorkQueueMonitor.class), String.valueOf(e));
+ logError(message);
+ }
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override()
+ public void finalizeWorkQueue(Message reason)
+ {
+ shutdownRequested = true;
+
+
+ // Send responses to any operations in the pending queue to indicate that
+ // they won't be processed because the server is shutting down.
+ CancelRequest cancelRequest = new CancelRequest(true, reason);
+ ArrayList<Operation> pendingOperations = new ArrayList<Operation>();
+ opQueue.removeAll(pendingOperations);
+
+ for (Operation o : pendingOperations)
+ {
+ try
+ {
+ // The operation has no chance of responding to the cancel
+ // request so avoid waiting for a cancel response.
+ if (o.getCancelResult() == null) {
+ o.abort(cancelRequest);
+ }
+ }
+ catch (Exception e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+
+ logError(WARN_QUEUE_UNABLE_TO_CANCEL.get(
+ String.valueOf(o), String.valueOf(e)));
+ }
+ }
+
+
+ // Notify all the worker threads of the shutdown.
+ for (ParallelWorkerThread t : workerThreads)
+ {
+ try
+ {
+ t.shutDown();
+ }
+ catch (Exception e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+
+ logError(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD.get(
+ t.getName(), String.valueOf(e)));
+ }
+ }
+ }
+
+
+
+ /**
+ * Indicates whether this work queue has received a request to shut down.
+ *
+ * @return <CODE>true</CODE> if the work queue has recieved a request to shut
+ * down, or <CODE>false</CODE> if not.
+ */
+ public boolean shutdownRequested()
+ {
+ return shutdownRequested;
+ }
+
+
+
+ /**
+ * Submits an operation to be processed by one of the worker threads
+ * associated with this work queue.
+ *
+ * @param operation The operation to be processed.
+ *
+ * @throws DirectoryException If the provided operation is not accepted for
+ * some reason (e.g., if the server is shutting
+ * down or the pending operation queue is already
+ * at its maximum capacity).
+ */
+ @Override
+ public void submitOperation(AbstractOperation operation)
+ throws DirectoryException
+ {
+ if (shutdownRequested)
+ {
+ Message message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
+ throw new DirectoryException(ResultCode.UNAVAILABLE, message);
+ }
+
+ opQueue.add(operation);
+ queueSemaphore.release();
+
+ opsSubmitted.incrementAndGet();
+ }
+
+
+
+ /**
+ * Retrieves the next operation that should be processed by one of the worker
+ * threads, blocking if necessary until a new request arrives. This method
+ * should only be called by a worker thread associated with this work queue.
+ *
+ * @param workerThread The worker thread that is requesting the operation.
+ *
+ * @return The next operation that should be processed, or <CODE>null</CODE>
+ * if the server is shutting down and no more operations will be
+ * processed.
+ */
+ public AbstractOperation nextOperation(ParallelWorkerThread workerThread)
+ {
+ return retryNextOperation(workerThread, 0);
+ }
+
+
+
+ /**
+ * Retrieves the next operation that should be processed by one of the worker
+ * threads following a previous failure attempt. A maximum of five
+ * consecutive failures will be allowed before returning <CODE>null</CODE>,
+ * which will cause the associated thread to exit.
+ *
+ * @param workerThread The worker thread that is requesting the operation.
+ * @param numFailures The number of consecutive failures that the worker
+ * thread has experienced so far. If this gets too
+ * high, then this method will return <CODE>null</CODE>
+ * rather than retrying.
+ *
+ * @return The next operation that should be processed, or <CODE>null</CODE>
+ * if the server is shutting down and no more operations will be
+ * processed, or if there have been too many consecutive failures.
+ */
+ private AbstractOperation retryNextOperation(
+ ParallelWorkerThread workerThread,
+ int numFailures)
+ {
+ // See if we should kill off this thread. This could be necessary if the
+ // number of worker threads has been decreased with the server online. If
+ // so, then return null and the thread will exit.
+ if (killThreads)
+ {
+ synchronized (queueLock)
+ {
+ try
+ {
+ int currentThreads = workerThreads.size();
+ if (currentThreads > numWorkerThreads)
+ {
+ if (workerThreads.remove((ParallelWorkerThread)
+ Thread.currentThread()))
+ {
+ currentThreads--;
+ }
+
+ if (currentThreads <= numWorkerThreads)
+ {
+ killThreads = false;
+ }
+
+ workerThread.setStoppedByReducedThreadNumber();
+ return null;
+ }
+ }
+ catch (Exception e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+ }
+ }
+
+ if ((shutdownRequested) || (numFailures > MAX_RETRY_COUNT))
+ {
+ if (numFailures > MAX_RETRY_COUNT)
+ {
+ Message message = ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES.get(
+ Thread.currentThread().getName(), numFailures, MAX_RETRY_COUNT);
+ logError(message);
+ }
+
+ return null;
+ }
+
+ try
+ {
+ while (true)
+ {
+ AbstractOperation nextOperation = null;
+ if (queueSemaphore.tryAcquire(5, TimeUnit.SECONDS)) {
+ nextOperation = opQueue.poll();
+ }
+ if (nextOperation == null)
+ {
+ // There was no work to do in the specified length of time. See if
+ // we should shutdown, and if not then just check again.
+ if (shutdownRequested)
+ {
+ return null;
+ }
+ else if (killThreads)
+ {
+ synchronized (queueLock)
+ {
+ try
+ {
+ int currentThreads = workerThreads.size();
+ if (currentThreads > numWorkerThreads)
+ {
+ if (workerThreads.remove((ParallelWorkerThread)
+ Thread.currentThread()))
+ {
+ currentThreads--;
+ }
+
+ if (currentThreads <= numWorkerThreads)
+ {
+ killThreads = false;
+ }
+
+ workerThread.setStoppedByReducedThreadNumber();
+ return null;
+ }
+ }
+ catch (Exception e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+ }
+ }
+ }
+ else
+ {
+ return nextOperation;
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+
+ // This should not happen. The only recourse we have is to log a message
+ // and try again.
+ logError(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION.get(
+ Thread.currentThread().getName(), String.valueOf(e)));
+ return retryNextOperation(workerThread, numFailures + 1);
+ }
+ }
+
+
+
+ /**
+ * Attempts to remove the specified operation from this queue if it has not
+ * yet been picked up for processing by one of the worker threads.
+ *
+ * @param operation The operation to remove from the queue.
+ *
+ * @return <CODE>true</CODE> if the provided request was present in the queue
+ * and was removed successfully, or <CODE>false</CODE> it not.
+ */
+ public boolean removeOperation(AbstractOperation operation)
+ {
+ return opQueue.remove(operation);
+ }
+
+
+
+ /**
+ * Retrieves the total number of operations that have been successfully
+ * submitted to this work queue for processing since server startup. This
+ * does not include operations that have been rejected for some reason like
+ * the queue already at its maximum capacity.
+ *
+ * @return The total number of operations that have been successfully
+ * submitted to this work queue since startup.
+ */
+ public long getOpsSubmitted()
+ {
+ return opsSubmitted.longValue();
+ }
+
+
+
+ /**
+ * Retrieves the number of pending operations in the queue that have not yet
+ * been picked up for processing. Note that this method is not a
+ * constant-time operation and can be relatively inefficient, so it should be
+ * used sparingly.
+ *
+ * @return The number of pending operations in the queue that have not yet
+ * been picked up for processing.
+ */
+ public int size()
+ {
+ return opQueue.size();
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean isConfigurationChangeAcceptable(
+ ParallelWorkQueueCfg configuration,
+ List<Message> unacceptableReasons)
+ {
+ // The provided configuration will always be acceptable.
+ return true;
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public ConfigChangeResult applyConfigurationChange(
+ ParallelWorkQueueCfg configuration)
+ {
+ ArrayList<Message> resultMessages = new ArrayList<Message>();
+ int newNumThreads = getNumWorkerThreads(configuration);
+
+ // Apply a change to the number of worker threads if appropriate.
+ int currentThreads = workerThreads.size();
+ if (newNumThreads != currentThreads)
+ {
+ synchronized (queueLock)
+ {
+ try
+ {
+ int threadsToAdd = newNumThreads - currentThreads;
+ if (threadsToAdd > 0)
+ {
+ for (int i = 0; i < threadsToAdd; i++)
+ {
+ ParallelWorkerThread t =
+ new ParallelWorkerThread(this, lastThreadNumber++);
+ workerThreads.add(t);
+ t.start();
+ }
+
+ killThreads = false;
+ }
+ else
+ {
+ killThreads = true;
+ }
+
+ numWorkerThreads = newNumThreads;
+ }
+ catch (Exception e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+ }
+ }
+
+ return new ConfigChangeResult(ResultCode.SUCCESS, false, resultMessages);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override()
+ public boolean isIdle()
+ {
+ if (opQueue.size() > 0) {
+ return false;
+ }
+
+ synchronized (queueLock)
+ {
+ for (ParallelWorkerThread t : workerThreads)
+ {
+ if (t.isActive())
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
+ }
+
+
+
+ // Determine the number of worker threads.
+ private int getNumWorkerThreads(ParallelWorkQueueCfg configuration)
+ {
+ if (configuration.getNumWorkerThreads() == null)
+ {
+ // Automatically choose based on the number of processors.
+ int cpus = Runtime.getRuntime().availableProcessors();
+ int value = Math.max(24, cpus * 2);
+
+ Message message = INFO_ERGONOMIC_SIZING_OF_WORKER_THREAD_POOL.get(value);
+ logError(message);
+
+ return value;
+ }
+ else
+ {
+ return configuration.getNumWorkerThreads();
+ }
+ }
+}
diff --git a/opends/src/server/org/opends/server/extensions/ParallelWorkerThread.java b/opends/src/server/org/opends/server/extensions/ParallelWorkerThread.java
new file mode 100644
index 0000000..8a3b61f
--- /dev/null
+++ b/opends/src/server/org/opends/server/extensions/ParallelWorkerThread.java
@@ -0,0 +1,314 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2006-2009 Sun Microsystems, Inc.
+ */
+package org.opends.server.extensions;
+import org.opends.messages.Message;
+
+
+
+import java.util.Map;
+
+import org.opends.server.api.DirectoryThread;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.types.AbstractOperation;
+import org.opends.server.types.CancelRequest;
+import org.opends.server.types.DebugLogLevel;
+import org.opends.server.types.DisconnectReason;
+
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import org.opends.server.loggers.debug.DebugTracer;
+import static org.opends.messages.CoreMessages.*;
+import static org.opends.server.util.StaticUtils.*;
+
+
+/**
+ * This class defines a data structure for storing and interacting with a
+ * Directory Server worker thread.
+ */
+public class ParallelWorkerThread
+ extends DirectoryThread
+{
+ /**
+ * The tracer object for the debug logger.
+ */
+ private static final DebugTracer TRACER = getTracer();
+
+ // Indicates whether the Directory Server is shutting down and this thread
+ // should stop running.
+ private boolean shutdownRequested;
+
+ // Indicates whether this thread was stopped because the server threadnumber
+ // was reduced.
+ private boolean stoppedByReducedThreadNumber;
+
+ // Indicates whether this thread is currently waiting for work.
+ private boolean waitingForWork;
+
+ // The operation that this worker thread is currently processing.
+ private AbstractOperation operation;
+
+ // The handle to the actual thread for this worker thread.
+ private Thread workerThread;
+
+ // The work queue that this worker thread will service.
+ private ParallelWorkQueue workQueue;
+
+
+
+ /**
+ * Creates a new worker thread that will service the provided work queue and
+ * process any new requests that are submitted.
+ *
+ * @param workQueue The work queue with which this worker thread is
+ * associated.
+ * @param threadID The thread ID for this worker thread.
+ */
+ public ParallelWorkerThread(ParallelWorkQueue workQueue, int threadID)
+ {
+ super("Worker Thread " + threadID);
+
+
+ this.workQueue = workQueue;
+
+ stoppedByReducedThreadNumber = false;
+ shutdownRequested = false;
+ waitingForWork = false;
+ operation = null;
+ workerThread = null;
+ }
+
+
+
+ /**
+ * Indicates that this thread is about to be stopped because the Directory
+ * Server configuration has been updated to reduce the number of worker
+ * threads.
+ */
+ public void setStoppedByReducedThreadNumber()
+ {
+ stoppedByReducedThreadNumber = true;
+ }
+
+
+
+ /**
+ * Indicates whether this worker thread is actively processing a request.
+ * Note that this is a point-in-time determination and if a reliable answer is
+ * expected then the server should impose some external constraint to ensure
+ * that no new requests are enqueued.
+ *
+ * @return {@code true} if this worker thread is actively processing a
+ * request, or {@code false} if it is idle.
+ */
+ public boolean isActive()
+ {
+ return (isAlive() && (operation != null));
+ }
+
+
+
+ /**
+ * Operates in a loop, retrieving the next request from the work queue,
+ * processing it, and then going back to the queue for more.
+ */
+ @Override
+ public void run()
+ {
+ workerThread = currentThread();
+
+ while (! shutdownRequested)
+ {
+ try
+ {
+ waitingForWork = true;
+ operation = null;
+ operation = workQueue.nextOperation(this);
+ waitingForWork = false;
+
+
+ if (operation == null)
+ {
+ // The operation may be null if the server is shutting down. If that
+ // is the case, then break out of the while loop.
+ break;
+ }
+ else
+ {
+ // The operation is not null, so process it. Make sure that when
+ // processing is complete.
+ operation.run();
+ operation.operationCompleted();
+ }
+ }
+ catch (Throwable t)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugWarning(
+ "Uncaught exception in worker thread while processing " +
+ "operation %s: %s", String.valueOf(operation), t);
+
+ TRACER.debugCaught(DebugLogLevel.ERROR, t);
+ }
+
+ try
+ {
+ Message message = ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.
+ get(getName(), String.valueOf(operation),
+ stackTraceToSingleLineString(t));
+ logError(message);
+
+ operation.setResultCode(DirectoryServer.getServerErrorResultCode());
+ operation.appendErrorMessage(message);
+ operation.getClientConnection().sendResponse(operation);
+ }
+ catch (Throwable t2)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugWarning(
+ "Exception in worker thread while trying to log a " +
+ "message about an uncaught exception %s: %s", t, t2);
+
+ TRACER.debugCaught(DebugLogLevel.ERROR, t2);
+ }
+ }
+
+
+ try
+ {
+ Message message = ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get(getName(),
+ String.valueOf(operation),
+ stackTraceToSingleLineString(t));
+
+ operation.disconnectClient(DisconnectReason.SERVER_ERROR,
+ true, message);
+ }
+ catch (Throwable t2)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, t2);
+ }
+ }
+ }
+ }
+
+ // If we have gotten here, then we presume that the server thread is
+ // shutting down. However, if that's not the case then that is a problem
+ // and we will want to log a message.
+ if (stoppedByReducedThreadNumber)
+ {
+ logError(INFO_WORKER_STOPPED_BY_REDUCED_THREADNUMBER.get(getName()));
+ }
+ else if (! workQueue.shutdownRequested())
+ {
+ logError(WARN_UNEXPECTED_WORKER_THREAD_EXIT.get(getName()));
+ }
+
+
+ if (debugEnabled())
+ {
+ TRACER.debugInfo(getName() + " exiting.");
+ }
+ }
+
+
+
+ /**
+ * Indicates that the Directory Server has received a request to stop running
+ * and that this thread should stop running as soon as possible.
+ */
+ public void shutDown()
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo(getName() + " being signaled to shut down.");
+ }
+
+ // Set a flag that indicates that the thread should stop running.
+ shutdownRequested = true;
+
+
+ // Check to see if the thread is waiting for work. If so, then interrupt
+ // it.
+ if (waitingForWork)
+ {
+ try
+ {
+ workerThread.interrupt();
+ }
+ catch (Exception e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugWarning(
+ "Caught an exception while trying to interrupt the worker " +
+ "thread waiting for work: %s", e);
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+ }
+ else
+ {
+ try
+ {
+ CancelRequest cancelRequest =
+ new CancelRequest(true, INFO_CANCELED_BY_SHUTDOWN.get());
+ operation.cancel(cancelRequest);
+ }
+ catch (Exception e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugWarning(
+ "Caught an exception while trying to abandon the " +
+ "operation in progress for the worker thread: %s", e);
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Retrieves any relevent debug information with which this tread is
+ * associated so they can be included in debug messages.
+ *
+ * @return debug information about this thread as a string.
+ */
+ @Override
+ public Map<String, String> getDebugProperties()
+ {
+ Map<String, String> properties = super.getDebugProperties();
+ properties.put("clientConnection",
+ operation.getClientConnection().toString());
+ properties.put("operation", operation.toString());
+
+ return properties;
+ }
+}
+
diff --git a/opends/src/server/org/opends/server/monitors/ParallelWorkQueueMonitor.java b/opends/src/server/org/opends/server/monitors/ParallelWorkQueueMonitor.java
new file mode 100644
index 0000000..83066bb
--- /dev/null
+++ b/opends/src/server/org/opends/server/monitors/ParallelWorkQueueMonitor.java
@@ -0,0 +1,233 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2006-2009 Sun Microsystems, Inc.
+ */
+package org.opends.server.monitors;
+
+
+
+import java.util.ArrayList;
+
+import org.opends.server.admin.std.server.MonitorProviderCfg;
+import org.opends.server.api.AttributeSyntax;
+import org.opends.server.api.MonitorProvider;
+import org.opends.server.config.ConfigException;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.extensions.ParallelWorkQueue;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.Attributes;
+import org.opends.server.types.InitializationException;
+
+
+
+
+/**
+ * This class defines a Directory Server monitor that can be used to provide
+ * information about the state of the work queue.
+ */
+public class ParallelWorkQueueMonitor
+ extends MonitorProvider<MonitorProviderCfg>
+{
+ /**
+ * The name to use for the monitor attribute that provides the current request
+ * backlog.
+ */
+ public static final String ATTR_CURRENT_BACKLOG = "currentRequestBacklog";
+
+
+
+ /**
+ * The name to use for the monitor attribute that provides the average request
+ * backlog.
+ */
+ public static final String ATTR_AVERAGE_BACKLOG = "averageRequestBacklog";
+
+
+
+ /**
+ * The name to use for the monitor attribute that provides the maximum
+ * observed request backlog.
+ */
+ public static final String ATTR_MAX_BACKLOG = "maxRequestBacklog";
+
+
+
+ /**
+ * The name to use for the monitor attribute that provides the total number of
+ * operations submitted.
+ */
+ public static final String ATTR_OPS_SUBMITTED = "requestsSubmitted";
+
+
+
+ // The maximum backlog observed by polling the queue.
+ private int maxBacklog;
+
+ // The total number of times the backlog has been polled.
+ private long numPolls;
+
+ // The total backlog observed from periodic polling.
+ private long totalBacklog;
+
+ // The parallel work queue instance with which this monitor is associated.
+ private ParallelWorkQueue workQueue;
+
+
+
+ /**
+ * Initializes this monitor provider. Note that no initialization should be
+ * done here, since it should be performed in the
+ * <CODE>initializeMonitorProvider</CODE> class.
+ *
+ * @param workQueue The work queue with which this monitor is associated.
+ */
+ public ParallelWorkQueueMonitor(ParallelWorkQueue workQueue)
+ {
+ super("Work Queue Monitor Provider");
+
+
+ this.workQueue = workQueue;
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public void initializeMonitorProvider(MonitorProviderCfg configuration)
+ throws ConfigException, InitializationException
+ {
+ maxBacklog = 0;
+ totalBacklog = 0;
+ numPolls = 0;
+ }
+
+
+
+ /**
+ * Retrieves the name of this monitor provider. It should be unique among all
+ * monitor providers, including all instances of the same monitor provider.
+ *
+ * @return The name of this monitor provider.
+ */
+ public String getMonitorInstanceName()
+ {
+ return "Work Queue";
+ }
+
+
+
+ /**
+ * Retrieves the length of time in milliseconds that should elapse between
+ * calls to the <CODE>updateMonitorData()</CODE> method. A negative or zero
+ * return value indicates that the <CODE>updateMonitorData()</CODE> method
+ * should not be periodically invoked.
+ *
+ * @return The length of time in milliseconds that should elapse between
+ * calls to the <CODE>updateMonitorData()</CODE> method.
+ */
+ public long getUpdateInterval()
+ {
+ // We will poll the work queue every 10 seconds.
+ return 10000;
+ }
+
+
+
+ /**
+ * Performs any processing periodic processing that may be desired to update
+ * the information associated with this monitor. Note that best-effort
+ * attempts will be made to ensure that calls to this method come
+ * <CODE>getUpdateInterval()</CODE> milliseconds apart, but no guarantees will
+ * be made.
+ */
+ public void updateMonitorData()
+ {
+ int backlog = workQueue.size();
+ totalBacklog += backlog;
+ numPolls++;
+
+ if (backlog > maxBacklog)
+ {
+ maxBacklog = backlog;
+ }
+ }
+
+
+
+ /**
+ * Retrieves a set of attributes containing monitor data that should be
+ * returned to the client if the corresponding monitor entry is requested.
+ *
+ * @return A set of attributes containing monitor data that should be
+ * returned to the client if the corresponding monitor entry is
+ * requested.
+ */
+ public ArrayList<Attribute> getMonitorData()
+ {
+ int backlog = workQueue.size();
+ totalBacklog += backlog;
+ numPolls++;
+ if (backlog > maxBacklog)
+ {
+ maxBacklog = backlog;
+ }
+
+ long averageBacklog = (long) (1.0 * totalBacklog / numPolls);
+
+ long opsSubmitted = workQueue.getOpsSubmitted();
+
+ ArrayList<Attribute> monitorAttrs = new ArrayList<Attribute>();
+ AttributeSyntax<?> integerSyntax = DirectoryServer
+ .getDefaultIntegerSyntax();
+
+ // The current backlog.
+ AttributeType attrType = DirectoryServer.getDefaultAttributeType(
+ ATTR_CURRENT_BACKLOG, integerSyntax);
+ monitorAttrs
+ .add(Attributes.create(attrType, String.valueOf(backlog)));
+
+ // The average backlog.
+ attrType = DirectoryServer.getDefaultAttributeType(ATTR_AVERAGE_BACKLOG,
+ integerSyntax);
+ monitorAttrs.add(Attributes.create(attrType, String
+ .valueOf(averageBacklog)));
+
+ // The maximum backlog.
+ attrType = DirectoryServer.getDefaultAttributeType(ATTR_MAX_BACKLOG,
+ integerSyntax);
+ monitorAttrs.add(Attributes.create(attrType, String
+ .valueOf(maxBacklog)));
+
+ // The total number of operations submitted.
+ attrType = DirectoryServer.getDefaultAttributeType(ATTR_OPS_SUBMITTED,
+ integerSyntax);
+ monitorAttrs.add(Attributes.create(attrType, String
+ .valueOf(opsSubmitted)));
+
+ return monitorAttrs;
+ }
+}
--
Gitblit v1.10.0