| | |
| | | |
| | | package org.forgerock.opendj.ldap; |
| | | |
| | | import static com.forgerock.opendj.util.StaticUtils.*; |
| | | import static java.lang.System.*; |
| | | import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG; |
| | | import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER; |
| | | import static org.forgerock.opendj.ldap.CoreMessages.HBCF_CONNECTION_CLOSED_BY_CLIENT; |
| | | import static org.forgerock.opendj.ldap.CoreMessages.HBCF_HEARTBEAT_FAILED; |
| | | import static org.forgerock.opendj.ldap.CoreMessages.HBCF_HEARTBEAT_TIMEOUT; |
| | | import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult; |
| | | |
| | | import static org.forgerock.opendj.ldap.ErrorResultException.*; |
| | | |
| | | import java.util.Collection; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.Queue; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.concurrent.CountDownLatch; |
| | | import java.util.concurrent.ScheduledExecutorService; |
| | | import java.util.concurrent.ScheduledFuture; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.TimeoutException; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.locks.AbstractQueuedSynchronizer; |
| | | import java.util.logging.Level; |
| | | |
| | | import org.forgerock.opendj.ldap.requests.AbandonRequest; |
| | | import org.forgerock.opendj.ldap.requests.AddRequest; |
| | | import org.forgerock.opendj.ldap.requests.BindRequest; |
| | | import org.forgerock.opendj.ldap.requests.CompareRequest; |
| | |
| | | import org.forgerock.opendj.ldap.requests.Requests; |
| | | import org.forgerock.opendj.ldap.requests.SearchRequest; |
| | | import org.forgerock.opendj.ldap.requests.StartTLSExtendedRequest; |
| | | import org.forgerock.opendj.ldap.requests.UnbindRequest; |
| | | import org.forgerock.opendj.ldap.responses.BindResult; |
| | | import org.forgerock.opendj.ldap.responses.CompareResult; |
| | | import org.forgerock.opendj.ldap.responses.ExtendedResult; |
| | | import org.forgerock.opendj.ldap.responses.GenericExtendedResult; |
| | | import org.forgerock.opendj.ldap.responses.Result; |
| | | import org.forgerock.opendj.ldap.responses.SearchResultEntry; |
| | | import org.forgerock.opendj.ldap.responses.SearchResultReference; |
| | | import org.forgerock.opendj.ldif.ConnectionEntryReader; |
| | | |
| | | import com.forgerock.opendj.ldap.ConnectionState; |
| | | import com.forgerock.opendj.util.AsynchronousFutureResult; |
| | | import com.forgerock.opendj.util.CompletedFutureResult; |
| | | import com.forgerock.opendj.util.FutureResultTransformer; |
| | | import com.forgerock.opendj.util.RecursiveFutureResult; |
| | | import com.forgerock.opendj.util.ReferenceCountedObject; |
| | | import com.forgerock.opendj.util.TimeSource; |
| | | import com.forgerock.opendj.util.Validator; |
| | | |
| | | /** |
| | | * An heart beat connection factory can be used to create connections that sends |
| | | * a periodic search request to a Directory Server. |
| | | * <p> |
| | | * Before returning new connections to the application this factory will first |
| | | * send an initial heart beat in order to determine that the remote server is |
| | | * responsive. If the heart beat fails or times out then the connection is |
| | | * closed immediately and an error returned to the client. |
| | | * <p> |
| | | * Once a connection has been established successfully (including the initial |
| | | * heart beat), this factory will periodically send heart beats on the |
| | | * connection based on the configured heart beat interval. If the heart beat |
| | | * times out then the server is assumed to be down and an appropriate |
| | | * {@link ConnectionException} generated and published to any registered |
| | | * {@link ConnectionEventListener}s. Note however, that heart beats will only be |
| | | * sent when the connection is determined to be reasonably idle: there is no |
| | | * point in sending heart beats if the connection has recently received a |
| | | * response. A connection is deemed to be idle if no response has been received |
| | | * during a period equivalent to half the heart beat interval. |
| | | * <p> |
| | | * The LDAP protocol specifically precludes clients from performing operations |
| | | * while bind or startTLS requests are being performed. Likewise, a bind or |
| | | * startTLS request will cause active operations to be aborted. This factory |
| | | * coordinates heart beats with bind or startTLS requests, ensuring that they |
| | | * are not performed concurrently. Specifically, bind and startTLS requests are |
| | | * queued up while a heart beat is pending, and heart beats are not sent at all |
| | | * while there are pending bind or startTLS requests. |
| | | */ |
| | | final class HeartBeatConnectionFactory implements ConnectionFactory { |
| | | |
| | | /** |
| | | * This class is responsible for performing an initial heart beat once a new |
| | | * connection has been obtained and, if the heart beat succeeds, adapting it |
| | | * to a {@code ConnectionImpl} and registering it in the table of valid |
| | | * connections. |
| | | */ |
| | | private final class ConnectionFutureResultImpl { |
| | | |
| | | /** |
| | | * This class handles the initial heart beat result notification or |
| | | * timeout. We need to take care to avoid processing multiple results, |
| | | * which may occur when the heart beat is timed out and a result follows |
| | | * soon after, or vice versa. |
| | | */ |
| | | private final class InitialHeartBeatResultHandler implements SearchResultHandler, Runnable { |
| | | private final ResultHandler<? super Result> handler; |
| | | |
| | | /** |
| | | * Due to a potential race between the heart beat timing out and the |
| | | * heart beat completing this atomic ensures that notification only |
| | | * occurs once. |
| | | */ |
| | | private final AtomicBoolean isComplete = new AtomicBoolean(); |
| | | |
| | | private InitialHeartBeatResultHandler(final ResultHandler<? super Result> handler) { |
| | | this.handler = handler; |
| | | } |
| | | |
| | | @Override |
| | | public boolean handleEntry(final SearchResultEntry entry) { |
| | | /* |
| | | * Depending on the configuration, a heartbeat may return some |
| | | * entries. However, we can just ignore them. |
| | | */ |
| | | return true; |
| | | } |
| | | |
| | | @Override |
| | | public void handleErrorResult(final ErrorResultException error) { |
| | | if (isComplete.compareAndSet(false, true)) { |
| | | handler.handleErrorResult(error); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public boolean handleReference(final SearchResultReference reference) { |
| | | /* |
| | | * Depending on the configuration, a heartbeat may return some |
| | | * references. However, we can just ignore them. |
| | | */ |
| | | return true; |
| | | } |
| | | |
| | | @Override |
| | | public void handleResult(final Result result) { |
| | | if (isComplete.compareAndSet(false, true)) { |
| | | handler.handleResult(result); |
| | | } |
| | | } |
| | | |
| | | /* |
| | | * Invoked by the scheduler when the heart beat times out. |
| | | */ |
| | | @Override |
| | | public void run() { |
| | | handleErrorResult(newHeartBeatTimeoutError()); |
| | | } |
| | | } |
| | | |
| | | private Connection connection; |
| | | private final RecursiveFutureResult<Connection, Result> futureConnectionResult; |
| | | private final FutureResultTransformer<Result, Connection> futureSearchResult; |
| | | |
| | | private ConnectionFutureResultImpl(final ResultHandler<? super Connection> handler) { |
| | | // Create a future which will handle the initial heart beat result. |
| | | this.futureSearchResult = new FutureResultTransformer<Result, Connection>(handler) { |
| | | |
| | | @Override |
| | | protected ErrorResultException transformErrorResult( |
| | | final ErrorResultException errorResult) { |
| | | // Ensure that the connection is closed. |
| | | if (connection != null) { |
| | | connection.close(); |
| | | connection = null; |
| | | } |
| | | return adaptHeartBeatError(errorResult); |
| | | } |
| | | |
| | | @Override |
| | | protected Connection transformResult(final Result result) |
| | | throws ErrorResultException { |
| | | return adaptConnection(connection); |
| | | } |
| | | |
| | | }; |
| | | |
| | | // Create a future which will handle connection result. |
| | | this.futureConnectionResult = |
| | | new RecursiveFutureResult<Connection, Result>(futureSearchResult) { |
| | | |
| | | @Override |
| | | protected FutureResult<? extends Result> chainResult( |
| | | final Connection innerResult, |
| | | final ResultHandler<? super Result> handler) |
| | | throws ErrorResultException { |
| | | // Save the connection for later once the heart beat completes. |
| | | connection = innerResult; |
| | | |
| | | /* |
| | | * Send the initial heart beat and schedule a client |
| | | * side timeout notification. |
| | | */ |
| | | final InitialHeartBeatResultHandler wrappedHandler = |
| | | new InitialHeartBeatResultHandler(handler); |
| | | scheduler.get().schedule(wrappedHandler, timeoutMS, |
| | | TimeUnit.MILLISECONDS); |
| | | return connection.searchAsync(heartBeatRequest, null, wrappedHandler); |
| | | } |
| | | }; |
| | | |
| | | // Link the two futures. |
| | | futureSearchResult.setFutureResult(futureConnectionResult); |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | | * A connection that sends heart beats and supports all operations. |
| | | */ |
| | | private final class ConnectionImpl extends AbstractConnectionWrapper<Connection> implements |
| | | ConnectionEventListener, SearchResultHandler { |
| | | private final class ConnectionImpl extends AbstractAsynchronousConnection implements |
| | | ConnectionEventListener { |
| | | |
| | | /** |
| | | * A result handler wrapper for operations which timestamps the |
| | | * connection for each response received. The wrapper ensures that |
| | | * completed requests are removed from the {@code pendingResults} queue, |
| | | * as well as ensuring that requests are only completed once. |
| | | */ |
| | | private abstract class AbstractWrappedResultHandler<R, H extends ResultHandler<? super R>> |
| | | implements ResultHandler<R>, FutureResult<R> { |
| | | /** The user provided result handler. */ |
| | | protected final H handler; |
| | | |
| | | private final CountDownLatch completed = new CountDownLatch(1); |
| | | private ErrorResultException error; |
| | | private FutureResult<R> innerFuture; |
| | | private R result; |
| | | |
| | | AbstractWrappedResultHandler(final H handler) { |
| | | this.handler = handler; |
| | | } |
| | | |
| | | @Override |
| | | public boolean cancel(final boolean mayInterruptIfRunning) { |
| | | return innerFuture.cancel(mayInterruptIfRunning); |
| | | } |
| | | |
| | | @Override |
| | | public R get() throws ErrorResultException, InterruptedException { |
| | | completed.await(); |
| | | return get0(); |
| | | } |
| | | |
| | | @Override |
| | | public R get(final long timeout, final TimeUnit unit) throws ErrorResultException, |
| | | TimeoutException, InterruptedException { |
| | | if (completed.await(timeout, unit)) { |
| | | return get0(); |
| | | } else { |
| | | throw new TimeoutException(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public int getRequestID() { |
| | | return innerFuture.getRequestID(); |
| | | } |
| | | |
| | | @Override |
| | | public void handleErrorResult(final ErrorResultException error) { |
| | | if (tryComplete(null, error)) { |
| | | if (handler != null) { |
| | | handler.handleErrorResult(timestamp(error)); |
| | | } else { |
| | | timestamp(error); |
| | | } |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void handleResult(final R result) { |
| | | if (tryComplete(result, null)) { |
| | | if (handler != null) { |
| | | handler.handleResult(timestamp(result)); |
| | | } else { |
| | | timestamp(result); |
| | | } |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public boolean isCancelled() { |
| | | return innerFuture.isCancelled(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean isDone() { |
| | | return completed.getCount() == 0; |
| | | } |
| | | |
| | | abstract void releaseBindOrStartTLSLockIfNeeded(); |
| | | |
| | | FutureResult<R> setInnerFuture(final FutureResult<R> innerFuture) { |
| | | this.innerFuture = innerFuture; |
| | | return this; |
| | | } |
| | | |
| | | private R get0() throws ErrorResultException { |
| | | if (result != null) { |
| | | return result; |
| | | } else { |
| | | throw error; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Attempts to complete this request, returning true if successful. |
| | | * This method is synchronized in order to avoid race conditions |
| | | * with search result processing. |
| | | */ |
| | | private synchronized boolean tryComplete(final R result, |
| | | final ErrorResultException error) { |
| | | if (pendingResults.remove(this)) { |
| | | this.result = result; |
| | | this.error = error; |
| | | completed.countDown(); |
| | | releaseBindOrStartTLSLockIfNeeded(); |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Runs pending request once the shared lock becomes available (when no |
| | |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | } |
| | | |
| | | /* |
| | | * List of pending Bind or StartTLS requests which must be invoked when |
| | | /** |
| | | * A result handler wrapper for bind or startTLS requests which releases |
| | | * the bind/startTLS lock on completion. |
| | | */ |
| | | private final class WrappedBindOrStartTLSResultHandler<R> extends |
| | | AbstractWrappedResultHandler<R, ResultHandler<? super R>> { |
| | | WrappedBindOrStartTLSResultHandler(final ResultHandler<? super R> handler) { |
| | | super(handler); |
| | | } |
| | | |
| | | @Override |
| | | void releaseBindOrStartTLSLockIfNeeded() { |
| | | releaseBindOrStartTLSLock(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * A result handler wrapper for normal requests which does not release |
| | | * the bind/startTLS lock on completion. |
| | | */ |
| | | private final class WrappedResultHandler<R> extends |
| | | AbstractWrappedResultHandler<R, ResultHandler<? super R>> { |
| | | WrappedResultHandler(final ResultHandler<? super R> handler) { |
| | | super(handler); |
| | | } |
| | | |
| | | @Override |
| | | void releaseBindOrStartTLSLockIfNeeded() { |
| | | // No-op for normal operations. |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * A result handler wrapper for search operations. Ensures that search |
| | | * results are not sent once the request has been completed (see |
| | | * markComplete()). |
| | | */ |
| | | private final class WrappedSearchResultHandler extends |
| | | AbstractWrappedResultHandler<Result, SearchResultHandler> implements |
| | | SearchResultHandler { |
| | | WrappedSearchResultHandler(final SearchResultHandler handler) { |
| | | super(handler); |
| | | } |
| | | |
| | | @Override |
| | | public synchronized boolean handleEntry(final SearchResultEntry entry) { |
| | | if (!isDone()) { |
| | | if (handler != null) { |
| | | handler.handleEntry(timestamp(entry)); |
| | | } else { |
| | | timestamp(entry); |
| | | } |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public synchronized boolean handleReference(final SearchResultReference reference) { |
| | | if (!isDone()) { |
| | | if (handler != null) { |
| | | handler.handleReference(timestamp(reference)); |
| | | } else { |
| | | timestamp(reference); |
| | | } |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | void releaseBindOrStartTLSLockIfNeeded() { |
| | | // No-op for normal operations. |
| | | } |
| | | } |
| | | |
| | | /** The wrapped connection. */ |
| | | private final Connection connection; |
| | | |
| | | /** |
| | | * Search result handler for processing heart beat responses. |
| | | */ |
| | | private final SearchResultHandler heartBeatHandler = new SearchResultHandler() { |
| | | @Override |
| | | public boolean handleEntry(final SearchResultEntry entry) { |
| | | timestamp(entry); |
| | | return true; |
| | | } |
| | | |
| | | @Override |
| | | public void handleErrorResult(final ErrorResultException error) { |
| | | /* |
| | | * Connection failure will be handled by connection event |
| | | * listener. Ignore cancellation errors since these indicate |
| | | * that the heart beat was aborted by a client-side close. |
| | | */ |
| | | if (!(error instanceof CancelledResultException)) { |
| | | if (DEBUG_LOG.isLoggable(Level.WARNING)) { |
| | | DEBUG_LOG.warning(String.format( |
| | | "Heartbeat failed for connection factory '%s': %s", factory, error |
| | | .getMessage())); |
| | | } |
| | | timestamp(error); |
| | | } |
| | | releaseHeartBeatLock(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean handleReference(final SearchResultReference reference) { |
| | | timestamp(reference); |
| | | return true; |
| | | } |
| | | |
| | | @Override |
| | | public void handleResult(final Result result) { |
| | | timestamp(result); |
| | | releaseHeartBeatLock(); |
| | | } |
| | | }; |
| | | |
| | | /** |
| | | * List of pending Bind or StartTLS requests which must be invoked once |
| | | * the current heart beat completes. |
| | | */ |
| | | private final Queue<Runnable> pendingRequests = new ConcurrentLinkedQueue<Runnable>(); |
| | | private final Queue<Runnable> pendingBindOrStartTLSRequests = |
| | | new ConcurrentLinkedQueue<Runnable>(); |
| | | |
| | | /* Coordinates heart-beats with Bind and StartTLS requests. */ |
| | | /** |
| | | * List of pending responses for all active operations. These will be |
| | | * signalled if no heart beat is detected within the permitted timeout |
| | | * period. |
| | | */ |
| | | private final Queue<AbstractWrappedResultHandler<?, ?>> pendingResults = |
| | | new ConcurrentLinkedQueue<AbstractWrappedResultHandler<?, ?>>(); |
| | | |
| | | /** Internal connection state. */ |
| | | private final ConnectionState state = new ConnectionState(); |
| | | |
| | | /** Coordinates heart-beats with Bind and StartTLS requests. */ |
| | | private final Sync sync = new Sync(); |
| | | |
| | | /* |
| | | /** |
| | | * Timestamp of last response received (any response, not just heart |
| | | * beats). |
| | | */ |
| | | private volatile long timestamp = currentTimeMillis(); // Assume valid at creation. |
| | | private volatile long lastResponseTimestamp = timeSource.currentTimeMillis(); // Assume valid at creation. |
| | | |
| | | private ConnectionImpl(final Connection connection) { |
| | | super(connection); |
| | | this.connection = connection; |
| | | connection.addConnectionEventListener(this); |
| | | } |
| | | |
| | | @Override |
| | | public Result add(final AddRequest request) throws ErrorResultException { |
| | | try { |
| | | return timestamp(connection.add(request)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public Result add(final Entry entry) throws ErrorResultException { |
| | | try { |
| | | return timestamp(connection.add(entry)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public Result add(final String... ldifLines) throws ErrorResultException { |
| | | try { |
| | | return timestamp(connection.add(ldifLines)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | } |
| | | public FutureResult<Void> abandonAsync(final AbandonRequest request) { |
| | | return connection.abandonAsync(request); |
| | | } |
| | | |
| | | @Override |
| | | public FutureResult<Result> addAsync(final AddRequest request, |
| | | final IntermediateResponseHandler intermediateResponseHandler, |
| | | final ResultHandler<? super Result> resultHandler) { |
| | | return connection.addAsync(request, intermediateResponseHandler, |
| | | timestamper(resultHandler)); |
| | | } |
| | | |
| | | @Override |
| | | public BindResult bind(final BindRequest request) throws ErrorResultException { |
| | | acquireBindOrStartTLSLock(); |
| | | try { |
| | | return timestamp(connection.bind(request)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | } finally { |
| | | releaseBindOrStartTLSLock(); |
| | | if (checkState(resultHandler)) { |
| | | final WrappedResultHandler<Result> h = wrap(resultHandler); |
| | | return checkState(connection.addAsync(request, intermediateResponseHandler, h), h); |
| | | } else { |
| | | return newConnectionErrorFuture(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public BindResult bind(final String name, final char[] password) |
| | | throws ErrorResultException { |
| | | acquireBindOrStartTLSLock(); |
| | | try { |
| | | return timestamp(connection.bind(name, password)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | } finally { |
| | | releaseBindOrStartTLSLock(); |
| | | } |
| | | public void addConnectionEventListener(final ConnectionEventListener listener) { |
| | | state.addConnectionEventListener(listener); |
| | | } |
| | | |
| | | @Override |
| | | public FutureResult<BindResult> bindAsync(final BindRequest request, |
| | | final IntermediateResponseHandler intermediateResponseHandler, |
| | | final ResultHandler<? super BindResult> resultHandler) { |
| | | if (sync.tryLockShared()) { |
| | | // Fast path |
| | | return connection.bindAsync(request, intermediateResponseHandler, timestamper( |
| | | resultHandler, true)); |
| | | if (checkState(resultHandler)) { |
| | | if (sync.tryLockShared()) { |
| | | // Fast path |
| | | final WrappedBindOrStartTLSResultHandler<BindResult> h = |
| | | wrapForBindOrStartTLS(resultHandler); |
| | | return checkState( |
| | | connection.bindAsync(request, intermediateResponseHandler, h), h); |
| | | } else { |
| | | /* |
| | | * A heart beat must be in progress so create a runnable |
| | | * task which will be executed when the heart beat |
| | | * completes. |
| | | */ |
| | | final DelayedFuture<BindResult> future = |
| | | new DelayedFuture<BindResult>(resultHandler) { |
| | | @Override |
| | | public FutureResult<BindResult> dispatch() { |
| | | final WrappedBindOrStartTLSResultHandler<BindResult> h = |
| | | wrapForBindOrStartTLS(this); |
| | | return checkState(connection.bindAsync(request, |
| | | intermediateResponseHandler, h), h); |
| | | } |
| | | }; |
| | | /* |
| | | * Enqueue and flush if the heart beat has completed in the |
| | | * mean time. |
| | | */ |
| | | pendingBindOrStartTLSRequests.offer(future); |
| | | flushPendingBindOrStartTLSRequests(); |
| | | return future; |
| | | } |
| | | } else { |
| | | /* |
| | | * A heart beat must be in progress so create a runnable task |
| | | * which will be executed when the heart beat completes. |
| | | */ |
| | | final DelayedFuture<BindResult> future = |
| | | new DelayedFuture<BindResult>(resultHandler) { |
| | | @Override |
| | | public FutureResult<BindResult> dispatch() { |
| | | return connection.bindAsync(request, intermediateResponseHandler, |
| | | timestamper(this, true)); |
| | | } |
| | | }; |
| | | /* |
| | | * Enqueue and flush if the heart beat has completed in the mean |
| | | * time. |
| | | */ |
| | | pendingRequests.offer(future); |
| | | flushPendingRequests(); |
| | | return future; |
| | | return newConnectionErrorFuture(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public CompareResult compare(final CompareRequest request) throws ErrorResultException { |
| | | try { |
| | | return timestamp(connection.compare(request)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | } |
| | | public void close() { |
| | | handleConnectionClosed(); |
| | | connection.close(); |
| | | } |
| | | |
| | | @Override |
| | | public CompareResult compare(final String name, final String attributeDescription, |
| | | final String assertionValue) throws ErrorResultException { |
| | | try { |
| | | return timestamp(connection.compare(name, attributeDescription, assertionValue)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | } |
| | | public void close(final UnbindRequest request, final String reason) { |
| | | handleConnectionClosed(); |
| | | connection.close(request, reason); |
| | | } |
| | | |
| | | @Override |
| | | public FutureResult<CompareResult> compareAsync(final CompareRequest request, |
| | | final IntermediateResponseHandler intermediateResponseHandler, |
| | | final ResultHandler<? super CompareResult> resultHandler) { |
| | | return connection.compareAsync(request, intermediateResponseHandler, |
| | | timestamper(resultHandler)); |
| | | } |
| | | |
| | | @Override |
| | | public Result delete(final DeleteRequest request) throws ErrorResultException { |
| | | try { |
| | | return timestamp(connection.delete(request)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public Result delete(final String name) throws ErrorResultException { |
| | | try { |
| | | return timestamp(connection.delete(name)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | if (checkState(resultHandler)) { |
| | | final WrappedResultHandler<CompareResult> h = wrap(resultHandler); |
| | | return checkState(connection.compareAsync(request, intermediateResponseHandler, h), |
| | | h); |
| | | } else { |
| | | return newConnectionErrorFuture(); |
| | | } |
| | | } |
| | | |
| | |
| | | public FutureResult<Result> deleteAsync(final DeleteRequest request, |
| | | final IntermediateResponseHandler intermediateResponseHandler, |
| | | final ResultHandler<? super Result> resultHandler) { |
| | | return connection.deleteAsync(request, intermediateResponseHandler, |
| | | timestamper(resultHandler)); |
| | | } |
| | | |
| | | @Override |
| | | public <R extends ExtendedResult> R extendedRequest(final ExtendedRequest<R> request) |
| | | throws ErrorResultException { |
| | | final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID); |
| | | if (isStartTLS) { |
| | | acquireBindOrStartTLSLock(); |
| | | } |
| | | try { |
| | | return timestamp(connection.extendedRequest(request)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | } finally { |
| | | if (isStartTLS) { |
| | | releaseBindOrStartTLSLock(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public <R extends ExtendedResult> R extendedRequest(final ExtendedRequest<R> request, |
| | | final IntermediateResponseHandler handler) throws ErrorResultException { |
| | | final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID); |
| | | if (isStartTLS) { |
| | | acquireBindOrStartTLSLock(); |
| | | } |
| | | try { |
| | | return timestamp(connection.extendedRequest(request, handler)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | } finally { |
| | | if (isStartTLS) { |
| | | releaseBindOrStartTLSLock(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public GenericExtendedResult extendedRequest(final String requestName, |
| | | final ByteString requestValue) throws ErrorResultException { |
| | | final boolean isStartTLS = requestName.equals(StartTLSExtendedRequest.OID); |
| | | if (isStartTLS) { |
| | | acquireBindOrStartTLSLock(); |
| | | } |
| | | try { |
| | | return timestamp(connection.extendedRequest(requestName, requestValue)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | } finally { |
| | | if (isStartTLS) { |
| | | releaseBindOrStartTLSLock(); |
| | | } |
| | | if (checkState(resultHandler)) { |
| | | final WrappedResultHandler<Result> h = wrap(resultHandler); |
| | | return checkState(connection.deleteAsync(request, intermediateResponseHandler, h), |
| | | h); |
| | | } else { |
| | | return newConnectionErrorFuture(); |
| | | } |
| | | } |
| | | |
| | |
| | | final ExtendedRequest<R> request, |
| | | final IntermediateResponseHandler intermediateResponseHandler, |
| | | final ResultHandler<? super R> resultHandler) { |
| | | final boolean isStartTLS = request.getOID().equals(StartTLSExtendedRequest.OID); |
| | | if (isStartTLS) { |
| | | if (sync.tryLockShared()) { |
| | | // Fast path |
| | | return connection.extendedRequestAsync(request, intermediateResponseHandler, |
| | | timestamper(resultHandler, true)); |
| | | } else { |
| | | /* |
| | | * A heart beat must be in progress so create a runnable |
| | | * task which will be executed when the heart beat |
| | | * completes. |
| | | */ |
| | | final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) { |
| | | @Override |
| | | public FutureResult<R> dispatch() { |
| | | return connection.extendedRequestAsync(request, |
| | | intermediateResponseHandler, timestamper(this, true)); |
| | | } |
| | | }; |
| | | if (checkState(resultHandler)) { |
| | | if (isStartTLSRequest(request)) { |
| | | if (sync.tryLockShared()) { |
| | | // Fast path |
| | | final WrappedBindOrStartTLSResultHandler<R> h = |
| | | wrapForBindOrStartTLS(resultHandler); |
| | | return checkState(connection.extendedRequestAsync(request, |
| | | intermediateResponseHandler, h), h); |
| | | } else { |
| | | /* |
| | | * A heart beat must be in progress so create a runnable |
| | | * task which will be executed when the heart beat |
| | | * completes. |
| | | */ |
| | | final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) { |
| | | @Override |
| | | public FutureResult<R> dispatch() { |
| | | final WrappedBindOrStartTLSResultHandler<R> h = |
| | | wrapForBindOrStartTLS(this); |
| | | return checkState(connection.extendedRequestAsync(request, |
| | | intermediateResponseHandler, h), h); |
| | | } |
| | | }; |
| | | |
| | | /* |
| | | * Enqueue and flush if the heart beat has completed in the |
| | | * mean time. |
| | | */ |
| | | pendingRequests.offer(future); |
| | | flushPendingRequests(); |
| | | return future; |
| | | /* |
| | | * Enqueue and flush if the heart beat has completed in |
| | | * the mean time. |
| | | */ |
| | | pendingBindOrStartTLSRequests.offer(future); |
| | | flushPendingBindOrStartTLSRequests(); |
| | | return future; |
| | | } |
| | | } else { |
| | | final WrappedResultHandler<R> h = wrap(resultHandler); |
| | | return checkState(connection.extendedRequestAsync(request, |
| | | intermediateResponseHandler, h), h); |
| | | } |
| | | } else { |
| | | return connection.extendedRequestAsync(request, intermediateResponseHandler, |
| | | timestamper(resultHandler)); |
| | | return newConnectionErrorFuture(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void handleConnectionClosed() { |
| | | notifyClosed(); |
| | | if (state.notifyConnectionClosed()) { |
| | | failPendingResults(newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, |
| | | HBCF_CONNECTION_CLOSED_BY_CLIENT.get())); |
| | | synchronized (validConnections) { |
| | | connection.removeConnectionEventListener(this); |
| | | validConnections.remove(this); |
| | | if (validConnections.isEmpty()) { |
| | | /* |
| | | * This is the last active connection, so stop the |
| | | * heartbeat. |
| | | */ |
| | | heartBeatFuture.cancel(false); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void handleConnectionError(final boolean isDisconnectNotification, |
| | | final ErrorResultException error) { |
| | | notifyClosed(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean handleEntry(final SearchResultEntry entry) { |
| | | updateTimestamp(); |
| | | return true; |
| | | } |
| | | |
| | | @Override |
| | | public void handleErrorResult(final ErrorResultException error) { |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format("Heartbeat failed: %s", error.getMessage())); |
| | | if (state.notifyConnectionError(isDisconnectNotification, error)) { |
| | | failPendingResults(error); |
| | | } |
| | | updateTimestamp(); |
| | | releaseHeartBeatLock(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean handleReference(final SearchResultReference reference) { |
| | | updateTimestamp(); |
| | | return true; |
| | | } |
| | | |
| | | @Override |
| | | public void handleResult(final Result result) { |
| | | updateTimestamp(); |
| | | releaseHeartBeatLock(); |
| | | } |
| | | |
| | | @Override |
| | | public void handleUnsolicitedNotification(final ExtendedResult notification) { |
| | | updateTimestamp(); |
| | | timestamp(notification); |
| | | state.notifyUnsolicitedNotification(notification); |
| | | } |
| | | |
| | | @Override |
| | | public boolean isClosed() { |
| | | return state.isClosed(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean isValid() { |
| | | return connection.isValid() && currentTimeMillis() < (timestamp + timeoutMS); |
| | | } |
| | | |
| | | @Override |
| | | public Result modify(final ModifyRequest request) throws ErrorResultException { |
| | | try { |
| | | return timestamp(connection.modify(request)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public Result modify(final String... ldifLines) throws ErrorResultException { |
| | | try { |
| | | return timestamp(connection.modify(ldifLines)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | } |
| | | return state.isValid() && connection.isValid(); |
| | | } |
| | | |
| | | @Override |
| | | public FutureResult<Result> modifyAsync(final ModifyRequest request, |
| | | final IntermediateResponseHandler intermediateResponseHandler, |
| | | final ResultHandler<? super Result> resultHandler) { |
| | | return connection.modifyAsync(request, intermediateResponseHandler, |
| | | timestamper(resultHandler)); |
| | | } |
| | | |
| | | @Override |
| | | public Result modifyDN(final ModifyDNRequest request) throws ErrorResultException { |
| | | try { |
| | | return timestamp(connection.modifyDN(request)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public Result modifyDN(final String name, final String newRDN) throws ErrorResultException { |
| | | try { |
| | | return timestamp(connection.modifyDN(name, newRDN)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | if (checkState(resultHandler)) { |
| | | final WrappedResultHandler<Result> h = wrap(resultHandler); |
| | | return checkState(connection.modifyAsync(request, intermediateResponseHandler, h), |
| | | h); |
| | | } else { |
| | | return newConnectionErrorFuture(); |
| | | } |
| | | } |
| | | |
| | |
| | | public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request, |
| | | final IntermediateResponseHandler intermediateResponseHandler, |
| | | final ResultHandler<? super Result> resultHandler) { |
| | | return connection.modifyDNAsync(request, intermediateResponseHandler, |
| | | timestamper(resultHandler)); |
| | | } |
| | | |
| | | @Override |
| | | public SearchResultEntry readEntry(final DN name, final String... attributeDescriptions) |
| | | throws ErrorResultException { |
| | | try { |
| | | return timestamp(connection.readEntry(name, attributeDescriptions)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | if (checkState(resultHandler)) { |
| | | final WrappedResultHandler<Result> h = wrap(resultHandler); |
| | | return checkState( |
| | | connection.modifyDNAsync(request, intermediateResponseHandler, h), h); |
| | | } else { |
| | | return newConnectionErrorFuture(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public SearchResultEntry readEntry(final String name, final String... attributeDescriptions) |
| | | throws ErrorResultException { |
| | | try { |
| | | return timestamp(connection.readEntry(name, attributeDescriptions)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public FutureResult<SearchResultEntry> readEntryAsync(final DN name, |
| | | final Collection<String> attributeDescriptions, |
| | | final ResultHandler<? super SearchResultEntry> handler) { |
| | | return connection.readEntryAsync(name, attributeDescriptions, timestamper(handler)); |
| | | } |
| | | |
| | | @Override |
| | | public ConnectionEntryReader search(final SearchRequest request) { |
| | | // Ensure that search results update timestamp. |
| | | return new ConnectionEntryReader(this, request); |
| | | } |
| | | |
| | | @Override |
| | | public Result search(final SearchRequest request, |
| | | final Collection<? super SearchResultEntry> entries) throws ErrorResultException { |
| | | try { |
| | | return timestamp(connection.search(request, entries)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public Result search(final SearchRequest request, |
| | | final Collection<? super SearchResultEntry> entries, |
| | | final Collection<? super SearchResultReference> references) |
| | | throws ErrorResultException { |
| | | try { |
| | | return timestamp(connection.search(request, entries, references)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public Result search(final SearchRequest request, final SearchResultHandler handler) |
| | | throws ErrorResultException { |
| | | try { |
| | | return connection.search(request, timestamper(handler)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public ConnectionEntryReader search(final String baseObject, final SearchScope scope, |
| | | final String filter, final String... attributeDescriptions) { |
| | | // Ensure that search results update timestamp. |
| | | final SearchRequest request = |
| | | Requests.newSearchRequest(baseObject, scope, filter, attributeDescriptions); |
| | | return new ConnectionEntryReader(this, request); |
| | | public void removeConnectionEventListener(final ConnectionEventListener listener) { |
| | | state.removeConnectionEventListener(listener); |
| | | } |
| | | |
| | | @Override |
| | | public FutureResult<Result> searchAsync(final SearchRequest request, |
| | | final IntermediateResponseHandler intermediateResponseHandler, |
| | | final SearchResultHandler resultHandler) { |
| | | return connection.searchAsync(request, intermediateResponseHandler, |
| | | timestamper(resultHandler)); |
| | | } |
| | | |
| | | @Override |
| | | public SearchResultEntry searchSingleEntry(final SearchRequest request) |
| | | throws ErrorResultException { |
| | | try { |
| | | return timestamp(connection.searchSingleEntry(request)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | if (checkState(resultHandler)) { |
| | | final WrappedSearchResultHandler h = wrap(resultHandler); |
| | | return checkState(connection.searchAsync(request, intermediateResponseHandler, h), |
| | | h); |
| | | } else { |
| | | return newConnectionErrorFuture(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public SearchResultEntry searchSingleEntry(final String baseObject, |
| | | final SearchScope scope, final String filter, final String... attributeDescriptions) |
| | | throws ErrorResultException { |
| | | try { |
| | | return timestamp(connection.searchSingleEntry(baseObject, scope, filter, |
| | | attributeDescriptions)); |
| | | } catch (final ErrorResultException e) { |
| | | throw timestamp(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public FutureResult<SearchResultEntry> searchSingleEntryAsync(final SearchRequest request, |
| | | final ResultHandler<? super SearchResultEntry> handler) { |
| | | return connection.searchSingleEntryAsync(request, timestamper(handler)); |
| | | } |
| | | |
| | | @Override |
| | | public String toString() { |
| | | final StringBuilder builder = new StringBuilder(); |
| | | builder.append("HeartBeatConnection("); |
| | |
| | | return builder.toString(); |
| | | } |
| | | |
| | | private void acquireBindOrStartTLSLock() throws ErrorResultException { |
| | | /* |
| | | * Wait for pending heartbeats and prevent new heartbeats from being |
| | | * sent while the bind is in progress. |
| | | */ |
| | | try { |
| | | if (!sync.tryLockShared(timeoutMS, TimeUnit.MILLISECONDS)) { |
| | | // Give up - it looks like the connection is dead. |
| | | // FIXME: improve error message. |
| | | throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN); |
| | | private void checkForHeartBeat() { |
| | | if (sync.isHeldExclusively()) { |
| | | /* |
| | | * A heart beat is still in progress, but it should have |
| | | * completed by now. Let's avoid aggressively terminating the |
| | | * connection, because the heart beat may simply have been |
| | | * delayed by a sudden surge of activity. Therefore, only flag |
| | | * the connection as failed if no activity has been seen on the |
| | | * connection since the heart beat was sent. |
| | | */ |
| | | final long currentTimeMillis = timeSource.currentTimeMillis(); |
| | | if (lastResponseTimestamp < (currentTimeMillis - timeoutMS)) { |
| | | if (DEBUG_LOG.isLoggable(Level.WARNING)) { |
| | | DEBUG_LOG.warning(String.format( |
| | | "No heartbeat detected for connection '%s'", connection)); |
| | | } |
| | | handleConnectionError(false, newHeartBeatTimeoutError()); |
| | | } |
| | | } catch (final InterruptedException e) { |
| | | throw newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, e); |
| | | } |
| | | } |
| | | |
| | | private void flushPendingRequests() { |
| | | if (!pendingRequests.isEmpty()) { |
| | | private <R> FutureResult<R> checkState(final FutureResult<R> future, |
| | | final AbstractWrappedResultHandler<R, ? extends ResultHandler<? super R>> h) { |
| | | h.setInnerFuture(future); |
| | | checkState(h); |
| | | return h; |
| | | } |
| | | |
| | | private boolean checkState(final ResultHandler<?> h) { |
| | | final ErrorResultException error = state.getConnectionError(); |
| | | if (error != null) { |
| | | h.handleErrorResult(error); |
| | | return false; |
| | | } else { |
| | | return true; |
| | | } |
| | | } |
| | | |
| | | private void failPendingResults(final ErrorResultException error) { |
| | | /* |
| | | * Peek instead of pool because notification is responsible for |
| | | * removing the element from the queue. |
| | | */ |
| | | AbstractWrappedResultHandler<?, ?> pendingResult; |
| | | while ((pendingResult = pendingResults.peek()) != null) { |
| | | pendingResult.handleErrorResult(error); |
| | | } |
| | | } |
| | | |
| | | private void flushPendingBindOrStartTLSRequests() { |
| | | if (!pendingBindOrStartTLSRequests.isEmpty()) { |
| | | /* |
| | | * The pending requests will acquire the shared lock, but we |
| | | * take it here anyway to ensure that pending requests do not |
| | |
| | | if (sync.tryLockShared()) { |
| | | try { |
| | | Runnable pendingRequest; |
| | | while ((pendingRequest = pendingRequests.poll()) != null) { |
| | | while ((pendingRequest = pendingBindOrStartTLSRequests.poll()) != null) { |
| | | // Dispatch the waiting request. This will not block. |
| | | pendingRequest.run(); |
| | | } |
| | | } finally { |
| | |
| | | } |
| | | } |
| | | |
| | | private void notifyClosed() { |
| | | synchronized (activeConnections) { |
| | | connection.removeConnectionEventListener(this); |
| | | activeConnections.remove(this); |
| | | if (activeConnections.isEmpty()) { |
| | | /* |
| | | * This is the last active connection, so stop the |
| | | * heartbeat. |
| | | */ |
| | | heartBeatFuture.cancel(false); |
| | | } |
| | | } |
| | | private boolean isStartTLSRequest(final ExtendedRequest<?> request) { |
| | | return request.getOID().equals(StartTLSExtendedRequest.OID); |
| | | } |
| | | |
| | | private <R> CompletedFutureResult<R> newConnectionErrorFuture() { |
| | | return new CompletedFutureResult<R>(state.getConnectionError()); |
| | | } |
| | | |
| | | private void releaseBindOrStartTLSLock() { |
| | |
| | | |
| | | private void releaseHeartBeatLock() { |
| | | sync.unlockExclusively(); |
| | | flushPendingRequests(); |
| | | flushPendingBindOrStartTLSRequests(); |
| | | } |
| | | |
| | | private void sendHeartBeat() { |
| | | /** |
| | | * Sends a heart beat on this connection if required to do so. |
| | | * |
| | | * @return {@code true} if a heart beat was sent, otherwise |
| | | * {@code false}. |
| | | */ |
| | | private boolean sendHeartBeat() { |
| | | /* |
| | | * Only send the heartbeat if the connection has been idle for some |
| | | * Don't attempt to send a heart beat if the connection has already |
| | | * failed. |
| | | */ |
| | | if (!state.isValid()) { |
| | | return false; |
| | | } |
| | | |
| | | /* |
| | | * Only send the heart beat if the connection has been idle for some |
| | | * time. |
| | | */ |
| | | if (currentTimeMillis() < (timestamp + minDelayMS)) { |
| | | return; |
| | | final long currentTimeMillis = timeSource.currentTimeMillis(); |
| | | if (currentTimeMillis < (lastResponseTimestamp + minDelayMS)) { |
| | | return false; |
| | | } |
| | | |
| | | /* |
| | | * Don't send a heart beat if there is already a heart beat, bind, |
| | | * or startTLS in progress. Note that the bind/startTLS response |
| | | * will update the timestamp as if it were a heart beat. |
| | | * will update the lastResponseTimestamp as if it were a heart beat. |
| | | */ |
| | | if (sync.tryLockExclusively()) { |
| | | try { |
| | | connection.searchAsync(heartBeatRequest, null, this); |
| | | } catch (final Exception e) { |
| | | connection.searchAsync(heartBeatRequest, null, heartBeatHandler); |
| | | return true; |
| | | } catch (final IllegalStateException e) { |
| | | /* |
| | | * This may happen when we attempt to send the heart beat |
| | | * just after the connection is closed but before we are |
| | |
| | | releaseHeartBeatLock(); |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | private <R> R timestamp(final R response) { |
| | | updateTimestamp(); |
| | | if (!(response instanceof ConnectionException)) { |
| | | lastResponseTimestamp = timeSource.currentTimeMillis(); |
| | | } |
| | | return response; |
| | | } |
| | | |
| | | private <R> ResultHandler<R> timestamper(final ResultHandler<? super R> handler) { |
| | | return timestamper(handler, false); |
| | | private <R> WrappedResultHandler<R> wrap(final ResultHandler<? super R> handler) { |
| | | final WrappedResultHandler<R> h = new WrappedResultHandler<R>(handler); |
| | | pendingResults.add(h); |
| | | return h; |
| | | } |
| | | |
| | | private <R> ResultHandler<R> timestamper(final ResultHandler<? super R> handler, |
| | | final boolean isBindOrStartTLS) { |
| | | return new ResultHandler<R>() { |
| | | @Override |
| | | public void handleErrorResult(final ErrorResultException error) { |
| | | releaseIfNeeded(); |
| | | if (handler != null) { |
| | | handler.handleErrorResult(timestamp(error)); |
| | | } else { |
| | | timestamp(error); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void handleResult(final R result) { |
| | | releaseIfNeeded(); |
| | | if (handler != null) { |
| | | handler.handleResult(timestamp(result)); |
| | | } else { |
| | | timestamp(result); |
| | | } |
| | | } |
| | | |
| | | private void releaseIfNeeded() { |
| | | if (isBindOrStartTLS) { |
| | | releaseBindOrStartTLSLock(); |
| | | } |
| | | } |
| | | }; |
| | | private WrappedSearchResultHandler wrap(final SearchResultHandler handler) { |
| | | final WrappedSearchResultHandler h = new WrappedSearchResultHandler(handler); |
| | | pendingResults.add(h); |
| | | return h; |
| | | } |
| | | |
| | | private SearchResultHandler timestamper(final SearchResultHandler handler) { |
| | | return new SearchResultHandler() { |
| | | @Override |
| | | public boolean handleEntry(final SearchResultEntry entry) { |
| | | return handler.handleEntry(timestamp(entry)); |
| | | } |
| | | |
| | | @Override |
| | | public void handleErrorResult(final ErrorResultException error) { |
| | | handler.handleErrorResult(timestamp(error)); |
| | | } |
| | | |
| | | @Override |
| | | public boolean handleReference(final SearchResultReference reference) { |
| | | return handler.handleReference(timestamp(reference)); |
| | | } |
| | | |
| | | @Override |
| | | public void handleResult(final Result result) { |
| | | handler.handleResult(timestamp(result)); |
| | | } |
| | | }; |
| | | } |
| | | |
| | | private void updateTimestamp() { |
| | | timestamp = currentTimeMillis(); |
| | | private <R> WrappedBindOrStartTLSResultHandler<R> wrapForBindOrStartTLS( |
| | | final ResultHandler<? super R> handler) { |
| | | final WrappedBindOrStartTLSResultHandler<R> h = |
| | | new WrappedBindOrStartTLSResultHandler<R>(handler); |
| | | pendingResults.add(h); |
| | | return h; |
| | | } |
| | | } |
| | | |
| | |
| | | * </ul> |
| | | */ |
| | | private static final class Sync extends AbstractQueuedSynchronizer { |
| | | /* Lock states. Positive values indicate that the shared lock is taken. */ |
| | | private static final int UNLOCKED = 0; // initial state |
| | | private static final int LOCKED_EXCLUSIVELY = -1; |
| | | |
| | | // Keep compiler quiet. |
| | | private static final long serialVersionUID = -3590428415442668336L; |
| | | |
| | | /* Lock states. Positive values indicate that the shared lock is taken. */ |
| | | private static final int UNLOCKED = 0; // initial state |
| | | |
| | | @Override |
| | | protected boolean isHeldExclusively() { |
| | | return getState() == LOCKED_EXCLUSIVELY; |
| | |
| | | return tryAcquireShared(1) > 0; |
| | | } |
| | | |
| | | boolean tryLockShared(final long timeout, final TimeUnit unit) throws InterruptedException { |
| | | return tryAcquireSharedNanos(1, unit.toNanos(timeout)); |
| | | } |
| | | |
| | | void unlockExclusively() { |
| | | release(0 /* unused */); |
| | | } |
| | |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Default heart beat which will target the root DSE but not return any |
| | | * results. |
| | | */ |
| | | private static final SearchRequest DEFAULT_SEARCH = Requests.newSearchRequest("", |
| | | SearchScope.BASE_OBJECT, "(objectClass=*)", "1.1"); |
| | | SearchScope.BASE_OBJECT, "(objectClass=1.1)", "1.1"); |
| | | |
| | | private final List<ConnectionImpl> activeConnections; |
| | | /** |
| | | * This is package private in order to allow unit tests to inject fake time |
| | | * stamps. |
| | | */ |
| | | TimeSource timeSource = TimeSource.DEFAULT; |
| | | |
| | | /** |
| | | * Scheduled task which checks that all heart beats have been received |
| | | * within the timeout period. |
| | | */ |
| | | private final Runnable checkHeartBeatRunnable = new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | for (final ConnectionImpl connection : getValidConnections()) { |
| | | connection.checkForHeartBeat(); |
| | | } |
| | | } |
| | | }; |
| | | |
| | | /** |
| | | * Underlying connection factory. |
| | | */ |
| | | private final ConnectionFactory factory; |
| | | |
| | | /** |
| | | * The heartbeat scheduled future - which may be null if heartbeats are not |
| | | * being sent (no valid connections). |
| | | */ |
| | | private ScheduledFuture<?> heartBeatFuture; |
| | | |
| | | /** |
| | | * The heartbeat search request. |
| | | */ |
| | | private final SearchRequest heartBeatRequest; |
| | | |
| | | /** |
| | | * The interval between successive heartbeats. |
| | | */ |
| | | private final long interval; |
| | | private final TimeUnit intervalUnit; |
| | | |
| | | /** |
| | | * Flag which indicates whether this factory has been closed. |
| | | */ |
| | | private final AtomicBoolean isClosed = new AtomicBoolean(); |
| | | |
| | | /** |
| | | * The minimum amount of time the connection should remain idle (no |
| | | * responses) before starting to send heartbeats. |
| | | */ |
| | | private final long minDelayMS; |
| | | /** |
| | | * The heartbeat scheduler. |
| | | */ |
| | | private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler; |
| | | |
| | | /** |
| | | * Scheduled task which sends heart beats for all valid idle connections. |
| | | */ |
| | | private final Runnable sendHeartBeatRunnable = new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | boolean heartBeatSent = false; |
| | | for (final ConnectionImpl connection : getValidConnections()) { |
| | | heartBeatSent |= connection.sendHeartBeat(); |
| | | } |
| | | if (heartBeatSent) { |
| | | scheduler.get().schedule(checkHeartBeatRunnable, timeoutMS, TimeUnit.MILLISECONDS); |
| | | } |
| | | } |
| | | }; |
| | | |
| | | /** |
| | | * The heartbeat timeout in milli-seconds. The connection will be marked as |
| | | * failed if no heartbeat response is received within the timeout. |
| | | */ |
| | | private final long timeoutMS; |
| | | private final TimeUnit unit; |
| | | private AtomicBoolean isClosed = new AtomicBoolean(); |
| | | |
| | | HeartBeatConnectionFactory(final ConnectionFactory factory) { |
| | | this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, null); |
| | | } |
| | | /** |
| | | * List of valid connections to which heartbeats will be sent. |
| | | */ |
| | | private final List<ConnectionImpl> validConnections = new LinkedList<ConnectionImpl>(); |
| | | |
| | | HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval, |
| | | final TimeUnit unit) { |
| | | this(factory, interval, unit, DEFAULT_SEARCH, null); |
| | | } |
| | | |
| | | HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval, |
| | | final TimeUnit unit, final SearchRequest heartBeat) { |
| | | this(factory, interval, unit, heartBeat, null); |
| | | } |
| | | |
| | | HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval, |
| | | final TimeUnit unit, final SearchRequest heartBeat, |
| | | final long timeout, final TimeUnit unit, final SearchRequest heartBeat, |
| | | final ScheduledExecutorService scheduler) { |
| | | Validator.ensureNotNull(factory, heartBeat, unit); |
| | | Validator.ensureTrue(interval >= 0, "negative timeout"); |
| | | Validator.ensureNotNull(factory, unit); |
| | | Validator.ensureTrue(interval >= 0, "negative interval"); |
| | | Validator.ensureTrue(timeout >= 0, "negative timeout"); |
| | | |
| | | this.heartBeatRequest = heartBeat; |
| | | this.heartBeatRequest = heartBeat != null ? heartBeat : DEFAULT_SEARCH; |
| | | this.interval = interval; |
| | | this.unit = unit; |
| | | this.activeConnections = new LinkedList<ConnectionImpl>(); |
| | | this.intervalUnit = unit; |
| | | this.factory = factory; |
| | | this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler); |
| | | this.timeoutMS = unit.toMillis(interval) * 2; |
| | | this.timeoutMS = unit.toMillis(timeout); |
| | | this.minDelayMS = unit.toMillis(interval) / 2; |
| | | } |
| | | |
| | | @Override |
| | | public void close() { |
| | | if (isClosed.compareAndSet(false, true)) { |
| | | synchronized (activeConnections) { |
| | | if (!activeConnections.isEmpty()) { |
| | | synchronized (validConnections) { |
| | | if (!validConnections.isEmpty()) { |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format( |
| | | "HeartbeatConnectionFactory '%s' is closing while %d " |
| | | + "active connections remain", toString(), |
| | | activeConnections.size())); |
| | | + "active connections remain", toString(), validConnections |
| | | .size())); |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | @Override |
| | | public Connection getConnection() throws ErrorResultException { |
| | | return adaptConnection(factory.getConnection()); |
| | | /* |
| | | * Immediately send a heart beat in order to determine if the connected |
| | | * server is responding. |
| | | */ |
| | | final Connection connection = factory.getConnection(); |
| | | boolean keepConnection = false; |
| | | try { |
| | | connection.searchAsync(heartBeatRequest, null, null).get(timeoutMS, |
| | | TimeUnit.MILLISECONDS); |
| | | keepConnection = true; |
| | | return adaptConnection(connection); |
| | | } catch (final Exception e) { |
| | | throw adaptHeartBeatError(e); |
| | | } finally { |
| | | if (!keepConnection) { |
| | | connection.close(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public FutureResult<Connection> getConnectionAsync( |
| | | final ResultHandler<? super Connection> handler) { |
| | | final FutureResultTransformer<Connection, Connection> future = |
| | | new FutureResultTransformer<Connection, Connection>(handler) { |
| | | @Override |
| | | protected Connection transformResult(final Connection connection) |
| | | throws ErrorResultException { |
| | | return adaptConnection(connection); |
| | | } |
| | | }; |
| | | // Create a future responsible for chaining the initial heartbeat search. |
| | | final ConnectionFutureResultImpl compositeFuture = new ConnectionFutureResultImpl(handler); |
| | | |
| | | future.setFutureResult(factory.getConnectionAsync(future)); |
| | | return future; |
| | | // Request a connection. |
| | | final FutureResult<Connection> connectionFuture = |
| | | factory.getConnectionAsync(compositeFuture.futureConnectionResult); |
| | | |
| | | // Set the connection future in the composite so that the returned search future can delegate. |
| | | compositeFuture.futureConnectionResult.setFutureResult(connectionFuture); |
| | | |
| | | // Return the future representing the heartbeat. |
| | | return compositeFuture.futureSearchResult; |
| | | } |
| | | |
| | | @Override |
| | |
| | | } |
| | | |
| | | private Connection adaptConnection(final Connection connection) { |
| | | final ConnectionImpl heartBeatConnection = new ConnectionImpl(connection); |
| | | synchronized (activeConnections) { |
| | | connection.addConnectionEventListener(heartBeatConnection); |
| | | if (activeConnections.isEmpty()) { |
| | | synchronized (validConnections) { |
| | | final ConnectionImpl heartBeatConnection = new ConnectionImpl(connection); |
| | | if (validConnections.isEmpty()) { |
| | | /* This is the first active connection, so start the heart beat. */ |
| | | heartBeatFuture = scheduler.get().scheduleWithFixedDelay(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | final ConnectionImpl[] tmp; |
| | | synchronized (activeConnections) { |
| | | tmp = activeConnections.toArray(new ConnectionImpl[0]); |
| | | } |
| | | for (final ConnectionImpl connection : tmp) { |
| | | connection.sendHeartBeat(); |
| | | } |
| | | } |
| | | }, 0, interval, unit); |
| | | heartBeatFuture = |
| | | scheduler.get().scheduleWithFixedDelay(sendHeartBeatRunnable, 0, interval, |
| | | intervalUnit); |
| | | } |
| | | activeConnections.add(heartBeatConnection); |
| | | validConnections.add(heartBeatConnection); |
| | | return heartBeatConnection; |
| | | } |
| | | return heartBeatConnection; |
| | | } |
| | | |
| | | private ErrorResultException adaptHeartBeatError(final Exception error) { |
| | | if (error instanceof ConnectionException) { |
| | | return (ErrorResultException) error; |
| | | } else if (error instanceof TimeoutResultException || error instanceof TimeoutException) { |
| | | return newHeartBeatTimeoutError(); |
| | | } else if (error instanceof InterruptedException) { |
| | | return newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, error); |
| | | } else { |
| | | return newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN, HBCF_HEARTBEAT_FAILED.get(), |
| | | error); |
| | | } |
| | | } |
| | | |
| | | private ConnectionImpl[] getValidConnections() { |
| | | final ConnectionImpl[] tmp; |
| | | synchronized (validConnections) { |
| | | tmp = validConnections.toArray(new ConnectionImpl[0]); |
| | | } |
| | | return tmp; |
| | | } |
| | | |
| | | private ErrorResultException newHeartBeatTimeoutError() { |
| | | return newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN, HBCF_HEARTBEAT_TIMEOUT |
| | | .get(timeoutMS)); |
| | | } |
| | | } |