From fa5af37362b2f9b1fd07e8efbefe4b45a8088229 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 19 Dec 2012 17:18:53 +0000
Subject: [PATCH] Fix OPENDJ-660: HeartbeatConnectionFactory should avoid doing heart-beats and Bind/StartTLS operations concurrently
---
opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java | 12
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java | 840 +++++++++++++++++++++++++++++++++++++++++++++++++++----
2 files changed, 776 insertions(+), 76 deletions(-)
diff --git a/opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java b/opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
index 016815d..ea156ff 100644
--- a/opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
+++ b/opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
@@ -452,11 +452,13 @@
final int remotePort = Integer.parseInt(args[i + 1]);
factories.add(Connections.newFixedConnectionPool(Connections
- .newAuthenticatedConnectionFactory(new LDAPConnectionFactory(remoteAddress,
- remotePort), Requests.newSimpleBindRequest(proxyDN, proxyPassword
- .toCharArray())), Integer.MAX_VALUE));
- bindFactories.add(Connections.newFixedConnectionPool(new LDAPConnectionFactory(
- remoteAddress, remotePort), Integer.MAX_VALUE));
+ .newAuthenticatedConnectionFactory(Connections
+ .newHeartBeatConnectionFactory(new LDAPConnectionFactory(remoteAddress,
+ remotePort)), Requests.newSimpleBindRequest(proxyDN,
+ proxyPassword.toCharArray())), Integer.MAX_VALUE));
+ bindFactories.add(Connections.newFixedConnectionPool(Connections
+ .newHeartBeatConnectionFactory(new LDAPConnectionFactory(remoteAddress,
+ remotePort)), Integer.MAX_VALUE));
}
final RoundRobinLoadBalancingAlgorithm algorithm =
new RoundRobinLoadBalancingAlgorithm(factories);
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
index 9a36dba..b3edf1c 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -27,19 +27,39 @@
package org.forgerock.opendj.ldap;
+import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
+import static java.lang.System.currentTimeMillis;
+import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
+
+import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import java.util.logging.Level;
+import org.forgerock.opendj.ldap.requests.AddRequest;
+import org.forgerock.opendj.ldap.requests.BindRequest;
+import org.forgerock.opendj.ldap.requests.CompareRequest;
+import org.forgerock.opendj.ldap.requests.DeleteRequest;
+import org.forgerock.opendj.ldap.requests.ExtendedRequest;
+import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
+import org.forgerock.opendj.ldap.requests.ModifyRequest;
import org.forgerock.opendj.ldap.requests.Requests;
import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.requests.StartTLSExtendedRequest;
+import org.forgerock.opendj.ldap.responses.BindResult;
+import org.forgerock.opendj.ldap.responses.CompareResult;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
+import org.forgerock.opendj.ldap.responses.GenericExtendedResult;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.responses.SearchResultReference;
+import org.forgerock.opendj.ldif.ConnectionEntryReader;
+import com.forgerock.opendj.util.AsynchronousFutureResult;
import com.forgerock.opendj.util.ConnectionDecorator;
import com.forgerock.opendj.util.FutureResultTransformer;
import com.forgerock.opendj.util.StaticUtils;
@@ -55,15 +75,288 @@
*/
private final class ConnectionImpl extends ConnectionDecorator implements
ConnectionEventListener, SearchResultHandler {
- private long lastSuccessfulPing;
- private FutureResult<Result> lastPingFuture;
+ /**
+ * Runs pending request once the shared lock becomes available (when no
+ * heart beat is in progress).
+ *
+ * @param <R>
+ * The type of result returned by the request.
+ */
+ private abstract class DelayedFuture<R extends Result> extends AsynchronousFutureResult<R>
+ implements Runnable {
+ private volatile FutureResult<R> innerFuture = null;
+
+ protected DelayedFuture(final ResultHandler<? super R> handler) {
+ super(handler);
+ }
+
+ @Override
+ public final int getRequestID() {
+ return innerFuture != null ? innerFuture.getRequestID() : -1;
+ }
+
+ @Override
+ public final void run() {
+ if (!isCancelled()) {
+ sync.lockShared(); // Will not block.
+ innerFuture = dispatch();
+ if (isCancelled() && !innerFuture.isCancelled()) {
+ innerFuture.cancel(false);
+ }
+ }
+ }
+
+ protected abstract FutureResult<R> dispatch();
+
+ @Override
+ protected final ErrorResultException handleCancelRequest(
+ final boolean mayInterruptIfRunning) {
+ if (innerFuture != null) {
+ innerFuture.cancel(mayInterruptIfRunning);
+ }
+ return null;
+ }
+
+ }
+
+ // List of pending Bind or StartTLS requests which must be invoked
+ // when the current heart beat completes.
+ private List<Runnable> pendingRequests = null;
+ private final Object pendingRequestsLock = new Object();
+
+ // Coordinates heart-beats with Bind and StartTLS requests.
+ private final Sync sync = new Sync();
+
+ // Timestamp of last response received (any response, not just heart beats).
+ private volatile long timestamp = currentTimeMillis(); // Assume valid at creation.
private ConnectionImpl(final Connection connection) {
super(connection);
}
@Override
+ public Result add(final AddRequest request) throws ErrorResultException {
+ try {
+ return timestamp(connection.add(request));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ }
+ }
+
+ @Override
+ public Result add(final Entry entry) throws ErrorResultException {
+ try {
+ return timestamp(connection.add(entry));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ }
+ }
+
+ @Override
+ public Result add(final String... ldifLines) throws ErrorResultException {
+ try {
+ return timestamp(connection.add(ldifLines));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ }
+ }
+
+ @Override
+ public FutureResult<Result> addAsync(final AddRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super Result> resultHandler) {
+ return connection.addAsync(request, intermediateResponseHandler,
+ timestamper(resultHandler));
+ }
+
+ @Override
+ public BindResult bind(final BindRequest request) throws ErrorResultException {
+ acquireBindOrStartTLSLock();
+ try {
+ return timestamp(connection.bind(request));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ } finally {
+ releaseBindOrStartTLSLock();
+ }
+ }
+
+ @Override
+ public BindResult bind(final String name, final char[] password)
+ throws ErrorResultException {
+ acquireBindOrStartTLSLock();
+ try {
+ return timestamp(connection.bind(name, password));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ } finally {
+ releaseBindOrStartTLSLock();
+ }
+ }
+
+ @Override
+ public FutureResult<BindResult> bindAsync(final BindRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super BindResult> resultHandler) {
+ if (sync.tryLockShared()) {
+ // Fast path
+ return connection.bindAsync(request, intermediateResponseHandler, timestamper(
+ resultHandler, true));
+ } else {
+ // A heart beat must be in progress so create a runnable task
+ // which will be executed when the heart beat completes.
+ final DelayedFuture<BindResult> future =
+ new DelayedFuture<BindResult>(resultHandler) {
+ @Override
+ public FutureResult<BindResult> dispatch() {
+ return connection.bindAsync(request, intermediateResponseHandler,
+ timestamper(this, true));
+ }
+ };
+ addPendingRequest(future);
+ return future;
+ }
+ }
+
+ @Override
+ public CompareResult compare(final CompareRequest request) throws ErrorResultException {
+ try {
+ return timestamp(connection.compare(request));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ }
+ }
+
+ @Override
+ public CompareResult compare(final String name, final String attributeDescription,
+ final String assertionValue) throws ErrorResultException {
+ try {
+ return timestamp(connection.compare(name, attributeDescription, assertionValue));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ }
+ }
+
+ @Override
+ public FutureResult<CompareResult> compareAsync(final CompareRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super CompareResult> resultHandler) {
+ return connection.compareAsync(request, intermediateResponseHandler,
+ timestamper(resultHandler));
+ }
+
+ @Override
+ public Result delete(final DeleteRequest request) throws ErrorResultException {
+ try {
+ return timestamp(connection.delete(request));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ }
+ }
+
+ @Override
+ public Result delete(final String name) throws ErrorResultException {
+ try {
+ return timestamp(connection.delete(name));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ }
+ }
+
+ @Override
+ public FutureResult<Result> deleteAsync(final DeleteRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super Result> resultHandler) {
+ return connection.deleteAsync(request, intermediateResponseHandler,
+ timestamper(resultHandler));
+ }
+
+ @Override
+ public <R extends ExtendedResult> R extendedRequest(final ExtendedRequest<R> request)
+ throws ErrorResultException {
+ final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID);
+ if (isStartTLS) {
+ acquireBindOrStartTLSLock();
+ }
+ try {
+ return timestamp(connection.extendedRequest(request));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ } finally {
+ if (isStartTLS) {
+ releaseBindOrStartTLSLock();
+ }
+ }
+ }
+
+ @Override
+ public <R extends ExtendedResult> R extendedRequest(final ExtendedRequest<R> request,
+ final IntermediateResponseHandler handler) throws ErrorResultException {
+ final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID);
+ if (isStartTLS) {
+ acquireBindOrStartTLSLock();
+ }
+ try {
+ return timestamp(connection.extendedRequest(request, handler));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ } finally {
+ if (isStartTLS) {
+ releaseBindOrStartTLSLock();
+ }
+ }
+ }
+
+ @Override
+ public GenericExtendedResult extendedRequest(final String requestName,
+ final ByteString requestValue) throws ErrorResultException {
+ final boolean isStartTLS = requestName.equals(StartTLSExtendedRequest.OID);
+ if (isStartTLS) {
+ acquireBindOrStartTLSLock();
+ }
+ try {
+ return timestamp(connection.extendedRequest(requestName, requestValue));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ } finally {
+ if (isStartTLS) {
+ releaseBindOrStartTLSLock();
+ }
+ }
+ }
+
+ @Override
+ public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync(
+ final ExtendedRequest<R> request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super R> resultHandler) {
+ final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID);
+ if (isStartTLS) {
+ if (sync.tryLockShared()) {
+ // Fast path
+ return connection.extendedRequestAsync(request, intermediateResponseHandler,
+ timestamper(resultHandler, true));
+ } else {
+ // A heart beat must be in progress so create a runnable task
+ // which will be executed when the heart beat completes.
+ final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) {
+ @Override
+ public FutureResult<R> dispatch() {
+ return connection.extendedRequestAsync(request,
+ intermediateResponseHandler, timestamper(this, true));
+ }
+ };
+ addPendingRequest(future);
+ return future;
+ }
+ } else {
+ return connection.extendedRequestAsync(request, intermediateResponseHandler,
+ timestamper(resultHandler));
+ }
+ }
+
+ @Override
public void handleConnectionClosed() {
notifyClosed();
}
@@ -74,58 +367,205 @@
notifyClosed();
}
- /**
- * {@inheritDoc}
- */
@Override
public boolean handleEntry(final SearchResultEntry entry) {
- // Ignore.
+ updateTimestamp();
return true;
}
- /**
- * {@inheritDoc}
- */
@Override
public void handleErrorResult(final ErrorResultException error) {
- connection.close(Requests.newUnbindRequest(), "Heartbeat retured error: " + error);
+ if (DEBUG_LOG.isLoggable(Level.FINE)) {
+ DEBUG_LOG.fine(String.format("Heartbeat failed: " + error.getMessage()));
+ }
+ updateTimestamp();
+ releaseHeartBeatLock();
}
- /**
- * {@inheritDoc}
- */
@Override
public boolean handleReference(final SearchResultReference reference) {
- // Ignore.
+ updateTimestamp();
return true;
}
- /**
- * {@inheritDoc}
- */
@Override
public void handleResult(final Result result) {
- lastSuccessfulPing = System.currentTimeMillis();
+ updateTimestamp();
+ releaseHeartBeatLock();
}
@Override
public void handleUnsolicitedNotification(final ExtendedResult notification) {
- // Do nothing
+ updateTimestamp();
}
- /**
- * {@inheritDoc}
- */
@Override
public boolean isValid() {
- return connection.isValid()
- && (lastSuccessfulPing <= 0 || System.currentTimeMillis() - lastSuccessfulPing < unit
- .toMillis(interval) * 2);
+ return connection.isValid() && currentTimeMillis() < (timestamp + timeoutMS);
}
- /**
- * {@inheritDoc}
- */
+ @Override
+ public Result modify(final ModifyRequest request) throws ErrorResultException {
+ try {
+ return timestamp(connection.modify(request));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ }
+ }
+
+ @Override
+ public Result modify(final String... ldifLines) throws ErrorResultException {
+ try {
+ return timestamp(connection.modify(ldifLines));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ }
+ }
+
+ @Override
+ public FutureResult<Result> modifyAsync(final ModifyRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super Result> resultHandler) {
+ return connection.modifyAsync(request, intermediateResponseHandler,
+ timestamper(resultHandler));
+ }
+
+ @Override
+ public Result modifyDN(final ModifyDNRequest request) throws ErrorResultException {
+ try {
+ return timestamp(connection.modifyDN(request));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ }
+ }
+
+ @Override
+ public Result modifyDN(final String name, final String newRDN) throws ErrorResultException {
+ try {
+ return timestamp(connection.modifyDN(name, newRDN));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ }
+ }
+
+ @Override
+ public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super Result> resultHandler) {
+ return connection.modifyDNAsync(request, intermediateResponseHandler,
+ timestamper(resultHandler));
+ }
+
+ @Override
+ public SearchResultEntry readEntry(final DN name, final String... attributeDescriptions)
+ throws ErrorResultException {
+ try {
+ return timestamp(connection.readEntry(name, attributeDescriptions));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ }
+ }
+
+ @Override
+ public SearchResultEntry readEntry(final String name, final String... attributeDescriptions)
+ throws ErrorResultException {
+ try {
+ return timestamp(connection.readEntry(name, attributeDescriptions));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ }
+ }
+
+ @Override
+ public FutureResult<SearchResultEntry> readEntryAsync(final DN name,
+ final Collection<String> attributeDescriptions,
+ final ResultHandler<? super SearchResultEntry> handler) {
+ return connection.readEntryAsync(name, attributeDescriptions, timestamper(handler));
+ }
+
+ @Override
+ public ConnectionEntryReader search(final SearchRequest request) {
+ // Ensure that search results update timestamp.
+ return new ConnectionEntryReader(this, request);
+ }
+
+ @Override
+ public Result search(final SearchRequest request,
+ final Collection<? super SearchResultEntry> entries) throws ErrorResultException {
+ try {
+ return timestamp(connection.search(request, entries));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ }
+ }
+
+ @Override
+ public Result search(final SearchRequest request,
+ final Collection<? super SearchResultEntry> entries,
+ final Collection<? super SearchResultReference> references)
+ throws ErrorResultException {
+ try {
+ return timestamp(connection.search(request, entries, references));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ }
+ }
+
+ @Override
+ public Result search(final SearchRequest request, final SearchResultHandler handler)
+ throws ErrorResultException {
+ try {
+ return connection.search(request, timestamper(handler));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ }
+ }
+
+ @Override
+ public ConnectionEntryReader search(final String baseObject, final SearchScope scope,
+ final String filter, final String... attributeDescriptions) {
+ // Ensure that search results update timestamp.
+ final SearchRequest request =
+ Requests.newSearchRequest(baseObject, scope, filter, attributeDescriptions);
+ return new ConnectionEntryReader(this, request);
+ }
+
+ @Override
+ public FutureResult<Result> searchAsync(final SearchRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final SearchResultHandler resultHandler) {
+ return connection.searchAsync(request, intermediateResponseHandler,
+ timestamper(resultHandler));
+ }
+
+ @Override
+ public SearchResultEntry searchSingleEntry(final SearchRequest request)
+ throws ErrorResultException {
+ try {
+ return timestamp(connection.searchSingleEntry(request));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ }
+ }
+
+ @Override
+ public SearchResultEntry searchSingleEntry(final String baseObject,
+ final SearchScope scope, final String filter, final String... attributeDescriptions)
+ throws ErrorResultException {
+ try {
+ return timestamp(connection.searchSingleEntry(baseObject, scope, filter,
+ attributeDescriptions));
+ } catch (final ErrorResultException e) {
+ throw timestamp(e);
+ }
+ }
+
+ @Override
+ public FutureResult<SearchResultEntry> searchSingleEntryAsync(final SearchRequest request,
+ final ResultHandler<? super SearchResultEntry> handler) {
+ return connection.searchSingleEntryAsync(request, timestamper(handler));
+ }
+
@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
@@ -135,72 +575,311 @@
return builder.toString();
}
+ private void acquireBindOrStartTLSLock() throws ErrorResultException {
+ // Wait for pending heartbeats and prevent new heartbeats from
+ // being sent while the bind is in progress.
+ try {
+ if (!sync.tryLockShared(timeoutMS, TimeUnit.MILLISECONDS)) {
+ // Give up - it looks like the connection is dead.
+ // FIXME: improve error message.
+ throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN);
+ }
+ } catch (final InterruptedException e) {
+ throw newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, e);
+ }
+ }
+
+ private void addPendingRequest(final DelayedFuture<? extends Result> runner) {
+ List<Runnable> tmp = null;
+ synchronized (pendingRequestsLock) {
+ if (pendingRequests == null) {
+ pendingRequests = new LinkedList<Runnable>();
+ }
+ pendingRequests.add(runner);
+
+ // The heart beat may have completed in which case we must try
+ // to invoke the pending request(s) now so that they are not left
+ // stranded. Keep the lock until the requests have been dispatched
+ // to avoid becoming blocked during the dispatch when the runner
+ // attempts to acquire the shared lock.
+ if (sync.tryLockShared()) {
+ tmp = pendingRequests;
+ pendingRequests = null;
+ }
+ }
+ if (tmp != null) {
+ try {
+ for (final Runnable pendingRequest : tmp) {
+ pendingRequest.run();
+ }
+ } finally {
+ sync.unlockShared();
+ }
+ }
+ }
+
private void notifyClosed() {
synchronized (activeConnections) {
connection.removeConnectionEventListener(this);
activeConnections.remove(this);
-
if (activeConnections.isEmpty()) {
- // This is the last active connection, so stop the heart
- // beat.
+ // This is the last active connection, so stop the heartbeat.
heartBeatFuture.cancel(false);
}
}
}
- }
- private final class FutureResultImpl extends FutureResultTransformer<Connection, Connection>
- implements ResultHandler<Connection> {
-
- private FutureResultImpl(final ResultHandler<? super Connection> handler) {
- super(handler);
+ private void releaseBindOrStartTLSLock() {
+ sync.unlockShared();
}
- /**
- * {@inheritDoc}
- */
- @Override
- protected Connection transformResult(final Connection connection)
- throws ErrorResultException {
- return adaptConnection(connection);
- }
-
- }
-
- private final class HeartBeatRunnable implements Runnable {
- private HeartBeatRunnable() {
- // Nothing to do.
- }
-
- @Override
- public void run() {
- synchronized (activeConnections) {
- for (final ConnectionImpl connection : activeConnections) {
- if (connection.lastPingFuture == null || connection.lastPingFuture.isDone()) {
- connection.lastPingFuture =
- connection.searchAsync(heartBeat, null, connection);
+ private void releaseHeartBeatLock() {
+ sync.unlockExclusively();
+ List<Runnable> tmp = null;
+ synchronized (pendingRequestsLock) {
+ if (pendingRequests != null) {
+ // Invoke any pending request(s). Keep the lock until the requests
+ // have been dispatched to avoid becoming blocked during the dispatch
+ // when the runner attempts to acquire the shared lock.
+ if (sync.tryLockShared()) {
+ tmp = pendingRequests;
+ pendingRequests = null;
}
}
}
+ if (tmp != null) {
+ try {
+ for (final Runnable pendingRequest : tmp) {
+ pendingRequest.run();
+ }
+ } finally {
+ sync.unlockShared();
+ }
+ }
+ }
+
+ private void sendHeartBeat() {
+ // Only send the heartbeat if the connection has been idle for some time.
+ if (currentTimeMillis() < (timestamp + minDelayMS)) {
+ return;
+ }
+
+ // Don't send a heart beat if there is already a heart beat,
+ // bind, or startTLS in progress. Note that the bind/startTLS
+ // response will update the timestamp as if it were a heart beat.
+ if (sync.tryLockExclusively()) {
+ try {
+ searchAsync(heartBeatRequest, null, this);
+ } catch (final Exception e) {
+ // This may happen when we attempt to send the heart beat just
+ // after the connection is closed but before we are notified.
+
+ // Release the lock because we're never going to get a response.
+ releaseHeartBeatLock();
+ }
+ }
+ }
+
+ private <R> R timestamp(final R response) {
+ updateTimestamp();
+ return response;
+ }
+
+ private <R> ResultHandler<R> timestamper(final ResultHandler<? super R> handler) {
+ return timestamper(handler, false);
+ }
+
+ private <R> ResultHandler<R> timestamper(final ResultHandler<? super R> handler,
+ final boolean isBindOrStartTLS) {
+ return new ResultHandler<R>() {
+ @Override
+ public void handleErrorResult(final ErrorResultException error) {
+ releaseIfNeeded();
+ if (handler != null) {
+ handler.handleErrorResult(timestamp(error));
+ } else {
+ timestamp(error);
+ }
+ }
+
+ @Override
+ public void handleResult(final R result) {
+ releaseIfNeeded();
+ if (handler != null) {
+ handler.handleResult(timestamp(result));
+ } else {
+ timestamp(result);
+ }
+ }
+
+ private void releaseIfNeeded() {
+ if (isBindOrStartTLS) {
+ releaseBindOrStartTLSLock();
+ }
+ }
+ };
+ }
+
+ private SearchResultHandler timestamper(final SearchResultHandler handler) {
+ return new SearchResultHandler() {
+ @Override
+ public boolean handleEntry(final SearchResultEntry entry) {
+ return handler.handleEntry(timestamp(entry));
+ }
+
+ @Override
+ public void handleErrorResult(final ErrorResultException error) {
+ handler.handleErrorResult(timestamp(error));
+ }
+
+ @Override
+ public boolean handleReference(final SearchResultReference reference) {
+ return handler.handleReference(timestamp(reference));
+ }
+
+ @Override
+ public void handleResult(final Result result) {
+ handler.handleResult(timestamp(result));
+ }
+ };
+ }
+
+ private void updateTimestamp() {
+ timestamp = currentTimeMillis();
}
}
- private final SearchRequest heartBeat;
+ /**
+ * This synchronizer prevents Bind or StartTLS operations from being
+ * processed concurrently with heart-beats. This is required because the
+ * LDAP protocol specifically states that servers receiving a Bind operation
+ * should either wait for existing operations to complete or abandon them.
+ * The same presumably applies to StartTLS operations. Note that concurrent
+ * bind/StartTLS operations are not permitted.
+ * <p>
+ * This connection factory only coordinates Bind and StartTLS requests with
+ * heart-beats. It does not attempt to prevent or control attempts to send
+ * multiple concurrent Bind or StartTLS operations, etc.
+ * <p>
+ * This synchronizer can be thought of as cross between a read-write lock
+ * and a semaphore. Unlike a read-write lock there is no requirement that a
+ * thread releasing a lock must hold it. In addition, this synchronizer does
+ * not support reentrancy. A thread attempting to acquire exclusively more
+ * than once will deadlock, and a thread attempting to acquire shared more
+ * than once will succeed and be required to release an equivalent number of
+ * times.
+ * <p>
+ * The synchronizer has three states:
+ * <ul>
+ * <li>UNLOCKED(0) - the synchronizer may be acquired shared or exclusively
+ * <li>LOCKED_EXCLUSIVELY(-1) - the synchronizer is held exclusively and
+ * cannot be acquired shared or exclusively. An exclusive lock is held while
+ * a heart beat is in progress
+ * <li>LOCKED_SHARED(>0) - the synchronizer is held shared and cannot be
+ * acquired exclusively. N shared locks are held while N Bind or StartTLS
+ * operations are in progress.
+ * </ul>
+ */
+ private static final class Sync extends AbstractQueuedSynchronizer {
+ // Lock states. Positive values indicate that the shared lock is taken.
+ private static final int UNLOCKED = 0; // initial state
+ private static final int LOCKED_EXCLUSIVELY = -1;
- private final long interval;
+ // Keep compiler quiet.
+ private static final long serialVersionUID = -3590428415442668336L;
- private final ScheduledExecutorService scheduler;
+ @Override
+ protected boolean isHeldExclusively() {
+ return getState() == LOCKED_EXCLUSIVELY;
+ }
- private final TimeUnit unit;
+ @Override
+ protected boolean tryAcquire(final int ignored) {
+ if (compareAndSetState(UNLOCKED, LOCKED_EXCLUSIVELY)) {
+ setExclusiveOwnerThread(Thread.currentThread());
+ return true;
+ }
+ return false;
+ }
- private final List<ConnectionImpl> activeConnections;
+ @Override
+ protected int tryAcquireShared(final int readers) {
+ for (;;) {
+ final int state = getState();
+ if (state == LOCKED_EXCLUSIVELY) {
+ return LOCKED_EXCLUSIVELY; // failed
+ }
+ final int newState = state + readers;
+ if (compareAndSetState(state, newState)) {
+ return newState; // succeeded + more readers allowed
+ }
+ }
+ }
- private final ConnectionFactory factory;
+ @Override
+ protected boolean tryRelease(final int ignored) {
+ if (getState() != LOCKED_EXCLUSIVELY) {
+ throw new IllegalMonitorStateException();
+ }
+ setExclusiveOwnerThread(null);
+ setState(UNLOCKED);
+ return true;
+ }
+
+ @Override
+ protected boolean tryReleaseShared(final int ignored) {
+ for (;;) {
+ final int state = getState();
+ if (state == UNLOCKED || state == LOCKED_EXCLUSIVELY) {
+ throw new IllegalMonitorStateException();
+ }
+ final int newState = state - 1;
+ if (compareAndSetState(state, newState)) {
+ // We could always return true here, but since there cannot
+ // be waiting readers we can specialize for waiting writers.
+ return newState == UNLOCKED;
+ }
+ }
+ }
+
+ void lockShared() {
+ acquireShared(1);
+ }
+
+ boolean tryLockExclusively() {
+ return tryAcquire(0 /* unused */);
+ }
+
+ boolean tryLockShared() {
+ return tryAcquireShared(1) > 0;
+ }
+
+ boolean tryLockShared(final long timeout, final TimeUnit unit) throws InterruptedException {
+ return tryAcquireSharedNanos(1, unit.toNanos(timeout));
+ }
+
+ void unlockExclusively() {
+ release(0 /* unused */);
+ }
+
+ void unlockShared() {
+ releaseShared(0 /* unused */);
+ }
+
+ }
private static final SearchRequest DEFAULT_SEARCH = Requests.newSearchRequest("",
SearchScope.BASE_OBJECT, "(objectClass=*)", "1.1");
+ private final List<ConnectionImpl> activeConnections;
+ private final ConnectionFactory factory;
private ScheduledFuture<?> heartBeatFuture;
+ private final SearchRequest heartBeatRequest;
+ private final long interval;
+ private final long minDelayMS;
+ private final ScheduledExecutorService scheduler;
+ private final long timeoutMS;
+ private final TimeUnit unit;
/**
* Creates a new heart-beat connection factory which will create connections
@@ -277,12 +956,14 @@
Validator.ensureNotNull(factory, heartBeat, unit, scheduler);
Validator.ensureTrue(interval >= 0, "negative timeout");
- this.heartBeat = heartBeat;
+ this.heartBeatRequest = heartBeat;
this.interval = interval;
this.unit = unit;
this.activeConnections = new LinkedList<ConnectionImpl>();
this.factory = factory;
this.scheduler = scheduler;
+ this.timeoutMS = unit.toMillis(interval) * 2;
+ this.minDelayMS = unit.toMillis(interval) / 2;
}
/**
@@ -299,7 +980,15 @@
@Override
public FutureResult<Connection> getConnectionAsync(
final ResultHandler<? super Connection> handler) {
- final FutureResultImpl future = new FutureResultImpl(handler);
+ final FutureResultTransformer<Connection, Connection> future =
+ new FutureResultTransformer<Connection, Connection>(handler) {
+ @Override
+ protected Connection transformResult(final Connection connection)
+ throws ErrorResultException {
+ return adaptConnection(connection);
+ }
+ };
+
future.setFutureResult(factory.getConnectionAsync(future));
return future;
}
@@ -322,9 +1011,18 @@
connection.addConnectionEventListener(heartBeatConnection);
if (activeConnections.isEmpty()) {
// This is the first active connection, so start the heart beat.
- heartBeatFuture =
- scheduler
- .scheduleWithFixedDelay(new HeartBeatRunnable(), 0, interval, unit);
+ heartBeatFuture = scheduler.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ final ConnectionImpl[] tmp;
+ synchronized (activeConnections) {
+ tmp = activeConnections.toArray(new ConnectionImpl[0]);
+ }
+ for (final ConnectionImpl connection : tmp) {
+ connection.sendHeartBeat();
+ }
+ }
+ }, 0, interval, unit);
}
activeConnections.add(heartBeatConnection);
}
--
Gitblit v1.10.0