/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2010 Sun Microsystems, Inc. * Portions Copyright 2011-2014 ForgeRock AS. */ package com.forgerock.opendj.ldap.tools; import java.io.IOException; import java.io.PrintStream; import java.lang.management.GarbageCollectorMXBean; import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map.Entry; import java.util.Queue; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.forgerock.i18n.LocalizableMessage; import org.forgerock.opendj.ldap.Connection; import org.forgerock.opendj.ldap.ConnectionEventListener; import org.forgerock.opendj.ldap.ConnectionFactory; import org.forgerock.opendj.ldap.LdapException; import org.forgerock.opendj.ldap.ResultHandler; import org.forgerock.opendj.ldap.responses.ExtendedResult; import org.forgerock.opendj.ldap.responses.Result; import org.forgerock.util.promise.Promise; import com.forgerock.opendj.cli.ArgumentException; import com.forgerock.opendj.cli.ArgumentParser; import com.forgerock.opendj.cli.BooleanArgument; import com.forgerock.opendj.cli.ConsoleApplication; import com.forgerock.opendj.cli.IntegerArgument; import com.forgerock.opendj.cli.MultiColumnPrinter; import com.forgerock.opendj.cli.StringArgument; import com.forgerock.opendj.util.StaticUtils; import static java.util.concurrent.TimeUnit.*; import static org.forgerock.util.Utils.*; import static com.forgerock.opendj.ldap.tools.ToolsMessages.*; /** * Benchmark application framework. */ abstract class PerformanceRunner implements ConnectionEventListener { static final class ResponseTimeBuckets { private static final long NS_1_US = NANOSECONDS.convert(1, MICROSECONDS); private static final long NS_100_US = NANOSECONDS.convert(100, MICROSECONDS); private static final long NS_1_MS = NANOSECONDS.convert(1, MILLISECONDS); private static final long NS_10_MS = NANOSECONDS.convert(10, MILLISECONDS); private static final long NS_100_MS = NANOSECONDS.convert(100, MILLISECONDS); private static final long NS_1_S = NANOSECONDS.convert(1, SECONDS); private static final long NS_5_S = NANOSECONDS.convert(5, SECONDS); private static final int NB_INDEX = 2120; private static final int RANGE_100_MICROSECONDS_START_INDEX = 1000; private static final int RANGE_1_MILLISECOND_START_INDEX = 1090; private static final int RANGE_100_MILLISECONDS_START_INDEX = 2080; /** * Array of response time buckets. * *
         * index    0 ->  999: 1000 buckets for    0ms -    1ms  interval with       1 µs increments
         * index 1000 -> 1089:   90 buckets for    1ms -   10ms  interval with     100 µs increments
         * index 1090 -> 2079:  990 buckets for   10ms - 1000ms  interval with    1000 µs increments
         * index 2080 -> 2119:   40 buckets for 1000ms - 5000ms  interval with 100 000 µs increments
         * 
*/ private final AtomicLong[] index2Frequency = new AtomicLong[NB_INDEX]; /** * Store the lower bounds (in microseconds) of the eTime buckets. */ private final long[] index2Etime = new long[NB_INDEX]; /** * Sorted map used for storing response times from 5s+ with 500 millisecond increments. * * Keys (Long in microseconds) of this map must respect this pattern: n * 500 000 + 5 000 000, * where n is a natural integer. */ private final ConcurrentSkipListMap bigEtimes = new ConcurrentSkipListMap(); /** * Initialize both index2Frequency and index2Etime arrays. */ private ResponseTimeBuckets() { // Helpful variables to compute index2Etime values. long rangeWidthMicroSecs; long rangeStart = 0; long initialTimeMicroSecs = 0; for (int i = 0; i < NB_INDEX; i++) { index2Frequency[i] = new AtomicLong(); if (i < RANGE_100_MICROSECONDS_START_INDEX) { // 0ms-1ms in 1 us increments rangeWidthMicroSecs = 1; } else if (i < RANGE_1_MILLISECOND_START_INDEX) { // 1ms-10ms in 100 us increments rangeWidthMicroSecs = 100; rangeStart = RANGE_100_MICROSECONDS_START_INDEX; initialTimeMicroSecs = MICROSECONDS.convert(1, MILLISECONDS); } else if (i < RANGE_100_MILLISECONDS_START_INDEX) { // 10ms-1000ms in 1000 us increments rangeWidthMicroSecs = MICROSECONDS.convert(1, MILLISECONDS); rangeStart = RANGE_1_MILLISECOND_START_INDEX; initialTimeMicroSecs = MICROSECONDS.convert(10, MILLISECONDS); } else { // 1000ms-5000ms with 100 000 us increments rangeWidthMicroSecs = MICROSECONDS.convert(100, MILLISECONDS); rangeStart = RANGE_100_MILLISECONDS_START_INDEX; initialTimeMicroSecs = MICROSECONDS.convert(1, SECONDS); } index2Etime[i] = (i - rangeStart) * rangeWidthMicroSecs + initialTimeMicroSecs; } } /** * Compute the closest response time values for each percentile given in * parameter. Percentiles array has to be sorted from lower to higher * percentiles. * * @param percentiles * array of {@code double} * * @param nbData * number of response times recorded. * * @return array of response times in microseconds corresponding to * percentiles. */ List getPercentile(double[] percentiles, long nbData) { List responseTimes = new ArrayList(); Queue nbDataThresholds = new LinkedList(); long nbDataSum = nbData; for (int i = percentiles.length - 1; i >= 0; i--) { nbDataThresholds.add((long) (percentiles[i] * nbData) / 100); } Iterator> iter = bigEtimes.descendingMap().entrySet().iterator(); while (iter.hasNext() && !nbDataThresholds.isEmpty()) { Entry currentETime = iter.next(); nbDataSum -= currentETime.getValue().get(); computePercentiles(nbDataThresholds, responseTimes, nbDataSum, currentETime.getKey()); } int stdTimeIndex = NB_INDEX - 1; while (stdTimeIndex >= 0 && !nbDataThresholds.isEmpty()) { long currentETime = index2Etime[stdTimeIndex]; nbDataSum -= index2Frequency[stdTimeIndex].get(); computePercentiles(nbDataThresholds, responseTimes, nbDataSum, currentETime); stdTimeIndex--; } return responseTimes; } private void computePercentiles(Queue currentDataThreshold, List responseTimes, long currentSum, long currentETime) { while (currentDataThreshold.peek() != null && currentDataThreshold.peek() >= currentSum) { responseTimes.add(currentETime); currentDataThreshold.poll(); } } void addTimeToInterval(long responseTimeNanoSecs) { if (responseTimeNanoSecs >= NS_5_S) { long matchingKey = responseTimeNanoSecs / NS_100_MS; matchingKey -= matchingKey % 5; matchingKey = matchingKey * MICROSECONDS.convert(100, MILLISECONDS); // We now have a key corresponding to pattern 5 000 000 + n * 500 000 µs AtomicLong existingKey = bigEtimes.putIfAbsent(matchingKey, new AtomicLong(1)); if (existingKey != null) { existingKey.getAndIncrement(); } return; } final int startRangeIndex; final long rangeWidthNanoSecs; if (responseTimeNanoSecs < NS_1_MS) { rangeWidthNanoSecs = NS_1_US; startRangeIndex = 0; } else if (responseTimeNanoSecs < NS_10_MS) { rangeWidthNanoSecs = NS_100_US; startRangeIndex = RANGE_100_MICROSECONDS_START_INDEX - 10; } else if (responseTimeNanoSecs < NS_1_S) { rangeWidthNanoSecs = NS_1_MS; startRangeIndex = RANGE_1_MILLISECOND_START_INDEX - 10; } else { rangeWidthNanoSecs = NS_100_MS; startRangeIndex = RANGE_100_MILLISECONDS_START_INDEX - 10; } final int intervalIndex = ((int) (responseTimeNanoSecs / rangeWidthNanoSecs)) + startRangeIndex; index2Frequency[intervalIndex].getAndIncrement(); } } //To allow tests static ResponseTimeBuckets getResponseTimeBuckets() { return new ResponseTimeBuckets(); } /** * Statistics thread base implementation. */ class StatsThread extends Thread { private final String[] additionalColumns; private final List beans; private final double[] percentiles; private final int numColumns; protected long totalSuccessCount; protected long totalOperationCount; protected long totalFailedCount; protected long totalWaitTime; protected int successCount; protected int operationCount; protected int failedCount; protected long waitTime; protected long lastStatTime; protected long lastGCDuration; protected double recentDuration; protected double averageDuration; public StatsThread(final String... additionalColumns) { super("Stats Thread"); this.additionalColumns = additionalColumns; if (!percentilesArgument.isPresent()) { this.percentiles = new double[]{99.9, 99.99, 99.999}; } else { this.percentiles = new double[percentilesArgument.getValues().size()]; int index = 0; for (final String percentile : percentilesArgument.getValues()) { percentiles[index++] = Double.parseDouble(percentile); } Arrays.sort(percentiles); } this.numColumns = 5 + this.percentiles.length + additionalColumns.length + (isAsync ? 1 : 0); this.beans = ManagementFactory.getGarbageCollectorMXBeans(); } @Override public void run() { final MultiColumnPrinter printer; if (!app.isScriptFriendly()) { printer = new MultiColumnPrinter(numColumns, 2, "-", MultiColumnPrinter.RIGHT, app); printer.setTitleAlign(MultiColumnPrinter.RIGHT); String[] title = new String[numColumns]; Arrays.fill(title, ""); title[0] = "Throughput"; title[2] = "Response Time"; int[] span = new int[numColumns]; span[0] = 2; span[1] = 0; span[2] = 2 + this.percentiles.length; Arrays.fill(span, 3, 4 + this.percentiles.length, 0); Arrays.fill(span, 4 + this.percentiles.length, span.length, 1); printer.addTitle(title, span); title = new String[numColumns]; Arrays.fill(title, ""); title[0] = "(ops/second)"; title[2] = "(milliseconds)"; printer.addTitle(title, span); title = new String[numColumns]; title[0] = "recent"; title[1] = "average"; title[2] = "recent"; title[3] = "average"; int i = 4; for (double percentile :percentiles) { title[i++] = percentile + "%"; } title[i++] = "err/sec"; if (isAsync) { title[i++] = "req/res"; } for (final String column : additionalColumns) { title[i++] = column; } span = new int[numColumns]; Arrays.fill(span, 1); printer.addTitle(title, span); printer.printTitle(); } else { final PrintStream out = app.getOutputStream(); out.print("Time (seconds)"); out.print(","); out.print("Recent throughput (ops/second)"); out.print(","); out.print("Average throughput (ops/second)"); out.print(","); out.print("Recent response time (milliseconds)"); out.print(","); out.print("Average response time (milliseconds)"); for (final double percentile : this.percentiles) { out.print(","); out.print(percentile); out.print("% response time (milliseconds)"); } out.print(","); out.print("Errors/second"); if (isAsync) { out.print(","); out.print("Requests/response"); } for (final String column : additionalColumns) { out.print(","); out.print(column); } out.println(); printer = null; } final long startTime = System.currentTimeMillis(); long statTime = startTime; long gcDuration = 0; for (final GarbageCollectorMXBean bean : beans) { gcDuration += bean.getCollectionTime(); } while (!stopRequested) { try { sleep(statsInterval); } catch (final InterruptedException ie) { // Ignore. } lastStatTime = statTime; statTime = System.currentTimeMillis(); lastGCDuration = gcDuration; gcDuration = 0; for (final GarbageCollectorMXBean bean : beans) { gcDuration += bean.getCollectionTime(); } operationCount = operationRecentCount.getAndSet(0); successCount = successRecentCount.getAndSet(0); failedCount = failedRecentCount.getAndSet(0); waitTime = waitRecentTime.getAndSet(0); final int resultCount = successCount + failedCount; totalOperationCount += operationCount; totalSuccessCount += successCount; totalFailedCount += failedCount; totalWaitTime += waitTime; final long totalResultCount = totalSuccessCount + totalFailedCount; recentDuration = statTime - lastStatTime; averageDuration = statTime - startTime; recentDuration -= gcDuration - lastGCDuration; averageDuration -= gcDuration; recentDuration /= 1000.0; averageDuration /= 1000.0; final String[] strings = new String[numColumns]; strings[0] = String.format("%.1f", resultCount / recentDuration); strings[1] = String.format("%.1f", totalResultCount / averageDuration); if (resultCount > 0) { strings[2] = String.format("%.3f", (waitTime - (gcDuration - lastGCDuration)) / (double) resultCount / 1000000.0); } else { strings[2] = "-"; } if (totalResultCount > 0) { strings[3] = String.format("%.3f", (totalWaitTime - gcDuration) / (double) totalResultCount / 1000000.0); } else { strings[3] = "-"; } int i = 4; List computedPercentiles = eTimesBuckets.getPercentile(percentiles, totalOperationCount); for (int j = computedPercentiles.size() - 1; j >= 0; j--) { strings[i++] = String.format("%.2f", computedPercentiles.get(j) / 1000.0); } strings[i++] = String.format("%.1f", failedCount / recentDuration); if (isAsync) { strings[i++] = resultCount > 0 ? String.format("%.1f", (double) operationCount / resultCount) : "-"; } for (final String column : getAdditionalColumns()) { strings[i++] = column; } if (printer != null) { printer.printRow(strings); } else { // Script-friendly. final PrintStream out = app.getOutputStream(); out.print(averageDuration); for (final String s : strings) { out.print(","); out.print(s); } out.println(); } } } String[] getAdditionalColumns() { return EMPTY_STRINGS; } void resetStats() { failedCount = 0; operationCount = 0; successCount = 0; operationRecentCount.set(0); successRecentCount.set(0); failedRecentCount.set(0); waitRecentTime.set(0); } } private class TimerThread extends Thread { private final long timeToWait; public TimerThread(long timeToWait) { this.timeToWait = timeToWait; } @Override public void run() { try { Thread.sleep(timeToWait); } catch (InterruptedException e) { throw new IllegalStateException(e); } finally { stopRequested = true; } } } /** * Statistics update result handler implementation. * * @param * The type of expected result. */ class UpdateStatsResultHandler implements ResultHandler { protected final long currentTime; UpdateStatsResultHandler(final long currentTime) { this.currentTime = currentTime; } @Override public void handleError(final LdapException error) { failedRecentCount.getAndIncrement(); updateStats(); app.errPrintVerboseMessage(LocalizableMessage.raw(error.getResult().toString())); } @Override public void handleResult(final S result) { successRecentCount.getAndIncrement(); updateStats(); } private void updateStats() { if (!isWarmingUp) { final long eTime = System.nanoTime() - currentTime; waitRecentTime.getAndAdd(eTime); eTimesBuckets.addTimeToInterval(eTime); } } } /** * Worker thread base implementation. */ abstract class WorkerThread extends Thread { private int count; private final Connection connection; private final ConnectionFactory connectionFactory; WorkerThread(final Connection connection, final ConnectionFactory connectionFactory) { super("Worker Thread"); this.connection = connection; this.connectionFactory = connectionFactory; } public abstract Promise performOperation(Connection connection, DataSource[] dataSources, long startTime); @Override public void run() { Promise promise; Connection connection; final double targetTimeInMS = 1000.0 / (targetThroughput / (double) (numThreads * numConnections)); double sleepTimeInMS = 0; long start; while (!stopRequested && !(maxIterations > 0 && count >= maxIterations)) { if (this.connection == null) { try { connection = connectionFactory.getConnectionAsync().getOrThrow(); } catch (final InterruptedException e) { // Ignore and check stop requested continue; } catch (final LdapException e) { app.errPrintln(LocalizableMessage.raw(e.getResult().getDiagnosticMessage())); if (e.getCause() != null && app.isVerbose()) { e.getCause().printStackTrace(app.getErrorStream()); } stopRequested = true; break; } } else { connection = this.connection; if (!noRebind && app.getBindRequest() != null) { try { connection.bindAsync(app.getBindRequest()).getOrThrow(); } catch (final InterruptedException e) { // Ignore and check stop requested continue; } catch (final LdapException e) { app.errPrintln(LocalizableMessage.raw(e.getResult().toString())); if (e.getCause() != null && app.isVerbose()) { e.getCause().printStackTrace(app.getErrorStream()); } stopRequested = true; break; } } } start = System.nanoTime(); promise = performOperation(connection, dataSources.get(), start); operationRecentCount.getAndIncrement(); count++; if (!isAsync) { try { if (promise != null) { promise.getOrThrow(); } } catch (final InterruptedException e) { // Ignore and check stop requested continue; } catch (final LdapException e) { if (e.getCause() instanceof IOException) { e.getCause().printStackTrace(app.getErrorStream()); stopRequested = true; break; } // Ignore. Handled by result handler } finally { if (this.connection == null) { connection.close(); } } } if (targetThroughput > 0) { try { if (sleepTimeInMS > 1) { sleep((long) Math.floor(sleepTimeInMS)); } } catch (final InterruptedException e) { continue; } sleepTimeInMS += targetTimeInMS - (System.nanoTime() - start) / 1000000.0; if (sleepTimeInMS < -60000) { // If we fall behind by 60 seconds, just forget about catching up sleepTimeInMS = -60000; } } } } } private static final String[] EMPTY_STRINGS = new String[0]; private final AtomicInteger operationRecentCount = new AtomicInteger(); protected final AtomicInteger successRecentCount = new AtomicInteger(); protected final AtomicInteger failedRecentCount = new AtomicInteger(); private final AtomicLong waitRecentTime = new AtomicLong(); private final ResponseTimeBuckets eTimesBuckets = new ResponseTimeBuckets(); private final ConsoleApplication app; private DataSource[] dataSourcePrototypes; /** Thread local copies of the data sources. */ private final ThreadLocal dataSources = new ThreadLocal() { /** {@inheritDoc} */ @Override protected DataSource[] initialValue() { final DataSource[] prototypes = getDataSources(); final int sz = prototypes.length; final DataSource[] threadLocalCopy = new DataSource[sz]; for (int i = 0; i < sz; i++) { threadLocalCopy[i] = prototypes[i].duplicate(); } return threadLocalCopy; } }; private volatile boolean stopRequested; private volatile boolean isWarmingUp; private int numThreads; private int numConnections; private int targetThroughput; private int maxIterations; /** Warm-up duration time in ms. **/ private long warmUpDuration; /** Max duration time in ms, 0 for unlimited. **/ private long maxDurationTime; private boolean isAsync; private boolean noRebind; private int statsInterval; private final IntegerArgument numThreadsArgument; private final IntegerArgument maxIterationsArgument; private final IntegerArgument maxDurationArgument; private final IntegerArgument statsIntervalArgument; private final IntegerArgument targetThroughputArgument; private final IntegerArgument numConnectionsArgument; private final IntegerArgument percentilesArgument; private final BooleanArgument keepConnectionsOpen; private final BooleanArgument noRebindArgument; private final BooleanArgument asyncArgument; private final StringArgument arguments; protected final IntegerArgument warmUpArgument; PerformanceRunner(final PerformanceRunnerOptions options) throws ArgumentException { ArgumentParser argParser = options.getArgumentParser(); this.app = options.getConsoleApplication(); numThreadsArgument = new IntegerArgument("numThreads", 't', "numThreads", false, false, true, LocalizableMessage.raw("{numThreads}"), 1, null, true, 1, false, 0, LocalizableMessage.raw("Number of worker threads per connection")); numThreadsArgument.setPropertyName("numThreads"); if (options.supportsMultipleThreadsPerConnection()) { argParser.addArgument(numThreadsArgument); } else { numThreadsArgument.addValue("1"); } numConnectionsArgument = new IntegerArgument("numConnections", 'c', "numConnections", false, false, true, LocalizableMessage.raw("{numConnections}"), 1, null, true, 1, false, 0, LocalizableMessage.raw("Number of connections")); numConnectionsArgument.setPropertyName("numConnections"); argParser.addArgument(numConnectionsArgument); maxIterationsArgument = new IntegerArgument("maxIterations", 'm', "maxIterations", false, false, true, LocalizableMessage.raw("{maxIterations}"), 0, null, LocalizableMessage .raw("Max iterations, 0 for unlimited")); maxIterationsArgument.setPropertyName("maxIterations"); argParser.addArgument(maxIterationsArgument); maxDurationArgument = new IntegerArgument("maxDuration", 'd', "maxDuration", false, false, true, LocalizableMessage.raw("{maxDuration}"), 0, null, true, 1, false, 0, LocalizableMessage.raw("Maximum duration in seconds, 0 for unlimited")); argParser.addArgument(maxDurationArgument); warmUpArgument = new IntegerArgument("warmUpDuration", 'B', "warmUpDuration", false, false, true, LocalizableMessage.raw("{warmUpDuration}"), 0, null, LocalizableMessage.raw("Warm up duration in seconds")); argParser.addArgument(warmUpArgument); statsIntervalArgument = new IntegerArgument("statInterval", 'i', "statInterval", false, false, true, LocalizableMessage.raw("{statInterval}"), 5, null, true, 1, false, 0, LocalizableMessage.raw("Display results each specified number of seconds")); statsIntervalArgument.setPropertyName("statInterval"); argParser.addArgument(statsIntervalArgument); targetThroughputArgument = new IntegerArgument("targetThroughput", 'M', "targetThroughput", false, false, true, LocalizableMessage.raw("{targetThroughput}"), 0, null, LocalizableMessage.raw("Target average throughput to achieve")); targetThroughputArgument.setPropertyName("targetThroughput"); argParser.addArgument(targetThroughputArgument); percentilesArgument = new IntegerArgument("percentile", 'e', "percentile", false, true, LocalizableMessage.raw("{percentile}"), true, 0, true, 100, LocalizableMessage.raw("Calculate max response time for a " + "percentile of operations")); percentilesArgument.setPropertyName("percentile"); percentilesArgument.setMultiValued(true); argParser.addArgument(percentilesArgument); keepConnectionsOpen = new BooleanArgument("keepConnectionsOpen", 'f', "keepConnectionsOpen", LocalizableMessage.raw("Keep connections open")); keepConnectionsOpen.setPropertyName("keepConnectionsOpen"); argParser.addArgument(keepConnectionsOpen); noRebindArgument = new BooleanArgument("noRebind", 'F', "noRebind", LocalizableMessage .raw("Keep connections open and do not rebind")); noRebindArgument.setPropertyName("noRebind"); if (options.supportsRebind()) { argParser.addArgument(noRebindArgument); } asyncArgument = new BooleanArgument("asynchronous", 'A', "asynchronous", LocalizableMessage .raw("Use asynchronous mode and do not " + "wait for results before sending the next request")); asyncArgument.setPropertyName("asynchronous"); if (options.supportsAsynchronousRequests()) { argParser.addArgument(asyncArgument); } arguments = new StringArgument( "argument", 'g', "argument", false, true, true, LocalizableMessage.raw("{generator function or static string}"), null, null, LocalizableMessage .raw("Argument used to evaluate the Java " + "style format strings in program parameters (ie. Base DN, " + "Search Filter). The set of all arguments provided form the " + "the argument list in order. Besides static string " + "arguments, they can be generated per iteration with the " + "following functions: " + StaticUtils.EOL + DataSource.getUsage())); if (options.supportsGeneratorArgument()) { argParser.addArgument(arguments); } } @Override public void handleConnectionClosed() { // Ignore } @Override public synchronized void handleConnectionError(final boolean isDisconnectNotification, final LdapException error) { if (!stopRequested) { app.errPrintln(LocalizableMessage.raw("Error occurred on one or more connections: " + error.getResult())); if (error.getCause() != null && app.isVerbose()) { error.getCause().printStackTrace(app.getErrorStream()); } stopRequested = true; } } @Override public void handleUnsolicitedNotification(final ExtendedResult notification) { // Ignore } public final void validate() throws ArgumentException { numConnections = numConnectionsArgument.getIntValue(); numThreads = numThreadsArgument.getIntValue(); warmUpDuration = warmUpArgument.getIntValue() * 1000L; maxIterations = maxIterationsArgument.getIntValue() / numConnections / numThreads; maxDurationTime = maxDurationArgument.getIntValue() * 1000L; statsInterval = statsIntervalArgument.getIntValue() * 1000; targetThroughput = targetThroughputArgument.getIntValue(); isAsync = asyncArgument.isPresent(); noRebind = noRebindArgument.isPresent(); if (!noRebindArgument.isPresent() && this.numThreads > 1) { throw new ArgumentException(ERR_TOOL_ARG_MUST_BE_USED_WHEN_ARG_CONDITION.get( "--" + noRebindArgument.getLongIdentifier(), "--" + numThreadsArgument.getLongIdentifier(), "> 1")); } if (!noRebindArgument.isPresent() && asyncArgument.isPresent()) { throw new ArgumentException(ERR_TOOL_ARG_NEEDED_WHEN_USING_ARG.get( "--" + noRebindArgument.getLongIdentifier(), asyncArgument.getLongIdentifier())); } if (maxIterationsArgument.isPresent() && maxIterations <= 0) { throw new ArgumentException(ERR_TOOL_NOT_ENOUGH_ITERATIONS.get( "--" + maxIterationsArgument.getLongIdentifier(), numConnections * numThreads, numConnectionsArgument.getLongIdentifier(), numThreadsArgument.getLongIdentifier())); } dataSourcePrototypes = DataSource.parse(arguments.getValues()); } final DataSource[] getDataSources() { if (dataSourcePrototypes == null) { throw new IllegalStateException( "dataSources are null - validate() must be called first"); } return dataSourcePrototypes; } abstract WorkerThread newWorkerThread(final Connection connection, final ConnectionFactory connectionFactory); abstract StatsThread newStatsThread(); final int run(final ConnectionFactory connectionFactory) { final List threads = new ArrayList(); final List connections = new ArrayList(); Connection connection = null; try { isWarmingUp = warmUpDuration > 0; for (int i = 0; i < numConnections; i++) { if (keepConnectionsOpen.isPresent() || noRebindArgument.isPresent()) { connection = connectionFactory.getConnectionAsync().getOrThrow(); connection.addConnectionEventListener(this); connections.add(connection); } for (int j = 0; j < numThreads; j++) { final Thread thread = newWorkerThread(connection, connectionFactory); threads.add(thread); thread.start(); } } if (maxDurationTime > 0) { new TimerThread(maxDurationTime).start(); } final StatsThread statsThread = newStatsThread(); if (isWarmingUp) { if (!app.isScriptFriendly()) { app.println(INFO_TOOL_WARMING_UP.get(warmUpDuration / 1000)); } Thread.sleep(warmUpDuration); statsThread.resetStats(); isWarmingUp = false; } statsThread.start(); for (final Thread t : threads) { t.join(); } stopRequested = true; statsThread.join(); } catch (final InterruptedException e) { stopRequested = true; } catch (final LdapException e) { stopRequested = true; app.println(LocalizableMessage.raw(e.getResult().getDiagnosticMessage())); } finally { closeSilently(connections); } return 0; } }