| | |
| | | * |
| | | * |
| | | * Copyright 2010 Sun Microsystems, Inc. |
| | | * Portions copyright 2011-2012 ForgeRock AS. |
| | | * Portions copyright 2011-2013 ForgeRock AS. |
| | | */ |
| | | |
| | | package org.forgerock.opendj.ldap; |
| | | |
| | | import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG; |
| | | import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER; |
| | | import static org.forgerock.opendj.ldap.ErrorResultException.*; |
| | | |
| | | import java.util.ArrayList; |
| | |
| | | import java.util.logging.Level; |
| | | |
| | | import com.forgerock.opendj.util.AsynchronousFutureResult; |
| | | import com.forgerock.opendj.util.StaticUtils; |
| | | import com.forgerock.opendj.util.ReferenceCountedObject; |
| | | import com.forgerock.opendj.util.Validator; |
| | | |
| | | /** |
| | |
| | | ResultHandler<Connection> { |
| | | |
| | | private final ConnectionFactory factory; |
| | | |
| | | private final AtomicBoolean isOperational = new AtomicBoolean(true); |
| | | |
| | | private volatile FutureResult<?> pendingConnectFuture = null; |
| | | |
| | | private final int index; |
| | | |
| | | private MonitoredConnectionFactory(final ConnectionFactory factory, final int index) { |
| | |
| | | this.index = index; |
| | | } |
| | | |
| | | /** |
| | | * Closes this monitored factory by closing the wrapped connection |
| | | * factory. |
| | | * <p> |
| | | * Only the wrapped factory is closed here; {@code pendingConnectFuture} is |
| | | * not touched by this method. |
| | | * <p> |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void close() { |
| | | // Open question: should pendingConnectFuture be cancelled here as well? |
| | | factory.close(); |
| | | } |
| | | |
| | | @Override |
| | | public Connection getConnection() throws ErrorResultException { |
| | | final Connection connection; |
| | |
| | | return connection; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<Connection> getConnectionAsync( |
| | | final ResultHandler<? super Connection> resultHandler) { |
| | | final AsynchronousFutureResult<Connection, ResultHandler<? super Connection>> future = |
| | | new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(resultHandler); |
| | | new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>( |
| | | resultHandler); |
| | | |
| | | final ResultHandler<Connection> failoverHandler = new ResultHandler<Connection>() { |
| | | @Override |
| | |
| | | connection.close(); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public String toString() { |
| | | return factory.toString(); |
| | |
| | | private synchronized void checkIfAvailable() { |
| | | if (!isOperational.get() |
| | | && (pendingConnectFuture == null || pendingConnectFuture.isDone())) { |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | StaticUtils.DEBUG_LOG.fine(String |
| | | .format("Attempting reconnect to offline factory " + this)); |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format("Attempting reconnect to offline factory '%s'", |
| | | this)); |
| | | } |
| | | pendingConnectFuture = factory.getConnectionAsync(this); |
| | | } |
| | |
| | | private void notifyOffline(final ErrorResultException error) { |
| | | if (isOperational.getAndSet(false)) { |
| | | // Transition from online to offline. |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) { |
| | | StaticUtils.DEBUG_LOG.warning(String.format("Connection factory " + factory |
| | | + " is no longer operational: " + error.getMessage())); |
| | | if (DEBUG_LOG.isLoggable(Level.WARNING)) { |
| | | DEBUG_LOG.warning(String.format( |
| | | "Connection factory '%s' is no longer operational: %s", factory, error |
| | | .getMessage())); |
| | | } |
| | | |
| | | synchronized (stateLock) { |
| | | offlineFactoriesCount++; |
| | | if (offlineFactoriesCount == 1) { |
| | | // Enable monitoring. |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | StaticUtils.DEBUG_LOG.fine(String.format("Starting monitoring thread")); |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format("Starting monitoring thread")); |
| | | } |
| | | |
| | | monitoringFuture = |
| | | scheduler.scheduleWithFixedDelay(new MonitorRunnable(), 0, |
| | | scheduler.get().scheduleWithFixedDelay(new MonitorRunnable(), 0, |
| | | monitoringInterval, monitoringIntervalTimeUnit); |
| | | } |
| | | } |
| | |
| | | private void notifyOnline() { |
| | | if (!isOperational.getAndSet(true)) { |
| | | // Transition from offline to online. |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.INFO)) { |
| | | StaticUtils.DEBUG_LOG.info(String.format("Connection factory " + factory |
| | | + " is now operational")); |
| | | if (DEBUG_LOG.isLoggable(Level.INFO)) { |
| | | DEBUG_LOG.info(String.format("Connection factory'%s' is now operational", |
| | | factory)); |
| | | } |
| | | |
| | | synchronized (stateLock) { |
| | | offlineFactoriesCount--; |
| | | if (offlineFactoriesCount == 0) { |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | StaticUtils.DEBUG_LOG.fine(String.format("Stopping monitoring thread")); |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format("Stopping monitoring thread")); |
| | | } |
| | | |
| | | monitoringFuture.cancel(false); |
| | |
| | | } |
| | | |
| | | private final List<MonitoredConnectionFactory> monitoredFactories; |
| | | |
| | | private final ScheduledExecutorService scheduler; |
| | | |
| | | private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler; |
| | | private final Object stateLock = new Object(); |
| | | |
| | | // Guarded by stateLock. |
| | | private int offlineFactoriesCount = 0; |
| | | |
| | | private final long monitoringInterval; |
| | | |
| | | private final TimeUnit monitoringIntervalTimeUnit; |
| | | |
| | | // Guarded by stateLock. |
| | | private ScheduledFuture<?> monitoringFuture; |
| | | |
| | | /** |
| | | * Creates a new abstract load balancing algorithm which will monitor |
| | | * offline connection factories every second using the default scheduler. |
| | | * |
| | | * @param factories |
| | | * The connection factories. |
| | | * Guarded by stateLock. |
| | | */ |
| | | private int offlineFactoriesCount = 0; |
| | | private final long monitoringInterval; |
| | | private final TimeUnit monitoringIntervalTimeUnit; |
| | | /** |
| | | * Guarded by stateLock. |
| | | */ |
| | | private ScheduledFuture<?> monitoringFuture; |
| | | private AtomicBoolean isClosed = new AtomicBoolean(); |
| | | |
| | | AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories) { |
| | | this(factories, 1, TimeUnit.SECONDS, StaticUtils.getDefaultScheduler()); |
| | | this(factories, 1, TimeUnit.SECONDS, null); |
| | | } |
| | | |
| | | /** |
| | | * Creates a new abstract load balancing algorithm which will monitor |
| | | * offline connection factories using the specified frequency using the |
| | | * default scheduler. |
| | | * |
| | | * @param factories |
| | | * The connection factories. |
| | | * @param interval |
| | | * The interval between attempts to poll offline factories. |
| | | * @param unit |
| | | * The time unit for the interval between attempts to poll |
| | | * offline factories. |
| | | */ |
| | | AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories, |
| | | final long interval, final TimeUnit unit) { |
| | | this(factories, interval, unit, StaticUtils.getDefaultScheduler()); |
| | | this(factories, interval, unit, null); |
| | | } |
| | | |
| | | /** |
| | | * Creates a new abstract load balancing algorithm which will monitor |
| | | * offline connection factories using the specified frequency and scheduler. |
| | | * |
| | | * @param factories |
| | | * The connection factories. |
| | | * @param interval |
| | | * The interval between attempts to poll offline factories. |
| | | * @param unit |
| | | * The time unit for the interval between attempts to poll |
| | | * offline factories. |
| | | * @param scheduler |
| | | * The scheduler which should be used for periodically monitoring dead |
| | | * connection factories to see if they are usable again. |
| | | */ |
| | | AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories, |
| | | final long interval, final TimeUnit unit, final ScheduledExecutorService scheduler) { |
| | | Validator.ensureNotNull(factories, scheduler, unit); |
| | | Validator.ensureNotNull(factories, unit); |
| | | |
| | | this.monitoredFactories = new ArrayList<MonitoredConnectionFactory>(factories.size()); |
| | | int i = 0; |
| | | for (final ConnectionFactory f : factories) { |
| | | this.monitoredFactories.add(new MonitoredConnectionFactory(f, i++)); |
| | | } |
| | | this.scheduler = scheduler; |
| | | this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler); |
| | | this.monitoringInterval = interval; |
| | | this.monitoringIntervalTimeUnit = unit; |
| | | } |
| | | |
| | | /** |
| | | * Shuts down this load balancing algorithm. |
| | | * <p> |
| | | * The first invocation flips {@code isClosed} and then, in order: cancels |
| | | * the monitoring task if one is currently scheduled (under |
| | | * {@code stateLock}), closes every monitored connection factory, and |
| | | * releases the scheduler reference held by this algorithm. Later |
| | | * invocations find {@code isClosed} already set and do nothing. |
| | | * <p> |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void close() { |
| | | if (isClosed.compareAndSet(false, true)) { /* only the first caller proceeds */ |
| | | synchronized (stateLock) { |
| | | if (monitoringFuture != null) { |
| | | monitoringFuture.cancel(false); /* false: do not interrupt a run already in progress */ |
| | | monitoringFuture = null; |
| | | } |
| | | } |
| | | for (ConnectionFactory factory : monitoredFactories) { |
| | | factory.close(); |
| | | } |
| | | scheduler.release(); /* NOTE(review): not reached if a factory.close() above throws - consider try/finally */ |
| | | } |
| | | } |
| | | /** |
| | | * Returns the connection factory selected by this algorithm. |
| | | * <p> |
| | | * Obtains a starting index from {@code getInitialConnectionFactoryIndex()} |
| | | * and passes it to {@code getMonitoredConnectionFactory(int)}, whose |
| | | * result is returned unchanged. |
| | | * |
| | | * @return The connection factory produced by |
| | | * {@code getMonitoredConnectionFactory(int)}. |
| | | * @throws ErrorResultException |
| | | * If either helper throws it. NOTE(review): presumably raised when |
| | | * no monitored factory is operational - confirm against the helper. |
| | | */ |
| | | @Override |
| | | public final ConnectionFactory getConnectionFactory() throws ErrorResultException { |
| | | final int index = getInitialConnectionFactoryIndex(); |
| | | return getMonitoredConnectionFactory(index); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public String toString() { |
| | | final StringBuilder builder = new StringBuilder(); |
| | |
| | | index = (index + 1) % maxIndex; |
| | | } while (index != initialIndex); |
| | | |
| | | // All factories are offline so give up. We could have a |
| | | // configurable policy here such as waiting indefinitely, or for a |
| | | // configurable timeout period. |
| | | /* |
| | | * All factories are offline so give up. We could have a configurable |
| | | * policy here such as waiting indefinitely, or for a configurable |
| | | * timeout period. |
| | | */ |
| | | throw newErrorResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR, |
| | | "No operational connection factories available"); |
| | | } |