| | |
| | | import java.lang.management.GarbageCollectorMXBean; |
| | | import java.lang.management.ManagementFactory; |
| | | import java.util.*; |
| | | import java.util.concurrent.CountDownLatch; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.concurrent.atomic.AtomicLong; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | |
| | | import org.forgerock.i18n.LocalizableMessage; |
| | | import org.forgerock.opendj.ldap.*; |
| | | import org.forgerock.opendj.ldap.responses.BindResult; |
| | | import org.forgerock.opendj.ldap.responses.ExtendedResult; |
| | | import org.forgerock.opendj.ldap.responses.Result; |
| | | |
| | |
| | | */ |
| | | abstract class PerformanceRunner implements ConnectionEventListener |
| | | { |
| | | abstract class ConnectionWorker |
| | | { |
| | | private final AtomicInteger operationsInFlight = new AtomicInteger(); |
| | | |
| | | private volatile int count; |
| | | |
| | | private final AsynchronousConnection staticConnection; |
| | | |
| | | private final ConnectionFactory connectionFactory; |
| | | |
| | | private final CountDownLatch latch = new CountDownLatch(1); |
| | | |
| | | |
| | | |
| | | ConnectionWorker(final AsynchronousConnection staticConnection, |
| | | final ConnectionFactory connectionFactory) |
| | | { |
| | | this.staticConnection = staticConnection; |
| | | this.connectionFactory = connectionFactory; |
| | | } |
| | | |
| | | |
| | | |
| | | public void operationCompleted(final AsynchronousConnection connection) |
| | | { |
| | | if (operationsInFlight.decrementAndGet() == 0 |
| | | && this.staticConnection == null) |
| | | { |
| | | connection.close(); |
| | | } |
| | | startWork(); |
| | | } |
| | | |
| | | |
| | | |
| | | public abstract FutureResult<?> performOperation( |
| | | final AsynchronousConnection connection, |
| | | final DataSource[] dataSources, final long startTime); |
| | | |
| | | |
| | | |
| | | public void startWork() |
| | | { |
| | | if (!stopRequested && !(maxIterations > 0 && count >= maxIterations)) |
| | | { |
| | | if (this.staticConnection == null) |
| | | { |
| | | connectionFactory |
| | | .getAsynchronousConnection(new ResultHandler<AsynchronousConnection>() |
| | | { |
| | | public void handleErrorResult(final ErrorResultException e) |
| | | { |
| | | app.println(LocalizableMessage.raw(e.getResult() |
| | | .getDiagnosticMessage())); |
| | | if (e.getCause() != null && app.isVerbose()) |
| | | { |
| | | e.getCause().printStackTrace(app.getErrorStream()); |
| | | } |
| | | stopRequested = true; |
| | | } |
| | | |
| | | |
| | | |
| | | public void handleResult(final AsynchronousConnection result) |
| | | { |
| | | doWork(result); |
| | | } |
| | | }); |
| | | } |
| | | else |
| | | { |
| | | if (!noRebind |
| | | && this.staticConnection instanceof AuthenticatedAsynchronousConnection) |
| | | { |
| | | final AuthenticatedAsynchronousConnection ac = |
| | | (AuthenticatedAsynchronousConnection) this.staticConnection; |
| | | ac.rebind(new ResultHandler<BindResult>() |
| | | { |
| | | public void handleErrorResult(final ErrorResultException e) |
| | | { |
| | | app.println(LocalizableMessage.raw(e.getResult().toString())); |
| | | if (e.getCause() != null && app.isVerbose()) |
| | | { |
| | | e.getCause().printStackTrace(app.getErrorStream()); |
| | | } |
| | | stopRequested = true; |
| | | } |
| | | |
| | | |
| | | |
| | | public void handleResult(final BindResult result) |
| | | { |
| | | doWork(staticConnection); |
| | | } |
| | | }); |
| | | } |
| | | else |
| | | { |
| | | doWork(staticConnection); |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | | latch.countDown(); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | public void waitFor() throws InterruptedException |
| | | { |
| | | latch.await(); |
| | | } |
| | | |
| | | |
| | | |
| | | private void doWork(final AsynchronousConnection connection) |
| | | { |
| | | long start; |
| | | double sleepTimeInMS = 0; |
| | | final int opsToPerform = isAsync ? numConcurrentTasks : numConcurrentTasks |
| | | - operationsInFlight.get(); |
| | | for (int i = 0; i < opsToPerform; i++) |
| | | { |
| | | if (maxIterations > 0 && count >= maxIterations) |
| | | { |
| | | break; |
| | | } |
| | | start = System.nanoTime(); |
| | | performOperation(connection, dataSources.get(), start); |
| | | operationRecentCount.getAndIncrement(); |
| | | operationsInFlight.getAndIncrement(); |
| | | count++; |
| | | |
| | | if (targetThroughput > 0) |
| | | { |
| | | try |
| | | { |
| | | if (sleepTimeInMS > 1) |
| | | { |
| | | Thread.sleep((long) Math.floor(sleepTimeInMS)); |
| | | } |
| | | } |
| | | catch (final InterruptedException e) |
| | | { |
| | | continue; |
| | | } |
| | | |
| | | sleepTimeInMS += targetTimeInMS |
| | | - ((System.nanoTime() - start) / 1000000.0); |
| | | if (sleepTimeInMS < -60000) |
| | | { |
| | | // If we fall behind by 60 seconds, just forget about |
| | | // catching up |
| | | sleepTimeInMS = -60000; |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Statistics thread base implementation. |
| | | */ |
| | |
| | | class UpdateStatsResultHandler<S extends Result> implements ResultHandler<S> |
| | | { |
| | | private final long startTime; |
| | | private final AsynchronousConnection connection; |
| | | private final ConnectionWorker worker; |
| | | |
| | | |
| | | |
| | | UpdateStatsResultHandler(final long startTime, |
| | | final AsynchronousConnection connection, final ConnectionWorker worker) |
| | | UpdateStatsResultHandler(final long startTime) |
| | | { |
| | | this.startTime = startTime; |
| | | this.connection = connection; |
| | | this.worker = worker; |
| | | } |
| | | |
| | | |
| | |
| | | { |
| | | app.println(LocalizableMessage.raw(error.getResult().toString())); |
| | | } |
| | | |
| | | worker.operationCompleted(connection); |
| | | } |
| | | |
| | | |
| | |
| | | { |
| | | successRecentCount.getAndIncrement(); |
| | | updateStats(); |
| | | worker.operationCompleted(connection); |
| | | } |
| | | |
| | | |
| | |
| | | AsynchronousConnection connection; |
| | | |
| | | final double targetTimeInMS = |
| | | (1.0 / (targetThroughput / (double) (numConcurrentTasks * numConnections))) * 1000.0; |
| | | (1000.0 / (targetThroughput / (double) (numThreads * numConnections))); |
| | | double sleepTimeInMS = 0; |
| | | long start; |
| | | while (!stopRequested && !(maxIterations > 0 && count >= maxIterations)) |
| | |
| | | |
| | | private volatile boolean stopRequested; |
| | | |
| | | private int numConcurrentTasks; |
| | | private int numThreads; |
| | | |
| | | private int numConnections; |
| | | |
| | |
| | | |
| | | private int statsInterval; |
| | | |
| | | private double targetTimeInMS; |
| | | |
| | | private final IntegerArgument numConcurrentTasksArgument; |
| | | private final IntegerArgument numThreadsArgument; |
| | | |
| | | private final IntegerArgument maxIterationsArgument; |
| | | |
| | |
| | | throws ArgumentException |
| | | { |
| | | this.app = app; |
| | | numConcurrentTasksArgument = new IntegerArgument("numConcurrentTasks", 't', |
| | | "numConcurrentTasks", false, false, true, |
| | | LocalizableMessage.raw("{numConcurrentTasks}"), 1, null, true, 1, |
| | | numThreadsArgument = new IntegerArgument("numThreads", 't', |
| | | "numThreads", false, false, true, |
| | | LocalizableMessage.raw("{numThreads}"), 1, null, true, 1, |
| | | false, 0, |
| | | LocalizableMessage.raw("Number of concurrent tasks per connection")); |
| | | numConcurrentTasksArgument.setPropertyName("numConcurrentTasks"); |
| | | LocalizableMessage.raw("Number of worker threads per connection")); |
| | | numThreadsArgument.setPropertyName("numThreads"); |
| | | if (!alwaysSingleThreaded) |
| | | { |
| | | argParser.addArgument(numConcurrentTasksArgument); |
| | | argParser.addArgument(numThreadsArgument); |
| | | } |
| | | else |
| | | { |
| | | numConcurrentTasksArgument.addValue("1"); |
| | | numThreadsArgument.addValue("1"); |
| | | } |
| | | |
| | | numConnectionsArgument = new IntegerArgument("numConnections", 'c', |
| | |
| | | public final void validate() throws ArgumentException |
| | | { |
| | | numConnections = numConnectionsArgument.getIntValue(); |
| | | numConcurrentTasks = numConcurrentTasksArgument.getIntValue(); |
| | | maxIterations = maxIterationsArgument.getIntValue() / numConnections; |
| | | numThreads = numThreadsArgument.getIntValue(); |
| | | maxIterations = maxIterationsArgument.getIntValue() |
| | | / numConnections / numThreads; |
| | | statsInterval = statsIntervalArgument.getIntValue() * 1000; |
| | | targetThroughput = targetThroughputArgument.getIntValue(); |
| | | |
| | | isAsync = asyncArgument.isPresent(); |
| | | noRebind = noRebindArgument.isPresent(); |
| | | |
| | | if (!noRebindArgument.isPresent() && this.numConcurrentTasks > 1) |
| | | if (!noRebindArgument.isPresent() && this.numThreads > 1) |
| | | { |
| | | throw new ArgumentException(LocalizableMessage.raw("--" |
| | | + noRebindArgument.getLongIdentifier() + " must be used if --" |
| | | + numConcurrentTasksArgument.getLongIdentifier() + " is > 1")); |
| | | + numThreadsArgument.getLongIdentifier() + " is > 1")); |
| | | } |
| | | |
| | | if (!noRebindArgument.isPresent() && asyncArgument.isPresent()) |
| | |
| | | } |
| | | |
| | | dataSourcePrototypes = DataSource.parse(arguments.getValues()); |
| | | |
| | | targetTimeInMS = |
| | | (1.0 / (targetThroughput / (double) (numConcurrentTasks * numConnections))) * 1000.0; |
| | | } |
| | | |
| | | |
| | |
| | | |
| | | |
| | | |
| | | abstract ConnectionWorker newConnectionWorker( |
| | | abstract WorkerThread newWorkerThread( |
| | | final AsynchronousConnection connection, |
| | | final ConnectionFactory connectionFactory); |
| | | |
| | |
| | | |
| | | final int run(final ConnectionFactory connectionFactory) |
| | | { |
| | | final List<ConnectionWorker> workers = new ArrayList<ConnectionWorker>(); |
| | | final List<AsynchronousConnection> connections = new ArrayList<AsynchronousConnection>(); |
| | | final List<Thread> threads = new ArrayList<Thread>(); |
| | | final List<AsynchronousConnection> connections = |
| | | new ArrayList<AsynchronousConnection>(); |
| | | |
| | | AsynchronousConnection connection = null; |
| | | try |
| | |
| | | connection.addConnectionEventListener(this); |
| | | connections.add(connection); |
| | | } |
| | | final ConnectionWorker worker = newConnectionWorker(connection, |
| | | connectionFactory); |
| | | workers.add(worker); |
| | | worker.startWork(); |
| | | for (int j = 0; j < numThreads; j++) |
| | | { |
| | | final Thread thread = newWorkerThread(connection, connectionFactory); |
| | | threads.add(thread); |
| | | thread.start(); |
| | | } |
| | | } |
| | | |
| | | final Thread statsThread = newStatsThread(); |
| | | statsThread.start(); |
| | | |
| | | for (final ConnectionWorker w : workers) |
| | | for (final Thread t : threads) |
| | | { |
| | | w.waitFor(); |
| | | t.join(); |
| | | } |
| | | stopRequested = true; |
| | | statsThread.join(); |