From fa5af37362b2f9b1fd07e8efbefe4b45a8088229 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 19 Dec 2012 17:18:53 +0000
Subject: [PATCH] Fix OPENDJ-660: HeartbeatConnectionFactory should avoid doing heart-beats and Bind/StartTLS operations concurrently

---
 opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java         |   12 
 opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java |  840 +++++++++++++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 776 insertions(+), 76 deletions(-)

diff --git a/opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java b/opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
index 016815d..ea156ff 100644
--- a/opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
+++ b/opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
@@ -452,11 +452,13 @@
             final int remotePort = Integer.parseInt(args[i + 1]);
 
             factories.add(Connections.newFixedConnectionPool(Connections
-                    .newAuthenticatedConnectionFactory(new LDAPConnectionFactory(remoteAddress,
-                            remotePort), Requests.newSimpleBindRequest(proxyDN, proxyPassword
-                            .toCharArray())), Integer.MAX_VALUE));
-            bindFactories.add(Connections.newFixedConnectionPool(new LDAPConnectionFactory(
-                    remoteAddress, remotePort), Integer.MAX_VALUE));
+                    .newAuthenticatedConnectionFactory(Connections
+                            .newHeartBeatConnectionFactory(new LDAPConnectionFactory(remoteAddress,
+                                    remotePort)), Requests.newSimpleBindRequest(proxyDN,
+                            proxyPassword.toCharArray())), Integer.MAX_VALUE));
+            bindFactories.add(Connections.newFixedConnectionPool(Connections
+                    .newHeartBeatConnectionFactory(new LDAPConnectionFactory(remoteAddress,
+                            remotePort)), Integer.MAX_VALUE));
         }
         final RoundRobinLoadBalancingAlgorithm algorithm =
                 new RoundRobinLoadBalancingAlgorithm(factories);
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
index 9a36dba..b3edf1c 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -27,19 +27,39 @@
 
 package org.forgerock.opendj.ldap;
 
+import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
+import static java.lang.System.currentTimeMillis;
+import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
+
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import java.util.logging.Level;
 
+import org.forgerock.opendj.ldap.requests.AddRequest;
+import org.forgerock.opendj.ldap.requests.BindRequest;
+import org.forgerock.opendj.ldap.requests.CompareRequest;
+import org.forgerock.opendj.ldap.requests.DeleteRequest;
+import org.forgerock.opendj.ldap.requests.ExtendedRequest;
+import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
+import org.forgerock.opendj.ldap.requests.ModifyRequest;
 import org.forgerock.opendj.ldap.requests.Requests;
 import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.requests.StartTLSExtendedRequest;
+import org.forgerock.opendj.ldap.responses.BindResult;
+import org.forgerock.opendj.ldap.responses.CompareResult;
 import org.forgerock.opendj.ldap.responses.ExtendedResult;
+import org.forgerock.opendj.ldap.responses.GenericExtendedResult;
 import org.forgerock.opendj.ldap.responses.Result;
 import org.forgerock.opendj.ldap.responses.SearchResultEntry;
 import org.forgerock.opendj.ldap.responses.SearchResultReference;
+import org.forgerock.opendj.ldif.ConnectionEntryReader;
 
+import com.forgerock.opendj.util.AsynchronousFutureResult;
 import com.forgerock.opendj.util.ConnectionDecorator;
 import com.forgerock.opendj.util.FutureResultTransformer;
 import com.forgerock.opendj.util.StaticUtils;
