mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
18.28.2013 7d1bbf9b372e41121198be2b9f0f322d58b8d014
opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -27,23 +27,27 @@
package org.forgerock.opendj.ldap;
import static com.forgerock.opendj.util.StaticUtils.*;
import static java.lang.System.*;
import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER;
import static org.forgerock.opendj.ldap.CoreMessages.HBCF_CONNECTION_CLOSED_BY_CLIENT;
import static org.forgerock.opendj.ldap.CoreMessages.HBCF_HEARTBEAT_FAILED;
import static org.forgerock.opendj.ldap.CoreMessages.HBCF_HEARTBEAT_TIMEOUT;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import static org.forgerock.opendj.ldap.ErrorResultException.*;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.logging.Level;
import org.forgerock.opendj.ldap.requests.AbandonRequest;
import org.forgerock.opendj.ldap.requests.AddRequest;
import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.opendj.ldap.requests.CompareRequest;
@@ -54,30 +58,297 @@
import org.forgerock.opendj.ldap.requests.Requests;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.requests.StartTLSExtendedRequest;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.BindResult;
import org.forgerock.opendj.ldap.responses.CompareResult;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.GenericExtendedResult;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.responses.SearchResultReference;
import org.forgerock.opendj.ldif.ConnectionEntryReader;
import com.forgerock.opendj.ldap.ConnectionState;
import com.forgerock.opendj.util.AsynchronousFutureResult;
import com.forgerock.opendj.util.CompletedFutureResult;
import com.forgerock.opendj.util.FutureResultTransformer;
import com.forgerock.opendj.util.RecursiveFutureResult;
import com.forgerock.opendj.util.ReferenceCountedObject;
import com.forgerock.opendj.util.TimeSource;
import com.forgerock.opendj.util.Validator;
/**
 * A heart beat connection factory can be used to create connections that send
 * periodic search requests to a Directory Server.
 * <p>
 * Before returning new connections to the application this factory will first
 * send an initial heart beat in order to determine that the remote server is
 * responsive. If the heart beat fails or times out then the connection is
 * closed immediately and an error returned to the client.
 * <p>
 * Once a connection has been established successfully (including the initial
 * heart beat), this factory will periodically send heart beats on the
 * connection based on the configured heart beat interval. If the heart beat
 * times out then the server is assumed to be down and an appropriate
 * {@link ConnectionException} generated and published to any registered
 * {@link ConnectionEventListener}s. Note however, that heart beats will only be
 * sent when the connection is determined to be reasonably idle: there is no
 * point in sending heart beats if the connection has recently received a
 * response. A connection is deemed to be idle if no response has been received
 * during a period equivalent to half the heart beat interval.
 * <p>
 * The LDAP protocol specifically precludes clients from performing operations
 * while bind or startTLS requests are being performed. Likewise, a bind or
 * startTLS request will cause active operations to be aborted. This factory
 * coordinates heart beats with bind or startTLS requests, ensuring that they
 * are not performed concurrently. Specifically, bind and startTLS requests are
 * queued up while a heart beat is pending, and heart beats are not sent at all
 * while there are pending bind or startTLS requests.
 */
