From 47be44124da7f6ad42bed03a24701ca07c00918d Mon Sep 17 00:00:00 2001
From: neil_a_wilson <neil_a_wilson@localhost>
Date: Wed, 12 Sep 2007 00:03:02 +0000
Subject: [PATCH] Make a couple of changes to help improve server performance:
---
opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java | 255 +++++++++++++++++++++++---------------------------
1 files changed, 118 insertions(+), 137 deletions(-)
diff --git a/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java b/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
index 75ae755..bec5d7e 100644
--- a/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
+++ b/opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
@@ -25,7 +25,6 @@
* Portions Copyright 2006-2007 Sun Microsystems, Inc.
*/
package org.opends.server.extensions;
-import org.opends.messages.Message;
@@ -36,14 +35,16 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
+import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.server.TraditionalWorkQueueCfg;
import org.opends.server.api.WorkQueue;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
+import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.monitors.TraditionalWorkQueueMonitor;
+import org.opends.server.types.AbstractOperation;
import org.opends.server.types.CancelRequest;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DebugLogLevel;
@@ -53,12 +54,10 @@
import org.opends.server.types.Operation;
import org.opends.server.types.ResultCode;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.messages.ConfigMessages.*;
import static org.opends.messages.CoreMessages.*;
-import org.opends.server.types.AbstractOperation;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -123,7 +122,7 @@
private LinkedBlockingQueue<AbstractOperation> opQueue;
// The lock used to provide threadsafe access for the queue.
- private ReentrantLock queueLock;
+ private Object queueLock;
@@ -149,7 +148,7 @@
killThreads = false;
opsSubmitted = new AtomicLong(0);
queueFullRejects = new AtomicLong(0);
- queueLock = new ReentrantLock();
+ queueLock = new Object();
// Register to be notified of any configuration changes.
@@ -354,38 +353,35 @@
// so, then return null and the thread will exit.
if (killThreads)
{
- queueLock.lock();
-
- try
+ synchronized (queueLock)
{
- int currentThreads = workerThreads.size();
- if (currentThreads > numWorkerThreads)
+ try
{
- if (workerThreads.remove(Thread.currentThread()))
+ int currentThreads = workerThreads.size();
+ if (currentThreads > numWorkerThreads)
{
- currentThreads--;
- }
+ if (workerThreads.remove(Thread.currentThread()))
+ {
+ currentThreads--;
+ }
- if (currentThreads <= numWorkerThreads)
- {
- killThreads = false;
- }
+ if (currentThreads <= numWorkerThreads)
+ {
+ killThreads = false;
+ }
- workerThread.setStoppedByReducedThreadNumber();
- return null;
+ workerThread.setStoppedByReducedThreadNumber();
+ return null;
+ }
}
- }
- catch (Exception e)
- {
- if (debugEnabled())
+ catch (Exception e)
{
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
}
}
- finally
- {
- queueLock.unlock();
- }
}
if ((shutdownRequested) || (numFailures > MAX_RETRY_COUNT))
@@ -415,38 +411,35 @@
}
else if (killThreads)
{
- queueLock.lock();
-
- try
+ synchronized (queueLock)
{
- int currentThreads = workerThreads.size();
- if (currentThreads > numWorkerThreads)
+ try
{
- if (workerThreads.remove(Thread.currentThread()))
+ int currentThreads = workerThreads.size();
+ if (currentThreads > numWorkerThreads)
{
- currentThreads--;
- }
+ if (workerThreads.remove(Thread.currentThread()))
+ {
+ currentThreads--;
+ }
- if (currentThreads <= numWorkerThreads)
- {
- killThreads = false;
- }
+ if (currentThreads <= numWorkerThreads)
+ {
+ killThreads = false;
+ }
- workerThread.setStoppedByReducedThreadNumber();
- return null;
+ workerThread.setStoppedByReducedThreadNumber();
+ return null;
+ }
}
- }
- catch (Exception e)
- {
- if (debugEnabled())
+ catch (Exception e)
{
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
}
}
- finally
- {
- queueLock.unlock();
- }
}
}
else
@@ -581,40 +574,37 @@
int currentThreads = workerThreads.size();
if (newNumThreads != currentThreads)
{
- queueLock.lock();
-
- try
+ synchronized (queueLock)
{
- int threadsToAdd = newNumThreads - currentThreads;
- if (threadsToAdd > 0)
+ try
{
- for (int i=0; i < threadsToAdd; i++)
+ int threadsToAdd = newNumThreads - currentThreads;
+ if (threadsToAdd > 0)
{
- TraditionalWorkerThread t =
- new TraditionalWorkerThread(this, lastThreadNumber++);
- workerThreads.add(t);
- t.start();
+ for (int i=0; i < threadsToAdd; i++)
+ {
+ TraditionalWorkerThread t =
+ new TraditionalWorkerThread(this, lastThreadNumber++);
+ workerThreads.add(t);
+ t.start();
+ }
+
+ killThreads = false;
+ }
+ else
+ {
+ killThreads = true;
}
- killThreads = false;
+ numWorkerThreads = newNumThreads;
}
- else
+ catch (Exception e)
{
- killThreads = true;
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
}
-
- numWorkerThreads = newNumThreads;
- }
- catch (Exception e)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- }
- finally
- {
- queueLock.unlock();
}
}
@@ -626,68 +616,65 @@
// checks will be against the new queue.
if (newMaxCapacity != maxCapacity)
{
- queueLock.lock();
-
- try
+ synchronized (queueLock)
{
- LinkedBlockingQueue<AbstractOperation> newOpQueue;
- if (newMaxCapacity > 0)
+ try
{
- newOpQueue =
- new LinkedBlockingQueue<AbstractOperation>(newMaxCapacity);
- }
- else
- {
- newOpQueue = new LinkedBlockingQueue<AbstractOperation>();
- }
-
- LinkedBlockingQueue<AbstractOperation> oldOpQueue = opQueue;
- opQueue = newOpQueue;
-
- LinkedList<AbstractOperation> pendingOps =
- new LinkedList<AbstractOperation>();
- oldOpQueue.drainTo(pendingOps);
-
-
- // We have to be careful when adding any existing pending operations
- // because the new capacity could be less than what was already
- // backlogged in the previous queue. If that happens, we may have to
- // loop a few times to get everything in there.
- while (! pendingOps.isEmpty())
- {
- Iterator<AbstractOperation> iterator = pendingOps.iterator();
- while (iterator.hasNext())
+ LinkedBlockingQueue<AbstractOperation> newOpQueue;
+ if (newMaxCapacity > 0)
{
- AbstractOperation o = iterator.next();
- try
+ newOpQueue =
+ new LinkedBlockingQueue<AbstractOperation>(newMaxCapacity);
+ }
+ else
+ {
+ newOpQueue = new LinkedBlockingQueue<AbstractOperation>();
+ }
+
+ LinkedBlockingQueue<AbstractOperation> oldOpQueue = opQueue;
+ opQueue = newOpQueue;
+
+ LinkedList<AbstractOperation> pendingOps =
+ new LinkedList<AbstractOperation>();
+ oldOpQueue.drainTo(pendingOps);
+
+
+ // We have to be careful when adding any existing pending operations
+ // because the new capacity could be less than what was already
+ // backlogged in the previous queue. If that happens, we may have to
+ // loop a few times to get everything in there.
+ while (! pendingOps.isEmpty())
+ {
+ Iterator<AbstractOperation> iterator = pendingOps.iterator();
+ while (iterator.hasNext())
{
- if (newOpQueue.offer(o, 1000, TimeUnit.MILLISECONDS))
+ AbstractOperation o = iterator.next();
+ try
{
- iterator.remove();
+ if (newOpQueue.offer(o, 1000, TimeUnit.MILLISECONDS))
+ {
+ iterator.remove();
+ }
}
- }
- catch (InterruptedException ie)
- {
- if (debugEnabled())
+ catch (InterruptedException ie)
{
- TRACER.debugCaught(DebugLogLevel.ERROR, ie);
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, ie);
+ }
}
}
}
- }
- maxCapacity = newMaxCapacity;
- }
- catch (Exception e)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ maxCapacity = newMaxCapacity;
}
- }
- finally
- {
- queueLock.unlock();
+ catch (Exception e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
}
}
@@ -708,9 +695,7 @@
return false;
}
- queueLock.lock();
-
- try
+ synchronized (queueLock)
{
for (TraditionalWorkerThread t : workerThreads)
{
@@ -722,10 +707,6 @@
return true;
}
- finally
- {
- queueLock.unlock();
- }
}
}
--
Gitblit v1.10.0