/* * The contents of this file are subject to the terms of the Common Development and * Distribution License (the License). You may not use this file except in compliance with the * License. * * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the * specific language governing permission and limitations under the License. * * When distributing Covered Software, include this CDDL Header Notice in each file and include * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL * Header, with the fields enclosed by brackets [] replaced by your own identifying * information: "Portions Copyright [year] [name of copyright owner]". * * Copyright 2010 Sun Microsystems, Inc. * Portions Copyright 2011-2016 ForgeRock AS. */ package com.forgerock.opendj.ldap.tools; import static com.forgerock.opendj.ldap.tools.Utils.printErrorMessage; import static java.util.concurrent.TimeUnit.*; import static org.forgerock.util.Utils.*; import static com.forgerock.opendj.ldap.tools.ToolsMessages.*; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.forgerock.i18n.LocalizableMessage; import org.forgerock.opendj.ldap.Connection; import org.forgerock.opendj.ldap.ConnectionEventListener; import org.forgerock.opendj.ldap.ConnectionFactory; import org.forgerock.opendj.ldap.LdapException; import org.forgerock.opendj.ldap.LdapResultHandler; import org.forgerock.opendj.ldap.requests.BindRequest; import org.forgerock.opendj.ldap.responses.ExtendedResult; import org.forgerock.opendj.ldap.responses.Result; import org.forgerock.util.promise.Promise; import com.forgerock.opendj.cli.ArgumentException; import com.forgerock.opendj.cli.ArgumentParser; import com.forgerock.opendj.cli.BooleanArgument; import com.forgerock.opendj.cli.ConsoleApplication; import com.forgerock.opendj.cli.IntegerArgument; import com.forgerock.opendj.cli.StringArgument; import com.forgerock.opendj.util.StaticUtils; /** Benchmark application framework. */ abstract class PerformanceRunner implements ConnectionEventListener { private static final double[] DEFAULT_PERCENTILES = new double[] { 99.9, 99.99, 99.999 }; class TimerThread extends Thread { private final long timeToWait; TimerThread(long timeToWait) { this.timeToWait = timeToWait; } void performStopOperations() { stopTool(); } @Override public void run() { try { Thread.sleep(timeToWait); } catch (InterruptedException e) { throw new IllegalStateException(e); } finally { performStopOperations(); } } } /** * Statistics update result handler implementation. * * @param * The type of expected result. */ class UpdateStatsResultHandler implements LdapResultHandler { protected final long operationStartTimeNs; UpdateStatsResultHandler(final long currentTimeNs) { this.operationStartTimeNs = currentTimeNs; } @Override public final void handleException(final LdapException exception) { statsThread.incrementFailedCount(); updateResponseTime(); app.errPrintVerboseMessage(LocalizableMessage.raw(exception.getResult().toString())); } @Override public final void handleResult(final S result) { statsThread.incrementSuccessCount(); updateResponseTime(); updateAdditionalStatsOnResult(); } /** Do nothing by default, child classes which manage additional stats need to override this method. */ void updateAdditionalStatsOnResult() { } private void updateResponseTime() { statsThread.addResponseTime(System.nanoTime() - operationStartTimeNs); } } /** Worker thread base implementation. */ abstract class WorkerThread extends Thread { private int count; private final Connection connection; private final ConnectionFactory connectionFactory; boolean localStopRequested; WorkerThread(final Connection connection, final ConnectionFactory connectionFactory) { super("Worker Thread"); this.connection = connection; this.connectionFactory = connectionFactory; } public abstract Promise performOperation( Connection connection, DataSource[] dataSources, long currentTimeNs); @Override public void run() { Promise promise; Connection connection; final double targetTimeMs = 1000.0 / (targetThroughput / (double) (numThreads * numConnections)); double sleepTimeMs = 0; while (!stopRequested && !localStopRequested && (maxIterations <= 0 || count < maxIterations)) { try { connection = getConnectionToUse(); } catch (final InterruptedException e) { // Ignore and check stop requested continue; } catch (final LdapException e) { handleConnectionError(false, e); break; } long startTimeNs = System.nanoTime(); promise = performOperation(connection, dataSources.get(), startTimeNs); statsThread.incrementOperationCount(); try { promise.getOrThrow(); } catch (final InterruptedException e) { // Ignore and check stop requested continue; } catch (final LdapException e) { if (!stopRequested && e.getCause() instanceof IOException) { e.getCause().printStackTrace(app.getErrorStream()); stopTool(true); break; } // Ignore. Handled by result handler } finally { if (this.connection == null) { connection.close(); } } if (targetThroughput > 0) { try { if (sleepTimeMs > 1) { sleep((long) Math.floor(sleepTimeMs)); } } catch (final InterruptedException e) { continue; } sleepTimeMs += targetTimeMs - NANOSECONDS.toMillis(System.nanoTime() - startTimeNs); final long oneMinuteMs = MINUTES.toMillis(1); if (sleepTimeMs + oneMinuteMs < 0) { // If we fall behind by 60 seconds, just forget about catching up sleepTimeMs = -oneMinuteMs; } } } } private Connection getConnectionToUse() throws InterruptedException, LdapException { if (this.connection == null) { return connectionFactory.getConnectionAsync().getOrThrow(); } else { final Connection resultConnection = this.connection; if (!noRebind && bindRequest != null) { resultConnection.bindAsync(bindRequest).getOrThrow(); } return resultConnection; } } void incrementIterationCount() { count++; } } private final ConsoleApplication app; private DataSource[] dataSourcePrototypes; /** Thread local copies of the data sources. */ private final ThreadLocal dataSources = new ThreadLocal() { @Override protected DataSource[] initialValue() { final DataSource[] prototypes = getDataSources(); final int sz = prototypes.length; final DataSource[] threadLocalCopy = new DataSource[sz]; for (int i = 0; i < sz; i++) { threadLocalCopy[i] = prototypes[i].duplicate(); } return threadLocalCopy; } }; int numThreads; int numConnections; private boolean stopRequested; private int targetThroughput; private int maxIterations; /** Warm-up duration time in ms. */ private long warmUpDurationMs; /** Max duration time in ms, 0 for unlimited. */ private long maxDurationTimeMs; private boolean noRebind; private BindRequest bindRequest; private int statsIntervalMs; private final IntegerArgument numThreadsArgument; private final IntegerArgument maxDurationArgument; private final IntegerArgument statsIntervalArgument; private final IntegerArgument targetThroughputArgument; private final IntegerArgument numConnectionsArgument; private final IntegerArgument percentilesArgument; private final BooleanArgument keepConnectionsOpen; private final BooleanArgument noRebindArgument; private final StringArgument arguments; protected final IntegerArgument maxIterationsArgument; protected final IntegerArgument warmUpArgument; private final List workerThreads = new ArrayList<>(); StatsThread statsThread; PerformanceRunner(final PerformanceRunnerOptions options) throws ArgumentException { ArgumentParser argParser = options.getArgumentParser(); this.app = options.getConsoleApplication(); numThreadsArgument = IntegerArgument.builder("numThreads") .shortIdentifier('t') .description(LocalizableMessage.raw("Number of worker threads per connection")) .lowerBound(1) .defaultValue(1) .valuePlaceholder(LocalizableMessage.raw("{numThreads}")) .buildArgument(); if (options.supportsMultipleThreadsPerConnection()) { argParser.addArgument(numThreadsArgument); } else { numThreadsArgument.addValue("1"); } numConnectionsArgument = IntegerArgument.builder("numConnections") .shortIdentifier('c') .description(LocalizableMessage.raw("Number of connections")) .lowerBound(1) .defaultValue(1) .valuePlaceholder(LocalizableMessage.raw("{numConnections}")) .buildAndAddToParser(argParser); maxIterationsArgument = IntegerArgument.builder("maxIterations") .shortIdentifier('m') .description(LocalizableMessage.raw("Max iterations, 0 for unlimited")) .defaultValue(0) .valuePlaceholder(LocalizableMessage.raw("{maxIterations}")) .buildAndAddToParser(argParser); maxDurationArgument = IntegerArgument.builder("maxDuration") .shortIdentifier('d') .description(LocalizableMessage.raw("Maximum duration in seconds, 0 for unlimited")) .lowerBound(1) .defaultValue(0) .valuePlaceholder(LocalizableMessage.raw("{maxDuration}")) .buildAndAddToParser(argParser); warmUpArgument = IntegerArgument.builder("warmUpDuration") .shortIdentifier('B') .description(LocalizableMessage.raw("Warm up duration in seconds")) .defaultValue(0) .valuePlaceholder(LocalizableMessage.raw("{warmUpDuration}")) .buildAndAddToParser(argParser); statsIntervalArgument = IntegerArgument.builder("statInterval") .shortIdentifier('i') .description(LocalizableMessage.raw("Display results each specified number of seconds")) .lowerBound(1) .defaultValue(5) .valuePlaceholder(LocalizableMessage.raw("{statInterval}")) .buildAndAddToParser(argParser); targetThroughputArgument = IntegerArgument.builder("targetThroughput") .shortIdentifier('M') .description(LocalizableMessage.raw("Target average throughput to achieve")) .defaultValue(0) .valuePlaceholder(LocalizableMessage.raw("{targetThroughput}")) .buildAndAddToParser(argParser); percentilesArgument = IntegerArgument.builder("percentile") .shortIdentifier('e') .description( LocalizableMessage.raw("Calculate max response time for a percentile of operations")) .multiValued() .range(0, 100) .valuePlaceholder(LocalizableMessage.raw("{percentile}")) .buildAndAddToParser(argParser); keepConnectionsOpen = BooleanArgument.builder("keepConnectionsOpen") .shortIdentifier('f') .description(LocalizableMessage.raw("Keep connections open")) .buildAndAddToParser(argParser); noRebindArgument = BooleanArgument.builder("noRebind") .shortIdentifier('F') .description(LocalizableMessage.raw("Keep connections open and do not rebind")) .buildArgument(); if (options.supportsRebind()) { argParser.addArgument(noRebindArgument); } arguments = StringArgument.builder("argument") .shortIdentifier('g') .description(LocalizableMessage .raw("Argument used to evaluate the Java " + "style format strings in program parameters (ie. Base DN, " + "Search Filter). The set of all arguments provided form the " + "the argument list in order. Besides static string " + "arguments, they can be generated per iteration with the " + "following functions: " + StaticUtils.EOL + DataSource.getUsage())) .multiValued() .valuePlaceholder(LocalizableMessage.raw("{generator function or static string}")) .buildArgument(); if (options.supportsGeneratorArgument()) { argParser.addArgument(arguments); } } @Override public void handleConnectionClosed() { // Ignore } @Override public synchronized void handleConnectionError(final boolean isDisconnectNotification, final LdapException error) { if (!stopRequested) { app.errPrintln(ERROR_RATE_TOOLS_CANNOT_GET_CONNECTION.get(error.getMessage())); if (error.getCause() != null && app.isVerbose()) { error.getCause().printStackTrace(app.getErrorStream()); } stopTool(true); } } @Override public void handleUnsolicitedNotification(final ExtendedResult notification) { // Ignore } public final void validate() throws ArgumentException { numConnections = numConnectionsArgument.getIntValue(); numThreads = numThreadsArgument.getIntValue(); warmUpDurationMs = warmUpArgument.getIntValue() * 1000L; maxIterations = maxIterationsArgument.getIntValue() / numConnections / numThreads; maxDurationTimeMs = maxDurationArgument.getIntValue() * 1000L; statsIntervalMs = statsIntervalArgument.getIntValue() * 1000; targetThroughput = targetThroughputArgument.getIntValue(); noRebind = noRebindArgument.isPresent(); if (!noRebindArgument.isPresent() && this.numThreads > 1) { throw new ArgumentException(ERR_TOOL_ARG_MUST_BE_USED_WHEN_ARG_CONDITION.get( "--" + noRebindArgument.getLongIdentifier(), "--" + numThreadsArgument.getLongIdentifier(), "> 1")); } if (maxIterationsArgument.isPresent() && maxIterations <= 0) { throw new ArgumentException(ERR_TOOL_NOT_ENOUGH_ITERATIONS.get( "--" + maxIterationsArgument.getLongIdentifier(), numConnections * numThreads, numConnectionsArgument.getLongIdentifier(), numThreadsArgument.getLongIdentifier())); } dataSourcePrototypes = DataSource.parse(arguments.getValues()); } final DataSource[] getDataSources() { if (dataSourcePrototypes == null) { throw new IllegalStateException("dataSources are null - validate() must be called first"); } return dataSourcePrototypes; } abstract WorkerThread newWorkerThread(final Connection connection, final ConnectionFactory connectionFactory); abstract StatsThread newStatsThread(final PerformanceRunner performanceRunner, final ConsoleApplication app); TimerThread newEndTimerThread(final long timeToWait) { return new TimerThread(timeToWait); } final int run(final ConnectionFactory connectionFactory) { final List connections = new ArrayList<>(); statsThread = newStatsThread(this, app); try { validateCanConnectToServer(connectionFactory); for (int i = 0; i < numConnections; i++) { Connection connection = null; if (keepConnectionsOpen.isPresent() || noRebindArgument.isPresent()) { connection = connectionFactory.getConnection(); connection.addConnectionEventListener(this); connections.add(connection); } for (int j = 0; j < numThreads; j++) { final Thread thread = newWorkerThread(connection, connectionFactory); workerThreads.add(thread); thread.start(); } } if (maxDurationTimeMs > 0) { newEndTimerThread(maxDurationTimeMs).start(); } statsThread.startReporting(); joinAllWorkerThreads(); stopTool(); } catch (final InterruptedException e) { stopTool(true); } catch (final LdapException e) { stopTool(true); printErrorMessage(app, e); return e.getResult().getResultCode().intValue(); } finally { closeSilently(connections); } return 0; } // detects wrong bind parameters, server unreachable (server down, network problem?), etc. private void validateCanConnectToServer(ConnectionFactory connectionFactory) throws LdapException { connectionFactory.getConnection().close(); } synchronized void stopTool() { stopTool(false); } synchronized void stopTool(final boolean stoppedByError) { if (!stopRequested) { stopRequested = true; statsThread.stopRecording(stoppedByError); } } void setBindRequest(final BindRequest request) { this.bindRequest = request; } protected void joinAllWorkerThreads() throws InterruptedException { for (final Thread t : workerThreads) { t.join(); } } double[] getPercentiles() { if (percentilesArgument.isPresent()) { double[] percentiles = new double[percentilesArgument.getValues().size()]; int index = 0; for (final String percentile : percentilesArgument.getValues()) { percentiles[index++] = Double.parseDouble(percentile); } Arrays.sort(percentiles); return percentiles; } return DEFAULT_PERCENTILES; } long getWarmUpDurationMs() { return warmUpDurationMs; } long getStatsInterval() { return statsIntervalMs; } }