From f2160f4bd1c8ac67e5a86a6710d431e8932877f9 Mon Sep 17 00:00:00 2001
From: matthew_swift <matthew_swift@localhost>
Date: Fri, 28 May 2010 11:47:51 +0000
Subject: [PATCH] Synchronize SDK on java.net with internal repository.

---
 sdk/src/com/sun/opends/sdk/tools/PerformanceRunner.java | 1228 ++++++++++++++++++++++++++++++---------------------------
 1 files changed, 648 insertions(+), 580 deletions(-)

diff --git a/sdk/src/com/sun/opends/sdk/tools/PerformanceRunner.java b/sdk/src/com/sun/opends/sdk/tools/PerformanceRunner.java
index f1f4811..03c0bce 100644
--- a/sdk/src/com/sun/opends/sdk/tools/PerformanceRunner.java
+++ b/sdk/src/com/sun/opends/sdk/tools/PerformanceRunner.java
@@ -38,491 +38,23 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.opends.sdk.*;
+import org.opends.sdk.responses.ExtendedResult;
 import org.opends.sdk.responses.Result;
 
 import com.sun.opends.sdk.tools.AuthenticatedConnectionFactory.AuthenticatedAsynchronousConnection;
 
 
 
-
 /**
  * Benchmark application framework.
  */
