| | |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.extensions; |
| | | import org.opends.messages.Message; |
| | | |
| | | |
| | | |
| | |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicLong; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.server.admin.server.ConfigurationChangeListener; |
| | | import org.opends.server.admin.std.server.TraditionalWorkQueueCfg; |
| | | import org.opends.server.api.WorkQueue; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.monitors.TraditionalWorkQueueMonitor; |
| | | import org.opends.server.types.AbstractOperation; |
| | | import org.opends.server.types.CancelRequest; |
| | | import org.opends.server.types.ConfigChangeResult; |
| | | import org.opends.server.types.DebugLogLevel; |
| | |
| | | import org.opends.server.types.Operation; |
| | | import org.opends.server.types.ResultCode; |
| | | |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import static org.opends.messages.ConfigMessages.*; |
| | | import static org.opends.messages.CoreMessages.*; |
| | | import org.opends.server.types.AbstractOperation; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | | |
| | | |
| | |
| | | private LinkedBlockingQueue<AbstractOperation> opQueue; |
| | | |
| | | // The lock used to provide threadsafe access for the queue. |
| | | private ReentrantLock queueLock; |
| | | private Object queueLock; |
| | | |
| | | |
| | | |
| | |
| | | killThreads = false; |
| | | opsSubmitted = new AtomicLong(0); |
| | | queueFullRejects = new AtomicLong(0); |
| | | queueLock = new ReentrantLock(); |
| | | queueLock = new Object(); |
| | | |
| | | |
| | | // Register to be notified of any configuration changes. |
| | |
| | | // so, then return null and the thread will exit. |
| | | if (killThreads) |
| | | { |
| | | queueLock.lock(); |
| | | |
| | | try |
| | | synchronized (queueLock) |
| | | { |
| | | int currentThreads = workerThreads.size(); |
| | | if (currentThreads > numWorkerThreads) |
| | | try |
| | | { |
| | | if (workerThreads.remove(Thread.currentThread())) |
| | | int currentThreads = workerThreads.size(); |
| | | if (currentThreads > numWorkerThreads) |
| | | { |
| | | currentThreads--; |
| | | } |
| | | if (workerThreads.remove(Thread.currentThread())) |
| | | { |
| | | currentThreads--; |
| | | } |
| | | |
| | | if (currentThreads <= numWorkerThreads) |
| | | { |
| | | killThreads = false; |
| | | } |
| | | if (currentThreads <= numWorkerThreads) |
| | | { |
| | | killThreads = false; |
| | | } |
| | | |
| | | workerThread.setStoppedByReducedThreadNumber(); |
| | | return null; |
| | | workerThread.setStoppedByReducedThreadNumber(); |
| | | return null; |
| | | } |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | catch (Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | queueLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | if ((shutdownRequested) || (numFailures > MAX_RETRY_COUNT)) |
| | |
| | | } |
| | | else if (killThreads) |
| | | { |
| | | queueLock.lock(); |
| | | |
| | | try |
| | | synchronized (queueLock) |
| | | { |
| | | int currentThreads = workerThreads.size(); |
| | | if (currentThreads > numWorkerThreads) |
| | | try |
| | | { |
| | | if (workerThreads.remove(Thread.currentThread())) |
| | | int currentThreads = workerThreads.size(); |
| | | if (currentThreads > numWorkerThreads) |
| | | { |
| | | currentThreads--; |
| | | } |
| | | if (workerThreads.remove(Thread.currentThread())) |
| | | { |
| | | currentThreads--; |
| | | } |
| | | |
| | | if (currentThreads <= numWorkerThreads) |
| | | { |
| | | killThreads = false; |
| | | } |
| | | if (currentThreads <= numWorkerThreads) |
| | | { |
| | | killThreads = false; |
| | | } |
| | | |
| | | workerThread.setStoppedByReducedThreadNumber(); |
| | | return null; |
| | | workerThread.setStoppedByReducedThreadNumber(); |
| | | return null; |
| | | } |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | catch (Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | queueLock.unlock(); |
| | | } |
| | | } |
| | | } |
| | | else |
| | |
| | | int currentThreads = workerThreads.size(); |
| | | if (newNumThreads != currentThreads) |
| | | { |
| | | queueLock.lock(); |
| | | |
| | | try |
| | | synchronized (queueLock) |
| | | { |
| | | int threadsToAdd = newNumThreads - currentThreads; |
| | | if (threadsToAdd > 0) |
| | | try |
| | | { |
| | | for (int i=0; i < threadsToAdd; i++) |
| | | int threadsToAdd = newNumThreads - currentThreads; |
| | | if (threadsToAdd > 0) |
| | | { |
| | | TraditionalWorkerThread t = |
| | | new TraditionalWorkerThread(this, lastThreadNumber++); |
| | | workerThreads.add(t); |
| | | t.start(); |
| | | for (int i=0; i < threadsToAdd; i++) |
| | | { |
| | | TraditionalWorkerThread t = |
| | | new TraditionalWorkerThread(this, lastThreadNumber++); |
| | | workerThreads.add(t); |
| | | t.start(); |
| | | } |
| | | |
| | | killThreads = false; |
| | | } |
| | | else |
| | | { |
| | | killThreads = true; |
| | | } |
| | | |
| | | killThreads = false; |
| | | numWorkerThreads = newNumThreads; |
| | | } |
| | | else |
| | | catch (Exception e) |
| | | { |
| | | killThreads = true; |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | |
| | | numWorkerThreads = newNumThreads; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | queueLock.unlock(); |
| | | } |
| | | } |
| | | |
| | |
| | | // checks will be against the new queue. |
| | | if (newMaxCapacity != maxCapacity) |
| | | { |
| | | queueLock.lock(); |
| | | |
| | | try |
| | | synchronized (queueLock) |
| | | { |
| | | LinkedBlockingQueue<AbstractOperation> newOpQueue; |
| | | if (newMaxCapacity > 0) |
| | | try |
| | | { |
| | | newOpQueue = |
| | | new LinkedBlockingQueue<AbstractOperation>(newMaxCapacity); |
| | | } |
| | | else |
| | | { |
| | | newOpQueue = new LinkedBlockingQueue<AbstractOperation>(); |
| | | } |
| | | |
| | | LinkedBlockingQueue<AbstractOperation> oldOpQueue = opQueue; |
| | | opQueue = newOpQueue; |
| | | |
| | | LinkedList<AbstractOperation> pendingOps = |
| | | new LinkedList<AbstractOperation>(); |
| | | oldOpQueue.drainTo(pendingOps); |
| | | |
| | | |
| | | // We have to be careful when adding any existing pending operations |
| | | // because the new capacity could be less than what was already |
| | | // backlogged in the previous queue. If that happens, we may have to |
| | | // loop a few times to get everything in there. |
| | | while (! pendingOps.isEmpty()) |
| | | { |
| | | Iterator<AbstractOperation> iterator = pendingOps.iterator(); |
| | | while (iterator.hasNext()) |
| | | LinkedBlockingQueue<AbstractOperation> newOpQueue; |
| | | if (newMaxCapacity > 0) |
| | | { |
| | | AbstractOperation o = iterator.next(); |
| | | try |
| | | newOpQueue = |
| | | new LinkedBlockingQueue<AbstractOperation>(newMaxCapacity); |
| | | } |
| | | else |
| | | { |
| | | newOpQueue = new LinkedBlockingQueue<AbstractOperation>(); |
| | | } |
| | | |
| | | LinkedBlockingQueue<AbstractOperation> oldOpQueue = opQueue; |
| | | opQueue = newOpQueue; |
| | | |
| | | LinkedList<AbstractOperation> pendingOps = |
| | | new LinkedList<AbstractOperation>(); |
| | | oldOpQueue.drainTo(pendingOps); |
| | | |
| | | |
| | | // We have to be careful when adding any existing pending operations |
| | | // because the new capacity could be less than what was already |
| | | // backlogged in the previous queue. If that happens, we may have to |
| | | // loop a few times to get everything in there. |
| | | while (! pendingOps.isEmpty()) |
| | | { |
| | | Iterator<AbstractOperation> iterator = pendingOps.iterator(); |
| | | while (iterator.hasNext()) |
| | | { |
| | | if (newOpQueue.offer(o, 1000, TimeUnit.MILLISECONDS)) |
| | | AbstractOperation o = iterator.next(); |
| | | try |
| | | { |
| | | iterator.remove(); |
| | | if (newOpQueue.offer(o, 1000, TimeUnit.MILLISECONDS)) |
| | | { |
| | | iterator.remove(); |
| | | } |
| | | } |
| | | } |
| | | catch (InterruptedException ie) |
| | | { |
| | | if (debugEnabled()) |
| | | catch (InterruptedException ie) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, ie); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, ie); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | maxCapacity = newMaxCapacity; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | maxCapacity = newMaxCapacity; |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | queueLock.unlock(); |
| | | catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | return false; |
| | | } |
| | | |
| | | queueLock.lock(); |
| | | |
| | | try |
| | | synchronized (queueLock) |
| | | { |
| | | for (TraditionalWorkerThread t : workerThreads) |
| | | { |
| | |
| | | |
| | | return true; |
| | | } |
| | | finally |
| | | { |
| | | queueLock.unlock(); |
| | | } |
| | | } |
| | | } |
| | | |