From 0b99e26e3e75fe58045645706cd3cc8995fda728 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 27 May 2011 16:14:16 +0000
Subject: [PATCH] Fix OPENDJ-127: Back out OpenDS SDK revision 6648 as it prevents the -M options from working, however doing so seems to significantly impact max throughput (around 20%), probably due to context switching during Future.get() calls.
---
opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/SearchRate.java | 12 +-
opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/ModRate.java | 8
opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/PerformanceRunner.java | 231 +++++----------------------------------------
opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthRate.java | 13 +-
4 files changed, 43 insertions(+), 221 deletions(-)
diff --git a/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthRate.java b/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthRate.java
index e5fc802..de17abf 100644
--- a/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthRate.java
+++ b/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthRate.java
@@ -95,10 +95,9 @@
private final class BindUpdateStatsResultHandler extends
UpdateStatsResultHandler<BindResult>
{
- private BindUpdateStatsResultHandler(final long startTime,
- final AsynchronousConnection connection, final ConnectionWorker worker)
+ private BindUpdateStatsResultHandler(final long startTime)
{
- super(startTime, connection, worker);
+ super(startTime);
}
@@ -117,7 +116,7 @@
- private final class BindWorkerThread extends ConnectionWorker
+ private final class BindWorkerThread extends WorkerThread
{
private SearchRequest sr;
private BindRequest br;
@@ -183,7 +182,7 @@
final RecursiveFutureResult<SearchResultEntry, BindResult> future =
new RecursiveFutureResult<SearchResultEntry, BindResult>(
- new BindUpdateStatsResultHandler(startTime, connection, this))
+ new BindUpdateStatsResultHandler(startTime))
{
@Override
protected FutureResult<? extends BindResult> chainResult(
@@ -206,7 +205,7 @@
else
{
return performBind(connection, data,
- new BindUpdateStatsResultHandler(startTime, connection, this));
+ new BindUpdateStatsResultHandler(startTime));
}
}
@@ -418,7 +417,7 @@
@Override
- ConnectionWorker newConnectionWorker(
+ WorkerThread newWorkerThread(
final AsynchronousConnection connection,
final ConnectionFactory connectionFactory)
{
diff --git a/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/ModRate.java b/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/ModRate.java
index c486260..84d374b 100644
--- a/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/ModRate.java
+++ b/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/ModRate.java
@@ -53,7 +53,7 @@
{
private static final class ModifyPerformanceRunner extends PerformanceRunner
{
- private final class ModifyWorkerThread extends ConnectionWorker
+ private final class ModifyWorkerThread extends WorkerThread
{
private ModifyRequest mr;
private Object[] data;
@@ -78,8 +78,8 @@
data = DataSource.generateData(dataSources, data);
}
mr = newModifyRequest(data);
- return connection.modify(mr, new UpdateStatsResultHandler<Result>(
- startTime, connection, this));
+ return connection.modify(mr,
+ new UpdateStatsResultHandler<Result>(startTime));
}
@@ -136,7 +136,7 @@
@Override
- ConnectionWorker newConnectionWorker(
+ WorkerThread newWorkerThread(
final AsynchronousConnection connection,
final ConnectionFactory connectionFactory)
{
diff --git a/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/PerformanceRunner.java b/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/PerformanceRunner.java
index bff44e1..0ee2525 100644
--- a/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/PerformanceRunner.java
+++ b/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/PerformanceRunner.java
@@ -33,14 +33,12 @@
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.*;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.opendj.ldap.*;
-import org.forgerock.opendj.ldap.responses.BindResult;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.Result;
@@ -54,170 +52,6 @@
*/
abstract class PerformanceRunner implements ConnectionEventListener
{
- abstract class ConnectionWorker
- {
- private final AtomicInteger operationsInFlight = new AtomicInteger();
-
- private volatile int count;
-
- private final AsynchronousConnection staticConnection;
-
- private final ConnectionFactory connectionFactory;
-
- private final CountDownLatch latch = new CountDownLatch(1);
-
-
-
- ConnectionWorker(final AsynchronousConnection staticConnection,
- final ConnectionFactory connectionFactory)
- {
- this.staticConnection = staticConnection;
- this.connectionFactory = connectionFactory;
- }
-
-
-
- public void operationCompleted(final AsynchronousConnection connection)
- {
- if (operationsInFlight.decrementAndGet() == 0
- && this.staticConnection == null)
- {
- connection.close();
- }
- startWork();
- }
-
-
-
- public abstract FutureResult<?> performOperation(
- final AsynchronousConnection connection,
- final DataSource[] dataSources, final long startTime);
-
-
-
- public void startWork()
- {
- if (!stopRequested && !(maxIterations > 0 && count >= maxIterations))
- {
- if (this.staticConnection == null)
- {
- connectionFactory
- .getAsynchronousConnection(new ResultHandler<AsynchronousConnection>()
- {
- public void handleErrorResult(final ErrorResultException e)
- {
- app.println(LocalizableMessage.raw(e.getResult()
- .getDiagnosticMessage()));
- if (e.getCause() != null && app.isVerbose())
- {
- e.getCause().printStackTrace(app.getErrorStream());
- }
- stopRequested = true;
- }
-
-
-
- public void handleResult(final AsynchronousConnection result)
- {
- doWork(result);
- }
- });
- }
- else
- {
- if (!noRebind
- && this.staticConnection instanceof AuthenticatedAsynchronousConnection)
- {
- final AuthenticatedAsynchronousConnection ac =
- (AuthenticatedAsynchronousConnection) this.staticConnection;
- ac.rebind(new ResultHandler<BindResult>()
- {
- public void handleErrorResult(final ErrorResultException e)
- {
- app.println(LocalizableMessage.raw(e.getResult().toString()));
- if (e.getCause() != null && app.isVerbose())
- {
- e.getCause().printStackTrace(app.getErrorStream());
- }
- stopRequested = true;
- }
-
-
-
- public void handleResult(final BindResult result)
- {
- doWork(staticConnection);
- }
- });
- }
- else
- {
- doWork(staticConnection);
- }
- }
- }
- else
- {
- latch.countDown();
- }
- }
-
-
-
- public void waitFor() throws InterruptedException
- {
- latch.await();
- }
-
-
-
- private void doWork(final AsynchronousConnection connection)
- {
- long start;
- double sleepTimeInMS = 0;
- final int opsToPerform = isAsync ? numConcurrentTasks : numConcurrentTasks
- - operationsInFlight.get();
- for (int i = 0; i < opsToPerform; i++)
- {
- if (maxIterations > 0 && count >= maxIterations)
- {
- break;
- }
- start = System.nanoTime();
- performOperation(connection, dataSources.get(), start);
- operationRecentCount.getAndIncrement();
- operationsInFlight.getAndIncrement();
- count++;
-
- if (targetThroughput > 0)
- {
- try
- {
- if (sleepTimeInMS > 1)
- {
- Thread.sleep((long) Math.floor(sleepTimeInMS));
- }
- }
- catch (final InterruptedException e)
- {
- continue;
- }
-
- sleepTimeInMS += targetTimeInMS
- - ((System.nanoTime() - start) / 1000000.0);
- if (sleepTimeInMS < -60000)
- {
- // If we fall behind by 60 seconds, just forget about
- // catching up
- sleepTimeInMS = -60000;
- }
- }
- }
- }
- }
-
-
-
/**
* Statistics thread base implementation.
*/
@@ -566,17 +400,12 @@
class UpdateStatsResultHandler<S extends Result> implements ResultHandler<S>
{
private final long startTime;
- private final AsynchronousConnection connection;
- private final ConnectionWorker worker;
- UpdateStatsResultHandler(final long startTime,
- final AsynchronousConnection connection, final ConnectionWorker worker)
+ UpdateStatsResultHandler(final long startTime)
{
this.startTime = startTime;
- this.connection = connection;
- this.worker = worker;
}
@@ -590,8 +419,6 @@
{
app.println(LocalizableMessage.raw(error.getResult().toString()));
}
-
- worker.operationCompleted(connection);
}
@@ -600,7 +427,6 @@
{
successRecentCount.getAndIncrement();
updateStats();
- worker.operationCompleted(connection);
}
@@ -662,7 +488,7 @@
AsynchronousConnection connection;
final double targetTimeInMS =
- (1.0 / (targetThroughput / (double) (numConcurrentTasks * numConnections))) * 1000.0;
+ (1000.0 / (targetThroughput / (double) (numThreads * numConnections)));
double sleepTimeInMS = 0;
long start;
while (!stopRequested && !(maxIterations > 0 && count >= maxIterations))
@@ -1001,7 +827,7 @@
private volatile boolean stopRequested;
- private int numConcurrentTasks;
+ private int numThreads;
private int numConnections;
@@ -1015,9 +841,7 @@
private int statsInterval;
- private double targetTimeInMS;
-
- private final IntegerArgument numConcurrentTasksArgument;
+ private final IntegerArgument numThreadsArgument;
private final IntegerArgument maxIterationsArgument;
@@ -1045,19 +869,19 @@
throws ArgumentException
{
this.app = app;
- numConcurrentTasksArgument = new IntegerArgument("numConcurrentTasks", 't',
- "numConcurrentTasks", false, false, true,
- LocalizableMessage.raw("{numConcurrentTasks}"), 1, null, true, 1,
+ numThreadsArgument = new IntegerArgument("numThreads", 't',
+ "numThreads", false, false, true,
+ LocalizableMessage.raw("{numThreads}"), 1, null, true, 1,
false, 0,
- LocalizableMessage.raw("Number of concurrent tasks per connection"));
- numConcurrentTasksArgument.setPropertyName("numConcurrentTasks");
+ LocalizableMessage.raw("Number of worker threads per connection"));
+ numThreadsArgument.setPropertyName("numThreads");
if (!alwaysSingleThreaded)
{
- argParser.addArgument(numConcurrentTasksArgument);
+ argParser.addArgument(numThreadsArgument);
}
else
{
- numConcurrentTasksArgument.addValue("1");
+ numThreadsArgument.addValue("1");
}
numConnectionsArgument = new IntegerArgument("numConnections", 'c',
@@ -1178,19 +1002,20 @@
public final void validate() throws ArgumentException
{
numConnections = numConnectionsArgument.getIntValue();
- numConcurrentTasks = numConcurrentTasksArgument.getIntValue();
- maxIterations = maxIterationsArgument.getIntValue() / numConnections;
+ numThreads = numThreadsArgument.getIntValue();
+ maxIterations = maxIterationsArgument.getIntValue()
+ / numConnections / numThreads;
statsInterval = statsIntervalArgument.getIntValue() * 1000;
targetThroughput = targetThroughputArgument.getIntValue();
isAsync = asyncArgument.isPresent();
noRebind = noRebindArgument.isPresent();
- if (!noRebindArgument.isPresent() && this.numConcurrentTasks > 1)
+ if (!noRebindArgument.isPresent() && this.numThreads > 1)
{
throw new ArgumentException(LocalizableMessage.raw("--"
+ noRebindArgument.getLongIdentifier() + " must be used if --"
- + numConcurrentTasksArgument.getLongIdentifier() + " is > 1"));
+ + numThreadsArgument.getLongIdentifier() + " is > 1"));
}
if (!noRebindArgument.isPresent() && asyncArgument.isPresent())
@@ -1201,9 +1026,6 @@
}
dataSourcePrototypes = DataSource.parse(arguments.getValues());
-
- targetTimeInMS =
- (1.0 / (targetThroughput / (double) (numConcurrentTasks * numConnections))) * 1000.0;
}
@@ -1220,7 +1042,7 @@
- abstract ConnectionWorker newConnectionWorker(
+ abstract WorkerThread newWorkerThread(
final AsynchronousConnection connection,
final ConnectionFactory connectionFactory);
@@ -1232,8 +1054,9 @@
final int run(final ConnectionFactory connectionFactory)
{
- final List<ConnectionWorker> workers = new ArrayList<ConnectionWorker>();
- final List<AsynchronousConnection> connections = new ArrayList<AsynchronousConnection>();
+ final List<Thread> threads = new ArrayList<Thread>();
+ final List<AsynchronousConnection> connections =
+ new ArrayList<AsynchronousConnection>();
AsynchronousConnection connection = null;
try
@@ -1246,18 +1069,20 @@
connection.addConnectionEventListener(this);
connections.add(connection);
}
- final ConnectionWorker worker = newConnectionWorker(connection,
- connectionFactory);
- workers.add(worker);
- worker.startWork();
+ for (int j = 0; j < numThreads; j++)
+ {
+ final Thread thread = newWorkerThread(connection, connectionFactory);
+ threads.add(thread);
+ thread.start();
+ }
}
final Thread statsThread = newStatsThread();
statsThread.start();
- for (final ConnectionWorker w : workers)
+ for (final Thread t : threads)
{
- w.waitFor();
+ t.join();
}
stopRequested = true;
statsThread.join();
diff --git a/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/SearchRate.java b/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/SearchRate.java
index 2d485c1..a78c738 100644
--- a/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/SearchRate.java
+++ b/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/SearchRate.java
@@ -62,10 +62,9 @@
private final class SearchStatsHandler extends
UpdateStatsResultHandler<Result> implements SearchResultHandler
{
- private SearchStatsHandler(final long startTime,
- final AsynchronousConnection connection, final ConnectionWorker worker)
+ private SearchStatsHandler(final long startTime)
{
- super(startTime, connection, worker);
+ super(startTime);
}
@@ -121,7 +120,7 @@
- private final class SearchWorkerThread extends ConnectionWorker
+ private final class SearchWorkerThread extends WorkerThread
{
private SearchRequest sr;
@@ -162,8 +161,7 @@
sr.setFilter(String.format(filter, data));
sr.setName(String.format(baseDN, data));
}
- return connection.search(sr, new SearchStatsHandler(startTime,
- connection, this));
+ return connection.search(sr, new SearchStatsHandler(startTime));
}
}
@@ -190,7 +188,7 @@
@Override
- ConnectionWorker newConnectionWorker(
+ WorkerThread newWorkerThread(
final AsynchronousConnection connection,
final ConnectionFactory connectionFactory)
{
--
Gitblit v1.10.0