/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at
* trunk/opends/resource/legal-notices/OpenDS.LICENSE
* or https://OpenDS.dev.java.net/OpenDS.LICENSE.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at
* trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
* add the following below this CDDL HEADER, with the fields enclosed
* by brackets "[]" replaced with your own identifying information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2010 Sun Microsystems, Inc.
*/
package org.opends.sdk;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import org.opends.sdk.responses.Responses;
import com.sun.opends.sdk.util.AsynchronousFutureResult;
import com.sun.opends.sdk.util.StaticUtils;
import com.sun.opends.sdk.util.Validator;
/**
* An abstract load balancing algorithm providing monitoring and failover
* capabilities.
*
* Implementations should override the method
* {@code getInitialConnectionFactoryIndex()} in order to provide the policy for
* selecting the first connection factory to use for each connection request.
*/
abstract class AbstractLoadBalancingAlgorithm implements LoadBalancingAlgorithm
{
private final class MonitoredConnectionFactory extends
AbstractConnectionFactory implements
ResultHandler
{
private final ConnectionFactory factory;
private final AtomicBoolean isOperational = new AtomicBoolean(true);
private volatile FutureResult> pendingConnectFuture = null;
private final int index;
private MonitoredConnectionFactory(final ConnectionFactory factory,
final int index)
{
this.factory = factory;
this.index = index;
}
/**
* {@inheritDoc}
*/
@Override
public FutureResult getAsynchronousConnection(
final ResultHandler super AsynchronousConnection> resultHandler)
{
final AsynchronousFutureResult future =
new AsynchronousFutureResult(resultHandler);
final ResultHandler failoverHandler =
new ResultHandler()
{
@Override
public void handleErrorResult(final ErrorResultException error)
{
// Attempt failed - try next factory.
notifyOffline(error);
final int nextIndex = (index + 1) % monitoredFactories.size();
try
{
final MonitoredConnectionFactory nextFactory =
getMonitoredConnectionFactory(nextIndex);
nextFactory.getAsynchronousConnection(future);
}
catch (final ErrorResultException e)
{
future.handleErrorResult(e);
}
}
@Override
public void handleResult(final AsynchronousConnection result)
{
notifyOnline();
future.handleResult(result);
}
};
factory.getAsynchronousConnection(failoverHandler);
return future;
}
/**
* Handle monitoring connection request failure.
*/
@Override
public void handleErrorResult(final ErrorResultException error)
{
notifyOffline(error);
}
/**
* Handle monitoring connection request success.
*/
@Override
public void handleResult(final AsynchronousConnection connection)
{
notifyOnline();
// The connection is not going to be used, so close it immediately.
connection.close();
}
/**
* {@inheritDoc}
*/
@Override
public String toString()
{
return factory.toString();
}
/**
* Attempt to connect to the factory if it is offline and there is no
* pending monitoring request.
*/
private synchronized void checkIfAvailable()
{
if (!isOperational.get()
&& (pendingConnectFuture == null || pendingConnectFuture.isDone()))
{
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINEST))
{
StaticUtils.DEBUG_LOG.finest(String
.format("Attempting connect on factory " + this));
}
pendingConnectFuture = factory.getAsynchronousConnection(this);
}
}
private void notifyOffline(final ErrorResultException error)
{
if (isOperational.getAndSet(false))
{
// Transition from online to offline.
synchronized (stateLock)
{
offlineFactoriesCount++;
if (offlineFactoriesCount == 1)
{
// Enable monitoring.
monitoringFuture = scheduler.scheduleWithFixedDelay(
new MonitorRunnable(), 0, monitoringInterval,
monitoringIntervalTimeUnit);
}
}
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
StaticUtils.DEBUG_LOG.fine(String.format("Connection factory "
+ factory + " is no longer operational: " + error.getMessage()));
}
}
}
private void notifyOnline()
{
if (!isOperational.getAndSet(true))
{
// Transition from offline to online.
synchronized (stateLock)
{
offlineFactoriesCount--;
if (offlineFactoriesCount == 0)
{
monitoringFuture.cancel(false);
monitoringFuture = null;
}
}
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
StaticUtils.DEBUG_LOG.fine(String.format("Connection factory "
+ factory + " is now operational"));
}
}
}
}
private final class MonitorRunnable implements Runnable
{
private MonitorRunnable()
{
// Nothing to do.
}
@Override
public void run()
{
for (final MonitoredConnectionFactory factory : monitoredFactories)
{
factory.checkIfAvailable();
}
}
}
private final List monitoredFactories;
private final ScheduledExecutorService scheduler;
private final Object stateLock = new Object();
// Guarded by stateLock.
private int offlineFactoriesCount = 0;
private final long monitoringInterval;
private final TimeUnit monitoringIntervalTimeUnit;
// Guarded by stateLock.
private ScheduledFuture> monitoringFuture;
/**
* Creates a new abstract load balancing algorithm which will monitor offline
* connection factories every 10 seconds using the default scheduler.
*
* @param factories
* The connection factories.
*/
AbstractLoadBalancingAlgorithm(final Collection factories)
{
this(factories, 10, TimeUnit.SECONDS, StaticUtils.getDefaultScheduler());
}
/**
* Creates a new abstract load balancing algorithm which will monitor offline
* connection factories using the specified frequency using the default
* scheduler.
*
* @param factories
* The connection factories.
* @param interval
* The interval between attempts to poll offline factories.
* @param unit
* The time unit for the interval between attempts to poll offline
* factories.
*/
AbstractLoadBalancingAlgorithm(final Collection factories,
final long interval, final TimeUnit unit)
{
this(factories, interval, unit, StaticUtils.getDefaultScheduler());
}
/**
* Creates a new abstract load balancing algorithm which will monitor offline
* connection factories using the specified frequency and scheduler.
*
* @param factories
* The connection factories.
* @param interval
* The interval between attempts to poll offline factories.
* @param unit
* The time unit for the interval between attempts to poll offline
* factories.
* @param scheduler
* The scheduler which should for periodically monitoring dead
* connection factories to see if they are usable again.
*/
AbstractLoadBalancingAlgorithm(final Collection factories,
final long interval, final TimeUnit unit,
final ScheduledExecutorService scheduler)
{
Validator.ensureNotNull(factories, scheduler, unit);
this.monitoredFactories = new ArrayList(
factories.size());
int i = 0;
for (final ConnectionFactory f : factories)
{
this.monitoredFactories.add(new MonitoredConnectionFactory(f, i++));
}
this.scheduler = scheduler;
this.monitoringInterval = interval;
this.monitoringIntervalTimeUnit = unit;
}
/**
* {@inheritDoc}
*/
@Override
public final ConnectionFactory getConnectionFactory()
throws ErrorResultException
{
final int index = getInitialConnectionFactoryIndex();
return getMonitoredConnectionFactory(index);
}
/**
* {@inheritDoc}
*/
@Override
public String toString()
{
final StringBuilder builder = new StringBuilder();
builder.append(getAlgorithmName());
builder.append('(');
boolean isFirst = true;
for (final ConnectionFactory factory : monitoredFactories)
{
if (!isFirst)
{
builder.append(',');
}
else
{
isFirst = false;
}
builder.append(factory);
}
builder.append(')');
return builder.toString();
}
/**
* Returns the name of this load balancing algorithm.
*
* @return The name of this load balancing algorithm.
*/
abstract String getAlgorithmName();
/**
* Returns the index of the first connection factory which should be used in
* order to satisfy the next connection request.
*
* @return The index of the first connection factory which should be used in
* order to satisfy the next connection request.
*/
abstract int getInitialConnectionFactoryIndex();
// Return the first factory after index which is operational.
private MonitoredConnectionFactory getMonitoredConnectionFactory(
final int initialIndex) throws ErrorResultException
{
int index = initialIndex;
final int maxIndex = monitoredFactories.size();
do
{
final MonitoredConnectionFactory factory = monitoredFactories.get(index);
if (factory.isOperational.get())
{
return factory;
}
index = (index + 1) % maxIndex;
}
while (index != initialIndex);
// All factories are offline so give up. We could have a
// configurable policy here such as waiting indefinitely, or for a
// configurable timeout period.
throw ErrorResultException.wrap(Responses.newResult(
ResultCode.CLIENT_SIDE_CONNECT_ERROR).setDiagnosticMessage(
"No operational connection factories available"));
}
}