From c91053c42e0e0f2de8290d3352f09021bb81f949 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 30 Aug 2013 14:04:28 +0000
Subject: [PATCH] Final fix for OPENDJ-1112: LoadBalancing connection factories need better diagnostic messages

---
 opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java         |   14 
 opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java     |  219 +----------------
 opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockScheduler.java                          |  226 ++++++++++++++++++
 opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RoundRobinLoadBalancingAlgorithm.java       |   73 +++++
 opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java |   99 ++++++++
 opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FailoverLoadBalancingAlgorithm.java         |   78 +++++
 6 files changed, 484 insertions(+), 225 deletions(-)

diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
index 5abac02..cd99442 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
@@ -297,17 +297,9 @@
     private ScheduledFuture<?> monitoringFuture;
     private AtomicBoolean isClosed = new AtomicBoolean();
 
-    AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories) {
-        this(factories, 1, TimeUnit.SECONDS, null);
-    }
-
     AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
-            final long interval, final TimeUnit unit) {
-        this(factories, interval, unit, null);
-    }
-
-    AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
-            final long interval, final TimeUnit unit, final ScheduledExecutorService scheduler) {
+            final LoadBalancerEventListener listener, final long interval, final TimeUnit unit,
+            final ScheduledExecutorService scheduler) {
         Validator.ensureNotNull(factories, unit);
 
         this.monitoredFactories = new ArrayList<MonitoredConnectionFactory>(factories.size());
@@ -318,7 +310,7 @@
         this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
         this.monitoringInterval = interval;
         this.monitoringIntervalTimeUnit = unit;
-        this.listener = DEFAULT_LISTENER;
+        this.listener = listener != null ? listener : DEFAULT_LISTENER;
     }
 
     @Override
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FailoverLoadBalancingAlgorithm.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FailoverLoadBalancingAlgorithm.java
index 44c1ed1..9e9f6a1 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FailoverLoadBalancingAlgorithm.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FailoverLoadBalancingAlgorithm.java
@@ -22,6 +22,7 @@
  *
  *
  *      Copyright 2010 Sun Microsystems, Inc.
+ *      Portions copyright 2013 ForgeRock AS.
  */
 
 package org.forgerock.opendj.ldap;
@@ -35,10 +36,10 @@
  * underlying connection factories.
  * <p>
  * This algorithm is typically used for load-balancing <i>between</i> data
- * centers, where there is preference to always forward connection
- * requests to the <i>closest available</i> data center. This algorithm
- * contrasts with the {@link RoundRobinLoadBalancingAlgorithm} which is used for
- * load-balancing <i>within</i> a data center.
+ * centers, where there is preference to always forward connection requests to
+ * the <i>closest available</i> data center. This algorithm contrasts with the
+ * {@link RoundRobinLoadBalancingAlgorithm} which is used for load-balancing
+ * <i>within</i> a data center.
  * <p>
  * This algorithm selects connection factories based on the order in which they
  * were provided during construction. More specifically, an attempt to obtain a
