mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

neil_a_wilson
12.03.2007 47be44124da7f6ad42bed03a24701ca07c00918d
opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
@@ -25,7 +25,6 @@
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.extensions;
import org.opends.messages.Message;
@@ -36,14 +35,16 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.server.TraditionalWorkQueueCfg;
import org.opends.server.api.WorkQueue;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.monitors.TraditionalWorkQueueMonitor;
import org.opends.server.types.AbstractOperation;
import org.opends.server.types.CancelRequest;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DebugLogLevel;
@@ -53,12 +54,10 @@
import org.opends.server.types.Operation;
import org.opends.server.types.ResultCode;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.messages.ConfigMessages.*;
import static org.opends.messages.CoreMessages.*;
import org.opends.server.types.AbstractOperation;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -123,7 +122,7 @@
  private LinkedBlockingQueue<AbstractOperation> opQueue;
  // The lock used to provide threadsafe access for the queue.
  private ReentrantLock queueLock;
  private Object queueLock;
@@ -149,7 +148,7 @@
    killThreads       = false;
    opsSubmitted      = new AtomicLong(0);
    queueFullRejects  = new AtomicLong(0);
    queueLock         = new ReentrantLock();
    queueLock         = new Object();
    // Register to be notified of any configuration changes.
@@ -354,38 +353,35 @@
    // so, then return null and the thread will exit.
    if (killThreads)
    {
      queueLock.lock();
      try
      synchronized (queueLock)
      {
        int currentThreads = workerThreads.size();
        if (currentThreads > numWorkerThreads)
        try
        {
          if (workerThreads.remove(Thread.currentThread()))
          int currentThreads = workerThreads.size();
          if (currentThreads > numWorkerThreads)
          {
            currentThreads--;
          }
            if (workerThreads.remove(Thread.currentThread()))
            {
              currentThreads--;
            }
          if (currentThreads <= numWorkerThreads)
          {
            killThreads = false;
          }
            if (currentThreads <= numWorkerThreads)
            {
              killThreads = false;
            }
          workerThread.setStoppedByReducedThreadNumber();
          return null;
            workerThread.setStoppedByReducedThreadNumber();
            return null;
          }
        }
      }
      catch (Exception e)
      {
        if (debugEnabled())
        catch (Exception e)
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
          }
        }
      }
      finally
      {
        queueLock.unlock();
      }
    }
    if ((shutdownRequested) || (numFailures > MAX_RETRY_COUNT))
@@ -415,38 +411,35 @@
          }
          else if (killThreads)
          {
            queueLock.lock();
            try
            synchronized (queueLock)
            {
              int currentThreads = workerThreads.size();
              if (currentThreads > numWorkerThreads)
              try
              {
                if (workerThreads.remove(Thread.currentThread()))
                int currentThreads = workerThreads.size();
                if (currentThreads > numWorkerThreads)
                {
                  currentThreads--;
                }
                  if (workerThreads.remove(Thread.currentThread()))
                  {
                    currentThreads--;
                  }
                if (currentThreads <= numWorkerThreads)
                {
                  killThreads = false;
                }
                  if (currentThreads <= numWorkerThreads)
                  {
                    killThreads = false;
                  }
                workerThread.setStoppedByReducedThreadNumber();
                return null;
                  workerThread.setStoppedByReducedThreadNumber();
                  return null;
                }
              }
            }
            catch (Exception e)
            {
              if (debugEnabled())
              catch (Exception e)
              {
                TRACER.debugCaught(DebugLogLevel.ERROR, e);
                if (debugEnabled())
                {
                  TRACER.debugCaught(DebugLogLevel.ERROR, e);
                }
              }
            }
            finally
            {
              queueLock.unlock();
            }
          }
        }
        else
