mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

matthew_swift
28.47.2010 f2160f4bd1c8ac67e5a86a6710d431e8932877f9
sdk/src/com/sun/opends/sdk/tools/PerformanceRunner.java
@@ -38,491 +38,23 @@
import java.util.concurrent.atomic.AtomicReference;
import org.opends.sdk.*;
import org.opends.sdk.responses.ExtendedResult;
import org.opends.sdk.responses.Result;
import com.sun.opends.sdk.tools.AuthenticatedConnectionFactory.AuthenticatedAsynchronousConnection;
/**
 * Benchmark application framework.
 */
abstract class PerformanceRunner
abstract class PerformanceRunner implements ConnectionEventListener
{
  private final AtomicInteger operationRecentCount = new AtomicInteger();
  private final AtomicInteger successRecentCount = new AtomicInteger();
  private final AtomicInteger failedRecentCount = new AtomicInteger();
  private final AtomicLong waitRecentTime = new AtomicLong();
  private final AtomicReference<ReversableArray> eTimeBuffer = new AtomicReference<ReversableArray>(
      new ReversableArray(100000));
  private final ConsoleApplication app;
  private final ThreadLocal<DataSource[]> dataSources = new ThreadLocal<DataSource[]>();
  private volatile boolean stopRequested;
  private int numThreads;
  private int numConnections;
  private int targetThroughput;
  private int maxIterations;
  private boolean isAsync;
  private boolean noRebind;
  private int statsInterval;
  private IntegerArgument numThreadsArgument;
  private IntegerArgument maxIterationsArgument;
  private IntegerArgument statsIntervalArgument;
  private IntegerArgument targetThroughputArgument;
  private IntegerArgument numConnectionsArgument;
  private IntegerArgument percentilesArgument;
  private BooleanArgument keepConnectionsOpen;
  private BooleanArgument noRebindArgument;
  private BooleanArgument asyncArgument;
  private StringArgument arguments;
  PerformanceRunner(ArgumentParser argParser, ConsoleApplication app)
      throws ArgumentException
  {
    this.app = app;
    numThreadsArgument = new IntegerArgument("numThreads", 't',
        "numThreads", false, false, true, LocalizableMessage.raw("{numThreads}"),
        1, null, true, 1, false, 0, LocalizableMessage
            .raw("number of search threads per connection"));
    numThreadsArgument.setPropertyName("numThreads");
    argParser.addArgument(numThreadsArgument);
    numConnectionsArgument = new IntegerArgument("numConnections", 'c',
        "numConnections", false, false, true, LocalizableMessage
            .raw("{numConnections}"), 1, null, true, 1, false, 0,
        LocalizableMessage.raw("number of connections"));
    numThreadsArgument.setPropertyName("numConnections");
    argParser.addArgument(numConnectionsArgument);
    maxIterationsArgument = new IntegerArgument("maxIterations", 'm',
        "maxIterations", false, false, true, LocalizableMessage
            .raw("{maxIterations}"), 0, null, LocalizableMessage
            .raw("max searches per thread, 0 for unlimited"));
    numThreadsArgument.setPropertyName("maxIterations");
    argParser.addArgument(maxIterationsArgument);
    statsIntervalArgument = new IntegerArgument("statInterval", 'i',
        "statInterval", false, false, true, LocalizableMessage
            .raw("{statInterval}"), 5, null, true, 1, false, 0, LocalizableMessage
            .raw("Display results each specified number of seconds"));
    numThreadsArgument.setPropertyName("statInterval");
    argParser.addArgument(statsIntervalArgument);
    targetThroughputArgument = new IntegerArgument("targetThroughput",
        'M', "targetThroughput", false, false, true, LocalizableMessage
            .raw("{targetThroughput}"), 0, null, LocalizableMessage
            .raw("Target average throughput to achieve"));
    targetThroughputArgument.setPropertyName("targetThroughput");
    argParser.addArgument(targetThroughputArgument);
    percentilesArgument = new IntegerArgument("percentile", 'e',
        "percentile", false, true, LocalizableMessage.raw("{percentile}"), true,
        50, true, 100, LocalizableMessage.raw("Calculate max response time for a "
            + "percentile of operations"));
    percentilesArgument.setPropertyName("percentile");
    argParser.addArgument(percentilesArgument);
    keepConnectionsOpen = new BooleanArgument("keepConnectionsOpen",
        'f', "keepConnectionsOpen", LocalizableMessage
            .raw("keep connections open"));
    keepConnectionsOpen.setPropertyName("keepConnectionsOpen");
    argParser.addArgument(keepConnectionsOpen);
    noRebindArgument = new BooleanArgument("noRebind", 'F', "noRebind",
        LocalizableMessage.raw("keep connections open and don't rebind"));
    keepConnectionsOpen.setPropertyName("noRebind");
    argParser.addArgument(noRebindArgument);
    asyncArgument = new BooleanArgument("asynchronous", 'A',
        "asynchronous", LocalizableMessage.raw("asynch, don't wait for results"));
    keepConnectionsOpen.setPropertyName("asynchronous");
    argParser.addArgument(asyncArgument);
    arguments = new StringArgument(
        "arguments",
        'g',
        "arguments",
        false,
        true,
        true,
        LocalizableMessage.raw("{arguments}"),
        null,
        null,
        LocalizableMessage
            .raw("arguments for variables in the filter and/or base DN"));
    arguments.setPropertyName("arguments");
    argParser.addArgument(arguments);
  }
  public void validate() throws ArgumentException
  {
    numConnections = numConnectionsArgument.getIntValue();
    numThreads = numThreadsArgument.getIntValue();
    maxIterations = maxIterationsArgument.getIntValue();
    statsInterval = statsIntervalArgument.getIntValue() * 1000;
    targetThroughput = targetThroughputArgument.getIntValue();
    isAsync = asyncArgument.isPresent();
    noRebind = noRebindArgument.isPresent();
    if (!noRebindArgument.isPresent() && this.numThreads > 1)
    {
      throw new ArgumentException(LocalizableMessage.raw("--"
          + noRebindArgument.getLongIdentifier()
          + " must be used if --"
          + numThreadsArgument.getLongIdentifier() + " is > 1"));
    }
    if (!noRebindArgument.isPresent() && asyncArgument.isPresent())
    {
      throw new ArgumentException(LocalizableMessage.raw("--"
          + noRebindArgument.getLongIdentifier()
          + " must be used when using --"
          + asyncArgument.getLongIdentifier()));
    }
    try
    {
      DataSource.parse(arguments.getValues());
    }
    catch (IOException ioe)
    {
      throw new ArgumentException(LocalizableMessage
          .raw("Error occured while parsing arguments: "
              + ioe.toString()));
    }
  }
  final int run(ConnectionFactory connectionFactory)
  {
    List<Thread> threads = new ArrayList<Thread>();
    AsynchronousConnection connection = null;
    Thread thread;
    try
    {
      for (int i = 0; i < numConnections; i++)
      {
        if (keepConnectionsOpen.isPresent()
            || noRebindArgument.isPresent())
        {
          connection = connectionFactory.getAsynchronousConnection(
              null).get();
        }
        for (int j = 0; j < numThreads; j++)
        {
          thread = newWorkerThread(connection, connectionFactory);
          threads.add(thread);
          thread.start();
        }
      }
      Thread statsThread = newStatsThread();
      statsThread.start();
      for (Thread t : threads)
      {
        t.join();
      }
      stopRequested = true;
      statsThread.join();
    }
    catch (InterruptedException e)
    {
      stopRequested = true;
    }
    catch (ErrorResultException e)
    {
      stopRequested = true;
      app.println(LocalizableMessage.raw(e.getResult().getDiagnosticMessage()));
    }
    return 0;
  }
  final DataSource[] getDataSources()
  {
    try
    {
      return DataSource.parse(arguments.getValues());
    }
    catch (IOException ioe)
    {
      // Ignore as this shouldn've been handled eariler
    }
    return new DataSource[0];
  }
  abstract WorkerThread<?> newWorkerThread(
      AsynchronousConnection connection,
      ConnectionFactory connectionFactory);
  abstract StatsThread newStatsThread();
  class UpdateStatsResultHandler<S extends Result> implements
      ResultHandler<S>
  {
    private long eTime;
    UpdateStatsResultHandler(long eTime)
    {
      this.eTime = eTime;
    }
    public void handleResult(S result)
    {
      successRecentCount.getAndIncrement();
      eTime = System.nanoTime() - eTime;
      waitRecentTime.getAndAdd(eTime);
      synchronized (this)
      {
        ReversableArray array = eTimeBuffer.get();
        if (array.remaining() == 0)
        {
          array.set(array.size() - 1, eTime);
        }
        else
        {
          array.append(eTime);
        }
      }
    }
    public void handleErrorResult(ErrorResultException error)
    {
      failedRecentCount.getAndIncrement();
      app.println(LocalizableMessage.raw(error.getResult().toString()));
    }
    public long getETime()
    {
      return eTime;
    }
  }
  abstract class WorkerThread<R extends ResultHandler<?>> extends
      Thread
  {
    private int count;
    private final AsynchronousConnection connection;
    private final ConnectionFactory connectionFactory;
    WorkerThread(AsynchronousConnection connection,
        ConnectionFactory connectionFactory)
    {
      super("Worker Thread");
      this.connection = connection;
      this.connectionFactory = connectionFactory;
    }
    public abstract FutureResult<?> performOperation(
        AsynchronousConnection connection, R handler,
        DataSource[] dataSources);
    public abstract R getHandler(long startTime);
    public void run()
    {
      if (dataSources.get() == null)
      {
        try
        {
          dataSources.set(DataSource.parse(arguments.getValues()));
        }
        catch (IOException ioe)
        {
          // Ignore as this shouldn've been handled eariler
        }
      }
      FutureResult<?> future;
      AsynchronousConnection connection;
      R handler;
      double targetTimeInMS = (1.0 / (targetThroughput / (numThreads * numConnections))) * 1000.0;
      double sleepTimeInMS = 0;
      long start;
      while (!stopRequested
          && !(maxIterations > 0 && count >= maxIterations))
      {
        start = System.nanoTime();
        handler = getHandler(start);
        if (this.connection == null)
        {
          try
          {
            connection = connectionFactory.getAsynchronousConnection(
                null).get();
          }
          catch (InterruptedException e)
          {
            // Ignore and check stop requested
            continue;
          }
          catch (ErrorResultException e)
          {
            app.println(LocalizableMessage.raw(e.getResult()
                .getDiagnosticMessage()));
            if (e.getCause() != null && app.isVerbose())
            {
              e.getCause().printStackTrace(app.getErrorStream());
            }
            stopRequested = true;
            break;
          }
        }
        else
        {
          connection = this.connection;
          if (!noRebind
              && connection instanceof AuthenticatedAsynchronousConnection)
          {
            AuthenticatedAsynchronousConnection ac = (AuthenticatedAsynchronousConnection) connection;
            try
            {
              ac.rebind(null).get();
            }
            catch (InterruptedException e)
            {
              // Ignore and check stop requested
              continue;
            }
            catch (ErrorResultException e)
            {
              app.println(LocalizableMessage.raw(e.getResult().toString()));
              if (e.getCause() != null && app.isVerbose())
              {
                e.getCause().printStackTrace(app.getErrorStream());
              }
              stopRequested = true;
              break;
            }
          }
        }
        future = performOperation(connection, handler, dataSources
            .get());
        operationRecentCount.getAndIncrement();
        count++;
        if (!isAsync)
        {
          try
          {
            future.get();
          }
          catch (InterruptedException e)
          {
            // Ignore and check stop requested
            continue;
          }
          catch (ErrorResultException e)
          {
            if (e.getCause() instanceof IOException)
            {
              e.getCause().printStackTrace(app.getErrorStream());
              stopRequested = true;
              break;
            }
            // Ignore. Handled by result handler
          }
          if (this.connection == null)
          {
            connection.close();
          }
        }
        if (targetThroughput > 0)
        {
          try
          {
            if (sleepTimeInMS > 1)
            {
              sleep((long) Math.floor(sleepTimeInMS));
            }
          }
          catch (InterruptedException e)
          {
            continue;
          }
          sleepTimeInMS += targetTimeInMS
              - ((System.nanoTime() - start) / 1000000.0);
          if (sleepTimeInMS < -60000)
          {
            // If we fall behind by 60 seconds, just forget about
            // catching up
            sleepTimeInMS = -60000;
          }
        }
      }
    }
  }
  /**
   * Statistics thread base implementation.
   */
  class StatsThread extends Thread
  {
    protected final String[] EMPTY_STRINGS = new String[0];
    private final MultiColumnPrinter printer;
    private final List<GarbageCollectorMXBean> beans;
@@ -561,10 +93,10 @@
    public StatsThread(String[] additionalColumns)
    public StatsThread(final String[] additionalColumns)
    {
      super("Stats Thread");
      TreeSet<Double> pSet = new TreeSet<Double>();
      final TreeSet<Double> pSet = new TreeSet<Double>();
      if (!percentilesArgument.isPresent())
      {
        pSet.add(.1);
@@ -573,14 +105,14 @@
      }
      else
      {
        for (String percentile : percentilesArgument.getValues())
        for (final String percentile : percentilesArgument.getValues())
        {
          pSet.add(100.0 - Double.parseDouble(percentile));
        }
      }
      this.percentiles = pSet.descendingSet();
      numColumns = 5 + this.percentiles.size()
          + additionalColumns.length + (isAsync ? 1 : 0);
      numColumns = 5 + this.percentiles.size() + additionalColumns.length
          + (isAsync ? 1 : 0);
      printer = new MultiColumnPrinter(numColumns, 2, "-",
          MultiColumnPrinter.RIGHT, app);
      printer.setTitleAlign(MultiColumnPrinter.RIGHT);
@@ -607,7 +139,7 @@
      title[2] = "recent";
      title[3] = "average";
      int i = 4;
      for (Double percentile : this.percentiles)
      for (final Double percentile : this.percentiles)
      {
        title[i++] = Double.toString(100.0 - percentile) + "%";
      }
@@ -616,7 +148,7 @@
      {
        title[i++] = "req/res";
      }
      for (String column : additionalColumns)
      for (final String column : additionalColumns)
      {
        title[i++] = column;
      }
@@ -628,24 +160,17 @@
    String[] getAdditionalColumns()
    {
      return EMPTY_STRINGS;
    }
    @Override
    public void run()
    {
      printer.printTitle();
      String[] strings = new String[numColumns];
      final String[] strings = new String[numColumns];
      long startTime = System.currentTimeMillis();
      final long startTime = System.currentTimeMillis();
      long statTime = startTime;
      long gcDuration = 0;
      for (GarbageCollectorMXBean bean : beans)
      for (final GarbageCollectorMXBean bean : beans)
      {
        gcDuration += bean.getCollectionTime();
      }
@@ -655,7 +180,7 @@
        {
          sleep(statsInterval);
        }
        catch (InterruptedException ie)
        catch (final InterruptedException ie)
        {
          // Ignore.
        }
@@ -665,7 +190,7 @@
        lastGCDuration = gcDuration;
        gcDuration = 0;
        for (GarbageCollectorMXBean bean : beans)
        for (final GarbageCollectorMXBean bean : beans)
        {
          gcDuration += bean.getCollectionTime();
        }
@@ -684,19 +209,31 @@
        averageDuration -= gcDuration;
        recentDuration /= 1000.0;
        averageDuration /= 1000.0;
        strings[0] = String.format("%.1f", successCount
            / recentDuration);
        strings[1] = String.format("%.1f", totalSuccessCount
            / averageDuration);
        strings[2] = String.format("%.3f",
            (waitTime - (gcDuration - lastGCDuration)) / successCount
                / 1000000.0);
        strings[3] = String.format("%.3f", (totalWaitTime - gcDuration)
            / totalSuccessCount / 1000000.0);
        strings[0] = String.format("%.1f", successCount / recentDuration);
        strings[1] = String.format("%.1f", totalSuccessCount / averageDuration);
        if (successCount > 0)
        {
          strings[2] = String.format("%.3f",
              (waitTime - (gcDuration - lastGCDuration)) / successCount
                  / 1000000.0);
        }
        else
        {
          strings[2] = "-";
        }
        if (totalSuccessCount > 0)
        {
          strings[3] = String.format("%.3f", (totalWaitTime - gcDuration)
              / totalSuccessCount / 1000000.0);
        }
        else
        {
          strings[3] = "-";
        }
        boolean changed = false;
        etimes = eTimeBuffer.getAndSet(etimes);
        int appendLength = Math.min(array.remaining(), etimes.size());
        final int appendLength = Math.min(array.remaining(), etimes.size());
        if (appendLength > 0)
        {
          array.append(etimes, appendLength);
@@ -737,36 +274,288 @@
        // Now everything is ordered from smallest to largest
        int index;
        int i = 4;
        for (Double percent : percentiles)
        for (final Double percent : percentiles)
        {
          index = array.size()
              - (int) Math.floor((percent / 100.0) * totalSuccessCount)
              - 1;
          if (index < 0)
          if (array.size() <= 0)
          {
            strings[i++] = String.format("*%.3f",
                array.get(0) / 1000000.0);
            strings[i++] = "-";
          }
          else
          {
            strings[i++] = String.format("%.3f",
                array.get(index) / 1000000.0);
            index = array.size()
                - (int) Math.floor((percent / 100.0) * totalSuccessCount) - 1;
            if (index < 0)
            {
              strings[i++] = String.format("*%.3f", array.get(0) / 1000000.0);
            }
            else
            {
              strings[i++] = String
                  .format("%.3f", array.get(index) / 1000000.0);
            }
          }
        }
        strings[i++] = String.format("%.1f", totalFailedCount
            / averageDuration);
        strings[i++] = String
            .format("%.1f", totalFailedCount / averageDuration);
        if (isAsync)
        {
          strings[i++] = String.format("%.1f", (double) searchCount
              / successCount);
          if (successCount > 0)
          {
            strings[i++] = String.format("%.1f", (double) searchCount
                / successCount);
          }
          else
          {
            strings[i++] = "-";
          }
        }
        for (String column : getAdditionalColumns())
        for (final String column : getAdditionalColumns())
        {
          strings[i++] = column;
        }
        printer.printRow(strings);
      }
    }
    String[] getAdditionalColumns()
    {
      return EMPTY_STRINGS;
    }
  }
  /**
   * Statistics update result handler implementation.
   *
   * @param <S>
   *          The type of expected result.
   */
  class UpdateStatsResultHandler<S extends Result> implements ResultHandler<S>
  {
    private long eTime;
    UpdateStatsResultHandler(final long eTime)
    {
      this.eTime = eTime;
    }
    public long getETime()
    {
      return eTime;
    }
    public void handleErrorResult(final ErrorResultException error)
    {
      failedRecentCount.getAndIncrement();
      app.println(LocalizableMessage.raw(error.getResult().toString()));
    }
    public void handleResult(final S result)
    {
      successRecentCount.getAndIncrement();
      eTime = System.nanoTime() - eTime;
      waitRecentTime.getAndAdd(eTime);
      synchronized (this)
      {
        final ReversableArray array = eTimeBuffer.get();
        if (array.remaining() == 0)
        {
          array.set(array.size() - 1, eTime);
        }
        else
        {
          array.append(eTime);
        }
      }
    }
  }
  /**
   * Worker thread base implementation.
   *
   * @param <R>
   *          Type of result handler.
   */
  abstract class WorkerThread<R extends ResultHandler<?>> extends Thread
  {
    private int count;
    private final AsynchronousConnection connection;
    private final ConnectionFactory connectionFactory;
    WorkerThread(final AsynchronousConnection connection,
        final ConnectionFactory connectionFactory)
    {
      super("Worker Thread");
      this.connection = connection;
      this.connectionFactory = connectionFactory;
    }
    public abstract R getHandler(long startTime);
    public abstract FutureResult<?> performOperation(
        AsynchronousConnection connection, R handler, DataSource[] dataSources);
    @Override
    public void run()
    {
      if (dataSources.get() == null)
      {
        try
        {
          dataSources.set(DataSource.parse(arguments.getValues()));
        }
        catch (final IOException ioe)
        {
          // Ignore as this shouldn've been handled eariler
        }
      }
      FutureResult<?> future;
      AsynchronousConnection connection;
      R handler;
      final double targetTimeInMS =
        (1.0 / (targetThroughput / (numThreads * numConnections))) * 1000.0;
      double sleepTimeInMS = 0;
      long start;
      while (!stopRequested && !(maxIterations > 0 && count >= maxIterations))
      {
        start = System.nanoTime();
        handler = getHandler(start);
        if (this.connection == null)
        {
          try
          {
            connection = connectionFactory.getAsynchronousConnection(null)
                .get();
          }
          catch (final InterruptedException e)
          {
            // Ignore and check stop requested
            continue;
          }
          catch (final ErrorResultException e)
          {
            app.println(LocalizableMessage.raw(e.getResult()
                .getDiagnosticMessage()));
            if (e.getCause() != null && app.isVerbose())
            {
              e.getCause().printStackTrace(app.getErrorStream());
            }
            stopRequested = true;
            break;
          }
        }
        else
        {
          connection = this.connection;
          if (!noRebind
              && connection instanceof AuthenticatedAsynchronousConnection)
          {
            final AuthenticatedAsynchronousConnection ac =
              (AuthenticatedAsynchronousConnection) connection;
            try
            {
              ac.rebind(null).get();
            }
            catch (final InterruptedException e)
            {
              // Ignore and check stop requested
              continue;
            }
            catch (final ErrorResultException e)
            {
              app.println(LocalizableMessage.raw(e.getResult().toString()));
              if (e.getCause() != null && app.isVerbose())
              {
                e.getCause().printStackTrace(app.getErrorStream());
              }
              stopRequested = true;
              break;
            }
          }
        }
        future = performOperation(connection, handler, dataSources.get());
        operationRecentCount.getAndIncrement();
        count++;
        if (!isAsync)
        {
          try
          {
            future.get();
          }
          catch (final InterruptedException e)
          {
            // Ignore and check stop requested
            continue;
          }
          catch (final ErrorResultException e)
          {
            if (e.getCause() instanceof IOException)
            {
              e.getCause().printStackTrace(app.getErrorStream());
              stopRequested = true;
              break;
            }
            // Ignore. Handled by result handler
          }
          finally
          {
            if (this.connection == null)
            {
              connection.close();
            }
          }
        }
        if (targetThroughput > 0)
        {
          try
          {
            if (sleepTimeInMS > 1)
            {
              sleep((long) Math.floor(sleepTimeInMS));
            }
          }
          catch (final InterruptedException e)
          {
            continue;
          }
          sleepTimeInMS += targetTimeInMS
              - ((System.nanoTime() - start) / 1000000.0);
          if (sleepTimeInMS < -60000)
          {
            // If we fall behind by 60 seconds, just forget about
            // catching up
            sleepTimeInMS = -60000;
          }
        }
      }
    }
  }
@@ -781,64 +570,14 @@
    public ReversableArray(int capacity)
    public ReversableArray(final int capacity)
    {
      this.array = new long[capacity];
    }
    public void set(int index, long value)
    {
      if (index >= size)
      {
        throw new IndexOutOfBoundsException();
      }
      if (!reversed)
      {
        array[index] = value;
      }
      else
      {
        array[size - index - 1] = value;
      }
    }
    public long get(int index)
    {
      if (index >= size)
      {
        throw new IndexOutOfBoundsException();
      }
      if (!reversed)
      {
        return array[index];
      }
      else
      {
        return array[size - index - 1];
      }
    }
    public int size()
    {
      return size;
    }
    public void reverse()
    {
      reversed = !reversed;
    }
    public void append(long value)
    public void append(final long value)
    {
      if (size == array.length)
      {
@@ -859,7 +598,7 @@
    public void append(ReversableArray a, int length)
    public void append(final ReversableArray a, final int length)
    {
      if (length > a.size() || length > remaining())
      {
@@ -879,13 +618,6 @@
    public int remaining()
    {
      return array.length - size;
    }
    public void clear()
    {
      size = 0;
@@ -893,7 +625,57 @@
    public void siftDown(int start, int end)
    public long get(final int index)
    {
      if (index >= size)
      {
        throw new IndexOutOfBoundsException();
      }
      if (!reversed)
      {
        return array[index];
      }
      else
      {
        return array[size - index - 1];
      }
    }
    public int remaining()
    {
      return array.length - size;
    }
    public void reverse()
    {
      reversed = !reversed;
    }
    public void set(final int index, final long value)
    {
      if (index >= size)
      {
        throw new IndexOutOfBoundsException();
      }
      if (!reversed)
      {
        array[index] = value;
      }
      else
      {
        array[size - index - 1] = value;
      }
    }
    public void siftDown(final int start, final int end)
    {
      int root = start;
      int child;
@@ -918,7 +700,7 @@
    public void siftUp(int start, int end)
    public void siftUp(final int start, final int end)
    {
      int child = end;
      int parent;
@@ -939,11 +721,297 @@
    private void swap(int i, int i2)
    public int size()
    {
      long temp = get(i);
      return size;
    }
    private void swap(final int i, final int i2)
    {
      final long temp = get(i);
      set(i, get(i2));
      set(i2, temp);
    }
  }
  private static final String[] EMPTY_STRINGS = new String[0];
  private final AtomicInteger operationRecentCount = new AtomicInteger();
  private final AtomicInteger successRecentCount = new AtomicInteger();
  private final AtomicInteger failedRecentCount = new AtomicInteger();
  private final AtomicLong waitRecentTime = new AtomicLong();
  private final AtomicReference<ReversableArray> eTimeBuffer = new AtomicReference<ReversableArray>(
      new ReversableArray(100000));
  private final ConsoleApplication app;
  private final ThreadLocal<DataSource[]> dataSources = new ThreadLocal<DataSource[]>();
  private volatile boolean stopRequested;
  private int numThreads;
  private int numConnections;
  private int targetThroughput;
  private int maxIterations;
  private boolean isAsync;
  private boolean noRebind;
  private int statsInterval;
  private final IntegerArgument numThreadsArgument;
  private final IntegerArgument maxIterationsArgument;
  private final IntegerArgument statsIntervalArgument;
  private final IntegerArgument targetThroughputArgument;
  private final IntegerArgument numConnectionsArgument;
  private final IntegerArgument percentilesArgument;
  private final BooleanArgument keepConnectionsOpen;
  private final BooleanArgument noRebindArgument;
  private final BooleanArgument asyncArgument;
  private final StringArgument arguments;
  PerformanceRunner(final ArgumentParser argParser, final ConsoleApplication app)
      throws ArgumentException
  {
    this.app = app;
    numThreadsArgument = new IntegerArgument("numThreads", 't', "numThreads",
        false, false, true, LocalizableMessage.raw("{numThreads}"), 1, null,
        true, 1, false, 0, LocalizableMessage
            .raw("number of search threads per connection"));
    numThreadsArgument.setPropertyName("numThreads");
    argParser.addArgument(numThreadsArgument);
    numConnectionsArgument = new IntegerArgument("numConnections", 'c',
        "numConnections", false, false, true, LocalizableMessage
            .raw("{numConnections}"), 1, null, true, 1, false, 0,
        LocalizableMessage.raw("number of connections"));
    numThreadsArgument.setPropertyName("numConnections");
    argParser.addArgument(numConnectionsArgument);
    maxIterationsArgument = new IntegerArgument("maxIterations", 'm',
        "maxIterations", false, false, true, LocalizableMessage
            .raw("{maxIterations}"), 0, null, LocalizableMessage
            .raw("max searches per thread, 0 for unlimited"));
    numThreadsArgument.setPropertyName("maxIterations");
    argParser.addArgument(maxIterationsArgument);
    statsIntervalArgument = new IntegerArgument("statInterval", 'i',
        "statInterval", false, false, true, LocalizableMessage
            .raw("{statInterval}"), 5, null, true, 1, false, 0,
        LocalizableMessage
            .raw("Display results each specified number of seconds"));
    numThreadsArgument.setPropertyName("statInterval");
    argParser.addArgument(statsIntervalArgument);
    targetThroughputArgument = new IntegerArgument("targetThroughput", 'M',
        "targetThroughput", false, false, true, LocalizableMessage
            .raw("{targetThroughput}"), 0, null, LocalizableMessage
            .raw("Target average throughput to achieve"));
    targetThroughputArgument.setPropertyName("targetThroughput");
    argParser.addArgument(targetThroughputArgument);
    percentilesArgument = new IntegerArgument("percentile", 'e', "percentile",
        false, true, LocalizableMessage.raw("{percentile}"), true, 50, true,
        100, LocalizableMessage.raw("Calculate max response time for a "
            + "percentile of operations"));
    percentilesArgument.setPropertyName("percentile");
    argParser.addArgument(percentilesArgument);
    keepConnectionsOpen = new BooleanArgument("keepConnectionsOpen", 'f',
        "keepConnectionsOpen", LocalizableMessage.raw("keep connections open"));
    keepConnectionsOpen.setPropertyName("keepConnectionsOpen");
    argParser.addArgument(keepConnectionsOpen);
    noRebindArgument = new BooleanArgument("noRebind", 'F', "noRebind",
        LocalizableMessage.raw("keep connections open and don't rebind"));
    keepConnectionsOpen.setPropertyName("noRebind");
    argParser.addArgument(noRebindArgument);
    asyncArgument = new BooleanArgument("asynchronous", 'A', "asynchronous",
        LocalizableMessage.raw("asynch, don't wait for results"));
    keepConnectionsOpen.setPropertyName("asynchronous");
    argParser.addArgument(asyncArgument);
    arguments = new StringArgument("arguments", 'g', "arguments", false, true,
        true, LocalizableMessage.raw("{arguments}"), null, null,
        LocalizableMessage
            .raw("arguments for variables in the filter and/or base DN"));
    arguments.setPropertyName("arguments");
    argParser.addArgument(arguments);
  }
  public void connectionClosed()
  {
    // Ignore
  }
  public synchronized void connectionErrorOccurred(
      final boolean isDisconnectNotification, final ErrorResultException error)
  {
    if (!stopRequested)
    {
      app.println(LocalizableMessage.raw("Error occurred on one or more "
          + "connections: " + error.getResult().toString()));
      if (error.getCause() != null && app.isVerbose())
      {
        error.getCause().printStackTrace(app.getErrorStream());
      }
      stopRequested = true;
    }
  }
  public void connectionReceivedUnsolicitedNotification(
      final ExtendedResult notification)
  {
    // Ignore
  }
  public void validate() throws ArgumentException
  {
    numConnections = numConnectionsArgument.getIntValue();
    numThreads = numThreadsArgument.getIntValue();
    maxIterations = maxIterationsArgument.getIntValue();
    statsInterval = statsIntervalArgument.getIntValue() * 1000;
    targetThroughput = targetThroughputArgument.getIntValue();
    isAsync = asyncArgument.isPresent();
    noRebind = noRebindArgument.isPresent();
    if (!noRebindArgument.isPresent() && this.numThreads > 1)
    {
      throw new ArgumentException(LocalizableMessage.raw("--"
          + noRebindArgument.getLongIdentifier() + " must be used if --"
          + numThreadsArgument.getLongIdentifier() + " is > 1"));
    }
    if (!noRebindArgument.isPresent() && asyncArgument.isPresent())
    {
      throw new ArgumentException(LocalizableMessage.raw("--"
          + noRebindArgument.getLongIdentifier()
          + " must be used when using --" + asyncArgument.getLongIdentifier()));
    }
    try
    {
      DataSource.parse(arguments.getValues());
    }
    catch (final IOException ioe)
    {
      throw new ArgumentException(LocalizableMessage
          .raw("Error occured while parsing arguments: " + ioe.toString()));
    }
  }
  final DataSource[] getDataSources()
  {
    try
    {
      return DataSource.parse(arguments.getValues());
    }
    catch (final IOException ioe)
    {
      // Ignore as this shouldn've been handled eariler
    }
    return new DataSource[0];
  }
  abstract StatsThread newStatsThread();
  abstract WorkerThread<?> newWorkerThread(AsynchronousConnection connection,
      ConnectionFactory connectionFactory);
  final int run(final ConnectionFactory connectionFactory)
  {
    final List<Thread> threads = new ArrayList<Thread>();
    final List<AsynchronousConnection> connections = new ArrayList<AsynchronousConnection>();
    AsynchronousConnection connection = null;
    Thread thread;
    try
    {
      for (int i = 0; i < numConnections; i++)
      {
        if (keepConnectionsOpen.isPresent() || noRebindArgument.isPresent())
        {
          connection = connectionFactory.getAsynchronousConnection(null).get();
          connection.addConnectionEventListener(this);
          connections.add(connection);
        }
        for (int j = 0; j < numThreads; j++)
        {
          thread = newWorkerThread(connection, connectionFactory);
          threads.add(thread);
          thread.start();
        }
      }
      final Thread statsThread = newStatsThread();
      statsThread.start();
      for (final Thread t : threads)
      {
        t.join();
      }
      stopRequested = true;
      statsThread.join();
    }
    catch (final InterruptedException e)
    {
      stopRequested = true;
    }
    catch (final ErrorResultException e)
    {
      stopRequested = true;
      app.println(LocalizableMessage.raw(e.getResult().getDiagnosticMessage()));
    }
    finally
    {
      for (final AsynchronousConnection c : connections)
      {
        c.close();
      }
    }
    return 0;
  }
}