@@ -55,15 +75,288 @@
      */
     private final class ConnectionImpl extends ConnectionDecorator implements
             ConnectionEventListener, SearchResultHandler {
-        private long lastSuccessfulPing;
 
-        private FutureResult<Result> lastPingFuture;
+        /**
+         * Runs pending request once the shared lock becomes available (when no
+         * heart beat is in progress).
+         *
+         * @param <R>
+         *            The type of result returned by the request.
+         */
+        private abstract class DelayedFuture<R extends Result> extends AsynchronousFutureResult<R>
+                implements Runnable {
+            private volatile FutureResult<R> innerFuture = null;
+
+            protected DelayedFuture(final ResultHandler<? super R> handler) {
+                super(handler);
+            }
+
+            @Override
+            public final int getRequestID() {
+                return innerFuture != null ? innerFuture.getRequestID() : -1;
+            }
+
+            @Override
+            public final void run() {
+                if (!isCancelled()) {
+                    sync.lockShared(); // Will not block.
+                    innerFuture = dispatch();
+                    if (isCancelled() && !innerFuture.isCancelled()) {
+                        innerFuture.cancel(false);
+                    }
+                }
+            }
+
+            protected abstract FutureResult<R> dispatch();
+
+            @Override
+            protected final ErrorResultException handleCancelRequest(
+                    final boolean mayInterruptIfRunning) {
+                if (innerFuture != null) {
+                    innerFuture.cancel(mayInterruptIfRunning);
+                }
+                return null;
+            }
+
+        }
+
+        // List of pending Bind or StartTLS requests which must be invoked
+        // when the current heart beat completes.
+        private List<Runnable> pendingRequests = null;
+        private final Object pendingRequestsLock = new Object();
+
+        // Coordinates heart-beats with Bind and StartTLS requests.
+        private final Sync sync = new Sync();
+
+        // Timestamp of last response received (any response, not just heart beats).
+        private volatile long timestamp = currentTimeMillis(); // Assume valid at creation.
 
         private ConnectionImpl(final Connection connection) {
             super(connection);
         }
 
         @Override
+        public Result add(final AddRequest request) throws ErrorResultException {
+            try {
+                return timestamp(connection.add(request));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            }
+        }
+
+        @Override
+        public Result add(final Entry entry) throws ErrorResultException {
+            try {
+                return timestamp(connection.add(entry));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            }
+        }
+
+        @Override
+        public Result add(final String... ldifLines) throws ErrorResultException {
+            try {
+                return timestamp(connection.add(ldifLines));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            }
+        }
+
+        @Override
+        public FutureResult<Result> addAsync(final AddRequest request,
+                final IntermediateResponseHandler intermediateResponseHandler,
+                final ResultHandler<? super Result> resultHandler) {
+            return connection.addAsync(request, intermediateResponseHandler,
+                    timestamper(resultHandler));
+        }
+
+        @Override
+        public BindResult bind(final BindRequest request) throws ErrorResultException {
+            acquireBindOrStartTLSLock();
+            try {
+                return timestamp(connection.bind(request));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            } finally {
+                releaseBindOrStartTLSLock();
+            }
+        }
+
+        @Override
+        public BindResult bind(final String name, final char[] password)
+                throws ErrorResultException {
+            acquireBindOrStartTLSLock();
+            try {
+                return timestamp(connection.bind(name, password));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            } finally {
+                releaseBindOrStartTLSLock();
+            }
+        }
+
+        @Override
+        public FutureResult<BindResult> bindAsync(final BindRequest request,
+                final IntermediateResponseHandler intermediateResponseHandler,
+                final ResultHandler<? super BindResult> resultHandler) {
+            if (sync.tryLockShared()) {
+                // Fast path
+                return connection.bindAsync(request, intermediateResponseHandler, timestamper(
+                        resultHandler, true));
+            } else {
+                // A heart beat must be in progress so create a runnable task
+                // which will be executed when the heart beat completes.
+                final DelayedFuture<BindResult> future =
+                        new DelayedFuture<BindResult>(resultHandler) {
+                            @Override
+                            public FutureResult<BindResult> dispatch() {
+                                return connection.bindAsync(request, intermediateResponseHandler,
+                                        timestamper(this, true));
+                            }
+                        };
+                addPendingRequest(future);
+                return future;
+            }
+        }
+
+        @Override
+        public CompareResult compare(final CompareRequest request) throws ErrorResultException {
+            try {
+                return timestamp(connection.compare(request));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            }
+        }
+
+        @Override
+        public CompareResult compare(final String name, final String attributeDescription,
+                final String assertionValue) throws ErrorResultException {
+            try {
+                return timestamp(connection.compare(name, attributeDescription, assertionValue));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            }
+        }
+
+        @Override
+        public FutureResult<CompareResult> compareAsync(final CompareRequest request,
+                final IntermediateResponseHandler intermediateResponseHandler,
+                final ResultHandler<? super CompareResult> resultHandler) {
+            return connection.compareAsync(request, intermediateResponseHandler,
+                    timestamper(resultHandler));
+        }
+
+        @Override
+        public Result delete(final DeleteRequest request) throws ErrorResultException {
+            try {
+                return timestamp(connection.delete(request));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            }
+        }
+
+        @Override
+        public Result delete(final String name) throws ErrorResultException {
+            try {
+                return timestamp(connection.delete(name));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            }
+        }
+
+        @Override
+        public FutureResult<Result> deleteAsync(final DeleteRequest request,
+                final IntermediateResponseHandler intermediateResponseHandler,
+                final ResultHandler<? super Result> resultHandler) {
+            return connection.deleteAsync(request, intermediateResponseHandler,
+                    timestamper(resultHandler));
+        }
+
+        @Override
+        public <R extends ExtendedResult> R extendedRequest(final ExtendedRequest<R> request)
+                throws ErrorResultException {
+            final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID);
+            if (isStartTLS) {
+                acquireBindOrStartTLSLock();
+            }
+            try {
+                return timestamp(connection.extendedRequest(request));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            } finally {
+                if (isStartTLS) {
+                    releaseBindOrStartTLSLock();
+                }
+            }
+        }
+
+        @Override
+        public <R extends ExtendedResult> R extendedRequest(final ExtendedRequest<R> request,
+                final IntermediateResponseHandler handler) throws ErrorResultException {
+            final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID);
+            if (isStartTLS) {
+                acquireBindOrStartTLSLock();
+            }
+            try {
+                return timestamp(connection.extendedRequest(request, handler));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            } finally {
+                if (isStartTLS) {
+                    releaseBindOrStartTLSLock();
+                }
+            }
+        }
+
+        @Override
+        public GenericExtendedResult extendedRequest(final String requestName,
+                final ByteString requestValue) throws ErrorResultException {
+            final boolean isStartTLS = requestName.equals(StartTLSExtendedRequest.OID);
+            if (isStartTLS) {
+                acquireBindOrStartTLSLock();
+            }
+            try {
+                return timestamp(connection.extendedRequest(requestName, requestValue));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            } finally {
+                if (isStartTLS) {
+                    releaseBindOrStartTLSLock();
+                }
+            }
+        }
+
+        @Override
+        public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync(
+                final ExtendedRequest<R> request,
+                final IntermediateResponseHandler intermediateResponseHandler,
+                final ResultHandler<? super R> resultHandler) {
+            final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID);
+            if (isStartTLS) {
+                if (sync.tryLockShared()) {
+                    // Fast path
+                    return connection.extendedRequestAsync(request, intermediateResponseHandler,
+                            timestamper(resultHandler, true));
+                } else {
+                    // A heart beat must be in progress so create a runnable task
+                    // which will be executed when the heart beat completes.
+                    final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) {
+                        @Override
+                        public FutureResult<R> dispatch() {
+                            return connection.extendedRequestAsync(request,
+                                    intermediateResponseHandler, timestamper(this, true));
+                        }
+                    };
+                    addPendingRequest(future);
+                    return future;
+                }
+            } else {
+                return connection.extendedRequestAsync(request, intermediateResponseHandler,
+                        timestamper(resultHandler));
+            }
+        }
+
+        @Override
         public void handleConnectionClosed() {
             notifyClosed();
         }
