| | |
| | | { |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | |
| | | |
| | | |
| | | /** |
| | | * The maximum number of times to retry getting the next operation from the |
| | | * queue if an unexpected failure occurs. |
| | | */ |
| | | private static final int MAX_RETRY_COUNT = 5; |
| | | |
| | | |
| | | |
| | | /** The set of worker threads that will be used to process this work queue. */ |
| | | private ArrayList<ParallelWorkerThread> workerThreads; |
| | | |
| | | /** |
| | | * The number of operations that have been submitted to the work queue for |
| | | * processing. |
| | | */ |
| | | /** The number of operations that have been submitted to the work queue for processing. */ |
| | | private AtomicLong opsSubmitted; |
| | | |
| | | /** |
| | |
| | | /** The lock used to provide threadsafe access for the queue. */ |
| | | private final Object queueLock = new Object(); |
| | | |
| | | |
| | | private final Semaphore queueSemaphore = new Semaphore(0, false); |
| | | |
| | | |
| | | /** |
| | | * Creates a new instance of this work queue. All initialization should be |
| | | * performed in the <CODE>initializeWorkQueue</CODE> method. |
| | |
| | | // No implementation should be performed here. |
| | | } |
| | | |
| | | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void initializeWorkQueue(ParallelWorkQueueCfg configuration) |
| | | throws ConfigException, InitializationException |
| | |
| | | workerThreads.add(t); |
| | | } |
| | | |
| | | |
| | | // Create and register a monitor provider for the work queue. |
| | | try |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void finalizeWorkQueue(LocalizableMessage reason) |
| | | { |
| | | shutdownRequested = true; |
| | | |
| | | |
| | | // Send responses to any operations in the pending queue to indicate that |
| | | // they won't be processed because the server is shutting down. |
| | | CancelRequest cancelRequest = new CancelRequest(true, reason); |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | // Notify all the worker threads of the shutdown. |
| | | for (ParallelWorkerThread t : workerThreads) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Indicates whether this work queue has received a request to shut down. |
| | | * |
| | |
| | | return shutdownRequested; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Submits an operation to be processed by one of the worker threads |
| | | * associated with this work queue. |
| | |
| | | opsSubmitted.incrementAndGet(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean trySubmitOperation(Operation operation) |
| | | throws DirectoryException |
| | |
| | | return true; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Retrieves the next operation that should be processed by one of the worker |
| | | * threads, blocking if necessary until a new request arrives. This method |
| | |
| | | return retryNextOperation(workerThread, 0); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Retrieves the next operation that should be processed by one of the worker |
| | | * threads following a previous failure attempt. A maximum of five |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Attempts to remove the specified operation from this queue if it has not |
| | | * yet been picked up for processing by one of the worker threads. |
| | |
| | | return opQueue.remove(operation); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Retrieves the total number of operations that have been successfully |
| | | * submitted to this work queue for processing since server startup. This |
| | |
| | | return opsSubmitted.longValue(); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Retrieves the number of pending operations in the queue that have not yet |
| | | * been picked up for processing. Note that this method is not a |
| | |
| | | return opQueue.size(); |
| | | } |
| | | |
| | | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean isConfigurationChangeAcceptable( |
| | | ParallelWorkQueueCfg configuration, |
| | |
| | | return true; |
| | | } |
| | | |
| | | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ConfigChangeResult applyConfigurationChange( |
| | | ParallelWorkQueueCfg configuration) |
| | |
| | | return new ConfigChangeResult(); |
| | | } |
| | | |
| | | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean isIdle() |
| | | { |