| | |
| | | * |
| | | * |
| | | * Copyright 2009-2010 Sun Microsystems, Inc. |
| | | * Portions copyright 2011-2012 ForgeRock AS. |
| | | * Portions copyright 2011-2013 ForgeRock AS. |
| | | */ |
| | | |
| | | package org.forgerock.opendj.ldap; |
| | |
| | | import java.util.concurrent.ScheduledExecutorService; |
| | | import java.util.concurrent.ScheduledFuture; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.locks.AbstractQueuedSynchronizer; |
| | | import java.util.logging.Level; |
| | | |
| | |
| | | |
| | | import com.forgerock.opendj.util.AsynchronousFutureResult; |
| | | import com.forgerock.opendj.util.FutureResultTransformer; |
| | | import com.forgerock.opendj.util.StaticUtils; |
| | | import com.forgerock.opendj.util.ReferenceCountedObject; |
| | | import com.forgerock.opendj.util.Validator; |
| | | |
| | | /** |
| | |
| | | /** |
| | | * A connection that sends heart beats and supports all operations. |
| | | */ |
| | | private final class ConnectionImpl extends AbstractConnectionWrapper implements |
| | | private final class ConnectionImpl extends AbstractConnectionWrapper<Connection> implements |
| | | ConnectionEventListener, SearchResultHandler { |
| | | |
| | | /** |
| | |
| | | * @param <R> |
| | | * The type of result returned by the request. |
| | | */ |
| | | private abstract class DelayedFuture<R extends Result> |
| | | extends AsynchronousFutureResult<R, ResultHandler<? super R>> |
| | | implements Runnable { |
| | | private abstract class DelayedFuture<R extends Result> extends |
| | | AsynchronousFutureResult<R, ResultHandler<? super R>> implements Runnable { |
| | | private volatile FutureResult<R> innerFuture = null; |
| | | |
| | | protected DelayedFuture(final ResultHandler<? super R> handler) { |
| | |
| | | |
| | | } |
| | | |
| | | // List of pending Bind or StartTLS requests which must be invoked |
| | | // when the current heart beat completes. |
| | | /* |
| | | * List of pending Bind or StartTLS requests which must be invoked when |
| | | * the current heart beat completes. |
| | | */ |
| | | private final Queue<Runnable> pendingRequests = new ConcurrentLinkedQueue<Runnable>(); |
| | | |
| | | // Coordinates heart-beats with Bind and StartTLS requests. |
| | | /* Coordinates heart-beats with Bind and StartTLS requests. */ |
| | | private final Sync sync = new Sync(); |
| | | |
| | | // Timestamp of last response received (any response, not just heart beats). |
| | | /* |
| | | * Timestamp of last response received (any response, not just heart |
| | | * beats). |
| | | */ |
| | | private volatile long timestamp = currentTimeMillis(); // Assume valid at creation. |
| | | |
| | | private ConnectionImpl(final Connection connection) { |
| | |
| | | return connection.bindAsync(request, intermediateResponseHandler, timestamper( |
| | | resultHandler, true)); |
| | | } else { |
| | | // A heart beat must be in progress so create a runnable task |
| | | // which will be executed when the heart beat completes. |
| | | /* |
| | | * A heart beat must be in progress so create a runnable task |
| | | * which will be executed when the heart beat completes. |
| | | */ |
| | | final DelayedFuture<BindResult> future = |
| | | new DelayedFuture<BindResult>(resultHandler) { |
| | | @Override |
| | |
| | | timestamper(this, true)); |
| | | } |
| | | }; |
| | | // Enqueue and flush if the heart beat has completed in the mean time. |
| | | /* |
| | | * Enqueue and flush if the heart beat has completed in the mean |
| | | * time. |
| | | */ |
| | | pendingRequests.offer(future); |
| | | flushPendingRequests(); |
| | | return future; |
| | |
| | | return connection.extendedRequestAsync(request, intermediateResponseHandler, |
| | | timestamper(resultHandler, true)); |
| | | } else { |
| | | // A heart beat must be in progress so create a runnable task |
| | | // which will be executed when the heart beat completes. |
| | | /* |
| | | * A heart beat must be in progress so create a runnable |
| | | * task which will be executed when the heart beat |
| | | * completes. |
| | | */ |
| | | final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) { |
| | | @Override |
| | | public FutureResult<R> dispatch() { |
| | |
| | | intermediateResponseHandler, timestamper(this, true)); |
| | | } |
| | | }; |
| | | // Enqueue and flush if the heart beat has completed in the mean time. |
| | | |
| | | /* |
| | | * Enqueue and flush if the heart beat has completed in the |
| | | * mean time. |
| | | */ |
| | | pendingRequests.offer(future); |
| | | flushPendingRequests(); |
| | | return future; |
| | |
| | | @Override |
| | | public void handleErrorResult(final ErrorResultException error) { |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format("Heartbeat failed: " + error.getMessage())); |
| | | DEBUG_LOG.fine(String.format("Heartbeat failed: %s", error.getMessage())); |
| | | } |
| | | updateTimestamp(); |
| | | releaseHeartBeatLock(); |
| | |
| | | } |
| | | |
| | | private void acquireBindOrStartTLSLock() throws ErrorResultException { |
| | | // Wait for pending heartbeats and prevent new heartbeats from |
| | | // being sent while the bind is in progress. |
| | | /* |
| | | * Wait for pending heartbeats and prevent new heartbeats from being |
| | | * sent while the bind is in progress. |
| | | */ |
| | | try { |
| | | if (!sync.tryLockShared(timeoutMS, TimeUnit.MILLISECONDS)) { |
| | | // Give up - it looks like the connection is dead. |
| | |
| | | |
| | | private void flushPendingRequests() { |
| | | if (!pendingRequests.isEmpty()) { |
| | | // The pending requests will acquire the shared lock, but we take |
| | | // it here anyway to ensure that pending requests do not get blocked. |
| | | /* |
| | | * The pending requests will acquire the shared lock, but we |
| | | * take it here anyway to ensure that pending requests do not |
| | | * get blocked. |
| | | */ |
| | | if (sync.tryLockShared()) { |
| | | try { |
| | | Runnable pendingRequest; |
| | |
| | | connection.removeConnectionEventListener(this); |
| | | activeConnections.remove(this); |
| | | if (activeConnections.isEmpty()) { |
| | | // This is the last active connection, so stop the heartbeat. |
| | | /* |
| | | * This is the last active connection, so stop the |
| | | * heartbeat. |
| | | */ |
| | | heartBeatFuture.cancel(false); |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | private void sendHeartBeat() { |
| | | // Only send the heartbeat if the connection has been idle for some time. |
| | | /* |
| | | * Only send the heartbeat if the connection has been idle for some |
| | | * time. |
| | | */ |
| | | if (currentTimeMillis() < (timestamp + minDelayMS)) { |
| | | return; |
| | | } |
| | | |
| | | // Don't send a heart beat if there is already a heart beat, |
| | | // bind, or startTLS in progress. Note that the bind/startTLS |
| | | // response will update the timestamp as if it were a heart beat. |
| | | /* |
| | | * Don't send a heart beat if there is already a heart beat, bind, |
| | | * or startTLS in progress. Note that the bind/startTLS response |
| | | * will update the timestamp as if it were a heart beat. |
| | | */ |
| | | if (sync.tryLockExclusively()) { |
| | | try { |
| | | connection.searchAsync(heartBeatRequest, null, this); |
| | | } catch (final Exception e) { |
| | | // This may happen when we attempt to send the heart beat just |
| | | // after the connection is closed but before we are notified. |
| | | /* |
| | | * This may happen when we attempt to send the heart beat |
| | | * just after the connection is closed but before we are |
| | | * notified. |
| | | */ |
| | | |
| | | // Release the lock because we're never going to get a response. |
| | | /* |
| | | * Release the lock because we're never going to get a |
| | | * response. |
| | | */ |
| | | releaseHeartBeatLock(); |
| | | } |
| | | } |
| | |
| | | * </ul> |
| | | */ |
| | | private static final class Sync extends AbstractQueuedSynchronizer { |
| | | // Lock states. Positive values indicate that the shared lock is taken. |
| | | /* Lock states. Positive values indicate that the shared lock is taken. */ |
| | | private static final int UNLOCKED = 0; // initial state |
| | | private static final int LOCKED_EXCLUSIVELY = -1; |
| | | |
| | |
| | | } |
| | | final int newState = state - 1; |
| | | if (compareAndSetState(state, newState)) { |
| | | // We could always return true here, but since there cannot |
| | | // be waiting readers we can specialize for waiting writers. |
| | | /* |
| | | * We could always return true here, but since there cannot |
| | | * be waiting readers we can specialize for waiting writers. |
| | | */ |
| | | return newState == UNLOCKED; |
| | | } |
| | | } |
| | |
| | | private final SearchRequest heartBeatRequest; |
| | | private final long interval; |
| | | private final long minDelayMS; |
| | | private final ScheduledExecutorService scheduler; |
| | | private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler; |
| | | private final long timeoutMS; |
| | | private final TimeUnit unit; |
| | | private AtomicBoolean isClosed = new AtomicBoolean(); |
| | | |
| | | /** |
| | | * Creates a new heart-beat connection factory which will create connections |
| | | * using the provided connection factory and periodically ping any created |
| | | * connections in order to detect that they are still alive every 10 seconds |
| | | * using the default scheduler. |
| | | * |
| | | * @param factory |
| | | * The connection factory to use for creating connections. |
| | | */ |
| | | HeartBeatConnectionFactory(final ConnectionFactory factory) { |
| | | this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, StaticUtils.getDefaultScheduler()); |
| | | this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, null); |
| | | } |
| | | |
| | | /** |
| | | * Creates a new heart-beat connection factory which will create connections |
| | | * using the provided connection factory and periodically ping any created |
| | | * connections in order to detect that they are still alive using the |
| | | * specified frequency and the default scheduler. |
| | | * |
| | | * @param factory |
| | | * The connection factory to use for creating connections. |
| | | * @param interval |
| | | * The interval between keepalive pings. |
| | | * @param unit |
| | | * The time unit for the interval between keepalive pings. |
| | | */ |
| | | HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval, |
| | | final TimeUnit unit) { |
| | | this(factory, interval, unit, DEFAULT_SEARCH, StaticUtils.getDefaultScheduler()); |
| | | this(factory, interval, unit, DEFAULT_SEARCH, null); |
| | | } |
| | | |
| | | /** |
| | | * Creates a new heart-beat connection factory which will create connections |
| | | * using the provided connection factory and periodically ping any created |
| | | * connections using the specified search request in order to detect that |
| | | * they are still alive. |
| | | * |
| | | * @param factory |
| | | * The connection factory to use for creating connections. |
| | | * @param interval |
| | | * The interval between keepalive pings. |
| | | * @param unit |
| | | * The time unit for the interval between keepalive pings. |
| | | * @param heartBeat |
| | | * The search request to use for keepalive pings. |
| | | */ |
| | | HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval, |
| | | final TimeUnit unit, final SearchRequest heartBeat) { |
| | | this(factory, interval, unit, heartBeat, StaticUtils.getDefaultScheduler()); |
| | | this(factory, interval, unit, heartBeat, null); |
| | | } |
| | | |
| | | /** |
| | | * Creates a new heart-beat connection factory which will create connections |
| | | * using the provided connection factory and periodically ping any created |
| | | * connections using the specified search request in order to detect that |
| | | * they are still alive. |
| | | * |
| | | * @param factory |
| | | * The connection factory to use for creating connections. |
| | | * @param interval |
| | | * The interval between keepalive pings. |
| | | * @param unit |
| | | * The time unit for the interval between keepalive pings. |
| | | * @param heartBeat |
| | | * The search request to use for keepalive pings. |
| | | * @param scheduler |
| | | * The scheduler which should for periodically sending keepalive |
| | | * pings. |
| | | */ |
| | | HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval, |
| | | final TimeUnit unit, final SearchRequest heartBeat, |
| | | final ScheduledExecutorService scheduler) { |
| | | Validator.ensureNotNull(factory, heartBeat, unit, scheduler); |
| | | Validator.ensureNotNull(factory, heartBeat, unit); |
| | | Validator.ensureTrue(interval >= 0, "negative timeout"); |
| | | |
| | | this.heartBeatRequest = heartBeat; |
| | |
| | | this.unit = unit; |
| | | this.activeConnections = new LinkedList<ConnectionImpl>(); |
| | | this.factory = factory; |
| | | this.scheduler = scheduler; |
| | | this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler); |
| | | this.timeoutMS = unit.toMillis(interval) * 2; |
| | | this.minDelayMS = unit.toMillis(interval) / 2; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void close() { |
| | | if (isClosed.compareAndSet(false, true)) { |
| | | synchronized (activeConnections) { |
| | | if (!activeConnections.isEmpty()) { |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format( |
| | | "HeartbeatConnectionFactory '%s' is closing while %d " |
| | | + "active connections remain", toString(), |
| | | activeConnections.size())); |
| | | } |
| | | } |
| | | } |
| | | scheduler.release(); |
| | | factory.close(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public Connection getConnection() throws ErrorResultException { |
| | | return adaptConnection(factory.getConnection()); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<Connection> getConnectionAsync( |
| | | final ResultHandler<? super Connection> handler) { |
| | |
| | | return future; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public String toString() { |
| | | final StringBuilder builder = new StringBuilder(); |
| | |
| | | synchronized (activeConnections) { |
| | | connection.addConnectionEventListener(heartBeatConnection); |
| | | if (activeConnections.isEmpty()) { |
| | | // This is the first active connection, so start the heart beat. |
| | | heartBeatFuture = scheduler.scheduleWithFixedDelay(new Runnable() { |
| | | /* This is the first active connection, so start the heart beat. */ |
| | | heartBeatFuture = scheduler.get().scheduleWithFixedDelay(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | final ConnectionImpl[] tmp; |