@@ -74,58 +367,205 @@
             notifyClosed();
         }
 
-        /**
-         * {@inheritDoc}
-         */
         @Override
         public boolean handleEntry(final SearchResultEntry entry) {
-            // Ignore.
+            updateTimestamp();
             return true;
         }
 
-        /**
-         * {@inheritDoc}
-         */
         @Override
         public void handleErrorResult(final ErrorResultException error) {
-            connection.close(Requests.newUnbindRequest(), "Heartbeat retured error: " + error);
+            if (DEBUG_LOG.isLoggable(Level.FINE)) {
+                DEBUG_LOG.fine(String.format("Heartbeat failed: " + error.getMessage()));
+            }
+            updateTimestamp();
+            releaseHeartBeatLock();
         }
 
-        /**
-         * {@inheritDoc}
-         */
         @Override
         public boolean handleReference(final SearchResultReference reference) {
-            // Ignore.
+            updateTimestamp();
             return true;
         }
 
-        /**
-         * {@inheritDoc}
-         */
         @Override
         public void handleResult(final Result result) {
-            lastSuccessfulPing = System.currentTimeMillis();
+            updateTimestamp();
+            releaseHeartBeatLock();
         }
 
         @Override
         public void handleUnsolicitedNotification(final ExtendedResult notification) {
-            // Do nothing
+            updateTimestamp();
         }
 