@@ -69,7 +70,70 @@
      *            The ordered collection of connection factories.
      */
     public FailoverLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories) {
-        super(factories);
+        this(factories, null, 1, TimeUnit.SECONDS, null);
+    }
+
+    /**
+     * Creates a new fail-over load balancing algorithm which will monitor
+     * offline connection factories every 1 second using the default scheduler.
+     *
+     * @param factories
+     *            The ordered collection of connection factories.
+     * @param listener
+     *            The event listener which should be notified whenever a
+     *            connection factory changes state from online to offline or
+     *            vice-versa.
+     */
+    public FailoverLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
+            final LoadBalancerEventListener listener) {
+        this(factories, listener, 1, TimeUnit.SECONDS, null);
+    }
+
+    /**
+     * Creates a new fail-over load balancing algorithm which will monitor
+     * offline connection factories using the specified frequency using the
+     * default scheduler.
+     *
+     * @param factories
+     *            The connection factories.
+     * @param listener
+     *            The event listener which should be notified whenever a
+     *            connection factory changes state from online to offline or
+     *            vice-versa.
+     * @param interval
+     *            The interval between attempts to poll offline factories.
+     * @param unit
+     *            The time unit for the interval between attempts to poll
+     *            offline factories.
+     */
+    public FailoverLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
+            final LoadBalancerEventListener listener, final long interval, final TimeUnit unit) {
+        this(factories, listener, interval, unit, null);
+    }
+
+    /**
+     * Creates a new fail-over load balancing algorithm which will monitor
+     * offline connection factories using the specified frequency and scheduler.
+     *
+     * @param factories
+     *            The connection factories.
+     * @param listener
+     *            The event listener which should be notified whenever a
+     *            connection factory changes state from online to offline or
+     *            vice-versa.
+     * @param interval
+     *            The interval between attempts to poll offline factories.
+     * @param unit
+     *            The time unit for the interval between attempts to poll
+     *            offline factories.
+     * @param scheduler
+     *            The scheduler which should for periodically monitoring dead
+     *            connection factories to see if they are usable again.
+     */
+    public FailoverLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
+            final LoadBalancerEventListener listener, final long interval, final TimeUnit unit,
+            final ScheduledExecutorService scheduler) {
+        super(factories, listener, interval, unit, scheduler);
     }
 
     /**
@@ -87,7 +151,7 @@
      */
     public FailoverLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
             final long interval, final TimeUnit unit) {
-        super(factories, interval, unit);
+        this(factories, null, interval, unit, null);
     }
 
     /**
@@ -107,7 +171,7 @@
      */
     public FailoverLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
             final long interval, final TimeUnit unit, final ScheduledExecutorService scheduler) {
-        super(factories, interval, unit, scheduler);
+        this(factories, null, interval, unit, scheduler);
     }
 
     /**
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RoundRobinLoadBalancingAlgorithm.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RoundRobinLoadBalancingAlgorithm.java
index 2726a89..236f070 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RoundRobinLoadBalancingAlgorithm.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RoundRobinLoadBalancingAlgorithm.java
@@ -22,6 +22,7 @@
  *
  *
  *      Copyright 2010 Sun Microsystems, Inc.
+ *      Portions copyright 2013 ForgeRock AS.
  */
 
 package org.forgerock.opendj.ldap;
@@ -55,7 +56,6 @@
  */
 public final class RoundRobinLoadBalancingAlgorithm extends AbstractLoadBalancingAlgorithm {
     private final int maxIndex;
-
     private final AtomicInteger nextIndex = new AtomicInteger(-1);
 
     /**
@@ -66,7 +66,70 @@
      *            The ordered collection of connection factories.
      */
     public RoundRobinLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories) {
-        super(factories);
+        this(factories, null, 1, TimeUnit.SECONDS, null);
+    }
+
+    /**
+     * Creates a new round robin load balancing algorithm which will monitor
+     * offline connection factories every 1 second using the default scheduler.
+     *
+     * @param factories
+     *            The ordered collection of connection factories.
+     * @param listener
+     *            The event listener which should be notified whenever a
+     *            connection factory changes state from online to offline or
+     *            vice-versa.
+     */
+    public RoundRobinLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
+            final LoadBalancerEventListener listener) {
+        this(factories, listener, 1, TimeUnit.SECONDS, null);
+    }
+
+    /**
+     * Creates a new round robin load balancing algorithm which will monitor
+     * offline connection factories using the specified frequency using the
+     * default scheduler.
+     *
+     * @param factories
+     *            The connection factories.
+     * @param listener
+     *            The event listener which should be notified whenever a
+     *            connection factory changes state from online to offline or
+     *            vice-versa.
+     * @param interval
+     *            The interval between attempts to poll offline factories.
+     * @param unit
+     *            The time unit for the interval between attempts to poll
+     *            offline factories.
+     */
+    public RoundRobinLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
+            final LoadBalancerEventListener listener, final long interval, final TimeUnit unit) {
+        this(factories, null, interval, unit, null);
+    }
+
+    /**
+     * Creates a new round robin load balancing algorithm which will monitor
+     * offline connection factories using the specified frequency and scheduler.
+     *
+     * @param factories
+     *            The connection factories.
+     * @param listener
+     *            The event listener which should be notified whenever a
+     *            connection factory changes state from online to offline or
+     *            vice-versa.
+     * @param interval
+     *            The interval between attempts to poll offline factories.
+     * @param unit
+     *            The time unit for the interval between attempts to poll
+     *            offline factories.
+     * @param scheduler
+     *            The scheduler which should for periodically monitoring dead
+     *            connection factories to see if they are usable again.
+     */
+    public RoundRobinLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
+            final LoadBalancerEventListener listener, final long interval, final TimeUnit unit,
+            final ScheduledExecutorService scheduler) {
+        super(factories, listener, interval, unit, scheduler);
         this.maxIndex = factories.size();
     }
 
