mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
22.28.2013 6945df4500d706d53a192c78d63e7d2c7e83258c
opendj-sdk/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
@@ -51,7 +51,13 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.monitors.TraditionalWorkQueueMonitor;
import org.opends.server.types.*;
import org.opends.server.types.CancelRequest;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.InitializationException;
import org.opends.server.types.Operation;
import org.opends.server.types.ResultCode;
@@ -176,7 +182,8 @@
      configuration.addTraditionalChangeListener(this);
      // Get the necessary configuration from the provided entry.
      numWorkerThreads = getNumWorkerThreads(configuration);
      numWorkerThreads =
          computeNumWorkerThreads(configuration.getNumWorkerThreads());
      maxCapacity = configuration.getMaxWorkQueueCapacity();
      // Create the actual work queue.
@@ -334,6 +341,32 @@
  @Override
  public void submitOperation(Operation operation) throws DirectoryException
  {
    submitOperation(operation, isBlocking);
  }
  /** {@inheritDoc} */
  @Override
  public boolean trySubmitOperation(Operation operation)
      throws DirectoryException
  {
    try
    {
      submitOperation(operation, false);
      return true;
    }
    catch (DirectoryException e)
    {
      if (ResultCode.BUSY == e.getResultCode())
      {
        return false;
      }
      throw e;
    }
  }
  private void submitOperation(Operation operation,
      boolean blockEnqueuingWhenFull) throws DirectoryException
  {
    queueReadLock.lock();
    try
    {
@@ -343,7 +376,7 @@
        throw new DirectoryException(ResultCode.UNAVAILABLE, message);
      }
      if (isBlocking)
      if (blockEnqueuingWhenFull)
      {
        try
        {
@@ -659,7 +692,8 @@
      TraditionalWorkQueueCfg configuration)
  {
    ArrayList<Message> resultMessages = new ArrayList<Message>();
    int newNumThreads = getNumWorkerThreads(configuration);
    int newNumThreads =
        computeNumWorkerThreads(configuration.getNumWorkerThreads());
    int newMaxCapacity = configuration.getMaxWorkQueueCapacity();
    // Apply a change to the number of worker threads if appropriate.
@@ -808,25 +842,14 @@
    }
  }
  // Determine the number of worker threads.
  private int getNumWorkerThreads(TraditionalWorkQueueCfg configuration)
  /**
   * Return the number of worker threads used by this WorkQueue.
   *
   * @return the number of worker threads used by this WorkQueue
   */
  @Override
  public int getNumWorkerThreads()
  {
    if (configuration.getNumWorkerThreads() == null)
    {
      // Automatically choose based on the number of processors.
      int cpus = Runtime.getRuntime().availableProcessors();
      int value = Math.max(24, cpus * 2);
      Message message = INFO_ERGONOMIC_SIZING_OF_WORKER_THREAD_POOL.get(value);
      logError(message);
      return value;
    }
    else
    {
      return configuration.getNumWorkerThreads();
    }
    return this.numWorkerThreads;
  }
}