/*
|
* CDDL HEADER START
|
*
|
* The contents of this file are subject to the terms of the
|
* Common Development and Distribution License, Version 1.0 only
|
* (the "License"). You may not use this file except in compliance
|
* with the License.
|
*
|
* You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
|
* or http://forgerock.org/license/CDDLv1.0.html.
|
* See the License for the specific language governing permissions
|
* and limitations under the License.
|
*
|
* When distributing Covered Code, include this CDDL HEADER in each
|
* file and include the License file at legal-notices/CDDLv1_0.txt.
|
* If applicable, add the following below this CDDL HEADER, with the
|
* fields enclosed by brackets "[]" replaced with your own identifying
|
* information:
|
* Portions Copyright [yyyy] [name of copyright owner]
|
*
|
* CDDL HEADER END
|
*
|
*
|
* Copyright 2010 Sun Microsystems, Inc.
|
* Portions Copyright 2011-2015 ForgeRock AS.
|
*/
|
package com.forgerock.opendj.ldap.tools;
|
|
import static java.util.Locale.ENGLISH;
|
import static java.util.concurrent.TimeUnit.*;
|
|
import static org.forgerock.util.Utils.*;
|
|
import static com.forgerock.opendj.ldap.tools.ToolsMessages.*;
|
|
import java.io.IOException;
|
import java.io.PrintStream;
|
import java.lang.management.GarbageCollectorMXBean;
|
import java.lang.management.ManagementFactory;
|
import java.util.ArrayList;
|
import java.util.Arrays;
|
import java.util.Iterator;
|
import java.util.LinkedList;
|
import java.util.List;
|
import java.util.Map.Entry;
|
import java.util.Queue;
|
import java.util.concurrent.ConcurrentSkipListMap;
|
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import org.forgerock.i18n.LocalizableMessage;
|
import org.forgerock.opendj.ldap.Connection;
|
import org.forgerock.opendj.ldap.ConnectionEventListener;
|
import org.forgerock.opendj.ldap.ConnectionFactory;
|
import org.forgerock.opendj.ldap.LdapException;
|
import org.forgerock.opendj.ldap.LdapResultHandler;
|
import org.forgerock.opendj.ldap.responses.ExtendedResult;
|
import org.forgerock.opendj.ldap.responses.Result;
|
import org.forgerock.util.promise.Promise;
|
|
import com.forgerock.opendj.cli.ArgumentException;
|
import com.forgerock.opendj.cli.ArgumentParser;
|
import com.forgerock.opendj.cli.AuthenticatedConnectionFactory.AuthenticatedConnection;
|
import com.forgerock.opendj.cli.BooleanArgument;
|
import com.forgerock.opendj.cli.ConsoleApplication;
|
import com.forgerock.opendj.cli.IntegerArgument;
|
import com.forgerock.opendj.cli.MultiColumnPrinter;
|
import com.forgerock.opendj.cli.StringArgument;
|
import com.forgerock.opendj.util.StaticUtils;
|
|
/**
|
* Benchmark application framework.
|
*/
|
abstract class PerformanceRunner implements ConnectionEventListener {
|
static final class ResponseTimeBuckets {
|
private static final long NS_1_US = NANOSECONDS.convert(1, MICROSECONDS);
|
private static final long NS_100_US = NANOSECONDS.convert(100, MICROSECONDS);
|
private static final long NS_1_MS = NANOSECONDS.convert(1, MILLISECONDS);
|
private static final long NS_10_MS = NANOSECONDS.convert(10, MILLISECONDS);
|
private static final long NS_100_MS = NANOSECONDS.convert(100, MILLISECONDS);
|
private static final long NS_1_S = NANOSECONDS.convert(1, SECONDS);
|
private static final long NS_5_S = NANOSECONDS.convert(5, SECONDS);
|
|
private static final int NB_INDEX = 2120;
|
private static final int RANGE_100_MICROSECONDS_START_INDEX = 1000;
|
private static final int RANGE_1_MILLISECOND_START_INDEX = 1090;
|
private static final int RANGE_100_MILLISECONDS_START_INDEX = 2080;
|
|
/**
|
* Array of response time buckets.
|
*
|
* <pre>
|
* index 0 -> 999: 1000 buckets for 0ms - 1ms interval with 1 µs increments
|
* index 1000 -> 1089: 90 buckets for 1ms - 10ms interval with 100 µs increments
|
* index 1090 -> 2079: 990 buckets for 10ms - 1000ms interval with 1000 µs increments
|
* index 2080 -> 2119: 40 buckets for 1000ms - 5000ms interval with 100 000 µs increments
|
* </pre>
|
*/
|
private final AtomicLong[] index2Frequency = new AtomicLong[NB_INDEX];
|
|
/**
|
* Store the lower bounds (in microseconds) of the eTime buckets.
|
*/
|
private final long[] index2Etime = new long[NB_INDEX];
|
|
/**
|
* Sorted map used for storing response times from 5s+ with 500 millisecond increments.
|
*
|
* Keys (Long in microseconds) of this map must respect this pattern: n * 500 000 + 5 000 000,
|
* where n is a natural integer.
|
*/
|
private final ConcurrentSkipListMap<Long, AtomicLong> bigEtimes = new ConcurrentSkipListMap<>();
|
|
/**
|
* Initialize both index2Frequency and index2Etime arrays.
|
*/
|
private ResponseTimeBuckets() {
|
// Helpful variables to compute index2Etime values.
|
long rangeWidthMicroSecs;
|
long rangeStart = 0;
|
long initialTimeMicroSecs = 0;
|
|
for (int i = 0; i < NB_INDEX; i++) {
|
index2Frequency[i] = new AtomicLong();
|
if (i < RANGE_100_MICROSECONDS_START_INDEX) {
|
// 0ms-1ms in 1 us increments
|
rangeWidthMicroSecs = 1;
|
} else if (i < RANGE_1_MILLISECOND_START_INDEX) {
|
// 1ms-10ms in 100 us increments
|
rangeWidthMicroSecs = 100;
|
rangeStart = RANGE_100_MICROSECONDS_START_INDEX;
|
initialTimeMicroSecs = MICROSECONDS.convert(1, MILLISECONDS);
|
} else if (i < RANGE_100_MILLISECONDS_START_INDEX) {
|
// 10ms-1000ms in 1000 us increments
|
rangeWidthMicroSecs = MICROSECONDS.convert(1, MILLISECONDS);
|
rangeStart = RANGE_1_MILLISECOND_START_INDEX;
|
initialTimeMicroSecs = MICROSECONDS.convert(10, MILLISECONDS);
|
} else {
|
// 1000ms-5000ms with 100 000 us increments
|
rangeWidthMicroSecs = MICROSECONDS.convert(100, MILLISECONDS);
|
rangeStart = RANGE_100_MILLISECONDS_START_INDEX;
|
initialTimeMicroSecs = MICROSECONDS.convert(1, SECONDS);
|
}
|
index2Etime[i] = (i - rangeStart) * rangeWidthMicroSecs + initialTimeMicroSecs;
|
}
|
}
|
|
/**
|
* Compute the closest response time values for each percentile given in
|
* parameter. Percentiles array has to be sorted from lower to higher
|
* percentiles.
|
*
|
* @param percentiles
|
* array of {@code double}
|
*
|
* @param nbData
|
* number of response times recorded.
|
*
|
* @return array of response times in microseconds corresponding to
|
* percentiles.
|
*/
|
List<Long> getPercentile(double[] percentiles, long nbData) {
|
List<Long> responseTimes = new ArrayList<>();
|
Queue<Long> nbDataThresholds = new LinkedList<>();
|
long nbDataSum = nbData;
|
|
for (int i = percentiles.length - 1; i >= 0; i--) {
|
nbDataThresholds.add((long) (percentiles[i] * nbData) / 100);
|
}
|
|
Iterator<Entry<Long, AtomicLong>> iter = bigEtimes.descendingMap().entrySet().iterator();
|
while (iter.hasNext() && !nbDataThresholds.isEmpty()) {
|
Entry<Long, AtomicLong> currentETime = iter.next();
|
nbDataSum -= currentETime.getValue().get();
|
computePercentiles(nbDataThresholds, responseTimes, nbDataSum, currentETime.getKey());
|
}
|
|
int stdTimeIndex = NB_INDEX - 1;
|
while (stdTimeIndex >= 0 && !nbDataThresholds.isEmpty()) {
|
long currentETime = index2Etime[stdTimeIndex];
|
nbDataSum -= index2Frequency[stdTimeIndex].get();
|
computePercentiles(nbDataThresholds, responseTimes, nbDataSum, currentETime);
|
stdTimeIndex--;
|
}
|
|
return responseTimes;
|
}
|
|
private void computePercentiles(Queue<Long> currentDataThreshold, List<Long> responseTimes, long currentSum,
|
long currentETime) {
|
while (currentDataThreshold.peek() != null && currentDataThreshold.peek() >= currentSum) {
|
responseTimes.add(currentETime);
|
currentDataThreshold.poll();
|
}
|
}
|
|
void addTimeToInterval(long responseTimeNanoSecs) {
|
if (responseTimeNanoSecs >= NS_5_S) {
|
long matchingKey = responseTimeNanoSecs / NS_100_MS;
|
matchingKey -= matchingKey % 5;
|
matchingKey = matchingKey * MICROSECONDS.convert(100, MILLISECONDS);
|
// We now have a key corresponding to pattern 5 000 000 + n * 500 000 µs
|
AtomicLong existingKey = bigEtimes.putIfAbsent(matchingKey, new AtomicLong(1));
|
if (existingKey != null) {
|
existingKey.getAndIncrement();
|
}
|
return;
|
}
|
|
final int startRangeIndex;
|
final long rangeWidthNanoSecs;
|
if (responseTimeNanoSecs < NS_1_MS) {
|
rangeWidthNanoSecs = NS_1_US;
|
startRangeIndex = 0;
|
} else if (responseTimeNanoSecs < NS_10_MS) {
|
rangeWidthNanoSecs = NS_100_US;
|
startRangeIndex = RANGE_100_MICROSECONDS_START_INDEX - 10;
|
} else if (responseTimeNanoSecs < NS_1_S) {
|
rangeWidthNanoSecs = NS_1_MS;
|
startRangeIndex = RANGE_1_MILLISECOND_START_INDEX - 10;
|
} else {
|
rangeWidthNanoSecs = NS_100_MS;
|
startRangeIndex = RANGE_100_MILLISECONDS_START_INDEX - 10;
|
}
|
final int intervalIndex = ((int) (responseTimeNanoSecs / rangeWidthNanoSecs)) + startRangeIndex;
|
index2Frequency[intervalIndex].getAndIncrement();
|
}
|
}
|
|
/** To allow tests. */
|
static ResponseTimeBuckets getResponseTimeBuckets() {
|
return new ResponseTimeBuckets();
|
}
|
|
|
/**
|
* Statistics thread base implementation.
|
*/
|
class StatsThread extends Thread {
|
private final String[] additionalColumns;
|
private final List<GarbageCollectorMXBean> beans;
|
private final double[] percentiles;
|
private final int numColumns;
|
protected long totalSuccessCount;
|
protected long totalOperationCount;
|
protected long totalFailedCount;
|
protected long totalWaitTime;
|
protected int successCount;
|
protected int operationCount;
|
protected int failedCount;
|
protected long waitTime;
|
protected long lastStatTime;
|
protected long lastGCDuration;
|
protected double recentDuration;
|
protected double averageDuration;
|
|
public StatsThread(final String... additionalColumns) {
|
super("Stats Thread");
|
|
this.additionalColumns = additionalColumns;
|
if (!percentilesArgument.isPresent()) {
|
this.percentiles = new double[]{99.9, 99.99, 99.999};
|
} else {
|
this.percentiles = new double[percentilesArgument.getValues().size()];
|
int index = 0;
|
for (final String percentile : percentilesArgument.getValues()) {
|
percentiles[index++] = Double.parseDouble(percentile);
|
}
|
Arrays.sort(percentiles);
|
}
|
|
this.numColumns = 5 + this.percentiles.length + additionalColumns.length + (isAsync ? 1 : 0);
|
this.beans = ManagementFactory.getGarbageCollectorMXBeans();
|
}
|
|
@Override
|
public void run() {
|
final MultiColumnPrinter printer;
|
|
if (!app.isScriptFriendly()) {
|
printer = new MultiColumnPrinter(numColumns, 2, "-", MultiColumnPrinter.RIGHT, app);
|
printer.setTitleAlign(MultiColumnPrinter.RIGHT);
|
|
String[] title = new String[numColumns];
|
Arrays.fill(title, "");
|
title[0] = "Throughput";
|
title[2] = "Response Time";
|
int[] span = new int[numColumns];
|
span[0] = 2;
|
span[1] = 0;
|
span[2] = 2 + this.percentiles.length;
|
Arrays.fill(span, 3, 4 + this.percentiles.length, 0);
|
Arrays.fill(span, 4 + this.percentiles.length, span.length, 1);
|
printer.addTitle(title, span);
|
title = new String[numColumns];
|
Arrays.fill(title, "");
|
title[0] = "(ops/second)";
|
title[2] = "(milliseconds)";
|
printer.addTitle(title, span);
|
title = new String[numColumns];
|
title[0] = "recent";
|
title[1] = "average";
|
title[2] = "recent";
|
title[3] = "average";
|
int i = 4;
|
for (double percentile :percentiles) {
|
title[i++] = percentile + "%";
|
}
|
title[i++] = "err/sec";
|
if (isAsync) {
|
title[i++] = "req/res";
|
}
|
for (final String column : additionalColumns) {
|
title[i++] = column;
|
}
|
span = new int[numColumns];
|
Arrays.fill(span, 1);
|
printer.addTitle(title, span);
|
printer.printTitle();
|
} else {
|
final PrintStream out = app.getOutputStream();
|
out.print("Time (seconds)");
|
out.print(",");
|
out.print("Recent throughput (ops/second)");
|
out.print(",");
|
out.print("Average throughput (ops/second)");
|
out.print(",");
|
out.print("Recent response time (milliseconds)");
|
out.print(",");
|
out.print("Average response time (milliseconds)");
|
for (final double percentile : this.percentiles) {
|
out.print(",");
|
out.print(percentile);
|
out.print("% response time (milliseconds)");
|
}
|
out.print(",");
|
out.print("Errors/second");
|
if (isAsync) {
|
out.print(",");
|
out.print("Requests/response");
|
}
|
for (final String column : additionalColumns) {
|
out.print(",");
|
out.print(column);
|
}
|
out.println();
|
printer = null;
|
}
|
|
final long startTime = System.currentTimeMillis();
|
long statTime = startTime;
|
long gcDuration = 0;
|
for (final GarbageCollectorMXBean bean : beans) {
|
gcDuration += bean.getCollectionTime();
|
}
|
while (!stopRequested) {
|
try {
|
sleep(statsInterval);
|
} catch (final InterruptedException ie) {
|
// Ignore.
|
}
|
|
lastStatTime = statTime;
|
statTime = System.currentTimeMillis();
|
|
lastGCDuration = gcDuration;
|
gcDuration = 0;
|
for (final GarbageCollectorMXBean bean : beans) {
|
gcDuration += bean.getCollectionTime();
|
}
|
|
operationCount = operationRecentCount.getAndSet(0);
|
successCount = successRecentCount.getAndSet(0);
|
failedCount = failedRecentCount.getAndSet(0);
|
waitTime = waitRecentTime.getAndSet(0);
|
|
final int resultCount = successCount + failedCount;
|
|
totalOperationCount += operationCount;
|
totalSuccessCount += successCount;
|
totalFailedCount += failedCount;
|
totalWaitTime += waitTime;
|
|
final long totalResultCount = totalSuccessCount + totalFailedCount;
|
|
recentDuration = statTime - lastStatTime;
|
averageDuration = statTime - startTime;
|
recentDuration -= gcDuration - lastGCDuration;
|
averageDuration -= gcDuration;
|
recentDuration /= 1000.0;
|
averageDuration /= 1000.0;
|
|
final String[] strings = new String[numColumns];
|
strings[0] = String.format(ENGLISH, "%.1f", resultCount / recentDuration);
|
strings[1] = String.format(ENGLISH, "%.1f", totalResultCount / averageDuration);
|
|
if (resultCount > 0) {
|
strings[2] = String.format(ENGLISH, "%.3f", (waitTime - (gcDuration - lastGCDuration))
|
/ (double) resultCount / 1000000.0);
|
} else {
|
strings[2] = "-";
|
}
|
|
if (totalResultCount > 0) {
|
strings[3] = String.format(ENGLISH, "%.3f",
|
(totalWaitTime - gcDuration) / (double) totalResultCount / 1000000.0);
|
} else {
|
strings[3] = "-";
|
}
|
|
int i = 4;
|
List<Long> computedPercentiles = eTimesBuckets.getPercentile(percentiles, totalOperationCount);
|
for (int j = computedPercentiles.size() - 1; j >= 0; j--) {
|
strings[i++] = String.format(ENGLISH, "%.2f", computedPercentiles.get(j) / 1000.0);
|
}
|
strings[i++] = String.format(ENGLISH, "%.1f", failedCount / recentDuration);
|
if (isAsync) {
|
strings[i++] = resultCount > 0 ? String.format(ENGLISH, "%.1f", (double) operationCount
|
/ resultCount) : "-";
|
}
|
|
for (final String column : getAdditionalColumns()) {
|
strings[i++] = column;
|
}
|
|
if (printer != null) {
|
printer.printRow(strings);
|
} else {
|
// Script-friendly.
|
final PrintStream out = app.getOutputStream();
|
out.print(averageDuration);
|
for (final String s : strings) {
|
out.print(",");
|
out.print(s);
|
}
|
out.println();
|
}
|
}
|
}
|
|
String[] getAdditionalColumns() {
|
return EMPTY_STRINGS;
|
}
|
|
/** Resets both general and recent statistic indicators. */
|
void resetStats() {
|
failedCount = 0;
|
operationCount = 0;
|
successCount = 0;
|
operationRecentCount.set(0);
|
successRecentCount.set(0);
|
failedRecentCount.set(0);
|
waitRecentTime.set(0);
|
}
|
}
|
|
class TimerThread extends Thread {
|
private final long timeToWait;
|
|
TimerThread(long timeToWait) {
|
this.timeToWait = timeToWait;
|
}
|
|
void performStopOperations() {
|
stopRequested = true;
|
}
|
|
@Override
|
public void run() {
|
try {
|
Thread.sleep(timeToWait);
|
} catch (InterruptedException e) {
|
throw new IllegalStateException(e);
|
} finally {
|
performStopOperations();
|
}
|
}
|
}
|
|
/**
|
* Statistics update result handler implementation.
|
*
|
* @param <S>
|
* The type of expected result.
|
*/
|
class UpdateStatsResultHandler<S extends Result> implements LdapResultHandler<S> {
|
protected final long currentTime;
|
|
UpdateStatsResultHandler(final long currentTime) {
|
this.currentTime = currentTime;
|
}
|
|
@Override
|
public void handleException(final LdapException exception) {
|
failedRecentCount.getAndIncrement();
|
updateStats();
|
app.errPrintVerboseMessage(LocalizableMessage.raw(exception.getResult().toString()));
|
}
|
|
@Override
|
public void handleResult(final S result) {
|
successRecentCount.getAndIncrement();
|
updateStats();
|
}
|
|
private void updateStats() {
|
if (!isWarmingUp) {
|
final long eTime = System.nanoTime() - currentTime;
|
waitRecentTime.getAndAdd(eTime);
|
eTimesBuckets.addTimeToInterval(eTime);
|
}
|
}
|
}
|
|
/**
|
* Worker thread base implementation.
|
*/
|
abstract class WorkerThread extends Thread {
|
private int count;
|
private final Connection connection;
|
private final ConnectionFactory connectionFactory;
|
boolean localStopRequested;
|
|
WorkerThread(final Connection connection, final ConnectionFactory connectionFactory) {
|
super("Worker Thread");
|
this.connection = connection;
|
this.connectionFactory = connectionFactory;
|
}
|
|
public abstract Promise<?, LdapException> performOperation(Connection connection,
|
DataSource[] dataSources, long startTime);
|
|
@Override
|
public void run() {
|
Promise<?, LdapException> promise;
|
Connection connection;
|
final double targetTimeInMS = 1000.0 / (targetThroughput / (double) (numThreads * numConnections));
|
double sleepTimeInMS = 0;
|
|
while (!stopRequested && !localStopRequested
|
&& (maxIterations <= 0 || count < maxIterations)) {
|
if (this.connection == null) {
|
try {
|
connection = connectionFactory.getConnectionAsync().getOrThrow();
|
} catch (final InterruptedException e) {
|
// Ignore and check stop requested
|
continue;
|
} catch (final LdapException e) {
|
app.errPrintln(LocalizableMessage.raw(e.getResult().getDiagnosticMessage()));
|
if (e.getCause() != null && app.isVerbose()) {
|
e.getCause().printStackTrace(app.getErrorStream());
|
}
|
stopRequested = true;
|
break;
|
}
|
} else {
|
connection = this.connection;
|
if (!noRebind && connection instanceof AuthenticatedConnection) {
|
final AuthenticatedConnection ac = (AuthenticatedConnection) connection;
|
try {
|
ac.rebindAsync().getOrThrow();
|
} catch (final InterruptedException e) {
|
// Ignore and check stop requested
|
continue;
|
} catch (final LdapException e) {
|
app.errPrintln(LocalizableMessage.raw(e.getResult().toString()));
|
if (e.getCause() != null && app.isVerbose()) {
|
e.getCause().printStackTrace(app.getErrorStream());
|
}
|
stopRequested = true;
|
break;
|
}
|
}
|
}
|
|
long start = System.nanoTime();
|
promise = performOperation(connection, dataSources.get(), start);
|
operationRecentCount.getAndIncrement();
|
if (!isAsync) {
|
try {
|
promise.getOrThrow();
|
} catch (final InterruptedException e) {
|
// Ignore and check stop requested
|
continue;
|
} catch (final LdapException e) {
|
if (e.getCause() instanceof IOException) {
|
e.getCause().printStackTrace(app.getErrorStream());
|
stopRequested = true;
|
break;
|
}
|
// Ignore. Handled by result handler
|
} finally {
|
if (this.connection == null) {
|
connection.close();
|
}
|
}
|
}
|
if (targetThroughput > 0) {
|
try {
|
if (sleepTimeInMS > 1) {
|
sleep((long) Math.floor(sleepTimeInMS));
|
}
|
} catch (final InterruptedException e) {
|
continue;
|
}
|
|
sleepTimeInMS += targetTimeInMS - (System.nanoTime() - start) / 1000000.0;
|
if (sleepTimeInMS < -60000) {
|
// If we fall behind by 60 seconds, just forget about catching up
|
sleepTimeInMS = -60000;
|
}
|
}
|
}
|
}
|
|
void incrementIterationCount() {
|
count++;
|
}
|
}
|
|
private static final String[] EMPTY_STRINGS = new String[0];
|
private final AtomicInteger operationRecentCount = new AtomicInteger();
|
protected final AtomicInteger successRecentCount = new AtomicInteger();
|
protected final AtomicInteger failedRecentCount = new AtomicInteger();
|
private final AtomicLong waitRecentTime = new AtomicLong();
|
private final ResponseTimeBuckets eTimesBuckets = new ResponseTimeBuckets();
|
|
|
private final ConsoleApplication app;
|
private DataSource[] dataSourcePrototypes;
|
|
/** Thread local copies of the data sources. */
|
private final ThreadLocal<DataSource[]> dataSources = new ThreadLocal<DataSource[]>() {
|
/** {@inheritDoc} */
|
@Override
|
protected DataSource[] initialValue() {
|
final DataSource[] prototypes = getDataSources();
|
final int sz = prototypes.length;
|
final DataSource[] threadLocalCopy = new DataSource[sz];
|
for (int i = 0; i < sz; i++) {
|
threadLocalCopy[i] = prototypes[i].duplicate();
|
}
|
return threadLocalCopy;
|
}
|
|
};
|
|
int numThreads;
|
int numConnections;
|
volatile boolean stopRequested;
|
private volatile boolean isWarmingUp;
|
private int targetThroughput;
|
private int maxIterations;
|
/** Warm-up duration time in ms. **/
|
private long warmUpDuration;
|
/** Max duration time in ms, 0 for unlimited. **/
|
private long maxDurationTime;
|
private boolean isAsync;
|
private boolean noRebind;
|
private int statsInterval;
|
private final IntegerArgument numThreadsArgument;
|
private final IntegerArgument maxDurationArgument;
|
private final IntegerArgument statsIntervalArgument;
|
private final IntegerArgument targetThroughputArgument;
|
private final IntegerArgument numConnectionsArgument;
|
private final IntegerArgument percentilesArgument;
|
private final BooleanArgument keepConnectionsOpen;
|
private final BooleanArgument noRebindArgument;
|
private final BooleanArgument asyncArgument;
|
private final StringArgument arguments;
|
protected final IntegerArgument maxIterationsArgument;
|
protected final IntegerArgument warmUpArgument;
|
|
private final List<Thread> workerThreads = new ArrayList<>();
|
|
PerformanceRunner(final PerformanceRunnerOptions options) throws ArgumentException {
|
ArgumentParser argParser = options.getArgumentParser();
|
|
this.app = options.getConsoleApplication();
|
numThreadsArgument =
|
new IntegerArgument("numThreads", 't', "numThreads", false, false, true,
|
LocalizableMessage.raw("{numThreads}"), 1, null, true, 1, false, 0,
|
LocalizableMessage.raw("Number of worker threads per connection"));
|
numThreadsArgument.setPropertyName("numThreads");
|
if (options.supportsMultipleThreadsPerConnection()) {
|
argParser.addArgument(numThreadsArgument);
|
} else {
|
numThreadsArgument.addValue("1");
|
}
|
|
numConnectionsArgument =
|
new IntegerArgument("numConnections", 'c', "numConnections", false, false, true,
|
LocalizableMessage.raw("{numConnections}"), 1, null, true, 1, false, 0,
|
LocalizableMessage.raw("Number of connections"));
|
numConnectionsArgument.setPropertyName("numConnections");
|
argParser.addArgument(numConnectionsArgument);
|
|
maxIterationsArgument =
|
new IntegerArgument("maxIterations", 'm', "maxIterations", false, false, true,
|
LocalizableMessage.raw("{maxIterations}"), 0, null,
|
LocalizableMessage.raw("Max iterations, 0 for unlimited"));
|
maxIterationsArgument.setPropertyName("maxIterations");
|
argParser.addArgument(maxIterationsArgument);
|
|
maxDurationArgument =
|
new IntegerArgument("maxDuration", 'd', "maxDuration", false, false, true,
|
LocalizableMessage.raw("{maxDuration}"), 0, null, true, 1, false, 0,
|
LocalizableMessage.raw("Maximum duration in seconds, 0 for unlimited"));
|
argParser.addArgument(maxDurationArgument);
|
|
warmUpArgument =
|
new IntegerArgument("warmUpDuration", 'B', "warmUpDuration", false, false, true,
|
LocalizableMessage.raw("{warmUpDuration}"), 0, null,
|
LocalizableMessage.raw("Warm up duration in seconds"));
|
argParser.addArgument(warmUpArgument);
|
|
statsIntervalArgument =
|
new IntegerArgument("statInterval", 'i', "statInterval", false, false, true,
|
LocalizableMessage.raw("{statInterval}"), 5, null, true, 1, false, 0,
|
LocalizableMessage.raw("Display results each specified number of seconds"));
|
statsIntervalArgument.setPropertyName("statInterval");
|
argParser.addArgument(statsIntervalArgument);
|
|
targetThroughputArgument =
|
new IntegerArgument("targetThroughput", 'M', "targetThroughput", false, false,
|
true, LocalizableMessage.raw("{targetThroughput}"), 0, null,
|
LocalizableMessage.raw("Target average throughput to achieve"));
|
targetThroughputArgument.setPropertyName("targetThroughput");
|
argParser.addArgument(targetThroughputArgument);
|
|
percentilesArgument =
|
new IntegerArgument("percentile", 'e', "percentile", false, true,
|
LocalizableMessage.raw("{percentile}"), true, 0, true, 100,
|
LocalizableMessage.raw("Calculate max response time for a "
|
+ "percentile of operations"));
|
percentilesArgument.setPropertyName("percentile");
|
percentilesArgument.setMultiValued(true);
|
argParser.addArgument(percentilesArgument);
|
|
keepConnectionsOpen =
|
new BooleanArgument("keepConnectionsOpen", 'f', "keepConnectionsOpen",
|
LocalizableMessage.raw("Keep connections open"));
|
keepConnectionsOpen.setPropertyName("keepConnectionsOpen");
|
argParser.addArgument(keepConnectionsOpen);
|
|
noRebindArgument =
|
new BooleanArgument("noRebind", 'F', "noRebind", LocalizableMessage
|
.raw("Keep connections open and do not rebind"));
|
noRebindArgument.setPropertyName("noRebind");
|
if (options.supportsRebind()) {
|
argParser.addArgument(noRebindArgument);
|
}
|
|
asyncArgument =
|
new BooleanArgument("asynchronous", 'A', "asynchronous", LocalizableMessage
|
.raw("Use asynchronous mode and do not "
|
+ "wait for results before sending the next request"));
|
asyncArgument.setPropertyName("asynchronous");
|
if (options.supportsAsynchronousRequests()) {
|
argParser.addArgument(asyncArgument);
|
}
|
|
arguments =
|
new StringArgument(
|
"argument",
|
'g',
|
"argument",
|
false,
|
true,
|
true,
|
LocalizableMessage.raw("{generator function or static string}"),
|
null,
|
null,
|
LocalizableMessage
|
.raw("Argument used to evaluate the Java "
|
+ "style format strings in program parameters (ie. Base DN, "
|
+ "Search Filter). The set of all arguments provided form the "
|
+ "the argument list in order. Besides static string "
|
+ "arguments, they can be generated per iteration with the "
|
+ "following functions: " + StaticUtils.EOL
|
+ DataSource.getUsage()));
|
if (options.supportsGeneratorArgument()) {
|
argParser.addArgument(arguments);
|
}
|
}
|
|
@Override
|
public void handleConnectionClosed() {
|
// Ignore
|
}
|
|
@Override
|
public synchronized void handleConnectionError(final boolean isDisconnectNotification, final LdapException error) {
|
if (!stopRequested) {
|
app.errPrintln(LocalizableMessage.raw("Error occurred on one or more connections: "
|
+ error.getResult()));
|
if (error.getCause() != null && app.isVerbose()) {
|
error.getCause().printStackTrace(app.getErrorStream());
|
}
|
stopRequested = true;
|
}
|
}
|
|
@Override
|
public void handleUnsolicitedNotification(final ExtendedResult notification) {
|
// Ignore
|
}
|
|
public final void validate() throws ArgumentException {
|
numConnections = numConnectionsArgument.getIntValue();
|
numThreads = numThreadsArgument.getIntValue();
|
warmUpDuration = warmUpArgument.getIntValue() * 1000L;
|
maxIterations = maxIterationsArgument.getIntValue() / numConnections / numThreads;
|
maxDurationTime = maxDurationArgument.getIntValue() * 1000L;
|
statsInterval = statsIntervalArgument.getIntValue() * 1000;
|
targetThroughput = targetThroughputArgument.getIntValue();
|
|
isAsync = asyncArgument.isPresent();
|
noRebind = noRebindArgument.isPresent();
|
|
if (!noRebindArgument.isPresent() && this.numThreads > 1) {
|
throw new ArgumentException(ERR_TOOL_ARG_MUST_BE_USED_WHEN_ARG_CONDITION.get(
|
"--" + noRebindArgument.getLongIdentifier(), "--" + numThreadsArgument.getLongIdentifier(), "> 1"));
|
}
|
|
if (!noRebindArgument.isPresent() && asyncArgument.isPresent()) {
|
throw new ArgumentException(ERR_TOOL_ARG_NEEDED_WHEN_USING_ARG.get(
|
"--" + noRebindArgument.getLongIdentifier(), asyncArgument.getLongIdentifier()));
|
}
|
|
if (maxIterationsArgument.isPresent() && maxIterations <= 0) {
|
throw new ArgumentException(ERR_TOOL_NOT_ENOUGH_ITERATIONS.get(
|
"--" + maxIterationsArgument.getLongIdentifier(), numConnections * numThreads,
|
numConnectionsArgument.getLongIdentifier(), numThreadsArgument.getLongIdentifier()));
|
}
|
|
dataSourcePrototypes = DataSource.parse(arguments.getValues());
|
}
|
|
final DataSource[] getDataSources() {
|
if (dataSourcePrototypes == null) {
|
throw new IllegalStateException("dataSources are null - validate() must be called first");
|
}
|
return dataSourcePrototypes;
|
}
|
|
abstract WorkerThread newWorkerThread(final Connection connection, final ConnectionFactory connectionFactory);
|
abstract StatsThread newStatsThread();
|
|
TimerThread newEndTimerThread(final long timeTowait) {
|
return new TimerThread(timeTowait);
|
}
|
|
final int run(final ConnectionFactory connectionFactory) {
|
final List<Connection> connections = new ArrayList<>();
|
|
Connection connection = null;
|
try {
|
isWarmingUp = warmUpDuration > 0;
|
for (int i = 0; i < numConnections; i++) {
|
if (keepConnectionsOpen.isPresent() || noRebindArgument.isPresent()) {
|
connection = connectionFactory.getConnectionAsync().getOrThrow();
|
connection.addConnectionEventListener(this);
|
connections.add(connection);
|
}
|
for (int j = 0; j < numThreads; j++) {
|
final Thread thread = newWorkerThread(connection, connectionFactory);
|
workerThreads.add(thread);
|
thread.start();
|
}
|
}
|
|
if (maxDurationTime > 0) {
|
newEndTimerThread(maxDurationTime).start();
|
}
|
|
final StatsThread statsThread = newStatsThread();
|
|
if (isWarmingUp) {
|
if (!app.isScriptFriendly()) {
|
app.println(INFO_TOOL_WARMING_UP.get(warmUpDuration / 1000));
|
}
|
Thread.sleep(warmUpDuration);
|
statsThread.resetStats();
|
isWarmingUp = false;
|
}
|
|
statsThread.start();
|
joinAllWorkerThreads();
|
stopRequested = true;
|
statsThread.join();
|
} catch (final InterruptedException e) {
|
stopRequested = true;
|
} catch (final LdapException e) {
|
stopRequested = true;
|
app.println(LocalizableMessage.raw(e.getResult().getDiagnosticMessage()));
|
} finally {
|
closeSilently(connections);
|
}
|
|
return 0;
|
}
|
|
protected void joinAllWorkerThreads() throws InterruptedException {
|
for (final Thread t : workerThreads) {
|
t.join();
|
}
|
}
|
}
|