@@ -85,8 +148,7 @@
      */
     public RoundRobinLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
             final long interval, final TimeUnit unit) {
-        super(factories, interval, unit);
-        this.maxIndex = factories.size();
+        this(factories, null, interval, unit, null);
     }
 
     /**
@@ -106,8 +168,7 @@
      */
     public RoundRobinLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
             final long interval, final TimeUnit unit, final ScheduledExecutorService scheduler) {
-        super(factories, interval, unit, scheduler);
-        this.maxIndex = factories.size();
+        this(factories, null, interval, unit, scheduler);
     }
 
     /**
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java
index 82aa5a2..b8f4815 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java
@@ -31,18 +31,57 @@
 import static org.forgerock.opendj.ldap.Connections.newLoadBalancer;
 import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.forgerock.opendj.util.CompletedFutureResult;
 import com.forgerock.opendj.util.StaticUtils;
 
 @SuppressWarnings("javadoc")
 public class AbstractLoadBalancingAlgorithmTestCase extends SdkTestCase {
+    private static ConnectionFactory mockAsync(final ConnectionFactory mock) {
+        return new ConnectionFactory() {
+            @Override
+            public void close() {
+                mock.close();
+            }
+
+            @Override
+            public Connection getConnection() throws ErrorResultException {
+                return mock.getConnection();
+            }
+
+            @Override
+            public FutureResult<Connection> getConnectionAsync(
+                    final ResultHandler<? super Connection> handler) {
+                try {
+                    final Connection connection = mock.getConnection();
+                    if (handler != null) {
+                        handler.handleResult(connection);
+                    }
+                    return new CompletedFutureResult<Connection>(connection);
+                } catch (final ErrorResultException e) {
+                    if (handler != null) {
+                        handler.handleErrorResult(e);
+                    }
+                    return new CompletedFutureResult<Connection>(e);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return mock.toString();
+            }
+        };
+    }
 
     /**
      * Disables logging before the tests.
@@ -69,11 +108,11 @@
         /*
          * Create load-balancer with two failed connection factories.
          */
-        final ConnectionFactory first = mock(ConnectionFactory.class);
+        final ConnectionFactory first = mock(ConnectionFactory.class, "first");
         final ErrorResultException firstError = newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN);
         when(first.getConnection()).thenThrow(firstError);
 
-        final ConnectionFactory second = mock(ConnectionFactory.class);
+        final ConnectionFactory second = mock(ConnectionFactory.class, "second");
         final ErrorResultException secondError = newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN);
         when(second.getConnection()).thenThrow(secondError);
 
@@ -89,10 +128,64 @@
         try {
             loadBalancer.getConnection().close();
             fail("Unexpectedly obtained a connection");
-        } catch (ErrorResultException e) {
+        } catch (final ErrorResultException e) {
             assertThat(e.getCause()).isSameAs(secondError);
         } finally {
             loadBalancer.close();
         }
     }
+
+    /**
+     * Tests fix for OPENDJ-1112: event listener should be notified when a
+     * factory changes state from online to offline or vice versa.
+     */
+    @Test
+    public void testLoadBalancerEventListenerNotification() throws Exception {
+        /*
+         * Create load-balancer with two failed connection factories and a mock
+         * listener and scheduler. The first factory will succeed on the second
+         * attempt.
+         */
+        final ConnectionFactory first = mock(ConnectionFactory.class, "first");
+        final ConnectionFactory firstAsync = mockAsync(first);
+        final ErrorResultException firstError = newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN);
+        when(first.getConnection()).thenThrow(firstError).thenReturn(mock(Connection.class));
+
+        final ConnectionFactory second = mock(ConnectionFactory.class, "second");
+        final ConnectionFactory secondAsync = mockAsync(second);
+        final ErrorResultException secondError = newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN);
+        when(second.getConnection()).thenThrow(secondError);
+
+        final LoadBalancerEventListener listener = mock(LoadBalancerEventListener.class);
+        final MockScheduler scheduler = new MockScheduler();
+        final ConnectionFactory loadBalancer =
+                newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(
+                        asList(firstAsync, secondAsync), listener, 1, TimeUnit.SECONDS, scheduler));
+
+        /*
+         * Belt and braces check to ensure that factory methods don't return
+         * same instance and fool this test.
+         */
+        assertThat(firstError).isNotSameAs(secondError);
+
+        try {
+            loadBalancer.getConnection().close();
+            fail("Unexpectedly obtained a connection");
+        } catch (final ErrorResultException e) {
+            // Check that the event listener has been fired for both factories.
+            verify(listener).handleConnectionFactoryOffline(firstAsync, firstError);
+            verify(listener).handleConnectionFactoryOffline(secondAsync, secondError);
+            verifyNoMoreInteractions(listener);
+
+            // Check that the factories are being monitored.
+            assertThat(scheduler.isScheduled()).isTrue();
+
+            // Forcefully run the monitor task and check that the first factory is online.
+            scheduler.getCommand().run();
+            verify(listener).handleConnectionFactoryOnline(firstAsync);
+            verifyNoMoreInteractions(listener);
+        } finally {
+            loadBalancer.close();
+        }
+    }
 }
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
index ab5a5a5..06e5337 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
@@ -43,18 +43,9 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.Collection;
-import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import org.forgerock.opendj.ldap.requests.SearchRequest;
 import org.forgerock.opendj.ldap.responses.Result;
