/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
* or http://forgerock.org/license/CDDLv1.0.html.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at legal-notices/CDDLv1_0.txt.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2010 Sun Microsystems, Inc.
* Portions Copyright 2011-2015 ForgeRock AS.
*/
package com.forgerock.opendj.ldap.tools;
import static java.util.Locale.ENGLISH;
import static java.util.concurrent.TimeUnit.*;
import static org.forgerock.util.Utils.*;
import static com.forgerock.opendj.ldap.tools.ToolsMessages.*;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.opendj.ldap.Connection;
import org.forgerock.opendj.ldap.ConnectionEventListener;
import org.forgerock.opendj.ldap.ConnectionFactory;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.LdapResultHandler;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.util.promise.Promise;
import com.forgerock.opendj.cli.ArgumentException;
import com.forgerock.opendj.cli.ArgumentParser;
import com.forgerock.opendj.cli.AuthenticatedConnectionFactory.AuthenticatedConnection;
import com.forgerock.opendj.cli.BooleanArgument;
import com.forgerock.opendj.cli.ConsoleApplication;
import com.forgerock.opendj.cli.IntegerArgument;
import com.forgerock.opendj.cli.MultiColumnPrinter;
import com.forgerock.opendj.cli.StringArgument;
import com.forgerock.opendj.util.StaticUtils;
/** Benchmark application framework. */
abstract class PerformanceRunner implements ConnectionEventListener {
static final class ResponseTimeBuckets {
private static final long NS_1_US = NANOSECONDS.convert(1, MICROSECONDS);
private static final long NS_100_US = NANOSECONDS.convert(100, MICROSECONDS);
private static final long NS_1_MS = NANOSECONDS.convert(1, MILLISECONDS);
private static final long NS_10_MS = NANOSECONDS.convert(10, MILLISECONDS);
private static final long NS_100_MS = NANOSECONDS.convert(100, MILLISECONDS);
private static final long NS_1_S = NANOSECONDS.convert(1, SECONDS);
private static final long NS_5_S = NANOSECONDS.convert(5, SECONDS);
private static final int NB_INDEX = 2120;
private static final int RANGE_100_MICROSECONDS_START_INDEX = 1000;
private static final int RANGE_1_MILLISECOND_START_INDEX = 1090;
private static final int RANGE_100_MILLISECONDS_START_INDEX = 2080;
/**
* Array of response time buckets.
*
*
* index 0 -> 999: 1000 buckets for 0ms - 1ms interval with 1 µs increments
* index 1000 -> 1089: 90 buckets for 1ms - 10ms interval with 100 µs increments
* index 1090 -> 2079: 990 buckets for 10ms - 1000ms interval with 1000 µs increments
* index 2080 -> 2119: 40 buckets for 1000ms - 5000ms interval with 100 000 µs increments
*
*/
private final AtomicLong[] index2Frequency = new AtomicLong[NB_INDEX];
/**
* Store the lower bounds (in microseconds) of the eTime buckets.
*/
private final long[] index2Etime = new long[NB_INDEX];
/**
* Sorted map used for storing response times from 5s+ with 500 millisecond increments.
*
* Keys (Long in microseconds) of this map must respect this pattern: n * 500 000 + 5 000 000,
* where n is a natural integer.
*/
private final ConcurrentSkipListMap bigEtimes = new ConcurrentSkipListMap<>();
/**
* Initialize both index2Frequency and index2Etime arrays.
*/
private ResponseTimeBuckets() {
// Helpful variables to compute index2Etime values.
long rangeWidthMicroSecs;
long rangeStart = 0;
long initialTimeMicroSecs = 0;
for (int i = 0; i < NB_INDEX; i++) {
index2Frequency[i] = new AtomicLong();
if (i < RANGE_100_MICROSECONDS_START_INDEX) {
// 0ms-1ms in 1 us increments
rangeWidthMicroSecs = 1;
} else if (i < RANGE_1_MILLISECOND_START_INDEX) {
// 1ms-10ms in 100 us increments
rangeWidthMicroSecs = 100;
rangeStart = RANGE_100_MICROSECONDS_START_INDEX;
initialTimeMicroSecs = MICROSECONDS.convert(1, MILLISECONDS);
} else if (i < RANGE_100_MILLISECONDS_START_INDEX) {
// 10ms-1000ms in 1000 us increments
rangeWidthMicroSecs = MICROSECONDS.convert(1, MILLISECONDS);
rangeStart = RANGE_1_MILLISECOND_START_INDEX;
initialTimeMicroSecs = MICROSECONDS.convert(10, MILLISECONDS);
} else {
// 1000ms-5000ms with 100 000 us increments
rangeWidthMicroSecs = MICROSECONDS.convert(100, MILLISECONDS);
rangeStart = RANGE_100_MILLISECONDS_START_INDEX;
initialTimeMicroSecs = MICROSECONDS.convert(1, SECONDS);
}
index2Etime[i] = (i - rangeStart) * rangeWidthMicroSecs + initialTimeMicroSecs;
}
}
/**
* Compute the closest response time values for each percentile given in
* parameter. Percentiles array has to be sorted from lower to higher
* percentiles.
*
* @param percentiles
* array of {@code double}
*
* @param nbData
* number of response times recorded.
*
* @return array of response times in microseconds corresponding to
* percentiles.
*/
List getPercentile(double[] percentiles, long nbData) {
List responseTimes = new ArrayList<>();
Queue nbDataThresholds = new LinkedList<>();
long nbDataSum = nbData;
for (int i = percentiles.length - 1; i >= 0; i--) {
nbDataThresholds.add((long) (percentiles[i] * nbData) / 100);
}
Iterator> iter = bigEtimes.descendingMap().entrySet().iterator();
while (iter.hasNext() && !nbDataThresholds.isEmpty()) {
Entry currentETime = iter.next();
nbDataSum -= currentETime.getValue().get();
computePercentiles(nbDataThresholds, responseTimes, nbDataSum, currentETime.getKey());
}
int stdTimeIndex = NB_INDEX - 1;
while (stdTimeIndex >= 0 && !nbDataThresholds.isEmpty()) {
long currentETime = index2Etime[stdTimeIndex];
nbDataSum -= index2Frequency[stdTimeIndex].get();
computePercentiles(nbDataThresholds, responseTimes, nbDataSum, currentETime);
stdTimeIndex--;
}
return responseTimes;
}
private void computePercentiles(Queue currentDataThreshold, List responseTimes, long currentSum,
long currentETime) {
while (currentDataThreshold.peek() != null && currentDataThreshold.peek() >= currentSum) {
responseTimes.add(currentETime);
currentDataThreshold.poll();
}
}
void addTimeToInterval(long responseTimeNanoSecs) {
if (responseTimeNanoSecs >= NS_5_S) {
long matchingKey = responseTimeNanoSecs / NS_100_MS;
matchingKey -= matchingKey % 5;
matchingKey = matchingKey * MICROSECONDS.convert(100, MILLISECONDS);
// We now have a key corresponding to pattern 5 000 000 + n * 500 000 µs
AtomicLong existingKey = bigEtimes.putIfAbsent(matchingKey, new AtomicLong(1));
if (existingKey != null) {
existingKey.getAndIncrement();
}
return;
}
final int startRangeIndex;
final long rangeWidthNanoSecs;
if (responseTimeNanoSecs < NS_1_MS) {
rangeWidthNanoSecs = NS_1_US;
startRangeIndex = 0;
} else if (responseTimeNanoSecs < NS_10_MS) {
rangeWidthNanoSecs = NS_100_US;
startRangeIndex = RANGE_100_MICROSECONDS_START_INDEX - 10;
} else if (responseTimeNanoSecs < NS_1_S) {
rangeWidthNanoSecs = NS_1_MS;
startRangeIndex = RANGE_1_MILLISECOND_START_INDEX - 10;
} else {
rangeWidthNanoSecs = NS_100_MS;
startRangeIndex = RANGE_100_MILLISECONDS_START_INDEX - 10;
}
final int intervalIndex = ((int) (responseTimeNanoSecs / rangeWidthNanoSecs)) + startRangeIndex;
index2Frequency[intervalIndex].getAndIncrement();
}
}
/**
 * Creates a fresh, empty set of response time buckets.
 * Exists so that tests can obtain an instance despite the private constructor.
 *
 * @return a new {@code ResponseTimeBuckets} holding no response time.
 */
static ResponseTimeBuckets getResponseTimeBuckets() {
    final ResponseTimeBuckets freshBuckets = new ResponseTimeBuckets();
    return freshBuckets;
}
/**
 * Statistics thread base implementation.
 * <p>
 * Until a stop is requested, the thread wakes up every {@code statsInterval} milliseconds,
 * collects the counters updated by the workers and result handlers, and prints one row of
 * statistics, either as a formatted table or as a comma separated line in script friendly mode.
 */
class StatsThread extends Thread {
    protected long totalResultCount;
    protected long totalOperationCount;
    protected double totalDurationSec;
    protected long totalWaitTimeNs;
    protected int intervalSuccessCount;
    protected int intervalOperationCount;
    protected int intervalFailedCount;
    protected double intervalDurationSec;
    protected long intervalWaitTimeNs;
    protected long lastStatTimeMs;
    protected long lastGCDurationMs;

