mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
27.14.2011 0b99e26e3e75fe58045645706cd3cc8995fda728
Fix OPENDJ-127: Back out OpenDS SDK revision 6648 as it prevents the -M option from working. However, doing so seems to significantly impact max throughput (around 20%), probably due to context switching during Future.get() calls.

#### From OpenDS SDK SVN history for revision 6650 #####

Backout xxxrate threading changes introduced in revision 6648 since they prevent the -M (throughput throttling) option from working. Several attempts were made to fix it:

1) Putting the adaptive sleeps inside the ConnectionWorker.operationComplete callback causes the Grizzly worker thread to be blocked which, in turn, delays other incoming results on other connections (and impacts their etime calculations).

2) Instead of doing adaptive sleeps, I tried putting the "delayed" requests into a ScheduledExecutorService, but this just introduces more complexity (e.g. more threads), more context switches, etc.

At this point, I decided to go back to the original design.

The new threading strategy introduced in revision 6649 seems to more than compensate for the loss of the performance benefits made in revision 6648.
4 files modified
264 ■■■■ changed files
opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthRate.java 13 ●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/ModRate.java 8 ●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/PerformanceRunner.java 231 ●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/SearchRate.java 12 ●●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthRate.java
@@ -95,10 +95,9 @@
    private final class BindUpdateStatsResultHandler extends
        UpdateStatsResultHandler<BindResult>
    {
      private BindUpdateStatsResultHandler(final long startTime,
          final AsynchronousConnection connection, final ConnectionWorker worker)
      private BindUpdateStatsResultHandler(final long startTime)
      {
        super(startTime, connection, worker);
        super(startTime);
      }
@@ -117,7 +116,7 @@
    private final class BindWorkerThread extends ConnectionWorker
    private final class BindWorkerThread extends WorkerThread
    {
      private SearchRequest sr;
      private BindRequest br;
@@ -183,7 +182,7 @@
          final RecursiveFutureResult<SearchResultEntry, BindResult> future =
            new RecursiveFutureResult<SearchResultEntry, BindResult>(
              new BindUpdateStatsResultHandler(startTime, connection, this))
              new BindUpdateStatsResultHandler(startTime))
          {
            @Override
            protected FutureResult<? extends BindResult> chainResult(
@@ -206,7 +205,7 @@
        else
        {
          return performBind(connection, data,
              new BindUpdateStatsResultHandler(startTime, connection, this));
              new BindUpdateStatsResultHandler(startTime));
        }
      }
@@ -418,7 +417,7 @@
    @Override
    ConnectionWorker newConnectionWorker(
    WorkerThread newWorkerThread(
        final AsynchronousConnection connection,
        final ConnectionFactory connectionFactory)
    {
opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/ModRate.java
@@ -53,7 +53,7 @@
{
  private static final class ModifyPerformanceRunner extends PerformanceRunner
  {
    private final class ModifyWorkerThread extends ConnectionWorker
    private final class ModifyWorkerThread extends WorkerThread
    {
      private ModifyRequest mr;
      private Object[] data;
@@ -78,8 +78,8 @@
          data = DataSource.generateData(dataSources, data);
        }
        mr = newModifyRequest(data);
        return connection.modify(mr, new UpdateStatsResultHandler<Result>(
            startTime, connection, this));
        return connection.modify(mr,
            new UpdateStatsResultHandler<Result>(startTime));
      }
@@ -136,7 +136,7 @@
    @Override
    ConnectionWorker newConnectionWorker(
    WorkerThread newWorkerThread(
        final AsynchronousConnection connection,
        final ConnectionFactory connectionFactory)
    {
opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/PerformanceRunner.java
@@ -33,14 +33,12 @@
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.opendj.ldap.*;
import org.forgerock.opendj.ldap.responses.BindResult;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.Result;
@@ -54,170 +52,6 @@
 */
abstract class PerformanceRunner implements ConnectionEventListener
{
  /**
   * Callback-driven worker which issues operations against a connection and
   * re-schedules itself from {@link #operationCompleted} each time one of
   * its operations finishes.
   * <p>
   * When a non-null connection is supplied to the constructor it is reused
   * for every batch; when it is {@code null} a new connection is requested
   * from the connection factory for each batch and closed once the number of
   * operations in flight drops back to zero.
   */
  abstract class ConnectionWorker
  {
    // Number of operations issued by this worker that have not yet been
    // reported through operationCompleted().
    private final AtomicInteger operationsInFlight = new AtomicInteger();
    // Total number of operations issued by this worker.
    // NOTE(review): volatile does not make count++ atomic; whether doWork()
    // can run concurrently for a single worker is not visible here - confirm.
    private volatile int count;
    // Connection to reuse for every batch, or null to obtain a new connection
    // from connectionFactory each time.
    private final AsynchronousConnection staticConnection;
    private final ConnectionFactory connectionFactory;
    // Counted down by startWork() once no further work will be started.
    private final CountDownLatch latch = new CountDownLatch(1);
    /**
     * Creates a new worker.
     *
     * @param staticConnection
     *          The connection to reuse for all operations, or {@code null} if
     *          a connection should be obtained from {@code connectionFactory}
     *          for each batch.
     * @param connectionFactory
     *          The factory used to obtain connections when
     *          {@code staticConnection} is {@code null}.
     */
    ConnectionWorker(final AsynchronousConnection staticConnection,
        final ConnectionFactory connectionFactory)
    {
      this.staticConnection = staticConnection;
      this.connectionFactory = connectionFactory;
    }
    /**
     * Records that one operation has finished and attempts to start more
     * work.
     *
     * @param connection
     *          The connection on which the operation was performed; it is
     *          closed when no operations remain in flight and this worker has
     *          no static connection.
     */
    public void operationCompleted(final AsynchronousConnection connection)
    {
      if (operationsInFlight.decrementAndGet() == 0
          && this.staticConnection == null)
      {
        connection.close();
      }
      startWork();
    }
    /**
     * Issues a single operation on the provided connection.
     *
     * @param connection
     *          The connection to use.
     * @param dataSources
     *          The data sources passed in by the caller.
     * @param startTime
     *          The {@code System.nanoTime()} value captured immediately
     *          before this method was invoked.
     * @return The future result of the operation; {@code doWork} ignores it.
     */
    public abstract FutureResult<?> performOperation(
        final AsynchronousConnection connection,
        final DataSource[] dataSources, final long startTime);
    /**
     * Starts the next batch of operations, or counts down the completion
     * latch if a stop has been requested or the iteration limit has been
     * reached.
     */
    public void startWork()
    {
      if (!stopRequested && !(maxIterations > 0 && count >= maxIterations))
      {
        if (this.staticConnection == null)
        {
          // No static connection: request a new one asynchronously and start
          // the batch from the result callback.
          connectionFactory
              .getAsynchronousConnection(new ResultHandler<AsynchronousConnection>()
              {
                public void handleErrorResult(final ErrorResultException e)
                {
                  app.println(LocalizableMessage.raw(e.getResult()
                      .getDiagnosticMessage()));
                  if (e.getCause() != null && app.isVerbose())
                  {
                    e.getCause().printStackTrace(app.getErrorStream());
                  }
                  // NOTE(review): this path requests a stop but does not
                  // count down the latch itself - confirm waitFor() callers
                  // are released some other way.
                  stopRequested = true;
                }
                public void handleResult(final AsynchronousConnection result)
                {
                  doWork(result);
                }
              });
        }
        else
        {
          if (!noRebind
              && this.staticConnection instanceof AuthenticatedAsynchronousConnection)
          {
            // Rebind on the static connection first; the batch only starts
            // once the rebind succeeds.
            final AuthenticatedAsynchronousConnection ac =
              (AuthenticatedAsynchronousConnection) this.staticConnection;
            ac.rebind(new ResultHandler<BindResult>()
            {
              public void handleErrorResult(final ErrorResultException e)
              {
                app.println(LocalizableMessage.raw(e.getResult().toString()));
                if (e.getCause() != null && app.isVerbose())
                {
                  e.getCause().printStackTrace(app.getErrorStream());
                }
                stopRequested = true;
              }
              public void handleResult(final BindResult result)
              {
                doWork(staticConnection);
              }
            });
          }
          else
          {
            doWork(staticConnection);
          }
        }
      }
      else
      {
        latch.countDown();
      }
    }
    /**
     * Blocks the calling thread until this worker's latch has been counted
     * down by {@link #startWork}.
     *
     * @throws InterruptedException
     *           If the calling thread is interrupted while waiting.
     */
    public void waitFor() throws InterruptedException
    {
      latch.await();
    }
    /**
     * Issues a batch of operations on the provided connection, sleeping
     * between operations when a target throughput has been configured.
     *
     * @param connection
     *          The connection on which to perform the operations.
     */
    private void doWork(final AsynchronousConnection connection)
    {
      long start;
      // Accumulated time budget in milliseconds; a negative value means the
      // worker is behind the target rate.
      double sleepTimeInMS = 0;
      // In async mode a full batch is always issued; otherwise only enough
      // operations to bring the number in flight up to numConcurrentTasks.
      final int opsToPerform = isAsync ? numConcurrentTasks : numConcurrentTasks
          - operationsInFlight.get();
      for (int i = 0; i < opsToPerform; i++)
      {
        if (maxIterations > 0 && count >= maxIterations)
        {
          break;
        }
        start = System.nanoTime();
        performOperation(connection, dataSources.get(), start);
        operationRecentCount.getAndIncrement();
        operationsInFlight.getAndIncrement();
        count++;
        if (targetThroughput > 0)
        {
          try
          {
            if (sleepTimeInMS > 1)
            {
              Thread.sleep((long) Math.floor(sleepTimeInMS));
            }
          }
          catch (final InterruptedException e)
          {
            // Interrupted while sleeping: move on to the next operation
            // without updating the sleep budget for this iteration.
            continue;
          }
          // Add the per-operation time allowance and subtract the time
          // actually spent (issuing the operation plus any sleep).
          sleepTimeInMS += targetTimeInMS
              - ((System.nanoTime() - start) / 1000000.0);
          if (sleepTimeInMS < -60000)
          {
            // If we fall behind by 60 seconds, just forget about
            // catching up
            sleepTimeInMS = -60000;
          }
        }
      }
    }
  }
  /**
   * Statistics thread base implementation.
   */
@@ -566,17 +400,12 @@
  class UpdateStatsResultHandler<S extends Result> implements ResultHandler<S>
  {
    private final long startTime;
    private final AsynchronousConnection connection;
    private final ConnectionWorker worker;
    UpdateStatsResultHandler(final long startTime,
        final AsynchronousConnection connection, final ConnectionWorker worker)
    UpdateStatsResultHandler(final long startTime)
    {
      this.startTime = startTime;
      this.connection = connection;
      this.worker = worker;
    }
@@ -590,8 +419,6 @@
      {
        app.println(LocalizableMessage.raw(error.getResult().toString()));
      }
      worker.operationCompleted(connection);
    }
@@ -600,7 +427,6 @@
    {
      successRecentCount.getAndIncrement();
      updateStats();
      worker.operationCompleted(connection);
    }
@@ -662,7 +488,7 @@
      AsynchronousConnection connection;
      final double targetTimeInMS =
        (1.0 / (targetThroughput / (double) (numConcurrentTasks * numConnections))) * 1000.0;
        (1000.0 / (targetThroughput / (double) (numThreads * numConnections)));
      double sleepTimeInMS = 0;
      long start;
      while (!stopRequested && !(maxIterations > 0 && count >= maxIterations))
@@ -1001,7 +827,7 @@
  private volatile boolean stopRequested;
  private int numConcurrentTasks;
  private int numThreads;
  private int numConnections;
@@ -1015,9 +841,7 @@
  private int statsInterval;
  private double targetTimeInMS;
  private final IntegerArgument numConcurrentTasksArgument;
  private final IntegerArgument numThreadsArgument;
  private final IntegerArgument maxIterationsArgument;
@@ -1045,19 +869,19 @@
      throws ArgumentException
  {
    this.app = app;
    numConcurrentTasksArgument = new IntegerArgument("numConcurrentTasks", 't',
        "numConcurrentTasks", false, false, true,
        LocalizableMessage.raw("{numConcurrentTasks}"), 1, null, true, 1,
    numThreadsArgument = new IntegerArgument("numThreads", 't',
        "numThreads", false, false, true,
        LocalizableMessage.raw("{numThreads}"), 1, null, true, 1,
        false, 0,
        LocalizableMessage.raw("Number of concurrent tasks per connection"));
    numConcurrentTasksArgument.setPropertyName("numConcurrentTasks");
        LocalizableMessage.raw("Number of worker threads per connection"));
    numThreadsArgument.setPropertyName("numThreads");
    if (!alwaysSingleThreaded)
    {
      argParser.addArgument(numConcurrentTasksArgument);
      argParser.addArgument(numThreadsArgument);
    }
    else
    {
      numConcurrentTasksArgument.addValue("1");
      numThreadsArgument.addValue("1");
    }
    numConnectionsArgument = new IntegerArgument("numConnections", 'c',
@@ -1178,19 +1002,20 @@
  public final void validate() throws ArgumentException
  {
    numConnections = numConnectionsArgument.getIntValue();
    numConcurrentTasks = numConcurrentTasksArgument.getIntValue();
    maxIterations = maxIterationsArgument.getIntValue() / numConnections;
    numThreads = numThreadsArgument.getIntValue();
    maxIterations = maxIterationsArgument.getIntValue()
        / numConnections / numThreads;
    statsInterval = statsIntervalArgument.getIntValue() * 1000;
    targetThroughput = targetThroughputArgument.getIntValue();
    isAsync = asyncArgument.isPresent();
    noRebind = noRebindArgument.isPresent();
    if (!noRebindArgument.isPresent() && this.numConcurrentTasks > 1)
    if (!noRebindArgument.isPresent() && this.numThreads > 1)
    {
      throw new ArgumentException(LocalizableMessage.raw("--"
          + noRebindArgument.getLongIdentifier() + " must be used if --"
          + numConcurrentTasksArgument.getLongIdentifier() + " is > 1"));
          + numThreadsArgument.getLongIdentifier() + " is > 1"));
    }
    if (!noRebindArgument.isPresent() && asyncArgument.isPresent())
@@ -1201,9 +1026,6 @@
    }
    dataSourcePrototypes = DataSource.parse(arguments.getValues());
    targetTimeInMS =
      (1.0 / (targetThroughput / (double) (numConcurrentTasks * numConnections))) * 1000.0;
  }
@@ -1220,7 +1042,7 @@
  abstract ConnectionWorker newConnectionWorker(
  abstract WorkerThread newWorkerThread(
      final AsynchronousConnection connection,
      final ConnectionFactory connectionFactory);
@@ -1232,8 +1054,9 @@
  final int run(final ConnectionFactory connectionFactory)
  {
    final List<ConnectionWorker> workers = new ArrayList<ConnectionWorker>();
    final List<AsynchronousConnection> connections = new ArrayList<AsynchronousConnection>();
    final List<Thread> threads = new ArrayList<Thread>();
    final List<AsynchronousConnection> connections =
      new ArrayList<AsynchronousConnection>();
    AsynchronousConnection connection = null;
    try
@@ -1246,18 +1069,20 @@
          connection.addConnectionEventListener(this);
          connections.add(connection);
        }
        final ConnectionWorker worker = newConnectionWorker(connection,
            connectionFactory);
        workers.add(worker);
        worker.startWork();
        for (int j = 0; j < numThreads; j++)
        {
          final Thread thread = newWorkerThread(connection, connectionFactory);
          threads.add(thread);
          thread.start();
        }
      }
      final Thread statsThread = newStatsThread();
      statsThread.start();
      for (final ConnectionWorker w : workers)
      for (final Thread t : threads)
      {
        w.waitFor();
        t.join();
      }
      stopRequested = true;
      statsThread.join();
opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/SearchRate.java
@@ -62,10 +62,9 @@
    private final class SearchStatsHandler extends
        UpdateStatsResultHandler<Result> implements SearchResultHandler
    {
      private SearchStatsHandler(final long startTime,
          final AsynchronousConnection connection, final ConnectionWorker worker)
      private SearchStatsHandler(final long startTime)
      {
        super(startTime, connection, worker);
        super(startTime);
      }
@@ -121,7 +120,7 @@
    private final class SearchWorkerThread extends ConnectionWorker
    private final class SearchWorkerThread extends WorkerThread
    {
      private SearchRequest sr;
@@ -162,8 +161,7 @@
          sr.setFilter(String.format(filter, data));
          sr.setName(String.format(baseDN, data));
        }
        return connection.search(sr, new SearchStatsHandler(startTime,
            connection, this));
        return connection.search(sr, new SearchStatsHandler(startTime));
      }
    }
@@ -190,7 +188,7 @@
    @Override
    ConnectionWorker newConnectionWorker(
    WorkerThread newWorkerThread(
        final AsynchronousConnection connection,
        final ConnectionFactory connectionFactory)
    {