mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
18.37.2013 3f7ddbf313aaabbfba4650cb2036cb41e51a9bde
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2012 ForgeRock AS.
 *      Portions copyright 2011-2013 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
@@ -40,6 +40,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.logging.Level;
@@ -64,7 +65,7 @@
import com.forgerock.opendj.util.AsynchronousFutureResult;
import com.forgerock.opendj.util.FutureResultTransformer;
import com.forgerock.opendj.util.StaticUtils;
import com.forgerock.opendj.util.ReferenceCountedObject;
import com.forgerock.opendj.util.Validator;
/**
@@ -75,7 +76,7 @@
    /**
     * A connection that sends heart beats and supports all operations.
     */
    private final class ConnectionImpl extends AbstractConnectionWrapper implements
    private final class ConnectionImpl extends AbstractConnectionWrapper<Connection> implements
            ConnectionEventListener, SearchResultHandler {
        /**
@@ -85,9 +86,8 @@
         * @param <R>
         *            The type of result returned by the request.
         */
        private abstract class DelayedFuture<R extends Result>
                extends AsynchronousFutureResult<R, ResultHandler<? super R>>
                implements Runnable {
        private abstract class DelayedFuture<R extends Result> extends
                AsynchronousFutureResult<R, ResultHandler<? super R>> implements Runnable {
            private volatile FutureResult<R> innerFuture = null;
            protected DelayedFuture(final ResultHandler<? super R> handler) {
@@ -123,14 +123,19 @@
        }
        // List of pending Bind or StartTLS requests which must be invoked
        // when the current heart beat completes.
        /*
         * List of pending Bind or StartTLS requests which must be invoked when
         * the current heart beat completes.
         */
        private final Queue<Runnable> pendingRequests = new ConcurrentLinkedQueue<Runnable>();
        // Coordinates heart-beats with Bind and StartTLS requests.
        /* Coordinates heart-beats with Bind and StartTLS requests. */
        private final Sync sync = new Sync();
        // Timestamp of last response received (any response, not just heart beats).
        /*
         * Timestamp of last response received (any response, not just heart
         * beats).
         */
        private volatile long timestamp = currentTimeMillis(); // Assume valid at creation.
        private ConnectionImpl(final Connection connection) {
@@ -206,8 +211,10 @@
                return connection.bindAsync(request, intermediateResponseHandler, timestamper(
                        resultHandler, true));
            } else {
                // A heart beat must be in progress so create a runnable task
                // which will be executed when the heart beat completes.
                /*
                 * A heart beat must be in progress so create a runnable task
                 * which will be executed when the heart beat completes.
                 */
                final DelayedFuture<BindResult> future =
                        new DelayedFuture<BindResult>(resultHandler) {
                            @Override
@@ -216,7 +223,10 @@
                                        timestamper(this, true));
                            }
                        };
                // Enqueue and flush if the heart beat has completed in the mean time.
                /*
                 * Enqueue and flush if the heart beat has completed in the mean
                 * time.
                 */
                pendingRequests.offer(future);
                flushPendingRequests();
                return future;
@@ -342,8 +352,11 @@
                    return connection.extendedRequestAsync(request, intermediateResponseHandler,
                            timestamper(resultHandler, true));
                } else {
                    // A heart beat must be in progress so create a runnable task
                    // which will be executed when the heart beat completes.
                    /*
                     * A heart beat must be in progress so create a runnable
                     * task which will be executed when the heart beat
                     * completes.
                     */
                    final DelayedFuture<R> future = new DelayedFuture<R>(resultHandler) {
                        @Override
                        public FutureResult<R> dispatch() {
@@ -351,7 +364,11 @@
                                    intermediateResponseHandler, timestamper(this, true));
                        }
                    };
                    // Enqueue and flush if the heart beat has completed in the mean time.
                    /*
                     * Enqueue and flush if the heart beat has completed in the
                     * mean time.
                     */
                    pendingRequests.offer(future);
                    flushPendingRequests();
                    return future;
@@ -382,7 +399,7 @@
        @Override
        public void handleErrorResult(final ErrorResultException error) {
            if (DEBUG_LOG.isLoggable(Level.FINE)) {
                DEBUG_LOG.fine(String.format("Heartbeat failed: " + error.getMessage()));
                DEBUG_LOG.fine(String.format("Heartbeat failed: %s", error.getMessage()));
            }
            updateTimestamp();
            releaseHeartBeatLock();
@@ -582,8 +599,10 @@
        }
        private void acquireBindOrStartTLSLock() throws ErrorResultException {
            // Wait for pending heartbeats and prevent new heartbeats from
            // being sent while the bind is in progress.
            /*
             * Wait for pending heartbeats and prevent new heartbeats from being
             * sent while the bind is in progress.
             */
            try {
                if (!sync.tryLockShared(timeoutMS, TimeUnit.MILLISECONDS)) {
                    // Give up - it looks like the connection is dead.
@@ -597,8 +616,11 @@
        private void flushPendingRequests() {
            if (!pendingRequests.isEmpty()) {
                // The pending requests will acquire the shared lock, but we take
                // it here anyway to ensure that pending requests do not get blocked.
                /*
                 * The pending requests will acquire the shared lock, but we
                 * take it here anyway to ensure that pending requests do not
                 * get blocked.
                 */
                if (sync.tryLockShared()) {
                    try {
                        Runnable pendingRequest;
@@ -617,7 +639,10 @@
                connection.removeConnectionEventListener(this);
                activeConnections.remove(this);
                if (activeConnections.isEmpty()) {
                    // This is the last active connection, so stop the heartbeat.
                    /*
                     * This is the last active connection, so stop the
                     * heartbeat.
                     */
                    heartBeatFuture.cancel(false);
                }
            }
@@ -633,22 +658,33 @@
        }
        private void sendHeartBeat() {
            // Only send the heartbeat if the connection has been idle for some time.
            /*
             * Only send the heartbeat if the connection has been idle for some
             * time.
             */
            if (currentTimeMillis() < (timestamp + minDelayMS)) {
                return;
            }
            // Don't send a heart beat if there is already a heart beat,
            // bind, or startTLS in progress. Note that the bind/startTLS
            // response will update the timestamp as if it were a heart beat.
            /*
             * Don't send a heart beat if there is already a heart beat, bind,
             * or startTLS in progress. Note that the bind/startTLS response
             * will update the timestamp as if it were a heart beat.
             */
            if (sync.tryLockExclusively()) {
                try {
                    connection.searchAsync(heartBeatRequest, null, this);
                } catch (final Exception e) {
                    // This may happen when we attempt to send the heart beat just
                    // after the connection is closed but before we are notified.
                    /*
                     * This may happen when we attempt to send the heart beat
                     * just after the connection is closed but before we are
                     * notified.
                     */
                    // Release the lock because we're never going to get a response.
                    /*
                     * Release the lock because we're never going to get a
                     * response.
                     */
                    releaseHeartBeatLock();
                }
            }
@@ -755,7 +791,7 @@
     * </ul>
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        // Lock states. Positive values indicate that the shared lock is taken.
        /* Lock states. Positive values indicate that the shared lock is taken. */
        private static final int UNLOCKED = 0; // initial state
        private static final int LOCKED_EXCLUSIVELY = -1;
@@ -809,8 +845,10 @@
                }
                final int newState = state - 1;
                if (compareAndSetState(state, newState)) {
                    // We could always return true here, but since there cannot
                    // be waiting readers we can specialize for waiting writers.
                    /*
                     * We could always return true here, but since there cannot
                     * be waiting readers we can specialize for waiting writers.
                     */
                    return newState == UNLOCKED;
                }
            }
@@ -851,83 +889,29 @@
    private final SearchRequest heartBeatRequest;
    private final long interval;
    private final long minDelayMS;
    private final ScheduledExecutorService scheduler;
    private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
    private final long timeoutMS;
    private final TimeUnit unit;
    private AtomicBoolean isClosed = new AtomicBoolean();
    /**
     * Creates a new heart-beat connection factory which will create connections
     * using the provided connection factory and periodically ping any created
     * connections in order to detect that they are still alive every 10 seconds
     * using the default scheduler.
     *
     * @param factory
     *            The connection factory to use for creating connections.
     */
    HeartBeatConnectionFactory(final ConnectionFactory factory) {
        this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, StaticUtils.getDefaultScheduler());
        this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, null);
    }
    /**
     * Creates a new heart-beat connection factory which will create connections
     * using the provided connection factory and periodically ping any created
     * connections in order to detect that they are still alive using the
     * specified frequency and the default scheduler.
     *
     * @param factory
     *            The connection factory to use for creating connections.
     * @param interval
     *            The interval between keepalive pings.
     * @param unit
     *            The time unit for the interval between keepalive pings.
     */
    HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
            final TimeUnit unit) {
        this(factory, interval, unit, DEFAULT_SEARCH, StaticUtils.getDefaultScheduler());
        this(factory, interval, unit, DEFAULT_SEARCH, null);
    }
    /**
     * Creates a new heart-beat connection factory which will create connections
     * using the provided connection factory and periodically ping any created
     * connections using the specified search request in order to detect that
     * they are still alive.
     *
     * @param factory
     *            The connection factory to use for creating connections.
     * @param interval
     *            The interval between keepalive pings.
     * @param unit
     *            The time unit for the interval between keepalive pings.
     * @param heartBeat
     *            The search request to use for keepalive pings.
     */
    HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
            final TimeUnit unit, final SearchRequest heartBeat) {
        this(factory, interval, unit, heartBeat, StaticUtils.getDefaultScheduler());
        this(factory, interval, unit, heartBeat, null);
    }
    /**
     * Creates a new heart-beat connection factory which will create connections
     * using the provided connection factory and periodically ping any created
     * connections using the specified search request in order to detect that
     * they are still alive.
     *
     * @param factory
     *            The connection factory to use for creating connections.
     * @param interval
     *            The interval between keepalive pings.
     * @param unit
     *            The time unit for the interval between keepalive pings.
     * @param heartBeat
     *            The search request to use for keepalive pings.
     * @param scheduler
     *            The scheduler which should for periodically sending keepalive
     *            pings.
     */
    HeartBeatConnectionFactory(final ConnectionFactory factory, final long interval,
            final TimeUnit unit, final SearchRequest heartBeat,
            final ScheduledExecutorService scheduler) {
        Validator.ensureNotNull(factory, heartBeat, unit, scheduler);
        Validator.ensureNotNull(factory, heartBeat, unit);
        Validator.ensureTrue(interval >= 0, "negative timeout");
        this.heartBeatRequest = heartBeat;
@@ -935,22 +919,34 @@
        this.unit = unit;
        this.activeConnections = new LinkedList<ConnectionImpl>();
        this.factory = factory;
        this.scheduler = scheduler;
        this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
        this.timeoutMS = unit.toMillis(interval) * 2;
        this.minDelayMS = unit.toMillis(interval) / 2;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void close() {
        if (isClosed.compareAndSet(false, true)) {
            synchronized (activeConnections) {
                if (!activeConnections.isEmpty()) {
                    if (DEBUG_LOG.isLoggable(Level.FINE)) {
                        DEBUG_LOG.fine(String.format(
                                "HeartbeatConnectionFactory '%s' is closing while %d "
                                        + "active connections remain", toString(),
                                activeConnections.size()));
                    }
                }
            }
            scheduler.release();
            factory.close();
        }
    }
    @Override
    public Connection getConnection() throws ErrorResultException {
        return adaptConnection(factory.getConnection());
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Connection> getConnectionAsync(
            final ResultHandler<? super Connection> handler) {
@@ -967,9 +963,6 @@
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString() {
        final StringBuilder builder = new StringBuilder();
@@ -984,8 +977,8 @@
        synchronized (activeConnections) {
            connection.addConnectionEventListener(heartBeatConnection);
            if (activeConnections.isEmpty()) {
                // This is the first active connection, so start the heart beat.
                heartBeatFuture = scheduler.scheduleWithFixedDelay(new Runnable() {
                /* This is the first active connection, so start the heart beat. */
                heartBeatFuture = scheduler.get().scheduleWithFixedDelay(new Runnable() {
                    @Override
                    public void run() {
                        final ConnectionImpl[] tmp;