From 47be44124da7f6ad42bed03a24701ca07c00918d Mon Sep 17 00:00:00 2001
From: neil_a_wilson <neil_a_wilson@localhost>
Date: Wed, 12 Sep 2007 00:03:02 +0000
Subject: [PATCH] Make a couple of changes to help improve server performance:

---
 opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java |  255 +++++++++++++++++++++++---------------------------
 1 files changed, 118 insertions(+), 137 deletions(-)

diff --git a/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java b/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
index 75ae755..bec5d7e 100644
--- a/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
+++ b/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
@@ -25,7 +25,6 @@
  *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
  */
 package org.opends.server.extensions;
-import org.opends.messages.Message;
 
 
 
@@ -36,14 +35,16 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
 
+import org.opends.messages.Message;
 import org.opends.server.admin.server.ConfigurationChangeListener;
 import org.opends.server.admin.std.server.TraditionalWorkQueueCfg;
 import org.opends.server.api.WorkQueue;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.DirectoryServer;
+import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.monitors.TraditionalWorkQueueMonitor;
+import org.opends.server.types.AbstractOperation;
 import org.opends.server.types.CancelRequest;
 import org.opends.server.types.ConfigChangeResult;
 import org.opends.server.types.DebugLogLevel;
@@ -53,12 +54,10 @@
 import org.opends.server.types.Operation;
 import org.opends.server.types.ResultCode;
 
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import org.opends.server.loggers.debug.DebugTracer;
 import static org.opends.messages.ConfigMessages.*;
 import static org.opends.messages.CoreMessages.*;
-import org.opends.server.types.AbstractOperation;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
 
 
 
@@ -123,7 +122,7 @@
   private LinkedBlockingQueue<AbstractOperation> opQueue;
 
   // The lock used to provide threadsafe access for the queue.
-  private ReentrantLock queueLock;
+  private Object queueLock;
 
 
 
@@ -149,7 +148,7 @@
     killThreads       = false;
     opsSubmitted      = new AtomicLong(0);
     queueFullRejects  = new AtomicLong(0);
-    queueLock         = new ReentrantLock();
+    queueLock         = new Object();
 
 
     // Register to be notified of any configuration changes.
@@ -354,38 +353,35 @@
     // so, then return null and the thread will exit.
     if (killThreads)
     {
-      queueLock.lock();
-
-      try
+      synchronized (queueLock)
       {
-        int currentThreads = workerThreads.size();
-        if (currentThreads > numWorkerThreads)
+        try
         {
-          if (workerThreads.remove(Thread.currentThread()))
+          int currentThreads = workerThreads.size();
+          if (currentThreads > numWorkerThreads)
           {
-            currentThreads--;
-          }
+            if (workerThreads.remove(Thread.currentThread()))
+            {
+              currentThreads--;
+            }
 
-          if (currentThreads <= numWorkerThreads)
-          {
-            killThreads = false;
-          }
+            if (currentThreads <= numWorkerThreads)
+            {
+              killThreads = false;
+            }
 
-          workerThread.setStoppedByReducedThreadNumber();
-          return null;
+            workerThread.setStoppedByReducedThreadNumber();
+            return null;
+          }
         }
-      }
-      catch (Exception e)
-      {
-        if (debugEnabled())
+        catch (Exception e)
         {
-          TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          if (debugEnabled())
+          {
+            TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          }
         }
       }
-      finally
-      {
-        queueLock.unlock();
-      }
     }
 
     if ((shutdownRequested) || (numFailures > MAX_RETRY_COUNT))
@@ -415,38 +411,35 @@
           }
           else if (killThreads)
           {
-            queueLock.lock();
-
-            try
+            synchronized (queueLock)
             {
-              int currentThreads = workerThreads.size();
-              if (currentThreads > numWorkerThreads)
+              try
               {
-                if (workerThreads.remove(Thread.currentThread()))
+                int currentThreads = workerThreads.size();
+                if (currentThreads > numWorkerThreads)
                 {
-                  currentThreads--;
-                }
+                  if (workerThreads.remove(Thread.currentThread()))
+                  {
+                    currentThreads--;
+                  }
 
-                if (currentThreads <= numWorkerThreads)
-                {
-                  killThreads = false;
-                }
+                  if (currentThreads <= numWorkerThreads)
+                  {
+                    killThreads = false;
+                  }
 
-                workerThread.setStoppedByReducedThreadNumber();
-                return null;
+                  workerThread.setStoppedByReducedThreadNumber();
+                  return null;
+                }
               }
-            }
-            catch (Exception e)
-            {
-              if (debugEnabled())
+              catch (Exception e)
               {
-                TRACER.debugCaught(DebugLogLevel.ERROR, e);
+                if (debugEnabled())
+                {
+                  TRACER.debugCaught(DebugLogLevel.ERROR, e);
+                }
               }
             }
-            finally
-            {
-              queueLock.unlock();
-            }
           }
         }
         else
