| | |
| | | * |
| | | * |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | * Portions copyright 2013 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.extensions; |
| | | |
| | |
| | | |
| | | |
| | | |
| | | // The set of worker threads that will be used to process this work queue. |
| | | /** The set of worker threads that will be used to process this work queue. */ |
| | | private ArrayList<ParallelWorkerThread> workerThreads; |
| | | |
| | | // The number of operations that have been submitted to the work queue for |
| | | // processing. |
| | | /** |
| | | * The number of operations that have been submitted to the work queue for |
| | | * processing. |
| | | */ |
| | | private AtomicLong opsSubmitted; |
| | | |
| | | // Indicates whether one or more of the worker threads needs to be killed at |
| | | // the next convenient opportunity. |
| | | /** |
| | | * Indicates whether one or more of the worker threads needs to be killed at |
| | | * the next convenient opportunity. |
| | | */ |
| | | private boolean killThreads; |
| | | |
| | | // Indicates whether the Directory Server is shutting down. |
| | | /** Indicates whether the Directory Server is shutting down. */ |
| | | private boolean shutdownRequested; |
| | | |
| | | // The thread number used for the last worker thread that was created. |
| | | /** The thread number used for the last worker thread that was created. */ |
| | | private int lastThreadNumber; |
| | | |
| | | // The number of worker threads that should be active (or will be shortly if |
| | | // a configuration change has not been completely applied). |
| | | /** |
| | | * The number of worker threads that should be active (or will be shortly if a |
| | | * configuration change has not been completely applied). |
| | | */ |
| | | private int numWorkerThreads; |
| | | |
| | | // The queue that will be used to actually hold the pending operations. |
| | | private ConcurrentLinkedQueue<AbstractOperation> opQueue; |
| | | /** The queue that will be used to actually hold the pending operations. */ |
| | | private ConcurrentLinkedQueue<Operation> opQueue; |
| | | |
| | | // The lock used to provide threadsafe access for the queue. |
| | | /** The lock used to provide threadsafe access for the queue. */ |
| | | private final Object queueLock = new Object(); |
| | | |
| | | |
| | |
| | | numWorkerThreads = getNumWorkerThreads(configuration); |
| | | |
| | | // Create the actual work queue. |
| | | opQueue = new ConcurrentLinkedQueue<AbstractOperation>(); |
| | | opQueue = new ConcurrentLinkedQueue<Operation>(); |
| | | |
| | | // Create the set of worker threads that should be used to service the |
| | | // work queue. |
| | |
| | | * at its maximum capacity). |
| | | */ |
| | | @Override |
| | | public void submitOperation(AbstractOperation operation) |
| | | throws DirectoryException |
| | | public void submitOperation(Operation operation) throws DirectoryException |
| | | { |
| | | if (shutdownRequested) |
| | | { |
| | |
| | | * if the server is shutting down and no more operations will be |
| | | * processed. |
| | | */ |
| | | public AbstractOperation nextOperation(ParallelWorkerThread workerThread) |
| | | public Operation nextOperation(ParallelWorkerThread workerThread) |
| | | { |
| | | return retryNextOperation(workerThread, 0); |
| | | } |
| | |
| | | * if the server is shutting down and no more operations will be |
| | | * processed, or if there have been too many consecutive failures. |
| | | */ |
| | | private AbstractOperation retryNextOperation( |
| | | private Operation retryNextOperation( |
| | | ParallelWorkerThread workerThread, |
| | | int numFailures) |
| | | { |
| | |
| | | int currentThreads = workerThreads.size(); |
| | | if (currentThreads > numWorkerThreads) |
| | | { |
| | | if (workerThreads.remove((ParallelWorkerThread) |
| | | Thread.currentThread())) |
| | | if (workerThreads.remove(Thread.currentThread())) |
| | | { |
| | | currentThreads--; |
| | | } |
| | |
| | | { |
| | | while (true) |
| | | { |
| | | AbstractOperation nextOperation = null; |
| | | Operation nextOperation = null; |
| | | if (queueSemaphore.tryAcquire(5, TimeUnit.SECONDS)) { |
| | | nextOperation = opQueue.poll(); |
| | | } |
| | |
| | | int currentThreads = workerThreads.size(); |
| | | if (currentThreads > numWorkerThreads) |
| | | { |
| | | if (workerThreads.remove((ParallelWorkerThread) |
| | | Thread.currentThread())) |
| | | if (workerThreads.remove(Thread.currentThread())) |
| | | { |
| | | currentThreads--; |
| | | } |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public boolean isConfigurationChangeAcceptable( |
| | | ParallelWorkQueueCfg configuration, |
| | | List<Message> unacceptableReasons) |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public ConfigChangeResult applyConfigurationChange( |
| | | ParallelWorkQueueCfg configuration) |
| | | { |