From 3070174573f3217558d17db2a57815963a2cb027 Mon Sep 17 00:00:00 2001
From: Gaetan Boismal <gaetan.boismal@forgerock.com>
Date: Thu, 15 Jan 2015 09:26:10 +0000
Subject: [PATCH] OPENDJ-1693 OPENDJ-1694 OPENDJ-1721 (CR-5748) addrate tool new version This version includes     *Bug fixes         ** OPENDJ-1693: Solve NoSuchElementException by checking race conditions.         ** OPENDJ-1694: Changes the age threshold feature implementation.     *Improvement: Adding purge phase

---
 opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/PerformanceRunner.java |   73 +++++++++++++++++++++---------------
 1 files changed, 43 insertions(+), 30 deletions(-)

diff --git a/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/PerformanceRunner.java b/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/PerformanceRunner.java
index 50e0795..36677e7 100644
--- a/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/PerformanceRunner.java
+++ b/opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/PerformanceRunner.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2010 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2014 ForgeRock AS.
+ *      Portions Copyright 2011-2015 ForgeRock AS.
  */
 package com.forgerock.opendj.ldap.tools;
 
@@ -440,6 +440,7 @@
             return EMPTY_STRINGS;
         }
 
+        /** Resets both general and recent statistic indicators. */
         void resetStats() {
             failedCount = 0;
             operationCount = 0;
@@ -451,13 +452,17 @@
         }
     }
 
-    private class TimerThread extends Thread {
+    class TimerThread extends Thread {
         private final long timeToWait;
 
-        public TimerThread(long timeToWait) {
+        TimerThread(long timeToWait) {
             this.timeToWait = timeToWait;
         }
 
+        void performStopOperations() {
+            stopRequested = true;
+        }
+
         @Override
         public void run() {
             try {
@@ -465,7 +470,7 @@
             } catch (InterruptedException e) {
                 throw new IllegalStateException(e);
             } finally {
-                stopRequested = true;
+                performStopOperations();
             }
         }
     }
@@ -512,6 +517,7 @@
         private int count;
         private final Connection connection;
         private final ConnectionFactory connectionFactory;
+        boolean localStopRequested;
 
         WorkerThread(final Connection connection, final ConnectionFactory connectionFactory) {
             super("Worker Thread");
@@ -526,11 +532,11 @@
         public void run() {
             Promise<?, LdapException> promise;
             Connection connection;
-
             final double targetTimeInMS = 1000.0 / (targetThroughput / (double) (numThreads * numConnections));
             double sleepTimeInMS = 0;
-            long start;
-            while (!stopRequested && !(maxIterations > 0 && count >= maxIterations)) {
+
+            while (!stopRequested && !localStopRequested
+                    && (maxIterations <= 0 || count < maxIterations)) {
                 if (this.connection == null) {
                     try {
                         connection = connectionFactory.getConnectionAsync().getOrThrow();
@@ -565,15 +571,12 @@
                     }
                 }
 
-                start = System.nanoTime();
+                long start = System.nanoTime();
                 promise = performOperation(connection, dataSources.get(), start);
                 operationRecentCount.getAndIncrement();
-                count++;
                 if (!isAsync) {
                     try {
-                        if (promise != null) {
-                            promise.getOrThrow();
-                        }
+                        promise.getOrThrow();
                     } catch (final InterruptedException e) {
                         // Ignore and check stop requested
                         continue;
@@ -607,6 +610,10 @@
                 }
             }
         }
+
+        void incrementIterationCount() {
+            count++;
+        }
     }
 
     private static final String[] EMPTY_STRINGS = new String[0];
@@ -636,10 +643,10 @@
 
     };
 
-    private volatile boolean stopRequested;
+    int numThreads;
+    int numConnections;
+    volatile boolean stopRequested;
     private volatile boolean isWarmingUp;
-    private int numThreads;
-    private int numConnections;
     private int targetThroughput;
     private int maxIterations;
     /** Warm-up duration time in ms. **/
@@ -650,7 +657,6 @@
     private boolean noRebind;
     private int statsInterval;
     private final IntegerArgument numThreadsArgument;
-    private final IntegerArgument maxIterationsArgument;
     private final IntegerArgument maxDurationArgument;
     private final IntegerArgument statsIntervalArgument;
     private final IntegerArgument targetThroughputArgument;
@@ -660,8 +666,11 @@
     private final BooleanArgument noRebindArgument;
     private final BooleanArgument asyncArgument;
     private final StringArgument arguments;
+    protected final IntegerArgument maxIterationsArgument;
     protected final IntegerArgument warmUpArgument;
 
+    private final List<Thread> workerThreads = new ArrayList<Thread>();
+
     PerformanceRunner(final PerformanceRunnerOptions options) throws ArgumentException {
         ArgumentParser argParser = options.getArgumentParser();
 
@@ -686,8 +695,8 @@
 
         maxIterationsArgument =
                 new IntegerArgument("maxIterations", 'm', "maxIterations", false, false, true,
-                        LocalizableMessage.raw("{maxIterations}"), 0, null, LocalizableMessage
-                                .raw("Max iterations, 0 for unlimited"));
+                        LocalizableMessage.raw("{maxIterations}"), 0, null,
+                        LocalizableMessage.raw("Max iterations, 0 for unlimited"));
         maxIterationsArgument.setPropertyName("maxIterations");
         argParser.addArgument(maxIterationsArgument);
 
@@ -828,19 +837,19 @@
 
     final DataSource[] getDataSources() {
         if (dataSourcePrototypes == null) {
-            throw new IllegalStateException(
-                    "dataSources are null - validate() must be called first");
+            throw new IllegalStateException("dataSources are null - validate() must be called first");
         }
         return dataSourcePrototypes;
     }
 
-    abstract WorkerThread newWorkerThread(final Connection connection,
-            final ConnectionFactory connectionFactory);
-
+    abstract WorkerThread newWorkerThread(final Connection connection, final ConnectionFactory connectionFactory);
     abstract StatsThread newStatsThread();
 
+    TimerThread newEndTimerThread(final long timeTowait) {
+        return new TimerThread(timeTowait);
+    }
+
     final int run(final ConnectionFactory connectionFactory) {
-        final List<Thread> threads = new ArrayList<Thread>();
         final List<Connection> connections = new ArrayList<Connection>();
 
         Connection connection = null;
@@ -854,13 +863,13 @@
                 }
                 for (int j = 0; j < numThreads; j++) {
                     final Thread thread = newWorkerThread(connection, connectionFactory);
-                    threads.add(thread);
+                    workerThreads.add(thread);
                     thread.start();
                 }
             }
 
             if (maxDurationTime > 0) {
-                new TimerThread(maxDurationTime).start();
+                newEndTimerThread(maxDurationTime).start();
             }
 
             final StatsThread statsThread = newStatsThread();
@@ -873,11 +882,9 @@
                 statsThread.resetStats();
                 isWarmingUp = false;
             }
-            statsThread.start();
 
-            for (final Thread t : threads) {
-                t.join();
-            }
+            statsThread.start();
+            joinAllWorkerThreads();
             stopRequested = true;
             statsThread.join();
         } catch (final InterruptedException e) {
@@ -891,4 +898,10 @@
 
         return 0;
     }
+
+    protected void joinAllWorkerThreads() throws InterruptedException {
+        for (final Thread t : workerThreads) {
+            t.join();
+        }
+    }
 }

--
Gitblit v1.10.0