From f2160f4bd1c8ac67e5a86a6710d431e8932877f9 Mon Sep 17 00:00:00 2001
From: matthew_swift <matthew_swift@localhost>
Date: Fri, 28 May 2010 11:47:51 +0000
Subject: [PATCH] Synchronize SDK on java.net with internal repository.
---
sdk/src/com/sun/opends/sdk/tools/PerformanceRunner.java | 1228 ++++++++++++++++++++++++++++++---------------------------
1 files changed, 648 insertions(+), 580 deletions(-)
diff --git a/sdk/src/com/sun/opends/sdk/tools/PerformanceRunner.java b/sdk/src/com/sun/opends/sdk/tools/PerformanceRunner.java
index f1f4811..03c0bce 100644
--- a/sdk/src/com/sun/opends/sdk/tools/PerformanceRunner.java
+++ b/sdk/src/com/sun/opends/sdk/tools/PerformanceRunner.java
@@ -38,491 +38,23 @@
import java.util.concurrent.atomic.AtomicReference;
import org.opends.sdk.*;
+import org.opends.sdk.responses.ExtendedResult;
import org.opends.sdk.responses.Result;
import com.sun.opends.sdk.tools.AuthenticatedConnectionFactory.AuthenticatedAsynchronousConnection;
-
/**
* Benchmark application framework.
*/
-abstract class PerformanceRunner
+abstract class PerformanceRunner implements ConnectionEventListener
{
- private final AtomicInteger operationRecentCount = new AtomicInteger();
-
- private final AtomicInteger successRecentCount = new AtomicInteger();
-
- private final AtomicInteger failedRecentCount = new AtomicInteger();
-
- private final AtomicLong waitRecentTime = new AtomicLong();
-
- private final AtomicReference<ReversableArray> eTimeBuffer = new AtomicReference<ReversableArray>(
- new ReversableArray(100000));
-
- private final ConsoleApplication app;
-
- private final ThreadLocal<DataSource[]> dataSources = new ThreadLocal<DataSource[]>();
-
- private volatile boolean stopRequested;
-
- private int numThreads;
-
- private int numConnections;
-
- private int targetThroughput;
-
- private int maxIterations;
-
- private boolean isAsync;
-
- private boolean noRebind;
-
- private int statsInterval;
-
- private IntegerArgument numThreadsArgument;
-
- private IntegerArgument maxIterationsArgument;
-
- private IntegerArgument statsIntervalArgument;
-
- private IntegerArgument targetThroughputArgument;
-
- private IntegerArgument numConnectionsArgument;
-
- private IntegerArgument percentilesArgument;
-
- private BooleanArgument keepConnectionsOpen;
-
- private BooleanArgument noRebindArgument;
-
- private BooleanArgument asyncArgument;
-
- private StringArgument arguments;
-
-
-
- PerformanceRunner(ArgumentParser argParser, ConsoleApplication app)
- throws ArgumentException
- {
- this.app = app;
- numThreadsArgument = new IntegerArgument("numThreads", 't',
- "numThreads", false, false, true, LocalizableMessage.raw("{numThreads}"),
- 1, null, true, 1, false, 0, LocalizableMessage
- .raw("number of search threads per connection"));
- numThreadsArgument.setPropertyName("numThreads");
- argParser.addArgument(numThreadsArgument);
-
- numConnectionsArgument = new IntegerArgument("numConnections", 'c',
- "numConnections", false, false, true, LocalizableMessage
- .raw("{numConnections}"), 1, null, true, 1, false, 0,
- LocalizableMessage.raw("number of connections"));
- numThreadsArgument.setPropertyName("numConnections");
- argParser.addArgument(numConnectionsArgument);
-
- maxIterationsArgument = new IntegerArgument("maxIterations", 'm',
- "maxIterations", false, false, true, LocalizableMessage
- .raw("{maxIterations}"), 0, null, LocalizableMessage
- .raw("max searches per thread, 0 for unlimited"));
- numThreadsArgument.setPropertyName("maxIterations");
- argParser.addArgument(maxIterationsArgument);
-
- statsIntervalArgument = new IntegerArgument("statInterval", 'i',
- "statInterval", false, false, true, LocalizableMessage
- .raw("{statInterval}"), 5, null, true, 1, false, 0, LocalizableMessage
- .raw("Display results each specified number of seconds"));
- numThreadsArgument.setPropertyName("statInterval");
- argParser.addArgument(statsIntervalArgument);
-
- targetThroughputArgument = new IntegerArgument("targetThroughput",
- 'M', "targetThroughput", false, false, true, LocalizableMessage
- .raw("{targetThroughput}"), 0, null, LocalizableMessage
- .raw("Target average throughput to achieve"));
- targetThroughputArgument.setPropertyName("targetThroughput");
- argParser.addArgument(targetThroughputArgument);
-
- percentilesArgument = new IntegerArgument("percentile", 'e',
- "percentile", false, true, LocalizableMessage.raw("{percentile}"), true,
- 50, true, 100, LocalizableMessage.raw("Calculate max response time for a "
- + "percentile of operations"));
- percentilesArgument.setPropertyName("percentile");
- argParser.addArgument(percentilesArgument);
-
- keepConnectionsOpen = new BooleanArgument("keepConnectionsOpen",
- 'f', "keepConnectionsOpen", LocalizableMessage
- .raw("keep connections open"));
- keepConnectionsOpen.setPropertyName("keepConnectionsOpen");
- argParser.addArgument(keepConnectionsOpen);
-
- noRebindArgument = new BooleanArgument("noRebind", 'F', "noRebind",
- LocalizableMessage.raw("keep connections open and don't rebind"));
- keepConnectionsOpen.setPropertyName("noRebind");
- argParser.addArgument(noRebindArgument);
-
- asyncArgument = new BooleanArgument("asynchronous", 'A',
- "asynchronous", LocalizableMessage.raw("asynch, don't wait for results"));
- keepConnectionsOpen.setPropertyName("asynchronous");
- argParser.addArgument(asyncArgument);
-
- arguments = new StringArgument(
- "arguments",
- 'g',
- "arguments",
- false,
- true,
- true,
- LocalizableMessage.raw("{arguments}"),
- null,
- null,
- LocalizableMessage
- .raw("arguments for variables in the filter and/or base DN"));
- arguments.setPropertyName("arguments");
- argParser.addArgument(arguments);
- }
-
-
-
- public void validate() throws ArgumentException
- {
- numConnections = numConnectionsArgument.getIntValue();
- numThreads = numThreadsArgument.getIntValue();
- maxIterations = maxIterationsArgument.getIntValue();
- statsInterval = statsIntervalArgument.getIntValue() * 1000;
- targetThroughput = targetThroughputArgument.getIntValue();
-
- isAsync = asyncArgument.isPresent();
- noRebind = noRebindArgument.isPresent();
-
- if (!noRebindArgument.isPresent() && this.numThreads > 1)
- {
- throw new ArgumentException(LocalizableMessage.raw("--"
- + noRebindArgument.getLongIdentifier()
- + " must be used if --"
- + numThreadsArgument.getLongIdentifier() + " is > 1"));
- }
-
- if (!noRebindArgument.isPresent() && asyncArgument.isPresent())
- {
- throw new ArgumentException(LocalizableMessage.raw("--"
- + noRebindArgument.getLongIdentifier()
- + " must be used when using --"
- + asyncArgument.getLongIdentifier()));
- }
-
- try
- {
- DataSource.parse(arguments.getValues());
- }
- catch (IOException ioe)
- {
- throw new ArgumentException(LocalizableMessage
- .raw("Error occured while parsing arguments: "
- + ioe.toString()));
- }
- }
-
-
-
- final int run(ConnectionFactory connectionFactory)
- {
- List<Thread> threads = new ArrayList<Thread>();
-
- AsynchronousConnection connection = null;
- Thread thread;
- try
- {
- for (int i = 0; i < numConnections; i++)
- {
- if (keepConnectionsOpen.isPresent()
- || noRebindArgument.isPresent())
- {
- connection = connectionFactory.getAsynchronousConnection(
- null).get();
- }
- for (int j = 0; j < numThreads; j++)
- {
- thread = newWorkerThread(connection, connectionFactory);
-
- threads.add(thread);
- thread.start();
- }
- }
-
- Thread statsThread = newStatsThread();
- statsThread.start();
-
- for (Thread t : threads)
- {
- t.join();
- }
- stopRequested = true;
- statsThread.join();
- }
- catch (InterruptedException e)
- {
- stopRequested = true;
- }
- catch (ErrorResultException e)
- {
- stopRequested = true;
- app.println(LocalizableMessage.raw(e.getResult().getDiagnosticMessage()));
- }
-
- return 0;
- }
-
-
-
- final DataSource[] getDataSources()
- {
- try
- {
- return DataSource.parse(arguments.getValues());
- }
- catch (IOException ioe)
- {
- // Ignore as this shouldn've been handled eariler
- }
- return new DataSource[0];
- }
-
-
-
- abstract WorkerThread<?> newWorkerThread(
- AsynchronousConnection connection,
- ConnectionFactory connectionFactory);
-
-
-
- abstract StatsThread newStatsThread();
-
-
-
- class UpdateStatsResultHandler<S extends Result> implements
- ResultHandler<S>
- {
- private long eTime;
-
-
-
- UpdateStatsResultHandler(long eTime)
- {
- this.eTime = eTime;
- }
-
-
-
- public void handleResult(S result)
- {
- successRecentCount.getAndIncrement();
- eTime = System.nanoTime() - eTime;
- waitRecentTime.getAndAdd(eTime);
- synchronized (this)
- {
- ReversableArray array = eTimeBuffer.get();
- if (array.remaining() == 0)
- {
- array.set(array.size() - 1, eTime);
- }
- else
- {
- array.append(eTime);
- }
- }
- }
-
-
-
- public void handleErrorResult(ErrorResultException error)
- {
- failedRecentCount.getAndIncrement();
- app.println(LocalizableMessage.raw(error.getResult().toString()));
- }
-
-
-
- public long getETime()
- {
- return eTime;
- }
- }
-
-
-
- abstract class WorkerThread<R extends ResultHandler<?>> extends
- Thread
- {
- private int count;
-
- private final AsynchronousConnection connection;
-
- private final ConnectionFactory connectionFactory;
-
-
-
- WorkerThread(AsynchronousConnection connection,
- ConnectionFactory connectionFactory)
- {
- super("Worker Thread");
- this.connection = connection;
- this.connectionFactory = connectionFactory;
- }
-
-
-
- public abstract FutureResult<?> performOperation(
- AsynchronousConnection connection, R handler,
- DataSource[] dataSources);
-
-
-
- public abstract R getHandler(long startTime);
-
-
-
- public void run()
- {
- if (dataSources.get() == null)
- {
- try
- {
- dataSources.set(DataSource.parse(arguments.getValues()));
- }
- catch (IOException ioe)
- {
- // Ignore as this shouldn've been handled eariler
- }
- }
-
- FutureResult<?> future;
- AsynchronousConnection connection;
- R handler;
-
- double targetTimeInMS = (1.0 / (targetThroughput / (numThreads * numConnections))) * 1000.0;
- double sleepTimeInMS = 0;
- long start;
- while (!stopRequested
- && !(maxIterations > 0 && count >= maxIterations))
- {
- start = System.nanoTime();
- handler = getHandler(start);
-
- if (this.connection == null)
- {
- try
- {
- connection = connectionFactory.getAsynchronousConnection(
- null).get();
- }
- catch (InterruptedException e)
- {
- // Ignore and check stop requested
- continue;
- }
- catch (ErrorResultException e)
- {
- app.println(LocalizableMessage.raw(e.getResult()
- .getDiagnosticMessage()));
- if (e.getCause() != null && app.isVerbose())
- {
- e.getCause().printStackTrace(app.getErrorStream());
- }
- stopRequested = true;
- break;
- }
- }
- else
- {
- connection = this.connection;
- if (!noRebind
- && connection instanceof AuthenticatedAsynchronousConnection)
- {
- AuthenticatedAsynchronousConnection ac = (AuthenticatedAsynchronousConnection) connection;
- try
- {
- ac.rebind(null).get();
- }
- catch (InterruptedException e)
- {
- // Ignore and check stop requested
- continue;
- }
- catch (ErrorResultException e)
- {
- app.println(LocalizableMessage.raw(e.getResult().toString()));
- if (e.getCause() != null && app.isVerbose())
- {
- e.getCause().printStackTrace(app.getErrorStream());
- }
- stopRequested = true;
- break;
- }
- }
- }
- future = performOperation(connection, handler, dataSources
- .get());
- operationRecentCount.getAndIncrement();
- count++;
- if (!isAsync)
- {
- try
- {
- future.get();
- }
- catch (InterruptedException e)
- {
- // Ignore and check stop requested
- continue;
- }
- catch (ErrorResultException e)
- {
- if (e.getCause() instanceof IOException)
- {
- e.getCause().printStackTrace(app.getErrorStream());
- stopRequested = true;
- break;
- }
- // Ignore. Handled by result handler
- }
- if (this.connection == null)
- {
- connection.close();
- }
- }
- if (targetThroughput > 0)
- {
- try
- {
- if (sleepTimeInMS > 1)
- {
- sleep((long) Math.floor(sleepTimeInMS));
- }
- }
- catch (InterruptedException e)
- {
- continue;
- }
-
- sleepTimeInMS += targetTimeInMS
- - ((System.nanoTime() - start) / 1000000.0);
- if (sleepTimeInMS < -60000)
- {
- // If we fall behind by 60 seconds, just forget about
- // catching up
- sleepTimeInMS = -60000;
- }
- }
- }
- }
- }
-
-
-
+ /**
+ * Statistics thread base implementation.
+ */
class StatsThread extends Thread
{
- protected final String[] EMPTY_STRINGS = new String[0];
-
private final MultiColumnPrinter printer;
private final List<GarbageCollectorMXBean> beans;
@@ -561,10 +93,10 @@
- public StatsThread(String[] additionalColumns)
+ public StatsThread(final String[] additionalColumns)
{
super("Stats Thread");
- TreeSet<Double> pSet = new TreeSet<Double>();
+ final TreeSet<Double> pSet = new TreeSet<Double>();
if (!percentilesArgument.isPresent())
{
pSet.add(.1);
@@ -573,14 +105,14 @@
}
else
{
- for (String percentile : percentilesArgument.getValues())
+ for (final String percentile : percentilesArgument.getValues())
{
pSet.add(100.0 - Double.parseDouble(percentile));
}
}
this.percentiles = pSet.descendingSet();
- numColumns = 5 + this.percentiles.size()
- + additionalColumns.length + (isAsync ? 1 : 0);
+ numColumns = 5 + this.percentiles.size() + additionalColumns.length
+ + (isAsync ? 1 : 0);
printer = new MultiColumnPrinter(numColumns, 2, "-",
MultiColumnPrinter.RIGHT, app);
printer.setTitleAlign(MultiColumnPrinter.RIGHT);
@@ -607,7 +139,7 @@
title[2] = "recent";
title[3] = "average";
int i = 4;
- for (Double percentile : this.percentiles)
+ for (final Double percentile : this.percentiles)
{
title[i++] = Double.toString(100.0 - percentile) + "%";
}
@@ -616,7 +148,7 @@
{
title[i++] = "req/res";
}
- for (String column : additionalColumns)
+ for (final String column : additionalColumns)
{
title[i++] = column;
}
@@ -628,24 +160,17 @@
- String[] getAdditionalColumns()
- {
- return EMPTY_STRINGS;
- }
-
-
-
@Override
public void run()
{
printer.printTitle();
- String[] strings = new String[numColumns];
+ final String[] strings = new String[numColumns];
- long startTime = System.currentTimeMillis();
+ final long startTime = System.currentTimeMillis();
long statTime = startTime;
long gcDuration = 0;
- for (GarbageCollectorMXBean bean : beans)
+ for (final GarbageCollectorMXBean bean : beans)
{
gcDuration += bean.getCollectionTime();
}
@@ -655,7 +180,7 @@
{
sleep(statsInterval);
}
- catch (InterruptedException ie)
+ catch (final InterruptedException ie)
{
// Ignore.
}
@@ -665,7 +190,7 @@
lastGCDuration = gcDuration;
gcDuration = 0;
- for (GarbageCollectorMXBean bean : beans)
+ for (final GarbageCollectorMXBean bean : beans)
{
gcDuration += bean.getCollectionTime();
}
@@ -684,19 +209,31 @@
averageDuration -= gcDuration;
recentDuration /= 1000.0;
averageDuration /= 1000.0;
- strings[0] = String.format("%.1f", successCount
- / recentDuration);
- strings[1] = String.format("%.1f", totalSuccessCount
- / averageDuration);
- strings[2] = String.format("%.3f",
- (waitTime - (gcDuration - lastGCDuration)) / successCount
- / 1000000.0);
- strings[3] = String.format("%.3f", (totalWaitTime - gcDuration)
- / totalSuccessCount / 1000000.0);
+ strings[0] = String.format("%.1f", successCount / recentDuration);
+ strings[1] = String.format("%.1f", totalSuccessCount / averageDuration);
+ if (successCount > 0)
+ {
+ strings[2] = String.format("%.3f",
+ (waitTime - (gcDuration - lastGCDuration)) / successCount
+ / 1000000.0);
+ }
+ else
+ {
+ strings[2] = "-";
+ }
+ if (totalSuccessCount > 0)
+ {
+ strings[3] = String.format("%.3f", (totalWaitTime - gcDuration)
+ / totalSuccessCount / 1000000.0);
+ }
+ else
+ {
+ strings[3] = "-";
+ }
boolean changed = false;
etimes = eTimeBuffer.getAndSet(etimes);
- int appendLength = Math.min(array.remaining(), etimes.size());
+ final int appendLength = Math.min(array.remaining(), etimes.size());
if (appendLength > 0)
{
array.append(etimes, appendLength);
@@ -737,36 +274,288 @@
// Now everything is ordered from smallest to largest
int index;
int i = 4;
- for (Double percent : percentiles)
+ for (final Double percent : percentiles)
{
- index = array.size()
- - (int) Math.floor((percent / 100.0) * totalSuccessCount)
- - 1;
- if (index < 0)
+ if (array.size() <= 0)
{
- strings[i++] = String.format("*%.3f",
- array.get(0) / 1000000.0);
+ strings[i++] = "-";
}
else
{
- strings[i++] = String.format("%.3f",
- array.get(index) / 1000000.0);
+ index = array.size()
+ - (int) Math.floor((percent / 100.0) * totalSuccessCount) - 1;
+ if (index < 0)
+ {
+ strings[i++] = String.format("*%.3f", array.get(0) / 1000000.0);
+ }
+ else
+ {
+ strings[i++] = String
+ .format("%.3f", array.get(index) / 1000000.0);
+ }
}
}
- strings[i++] = String.format("%.1f", totalFailedCount
- / averageDuration);
+ strings[i++] = String
+ .format("%.1f", totalFailedCount / averageDuration);
if (isAsync)
{
- strings[i++] = String.format("%.1f", (double) searchCount
- / successCount);
+ if (successCount > 0)
+ {
+ strings[i++] = String.format("%.1f", (double) searchCount
+ / successCount);
+ }
+ else
+ {
+ strings[i++] = "-";
+ }
}
- for (String column : getAdditionalColumns())
+ for (final String column : getAdditionalColumns())
{
strings[i++] = column;
}
printer.printRow(strings);
}
}
+
+
+
+ String[] getAdditionalColumns()
+ {
+ return EMPTY_STRINGS;
+ }
+ }
+
+
+
+ /**
+ * Statistics update result handler implementation.
+ *
+ * @param <S>
+ * The type of expected result.
+ */
+ class UpdateStatsResultHandler<S extends Result> implements ResultHandler<S>
+ {
+ private long eTime;
+
+
+
+ UpdateStatsResultHandler(final long eTime)
+ {
+ this.eTime = eTime;
+ }
+
+
+
+ public long getETime()
+ {
+ return eTime;
+ }
+
+
+
+ public void handleErrorResult(final ErrorResultException error)
+ {
+ failedRecentCount.getAndIncrement();
+ app.println(LocalizableMessage.raw(error.getResult().toString()));
+ }
+
+
+
+ public void handleResult(final S result)
+ {
+ successRecentCount.getAndIncrement();
+ eTime = System.nanoTime() - eTime;
+ waitRecentTime.getAndAdd(eTime);
+ synchronized (this)
+ {
+ final ReversableArray array = eTimeBuffer.get();
+ if (array.remaining() == 0)
+ {
+ array.set(array.size() - 1, eTime);
+ }
+ else
+ {
+ array.append(eTime);
+ }
+ }
+ }
+ }
+
+
+
+ /**
+ * Worker thread base implementation.
+ *
+ * @param <R>
+ * Type of result handler.
+ */
+ abstract class WorkerThread<R extends ResultHandler<?>> extends Thread
+ {
+ private int count;
+
+ private final AsynchronousConnection connection;
+
+ private final ConnectionFactory connectionFactory;
+
+
+
+ WorkerThread(final AsynchronousConnection connection,
+ final ConnectionFactory connectionFactory)
+ {
+ super("Worker Thread");
+ this.connection = connection;
+ this.connectionFactory = connectionFactory;
+ }
+
+
+
+ public abstract R getHandler(long startTime);
+
+
+
+ public abstract FutureResult<?> performOperation(
+ AsynchronousConnection connection, R handler, DataSource[] dataSources);
+
+
+
+ @Override
+ public void run()
+ {
+ if (dataSources.get() == null)
+ {
+ try
+ {
+ dataSources.set(DataSource.parse(arguments.getValues()));
+ }
+ catch (final IOException ioe)
+ {
+ // Ignore as this shouldn've been handled eariler
+ }
+ }
+
+ FutureResult<?> future;
+ AsynchronousConnection connection;
+ R handler;
+
+ final double targetTimeInMS =
+ (1.0 / (targetThroughput / (numThreads * numConnections))) * 1000.0;
+ double sleepTimeInMS = 0;
+ long start;
+ while (!stopRequested && !(maxIterations > 0 && count >= maxIterations))
+ {
+ start = System.nanoTime();
+ handler = getHandler(start);
+
+ if (this.connection == null)
+ {
+ try
+ {
+ connection = connectionFactory.getAsynchronousConnection(null)
+ .get();
+ }
+ catch (final InterruptedException e)
+ {
+ // Ignore and check stop requested
+ continue;
+ }
+ catch (final ErrorResultException e)
+ {
+ app.println(LocalizableMessage.raw(e.getResult()
+ .getDiagnosticMessage()));
+ if (e.getCause() != null && app.isVerbose())
+ {
+ e.getCause().printStackTrace(app.getErrorStream());
+ }
+ stopRequested = true;
+ break;
+ }
+ }
+ else
+ {
+ connection = this.connection;
+ if (!noRebind
+ && connection instanceof AuthenticatedAsynchronousConnection)
+ {
+ final AuthenticatedAsynchronousConnection ac =
+ (AuthenticatedAsynchronousConnection) connection;
+ try
+ {
+ ac.rebind(null).get();
+ }
+ catch (final InterruptedException e)
+ {
+ // Ignore and check stop requested
+ continue;
+ }
+ catch (final ErrorResultException e)
+ {
+ app.println(LocalizableMessage.raw(e.getResult().toString()));
+ if (e.getCause() != null && app.isVerbose())
+ {
+ e.getCause().printStackTrace(app.getErrorStream());
+ }
+ stopRequested = true;
+ break;
+ }
+ }
+ }
+ future = performOperation(connection, handler, dataSources.get());
+ operationRecentCount.getAndIncrement();
+ count++;
+ if (!isAsync)
+ {
+ try
+ {
+ future.get();
+ }
+ catch (final InterruptedException e)
+ {
+ // Ignore and check stop requested
+ continue;
+ }
+ catch (final ErrorResultException e)
+ {
+ if (e.getCause() instanceof IOException)
+ {
+ e.getCause().printStackTrace(app.getErrorStream());
+ stopRequested = true;
+ break;
+ }
+ // Ignore. Handled by result handler
+ }
+ finally
+ {
+ if (this.connection == null)
+ {
+ connection.close();
+ }
+ }
+ }
+ if (targetThroughput > 0)
+ {
+ try
+ {
+ if (sleepTimeInMS > 1)
+ {
+ sleep((long) Math.floor(sleepTimeInMS));
+ }
+ }
+ catch (final InterruptedException e)
+ {
+ continue;
+ }
+
+ sleepTimeInMS += targetTimeInMS
+ - ((System.nanoTime() - start) / 1000000.0);
+ if (sleepTimeInMS < -60000)
+ {
+ // If we fall behind by 60 seconds, just forget about
+ // catching up
+ sleepTimeInMS = -60000;
+ }
+ }
+ }
+ }
}
@@ -781,64 +570,14 @@
- public ReversableArray(int capacity)
+ public ReversableArray(final int capacity)
{
this.array = new long[capacity];
}
- public void set(int index, long value)
- {
- if (index >= size)
- {
- throw new IndexOutOfBoundsException();
- }
- if (!reversed)
- {
- array[index] = value;
- }
- else
- {
- array[size - index - 1] = value;
- }
- }
-
-
-
- public long get(int index)
- {
- if (index >= size)
- {
- throw new IndexOutOfBoundsException();
- }
- if (!reversed)
- {
- return array[index];
- }
- else
- {
- return array[size - index - 1];
- }
- }
-
-
-
- public int size()
- {
- return size;
- }
-
-
-
- public void reverse()
- {
- reversed = !reversed;
- }
-
-
-
- public void append(long value)
+ public void append(final long value)
{
if (size == array.length)
{
@@ -859,7 +598,7 @@
- public void append(ReversableArray a, int length)
+ public void append(final ReversableArray a, final int length)
{
if (length > a.size() || length > remaining())
{
@@ -879,13 +618,6 @@
- public int remaining()
- {
- return array.length - size;
- }
-
-
-
public void clear()
{
size = 0;
@@ -893,7 +625,57 @@
- public void siftDown(int start, int end)
+ public long get(final int index)
+ {
+ if (index >= size)
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ if (!reversed)
+ {
+ return array[index];
+ }
+ else
+ {
+ return array[size - index - 1];
+ }
+ }
+
+
+
+ public int remaining()
+ {
+ return array.length - size;
+ }
+
+
+
+ public void reverse()
+ {
+ reversed = !reversed;
+ }
+
+
+
+ public void set(final int index, final long value)
+ {
+ if (index >= size)
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ if (!reversed)
+ {
+ array[index] = value;
+ }
+ else
+ {
+ array[size - index - 1] = value;
+ }
+ }
+
+
+
+ public void siftDown(final int start, final int end)
{
int root = start;
int child;
@@ -918,7 +700,7 @@
- public void siftUp(int start, int end)
+ public void siftUp(final int start, final int end)
{
int child = end;
int parent;
@@ -939,11 +721,297 @@
- private void swap(int i, int i2)
+ public int size()
{
- long temp = get(i);
+ return size;
+ }
+
+
+
+ private void swap(final int i, final int i2)
+ {
+ final long temp = get(i);
set(i, get(i2));
set(i2, temp);
}
}
+
+
+
+ private static final String[] EMPTY_STRINGS = new String[0];
+
+ private final AtomicInteger operationRecentCount = new AtomicInteger();
+
+ private final AtomicInteger successRecentCount = new AtomicInteger();
+
+ private final AtomicInteger failedRecentCount = new AtomicInteger();
+
+ private final AtomicLong waitRecentTime = new AtomicLong();
+
+ private final AtomicReference<ReversableArray> eTimeBuffer = new AtomicReference<ReversableArray>(
+ new ReversableArray(100000));
+
+ private final ConsoleApplication app;
+
+ private final ThreadLocal<DataSource[]> dataSources = new ThreadLocal<DataSource[]>();
+
+ private volatile boolean stopRequested;
+
+ private int numThreads;
+
+ private int numConnections;
+
+ private int targetThroughput;
+
+ private int maxIterations;
+
+ private boolean isAsync;
+
+ private boolean noRebind;
+
+ private int statsInterval;
+
+ private final IntegerArgument numThreadsArgument;
+
+ private final IntegerArgument maxIterationsArgument;
+
+ private final IntegerArgument statsIntervalArgument;
+
+ private final IntegerArgument targetThroughputArgument;
+
+ private final IntegerArgument numConnectionsArgument;
+
+ private final IntegerArgument percentilesArgument;
+
+ private final BooleanArgument keepConnectionsOpen;
+
+ private final BooleanArgument noRebindArgument;
+
+ private final BooleanArgument asyncArgument;
+
+ private final StringArgument arguments;
+
+
+
+ PerformanceRunner(final ArgumentParser argParser, final ConsoleApplication app)
+ throws ArgumentException
+ {
+ this.app = app;
+ numThreadsArgument = new IntegerArgument("numThreads", 't', "numThreads",
+ false, false, true, LocalizableMessage.raw("{numThreads}"), 1, null,
+ true, 1, false, 0, LocalizableMessage
+ .raw("number of search threads per connection"));
+ numThreadsArgument.setPropertyName("numThreads");
+ argParser.addArgument(numThreadsArgument);
+
+ numConnectionsArgument = new IntegerArgument("numConnections", 'c',
+ "numConnections", false, false, true, LocalizableMessage
+ .raw("{numConnections}"), 1, null, true, 1, false, 0,
+ LocalizableMessage.raw("number of connections"));
+ numThreadsArgument.setPropertyName("numConnections");
+ argParser.addArgument(numConnectionsArgument);
+
+ maxIterationsArgument = new IntegerArgument("maxIterations", 'm',
+ "maxIterations", false, false, true, LocalizableMessage
+ .raw("{maxIterations}"), 0, null, LocalizableMessage
+ .raw("max searches per thread, 0 for unlimited"));
+ numThreadsArgument.setPropertyName("maxIterations");
+ argParser.addArgument(maxIterationsArgument);
+
+ statsIntervalArgument = new IntegerArgument("statInterval", 'i',
+ "statInterval", false, false, true, LocalizableMessage
+ .raw("{statInterval}"), 5, null, true, 1, false, 0,
+ LocalizableMessage
+ .raw("Display results each specified number of seconds"));
+ numThreadsArgument.setPropertyName("statInterval");
+ argParser.addArgument(statsIntervalArgument);
+
+ targetThroughputArgument = new IntegerArgument("targetThroughput", 'M',
+ "targetThroughput", false, false, true, LocalizableMessage
+ .raw("{targetThroughput}"), 0, null, LocalizableMessage
+ .raw("Target average throughput to achieve"));
+ targetThroughputArgument.setPropertyName("targetThroughput");
+ argParser.addArgument(targetThroughputArgument);
+
+ percentilesArgument = new IntegerArgument("percentile", 'e', "percentile",
+ false, true, LocalizableMessage.raw("{percentile}"), true, 50, true,
+ 100, LocalizableMessage.raw("Calculate max response time for a "
+ + "percentile of operations"));
+ percentilesArgument.setPropertyName("percentile");
+ argParser.addArgument(percentilesArgument);
+
+ keepConnectionsOpen = new BooleanArgument("keepConnectionsOpen", 'f',
+ "keepConnectionsOpen", LocalizableMessage.raw("keep connections open"));
+ keepConnectionsOpen.setPropertyName("keepConnectionsOpen");
+ argParser.addArgument(keepConnectionsOpen);
+
+ noRebindArgument = new BooleanArgument("noRebind", 'F', "noRebind",
+ LocalizableMessage.raw("keep connections open and don't rebind"));
+ keepConnectionsOpen.setPropertyName("noRebind");
+ argParser.addArgument(noRebindArgument);
+
+ asyncArgument = new BooleanArgument("asynchronous", 'A', "asynchronous",
+ LocalizableMessage.raw("asynch, don't wait for results"));
+ keepConnectionsOpen.setPropertyName("asynchronous");
+ argParser.addArgument(asyncArgument);
+
+ arguments = new StringArgument("arguments", 'g', "arguments", false, true,
+ true, LocalizableMessage.raw("{arguments}"), null, null,
+ LocalizableMessage
+ .raw("arguments for variables in the filter and/or base DN"));
+ arguments.setPropertyName("arguments");
+ argParser.addArgument(arguments);
+ }
+
+
+
+ public void connectionClosed()
+ {
+ // Ignore
+ }
+
+
+
+ public synchronized void connectionErrorOccurred(
+ final boolean isDisconnectNotification, final ErrorResultException error)
+ {
+ if (!stopRequested)
+ {
+ app.println(LocalizableMessage.raw("Error occurred on one or more "
+ + "connections: " + error.getResult().toString()));
+ if (error.getCause() != null && app.isVerbose())
+ {
+ error.getCause().printStackTrace(app.getErrorStream());
+ }
+ stopRequested = true;
+ }
+ }
+
+
+
+ public void connectionReceivedUnsolicitedNotification(
+ final ExtendedResult notification)
+ {
+ // Ignore
+ }
+
+
+
+ public void validate() throws ArgumentException
+ {
+ numConnections = numConnectionsArgument.getIntValue();
+ numThreads = numThreadsArgument.getIntValue();
+ maxIterations = maxIterationsArgument.getIntValue();
+ statsInterval = statsIntervalArgument.getIntValue() * 1000;
+ targetThroughput = targetThroughputArgument.getIntValue();
+
+ isAsync = asyncArgument.isPresent();
+ noRebind = noRebindArgument.isPresent();
+
+ if (!noRebindArgument.isPresent() && this.numThreads > 1)
+ {
+ throw new ArgumentException(LocalizableMessage.raw("--"
+ + noRebindArgument.getLongIdentifier() + " must be used if --"
+ + numThreadsArgument.getLongIdentifier() + " is > 1"));
+ }
+
+ if (!noRebindArgument.isPresent() && asyncArgument.isPresent())
+ {
+ throw new ArgumentException(LocalizableMessage.raw("--"
+ + noRebindArgument.getLongIdentifier()
+ + " must be used when using --" + asyncArgument.getLongIdentifier()));
+ }
+
+ try
+ {
+ DataSource.parse(arguments.getValues());
+ }
+ catch (final IOException ioe)
+ {
+ throw new ArgumentException(LocalizableMessage
+ .raw("Error occured while parsing arguments: " + ioe.toString()));
+ }
+ }
+
+
+
+ final DataSource[] getDataSources()
+ {
+ try
+ {
+ return DataSource.parse(arguments.getValues());
+ }
+ catch (final IOException ioe)
+ {
+ // Ignore as this shouldn've been handled eariler
+ }
+ return new DataSource[0];
+ }
+
+
+
+ abstract StatsThread newStatsThread();
+
+
+
+ abstract WorkerThread<?> newWorkerThread(AsynchronousConnection connection,
+ ConnectionFactory connectionFactory);
+
+
+
+ final int run(final ConnectionFactory connectionFactory)
+ {
+ final List<Thread> threads = new ArrayList<Thread>();
+ final List<AsynchronousConnection> connections = new ArrayList<AsynchronousConnection>();
+
+ AsynchronousConnection connection = null;
+ Thread thread;
+ try
+ {
+ for (int i = 0; i < numConnections; i++)
+ {
+ if (keepConnectionsOpen.isPresent() || noRebindArgument.isPresent())
+ {
+ connection = connectionFactory.getAsynchronousConnection(null).get();
+ connection.addConnectionEventListener(this);
+ connections.add(connection);
+ }
+ for (int j = 0; j < numThreads; j++)
+ {
+ thread = newWorkerThread(connection, connectionFactory);
+
+ threads.add(thread);
+ thread.start();
+ }
+ }
+
+ final Thread statsThread = newStatsThread();
+ statsThread.start();
+
+ for (final Thread t : threads)
+ {
+ t.join();
+ }
+ stopRequested = true;
+ statsThread.join();
+ }
+ catch (final InterruptedException e)
+ {
+ stopRequested = true;
+ }
+ catch (final ErrorResultException e)
+ {
+ stopRequested = true;
+ app.println(LocalizableMessage.raw(e.getResult().getDiagnosticMessage()));
+ }
+ finally
+ {
+ for (final AsynchronousConnection c : connections)
+ {
+ c.close();
+ }
+ }
+
+ return 0;
+ }
}
--
Gitblit v1.10.0