/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions copyright 2011-2013 ForgeRock AS. */ package org.forgerock.opendj.ldap; import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG; import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER; import static org.forgerock.opendj.ldap.CoreMessages.HBCF_CONNECTION_CLOSED_BY_CLIENT; import static org.forgerock.opendj.ldap.CoreMessages.HBCF_HEARTBEAT_FAILED; import static org.forgerock.opendj.ldap.CoreMessages.HBCF_HEARTBEAT_TIMEOUT; import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult; import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.logging.Level; import org.forgerock.opendj.ldap.requests.AbandonRequest; import org.forgerock.opendj.ldap.requests.AddRequest; import org.forgerock.opendj.ldap.requests.BindRequest; import org.forgerock.opendj.ldap.requests.CompareRequest; import org.forgerock.opendj.ldap.requests.DeleteRequest; import org.forgerock.opendj.ldap.requests.ExtendedRequest; import org.forgerock.opendj.ldap.requests.ModifyDNRequest; import org.forgerock.opendj.ldap.requests.ModifyRequest; import org.forgerock.opendj.ldap.requests.Requests; import org.forgerock.opendj.ldap.requests.SearchRequest; import org.forgerock.opendj.ldap.requests.StartTLSExtendedRequest; import org.forgerock.opendj.ldap.requests.UnbindRequest; import org.forgerock.opendj.ldap.responses.BindResult; import org.forgerock.opendj.ldap.responses.CompareResult; import org.forgerock.opendj.ldap.responses.ExtendedResult; import org.forgerock.opendj.ldap.responses.Result; import org.forgerock.opendj.ldap.responses.SearchResultEntry; import org.forgerock.opendj.ldap.responses.SearchResultReference; import com.forgerock.opendj.ldap.ConnectionState; import com.forgerock.opendj.util.AsynchronousFutureResult; import com.forgerock.opendj.util.CompletedFutureResult; import com.forgerock.opendj.util.FutureResultTransformer; import com.forgerock.opendj.util.RecursiveFutureResult; import com.forgerock.opendj.util.ReferenceCountedObject; import com.forgerock.opendj.util.TimeSource; import com.forgerock.opendj.util.Validator; /** * An heart beat connection factory can be used to create connections that sends * a periodic search request to a Directory Server. *

* Before returning new connections to the application this factory will first * send an initial heart beat in order to determine that the remote server is * responsive. If the heart beat fails or times out then the connection is * closed immediately and an error returned to the client. *

* Once a connection has been established successfully (including the initial * heart beat), this factory will periodically send heart beats on the * connection based on the configured heart beat interval. If the heart beat * times out then the server is assumed to be down and an appropriate * {@link ConnectionException} generated and published to any registered * {@link ConnectionEventListener}s. Note however, that heart beats will only be * sent when the connection is determined to be reasonably idle: there is no * point in sending heart beats if the connection has recently received a * response. A connection is deemed to be idle if no response has been received * during a period equivalent to half the heart beat interval. *

* The LDAP protocol specifically precludes clients from performing operations * while bind or startTLS requests are being performed. Likewise, a bind or * startTLS request will cause active operations to be aborted. This factory * coordinates heart beats with bind or startTLS requests, ensuring that they * are not performed concurrently. Specifically, bind and startTLS requests are * queued up while a heart beat is pending, and heart beats are not sent at all * while there are pending bind or startTLS requests. */ final class HeartBeatConnectionFactory implements ConnectionFactory { /** * This class is responsible for performing an initial heart beat once a new * connection has been obtained and, if the heart beat succeeds, adapting it * to a {@code ConnectionImpl} and registering it in the table of valid * connections. */ private final class ConnectionFutureResultImpl { /** * This class handles the initial heart beat result notification or * timeout. We need to take care to avoid processing multiple results, * which may occur when the heart beat is timed out and a result follows * soon after, or vice versa. */ private final class InitialHeartBeatResultHandler implements SearchResultHandler, Runnable { private final ResultHandler handler; /** * Due to a potential race between the heart beat timing out and the * heart beat completing this atomic ensures that notification only * occurs once. */ private final AtomicBoolean isComplete = new AtomicBoolean(); private InitialHeartBeatResultHandler(final ResultHandler handler) { this.handler = handler; } @Override public boolean handleEntry(final SearchResultEntry entry) { /* * Depending on the configuration, a heartbeat may return some * entries. However, we can just ignore them. */ return true; } @Override public void handleErrorResult(final ErrorResultException error) { if (isComplete.compareAndSet(false, true)) { handler.handleErrorResult(error); } } @Override public boolean handleReference(final SearchResultReference reference) { /* * Depending on the configuration, a heartbeat may return some * references. However, we can just ignore them. */ return true; } @Override public void handleResult(final Result result) { if (isComplete.compareAndSet(false, true)) { handler.handleResult(result); } } /* * Invoked by the scheduler when the heart beat times out. */ @Override public void run() { handleErrorResult(newHeartBeatTimeoutError()); } } private Connection connection; private final RecursiveFutureResult futureConnectionResult; private final FutureResultTransformer futureSearchResult; private ConnectionFutureResultImpl(final ResultHandler handler) { // Create a future which will handle the initial heart beat result. this.futureSearchResult = new FutureResultTransformer(handler) { @Override protected ErrorResultException transformErrorResult( final ErrorResultException errorResult) { // Ensure that the connection is closed. if (connection != null) { connection.close(); connection = null; } return adaptHeartBeatError(errorResult); } @Override protected Connection transformResult(final Result result) throws ErrorResultException { return adaptConnection(connection); } }; // Create a future which will handle connection result. this.futureConnectionResult = new RecursiveFutureResult(futureSearchResult) { @Override protected FutureResult chainResult( final Connection innerResult, final ResultHandler handler) throws ErrorResultException { // Save the connection for later once the heart beat completes. connection = innerResult; /* * Send the initial heart beat and schedule a client * side timeout notification. */ final InitialHeartBeatResultHandler wrappedHandler = new InitialHeartBeatResultHandler(handler); scheduler.get().schedule(wrappedHandler, timeoutMS, TimeUnit.MILLISECONDS); return connection.searchAsync(heartBeatRequest, null, wrappedHandler); } }; // Link the two futures. futureSearchResult.setFutureResult(futureConnectionResult); } } /** * A connection that sends heart beats and supports all operations. */ private final class ConnectionImpl extends AbstractAsynchronousConnection implements ConnectionEventListener { /** * A result handler wrapper for operations which timestamps the * connection for each response received. The wrapper ensures that * completed requests are removed from the {@code pendingResults} queue, * as well as ensuring that requests are only completed once. */ private abstract class AbstractWrappedResultHandler> implements ResultHandler, FutureResult { /** The user provided result handler. */ protected final H handler; private final CountDownLatch completed = new CountDownLatch(1); private ErrorResultException error; private FutureResult innerFuture; private R result; AbstractWrappedResultHandler(final H handler) { this.handler = handler; } @Override public boolean cancel(final boolean mayInterruptIfRunning) { return innerFuture.cancel(mayInterruptIfRunning); } @Override public R get() throws ErrorResultException, InterruptedException { completed.await(); return get0(); } @Override public R get(final long timeout, final TimeUnit unit) throws ErrorResultException, TimeoutException, InterruptedException { if (completed.await(timeout, unit)) { return get0(); } else { throw new TimeoutException(); } } @Override public int getRequestID() { return innerFuture.getRequestID(); } @Override public void handleErrorResult(final ErrorResultException error) { if (tryComplete(null, error)) { if (handler != null) { handler.handleErrorResult(timestamp(error)); } else { timestamp(error); } } } @Override public void handleResult(final R result) { if (tryComplete(result, null)) { if (handler != null) { handler.handleResult(timestamp(result)); } else { timestamp(result); } } } @Override public boolean isCancelled() { return innerFuture.isCancelled(); } @Override public boolean isDone() { return completed.getCount() == 0; } abstract void releaseBindOrStartTLSLockIfNeeded(); FutureResult setInnerFuture(final FutureResult innerFuture) { this.innerFuture = innerFuture; return this; } private R get0() throws ErrorResultException { if (result != null) { return result; } else { throw error; } } /** * Attempts to complete this request, returning true if successful. * This method is synchronized in order to avoid race conditions * with search result processing. */ private synchronized boolean tryComplete(final R result, final ErrorResultException error) { if (pendingResults.remove(this)) { this.result = result; this.error = error; completed.countDown(); releaseBindOrStartTLSLockIfNeeded(); return true; } else { return false; } } } /** * Runs pending request once the shared lock becomes available (when no * heart beat is in progress). * * @param * The type of result returned by the request. */ private abstract class DelayedFuture extends AsynchronousFutureResult> implements Runnable { private volatile FutureResult innerFuture = null; protected DelayedFuture(final ResultHandler handler) { super(handler); } @Override public final int getRequestID() { return innerFuture != null ? innerFuture.getRequestID() : -1; } @Override public final void run() { if (!isCancelled()) { sync.lockShared(); // Will not block. innerFuture = dispatch(); if (isCancelled() && !innerFuture.isCancelled()) { innerFuture.cancel(false); } } } protected abstract FutureResult dispatch(); @Override protected final ErrorResultException handleCancelRequest( final boolean mayInterruptIfRunning) { if (innerFuture != null) { innerFuture.cancel(mayInterruptIfRunning); } return null; } } /** * A result handler wrapper for bind or startTLS requests which releases * the bind/startTLS lock on completion. */ private final class WrappedBindOrStartTLSResultHandler extends AbstractWrappedResultHandler> { WrappedBindOrStartTLSResultHandler(final ResultHandler handler) { super(handler); } @Override void releaseBindOrStartTLSLockIfNeeded() { releaseBindOrStartTLSLock(); } } /** * A result handler wrapper for normal requests which does not release * the bind/startTLS lock on completion. */ private final class WrappedResultHandler extends AbstractWrappedResultHandler> { WrappedResultHandler(final ResultHandler handler) { super(handler); } @Override void releaseBindOrStartTLSLockIfNeeded() { // No-op for normal operations. } } /** * A result handler wrapper for search operations. Ensures that search * results are not sent once the request has been completed (see * markComplete()). */ private final class WrappedSearchResultHandler extends AbstractWrappedResultHandler implements SearchResultHandler { WrappedSearchResultHandler(final SearchResultHandler handler) { super(handler); } @Override public synchronized boolean handleEntry(final SearchResultEntry entry) { if (!isDone()) { if (handler != null) { handler.handleEntry(timestamp(entry)); } else { timestamp(entry); } return true; } else { return false; } } @Override public synchronized boolean handleReference(final SearchResultReference reference) { if (!isDone()) { if (handler != null) { handler.handleReference(timestamp(reference)); } else { timestamp(reference); } return true; } else { return false; } } @Override void releaseBindOrStartTLSLockIfNeeded() { // No-op for normal operations. } } /** The wrapped connection. */ private final Connection connection; /** * Search result handler for processing heart beat responses. */ private final SearchResultHandler heartBeatHandler = new SearchResultHandler() { @Override public boolean handleEntry(final SearchResultEntry entry) { timestamp(entry); return true; } @Override public void handleErrorResult(final ErrorResultException error) { /* * Connection failure will be handled by connection event * listener. Ignore cancellation errors since these indicate * that the heart beat was aborted by a client-side close. */ if (!(error instanceof CancelledResultException)) { if (DEBUG_LOG.isLoggable(Level.WARNING)) { DEBUG_LOG.warning(String.format( "Heartbeat failed for connection factory '%s': %s", factory, error .getMessage())); } timestamp(error); } releaseHeartBeatLock(); } @Override public boolean handleReference(final SearchResultReference reference) { timestamp(reference); return true; } @Override public void handleResult(final Result result) { timestamp(result); releaseHeartBeatLock(); } }; /** * List of pending Bind or StartTLS requests which must be invoked once * the current heart beat completes. */ private final Queue pendingBindOrStartTLSRequests = new ConcurrentLinkedQueue(); /** * List of pending responses for all active operations. These will be * signalled if no heart beat is detected within the permitted timeout * period. */ private final Queue> pendingResults = new ConcurrentLinkedQueue>(); /** Internal connection state. */ private final ConnectionState state = new ConnectionState(); /** Coordinates heart-beats with Bind and StartTLS requests. */ private final Sync sync = new Sync(); /** * Timestamp of last response received (any response, not just heart * beats). */ private volatile long lastResponseTimestamp = timeSource.currentTimeMillis(); // Assume valid at creation. private ConnectionImpl(final Connection connection) { this.connection = connection; connection.addConnectionEventListener(this); } @Override public FutureResult abandonAsync(final AbandonRequest request) { return connection.abandonAsync(request); } @Override public FutureResult addAsync(final AddRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler resultHandler) { if (checkState(resultHandler)) { final WrappedResultHandler h = wrap(resultHandler); return checkState(connection.addAsync(request, intermediateResponseHandler, h), h); } else { return newConnectionErrorFuture(); } } @Override public void addConnectionEventListener(final ConnectionEventListener listener) { state.addConnectionEventListener(listener); } @Override public FutureResult bindAsync(final BindRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler resultHandler) { if (checkState(resultHandler)) { if (sync.tryLockShared()) { // Fast path final WrappedBindOrStartTLSResultHandler h = wrapForBindOrStartTLS(resultHandler); return checkState( connection.bindAsync(request, intermediateResponseHandler, h), h); } else { /* * A heart beat must be in progress so create a runnable * task which will be executed when the heart beat * completes. */ final DelayedFuture future = new DelayedFuture(resultHandler) { @Override public FutureResult dispatch() { final WrappedBindOrStartTLSResultHandler h = wrapForBindOrStartTLS(this); return checkState(connection.bindAsync(request, intermediateResponseHandler, h), h); } }; /* * Enqueue and flush if the heart beat has completed in the * mean time. */ pendingBindOrStartTLSRequests.offer(future); flushPendingBindOrStartTLSRequests(); return future; } } else { return newConnectionErrorFuture(); } } @Override public void close() { handleConnectionClosed(); connection.close(); } @Override public void close(final UnbindRequest request, final String reason) { handleConnectionClosed(); connection.close(request, reason); } @Override public FutureResult compareAsync(final CompareRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler resultHandler) { if (checkState(resultHandler)) { final WrappedResultHandler h = wrap(resultHandler); return checkState(connection.compareAsync(request, intermediateResponseHandler, h), h); } else { return newConnectionErrorFuture(); } } @Override public FutureResult deleteAsync(final DeleteRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler resultHandler) { if (checkState(resultHandler)) { final WrappedResultHandler h = wrap(resultHandler); return checkState(connection.deleteAsync(request, intermediateResponseHandler, h), h); } else { return newConnectionErrorFuture(); } } @Override public FutureResult extendedRequestAsync( final ExtendedRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler resultHandler) { if (checkState(resultHandler)) { if (isStartTLSRequest(request)) { if (sync.tryLockShared()) { // Fast path final WrappedBindOrStartTLSResultHandler h = wrapForBindOrStartTLS(resultHandler); return checkState(connection.extendedRequestAsync(request, intermediateResponseHandler, h), h); } else { /* * A heart beat must be in progress so create a runnable * task which will be executed when the heart beat * completes. */ final DelayedFuture future = new DelayedFuture(resultHandler) { @Override public FutureResult dispatch() { final WrappedBindOrStartTLSResultHandler h = wrapForBindOrStartTLS(this); return checkState(connection.extendedRequestAsync(request, intermediateResponseHandler, h), h); } }; /* * Enqueue and flush if the heart beat has completed in * the mean time. */ pendingBindOrStartTLSRequests.offer(future); flushPendingBindOrStartTLSRequests(); return future; } } else { final WrappedResultHandler h = wrap(resultHandler); return checkState(connection.extendedRequestAsync(request, intermediateResponseHandler, h), h); } } else { return newConnectionErrorFuture(); } } @Override public void handleConnectionClosed() { if (state.notifyConnectionClosed()) { failPendingResults(newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, HBCF_CONNECTION_CLOSED_BY_CLIENT.get())); synchronized (validConnections) { connection.removeConnectionEventListener(this); validConnections.remove(this); if (validConnections.isEmpty()) { /* * This is the last active connection, so stop the * heartbeat. */ heartBeatFuture.cancel(false); } } } } @Override public void handleConnectionError(final boolean isDisconnectNotification, final ErrorResultException error) { if (state.notifyConnectionError(isDisconnectNotification, error)) { failPendingResults(error); } } @Override public void handleUnsolicitedNotification(final ExtendedResult notification) { timestamp(notification); state.notifyUnsolicitedNotification(notification); } @Override public boolean isClosed() { return state.isClosed(); } @Override public boolean isValid() { return state.isValid() && connection.isValid(); } @Override public FutureResult modifyAsync(final ModifyRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler resultHandler) { if (checkState(resultHandler)) { final WrappedResultHandler h = wrap(resultHandler); return checkState(connection.modifyAsync(request, intermediateResponseHandler, h), h); } else { return newConnectionErrorFuture(); } } @Override public FutureResult modifyDNAsync(final ModifyDNRequest request, final IntermediateResponseHandler intermediateResponseHandler, final ResultHandler resultHandler) { if (checkState(resultHandler)) { final WrappedResultHandler h = wrap(resultHandler); return checkState( connection.modifyDNAsync(request, intermediateResponseHandler, h), h); } else { return newConnectionErrorFuture(); } } @Override public void removeConnectionEventListener(final ConnectionEventListener listener) { state.removeConnectionEventListener(listener); } @Override public FutureResult searchAsync(final SearchRequest request, final IntermediateResponseHandler intermediateResponseHandler, final SearchResultHandler resultHandler) { if (checkState(resultHandler)) { final WrappedSearchResultHandler h = wrap(resultHandler); return checkState(connection.searchAsync(request, intermediateResponseHandler, h), h); } else { return newConnectionErrorFuture(); } } @Override public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("HeartBeatConnection("); builder.append(connection); builder.append(')'); return builder.toString(); } private void checkForHeartBeat() { if (sync.isHeldExclusively()) { /* * A heart beat is still in progress, but it should have * completed by now. Let's avoid aggressively terminating the * connection, because the heart beat may simply have been * delayed by a sudden surge of activity. Therefore, only flag * the connection as failed if no activity has been seen on the * connection since the heart beat was sent. */ final long currentTimeMillis = timeSource.currentTimeMillis(); if (lastResponseTimestamp < (currentTimeMillis - timeoutMS)) { if (DEBUG_LOG.isLoggable(Level.WARNING)) { DEBUG_LOG.warning(String.format( "No heartbeat detected for connection '%s'", connection)); } handleConnectionError(false, newHeartBeatTimeoutError()); } } } private FutureResult checkState(final FutureResult future, final AbstractWrappedResultHandler> h) { h.setInnerFuture(future); checkState(h); return h; } private boolean checkState(final ResultHandler h) { final ErrorResultException error = state.getConnectionError(); if (error != null) { h.handleErrorResult(error); return false; } else { return true; } } private void failPendingResults(final ErrorResultException error) { /* * Peek instead of pool because notification is responsible for * removing the element from the queue. */ AbstractWrappedResultHandler pendingResult; while ((pendingResult = pendingResults.peek()) != null) { pendingResult.handleErrorResult(error); } } private void flushPendingBindOrStartTLSRequests() { if (!pendingBindOrStartTLSRequests.isEmpty()) { /* * The pending requests will acquire the shared lock, but we * take it here anyway to ensure that pending requests do not * get blocked. */ if (sync.tryLockShared()) { try { Runnable pendingRequest; while ((pendingRequest = pendingBindOrStartTLSRequests.poll()) != null) { // Dispatch the waiting request. This will not block. pendingRequest.run(); } } finally { sync.unlockShared(); } } } } private boolean isStartTLSRequest(final ExtendedRequest request) { return request.getOID().equals(StartTLSExtendedRequest.OID); } private CompletedFutureResult newConnectionErrorFuture() { return new CompletedFutureResult(state.getConnectionError()); } private void releaseBindOrStartTLSLock() { sync.unlockShared(); } private void releaseHeartBeatLock() { sync.unlockExclusively(); flushPendingBindOrStartTLSRequests(); } /** * Sends a heart beat on this connection if required to do so. * * @return {@code true} if a heart beat was sent, otherwise * {@code false}. */ private boolean sendHeartBeat() { /* * Don't attempt to send a heart beat if the connection has already * failed. */ if (!state.isValid()) { return false; } /* * Only send the heart beat if the connection has been idle for some * time. */ final long currentTimeMillis = timeSource.currentTimeMillis(); if (currentTimeMillis < (lastResponseTimestamp + minDelayMS)) { return false; } /* * Don't send a heart beat if there is already a heart beat, bind, * or startTLS in progress. Note that the bind/startTLS response * will update the lastResponseTimestamp as if it were a heart beat. */ if (sync.tryLockExclusively()) { try { connection.searchAsync(heartBeatRequest, null, heartBeatHandler); return true; } catch (final IllegalStateException e) { /* * This may happen when we attempt to send the heart beat * just after the connection is closed but before we are * notified. */ /* * Release the lock because we're never going to get a * response. */ releaseHeartBeatLock(); } } return false; } private R timestamp(final R response) { if (!(response instanceof ConnectionException)) { lastResponseTimestamp = timeSource.currentTimeMillis(); } return response; } private WrappedResultHandler wrap(final ResultHandler handler) { final WrappedResultHandler h = new WrappedResultHandler(handler); pendingResults.add(h); return h; } private WrappedSearchResultHandler wrap(final SearchResultHandler handler) { final WrappedSearchResultHandler h = new WrappedSearchResultHandler(handler); pendingResults.add(h); return h; } private WrappedBindOrStartTLSResultHandler wrapForBindOrStartTLS( final ResultHandler handler) { final WrappedBindOrStartTLSResultHandler h = new WrappedBindOrStartTLSResultHandler(handler); pendingResults.add(h); return h; } } /** * This synchronizer prevents Bind or StartTLS operations from being * processed concurrently with heart-beats. This is required because the * LDAP protocol specifically states that servers receiving a Bind operation * should either wait for existing operations to complete or abandon them. * The same presumably applies to StartTLS operations. Note that concurrent * bind/StartTLS operations are not permitted. *

* This connection factory only coordinates Bind and StartTLS requests with * heart-beats. It does not attempt to prevent or control attempts to send * multiple concurrent Bind or StartTLS operations, etc. *

* This synchronizer can be thought of as cross between a read-write lock * and a semaphore. Unlike a read-write lock there is no requirement that a * thread releasing a lock must hold it. In addition, this synchronizer does * not support reentrancy. A thread attempting to acquire exclusively more * than once will deadlock, and a thread attempting to acquire shared more * than once will succeed and be required to release an equivalent number of * times. *

* The synchronizer has three states: *

*/ private static final class Sync extends AbstractQueuedSynchronizer { private static final int LOCKED_EXCLUSIVELY = -1; // Keep compiler quiet. private static final long serialVersionUID = -3590428415442668336L; /* Lock states. Positive values indicate that the shared lock is taken. */ private static final int UNLOCKED = 0; // initial state @Override protected boolean isHeldExclusively() { return getState() == LOCKED_EXCLUSIVELY; } @Override protected boolean tryAcquire(final int ignored) { if (compareAndSetState(UNLOCKED, LOCKED_EXCLUSIVELY)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected int tryAcquireShared(final int readers) { for (;;) { final int state = getState(); if (state == LOCKED_EXCLUSIVELY) { return LOCKED_EXCLUSIVELY; // failed } final int newState = state + readers; if (compareAndSetState(state, newState)) { return newState; // succeeded + more readers allowed } } } @Override protected boolean tryRelease(final int ignored) { if (getState() != LOCKED_EXCLUSIVELY) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(UNLOCKED); return true; } @Override protected boolean tryReleaseShared(final int ignored) { for (;;) { final int state = getState(); if (state == UNLOCKED || state == LOCKED_EXCLUSIVELY) { throw new IllegalMonitorStateException(); } final int newState = state - 1; if (compareAndSetState(state, newState)) { /* * We could always return true here, but since there cannot * be waiting readers we can specialize for waiting writers. */ return newState == UNLOCKED; } } } void lockShared() { acquireShared(1); } boolean tryLockExclusively() { return tryAcquire(0 /* unused */); } boolean tryLockShared() { return tryAcquireShared(1) > 0; } void unlockExclusively() { release(0 /* unused */); } void unlockShared() { releaseShared(0 /* unused */); } } /** * Default heart beat which will target the root DSE but not return any * results. */ private static final SearchRequest DEFAULT_SEARCH = Requests.newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=1.1)", "1.1"); /** * This is package private in order to allow unit tests to inject fake time * stamps. */ TimeSource timeSource = TimeSource.DEFAULT; /** * Scheduled task which checks that all heart beats have been received * within the timeout period. */ private final Runnable checkHeartBeatRunnable = new Runnable() { @Override public void run() { for (final ConnectionImpl connection : getValidConnections()) { connection.checkForHeartBeat(); } } }; /** * Underlying connection factory. */ private final ConnectionFactory factory; /** * The heartbeat scheduled future - which may be null if heartbeats are not * being sent (no valid connections). */ private ScheduledFuture heartBeatFuture; /** * The heartbeat search request. */ private final SearchRequest heartBeatRequest; /** * The interval between successive heartbeats. */ private final long interval; private final TimeUnit intervalUnit; /** * Flag which indicates whether this factory has been closed. */ private final AtomicBoolean isClosed = new AtomicBoolean(); /** * The minimum amount of time the connection should remain idle (no * responses) before starting to send heartbeats. */ private final long minDelayMS; /** * The heartbeat scheduler. */ private final ReferenceCountedObject.Reference scheduler; /** * Scheduled task which sends heart beats for all valid idle connections. */ private final Runnable sendHeartBeatRunnable = new Runnable() { @Override public void run() { boolean heartBeatSent = false; for (final ConnectionImpl connection : getValidConnections()) { heartBeatSent |= connection.sendHeartBeat(); } if (heartBeatSent) { scheduler.get().schedule(checkHeartBeatRunnable, timeoutMS, TimeUnit.MILLISECONDS); } } }; /** * The heartbeat timeout in milli-seconds. The connection will be marked as * failed if no heartbeat response is received within the timeout. */ private final long timeoutMS; /** * List of valid connections to which heartbeats will be sent. */ private final List validConnections = new LinkedList(); HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval, final long timeout, final TimeUnit unit, final SearchRequest heartBeat, final ScheduledExecutorService scheduler) { Validator.ensureNotNull(factory, unit); Validator.ensureTrue(interval >= 0, "negative interval"); Validator.ensureTrue(timeout >= 0, "negative timeout"); this.heartBeatRequest = heartBeat != null ? heartBeat : DEFAULT_SEARCH; this.interval = interval; this.intervalUnit = unit; this.factory = factory; this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler); this.timeoutMS = unit.toMillis(timeout); this.minDelayMS = unit.toMillis(interval) / 2; } @Override public void close() { if (isClosed.compareAndSet(false, true)) { synchronized (validConnections) { if (!validConnections.isEmpty()) { if (DEBUG_LOG.isLoggable(Level.FINE)) { DEBUG_LOG.fine(String.format( "HeartbeatConnectionFactory '%s' is closing while %d " + "active connections remain", toString(), validConnections .size())); } } } scheduler.release(); factory.close(); } } @Override public Connection getConnection() throws ErrorResultException { /* * Immediately send a heart beat in order to determine if the connected * server is responding. */ final Connection connection = factory.getConnection(); boolean keepConnection = false; try { connection.searchAsync(heartBeatRequest, null, null).get(timeoutMS, TimeUnit.MILLISECONDS); keepConnection = true; return adaptConnection(connection); } catch (final Exception e) { throw adaptHeartBeatError(e); } finally { if (!keepConnection) { connection.close(); } } } @Override public FutureResult getConnectionAsync( final ResultHandler handler) { // Create a future responsible for chaining the initial heartbeat search. final ConnectionFutureResultImpl compositeFuture = new ConnectionFutureResultImpl(handler); // Request a connection. final FutureResult connectionFuture = factory.getConnectionAsync(compositeFuture.futureConnectionResult); // Set the connection future in the composite so that the returned search future can delegate. compositeFuture.futureConnectionResult.setFutureResult(connectionFuture); // Return the future representing the heartbeat. return compositeFuture.futureSearchResult; } @Override public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("HeartBeatConnectionFactory("); builder.append(String.valueOf(factory)); builder.append(')'); return builder.toString(); } private Connection adaptConnection(final Connection connection) { synchronized (validConnections) { final ConnectionImpl heartBeatConnection = new ConnectionImpl(connection); if (validConnections.isEmpty()) { /* This is the first active connection, so start the heart beat. */ heartBeatFuture = scheduler.get().scheduleWithFixedDelay(sendHeartBeatRunnable, 0, interval, intervalUnit); } validConnections.add(heartBeatConnection); return heartBeatConnection; } } private ErrorResultException adaptHeartBeatError(final Exception error) { if (error instanceof ConnectionException) { return (ErrorResultException) error; } else if (error instanceof TimeoutResultException || error instanceof TimeoutException) { return newHeartBeatTimeoutError(); } else if (error instanceof InterruptedException) { return newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, error); } else { return newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN, HBCF_HEARTBEAT_FAILED.get(), error); } } private ConnectionImpl[] getValidConnections() { final ConnectionImpl[] tmp; synchronized (validConnections) { tmp = validConnections.toArray(new ConnectionImpl[0]); } return tmp; } private ErrorResultException newHeartBeatTimeoutError() { return newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN, HBCF_HEARTBEAT_TIMEOUT .get(timeoutMS)); } }