From 3e1db08dce649f6d426aae67d506c8c23b15f6e8 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 18 Sep 2013 16:05:59 +0000
Subject: [PATCH] Fix OPENDJ-1058 – HeartbeatConnectionFactory does not actively shutdown dead connections

---
 opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockScheduler.java                          |  192 ++-
 opendj3/opendj-rest2ldap-servlet/src/main/webapp/opendj-rest2ldap-servlet.json                              |    6 
 opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/FutureResultTransformer.java                |   18 
 opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java |    2 
 opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java              |    6 
 opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/ldap/ConnectionStateTest.java                    |  250 ++++
 opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ErrorResultException.java                   |    6 
 opendj3/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java                        |   17 
 opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtils.java                          |   22 
 opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java             | 1342 +++++++++++++++----------
 opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/util/FutureResultTransformerTestCase.java        |  132 ++
 opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/TimeSource.java                             |   52 +
 opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties                        |    4 
 opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java                            |   31 
 opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java     |  427 +++++--
 opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java                   |   31 
 opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java                 |   24 
 opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtilsTestCase.java                  |   53 +
 opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/ConnectionState.java                        |  387 +++++++
 opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/SdkTestCase.java                            |    1 
 20 files changed, 2,198 insertions(+), 805 deletions(-)

diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/ConnectionState.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/ConnectionState.java
new file mode 100644
index 0000000..5d4190a
--- /dev/null
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/ConnectionState.java
@@ -0,0 +1,387 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2013 ForgeRock AS.
+ */
+package com.forgerock.opendj.ldap;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.forgerock.opendj.ldap.ConnectionEventListener;
+import org.forgerock.opendj.ldap.ErrorResultException;
+import org.forgerock.opendj.ldap.responses.ExtendedResult;
+
+/**
+ * This class can be used to manage the internal state of a connection, ensuring
+ * valid and atomic state transitions, as well as connection event listener
+ * notification. There are 4 states:
+ * <ul>
+ * <li>connection is <b>valid</b> (isClosed()=false, isFailed()=false): can fail
+ * or be closed
+ * <li>connection has failed due to an <b>error</b> (isClosed()=false,
+ * isFailed()=true): can be closed
+ * <li>connection has been <b>closed</b> by the application (isClosed()=true,
+ * isFailed()=false): terminal state
+ * <li>connection has failed due to an <b>error</b> and has been <b>closed</b>
+ * by the application (isClosed()=true, isFailed()=true): terminal state
+ * </ul>
+ * All methods are synchronized and container classes may also synchronize on
+ * the state where needed. The state transition methods,
+ * {@link #notifyConnectionClosed()} and
+ * {@link #notifyConnectionError(boolean, ErrorResultException)}, correspond to
+ * methods in the {@link ConnectionEventListener} interface except that they
+ * return a boolean indicating whether the transition was successful or not.
+ */
+public final class ConnectionState {
+    /*
+     * FIXME: The synchronization in this class has been kept simple for now.
+     * However, ideally we should notify listeners without synchronizing on the
+     * state in case a listener takes a long time to complete.
+     */
+
+    /*
+     * FIXME: This class should be used by connection pool and ldap connection
+     * implementations as well.
+     */
+
+    /**
+     * Use the State design pattern to manage state transitions.
+     */
+    private enum State {
+
+        /**
+         * Connection has not encountered an error nor has it been closed
+         * (initial state).
+         */
+        VALID() {
+            @Override
+            void addConnectionEventListener(final ConnectionState cs,
+                    final ConnectionEventListener listener) {
+                cs.listeners.add(listener);
+            }
+
+            @Override
+            boolean isClosed() {
+                return false;
+            }
+
+            @Override
+            boolean isFailed() {
+                return false;
+            }
+
+            @Override
+            boolean isValid() {
+                return true;
+            }
+
+            @Override
+            boolean notifyConnectionClosed(final ConnectionState cs) {
+                for (final ConnectionEventListener listener : cs.listeners) {
+                    listener.handleConnectionClosed();
+                }
+                cs.state = CLOSED;
+                return true;
+            }
+
+            @Override
+            boolean notifyConnectionError(final ConnectionState cs,
+                    final boolean isDisconnectNotification, final ErrorResultException error) {
+                // Transition from valid to error state.
+                cs.failedDueToDisconnect = isDisconnectNotification;
+                cs.connectionError = error;
+                for (final ConnectionEventListener listener : cs.listeners) {
+                    // Use the reason provided in the disconnect notification.
+                    listener.handleConnectionError(isDisconnectNotification, error);
+                }
+                cs.state = ERROR;
+                return true;
+            }
+
+            @Override
+            void notifyUnsolicitedNotification(final ConnectionState cs,
+                    final ExtendedResult notification) {
+                for (final ConnectionEventListener listener : cs.listeners) {
+                    listener.handleUnsolicitedNotification(notification);
+                }
+            }
+        },
+
+        /**
+         * Connection has encountered an error, but has not been closed.
+         */
+        ERROR() {
+            @Override
+            void addConnectionEventListener(final ConnectionState cs,
+                    final ConnectionEventListener listener) {
+                listener.handleConnectionError(cs.failedDueToDisconnect, cs.connectionError);
+                cs.listeners.add(listener);
+            }
+
+            @Override
+            boolean isClosed() {
+                return false;
+            }
+
+            @Override
+            boolean isFailed() {
+                return true;
+            }
+
+            @Override
+            boolean isValid() {
+                return false;
+            }
+
+            @Override
+            boolean notifyConnectionClosed(final ConnectionState cs) {
+                for (final ConnectionEventListener listener : cs.listeners) {
+                    listener.handleConnectionClosed();
+                }
+                cs.state = ERROR_CLOSED;
+                return true;
+            }
+        },
+
+        /**
+         * Connection has been closed (terminal state).
+         */
+        CLOSED() {
+            @Override
+            void addConnectionEventListener(final ConnectionState cs,
+                    final ConnectionEventListener listener) {
+                listener.handleConnectionClosed();
+            }
+
+            @Override
+            boolean isClosed() {
+                return true;
+            }
+
+            @Override
+            boolean isFailed() {
+                return false;
+            }
+
+            @Override
+            boolean isValid() {
+                return false;
+            }
+        },
+
+        /**
+         * Connection has encountered an error and has been closed (terminal
+         * state).
+         */
+        ERROR_CLOSED() {
+            @Override
+            void addConnectionEventListener(final ConnectionState cs,
+                    final ConnectionEventListener listener) {
+                listener.handleConnectionError(cs.failedDueToDisconnect, cs.connectionError);
+                listener.handleConnectionClosed();
+            }
+
+            @Override
+            boolean isClosed() {
+                return true;
+            }
+
+            @Override
+            boolean isFailed() {
+                return true;
+            }
+
+            @Override
+            boolean isValid() {
+                return false;
+            }
+        };
+
+        abstract void addConnectionEventListener(ConnectionState cs,
+                final ConnectionEventListener listener);
+
+        abstract boolean isClosed();
+
+        abstract boolean isFailed();
+
+        abstract boolean isValid();
+
+        boolean notifyConnectionClosed(final ConnectionState cs) {
+            return false;
+        }
+
+        boolean notifyConnectionError(final ConnectionState cs,
+                final boolean isDisconnectNotification, final ErrorResultException error) {
+            return false;
+        }
+
+        void notifyUnsolicitedNotification(final ConnectionState cs,
+                final ExtendedResult notification) {
+            // Do nothing by default.
+        }
+    }
+
+    /**
+     * Non-{@code null} once the connection has failed due to a connection
+     * error. Volatile so that it can be read without synchronization.
+     */
+    private volatile ErrorResultException connectionError = null;
+
+    /**
+     * {@code true} if the connection has failed due to a disconnect
+     * notification.
+     */
+    private boolean failedDueToDisconnect = false;
+
+    /**
+     * Registered event listeners.
+     */
+    private final List<ConnectionEventListener> listeners =
+            new LinkedList<ConnectionEventListener>();
+
+    /**
+     * Internal state implementation.
+     */
+    private volatile State state = State.VALID;
+
+    /**
+     * Creates a new connection state which is initially valid.
+     */
+    public ConnectionState() {
+        // Nothing to do.
+    }
+
+    /**
+     * Registers the provided connection event listener so that it will be
+     * notified when this connection is closed by the application, receives an
+     * unsolicited notification, or experiences a fatal error.
+     *
+     * @param listener
+     *            The listener which wants to be notified when events occur on
+     *            this connection.
+     * @throws IllegalStateException
+     *             If this connection has already been closed, i.e. if
+     *             {@code isClosed() == true}.
+     * @throws NullPointerException
+     *             If the {@code listener} was {@code null}.
+     */
+    public synchronized void addConnectionEventListener(final ConnectionEventListener listener) {
+        state.addConnectionEventListener(this, listener);
+    }
+
+    /**
+     * Returns the error that caused the connection to fail, or {@code null} if
+     * the connection has not failed.
+     *
+     * @return The error that caused the connection to fail, or {@code null} if
+     *         the connection has not failed.
+     */
+    public ErrorResultException getConnectionError() {
+        return connectionError;
+    }
+
+    /**
+     * Indicates whether or not this connection has been explicitly closed by
+     * calling {@code close}. This method will not return {@code true} if a
+     * fatal error has occurred on the connection unless {@code close} has been
+     * called.
+     *
+     * @return {@code true} if this connection has been explicitly closed by
+     *         calling {@code close}, or {@code false} otherwise.
+     */
+    public boolean isClosed() {
+        return state.isClosed();
+    }
+
+    /**
+     * Returns {@code true} if the associated connection has not been closed and
+     * no fatal errors have been detected.
+     *
+     * @return {@code true} if this connection is valid, {@code false}
+     *         otherwise.
+     */
+    public boolean isValid() {
+        return state.isValid();
+    }
+
+    /**
+     * Attempts to transition this connection state to closed and invokes event
+     * listeners if successful.
+     *
+     * @return {@code true} if the state changed to closed, or {@code false} if
+     *         the state was already closed.
+     * @see ConnectionEventListener#handleConnectionClosed()
+     */
+    public synchronized boolean notifyConnectionClosed() {
+        return state.notifyConnectionClosed(this);
+    }
+
+    /**
+     * Attempts to transition this connection state to error and invokes event
+     * listeners if successful.
+     *
+     * @param isDisconnectNotification
+     *            {@code true} if the error was triggered by a disconnect
+     *            notification sent by the server, otherwise {@code false}.
+     * @param error
+     *            The exception that is about to be thrown to the application.
+     * @return {@code true} if the state changed to error, or {@code false} if
+     *         the state was already error or closed.
+     * @see ConnectionEventListener#handleConnectionError(boolean,
+     *      ErrorResultException)
+     */
+    public synchronized boolean notifyConnectionError(final boolean isDisconnectNotification,
+            final ErrorResultException error) {
+        return state.notifyConnectionError(this, isDisconnectNotification, error);
+    }
+
+    /**
+     * Notifies event listeners of the provided unsolicited notification if the
+     * state is valid.
+     *
+     * @param notification
+     *            The unsolicited notification.
+     * @see ConnectionEventListener#handleUnsolicitedNotification(ExtendedResult)
+     */
+    public synchronized void notifyUnsolicitedNotification(final ExtendedResult notification) {
+        state.notifyUnsolicitedNotification(this, notification);
+    }
+
+    /**
+     * Removes the provided connection event listener from this connection so
+     * that it will no longer be notified when this connection is closed by the
+     * application, receives an unsolicited notification, or experiences a fatal
+     * error.
+     *
+     * @param listener
+     *            The listener which no longer wants to be notified when events
+     *            occur on this connection.
+     * @throws NullPointerException
+     *             If the {@code listener} was {@code null}.
+     */
+    public synchronized void removeConnectionEventListener(final ConnectionEventListener listener) {
+        listeners.remove(listener);
+    }
+
+}
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/FutureResultTransformer.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/FutureResultTransformer.java
index c68229d..83914d4 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/FutureResultTransformer.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/FutureResultTransformer.java
@@ -22,6 +22,7 @@
  *
  *
  *      Copyright 2009 Sun Microsystems, Inc.
+ *      Portions copyright 2013 ForgeRock AS.
  */
 
 package com.forgerock.opendj.util;
@@ -44,15 +45,12 @@
  *            The type of the outer result.
  */
 public abstract class FutureResultTransformer<M, N> implements FutureResult<N>, ResultHandler<M> {
-
     private final ResultHandler<? super N> handler;
-
     private volatile FutureResult<? extends M> future = null;
 
     // These do not need to be volatile since the future acts as a memory
     // barrier.
     private N transformedResult = null;
-
     private ErrorResultException transformedErrorResult = null;
 
     /**
@@ -77,8 +75,11 @@
      * {@inheritDoc}
      */
     public final N get() throws ErrorResultException, InterruptedException {
-        future.get();
-
+        try {
+            future.get();
+        } catch (ErrorResultException ignored) {
+            // Ignore untransformed error.
+        }
         // The handlers are guaranteed to have been invoked at this point.
         return get0();
     }
@@ -88,8 +89,11 @@
      */
     public final N get(final long timeout, final TimeUnit unit) throws ErrorResultException,
             TimeoutException, InterruptedException {
-        future.get(timeout, unit);
-
+        try {
+            future.get(timeout, unit);
+        } catch (ErrorResultException ignored) {
+            // Ignore untransformed error.
+        }
         // The handlers are guaranteed to have been invoked at this point.
         return get0();
     }
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/TimeSource.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/TimeSource.java
new file mode 100644
index 0000000..4c9a368
--- /dev/null
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/TimeSource.java
@@ -0,0 +1,52 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2013 ForgeRock AS.
+ */
+
+package com.forgerock.opendj.util;
+
+/**
+ * An interface which is used by various components in order to obtain the
+ * current time.
+ */
+public interface TimeSource {
+    /**
+     * Default implementation which delegates to
+     * {@link System#currentTimeMillis()}.
+     */
+    public static final TimeSource DEFAULT = new TimeSource() {
+
+        @Override
+        public long currentTimeMillis() {
+            return System.currentTimeMillis();
+        }
+    };
+
+    /**
+     * Returns the current time in milli-seconds.
+     *
+     * @return The current time in milli-seconds.
+     */
+    long currentTimeMillis();
+}
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
index 48511a7..c2176e9 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
@@ -35,7 +35,6 @@
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -67,6 +66,7 @@
 import com.forgerock.opendj.util.AsynchronousFutureResult;
 import com.forgerock.opendj.util.CompletedFutureResult;
 import com.forgerock.opendj.util.ReferenceCountedObject;
+import com.forgerock.opendj.util.TimeSource;
 import com.forgerock.opendj.util.Validator;
 
 /**
@@ -552,7 +552,7 @@
                  * since we don't want to hold the lock too long.
                  */
                 idleConnections = new LinkedList<Connection>();
-                final long timeoutMillis = currentTimeMillis() - idleTimeoutMillis;
+                final long timeoutMillis = timeSource.currentTimeMillis() - idleTimeoutMillis;
                 int nonCoreConnectionCount = currentPoolSize() - corePoolSize;
                 for (QueueElement holder = queue.peek(); nonCoreConnectionCount > 0
                         && isTimedOutQueuedConnection(holder, timeoutMillis); holder = queue.peek()) {
@@ -631,10 +631,10 @@
     }
 
     /**
-     * This is intended for unit testing only in order to inject fake time
-     * stamps. Use System.currentTimeMillis() when null.
+     * This is package private in order to allow unit tests to inject fake time
+     * stamps.
      */
-    Callable<Long> testTimeSource = null;
+    TimeSource timeSource = TimeSource.DEFAULT;
 
     private final Semaphore availableConnections;
     private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
@@ -740,7 +740,7 @@
                 } else if (hasWaitingConnections()) {
                     holder = queue.removeFirst();
                 } else {
-                    holder = new QueueElement(handler, currentTimeMillis());
+                    holder = new QueueElement(handler, timeSource.currentTimeMillis());
                     queue.add(holder);
                 }
             }