-        /**
-         * {@inheritDoc}
-         */
         @Override
         public boolean isValid() {
-            return connection.isValid()
-                    && (lastSuccessfulPing <= 0 || System.currentTimeMillis() - lastSuccessfulPing < unit
-                            .toMillis(interval) * 2);
+            return connection.isValid() && currentTimeMillis() < (timestamp + timeoutMS);
         }
 
-        /**
-         * {@inheritDoc}
-         */
+        @Override
+        public Result modify(final ModifyRequest request) throws ErrorResultException {
+            try {
+                return timestamp(connection.modify(request));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            }
+        }
+
+        @Override
+        public Result modify(final String... ldifLines) throws ErrorResultException {
+            try {
+                return timestamp(connection.modify(ldifLines));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            }
+        }
+
+        @Override
+        public FutureResult<Result> modifyAsync(final ModifyRequest request,
+                final IntermediateResponseHandler intermediateResponseHandler,
+                final ResultHandler<? super Result> resultHandler) {
+            return connection.modifyAsync(request, intermediateResponseHandler,
+                    timestamper(resultHandler));
+        }
+
+        @Override
+        public Result modifyDN(final ModifyDNRequest request) throws ErrorResultException {
+            try {
+                return timestamp(connection.modifyDN(request));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            }
+        }
+
+        @Override
+        public Result modifyDN(final String name, final String newRDN) throws ErrorResultException {
+            try {
+                return timestamp(connection.modifyDN(name, newRDN));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            }
+        }
+
+        @Override
+        public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
+                final IntermediateResponseHandler intermediateResponseHandler,
+                final ResultHandler<? super Result> resultHandler) {
+            return connection.modifyDNAsync(request, intermediateResponseHandler,
+                    timestamper(resultHandler));
+        }
+
+        @Override
+        public SearchResultEntry readEntry(final DN name, final String... attributeDescriptions)
+                throws ErrorResultException {
+            try {
+                return timestamp(connection.readEntry(name, attributeDescriptions));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            }
+        }
+
+        @Override
+        public SearchResultEntry readEntry(final String name, final String... attributeDescriptions)
+                throws ErrorResultException {
+            try {
+                return timestamp(connection.readEntry(name, attributeDescriptions));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            }
+        }
+
+        @Override
+        public FutureResult<SearchResultEntry> readEntryAsync(final DN name,
+                final Collection<String> attributeDescriptions,
+                final ResultHandler<? super SearchResultEntry> handler) {
+            return connection.readEntryAsync(name, attributeDescriptions, timestamper(handler));
+        }
+
+        @Override
+        public ConnectionEntryReader search(final SearchRequest request) {
+            // Ensure that search results update timestamp.
+            return new ConnectionEntryReader(this, request);
+        }
+
+        @Override
+        public Result search(final SearchRequest request,
+                final Collection<? super SearchResultEntry> entries) throws ErrorResultException {
+            try {
+                return timestamp(connection.search(request, entries));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            }
+        }
+
+        @Override
+        public Result search(final SearchRequest request,
+                final Collection<? super SearchResultEntry> entries,
+                final Collection<? super SearchResultReference> references)
+                throws ErrorResultException {
+            try {
+                return timestamp(connection.search(request, entries, references));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            }
+        }
+
+        @Override
+        public Result search(final SearchRequest request, final SearchResultHandler handler)
+                throws ErrorResultException {
+            try {
+                return connection.search(request, timestamper(handler));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            }
+        }
+
+        @Override
+        public ConnectionEntryReader search(final String baseObject, final SearchScope scope,
+                final String filter, final String... attributeDescriptions) {
+            // Ensure that search results update timestamp.
+            final SearchRequest request =
+                    Requests.newSearchRequest(baseObject, scope, filter, attributeDescriptions);
+            return new ConnectionEntryReader(this, request);
+        }
+
+        @Override
+        public FutureResult<Result> searchAsync(final SearchRequest request,
+                final IntermediateResponseHandler intermediateResponseHandler,
+                final SearchResultHandler resultHandler) {
+            return connection.searchAsync(request, intermediateResponseHandler,
+                    timestamper(resultHandler));
+        }
+
+        @Override
+        public SearchResultEntry searchSingleEntry(final SearchRequest request)
+                throws ErrorResultException {
+            try {
+                return timestamp(connection.searchSingleEntry(request));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            }
+        }
+
+        @Override
+        public SearchResultEntry searchSingleEntry(final String baseObject,
+                final SearchScope scope, final String filter, final String... attributeDescriptions)
+                throws ErrorResultException {
+            try {
+                return timestamp(connection.searchSingleEntry(baseObject, scope, filter,
+                        attributeDescriptions));
+            } catch (final ErrorResultException e) {
+                throw timestamp(e);
+            }
+        }
+
+        @Override
+        public FutureResult<SearchResultEntry> searchSingleEntryAsync(final SearchRequest request,
+                final ResultHandler<? super SearchResultEntry> handler) {
+            return connection.searchSingleEntryAsync(request, timestamper(handler));
+        }
+
         @Override
         public String toString() {
             final StringBuilder builder = new StringBuilder();
@@ -135,72 +575,311 @@
             return builder.toString();
         }
 
