mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
18.37.2013 3f7ddbf313aaabbfba4650cb2036cb41e51a9bde
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
@@ -22,11 +22,13 @@
 *
 *
 *      Copyright 2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2012 ForgeRock AS.
 *      Portions copyright 2011-2013 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER;
import static org.forgerock.opendj.ldap.ErrorResultException.*;
import java.util.ArrayList;
@@ -39,7 +41,7 @@
import java.util.logging.Level;
import com.forgerock.opendj.util.AsynchronousFutureResult;
import com.forgerock.opendj.util.StaticUtils;
import com.forgerock.opendj.util.ReferenceCountedObject;
import com.forgerock.opendj.util.Validator;
/**
@@ -55,11 +57,8 @@
            ResultHandler<Connection> {
        private final ConnectionFactory factory;
        private final AtomicBoolean isOperational = new AtomicBoolean(true);
        private volatile FutureResult<?> pendingConnectFuture = null;
        private final int index;
        private MonitoredConnectionFactory(final ConnectionFactory factory, final int index) {
@@ -67,9 +66,12 @@
            this.index = index;
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void close() {
            // Should we cancel the future?
            factory.close();
        }
        @Override
        public Connection getConnection() throws ErrorResultException {
            final Connection connection;
@@ -87,14 +89,12 @@
            return connection;
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public FutureResult<Connection> getConnectionAsync(
                final ResultHandler<? super Connection> resultHandler) {
            final AsynchronousFutureResult<Connection, ResultHandler<? super Connection>> future =
                   new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(resultHandler);
                    new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(
                            resultHandler);
            final ResultHandler<Connection> failoverHandler = new ResultHandler<Connection>() {
                @Override
@@ -141,9 +141,6 @@
            connection.close();
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public String toString() {
            return factory.toString();
@@ -156,9 +153,9 @@
        private synchronized void checkIfAvailable() {
            if (!isOperational.get()
                    && (pendingConnectFuture == null || pendingConnectFuture.isDone())) {
                if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
                    StaticUtils.DEBUG_LOG.fine(String
                            .format("Attempting reconnect to offline factory " + this));
                if (DEBUG_LOG.isLoggable(Level.FINE)) {
                    DEBUG_LOG.fine(String.format("Attempting reconnect to offline factory '%s'",
                            this));
                }
                pendingConnectFuture = factory.getConnectionAsync(this);
            }
@@ -167,21 +164,22 @@
        private void notifyOffline(final ErrorResultException error) {
            if (isOperational.getAndSet(false)) {
                // Transition from online to offline.
                if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) {
                    StaticUtils.DEBUG_LOG.warning(String.format("Connection factory " + factory
                            + " is no longer operational: " + error.getMessage()));
                if (DEBUG_LOG.isLoggable(Level.WARNING)) {
                    DEBUG_LOG.warning(String.format(
                            "Connection factory '%s' is no longer operational: %s", factory, error
                                    .getMessage()));
                }
                synchronized (stateLock) {
                    offlineFactoriesCount++;
                    if (offlineFactoriesCount == 1) {
                        // Enable monitoring.
                        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
                            StaticUtils.DEBUG_LOG.fine(String.format("Starting monitoring thread"));
                        if (DEBUG_LOG.isLoggable(Level.FINE)) {
                            DEBUG_LOG.fine(String.format("Starting monitoring thread"));
                        }
                        monitoringFuture =
                                scheduler.scheduleWithFixedDelay(new MonitorRunnable(), 0,
                                scheduler.get().scheduleWithFixedDelay(new MonitorRunnable(), 0,
                                        monitoringInterval, monitoringIntervalTimeUnit);
                    }
                }
@@ -191,16 +189,16 @@
        private void notifyOnline() {
            if (!isOperational.getAndSet(true)) {
                // Transition from offline to online.
                if (StaticUtils.DEBUG_LOG.isLoggable(Level.INFO)) {
                    StaticUtils.DEBUG_LOG.info(String.format("Connection factory " + factory
                            + " is now operational"));
                if (DEBUG_LOG.isLoggable(Level.INFO)) {
                    DEBUG_LOG.info(String.format("Connection factory'%s' is now operational",
                            factory));
                }
                synchronized (stateLock) {
                    offlineFactoriesCount--;
                    if (offlineFactoriesCount == 0) {
                        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) {
                            StaticUtils.DEBUG_LOG.fine(String.format("Stopping monitoring thread"));
                        if (DEBUG_LOG.isLoggable(Level.FINE)) {
                            DEBUG_LOG.fine(String.format("Stopping monitoring thread"));
                        }
                        monitoringFuture.cancel(false);
@@ -225,91 +223,65 @@
    }
    private final List<MonitoredConnectionFactory> monitoredFactories;
    private final ScheduledExecutorService scheduler;
    private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
    private final Object stateLock = new Object();
    // Guarded by stateLock.
    private int offlineFactoriesCount = 0;
    private final long monitoringInterval;
    private final TimeUnit monitoringIntervalTimeUnit;
    // Guarded by stateLock.
    private ScheduledFuture<?> monitoringFuture;
    /**
     * Creates a new abstract load balancing algorithm which will monitor
     * offline connection factories every second using the default scheduler.
     *
     * @param factories
     *            The connection factories.
     * Guarded by stateLock.
     */
    private int offlineFactoriesCount = 0;
    private final long monitoringInterval;
    private final TimeUnit monitoringIntervalTimeUnit;
    /**
     * Guarded by stateLock.
     */
    private ScheduledFuture<?> monitoringFuture;
    private AtomicBoolean isClosed = new AtomicBoolean();
    AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories) {
        this(factories, 1, TimeUnit.SECONDS, StaticUtils.getDefaultScheduler());
        this(factories, 1, TimeUnit.SECONDS, null);
    }
    /**
     * Creates a new abstract load balancing algorithm which will monitor
     * offline connection factories using the specified frequency using the
     * default scheduler.
     *
     * @param factories
     *            The connection factories.
     * @param interval
     *            The interval between attempts to poll offline factories.
     * @param unit
     *            The time unit for the interval between attempts to poll
     *            offline factories.
     */
    AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
            final long interval, final TimeUnit unit) {
        this(factories, interval, unit, StaticUtils.getDefaultScheduler());
        this(factories, interval, unit, null);
    }
    /**
     * Creates a new abstract load balancing algorithm which will monitor
     * offline connection factories using the specified frequency and scheduler.
     *
     * @param factories
     *            The connection factories.
     * @param interval
     *            The interval between attempts to poll offline factories.
     * @param unit
     *            The time unit for the interval between attempts to poll
     *            offline factories.
     * @param scheduler
     *            The scheduler which should for periodically monitoring dead
     *            connection factories to see if they are usable again.
     */
    AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories,
            final long interval, final TimeUnit unit, final ScheduledExecutorService scheduler) {
        Validator.ensureNotNull(factories, scheduler, unit);
        Validator.ensureNotNull(factories, unit);
        this.monitoredFactories = new ArrayList<MonitoredConnectionFactory>(factories.size());
        int i = 0;
        for (final ConnectionFactory f : factories) {
            this.monitoredFactories.add(new MonitoredConnectionFactory(f, i++));
        }
        this.scheduler = scheduler;
        this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
        this.monitoringInterval = interval;
        this.monitoringIntervalTimeUnit = unit;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void close() {
        if (isClosed.compareAndSet(false, true)) {
            synchronized (stateLock) {
                if (monitoringFuture != null) {
                    monitoringFuture.cancel(false);
                    monitoringFuture = null;
                }
            }
            for (ConnectionFactory factory : monitoredFactories) {
                factory.close();
            }
            scheduler.release();
        }
    }
    @Override
    public final ConnectionFactory getConnectionFactory() throws ErrorResultException {
        final int index = getInitialConnectionFactoryIndex();
        return getMonitoredConnectionFactory(index);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString() {
        final StringBuilder builder = new StringBuilder();
@@ -357,9 +329,11 @@
            index = (index + 1) % maxIndex;
        } while (index != initialIndex);
        // All factories are offline so give up. We could have a
        // configurable policy here such as waiting indefinitely, or for a
        // configurable timeout period.
        /*
         * All factories are offline so give up. We could have a configurable
         * policy here such as waiting indefinitely, or for a configurable
         * timeout period.
         */
        throw newErrorResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR,
                "No operational connection factories available");
    }