mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

neil_a_wilson
28.07.2007 042b01a7f8476852bc8610abbd44d49c8119959d
opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
@@ -37,12 +37,10 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.opends.server.api.ConfigurableComponent;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.server.TraditionalWorkQueueCfg;
import org.opends.server.api.WorkQueue;
import org.opends.server.config.ConfigAttribute;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.config.IntegerConfigAttribute;
import org.opends.server.core.DirectoryServer;
import org.opends.server.monitors.TraditionalWorkQueueMonitor;
import org.opends.server.types.CancelRequest;
@@ -71,8 +69,8 @@
 * Directory Server work queue.
 */
public class TraditionalWorkQueue
       extends WorkQueue
       implements ConfigurableComponent
       extends WorkQueue<TraditionalWorkQueueCfg>
       implements ConfigurationChangeListener<TraditionalWorkQueueCfg>
{
  /**
   * The tracer object for the debug logger.
@@ -145,7 +143,8 @@
  /**
   * {@inheritDoc}
   */
  public void initializeWorkQueue(ConfigEntry configEntry)
  @Override()
  public void initializeWorkQueue(TraditionalWorkQueueCfg configuration)
         throws ConfigException, InitializationException
  {
    shutdownRequested = false;
@@ -155,104 +154,14 @@
    queueLock         = new ReentrantLock();
    // Register to be notified of any configuration changes.
    configuration.addTraditionalChangeListener(this);
    // Get the necessary configuration from the provided entry.
    configEntryDN = configEntry.getDN();
    int msgID = MSGID_CONFIG_WORK_QUEUE_DESCRIPTION_NUM_THREADS;
    IntegerConfigAttribute numThreadsStub =
      new IntegerConfigAttribute(ATTR_NUM_WORKER_THREADS, getMessage(msgID),
                                 true, false, false, true, 1, false, 0,
                                 DEFAULT_NUM_WORKER_THREADS);
    try
    {
      IntegerConfigAttribute numThreadsAttr =
        (IntegerConfigAttribute)
        configEntry.getConfigAttribute(numThreadsStub);
      if (numThreadsAttr == null)
      {
        numWorkerThreads = DEFAULT_NUM_WORKER_THREADS;
      }
      else
      {
        numWorkerThreads = numThreadsAttr.activeIntValue();
        if (numWorkerThreads <= 0)
        {
          //This is not valid.  The number of worker threads must be a positive
          // integer.
          msgID = MSGID_CONFIG_WORK_QUEUE_NUM_THREADS_INVALID_VALUE;
          String message = getMessage(msgID,
                                      String.valueOf(configEntryDN),
                                      numWorkerThreads);
          logError(ErrorLogCategory.CONFIGURATION,
                   ErrorLogSeverity.SEVERE_WARNING, message, msgID);
          numWorkerThreads = DEFAULT_NUM_WORKER_THREADS;
        }
      }
    }
    catch (Exception e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      msgID = MSGID_CONFIG_WORK_QUEUE_CANNOT_DETERMINE_NUM_WORKER_THREADS;
      String message = getMessage(msgID, String.valueOf(configEntryDN),
                                  String.valueOf(e));
      logError(ErrorLogCategory.CONFIGURATION, ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
      numWorkerThreads = DEFAULT_NUM_WORKER_THREADS;
    }
    msgID = MSGID_CONFIG_WORK_QUEUE_DESCRIPTION_MAX_CAPACITY;
    IntegerConfigAttribute capacityStub =
      new IntegerConfigAttribute(ATTR_MAX_WORK_QUEUE_CAPACITY,
                                 getMessage(msgID), true, false, false, true,
                                 0, false, 0,
                                 DEFAULT_MAX_WORK_QUEUE_CAPACITY);
    try
    {
      IntegerConfigAttribute capacityAttr =
        (IntegerConfigAttribute)
        configEntry.getConfigAttribute(capacityStub);
      if (capacityAttr == null)
      {
        maxCapacity = DEFAULT_MAX_WORK_QUEUE_CAPACITY;
      }
      else
      {
        maxCapacity = capacityAttr.activeIntValue();
        if (maxCapacity < 0)
        {
          // This is not valid.  The maximum capacity must be greater than or
          // equal to zero.
          msgID = MSGID_CONFIG_WORK_QUEUE_CAPACITY_INVALID_VALUE;
          String message = getMessage(msgID, String.valueOf(configEntryDN),
                                      maxCapacity);
          logError(ErrorLogCategory.CONFIGURATION,
                   ErrorLogSeverity.SEVERE_WARNING, message, msgID);
          maxCapacity = DEFAULT_MAX_WORK_QUEUE_CAPACITY;
        }
      }
    }
    catch (Exception e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      msgID = MSGID_CONFIG_WORK_QUEUE_CANNOT_DETERMINE_QUEUE_CAPACITY;
      String message = getMessage(msgID, String.valueOf(configEntryDN),
                                  String.valueOf(e));
      logError(ErrorLogCategory.CONFIGURATION, ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
      maxCapacity = DEFAULT_MAX_WORK_QUEUE_CAPACITY;
    }
    configEntryDN    = configuration.dn();
    numWorkerThreads = configuration.getNumWorkerThreads();
    maxCapacity      = configuration.getMaxWorkQueueCapacity();
    // Create the actual work queue.
@@ -279,10 +188,6 @@
    }
    // Register with the Directory Server as a configurable component.
    DirectoryServer.registerConfigurableComponent(this);
    // Create and register a monitor provider for the work queue.
    try
    {
@@ -299,7 +204,7 @@
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      msgID = MSGID_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR;
      int msgID = MSGID_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR;
      String message = getMessage(msgID, TraditionalWorkQueueMonitor.class,
                                  String.valueOf(e));
      logError(ErrorLogCategory.CORE_SERVER, ErrorLogSeverity.SEVERE_ERROR,
@@ -312,6 +217,7 @@
  /**
   * {@inheritDoc}
   */
  @Override()
  public void finalizeWorkQueue(String reason)
  {
    shutdownRequested = true;
@@ -389,6 +295,7 @@
   *                              down or the pending operation queue is already
   *                              at its maximum capacity).
   */
  @Override()
  public void submitOperation(Operation operation)
         throws DirectoryException
  {
@@ -659,291 +566,29 @@
  /**
   * Retrieves the DN of the configuration entry with which this component is
   * associated.
   *
   * @return  The DN of the configuration entry with which this component is
   *          associated.
   * {@inheritDoc}
   */
  public DN getConfigurableComponentEntryDN()
  @Override()
  public boolean isConfigurationChangeAcceptable(
                      TraditionalWorkQueueCfg configuration,
                      List<String> unacceptableReasons)
  {
    return configEntryDN;
    // The provided configuration will always be acceptable.
    return true;
  }
  /**
   * Retrieves the set of configuration attributes that are associated with this
   * configurable component.
   *
   * @return  The set of configuration attributes that are associated with this
   *          configurable component.
   * {@inheritDoc}
   */
  public List<ConfigAttribute> getConfigurationAttributes()
  {
    LinkedList<ConfigAttribute> attrList = new LinkedList<ConfigAttribute>();
    int msgID = MSGID_CONFIG_WORK_QUEUE_DESCRIPTION_NUM_THREADS;
    IntegerConfigAttribute numThreadsAttr =
      new IntegerConfigAttribute(ATTR_NUM_WORKER_THREADS, getMessage(msgID),
                                 true, false, false, true, 1, false, 0,
                                 workerThreads.size());
    attrList.add(numThreadsAttr);
    msgID = MSGID_CONFIG_WORK_QUEUE_DESCRIPTION_MAX_CAPACITY;
    IntegerConfigAttribute capacityAttr =
      new IntegerConfigAttribute(ATTR_MAX_WORK_QUEUE_CAPACITY,
                                 getMessage(msgID), true, false, false, true,
                                 0, false, 0, maxCapacity);
    attrList.add(capacityAttr);
    return attrList;
  }
  /**
   * Indicates whether the provided configuration entry has an acceptable
   * configuration for this component.  If it does not, then detailed
   * information about the problem(s) should be added to the provided list.
   *
   * @param  configEntry          The configuration entry for which to make the
   *                              determination.
   * @param  unacceptableReasons  A list that can be used to hold messages about
   *                              why the provided entry does not have an
   *                              acceptable configuration.
   *
   * @return  <CODE>true</CODE> if the provided entry has an acceptable
   *          configuration for this component, or <CODE>false</CODE> if not.
   */
  public boolean hasAcceptableConfiguration(ConfigEntry configEntry,
                                            List<String> unacceptableReasons)
  {
    boolean configIsAcceptable = true;
    // Check the configuration for the number of worker threads.
    int msgID = MSGID_CONFIG_WORK_QUEUE_DESCRIPTION_NUM_THREADS;
    IntegerConfigAttribute numThreadsStub =
      new IntegerConfigAttribute(ATTR_NUM_WORKER_THREADS, getMessage(msgID),
                                 true, false, false, true, 1, false, 0,
                                 workerThreads.size());
    try
    {
      IntegerConfigAttribute numThreadsAttr =
        (IntegerConfigAttribute)
        configEntry.getConfigAttribute(numThreadsStub);
      if (numThreadsAttr == null)
      {
        // This means that the entry doesn't contain the attribute.  This is
        // fine, since we'll just use the default.
      }
      else
      {
        int numWorkerThreads = numThreadsAttr.activeIntValue();
        if (numWorkerThreads <= 0)
        {
          //This is not valid.  The number of worker threads must be a positive
          // integer.
          msgID = MSGID_CONFIG_WORK_QUEUE_NUM_THREADS_INVALID_VALUE;
          String message = getMessage(msgID,
                                      String.valueOf(configEntryDN),
                                      numWorkerThreads);
          unacceptableReasons.add(message);
          configIsAcceptable = false;
        }
      }
    }
    catch (Exception e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      msgID = MSGID_CONFIG_WORK_QUEUE_CANNOT_DETERMINE_NUM_WORKER_THREADS;
      String message = getMessage(msgID, String.valueOf(configEntryDN),
                                  String.valueOf(e));
      unacceptableReasons.add(message);
      configIsAcceptable = false;
    }
    // Check the configuration for the maximum work queue capacity.
    msgID = MSGID_CONFIG_WORK_QUEUE_DESCRIPTION_MAX_CAPACITY;
    IntegerConfigAttribute capacityStub =
      new IntegerConfigAttribute(ATTR_MAX_WORK_QUEUE_CAPACITY,
                                 getMessage(msgID), true, false, false, true,
                                 0, false, 0,
                                 maxCapacity);
    try
    {
      IntegerConfigAttribute capacityAttr =
        (IntegerConfigAttribute)
        configEntry.getConfigAttribute(capacityStub);
      if (capacityAttr == null)
      {
        //This means that the entry doesn't contain the attribute.  This is
        // fine, since we'll just use the default.
      }
      else
      {
        int newMaxCapacity = capacityAttr.activeIntValue();
        if (newMaxCapacity < 0)
        {
          // This is not valid.  The maximum capacity must be greater than or
          // equal to zero.
          msgID = MSGID_CONFIG_WORK_QUEUE_CAPACITY_INVALID_VALUE;
          String message = getMessage(msgID, String.valueOf(configEntryDN),
                                      newMaxCapacity);
          unacceptableReasons.add(message);
          configIsAcceptable = false;
        }
      }
    }
    catch (Exception e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      msgID = MSGID_CONFIG_WORK_QUEUE_CANNOT_DETERMINE_QUEUE_CAPACITY;
      String message = getMessage(msgID, String.valueOf(configEntryDN),
                                  String.valueOf(e));
      unacceptableReasons.add(message);
      configIsAcceptable = false;
    }
    return configIsAcceptable;
  }
  /**
   * Makes a best-effort attempt to apply the configuration contained in the
   * provided entry.  Information about the result of this processing should be
   * added to the provided message list.  Information should always be added to
   * this list if a configuration change could not be applied.  If detailed
   * results are requested, then information about the changes applied
   * successfully (and optionally about parameters that were not changed) should
   * also be included.
   *
   * @param  configEntry      The entry containing the new configuration to
   *                          apply for this component.
   * @param  detailedResults  Indicates whether detailed information about the
   *                          processing should be added to the list.
   *
   * @return  Information about the result of the configuration update.
   */
  public ConfigChangeResult applyNewConfiguration(ConfigEntry configEntry,
                                                  boolean detailedResults)
  @Override()
  public ConfigChangeResult applyConfigurationChange(
                                 TraditionalWorkQueueCfg configuration)
  {
    ArrayList<String> resultMessages = new ArrayList<String>();
    int newNumThreads;
    int newMaxCapacity;
    // Check the configuration for the number of worker threads.
    int msgID = MSGID_CONFIG_WORK_QUEUE_DESCRIPTION_NUM_THREADS;
    IntegerConfigAttribute numThreadsStub =
      new IntegerConfigAttribute(ATTR_NUM_WORKER_THREADS, getMessage(msgID),
                                 true, false, false, true, 1, false, 0,
                                 workerThreads.size());
    try
    {
      IntegerConfigAttribute numThreadsAttr =
        (IntegerConfigAttribute)
        configEntry.getConfigAttribute(numThreadsStub);
      if (numThreadsAttr == null)
      {
        // This means that the entry doesn't contain the attribute.  This is
        // fine, since we'll just use the default.
        newNumThreads = DEFAULT_NUM_WORKER_THREADS;
      }
      else
      {
        newNumThreads = numThreadsAttr.activeIntValue();
        if (newNumThreads <= 0)
        {
          //This is not valid.  The number of worker threads must be a positive
          // integer.  This should never happen since it should be filtered out
          // by the hasAcceptableConfiguration method, but if it does for some
          // reason then handle it.
          msgID = MSGID_CONFIG_WORK_QUEUE_NUM_THREADS_INVALID_VALUE;
          String message = getMessage(msgID,
                                      String.valueOf(configEntryDN),
                                      newNumThreads);
          resultMessages.add(message);
          newNumThreads = DEFAULT_NUM_WORKER_THREADS;
        }
      }
    }
    catch (Exception e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      msgID = MSGID_CONFIG_WORK_QUEUE_CANNOT_DETERMINE_NUM_WORKER_THREADS;
      String message = getMessage(msgID, String.valueOf(configEntryDN),
                                  String.valueOf(e));
      resultMessages.add(message);
      newNumThreads = DEFAULT_NUM_WORKER_THREADS;
    }
    // Check the configuration for the maximum work queue capacity.
    msgID = MSGID_CONFIG_WORK_QUEUE_DESCRIPTION_MAX_CAPACITY;
    IntegerConfigAttribute capacityStub =
      new IntegerConfigAttribute(ATTR_MAX_WORK_QUEUE_CAPACITY,
                                 getMessage(msgID), true, false, false, true,
                                 0, false, 0,
                                 maxCapacity);
    try
    {
      IntegerConfigAttribute capacityAttr =
        (IntegerConfigAttribute)
        configEntry.getConfigAttribute(capacityStub);
      if (capacityAttr == null)
      {
        //This means that the entry doesn't contain the attribute.  This is
        // fine, since we'll just use the default.
        newMaxCapacity = DEFAULT_MAX_WORK_QUEUE_CAPACITY;
      }
      else
      {
        newMaxCapacity = capacityAttr.activeIntValue();
        if (newMaxCapacity < 0)
        {
          // This is not valid.  The maximum capacity must be greater than or
          // equal to zero.
          msgID = MSGID_CONFIG_WORK_QUEUE_CAPACITY_INVALID_VALUE;
          String message = getMessage(msgID, String.valueOf(configEntryDN),
                                      newMaxCapacity);
          resultMessages.add(message);
          newMaxCapacity = DEFAULT_MAX_WORK_QUEUE_CAPACITY;
        }
      }
    }
    catch (Exception e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      msgID = MSGID_CONFIG_WORK_QUEUE_CANNOT_DETERMINE_QUEUE_CAPACITY;
      String message = getMessage(msgID, String.valueOf(configEntryDN),
                                  String.valueOf(e));
      resultMessages.add(message);
      newMaxCapacity = DEFAULT_MAX_WORK_QUEUE_CAPACITY;
    }
    int newNumThreads  = configuration.getNumWorkerThreads();
    int newMaxCapacity = configuration.getMaxWorkQueueCapacity();
    // Apply a change to the number of worker threads if appropriate.
@@ -965,25 +610,10 @@
            t.start();
          }
          if (detailedResults)
          {
            msgID = MSGID_CONFIG_WORK_QUEUE_CREATED_THREADS;
            String message = getMessage(msgID, threadsToAdd, newNumThreads);
            resultMessages.add(message);
          }
          killThreads = false;
        }
        else
        {
          if (detailedResults)
          {
            msgID = MSGID_CONFIG_WORK_QUEUE_DESTROYING_THREADS;
            String message = getMessage(msgID, Math.abs(threadsToAdd),
                                        newNumThreads);
            resultMessages.add(message);
          }
          killThreads = true;
        }
@@ -1058,13 +688,6 @@
          }
        }
        if (detailedResults)
        {
          msgID = MSGID_CONFIG_WORK_QUEUE_NEW_CAPACITY;
          String message = getMessage(msgID, newMaxCapacity);
          resultMessages.add(message);
        }
        maxCapacity = newMaxCapacity;
      }
      catch (Exception e)
@@ -1089,6 +712,7 @@
  /**
   * {@inheritDoc}
   */
  @Override()
  public boolean isIdle()
  {
    if (opQueue.size() > 0)