    private final int numColumns;
    private final String[] additionalColumns;
    private final double[] percentiles;
    private final List<GarbageCollectorMXBean> gcBeans;
    private final boolean isScriptFriendly = app.isScriptFriendly();
    private MultiColumnPrinter printer;

    /**
     * Creates the statistics thread.
     *
     * @param additionalColumns
     *            titles of the tool specific columns printed after the common ones.
     */
    public StatsThread(final String... additionalColumns) {
        super("Stats Thread");

        this.additionalColumns = additionalColumns;
        if (!percentilesArgument.isPresent()) {
            this.percentiles = new double[] { 99.9, 99.99, 99.999 };
        } else {
            this.percentiles = new double[percentilesArgument.getValues().size()];
            int index = 0;
            for (final String percentile : percentilesArgument.getValues()) {
                percentiles[index++] = Double.parseDouble(percentile);
            }
            // Percentiles must be sorted from lower to higher to be computed.
            Arrays.sort(percentiles);
        }
        // 2 throughput columns + 2 response time columns + err/sec, plus the optional ones.
        this.numColumns = 5 + this.percentiles.length + additionalColumns.length + (isAsync ? 1 : 0);
        this.gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
    }

    /** Prints the column titles in the format matching the output mode. */
    private void printResultsTitle() {
        if (isScriptFriendly) {
            printResultsTitleScriptFriendly();
            return;
        }

        printer = new MultiColumnPrinter(numColumns, 2, "-", MultiColumnPrinter.RIGHT, app);
        printer.setTitleAlign(MultiColumnPrinter.RIGHT);
        printResultTitleHeaders();
        printResultTitleDetails();
    }