+        private void acquireBindOrStartTLSLock() throws ErrorResultException {
+            // Wait for pending heartbeats and prevent new heartbeats from
+            // being sent while the bind is in progress.
+            try {
+                if (!sync.tryLockShared(timeoutMS, TimeUnit.MILLISECONDS)) {
+                    // Give up - it looks like the connection is dead.
+                    // FIXME: improve error message.
+                    throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN);
+                }
+            } catch (final InterruptedException e) {
+                throw newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, e);
+            }
+        }
+
+        private void addPendingRequest(final DelayedFuture<? extends Result> runner) {
+            List<Runnable> tmp = null;
+            synchronized (pendingRequestsLock) {
+                if (pendingRequests == null) {
+                    pendingRequests = new LinkedList<Runnable>();
+                }
+                pendingRequests.add(runner);
+
+                // The heart beat may have completed in which case we must try
+                // to invoke the pending request(s) now so that they are not left
+                // stranded. Keep the lock until the requests have been dispatched
+                // to avoid becoming blocked during the dispatch when the runner
+                // attempts to acquire the shared lock.
+                if (sync.tryLockShared()) {
+                    tmp = pendingRequests;
+                    pendingRequests = null;
+                }
+            }
+            if (tmp != null) {
+                try {
+                    for (final Runnable pendingRequest : tmp) {
+                        pendingRequest.run();
+                    }
+                } finally {
+                    sync.unlockShared();
+                }
+            }
+        }
+
         private void notifyClosed() {
             synchronized (activeConnections) {
                 connection.removeConnectionEventListener(this);
                 activeConnections.remove(this);
-
                 if (activeConnections.isEmpty()) {
-                    // This is the last active connection, so stop the heart
-                    // beat.
+                    // This is the last active connection, so stop the heartbeat.
                     heartBeatFuture.cancel(false);
                 }
             }
         }
