From 0b99e26e3e75fe58045645706cd3cc8995fda728 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 27 May 2011 16:14:16 +0000
Subject: [PATCH] Fix OPENDJ-127: Back out OpenDS SDK revision 6648 as it prevents the -M options from working, however doing so seems to significantly impact max throughput (around 20%), probably due to context switching during Future.get() calls.

---
 opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/SearchRate.java        |   12 +-
 opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/ModRate.java           |    8 
 opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/PerformanceRunner.java |  231 +++++----------------------------------------
 opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthRate.java          |   13 +-
 4 files changed, 43 insertions(+), 221 deletions(-)

diff --git a/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthRate.java b/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthRate.java
index e5fc802..de17abf 100644
--- a/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthRate.java
+++ b/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/AuthRate.java
@@ -95,10 +95,9 @@
     private final class BindUpdateStatsResultHandler extends
         UpdateStatsResultHandler<BindResult>
     {
-      private BindUpdateStatsResultHandler(final long startTime,
-          final AsynchronousConnection connection, final ConnectionWorker worker)
+      private BindUpdateStatsResultHandler(final long startTime)
       {
-        super(startTime, connection, worker);
+        super(startTime);
       }
 
 
@@ -117,7 +116,7 @@
 
 
 
-    private final class BindWorkerThread extends ConnectionWorker
+    private final class BindWorkerThread extends WorkerThread
     {
       private SearchRequest sr;
       private BindRequest br;
@@ -183,7 +182,7 @@
 
           final RecursiveFutureResult<SearchResultEntry, BindResult> future =
             new RecursiveFutureResult<SearchResultEntry, BindResult>(
-              new BindUpdateStatsResultHandler(startTime, connection, this))
+              new BindUpdateStatsResultHandler(startTime))
           {
             @Override
             protected FutureResult<? extends BindResult> chainResult(
@@ -206,7 +205,7 @@
         else
         {
           return performBind(connection, data,
-              new BindUpdateStatsResultHandler(startTime, connection, this));
+              new BindUpdateStatsResultHandler(startTime));
         }
       }
 
@@ -418,7 +417,7 @@
 
 
     @Override
-    ConnectionWorker newConnectionWorker(
+    WorkerThread newWorkerThread(
         final AsynchronousConnection connection,
         final ConnectionFactory connectionFactory)
     {
diff --git a/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/ModRate.java b/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/ModRate.java
index c486260..84d374b 100644
--- a/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/ModRate.java
+++ b/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/ModRate.java
@@ -53,7 +53,7 @@
 {
   private static final class ModifyPerformanceRunner extends PerformanceRunner
   {
-    private final class ModifyWorkerThread extends ConnectionWorker
+    private final class ModifyWorkerThread extends WorkerThread
     {
       private ModifyRequest mr;
       private Object[] data;
@@ -78,8 +78,8 @@
           data = DataSource.generateData(dataSources, data);
         }
         mr = newModifyRequest(data);
-        return connection.modify(mr, new UpdateStatsResultHandler<Result>(
-            startTime, connection, this));
+        return connection.modify(mr,
+            new UpdateStatsResultHandler<Result>(startTime));
       }
 
 
@@ -136,7 +136,7 @@
 
 
     @Override
-    ConnectionWorker newConnectionWorker(
+    WorkerThread newWorkerThread(
         final AsynchronousConnection connection,
         final ConnectionFactory connectionFactory)
     {
diff --git a/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/PerformanceRunner.java b/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/PerformanceRunner.java
index bff44e1..0ee2525 100644
--- a/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/PerformanceRunner.java
+++ b/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/PerformanceRunner.java
@@ -33,14 +33,12 @@
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
 import java.util.*;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.forgerock.i18n.LocalizableMessage;
 import org.forgerock.opendj.ldap.*;
-import org.forgerock.opendj.ldap.responses.BindResult;
 import org.forgerock.opendj.ldap.responses.ExtendedResult;
 import org.forgerock.opendj.ldap.responses.Result;
 
@@ -54,170 +52,6 @@
  */
 abstract class PerformanceRunner implements ConnectionEventListener
 {
-  abstract class ConnectionWorker
-  {
-    private final AtomicInteger operationsInFlight = new AtomicInteger();
-
-    private volatile int count;
-
-    private final AsynchronousConnection staticConnection;
-
-    private final ConnectionFactory connectionFactory;
-
-    private final CountDownLatch latch = new CountDownLatch(1);
-
-
-
-    ConnectionWorker(final AsynchronousConnection staticConnection,
-        final ConnectionFactory connectionFactory)
-    {
-      this.staticConnection = staticConnection;
-      this.connectionFactory = connectionFactory;
-    }
-
-
-
-    public void operationCompleted(final AsynchronousConnection connection)
-    {
-      if (operationsInFlight.decrementAndGet() == 0
-          && this.staticConnection == null)
-      {
-        connection.close();
-      }
-      startWork();
-    }
-
-
-
-    public abstract FutureResult<?> performOperation(
-        final AsynchronousConnection connection,
-        final DataSource[] dataSources, final long startTime);
-
-
-
-    public void startWork()
-    {
-      if (!stopRequested && !(maxIterations > 0 && count >= maxIterations))
-      {
-        if (this.staticConnection == null)
-        {
-          connectionFactory
-              .getAsynchronousConnection(new ResultHandler<AsynchronousConnection>()
-              {
-                public void handleErrorResult(final ErrorResultException e)
-                {
-                  app.println(LocalizableMessage.raw(e.getResult()
-                      .getDiagnosticMessage()));
-                  if (e.getCause() != null && app.isVerbose())
-                  {
-                    e.getCause().printStackTrace(app.getErrorStream());
-                  }
-                  stopRequested = true;
-                }
-
-
-
-                public void handleResult(final AsynchronousConnection result)
-                {
-                  doWork(result);
-                }
-              });
-        }
-        else
-        {
-          if (!noRebind
-              && this.staticConnection instanceof AuthenticatedAsynchronousConnection)
-          {
-            final AuthenticatedAsynchronousConnection ac =
-              (AuthenticatedAsynchronousConnection) this.staticConnection;
-            ac.rebind(new ResultHandler<BindResult>()
-            {
-              public void handleErrorResult(final ErrorResultException e)
-              {
-                app.println(LocalizableMessage.raw(e.getResult().toString()));
-                if (e.getCause() != null && app.isVerbose())
-                {
-                  e.getCause().printStackTrace(app.getErrorStream());
-                }
-                stopRequested = true;
-              }
-
-
-
-              public void handleResult(final BindResult result)
-              {
-                doWork(staticConnection);
-              }
-            });
-          }
-          else
-          {
-            doWork(staticConnection);
-          }
-        }
-      }
-      else
-      {
-        latch.countDown();
-      }
-    }
-
-
-
-    public void waitFor() throws InterruptedException
-    {
-      latch.await();
-    }
-
-
-
-    private void doWork(final AsynchronousConnection connection)
-    {
-      long start;
-      double sleepTimeInMS = 0;
-      final int opsToPerform = isAsync ? numConcurrentTasks : numConcurrentTasks
-          - operationsInFlight.get();
-      for (int i = 0; i < opsToPerform; i++)
-      {
-        if (maxIterations > 0 && count >= maxIterations)
-        {
-          break;
-        }
-        start = System.nanoTime();
-        performOperation(connection, dataSources.get(), start);
-        operationRecentCount.getAndIncrement();
-        operationsInFlight.getAndIncrement();
-        count++;
-
-        if (targetThroughput > 0)
-        {
-          try
-          {
-            if (sleepTimeInMS > 1)
-            {
-              Thread.sleep((long) Math.floor(sleepTimeInMS));
-            }
-          }
-          catch (final InterruptedException e)
-          {
-            continue;
-          }
-
-          sleepTimeInMS += targetTimeInMS
-              - ((System.nanoTime() - start) / 1000000.0);
-          if (sleepTimeInMS < -60000)
-          {
-            // If we fall behind by 60 seconds, just forget about
-            // catching up
-            sleepTimeInMS = -60000;
-          }
-        }
-      }
-    }
-  }
-
-
-
   /**
    * Statistics thread base implementation.
    */
@@ -566,17 +400,12 @@
   class UpdateStatsResultHandler<S extends Result> implements ResultHandler<S>
   {
     private final long startTime;
-    private final AsynchronousConnection connection;
-    private final ConnectionWorker worker;
 
 
 
-    UpdateStatsResultHandler(final long startTime,
-        final AsynchronousConnection connection, final ConnectionWorker worker)
+    UpdateStatsResultHandler(final long startTime)
     {
       this.startTime = startTime;
-      this.connection = connection;
-      this.worker = worker;
     }
 
 
@@ -590,8 +419,6 @@
       {
         app.println(LocalizableMessage.raw(error.getResult().toString()));
       }
-
-      worker.operationCompleted(connection);
     }
 
 
@@ -600,7 +427,6 @@
     {
       successRecentCount.getAndIncrement();
       updateStats();
-      worker.operationCompleted(connection);
     }
 
 
@@ -662,7 +488,7 @@
       AsynchronousConnection connection;
 
       final double targetTimeInMS =
-        (1.0 / (targetThroughput / (double) (numConcurrentTasks * numConnections))) * 1000.0;
+        (1000.0 / (targetThroughput / (double) (numThreads * numConnections)));
       double sleepTimeInMS = 0;
       long start;
       while (!stopRequested && !(maxIterations > 0 && count >= maxIterations))
@@ -1001,7 +827,7 @@
 
   private volatile boolean stopRequested;
 
-  private int numConcurrentTasks;
+  private int numThreads;
 
   private int numConnections;
 
@@ -1015,9 +841,7 @@
 
   private int statsInterval;
 
-  private double targetTimeInMS;
-
-  private final IntegerArgument numConcurrentTasksArgument;
+  private final IntegerArgument numThreadsArgument;
 
   private final IntegerArgument maxIterationsArgument;
 
@@ -1045,19 +869,19 @@
       throws ArgumentException
   {
     this.app = app;
-    numConcurrentTasksArgument = new IntegerArgument("numConcurrentTasks", 't',
-        "numConcurrentTasks", false, false, true,
-        LocalizableMessage.raw("{numConcurrentTasks}"), 1, null, true, 1,
+    numThreadsArgument = new IntegerArgument("numThreads", 't',
+        "numThreads", false, false, true,
+        LocalizableMessage.raw("{numThreads}"), 1, null, true, 1,
         false, 0,
-        LocalizableMessage.raw("Number of concurrent tasks per connection"));
-    numConcurrentTasksArgument.setPropertyName("numConcurrentTasks");
+        LocalizableMessage.raw("Number of worker threads per connection"));
+    numThreadsArgument.setPropertyName("numThreads");
     if (!alwaysSingleThreaded)
     {
-      argParser.addArgument(numConcurrentTasksArgument);
+      argParser.addArgument(numThreadsArgument);
     }
     else
     {
-      numConcurrentTasksArgument.addValue("1");
+      numThreadsArgument.addValue("1");
     }
 
     numConnectionsArgument = new IntegerArgument("numConnections", 'c',
@@ -1178,19 +1002,20 @@
   public final void validate() throws ArgumentException
   {
     numConnections = numConnectionsArgument.getIntValue();
-    numConcurrentTasks = numConcurrentTasksArgument.getIntValue();
-    maxIterations = maxIterationsArgument.getIntValue() / numConnections;
+    numThreads = numThreadsArgument.getIntValue();
+    maxIterations = maxIterationsArgument.getIntValue()
+        / numConnections / numThreads;
     statsInterval = statsIntervalArgument.getIntValue() * 1000;
     targetThroughput = targetThroughputArgument.getIntValue();
 
     isAsync = asyncArgument.isPresent();
     noRebind = noRebindArgument.isPresent();
 
-    if (!noRebindArgument.isPresent() && this.numConcurrentTasks > 1)
+    if (!noRebindArgument.isPresent() && this.numThreads > 1)
     {
       throw new ArgumentException(LocalizableMessage.raw("--"
           + noRebindArgument.getLongIdentifier() + " must be used if --"
-          + numConcurrentTasksArgument.getLongIdentifier() + " is > 1"));
+          + numThreadsArgument.getLongIdentifier() + " is > 1"));
     }
 
     if (!noRebindArgument.isPresent() && asyncArgument.isPresent())
@@ -1201,9 +1026,6 @@
     }
 
     dataSourcePrototypes = DataSource.parse(arguments.getValues());
-
-    targetTimeInMS =
-      (1.0 / (targetThroughput / (double) (numConcurrentTasks * numConnections))) * 1000.0;
   }
 
 
@@ -1220,7 +1042,7 @@
 
 
 
-  abstract ConnectionWorker newConnectionWorker(
+  abstract WorkerThread newWorkerThread(
       final AsynchronousConnection connection,
       final ConnectionFactory connectionFactory);
 
@@ -1232,8 +1054,9 @@
 
   final int run(final ConnectionFactory connectionFactory)
   {
-    final List<ConnectionWorker> workers = new ArrayList<ConnectionWorker>();
-    final List<AsynchronousConnection> connections = new ArrayList<AsynchronousConnection>();
+    final List<Thread> threads = new ArrayList<Thread>();
+    final List<AsynchronousConnection> connections =
+      new ArrayList<AsynchronousConnection>();
 
     AsynchronousConnection connection = null;
     try
@@ -1246,18 +1069,20 @@
           connection.addConnectionEventListener(this);
           connections.add(connection);
         }
-        final ConnectionWorker worker = newConnectionWorker(connection,
-            connectionFactory);
-        workers.add(worker);
-        worker.startWork();
+        for (int j = 0; j < numThreads; j++)
+        {
+          final Thread thread = newWorkerThread(connection, connectionFactory);
+          threads.add(thread);
+          thread.start();
+        }
       }
 
       final Thread statsThread = newStatsThread();
       statsThread.start();
 
-      for (final ConnectionWorker w : workers)
+      for (final Thread t : threads)
       {
-        w.waitFor();
+        t.join();
       }
       stopRequested = true;
       statsThread.join();
diff --git a/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/SearchRate.java b/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/SearchRate.java
index 2d485c1..a78c738 100644
--- a/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/SearchRate.java
+++ b/opendj3/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/SearchRate.java
@@ -62,10 +62,9 @@
     private final class SearchStatsHandler extends
         UpdateStatsResultHandler<Result> implements SearchResultHandler
     {
-      private SearchStatsHandler(final long startTime,
-          final AsynchronousConnection connection, final ConnectionWorker worker)
+      private SearchStatsHandler(final long startTime)
       {
-        super(startTime, connection, worker);
+        super(startTime);
       }
 
 
@@ -121,7 +120,7 @@
 
 
 
-    private final class SearchWorkerThread extends ConnectionWorker
+    private final class SearchWorkerThread extends WorkerThread
     {
       private SearchRequest sr;
 
@@ -162,8 +161,7 @@
           sr.setFilter(String.format(filter, data));
           sr.setName(String.format(baseDN, data));
         }
-        return connection.search(sr, new SearchStatsHandler(startTime,
-            connection, this));
+        return connection.search(sr, new SearchStatsHandler(startTime));
       }
     }
 
@@ -190,7 +188,7 @@
 
 
     @Override
-    ConnectionWorker newConnectionWorker(
+    WorkerThread newWorkerThread(
         final AsynchronousConnection connection,
         final ConnectionFactory connectionFactory)
     {

--
Gitblit v1.10.0