mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Ludovic Poitou
22.06.2010 18b64d8efe7da2095468d1937a379ba7d7083d27
sdk/src/com/sun/opends/sdk/tools/PerformanceRunner.java
@@ -33,16 +33,18 @@
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.sun.opends.sdk.util.StaticUtils;
import org.opends.sdk.*;
import org.opends.sdk.responses.BindResult;
import org.opends.sdk.responses.ExtendedResult;
import org.opends.sdk.responses.Result;
import com.sun.opends.sdk.tools.AuthenticatedConnectionFactory.AuthenticatedAsynchronousConnection;
import com.sun.opends.sdk.util.StaticUtils;
@@ -51,6 +53,170 @@
 */
abstract class PerformanceRunner implements ConnectionEventListener
{
  abstract class ConnectionWorker
  {
    private final AtomicInteger operationsInFlight = new AtomicInteger();
    private volatile int count;
    private final AsynchronousConnection staticConnection;
    private final ConnectionFactory connectionFactory;
    private final CountDownLatch latch = new CountDownLatch(1);
    ConnectionWorker(final AsynchronousConnection staticConnection,
        final ConnectionFactory connectionFactory)
    {
      this.staticConnection = staticConnection;
      this.connectionFactory = connectionFactory;
    }
    public void operationCompleted(final AsynchronousConnection connection)
    {
      if (operationsInFlight.decrementAndGet() == 0
          && this.staticConnection == null)
      {
        connection.close();
      }
      startWork();
    }
    public abstract FutureResult<?> performOperation(
        final AsynchronousConnection connection,
        final DataSource[] dataSources, final long startTime);
    public void startWork()
    {
      if (!stopRequested && !(maxIterations > 0 && count >= maxIterations))
      {
        if (this.staticConnection == null)
        {
          connectionFactory
              .getAsynchronousConnection(new ResultHandler<AsynchronousConnection>()
              {
                public void handleErrorResult(final ErrorResultException e)
                {
                  app.println(LocalizableMessage.raw(e.getResult()
                      .getDiagnosticMessage()));
                  if (e.getCause() != null && app.isVerbose())
                  {
                    e.getCause().printStackTrace(app.getErrorStream());
                  }
                  stopRequested = true;
                }
                public void handleResult(final AsynchronousConnection result)
                {
                  doWork(result);
                }
              });
        }
        else
        {
          if (!noRebind
              && this.staticConnection instanceof AuthenticatedAsynchronousConnection)
          {
            final AuthenticatedAsynchronousConnection ac =
              (AuthenticatedAsynchronousConnection) this.staticConnection;
            ac.rebind(new ResultHandler<BindResult>()
            {
              public void handleErrorResult(final ErrorResultException e)
              {
                app.println(LocalizableMessage.raw(e.getResult().toString()));
                if (e.getCause() != null && app.isVerbose())
                {
                  e.getCause().printStackTrace(app.getErrorStream());
                }
                stopRequested = true;
              }
              public void handleResult(final BindResult result)
              {
                doWork(staticConnection);
              }
            });
          }
          else
          {
            doWork(staticConnection);
          }
        }
      }
      else
      {
        latch.countDown();
      }
    }
    public void waitFor() throws InterruptedException
    {
      latch.await();
    }
    private void doWork(final AsynchronousConnection connection)
    {
      long start;
      double sleepTimeInMS = 0;
      final int opsToPerform = isAsync ? numConcurrentTasks : numConcurrentTasks
          - operationsInFlight.get();
      for (int i = 0; i < opsToPerform; i++)
      {
        if (maxIterations > 0 && count >= maxIterations)
        {
          break;
        }
        start = System.nanoTime();
        performOperation(connection, dataSources.get(), start);
        operationRecentCount.getAndIncrement();
        operationsInFlight.getAndIncrement();
        count++;
        if (targetThroughput > 0)
        {
          try
          {
            if (sleepTimeInMS > 1)
            {
              Thread.sleep((long) Math.floor(sleepTimeInMS));
            }
          }
          catch (final InterruptedException e)
          {
            continue;
          }
          sleepTimeInMS += targetTimeInMS
              - ((System.nanoTime() - start) / 1000000.0);
          if (sleepTimeInMS < -60000)
          {
            // If we fall behind by 60 seconds, just forget about
            // catching up
            sleepTimeInMS = -60000;
          }
        }
      }
    }
  }
  /**
   * Statistics thread base implementation.
   */
@@ -263,8 +429,8 @@
        if (resultCount > 0)
        {
          strings[2] = String.format("%.3f",
              (waitTime - (gcDuration - lastGCDuration))
                  / (double) resultCount / 1000000.0);
              (waitTime - (gcDuration - lastGCDuration)) / (double) resultCount
                  / 1000000.0);
        }
        else
        {
@@ -370,7 +536,7 @@
        {
          // Script-friendly.
          app.getOutputStream().print(averageDuration);
          for (String s : strings)
          for (final String s : strings)
          {
            app.getOutputStream().print(",");
            app.getOutputStream().print(s);
@@ -399,12 +565,17 @@
  class UpdateStatsResultHandler<S extends Result> implements ResultHandler<S>
  {
    private final long startTime;
    private final AsynchronousConnection connection;
    private final ConnectionWorker worker;
    UpdateStatsResultHandler(final long startTime)
    UpdateStatsResultHandler(final long startTime,
        final AsynchronousConnection connection, final ConnectionWorker worker)
    {
      this.startTime = startTime;
      this.connection = connection;
      this.worker = worker;
    }
@@ -418,6 +589,8 @@
      {
        app.println(LocalizableMessage.raw(error.getResult().toString()));
      }
      worker.operationCompleted(connection);
    }
@@ -426,6 +599,7 @@
    {
      successRecentCount.getAndIncrement();
      updateStats();
      worker.operationCompleted(connection);
    }
@@ -487,8 +661,7 @@
      AsynchronousConnection connection;
      final double targetTimeInMS =
        (1.0 / (targetThroughput /
            (double) (numThreads * numConnections))) * 1000.0;
        (1.0 / (targetThroughput / (double) (numConcurrentTasks * numConnections))) * 1000.0;
      double sleepTimeInMS = 0;
      long start;
      while (!stopRequested && !(maxIterations > 0 && count >= maxIterations))
@@ -797,20 +970,20 @@
  private final AtomicLong waitRecentTime = new AtomicLong();
  private final AtomicReference<ReversableArray> eTimeBuffer =
    new AtomicReference<ReversableArray>(new ReversableArray(100000));
  private final AtomicReference<ReversableArray> eTimeBuffer = new AtomicReference<ReversableArray>(
      new ReversableArray(100000));
  private final ConsoleApplication app;
  private DataSource[] dataSourcePrototypes;
  // Thread local copies of the data sources
  private final ThreadLocal<DataSource[]> dataSources =
    new ThreadLocal<DataSource[]>()
  private final ThreadLocal<DataSource[]> dataSources = new ThreadLocal<DataSource[]>()
  {
    /**
     * {@inheritDoc}
     */
    @Override
    protected DataSource[] initialValue()
    {
      final DataSource[] prototypes = getDataSources();
@@ -827,7 +1000,7 @@
  private volatile boolean stopRequested;
  private int numThreads;
  private int numConcurrentTasks;
  private int numConnections;
@@ -841,7 +1014,9 @@
  private int statsInterval;
  private final IntegerArgument numThreadsArgument;
  private double targetTimeInMS;
  private final IntegerArgument numConcurrentTasksArgument;
  private final IntegerArgument maxIterationsArgument;
@@ -864,52 +1039,52 @@
  PerformanceRunner(final ArgumentParser argParser,
                    final ConsoleApplication app,
                    boolean neverRebind, boolean neverAsynchronous,
                    boolean alwaysSingleThreaded)
      final ConsoleApplication app, final boolean neverRebind,
      final boolean neverAsynchronous, final boolean alwaysSingleThreaded)
      throws ArgumentException
  {
    this.app = app;
    numThreadsArgument = new IntegerArgument("numThreads", 't', "numThreads",
        false, false, true, LocalizableMessage.raw("{numThreads}"), 1, null,
        true, 1, false, 0, LocalizableMessage
            .raw("Number of worker threads per connection"));
    numThreadsArgument.setPropertyName("numThreads");
    if(!alwaysSingleThreaded)
    numConcurrentTasksArgument = new IntegerArgument("numConcurrentTasks", 't',
        "numConcurrentTasks", false, false, true,
        LocalizableMessage.raw("{numConcurrentTasks}"), 1, null, true, 1,
        false, 0,
        LocalizableMessage.raw("Number of concurrent tasks per connection"));
    numConcurrentTasksArgument.setPropertyName("numConcurrentTasks");
    if (!alwaysSingleThreaded)
    {
      argParser.addArgument(numThreadsArgument);
      argParser.addArgument(numConcurrentTasksArgument);
    }
    else
    {
      numThreadsArgument.addValue("1");
      numConcurrentTasksArgument.addValue("1");
    }
    numConnectionsArgument = new IntegerArgument("numConnections", 'c',
        "numConnections", false, false, true, LocalizableMessage
            .raw("{numConnections}"), 1, null, true, 1, false, 0,
        "numConnections", false, false, true,
        LocalizableMessage.raw("{numConnections}"), 1, null, true, 1, false, 0,
        LocalizableMessage.raw("Number of connections"));
    numConnectionsArgument.setPropertyName("numConnections");
    argParser.addArgument(numConnectionsArgument);
    maxIterationsArgument = new IntegerArgument("maxIterations", 'm',
        "maxIterations", false, false, true, LocalizableMessage
            .raw("{maxIterations}"), 0, null, LocalizableMessage
            .raw("Max iterations, 0 for unlimited"));
        "maxIterations", false, false, true,
        LocalizableMessage.raw("{maxIterations}"), 0, null,
        LocalizableMessage.raw("Max iterations, 0 for unlimited"));
    maxIterationsArgument.setPropertyName("maxIterations");
    argParser.addArgument(maxIterationsArgument);
    statsIntervalArgument = new IntegerArgument("statInterval", 'i',
        "statInterval", false, false, true, LocalizableMessage
            .raw("{statInterval}"), 5, null, true, 1, false, 0,
        "statInterval", false, false, true,
        LocalizableMessage.raw("{statInterval}"), 5, null, true, 1, false, 0,
        LocalizableMessage
            .raw("Display results each specified number of seconds"));
    statsIntervalArgument.setPropertyName("statInterval");
    argParser.addArgument(statsIntervalArgument);
    targetThroughputArgument = new IntegerArgument("targetThroughput", 'M',
        "targetThroughput", false, false, true, LocalizableMessage
            .raw("{targetThroughput}"), 0, null, LocalizableMessage
            .raw("Target average throughput to achieve"));
        "targetThroughput", false, false, true,
        LocalizableMessage.raw("{targetThroughput}"), 0, null,
        LocalizableMessage.raw("Target average throughput to achieve"));
    targetThroughputArgument.setPropertyName("targetThroughput");
    argParser.addArgument(targetThroughputArgument);
@@ -929,7 +1104,7 @@
    noRebindArgument = new BooleanArgument("noRebind", 'F', "noRebind",
        LocalizableMessage.raw("Keep connections open and don't rebind"));
    noRebindArgument.setPropertyName("noRebind");
    if(!neverRebind)
    if (!neverRebind)
    {
      argParser.addArgument(noRebindArgument);
    }
@@ -939,24 +1114,30 @@
    }
    asyncArgument = new BooleanArgument("asynchronous", 'A', "asynchronous",
        LocalizableMessage.raw("Use asynchronous mode and don't " +
            "wait for results before sending the next request"));
        LocalizableMessage.raw("Use asynchronous mode and don't "
            + "wait for results before sending the next request"));
    asyncArgument.setPropertyName("asynchronous");
    if(!neverAsynchronous)
    if (!neverAsynchronous)
    {
      argParser.addArgument(asyncArgument);
    }
    arguments = new StringArgument("argument", 'g', "argument", false, true,
        true, LocalizableMessage.raw("{generator function or static string}"),
        null, null,
        LocalizableMessage.raw("Argument used to evaluate the Java " +
            "style format strings in program parameters (ie. Base DN, " +
            "Search Filter). The set of all arguments provided form the " +
            "the argument list in order. Besides static string " +
            "arguments, they can be generated per iteration with the " +
            "following functions: " + StaticUtils.EOL +
            DataSource.getUsage()));
    arguments = new StringArgument(
        "argument",
        'g',
        "argument",
        false,
        true,
        true,
        LocalizableMessage.raw("{generator function or static string}"),
        null,
        null,
        LocalizableMessage.raw("Argument used to evaluate the Java "
            + "style format strings in program parameters (ie. Base DN, "
            + "Search Filter). The set of all arguments provided form the "
            + "the argument list in order. Besides static string "
            + "arguments, they can be generated per iteration with the "
            + "following functions: " + StaticUtils.EOL + DataSource.getUsage()));
    argParser.addArgument(arguments);
  }