@@ -87,174 +78,6 @@
 
     // @formatter:on
 
-    private static final class MockScheduler implements ScheduledExecutorService {
-
-        private Runnable command;
-
-        private long delay;
-
-        private boolean isScheduled = false;
-
-        private TimeUnit unit;
-
-        @Override
-        public boolean awaitTermination(final long timeout, final TimeUnit unit)
-                throws InterruptedException {
-            // Unused.
-            return false;
-        }
-
-        @Override
-        public void execute(final Runnable command) {
-            // Unused.
-
-        }
-
-        @Override
-        public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks)
-                throws InterruptedException {
-            // Unused.
-            return null;
-        }
-
-        @Override
-        public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks,
-                final long timeout, final TimeUnit unit) throws InterruptedException {
-            // Unused.
-            return null;
-        }
-
-        @Override
-        public <T> T invokeAny(final Collection<? extends Callable<T>> tasks)
-                throws InterruptedException, ExecutionException {
-            // Unused.
-            return null;
-        }
-
-        @Override
-        public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout,
-                final TimeUnit unit) throws InterruptedException, ExecutionException,
-                TimeoutException {
-            // Unused.
-            return null;
-        }
-
-        @Override
-        public boolean isShutdown() {
-            // Unused.
-            return false;
-        }
-
-        @Override
-        public boolean isTerminated() {
-            // Unused.
-            return false;
-        }
-
-        @Override
-        public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay,
-                final TimeUnit unit) {
-            // Unused.
-            return null;
-        }
-
-        @Override
-        public ScheduledFuture<?> schedule(final Runnable command, final long delay,
-                final TimeUnit unit) {
-            // Unused.
-            return null;
-        }
-
-        @Override
-        public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command,
-                final long initialDelay, final long period, final TimeUnit unit) {
-            // Unused.
-            return null;
-        }
-
-        @Override
-        public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command,
-                final long initialDelay, final long delay, final TimeUnit unit) {
-            this.command = command;
-            this.delay = delay;
-            this.unit = unit;
-            this.isScheduled = true;
-            return new ScheduledFuture<Object>() {
-                @Override
-                public boolean cancel(final boolean mayInterruptIfRunning) {
-                    isScheduled = false;
-                    return true;
-                }
-
-                @Override
-                public int compareTo(final Delayed o) {
-                    // Unused.
-                    return 0;
-                }
-
-                @Override
-                public Object get() throws InterruptedException, ExecutionException {
-                    // Unused.
-                    return null;
-                }
-
-                @Override
-                public Object get(final long timeout, final TimeUnit unit)
-                        throws InterruptedException, ExecutionException, TimeoutException {
-                    // Unused.
-                    return null;
-                }
-
-                @Override
-                public long getDelay(final TimeUnit unit) {
-                    // Unused.
-                    return 0;
-                }
-
-                @Override
-                public boolean isCancelled() {
-                    return !isScheduled;
-                }
-
-                @Override
-                public boolean isDone() {
-                    // Unused.
-                    return false;
-                }
-
-            };
-        }
-
-        @Override
-        public void shutdown() {
-            // Unused.
-        }
-
-        @Override
-        public List<Runnable> shutdownNow() {
-            // Unused.
-            return Collections.emptyList();
-        }
-
-        @Override
-        public <T> Future<T> submit(final Callable<T> task) {
-            // Unused.
-            return null;
-        }
-
-        @Override
-        public Future<?> submit(final Runnable task) {
-            // Unused.
-            return null;
-        }
-
-        @Override
-        public <T> Future<T> submit(final Runnable task, final T result) {
-            // Unused.
-            return null;
-        }
-    }
-
     @Test(enabled = false)
     public void testHeartBeatTimeout() throws Exception {
         // Mock connection which never responds to heartbeat.
@@ -273,10 +96,10 @@
 
         // First connection should cause heart beat to be scheduled.
         final Connection hbc = hbcf.getConnection();
-        assertThat(scheduler.isScheduled).isTrue();
-        assertThat(scheduler.command).isNotNull();
-        assertThat(scheduler.delay).isEqualTo(0);
-        assertThat(scheduler.unit).isEqualTo(TimeUnit.MILLISECONDS);
+        assertThat(scheduler.isScheduled()).isTrue();
+        assertThat(scheduler.getCommand()).isNotNull();
+        assertThat(scheduler.getDelay()).isEqualTo(0);
+        assertThat(scheduler.getUnit()).isEqualTo(TimeUnit.MILLISECONDS);
         assertThat(listeners).hasSize(1);
 
         // The connection should be immediately invalid due to 0 timeout.
@@ -287,7 +110,7 @@
          * Attempt to send heartbeat. This should trigger the connection to be
          * closed and all subsequent request attempts to fail.
          */
-        scheduler.command.run();
+        scheduler.getCommand().run();
 
         // The underlying connection should have been closed.
         verify(connection).close();
@@ -296,7 +119,7 @@
          * ...and the scheduled heart beat stopped because there are no
          * remaining connections.
          */
-        assertThat(scheduler.isScheduled).isFalse();
+        assertThat(scheduler.isScheduled()).isFalse();
 
         // Attempt to send a new request: it should fail immediately.
         @SuppressWarnings("unchecked")
@@ -332,23 +155,23 @@
                 newHeartBeatConnectionFactory(factory, 0, TimeUnit.MILLISECONDS, hb, scheduler);
 
         // Heart beat should not be scheduled yet.
-        assertThat(scheduler.isScheduled).isFalse();
+        assertThat(scheduler.isScheduled()).isFalse();
 
         // First connection should cause heart beat to be scheduled.
         hbcf.getConnection();
-        assertThat(scheduler.isScheduled).isTrue();
-        assertThat(scheduler.command).isNotNull();
-        assertThat(scheduler.delay).isEqualTo(0);
-        assertThat(scheduler.unit).isEqualTo(TimeUnit.MILLISECONDS);
+        assertThat(scheduler.isScheduled()).isTrue();
+        assertThat(scheduler.getCommand()).isNotNull();
+        assertThat(scheduler.getDelay()).isEqualTo(0);
+        assertThat(scheduler.getUnit()).isEqualTo(TimeUnit.MILLISECONDS);
         assertThat(listeners1).hasSize(1);
 
         // Second connection should not change anything.
         hbcf.getConnection();
-        assertThat(scheduler.isScheduled).isTrue();
+        assertThat(scheduler.isScheduled()).isTrue();
         assertThat(listeners2).hasSize(1);
 
         // Check heart-beat sent to both connections.
-        scheduler.command.run();
+        scheduler.getCommand().run();
         verify(connection1, times(1)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
                 any(SearchResultHandler.class));
         verify(connection2, times(1)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
@@ -358,11 +181,11 @@
 
         // Close first connection: heart beat should still be scheduled.
         listeners1.get(0).handleConnectionClosed();
-        assertThat(scheduler.isScheduled).isTrue();
+        assertThat(scheduler.isScheduled()).isTrue();
         assertThat(listeners1).isEmpty();
 
         // Check heart-beat only sent to second connection.
-        scheduler.command.run();
+        scheduler.getCommand().run();
         verify(connection1, times(1)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
                 any(SearchResultHandler.class));
         verify(connection2, times(2)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
@@ -372,19 +195,19 @@
 
         // Close second connection: heart beat should now be stopped.
         listeners2.get(0).handleConnectionClosed();
-        assertThat(scheduler.isScheduled).isFalse();
+        assertThat(scheduler.isScheduled()).isFalse();
         assertThat(listeners2).isEmpty();
 
         // Opening another connection should restart the heart beat.
         hbcf.getConnection();
-        assertThat(scheduler.isScheduled).isTrue();
-        assertThat(scheduler.command).isNotNull();
-        assertThat(scheduler.delay).isEqualTo(0);
-        assertThat(scheduler.unit).isEqualTo(TimeUnit.MILLISECONDS);
+        assertThat(scheduler.isScheduled()).isTrue();
+        assertThat(scheduler.getCommand()).isNotNull();
+        assertThat(scheduler.getDelay()).isEqualTo(0);
+        assertThat(scheduler.getUnit()).isEqualTo(TimeUnit.MILLISECONDS);
         assertThat(listeners3).hasSize(1);
 
         // Check heart-beat only sent to third connection.
-        scheduler.command.run();
+        scheduler.getCommand().run();
         verify(connection1, times(1)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
                 any(SearchResultHandler.class));
         verify(connection2, times(2)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockScheduler.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockScheduler.java
new file mode 100644
index 0000000..af7ff5c
--- /dev/null
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockScheduler.java
@@ -0,0 +1,226 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2013 ForgeRock AS.
+ */
+package org.forgerock.opendj.ldap;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A mock scheduled executor which allows unit tests to directly invoke
+ * scheduled tasks without needing to wait.
+ */
+final class MockScheduler implements ScheduledExecutorService {
+
+    // Saved scheduled task.
+    private Runnable command;
+    private long delay;
+    private boolean isScheduled = false;
+    private TimeUnit unit;
+
+    MockScheduler() {
+        // Nothing to do.
+    }
+
+    @Override
+    public boolean awaitTermination(final long timeout, final TimeUnit unit)
+            throws InterruptedException {
+        // Unused.
+        return false;
+    }
+
+    @Override
+    public void execute(final Runnable command) {
+        // Unused.
+
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks)
+            throws InterruptedException {
+        // Unused.
+        return null;
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks,
+            final long timeout, final TimeUnit unit) throws InterruptedException {
+        // Unused.
+        return null;
+    }
+
+    @Override
+    public <T> T invokeAny(final Collection<? extends Callable<T>> tasks)
+            throws InterruptedException, ExecutionException {
+        // Unused.
+        return null;
+    }
+
+    @Override
+    public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout,
+            final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        // Unused.
+        return null;
+    }
+
+    @Override
+    public boolean isShutdown() {
+        // Unused.
+        return false;
+    }
+
+    @Override
+    public boolean isTerminated() {
+        // Unused.
+        return false;
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay,
+            final TimeUnit unit) {
+        // Unused.
+        return null;
+    }
+
+    @Override
+    public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
+        // Unused.
+        return null;
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay,
+            final long period, final TimeUnit unit) {
+        // Unused.
+        return null;
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command,
+            final long initialDelay, final long delay, final TimeUnit unit) {
+        this.command = command;
+        this.delay = delay;
+        this.unit = unit;
+        this.isScheduled = true;
+        return new ScheduledFuture<Object>() {
+            @Override
+            public boolean cancel(final boolean mayInterruptIfRunning) {
+                isScheduled = false;
+                return true;
+            }
+
+            @Override
+            public int compareTo(final Delayed o) {
+                // Unused.
+                return 0;
+            }
+
+            @Override
+            public Object get() throws InterruptedException, ExecutionException {
+                // Unused.
+                return null;
+            }
+
+            @Override
+            public Object get(final long timeout, final TimeUnit unit) throws InterruptedException,
+                    ExecutionException, TimeoutException {
+                // Unused.
+                return null;
+            }
+
+            @Override
+            public long getDelay(final TimeUnit unit) {
+                // Unused.
+                return 0;
+            }
+
+            @Override
+            public boolean isCancelled() {
+                return !isScheduled;
+            }
+
+            @Override
+            public boolean isDone() {
+                // Unused.
+                return false;
+            }
+
+        };
+    }
+
+    @Override
+    public void shutdown() {
+        // Unused.
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        // Unused.
+        return Collections.emptyList();
+    }
+
+    @Override
+    public <T> Future<T> submit(final Callable<T> task) {
+        // Unused.
+        return null;
+    }
+
+    @Override
+    public Future<?> submit(final Runnable task) {
+        // Unused.
+        return null;
+    }
+
+    @Override
+    public <T> Future<T> submit(final Runnable task, final T result) {
+        // Unused.
+        return null;
+    }
+
+    Runnable getCommand() {
+        return command;
+    }
+
+    long getDelay() {
+        return delay;
+    }
+
+    TimeUnit getUnit() {
+        return unit;
+    }
+
+    boolean isScheduled() {
+        return isScheduled;
+    }
+}

--
Gitblit v1.10.0