mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
27.14.2011 c380f629462b7d8e7534601f24364fe948e28780
opendj-sdk/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthRate.java
@@ -95,10 +95,9 @@
    private final class BindUpdateStatsResultHandler extends
        UpdateStatsResultHandler<BindResult>
    {
      private BindUpdateStatsResultHandler(final long startTime,
          final AsynchronousConnection connection, final ConnectionWorker worker)
      private BindUpdateStatsResultHandler(final long startTime)
      {
        super(startTime, connection, worker);
        super(startTime);
      }
@@ -117,7 +116,7 @@
    private final class BindWorkerThread extends ConnectionWorker
    private final class BindWorkerThread extends WorkerThread
    {
      private SearchRequest sr;
      private BindRequest br;
@@ -183,7 +182,7 @@
          final RecursiveFutureResult<SearchResultEntry, BindResult> future =
            new RecursiveFutureResult<SearchResultEntry, BindResult>(
              new BindUpdateStatsResultHandler(startTime, connection, this))
              new BindUpdateStatsResultHandler(startTime))
          {
            @Override
            protected FutureResult<? extends BindResult> chainResult(
@@ -206,7 +205,7 @@
        else
        {
          return performBind(connection, data,
              new BindUpdateStatsResultHandler(startTime, connection, this));
              new BindUpdateStatsResultHandler(startTime));
        }
      }
@@ -418,7 +417,7 @@
    @Override
    ConnectionWorker newConnectionWorker(
    WorkerThread newWorkerThread(
        final AsynchronousConnection connection,
        final ConnectionFactory connectionFactory)
    {
opendj-sdk/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/ModRate.java
@@ -53,7 +53,7 @@
{
  private static final class ModifyPerformanceRunner extends PerformanceRunner
  {
    private final class ModifyWorkerThread extends ConnectionWorker
    private final class ModifyWorkerThread extends WorkerThread
    {
      private ModifyRequest mr;
      private Object[] data;
@@ -78,8 +78,8 @@
          data = DataSource.generateData(dataSources, data);
        }
        mr = newModifyRequest(data);
        return connection.modify(mr, new UpdateStatsResultHandler<Result>(
            startTime, connection, this));
        return connection.modify(mr,
            new UpdateStatsResultHandler<Result>(startTime));
      }
@@ -136,7 +136,7 @@
    @Override
    ConnectionWorker newConnectionWorker(
    WorkerThread newWorkerThread(
        final AsynchronousConnection connection,
        final ConnectionFactory connectionFactory)
    {
opendj-sdk/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/PerformanceRunner.java
@@ -33,14 +33,12 @@
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.opendj.ldap.*;
import org.forgerock.opendj.ldap.responses.BindResult;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.Result;
@@ -54,170 +52,6 @@
 */
abstract class PerformanceRunner implements ConnectionEventListener
{
  abstract class ConnectionWorker
  {
    private final AtomicInteger operationsInFlight = new AtomicInteger();
    private volatile int count;
    private final AsynchronousConnection staticConnection;
    private final ConnectionFactory connectionFactory;
    private final CountDownLatch latch = new CountDownLatch(1);
    ConnectionWorker(final AsynchronousConnection staticConnection,
        final ConnectionFactory connectionFactory)
    {
      this.staticConnection = staticConnection;
      this.connectionFactory = connectionFactory;
    }
    public void operationCompleted(final AsynchronousConnection connection)
    {
      if (operationsInFlight.decrementAndGet() == 0
          && this.staticConnection == null)
      {
        connection.close();
      }
      startWork();
    }
    public abstract FutureResult<?> performOperation(
        final AsynchronousConnection connection,
        final DataSource[] dataSources, final long startTime);
    public void startWork()
    {
      if (!stopRequested && !(maxIterations > 0 && count >= maxIterations))
      {
        if (this.staticConnection == null)
        {
          connectionFactory
              .getAsynchronousConnection(new ResultHandler<AsynchronousConnection>()
              {
                public void handleErrorResult(final ErrorResultException e)
                {
                  app.println(LocalizableMessage.raw(e.getResult()
                      .getDiagnosticMessage()));
                  if (e.getCause() != null && app.isVerbose())
                  {
                    e.getCause().printStackTrace(app.getErrorStream());
                  }
                  stopRequested = true;
                }
                public void handleResult(final AsynchronousConnection result)
                {
                  doWork(result);
                }
              });
        }
        else
        {
          if (!noRebind
              && this.staticConnection instanceof AuthenticatedAsynchronousConnection)
          {
            final AuthenticatedAsynchronousConnection ac =
              (AuthenticatedAsynchronousConnection) this.staticConnection;
            ac.rebind(new ResultHandler<BindResult>()
            {
              public void handleErrorResult(final ErrorResultException e)
              {
                app.println(LocalizableMessage.raw(e.getResult().toString()));
                if (e.getCause() != null && app.isVerbose())
                {
                  e.getCause().printStackTrace(app.getErrorStream());
                }
                stopRequested = true;
              }
              public void handleResult(final BindResult result)
              {
                doWork(staticConnection);
              }
            });
          }
          else
          {
            doWork(staticConnection);
          }
        }
      }
      else
      {
        latch.countDown();
      }
    }
    public void waitFor() throws InterruptedException
    {
      latch.await();
    }
    private void doWork(final AsynchronousConnection connection)
    {
      long start;
      double sleepTimeInMS = 0;
      final int opsToPerform = isAsync ? numConcurrentTasks : numConcurrentTasks
          - operationsInFlight.get();
      for (int i = 0; i < opsToPerform; i++)
      {
        if (maxIterations > 0 && count >= maxIterations)
        {
          break;
        }
        start = System.nanoTime();
        performOperation(connection, dataSources.get(), start);
        operationRecentCount.getAndIncrement();
        operationsInFlight.getAndIncrement();
        count++;
        if (targetThroughput > 0)
        {
          try
          {
            if (sleepTimeInMS > 1)
            {
              Thread.sleep((long) Math.floor(sleepTimeInMS));
            }
          }
          catch (final InterruptedException e)
          {
            continue;
          }
          sleepTimeInMS += targetTimeInMS
              - ((System.nanoTime() - start) / 1000000.0);
          if (sleepTimeInMS < -60000)
          {
            // If we fall behind by 60 seconds, just forget about
            // catching up
            sleepTimeInMS = -60000;
          }
        }
      }
    }
  }
  /**
   * Statistics thread base implementation.
   */
@@ -566,17 +400,12 @@
  class UpdateStatsResultHandler<S extends Result> implements ResultHandler<S>
  {
    private final long startTime;
    private final AsynchronousConnection connection;
    private final ConnectionWorker worker;
    UpdateStatsResultHandler(final long startTime,
        final AsynchronousConnection connection, final ConnectionWorker worker)
    UpdateStatsResultHandler(final long startTime)
    {
      this.startTime = startTime;
      this.connection = connection;
      this.worker = worker;
    }
@@ -590,8 +419,6 @@
      {
        app.println(LocalizableMessage.raw(error.getResult().toString()));
      }
      worker.operationCompleted(connection);
    }
@@ -600,7 +427,6 @@
    {
      successRecentCount.getAndIncrement();
      updateStats();
      worker.operationCompleted(connection);
    }
@@ -662,7 +488,7 @@
      AsynchronousConnection connection;
      final double targetTimeInMS =
        (1.0 / (targetThroughput / (double) (numConcurrentTasks * numConnections))) * 1000.0;
        (1000.0 / (targetThroughput / (double) (numThreads * numConnections)));
      double sleepTimeInMS = 0;
      long start;
      while (!stopRequested && !(maxIterations > 0 && count >= maxIterations))
@@ -1001,7 +827,7 @@
  private volatile boolean stopRequested;
  private int numConcurrentTasks;
  private int numThreads;
  private int numConnections;
@@ -1015,9 +841,7 @@
  private int statsInterval;
  private double targetTimeInMS;
  private final IntegerArgument numConcurrentTasksArgument;
  private final IntegerArgument numThreadsArgument;
  private final IntegerArgument maxIterationsArgument;
@@ -1045,19 +869,19 @@
      throws ArgumentException
  {
    this.app = app;
    numConcurrentTasksArgument = new IntegerArgument("numConcurrentTasks", 't',
        "numConcurrentTasks", false, false, true,
        LocalizableMessage.raw("{numConcurrentTasks}"), 1, null, true, 1,
    numThreadsArgument = new IntegerArgument("numThreads", 't',
        "numThreads", false, false, true,
        LocalizableMessage.raw("{numThreads}"), 1, null, true, 1,
        false, 0,
        LocalizableMessage.raw("Number of concurrent tasks per connection"));
    numConcurrentTasksArgument.setPropertyName("numConcurrentTasks");
        LocalizableMessage.raw("Number of worker threads per connection"));
    numThreadsArgument.setPropertyName("numThreads");
    if (!alwaysSingleThreaded)
    {
      argParser.addArgument(numConcurrentTasksArgument);
      argParser.addArgument(numThreadsArgument);
    }
    else
    {
      numConcurrentTasksArgument.addValue("1");
      numThreadsArgument.addValue("1");
    }
    numConnectionsArgument = new IntegerArgument("numConnections", 'c',
@@ -1178,19 +1002,20 @@
  public final void validate() throws ArgumentException
  {
    numConnections = numConnectionsArgument.getIntValue();
    numConcurrentTasks = numConcurrentTasksArgument.getIntValue();
    maxIterations = maxIterationsArgument.getIntValue() / numConnections;
    numThreads = numThreadsArgument.getIntValue();
    maxIterations = maxIterationsArgument.getIntValue()
        / numConnections / numThreads;
    statsInterval = statsIntervalArgument.getIntValue() * 1000;
    targetThroughput = targetThroughputArgument.getIntValue();
    isAsync = asyncArgument.isPresent();
    noRebind = noRebindArgument.isPresent();
    if (!noRebindArgument.isPresent() && this.numConcurrentTasks > 1)
    if (!noRebindArgument.isPresent() && this.numThreads > 1)
    {
      throw new ArgumentException(LocalizableMessage.raw("--"
          + noRebindArgument.getLongIdentifier() + " must be used if --"
          + numConcurrentTasksArgument.getLongIdentifier() + " is > 1"));
          + numThreadsArgument.getLongIdentifier() + " is > 1"));
    }
    if (!noRebindArgument.isPresent() && asyncArgument.isPresent())
@@ -1201,9 +1026,6 @@
    }
    dataSourcePrototypes = DataSource.parse(arguments.getValues());
    targetTimeInMS =
      (1.0 / (targetThroughput / (double) (numConcurrentTasks * numConnections))) * 1000.0;
  }
@@ -1220,7 +1042,7 @@
  abstract ConnectionWorker newConnectionWorker(
  abstract WorkerThread newWorkerThread(
      final AsynchronousConnection connection,
      final ConnectionFactory connectionFactory);
@@ -1232,8 +1054,9 @@
  final int run(final ConnectionFactory connectionFactory)
  {
    final List<ConnectionWorker> workers = new ArrayList<ConnectionWorker>();
    final List<AsynchronousConnection> connections = new ArrayList<AsynchronousConnection>();
    final List<Thread> threads = new ArrayList<Thread>();
    final List<AsynchronousConnection> connections =
      new ArrayList<AsynchronousConnection>();
    AsynchronousConnection connection = null;
    try
@@ -1246,18 +1069,20 @@
          connection.addConnectionEventListener(this);
          connections.add(connection);
        }
        final ConnectionWorker worker = newConnectionWorker(connection,
            connectionFactory);
        workers.add(worker);
        worker.startWork();
        for (int j = 0; j < numThreads; j++)
        {
          final Thread thread = newWorkerThread(connection, connectionFactory);
          threads.add(thread);
          thread.start();
        }
      }
      final Thread statsThread = newStatsThread();
      statsThread.start();
      for (final ConnectionWorker w : workers)
      for (final Thread t : threads)
      {
        w.waitFor();
        t.join();
      }
      stopRequested = true;
      statsThread.join();
opendj-sdk/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/SearchRate.java
@@ -62,10 +62,9 @@
    private final class SearchStatsHandler extends
        UpdateStatsResultHandler<Result> implements SearchResultHandler
    {
      private SearchStatsHandler(final long startTime,
          final AsynchronousConnection connection, final ConnectionWorker worker)
      private SearchStatsHandler(final long startTime)
      {
        super(startTime, connection, worker);
        super(startTime);
      }
@@ -121,7 +120,7 @@
    private final class SearchWorkerThread extends ConnectionWorker
    private final class SearchWorkerThread extends WorkerThread
    {
      private SearchRequest sr;
@@ -162,8 +161,7 @@
          sr.setFilter(String.format(filter, data));
          sr.setName(String.format(baseDN, data));
        }
        return connection.search(sr, new SearchStatsHandler(startTime,
            connection, this));
        return connection.search(sr, new SearchStatsHandler(startTime));
      }
    }
@@ -190,7 +188,7 @@
    @Override
    ConnectionWorker newConnectionWorker(
    WorkerThread newWorkerThread(
        final AsynchronousConnection connection,
        final ConnectionFactory connectionFactory)
    {