mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
19.18.2012 fa5af37362b2f9b1fd07e8efbefe4b45a8088229
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -27,19 +27,39 @@
package org.forgerock.opendj.ldap;
import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
import static java.lang.System.currentTimeMillis;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.logging.Level;
import org.forgerock.opendj.ldap.requests.AddRequest;
import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.opendj.ldap.requests.CompareRequest;
import org.forgerock.opendj.ldap.requests.DeleteRequest;
import org.forgerock.opendj.ldap.requests.ExtendedRequest;
import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
import org.forgerock.opendj.ldap.requests.ModifyRequest;
import org.forgerock.opendj.ldap.requests.Requests;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.requests.StartTLSExtendedRequest;
import org.forgerock.opendj.ldap.responses.BindResult;
import org.forgerock.opendj.ldap.responses.CompareResult;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.GenericExtendedResult;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.responses.SearchResultReference;
import org.forgerock.opendj.ldif.ConnectionEntryReader;
import com.forgerock.opendj.util.AsynchronousFutureResult;
import com.forgerock.opendj.util.ConnectionDecorator;
import com.forgerock.opendj.util.FutureResultTransformer;
import com.forgerock.opendj.util.StaticUtils;
@@ -55,15 +75,288 @@
     */
    private final class ConnectionImpl extends ConnectionDecorator implements
            ConnectionEventListener, SearchResultHandler {
        private long lastSuccessfulPing;
        private FutureResult<Result> lastPingFuture;
        /**
         * Runs pending request once the shared lock becomes available (when no
         * heart beat is in progress).
         * <p>
         * Instances are queued via {@code addPendingRequest} when a Bind or
         * StartTLS request cannot obtain the shared lock immediately. The
         * queued instance is later executed through {@link #run()}, which
         * dispatches the real request and remembers the resulting future so
         * that request ID look-ups and cancellation can be forwarded to it.
         *
         * @param <R>
         *            The type of result returned by the request.
         */
        private abstract class DelayedFuture<R extends Result> extends AsynchronousFutureResult<R>
                implements Runnable {
            // Future of the dispatched request; stays null until run() has
            // called dispatch(). Volatile because run() and the accessors may
            // execute on different threads.
            private volatile FutureResult<R> innerFuture = null;
            protected DelayedFuture(final ResultHandler<? super R> handler) {
                super(handler);
            }
            // Reports the request ID of the dispatched request, or -1 while
            // the request has not been dispatched yet.
            @Override
            public final int getRequestID() {
                return innerFuture != null ? innerFuture.getRequestID() : -1;
            }
            // Dispatches the delayed request unless it has already been
            // cancelled.
            @Override
            public final void run() {
                if (!isCancelled()) {
                    // The callers of run() (addPendingRequest and
                    // releaseHeartBeatLock) already hold a shared lock while
                    // running pending requests, so this additional shared
                    // acquisition succeeds immediately.
                    sync.lockShared(); // Will not block.
                    innerFuture = dispatch();
                    // Cancellation may have raced with the dispatch, in which
                    // case handleCancelRequest() saw a null inner future:
                    // propagate the cancellation now.
                    if (isCancelled() && !innerFuture.isCancelled()) {
                        innerFuture.cancel(false);
                    }
                }
            }
            // Sends the real request on the underlying connection and returns
            // its future.
            protected abstract FutureResult<R> dispatch();
            // Forwards a cancel request to the dispatched request, if any.
            // NOTE(review): returning null presumably tells the base class
            // that cancellation may proceed - confirm against
            // AsynchronousFutureResult.
            @Override
            protected final ErrorResultException handleCancelRequest(
                    final boolean mayInterruptIfRunning) {
                if (innerFuture != null) {
                    innerFuture.cancel(mayInterruptIfRunning);
                }
                return null;
            }
        }
        // List of pending Bind or StartTLS requests which must be invoked
        // when the current heart beat completes.
        private List<Runnable> pendingRequests = null;
        private final Object pendingRequestsLock = new Object();
        // Coordinates heart-beats with Bind and StartTLS requests.
        private final Sync sync = new Sync();
        // Timestamp of last response received (any response, not just heart beats).
        private volatile long timestamp = currentTimeMillis(); // Assume valid at creation.
        // Decorates the provided connection; all operations are delegated to
        // it via the ConnectionDecorator super class.
        private ConnectionImpl(final Connection connection) {
            super(connection);
        }
        @Override
        public Result add(final AddRequest request) throws ErrorResultException {
            try {
                return timestamp(connection.add(request));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public Result add(final Entry entry) throws ErrorResultException {
            try {
                return timestamp(connection.add(entry));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public Result add(final String... ldifLines) throws ErrorResultException {
            try {
                return timestamp(connection.add(ldifLines));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public FutureResult<Result> addAsync(final AddRequest request,
                final IntermediateResponseHandler intermediateResponseHandler,
                final ResultHandler<? super Result> resultHandler) {
            return connection.addAsync(request, intermediateResponseHandler,
                    timestamper(resultHandler));
        }
        @Override
        public BindResult bind(final BindRequest request) throws ErrorResultException {
            acquireBindOrStartTLSLock();
            try {
                return timestamp(connection.bind(request));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            } finally {
                releaseBindOrStartTLSLock();
            }
        }
        @Override
        public BindResult bind(final String name, final char[] password)
                throws ErrorResultException {
            acquireBindOrStartTLSLock();
            try {
                return timestamp(connection.bind(name, password));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            } finally {
                releaseBindOrStartTLSLock();
            }
        }
        @Override
        public FutureResult<BindResult> bindAsync(final BindRequest request,
                final IntermediateResponseHandler intermediateResponseHandler,
                final ResultHandler<? super BindResult> resultHandler) {
            if (sync.tryLockShared()) {
                // Fast path
                return connection.bindAsync(request, intermediateResponseHandler, timestamper(
                        resultHandler, true));
            } else {
                // A heart beat must be in progress so create a runnable task
                // which will be executed when the heart beat completes.
                final DelayedFuture<BindResult> future =
                        new DelayedFuture<BindResult>(resultHandler) {
                            @Override
                            public FutureResult<BindResult> dispatch() {
                                return connection.bindAsync(request, intermediateResponseHandler,
                                        timestamper(this, true));
                            }
                        };
                addPendingRequest(future);
                return future;
            }
        }
        @Override
        public CompareResult compare(final CompareRequest request) throws ErrorResultException {
            try {
                return timestamp(connection.compare(request));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public CompareResult compare(final String name, final String attributeDescription,
                final String assertionValue) throws ErrorResultException {
            try {
                return timestamp(connection.compare(name, attributeDescription, assertionValue));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public FutureResult<CompareResult> compareAsync(final CompareRequest request,
                final IntermediateResponseHandler intermediateResponseHandler,
                final ResultHandler<? super CompareResult> resultHandler) {
            return connection.compareAsync(request, intermediateResponseHandler,
                    timestamper(resultHandler));
        }
        @Override
        public Result delete(final DeleteRequest request) throws ErrorResultException {
            try {
                return timestamp(connection.delete(request));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public Result delete(final String name) throws ErrorResultException {
            try {
                return timestamp(connection.delete(name));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public FutureResult<Result> deleteAsync(final DeleteRequest request,
                final IntermediateResponseHandler intermediateResponseHandler,
                final ResultHandler<? super Result> resultHandler) {
            return connection.deleteAsync(request, intermediateResponseHandler,
                    timestamper(resultHandler));
        }
        @Override
        public <R extends ExtendedResult> R extendedRequest(final ExtendedRequest<R> request)
                throws ErrorResultException {
            final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID);
            if (isStartTLS) {
                acquireBindOrStartTLSLock();
            }
            try {
                return timestamp(connection.extendedRequest(request));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            } finally {
                if (isStartTLS) {
                    releaseBindOrStartTLSLock();
                }
            }
        }
        @Override
        public <R extends ExtendedResult> R extendedRequest(final ExtendedRequest<R> request,
                final IntermediateResponseHandler handler) throws ErrorResultException {
            final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID);
            if (isStartTLS) {
                acquireBindOrStartTLSLock();
            }
            try {
                return timestamp(connection.extendedRequest(request, handler));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            } finally {
                if (isStartTLS) {
                    releaseBindOrStartTLSLock();
                }
            }
        }
        @Override
        public GenericExtendedResult extendedRequest(final String requestName,
                final ByteString requestValue) throws ErrorResultException {
            final boolean isStartTLS = requestName.equals(StartTLSExtendedRequest.OID);
            if (isStartTLS) {
                acquireBindOrStartTLSLock();
            }
            try {
                return timestamp(connection.extendedRequest(requestName, requestValue));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            } finally {
                if (isStartTLS) {
                    releaseBindOrStartTLSLock();
                }
            }
        }
        @Override
        public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync(
                final ExtendedRequest<R> request,
                final IntermediateResponseHandler intermediateResponseHandler,
                final ResultHandler<? super R> resultHandler) {
            final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID);
            if (isStartTLS) {
                if (sync.tryLockShared()) {
                    // Fast path
                    return connection.extendedRequestAsync(request, intermediateResponseHandler,
                            timestamper(resultHandler, true));
                } else {
                    // A heart beat must be in progress so create a runnable task
                    // which will be executed when the heart beat completes.
                    final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) {
                        @Override
                        public FutureResult<R> dispatch() {
                            return connection.extendedRequestAsync(request,
                                    intermediateResponseHandler, timestamper(this, true));
                        }
                    };
                    addPendingRequest(future);
                    return future;
                }
            } else {
                return connection.extendedRequestAsync(request, intermediateResponseHandler,
                        timestamper(resultHandler));
            }
        }
        // Connection event callback: the underlying connection has been
        // closed, so unregister this connection from the heart beat machinery
        // (see notifyClosed()).
        @Override
        public void handleConnectionClosed() {
            notifyClosed();
        }
@@ -74,58 +367,205 @@
            notifyClosed();
        }
        /**
         * {@inheritDoc}
         * <p>
         * This connection is registered as the search result handler for its
         * own heart beat searches. The entry content is of no interest; only
         * the fact that a response arrived is recorded.
         */
        @Override
        public boolean handleEntry(final SearchResultEntry entry) {
            // Ignore the entry itself, but remember that the server responded.
            updateTimestamp();
            return true;
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void handleErrorResult(final ErrorResultException error) {
            connection.close(Requests.newUnbindRequest(), "Heartbeat retured error: " + error);
            if (DEBUG_LOG.isLoggable(Level.FINE)) {
                DEBUG_LOG.fine(String.format("Heartbeat failed: " + error.getMessage()));
            }
            updateTimestamp();
            releaseHeartBeatLock();
        }
        /**
         * {@inheritDoc}
         * <p>
         * This connection is registered as the search result handler for its
         * own heart beat searches. The reference content is of no interest;
         * only the fact that a response arrived is recorded.
         */
        @Override
        public boolean handleReference(final SearchResultReference reference) {
            // Ignore the reference itself, but remember that the server responded.
            updateTimestamp();
            return true;
        }
        /**
         * {@inheritDoc}
         * <p>
         * Invoked when a heart beat search completes successfully: the time of
         * the response is recorded and the exclusive heart beat lock is
         * released, which also dispatches any pending Bind or StartTLS
         * requests.
         */
        @Override
        public void handleResult(final Result result) {
            // Remember when the last heart beat succeeded.
            lastSuccessfulPing = System.currentTimeMillis();
            updateTimestamp();
            releaseHeartBeatLock();
        }
        // Connection event callback: the notification content is ignored, but
        // its arrival shows that the server is alive, so the response time is
        // recorded.
        @Override
        public void handleUnsolicitedNotification(final ExtendedResult notification) {
            // Nothing to do with the notification itself.
            updateTimestamp();
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public boolean isValid() {
            return connection.isValid()
                    && (lastSuccessfulPing <= 0 || System.currentTimeMillis() - lastSuccessfulPing < unit
                            .toMillis(interval) * 2);
            return connection.isValid() && currentTimeMillis() < (timestamp + timeoutMS);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public Result modify(final ModifyRequest request) throws ErrorResultException {
            try {
                return timestamp(connection.modify(request));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public Result modify(final String... ldifLines) throws ErrorResultException {
            try {
                return timestamp(connection.modify(ldifLines));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public FutureResult<Result> modifyAsync(final ModifyRequest request,
                final IntermediateResponseHandler intermediateResponseHandler,
                final ResultHandler<? super Result> resultHandler) {
            return connection.modifyAsync(request, intermediateResponseHandler,
                    timestamper(resultHandler));
        }
        @Override
        public Result modifyDN(final ModifyDNRequest request) throws ErrorResultException {
            try {
                return timestamp(connection.modifyDN(request));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public Result modifyDN(final String name, final String newRDN) throws ErrorResultException {
            try {
                return timestamp(connection.modifyDN(name, newRDN));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
                final IntermediateResponseHandler intermediateResponseHandler,
                final ResultHandler<? super Result> resultHandler) {
            return connection.modifyDNAsync(request, intermediateResponseHandler,
                    timestamper(resultHandler));
        }
        @Override
        public SearchResultEntry readEntry(final DN name, final String... attributeDescriptions)
                throws ErrorResultException {
            try {
                return timestamp(connection.readEntry(name, attributeDescriptions));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public SearchResultEntry readEntry(final String name, final String... attributeDescriptions)
                throws ErrorResultException {
            try {
                return timestamp(connection.readEntry(name, attributeDescriptions));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public FutureResult<SearchResultEntry> readEntryAsync(final DN name,
                final Collection<String> attributeDescriptions,
                final ResultHandler<? super SearchResultEntry> handler) {
            return connection.readEntryAsync(name, attributeDescriptions, timestamper(handler));
        }
        // Returns an entry reader bound to this decorating connection rather
        // than to the underlying connection, so that the reader's search goes
        // through this class.
        @Override
        public ConnectionEntryReader search(final SearchRequest request) {
            // Ensure that search results update timestamp.
            return new ConnectionEntryReader(this, request);
        }
        @Override
        public Result search(final SearchRequest request,
                final Collection<? super SearchResultEntry> entries) throws ErrorResultException {
            try {
                return timestamp(connection.search(request, entries));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public Result search(final SearchRequest request,
                final Collection<? super SearchResultEntry> entries,
                final Collection<? super SearchResultReference> references)
                throws ErrorResultException {
            try {
                return timestamp(connection.search(request, entries, references));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public Result search(final SearchRequest request, final SearchResultHandler handler)
                throws ErrorResultException {
            try {
                return connection.search(request, timestamper(handler));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public ConnectionEntryReader search(final String baseObject, final SearchScope scope,
                final String filter, final String... attributeDescriptions) {
            // Ensure that search results update timestamp.
            final SearchRequest request =
                    Requests.newSearchRequest(baseObject, scope, filter, attributeDescriptions);
            return new ConnectionEntryReader(this, request);
        }
        @Override
        public FutureResult<Result> searchAsync(final SearchRequest request,
                final IntermediateResponseHandler intermediateResponseHandler,
                final SearchResultHandler resultHandler) {
            return connection.searchAsync(request, intermediateResponseHandler,
                    timestamper(resultHandler));
        }
        @Override
        public SearchResultEntry searchSingleEntry(final SearchRequest request)
                throws ErrorResultException {
            try {
                return timestamp(connection.searchSingleEntry(request));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public SearchResultEntry searchSingleEntry(final String baseObject,
                final SearchScope scope, final String filter, final String... attributeDescriptions)
                throws ErrorResultException {
            try {
                return timestamp(connection.searchSingleEntry(baseObject, scope, filter,
                        attributeDescriptions));
            } catch (final ErrorResultException e) {
                throw timestamp(e);
            }
        }
        @Override
        public FutureResult<SearchResultEntry> searchSingleEntryAsync(final SearchRequest request,
                final ResultHandler<? super SearchResultEntry> handler) {
            return connection.searchSingleEntryAsync(request, timestamper(handler));
        }
        @Override
        public String toString() {
            final StringBuilder builder = new StringBuilder();
@@ -135,72 +575,311 @@
            return builder.toString();
        }
        private void acquireBindOrStartTLSLock() throws ErrorResultException {
            // Wait for pending heartbeats and prevent new heartbeats from
            // being sent while the bind is in progress.
            try {
                if (!sync.tryLockShared(timeoutMS, TimeUnit.MILLISECONDS)) {
                    // Give up - it looks like the connection is dead.
                    // FIXME: improve error message.
                    throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN);
                }
            } catch (final InterruptedException e) {
                throw newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, e);
            }
        }
        /**
         * Queues a Bind or StartTLS request which could not obtain the shared
         * lock because a heart beat was in progress. If the heart beat has
         * completed in the meantime, all queued requests are dispatched
         * immediately by the calling thread.
         *
         * @param runner
         *            The delayed request to be queued.
         */
        private void addPendingRequest(final DelayedFuture<? extends Result> runner) {
            // Requests to be dispatched by this thread, or null if the heart
            // beat is still in progress.
            List<Runnable> tmp = null;
            synchronized (pendingRequestsLock) {
                // The pending list is created lazily.
                if (pendingRequests == null) {
                    pendingRequests = new LinkedList<Runnable>();
                }
                pendingRequests.add(runner);
                // The heart beat may have completed in which case we must try
                // to invoke the pending request(s) now so that they are not left
                // stranded. Keep the lock until the requests have been dispatched
                // to avoid becoming blocked during the dispatch when the runner
                // attempts to acquire the shared lock.
                if (sync.tryLockShared()) {
                    // Take ownership of the whole queue so that each request
                    // is dispatched exactly once.
                    tmp = pendingRequests;
                    pendingRequests = null;
                }
            }
            if (tmp != null) {
                try {
                    for (final Runnable pendingRequest : tmp) {
                        pendingRequest.run();
                    }
                } finally {
                    // Release the shared lock taken above; each dispatched
                    // request acquired its own shared lock in run().
                    sync.unlockShared();
                }
            }
        }
        /**
         * Unregisters this connection from the heart beat machinery: it stops
         * listening for events on the underlying connection and is removed
         * from the list of active connections.
         */
        private void notifyClosed() {
            synchronized (activeConnections) {
                connection.removeConnectionEventListener(this);
                activeConnections.remove(this);
                if (activeConnections.isEmpty()) {
                    // This is the last active connection, so stop the heartbeat.
                    heartBeatFuture.cancel(false);
                }
            }
        }
    }
    private final class FutureResultImpl extends FutureResultTransformer<Connection, Connection>
            implements ResultHandler<Connection> {
        private FutureResultImpl(final ResultHandler<? super Connection> handler) {
            super(handler);
        // Releases one shared lock held on behalf of a Bind or StartTLS
        // request.
        private void releaseBindOrStartTLSLock() {
            sync.unlockShared();
        }
        /**
         * {@inheritDoc}
         */
        @Override
        protected Connection transformResult(final Connection connection)
                throws ErrorResultException {
            return adaptConnection(connection);
        }
    }
    private final class HeartBeatRunnable implements Runnable {
        private HeartBeatRunnable() {
            // Nothing to do.
        }
        @Override
        public void run() {
            synchronized (activeConnections) {
                for (final ConnectionImpl connection : activeConnections) {
                    if (connection.lastPingFuture == null || connection.lastPingFuture.isDone()) {
                        connection.lastPingFuture =
                                connection.searchAsync(heartBeat, null, connection);
        /**
         * Releases the exclusive lock held while a heart beat is in progress
         * and then dispatches any Bind or StartTLS requests which were queued
         * while the heart beat was outstanding.
         */
        private void releaseHeartBeatLock() {
            sync.unlockExclusively();
            // Requests to be dispatched by this thread, or null if there are
            // none or the shared lock could not be obtained.
            List<Runnable> tmp = null;
            synchronized (pendingRequestsLock) {
                if (pendingRequests != null) {
                    // Invoke any pending request(s). Keep the lock until the requests
                    // have been dispatched to avoid becoming blocked during the dispatch
                    // when the runner attempts to acquire the shared lock.
                    if (sync.tryLockShared()) {
                        // Take ownership of the whole queue so that each
                        // request is dispatched exactly once.
                        tmp = pendingRequests;
                        pendingRequests = null;
                    }
                }
            }
            if (tmp != null) {
                try {
                    for (final Runnable pendingRequest : tmp) {
                        pendingRequest.run();
                    }
                } finally {
                    // Release the shared lock taken above; each dispatched
                    // request acquired its own shared lock in run().
                    sync.unlockShared();
                }
            }
        }
        private void sendHeartBeat() {
            // Only send the heartbeat if the connection has been idle for some time.
            if (currentTimeMillis() < (timestamp + minDelayMS)) {
                return;
            }
            // Don't send a heart beat if there is already a heart beat,
            // bind, or startTLS in progress. Note that the bind/startTLS
            // response will update the timestamp as if it were a heart beat.
            if (sync.tryLockExclusively()) {
                try {
                    searchAsync(heartBeatRequest, null, this);
                } catch (final Exception e) {
                    // This may happen when we attempt to send the heart beat just
                    // after the connection is closed but before we are notified.
                    // Release the lock because we're never going to get a response.
                    releaseHeartBeatLock();
                }
            }
        }
        /**
         * Records that a response has just been received and returns the
         * provided response unchanged, allowing the call to be wrapped around
         * a return value or an exception being thrown.
         *
         * @param <R>
         *            The type of response.
         * @param response
         *            The response (result or error) which was received.
         * @return The provided response.
         */
        private <R> R timestamp(final R response) {
            updateTimestamp();
            return response;
        }
        // Wraps the handler so that the response time is recorded; this
        // variant is for requests which do not hold the Bind/StartTLS shared
        // lock.
        private <R> ResultHandler<R> timestamper(final ResultHandler<? super R> handler) {
            return timestamper(handler, false);
        }
        private <R> ResultHandler<R> timestamper(final ResultHandler<? super R> handler,
                final boolean isBindOrStartTLS) {
            return new ResultHandler<R>() {
                @Override
                public void handleErrorResult(final ErrorResultException error) {
                    releaseIfNeeded();
                    if (handler != null) {
                        handler.handleErrorResult(timestamp(error));
                    } else {
                        timestamp(error);
                    }
                }
                @Override
                public void handleResult(final R result) {
                    releaseIfNeeded();
                    if (handler != null) {
                        handler.handleResult(timestamp(result));
                    } else {
                        timestamp(result);
                    }
                }
                private void releaseIfNeeded() {
                    if (isBindOrStartTLS) {
                        releaseBindOrStartTLSLock();
                    }
                }
            };
        }
        private SearchResultHandler timestamper(final SearchResultHandler handler) {
            return new SearchResultHandler() {
                @Override
                public boolean handleEntry(final SearchResultEntry entry) {
                    return handler.handleEntry(timestamp(entry));
                }
                @Override
                public void handleErrorResult(final ErrorResultException error) {
                    handler.handleErrorResult(timestamp(error));
                }
                @Override
                public boolean handleReference(final SearchResultReference reference) {
                    return handler.handleReference(timestamp(reference));
                }
                @Override
                public void handleResult(final Result result) {
                    handler.handleResult(timestamp(result));
                }
            };
        }
        // Records the current time as the time of the last response received
        // on this connection.
        private void updateTimestamp() {
            timestamp = currentTimeMillis();
        }
    }
    private final SearchRequest heartBeat;
    /**
     * This synchronizer prevents Bind or StartTLS operations from being
     * processed concurrently with heart-beats. This is required because the
     * LDAP protocol specifically states that servers receiving a Bind operation
     * should either wait for existing operations to complete or abandon them.
     * The same presumably applies to StartTLS operations. Note that concurrent
     * bind/StartTLS operations are not permitted.
     * <p>
     * This connection factory only coordinates Bind and StartTLS requests with
     * heart-beats. It does not attempt to prevent or control attempts to send
     * multiple concurrent Bind or StartTLS operations, etc.
     * <p>
     * This synchronizer can be thought of as cross between a read-write lock
     * and a semaphore. Unlike a read-write lock there is no requirement that a
     * thread releasing a lock must hold it. In addition, this synchronizer does
     * not support reentrancy. A thread attempting to acquire exclusively more
     * than once will deadlock, and a thread attempting to acquire shared more
     * than once will succeed and be required to release an equivalent number of
     * times.
     * <p>
     * The synchronizer has three states:
     * <ul>
     * <li>UNLOCKED(0) - the synchronizer may be acquired shared or exclusively
     * <li>LOCKED_EXCLUSIVELY(-1) - the synchronizer is held exclusively and
     * cannot be acquired shared or exclusively. An exclusive lock is held while
     * a heart beat is in progress
     * <li>LOCKED_SHARED(>0) - the synchronizer is held shared and cannot be
     * acquired exclusively. N shared locks are held while N Bind or StartTLS
     * operations are in progress.
     * </ul>
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        // Lock states. Positive values indicate that the shared lock is taken.
        private static final int UNLOCKED = 0; // initial state
        private static final int LOCKED_EXCLUSIVELY = -1;
    private final long interval;
        // Keep compiler quiet.
        private static final long serialVersionUID = -3590428415442668336L;
    private final ScheduledExecutorService scheduler;
        @Override
        protected boolean isHeldExclusively() {
            return getState() == LOCKED_EXCLUSIVELY;
        }
    private final TimeUnit unit;
        @Override
        protected boolean tryAcquire(final int ignored) {
            if (compareAndSetState(UNLOCKED, LOCKED_EXCLUSIVELY)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    private final List<ConnectionImpl> activeConnections;
        @Override
        protected int tryAcquireShared(final int readers) {
            for (;;) {
                final int state = getState();
                if (state == LOCKED_EXCLUSIVELY) {
                    return LOCKED_EXCLUSIVELY; // failed
                }
                final int newState = state + readers;
                if (compareAndSetState(state, newState)) {
                    return newState; // succeeded + more readers allowed
                }
            }
        }
    private final ConnectionFactory factory;
        @Override
        protected boolean tryRelease(final int ignored) {
            if (getState() != LOCKED_EXCLUSIVELY) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(UNLOCKED);
            return true;
        }
        @Override
        protected boolean tryReleaseShared(final int ignored) {
            for (;;) {
                final int state = getState();
                if (state == UNLOCKED || state == LOCKED_EXCLUSIVELY) {
                    throw new IllegalMonitorStateException();
                }
                final int newState = state - 1;
                if (compareAndSetState(state, newState)) {
                    // We could always return true here, but since there cannot
                    // be waiting readers we can specialize for waiting writers.
                    return newState == UNLOCKED;
                }
            }
        }
        void lockShared() {
            acquireShared(1);
        }
        boolean tryLockExclusively() {
            return tryAcquire(0 /* unused */);
        }
        boolean tryLockShared() {
            return tryAcquireShared(1) > 0;
        }
        boolean tryLockShared(final long timeout, final TimeUnit unit) throws InterruptedException {
            return tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
        void unlockExclusively() {
            release(0 /* unused */);
        }
        void unlockShared() {
            releaseShared(0 /* unused */);
        }
    }
    private static final SearchRequest DEFAULT_SEARCH = Requests.newSearchRequest("",
            SearchScope.BASE_OBJECT, "(objectClass=*)", "1.1");
    private final List<ConnectionImpl> activeConnections;
    private final ConnectionFactory factory;
    private ScheduledFuture<?> heartBeatFuture;
    private final SearchRequest heartBeatRequest;
    private final long interval;
    private final long minDelayMS;
    private final ScheduledExecutorService scheduler;
    private final long timeoutMS;
    private final TimeUnit unit;
    /**
     * Creates a new heart-beat connection factory which will create connections
@@ -277,12 +956,14 @@
        Validator.ensureNotNull(factory, heartBeat, unit, scheduler);
        Validator.ensureTrue(interval >= 0, "negative timeout");
        this.heartBeat = heartBeat;
        this.heartBeatRequest = heartBeat;
        this.interval = interval;
        this.unit = unit;
        this.activeConnections = new LinkedList<ConnectionImpl>();
        this.factory = factory;
        this.scheduler = scheduler;
        this.timeoutMS = unit.toMillis(interval) * 2;
        this.minDelayMS = unit.toMillis(interval) / 2;
    }
    /**
@@ -299,7 +980,15 @@
    @Override
    public FutureResult<Connection> getConnectionAsync(
            final ResultHandler<? super Connection> handler) {
        final FutureResultImpl future = new FutureResultImpl(handler);
        final FutureResultTransformer<Connection, Connection> future =
                new FutureResultTransformer<Connection, Connection>(handler) {
                    @Override
                    protected Connection transformResult(final Connection connection)
                            throws ErrorResultException {
                        return adaptConnection(connection);
                    }
                };
        future.setFutureResult(factory.getConnectionAsync(future));
        return future;
    }
@@ -322,9 +1011,18 @@
            connection.addConnectionEventListener(heartBeatConnection);
            if (activeConnections.isEmpty()) {
                // This is the first active connection, so start the heart beat.
                heartBeatFuture =
                        scheduler
                                .scheduleWithFixedDelay(new HeartBeatRunnable(), 0, interval, unit);
                heartBeatFuture = scheduler.scheduleWithFixedDelay(new Runnable() {
                    @Override
                    public void run() {
                        final ConnectionImpl[] tmp;
                        synchronized (activeConnections) {
                            tmp = activeConnections.toArray(new ConnectionImpl[0]);
                        }
                        for (final ConnectionImpl connection : tmp) {
                            connection.sendHeartBeat();
                        }
                    }
                }, 0, interval, unit);
            }
            activeConnections.add(heartBeatConnection);
        }