-abstract class PerformanceRunner
+abstract class PerformanceRunner implements ConnectionEventListener
 {
-  private final AtomicInteger operationRecentCount = new AtomicInteger();
-
-  private final AtomicInteger successRecentCount = new AtomicInteger();
-
-  private final AtomicInteger failedRecentCount = new AtomicInteger();
-
-  private final AtomicLong waitRecentTime = new AtomicLong();
-
-  private final AtomicReference<ReversableArray> eTimeBuffer = new AtomicReference<ReversableArray>(
-      new ReversableArray(100000));
-
-  private final ConsoleApplication app;
-
-  private final ThreadLocal<DataSource[]> dataSources = new ThreadLocal<DataSource[]>();
-
-  private volatile boolean stopRequested;
-
-  private int numThreads;
-
-  private int numConnections;
-
-  private int targetThroughput;
-
-  private int maxIterations;
-
-  private boolean isAsync;
-
-  private boolean noRebind;
-
-  private int statsInterval;
-
-  private IntegerArgument numThreadsArgument;
-
-  private IntegerArgument maxIterationsArgument;
-
-  private IntegerArgument statsIntervalArgument;
-
-  private IntegerArgument targetThroughputArgument;
-
-  private IntegerArgument numConnectionsArgument;
-
-  private IntegerArgument percentilesArgument;
-
-  private BooleanArgument keepConnectionsOpen;
-
-  private BooleanArgument noRebindArgument;
-
-  private BooleanArgument asyncArgument;
-
-  private StringArgument arguments;
-
-
-
-  PerformanceRunner(ArgumentParser argParser, ConsoleApplication app)
-      throws ArgumentException
-  {
-    this.app = app;
-    numThreadsArgument = new IntegerArgument("numThreads", 't',
-        "numThreads", false, false, true, LocalizableMessage.raw("{numThreads}"),
-        1, null, true, 1, false, 0, LocalizableMessage
-            .raw("number of search threads per connection"));
-    numThreadsArgument.setPropertyName("numThreads");
-    argParser.addArgument(numThreadsArgument);
-
-    numConnectionsArgument = new IntegerArgument("numConnections", 'c',
-        "numConnections", false, false, true, LocalizableMessage
-            .raw("{numConnections}"), 1, null, true, 1, false, 0,
-        LocalizableMessage.raw("number of connections"));
-    numThreadsArgument.setPropertyName("numConnections");
-    argParser.addArgument(numConnectionsArgument);
-
-    maxIterationsArgument = new IntegerArgument("maxIterations", 'm',
-        "maxIterations", false, false, true, LocalizableMessage
-            .raw("{maxIterations}"), 0, null, LocalizableMessage
-            .raw("max searches per thread, 0 for unlimited"));
-    numThreadsArgument.setPropertyName("maxIterations");
-    argParser.addArgument(maxIterationsArgument);
-
-    statsIntervalArgument = new IntegerArgument("statInterval", 'i',
-        "statInterval", false, false, true, LocalizableMessage
-            .raw("{statInterval}"), 5, null, true, 1, false, 0, LocalizableMessage
-            .raw("Display results each specified number of seconds"));
-    numThreadsArgument.setPropertyName("statInterval");
-    argParser.addArgument(statsIntervalArgument);
-
-    targetThroughputArgument = new IntegerArgument("targetThroughput",
-        'M', "targetThroughput", false, false, true, LocalizableMessage
-            .raw("{targetThroughput}"), 0, null, LocalizableMessage
-            .raw("Target average throughput to achieve"));
-    targetThroughputArgument.setPropertyName("targetThroughput");
-    argParser.addArgument(targetThroughputArgument);
-
-    percentilesArgument = new IntegerArgument("percentile", 'e',
-        "percentile", false, true, LocalizableMessage.raw("{percentile}"), true,
-        50, true, 100, LocalizableMessage.raw("Calculate max response time for a "
-            + "percentile of operations"));
-    percentilesArgument.setPropertyName("percentile");
-    argParser.addArgument(percentilesArgument);
-
-    keepConnectionsOpen = new BooleanArgument("keepConnectionsOpen",
-        'f', "keepConnectionsOpen", LocalizableMessage
-            .raw("keep connections open"));
-    keepConnectionsOpen.setPropertyName("keepConnectionsOpen");
-    argParser.addArgument(keepConnectionsOpen);
-
-    noRebindArgument = new BooleanArgument("noRebind", 'F', "noRebind",
-        LocalizableMessage.raw("keep connections open and don't rebind"));
-    keepConnectionsOpen.setPropertyName("noRebind");
-    argParser.addArgument(noRebindArgument);
-
-    asyncArgument = new BooleanArgument("asynchronous", 'A',
-        "asynchronous", LocalizableMessage.raw("asynch, don't wait for results"));
-    keepConnectionsOpen.setPropertyName("asynchronous");
-    argParser.addArgument(asyncArgument);
-
-    arguments = new StringArgument(
-        "arguments",
-        'g',
-        "arguments",
-        false,
-        true,
-        true,
-        LocalizableMessage.raw("{arguments}"),
-        null,
-        null,
-        LocalizableMessage
-            .raw("arguments for variables in the filter and/or base DN"));
-    arguments.setPropertyName("arguments");
-    argParser.addArgument(arguments);
-  }
-
-
-
-  public void validate() throws ArgumentException
-  {
-    numConnections = numConnectionsArgument.getIntValue();
-    numThreads = numThreadsArgument.getIntValue();
-    maxIterations = maxIterationsArgument.getIntValue();
-    statsInterval = statsIntervalArgument.getIntValue() * 1000;
-    targetThroughput = targetThroughputArgument.getIntValue();
-
-    isAsync = asyncArgument.isPresent();
-    noRebind = noRebindArgument.isPresent();
-
-    if (!noRebindArgument.isPresent() && this.numThreads > 1)
-    {
-      throw new ArgumentException(LocalizableMessage.raw("--"
-          + noRebindArgument.getLongIdentifier()
-          + " must be used if --"
-          + numThreadsArgument.getLongIdentifier() + " is > 1"));
-    }
-
-    if (!noRebindArgument.isPresent() && asyncArgument.isPresent())
-    {
-      throw new ArgumentException(LocalizableMessage.raw("--"
-          + noRebindArgument.getLongIdentifier()
-          + " must be used when using --"
-          + asyncArgument.getLongIdentifier()));
-    }
-
-    try
-    {
-      DataSource.parse(arguments.getValues());
-    }
-    catch (IOException ioe)
-    {
-      throw new ArgumentException(LocalizableMessage
-          .raw("Error occured while parsing arguments: "
-              + ioe.toString()));
-    }
-  }
-
-
-
-  final int run(ConnectionFactory connectionFactory)
-  {
-    List<Thread> threads = new ArrayList<Thread>();
-
-    AsynchronousConnection connection = null;
-    Thread thread;
-    try
-    {
-      for (int i = 0; i < numConnections; i++)
-      {
-        if (keepConnectionsOpen.isPresent()
-            || noRebindArgument.isPresent())
-        {
-          connection = connectionFactory.getAsynchronousConnection(
-              null).get();
-        }
-        for (int j = 0; j < numThreads; j++)
-        {
-          thread = newWorkerThread(connection, connectionFactory);
-
-          threads.add(thread);
-          thread.start();
-        }
-      }
-
-      Thread statsThread = newStatsThread();
-      statsThread.start();
-
-      for (Thread t : threads)
-      {
-        t.join();
-      }
-      stopRequested = true;
-      statsThread.join();
-    }
-    catch (InterruptedException e)
-    {
-      stopRequested = true;
-    }
-    catch (ErrorResultException e)
-    {
-      stopRequested = true;
-      app.println(LocalizableMessage.raw(e.getResult().getDiagnosticMessage()));
-    }
-
-    return 0;
-  }
-
-
-
-  final DataSource[] getDataSources()
-  {
-    try
-    {
-      return DataSource.parse(arguments.getValues());
-    }
-    catch (IOException ioe)
-    {
-      // Ignore as this shouldn've been handled eariler
-    }
-    return new DataSource[0];
-  }
-
-
-
-  abstract WorkerThread<?> newWorkerThread(
-      AsynchronousConnection connection,
-      ConnectionFactory connectionFactory);
-
-
-
-  abstract StatsThread newStatsThread();
-
-
-
-  class UpdateStatsResultHandler<S extends Result> implements
-      ResultHandler<S>
-  {
-    private long eTime;
-
-
-
-    UpdateStatsResultHandler(long eTime)
-    {
-      this.eTime = eTime;
-    }
-
-
-
-    public void handleResult(S result)
-    {
-      successRecentCount.getAndIncrement();
-      eTime = System.nanoTime() - eTime;
-      waitRecentTime.getAndAdd(eTime);
-      synchronized (this)
-      {
-        ReversableArray array = eTimeBuffer.get();
-        if (array.remaining() == 0)
-        {
-          array.set(array.size() - 1, eTime);
-        }
-        else
-        {
-          array.append(eTime);
-        }
-      }
-    }
-
-
-
-    public void handleErrorResult(ErrorResultException error)
-    {
-      failedRecentCount.getAndIncrement();
-      app.println(LocalizableMessage.raw(error.getResult().toString()));
-    }
-
-
-
-    public long getETime()
-    {
-      return eTime;
-    }
-  }
-
-
-
-  abstract class WorkerThread<R extends ResultHandler<?>> extends
-      Thread
-  {
-    private int count;
-
-    private final AsynchronousConnection connection;
-
-    private final ConnectionFactory connectionFactory;
-
-
-
-    WorkerThread(AsynchronousConnection connection,
-        ConnectionFactory connectionFactory)
-    {
-      super("Worker Thread");
-      this.connection = connection;
-      this.connectionFactory = connectionFactory;
-    }
-
-
-
-    public abstract FutureResult<?> performOperation(
-        AsynchronousConnection connection, R handler,
-        DataSource[] dataSources);
-
-
-
-    public abstract R getHandler(long startTime);
-
-
-
-    public void run()
-    {
-      if (dataSources.get() == null)
-      {
-        try
-        {
-          dataSources.set(DataSource.parse(arguments.getValues()));
-        }
-        catch (IOException ioe)
-        {
-          // Ignore as this shouldn've been handled eariler
-        }
-      }
-
-      FutureResult<?> future;
-      AsynchronousConnection connection;
-      R handler;
-
-      double targetTimeInMS = (1.0 / (targetThroughput / (numThreads * numConnections))) * 1000.0;
-      double sleepTimeInMS = 0;
-      long start;
-      while (!stopRequested
-          && !(maxIterations > 0 && count >= maxIterations))
-      {
-        start = System.nanoTime();
-        handler = getHandler(start);
-
-        if (this.connection == null)
-        {
-          try
-          {
-            connection = connectionFactory.getAsynchronousConnection(
-                null).get();
-          }
-          catch (InterruptedException e)
-          {
-            // Ignore and check stop requested
-            continue;
-          }
-          catch (ErrorResultException e)
-          {
-            app.println(LocalizableMessage.raw(e.getResult()
-                .getDiagnosticMessage()));
-            if (e.getCause() != null && app.isVerbose())
-            {
-              e.getCause().printStackTrace(app.getErrorStream());
-            }
-            stopRequested = true;
-            break;
-          }
-        }
-        else
-        {
-          connection = this.connection;
-          if (!noRebind
-              && connection instanceof AuthenticatedAsynchronousConnection)
-          {
-            AuthenticatedAsynchronousConnection ac = (AuthenticatedAsynchronousConnection) connection;
-            try
-            {
-              ac.rebind(null).get();
-            }
-            catch (InterruptedException e)
-            {
-              // Ignore and check stop requested
-              continue;
-            }
-            catch (ErrorResultException e)
-            {
-              app.println(LocalizableMessage.raw(e.getResult().toString()));
-              if (e.getCause() != null && app.isVerbose())
-              {
-                e.getCause().printStackTrace(app.getErrorStream());
-              }
-              stopRequested = true;
-              break;
-            }
-          }
-        }
-        future = performOperation(connection, handler, dataSources
-            .get());
-        operationRecentCount.getAndIncrement();
-        count++;
-        if (!isAsync)
-        {
-          try
-          {
-            future.get();
-          }
-          catch (InterruptedException e)
-          {
-            // Ignore and check stop requested
-            continue;
-          }
-          catch (ErrorResultException e)
-          {
-            if (e.getCause() instanceof IOException)
-            {
-              e.getCause().printStackTrace(app.getErrorStream());
-              stopRequested = true;
-              break;
-            }
-            // Ignore. Handled by result handler
-          }
-          if (this.connection == null)
-          {
-            connection.close();
-          }
-        }
-        if (targetThroughput > 0)
-        {
-          try
-          {
-            if (sleepTimeInMS > 1)
-            {
-              sleep((long) Math.floor(sleepTimeInMS));
-            }
-          }
-          catch (InterruptedException e)
-          {
-            continue;
-          }
-
-          sleepTimeInMS += targetTimeInMS
-              - ((System.nanoTime() - start) / 1000000.0);
-          if (sleepTimeInMS < -60000)
-          {
-            // If we fall behind by 60 seconds, just forget about
-            // catching up
-            sleepTimeInMS = -60000;
-          }
-        }
-      }
-    }
-  }
-
-
-
+  /**
+   * Statistics thread base implementation.
+   */
   class StatsThread extends Thread
   {
-    protected final String[] EMPTY_STRINGS = new String[0];
-
     private final MultiColumnPrinter printer;
 
     private final List<GarbageCollectorMXBean> beans;
@@ -561,10 +93,10 @@
 
 
 
-    public StatsThread(String[] additionalColumns)
+    public StatsThread(final String[] additionalColumns)
     {
       super("Stats Thread");
-      TreeSet<Double> pSet = new TreeSet<Double>();
+      final TreeSet<Double> pSet = new TreeSet<Double>();
       if (!percentilesArgument.isPresent())
       {
         pSet.add(.1);
@@ -573,14 +105,14 @@
       }
       else
       {
-        for (String percentile : percentilesArgument.getValues())
+        for (final String percentile : percentilesArgument.getValues())
         {
           pSet.add(100.0 - Double.parseDouble(percentile));
         }
       }
       this.percentiles = pSet.descendingSet();
-      numColumns = 5 + this.percentiles.size()
-          + additionalColumns.length + (isAsync ? 1 : 0);
+      numColumns = 5 + this.percentiles.size() + additionalColumns.length
+          + (isAsync ? 1 : 0);
       printer = new MultiColumnPrinter(numColumns, 2, "-",
           MultiColumnPrinter.RIGHT, app);
       printer.setTitleAlign(MultiColumnPrinter.RIGHT);
@@ -607,7 +139,7 @@
       title[2] = "recent";
       title[3] = "average";
       int i = 4;
-      for (Double percentile : this.percentiles)
+      for (final Double percentile : this.percentiles)
       {
         title[i++] = Double.toString(100.0 - percentile) + "%";
       }
@@ -616,7 +148,7 @@
       {
         title[i++] = "req/res";
       }
-      for (String column : additionalColumns)
+      for (final String column : additionalColumns)
       {
         title[i++] = column;
       }
@@ -628,24 +160,17 @@
 
 
 
-    String[] getAdditionalColumns()
-    {
-      return EMPTY_STRINGS;
-    }
-
-
-
     @Override
     public void run()
     {
       printer.printTitle();
 
-      String[] strings = new String[numColumns];
+      final String[] strings = new String[numColumns];
 
-      long startTime = System.currentTimeMillis();
+      final long startTime = System.currentTimeMillis();
       long statTime = startTime;
       long gcDuration = 0;
-      for (GarbageCollectorMXBean bean : beans)
+      for (final GarbageCollectorMXBean bean : beans)
       {
         gcDuration += bean.getCollectionTime();
       }
@@ -655,7 +180,7 @@
         {
           sleep(statsInterval);
         }
-        catch (InterruptedException ie)
+        catch (final InterruptedException ie)
         {
           // Ignore.
         }
@@ -665,7 +190,7 @@
 
         lastGCDuration = gcDuration;
         gcDuration = 0;
-        for (GarbageCollectorMXBean bean : beans)
+        for (final GarbageCollectorMXBean bean : beans)
         {
           gcDuration += bean.getCollectionTime();
         }
@@ -684,19 +209,31 @@
         averageDuration -= gcDuration;
         recentDuration /= 1000.0;
         averageDuration /= 1000.0;
-        strings[0] = String.format("%.1f", successCount
-            / recentDuration);
-        strings[1] = String.format("%.1f", totalSuccessCount
-            / averageDuration);
-        strings[2] = String.format("%.3f",
-            (waitTime - (gcDuration - lastGCDuration)) / successCount
-                / 1000000.0);
-        strings[3] = String.format("%.3f", (totalWaitTime - gcDuration)
-            / totalSuccessCount / 1000000.0);
+        strings[0] = String.format("%.1f", successCount / recentDuration);
+        strings[1] = String.format("%.1f", totalSuccessCount / averageDuration);
+        if (successCount > 0)
+        {
+          strings[2] = String.format("%.3f",
+              (waitTime - (gcDuration - lastGCDuration)) / successCount
+                  / 1000000.0);
+        }
+        else
+        {
+          strings[2] = "-";
+        }
+        if (totalSuccessCount > 0)
+        {
+          strings[3] = String.format("%.3f", (totalWaitTime - gcDuration)
+              / totalSuccessCount / 1000000.0);
+        }
+        else
+        {
+          strings[3] = "-";
+        }
 
         boolean changed = false;
         etimes = eTimeBuffer.getAndSet(etimes);
-        int appendLength = Math.min(array.remaining(), etimes.size());
+        final int appendLength = Math.min(array.remaining(), etimes.size());
         if (appendLength > 0)
         {
           array.append(etimes, appendLength);
@@ -737,36 +274,288 @@
         // Now everything is ordered from smallest to largest
         int index;
         int i = 4;
-        for (Double percent : percentiles)
+        for (final Double percent : percentiles)
         {
-          index = array.size()
-              - (int) Math.floor((percent / 100.0) * totalSuccessCount)
-              - 1;
-          if (index < 0)
+          if (array.size() <= 0)
           {
-            strings[i++] = String.format("*%.3f",
-                array.get(0) / 1000000.0);
+            strings[i++] = "-";
           }
           else
           {
-            strings[i++] = String.format("%.3f",
-                array.get(index) / 1000000.0);
+            index = array.size()
+                - (int) Math.floor((percent / 100.0) * totalSuccessCount) - 1;
+            if (index < 0)
+            {
+              strings[i++] = String.format("*%.3f", array.get(0) / 1000000.0);
+            }
+            else
+            {
+              strings[i++] = String
+                  .format("%.3f", array.get(index) / 1000000.0);
+            }
           }
         }
-        strings[i++] = String.format("%.1f", totalFailedCount
-            / averageDuration);
+        strings[i++] = String
+            .format("%.1f", totalFailedCount / averageDuration);
         if (isAsync)
         {
-          strings[i++] = String.format("%.1f", (double) searchCount
-              / successCount);
+          if (successCount > 0)
+          {
+            strings[i++] = String.format("%.1f", (double) searchCount
+                / successCount);
+          }
+          else
+          {
+            strings[i++] = "-";
+          }
         }
-        for (String column : getAdditionalColumns())
+        for (final String column : getAdditionalColumns())
         {
           strings[i++] = column;
         }
         printer.printRow(strings);
       }
     }
+
+
+
+    String[] getAdditionalColumns()
+    {
+      return EMPTY_STRINGS;
+    }
+  }
+
+
+
+  /**
+   * Statistics update result handler implementation.
+   *
+   * @param <S>
+   *          The type of expected result.
+   */
+  class UpdateStatsResultHandler<S extends Result> implements ResultHandler<S>
+  {
+    private long eTime;
+
+
+
+    UpdateStatsResultHandler(final long eTime)
+    {
+      this.eTime = eTime;
+    }
+
+
+
+    public long getETime()
+    {
+      return eTime;
+    }
+
+
+
+    public void handleErrorResult(final ErrorResultException error)
+    {
+      failedRecentCount.getAndIncrement();
+      app.println(LocalizableMessage.raw(error.getResult().toString()));
+    }
+
+
+
+    public void handleResult(final S result)
+    {
+      successRecentCount.getAndIncrement();
+      eTime = System.nanoTime() - eTime;
+      waitRecentTime.getAndAdd(eTime);
+      synchronized (this)
+      {
+        final ReversableArray array = eTimeBuffer.get();
+        if (array.remaining() == 0)
+        {
+          array.set(array.size() - 1, eTime);
+        }
+        else
+        {
+          array.append(eTime);
+        }
+      }
+    }
+  }
+
+
+
+  /**
+   * Worker thread base implementation.
+   *
+   * @param <R>
+   *          Type of result handler.
+   */
+  abstract class WorkerThread<R extends ResultHandler<?>> extends Thread
+  {
+    private int count;
+
+    private final AsynchronousConnection connection;
+
+    private final ConnectionFactory connectionFactory;
+
+
+
+    WorkerThread(final AsynchronousConnection connection,
+        final ConnectionFactory connectionFactory)
+    {
+      super("Worker Thread");
+      this.connection = connection;
+      this.connectionFactory = connectionFactory;
+    }
+
+
+
+    public abstract R getHandler(long startTime);
+
+
+
+    public abstract FutureResult<?> performOperation(
+        AsynchronousConnection connection, R handler, DataSource[] dataSources);
+
+
+
+    @Override
+    public void run()
+    {
+      if (dataSources.get() == null)
+      {
+        try
+        {
+          dataSources.set(DataSource.parse(arguments.getValues()));
+        }
+        catch (final IOException ioe)
+        {
+          // Ignore as this shouldn've been handled eariler
+        }
+      }
+
+      FutureResult<?> future;
+      AsynchronousConnection connection;
+      R handler;
+
+      final double targetTimeInMS =
+        (1.0 / (targetThroughput / (numThreads * numConnections))) * 1000.0;
+      double sleepTimeInMS = 0;
+      long start;
+      while (!stopRequested && !(maxIterations > 0 && count >= maxIterations))
+      {
+        start = System.nanoTime();
+        handler = getHandler(start);
+
+        if (this.connection == null)
+        {
+          try
+          {
+            connection = connectionFactory.getAsynchronousConnection(null)
+                .get();
+          }
+          catch (final InterruptedException e)
+          {
+            // Ignore and check stop requested
+            continue;
+          }
+          catch (final ErrorResultException e)
+          {
+            app.println(LocalizableMessage.raw(e.getResult()
+                .getDiagnosticMessage()));
+            if (e.getCause() != null && app.isVerbose())
+            {
+              e.getCause().printStackTrace(app.getErrorStream());
+            }
+            stopRequested = true;
+            break;
+          }
+        }
+        else
+        {
+          connection = this.connection;
+          if (!noRebind
+              && connection instanceof AuthenticatedAsynchronousConnection)
+          {
+            final AuthenticatedAsynchronousConnection ac =
+              (AuthenticatedAsynchronousConnection) connection;
+            try
+            {
+              ac.rebind(null).get();
+            }
+            catch (final InterruptedException e)
+            {
+              // Ignore and check stop requested
+              continue;
+            }
+            catch (final ErrorResultException e)
+            {
+              app.println(LocalizableMessage.raw(e.getResult().toString()));
+              if (e.getCause() != null && app.isVerbose())
+              {
+                e.getCause().printStackTrace(app.getErrorStream());
+              }
+              stopRequested = true;
+              break;
+            }
+          }
+        }
+        future = performOperation(connection, handler, dataSources.get());
+        operationRecentCount.getAndIncrement();
+        count++;
+        if (!isAsync)
+        {
+          try
+          {
+            future.get();
+          }
+          catch (final InterruptedException e)
+          {
+            // Ignore and check stop requested
+            continue;
+          }
+          catch (final ErrorResultException e)
+          {
+            if (e.getCause() instanceof IOException)
+            {
+              e.getCause().printStackTrace(app.getErrorStream());
+              stopRequested = true;
+              break;
+            }
+            // Ignore. Handled by result handler
+          }
+          finally
+          {
+            if (this.connection == null)
+            {
+              connection.close();
+            }
+          }
+        }
+        if (targetThroughput > 0)
+        {
+          try
+          {
+            if (sleepTimeInMS > 1)
+            {
+              sleep((long) Math.floor(sleepTimeInMS));
+            }
+          }
+          catch (final InterruptedException e)
+          {
+            continue;
+          }
+
+          sleepTimeInMS += targetTimeInMS
+              - ((System.nanoTime() - start) / 1000000.0);
+          if (sleepTimeInMS < -60000)
+          {
+            // If we fall behind by 60 seconds, just forget about
+            // catching up
+            sleepTimeInMS = -60000;
+          }
+        }
+      }
+    }
   }
 
 
@@ -781,64 +570,14 @@
 
 
 
-    public ReversableArray(int capacity)
+    public ReversableArray(final int capacity)
     {
       this.array = new long[capacity];
     }
 
 
 
-    public void set(int index, long value)
-    {
-      if (index >= size)
-      {
-        throw new IndexOutOfBoundsException();
-      }
-      if (!reversed)
-      {
-        array[index] = value;
-      }
-      else
-      {
-        array[size - index - 1] = value;
-      }
-    }
-
-
-
-    public long get(int index)
-    {
-      if (index >= size)
-      {
-        throw new IndexOutOfBoundsException();
-      }
-      if (!reversed)
-      {
-        return array[index];
-      }
-      else
-      {
-        return array[size - index - 1];
-      }
-    }
-
-
-
-    public int size()
-    {
-      return size;
-    }
-
-
-
-    public void reverse()
-    {
-      reversed = !reversed;
-    }
-
-
-
-    public void append(long value)
+    public void append(final long value)
     {
       if (size == array.length)
       {
@@ -859,7 +598,7 @@
 
 
 
-    public void append(ReversableArray a, int length)
+    public void append(final ReversableArray a, final int length)
     {
       if (length > a.size() || length > remaining())
       {
@@ -879,13 +618,6 @@
 
 
 
-    public int remaining()
-    {
-      return array.length - size;
-    }
-
-
-
     public void clear()
     {
       size = 0;
@@ -893,7 +625,57 @@
 
 
 
-    public void siftDown(int start, int end)
+    public long get(final int index)
+    {
+      if (index >= size)
+      {
+        throw new IndexOutOfBoundsException();
+      }
+      if (!reversed)
+      {
+        return array[index];
+      }
+      else
+      {
+        return array[size - index - 1];
+      }
+    }
+
+
+
+    public int remaining()
+    {
+      return array.length - size;
+    }
+
+
+
+    public void reverse()
+    {
+      reversed = !reversed;
+    }
+
+
+
+    public void set(final int index, final long value)
+    {
+      if (index >= size)
+      {
+        throw new IndexOutOfBoundsException();
+      }
+      if (!reversed)
+      {
+        array[index] = value;
+      }
+      else
+      {
+        array[size - index - 1] = value;
+      }
+    }
+
+
+
+    public void siftDown(final int start, final int end)
     {
       int root = start;
       int child;
@@ -918,7 +700,7 @@
 
 
 
-    public void siftUp(int start, int end)
+    public void siftUp(final int start, final int end)
     {
       int child = end;
       int parent;
@@ -939,11 +721,297 @@
 
 
 
-    private void swap(int i, int i2)
+    public int size()
     {
-      long temp = get(i);
+      return size;
+    }
+
+
+
+    private void swap(final int i, final int i2)
+    {
+      final long temp = get(i);
       set(i, get(i2));
       set(i2, temp);
     }
   }
+
+
+
+  private static final String[] EMPTY_STRINGS = new String[0];
+
+  private final AtomicInteger operationRecentCount = new AtomicInteger();
+
+  private final AtomicInteger successRecentCount = new AtomicInteger();
+
+  private final AtomicInteger failedRecentCount = new AtomicInteger();
+
+  private final AtomicLong waitRecentTime = new AtomicLong();
+
+  private final AtomicReference<ReversableArray> eTimeBuffer = new AtomicReference<ReversableArray>(
+      new ReversableArray(100000));
+
+  private final ConsoleApplication app;
+
+  private final ThreadLocal<DataSource[]> dataSources = new ThreadLocal<DataSource[]>();
+
+  private volatile boolean stopRequested;
+
+  private int numThreads;
+
+  private int numConnections;
+
+  private int targetThroughput;
+
+  private int maxIterations;
+
+  private boolean isAsync;
+
+  private boolean noRebind;
+
+  private int statsInterval;
+
+  private final IntegerArgument numThreadsArgument;
+
+  private final IntegerArgument maxIterationsArgument;
+
+  private final IntegerArgument statsIntervalArgument;
+
+  private final IntegerArgument targetThroughputArgument;
+
+  private final IntegerArgument numConnectionsArgument;
+
+  private final IntegerArgument percentilesArgument;
+
+  private final BooleanArgument keepConnectionsOpen;
+
+  private final BooleanArgument noRebindArgument;
+
+  private final BooleanArgument asyncArgument;
+
+  private final StringArgument arguments;
+
+
+
+  PerformanceRunner(final ArgumentParser argParser, final ConsoleApplication app)
+      throws ArgumentException
+  {
+    this.app = app;
+    numThreadsArgument = new IntegerArgument("numThreads", 't', "numThreads",
+        false, false, true, LocalizableMessage.raw("{numThreads}"), 1, null,
+        true, 1, false, 0, LocalizableMessage
+            .raw("number of search threads per connection"));
+    numThreadsArgument.setPropertyName("numThreads");
+    argParser.addArgument(numThreadsArgument);
+
+    numConnectionsArgument = new IntegerArgument("numConnections", 'c',
+        "numConnections", false, false, true, LocalizableMessage
+            .raw("{numConnections}"), 1, null, true, 1, false, 0,
+        LocalizableMessage.raw("number of connections"));
+    numThreadsArgument.setPropertyName("numConnections");
+    argParser.addArgument(numConnectionsArgument);
+
+    maxIterationsArgument = new IntegerArgument("maxIterations", 'm',
+        "maxIterations", false, false, true, LocalizableMessage
+            .raw("{maxIterations}"), 0, null, LocalizableMessage
+            .raw("max searches per thread, 0 for unlimited"));
+    numThreadsArgument.setPropertyName("maxIterations");
+    argParser.addArgument(maxIterationsArgument);
+
+    statsIntervalArgument = new IntegerArgument("statInterval", 'i',
+        "statInterval", false, false, true, LocalizableMessage
+            .raw("{statInterval}"), 5, null, true, 1, false, 0,
+        LocalizableMessage
+            .raw("Display results each specified number of seconds"));
+    numThreadsArgument.setPropertyName("statInterval");
+    argParser.addArgument(statsIntervalArgument);
+
+    targetThroughputArgument = new IntegerArgument("targetThroughput", 'M',
+        "targetThroughput", false, false, true, LocalizableMessage
+            .raw("{targetThroughput}"), 0, null, LocalizableMessage
+            .raw("Target average throughput to achieve"));
+    targetThroughputArgument.setPropertyName("targetThroughput");
+    argParser.addArgument(targetThroughputArgument);
+
+    percentilesArgument = new IntegerArgument("percentile", 'e', "percentile",
+        false, true, LocalizableMessage.raw("{percentile}"), true, 50, true,
+        100, LocalizableMessage.raw("Calculate max response time for a "
+            + "percentile of operations"));
+    percentilesArgument.setPropertyName("percentile");
+    argParser.addArgument(percentilesArgument);
+
+    keepConnectionsOpen = new BooleanArgument("keepConnectionsOpen", 'f',
+        "keepConnectionsOpen", LocalizableMessage.raw("keep connections open"));
+    keepConnectionsOpen.setPropertyName("keepConnectionsOpen");
+    argParser.addArgument(keepConnectionsOpen);
+
+    noRebindArgument = new BooleanArgument("noRebind", 'F', "noRebind",
+        LocalizableMessage.raw("keep connections open and don't rebind"));
+    keepConnectionsOpen.setPropertyName("noRebind");
+    argParser.addArgument(noRebindArgument);
+
+    asyncArgument = new BooleanArgument("asynchronous", 'A', "asynchronous",
+        LocalizableMessage.raw("asynch, don't wait for results"));
+    keepConnectionsOpen.setPropertyName("asynchronous");
+    argParser.addArgument(asyncArgument);
+
+    arguments = new StringArgument("arguments", 'g', "arguments", false, true,
+        true, LocalizableMessage.raw("{arguments}"), null, null,
+        LocalizableMessage
+            .raw("arguments for variables in the filter and/or base DN"));
+    arguments.setPropertyName("arguments");
+    argParser.addArgument(arguments);
+  }
+
+
+
+  public void connectionClosed()
+  {
+    // Ignore
+  }
+
+
+
+  public synchronized void connectionErrorOccurred(
+      final boolean isDisconnectNotification, final ErrorResultException error)
+  {
+    if (!stopRequested)
+    {
+      app.println(LocalizableMessage.raw("Error occurred on one or more "
+          + "connections: " + error.getResult().toString()));
+      if (error.getCause() != null && app.isVerbose())
+      {
+        error.getCause().printStackTrace(app.getErrorStream());
+      }
+      stopRequested = true;
+    }
+  }
+
+
+
+  public void connectionReceivedUnsolicitedNotification(
+      final ExtendedResult notification)
+  {
+    // Ignore
+  }
+
+
+
+  public void validate() throws ArgumentException
+  {
+    numConnections = numConnectionsArgument.getIntValue();
+    numThreads = numThreadsArgument.getIntValue();
+    maxIterations = maxIterationsArgument.getIntValue();
+    statsInterval = statsIntervalArgument.getIntValue() * 1000;
+    targetThroughput = targetThroughputArgument.getIntValue();
+
+    isAsync = asyncArgument.isPresent();
+    noRebind = noRebindArgument.isPresent();
+
+    if (!noRebindArgument.isPresent() && this.numThreads > 1)
+    {
+      throw new ArgumentException(LocalizableMessage.raw("--"
+          + noRebindArgument.getLongIdentifier() + " must be used if --"
+          + numThreadsArgument.getLongIdentifier() + " is > 1"));
+    }
+
+    if (!noRebindArgument.isPresent() && asyncArgument.isPresent())
+    {
+      throw new ArgumentException(LocalizableMessage.raw("--"
+          + noRebindArgument.getLongIdentifier()
+          + " must be used when using --" + asyncArgument.getLongIdentifier()));
+    }
+
+    try
+    {
+      DataSource.parse(arguments.getValues());
+    }
+    catch (final IOException ioe)
+    {
+      throw new ArgumentException(LocalizableMessage
+          .raw("Error occured while parsing arguments: " + ioe.toString()));
+    }
+  }
+
+
+
+  final DataSource[] getDataSources()
+  {
+    try
+    {
+      return DataSource.parse(arguments.getValues());
+    }
+    catch (final IOException ioe)
+    {
+      // Ignore as this shouldn've been handled eariler
+    }
+    return new DataSource[0];
+  }
+
+
+
+  abstract StatsThread newStatsThread();
+
+
+
+  abstract WorkerThread<?> newWorkerThread(AsynchronousConnection connection,
+      ConnectionFactory connectionFactory);
+
+
+
+  final int run(final ConnectionFactory connectionFactory)
+  {
+    final List<Thread> threads = new ArrayList<Thread>();
+    final List<AsynchronousConnection> connections = new ArrayList<AsynchronousConnection>();
+
+    AsynchronousConnection connection = null;
+    Thread thread;
+    try
+    {
+      for (int i = 0; i < numConnections; i++)
+      {
+        if (keepConnectionsOpen.isPresent() || noRebindArgument.isPresent())
+        {
+          connection = connectionFactory.getAsynchronousConnection(null).get();
+          connection.addConnectionEventListener(this);
+          connections.add(connection);
+        }
+        for (int j = 0; j < numThreads; j++)
+        {
+          thread = newWorkerThread(connection, connectionFactory);
+
+          threads.add(thread);
+          thread.start();
+        }
+      }
+
+      final Thread statsThread = newStatsThread();
+      statsThread.start();
+
+      for (final Thread t : threads)
+      {
+        t.join();
+      }
+      stopRequested = true;
+      statsThread.join();
+    }
+    catch (final InterruptedException e)
+    {
+      stopRequested = true;
+    }
+    catch (final ErrorResultException e)
+    {
+      stopRequested = true;
+      app.println(LocalizableMessage.raw(e.getResult().getDiagnosticMessage()));
+    }
+    finally
+    {
+      for (final AsynchronousConnection c : connections)
+      {
+        c.close();
+      }
+    }
+
+    return 0;
+  }
 }

--
Gitblit v1.10.0