@@ -581,40 +574,37 @@
     int currentThreads = workerThreads.size();
     if (newNumThreads != currentThreads)
     {
-      queueLock.lock();
-
-      try
+      synchronized (queueLock)
       {
-        int threadsToAdd = newNumThreads - currentThreads;
-        if (threadsToAdd > 0)
+        try
         {
-          for (int i=0; i < threadsToAdd; i++)
+          int threadsToAdd = newNumThreads - currentThreads;
+          if (threadsToAdd > 0)
           {
-            TraditionalWorkerThread t =
-                 new TraditionalWorkerThread(this, lastThreadNumber++);
-            workerThreads.add(t);
-            t.start();
+            for (int i=0; i < threadsToAdd; i++)
+            {
+              TraditionalWorkerThread t =
+                   new TraditionalWorkerThread(this, lastThreadNumber++);
+              workerThreads.add(t);
+              t.start();
+            }
+
+            killThreads = false;
+          }
+          else
+          {
+            killThreads = true;
           }
 
-          killThreads = false;
+          numWorkerThreads = newNumThreads;
         }
-        else
+        catch (Exception e)
         {
-          killThreads = true;
+          if (debugEnabled())
+          {
+            TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          }
         }
-
-        numWorkerThreads = newNumThreads;
-      }
-      catch (Exception e)
-      {
-        if (debugEnabled())
-        {
-          TRACER.debugCaught(DebugLogLevel.ERROR, e);
-        }
-      }
-      finally
-      {
-        queueLock.unlock();
       }
     }
 
@@ -626,68 +616,65 @@
     // checks will be against the new queue.
     if (newMaxCapacity != maxCapacity)
     {
-      queueLock.lock();
-
-      try
+      synchronized (queueLock)
       {
-        LinkedBlockingQueue<AbstractOperation> newOpQueue;
-        if (newMaxCapacity > 0)
+        try
         {
-          newOpQueue =
-            new LinkedBlockingQueue<AbstractOperation>(newMaxCapacity);
-        }
-        else
-        {
-          newOpQueue = new LinkedBlockingQueue<AbstractOperation>();
-        }
-
-        LinkedBlockingQueue<AbstractOperation> oldOpQueue = opQueue;
-        opQueue = newOpQueue;
-
-        LinkedList<AbstractOperation> pendingOps =
-          new LinkedList<AbstractOperation>();
-        oldOpQueue.drainTo(pendingOps);
-
-
-        // We have to be careful when adding any existing pending operations
-        // because the new capacity could be less than what was already
-        // backlogged in the previous queue.  If that happens, we may have to
-        // loop a few times to get everything in there.
-        while (! pendingOps.isEmpty())
-        {
-          Iterator<AbstractOperation> iterator = pendingOps.iterator();
-          while (iterator.hasNext())
+          LinkedBlockingQueue<AbstractOperation> newOpQueue;
+          if (newMaxCapacity > 0)
           {
-            AbstractOperation o = iterator.next();
-            try
+            newOpQueue =
+              new LinkedBlockingQueue<AbstractOperation>(newMaxCapacity);
+          }
+          else
+          {
+            newOpQueue = new LinkedBlockingQueue<AbstractOperation>();
+          }
+
+          LinkedBlockingQueue<AbstractOperation> oldOpQueue = opQueue;
+          opQueue = newOpQueue;
+
+          LinkedList<AbstractOperation> pendingOps =
+            new LinkedList<AbstractOperation>();
+          oldOpQueue.drainTo(pendingOps);
+
+
+          // We have to be careful when adding any existing pending operations
+          // because the new capacity could be less than what was already
+          // backlogged in the previous queue.  If that happens, we may have to
+          // loop a few times to get everything in there.
+          while (! pendingOps.isEmpty())
+          {
+            Iterator<AbstractOperation> iterator = pendingOps.iterator();
+            while (iterator.hasNext())
             {
-              if (newOpQueue.offer(o, 1000, TimeUnit.MILLISECONDS))
+              AbstractOperation o = iterator.next();
+              try
               {
-                iterator.remove();
+                if (newOpQueue.offer(o, 1000, TimeUnit.MILLISECONDS))
+                {
+                  iterator.remove();
+                }
               }
-            }
-            catch (InterruptedException ie)
-            {
-              if (debugEnabled())
+              catch (InterruptedException ie)
               {
-                TRACER.debugCaught(DebugLogLevel.ERROR, ie);
+                if (debugEnabled())
+                {
+                  TRACER.debugCaught(DebugLogLevel.ERROR, ie);
+                }
               }
             }
           }
-        }
 
-        maxCapacity = newMaxCapacity;
-      }
-      catch (Exception e)
-      {
-        if (debugEnabled())
-        {
-          TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          maxCapacity = newMaxCapacity;
         }
-      }
-      finally
-      {
-        queueLock.unlock();
+        catch (Exception e)
+        {
+          if (debugEnabled())
+          {
+            TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          }
+        }
       }
     }
 
@@ -708,9 +695,7 @@
       return false;
     }
 
-    queueLock.lock();
-
-    try
+    synchronized (queueLock)
     {
       for (TraditionalWorkerThread t : workerThreads)
       {
@@ -722,10 +707,6 @@
 
       return true;
     }
-    finally
-    {
-      queueLock.unlock();
-    }
   }
 }
 

--
Gitblit v1.10.0