@@ -802,23 +802,6 @@
         return maxPoolSize - availableConnections.availablePermits();
     }
 
-    /*
-     * This method delegates to System.currentTimeMillis() except in unit tests
-     * where we use injected times.
-     */
-    private long currentTimeMillis() {
-        if (testTimeSource == null) {
-            return System.currentTimeMillis();
-        } else {
-            try {
-                return testTimeSource.call();
-            } catch (final Exception e) {
-                // Should not happen.
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
     private boolean hasWaitingConnections() {
         return !queue.isEmpty() && !queue.getFirst().isWaitingFuture();
     }
@@ -839,7 +822,7 @@
                 connectionPoolIsClosing = true;
                 holder = null;
             } else {
-                holder = new QueueElement(connection, currentTimeMillis());
+                holder = new QueueElement(connection, timeSource.currentTimeMillis());
                 queue.add(holder);
                 return;
             }
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
index 985c887..3b0b6df 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -245,7 +245,8 @@
      * Creates a new heart-beat connection factory which will create connections
      * using the provided connection factory and periodically ping any created
      * connections in order to detect that they are still alive every 10 seconds
-     * using the default scheduler.
+     * using the default scheduler. Connections will be marked as having failed
+     * if a heart-beat takes longer than 500ms.
      *
      * @param factory
      *            The connection factory to use for creating connections.
@@ -254,7 +255,8 @@
      *             If {@code factory} was {@code null}.
      */
     public static ConnectionFactory newHeartBeatConnectionFactory(final ConnectionFactory factory) {
-        return new HeartBeatConnectionFactory(factory);
+        return new HeartBeatConnectionFactory(factory, 10000, 500, TimeUnit.MILLISECONDS, null,
+                null);
     }
 
     /**
@@ -267,6 +269,9 @@
      *            The connection factory to use for creating connections.
      * @param interval
      *            The interval between keepalive pings.
+     * @param timeout
+     *            The heart-beat timeout after which a connection will be marked
+     *            as failed.
      * @param unit
      *            The time unit for the interval between keepalive pings.
      * @return The new heart-beat connection factory.
@@ -276,8 +281,8 @@
      *             If {@code factory} or {@code unit} was {@code null}.
      */
     public static ConnectionFactory newHeartBeatConnectionFactory(final ConnectionFactory factory,
-            final long interval, final TimeUnit unit) {
-        return new HeartBeatConnectionFactory(factory, interval, unit);
+            final long interval, final long timeout, final TimeUnit unit) {
+        return new HeartBeatConnectionFactory(factory, interval, timeout, unit, null, null);
     }
 
     /**
@@ -290,6 +295,9 @@
      *            The connection factory to use for creating connections.
      * @param interval
      *            The interval between keepalive pings.
+     * @param timeout
+     *            The heart-beat timeout after which a connection will be marked
+     *            as failed.
      * @param unit
      *            The time unit for the interval between keepalive pings.
      * @param heartBeat
@@ -302,8 +310,9 @@
      *             {@code null}.
      */
     public static ConnectionFactory newHeartBeatConnectionFactory(final ConnectionFactory factory,
-            final long interval, final TimeUnit unit, final SearchRequest heartBeat) {
-        return new HeartBeatConnectionFactory(factory, interval, unit, heartBeat);
+            final long interval, final long timeout, final TimeUnit unit,
+            final SearchRequest heartBeat) {
+        return new HeartBeatConnectionFactory(factory, interval, timeout, unit, heartBeat, null);
     }
 
     /**
@@ -316,6 +325,9 @@
      *            The connection factory to use for creating connections.
      * @param interval
      *            The interval between keepalive pings.
+     * @param timeout
+     *            The heart-beat timeout after which a connection will be marked
+     *            as failed.
      * @param unit
      *            The time unit for the interval between keepalive pings.
      * @param heartBeat
@@ -331,9 +343,10 @@
      *             {@code null}.
      */
     public static ConnectionFactory newHeartBeatConnectionFactory(final ConnectionFactory factory,
-            final long interval, final TimeUnit unit, final SearchRequest heartBeat,
-            final ScheduledExecutorService scheduler) {
-        return new HeartBeatConnectionFactory(factory, interval, unit, heartBeat, scheduler);
+            final long interval, final long timeout, final TimeUnit unit,
+            final SearchRequest heartBeat, final ScheduledExecutorService scheduler) {
+        return new HeartBeatConnectionFactory(factory, interval, timeout, unit, heartBeat,
+                scheduler);
     }
 
     /**
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ErrorResultException.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ErrorResultException.java
index e9cf922..45949d3 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ErrorResultException.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ErrorResultException.java
@@ -72,7 +72,7 @@
      *             If {@code resultCode} was {@code null}.
      */
     public static ErrorResultException newErrorResult(ResultCode resultCode,
-            String diagnosticMessage) {
+            CharSequence diagnosticMessage) {
         return newErrorResult(resultCode, diagnosticMessage, null);
     }
 
@@ -114,10 +114,10 @@
      *             If {@code resultCode} was {@code null}.
      */
     public static ErrorResultException newErrorResult(ResultCode resultCode,
-            String diagnosticMessage, Throwable cause) {
+            CharSequence diagnosticMessage, Throwable cause) {
         final Result result = Responses.newResult(resultCode);
         if (diagnosticMessage != null) {
-            result.setDiagnosticMessage(diagnosticMessage);
+            result.setDiagnosticMessage(diagnosticMessage.toString());
         } else if (cause != null) {
             result.setDiagnosticMessage(cause.getLocalizedMessage());
         }
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
index 669195f..89c115f 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -27,23 +27,27 @@
 
 package org.forgerock.opendj.ldap;
 
-import static com.forgerock.opendj.util.StaticUtils.*;
-import static java.lang.System.*;
+import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
+import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER;
+import static org.forgerock.opendj.ldap.CoreMessages.HBCF_CONNECTION_CLOSED_BY_CLIENT;
+import static org.forgerock.opendj.ldap.CoreMessages.HBCF_HEARTBEAT_FAILED;
+import static org.forgerock.opendj.ldap.CoreMessages.HBCF_HEARTBEAT_TIMEOUT;
+import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
 
-import static org.forgerock.opendj.ldap.ErrorResultException.*;
-
-import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 import java.util.logging.Level;
 
+import org.forgerock.opendj.ldap.requests.AbandonRequest;
 import org.forgerock.opendj.ldap.requests.AddRequest;
 import org.forgerock.opendj.ldap.requests.BindRequest;
 import org.forgerock.opendj.ldap.requests.CompareRequest;
@@ -54,30 +58,297 @@
 import org.forgerock.opendj.ldap.requests.Requests;
 import org.forgerock.opendj.ldap.requests.SearchRequest;
 import org.forgerock.opendj.ldap.requests.StartTLSExtendedRequest;
+import org.forgerock.opendj.ldap.requests.UnbindRequest;
 import org.forgerock.opendj.ldap.responses.BindResult;
 import org.forgerock.opendj.ldap.responses.CompareResult;
 import org.forgerock.opendj.ldap.responses.ExtendedResult;
-import org.forgerock.opendj.ldap.responses.GenericExtendedResult;
 import org.forgerock.opendj.ldap.responses.Result;
 import org.forgerock.opendj.ldap.responses.SearchResultEntry;
 import org.forgerock.opendj.ldap.responses.SearchResultReference;
-import org.forgerock.opendj.ldif.ConnectionEntryReader;
 
+import com.forgerock.opendj.ldap.ConnectionState;
 import com.forgerock.opendj.util.AsynchronousFutureResult;
+import com.forgerock.opendj.util.CompletedFutureResult;
 import com.forgerock.opendj.util.FutureResultTransformer;
+import com.forgerock.opendj.util.RecursiveFutureResult;
 import com.forgerock.opendj.util.ReferenceCountedObject;
+import com.forgerock.opendj.util.TimeSource;
 import com.forgerock.opendj.util.Validator;
 
 /**
  * An heart beat connection factory can be used to create connections that sends
  * a periodic search request to a Directory Server.
+ * <p>
+ * Before returning new connections to the application this factory will first
+ * send an initial heart beat in order to determine that the remote server is
+ * responsive. If the heart beat fails or times out then the connection is
+ * closed immediately and an error returned to the client.
+ * <p>
+ * Once a connection has been established successfully (including the initial
+ * heart beat), this factory will periodically send heart beats on the
+ * connection based on the configured heart beat interval. If the heart beat
+ * times out then the server is assumed to be down and an appropriate
+ * {@link ConnectionException} generated and published to any registered
+ * {@link ConnectionEventListener}s. Note however, that heart beats will only be
+ * sent when the connection is determined to be reasonably idle: there is no
+ * point in sending heart beats if the connection has recently received a
+ * response. A connection is deemed to be idle if no response has been received
+ * during a period equivalent to half the heart beat interval.
+ * <p>
+ * The LDAP protocol specifically precludes clients from performing operations
+ * while bind or startTLS requests are being performed. Likewise, a bind or
+ * startTLS request will cause active operations to be aborted. This factory
+ * coordinates heart beats with bind or startTLS requests, ensuring that they
+ * are not performed concurrently. Specifically, bind and startTLS requests are
+ * queued up while a heart beat is pending, and heart beats are not sent at all
+ * while there are pending bind or startTLS requests.
  */
 final class HeartBeatConnectionFactory implements ConnectionFactory {
+
+    /**
+     * This class is responsible for performing an initial heart beat once a new
+     * connection has been obtained and, if the heart beat succeeds, adapting it
+     * to a {@code ConnectionImpl} and registering it in the table of valid
+     * connections.
+     */
+    private final class ConnectionFutureResultImpl {
+
+        /**
+         * This class handles the initial heart beat result notification or
+         * timeout. We need to take care to avoid processing multiple results,
+         * which may occur when the heart beat is timed out and a result follows
+         * soon after, or vice versa.
+         */
+        private final class InitialHeartBeatResultHandler implements SearchResultHandler, Runnable {
+            private final ResultHandler<? super Result> handler;
+
+            /**
+             * Due to a potential race between the heart beat timing out and the
+             * heart beat completing this atomic ensures that notification only
+             * occurs once.
+             */
+            private final AtomicBoolean isComplete = new AtomicBoolean();
+
+            private InitialHeartBeatResultHandler(final ResultHandler<? super Result> handler) {
+                this.handler = handler;
+            }
+
+            @Override
+            public boolean handleEntry(final SearchResultEntry entry) {
+                /*
+                 * Depending on the configuration, a heartbeat may return some
+                 * entries. However, we can just ignore them.
+                 */
+                return true;
+            }
+
+            @Override
+            public void handleErrorResult(final ErrorResultException error) {
+                if (isComplete.compareAndSet(false, true)) {
+                    handler.handleErrorResult(error);
+                }
+            }
+
+            @Override
+            public boolean handleReference(final SearchResultReference reference) {
+                /*
+                 * Depending on the configuration, a heartbeat may return some
+                 * references. However, we can just ignore them.
+                 */
+                return true;
+            }
+
+            @Override
+            public void handleResult(final Result result) {
+                if (isComplete.compareAndSet(false, true)) {
+                    handler.handleResult(result);
+                }
+            }
+
+            /*
+             * Invoked by the scheduler when the heart beat times out.
+             */
+            @Override
+            public void run() {
+                handleErrorResult(newHeartBeatTimeoutError());
+            }
+        }
+
+        private Connection connection;
+        private final RecursiveFutureResult<Connection, Result> futureConnectionResult;
+        private final FutureResultTransformer<Result, Connection> futureSearchResult;
+
+        private ConnectionFutureResultImpl(final ResultHandler<? super Connection> handler) {
+            // Create a future which will handle the initial heart beat result.
+            this.futureSearchResult = new FutureResultTransformer<Result, Connection>(handler) {
+
+                @Override
+                protected ErrorResultException transformErrorResult(
+                        final ErrorResultException errorResult) {
+                    // Ensure that the connection is closed.
+                    if (connection != null) {
+                        connection.close();
+                        connection = null;
+                    }
+                    return adaptHeartBeatError(errorResult);
+                }
+
+                @Override
+                protected Connection transformResult(final Result result)
+                        throws ErrorResultException {
+                    return adaptConnection(connection);
+                }
+
+            };
+
+            // Create a future which will handle connection result.
+            this.futureConnectionResult =
+                    new RecursiveFutureResult<Connection, Result>(futureSearchResult) {
+
+                        @Override
+                        protected FutureResult<? extends Result> chainResult(
+                                final Connection innerResult,
+                                final ResultHandler<? super Result> handler)
+                                throws ErrorResultException {
+                            // Save the connection for later once the heart beat completes.
+                            connection = innerResult;
+
+                            /*
+                             * Send the initial heart beat and schedule a client
+                             * side timeout notification.
+                             */
+                            final InitialHeartBeatResultHandler wrappedHandler =
+                                    new InitialHeartBeatResultHandler(handler);
+                            scheduler.get().schedule(wrappedHandler, timeoutMS,
+                                    TimeUnit.MILLISECONDS);
+                            return connection.searchAsync(heartBeatRequest, null, wrappedHandler);
+                        }
+                    };
+
+            // Link the two futures.
+            futureSearchResult.setFutureResult(futureConnectionResult);
+        }
+
+    }
+
     /**
      * A connection that sends heart beats and supports all operations.
      */
-    private final class ConnectionImpl extends AbstractConnectionWrapper<Connection> implements
-            ConnectionEventListener, SearchResultHandler {
+    private final class ConnectionImpl extends AbstractAsynchronousConnection implements
+            ConnectionEventListener {
+
+        /**
+         * A result handler wrapper for operations which timestamps the
+         * connection for each response received. The wrapper ensures that
+         * completed requests are removed from the {@code pendingResults} queue,
+         * as well as ensuring that requests are only completed once.
+         */
+        private abstract class AbstractWrappedResultHandler<R, H extends ResultHandler<? super R>>
+                implements ResultHandler<R>, FutureResult<R> {
+            /** The user provided result handler. */
+            protected final H handler;
+
+            private final CountDownLatch completed = new CountDownLatch(1);
+            private ErrorResultException error;
+            private FutureResult<R> innerFuture;
+            private R result;
+
+            AbstractWrappedResultHandler(final H handler) {
+                this.handler = handler;
+            }
+
+            @Override
+            public boolean cancel(final boolean mayInterruptIfRunning) {
+                return innerFuture.cancel(mayInterruptIfRunning);
+            }
+
+            @Override
+            public R get() throws ErrorResultException, InterruptedException {
+                completed.await();
+                return get0();
+            }
+
+            @Override
+            public R get(final long timeout, final TimeUnit unit) throws ErrorResultException,
+                    TimeoutException, InterruptedException {
+                if (completed.await(timeout, unit)) {
+                    return get0();
+                } else {
+                    throw new TimeoutException();
+                }
+            }
+
+            @Override
+            public int getRequestID() {
+                return innerFuture.getRequestID();
+            }
+
+            @Override
+            public void handleErrorResult(final ErrorResultException error) {
+                if (tryComplete(null, error)) {
+                    if (handler != null) {
+                        handler.handleErrorResult(timestamp(error));
+                    } else {
+                        timestamp(error);
+                    }
+                }
+            }
+
+            @Override
+            public void handleResult(final R result) {
+                if (tryComplete(result, null)) {
+                    if (handler != null) {
+                        handler.handleResult(timestamp(result));
+                    } else {
+                        timestamp(result);
+                    }
+                }
+            }
+
+            @Override
+            public boolean isCancelled() {
+                return innerFuture.isCancelled();
+            }
+
+            @Override
+            public boolean isDone() {
+                return completed.getCount() == 0;
+            }
+
+            abstract void releaseBindOrStartTLSLockIfNeeded();
+
+            FutureResult<R> setInnerFuture(final FutureResult<R> innerFuture) {
+                this.innerFuture = innerFuture;
+                return this;
+            }
+
+            private R get0() throws ErrorResultException {
+                if (result != null) {
+                    return result;
+                } else {
+                    throw error;
+                }
+            }
+
+            /**
+             * Attempts to complete this request, returning true if successful.
+             * This method is synchronized in order to avoid race conditions
+             * with search result processing.
+             */
+            private synchronized boolean tryComplete(final R result,
+                    final ErrorResultException error) {
+                if (pendingResults.remove(this)) {
+                    this.result = result;
+                    this.error = error;
+                    completed.countDown();
+                    releaseBindOrStartTLSLockIfNeeded();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+        }
 
         /**
          * Runs pending request once the shared lock becomes available (when no
@@ -120,161 +391,246 @@
                 }
                 return null;
             }
-
         }
 
-        /*
-         * List of pending Bind or StartTLS requests which must be invoked when
+        /**
+         * A result handler wrapper for bind or startTLS requests which releases
+         * the bind/startTLS lock on completion.
+         */
+        private final class WrappedBindOrStartTLSResultHandler<R> extends
+                AbstractWrappedResultHandler<R, ResultHandler<? super R>> {
+            WrappedBindOrStartTLSResultHandler(final ResultHandler<? super R> handler) {
+                super(handler);
+            }
+
+            @Override
+            void releaseBindOrStartTLSLockIfNeeded() {
+                releaseBindOrStartTLSLock();
+            }
+        }
+
+        /**
+         * A result handler wrapper for normal requests which does not release
+         * the bind/startTLS lock on completion.
+         */
+        private final class WrappedResultHandler<R> extends
+                AbstractWrappedResultHandler<R, ResultHandler<? super R>> {
+            WrappedResultHandler(final ResultHandler<? super R> handler) {
+                super(handler);
+            }
+
+            @Override
+            void releaseBindOrStartTLSLockIfNeeded() {
+                // No-op for normal operations.
+            }
+        }
+
+        /**
+         * A result handler wrapper for search operations. Ensures that search
+         * results are not sent once the request has been completed (see
+         * markComplete()).
+         */
+        private final class WrappedSearchResultHandler extends
+                AbstractWrappedResultHandler<Result, SearchResultHandler> implements
+                SearchResultHandler {
+            WrappedSearchResultHandler(final SearchResultHandler handler) {
+                super(handler);
+            }
+
+            @Override
+            public synchronized boolean handleEntry(final SearchResultEntry entry) {
+                if (!isDone()) {
+                    if (handler != null) {
+                        handler.handleEntry(timestamp(entry));
+                    } else {
+                        timestamp(entry);
+                    }
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            @Override
+            public synchronized boolean handleReference(final SearchResultReference reference) {
+                if (!isDone()) {
+                    if (handler != null) {
+                        handler.handleReference(timestamp(reference));
+                    } else {
+                        timestamp(reference);
+                    }
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            @Override
+            void releaseBindOrStartTLSLockIfNeeded() {
+                // No-op for normal operations.
+            }
+        }
+
+        /** The wrapped connection. */
+        private final Connection connection;
+
+        /**
+         * Search result handler for processing heart beat responses.
+         */
+        private final SearchResultHandler heartBeatHandler = new SearchResultHandler() {
+            @Override
+            public boolean handleEntry(final SearchResultEntry entry) {
+                timestamp(entry);
+                return true;
+            }
+
+            @Override
+            public void handleErrorResult(final ErrorResultException error) {
+                /*
+                 * Connection failure will be handled by connection event
+                 * listener. Ignore cancellation errors since these indicate
+                 * that the heart beat was aborted by a client-side close.
+                 */
+                if (!(error instanceof CancelledResultException)) {
+                    if (DEBUG_LOG.isLoggable(Level.WARNING)) {
+                        DEBUG_LOG.warning(String.format(
+                                "Heartbeat failed for connection factory '%s': %s", factory, error
+                                        .getMessage()));
+                    }
+                    timestamp(error);
+                }
+                releaseHeartBeatLock();
+            }
+
+            @Override
+            public boolean handleReference(final SearchResultReference reference) {
+                timestamp(reference);
+                return true;
+            }
+
+            @Override
+            public void handleResult(final Result result) {
+                timestamp(result);
+                releaseHeartBeatLock();
+            }
+        };
+
+        /**
+         * List of pending Bind or StartTLS requests which must be invoked once
          * the current heart beat completes.
          */
-        private final Queue<Runnable> pendingRequests = new ConcurrentLinkedQueue<Runnable>();
+        private final Queue<Runnable> pendingBindOrStartTLSRequests =
+                new ConcurrentLinkedQueue<Runnable>();
 
-        /* Coordinates heart-beats with Bind and StartTLS requests. */
+        /**
+         * List of pending responses for all active operations. These will be
+         * signalled if no heart beat is detected within the permitted timeout
+         * period.
+         */
+        private final Queue<AbstractWrappedResultHandler<?, ?>> pendingResults =
+                new ConcurrentLinkedQueue<AbstractWrappedResultHandler<?, ?>>();
+
+        /** Internal connection state. */
+        private final ConnectionState state = new ConnectionState();
+
+        /** Coordinates heart-beats with Bind and StartTLS requests. */
         private final Sync sync = new Sync();
 
-        /*
+        /**
          * Timestamp of last response received (any response, not just heart
          * beats).
          */
-        private volatile long timestamp = currentTimeMillis(); // Assume valid at creation.
+        private volatile long lastResponseTimestamp = timeSource.currentTimeMillis(); // Assume valid at creation.
 
         private ConnectionImpl(final Connection connection) {
-            super(connection);
+            this.connection = connection;
+            connection.addConnectionEventListener(this);
         }
 
         @Override
-        public Result add(final AddRequest request) throws ErrorResultException {
-            try {
-                return timestamp(connection.add(request));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
-            }
-        }
-
-        @Override
-        public Result add(final Entry entry) throws ErrorResultException {
-            try {
-                return timestamp(connection.add(entry));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
-            }
-        }
-
-        @Override
-        public Result add(final String... ldifLines) throws ErrorResultException {
-            try {
-                return timestamp(connection.add(ldifLines));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
-            }
+        public FutureResult<Void> abandonAsync(final AbandonRequest request) {
+            return connection.abandonAsync(request);
         }
 
         @Override
         public FutureResult<Result> addAsync(final AddRequest request,
                 final IntermediateResponseHandler intermediateResponseHandler,
                 final ResultHandler<? super Result> resultHandler) {
-            return connection.addAsync(request, intermediateResponseHandler,
-                    timestamper(resultHandler));
-        }
-
-        @Override
-        public BindResult bind(final BindRequest request) throws ErrorResultException {
-            acquireBindOrStartTLSLock();
-            try {
-                return timestamp(connection.bind(request));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
-            } finally {
-                releaseBindOrStartTLSLock();
+            if (checkState(resultHandler)) {
+                final WrappedResultHandler<Result> h = wrap(resultHandler);
+                return checkState(connection.addAsync(request, intermediateResponseHandler, h), h);
+            } else {
+                return newConnectionErrorFuture();
             }
         }
 
         @Override
-        public BindResult bind(final String name, final char[] password)
-                throws ErrorResultException {
-            acquireBindOrStartTLSLock();
-            try {
-                return timestamp(connection.bind(name, password));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
-            } finally {
-                releaseBindOrStartTLSLock();
-            }
+        public void addConnectionEventListener(final ConnectionEventListener listener) {
+            state.addConnectionEventListener(listener);
         }
 
         @Override
         public FutureResult<BindResult> bindAsync(final BindRequest request,
                 final IntermediateResponseHandler intermediateResponseHandler,
                 final ResultHandler<? super BindResult> resultHandler) {
-            if (sync.tryLockShared()) {
-                // Fast path
-                return connection.bindAsync(request, intermediateResponseHandler, timestamper(
-                        resultHandler, true));
+            if (checkState(resultHandler)) {
+                if (sync.tryLockShared()) {
+                    // Fast path
+                    final WrappedBindOrStartTLSResultHandler<BindResult> h =
+                            wrapForBindOrStartTLS(resultHandler);
+                    return checkState(
+                            connection.bindAsync(request, intermediateResponseHandler, h), h);
+                } else {
+                    /*
+                     * A heart beat must be in progress so create a runnable
+                     * task which will be executed when the heart beat
+                     * completes.
+                     */
+                    final DelayedFuture<BindResult> future =
+                            new DelayedFuture<BindResult>(resultHandler) {
+                                @Override
+                                public FutureResult<BindResult> dispatch() {
+                                    final WrappedBindOrStartTLSResultHandler<BindResult> h =
+                                            wrapForBindOrStartTLS(this);
+                                    return checkState(connection.bindAsync(request,
+                                            intermediateResponseHandler, h), h);
+                                }
+                            };
+                    /*
+                     * Enqueue and flush if the heart beat has completed in the
+                     * mean time.
+                     */
+                    pendingBindOrStartTLSRequests.offer(future);
+                    flushPendingBindOrStartTLSRequests();
+                    return future;
+                }
             } else {
-                /*
-                 * A heart beat must be in progress so create a runnable task
-                 * which will be executed when the heart beat completes.
-                 */
-                final DelayedFuture<BindResult> future =
-                        new DelayedFuture<BindResult>(resultHandler) {
-                            @Override
-                            public FutureResult<BindResult> dispatch() {
-                                return connection.bindAsync(request, intermediateResponseHandler,
-                                        timestamper(this, true));
-                            }
-                        };
-                /*
-                 * Enqueue and flush if the heart beat has completed in the mean
-                 * time.
-                 */
-                pendingRequests.offer(future);
-                flushPendingRequests();
-                return future;
+                return newConnectionErrorFuture();
             }
         }
 
         @Override
-        public CompareResult compare(final CompareRequest request) throws ErrorResultException {
-            try {
-                return timestamp(connection.compare(request));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
-            }
+        public void close() {
+            handleConnectionClosed();
+            connection.close();
         }
 
         @Override
-        public CompareResult compare(final String name, final String attributeDescription,
-                final String assertionValue) throws ErrorResultException {
-            try {
-                return timestamp(connection.compare(name, attributeDescription, assertionValue));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
-            }
+        public void close(final UnbindRequest request, final String reason) {
+            handleConnectionClosed();
+            connection.close(request, reason);
         }
 
         @Override
         public FutureResult<CompareResult> compareAsync(final CompareRequest request,
                 final IntermediateResponseHandler intermediateResponseHandler,
                 final ResultHandler<? super CompareResult> resultHandler) {
-            return connection.compareAsync(request, intermediateResponseHandler,
-                    timestamper(resultHandler));
-        }
-
-        @Override
-        public Result delete(final DeleteRequest request) throws ErrorResultException {
-            try {
-                return timestamp(connection.delete(request));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
-            }
-        }
-
-        @Override
-        public Result delete(final String name) throws ErrorResultException {
-            try {
-                return timestamp(connection.delete(name));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
+            if (checkState(resultHandler)) {
+                final WrappedResultHandler<CompareResult> h = wrap(resultHandler);
+                return checkState(connection.compareAsync(request, intermediateResponseHandler, h),
+                        h);
+            } else {
+                return newConnectionErrorFuture();
             }
         }
 
@@ -282,61 +638,12 @@
         public FutureResult<Result> deleteAsync(final DeleteRequest request,
                 final IntermediateResponseHandler intermediateResponseHandler,
                 final ResultHandler<? super Result> resultHandler) {
-            return connection.deleteAsync(request, intermediateResponseHandler,
-                    timestamper(resultHandler));
-        }
-
-        @Override
-        public <R extends ExtendedResult> R extendedRequest(final ExtendedRequest<R> request)
-                throws ErrorResultException {
-            final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID);
-            if (isStartTLS) {
-                acquireBindOrStartTLSLock();
-            }
-            try {
-                return timestamp(connection.extendedRequest(request));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
-            } finally {
-                if (isStartTLS) {
-                    releaseBindOrStartTLSLock();
-                }
-            }
-        }
-
-        @Override
-        public <R extends ExtendedResult> R extendedRequest(final ExtendedRequest<R> request,
-                final IntermediateResponseHandler handler) throws ErrorResultException {
-            final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID);
-            if (isStartTLS) {
-                acquireBindOrStartTLSLock();
-            }
-            try {
-                return timestamp(connection.extendedRequest(request, handler));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
-            } finally {
-                if (isStartTLS) {
-                    releaseBindOrStartTLSLock();
-                }
-            }
-        }
-
-        @Override
-        public GenericExtendedResult extendedRequest(final String requestName,
-                final ByteString requestValue) throws ErrorResultException {
-            final boolean isStartTLS = requestName.equals(StartTLSExtendedRequest.OID);
-            if (isStartTLS) {
-                acquireBindOrStartTLSLock();
-            }
-            try {
-                return timestamp(connection.extendedRequest(requestName, requestValue));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
-            } finally {
-                if (isStartTLS) {
-                    releaseBindOrStartTLSLock();
-                }
+            if (checkState(resultHandler)) {
+                final WrappedResultHandler<Result> h = wrap(resultHandler);
+                return checkState(connection.deleteAsync(request, intermediateResponseHandler, h),
+                        h);
+            } else {
+                return newConnectionErrorFuture();
             }
         }
 
@@ -345,129 +652,101 @@
                 final ExtendedRequest<R> request,
                 final IntermediateResponseHandler intermediateResponseHandler,
                 final ResultHandler<? super R> resultHandler) {
-            final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID);
-            if (isStartTLS) {
-                if (sync.tryLockShared()) {
-                    // Fast path
-                    return connection.extendedRequestAsync(request, intermediateResponseHandler,
-                            timestamper(resultHandler, true));
-                } else {
-                    /*
-                     * A heart beat must be in progress so create a runnable
-                     * task which will be executed when the heart beat
-                     * completes.
-                     */
-                    final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) {
-                        @Override
-                        public FutureResult<R> dispatch() {
-                            return connection.extendedRequestAsync(request,
-                                    intermediateResponseHandler, timestamper(this, true));
-                        }
-                    };
+            if (checkState(resultHandler)) {
+                if (isStartTLSRequest(request)) {
+                    if (sync.tryLockShared()) {
+                        // Fast path
+                        final WrappedBindOrStartTLSResultHandler<R> h =
+                                wrapForBindOrStartTLS(resultHandler);
+                        return checkState(connection.extendedRequestAsync(request,
+                                intermediateResponseHandler, h), h);
+                    } else {
+                        /*
+                         * A heart beat must be in progress so create a runnable
+                         * task which will be executed when the heart beat
+                         * completes.
+                         */
+                        final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) {
+                            @Override
+                            public FutureResult<R> dispatch() {
+                                final WrappedBindOrStartTLSResultHandler<R> h =
+                                        wrapForBindOrStartTLS(this);
+                                return checkState(connection.extendedRequestAsync(request,
+                                        intermediateResponseHandler, h), h);
+                            }
+                        };
 
-                    /*
-                     * Enqueue and flush if the heart beat has completed in the
-                     * mean time.
-                     */
-                    pendingRequests.offer(future);
-                    flushPendingRequests();
-                    return future;
+                        /*
+                         * Enqueue and flush if the heart beat has completed in
+                         * the mean time.
+                         */
+                        pendingBindOrStartTLSRequests.offer(future);
+                        flushPendingBindOrStartTLSRequests();
+                        return future;
+                    }
+                } else {
+                    final WrappedResultHandler<R> h = wrap(resultHandler);
+                    return checkState(connection.extendedRequestAsync(request,
+                            intermediateResponseHandler, h), h);
                 }
             } else {
-                return connection.extendedRequestAsync(request, intermediateResponseHandler,
-                        timestamper(resultHandler));
+                return newConnectionErrorFuture();
             }
         }
 
         @Override
         public void handleConnectionClosed() {
-            notifyClosed();
+            if (state.notifyConnectionClosed()) {
+                failPendingResults(newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED,
+                        HBCF_CONNECTION_CLOSED_BY_CLIENT.get()));
+                synchronized (validConnections) {
+                    connection.removeConnectionEventListener(this);
+                    validConnections.remove(this);
+                    if (validConnections.isEmpty()) {
+                        /*
+                         * This is the last active connection, so stop the
+                         * heartbeat.
+                         */
+                        heartBeatFuture.cancel(false);
+                    }
+                }
+            }
         }
 
         @Override
         public void handleConnectionError(final boolean isDisconnectNotification,
                 final ErrorResultException error) {
-            notifyClosed();
-        }
-
-        @Override
-        public boolean handleEntry(final SearchResultEntry entry) {
-            updateTimestamp();
-            return true;
-        }
-
-        @Override
-        public void handleErrorResult(final ErrorResultException error) {
-            if (DEBUG_LOG.isLoggable(Level.FINE)) {
-                DEBUG_LOG.fine(String.format("Heartbeat failed: %s", error.getMessage()));
+            if (state.notifyConnectionError(isDisconnectNotification, error)) {
+                failPendingResults(error);
             }
-            updateTimestamp();
-            releaseHeartBeatLock();
-        }
-
-        @Override
-        public boolean handleReference(final SearchResultReference reference) {
-            updateTimestamp();
-            return true;
-        }
-
-        @Override
-        public void handleResult(final Result result) {
-            updateTimestamp();
-            releaseHeartBeatLock();
         }
 
         @Override
         public void handleUnsolicitedNotification(final ExtendedResult notification) {
-            updateTimestamp();
+            timestamp(notification);
+            state.notifyUnsolicitedNotification(notification);
+        }
+
+        @Override
+        public boolean isClosed() {
+            return state.isClosed();
         }
 
         @Override
         public boolean isValid() {
-            return connection.isValid() && currentTimeMillis() < (timestamp + timeoutMS);
-        }
-
-        @Override
-        public Result modify(final ModifyRequest request) throws ErrorResultException {
-            try {
-                return timestamp(connection.modify(request));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
-            }
-        }
-
-        @Override
-        public Result modify(final String... ldifLines) throws ErrorResultException {
-            try {
-                return timestamp(connection.modify(ldifLines));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
-            }
+            return state.isValid() && connection.isValid();
         }
 
         @Override
         public FutureResult<Result> modifyAsync(final ModifyRequest request,
                 final IntermediateResponseHandler intermediateResponseHandler,
                 final ResultHandler<? super Result> resultHandler) {
-            return connection.modifyAsync(request, intermediateResponseHandler,
-                    timestamper(resultHandler));
-        }
-
-        @Override
-        public Result modifyDN(final ModifyDNRequest request) throws ErrorResultException {
-            try {
-                return timestamp(connection.modifyDN(request));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
-            }
-        }
-
-        @Override
-        public Result modifyDN(final String name, final String newRDN) throws ErrorResultException {
-            try {
-                return timestamp(connection.modifyDN(name, newRDN));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
+            if (checkState(resultHandler)) {
+                final WrappedResultHandler<Result> h = wrap(resultHandler);
+                return checkState(connection.modifyAsync(request, intermediateResponseHandler, h),
+                        h);
+            } else {
+                return newConnectionErrorFuture();
             }
         }
 
@@ -475,121 +754,34 @@
         public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
                 final IntermediateResponseHandler intermediateResponseHandler,
                 final ResultHandler<? super Result> resultHandler) {
-            return connection.modifyDNAsync(request, intermediateResponseHandler,
-                    timestamper(resultHandler));
-        }
-
-        @Override
-        public SearchResultEntry readEntry(final DN name, final String... attributeDescriptions)
-                throws ErrorResultException {
-            try {
-                return timestamp(connection.readEntry(name, attributeDescriptions));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
+            if (checkState(resultHandler)) {
+                final WrappedResultHandler<Result> h = wrap(resultHandler);
+                return checkState(
+                        connection.modifyDNAsync(request, intermediateResponseHandler, h), h);
+            } else {
+                return newConnectionErrorFuture();
             }
         }
 
         @Override
-        public SearchResultEntry readEntry(final String name, final String... attributeDescriptions)
-                throws ErrorResultException {
-            try {
-                return timestamp(connection.readEntry(name, attributeDescriptions));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
-            }
-        }
-
-        @Override
-        public FutureResult<SearchResultEntry> readEntryAsync(final DN name,
-                final Collection<String> attributeDescriptions,
-                final ResultHandler<? super SearchResultEntry> handler) {
-            return connection.readEntryAsync(name, attributeDescriptions, timestamper(handler));
-        }
-
-        @Override
-        public ConnectionEntryReader search(final SearchRequest request) {
-            // Ensure that search results update timestamp.
-            return new ConnectionEntryReader(this, request);
-        }
-
-        @Override
-        public Result search(final SearchRequest request,
-                final Collection<? super SearchResultEntry> entries) throws ErrorResultException {
-            try {
-                return timestamp(connection.search(request, entries));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
-            }
-        }
-
-        @Override
-        public Result search(final SearchRequest request,
-                final Collection<? super SearchResultEntry> entries,
-                final Collection<? super SearchResultReference> references)
-                throws ErrorResultException {
-            try {
-                return timestamp(connection.search(request, entries, references));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
-            }
-        }
-
-        @Override
-        public Result search(final SearchRequest request, final SearchResultHandler handler)
-                throws ErrorResultException {
-            try {
-                return connection.search(request, timestamper(handler));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
-            }
-        }
-
-        @Override
-        public ConnectionEntryReader search(final String baseObject, final SearchScope scope,
-                final String filter, final String... attributeDescriptions) {
-            // Ensure that search results update timestamp.
-            final SearchRequest request =
-                    Requests.newSearchRequest(baseObject, scope, filter, attributeDescriptions);
-            return new ConnectionEntryReader(this, request);
+        public void removeConnectionEventListener(final ConnectionEventListener listener) {
+            state.removeConnectionEventListener(listener);
         }
 
         @Override
         public FutureResult<Result> searchAsync(final SearchRequest request,
                 final IntermediateResponseHandler intermediateResponseHandler,
                 final SearchResultHandler resultHandler) {
-            return connection.searchAsync(request, intermediateResponseHandler,
-                    timestamper(resultHandler));
-        }
-
-        @Override
-        public SearchResultEntry searchSingleEntry(final SearchRequest request)
-                throws ErrorResultException {
-            try {
-                return timestamp(connection.searchSingleEntry(request));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
+            if (checkState(resultHandler)) {
+                final WrappedSearchResultHandler h = wrap(resultHandler);
+                return checkState(connection.searchAsync(request, intermediateResponseHandler, h),
+                        h);
+            } else {
+                return newConnectionErrorFuture();
             }
         }
 
         @Override
-        public SearchResultEntry searchSingleEntry(final String baseObject,
-                final SearchScope scope, final String filter, final String... attributeDescriptions)
-                throws ErrorResultException {
-            try {
-                return timestamp(connection.searchSingleEntry(baseObject, scope, filter,
-                        attributeDescriptions));
-            } catch (final ErrorResultException e) {
-                throw timestamp(e);
-            }
-        }
-
-        @Override
-        public FutureResult<SearchResultEntry> searchSingleEntryAsync(final SearchRequest request,
-                final ResultHandler<? super SearchResultEntry> handler) {
-            return connection.searchSingleEntryAsync(request, timestamper(handler));
-        }
-
-        @Override
         public String toString() {
             final StringBuilder builder = new StringBuilder();
             builder.append("HeartBeatConnection(");
@@ -598,24 +790,57 @@
             return builder.toString();
         }
 
-        private void acquireBindOrStartTLSLock() throws ErrorResultException {
-            /*
-             * Wait for pending heartbeats and prevent new heartbeats from being
-             * sent while the bind is in progress.
-             */
-            try {
-                if (!sync.tryLockShared(timeoutMS, TimeUnit.MILLISECONDS)) {
-                    // Give up - it looks like the connection is dead.
-                    // FIXME: improve error message.
-                    throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN);
+        private void checkForHeartBeat() {
+            if (sync.isHeldExclusively()) {
+                /*
+                 * A heart beat is still in progress, but it should have
+                 * completed by now. Let's avoid aggressively terminating the
+                 * connection, because the heart beat may simply have been
+                 * delayed by a sudden surge of activity. Therefore, only flag
+                 * the connection as failed if no activity has been seen on the
+                 * connection since the heart beat was sent.
+                 */
+                final long currentTimeMillis = timeSource.currentTimeMillis();
+                if (lastResponseTimestamp < (currentTimeMillis - timeoutMS)) {
+                    if (DEBUG_LOG.isLoggable(Level.WARNING)) {
+                        DEBUG_LOG.warning(String.format(
+                                "No heartbeat detected for connection '%s'", connection));
+                    }
+                    handleConnectionError(false, newHeartBeatTimeoutError());
                 }
-            } catch (final InterruptedException e) {
-                throw newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, e);
             }
         }
 
-        private void flushPendingRequests() {
-            if (!pendingRequests.isEmpty()) {
+        private <R> FutureResult<R> checkState(final FutureResult<R> future,
+                final AbstractWrappedResultHandler<R, ? extends ResultHandler<? super R>> h) {
+            h.setInnerFuture(future);
+            checkState(h);
+            return h;
+        }
+
+        private boolean checkState(final ResultHandler<?> h) {
+            final ErrorResultException error = state.getConnectionError();
+            if (error != null) {
+                h.handleErrorResult(error);
+                return false;
+            } else {
+                return true;
+            }
+        }
+
+        private void failPendingResults(final ErrorResultException error) {
+            /*
+             * Peek instead of pool because notification is responsible for
+             * removing the element from the queue.
+             */
+            AbstractWrappedResultHandler<?, ?> pendingResult;
+            while ((pendingResult = pendingResults.peek()) != null) {
+                pendingResult.handleErrorResult(error);
+            }
+        }
+
+        private void flushPendingBindOrStartTLSRequests() {
+            if (!pendingBindOrStartTLSRequests.isEmpty()) {
                 /*
                  * The pending requests will acquire the shared lock, but we
                  * take it here anyway to ensure that pending requests do not
@@ -624,7 +849,8 @@
                 if (sync.tryLockShared()) {
                     try {
                         Runnable pendingRequest;
-                        while ((pendingRequest = pendingRequests.poll()) != null) {
+                        while ((pendingRequest = pendingBindOrStartTLSRequests.poll()) != null) {
+                            // Dispatch the waiting request. This will not block.
                             pendingRequest.run();
                         }
                     } finally {
@@ -634,18 +860,12 @@
             }
         }
 
-        private void notifyClosed() {
-            synchronized (activeConnections) {
-                connection.removeConnectionEventListener(this);
-                activeConnections.remove(this);
-                if (activeConnections.isEmpty()) {
-                    /*
-                     * This is the last active connection, so stop the
-                     * heartbeat.
-                     */
-                    heartBeatFuture.cancel(false);
-                }
-            }
+        private boolean isStartTLSRequest(final ExtendedRequest<?> request) {
+            return request.getOID().equals(StartTLSExtendedRequest.OID);
+        }
+
+        private <R> CompletedFutureResult<R> newConnectionErrorFuture() {
+            return new CompletedFutureResult<R>(state.getConnectionError());
         }
 
         private void releaseBindOrStartTLSLock() {
@@ -654,27 +874,43 @@
 
         private void releaseHeartBeatLock() {
             sync.unlockExclusively();
-            flushPendingRequests();
+            flushPendingBindOrStartTLSRequests();
         }
 
-        private void sendHeartBeat() {
+        /**
+         * Sends a heart beat on this connection if required to do so.
+         *
+         * @return {@code true} if a heart beat was sent, otherwise
+         *         {@code false}.
+         */
+        private boolean sendHeartBeat() {
             /*
-             * Only send the heartbeat if the connection has been idle for some
+             * Don't attempt to send a heart beat if the connection has already
+             * failed.
+             */
+            if (!state.isValid()) {
+                return false;
+            }
+
+            /*
+             * Only send the heart beat if the connection has been idle for some
              * time.
              */
-            if (currentTimeMillis() < (timestamp + minDelayMS)) {
-                return;
+            final long currentTimeMillis = timeSource.currentTimeMillis();
+            if (currentTimeMillis < (lastResponseTimestamp + minDelayMS)) {
+                return false;
             }
 
             /*
              * Don't send a heart beat if there is already a heart beat, bind,
              * or startTLS in progress. Note that the bind/startTLS response
-             * will update the timestamp as if it were a heart beat.
+             * will update the lastResponseTimestamp as if it were a heart beat.
              */
             if (sync.tryLockExclusively()) {
                 try {
-                    connection.searchAsync(heartBeatRequest, null, this);
-                } catch (final Exception e) {
+                    connection.searchAsync(heartBeatRequest, null, heartBeatHandler);
+                    return true;
+                } catch (final IllegalStateException e) {
                     /*
                      * This may happen when we attempt to send the heart beat
                      * just after the connection is closed but before we are
@@ -688,74 +924,34 @@
                     releaseHeartBeatLock();
                 }
             }
+            return false;
         }
 
         private <R> R timestamp(final R response) {
-            updateTimestamp();
+            if (!(response instanceof ConnectionException)) {
+                lastResponseTimestamp = timeSource.currentTimeMillis();
+            }
             return response;
         }
 
-        private <R> ResultHandler<R> timestamper(final ResultHandler<? super R> handler) {
-            return timestamper(handler, false);
+        private <R> WrappedResultHandler<R> wrap(final ResultHandler<? super R> handler) {
+            final WrappedResultHandler<R> h = new WrappedResultHandler<R>(handler);
+            pendingResults.add(h);
+            return h;
         }
 
-        private <R> ResultHandler<R> timestamper(final ResultHandler<? super R> handler,
-                final boolean isBindOrStartTLS) {
-            return new ResultHandler<R>() {
-                @Override
-                public void handleErrorResult(final ErrorResultException error) {
-                    releaseIfNeeded();
-                    if (handler != null) {
-                        handler.handleErrorResult(timestamp(error));
-                    } else {
-                        timestamp(error);
-                    }
-                }
-
-                @Override
-                public void handleResult(final R result) {
-                    releaseIfNeeded();
-                    if (handler != null) {
-                        handler.handleResult(timestamp(result));
-                    } else {
-                        timestamp(result);
-                    }
-                }
-
-                private void releaseIfNeeded() {
-                    if (isBindOrStartTLS) {
-                        releaseBindOrStartTLSLock();
-                    }
-                }
-            };
+        private WrappedSearchResultHandler wrap(final SearchResultHandler handler) {
+            final WrappedSearchResultHandler h = new WrappedSearchResultHandler(handler);
+            pendingResults.add(h);
+            return h;
         }
 
-        private SearchResultHandler timestamper(final SearchResultHandler handler) {
-            return new SearchResultHandler() {
-                @Override
-                public boolean handleEntry(final SearchResultEntry entry) {
-                    return handler.handleEntry(timestamp(entry));
-                }
-
-                @Override
-                public void handleErrorResult(final ErrorResultException error) {
-                    handler.handleErrorResult(timestamp(error));
-                }
-
-                @Override
-                public boolean handleReference(final SearchResultReference reference) {
-                    return handler.handleReference(timestamp(reference));
-                }
-
-                @Override
-                public void handleResult(final Result result) {
-                    handler.handleResult(timestamp(result));
-                }
-            };
-        }
-
-        private void updateTimestamp() {
-            timestamp = currentTimeMillis();
+        private <R> WrappedBindOrStartTLSResultHandler<R> wrapForBindOrStartTLS(
+                final ResultHandler<? super R> handler) {
+            final WrappedBindOrStartTLSResultHandler<R> h =
+                    new WrappedBindOrStartTLSResultHandler<R>(handler);
+            pendingResults.add(h);
+            return h;
         }
     }
 
@@ -791,13 +987,13 @@
      * </ul>
      */
     private static final class Sync extends AbstractQueuedSynchronizer {
-        /* Lock states. Positive values indicate that the shared lock is taken. */
-        private static final int UNLOCKED = 0; // initial state
         private static final int LOCKED_EXCLUSIVELY = -1;
-
         // Keep compiler quiet.
         private static final long serialVersionUID = -3590428415442668336L;
 
+        /* Lock states. Positive values indicate that the shared lock is taken. */
+        private static final int UNLOCKED = 0; // initial state
+
         @Override
         protected boolean isHeldExclusively() {
             return getState() == LOCKED_EXCLUSIVELY;
@@ -866,10 +1062,6 @@
             return tryAcquireShared(1) > 0;
         }
 
-        boolean tryLockShared(final long timeout, final TimeUnit unit) throws InterruptedException {
-            return tryAcquireSharedNanos(1, unit.toNanos(timeout));
-        }
-
         void unlockExclusively() {
             release(0 /* unused */);
         }
@@ -880,60 +1072,122 @@
 
     }
 
+    /**
+     * Default heart beat which will target the root DSE but not return any
+     * results.
+     */
     private static final SearchRequest DEFAULT_SEARCH = Requests.newSearchRequest("",
-            SearchScope.BASE_OBJECT, "(objectClass=*)", "1.1");
+            SearchScope.BASE_OBJECT, "(objectClass=1.1)", "1.1");
 
-    private final List<ConnectionImpl> activeConnections;
+    /**
+     * This is package private in order to allow unit tests to inject fake time
+     * stamps.
+     */
+    TimeSource timeSource = TimeSource.DEFAULT;
+
+    /**
+     * Scheduled task which checks that all heart beats have been received
+     * within the timeout period.
+     */
+    private final Runnable checkHeartBeatRunnable = new Runnable() {
+        @Override
+        public void run() {
+            for (final ConnectionImpl connection : getValidConnections()) {
+                connection.checkForHeartBeat();
+            }
+        }
+    };
+
+    /**
+     * Underlying connection factory.
+     */
     private final ConnectionFactory factory;
+
+    /**
+     * The heartbeat scheduled future - which may be null if heartbeats are not
+     * being sent (no valid connections).
+     */
     private ScheduledFuture<?> heartBeatFuture;
+
+    /**
+     * The heartbeat search request.
+     */
     private final SearchRequest heartBeatRequest;
+
+    /**
+     * The interval between successive heartbeats.
+     */
     private final long interval;
+    private final TimeUnit intervalUnit;
+
+    /**
+     * Flag which indicates whether this factory has been closed.
+     */
+    private final AtomicBoolean isClosed = new AtomicBoolean();
+
+    /**
+     * The minimum amount of time the connection should remain idle (no
+     * responses) before starting to send heartbeats.
+     */
     private final long minDelayMS;
+    /**
+     * The heartbeat scheduler.
+     */
     private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
+
+    /**
+     * Scheduled task which sends heart beats for all valid idle connections.
+     */
+    private final Runnable sendHeartBeatRunnable = new Runnable() {
+        @Override
+        public void run() {
+            boolean heartBeatSent = false;
+            for (final ConnectionImpl connection : getValidConnections()) {
+                heartBeatSent |= connection.sendHeartBeat();
+            }
+            if (heartBeatSent) {
+                scheduler.get().schedule(checkHeartBeatRunnable, timeoutMS, TimeUnit.MILLISECONDS);
+            }
+        }
+    };
+
+    /**
+     * The heartbeat timeout in milli-seconds. The connection will be marked as
+     * failed if no heartbeat response is received within the timeout.
+     */
     private final long timeoutMS;
-    private final TimeUnit unit;
-    private AtomicBoolean isClosed = new AtomicBoolean();
 
-    HeartBeatConnectionFactory(final ConnectionFactory factory) {
-        this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, null);
-    }
+    /**
+     * List of valid connections to which heartbeats will be sent.
+     */
+    private final List<ConnectionImpl> validConnections = new LinkedList<ConnectionImpl>();
 
     HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
-            final TimeUnit unit) {
-        this(factory, interval, unit, DEFAULT_SEARCH, null);
-    }
-
-    HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
-            final TimeUnit unit, final SearchRequest heartBeat) {
-        this(factory, interval, unit, heartBeat, null);
-    }
-
-    HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
-            final TimeUnit unit, final SearchRequest heartBeat,
+            final long timeout, final TimeUnit unit, final SearchRequest heartBeat,
             final ScheduledExecutorService scheduler) {
-        Validator.ensureNotNull(factory, heartBeat, unit);
-        Validator.ensureTrue(interval >= 0, "negative timeout");
+        Validator.ensureNotNull(factory, unit);
+        Validator.ensureTrue(interval >= 0, "negative interval");
+        Validator.ensureTrue(timeout >= 0, "negative timeout");
 
-        this.heartBeatRequest = heartBeat;
+        this.heartBeatRequest = heartBeat != null ? heartBeat : DEFAULT_SEARCH;
         this.interval = interval;
-        this.unit = unit;
-        this.activeConnections = new LinkedList<ConnectionImpl>();
+        this.intervalUnit = unit;
         this.factory = factory;
         this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
-        this.timeoutMS = unit.toMillis(interval) * 2;
+        this.timeoutMS = unit.toMillis(timeout);
         this.minDelayMS = unit.toMillis(interval) / 2;
     }
 
     @Override
     public void close() {
         if (isClosed.compareAndSet(false, true)) {
-            synchronized (activeConnections) {
-                if (!activeConnections.isEmpty()) {
+            synchronized (validConnections) {
+                if (!validConnections.isEmpty()) {
                     if (DEBUG_LOG.isLoggable(Level.FINE)) {
                         DEBUG_LOG.fine(String.format(
                                 "HeartbeatConnectionFactory '%s' is closing while %d "
-                                        + "active connections remain", toString(),
-                                activeConnections.size()));
+                                        + "active connections remain", toString(), validConnections
+                                        .size()));
                     }
                 }
             }
@@ -944,23 +1198,41 @@
 
     @Override
     public Connection getConnection() throws ErrorResultException {
-        return adaptConnection(factory.getConnection());
+        /*
+         * Immediately send a heart beat in order to determine if the connected
+         * server is responding.
+         */
+        final Connection connection = factory.getConnection();
+        boolean keepConnection = false;
+        try {
+            connection.searchAsync(heartBeatRequest, null, null).get(timeoutMS,
+                    TimeUnit.MILLISECONDS);
+            keepConnection = true;
+            return adaptConnection(connection);
+        } catch (final Exception e) {
+            throw adaptHeartBeatError(e);
+        } finally {
+            if (!keepConnection) {
+                connection.close();
+            }
+        }
     }
 
     @Override
     public FutureResult<Connection> getConnectionAsync(
             final ResultHandler<? super Connection> handler) {
-        final FutureResultTransformer<Connection, Connection> future =
-                new FutureResultTransformer<Connection, Connection>(handler) {
-                    @Override
-                    protected Connection transformResult(final Connection connection)
-                            throws ErrorResultException {
-                        return adaptConnection(connection);
-                    }
-                };
+        // Create a future responsible for chaining the initial heartbeat search.
+        final ConnectionFutureResultImpl compositeFuture = new ConnectionFutureResultImpl(handler);
 
-        future.setFutureResult(factory.getConnectionAsync(future));
-        return future;
+        // Request a connection.
+        final FutureResult<Connection> connectionFuture =
+                factory.getConnectionAsync(compositeFuture.futureConnectionResult);
+
+        // Set the connection future in the composite so that the returned search future can delegate.
+        compositeFuture.futureConnectionResult.setFutureResult(connectionFuture);
+
+        // Return the future representing the heartbeat.
+        return compositeFuture.futureSearchResult;
     }
 
     @Override
@@ -973,26 +1245,42 @@
     }
 
     private Connection adaptConnection(final Connection connection) {
-        final ConnectionImpl heartBeatConnection = new ConnectionImpl(connection);
-        synchronized (activeConnections) {
-            connection.addConnectionEventListener(heartBeatConnection);
-            if (activeConnections.isEmpty()) {
+        synchronized (validConnections) {
+            final ConnectionImpl heartBeatConnection = new ConnectionImpl(connection);
+            if (validConnections.isEmpty()) {
                 /* This is the first active connection, so start the heart beat. */
-                heartBeatFuture = scheduler.get().scheduleWithFixedDelay(new Runnable() {
-                    @Override
-                    public void run() {
-                        final ConnectionImpl[] tmp;
-                        synchronized (activeConnections) {
-                            tmp = activeConnections.toArray(new ConnectionImpl[0]);
-                        }
-                        for (final ConnectionImpl connection : tmp) {
-                            connection.sendHeartBeat();
-                        }
-                    }
-                }, 0, interval, unit);
+                heartBeatFuture =
+                        scheduler.get().scheduleWithFixedDelay(sendHeartBeatRunnable, 0, interval,
+                                intervalUnit);
             }
-            activeConnections.add(heartBeatConnection);
+            validConnections.add(heartBeatConnection);
+            return heartBeatConnection;
         }
-        return heartBeatConnection;
+    }
+
+    private ErrorResultException adaptHeartBeatError(final Exception error) {
+        if (error instanceof ConnectionException) {
+            return (ErrorResultException) error;
+        } else if (error instanceof TimeoutResultException || error instanceof TimeoutException) {
+            return newHeartBeatTimeoutError();
+        } else if (error instanceof InterruptedException) {
+            return newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, error);
+        } else {
+            return newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN, HBCF_HEARTBEAT_FAILED.get(),
+                    error);
+        }
+    }
+
+    private ConnectionImpl[] getValidConnections() {
+        final ConnectionImpl[] tmp;
+        synchronized (validConnections) {
+            tmp = validConnections.toArray(new ConnectionImpl[0]);
+        }
+        return tmp;
+    }
+
+    private ErrorResultException newHeartBeatTimeoutError() {
+        return newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN, HBCF_HEARTBEAT_TIMEOUT
+                .get(timeoutMS));
     }
 }
diff --git a/opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties b/opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties
index abe82c2..1057f00 100755
--- a/opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties
+++ b/opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties
@@ -1418,3 +1418,7 @@
 ERR_ENTRY_INCREMENT_CANNOT_PARSE_AS_INT=Unable to increment the value of \
  attribute '%s' because either the current value or the increment could \
  not be parsed as an integer
+HBCF_CONNECTION_CLOSED_BY_CLIENT=Connection closed by client
+HBCF_HEARTBEAT_FAILED=Heartbeat failed
+HBCF_HEARTBEAT_TIMEOUT=Heartbeat timed out after %d ms
+
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/ldap/ConnectionStateTest.java b/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/ldap/ConnectionStateTest.java
new file mode 100644
index 0000000..18ef073
--- /dev/null
+++ b/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/ldap/ConnectionStateTest.java
@@ -0,0 +1,250 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2013 ForgeRock AS.
+ */
+package com.forgerock.opendj.ldap;
+
+import static org.fest.assertions.Assertions.assertThat;
+import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
+import static org.forgerock.opendj.ldap.responses.Responses.newGenericExtendedResult;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import org.forgerock.opendj.ldap.ConnectionEventListener;
+import org.forgerock.opendj.ldap.ErrorResultException;
+import org.forgerock.opendj.ldap.ResultCode;
+import org.forgerock.opendj.ldap.responses.ExtendedResult;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@linkConnectionState}.
+ */
+@SuppressWarnings("javadoc")
+public class ConnectionStateTest extends LDAPTestCase {
+    private static final ErrorResultException ERROR = newErrorResult(ResultCode.OTHER);
+    private static final ErrorResultException LATE_ERROR = newErrorResult(ResultCode.BUSY);
+    private static final ExtendedResult UNSOLICITED =
+            newGenericExtendedResult(ResultCode.OPERATIONS_ERROR);
+
+    @Test
+    public void testCloseEventInClosedState() {
+        final ConnectionState state = new ConnectionState();
+        final ConnectionEventListener listener1 = mock(ConnectionEventListener.class);
+        state.addConnectionEventListener(listener1);
+        state.notifyConnectionClosed();
+        state.notifyConnectionClosed();
+
+        assertThat(state.isValid()).isFalse();
+        assertThat(state.isClosed()).isTrue();
+        assertThat(state.getConnectionError()).isNull();
+        verify(listener1).handleConnectionClosed();
+        verifyNoMoreInteractions(listener1);
+
+        // Listeners registered after event should be notified immediately.
+        final ConnectionEventListener listener2 = mock(ConnectionEventListener.class);
+        state.addConnectionEventListener(listener2);
+        verify(listener2).handleConnectionClosed();
+        verifyNoMoreInteractions(listener2);
+    }
+
+    @Test
+    public void testCloseEventInErrorState() {
+        final ConnectionState state = new ConnectionState();
+        final ConnectionEventListener listener1 = mock(ConnectionEventListener.class);
+        state.addConnectionEventListener(listener1);
+        state.notifyConnectionError(false, ERROR);
+        state.notifyConnectionClosed();
+
+        assertThat(state.isValid()).isFalse();
+        assertThat(state.isClosed()).isTrue();
+        assertThat(state.getConnectionError()).isSameAs(ERROR);
+        verify(listener1).handleConnectionError(false, ERROR);
+        verify(listener1).handleConnectionClosed();
+        verifyNoMoreInteractions(listener1);
+
+        // Listeners registered after event should be notified immediately.
+        final ConnectionEventListener listener2 = mock(ConnectionEventListener.class);
+        state.addConnectionEventListener(listener2);
+        verify(listener2).handleConnectionError(false, ERROR);
+        verify(listener2).handleConnectionClosed();
+        verifyNoMoreInteractions(listener2);
+    }
+
+    @Test
+    public void testCloseEventInValidState() {
+        final ConnectionState state = new ConnectionState();
+        final ConnectionEventListener listener1 = mock(ConnectionEventListener.class);
+        state.addConnectionEventListener(listener1);
+        state.notifyConnectionClosed();
+
+        assertThat(state.isValid()).isFalse();
+        assertThat(state.isClosed()).isTrue();
+        assertThat(state.getConnectionError()).isNull();
+        verify(listener1).handleConnectionClosed();
+        verifyNoMoreInteractions(listener1);
+
+        // Listeners registered after event should be notified immediately.
+        final ConnectionEventListener listener2 = mock(ConnectionEventListener.class);
+        state.addConnectionEventListener(listener2);
+        verify(listener2).handleConnectionClosed();
+        verifyNoMoreInteractions(listener2);
+    }
+
+    @Test
+    public void testErrorEventInClosedState() {
+        final ConnectionState state = new ConnectionState();
+        final ConnectionEventListener listener1 = mock(ConnectionEventListener.class);
+        state.addConnectionEventListener(listener1);
+        state.notifyConnectionClosed();
+        state.notifyConnectionError(false, ERROR);
+
+        assertThat(state.isValid()).isFalse();
+        assertThat(state.isClosed()).isTrue();
+        assertThat(state.getConnectionError()).isNull();
+        verify(listener1).handleConnectionClosed();
+        verifyNoMoreInteractions(listener1);
+
+        // Listeners registered after event should be notified immediately.
+        final ConnectionEventListener listener2 = mock(ConnectionEventListener.class);
+        state.addConnectionEventListener(listener2);
+        verify(listener2).handleConnectionClosed();
+        verifyNoMoreInteractions(listener2);
+    }
+
+    @Test
+    public void testErrorEventInErrorState() {
+        final ConnectionState state = new ConnectionState();
+        final ConnectionEventListener listener1 = mock(ConnectionEventListener.class);
+        state.addConnectionEventListener(listener1);
+        state.notifyConnectionError(false, ERROR);
+        state.notifyConnectionError(false, LATE_ERROR);
+
+        assertThat(state.isValid()).isFalse();
+        assertThat(state.isClosed()).isFalse();
+        assertThat(state.getConnectionError()).isSameAs(ERROR);
+        verify(listener1).handleConnectionError(false, ERROR);
+        verifyNoMoreInteractions(listener1);
+
+        // Listeners registered after event should be notified immediately.
+        final ConnectionEventListener listener2 = mock(ConnectionEventListener.class);
+        state.addConnectionEventListener(listener2);
+        assertThat(state.getConnectionError()).isSameAs(ERROR);
+        verify(listener2).handleConnectionError(false, ERROR);
+        verifyNoMoreInteractions(listener2);
+    }
+
+    @Test
+    public void testErrorEventInValidState() {
+        final ConnectionState state = new ConnectionState();
+        final ConnectionEventListener listener1 = mock(ConnectionEventListener.class);
+        state.addConnectionEventListener(listener1);
+        state.notifyConnectionError(false, ERROR);
+
+        assertThat(state.isValid()).isFalse();
+        assertThat(state.isClosed()).isFalse();
+        assertThat(state.getConnectionError()).isSameAs(ERROR);
+        verify(listener1).handleConnectionError(false, ERROR);
+        verifyNoMoreInteractions(listener1);
+
+        // Listeners registered after event should be notified immediately.
+        final ConnectionEventListener listener2 = mock(ConnectionEventListener.class);
+        state.addConnectionEventListener(listener2);
+        verify(listener2).handleConnectionError(false, ERROR);
+        verifyNoMoreInteractions(listener2);
+    }
+
+    @Test
+    public void testRemoveEventListener() {
+        final ConnectionState state = new ConnectionState();
+        final ConnectionEventListener listener = mock(ConnectionEventListener.class);
+        state.addConnectionEventListener(listener);
+        state.notifyConnectionError(false, ERROR);
+        verify(listener).handleConnectionError(false, ERROR);
+        state.removeConnectionEventListener(listener);
+        state.notifyConnectionClosed();
+        verifyNoMoreInteractions(listener);
+    }
+
+    @Test
+    public void testUnsolicitedNotificationEventInClosedState() {
+        final ConnectionState state = new ConnectionState();
+        final ConnectionEventListener listener1 = mock(ConnectionEventListener.class);
+        state.addConnectionEventListener(listener1);
+        state.notifyConnectionClosed();
+        state.notifyUnsolicitedNotification(UNSOLICITED);
+
+        assertThat(state.isValid()).isFalse();
+        assertThat(state.isClosed()).isTrue();
+        assertThat(state.getConnectionError()).isNull();
+        verify(listener1).handleConnectionClosed();
+        verifyNoMoreInteractions(listener1);
+    }
+
+    @Test
+    public void testUnsolicitedNotificationEventInErrorState() {
+        final ConnectionState state = new ConnectionState();
+        final ConnectionEventListener listener1 = mock(ConnectionEventListener.class);
+        state.addConnectionEventListener(listener1);
+        state.notifyConnectionError(false, ERROR);
+        state.notifyUnsolicitedNotification(UNSOLICITED);
+
+        assertThat(state.isValid()).isFalse();
+        assertThat(state.isClosed()).isFalse();
+        assertThat(state.getConnectionError()).isSameAs(ERROR);
+        verify(listener1).handleConnectionError(false, ERROR);
+        verifyNoMoreInteractions(listener1);
+    }
+
+    @Test
+    public void testUnsolicitedNotificationEventInValidState() {
+        final ConnectionState state = new ConnectionState();
+        final ConnectionEventListener listener1 = mock(ConnectionEventListener.class);
+        state.addConnectionEventListener(listener1);
+        state.notifyUnsolicitedNotification(UNSOLICITED);
+
+        assertThat(state.isValid()).isTrue();
+        assertThat(state.isClosed()).isFalse();
+        assertThat(state.getConnectionError()).isNull();
+        verify(listener1).handleUnsolicitedNotification(same(UNSOLICITED));
+        verifyNoMoreInteractions(listener1);
+
+        /*
+         * Listeners registered after event will not be notified (unsolicited
+         * notifications are not cached).
+         */
+        final ConnectionEventListener listener2 = mock(ConnectionEventListener.class);
+        state.addConnectionEventListener(listener2);
+        verifyNoMoreInteractions(listener2);
+    }
+
+    @Test
+    public void testValidState() {
+        final ConnectionState state = new ConnectionState();
+        assertThat(state.isValid()).isTrue();
+        assertThat(state.isClosed()).isFalse();
+        assertThat(state.getConnectionError()).isNull();
+    }
+}
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/util/FutureResultTransformerTestCase.java b/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/util/FutureResultTransformerTestCase.java
new file mode 100644
index 0000000..5372fd7
--- /dev/null
+++ b/opendj3/opendj-ldap-sdk/src/test/java/com/forgerock/opendj/util/FutureResultTransformerTestCase.java
@@ -0,0 +1,132 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2013 ForgeRock AS.
+ */
+
+package com.forgerock.opendj.util;
+
+import static org.fest.assertions.Assertions.assertThat;
+import static org.fest.assertions.Fail.fail;
+import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.TimeUnit;
+
+import org.forgerock.opendj.ldap.ErrorResultException;
+import org.forgerock.opendj.ldap.ResultCode;
+import org.forgerock.opendj.ldap.ResultHandler;
+import org.testng.annotations.Test;
+
+/**
+ * Tests {@link FutureResultTransformer}.
+ */
+@SuppressWarnings("javadoc")
+public class FutureResultTransformerTestCase extends UtilTestCase {
+    private static final class TestFuture extends FutureResultTransformer<Integer, String> {
+        public TestFuture(final ResultHandler<? super String> handler) {
+            super(handler);
+        }
+
+        @Override
+        protected ErrorResultException transformErrorResult(final ErrorResultException error) {
+            assertThat(error).isSameAs(UNTRANSFORMED_ERROR);
+            return TRANSFORMED_ERROR;
+        }
+
+        @Override
+        protected String transformResult(final Integer result) throws ErrorResultException {
+            assertThat(result).isSameAs(UNTRANSFORMED_RESULT);
+            return TRANSFORMED_RESULT;
+        }
+    }
+
+    private static final ErrorResultException TRANSFORMED_ERROR = newErrorResult(ResultCode.OTHER,
+            "transformed");
+    private static final String TRANSFORMED_RESULT = "transformed";
+    private static final ErrorResultException UNTRANSFORMED_ERROR = newErrorResult(
+            ResultCode.OTHER, "untransformed");
+    private static final Integer UNTRANSFORMED_RESULT = Integer.valueOf(0);
+
+    @Test
+    public void testGetTransformsError() throws Exception {
+        final TestFuture future = new TestFuture(null);
+        future.setFutureResult(new CompletedFutureResult<Integer>(UNTRANSFORMED_ERROR));
+        future.handleErrorResult(UNTRANSFORMED_ERROR);
+        try {
+            future.get();
+            fail();
+        } catch (final ErrorResultException e) {
+            assertThat(e).isSameAs(TRANSFORMED_ERROR);
+        }
+    }
+
+    @Test
+    public void testGetTransformsResult() throws Exception {
+        final TestFuture future = new TestFuture(null);
+        future.setFutureResult(new CompletedFutureResult<Integer>(UNTRANSFORMED_RESULT));
+        future.handleResult(UNTRANSFORMED_RESULT);
+        assertThat(future.get()).isSameAs(TRANSFORMED_RESULT);
+    }
+
+    @Test
+    public void testGetWithTimeoutTransformsError() throws Exception {
+        final TestFuture future = new TestFuture(null);
+        future.setFutureResult(new CompletedFutureResult<Integer>(UNTRANSFORMED_ERROR));
+        future.handleErrorResult(UNTRANSFORMED_ERROR);
+        try {
+            future.get(100, TimeUnit.SECONDS);
+            fail();
+        } catch (final ErrorResultException e) {
+            assertThat(e).isSameAs(TRANSFORMED_ERROR);
+        }
+    }
+
+    @Test
+    public void testGetWithTimeoutTransformsResult() throws Exception {
+        final TestFuture future = new TestFuture(null);
+        future.setFutureResult(new CompletedFutureResult<Integer>(UNTRANSFORMED_RESULT));
+        future.handleResult(UNTRANSFORMED_RESULT);
+        assertThat(future.get(100, TimeUnit.SECONDS)).isSameAs(TRANSFORMED_RESULT);
+    }
+
+    @Test
+    public void testResultHandlerTransformsError() throws Exception {
+        @SuppressWarnings("unchecked")
+        final ResultHandler<String> handler = mock(ResultHandler.class);
+        final TestFuture future = new TestFuture(handler);
+        future.setFutureResult(new CompletedFutureResult<Integer>(UNTRANSFORMED_ERROR));
+        future.handleErrorResult(UNTRANSFORMED_ERROR);
+        verify(handler).handleErrorResult(TRANSFORMED_ERROR);
+    }
+
+    @Test
+    public void testResultHandlerTransformsResult() throws Exception {
+        @SuppressWarnings("unchecked")
+        final ResultHandler<String> handler = mock(ResultHandler.class);
+        final TestFuture future = new TestFuture(handler);
+        future.setFutureResult(new CompletedFutureResult<Integer>(UNTRANSFORMED_RESULT));
+        future.handleResult(UNTRANSFORMED_RESULT);
+        verify(handler).handleResult(TRANSFORMED_RESULT);
+    }
+}
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java
index b8f4815..49d573d 100644
--- a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java
@@ -181,7 +181,7 @@
             assertThat(scheduler.isScheduled()).isTrue();
 
             // Forcefully run the monitor task and check that the first factory is online.
-            scheduler.getCommand().run();
+            scheduler.runFirstTask();
             verify(listener).handleConnectionFactoryOnline(firstAsync);
             verifyNoMoreInteractions(listener);
         } finally {
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
index 4361cc5..8ceb48f 100644
--- a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2010 Sun Microsystems, Inc.
- *      Portions copyright 2011-2012 ForgeRock AS
+ *      Portions copyright 2011-2013 ForgeRock AS
  */
 
 package org.forgerock.opendj.ldap;
@@ -138,8 +138,8 @@
                         "objectclass=*", "cn");
 
         factories[0][0] =
-                new HeartBeatConnectionFactory(new LDAPConnectionFactory(getServerSocketAddress()),
-                        1000, TimeUnit.MILLISECONDS, request);
+                Connections.newHeartBeatConnectionFactory(new LDAPConnectionFactory(
+                        getServerSocketAddress()), 1000, 500, TimeUnit.MILLISECONDS, request);
 
         // InternalConnectionFactory
         factories[1][0] = Connections.newInternalConnectionFactory(LDAPServer.getInstance(), null);
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
index 83348e3..0dd7596 100644
--- a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
@@ -44,7 +44,6 @@
 
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import org.forgerock.opendj.ldap.requests.BindRequest;
@@ -448,10 +447,7 @@
         assertThat(scheduler.isScheduled()).isTrue();
 
         // First populate the pool with idle connections at time 0.
-        @SuppressWarnings("unchecked")
-        final Callable<Long> timeSource = mock(Callable.class);
-        when(timeSource.call()).thenReturn(0L);
-        pool.testTimeSource = timeSource;
+        pool.timeSource = mockTimeSource(0);
 
         assertThat(pool.currentPoolSize()).isEqualTo(0);
         Connection c1 = pool.getConnection();
@@ -466,13 +462,13 @@
         assertThat(pool.currentPoolSize()).isEqualTo(4);
 
         // First purge at time 50 is no-op because no connections have expired.
-        when(timeSource.call()).thenReturn(50L);
-        scheduler.getCommand().run();
+        when(pool.timeSource.currentTimeMillis()).thenReturn(50L);
+        scheduler.runFirstTask();
         assertThat(pool.currentPoolSize()).isEqualTo(4);
 
         // Second purge at time 150 should remove 2 non-core connections.
-        when(timeSource.call()).thenReturn(150L);
-        scheduler.getCommand().run();
+        when(pool.timeSource.currentTimeMillis()).thenReturn(150L);
+        scheduler.runFirstTask();
         assertThat(pool.currentPoolSize()).isEqualTo(2);
 
         verify(pooledConnection1, times(1)).close();
@@ -481,7 +477,7 @@
         verify(pooledConnection4, times(0)).close();
 
         // Regrow the pool at time 200.
-        when(timeSource.call()).thenReturn(200L);
+        when(pool.timeSource.currentTimeMillis()).thenReturn(200L);
         Connection c5 = pool.getConnection(); // pooledConnection3
         Connection c6 = pool.getConnection(); // pooledConnection4
         Connection c7 = pool.getConnection(); // pooledConnection5
@@ -494,13 +490,13 @@
         assertThat(pool.currentPoolSize()).isEqualTo(4);
 
         // Third purge at time 250 should not remove any connections.
-        when(timeSource.call()).thenReturn(250L);
-        scheduler.getCommand().run();
+        when(pool.timeSource.currentTimeMillis()).thenReturn(250L);
+        scheduler.runFirstTask();
         assertThat(pool.currentPoolSize()).isEqualTo(4);
 
         // Fourth purge at time 350 should remove 2 non-core connections.
-        when(timeSource.call()).thenReturn(350L);
-        scheduler.getCommand().run();
+        when(pool.timeSource.currentTimeMillis()).thenReturn(350L);
+        scheduler.runFirstTask();
         assertThat(pool.currentPoolSize()).isEqualTo(2);
 
         verify(pooledConnection3, times(1)).close();
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
index 06e5337..e44cf75 100644
--- a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
@@ -27,13 +27,15 @@
 package org.forgerock.opendj.ldap;
 
 import static org.fest.assertions.Assertions.assertThat;
-import static org.forgerock.opendj.ldap.Connections.newHeartBeatConnectionFactory;
+import static org.fest.assertions.Fail.fail;
 import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
 import static org.forgerock.opendj.ldap.SearchScope.BASE_OBJECT;
 import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnection;
 import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnectionFactory;
+import static org.forgerock.opendj.ldap.TestCaseUtils.mockTimeSource;
 import static org.forgerock.opendj.ldap.requests.Requests.newModifyRequest;
 import static org.forgerock.opendj.ldap.requests.Requests.newSearchRequest;
+import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest;
 import static org.forgerock.opendj.ldap.responses.Responses.newResult;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.same;
@@ -46,14 +48,22 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
 
+import org.forgerock.opendj.ldap.requests.BindRequest;
 import org.forgerock.opendj.ldap.requests.SearchRequest;
 import org.forgerock.opendj.ldap.responses.Result;
 import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.forgerock.opendj.util.CompletedFutureResult;
+import com.forgerock.opendj.util.StaticUtils;
+
 /**
  * Tests the connection pool implementation..
  */
@@ -78,48 +88,169 @@
 
     // @formatter:on
 
-    @Test(enabled = false)
-    public void testHeartBeatTimeout() throws Exception {
-        // Mock connection which never responds to heartbeat.
-        final List<ConnectionEventListener> listeners = new LinkedList<ConnectionEventListener>();
-        final Connection connection = mockConnection(listeners);
-        when(connection.isValid()).thenReturn(true);
+    private static final SearchRequest HEARTBEAT = newSearchRequest("dc=test", BASE_OBJECT,
+            "(objectclass=*)", "1.1");
 
-        // Underlying connection factory.
-        final ConnectionFactory factory = mockConnectionFactory(connection);
+    private Connection connection;
+    private ConnectionFactory factory;
+    private Connection hbc;
+    private HeartBeatConnectionFactory hbcf;
+    private List<ConnectionEventListener> listeners;
+    private MockScheduler scheduler;
 
-        // Create heart beat connection factory.
-        final SearchRequest hb = newSearchRequest("dc=test", BASE_OBJECT, "(objectclass=*)", "1.1");
-        final MockScheduler scheduler = new MockScheduler();
-        final ConnectionFactory hbcf =
-                newHeartBeatConnectionFactory(factory, 0, TimeUnit.MILLISECONDS, hb, scheduler);
+    /**
+     * Disables logging before the tests.
+     */
+    @BeforeClass()
+    public void disableLogging() {
+        StaticUtils.DEBUG_LOG.setLevel(Level.SEVERE);
+    }
 
-        // First connection should cause heart beat to be scheduled.
-        final Connection hbc = hbcf.getConnection();
-        assertThat(scheduler.isScheduled()).isTrue();
-        assertThat(scheduler.getCommand()).isNotNull();
-        assertThat(scheduler.getDelay()).isEqualTo(0);
-        assertThat(scheduler.getUnit()).isEqualTo(TimeUnit.MILLISECONDS);
+    /**
+     * Re-enable logging after the tests.
+     */
+    @AfterClass()
+    public void enableLogging() {
+        StaticUtils.DEBUG_LOG.setLevel(Level.INFO);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        try {
+            if (hbc != null) {
+                hbc.close();
+                assertThat(hbc.isValid()).isFalse();
+                assertThat(hbc.isClosed()).isTrue();
+            }
+            scheduler.runAllTasks(); // Flush any remaining timeout tasks.
+            assertThat(scheduler.isScheduled()).isFalse(); // No more connections to check.
+            hbcf.close();
+        } finally {
+            connection = null;
+            factory = null;
+            hbcf = null;
+            listeners = null;
+            scheduler = null;
+            hbc = null;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testBindWhileHeartBeatInProgress() throws Exception {
+        // Mock connection with successful heartbeat.
+        init(ResultCode.SUCCESS);
+
+        // Get a connection.
+        hbc = hbcf.getConnection();
+
+        /*
+         * Send a heartbeat, trapping the search call-back so that we can send
+         * the response once we have attempted a bind.
+         */
+        when(
+                connection.searchAsync(any(SearchRequest.class),
+                        any(IntermediateResponseHandler.class), any(SearchResultHandler.class)))
+                .thenReturn(null);
+        when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L);
+        scheduler.runAllTasks(); // Send the heartbeat.
+
+        // Capture the heartbeat search result handler.
+        final ArgumentCaptor<SearchResultHandler> arg =
+                ArgumentCaptor.forClass(SearchResultHandler.class);
+        verify(connection, times(2)).searchAsync(same(HEARTBEAT),
+                any(IntermediateResponseHandler.class), arg.capture());
+        assertThat(hbc.isValid()).isTrue(); // Not checked yet.
+
+        /*
+         * Now attempt a bind request, which should be held in a queue until the
+         * heart beat completes.
+         */
+        hbc.bindAsync(newSimpleBindRequest(), null, null);
+        verify(connection, times(0)).bindAsync(any(BindRequest.class),
+                any(IntermediateResponseHandler.class), any(ResultHandler.class));
+
+        // Send fake heartbeat response, releasing the bind request.
+        arg.getValue().handleResult(newResult(ResultCode.SUCCESS));
+        verify(connection, times(1)).bindAsync(any(BindRequest.class),
+                any(IntermediateResponseHandler.class), any(ResultHandler.class));
+    }
+
+    @Test
+    public void testGetConnection() throws Exception {
+        // Mock connection with successful initial heartbeat.
+        init(ResultCode.SUCCESS);
+        hbc = hbcf.getConnection();
+        assertThat(hbc).isNotNull();
+        assertThat(hbc.isValid()).isTrue();
+    }
+
+    @Test
+    public void testGetConnectionAsync() throws Exception {
+        // Mock connection with successful initial heartbeat.
+        init(ResultCode.SUCCESS);
+        hbc = hbcf.getConnectionAsync(null).get();
+        assertThat(hbc).isNotNull();
+        assertThat(hbc.isValid()).isTrue();
+    }
+
+    @Test
+    public void testGetConnectionAsyncWithInitialHeartBeatError() throws Exception {
+        // Mock connection with failing initial heartbeat.
+        init(ResultCode.BUSY);
+        try {
+            hbcf.getConnectionAsync(null).get();
+            fail("Unexpectedly obtained a connection");
+        } catch (final ErrorResultException e) {
+            checkInitialHeartBeatFailure(e);
+        }
+    }
+
+    @Test
+    public void testGetConnectionWithInitialHeartBeatError() throws Exception {
+        // Mock connection with failing initial heartbeat.
+        init(ResultCode.BUSY);
+        try {
+            hbcf.getConnection();
+            fail("Unexpectedly obtained a connection");
+        } catch (final ErrorResultException e) {
+            checkInitialHeartBeatFailure(e);
+        }
+    }
+
+    @Test
+    public void testHeartBeatSucceedsThenFails() throws Exception {
+        // Mock connection with successful heartbeat.
+        init(ResultCode.SUCCESS);
+
+        // Get a connection and check that it was pinged.
+        hbc = hbcf.getConnection();
+
+        verifyHeartBeatSent(connection, 1);
+        assertThat(scheduler.isScheduled()).isTrue(); // heartbeater
         assertThat(listeners).hasSize(1);
+        assertThat(hbc.isValid()).isTrue();
 
-        // The connection should be immediately invalid due to 0 timeout.
-        assertThat(connection.isValid()).isTrue();
+        // Invoke heartbeat before the connection is considered idle.
+        scheduler.runAllTasks();
+        verifyHeartBeatSent(connection, 1); // No heartbeat sent - not idle yet.
+        assertThat(hbc.isValid()).isTrue();
+
+        // Invoke heartbeat after the connection is considered idle.
+        when(hbcf.timeSource.currentTimeMillis()).thenReturn(6000L);
+        scheduler.runAllTasks();
+        verifyHeartBeatSent(connection, 2); // Heartbeat sent.
+        assertThat(hbc.isValid()).isTrue();
+
+        // Now force the heartbeat to fail.
+        mockHeartBeatResponse(connection, listeners, ResultCode.CLIENT_SIDE_SERVER_DOWN);
+        when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L);
+        scheduler.runAllTasks();
+        verifyHeartBeatSent(connection, 3);
         assertThat(hbc.isValid()).isFalse();
 
-        /*
-         * Attempt to send heartbeat. This should trigger the connection to be
-         * closed and all subsequent request attempts to fail.
-         */
-        scheduler.getCommand().run();
-
-        // The underlying connection should have been closed.
-        verify(connection).close();
-
-        /*
-         * ...and the scheduled heart beat stopped because there are no
-         * remaining connections.
-         */
-        assertThat(scheduler.isScheduled()).isFalse();
+        // Flush redundant timeout tasks.
+        scheduler.runAllTasks();
 
         // Attempt to send a new request: it should fail immediately.
         @SuppressWarnings("unchecked")
@@ -130,120 +261,150 @@
         verify(mockHandler).handleErrorResult(arg.capture());
         assertThat(arg.getValue().getResult().getResultCode()).isEqualTo(
                 ResultCode.CLIENT_SIDE_SERVER_DOWN);
+
+        assertThat(hbc.isValid()).isFalse();
+        assertThat(hbc.isClosed()).isFalse();
     }
 
     @Test
-    public void testSchedulerRegistration() throws Exception {
-        // Three mock connections.
-        final List<ConnectionEventListener> listeners1 = new LinkedList<ConnectionEventListener>();
-        final Connection connection1 = heartBeat(mockConnection(listeners1), ResultCode.SUCCESS);
+    public void testHeartBeatTimeout() throws Exception {
+        // Mock connection with successful heartbeat.
+        init(ResultCode.SUCCESS);
 
-        final List<ConnectionEventListener> listeners2 = new LinkedList<ConnectionEventListener>();
-        final Connection connection2 = heartBeat(mockConnection(listeners2), ResultCode.SUCCESS);
+        // Get a connection and check that it was pinged.
+        hbc = hbcf.getConnection();
 
-        final List<ConnectionEventListener> listeners3 = new LinkedList<ConnectionEventListener>();
-        final Connection connection3 = heartBeat(mockConnection(listeners3), ResultCode.SUCCESS);
-
-        // Underlying connection factory.
-        final ConnectionFactory factory =
-                mockConnectionFactory(connection1, connection2, connection3);
-
-        // Create heart beat connection factory.
-        final SearchRequest hb = newSearchRequest("dc=test", BASE_OBJECT, "(objectclass=*)", "1.1");
-        final MockScheduler scheduler = new MockScheduler();
-        final ConnectionFactory hbcf =
-                newHeartBeatConnectionFactory(factory, 0, TimeUnit.MILLISECONDS, hb, scheduler);
-
-        // Heart beat should not be scheduled yet.
-        assertThat(scheduler.isScheduled()).isFalse();
-
-        // First connection should cause heart beat to be scheduled.
-        hbcf.getConnection();
-        assertThat(scheduler.isScheduled()).isTrue();
-        assertThat(scheduler.getCommand()).isNotNull();
-        assertThat(scheduler.getDelay()).isEqualTo(0);
-        assertThat(scheduler.getUnit()).isEqualTo(TimeUnit.MILLISECONDS);
-        assertThat(listeners1).hasSize(1);
-
-        // Second connection should not change anything.
-        hbcf.getConnection();
-        assertThat(scheduler.isScheduled()).isTrue();
-        assertThat(listeners2).hasSize(1);
-
-        // Check heart-beat sent to both connections.
-        scheduler.getCommand().run();
-        verify(connection1, times(1)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
-                any(SearchResultHandler.class));
-        verify(connection2, times(1)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
-                any(SearchResultHandler.class));
-        verify(connection3, times(0)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
-                any(SearchResultHandler.class));
-
-        // Close first connection: heart beat should still be scheduled.
-        listeners1.get(0).handleConnectionClosed();
-        assertThat(scheduler.isScheduled()).isTrue();
-        assertThat(listeners1).isEmpty();
-
-        // Check heart-beat only sent to second connection.
-        scheduler.getCommand().run();
-        verify(connection1, times(1)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
-                any(SearchResultHandler.class));
-        verify(connection2, times(2)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
-                any(SearchResultHandler.class));
-        verify(connection3, times(0)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
-                any(SearchResultHandler.class));
-
-        // Close second connection: heart beat should now be stopped.
-        listeners2.get(0).handleConnectionClosed();
-        assertThat(scheduler.isScheduled()).isFalse();
-        assertThat(listeners2).isEmpty();
-
-        // Opening another connection should restart the heart beat.
-        hbcf.getConnection();
-        assertThat(scheduler.isScheduled()).isTrue();
-        assertThat(scheduler.getCommand()).isNotNull();
-        assertThat(scheduler.getDelay()).isEqualTo(0);
-        assertThat(scheduler.getUnit()).isEqualTo(TimeUnit.MILLISECONDS);
-        assertThat(listeners3).hasSize(1);
-
-        // Check heart-beat only sent to third connection.
-        scheduler.getCommand().run();
-        verify(connection1, times(1)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
-                any(SearchResultHandler.class));
-        verify(connection2, times(2)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
-                any(SearchResultHandler.class));
-        verify(connection3, times(1)).searchAsync(same(hb), any(IntermediateResponseHandler.class),
-                any(SearchResultHandler.class));
+        // Now force the heartbeat to fail due to timeout.
+        mockHeartBeatResponse(connection, listeners, null /* no response */);
+        when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L);
+        scheduler.runAllTasks(); // Send the heartbeat.
+        verifyHeartBeatSent(connection, 2);
+        assertThat(hbc.isValid()).isTrue(); // Not checked yet.
+        when(hbcf.timeSource.currentTimeMillis()).thenReturn(12000L);
+        scheduler.runAllTasks(); // Check for heartbeat.
+        assertThat(hbc.isValid()).isFalse(); // Now invalid.
+        assertThat(hbc.isClosed()).isFalse();
     }
 
-    private Connection heartBeat(final Connection mockConnection, final ResultCode resultCode) {
-        doAnswer(new Answer<Void>() {
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testHeartBeatWhileBindInProgress() throws Exception {
+        // Mock connection with successful heartbeat.
+        init(ResultCode.SUCCESS);
+
+        // Get a connection.
+        hbc = hbcf.getConnection();
+
+        /*
+         * Send a bind request, trapping the bind call-back so that we can send
+         * the response once we have attempted a heartbeat.
+         */
+        when(
+                connection.bindAsync(any(BindRequest.class),
+                        any(IntermediateResponseHandler.class), any(ResultHandler.class)))
+                .thenReturn(null);
+        hbc.bindAsync(newSimpleBindRequest(), null, null);
+
+        // Capture the bind result handler.
+        @SuppressWarnings("rawtypes")
+        final ArgumentCaptor<ResultHandler> arg = ArgumentCaptor.forClass(ResultHandler.class);
+        verify(connection, times(1)).bindAsync(any(BindRequest.class),
+                any(IntermediateResponseHandler.class), arg.capture());
+
+        /*
+         * Now attempt the heartbeat which should not happen because there is a
+         * bind in progress.
+         */
+        when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L);
+        scheduler.runAllTasks(); // Attempt to send the heartbeat.
+        verify(connection, times(1)).searchAsync(same(HEARTBEAT),
+                any(IntermediateResponseHandler.class), any(SearchResultHandler.class));
+
+        // Send fake bind response, releasing the heartbeat.
+        arg.getValue().handleResult(newResult(ResultCode.SUCCESS));
+
+        // Attempt to send a heartbeat again.
+        when(hbcf.timeSource.currentTimeMillis()).thenReturn(16000L);
+        scheduler.runAllTasks(); // Attempt to send the heartbeat.
+        verify(connection, times(2)).searchAsync(same(HEARTBEAT),
+                any(IntermediateResponseHandler.class), any(SearchResultHandler.class));
+    }
+
+    @Test
+    public void testToString() {
+        init(ResultCode.SUCCESS);
+        assertThat(hbcf.toString()).isNotNull();
+    }
+
+    private void checkInitialHeartBeatFailure(final ErrorResultException e) {
+        /*
+         * Initial heartbeat failure should trigger connection exception with
+         * heartbeat cause.
+         */
+        assertThat(e).isInstanceOf(ConnectionException.class);
+        assertThat(e.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_SERVER_DOWN);
+        assertThat(e.getCause()).isInstanceOf(ErrorResultException.class);
+        assertThat(((ErrorResultException) e.getCause()).getResult().getResultCode()).isEqualTo(
+                ResultCode.BUSY);
+    }
+
+    private void init(final ResultCode initialHeartBeatResult) {
+        // Mock connection with successful heartbeat.
+        listeners = new LinkedList<ConnectionEventListener>();
+        connection = mockConnection(listeners);
+        when(connection.isValid()).thenReturn(true);
+        mockHeartBeatResponse(connection, listeners, initialHeartBeatResult);
+
+        // Underlying connection factory.
+        factory = mockConnectionFactory(connection);
+
+        // Create heart beat connection factory.
+        scheduler = new MockScheduler();
+        hbcf =
+                new HeartBeatConnectionFactory(factory, 10000, 100, TimeUnit.MILLISECONDS,
+                        HEARTBEAT, scheduler);
+
+        // Set initial time stamp.
+        hbcf.timeSource = mockTimeSource(0);
+    }
+
+    private Connection mockHeartBeatResponse(final Connection mockConnection,
+            final List<ConnectionEventListener> listeners, final ResultCode resultCode) {
+        doAnswer(new Answer<FutureResult<Result>>() {
             @Override
-            public Void answer(final InvocationOnMock invocation) throws Throwable {
+            public FutureResult<Result> answer(final InvocationOnMock invocation) throws Throwable {
+                if (resultCode == null) {
+                    return null;
+                }
+
                 final SearchResultHandler handler =
                         (SearchResultHandler) invocation.getArguments()[2];
                 if (resultCode.isExceptional()) {
-                    handler.handleErrorResult(newErrorResult(resultCode));
+                    final ErrorResultException error = newErrorResult(resultCode);
+                    if (handler != null) {
+                        handler.handleErrorResult(error);
+                    }
+                    if (error instanceof ConnectionException) {
+                        for (final ConnectionEventListener listener : listeners) {
+                            listener.handleConnectionError(false, error);
+                        }
+                    }
+                    return new CompletedFutureResult<Result>(error);
                 } else {
-                    handler.handleResult(newResult(resultCode));
+                    final Result result = newResult(resultCode);
+                    if (handler != null) {
+                        handler.handleResult(result);
+                    }
+                    return new CompletedFutureResult<Result>(result);
                 }
-                return null;
             }
         }).when(mockConnection).searchAsync(any(SearchRequest.class),
                 any(IntermediateResponseHandler.class), any(SearchResultHandler.class));
         return mockConnection;
     }
 
-    //    private void sleepFor(final long ms) {
-    //        // Avoid premature wake-ups.
-    //        final long until = System.currentTimeMillis() + ms;
-    //        do {
-    //            try {
-    //                Thread.sleep(until - System.currentTimeMillis());
-    //            } catch (final InterruptedException e) {
-    //                Thread.currentThread().interrupt();
-    //                return;
-    //            }
-    //        } while (System.currentTimeMillis() < until);
-    //    }
+    private void verifyHeartBeatSent(final Connection connection, final int times) {
+        verify(connection, times(times)).searchAsync(same(HEARTBEAT),
+                any(IntermediateResponseHandler.class), any(SearchResultHandler.class));
+    }
 }
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockScheduler.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockScheduler.java
index af7ff5c..605711c 100644
--- a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockScheduler.java
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockScheduler.java
@@ -25,10 +25,14 @@
  */
 package org.forgerock.opendj.ldap;
 
+import static java.util.concurrent.Executors.callable;
+
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -43,11 +47,74 @@
  */
 final class MockScheduler implements ScheduledExecutorService {
 
-    // Saved scheduled task.
-    private Runnable command;
-    private long delay;
-    private boolean isScheduled = false;
-    private TimeUnit unit;
+    private final class ScheduledCallableFuture<T> implements ScheduledFuture<T>, Callable<T> {
+        private final Callable<T> callable;
+        private final CountDownLatch isDone = new CountDownLatch(1);
+        private final boolean removeAfterCall;
+        private T result = null;
+
+        private ScheduledCallableFuture(final Callable<T> callable, final boolean removeAfterCall) {
+            this.callable = callable;
+            this.removeAfterCall = removeAfterCall;
+        }
+
+        @Override
+        public T call() throws Exception {
+            result = callable.call();
+            isDone.countDown();
+            if (removeAfterCall) {
+                tasks.remove(this);
+            }
+            return result;
+        }
+
+        @Override
+        public boolean cancel(final boolean mayInterruptIfRunning) {
+            return tasks.remove(this);
+        }
+
+        @Override
+        public int compareTo(final Delayed o) {
+            // Unused.
+            return 0;
+        }
+
+        @Override
+        public T get() throws InterruptedException, ExecutionException {
+            isDone.await();
+            return result;
+        }
+
+        @Override
+        public T get(final long timeout, final TimeUnit unit) throws InterruptedException,
+                ExecutionException, TimeoutException {
+            if (isDone.await(timeout, unit)) {
+                return result;
+            } else {
+                throw new TimeoutException();
+            }
+        }
+
+        @Override
+        public long getDelay(final TimeUnit unit) {
+            // Unused.
+            return 0;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return tasks.contains(this);
+        }
+
+        @Override
+        public boolean isDone() {
+            return isDone.getCount() == 0;
+        }
+
+    }
+
+    // Saved scheduled tasks.
+    private final List<Callable<?>> tasks = new CopyOnWriteArrayList<Callable<?>>();
 
     MockScheduler() {
         // Nothing to do.
@@ -109,74 +176,24 @@
     @Override
     public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay,
             final TimeUnit unit) {
-        // Unused.
-        return null;
+        return onceOnly(callable);
     }
 
     @Override
     public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
-        // Unused.
-        return null;
+        return onceOnly(callable(command));
     }
 
     @Override
     public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay,
             final long period, final TimeUnit unit) {
-        // Unused.
-        return null;
+        return repeated(callable(command));
     }
 
     @Override
     public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command,
             final long initialDelay, final long delay, final TimeUnit unit) {
-        this.command = command;
-        this.delay = delay;
-        this.unit = unit;
-        this.isScheduled = true;
-        return new ScheduledFuture<Object>() {
-            @Override
-            public boolean cancel(final boolean mayInterruptIfRunning) {
-                isScheduled = false;
-                return true;
-            }
-
-            @Override
-            public int compareTo(final Delayed o) {
-                // Unused.
-                return 0;
-            }
-
-            @Override
-            public Object get() throws InterruptedException, ExecutionException {
-                // Unused.
-                return null;
-            }
-
-            @Override
-            public Object get(final long timeout, final TimeUnit unit) throws InterruptedException,
-                    ExecutionException, TimeoutException {
-                // Unused.
-                return null;
-            }
-
-            @Override
-            public long getDelay(final TimeUnit unit) {
-                // Unused.
-                return 0;
-            }
-
-            @Override
-            public boolean isCancelled() {
-                return !isScheduled;
-            }
-
-            @Override
-            public boolean isDone() {
-                // Unused.
-                return false;
-            }
-
-        };
+        return repeated(callable(command));
     }
 
     @Override
@@ -192,35 +209,62 @@
 
     @Override
     public <T> Future<T> submit(final Callable<T> task) {
-        // Unused.
-        return null;
+        return onceOnly(task);
     }
 
     @Override
     public Future<?> submit(final Runnable task) {
-        // Unused.
-        return null;
+        return onceOnly(callable(task));
     }
 
     @Override
     public <T> Future<T> submit(final Runnable task, final T result) {
-        // Unused.
-        return null;
+        return onceOnly(callable(task, result));
     }
 
-    Runnable getCommand() {
-        return command;
+    List<Callable<?>> getAllTasks() {
+        return tasks;
     }
 
-    long getDelay() {
-        return delay;
-    }
-
-    TimeUnit getUnit() {
-        return unit;
+    Callable<?> getFirstTask() {
+        return tasks.get(0);
     }
 
     boolean isScheduled() {
-        return isScheduled;
+        return !tasks.isEmpty();
+    }
+
+    void runAllTasks() {
+        for (final Callable<?> task : tasks) {
+            runTask0(task);
+        }
+    }
+
+    void runFirstTask() {
+        runTask(0);
+    }
+
+    void runTask(final int i) {
+        runTask0(tasks.get(i));
+    }
+
+    private <T> ScheduledCallableFuture<T> onceOnly(final Callable<T> callable) {
+        final ScheduledCallableFuture<T> wrapped = new ScheduledCallableFuture<T>(callable, true);
+        tasks.add(wrapped);
+        return wrapped;
+    }
+
+    private <T> ScheduledCallableFuture<T> repeated(final Callable<T> callable) {
+        final ScheduledCallableFuture<T> wrapped = new ScheduledCallableFuture<T>(callable, false);
+        tasks.add(wrapped);
+        return wrapped;
+    }
+
+    private void runTask0(final Callable<?> task) {
+        try {
+            task.call();
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 }
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/SdkTestCase.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/SdkTestCase.java
index 48facfa..708776d 100644
--- a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/SdkTestCase.java
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/SdkTestCase.java
@@ -33,7 +33,6 @@
  * An abstract class that all types unit tests should extend. A type represents
  * the classes found directly under the package org.forgerock.opendj.ldap.
  */
-
 @Test(groups = { "precommit", "types", "sdk" })
 public abstract class SdkTestCase extends ForgeRockTestCase {
 }
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtils.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtils.java
index 33b34f5..2a8c3bd 100644
--- a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtils.java
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtils.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2009-2010 Sun Microsystems, Inc.
- *      Portions copyright 2012 ForgeRock AS.
+ *      Portions copyright 2012-2013 ForgeRock AS.
  */
 
 package org.forgerock.opendj.ldap;
@@ -42,8 +42,10 @@
 
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
 
 import com.forgerock.opendj.util.CompletedFutureResult;
+import com.forgerock.opendj.util.TimeSource;
 
 /**
  * This class defines some utility functions which can be used by test cases.
@@ -202,4 +204,22 @@
 
         return mockConnection;
     }
+
+    /**
+     * Returns a mock {@link TimeSource} which can be used for injecting fake
+     * time stamps into components.
+     *
+     * @param times
+     *            The times in milli-seconds which should be returned by the
+     *            time source.
+     * @return The mock time source.
+     */
+    public static TimeSource mockTimeSource(final long... times) {
+        final TimeSource mock = mock(TimeSource.class);
+        OngoingStubbing<Long> stubbing = when(mock.currentTimeMillis());
+        for (long t : times) {
+            stubbing = stubbing.thenReturn(t);
+        }
+        return mock;
+    }
 }
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtilsTestCase.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtilsTestCase.java
new file mode 100644
index 0000000..3cc8944
--- /dev/null
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/TestCaseUtilsTestCase.java
@@ -0,0 +1,53 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2013 ForgeRock AS
+ */
+package org.forgerock.opendj.ldap;
+
+import static org.fest.assertions.Assertions.assertThat;
+import static org.forgerock.opendj.ldap.TestCaseUtils.mockTimeSource;
+
+import org.testng.annotations.Test;
+
+import com.forgerock.opendj.util.TimeSource;
+
+@SuppressWarnings("javadoc")
+public class TestCaseUtilsTestCase extends SdkTestCase {
+
+    /**
+     * Test for {@link #mockTimeSource(long...)}.
+     */
+    @Test
+    public void testMockTimeSource() {
+        final TimeSource mock1 = mockTimeSource(10);
+        assertThat(mock1.currentTimeMillis()).isEqualTo(10);
+        assertThat(mock1.currentTimeMillis()).isEqualTo(10);
+
+        final TimeSource mock2 = mockTimeSource(10, 20, 30);
+        assertThat(mock2.currentTimeMillis()).isEqualTo(10);
+        assertThat(mock2.currentTimeMillis()).isEqualTo(20);
+        assertThat(mock2.currentTimeMillis()).isEqualTo(30);
+        assertThat(mock2.currentTimeMillis()).isEqualTo(30);
+    }
+}
diff --git a/opendj3/opendj-rest2ldap-servlet/src/main/webapp/opendj-rest2ldap-servlet.json b/opendj3/opendj-rest2ldap-servlet/src/main/webapp/opendj-rest2ldap-servlet.json
index 7c2e135..0381038 100644
--- a/opendj3/opendj-rest2ldap-servlet/src/main/webapp/opendj-rest2ldap-servlet.json
+++ b/opendj3/opendj-rest2ldap-servlet/src/main/webapp/opendj-rest2ldap-servlet.json
@@ -35,8 +35,10 @@
             // Re-usable pool of 24 connections per server.
             "connectionPoolSize"       : 24,
 
-            // Check pooled connections are alive every 30 seconds.
-            "heartBeatIntervalSeconds" : 30,
+            // Check pooled connections are alive every 30 seconds with a 500ms
+            // heart beat timeout.
+            "heartBeatIntervalSeconds"    : 30,
+            "heartBeatTimeoutMilliSeconds" : 500,
 
             // The preferred load-balancing pool.
             "primaryLDAPServers"       : [
diff --git a/opendj3/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java b/opendj3/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java
index 7f84a81..edb811a 100644
--- a/opendj3/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java
+++ b/opendj3/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java
@@ -970,6 +970,9 @@
                 Math.max(configuration.get("connectionPoolSize").defaultTo(10).asInteger(), 1);
         final int heartBeatIntervalSeconds =
                 Math.max(configuration.get("heartBeatIntervalSeconds").defaultTo(30).asInteger(), 1);
+        final int heartBeatTimeoutMilliSeconds =
+                Math.max(configuration.get("heartBeatTimeoutMilliSeconds").defaultTo(500)
+                        .asInteger(), 100);
 
         // Parse authentication parameters.
         final BindRequest bindRequest;
@@ -1036,7 +1039,7 @@
         }
         final ConnectionFactory primary =
                 parseLDAPServers(primaryLDAPServers, bindRequest, connectionPoolSize,
-                        heartBeatIntervalSeconds, options);
+                        heartBeatIntervalSeconds, heartBeatTimeoutMilliSeconds, options);
 
         // Parse secondary data center(s).
         final JsonValue secondaryLDAPServers = configuration.get("secondaryLDAPServers");
@@ -1045,7 +1048,7 @@
             if (secondaryLDAPServers.size() > 0) {
                 secondary =
                         parseLDAPServers(secondaryLDAPServers, bindRequest, connectionPoolSize,
-                                heartBeatIntervalSeconds, options);
+                                heartBeatIntervalSeconds, heartBeatTimeoutMilliSeconds, options);
             } else {
                 secondary = null;
             }
@@ -1093,20 +1096,22 @@
 
     private static ConnectionFactory parseLDAPServers(final JsonValue config,
             final BindRequest bindRequest, final int connectionPoolSize,
-            final int heartBeatIntervalSeconds, final LDAPOptions options) {
+            final int heartBeatIntervalSeconds, final int heartBeatTimeoutMilliSeconds,
+            final LDAPOptions options) {
         final List<ConnectionFactory> servers = new ArrayList<ConnectionFactory>(config.size());
         for (final JsonValue server : config) {
             final String host = server.get("hostname").required().asString();
             final int port = server.get("port").required().asInteger();
             ConnectionFactory factory = new LDAPConnectionFactory(host, port, options);
+            factory =
+                    Connections.newHeartBeatConnectionFactory(factory,
+                            heartBeatIntervalSeconds * 1000, heartBeatTimeoutMilliSeconds,
+                            TimeUnit.MILLISECONDS);
             if (bindRequest != null) {
                 factory = Connections.newAuthenticatedConnectionFactory(factory, bindRequest);
             }
             if (connectionPoolSize > 1) {
                 factory =
-                        Connections.newHeartBeatConnectionFactory(factory,
-                                heartBeatIntervalSeconds, TimeUnit.SECONDS);
-                factory =
                         Connections.newCachedConnectionPool(factory, 0, connectionPoolSize, 60L,
                                 TimeUnit.SECONDS);
             }

--
Gitblit v1.10.0