| | |
| | | * |
| | | * |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | * Portions copyright 2013 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.extensions; |
| | | |
| | |
| | | |
| | | import static org.opends.messages.ConfigMessages.*; |
| | | import static org.opends.messages.CoreMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | |
| | | */ |
| | | private static final int MAX_RETRY_COUNT = 5; |
| | | |
| | | // The set of worker threads that will be used to process this work queue. |
| | | /** The set of worker threads that will be used to process this work queue. */ |
| | | private final ArrayList<TraditionalWorkerThread> workerThreads = |
| | | new ArrayList<TraditionalWorkerThread>(); |
| | | |
| | | // The number of operations that have been submitted to the work queue for |
| | | // processing. |
| | | /** |
| | | * The number of operations that have been submitted to the work queue for |
| | | * processing. |
| | | */ |
| | | private AtomicLong opsSubmitted; |
| | | |
| | | // The number of times that an attempt to submit a new request has been |
| | | // rejected because the work queue is already at its maximum capacity. |
| | | /** |
| | | * The number of times that an attempt to submit a new request has been |
| | | * rejected because the work queue is already at its maximum capacity. |
| | | */ |
| | | private AtomicLong queueFullRejects; |
| | | |
| | | // Indicates whether one or more of the worker threads needs to be killed at |
| | | // the next convenient opportunity. |
| | | /** |
| | | * Indicates whether one or more of the worker threads needs to be killed at |
| | | * the next convenient opportunity. |
| | | */ |
| | | private boolean killThreads; |
| | | |
| | | // Indicates whether the Directory Server is shutting down. |
| | | /** Indicates whether the Directory Server is shutting down. */ |
| | | private boolean shutdownRequested; |
| | | |
| | | // The thread number used for the last worker thread that was created. |
| | | /** The thread number used for the last worker thread that was created. */ |
| | | private int lastThreadNumber; |
| | | |
| | | // The maximum number of pending requests that this work queue will allow |
| | | // before it will start rejecting them. |
| | | /** |
| | | * The maximum number of pending requests that this work queue will allow |
| | | * before it will start rejecting them. |
| | | */ |
| | | private int maxCapacity; |
| | | |
| | | // The number of worker threads that should be active (or will be shortly if |
| | | // a configuration change has not been completely applied). |
| | | /** |
| | | * The number of worker threads that should be active (or will be shortly if a |
| | | * configuration change has not been completely applied). |
| | | */ |
| | | private int numWorkerThreads; |
| | | |
| | | // The queue overflow policy: true indicates that operations will be blocked |
| | | // until the queue has available capacity, otherwise operations will be |
| | | // rejected. |
| | | // |
| | | // This is hard-coded to true for now because a reject on full policy does |
| | | // not seem to have a valid use case. |
| | | // |
| | | /** |
| | | * The queue overflow policy: true indicates that operations will be blocked |
| | | * until the queue has available capacity, otherwise operations will be |
| | | * rejected. |
| | | * <p> |
| | | * This is hard-coded to true for now because a reject on full policy does not |
| | | * seem to have a valid use case. |
| | | * </p> |
| | | */ |
| | | private final boolean isBlocking = true; |
| | | |
| | | // The queue that will be used to actually hold the pending operations. |
| | | private LinkedBlockingQueue<AbstractOperation> opQueue; |
| | | /** The queue that will be used to actually hold the pending operations. */ |
| | | private LinkedBlockingQueue<Operation> opQueue; |
| | | |
| | | // The locks used to provide threadsafe access for the queue. |
| | | |
| | | // Used for non-config changes. |
| | | /** |
| | | * The lock used to provide threadsafe access for the queue, used for |
| | | * non-config changes. |
| | | */ |
| | | private final ReadLock queueReadLock; |
| | | |
| | | // Used for config changes. |
| | | /** |
| | | * The lock used to provide threadsafe access for the queue, used for config |
| | | * changes. |
| | | */ |
| | | private final WriteLock queueWriteLock; |
| | | { |
| | | ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); |
| | |
| | | // Create the actual work queue. |
| | | if (maxCapacity > 0) |
| | | { |
| | | opQueue = new LinkedBlockingQueue<AbstractOperation>(maxCapacity); |
| | | opQueue = new LinkedBlockingQueue<Operation>(maxCapacity); |
| | | } |
| | | else |
| | | { |
| | | // This will never be the case, since the configuration definition |
| | | // ensures that the capacity is always finite. |
| | | opQueue = new LinkedBlockingQueue<AbstractOperation>(); |
| | | opQueue = new LinkedBlockingQueue<Operation>(); |
| | | } |
| | | |
| | | // Create the set of worker threads that should be used to service the |
| | |
| | | * already at its maximum capacity). |
| | | */ |
| | | @Override |
| | | public void submitOperation(AbstractOperation operation) |
| | | throws DirectoryException |
| | | public void submitOperation(Operation operation) throws DirectoryException |
| | | { |
| | | queueReadLock.lock(); |
| | | try |
| | |
| | | * if the server is shutting down and no more operations will be |
| | | * processed. |
| | | */ |
| | | public AbstractOperation nextOperation(TraditionalWorkerThread workerThread) |
| | | public Operation nextOperation(TraditionalWorkerThread workerThread) |
| | | { |
| | | return retryNextOperation(workerThread, 0); |
| | | } |
| | |
| | | * if the server is shutting down and no more operations will be |
| | | * processed, or if there have been too many consecutive failures. |
| | | */ |
| | | private AbstractOperation retryNextOperation( |
| | | TraditionalWorkerThread workerThread, int numFailures) |
| | | private Operation retryNextOperation(TraditionalWorkerThread workerThread, |
| | | int numFailures) |
| | | { |
| | | // See if we should kill off this thread. This could be necessary if the |
| | | // number of worker threads has been decreased with the server online. If |
| | |
| | | |
| | | while (true) |
| | | { |
| | | AbstractOperation nextOperation = opQueue.poll(5, TimeUnit.SECONDS); |
| | | Operation nextOperation = opQueue.poll(5, TimeUnit.SECONDS); |
| | | if (nextOperation != null) |
| | | { |
| | | return nextOperation; |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public boolean isConfigurationChangeAcceptable( |
| | | TraditionalWorkQueueCfg configuration, List<Message> unacceptableReasons) |
| | | { |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public ConfigChangeResult applyConfigurationChange( |
| | | TraditionalWorkQueueCfg configuration) |
| | | { |
| | |
| | | { |
| | | // First switch the queue with the exclusive lock. |
| | | queueWriteLock.lock(); |
| | | LinkedBlockingQueue<AbstractOperation> oldOpQueue; |
| | | LinkedBlockingQueue<Operation> oldOpQueue; |
| | | try |
| | | { |
| | | LinkedBlockingQueue<AbstractOperation> newOpQueue = null; |
| | | LinkedBlockingQueue<Operation> newOpQueue = null; |
| | | if (newMaxCapacity > 0) |
| | | { |
| | | newOpQueue = new LinkedBlockingQueue<AbstractOperation>( |
| | | newOpQueue = new LinkedBlockingQueue<Operation>( |
| | | newMaxCapacity); |
| | | } |
| | | else |
| | | { |
| | | newOpQueue = new LinkedBlockingQueue<AbstractOperation>(); |
| | | newOpQueue = new LinkedBlockingQueue<Operation>(); |
| | | } |
| | | |
| | | oldOpQueue = opQueue; |
| | |
| | | } |
| | | |
| | | // Now resubmit any pending requests - we'll need the shared lock. |
| | | AbstractOperation pendingOperation = null; |
| | | Operation pendingOperation = null; |
| | | queueReadLock.lock(); |
| | | try |
| | | { |
| | |
| | | } |
| | | while ((pendingOperation = oldOpQueue.poll()) != null) |
| | | { |
| | | if (pendingOperation != null) |
| | | { |
| | | pendingOperation.abort(cancelRequest); |
| | | } |
| | | pendingOperation.abort(cancelRequest); |
| | | } |
| | | } |
| | | finally |