@@ -581,40 +574,37 @@
    int currentThreads = workerThreads.size();
    if (newNumThreads != currentThreads)
    {
      queueLock.lock();
      try
      synchronized (queueLock)
      {
        int threadsToAdd = newNumThreads - currentThreads;
        if (threadsToAdd > 0)
        try
        {
          for (int i=0; i < threadsToAdd; i++)
          int threadsToAdd = newNumThreads - currentThreads;
          if (threadsToAdd > 0)
          {
            TraditionalWorkerThread t =
                 new TraditionalWorkerThread(this, lastThreadNumber++);
            workerThreads.add(t);
            t.start();
            for (int i=0; i < threadsToAdd; i++)
            {
              TraditionalWorkerThread t =
                   new TraditionalWorkerThread(this, lastThreadNumber++);
              workerThreads.add(t);
              t.start();
            }
            killThreads = false;
          }
          else
          {
            killThreads = true;
          }
          killThreads = false;
          numWorkerThreads = newNumThreads;
        }
        else
        catch (Exception e)
        {
          killThreads = true;
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
          }
        }
        numWorkerThreads = newNumThreads;
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
      finally
      {
        queueLock.unlock();
      }
    }
@@ -626,68 +616,65 @@
    // checks will be against the new queue.
    if (newMaxCapacity != maxCapacity)
    {
      queueLock.lock();
      try
      synchronized (queueLock)
      {
        LinkedBlockingQueue<AbstractOperation> newOpQueue;
        if (newMaxCapacity > 0)
        try
        {
          newOpQueue =
            new LinkedBlockingQueue<AbstractOperation>(newMaxCapacity);
        }
        else
        {
          newOpQueue = new LinkedBlockingQueue<AbstractOperation>();
        }
        LinkedBlockingQueue<AbstractOperation> oldOpQueue = opQueue;
        opQueue = newOpQueue;
        LinkedList<AbstractOperation> pendingOps =
          new LinkedList<AbstractOperation>();
        oldOpQueue.drainTo(pendingOps);
        // We have to be careful when adding any existing pending operations
        // because the new capacity could be less than what was already
        // backlogged in the previous queue.  If that happens, we may have to
        // loop a few times to get everything in there.
        while (! pendingOps.isEmpty())
        {
          Iterator<AbstractOperation> iterator = pendingOps.iterator();
          while (iterator.hasNext())
          LinkedBlockingQueue<AbstractOperation> newOpQueue;
          if (newMaxCapacity > 0)
          {
            AbstractOperation o = iterator.next();
            try
            newOpQueue =
              new LinkedBlockingQueue<AbstractOperation>(newMaxCapacity);
          }
          else
          {
            newOpQueue = new LinkedBlockingQueue<AbstractOperation>();
          }
          LinkedBlockingQueue<AbstractOperation> oldOpQueue = opQueue;
          opQueue = newOpQueue;
          LinkedList<AbstractOperation> pendingOps =
            new LinkedList<AbstractOperation>();
          oldOpQueue.drainTo(pendingOps);
          // We have to be careful when adding any existing pending operations
          // because the new capacity could be less than what was already
          // backlogged in the previous queue.  If that happens, we may have to
          // loop a few times to get everything in there.
          while (! pendingOps.isEmpty())
          {
            Iterator<AbstractOperation> iterator = pendingOps.iterator();
            while (iterator.hasNext())
            {
              if (newOpQueue.offer(o, 1000, TimeUnit.MILLISECONDS))
              AbstractOperation o = iterator.next();
              try
              {
                iterator.remove();
                if (newOpQueue.offer(o, 1000, TimeUnit.MILLISECONDS))
                {
                  iterator.remove();
                }
              }
            }
            catch (InterruptedException ie)
            {
              if (debugEnabled())
              catch (InterruptedException ie)
              {
                TRACER.debugCaught(DebugLogLevel.ERROR, ie);
                if (debugEnabled())
                {
                  TRACER.debugCaught(DebugLogLevel.ERROR, ie);
                }
              }
            }
          }
        }
        maxCapacity = newMaxCapacity;
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
          maxCapacity = newMaxCapacity;
        }
      }
      finally
      {
        queueLock.unlock();
        catch (Exception e)
        {
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
          }
        }
      }
    }
@@ -708,9 +695,7 @@
      return false;
    }
    queueLock.lock();
    try
    synchronized (queueLock)
    {
      for (TraditionalWorkerThread t : workerThreads)
      {
@@ -722,10 +707,6 @@
      return true;
    }
    finally
    {
      queueLock.unlock();
    }
  }
}