@@ -986,8 +1167,7 @@
  public void handleUnsolicitedNotification(
      final ExtendedResult notification)
  public void handleUnsolicitedNotification(final ExtendedResult notification)
  {
    // Ignore
  }
@@ -997,20 +1177,19 @@
  public final void validate() throws ArgumentException
  {
    numConnections = numConnectionsArgument.getIntValue();
    numThreads = numThreadsArgument.getIntValue();
    maxIterations = maxIterationsArgument.getIntValue() /
        numConnections / numThreads;
    numConcurrentTasks = numConcurrentTasksArgument.getIntValue();
    maxIterations = maxIterationsArgument.getIntValue() / numConnections;
    statsInterval = statsIntervalArgument.getIntValue() * 1000;
    targetThroughput = targetThroughputArgument.getIntValue();
    isAsync = asyncArgument.isPresent();
    noRebind = noRebindArgument.isPresent();
    if (!noRebindArgument.isPresent() && this.numThreads > 1)
    if (!noRebindArgument.isPresent() && this.numConcurrentTasks > 1)
    {
      throw new ArgumentException(LocalizableMessage.raw("--"
          + noRebindArgument.getLongIdentifier() + " must be used if --"
          + numThreadsArgument.getLongIdentifier() + " is > 1"));
          + numConcurrentTasksArgument.getLongIdentifier() + " is > 1"));
    }
    if (!noRebindArgument.isPresent() && asyncArgument.isPresent())