    /** Adds the two header lines which span the throughput and response time columns. */
    private void printResultTitleHeaders() {
        final String[][] titleHeaders = new String[2][numColumns];
        for (final String[] titleLine : titleHeaders) {
            Arrays.fill(titleLine, "");
        }

        titleHeaders[0][0] = "Throughput";
        titleHeaders[0][2] = "Response Time";
        titleHeaders[1][0] = "(ops/second)";
        titleHeaders[1][2] = "(milliseconds)";

        final int[] span = new int[numColumns];
        span[0] = 2;
        span[1] = 0;
        span[2] = 2 + this.percentiles.length;
        Arrays.fill(span, 3, 4 + this.percentiles.length, 0);
        Arrays.fill(span, 4 + this.percentiles.length, span.length, 1);

        for (final String[] titleLine : titleHeaders) {
            printer.addTitle(titleLine, span);
        }
    }

    /** Adds the title line naming each individual column, then prints the whole title. */
    private void printResultTitleDetails() {
        final String[] titleDetails = new String[numColumns];
        titleDetails[0] = "recent";
        titleDetails[1] = "average";
        titleDetails[2] = "recent";
        titleDetails[3] = "average";
        int i = 4;
        for (double percentile :percentiles) {
            titleDetails[i++] = percentile + "%";
        }
        titleDetails[i++] = "err/sec";
        if (isAsync) {
            titleDetails[i++] = "req/res";
        }
        for (final String column : additionalColumns) {
            titleDetails[i++] = column;
        }

        final int[] span = new int[numColumns];
        Arrays.fill(span, 1);

        printer.addTitle(titleDetails, span);
        printer.printTitle();
    }

    /** Prints the column titles as a single comma separated line. */
    private void printResultsTitleScriptFriendly() {
        final PrintStream out = app.getOutputStream();
        out.print("Time (seconds)");
        out.print(",");
        out.print("Recent throughput (ops/second)");
        out.print(",");
        out.print("Average throughput (ops/second)");
        out.print(",");
        out.print("Recent response time (milliseconds)");
        out.print(",");
        out.print("Average response time (milliseconds)");
        for (final double percentile : this.percentiles) {
            out.print(",");
            out.print(percentile);
            out.print("% response time (milliseconds)");
        }
        out.print(",");
        out.print("Errors/second");
        if (isAsync) {
            out.print(",");
            out.print("Requests/response");
        }
        for (final String column : additionalColumns) {
            out.print(",");
            out.print(column);
        }
        out.println();
    }

