opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
File was deleted opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/ConnectionLoadBalancer.java
New file @@ -0,0 +1,70 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * Copyright 2016 ForgeRock AS. */ package org.forgerock.opendj.ldap; import static org.forgerock.util.promise.Promises.newExceptionPromise; import java.util.Collection; import org.forgerock.util.Options; import org.forgerock.util.promise.Promise; /** * An abstract connection based load balancer. Load balancing is performed when the application attempts to obtain a * connection. * <p> * Implementations should override the method {@code getInitialConnectionFactoryIndex()} in order to provide the policy * for selecting the first connection factory to use for each connection request. */ abstract class ConnectionLoadBalancer extends LoadBalancer { ConnectionLoadBalancer(final String loadBalancerName, final Collection<? extends ConnectionFactory> factories, final Options options) { super(loadBalancerName, factories, options); } @Override public final Connection getConnection() throws LdapException { return getMonitoredConnectionFactory(getInitialConnectionFactoryIndex()).getConnection(); } @Override public final Promise<Connection, LdapException> getConnectionAsync() { try { return getMonitoredConnectionFactory(getInitialConnectionFactoryIndex()).getConnectionAsync(); } catch (final LdapException e) { return newExceptionPromise(e); } } /** * Returns the index of the first connection factory which should be used in order to satisfy the next connection * request. * * @return The index of the first connection factory which should be used in order to satisfy the next connection * request. */ abstract int getInitialConnectionFactoryIndex(); } opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -22,22 +22,26 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions Copyright 2011-2015 ForgeRock AS * Portions Copyright 2011-2016 ForgeRock AS */ package org.forgerock.opendj.ldap; import static org.forgerock.opendj.ldap.RequestHandlerFactoryAdapter.adaptRequestHandler; import static org.forgerock.util.time.Duration.duration; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Collection; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.forgerock.util.Option; import org.forgerock.util.Options; import org.forgerock.util.Reject; import org.forgerock.util.promise.Promise; import org.forgerock.util.time.Duration; /** * This class contains methods for creating and manipulating connection @@ -45,6 +49,27 @@ */ public final class Connections { /** * Specifies the interval between successive attempts to reconnect to offline load-balanced connection factories. * The default configuration is to attempt to reconnect every second. */ public static final Option<Duration> LOAD_BALANCER_MONITORING_INTERVAL = Option.withDefault(duration("1 seconds")); /** * Specifies the event listener which should be notified whenever a load-balanced connection factory changes state * from online to offline or vice-versa. By default events will be logged to the {@code LoadBalancingAlgorithm} * logger using the {@link LoadBalancerEventListener#LOG_EVENTS} listener. */ public static final Option<LoadBalancerEventListener> LOAD_BALANCER_EVENT_LISTENER = Option.of(LoadBalancerEventListener.class, LoadBalancerEventListener.LOG_EVENTS); /** * Specifies the scheduler which will be used for periodically reconnecting to offline connection factories. A * system-wide scheduler will be used by default. */ public static final Option<ScheduledExecutorService> LOAD_BALANCER_SCHEDULER = Option.of(ScheduledExecutorService.class, null); /** * Creates a new connection pool which creates new connections as needed * using the provided connection factory, but will reuse previously * allocated connections when they are available. @@ -365,14 +390,15 @@ } /** * Creates a new "round-robin" load-balance which will load-balance connections across the provided set of * Creates a new "round-robin" load-balancer which will load-balance connections across the provided set of * connection factories. A round robin load balancing algorithm distributes connection requests across a list of * connection factories one at a time. When the end of the list is reached, the algorithm starts again from the * beginning. * <p/> * This algorithm is typically used for load-balancing <i>within</i> data centers, where load must be distributed * equally across multiple data sources. This algorithm contrasts with the {@link FailoverLoadBalancingAlgorithm} * which is used for load-balancing <i>between</i> data centers. * equally across multiple data sources. This algorithm contrasts with the * {@link #newFailoverLoadBalancer(Collection, Options)} which is used for load-balancing <i>between</i> data * centers. * <p/> * If a problem occurs that temporarily prevents connections from being obtained for one of the connection * factories, then this algorithm automatically "fails over" to the next operational connection factory in the list. @@ -385,24 +411,55 @@ * @param factories * The connection factories. * @param options * This configuration options for the load-balancer. See {@link LoadBalancingAlgorithm} for common options. * This configuration options for the load-balancer. * @return The new round-robin load balancer. * @see #newFailoverLoadBalancer(Collection, Options) * @see LoadBalancingAlgorithm * @see #LOAD_BALANCER_EVENT_LISTENER * @see #LOAD_BALANCER_MONITORING_INTERVAL * @see #LOAD_BALANCER_SCHEDULER */ public static ConnectionFactory newRoundRobinLoadBalancer( final Collection<? extends ConnectionFactory> factories, final Options options) { return new LoadBalancer(new RoundRobinLoadBalancingAlgorithm(factories, options)); return new ConnectionLoadBalancer("RoundRobinLoadBalancer", factories, options) { private final int maxIndex = factories.size(); private final AtomicInteger nextIndex = new AtomicInteger(-1); @Override int getInitialConnectionFactoryIndex() { // A round robin pool of one connection factories is unlikely in // practice and requires special treatment. if (maxIndex == 1) { return 0; } // Determine the next factory to use: avoid blocking algorithm. int oldNextIndex; int newNextIndex; do { oldNextIndex = nextIndex.get(); newNextIndex = oldNextIndex + 1; if (newNextIndex == maxIndex) { newNextIndex = 0; } } while (!nextIndex.compareAndSet(oldNextIndex, newNextIndex)); // There's a potential, but benign, race condition here: other threads // could jump in and rotate through the list before we return the // connection factory. return newNextIndex; } }; } /** * Creates a new "fail-over" load-balance which will load-balance connections across the provided set of connection * Creates a new "fail-over" load-balancer which will load-balance connections across the provided set of connection * factories. A fail-over load balancing algorithm provides fault tolerance across multiple underlying connection * factories. * <p/> * This algorithm is typically used for load-balancing <i>between</i> data centers, where there is preference to * always forward connection requests to the <i>closest available</i> data center. This algorithm contrasts with the * {@link RoundRobinLoadBalancingAlgorithm} which is used for load-balancing <i>within</i> a data center. * {@link #newRoundRobinLoadBalancer(Collection, Options)} which is used for load-balancing <i>within</i> a data * center. * <p/> * This algorithm selects connection factories based on the order in which they were provided during construction. * More specifically, an attempt to obtain a connection factory will always return the <i>first operational</i> @@ -420,27 +477,22 @@ * @param factories * The connection factories. * @param options * This configuration options for the load-balancer. See {@link LoadBalancingAlgorithm} for common options. * This configuration options for the load-balancer. * @return The new fail-over load balancer. * @see #newRoundRobinLoadBalancer(Collection, Options) * @see LoadBalancingAlgorithm * @see #LOAD_BALANCER_EVENT_LISTENER * @see #LOAD_BALANCER_MONITORING_INTERVAL * @see #LOAD_BALANCER_SCHEDULER */ public static ConnectionFactory newFailoverLoadBalancer( final Collection<? extends ConnectionFactory> factories, final Options options) { return new LoadBalancer(new FailoverLoadBalancingAlgorithm(factories, options)); } /** * Creates a new load balancer which will obtain connections using the provided load balancing algorithm. * * @param algorithm * The load balancing algorithm which will be used to obtain the next * @return The new load balancer. * @throws NullPointerException * If {@code algorithm} was {@code null}. */ public static ConnectionFactory newLoadBalancer(final LoadBalancingAlgorithm algorithm) { return new LoadBalancer(algorithm); return new ConnectionLoadBalancer("FailoverLoadBalancer", factories, options) { @Override int getInitialConnectionFactoryIndex() { // Always start with the first connection factory. return 0; } }; } /** opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/FailoverLoadBalancingAlgorithm.java
File was deleted opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java
@@ -22,58 +22,289 @@ * * * Copyright 2010 Sun Microsystems, Inc. * Portions Copyright 2011-2014 ForgeRock AS. * Portions Copyright 2011-2016 ForgeRock AS. */ package org.forgerock.opendj.ldap; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.forgerock.i18n.LocalizableMessage; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.forgerock.util.Options; import org.forgerock.util.Reject; import org.forgerock.util.AsyncFunction; import org.forgerock.util.promise.Promise; import com.forgerock.opendj.util.ReferenceCountedObject; import static org.forgerock.opendj.ldap.Connections.*; import static org.forgerock.opendj.ldap.LdapException.*; import static org.forgerock.opendj.ldap.ResultCode.CLIENT_SIDE_CONNECT_ERROR; import static org.forgerock.util.Utils.closeSilently; import static org.forgerock.util.Utils.joinAsString; import static org.forgerock.util.promise.Promises.*; import static com.forgerock.opendj.util.StaticUtils.*; /** * A load balancing connection factory allocates connections using the provided * algorithm. * An abstract load balancer providing common monitoring and failover capabilities. * <p> * Implementations should override the {@link ConnectionFactory} methods and use the * {@link #getMonitoredConnectionFactory(int)} method in order to obtain the desired load-balanced connection factory. * If the requested connection factory is unavailable then a linear probe will be performed in order to the next * suitable connection factory. */ final class LoadBalancer implements ConnectionFactory { private final LoadBalancingAlgorithm algorithm; abstract class LoadBalancer implements ConnectionFactory { LoadBalancer(final String loadBalancerName, final Collection<? extends ConnectionFactory> factories, final Options options) { Reject.ifNull(loadBalancerName, factories, options); LoadBalancer(final LoadBalancingAlgorithm algorithm) { Reject.ifNull(algorithm); this.algorithm = algorithm; } @Override public void close() { // Delegate to the algorithm. algorithm.close(); } @Override public Connection getConnection() throws LdapException { return algorithm.getConnectionFactory().getConnection(); } @Override public Promise<Connection, LdapException> getConnectionAsync() { final ConnectionFactory factory; try { factory = algorithm.getConnectionFactory(); } catch (final LdapException e) { return newExceptionPromise(e); this.loadBalancerName = loadBalancerName; this.monitoredFactories = new ArrayList<>(factories.size()); int i = 0; for (final ConnectionFactory f : factories) { this.monitoredFactories.add(new MonitoredConnectionFactory(f, i++)); } return factory.getConnectionAsync(); this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(options.get(LOAD_BALANCER_SCHEDULER)); this.monitoringIntervalMS = options.get(LOAD_BALANCER_MONITORING_INTERVAL).to(TimeUnit.MILLISECONDS); this.listener = options.get(LOAD_BALANCER_EVENT_LISTENER); } @Override public String toString() { public final void close() { if (isClosed.compareAndSet(false, true)) { synchronized (stateLock) { if (monitoringFuture != null) { monitoringFuture.cancel(false); monitoringFuture = null; } } closeSilently(monitoredFactories); scheduler.release(); } } @Override public final String toString() { final StringBuilder builder = new StringBuilder(); builder.append("LoadBalancer("); builder.append(algorithm); builder.append(loadBalancerName); builder.append('('); joinAsString(builder, ",", monitoredFactories); builder.append(')'); return builder.toString(); } private final class MonitoredConnectionFactory implements ConnectionFactory, LdapResultHandler<Connection> { private final ConnectionFactory factory; private final AtomicBoolean isOperational = new AtomicBoolean(true); private volatile Promise<?, LdapException> pendingConnectPromise; private final int index; private MonitoredConnectionFactory(final ConnectionFactory factory, final int index) { this.factory = factory; this.index = index; } @Override public void close() { // Should we cancel the promise? factory.close(); } @Override public Connection getConnection() throws LdapException { final Connection connection; try { connection = factory.getConnection(); } catch (LdapException e) { // Attempt failed - try next factory. notifyOffline(e); final int nextIndex = (index + 1) % monitoredFactories.size(); return getMonitoredConnectionFactory(nextIndex).getConnection(); } notifyOnline(); return connection; } @Override public Promise<Connection, LdapException> getConnectionAsync() { return factory.getConnectionAsync().thenAsync( new AsyncFunction<Connection, Connection, LdapException>() { @Override public Promise<Connection, LdapException> apply(Connection value) throws LdapException { notifyOnline(); return newResultPromise(value); } }, new AsyncFunction<LdapException, Connection, LdapException>() { @Override public Promise<Connection, LdapException> apply(LdapException error) throws LdapException { // Attempt failed - try next factory. notifyOffline(error); final int nextIndex = (index + 1) % monitoredFactories.size(); return getMonitoredConnectionFactory(nextIndex).getConnectionAsync(); } }); } /** Handle monitoring connection request failure. */ @Override public void handleException(final LdapException exception) { notifyOffline(exception); } /** Handle monitoring connection request success. */ @Override public void handleResult(final Connection connection) { // The connection is not going to be used, so close it immediately. connection.close(); notifyOnline(); } @Override public String toString() { return factory.toString(); } /** Attempt to connect to the factory if it is offline and there is no pending monitoring request. */ private synchronized void checkIfAvailable() { if (!isOperational.get() && (pendingConnectPromise == null || pendingConnectPromise.isDone())) { logger.debug(LocalizableMessage.raw("Attempting reconnect to offline factory '%s'", this)); pendingConnectPromise = factory.getConnectionAsync().thenOnResult(this).thenOnException(this); } } private void notifyOffline(final LdapException error) { // Save the error in case the load-balancer is exhausted. lastFailure = error; if (isOperational.getAndSet(false)) { // Transition from online to offline. synchronized (listenerLock) { try { listener.handleConnectionFactoryOffline(factory, error); } catch (RuntimeException e) { handleListenerException(e); } } synchronized (stateLock) { offlineFactoriesCount++; if (offlineFactoriesCount == 1) { logger.debug(LocalizableMessage.raw("Starting monitoring thread")); monitoringFuture = scheduler.get().scheduleWithFixedDelay(new MonitorRunnable(), 0, monitoringIntervalMS, TimeUnit.MILLISECONDS); } } } } private void notifyOnline() { if (!isOperational.getAndSet(true)) { // Transition from offline to online. synchronized (listenerLock) { try { listener.handleConnectionFactoryOnline(factory); } catch (RuntimeException e) { handleListenerException(e); } } synchronized (stateLock) { offlineFactoriesCount--; if (offlineFactoriesCount == 0) { logger.debug(LocalizableMessage.raw("Stopping monitoring thread")); monitoringFuture.cancel(false); monitoringFuture = null; } } } } private void handleListenerException(RuntimeException e) { // TODO: I18N logger.error(LocalizableMessage.raw( "A run-time error occurred while processing a load-balancer event", e)); } } private final class MonitorRunnable implements Runnable { private MonitorRunnable() { // Nothing to do. } @Override public void run() { for (final MonitoredConnectionFactory factory : monitoredFactories) { factory.checkIfAvailable(); } } } private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); private final String loadBalancerName; private final List<MonitoredConnectionFactory> monitoredFactories; private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler; private final Object stateLock = new Object(); /** * The last connection failure which caused a connection factory to be * marked offline. This is used in order to help diagnose problems when the * load-balancer has exhausted all of its factories. */ private volatile LdapException lastFailure; /** The event listener which should be notified when connection factories go on or off-line. */ private final LoadBalancerEventListener listener; /** Ensures that events are notified one at a time. */ private final Object listenerLock = new Object(); /** Guarded by stateLock. */ private int offlineFactoriesCount; private final long monitoringIntervalMS; /** Guarded by stateLock. */ private ScheduledFuture<?> monitoringFuture; private final AtomicBoolean isClosed = new AtomicBoolean(); /** * Return the first available connection factory starting from {@code initialIndex}. * * @param initialIndex The index of the connection factory to be returned if operational. The index may be * greater than the number of factories in which case this method will perform a modulus * operation to bring it into range. NOTE: for performance reasons callers should attempt to * use valid indexes because the modulus operation is relatively expensive. * @return The first available connection factory starting from the initial index. * @throws LdapException If no connection factories are available. */ final ConnectionFactory getMonitoredConnectionFactory(final int initialIndex) throws LdapException { final int maxIndex = monitoredFactories.size(); int index = initialIndex < maxIndex ? initialIndex : initialIndex % maxIndex; do { final MonitoredConnectionFactory factory = monitoredFactories.get(index); if (factory.isOperational.get()) { return factory; } index = (index + 1) % maxIndex; } while (index != initialIndex); /* * All factories are offline so give up. We could have a configurable * policy here such as waiting indefinitely, or for a configurable * timeout period. */ throw newLdapException(CLIENT_SIDE_CONNECT_ERROR, "No operational connection factories available", lastFailure); } final String getLoadBalancerName() { return loadBalancerName; } } opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancerEventListener.java
@@ -20,7 +20,7 @@ * * CDDL HEADER END * * Copyright 2013-2015 ForgeRock AS. * Copyright 2013-2016 ForgeRock AS. */ package org.forgerock.opendj.ldap; @@ -40,16 +40,16 @@ * sent at a time. Event listener implementations should not need to be thread * safe. * * @see LoadBalancingAlgorithm#LOAD_BALANCER_EVENT_LISTENER * @see Connections#LOAD_BALANCER_EVENT_LISTENER */ public interface LoadBalancerEventListener extends EventListener { /** * An event listener implementation which logs events to the LoadBalancingAlgorithm logger. This event listener is * the default implementation configured using the {@link LoadBalancingAlgorithm#LOAD_BALANCER_EVENT_LISTENER} * the default implementation configured using the {@link Connections#LOAD_BALANCER_EVENT_LISTENER} * option. */ LoadBalancerEventListener LOG_EVENTS = new LoadBalancerEventListener() { private final LocalizedLogger logger = LocalizedLogger.getLocalizedLogger(LoadBalancingAlgorithm.class); private final LocalizedLogger logger = LocalizedLogger.getLocalizedLogger(LoadBalancer.class); @Override public void handleConnectionFactoryOnline(final ConnectionFactory factory) { opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancingAlgorithm.java
File was deleted opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/RoundRobinLoadBalancingAlgorithm.java
File was deleted opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/LoadBalancerTestCase.java
File was renamed from opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java @@ -21,18 +21,16 @@ * CDDL HEADER END * * * Copyright 2014-2015 ForgeRock AS * Copyright 2014-2016 ForgeRock AS */ package org.forgerock.opendj.ldap; import static java.util.Arrays.asList; import static org.fest.assertions.Assertions.assertThat; import static org.fest.assertions.Fail.fail; import static org.forgerock.opendj.ldap.Connections.*; import static org.forgerock.opendj.ldap.Connections.newRoundRobinLoadBalancer; import static org.forgerock.opendj.ldap.LdapException.newLdapException; import static org.forgerock.opendj.ldap.LoadBalancingAlgorithm.LOAD_BALANCER_EVENT_LISTENER; import static org.forgerock.opendj.ldap.LoadBalancingAlgorithm.LOAD_BALANCER_MONITORING_INTERVAL; import static org.forgerock.opendj.ldap.LoadBalancingAlgorithm.LOAD_BALANCER_SCHEDULER; import static org.forgerock.util.Options.defaultOptions; import static org.forgerock.util.promise.Promises.newExceptionPromise; import static org.forgerock.util.promise.Promises.newResultPromise; @@ -51,7 +49,7 @@ import org.testng.annotations.Test; @SuppressWarnings("javadoc") public class AbstractLoadBalancingAlgorithmTestCase extends SdkTestCase { public class LoadBalancerTestCase extends SdkTestCase { private static ConnectionFactory mockAsync(final ConnectionFactory mock) { return new ConnectionFactory() { @Override opendj-sdk/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java
@@ -11,7 +11,7 @@ * Header, with the fields enclosed by brackets [] replaced by your own identifying * information: "Portions copyright [year] [name of copyright owner]". * * Copyright 2013-2015 ForgeRock AS. * Copyright 2013-2016 ForgeRock AS. */ package org.forgerock.opendj.rest2ldap; @@ -21,7 +21,7 @@ import static org.forgerock.opendj.ldap.Connections.newFailoverLoadBalancer; import static org.forgerock.opendj.ldap.Connections.newRoundRobinLoadBalancer; import static org.forgerock.opendj.ldap.LDAPConnectionFactory.*; import static org.forgerock.opendj.ldap.LoadBalancingAlgorithm.LOAD_BALANCER_MONITORING_INTERVAL; import static org.forgerock.opendj.ldap.Connections.LOAD_BALANCER_MONITORING_INTERVAL; import static org.forgerock.opendj.ldap.requests.Requests.newSearchRequest; import static org.forgerock.opendj.ldap.schema.CoreSchema.getEntryUUIDAttributeType; import static org.forgerock.opendj.rest2ldap.ReadOnUpdatePolicy.CONTROLS;