final class HeartBeatConnectionFactory implements ConnectionFactory {
    /**
     * This class is responsible for performing an initial heart beat once a new
     * connection has been obtained and, if the heart beat succeeds, adapting it
     * to a {@code ConnectionImpl} and registering it in the table of valid
     * connections.
     */
    private final class ConnectionFutureResultImpl {
        /**
         * This class handles the initial heart beat result notification or
         * timeout. We need to take care to avoid processing multiple results,
         * which may occur when the heart beat is timed out and a result follows
         * soon after, or vice versa.
         */
        private final class InitialHeartBeatResultHandler implements SearchResultHandler, Runnable {
            private final ResultHandler<? super Result> handler;
            /**
             * Due to a potential race between the heart beat timing out and the
             * heart beat completing this atomic ensures that notification only
             * occurs once.
             */
            private final AtomicBoolean isComplete = new AtomicBoolean();
            private InitialHeartBeatResultHandler(final ResultHandler<? super Result> handler) {
                this.handler = handler;
            }
            @Override
            public boolean handleEntry(final SearchResultEntry entry) {
                /*
                 * Depending on the configuration, a heartbeat may return some
                 * entries. However, we can just ignore them.
                 */
                return true;
            }
            @Override
            public void handleErrorResult(final ErrorResultException error) {
                if (isComplete.compareAndSet(false, true)) {
                    handler.handleErrorResult(error);
                }
            }
            @Override
            public boolean handleReference(final SearchResultReference reference) {
                /*
                 * Depending on the configuration, a heartbeat may return some
                 * references. However, we can just ignore them.
                 */
                return true;
            }
            @Override
            public void handleResult(final Result result) {
                if (isComplete.compareAndSet(false, true)) {
                    handler.handleResult(result);
                }
            }
            /*
             * Invoked by the scheduler when the heart beat times out.
             */
            @Override
            public void run() {
                handleErrorResult(newHeartBeatTimeoutError());
            }
        }
        private Connection connection;
        private final RecursiveFutureResult<Connection, Result> futureConnectionResult;
        private final FutureResultTransformer<Result, Connection> futureSearchResult;
        private ConnectionFutureResultImpl(final ResultHandler<? super Connection> handler) {
            // Create a future which will handle the initial heart beat result.
            this.futureSearchResult = new FutureResultTransformer<Result, Connection>(handler) {
                @Override
                protected ErrorResultException transformErrorResult(
                        final ErrorResultException errorResult) {
                    // Ensure that the connection is closed.
                    if (connection != null) {
                        connection.close();
                        connection = null;
                    }
                    return adaptHeartBeatError(errorResult);
                }
                @Override
                protected Connection transformResult(final Result result)
                        throws ErrorResultException {
                    return adaptConnection(connection);
                }
            };
            // Create a future which will handle connection result.
            this.futureConnectionResult =
                    new RecursiveFutureResult<Connection, Result>(futureSearchResult) {
                        @Override
                        protected FutureResult<? extends Result> chainResult(
                                final Connection innerResult,
                                final ResultHandler<? super Result> handler)
                                throws ErrorResultException {
                            // Save the connection for later once the heart beat completes.
                            connection = innerResult;
                            /*
                             * Send the initial heart beat and schedule a client
                             * side timeout notification.
                             */
                            final InitialHeartBeatResultHandler wrappedHandler =
                                    new InitialHeartBeatResultHandler(handler);
                            scheduler.get().schedule(wrappedHandler, timeoutMS,
                                    TimeUnit.MILLISECONDS);
                            return connection.searchAsync(heartBeatRequest, null, wrappedHandler);
                        }
                    };
            // Link the two futures.
            futureSearchResult.setFutureResult(futureConnectionResult);
        }
    }
    /**
     * A connection that sends heart beats and supports all operations.
     */
    private final class ConnectionImpl extends AbstractConnectionWrapper<Connection> implements
            ConnectionEventListener, SearchResultHandler {
    private final class ConnectionImpl extends AbstractAsynchronousConnection implements
            ConnectionEventListener {
        /**
         * A result handler wrapper for operations which timestamps the
         * connection for each response received. The wrapper ensures that
         * completed requests are removed from the {@code pendingResults} queue,
         * as well as ensuring that requests are only completed once.
         */
        private abstract class AbstractWrappedResultHandler<R, H extends ResultHandler<? super R>>
                implements ResultHandler<R>, FutureResult<R> {
            /** The user provided result handler. */
            protected final H handler;
            private final CountDownLatch completed = new CountDownLatch(1);
            private ErrorResultException error;
            private FutureResult<R> innerFuture;
            private R result;
            AbstractWrappedResultHandler(final H handler) {
                this.handler = handler;
            }
            @Override
            public boolean cancel(final boolean mayInterruptIfRunning) {
                return innerFuture.cancel(mayInterruptIfRunning);
            }
            @Override
            public R get() throws ErrorResultException, InterruptedException {
                completed.await();
                return get0();
            }
            @Override
            public R get(final long timeout, final TimeUnit unit) throws ErrorResultException,
                    TimeoutException, InterruptedException {
                if (completed.await(timeout, unit)) {
                    return get0();
                } else {
                    throw new TimeoutException();
                }
            }
            @Override
            public int getRequestID() {
                return innerFuture.getRequestID();
            }
            @Override
            public void handleErrorResult(final ErrorResultException error) {
                if (tryComplete(null, error)) {
                    if (handler != null) {
                        handler.handleErrorResult(timestamp(error));
                    } else {
                        timestamp(error);
                    }
                }
            }
            @Override
            public void handleResult(final R result) {
                if (tryComplete(result, null)) {
                    if (handler != null) {
                        handler.handleResult(timestamp(result));
                    } else {
                        timestamp(result);
                    }
                }
            }
            @Override
            public boolean isCancelled() {
                return innerFuture.isCancelled();
            }
            @Override
            public boolean isDone() {
                return completed.getCount() == 0;
            }
            abstract void releaseBindOrStartTLSLockIfNeeded();
            FutureResult<R> setInnerFuture(final FutureResult<R> innerFuture) {
                this.innerFuture = innerFuture;
                return this;
            }
            private R get0() throws ErrorResultException {
                if (result != null) {
                    return result;
                } else {
                    throw error;
                }
            }
            /**
             * Attempts to complete this request, returning true if successful.
             * This method is synchronized in order to avoid race conditions
             * with search result processing.
             */
            private synchronized boolean tryComplete(final R result,
                    final ErrorResultException error) {
                if (pendingResults.remove(this)) {
                    this.result = result;
                    this.error = error;
                    completed.countDown();
                    releaseBindOrStartTLSLockIfNeeded();
                    return true;
                } else {
                    return false;
                }
            }
        }
        /**
         * Runs pending request once the shared lock becomes available (when no
@@ -120,161 +391,246 @@
                }
                return null;
            }
        }
        /*
         * List of pending Bind or StartTLS requests which must be invoked when
        /**
         * A result handler wrapper for bind or startTLS requests which releases
         * the bind/startTLS lock on completion.
         */
        private final class WrappedBindOrStartTLSResultHandler<R> extends
                AbstractWrappedResultHandler<R, ResultHandler<? super R>> {
            WrappedBindOrStartTLSResultHandler(final ResultHandler<? super R> handler) {
                super(handler);
            }
            @Override
            void releaseBindOrStartTLSLockIfNeeded() {
                releaseBindOrStartTLSLock();
            }
        }
        /**
         * A result handler wrapper for normal requests which does not release
         * the bind/startTLS lock on completion.
         */
        private final class WrappedResultHandler<R> extends
                AbstractWrappedResultHandler<R, ResultHandler<? super R>> {
            WrappedResultHandler(final ResultHandler<? super R> handler) {
                super(handler);
            }
            @Override
            void releaseBindOrStartTLSLockIfNeeded() {
                // No-op for normal operations.
            }
        }
        /**
         * A result handler wrapper for search operations. Ensures that search
         * results are not sent once the request has been completed (see
         * markComplete()).
         */
        private final class WrappedSearchResultHandler extends
                AbstractWrappedResultHandler<Result, SearchResultHandler> implements
                SearchResultHandler {
            WrappedSearchResultHandler(final SearchResultHandler handler) {
                super(handler);
            }
            @Override
            public synchronized boolean handleEntry(final SearchResultEntry entry) {
                if (!isDone()) {
                    if (handler != null) {
                        handler.handleEntry(timestamp(entry));
                    } else {
                        timestamp(entry);
                    }
                    return true;
                } else {
                    return false;
                }
            }
            @Override
            public synchronized boolean handleReference(final SearchResultReference reference) {
                if (!isDone()) {
                    if (handler != null) {
                        handler.handleReference(timestamp(reference));
                    } else {
                        timestamp(reference);
                    }
                    return true;
                } else {
                    return false;
                }
            }
            @Override
            void releaseBindOrStartTLSLockIfNeeded() {
                // No-op for normal operations.
            }
        }
        /** The wrapped connection. */
        private final Connection connection;
        /**
         * Search result handler for processing heart beat responses.
         */
        private final SearchResultHandler heartBeatHandler = new SearchResultHandler() {
            @Override
            public boolean handleEntry(final SearchResultEntry entry) {
                timestamp(entry);
                return true;
            }
            @Override
            public void handleErrorResult(final ErrorResultException error) {
                /*
                 * Connection failure will be handled by connection event
                 * listener. Ignore cancellation errors since these indicate
                 * that the heart beat was aborted by a client-side close.
                 */
                if (!(error instanceof CancelledResultException)) {
                    if (DEBUG_LOG.isLoggable(Level.WARNING)) {
                        DEBUG_LOG.warning(String.format(
                                "Heartbeat failed for connection factory '%s': %s", factory, error
                                        .getMessage()));
                    }
                    timestamp(error);
                }
                releaseHeartBeatLock();
            }
            @Override
            public boolean handleReference(final SearchResultReference reference) {
                timestamp(reference);
                return true;
            }
            @Override
            public void handleResult(final Result result) {
                timestamp(result);
                releaseHeartBeatLock();
            }
        };
        /**
         * List of pending Bind or StartTLS requests which must be invoked once
         * the current heart beat completes.
         */
        private final Queue<Runnable> pendingRequests = new ConcurrentLinkedQueue<Runnable>();
        private final Queue<Runnable> pendingBindOrStartTLSRequests =
                new ConcurrentLinkedQueue<Runnable>();
        /* Coordinates heart-beats with Bind and StartTLS requests. */
        /**
         * List of pending responses for all active operations. These will be
         * signalled if no heart beat is detected within the permitted timeout
         * period.
         */
        private final Queue<AbstractWrappedResultHandler<?, ?>> pendingResults =
                new ConcurrentLinkedQueue<AbstractWrappedResultHandler<?, ?>>();
        /** Internal connection state. */
        private final ConnectionState state = new ConnectionState();
        /** Coordinates heart-beats with Bind and StartTLS requests. */
        private final Sync sync = new Sync();
        /*
        /**
         * Timestamp of last response received (any response, not just heart
         * beats).
         */
        private volatile long timestamp = currentTimeMillis(); // Assume valid at creation.
        private volatile long lastResponseTimestamp = timeSource.currentTimeMillis(); // Assume valid at creation.
        private ConnectionImpl(final Connection connection) {
            super(connection);
            this.connection = connection;
            connection.addConnectionEventListener(this);
        }
        @Override
        public Result add(final AddRequest request) throws ErrorResultException {
            try {
                return timestamp(connection.add(request));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public Result add(final Entry entry) throws ErrorResultException {
            try {
                return timestamp(connection.add(entry));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public Result add(final String... ldifLines) throws ErrorResultException {
            try {
                return timestamp(connection.add(ldifLines));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        public FutureResult<Void> abandonAsync(final AbandonRequest request) {
            return connection.abandonAsync(request);
        }
        /**
         * Forwards the add request to the wrapped connection, passing the
         * handler obtained from {@code timestamper(resultHandler)} in place of
         * the caller's handler.
         */
        @Override
        public FutureResult<Result> addAsync(final AddRequest request,
                final IntermediateResponseHandler intermediateResponseHandler,
                final ResultHandler<? super Result> resultHandler) {
            return connection.addAsync(request, intermediateResponseHandler,
                    timestamper(resultHandler));
        }
        @Override
        public BindResult bind(final BindRequest request) throws ErrorResultException {
            acquireBindOrStartTLSLock();
            try {
                return timestamp(connection.bind(request));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            } finally {
                releaseBindOrStartTLSLock();
            if (checkState(resultHandler)) {
                final WrappedResultHandler<Result> h = wrap(resultHandler);
                return checkState(connection.addAsync(request, intermediateResponseHandler, h), h);
            } else {
                return newConnectionErrorFuture();
            }
        }
        @Override
        public BindResult bind(final String name, final char[] password)
                throws ErrorResultException {
            acquireBindOrStartTLSLock();
            try {
                return timestamp(connection.bind(name, password));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            } finally {
                releaseBindOrStartTLSLock();
            }
        public void addConnectionEventListener(final ConnectionEventListener listener) {
            state.addConnectionEventListener(listener);
        }
        @Override
        public FutureResult<BindResult> bindAsync(final BindRequest request,
                final IntermediateResponseHandler intermediateResponseHandler,
                final ResultHandler<? super BindResult> resultHandler) {
            if (sync.tryLockShared()) {
                // Fast path
                return connection.bindAsync(request, intermediateResponseHandler, timestamper(
                        resultHandler, true));
            if (checkState(resultHandler)) {
                if (sync.tryLockShared()) {
                    // Fast path
                    final WrappedBindOrStartTLSResultHandler<BindResult> h =
                            wrapForBindOrStartTLS(resultHandler);
                    return checkState(
                            connection.bindAsync(request, intermediateResponseHandler, h), h);
                } else {
                    /*
                     * A heart beat must be in progress so create a runnable
                     * task which will be executed when the heart beat
                     * completes.
                     */
                    final DelayedFuture<BindResult> future =
                            new DelayedFuture<BindResult>(resultHandler) {
                                @Override
                                public FutureResult<BindResult> dispatch() {
                                    final WrappedBindOrStartTLSResultHandler<BindResult> h =
                                            wrapForBindOrStartTLS(this);
                                    return checkState(connection.bindAsync(request,
                                            intermediateResponseHandler, h), h);
                                }
                            };
                    /*
                     * Enqueue and flush if the heart beat has completed in the
                     * mean time.
                     */
                    pendingBindOrStartTLSRequests.offer(future);
                    flushPendingBindOrStartTLSRequests();
                    return future;
                }
            } else {
                /*
                 * A heart beat must be in progress so create a runnable task
                 * which will be executed when the heart beat completes.
                 */
                final DelayedFuture<BindResult> future =
                        new DelayedFuture<BindResult>(resultHandler) {
                            @Override
                            public FutureResult<BindResult> dispatch() {
                                return connection.bindAsync(request, intermediateResponseHandler,
                                        timestamper(this, true));
                            }
                        };
                /*
                 * Enqueue and flush if the heart beat has completed in the mean
                 * time.
                 */
                pendingRequests.offer(future);
                flushPendingRequests();
                return future;
                return newConnectionErrorFuture();
            }
        }
        @Override
        public CompareResult compare(final CompareRequest request) throws ErrorResultException {
            try {
                return timestamp(connection.compare(request));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        public void close() {
            handleConnectionClosed();
            connection.close();
        }
        @Override
        public CompareResult compare(final String name, final String attributeDescription,
                final String assertionValue) throws ErrorResultException {
            try {
                return timestamp(connection.compare(name, attributeDescription, assertionValue));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        public void close(final UnbindRequest request, final String reason) {
            handleConnectionClosed();
            connection.close(request, reason);
        }
        /**
         * Forwards the compare request to the wrapped connection, passing the
         * handler obtained from {@code timestamper(resultHandler)} in place of
         * the caller's handler.
         */
        @Override
        public FutureResult<CompareResult> compareAsync(final CompareRequest request,
                final IntermediateResponseHandler intermediateResponseHandler,
                final ResultHandler<? super CompareResult> resultHandler) {
            return connection.compareAsync(request, intermediateResponseHandler,
                    timestamper(resultHandler));
        }
        @Override
        public Result delete(final DeleteRequest request) throws ErrorResultException {
            try {
                return timestamp(connection.delete(request));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public Result delete(final String name) throws ErrorResultException {
            try {
                return timestamp(connection.delete(name));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            if (checkState(resultHandler)) {
                final WrappedResultHandler<CompareResult> h = wrap(resultHandler);
                return checkState(connection.compareAsync(request, intermediateResponseHandler, h),
                        h);
            } else {
                return newConnectionErrorFuture();
            }
        }
@@ -282,61 +638,12 @@
        public FutureResult<Result> deleteAsync(final DeleteRequest request,
                final IntermediateResponseHandler intermediateResponseHandler,
                final ResultHandler<? super Result> resultHandler) {
            // Delegate to the wrapped connection, substituting the handler
            // obtained from timestamper(resultHandler) for the caller's handler.
            return connection.deleteAsync(request, intermediateResponseHandler,
                    timestamper(resultHandler));
        }
        @Override
        public <R extends ExtendedResult> R extendedRequest(final ExtendedRequest<R> request)
                throws ErrorResultException {
            final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID);
            if (isStartTLS) {
                acquireBindOrStartTLSLock();
            }
            try {
                return timestamp(connection.extendedRequest(request));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            } finally {
                if (isStartTLS) {
                    releaseBindOrStartTLSLock();
                }
            }
        }
        @Override
        public <R extends ExtendedResult> R extendedRequest(final ExtendedRequest<R> request,
                final IntermediateResponseHandler handler) throws ErrorResultException {
            final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID);
            if (isStartTLS) {
                acquireBindOrStartTLSLock();
            }
            try {
                return timestamp(connection.extendedRequest(request, handler));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            } finally {
                if (isStartTLS) {
                    releaseBindOrStartTLSLock();
                }
            }
        }
        @Override
        public GenericExtendedResult extendedRequest(final String requestName,
                final ByteString requestValue) throws ErrorResultException {
            final boolean isStartTLS = requestName.equals(StartTLSExtendedRequest.OID);
            if (isStartTLS) {
                acquireBindOrStartTLSLock();
            }
            try {
                return timestamp(connection.extendedRequest(requestName, requestValue));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            } finally {
                if (isStartTLS) {
                    releaseBindOrStartTLSLock();
                }
            if (checkState(resultHandler)) {
                final WrappedResultHandler<Result> h = wrap(resultHandler);
                return checkState(connection.deleteAsync(request, intermediateResponseHandler, h),
                        h);
            } else {
                return newConnectionErrorFuture();
            }
        }
@@ -345,129 +652,101 @@
                final ExtendedRequest<R> request,
                final IntermediateResponseHandler intermediateResponseHandler,
                final ResultHandler<? super R> resultHandler) {
            final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID);
            if (isStartTLS) {
                if (sync.tryLockShared()) {
                    // Fast path
                    return connection.extendedRequestAsync(request, intermediateResponseHandler,
                            timestamper(resultHandler, true));
                } else {
                    /*
                     * A heart beat must be in progress so create a runnable
                     * task which will be executed when the heart beat
                     * completes.
                     */
                    final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) {
                        @Override
                        public FutureResult<R> dispatch() {
                            return connection.extendedRequestAsync(request,
                                    intermediateResponseHandler, timestamper(this, true));
                        }
                    };
            if (checkState(resultHandler)) {
                if (isStartTLSRequest(request)) {
                    if (sync.tryLockShared()) {
                        // Fast path
                        final WrappedBindOrStartTLSResultHandler<R> h =
                                wrapForBindOrStartTLS(resultHandler);
                        return checkState(connection.extendedRequestAsync(request,
                                intermediateResponseHandler, h), h);
                    } else {
                        /*
                         * A heart beat must be in progress so create a runnable
                         * task which will be executed when the heart beat
                         * completes.
                         */
                        final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) {
                            @Override
                            public FutureResult<R> dispatch() {
                                final WrappedBindOrStartTLSResultHandler<R> h =
                                        wrapForBindOrStartTLS(this);
                                return checkState(connection.extendedRequestAsync(request,
                                        intermediateResponseHandler, h), h);
                            }
                        };
                    /*
                     * Enqueue and flush if the heart beat has completed in the
                     * mean time.
                     */
                    pendingRequests.offer(future);
                    flushPendingRequests();
                    return future;
                        /*
                         * Enqueue and flush if the heart beat has completed in
                         * the mean time.
                         */
                        pendingBindOrStartTLSRequests.offer(future);
                        flushPendingBindOrStartTLSRequests();
                        return future;
                    }
                } else {
                    final WrappedResultHandler<R> h = wrap(resultHandler);
                    return checkState(connection.extendedRequestAsync(request,
                            intermediateResponseHandler, h), h);
                }
            } else {
                return connection.extendedRequestAsync(request, intermediateResponseHandler,
                        timestamper(resultHandler));
                return newConnectionErrorFuture();
            }
        }
        /**
         * Reacts to the connection being closed. {@code notifyClosed()} is always
         * invoked; then, only if {@code state.notifyConnectionClosed()} returns
         * {@code true}, all pending results are failed with a
         * {@code CLIENT_SIDE_USER_CANCELLED} error and this connection is removed
         * from {@code validConnections}.
         */
        @Override
        public void handleConnectionClosed() {
            notifyClosed();
            if (state.notifyConnectionClosed()) {
                // Fail every outstanding result with a "closed by client" error.
                failPendingResults(newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED,
                        HBCF_CONNECTION_CLOSED_BY_CLIENT.get()));
                // Unregistering and the emptiness check happen under the list's monitor.
                synchronized (validConnections) {
                    connection.removeConnectionEventListener(this);
                    validConnections.remove(this);
                    if (validConnections.isEmpty()) {
                        /*
                         * This is the last active connection, so stop the
                         * heartbeat.
                         */
                        heartBeatFuture.cancel(false);
                    }
                }
            }
        }
        /**
         * Connection error callback. This body only invokes
         * {@code notifyClosed()}; neither argument is inspected here.
         *
         * @param isDisconnectNotification not used by this body.
         * @param error not used by this body.
         */
        @Override
        public void handleConnectionError(final boolean isDisconnectNotification,
                final ErrorResultException error) {
            notifyClosed();
        }
        /**
         * Search entry callback: refreshes the timestamp via
         * {@code updateTimestamp()}. The entry itself is not examined.
         *
         * @param entry ignored.
         * @return always {@code true}.
         */
        @Override
        public boolean handleEntry(final SearchResultEntry entry) {
            updateTimestamp();
            return true;
        }
        @Override
        public void handleErrorResult(final ErrorResultException error) {
            if (DEBUG_LOG.isLoggable(Level.FINE)) {
                DEBUG_LOG.fine(String.format("Heartbeat failed: %s", error.getMessage()));
            if (state.notifyConnectionError(isDisconnectNotification, error)) {
                failPendingResults(error);
            }
            updateTimestamp();
            releaseHeartBeatLock();
        }
        /**
         * Search reference callback: refreshes the timestamp via
         * {@code updateTimestamp()}. The reference itself is not examined.
         *
         * @param reference ignored.
         * @return always {@code true}.
         */
        @Override
        public boolean handleReference(final SearchResultReference reference) {
            updateTimestamp();
            return true;
        }
        /**
         * Result callback: refreshes the timestamp and then releases the heart
         * beat lock.
         *
         * @param result ignored.
         */
        @Override
        public void handleResult(final Result result) {
            updateTimestamp();
            // Release only after the timestamp has been refreshed.
            releaseHeartBeatLock();
        }
        /**
         * Unsolicited notification callback: records activity via
         * {@code updateTimestamp()} and {@code timestamp(notification)}, then
         * forwards the notification to {@code state}.
         *
         * @param notification the notification to forward.
         */
        @Override
        public void handleUnsolicitedNotification(final ExtendedResult notification) {
            updateTimestamp();
            timestamp(notification);
            state.notifyUnsolicitedNotification(notification);
        }
        /**
         * Reports whether this connection is closed, as answered by
         * {@code state.isClosed()}.
         *
         * @return the value returned by {@code state.isClosed()}.
         */
        @Override
        public boolean isClosed() {
            return state.isClosed();
        }
        @Override
        public boolean isValid() {
            return connection.isValid() && currentTimeMillis() < (timestamp + timeoutMS);
        }
        @Override
        public Result modify(final ModifyRequest request) throws ErrorResultException {
            try {
                return timestamp(connection.modify(request));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public Result modify(final String... ldifLines) throws ErrorResultException {
            try {
                return timestamp(connection.modify(ldifLines));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
            return state.isValid() && connection.isValid();
        }
        @Override
        public FutureResult<Result> modifyAsync(final ModifyRequest request,
                final IntermediateResponseHandler intermediateResponseHandler,
                final ResultHandler<? super Result> resultHandler) {
            return connection.modifyAsync(request, intermediateResponseHandler,
                    timestamper(resultHandler));
        }
        @Override
        public Result modifyDN(final ModifyDNRequest request) throws ErrorResultException {
            try {
                return timestamp(connection.modifyDN(request));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public Result modifyDN(final String name, final String newRDN) throws ErrorResultException {
            try {
                return timestamp(connection.modifyDN(name, newRDN));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            if (checkState(resultHandler)) {
                final WrappedResultHandler<Result> h = wrap(resultHandler);
                return checkState(connection.modifyAsync(request, intermediateResponseHandler, h),
                        h);
            } else {
                return newConnectionErrorFuture();
            }
        }
@@ -475,121 +754,34 @@
        public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
                final IntermediateResponseHandler intermediateResponseHandler,
                final ResultHandler<? super Result> resultHandler) {
            return connection.modifyDNAsync(request, intermediateResponseHandler,
                    timestamper(resultHandler));
        }
        @Override
        public SearchResultEntry readEntry(final DN name, final String... attributeDescriptions)
                throws ErrorResultException {
            try {
                return timestamp(connection.readEntry(name, attributeDescriptions));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            if (checkState(resultHandler)) {
                final WrappedResultHandler<Result> h = wrap(resultHandler);
                return checkState(
                        connection.modifyDNAsync(request, intermediateResponseHandler, h), h);
            } else {
                return newConnectionErrorFuture();
            }
        }
        @Override
        public SearchResultEntry readEntry(final String name, final String... attributeDescriptions)
                throws ErrorResultException {
            try {
                return timestamp(connection.readEntry(name, attributeDescriptions));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public FutureResult<SearchResultEntry> readEntryAsync(final DN name,
                final Collection<String> attributeDescriptions,
                final ResultHandler<? super SearchResultEntry> handler) {
            return connection.readEntryAsync(name, attributeDescriptions, timestamper(handler));
        }
        @Override
        public ConnectionEntryReader search(final SearchRequest request) {
            // Ensure that search results update timestamp.
            return new ConnectionEntryReader(this, request);
        }
        @Override
        public Result search(final SearchRequest request,
                final Collection<? super SearchResultEntry> entries) throws ErrorResultException {
            try {
                return timestamp(connection.search(request, entries));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public Result search(final SearchRequest request,
                final Collection<? super SearchResultEntry> entries,
                final Collection<? super SearchResultReference> references)
                throws ErrorResultException {
            try {
                return timestamp(connection.search(request, entries, references));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public Result search(final SearchRequest request, final SearchResultHandler handler)
                throws ErrorResultException {
            try {
                return connection.search(request, timestamper(handler));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public ConnectionEntryReader search(final String baseObject, final SearchScope scope,
                final String filter, final String... attributeDescriptions) {
            // Ensure that search results update timestamp.
            final SearchRequest request =
                    Requests.newSearchRequest(baseObject, scope, filter, attributeDescriptions);
            return new ConnectionEntryReader(this, request);
        public void removeConnectionEventListener(final ConnectionEventListener listener) {
            state.removeConnectionEventListener(listener);
        }
        @Override
        public FutureResult<Result> searchAsync(final SearchRequest request,
                final IntermediateResponseHandler intermediateResponseHandler,
                final SearchResultHandler resultHandler) {
            return connection.searchAsync(request, intermediateResponseHandler,
                    timestamper(resultHandler));
        }
        @Override
        public SearchResultEntry searchSingleEntry(final SearchRequest request)
                throws ErrorResultException {
            try {
                return timestamp(connection.searchSingleEntry(request));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            if (checkState(resultHandler)) {
                final WrappedSearchResultHandler h = wrap(resultHandler);
                return checkState(connection.searchAsync(request, intermediateResponseHandler, h),
                        h);
            } else {
                return newConnectionErrorFuture();
            }
        }
        @Override
        public SearchResultEntry searchSingleEntry(final String baseObject,
                final SearchScope scope, final String filter, final String... attributeDescriptions)
                throws ErrorResultException {
            try {
                return timestamp(connection.searchSingleEntry(baseObject, scope, filter,
                        attributeDescriptions));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public FutureResult<SearchResultEntry> searchSingleEntryAsync(final SearchRequest request,
                final ResultHandler<? super SearchResultEntry> handler) {
            return connection.searchSingleEntryAsync(request, timestamper(handler));
        }
        @Override
        public String toString() {
            final StringBuilder builder = new StringBuilder();
            builder.append("HeartBeatConnection(");
@@ -598,24 +790,57 @@
            return builder.toString();
        }
        private void acquireBindOrStartTLSLock() throws ErrorResultException {
            /*
             * Wait for pending heartbeats and prevent new heartbeats from being
             * sent while the bind is in progress.
             */
            try {
                if (!sync.tryLockShared(timeoutMS, TimeUnit.MILLISECONDS)) {
                    // Give up - it looks like the connection is dead.
                    // FIXME: improve error message.
                    throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN);
        private void checkForHeartBeat() {
            if (sync.isHeldExclusively()) {
                /*
                 * A heart beat is still in progress, but it should have
                 * completed by now. Let's avoid aggressively terminating the
                 * connection, because the heart beat may simply have been
                 * delayed by a sudden surge of activity. Therefore, only flag
                 * the connection as failed if no activity has been seen on the
                 * connection since the heart beat was sent.
                 */
                final long currentTimeMillis = timeSource.currentTimeMillis();
                if (lastResponseTimestamp < (currentTimeMillis - timeoutMS)) {
                    if (DEBUG_LOG.isLoggable(Level.WARNING)) {
                        DEBUG_LOG.warning(String.format(
                                "No heartbeat detected for connection '%s'", connection));
                    }
                    handleConnectionError(false, newHeartBeatTimeoutError());
                }
            } catch (final InterruptedException e) {
                throw newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, e);
            }
        }
        private void flushPendingRequests() {
            if (!pendingRequests.isEmpty()) {
        /**
         * Links the future returned by the underlying connection to its wrapped
         * handler and then re-checks the connection state.
         *
         * @param future the future returned by the underlying connection.
         * @param h the wrapped handler which also serves as the returned future.
         * @return {@code h}.
         */
        private <R> FutureResult<R> checkState(final FutureResult<R> future,
                final AbstractWrappedResultHandler<R, ? extends ResultHandler<? super R>> h) {
            h.setInnerFuture(future);
            /*
             * Presumably resolves to the single-argument checkState, which
             * fails the handler if a connection error has been recorded; its
             * boolean result is deliberately ignored here.
             */
            checkState(h);
            return h;
        }
        private boolean checkState(final ResultHandler<?> h) {
            final ErrorResultException error = state.getConnectionError();
            if (error != null) {
                h.handleErrorResult(error);
                return false;
            } else {
                return true;
            }
        }
        private void failPendingResults(final ErrorResultException error) {
            /*
             * Peek instead of pool because notification is responsible for
             * removing the element from the queue.
             */
            AbstractWrappedResultHandler<?, ?> pendingResult;
            while ((pendingResult = pendingResults.peek()) != null) {
                pendingResult.handleErrorResult(error);
            }
        }
        private void flushPendingBindOrStartTLSRequests() {
            if (!pendingBindOrStartTLSRequests.isEmpty()) {
                /*
                 * The pending requests will acquire the shared lock, but we
                 * take it here anyway to ensure that pending requests do not
@@ -624,7 +849,8 @@
                if (sync.tryLockShared()) {
                    try {
                        Runnable pendingRequest;
                        while ((pendingRequest = pendingRequests.poll()) != null) {
                        while ((pendingRequest = pendingBindOrStartTLSRequests.poll()) != null) {
                            // Dispatch the waiting request. This will not block.
                            pendingRequest.run();
                        }
                    } finally {
@@ -634,18 +860,12 @@
            }
        }
        private void notifyClosed() {
            synchronized (activeConnections) {
                connection.removeConnectionEventListener(this);
                activeConnections.remove(this);
                if (activeConnections.isEmpty()) {
                    /*
                     * This is the last active connection, so stop the
                     * heartbeat.
                     */
                    heartBeatFuture.cancel(false);
                }
            }
        /**
         * Tests whether an extended request is a StartTLS request by comparing
         * its OID with {@code StartTLSExtendedRequest.OID}.
         *
         * @param request the request to test.
         * @return {@code true} if the OIDs are equal.
         */
        private boolean isStartTLSRequest(final ExtendedRequest<?> request) {
            return request.getOID().equals(StartTLSExtendedRequest.OID);
        }
        /**
         * Creates an already completed future carrying the error currently
         * reported by {@code state.getConnectionError()}.
         *
         * @return a new {@code CompletedFutureResult} built from that error.
         */
        private <R> CompletedFutureResult<R> newConnectionErrorFuture() {
            return new CompletedFutureResult<R>(state.getConnectionError());
        }
        private void releaseBindOrStartTLSLock() {
@@ -654,27 +874,43 @@
        /**
         * Releases the exclusive lock on {@code sync} and then flushes the
         * queued pending requests.
         */
        private void releaseHeartBeatLock() {
            sync.unlockExclusively();
            // Flush only after unlocking.
            flushPendingRequests();
            flushPendingBindOrStartTLSRequests();
        }
        private void sendHeartBeat() {
        /**
         * Sends a heart beat on this connection if required to do so.
         *
         * @return {@code true} if a heart beat was sent, otherwise
         *         {@code false}.
         */
        private boolean sendHeartBeat() {
            /*
             * Only send the heartbeat if the connection has been idle for some
             * Don't attempt to send a heart beat if the connection has already
             * failed.
             */
            if (!state.isValid()) {
                return false;
            }
            /*
             * Only send the heart beat if the connection has been idle for some
             * time.
             */
            if (currentTimeMillis() < (timestamp + minDelayMS)) {
                return;
            final long currentTimeMillis = timeSource.currentTimeMillis();
            if (currentTimeMillis < (lastResponseTimestamp + minDelayMS)) {
                return false;
            }
            /*
             * Don't send a heart beat if there is already a heart beat, bind,
             * or startTLS in progress. Note that the bind/startTLS response
             * will update the timestamp as if it were a heart beat.
             * will update the lastResponseTimestamp as if it were a heart beat.
             */
            if (sync.tryLockExclusively()) {
                try {
                    connection.searchAsync(heartBeatRequest, null, this);
                } catch (final Exception e) {
                    connection.searchAsync(heartBeatRequest, null, heartBeatHandler);
                    return true;
                } catch (final IllegalStateException e) {
                    /*
                     * This may happen when we attempt to send the heart beat
                     * just after the connection is closed but before we are
@@ -688,74 +924,34 @@
                    releaseHeartBeatLock();
                }
            }
            return false;
        }
        /**
         * Records activity for a response and returns the response unchanged so
         * that calls can be chained inline.
         *
         * @param response the response (or error) that was received.
         * @return {@code response}.
         */
        private <R> R timestamp(final R response) {
            updateTimestamp();
            // A ConnectionException does not refresh lastResponseTimestamp.
            if (!(response instanceof ConnectionException)) {
                lastResponseTimestamp = timeSource.currentTimeMillis();
            }
            return response;
        }
        private <R> ResultHandler<R> timestamper(final ResultHandler<? super R> handler) {
            return timestamper(handler, false);
        /**
         * Wraps a result handler in a {@code WrappedResultHandler} and registers
         * the wrapper in {@code pendingResults}.
         *
         * @param handler the handler to wrap.
         * @return the newly created wrapper.
         */
        private <R> WrappedResultHandler<R> wrap(final ResultHandler<? super R> handler) {
            final WrappedResultHandler<R> h = new WrappedResultHandler<R>(handler);
            pendingResults.add(h);
            return h;
        }
        private <R> ResultHandler<R> timestamper(final ResultHandler<? super R> handler,
                final boolean isBindOrStartTLS) {
            return new ResultHandler<R>() {
                @Override
                public void handleErrorResult(final ErrorResultException error) {
                    releaseIfNeeded();
                    if (handler != null) {
                        handler.handleErrorResult(timestamp(error));
                    } else {
                        timestamp(error);
                    }
                }
                @Override
                public void handleResult(final R result) {
                    releaseIfNeeded();
                    if (handler != null) {
                        handler.handleResult(timestamp(result));
                    } else {
                        timestamp(result);
                    }
                }
                private void releaseIfNeeded() {
                    if (isBindOrStartTLS) {
                        releaseBindOrStartTLSLock();
                    }
                }
            };
        /**
         * Wraps a search result handler in a {@code WrappedSearchResultHandler}
         * and registers the wrapper in {@code pendingResults}.
         *
         * @param handler the search result handler to wrap.
         * @return the newly created wrapper.
         */
        private WrappedSearchResultHandler wrap(final SearchResultHandler handler) {
            final WrappedSearchResultHandler h = new WrappedSearchResultHandler(handler);
            pendingResults.add(h);
            return h;
        }
        private SearchResultHandler timestamper(final SearchResultHandler handler) {
            return new SearchResultHandler() {
                @Override
                public boolean handleEntry(final SearchResultEntry entry) {
                    return handler.handleEntry(timestamp(entry));
                }
                @Override
                public void handleErrorResult(final ErrorResultException error) {
                    handler.handleErrorResult(timestamp(error));
                }
                @Override
                public boolean handleReference(final SearchResultReference reference) {
                    return handler.handleReference(timestamp(reference));
                }
                @Override
                public void handleResult(final Result result) {
                    handler.handleResult(timestamp(result));
                }
            };
        }
        private void updateTimestamp() {
            timestamp = currentTimeMillis();
        /**
         * Wraps a result handler in a {@code WrappedBindOrStartTLSResultHandler}
         * and registers the wrapper in {@code pendingResults}.
         *
         * @param handler the handler to wrap.
         * @return the newly created wrapper.
         */
        private <R> WrappedBindOrStartTLSResultHandler<R> wrapForBindOrStartTLS(
                final ResultHandler<? super R> handler) {
            final WrappedBindOrStartTLSResultHandler<R> h =
                    new WrappedBindOrStartTLSResultHandler<R>(handler);
            pendingResults.add(h);
            return h;
        }
    }
@@ -791,13 +987,13 @@
     * </ul>
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        /* Lock states. Positive values indicate that the shared lock is taken. */
        private static final int UNLOCKED = 0; // initial state
        private static final int LOCKED_EXCLUSIVELY = -1;
        // Keep compiler quiet.
        private static final long serialVersionUID = -3590428415442668336L;
        /* Lock states. Positive values indicate that the shared lock is taken. */
        private static final int UNLOCKED = 0; // initial state
        @Override
        protected boolean isHeldExclusively() {
            return getState() == LOCKED_EXCLUSIVELY;
@@ -866,10 +1062,6 @@
            return tryAcquireShared(1) > 0;
        }
        boolean tryLockShared(final long timeout, final TimeUnit unit) throws InterruptedException {
            return tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
        /**
         * Releases the exclusive lock by delegating to {@code release}; the
         * argument value is not used.
         */
        void unlockExclusively() {
            release(0 /* unused */);
        }
@@ -880,60 +1072,122 @@
    }
    /**
     * Default heart beat which will target the root DSE but not return any
     * results.
     */
    private static final SearchRequest DEFAULT_SEARCH = Requests.newSearchRequest("",
            SearchScope.BASE_OBJECT, "(objectClass=*)", "1.1");
            SearchScope.BASE_OBJECT, "(objectClass=1.1)", "1.1");
    private final List<ConnectionImpl> activeConnections;
    /**
     * This is package private in order to allow unit tests to inject fake time
     * stamps.
     */
    TimeSource timeSource = TimeSource.DEFAULT;
    /**
     * Scheduled task which checks that all heart beats have been received
     * within the timeout period.
     */
    private final Runnable checkHeartBeatRunnable = new Runnable() {
        @Override
        public void run() {
            for (final ConnectionImpl connection : getValidConnections()) {
                connection.checkForHeartBeat();
            }
        }
    };
    /**
     * Underlying connection factory.
     */
    private final ConnectionFactory factory;
    /**
     * The heartbeat scheduled future - which may be null if heartbeats are not
     * being sent (no valid connections).
     */
    private ScheduledFuture<?> heartBeatFuture;
    /**
     * The heartbeat search request.
     */
    private final SearchRequest heartBeatRequest;
    /**
     * The interval between successive heartbeats.
     */
    private final long interval;
    private final TimeUnit intervalUnit;
    /**
     * Flag which indicates whether this factory has been closed.
     */
    private final AtomicBoolean isClosed = new AtomicBoolean();
    /**
     * The minimum amount of time the connection should remain idle (no
     * responses) before starting to send heartbeats.
     */
    private final long minDelayMS;
    /**
     * The heartbeat scheduler.
     */
    private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
    /**
     * Scheduled task which sends heart beats for all valid idle connections.
     */
    private final Runnable sendHeartBeatRunnable = new Runnable() {
        @Override
        public void run() {
            boolean heartBeatSent = false;
            for (final ConnectionImpl connection : getValidConnections()) {
                heartBeatSent |= connection.sendHeartBeat();
            }
            if (heartBeatSent) {
                scheduler.get().schedule(checkHeartBeatRunnable, timeoutMS, TimeUnit.MILLISECONDS);
            }
        }
    };
    /**
     * The heartbeat timeout in milli-seconds. The connection will be marked as
     * failed if no heartbeat response is received within the timeout.
     */
    private final long timeoutMS;
    private final TimeUnit unit;
    private AtomicBoolean isClosed = new AtomicBoolean();
    /**
     * Creates a heart beat connection factory with a 10 second interval,
     * {@code DEFAULT_SEARCH} as the heart beat request and a {@code null}
     * scheduler.
     *
     * @param factory the underlying connection factory.
     */
    HeartBeatConnectionFactory(final ConnectionFactory factory) {
        this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, null);
    }
    /**
     * List of valid connections to which heartbeats will be sent.
     */
    private final List<ConnectionImpl> validConnections = new LinkedList<ConnectionImpl>();
    /**
     * Creates a heart beat connection factory with the given interval,
     * {@code DEFAULT_SEARCH} as the heart beat request and a {@code null}
     * scheduler.
     *
     * @param factory the underlying connection factory.
     * @param interval the heart beat interval.
     * @param unit the unit of {@code interval}.
     */
    HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
            final TimeUnit unit) {
        this(factory, interval, unit, DEFAULT_SEARCH, null);
    }
    /**
     * Creates a heart beat connection factory with the given interval and
     * heart beat request, and a {@code null} scheduler.
     *
     * @param factory the underlying connection factory.
     * @param interval the heart beat interval.
     * @param unit the unit of {@code interval}.
     * @param heartBeat the search request to use as the heart beat.
     */
    HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
            final TimeUnit unit, final SearchRequest heartBeat) {
        this(factory, interval, unit, heartBeat, null);
    }
    HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
            final TimeUnit unit, final SearchRequest heartBeat,
            final long timeout, final TimeUnit unit, final SearchRequest heartBeat,
            final ScheduledExecutorService scheduler) {
        Validator.ensureNotNull(factory, heartBeat, unit);
        Validator.ensureTrue(interval >= 0, "negative timeout");
        Validator.ensureNotNull(factory, unit);
        Validator.ensureTrue(interval >= 0, "negative interval");
        Validator.ensureTrue(timeout >= 0, "negative timeout");
        this.heartBeatRequest = heartBeat;
        this.heartBeatRequest = heartBeat != null ? heartBeat : DEFAULT_SEARCH;
        this.interval = interval;
        this.unit = unit;
        this.activeConnections = new LinkedList<ConnectionImpl>();
        this.intervalUnit = unit;
        this.factory = factory;
        this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
        this.timeoutMS = unit.toMillis(interval) * 2;
        this.timeoutMS = unit.toMillis(timeout);
        this.minDelayMS = unit.toMillis(interval) / 2;
    }
    @Override
    public void close() {
        if (isClosed.compareAndSet(false, true)) {
            synchronized (activeConnections) {
                if (!activeConnections.isEmpty()) {
            synchronized (validConnections) {
                if (!validConnections.isEmpty()) {
                    if (DEBUG_LOG.isLoggable(Level.FINE)) {
                        DEBUG_LOG.fine(String.format(
                                "HeartbeatConnectionFactory '%s' is closing while %d "
                                        + "active connections remain", toString(),
                                activeConnections.size()));
                                        + "active connections remain", toString(), validConnections
                                        .size()));
                    }
                }
            }
@@ -944,23 +1198,41 @@
    /**
     * Obtains a connection from the underlying factory and adapts it into a
     * heart beat connection.
     * <p>
     * NOTE(review): the body returns on its first statement, so the heart
     * beat probe below it is unreachable as written. It looks like two
     * alternative implementations are present - confirm which one is
     * intended.
     *
     * @return the adapted connection.
     * @throws ErrorResultException if the connection cannot be obtained or,
     *         in the probing variant, the initial heart beat fails.
     */
    @Override
    public Connection getConnection() throws ErrorResultException {
        return adaptConnection(factory.getConnection());
        /*
         * Immediately send a heart beat in order to determine if the connected
         * server is responding.
         */
        final Connection connection = factory.getConnection();
        // Stays false until the probe succeeds, so that finally closes a failed connection.
        boolean keepConnection = false;
        try {
            // Block for at most timeoutMS waiting for the heart beat search to complete.
            connection.searchAsync(heartBeatRequest, null, null).get(timeoutMS,
                    TimeUnit.MILLISECONDS);
            keepConnection = true;
            return adaptConnection(connection);
        } catch (final Exception e) {
            throw adaptHeartBeatError(e);
        } finally {
            if (!keepConnection) {
                connection.close();
            }
        }
    }
    /**
     * Asynchronously obtains a connection from the underlying factory and
     * adapts it into a heart beat connection.
     * <p>
     * NOTE(review): the body contains a {@code return future;} followed by
     * further statements, so the trailing statements are unreachable as
     * written. It looks like two alternative implementations are present -
     * confirm which one is intended.
     *
     * @param handler the handler to notify with the resulting connection.
     * @return a future representing the pending connection.
     */
    @Override
    public FutureResult<Connection> getConnectionAsync(
            final ResultHandler<? super Connection> handler) {
        // Transformer variant: adapt the raw connection as soon as it becomes available.
        final FutureResultTransformer<Connection, Connection> future =
                new FutureResultTransformer<Connection, Connection>(handler) {
                    @Override
                    protected Connection transformResult(final Connection connection)
                            throws ErrorResultException {
                        return adaptConnection(connection);
                    }
                };
        // Create a future responsible for chaining the initial heartbeat search.
        final ConnectionFutureResultImpl compositeFuture = new ConnectionFutureResultImpl(handler);
        future.setFutureResult(factory.getConnectionAsync(future));
        return future;
        // Request a connection.
        final FutureResult<Connection> connectionFuture =
                factory.getConnectionAsync(compositeFuture.futureConnectionResult);
        // Set the connection future in the composite so that the returned search future can delegate.
        compositeFuture.futureConnectionResult.setFutureResult(connectionFuture);
        // Return the future representing the heartbeat.
        return compositeFuture.futureSearchResult;
    }
    @Override
@@ -973,26 +1245,42 @@
    }
    /**
     * Wraps the provided connection in a {@code ConnectionImpl}, records the
     * wrapper in the factory's collection of tracked connections and, when
     * that collection was empty beforehand, schedules the periodic heart beat
     * task with an initial delay of zero.
     * <p>
     * NOTE(review): the text below mixes statements which use
     * {@code activeConnections} with statements which use
     * {@code validConnections}, declares {@code heartBeatConnection} twice and
     * does not have balanced braces. It looks like two revisions of the
     * method interleaved - confirm against the real source file before
     * editing.
     *
     * @param connection
     *            the connection to be wrapped.
     * @return the {@code ConnectionImpl} wrapping {@code connection}.
     */
    private Connection adaptConnection(final Connection connection) {
        final ConnectionImpl heartBeatConnection = new ConnectionImpl(connection);
        synchronized (activeConnections) {
            // The wrapper is registered as an event listener on the wrapped
            // connection.
            connection.addConnectionEventListener(heartBeatConnection);
            if (activeConnections.isEmpty()) {
        synchronized (validConnections) {
            final ConnectionImpl heartBeatConnection = new ConnectionImpl(connection);
            if (validConnections.isEmpty()) {
                /* This is the first active connection, so start the heart beat. */
                heartBeatFuture = scheduler.get().scheduleWithFixedDelay(new Runnable() {
                    @Override
                    public void run() {
                        // Snapshot the connections while holding the lock, then
                        // send the heart beats outside of it.
                        final ConnectionImpl[] tmp;
                        synchronized (activeConnections) {
                            tmp = activeConnections.toArray(new ConnectionImpl[0]);
                        }
                        for (final ConnectionImpl connection : tmp) {
                            connection.sendHeartBeat();
                        }
                    }
                }, 0, interval, unit);
                // Same scheduling call, but using the shared
                // sendHeartBeatRunnable field instead of an anonymous Runnable.
                heartBeatFuture =
                        scheduler.get().scheduleWithFixedDelay(sendHeartBeatRunnable, 0, interval,
                                intervalUnit);
            }
            activeConnections.add(heartBeatConnection);
            validConnections.add(heartBeatConnection);
            return heartBeatConnection;
        }
        return heartBeatConnection;
    }
    private ErrorResultException adaptHeartBeatError(final Exception error) {
        if (error instanceof ConnectionException) {
            return (ErrorResultException) error;
        } else if (error instanceof TimeoutResultException || error instanceof TimeoutException) {
            return newHeartBeatTimeoutError();
        } else if (error instanceof InterruptedException) {
            return newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, error);
        } else {
            return newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN, HBCF_HEARTBEAT_FAILED.get(),
                    error);
        }
    }
    private ConnectionImpl[] getValidConnections() {
        final ConnectionImpl[] tmp;
        synchronized (validConnections) {
            tmp = validConnections.toArray(new ConnectionImpl[0]);
        }
        return tmp;
    }
    private ErrorResultException newHeartBeatTimeoutError() {
        return newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN, HBCF_HEARTBEAT_TIMEOUT
                .get(timeoutMS));
    }
}