    @Override
    public void run() {
        printResultsTitle();

        long totalStatTimeMs = System.currentTimeMillis();
        long gcDurationMs = getGCDuration();

        while (!stopRequested) {
            try {
                sleep(statsInterval);
            } catch (final InterruptedException ie) {
                // Ignore.
            }

            lastStatTimeMs = totalStatTimeMs;
            totalStatTimeMs = System.currentTimeMillis();

            lastGCDurationMs = gcDurationMs;
            gcDurationMs = getGCDuration();
            final long gcIntervalDurationMs = gcDurationMs - lastGCDurationMs;

            computeStatsForInterval(totalStatTimeMs, gcIntervalDurationMs);
            final long intervalResultCount = intervalSuccessCount + intervalFailedCount;

            // Columns which cannot be computed keep the "-" placeholder.
            final String[] printableStats = new String[numColumns];
            Arrays.fill(printableStats, "-");
            printableStats[0] = getDivisionResult(intervalResultCount, intervalDurationSec, 1);
            printableStats[1] = getDivisionResult(totalResultCount, totalDurationSec, 1);

            final long intervalWaitTimeMs = NANOSECONDS.toMillis(intervalWaitTimeNs) - gcIntervalDurationMs;
            printableStats[2] = getDivisionResult(intervalWaitTimeMs, intervalResultCount, 3);

            final long totalWaitTimeMs = NANOSECONDS.toMillis(totalWaitTimeNs) - gcDurationMs;
            printableStats[3] = getDivisionResult(totalWaitTimeMs, totalResultCount, 3);

            // Response times come ordered from the highest percentile to the lowest one and
            // the lowest percentiles may be missing: fill the percentile columns from right
            // to left so that each value stays under the title of its own percentile.
            final List<Long> computedPercentiles = eTimesBuckets.getPercentile(percentiles, totalOperationCount);
            int percentileColumn = 4 + percentiles.length - 1;
            for (final Long responseTimeMicroSecs : computedPercentiles) {
                if (percentileColumn < 4) {
                    break;
                }
                printableStats[percentileColumn--] = getDivisionResult(responseTimeMicroSecs, 1000.0, 2);
            }

            int i = 4 + percentiles.length;
            printableStats[i++] = getDivisionResult(intervalFailedCount, intervalDurationSec, 1);
            if (isAsync) {
                printableStats[i++] = getDivisionResult(intervalOperationCount, intervalResultCount, 1);
            }

            for (final String column : getAdditionalColumns()) {
                printableStats[i++] = column;
            }

            if (isScriptFriendly) {
                printScriptFriendlyStats(printableStats);
            } else {
                printer.printRow(printableStats);
            }
        }
    }

    /**
     * Moves the recent counters into the interval fields (resetting the recent counters)
     * and accumulates them into the totals. The garbage collection time spent during the
     * interval is excluded from the interval duration.
     */
    private void computeStatsForInterval(final long statTime, final long gcIntervalDurationMs) {
        intervalOperationCount = operationRecentCount.getAndSet(0);
        intervalSuccessCount = successRecentCount.getAndSet(0);
        intervalFailedCount = failedRecentCount.getAndSet(0);
        intervalWaitTimeNs = waitRecentTimeNs.getAndSet(0);

        totalOperationCount += intervalOperationCount;
        totalResultCount += intervalSuccessCount + intervalFailedCount;
        totalWaitTimeNs += intervalWaitTimeNs;

        final long intervalDurationMs = statTime - lastStatTimeMs;
        intervalDurationSec = (intervalDurationMs - gcIntervalDurationMs) / 1000.0;
        totalDurationSec += intervalDurationSec;
    }

    /** Returns the sum of the collection times reported by all garbage collector beans. */
    private long getGCDuration() {
        long gcDuration = 0;
        for (final GarbageCollectorMXBean bean : gcBeans) {
            gcDuration += bean.getCollectionTime();
        }
        return gcDuration;
    }

    private String getDivisionResult(final long numerator, final double denominator, final int precision) {
        return getDivisionResult(numerator, denominator, precision, "-");
    }

    /**
     * Formats {@code numerator / denominator} with the given number of decimals.
     *
     * @param numerator
     *            the numerator.
     * @param denominator
     *            the denominator.
     * @param precision
     *            the number of decimals to print.
     * @param fallBack
     *            the text returned when the denominator is not strictly positive.
     * @return the formatted quotient, or {@code fallBack}.
     */
    protected String getDivisionResult(
            final long numerator, final double denominator, final int precision, final String fallBack) {
        return denominator > 0 ? String.format(ENGLISH, "%." + precision + "f", numerator / denominator)
                               : fallBack;
    }

    /** Prints the elapsed time followed by the statistics, comma separated, on one line. */
    private void printScriptFriendlyStats(String[] printableStats) {
        final PrintStream out = app.getOutputStream();
        out.print(String.format(ENGLISH, "%.3f", totalDurationSec));
        for (final String s : printableStats) {
            out.print(",");
            out.print(s);
        }
        out.println();
    }

    /**
     * Returns the values of the tool specific columns for the current interval.
     *
     * @return the additional column values, none by default.
     */
    String[] getAdditionalColumns() {
        return EMPTY_STRINGS;
    }

    /** Resets both general and recent statistic indicators. */
    void resetStats() {
        intervalFailedCount = 0;
        intervalOperationCount = 0;
        intervalSuccessCount = 0;
        operationRecentCount.set(0);
        successRecentCount.set(0);
        failedRecentCount.set(0);
        waitRecentTimeNs.set(0);
    }
}
/**
 * Thread which sleeps for a fixed delay and then performs the stop operations.
 * The stop operations are performed even when the sleep is interrupted.
 */
class TimerThread extends Thread {
    /** Delay, in milliseconds, before the stop operations are performed. */
    private final long timeToWait;

