| | |
| | | import java.util.concurrent.atomic.AtomicLong; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | |
| | | import org.opends.server.api.ConfigurableComponent; |
| | | import org.opends.server.admin.server.ConfigurationChangeListener; |
| | | import org.opends.server.admin.std.server.TraditionalWorkQueueCfg; |
| | | import org.opends.server.api.WorkQueue; |
| | | import org.opends.server.config.ConfigAttribute; |
| | | import org.opends.server.config.ConfigEntry; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.config.IntegerConfigAttribute; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.monitors.TraditionalWorkQueueMonitor; |
| | | import org.opends.server.types.CancelRequest; |
| | |
| | | * Directory Server work queue. |
| | | */ |
| | | public class TraditionalWorkQueue |
| | | extends WorkQueue |
| | | implements ConfigurableComponent |
| | | extends WorkQueue<TraditionalWorkQueueCfg> |
| | | implements ConfigurationChangeListener<TraditionalWorkQueueCfg> |
| | | { |
| | | /** |
| | | * The tracer object for the debug logger. |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public void initializeWorkQueue(ConfigEntry configEntry) |
| | | @Override() |
| | | public void initializeWorkQueue(TraditionalWorkQueueCfg configuration) |
| | | throws ConfigException, InitializationException |
| | | { |
| | | shutdownRequested = false; |
| | |
| | | queueLock = new ReentrantLock(); |
| | | |
| | | |
| | | // Register to be notified of any configuration changes. |
| | | configuration.addTraditionalChangeListener(this); |
| | | |
| | | |
| | | // Get the necessary configuration from the provided entry. |
| | | configEntryDN = configEntry.getDN(); |
| | | |
| | | int msgID = MSGID_CONFIG_WORK_QUEUE_DESCRIPTION_NUM_THREADS; |
| | | IntegerConfigAttribute numThreadsStub = |
| | | new IntegerConfigAttribute(ATTR_NUM_WORKER_THREADS, getMessage(msgID), |
| | | true, false, false, true, 1, false, 0, |
| | | DEFAULT_NUM_WORKER_THREADS); |
| | | try |
| | | { |
| | | IntegerConfigAttribute numThreadsAttr = |
| | | (IntegerConfigAttribute) |
| | | configEntry.getConfigAttribute(numThreadsStub); |
| | | if (numThreadsAttr == null) |
| | | { |
| | | numWorkerThreads = DEFAULT_NUM_WORKER_THREADS; |
| | | } |
| | | else |
| | | { |
| | | numWorkerThreads = numThreadsAttr.activeIntValue(); |
| | | if (numWorkerThreads <= 0) |
| | | { |
| | | //This is not valid. The number of worker threads must be a positive |
| | | // integer. |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_NUM_THREADS_INVALID_VALUE; |
| | | String message = getMessage(msgID, |
| | | String.valueOf(configEntryDN), |
| | | numWorkerThreads); |
| | | logError(ErrorLogCategory.CONFIGURATION, |
| | | ErrorLogSeverity.SEVERE_WARNING, message, msgID); |
| | | numWorkerThreads = DEFAULT_NUM_WORKER_THREADS; |
| | | } |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_CANNOT_DETERMINE_NUM_WORKER_THREADS; |
| | | String message = getMessage(msgID, String.valueOf(configEntryDN), |
| | | String.valueOf(e)); |
| | | logError(ErrorLogCategory.CONFIGURATION, ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | |
| | | numWorkerThreads = DEFAULT_NUM_WORKER_THREADS; |
| | | } |
| | | |
| | | |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_DESCRIPTION_MAX_CAPACITY; |
| | | IntegerConfigAttribute capacityStub = |
| | | new IntegerConfigAttribute(ATTR_MAX_WORK_QUEUE_CAPACITY, |
| | | getMessage(msgID), true, false, false, true, |
| | | 0, false, 0, |
| | | DEFAULT_MAX_WORK_QUEUE_CAPACITY); |
| | | try |
| | | { |
| | | IntegerConfigAttribute capacityAttr = |
| | | (IntegerConfigAttribute) |
| | | configEntry.getConfigAttribute(capacityStub); |
| | | if (capacityAttr == null) |
| | | { |
| | | maxCapacity = DEFAULT_MAX_WORK_QUEUE_CAPACITY; |
| | | } |
| | | else |
| | | { |
| | | maxCapacity = capacityAttr.activeIntValue(); |
| | | if (maxCapacity < 0) |
| | | { |
| | | // This is not valid. The maximum capacity must be greater than or |
| | | // equal to zero. |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_CAPACITY_INVALID_VALUE; |
| | | String message = getMessage(msgID, String.valueOf(configEntryDN), |
| | | maxCapacity); |
| | | logError(ErrorLogCategory.CONFIGURATION, |
| | | ErrorLogSeverity.SEVERE_WARNING, message, msgID); |
| | | |
| | | maxCapacity = DEFAULT_MAX_WORK_QUEUE_CAPACITY; |
| | | } |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_CANNOT_DETERMINE_QUEUE_CAPACITY; |
| | | String message = getMessage(msgID, String.valueOf(configEntryDN), |
| | | String.valueOf(e)); |
| | | logError(ErrorLogCategory.CONFIGURATION, ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | |
| | | maxCapacity = DEFAULT_MAX_WORK_QUEUE_CAPACITY; |
| | | } |
| | | configEntryDN = configuration.dn(); |
| | | numWorkerThreads = configuration.getNumWorkerThreads(); |
| | | maxCapacity = configuration.getMaxWorkQueueCapacity(); |
| | | |
| | | |
| | | // Create the actual work queue. |
| | |
| | | } |
| | | |
| | | |
| | | // Register with the Directory Server as a configurable component. |
| | | DirectoryServer.registerConfigurableComponent(this); |
| | | |
| | | |
| | | // Create and register a monitor provider for the work queue. |
| | | try |
| | | { |
| | |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR; |
| | | int msgID = MSGID_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR; |
| | | String message = getMessage(msgID, TraditionalWorkQueueMonitor.class, |
| | | String.valueOf(e)); |
| | | logError(ErrorLogCategory.CORE_SERVER, ErrorLogSeverity.SEVERE_ERROR, |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override() |
| | | public void finalizeWorkQueue(String reason) |
| | | { |
| | | shutdownRequested = true; |
| | |
| | | * down or the pending operation queue is already |
| | | * at its maximum capacity). |
| | | */ |
| | | @Override() |
| | | public void submitOperation(Operation operation) |
| | | throws DirectoryException |
| | | { |
| | |
| | | |
| | | |
| | | /** |
| | | * Retrieves the DN of the configuration entry with which this component is |
| | | * associated. |
| | | * |
| | | * @return The DN of the configuration entry with which this component is |
| | | * associated. |
| | | * {@inheritDoc} |
| | | */ |
| | | public DN getConfigurableComponentEntryDN() |
| | | @Override() |
| | | public boolean isConfigurationChangeAcceptable( |
| | | TraditionalWorkQueueCfg configuration, |
| | | List<String> unacceptableReasons) |
| | | { |
| | | return configEntryDN; |
| | | // The provided configuration will always be acceptable. |
| | | return true; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Retrieves the set of configuration attributes that are associated with this |
| | | * configurable component. |
| | | * |
| | | * A new list is built on every call. It holds two |
| | | * <CODE>IntegerConfigAttribute</CODE> objects, added in this order: one |
| | | * named by <CODE>ATTR_NUM_WORKER_THREADS</CODE>, constructed with |
| | | * <CODE>workerThreads.size()</CODE> as its final argument, and one named by |
| | | * <CODE>ATTR_MAX_WORK_QUEUE_CAPACITY</CODE>, constructed with |
| | | * <CODE>maxCapacity</CODE> as its final argument. The final constructor |
| | | * argument is presumably the attribute's current value; confirm against |
| | | * <CODE>IntegerConfigAttribute</CODE>. |
| | | * |
| | | * @return The set of configuration attributes that are associated with this |
| | | * configurable component. |
| | | * {@inheritDoc} |
| | | */ |
| | | public List<ConfigAttribute> getConfigurationAttributes() |
| | | { |
| | | LinkedList<ConfigAttribute> attrList = new LinkedList<ConfigAttribute>(); |
| | | |
| | | /* Attribute for the number of worker threads, built from the size of |
| | | workerThreads. NOTE(review): the positional flags and bounds passed to |
| | | the constructor are not explained in this file; confirm their meaning |
| | | against IntegerConfigAttribute. */ |
| | | int msgID = MSGID_CONFIG_WORK_QUEUE_DESCRIPTION_NUM_THREADS; |
| | | IntegerConfigAttribute numThreadsAttr = |
| | | new IntegerConfigAttribute(ATTR_NUM_WORKER_THREADS, getMessage(msgID), |
| | | true, false, false, true, 1, false, 0, |
| | | workerThreads.size()); |
| | | attrList.add(numThreadsAttr); |
| | | |
| | | /* Attribute for the maximum work queue capacity, built from maxCapacity. |
| | | The msgID local is reused for this attribute's description. */ |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_DESCRIPTION_MAX_CAPACITY; |
| | | IntegerConfigAttribute capacityAttr = |
| | | new IntegerConfigAttribute(ATTR_MAX_WORK_QUEUE_CAPACITY, |
| | | getMessage(msgID), true, false, false, true, |
| | | 0, false, 0, maxCapacity); |
| | | attrList.add(capacityAttr); |
| | | |
| | | |
| | | return attrList; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Indicates whether the provided configuration entry has an acceptable |
| | | * configuration for this component. If it does not, then detailed |
| | | * information about the problem(s) should be added to the provided list. |
| | | * |
| | | * Two attributes are examined, one after the other: the number of worker |
| | | * threads (rejected when the value read is less than or equal to zero) and |
| | | * the maximum work queue capacity (rejected when the value read is less than |
| | | * zero). An attribute that is missing from the entry is accepted. Any |
| | | * exception raised while reading an attribute is caught and makes the |
| | | * configuration unacceptable. Both checks always run, so a single call may |
| | | * add more than one message to the list. This method only inspects the |
| | | * entry; it does not assign any of the values it reads to this component. |
| | | * |
| | | * @param configEntry The configuration entry for which to make the |
| | | * determination. |
| | | * @param unacceptableReasons A list that can be used to hold messages about |
| | | * why the provided entry does not have an |
| | | * acceptable configuration. |
| | | * |
| | | * @return <CODE>true</CODE> if the provided entry has an acceptable |
| | | * configuration for this component, or <CODE>false</CODE> if not. |
| | | */ |
| | | public boolean hasAcceptableConfiguration(ConfigEntry configEntry, |
| | | List<String> unacceptableReasons) |
| | | { |
| | | boolean configIsAcceptable = true; |
| | | |
| | | /* The flag above starts out true and is only ever cleared below; nothing |
| | | in this method sets it back to true. */ |
| | | // Check the configuration for the number of worker threads. |
| | | int msgID = MSGID_CONFIG_WORK_QUEUE_DESCRIPTION_NUM_THREADS; |
| | | IntegerConfigAttribute numThreadsStub = |
| | | new IntegerConfigAttribute(ATTR_NUM_WORKER_THREADS, getMessage(msgID), |
| | | true, false, false, true, 1, false, 0, |
| | | workerThreads.size()); |
| | | try |
| | | { |
| | | IntegerConfigAttribute numThreadsAttr = |
| | | (IntegerConfigAttribute) |
| | | configEntry.getConfigAttribute(numThreadsStub); |
| | | if (numThreadsAttr == null) |
| | | { |
| | | // This means that the entry doesn't contain the attribute. This is |
| | | // fine, since we'll just use the default. |
| | | } |
| | | else |
| | | { |
| | | int numWorkerThreads = numThreadsAttr.activeIntValue(); |
| | | if (numWorkerThreads <= 0) |
| | | { |
| | | // This is not valid. The number of worker threads must be a positive |
| | | // integer. Record the reason and mark the configuration unacceptable. |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_NUM_THREADS_INVALID_VALUE; |
| | | String message = getMessage(msgID, |
| | | String.valueOf(configEntryDN), |
| | | numWorkerThreads); |
| | | unacceptableReasons.add(message); |
| | | configIsAcceptable = false; |
| | | } |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | /* Any failure to read the attribute is reported as an unacceptable |
| | | configuration instead of being propagated to the caller. */ |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_CANNOT_DETERMINE_NUM_WORKER_THREADS; |
| | | String message = getMessage(msgID, String.valueOf(configEntryDN), |
| | | String.valueOf(e)); |
| | | unacceptableReasons.add(message); |
| | | configIsAcceptable = false; |
| | | } |
| | | |
| | | /* The capacity check below runs even when the thread-count check above |
| | | has already failed. */ |
| | | // Check the configuration for the maximum work queue capacity. |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_DESCRIPTION_MAX_CAPACITY; |
| | | IntegerConfigAttribute capacityStub = |
| | | new IntegerConfigAttribute(ATTR_MAX_WORK_QUEUE_CAPACITY, |
| | | getMessage(msgID), true, false, false, true, |
| | | 0, false, 0, |
| | | maxCapacity); |
| | | try |
| | | { |
| | | IntegerConfigAttribute capacityAttr = |
| | | (IntegerConfigAttribute) |
| | | configEntry.getConfigAttribute(capacityStub); |
| | | if (capacityAttr == null) |
| | | { |
| | | // This means that the entry doesn't contain the attribute. This is |
| | | // fine, since we'll just use the default. |
| | | } |
| | | else |
| | | { |
| | | int newMaxCapacity = capacityAttr.activeIntValue(); |
| | | if (newMaxCapacity < 0) |
| | | { |
| | | // This is not valid. The maximum capacity must be greater than or |
| | | // equal to zero, so a value of exactly zero passes this check. |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_CAPACITY_INVALID_VALUE; |
| | | String message = getMessage(msgID, String.valueOf(configEntryDN), |
| | | newMaxCapacity); |
| | | unacceptableReasons.add(message); |
| | | configIsAcceptable = false; |
| | | } |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | /* Same handling as for the thread count: a read failure is turned into |
| | | an unacceptable-reason message. */ |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_CANNOT_DETERMINE_QUEUE_CAPACITY; |
| | | String message = getMessage(msgID, String.valueOf(configEntryDN), |
| | | String.valueOf(e)); |
| | | unacceptableReasons.add(message); |
| | | configIsAcceptable = false; |
| | | } |
| | | |
| | | |
| | | return configIsAcceptable; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Makes a best-effort attempt to apply the configuration contained in the |
| | | * provided entry. Information about the result of this processing should be |
| | | * added to the provided message list. Information should always be added to |
| | | * this list if a configuration change could not be applied. If detailed |
| | | * results are requested, then information about the changes applied |
| | | * successfully (and optionally about parameters that were not changed) should |
| | | * also be included. |
| | | * |
| | | * @param configEntry The entry containing the new configuration to |
| | | * apply for this component. |
| | | * @param detailedResults Indicates whether detailed information about the |
| | | * processing should be added to the list. |
| | | * |
| | | * @return Information about the result of the configuration update. |
| | | */ |
| | | public ConfigChangeResult applyNewConfiguration(ConfigEntry configEntry, |
| | | boolean detailedResults) |
| | | @Override() |
| | | public ConfigChangeResult applyConfigurationChange( |
| | | TraditionalWorkQueueCfg configuration) |
| | | { |
| | | ArrayList<String> resultMessages = new ArrayList<String>(); |
| | | int newNumThreads; |
| | | int newMaxCapacity; |
| | | |
| | | |
| | | // Check the configuration for the number of worker threads. |
| | | int msgID = MSGID_CONFIG_WORK_QUEUE_DESCRIPTION_NUM_THREADS; |
| | | IntegerConfigAttribute numThreadsStub = |
| | | new IntegerConfigAttribute(ATTR_NUM_WORKER_THREADS, getMessage(msgID), |
| | | true, false, false, true, 1, false, 0, |
| | | workerThreads.size()); |
| | | try |
| | | { |
| | | IntegerConfigAttribute numThreadsAttr = |
| | | (IntegerConfigAttribute) |
| | | configEntry.getConfigAttribute(numThreadsStub); |
| | | if (numThreadsAttr == null) |
| | | { |
| | | // This means that the entry doesn't contain the attribute. This is |
| | | // fine, since we'll just use the default. |
| | | newNumThreads = DEFAULT_NUM_WORKER_THREADS; |
| | | } |
| | | else |
| | | { |
| | | newNumThreads = numThreadsAttr.activeIntValue(); |
| | | if (newNumThreads <= 0) |
| | | { |
| | | //This is not valid. The number of worker threads must be a positive |
| | | // integer. This should never happen since it should be filtered out |
| | | // by the hasAcceptableConfiguration method, but if it does for some |
| | | // reason then handle it. |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_NUM_THREADS_INVALID_VALUE; |
| | | String message = getMessage(msgID, |
| | | String.valueOf(configEntryDN), |
| | | newNumThreads); |
| | | resultMessages.add(message); |
| | | newNumThreads = DEFAULT_NUM_WORKER_THREADS; |
| | | } |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_CANNOT_DETERMINE_NUM_WORKER_THREADS; |
| | | String message = getMessage(msgID, String.valueOf(configEntryDN), |
| | | String.valueOf(e)); |
| | | resultMessages.add(message); |
| | | newNumThreads = DEFAULT_NUM_WORKER_THREADS; |
| | | } |
| | | |
| | | |
| | | // Check the configuration for the maximum work queue capacity. |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_DESCRIPTION_MAX_CAPACITY; |
| | | IntegerConfigAttribute capacityStub = |
| | | new IntegerConfigAttribute(ATTR_MAX_WORK_QUEUE_CAPACITY, |
| | | getMessage(msgID), true, false, false, true, |
| | | 0, false, 0, |
| | | maxCapacity); |
| | | try |
| | | { |
| | | IntegerConfigAttribute capacityAttr = |
| | | (IntegerConfigAttribute) |
| | | configEntry.getConfigAttribute(capacityStub); |
| | | if (capacityAttr == null) |
| | | { |
| | | //This means that the entry doesn't contain the attribute. This is |
| | | // fine, since we'll just use the default. |
| | | newMaxCapacity = DEFAULT_MAX_WORK_QUEUE_CAPACITY; |
| | | } |
| | | else |
| | | { |
| | | newMaxCapacity = capacityAttr.activeIntValue(); |
| | | if (newMaxCapacity < 0) |
| | | { |
| | | // This is not valid. The maximum capacity must be greater than or |
| | | // equal to zero. |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_CAPACITY_INVALID_VALUE; |
| | | String message = getMessage(msgID, String.valueOf(configEntryDN), |
| | | newMaxCapacity); |
| | | resultMessages.add(message); |
| | | newMaxCapacity = DEFAULT_MAX_WORK_QUEUE_CAPACITY; |
| | | } |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_CANNOT_DETERMINE_QUEUE_CAPACITY; |
| | | String message = getMessage(msgID, String.valueOf(configEntryDN), |
| | | String.valueOf(e)); |
| | | resultMessages.add(message); |
| | | newMaxCapacity = DEFAULT_MAX_WORK_QUEUE_CAPACITY; |
| | | } |
| | | int newNumThreads = configuration.getNumWorkerThreads(); |
| | | int newMaxCapacity = configuration.getMaxWorkQueueCapacity(); |
| | | |
| | | |
| | | // Apply a change to the number of worker threads if appropriate. |
| | |
| | | t.start(); |
| | | } |
| | | |
| | | if (detailedResults) |
| | | { |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_CREATED_THREADS; |
| | | String message = getMessage(msgID, threadsToAdd, newNumThreads); |
| | | resultMessages.add(message); |
| | | } |
| | | |
| | | killThreads = false; |
| | | } |
| | | else |
| | | { |
| | | if (detailedResults) |
| | | { |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_DESTROYING_THREADS; |
| | | String message = getMessage(msgID, Math.abs(threadsToAdd), |
| | | newNumThreads); |
| | | resultMessages.add(message); |
| | | } |
| | | |
| | | killThreads = true; |
| | | } |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | if (detailedResults) |
| | | { |
| | | msgID = MSGID_CONFIG_WORK_QUEUE_NEW_CAPACITY; |
| | | String message = getMessage(msgID, newMaxCapacity); |
| | | resultMessages.add(message); |
| | | } |
| | | |
| | | maxCapacity = newMaxCapacity; |
| | | } |
| | | catch (Exception e) |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override() |
| | | public boolean isIdle() |
| | | { |
| | | if (opQueue.size() > 0) |