-    }
 
-    private final class FutureResultImpl extends FutureResultTransformer<Connection, Connection>
-            implements ResultHandler<Connection> {
-
-        private FutureResultImpl(final ResultHandler<? super Connection> handler) {
-            super(handler);
+        private void releaseBindOrStartTLSLock() {
+            sync.unlockShared();
         }
 
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        protected Connection transformResult(final Connection connection)
-                throws ErrorResultException {
-            return adaptConnection(connection);
-        }
-
-    }
-
-    private final class HeartBeatRunnable implements Runnable {
-        private HeartBeatRunnable() {
-            // Nothing to do.
-        }
-
-        @Override
-        public void run() {
-            synchronized (activeConnections) {
-                for (final ConnectionImpl connection : activeConnections) {
-                    if (connection.lastPingFuture == null || connection.lastPingFuture.isDone()) {
-                        connection.lastPingFuture =
-                                connection.searchAsync(heartBeat, null, connection);
+        private void releaseHeartBeatLock() {
+            sync.unlockExclusively();
+            List<Runnable> tmp = null;
+            synchronized (pendingRequestsLock) {
+                if (pendingRequests != null) {
+                    // Invoke any pending request(s). Keep the lock until the requests
+                    // have been dispatched to avoid becoming blocked during the dispatch
+                    // when the runner attempts to acquire the shared lock.
+                    if (sync.tryLockShared()) {
+                        tmp = pendingRequests;
+                        pendingRequests = null;
                     }
                 }
             }
+            if (tmp != null) {
+                try {
+                    for (final Runnable pendingRequest : tmp) {
+                        pendingRequest.run();
+                    }
+                } finally {
+                    sync.unlockShared();
+                }
+            }
+        }
+
+        private void sendHeartBeat() {
+            // Only send the heartbeat if the connection has been idle for some time.
+            if (currentTimeMillis() < (timestamp + minDelayMS)) {
+                return;
+            }
+
+            // Don't send a heart beat if there is already a heart beat,
+            // bind, or startTLS in progress. Note that the bind/startTLS
+            // response will update the timestamp as if it were a heart beat.
+            if (sync.tryLockExclusively()) {
+                try {
+                    searchAsync(heartBeatRequest, null, this);
+                } catch (final Exception e) {
+                    // This may happen when we attempt to send the heart beat just
+                    // after the connection is closed but before we are notified.
+
+                    // Release the lock because we're never going to get a response.
+                    releaseHeartBeatLock();
+                }
+            }
+        }
+
+        private <R> R timestamp(final R response) {
+            updateTimestamp();
+            return response;
+        }
+
+        private <R> ResultHandler<R> timestamper(final ResultHandler<? super R> handler) {
+            return timestamper(handler, false);
+        }
+
+        private <R> ResultHandler<R> timestamper(final ResultHandler<? super R> handler,
+                final boolean isBindOrStartTLS) {
+            return new ResultHandler<R>() {
+                @Override
+                public void handleErrorResult(final ErrorResultException error) {
+                    releaseIfNeeded();
+                    if (handler != null) {
+                        handler.handleErrorResult(timestamp(error));
+                    } else {
+                        timestamp(error);
+                    }
+                }
+
+                @Override
+                public void handleResult(final R result) {
+                    releaseIfNeeded();
+                    if (handler != null) {
+                        handler.handleResult(timestamp(result));
+                    } else {
+                        timestamp(result);
+                    }
+                }
+
+                private void releaseIfNeeded() {
+                    if (isBindOrStartTLS) {
+                        releaseBindOrStartTLSLock();
+                    }
+                }
+            };
+        }
+
+        private SearchResultHandler timestamper(final SearchResultHandler handler) {
+            return new SearchResultHandler() {
+                @Override
+                public boolean handleEntry(final SearchResultEntry entry) {
+                    return handler.handleEntry(timestamp(entry));
+                }
+
+                @Override
+                public void handleErrorResult(final ErrorResultException error) {
+                    handler.handleErrorResult(timestamp(error));
+                }
+
+                @Override
+                public boolean handleReference(final SearchResultReference reference) {
+                    return handler.handleReference(timestamp(reference));
+                }
+
+                @Override
+                public void handleResult(final Result result) {
+                    handler.handleResult(timestamp(result));
+                }
+            };
+        }
+
+        private void updateTimestamp() {
+            timestamp = currentTimeMillis();
         }
     }
 
-    private final SearchRequest heartBeat;
+    /**
+     * This synchronizer prevents Bind or StartTLS operations from being
+     * processed concurrently with heart-beats. This is required because the
+     * LDAP protocol specifically states that servers receiving a Bind operation
+     * should either wait for existing operations to complete or abandon them.
+     * The same presumably applies to StartTLS operations. Note that concurrent
+     * bind/StartTLS operations are not permitted.
+     * <p>
+     * This connection factory only coordinates Bind and StartTLS requests with
+     * heart-beats. It does not attempt to prevent or control attempts to send
+     * multiple concurrent Bind or StartTLS operations, etc.
+     * <p>
+     * This synchronizer can be thought of as cross between a read-write lock
+     * and a semaphore. Unlike a read-write lock there is no requirement that a
+     * thread releasing a lock must hold it. In addition, this synchronizer does
+     * not support reentrancy. A thread attempting to acquire exclusively more
+     * than once will deadlock, and a thread attempting to acquire shared more
+     * than once will succeed and be required to release an equivalent number of
+     * times.
+     * <p>
+     * The synchronizer has three states:
+     * <ul>
+     * <li>UNLOCKED(0) - the synchronizer may be acquired shared or exclusively
+     * <li>LOCKED_EXCLUSIVELY(-1) - the synchronizer is held exclusively and
+     * cannot be acquired shared or exclusively. An exclusive lock is held while
+     * a heart beat is in progress
+     * <li>LOCKED_SHARED(>0) - the synchronizer is held shared and cannot be
+     * acquired exclusively. N shared locks are held while N Bind or StartTLS
+     * operations are in progress.
+     * </ul>
+     */
+    private static final class Sync extends AbstractQueuedSynchronizer {
+        // Lock states. Positive values indicate that the shared lock is taken.
+        private static final int UNLOCKED = 0; // initial state
+        private static final int LOCKED_EXCLUSIVELY = -1;
 
-    private final long interval;
+        // Keep compiler quiet.
+        private static final long serialVersionUID = -3590428415442668336L;
 
-    private final ScheduledExecutorService scheduler;
+        @Override
+        protected boolean isHeldExclusively() {
+            return getState() == LOCKED_EXCLUSIVELY;
+        }
 
-    private final TimeUnit unit;
+        @Override
+        protected boolean tryAcquire(final int ignored) {
+            if (compareAndSetState(UNLOCKED, LOCKED_EXCLUSIVELY)) {
+                setExclusiveOwnerThread(Thread.currentThread());
+                return true;
+            }
+            return false;
+        }
 
-    private final List<ConnectionImpl> activeConnections;
+        @Override
+        protected int tryAcquireShared(final int readers) {
+            for (;;) {
+                final int state = getState();
+                if (state == LOCKED_EXCLUSIVELY) {
+                    return LOCKED_EXCLUSIVELY; // failed
+                }
+                final int newState = state + readers;
+                if (compareAndSetState(state, newState)) {
+                    return newState; // succeeded + more readers allowed
+                }
+            }
+        }
 
-    private final ConnectionFactory factory;
+        @Override
+        protected boolean tryRelease(final int ignored) {
+            if (getState() != LOCKED_EXCLUSIVELY) {
+                throw new IllegalMonitorStateException();
+            }
+            setExclusiveOwnerThread(null);
+            setState(UNLOCKED);
+            return true;
+        }
+
+        @Override
+        protected boolean tryReleaseShared(final int ignored) {
+            for (;;) {
+                final int state = getState();
+                if (state == UNLOCKED || state == LOCKED_EXCLUSIVELY) {
+                    throw new IllegalMonitorStateException();
+                }
+                final int newState = state - 1;
+                if (compareAndSetState(state, newState)) {
+                    // We could always return true here, but since there cannot
+                    // be waiting readers we can specialize for waiting writers.
+                    return newState == UNLOCKED;
+                }
+            }
+        }
+
+        void lockShared() {
+            acquireShared(1);
+        }
+
+        boolean tryLockExclusively() {
+            return tryAcquire(0 /* unused */);
+        }
+
+        boolean tryLockShared() {
+            return tryAcquireShared(1) > 0;
+        }
+
+        boolean tryLockShared(final long timeout, final TimeUnit unit) throws InterruptedException {
+            return tryAcquireSharedNanos(1, unit.toNanos(timeout));
+        }
+
+        void unlockExclusively() {
+            release(0 /* unused */);
+        }
+
+        void unlockShared() {
+            releaseShared(0 /* unused */);
+        }
+
+    }
 
     private static final SearchRequest DEFAULT_SEARCH = Requests.newSearchRequest("",
             SearchScope.BASE_OBJECT, "(objectClass=*)", "1.1");
 
+    private final List<ConnectionImpl> activeConnections;
+    private final ConnectionFactory factory;
     private ScheduledFuture<?> heartBeatFuture;
+    private final SearchRequest heartBeatRequest;
+    private final long interval;
+    private final long minDelayMS;
+    private final ScheduledExecutorService scheduler;
+    private final long timeoutMS;
+    private final TimeUnit unit;
 
     /**
      * Creates a new heart-beat connection factory which will create connections
@@ -277,12 +956,14 @@
         Validator.ensureNotNull(factory, heartBeat, unit, scheduler);
         Validator.ensureTrue(interval >= 0, "negative timeout");
 
-        this.heartBeat = heartBeat;
+        this.heartBeatRequest = heartBeat;
         this.interval = interval;
         this.unit = unit;
         this.activeConnections = new LinkedList<ConnectionImpl>();
         this.factory = factory;
         this.scheduler = scheduler;
+        this.timeoutMS = unit.toMillis(interval) * 2;
+        this.minDelayMS = unit.toMillis(interval) / 2;
     }
 
     /**
@@ -299,7 +980,15 @@
     @Override
     public FutureResult<Connection> getConnectionAsync(
             final ResultHandler<? super Connection> handler) {
-        final FutureResultImpl future = new FutureResultImpl(handler);
+        final FutureResultTransformer<Connection, Connection> future =
+                new FutureResultTransformer<Connection, Connection>(handler) {
+                    @Override
+                    protected Connection transformResult(final Connection connection)
+                            throws ErrorResultException {
+                        return adaptConnection(connection);
+                    }
+                };
+
         future.setFutureResult(factory.getConnectionAsync(future));
         return future;
     }
@@ -322,9 +1011,18 @@
             connection.addConnectionEventListener(heartBeatConnection);
             if (activeConnections.isEmpty()) {
                 // This is the first active connection, so start the heart beat.
-                heartBeatFuture =
-                        scheduler
-                                .scheduleWithFixedDelay(new HeartBeatRunnable(), 0, interval, unit);
+                heartBeatFuture = scheduler.scheduleWithFixedDelay(new Runnable() {
+                    @Override
+                    public void run() {
+                        final ConnectionImpl[] tmp;
+                        synchronized (activeConnections) {
+                            tmp = activeConnections.toArray(new ConnectionImpl[0]);
+                        }
+                        for (final ConnectionImpl connection : tmp) {
+                            connection.sendHeartBeat();
+                        }
+                    }
+                }, 0, interval, unit);
             }
             activeConnections.add(heartBeatConnection);
         }

--
Gitblit v1.10.0