    TimerThread(long timeToWait) {
        this.timeToWait = timeToWait;
    }

    /** Requests all the threads of the enclosing runner to stop. */
    void performStopOperations() {
        stopRequested = true;
    }

    @Override
    public void run() {
        try {
            sleep(timeToWait);
        } catch (InterruptedException interruption) {
            throw new IllegalStateException(interruption);
        } finally {
            // Executed on normal completion as well as on failure.
            performStopOperations();
        }
    }
}
/**
 * Statistics update result handler implementation.
 * <p>
 * Counts each completed operation as a success or a failure and, once the warm-up is over,
 * records the time elapsed since the operation was started.
 *
 * @param <S>
 *            The type of expected result.
 */
class UpdateStatsResultHandler<S extends Result> implements LdapResultHandler<S> {
    /** Value of {@code System.nanoTime()} when the operation was started. */
    protected final long currentTime;

    UpdateStatsResultHandler(final long currentTime) {
        this.currentTime = currentTime;
    }

    @Override
    public void handleException(final LdapException exception) {
        failedRecentCount.getAndIncrement();
        updateStats();
        app.errPrintVerboseMessage(LocalizableMessage.raw(exception.getResult().toString()));
    }

    @Override
    public void handleResult(final S result) {
        successRecentCount.getAndIncrement();
        updateStats();
    }