@@ -1021,6 +1200,9 @@
    }
    dataSourcePrototypes = DataSource.parse(arguments.getValues());
    targetTimeInMS =
      (1.0 / (targetThroughput / (double) (numConcurrentTasks * numConnections))) * 1000.0;
  }
@@ -1037,22 +1219,22 @@
  abstract ConnectionWorker newConnectionWorker(
      final AsynchronousConnection connection,
      final ConnectionFactory connectionFactory);
  abstract StatsThread newStatsThread();
  abstract WorkerThread newWorkerThread(AsynchronousConnection connection,
      ConnectionFactory connectionFactory);
  final int run(final ConnectionFactory connectionFactory)
  {
    final List<Thread> threads = new ArrayList<Thread>();
    final List<ConnectionWorker> workers = new ArrayList<ConnectionWorker>();
    final List<AsynchronousConnection> connections = new ArrayList<AsynchronousConnection>();
    AsynchronousConnection connection = null;
    Thread thread;
    try
    {
      for (int i = 0; i < numConnections; i++)
@@ -1063,21 +1245,18 @@
          connection.addConnectionEventListener(this);
          connections.add(connection);
        }
        for (int j = 0; j < numThreads; j++)
        {
          thread = newWorkerThread(connection, connectionFactory);
          threads.add(thread);
          thread.start();
        }
        final ConnectionWorker worker = newConnectionWorker(connection,
            connectionFactory);
        workers.add(worker);
        worker.startWork();
      }
      final Thread statsThread = newStatsThread();
      statsThread.start();
      for (final Thread t : threads)
      for (final ConnectionWorker w : workers)
      {
        t.join();
        w.waitFor();
      }
      stopRequested = true;
      statsThread.join();