/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
* or http://forgerock.org/license/CDDLv1.0.html.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at legal-notices/CDDLv1_0.txt.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2010 Sun Microsystems, Inc.
* Portions Copyright 2011-2014 ForgeRock AS.
*/
package com.forgerock.opendj.ldap.tools;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.opendj.ldap.Connection;
import org.forgerock.opendj.ldap.ConnectionEventListener;
import org.forgerock.opendj.ldap.ConnectionFactory;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.ResultHandler;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.util.promise.Promise;
import com.forgerock.opendj.cli.ArgumentException;
import com.forgerock.opendj.cli.ArgumentParser;
import com.forgerock.opendj.cli.AuthenticatedConnectionFactory.AuthenticatedConnection;
import com.forgerock.opendj.cli.BooleanArgument;
import com.forgerock.opendj.cli.ConsoleApplication;
import com.forgerock.opendj.cli.IntegerArgument;
import com.forgerock.opendj.cli.MultiColumnPrinter;
import com.forgerock.opendj.cli.StringArgument;
import com.forgerock.opendj.util.StaticUtils;
import static java.util.concurrent.TimeUnit.*;
import static org.forgerock.util.Utils.*;
import static com.forgerock.opendj.ldap.tools.ToolsMessages.*;
/**
* Benchmark application framework.
*/
abstract class PerformanceRunner implements ConnectionEventListener {
static final class ResponseTimeBuckets {
private static final long NS_1_US = NANOSECONDS.convert(1, MICROSECONDS);
private static final long NS_100_US = NANOSECONDS.convert(100, MICROSECONDS);
private static final long NS_1_MS = NANOSECONDS.convert(1, MILLISECONDS);
private static final long NS_10_MS = NANOSECONDS.convert(10, MILLISECONDS);
private static final long NS_100_MS = NANOSECONDS.convert(100, MILLISECONDS);
private static final long NS_1_S = NANOSECONDS.convert(1, SECONDS);
private static final long NS_5_S = NANOSECONDS.convert(5, SECONDS);
private static final int NB_INDEX = 2120;
private static final int RANGE_100_MICROSECONDS_START_INDEX = 1000;
private static final int RANGE_1_MILLISECOND_START_INDEX = 1090;
private static final int RANGE_100_MILLISECONDS_START_INDEX = 2080;
/**
* Array of response time buckets.
*
*
* index 0 -> 999: 1000 buckets for 0ms - 1ms interval with 1 µs increments
* index 1000 -> 1089: 90 buckets for 1ms - 10ms interval with 100 µs increments
* index 1090 -> 2079: 990 buckets for 10ms - 1000ms interval with 1000 µs increments
* index 2080 -> 2119: 40 buckets for 1000ms - 5000ms interval with 100 000 µs increments
*
*/
private final AtomicLong[] index2Frequency = new AtomicLong[NB_INDEX];
/**
* Store the lower bounds (in microseconds) of the eTime buckets.
*/
private final long[] index2Etime = new long[NB_INDEX];
/**
* Sorted map used for storing response times from 5s+ with 500 millisecond increments.
*
* Keys (Long in microseconds) of this map must respect this pattern: n * 500 000 + 5 000 000,
* where n is a natural integer.
*/
private final ConcurrentSkipListMap bigEtimes = new ConcurrentSkipListMap();
/**
* Initialize both index2Frequency and index2Etime arrays.
*/
private ResponseTimeBuckets() {
// Helpful variables to compute index2Etime values.
long rangeWidthMicroSecs;
long rangeStart = 0;
long initialTimeMicroSecs = 0;
for (int i = 0; i < NB_INDEX; i++) {
index2Frequency[i] = new AtomicLong();
if (i < RANGE_100_MICROSECONDS_START_INDEX) {
// 0ms-1ms in 1 us increments
rangeWidthMicroSecs = 1;
} else if (i < RANGE_1_MILLISECOND_START_INDEX) {
// 1ms-10ms in 100 us increments
rangeWidthMicroSecs = 100;
rangeStart = RANGE_100_MICROSECONDS_START_INDEX;
initialTimeMicroSecs = MICROSECONDS.convert(1, MILLISECONDS);
} else if (i < RANGE_100_MILLISECONDS_START_INDEX) {
// 10ms-1000ms in 1000 us increments
rangeWidthMicroSecs = MICROSECONDS.convert(1, MILLISECONDS);
rangeStart = RANGE_1_MILLISECOND_START_INDEX;
initialTimeMicroSecs = MICROSECONDS.convert(10, MILLISECONDS);
} else {
// 1000ms-5000ms with 100 000 us increments
rangeWidthMicroSecs = MICROSECONDS.convert(100, MILLISECONDS);
rangeStart = RANGE_100_MILLISECONDS_START_INDEX;
initialTimeMicroSecs = MICROSECONDS.convert(1, SECONDS);
}
index2Etime[i] = (i - rangeStart) * rangeWidthMicroSecs + initialTimeMicroSecs;
}
}
/**
* Compute the closest response time values for each percentile given in
* parameter. Percentiles array has to be sorted from lower to higher
* percentiles.
*
* @param percentiles
* array of {@code double}
*
* @param nbData
* number of response times recorded.
*
* @return array of response times in microseconds corresponding to
* percentiles.
*/
List getPercentile(double[] percentiles, long nbData) {
List responseTimes = new ArrayList();
Queue nbDataThresholds = new LinkedList();
long nbDataSum = nbData;
for (int i = percentiles.length - 1; i >= 0; i--) {
nbDataThresholds.add((long) (percentiles[i] * nbData) / 100);
}
Iterator> iter = bigEtimes.descendingMap().entrySet().iterator();
while (iter.hasNext() && !nbDataThresholds.isEmpty()) {
Entry currentETime = iter.next();
nbDataSum -= currentETime.getValue().get();
computePercentiles(nbDataThresholds, responseTimes, nbDataSum, currentETime.getKey());
}
int stdTimeIndex = NB_INDEX - 1;
while (stdTimeIndex >= 0 && !nbDataThresholds.isEmpty()) {
long currentETime = index2Etime[stdTimeIndex];
nbDataSum -= index2Frequency[stdTimeIndex].get();
computePercentiles(nbDataThresholds, responseTimes, nbDataSum, currentETime);
stdTimeIndex--;
}
return responseTimes;
}
private void computePercentiles(Queue currentDataThreshold, List responseTimes, long currentSum,
long currentETime) {
while (currentDataThreshold.peek() != null && currentDataThreshold.peek() >= currentSum) {
responseTimes.add(currentETime);
currentDataThreshold.poll();
}
}
void addTimeToInterval(long responseTimeNanoSecs) {
if (responseTimeNanoSecs >= NS_5_S) {
long matchingKey = responseTimeNanoSecs / NS_100_MS;
matchingKey -= matchingKey % 5;
matchingKey = matchingKey * MICROSECONDS.convert(100, MILLISECONDS);
// We now have a key corresponding to pattern 5 000 000 + n * 500 000 µs
AtomicLong existingKey = bigEtimes.putIfAbsent(matchingKey, new AtomicLong(1));
if (existingKey != null) {
existingKey.getAndIncrement();
}
return;
}
final int startRangeIndex;
final long rangeWidthNanoSecs;
if (responseTimeNanoSecs < NS_1_MS) {
rangeWidthNanoSecs = NS_1_US;
startRangeIndex = 0;
} else if (responseTimeNanoSecs < NS_10_MS) {
rangeWidthNanoSecs = NS_100_US;
startRangeIndex = RANGE_100_MICROSECONDS_START_INDEX - 10;
} else if (responseTimeNanoSecs < NS_1_S) {
rangeWidthNanoSecs = NS_1_MS;
startRangeIndex = RANGE_1_MILLISECOND_START_INDEX - 10;
} else {
rangeWidthNanoSecs = NS_100_MS;
startRangeIndex = RANGE_100_MILLISECONDS_START_INDEX - 10;
}
final int intervalIndex = ((int) (responseTimeNanoSecs / rangeWidthNanoSecs)) + startRangeIndex;
index2Frequency[intervalIndex].getAndIncrement();
}
}
//To allow tests
static ResponseTimeBuckets getResponseTimeBuckets() {
return new ResponseTimeBuckets();
}
/**
* Statistics thread base implementation.
*/
class StatsThread extends Thread {
private final String[] additionalColumns;
private final List beans;
private final double[] percentiles;
private final int numColumns;
protected long totalSuccessCount;
protected long totalOperationCount;
protected long totalFailedCount;
protected long totalWaitTime;
protected int successCount;
protected int operationCount;
protected int failedCount;
protected long waitTime;
protected long lastStatTime;
protected long lastGCDuration;
protected double recentDuration;
protected double averageDuration;
public StatsThread(final String... additionalColumns) {
super("Stats Thread");
this.additionalColumns = additionalColumns;
if (!percentilesArgument.isPresent()) {
this.percentiles = new double[]{99.9, 99.99, 99.999};
} else {
this.percentiles = new double[percentilesArgument.getValues().size()];
int index = 0;
for (final String percentile : percentilesArgument.getValues()) {
percentiles[index++] = Double.parseDouble(percentile);
}
Arrays.sort(percentiles);
}
this.numColumns = 5 + this.percentiles.length + additionalColumns.length + (isAsync ? 1 : 0);
this.beans = ManagementFactory.getGarbageCollectorMXBeans();
}
@Override
public void run() {
final MultiColumnPrinter printer;
if (!app.isScriptFriendly()) {
printer = new MultiColumnPrinter(numColumns, 2, "-", MultiColumnPrinter.RIGHT, app);
printer.setTitleAlign(MultiColumnPrinter.RIGHT);
String[] title = new String[numColumns];
Arrays.fill(title, "");
title[0] = "Throughput";
title[2] = "Response Time";
int[] span = new int[numColumns];
span[0] = 2;
span[1] = 0;
span[2] = 2 + this.percentiles.length;
Arrays.fill(span, 3, 4 + this.percentiles.length, 0);
Arrays.fill(span, 4 + this.percentiles.length, span.length, 1);
printer.addTitle(title, span);
title = new String[numColumns];
Arrays.fill(title, "");
title[0] = "(ops/second)";
title[2] = "(milliseconds)";
printer.addTitle(title, span);
title = new String[numColumns];
title[0] = "recent";
title[1] = "average";
title[2] = "recent";
title[3] = "average";
int i = 4;
for (double percentile :percentiles) {
title[i++] = percentile + "%";
}
title[i++] = "err/sec";
if (isAsync) {
title[i++] = "req/res";
}
for (final String column : additionalColumns) {
title[i++] = column;
}
span = new int[numColumns];
Arrays.fill(span, 1);
printer.addTitle(title, span);
printer.printTitle();
} else {
final PrintStream out = app.getOutputStream();
out.print("Time (seconds)");
out.print(",");
out.print("Recent throughput (ops/second)");
out.print(",");
out.print("Average throughput (ops/second)");
out.print(",");
out.print("Recent response time (milliseconds)");
out.print(",");
out.print("Average response time (milliseconds)");
for (final double percentile : this.percentiles) {
out.print(",");
out.print(percentile);
out.print("% response time (milliseconds)");
}
out.print(",");
out.print("Errors/second");
if (isAsync) {
out.print(",");
out.print("Requests/response");
}
for (final String column : additionalColumns) {
out.print(",");
out.print(column);
}
out.println();
printer = null;
}
final long startTime = System.currentTimeMillis();
long statTime = startTime;
long gcDuration = 0;
for (final GarbageCollectorMXBean bean : beans) {
gcDuration += bean.getCollectionTime();
}
while (!stopRequested) {
try {
sleep(statsInterval);
} catch (final InterruptedException ie) {
// Ignore.
}
lastStatTime = statTime;
statTime = System.currentTimeMillis();
lastGCDuration = gcDuration;
gcDuration = 0;
for (final GarbageCollectorMXBean bean : beans) {
gcDuration += bean.getCollectionTime();
}
operationCount = operationRecentCount.getAndSet(0);
successCount = successRecentCount.getAndSet(0);
failedCount = failedRecentCount.getAndSet(0);
waitTime = waitRecentTime.getAndSet(0);
final int resultCount = successCount + failedCount;
totalOperationCount += operationCount;
totalSuccessCount += successCount;
totalFailedCount += failedCount;
totalWaitTime += waitTime;
final long totalResultCount = totalSuccessCount + totalFailedCount;
recentDuration = statTime - lastStatTime;
averageDuration = statTime - startTime;
recentDuration -= gcDuration - lastGCDuration;
averageDuration -= gcDuration;
recentDuration /= 1000.0;
averageDuration /= 1000.0;
final String[] strings = new String[numColumns];
strings[0] = String.format("%.1f", resultCount / recentDuration);
strings[1] = String.format("%.1f", totalResultCount / averageDuration);
if (resultCount > 0) {
strings[2] = String.format("%.3f", (waitTime - (gcDuration - lastGCDuration))
/ (double) resultCount / 1000000.0);
} else {
strings[2] = "-";
}
if (totalResultCount > 0) {
strings[3] =
String.format("%.3f", (totalWaitTime - gcDuration) / (double) totalResultCount / 1000000.0);
} else {
strings[3] = "-";
}
int i = 4;
List computedPercentiles = eTimesBuckets.getPercentile(percentiles, totalOperationCount);
for (int j = computedPercentiles.size() - 1; j >= 0; j--) {
strings[i++] = String.format("%.2f", computedPercentiles.get(j) / 1000.0);
}
strings[i++] = String.format("%.1f", failedCount / recentDuration);
if (isAsync) {
strings[i++] = resultCount > 0 ? String.format("%.1f", (double) operationCount / resultCount) : "-";
}
for (final String column : getAdditionalColumns()) {
strings[i++] = column;
}
if (printer != null) {
printer.printRow(strings);
} else {
// Script-friendly.
final PrintStream out = app.getOutputStream();
out.print(averageDuration);
for (final String s : strings) {
out.print(",");
out.print(s);
}
out.println();
}
}
}
String[] getAdditionalColumns() {
return EMPTY_STRINGS;
}
void resetStats() {
failedCount = 0;
operationCount = 0;
successCount = 0;
operationRecentCount.set(0);
successRecentCount.set(0);
failedRecentCount.set(0);
waitRecentTime.set(0);
}
}
private class TimerThread extends Thread {
private final long timeToWait;
public TimerThread(long timeToWait) {
this.timeToWait = timeToWait;
}
@Override
public void run() {
try {
Thread.sleep(timeToWait);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
} finally {
stopRequested = true;
}
}
}
/**
* Statistics update result handler implementation.
*
* @param
* The type of expected result.
*/
class UpdateStatsResultHandler implements ResultHandler {
protected final long currentTime;
UpdateStatsResultHandler(final long currentTime) {
this.currentTime = currentTime;
}
@Override
public void handleError(final LdapException error) {
failedRecentCount.getAndIncrement();
updateStats();
app.errPrintVerboseMessage(LocalizableMessage.raw(error.getResult().toString()));
}
@Override
public void handleResult(final S result) {
successRecentCount.getAndIncrement();
updateStats();
}
private void updateStats() {
if (!isWarmingUp) {
final long eTime = System.nanoTime() - currentTime;
waitRecentTime.getAndAdd(eTime);
eTimesBuckets.addTimeToInterval(eTime);
}
}
}
/**
* Worker thread base implementation.
*/
abstract class WorkerThread extends Thread {
private int count;
private final Connection connection;
private final ConnectionFactory connectionFactory;
WorkerThread(final Connection connection, final ConnectionFactory connectionFactory) {
super("Worker Thread");
this.connection = connection;
this.connectionFactory = connectionFactory;
}
public abstract Promise, LdapException> performOperation(Connection connection,
DataSource[] dataSources, long startTime);
@Override
public void run() {
Promise, LdapException> promise;
Connection connection;
final double targetTimeInMS = 1000.0 / (targetThroughput / (double) (numThreads * numConnections));
double sleepTimeInMS = 0;
long start;
while (!stopRequested && !(maxIterations > 0 && count >= maxIterations)) {
if (this.connection == null) {
try {
connection = connectionFactory.getConnectionAsync().getOrThrow();
} catch (final InterruptedException e) {
// Ignore and check stop requested
continue;
} catch (final LdapException e) {
app.errPrintln(LocalizableMessage.raw(e.getResult().getDiagnosticMessage()));
if (e.getCause() != null && app.isVerbose()) {
e.getCause().printStackTrace(app.getErrorStream());
}
stopRequested = true;
break;
}
} else {
connection = this.connection;
if (!noRebind && connection instanceof AuthenticatedConnection) {
final AuthenticatedConnection ac = (AuthenticatedConnection) connection;
try {
ac.rebindAsync().getOrThrow();
} catch (final InterruptedException e) {
// Ignore and check stop requested
continue;
} catch (final LdapException e) {
app.errPrintln(LocalizableMessage.raw(e.getResult().toString()));
if (e.getCause() != null && app.isVerbose()) {
e.getCause().printStackTrace(app.getErrorStream());
}
stopRequested = true;
break;
}
}
}
start = System.nanoTime();
promise = performOperation(connection, dataSources.get(), start);
operationRecentCount.getAndIncrement();
count++;
if (!isAsync) {
try {
if (promise != null) {
promise.getOrThrow();
}
} catch (final InterruptedException e) {
// Ignore and check stop requested
continue;
} catch (final LdapException e) {
if (e.getCause() instanceof IOException) {
e.getCause().printStackTrace(app.getErrorStream());
stopRequested = true;
break;
}
// Ignore. Handled by result handler
} finally {
if (this.connection == null) {
connection.close();
}
}
}
if (targetThroughput > 0) {
try {
if (sleepTimeInMS > 1) {
sleep((long) Math.floor(sleepTimeInMS));
}
} catch (final InterruptedException e) {
continue;
}
sleepTimeInMS += targetTimeInMS - (System.nanoTime() - start) / 1000000.0;
if (sleepTimeInMS < -60000) {
// If we fall behind by 60 seconds, just forget about catching up
sleepTimeInMS = -60000;
}
}
}
}
}
private static final String[] EMPTY_STRINGS = new String[0];
private final AtomicInteger operationRecentCount = new AtomicInteger();
protected final AtomicInteger successRecentCount = new AtomicInteger();
protected final AtomicInteger failedRecentCount = new AtomicInteger();
private final AtomicLong waitRecentTime = new AtomicLong();
private final ResponseTimeBuckets eTimesBuckets = new ResponseTimeBuckets();
private final ConsoleApplication app;
private DataSource[] dataSourcePrototypes;
/** Thread local copies of the data sources. */
private final ThreadLocal dataSources = new ThreadLocal() {
/** {@inheritDoc} */
@Override
protected DataSource[] initialValue() {
final DataSource[] prototypes = getDataSources();
final int sz = prototypes.length;
final DataSource[] threadLocalCopy = new DataSource[sz];
for (int i = 0; i < sz; i++) {
threadLocalCopy[i] = prototypes[i].duplicate();
}
return threadLocalCopy;
}
};
private volatile boolean stopRequested;
private volatile boolean isWarmingUp;
private int numThreads;
private int numConnections;
private int targetThroughput;
private int maxIterations;
/** Warm-up duration time in ms. **/
private long warmUpDuration;
/** Max duration time in ms, 0 for unlimited. **/
private long maxDurationTime;
private boolean isAsync;
private boolean noRebind;
private int statsInterval;
private final IntegerArgument numThreadsArgument;
private final IntegerArgument maxIterationsArgument;
private final IntegerArgument maxDurationArgument;
private final IntegerArgument statsIntervalArgument;
private final IntegerArgument targetThroughputArgument;
private final IntegerArgument numConnectionsArgument;
private final IntegerArgument percentilesArgument;
private final BooleanArgument keepConnectionsOpen;
private final BooleanArgument noRebindArgument;
private final BooleanArgument asyncArgument;
private final StringArgument arguments;
protected final IntegerArgument warmUpArgument;
PerformanceRunner(final PerformanceRunnerOptions options) throws ArgumentException {
ArgumentParser argParser = options.getArgumentParser();
this.app = options.getConsoleApplication();
numThreadsArgument =
new IntegerArgument("numThreads", 't', "numThreads", false, false, true,
LocalizableMessage.raw("{numThreads}"), 1, null, true, 1, false, 0,
LocalizableMessage.raw("Number of worker threads per connection"));
numThreadsArgument.setPropertyName("numThreads");
if (options.supportsMultipleThreadsPerConnection()) {
argParser.addArgument(numThreadsArgument);
} else {
numThreadsArgument.addValue("1");
}
numConnectionsArgument =
new IntegerArgument("numConnections", 'c', "numConnections", false, false, true,
LocalizableMessage.raw("{numConnections}"), 1, null, true, 1, false, 0,
LocalizableMessage.raw("Number of connections"));
numConnectionsArgument.setPropertyName("numConnections");
argParser.addArgument(numConnectionsArgument);
maxIterationsArgument =
new IntegerArgument("maxIterations", 'm', "maxIterations", false, false, true,
LocalizableMessage.raw("{maxIterations}"), 0, null, LocalizableMessage
.raw("Max iterations, 0 for unlimited"));
maxIterationsArgument.setPropertyName("maxIterations");
argParser.addArgument(maxIterationsArgument);
maxDurationArgument =
new IntegerArgument("maxDuration", 'd', "maxDuration", false, false, true,
LocalizableMessage.raw("{maxDuration}"), 0, null, true, 1, false, 0,
LocalizableMessage.raw("Maximum duration in seconds, 0 for unlimited"));
argParser.addArgument(maxDurationArgument);
warmUpArgument =
new IntegerArgument("warmUpDuration", 'B', "warmUpDuration", false, false, true,
LocalizableMessage.raw("{warmUpDuration}"), 0, null,
LocalizableMessage.raw("Warm up duration in seconds"));
argParser.addArgument(warmUpArgument);
statsIntervalArgument =
new IntegerArgument("statInterval", 'i', "statInterval", false, false, true,
LocalizableMessage.raw("{statInterval}"), 5, null, true, 1, false, 0,
LocalizableMessage.raw("Display results each specified number of seconds"));
statsIntervalArgument.setPropertyName("statInterval");
argParser.addArgument(statsIntervalArgument);
targetThroughputArgument =
new IntegerArgument("targetThroughput", 'M', "targetThroughput", false, false,
true, LocalizableMessage.raw("{targetThroughput}"), 0, null,
LocalizableMessage.raw("Target average throughput to achieve"));
targetThroughputArgument.setPropertyName("targetThroughput");
argParser.addArgument(targetThroughputArgument);
percentilesArgument =
new IntegerArgument("percentile", 'e', "percentile", false, true,
LocalizableMessage.raw("{percentile}"), true, 0, true, 100,
LocalizableMessage.raw("Calculate max response time for a "
+ "percentile of operations"));
percentilesArgument.setPropertyName("percentile");
percentilesArgument.setMultiValued(true);
argParser.addArgument(percentilesArgument);
keepConnectionsOpen =
new BooleanArgument("keepConnectionsOpen", 'f', "keepConnectionsOpen",
LocalizableMessage.raw("Keep connections open"));
keepConnectionsOpen.setPropertyName("keepConnectionsOpen");
argParser.addArgument(keepConnectionsOpen);
noRebindArgument =
new BooleanArgument("noRebind", 'F', "noRebind", LocalizableMessage
.raw("Keep connections open and do not rebind"));
noRebindArgument.setPropertyName("noRebind");
if (options.supportsRebind()) {
argParser.addArgument(noRebindArgument);
}
asyncArgument =
new BooleanArgument("asynchronous", 'A', "asynchronous", LocalizableMessage
.raw("Use asynchronous mode and do not "
+ "wait for results before sending the next request"));
asyncArgument.setPropertyName("asynchronous");
if (options.supportsAsynchronousRequests()) {
argParser.addArgument(asyncArgument);
}
arguments =
new StringArgument(
"argument",
'g',
"argument",
false,
true,
true,
LocalizableMessage.raw("{generator function or static string}"),
null,
null,
LocalizableMessage
.raw("Argument used to evaluate the Java "
+ "style format strings in program parameters (ie. Base DN, "
+ "Search Filter). The set of all arguments provided form the "
+ "the argument list in order. Besides static string "
+ "arguments, they can be generated per iteration with the "
+ "following functions: " + StaticUtils.EOL
+ DataSource.getUsage()));
if (options.supportsGeneratorArgument()) {
argParser.addArgument(arguments);
}
}
@Override
public void handleConnectionClosed() {
// Ignore
}
@Override
public synchronized void handleConnectionError(final boolean isDisconnectNotification, final LdapException error) {
if (!stopRequested) {
app.errPrintln(LocalizableMessage.raw("Error occurred on one or more connections: "
+ error.getResult()));
if (error.getCause() != null && app.isVerbose()) {
error.getCause().printStackTrace(app.getErrorStream());
}
stopRequested = true;
}
}
@Override
public void handleUnsolicitedNotification(final ExtendedResult notification) {
// Ignore
}
public final void validate() throws ArgumentException {
numConnections = numConnectionsArgument.getIntValue();
numThreads = numThreadsArgument.getIntValue();
warmUpDuration = warmUpArgument.getIntValue() * 1000L;
maxIterations = maxIterationsArgument.getIntValue() / numConnections / numThreads;
maxDurationTime = maxDurationArgument.getIntValue() * 1000L;
statsInterval = statsIntervalArgument.getIntValue() * 1000;
targetThroughput = targetThroughputArgument.getIntValue();
isAsync = asyncArgument.isPresent();
noRebind = noRebindArgument.isPresent();
if (!noRebindArgument.isPresent() && this.numThreads > 1) {
throw new ArgumentException(ERR_TOOL_ARG_MUST_BE_USED_WHEN_ARG_CONDITION.get(
"--" + noRebindArgument.getLongIdentifier(), "--" + numThreadsArgument.getLongIdentifier(), "> 1"));
}
if (!noRebindArgument.isPresent() && asyncArgument.isPresent()) {
throw new ArgumentException(ERR_TOOL_ARG_NEEDED_WHEN_USING_ARG.get(
"--" + noRebindArgument.getLongIdentifier(), asyncArgument.getLongIdentifier()));
}
if (maxIterationsArgument.isPresent() && maxIterations <= 0) {
throw new ArgumentException(ERR_TOOL_NOT_ENOUGH_ITERATIONS.get(
"--" + maxIterationsArgument.getLongIdentifier(), numConnections * numThreads,
numConnectionsArgument.getLongIdentifier(), numThreadsArgument.getLongIdentifier()));
}
dataSourcePrototypes = DataSource.parse(arguments.getValues());
}
final DataSource[] getDataSources() {
if (dataSourcePrototypes == null) {
throw new IllegalStateException(
"dataSources are null - validate() must be called first");
}
return dataSourcePrototypes;
}
abstract WorkerThread newWorkerThread(final Connection connection,
final ConnectionFactory connectionFactory);
abstract StatsThread newStatsThread();
final int run(final ConnectionFactory connectionFactory) {
final List threads = new ArrayList();
final List connections = new ArrayList();
Connection connection = null;
try {
isWarmingUp = warmUpDuration > 0;
for (int i = 0; i < numConnections; i++) {
if (keepConnectionsOpen.isPresent() || noRebindArgument.isPresent()) {
connection = connectionFactory.getConnectionAsync().getOrThrow();
connection.addConnectionEventListener(this);
connections.add(connection);
}
for (int j = 0; j < numThreads; j++) {
final Thread thread = newWorkerThread(connection, connectionFactory);
threads.add(thread);
thread.start();
}
}
if (maxDurationTime > 0) {
new TimerThread(maxDurationTime).start();
}
final StatsThread statsThread = newStatsThread();
if (isWarmingUp) {
if (!app.isScriptFriendly()) {
app.println(INFO_TOOL_WARMING_UP.get(warmUpDuration / 1000));
}
Thread.sleep(warmUpDuration);
statsThread.resetStats();
isWarmingUp = false;
}
statsThread.start();
for (final Thread t : threads) {
t.join();
}
stopRequested = true;
statsThread.join();
} catch (final InterruptedException e) {
stopRequested = true;
} catch (final LdapException e) {
stopRequested = true;
app.println(LocalizableMessage.raw(e.getResult().getDiagnosticMessage()));
} finally {
closeSilently(connections);
}
return 0;
}
}