    /** Records the elapsed time of the operation, unless the tool is still warming up. */
    private void updateStats() {
        if (!isWarmingUp) {
            final long eTime = System.nanoTime() - currentTime;
            waitRecentTimeNs.getAndAdd(eTime);
            eTimesBuckets.addTimeToInterval(eTime);
        }
    }
}
/** Worker thread base implementation. */
abstract class WorkerThread extends Thread {
private int count;
private final Connection connection;
private final ConnectionFactory connectionFactory;
boolean localStopRequested;
WorkerThread(final Connection connection, final ConnectionFactory connectionFactory) {
super("Worker Thread");
this.connection = connection;
this.connectionFactory = connectionFactory;
}
public abstract Promise, LdapException> performOperation(
Connection connection, DataSource[] dataSources, long startTime);
@Override
public void run() {
Promise, LdapException> promise;
Connection connection;
final double targetTimeMs = 1000.0 / (targetThroughput / (double) (numThreads * numConnections));
double sleepTimeMs = 0;
while (!stopRequested && !localStopRequested
&& (maxIterations <= 0 || count < maxIterations)) {
if (this.connection == null) {
try {
connection = connectionFactory.getConnectionAsync().getOrThrow();
} catch (final InterruptedException e) {
// Ignore and check stop requested
continue;
} catch (final LdapException e) {
app.errPrintln(LocalizableMessage.raw(e.getResult().getDiagnosticMessage()));
if (e.getCause() != null && app.isVerbose()) {
e.getCause().printStackTrace(app.getErrorStream());
}
stopRequested = true;
break;
}
} else {
connection = this.connection;
if (!noRebind && connection instanceof AuthenticatedConnection) {
final AuthenticatedConnection ac = (AuthenticatedConnection) connection;
try {
ac.rebindAsync().getOrThrow();
} catch (final InterruptedException e) {
// Ignore and check stop requested
continue;
} catch (final LdapException e) {
app.errPrintln(LocalizableMessage.raw(e.getResult().toString()));
if (e.getCause() != null && app.isVerbose()) {
e.getCause().printStackTrace(app.getErrorStream());
}
stopRequested = true;
break;
}
}
}
long startTimeNs = System.nanoTime();
promise = performOperation(connection, dataSources.get(), startTimeNs);
operationRecentCount.getAndIncrement();
if (!isAsync) {
try {
promise.getOrThrow();
} catch (final InterruptedException e) {
// Ignore and check stop requested
continue;
} catch (final LdapException e) {
if (e.getCause() instanceof IOException) {
e.getCause().printStackTrace(app.getErrorStream());
stopRequested = true;
break;
}
// Ignore. Handled by result handler
} finally {
if (this.connection == null) {
connection.close();
}
}
}
if (targetThroughput > 0) {
try {
if (sleepTimeMs > 1) {
sleep((long) Math.floor(sleepTimeMs));
}
} catch (final InterruptedException e) {
continue;
}
sleepTimeMs += targetTimeMs - NANOSECONDS.toMillis(System.nanoTime() - startTimeNs);
final long oneMinuteMs = MINUTES.toMillis(1);
if (sleepTimeMs + oneMinuteMs < 0) {
// If we fall behind by 60 seconds, just forget about catching up
sleepTimeMs = -oneMinuteMs;
}
}
}
}
void incrementIterationCount() {
count++;
}
}
private static final String[] EMPTY_STRINGS = new String[0];

/** Number of operations started since the last statistics interval. */
private final AtomicInteger operationRecentCount = new AtomicInteger();
/** Number of successful results received since the last statistics interval. */
protected final AtomicInteger successRecentCount = new AtomicInteger();
/** Number of failures received since the last statistics interval. */
protected final AtomicInteger failedRecentCount = new AtomicInteger();
/** Sum of the response times, in nanoseconds, since the last statistics interval. */
private final AtomicLong waitRecentTimeNs = new AtomicLong();
private final ResponseTimeBuckets eTimesBuckets = new ResponseTimeBuckets();

private final ConsoleApplication app;
private DataSource[] dataSourcePrototypes;

/** Thread local copies of the data sources. */
private final ThreadLocal<DataSource[]> dataSources = new ThreadLocal<DataSource[]>() {
    /** {@inheritDoc} */
    @Override
    protected DataSource[] initialValue() {
        final DataSource[] prototypes = getDataSources();
        final int sz = prototypes.length;
        final DataSource[] threadLocalCopy = new DataSource[sz];
        for (int i = 0; i < sz; i++) {
            threadLocalCopy[i] = prototypes[i].duplicate();
        }
        return threadLocalCopy;
    }
};

int numThreads;
int numConnections;
volatile boolean stopRequested;
private volatile boolean isWarmingUp;
private int targetThroughput;
private int maxIterations;
/** Warm-up duration time in ms. **/
private long warmUpDuration;
/** Max duration time in ms, 0 for unlimited. **/
private long maxDurationTime;
private boolean isAsync;
private boolean noRebind;
/** Delay between two statistics rows, in ms. */
private int statsInterval;

private final IntegerArgument numThreadsArgument;
private final IntegerArgument maxDurationArgument;
private final IntegerArgument statsIntervalArgument;
private final IntegerArgument targetThroughputArgument;
private final IntegerArgument numConnectionsArgument;
private final IntegerArgument percentilesArgument;
private final BooleanArgument keepConnectionsOpen;
private final BooleanArgument noRebindArgument;
private final BooleanArgument asyncArgument;
private final StringArgument arguments;
protected final IntegerArgument maxIterationsArgument;
protected final IntegerArgument warmUpArgument;

/** Worker threads started by the last run. */
private final List<Thread> workerThreads = new ArrayList<>();
/**
 * Creates the runner and declares the command-line arguments shared by the tools built on
 * top of it. Arguments which are not supported according to {@code options} are created
 * but not added to the argument parser.
 *
 * @param options
 *            provides the argument parser, the console application and the set of
 *            supported features.
 * @throws ArgumentException
 *             declared by the signature; presumably raised when an argument cannot be
 *             created or registered - confirm against the argument classes.
 */
PerformanceRunner(final PerformanceRunnerOptions options) throws ArgumentException {
ArgumentParser argParser = options.getArgumentParser();
this.app = options.getConsoleApplication();
numThreadsArgument =
new IntegerArgument("numThreads", 't', "numThreads", false, false, true,
LocalizableMessage.raw("{numThreads}"), 1, null, true, 1, false, 0,
LocalizableMessage.raw("Number of worker threads per connection"));
numThreadsArgument.setPropertyName("numThreads");
// When several threads per connection are not supported the argument is not exposed
// and its value is forced to a single thread.
if (options.supportsMultipleThreadsPerConnection()) {
argParser.addArgument(numThreadsArgument);
} else {
numThreadsArgument.addValue("1");
}
numConnectionsArgument =
new IntegerArgument("numConnections", 'c', "numConnections", false, false, true,
LocalizableMessage.raw("{numConnections}"), 1, null, true, 1, false, 0,
LocalizableMessage.raw("Number of connections"));
numConnectionsArgument.setPropertyName("numConnections");
argParser.addArgument(numConnectionsArgument);
maxIterationsArgument =
new IntegerArgument("maxIterations", 'm', "maxIterations", false, false, true,
LocalizableMessage.raw("{maxIterations}"), 0, null,
LocalizableMessage.raw("Max iterations, 0 for unlimited"));
maxIterationsArgument.setPropertyName("maxIterations");
argParser.addArgument(maxIterationsArgument);
// NOTE(review): unlike most other arguments, maxDuration and warmUpDuration get no property name.
maxDurationArgument =
new IntegerArgument("maxDuration", 'd', "maxDuration", false, false, true,
LocalizableMessage.raw("{maxDuration}"), 0, null, true, 1, false, 0,
LocalizableMessage.raw("Maximum duration in seconds, 0 for unlimited"));
argParser.addArgument(maxDurationArgument);
warmUpArgument =
new IntegerArgument("warmUpDuration", 'B', "warmUpDuration", false, false, true,
LocalizableMessage.raw("{warmUpDuration}"), 0, null,
LocalizableMessage.raw("Warm up duration in seconds"));
argParser.addArgument(warmUpArgument);
statsIntervalArgument =
new IntegerArgument("statInterval", 'i', "statInterval", false, false, true,
LocalizableMessage.raw("{statInterval}"), 5, null, true, 1, false, 0,
LocalizableMessage.raw("Display results each specified number of seconds"));
statsIntervalArgument.setPropertyName("statInterval");
argParser.addArgument(statsIntervalArgument);
targetThroughputArgument =
new IntegerArgument("targetThroughput", 'M', "targetThroughput", false, false,
true, LocalizableMessage.raw("{targetThroughput}"), 0, null,
LocalizableMessage.raw("Target average throughput to achieve"));
targetThroughputArgument.setPropertyName("targetThroughput");
argParser.addArgument(targetThroughputArgument);
// Multi-valued: each value adds one percentile column to the statistics.
percentilesArgument =
new IntegerArgument("percentile", 'e', "percentile", false, true,
LocalizableMessage.raw("{percentile}"), true, 0, true, 100,
LocalizableMessage.raw("Calculate max response time for a "
+ "percentile of operations"));
percentilesArgument.setPropertyName("percentile");
percentilesArgument.setMultiValued(true);
argParser.addArgument(percentilesArgument);
keepConnectionsOpen =
new BooleanArgument("keepConnectionsOpen", 'f', "keepConnectionsOpen",
LocalizableMessage.raw("Keep connections open"));
keepConnectionsOpen.setPropertyName("keepConnectionsOpen");
argParser.addArgument(keepConnectionsOpen);
noRebindArgument =
new BooleanArgument("noRebind", 'F', "noRebind", LocalizableMessage
.raw("Keep connections open and do not rebind"));
noRebindArgument.setPropertyName("noRebind");
if (options.supportsRebind()) {
argParser.addArgument(noRebindArgument);
}
asyncArgument =
new BooleanArgument("asynchronous", 'A', "asynchronous", LocalizableMessage
.raw("Use asynchronous mode and do not "
+ "wait for results before sending the next request"));
asyncArgument.setPropertyName("asynchronous");
if (options.supportsAsynchronousRequests()) {
argParser.addArgument(asyncArgument);
}
// NOTE(review): the help text below reads "form the the argument list" (duplicated word).
arguments =
new StringArgument(
"argument",
'g',
"argument",
false,
true,
true,
LocalizableMessage.raw("{generator function or static string}"),
null,
null,
LocalizableMessage
.raw("Argument used to evaluate the Java "
+ "style format strings in program parameters (ie. Base DN, "
+ "Search Filter). The set of all arguments provided form the "
+ "the argument list in order. Besides static string "
+ "arguments, they can be generated per iteration with the "
+ "following functions: " + StaticUtils.EOL
+ DataSource.getUsage()));
if (options.supportsGeneratorArgument()) {
argParser.addArgument(arguments);
}
}
/** Connection closures require no action from the runner. */
@Override
public void handleConnectionClosed() {
    // Nothing to do.
}
/**
 * Reports the first connection error and requests all the threads to stop.
 * Errors received once a stop has been requested are ignored.
 */
@Override
public synchronized void handleConnectionError(final boolean isDisconnectNotification, final LdapException error) {
    if (stopRequested) {
        return;
    }

    final String description = "Error occurred on one or more connections: " + error.getResult();
    app.errPrintln(LocalizableMessage.raw(description));

    final Throwable cause = error.getCause();
    if (cause != null && app.isVerbose()) {
        cause.printStackTrace(app.getErrorStream());
    }
    stopRequested = true;
}
/** Unsolicited notifications require no action from the runner. */
@Override
public void handleUnsolicitedNotification(final ExtendedResult notification) {
    // Nothing to do.
}
/**
 * Reads the parsed argument values into the runner settings, checks that they are
 * consistent with each other and parses the data source arguments.
 *
 * @throws ArgumentException
 *             if several threads per connection or the asynchronous mode are requested
 *             without the no-rebind option, or if the maximum number of iterations is
 *             too low to give at least one iteration to each worker thread.
 */
public final void validate() throws ArgumentException {
    numConnections = numConnectionsArgument.getIntValue();
    numThreads = numThreadsArgument.getIntValue();
    // Durations are given in seconds on the command line and stored in milliseconds.
    warmUpDuration = warmUpArgument.getIntValue() * 1000L;
    // The iteration budget is shared between all the worker threads.
    maxIterations = maxIterationsArgument.getIntValue() / numConnections / numThreads;
    maxDurationTime = maxDurationArgument.getIntValue() * 1000L;
    statsInterval = statsIntervalArgument.getIntValue() * 1000;
    targetThroughput = targetThroughputArgument.getIntValue();
    isAsync = asyncArgument.isPresent();
    noRebind = noRebindArgument.isPresent();

    if (!noRebind) {
        if (numThreads > 1) {
            throw new ArgumentException(ERR_TOOL_ARG_MUST_BE_USED_WHEN_ARG_CONDITION.get(
                    "--" + noRebindArgument.getLongIdentifier(),
                    "--" + numThreadsArgument.getLongIdentifier(),
                    "> 1"));
        }
        if (isAsync) {
            throw new ArgumentException(ERR_TOOL_ARG_NEEDED_WHEN_USING_ARG.get(
                    "--" + noRebindArgument.getLongIdentifier(),
                    asyncArgument.getLongIdentifier()));
        }
    }

    final boolean notEnoughIterations = maxIterationsArgument.isPresent() && maxIterations <= 0;
    if (notEnoughIterations) {
        throw new ArgumentException(ERR_TOOL_NOT_ENOUGH_ITERATIONS.get(
                "--" + maxIterationsArgument.getLongIdentifier(),
                numConnections * numThreads,
                numConnectionsArgument.getLongIdentifier(),
                numThreadsArgument.getLongIdentifier()));
    }

    dataSourcePrototypes = DataSource.parse(arguments.getValues());
}
/**
 * Returns the data source prototypes parsed during validation.
 *
 * @return the data source prototypes.
 * @throws IllegalStateException
 *             if the prototypes have not been set yet.
 */
final DataSource[] getDataSources() {
    final DataSource[] prototypes = dataSourcePrototypes;
    if (prototypes != null) {
        return prototypes;
    }
    throw new IllegalStateException("dataSources are null - validate() must be called first");
}
/**
 * Creates a worker thread; called once per thread and per connection when the tool runs.
 *
 * @param connection
 *            the connection the worker must use, or {@code null} when no connection is
 *            kept open for it.
 * @param connectionFactory
 *            the factory the tool was run with.
 * @return the new, not yet started, worker thread.
 */
abstract WorkerThread newWorkerThread(final Connection connection, final ConnectionFactory connectionFactory);
/**
 * Creates the statistics thread; called once when the tool runs.
 *
 * @return the new, not yet started, statistics thread.
 */
abstract StatsThread newStatsThread();
/**
 * Creates the thread in charge of stopping the tool after a delay.
 *
 * @param timeTowait
 *            the delay in milliseconds.
 * @return the new, not yet started, timer thread.
 */
TimerThread newEndTimerThread(final long timeTowait) {
    final TimerThread endTimer = new TimerThread(timeTowait);
    return endTimer;
}
/**
 * Runs the benchmark: starts the worker threads, the optional end timer and the statistics
 * thread, then waits for the workers and the statistics thread to terminate.
 * <p>
 * If the calling thread is interrupted, a stop is requested and the interrupt status of
 * the thread is restored before returning.
 *
 * @param connectionFactory
 *            the factory used to obtain the connections.
 * @return always 0.
 */
final int run(final ConnectionFactory connectionFactory) {
    final List<Connection> connections = new ArrayList<>();
    boolean interrupted = false;

    // Stays null when connections are not kept open: workers then connect by themselves.
    Connection connection = null;
    try {
        isWarmingUp = warmUpDuration > 0;
        for (int i = 0; i < numConnections; i++) {
            if (keepConnectionsOpen.isPresent() || noRebindArgument.isPresent()) {
                connection = connectionFactory.getConnectionAsync().getOrThrow();
                connection.addConnectionEventListener(this);
                connections.add(connection);
            }
            for (int j = 0; j < numThreads; j++) {
                final Thread thread = newWorkerThread(connection, connectionFactory);
                workerThreads.add(thread);
                thread.start();
            }
        }

        if (maxDurationTime > 0) {
            newEndTimerThread(maxDurationTime).start();
        }

        final StatsThread statsThread = newStatsThread();
        if (isWarmingUp) {
            if (!app.isScriptFriendly()) {
                app.println(INFO_TOOL_WARMING_UP.get(warmUpDuration / 1000));
            }
            Thread.sleep(warmUpDuration);
            // Forget what has been counted during the warm-up.
            statsThread.resetStats();
            isWarmingUp = false;
        }
        statsThread.start();
        joinAllWorkerThreads();
        stopRequested = true;
        statsThread.join();
    } catch (final InterruptedException e) {
        stopRequested = true;
        interrupted = true;
    } catch (final LdapException e) {
        stopRequested = true;
        app.println(LocalizableMessage.raw(e.getResult().getDiagnosticMessage()));
    } finally {
        closeSilently(connections);
        if (interrupted) {
            // Restored after the cleanup so that the caller can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    return 0;
}
/**
 * Waits, in start order, for every worker thread to terminate.
 *
 * @throws InterruptedException
 *             if the calling thread is interrupted while waiting.
 */
protected void joinAllWorkerThreads() throws InterruptedException {
    final Iterator<Thread> pendingWorkers = workerThreads.iterator();
    while (pendingWorkers.hasNext()) {
        pendingWorkers.next().join();
    }
}
}