From fd389fc0b592e2d3203fef53ecfb5c3d61d2736d Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 17 Feb 2016 00:06:20 +0000
Subject: [PATCH] OPENDJSDK-16: refactoring work required for affinity based load-balancer
---
/dev/null | 79 -------
opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/ConnectionLoadBalancer.java | 70 +++++++
opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancerEventListener.java | 8
opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java | 102 +++++++--
opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java | 301 ++++++++++++++++++++++++++---
opendj-sdk/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java | 4
opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/LoadBalancerTestCase.java | 8
7 files changed, 422 insertions(+), 150 deletions(-)
diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
deleted file mode 100644
index 1c30dad..0000000
--- a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS.
- */
-package org.forgerock.opendj.ldap;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.forgerock.i18n.LocalizableMessage;
-import org.forgerock.i18n.slf4j.LocalizedLogger;
-import org.forgerock.util.Options;
-import org.forgerock.util.Reject;
-import org.forgerock.util.AsyncFunction;
-import org.forgerock.util.promise.Promise;
-
-import com.forgerock.opendj.util.ReferenceCountedObject;
-
-import static org.forgerock.opendj.ldap.LdapException.*;
-import static org.forgerock.util.promise.Promises.*;
-
-import static com.forgerock.opendj.util.StaticUtils.*;
-
-/**
- * An abstract load balancing algorithm providing monitoring and failover
- * capabilities.
- * <p>
- * Implementations should override the method
- * {@code getInitialConnectionFactoryIndex()} in order to provide the policy for
- * selecting the first connection factory to use for each connection request.
- */
-abstract class AbstractLoadBalancingAlgorithm implements LoadBalancingAlgorithm {
- private final class MonitoredConnectionFactory implements ConnectionFactory,
- LdapResultHandler<Connection> {
-
- private final ConnectionFactory factory;
- private final AtomicBoolean isOperational = new AtomicBoolean(true);
- private volatile Promise<?, LdapException> pendingConnectPromise;
- private final int index;
-
- private MonitoredConnectionFactory(final ConnectionFactory factory, final int index) {
- this.factory = factory;
- this.index = index;
- }
-
- @Override
- public void close() {
- // Should we cancel the promise?
- factory.close();
- }
-
- @Override
- public Connection getConnection() throws LdapException {
- final Connection connection;
- try {
- connection = factory.getConnection();
- } catch (LdapException e) {
- // Attempt failed - try next factory.
- notifyOffline(e);
- final int nextIndex = (index + 1) % monitoredFactories.size();
- final MonitoredConnectionFactory nextFactory =
- getMonitoredConnectionFactory(nextIndex);
- return nextFactory.getConnection();
- }
- notifyOnline();
- return connection;
- }
-
- @Override
- public Promise<Connection, LdapException> getConnectionAsync() {
- return factory.getConnectionAsync().thenAsync(
- new AsyncFunction<Connection, Connection, LdapException>() {
- @Override
- public Promise<Connection, LdapException> apply(Connection value) throws LdapException {
- notifyOnline();
- return newResultPromise(value);
- }
- },
- new AsyncFunction<LdapException, Connection, LdapException>() {
- @Override
- public Promise<Connection, LdapException> apply(LdapException error) throws LdapException {
- // Attempt failed - try next factory.
- notifyOffline(error);
- final int nextIndex = (index + 1) % monitoredFactories.size();
- return getMonitoredConnectionFactory(nextIndex).getConnectionAsync();
- }
- });
- }
-
-
-
- /**
- * Handle monitoring connection request failure.
- */
- @Override
- public void handleException(final LdapException exception) {
- notifyOffline(exception);
- }
-
- /**
- * Handle monitoring connection request success.
- */
- @Override
- public void handleResult(final Connection connection) {
- // The connection is not going to be used, so close it immediately.
- connection.close();
- notifyOnline();
- }
-
- @Override
- public String toString() {
- return factory.toString();
- }
-
- /**
- * Attempt to connect to the factory if it is offline and there is no
- * pending monitoring request.
- */
- private synchronized void checkIfAvailable() {
- if (!isOperational.get() && (pendingConnectPromise == null || pendingConnectPromise.isDone())) {
- logger.debug(LocalizableMessage.raw("Attempting reconnect to offline factory '%s'", this));
- pendingConnectPromise = factory.getConnectionAsync().thenOnResult(this).thenOnException(this);
- }
- }
-
- private void notifyOffline(final LdapException error) {
- // Save the error in case the load-balancer is exhausted.
- lastFailure = error;
-
- if (isOperational.getAndSet(false)) {
- // Transition from online to offline.
- synchronized (listenerLock) {
- try {
- listener.handleConnectionFactoryOffline(factory, error);
- } catch (RuntimeException e) {
- handleListenerException(e);
- }
- }
-
- synchronized (stateLock) {
- offlineFactoriesCount++;
- if (offlineFactoriesCount == 1) {
- logger.debug(LocalizableMessage.raw("Starting monitoring thread"));
- monitoringFuture =
- scheduler.get().scheduleWithFixedDelay(new MonitorRunnable(), 0,
- monitoringIntervalMS, TimeUnit.MILLISECONDS);
- }
- }
- }
- }
-
- private void notifyOnline() {
- if (!isOperational.getAndSet(true)) {
- // Transition from offline to online.
- synchronized (listenerLock) {
- try {
- listener.handleConnectionFactoryOnline(factory);
- } catch (RuntimeException e) {
- handleListenerException(e);
- }
- }
-
- synchronized (stateLock) {
- offlineFactoriesCount--;
- if (offlineFactoriesCount == 0) {
- logger.debug(LocalizableMessage.raw("Stopping monitoring thread"));
-
- monitoringFuture.cancel(false);
- monitoringFuture = null;
- }
- }
- }
- }
-
- private void handleListenerException(RuntimeException e) {
- // TODO: I18N
- logger.error(LocalizableMessage.raw("A run-time error occurred while processing a load-balancer event", e));
- }
- }
-
- private final class MonitorRunnable implements Runnable {
- private MonitorRunnable() {
- // Nothing to do.
- }
-
- @Override
- public void run() {
- for (final MonitoredConnectionFactory factory : monitoredFactories) {
- factory.checkIfAvailable();
- }
- }
- }
-
- private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
-
- private final List<MonitoredConnectionFactory> monitoredFactories;
- private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
- private final Object stateLock = new Object();
-
- /**
- * The last connection failure which caused a connection factory to be
- * marked offline. This is used in order to help diagnose problems when the
- * load-balancer has exhausted all of its factories.
- */
- private volatile LdapException lastFailure;
-
- /**
- * The event listener which should be notified when connection factories go
- * on or off-line.
- */
- private final LoadBalancerEventListener listener;
-
- /**
- * Ensures that events are notified one at a time.
- */
- private final Object listenerLock = new Object();
-
- /**
- * Guarded by stateLock.
- */
- private int offlineFactoriesCount;
- private final long monitoringIntervalMS;
-
- /**
- * Guarded by stateLock.
- */
- private ScheduledFuture<?> monitoringFuture;
- private final AtomicBoolean isClosed = new AtomicBoolean();
-
- AbstractLoadBalancingAlgorithm(final Collection<? extends ConnectionFactory> factories, final Options options) {
- Reject.ifNull(factories, options);
-
- this.monitoredFactories = new ArrayList<>(factories.size());
- int i = 0;
- for (final ConnectionFactory f : factories) {
- this.monitoredFactories.add(new MonitoredConnectionFactory(f, i++));
- }
- this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(options.get(LOAD_BALANCER_SCHEDULER));
- this.monitoringIntervalMS = options.get(LOAD_BALANCER_MONITORING_INTERVAL).to(TimeUnit.MILLISECONDS);
- this.listener = options.get(LOAD_BALANCER_EVENT_LISTENER);
- }
-
- @Override
- public void close() {
- if (isClosed.compareAndSet(false, true)) {
- synchronized (stateLock) {
- if (monitoringFuture != null) {
- monitoringFuture.cancel(false);
- monitoringFuture = null;
- }
- }
- for (ConnectionFactory factory : monitoredFactories) {
- factory.close();
- }
- scheduler.release();
- }
- }
-
- @Override
- public final ConnectionFactory getConnectionFactory() throws LdapException {
- final int index = getInitialConnectionFactoryIndex();
- return getMonitoredConnectionFactory(index);
- }
-
- @Override
- public String toString() {
- final StringBuilder builder = new StringBuilder();
- builder.append(getAlgorithmName());
- builder.append('(');
- boolean isFirst = true;
- for (final ConnectionFactory factory : monitoredFactories) {
- if (!isFirst) {
- builder.append(',');
- } else {
- isFirst = false;
- }
- builder.append(factory);
- }
- builder.append(')');
- return builder.toString();
- }
-
- /**
- * Returns the name of this load balancing algorithm.
- *
- * @return The name of this load balancing algorithm.
- */
- abstract String getAlgorithmName();
-
- /**
- * Returns the index of the first connection factory which should be used in
- * order to satisfy the next connection request.
- *
- * @return The index of the first connection factory which should be used in
- * order to satisfy the next connection request.
- */
- abstract int getInitialConnectionFactoryIndex();
-
- /** Return the first factory after index which is operational. */
- private MonitoredConnectionFactory getMonitoredConnectionFactory(final int initialIndex) throws LdapException {
- int index = initialIndex;
- final int maxIndex = monitoredFactories.size();
- do {
- final MonitoredConnectionFactory factory = monitoredFactories.get(index);
- if (factory.isOperational.get()) {
- return factory;
- }
- index = (index + 1) % maxIndex;
- } while (index != initialIndex);
-
- /*
- * All factories are offline so give up. We could have a configurable
- * policy here such as waiting indefinitely, or for a configurable
- * timeout period.
- */
- throw newLdapException(ResultCode.CLIENT_SIDE_CONNECT_ERROR,
- "No operational connection factories available", lastFailure);
- }
-}
diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/ConnectionLoadBalancer.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/ConnectionLoadBalancer.java
new file mode 100644
index 0000000..95518b4
--- /dev/null
+++ b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/ConnectionLoadBalancer.java
@@ -0,0 +1,70 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package org.forgerock.opendj.ldap;
+
+import static org.forgerock.util.promise.Promises.newExceptionPromise;
+
+import java.util.Collection;
+
+import org.forgerock.util.Options;
+import org.forgerock.util.promise.Promise;
+
+/**
+ * An abstract connection based load balancer. Load balancing is performed when the application attempts to obtain a
+ * connection.
+ * <p>
+ * Implementations should override the method {@code getInitialConnectionFactoryIndex()} in order to provide the policy
+ * for selecting the first connection factory to use for each connection request.
+ */
+abstract class ConnectionLoadBalancer extends LoadBalancer {
+ ConnectionLoadBalancer(final String loadBalancerName,
+ final Collection<? extends ConnectionFactory> factories,
+ final Options options) {
+ super(loadBalancerName, factories, options);
+ }
+
+ @Override
+ public final Connection getConnection() throws LdapException {
+ return getMonitoredConnectionFactory(getInitialConnectionFactoryIndex()).getConnection();
+ }
+
+ @Override
+ public final Promise<Connection, LdapException> getConnectionAsync() {
+ try {
+ return getMonitoredConnectionFactory(getInitialConnectionFactoryIndex()).getConnectionAsync();
+ } catch (final LdapException e) {
+ return newExceptionPromise(e);
+ }
+ }
+
+ /**
+ * Returns the index of the first connection factory which should be used in order to satisfy the next connection
+ * request.
+ *
+ * @return The index of the first connection factory which should be used in order to satisfy the next connection
+ * request.
+ */
+ abstract int getInitialConnectionFactoryIndex();
+}
diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
index da5300a..ccffb0d 100644
--- a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
+++ b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -22,22 +22,26 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS
+ * Portions Copyright 2011-2016 ForgeRock AS
*/
package org.forgerock.opendj.ldap;
import static org.forgerock.opendj.ldap.RequestHandlerFactoryAdapter.adaptRequestHandler;
+import static org.forgerock.util.time.Duration.duration;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.forgerock.util.Option;
import org.forgerock.util.Options;
import org.forgerock.util.Reject;
import org.forgerock.util.promise.Promise;
+import org.forgerock.util.time.Duration;
/**
* This class contains methods for creating and manipulating connection
@@ -45,6 +49,27 @@
*/
public final class Connections {
/**
+ * Specifies the interval between successive attempts to reconnect to offline load-balanced connection factories.
+ * The default configuration is to attempt to reconnect every second.
+ */
+ public static final Option<Duration> LOAD_BALANCER_MONITORING_INTERVAL = Option.withDefault(duration("1 seconds"));
+
+ /**
+ * Specifies the event listener which should be notified whenever a load-balanced connection factory changes state
+ * from online to offline or vice-versa. By default events will be logged to the {@code LoadBalancingAlgorithm}
+ * logger using the {@link LoadBalancerEventListener#LOG_EVENTS} listener.
+ */
+ public static final Option<LoadBalancerEventListener> LOAD_BALANCER_EVENT_LISTENER =
+ Option.of(LoadBalancerEventListener.class, LoadBalancerEventListener.LOG_EVENTS);
+
+ /**
+ * Specifies the scheduler which will be used for periodically reconnecting to offline connection factories. A
+ * system-wide scheduler will be used by default.
+ */
+ public static final Option<ScheduledExecutorService> LOAD_BALANCER_SCHEDULER =
+ Option.of(ScheduledExecutorService.class, null);
+
+ /**
* Creates a new connection pool which creates new connections as needed
* using the provided connection factory, but will reuse previously
* allocated connections when they are available.
@@ -365,14 +390,15 @@
}
/**
- * Creates a new "round-robin" load-balance which will load-balance connections across the provided set of
+ * Creates a new "round-robin" load-balancer which will load-balance connections across the provided set of
* connection factories. A round robin load balancing algorithm distributes connection requests across a list of
* connection factories one at a time. When the end of the list is reached, the algorithm starts again from the
* beginning.
* <p/>
* This algorithm is typically used for load-balancing <i>within</i> data centers, where load must be distributed
- * equally across multiple data sources. This algorithm contrasts with the {@link FailoverLoadBalancingAlgorithm}
- * which is used for load-balancing <i>between</i> data centers.
+ * equally across multiple data sources. This algorithm contrasts with the
+ * {@link #newFailoverLoadBalancer(Collection, Options)} which is used for load-balancing <i>between</i> data
+ * centers.
* <p/>
* If a problem occurs that temporarily prevents connections from being obtained for one of the connection
* factories, then this algorithm automatically "fails over" to the next operational connection factory in the list.
@@ -385,24 +411,55 @@
* @param factories
* The connection factories.
* @param options
- * This configuration options for the load-balancer. See {@link LoadBalancingAlgorithm} for common options.
+ * This configuration options for the load-balancer.
* @return The new round-robin load balancer.
* @see #newFailoverLoadBalancer(Collection, Options)
- * @see LoadBalancingAlgorithm
+ * @see #LOAD_BALANCER_EVENT_LISTENER
+ * @see #LOAD_BALANCER_MONITORING_INTERVAL
+ * @see #LOAD_BALANCER_SCHEDULER
*/
public static ConnectionFactory newRoundRobinLoadBalancer(
final Collection<? extends ConnectionFactory> factories, final Options options) {
- return new LoadBalancer(new RoundRobinLoadBalancingAlgorithm(factories, options));
+ return new ConnectionLoadBalancer("RoundRobinLoadBalancer", factories, options) {
+ private final int maxIndex = factories.size();
+ private final AtomicInteger nextIndex = new AtomicInteger(-1);
+
+ @Override
+ int getInitialConnectionFactoryIndex() {
+ // A round robin pool of one connection factories is unlikely in
+ // practice and requires special treatment.
+ if (maxIndex == 1) {
+ return 0;
+ }
+
+ // Determine the next factory to use: avoid blocking algorithm.
+ int oldNextIndex;
+ int newNextIndex;
+ do {
+ oldNextIndex = nextIndex.get();
+ newNextIndex = oldNextIndex + 1;
+ if (newNextIndex == maxIndex) {
+ newNextIndex = 0;
+ }
+ } while (!nextIndex.compareAndSet(oldNextIndex, newNextIndex));
+
+ // There's a potential, but benign, race condition here: other threads
+ // could jump in and rotate through the list before we return the
+ // connection factory.
+ return newNextIndex;
+ }
+ };
}
/**
- * Creates a new "fail-over" load-balance which will load-balance connections across the provided set of connection
+ * Creates a new "fail-over" load-balancer which will load-balance connections across the provided set of connection
* factories. A fail-over load balancing algorithm provides fault tolerance across multiple underlying connection
* factories.
* <p/>
* This algorithm is typically used for load-balancing <i>between</i> data centers, where there is preference to
* always forward connection requests to the <i>closest available</i> data center. This algorithm contrasts with the
- * {@link RoundRobinLoadBalancingAlgorithm} which is used for load-balancing <i>within</i> a data center.
+ * {@link #newRoundRobinLoadBalancer(Collection, Options)} which is used for load-balancing <i>within</i> a data
+ * center.
* <p/>
* This algorithm selects connection factories based on the order in which they were provided during construction.
* More specifically, an attempt to obtain a connection factory will always return the <i>first operational</i>
@@ -420,27 +477,22 @@
* @param factories
* The connection factories.
* @param options
- * This configuration options for the load-balancer. See {@link LoadBalancingAlgorithm} for common options.
+ * This configuration options for the load-balancer.
* @return The new fail-over load balancer.
* @see #newRoundRobinLoadBalancer(Collection, Options)
- * @see LoadBalancingAlgorithm
+ * @see #LOAD_BALANCER_EVENT_LISTENER
+ * @see #LOAD_BALANCER_MONITORING_INTERVAL
+ * @see #LOAD_BALANCER_SCHEDULER
*/
public static ConnectionFactory newFailoverLoadBalancer(
final Collection<? extends ConnectionFactory> factories, final Options options) {
- return new LoadBalancer(new FailoverLoadBalancingAlgorithm(factories, options));
- }
-
- /**
- * Creates a new load balancer which will obtain connections using the provided load balancing algorithm.
- *
- * @param algorithm
- * The load balancing algorithm which will be used to obtain the next
- * @return The new load balancer.
- * @throws NullPointerException
- * If {@code algorithm} was {@code null}.
- */
- public static ConnectionFactory newLoadBalancer(final LoadBalancingAlgorithm algorithm) {
- return new LoadBalancer(algorithm);
+ return new ConnectionLoadBalancer("FailoverLoadBalancer", factories, options) {
+ @Override
+ int getInitialConnectionFactoryIndex() {
+ // Always start with the first connection factory.
+ return 0;
+ }
+ };
}
/**
diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/FailoverLoadBalancingAlgorithm.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/FailoverLoadBalancingAlgorithm.java
deleted file mode 100644
index 7290b01..0000000
--- a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/FailoverLoadBalancingAlgorithm.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2010 Sun Microsystems, Inc.
- * Portions copyright 2013-2015 ForgeRock AS.
- */
-
-package org.forgerock.opendj.ldap;
-
-import java.util.Collection;
-
-import org.forgerock.util.Options;
-
-/**
- * A fail-over load balancing algorithm provides fault tolerance across multiple
- * underlying connection factories.
- */
-final class FailoverLoadBalancingAlgorithm extends AbstractLoadBalancingAlgorithm {
- FailoverLoadBalancingAlgorithm(final Collection<? extends ConnectionFactory> factories, final Options options) {
- super(factories, options);
- }
-
- @Override
- String getAlgorithmName() {
- return "Failover";
- }
-
- @Override
- int getInitialConnectionFactoryIndex() {
- // Always start with the first connection factory.
- return 0;
- }
-}
diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java
index 90e89c2..eaa3bc1 100644
--- a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java
+++ b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java
@@ -22,58 +22,289 @@
*
*
* Copyright 2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2014 ForgeRock AS.
+ * Portions Copyright 2011-2016 ForgeRock AS.
*/
-
package org.forgerock.opendj.ldap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.util.Options;
import org.forgerock.util.Reject;
+import org.forgerock.util.AsyncFunction;
import org.forgerock.util.promise.Promise;
+import com.forgerock.opendj.util.ReferenceCountedObject;
+
+import static org.forgerock.opendj.ldap.Connections.*;
+import static org.forgerock.opendj.ldap.LdapException.*;
+import static org.forgerock.opendj.ldap.ResultCode.CLIENT_SIDE_CONNECT_ERROR;
+import static org.forgerock.util.Utils.closeSilently;
+import static org.forgerock.util.Utils.joinAsString;
import static org.forgerock.util.promise.Promises.*;
+import static com.forgerock.opendj.util.StaticUtils.*;
+
/**
- * A load balancing connection factory allocates connections using the provided
- * algorithm.
+ * An abstract load balancer providing common monitoring and failover capabilities.
+ * <p>
+ * Implementations should override the {@link ConnectionFactory} methods and use the
+ * {@link #getMonitoredConnectionFactory(int)} method in order to obtain the desired load-balanced connection factory.
+ * If the requested connection factory is unavailable then a linear probe will be performed in order to the next
+ * suitable connection factory.
*/
-final class LoadBalancer implements ConnectionFactory {
- private final LoadBalancingAlgorithm algorithm;
+abstract class LoadBalancer implements ConnectionFactory {
+ LoadBalancer(final String loadBalancerName,
+ final Collection<? extends ConnectionFactory> factories,
+ final Options options) {
+ Reject.ifNull(loadBalancerName, factories, options);
- LoadBalancer(final LoadBalancingAlgorithm algorithm) {
- Reject.ifNull(algorithm);
- this.algorithm = algorithm;
- }
-
- @Override
- public void close() {
- // Delegate to the algorithm.
- algorithm.close();
- }
-
- @Override
- public Connection getConnection() throws LdapException {
- return algorithm.getConnectionFactory().getConnection();
- }
-
- @Override
- public Promise<Connection, LdapException> getConnectionAsync() {
- final ConnectionFactory factory;
-
- try {
- factory = algorithm.getConnectionFactory();
- } catch (final LdapException e) {
- return newExceptionPromise(e);
+ this.loadBalancerName = loadBalancerName;
+ this.monitoredFactories = new ArrayList<>(factories.size());
+ int i = 0;
+ for (final ConnectionFactory f : factories) {
+ this.monitoredFactories.add(new MonitoredConnectionFactory(f, i++));
}
-
- return factory.getConnectionAsync();
+ this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(options.get(LOAD_BALANCER_SCHEDULER));
+ this.monitoringIntervalMS = options.get(LOAD_BALANCER_MONITORING_INTERVAL).to(TimeUnit.MILLISECONDS);
+ this.listener = options.get(LOAD_BALANCER_EVENT_LISTENER);
}
@Override
- public String toString() {
+ public final void close() {
+ if (isClosed.compareAndSet(false, true)) {
+ synchronized (stateLock) {
+ if (monitoringFuture != null) {
+ monitoringFuture.cancel(false);
+ monitoringFuture = null;
+ }
+ }
+ closeSilently(monitoredFactories);
+ scheduler.release();
+ }
+ }
+
+ @Override
+ public final String toString() {
final StringBuilder builder = new StringBuilder();
- builder.append("LoadBalancer(");
- builder.append(algorithm);
+ builder.append(loadBalancerName);
+ builder.append('(');
+ joinAsString(builder, ",", monitoredFactories);
builder.append(')');
return builder.toString();
}
+
+ private final class MonitoredConnectionFactory implements ConnectionFactory, LdapResultHandler<Connection> {
+ private final ConnectionFactory factory;
+ private final AtomicBoolean isOperational = new AtomicBoolean(true);
+ private volatile Promise<?, LdapException> pendingConnectPromise;
+ private final int index;
+
+ private MonitoredConnectionFactory(final ConnectionFactory factory, final int index) {
+ this.factory = factory;
+ this.index = index;
+ }
+
+ @Override
+ public void close() {
+ // Should we cancel the promise?
+ factory.close();
+ }
+
+ @Override
+ public Connection getConnection() throws LdapException {
+ final Connection connection;
+ try {
+ connection = factory.getConnection();
+ } catch (LdapException e) {
+ // Attempt failed - try next factory.
+ notifyOffline(e);
+ final int nextIndex = (index + 1) % monitoredFactories.size();
+ return getMonitoredConnectionFactory(nextIndex).getConnection();
+ }
+ notifyOnline();
+ return connection;
+ }
+
+ @Override
+ public Promise<Connection, LdapException> getConnectionAsync() {
+ return factory.getConnectionAsync().thenAsync(
+ new AsyncFunction<Connection, Connection, LdapException>() {
+ @Override
+ public Promise<Connection, LdapException> apply(Connection value) throws LdapException {
+ notifyOnline();
+ return newResultPromise(value);
+ }
+ },
+ new AsyncFunction<LdapException, Connection, LdapException>() {
+ @Override
+ public Promise<Connection, LdapException> apply(LdapException error) throws LdapException {
+ // Attempt failed - try next factory.
+ notifyOffline(error);
+ final int nextIndex = (index + 1) % monitoredFactories.size();
+ return getMonitoredConnectionFactory(nextIndex).getConnectionAsync();
+ }
+ });
+ }
+
+ /** Handle monitoring connection request failure. */
+ @Override
+ public void handleException(final LdapException exception) {
+ notifyOffline(exception);
+ }
+
+ /** Handle monitoring connection request success. */
+ @Override
+ public void handleResult(final Connection connection) {
+ // The connection is not going to be used, so close it immediately.
+ connection.close();
+ notifyOnline();
+ }
+
+ @Override
+ public String toString() {
+ return factory.toString();
+ }
+
+ /** Attempt to connect to the factory if it is offline and there is no pending monitoring request. */
+ private synchronized void checkIfAvailable() {
+ if (!isOperational.get() && (pendingConnectPromise == null || pendingConnectPromise.isDone())) {
+ logger.debug(LocalizableMessage.raw("Attempting reconnect to offline factory '%s'", this));
+ pendingConnectPromise = factory.getConnectionAsync().thenOnResult(this).thenOnException(this);
+ }
+ }
+
+ private void notifyOffline(final LdapException error) {
+ // Save the error in case the load-balancer is exhausted.
+ lastFailure = error;
+ if (isOperational.getAndSet(false)) {
+ // Transition from online to offline.
+ synchronized (listenerLock) {
+ try {
+ listener.handleConnectionFactoryOffline(factory, error);
+ } catch (RuntimeException e) {
+ handleListenerException(e);
+ }
+ }
+ synchronized (stateLock) {
+ offlineFactoriesCount++;
+ if (offlineFactoriesCount == 1) {
+ logger.debug(LocalizableMessage.raw("Starting monitoring thread"));
+ monitoringFuture =
+ scheduler.get().scheduleWithFixedDelay(new MonitorRunnable(), 0,
+ monitoringIntervalMS, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+ }
+
+ private void notifyOnline() {
+ if (!isOperational.getAndSet(true)) {
+ // Transition from offline to online.
+ synchronized (listenerLock) {
+ try {
+ listener.handleConnectionFactoryOnline(factory);
+ } catch (RuntimeException e) {
+ handleListenerException(e);
+ }
+ }
+ synchronized (stateLock) {
+ offlineFactoriesCount--;
+ if (offlineFactoriesCount == 0) {
+ logger.debug(LocalizableMessage.raw("Stopping monitoring thread"));
+
+ monitoringFuture.cancel(false);
+ monitoringFuture = null;
+ }
+ }
+ }
+ }
+
+ private void handleListenerException(RuntimeException e) {
+ // TODO: I18N
+ logger.error(LocalizableMessage.raw(
+ "A run-time error occurred while processing a load-balancer event", e));
+ }
+ }
+
+ private final class MonitorRunnable implements Runnable {
+ private MonitorRunnable() {
+ // Nothing to do.
+ }
+
+ @Override
+ public void run() {
+ for (final MonitoredConnectionFactory factory : monitoredFactories) {
+ factory.checkIfAvailable();
+ }
+ }
+ }
+
+ private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
+ private final String loadBalancerName;
+ private final List<MonitoredConnectionFactory> monitoredFactories;
+ private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
+ private final Object stateLock = new Object();
+
+ /**
+ * The last connection failure which caused a connection factory to be
+ * marked offline. This is used in order to help diagnose problems when the
+ * load-balancer has exhausted all of its factories.
+ */
+ private volatile LdapException lastFailure;
+
+ /** The event listener which should be notified when connection factories go on or off-line. */
+ private final LoadBalancerEventListener listener;
+
+ /** Ensures that events are notified one at a time. */
+ private final Object listenerLock = new Object();
+
+ /** Guarded by stateLock. */
+ private int offlineFactoriesCount;
+ private final long monitoringIntervalMS;
+
+ /** Guarded by stateLock. */
+ private ScheduledFuture<?> monitoringFuture;
+ private final AtomicBoolean isClosed = new AtomicBoolean();
+
+ /**
+ * Return the first available connection factory starting from {@code initialIndex}.
+ *
+ * @param initialIndex The index of the connection factory to be returned if operational. The index may be
+ * greater than the number of factories in which case this method will perform a modulus
+ * operation to bring it into range. NOTE: for performance reasons callers should attempt to
+ * use valid indexes because the modulus operation is relatively expensive.
+ * @return The first available connection factory starting from the initial index.
+ * @throws LdapException If no connection factories are available.
+ */
+ final ConnectionFactory getMonitoredConnectionFactory(final int initialIndex) throws LdapException {
+ final int maxIndex = monitoredFactories.size();
+ int index = initialIndex < maxIndex ? initialIndex : initialIndex % maxIndex;
+ do {
+ final MonitoredConnectionFactory factory = monitoredFactories.get(index);
+ if (factory.isOperational.get()) {
+ return factory;
+ }
+ index = (index + 1) % maxIndex;
+ } while (index != initialIndex);
+
+ /*
+ * All factories are offline so give up. We could have a configurable
+ * policy here such as waiting indefinitely, or for a configurable
+ * timeout period.
+ */
+ throw newLdapException(CLIENT_SIDE_CONNECT_ERROR, "No operational connection factories available", lastFailure);
+ }
+
+ final String getLoadBalancerName() {
+ return loadBalancerName;
+ }
}
diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancerEventListener.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancerEventListener.java
index fb1c1db..724d59d 100644
--- a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancerEventListener.java
+++ b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancerEventListener.java
@@ -20,7 +20,7 @@
*
* CDDL HEADER END
*
- * Copyright 2013-2015 ForgeRock AS.
+ * Copyright 2013-2016 ForgeRock AS.
*/
package org.forgerock.opendj.ldap;
@@ -40,16 +40,16 @@
* sent at a time. Event listener implementations should not need to be thread
* safe.
*
- * @see LoadBalancingAlgorithm#LOAD_BALANCER_EVENT_LISTENER
+ * @see Connections#LOAD_BALANCER_EVENT_LISTENER
*/
public interface LoadBalancerEventListener extends EventListener {
/**
* An event listener implementation which logs events to the LoadBalancingAlgorithm logger. This event listener is
- * the default implementation configured using the {@link LoadBalancingAlgorithm#LOAD_BALANCER_EVENT_LISTENER}
+ * the default implementation configured using the {@link Connections#LOAD_BALANCER_EVENT_LISTENER}
* option.
*/
LoadBalancerEventListener LOG_EVENTS = new LoadBalancerEventListener() {
- private final LocalizedLogger logger = LocalizedLogger.getLocalizedLogger(LoadBalancingAlgorithm.class);
+ private final LocalizedLogger logger = LocalizedLogger.getLocalizedLogger(LoadBalancer.class);
@Override
public void handleConnectionFactoryOnline(final ConnectionFactory factory) {
diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancingAlgorithm.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancingAlgorithm.java
deleted file mode 100644
index eebdead..0000000
--- a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancingAlgorithm.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2010 Sun Microsystems, Inc.
- * Portions Copyright 2013-2015 ForgeRock AS.
- */
-
-package org.forgerock.opendj.ldap;
-
-import static org.forgerock.util.time.Duration.duration;
-
-import java.io.Closeable;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.forgerock.util.Option;
-import org.forgerock.util.time.Duration;
-
-/**
- * A load balancing algorithm distributes connection requests across one or more underlying connection factories in an
- * implementation defined manner.
- *
- * @see Connections#newLoadBalancer(LoadBalancingAlgorithm)
- */
-public interface LoadBalancingAlgorithm extends Closeable {
- /**
- * Specifies the interval between successive attempts to reconnect to offline load-balanced connection factories.
- * The default configuration is to attempt to reconnect every second.
- */
- Option<Duration> LOAD_BALANCER_MONITORING_INTERVAL = Option.withDefault(duration("1 seconds"));
-
- /**
- * Specifies the event listener which should be notified whenever a load-balanced connection factory changes state
- * from online to offline or vice-versa. By default events will be logged to the {@code LoadBalancingAlgorithm}
- * logger using the {@link LoadBalancerEventListener#LOG_EVENTS} listener.
- */
- Option<LoadBalancerEventListener> LOAD_BALANCER_EVENT_LISTENER =
- Option.of(LoadBalancerEventListener.class, LoadBalancerEventListener.LOG_EVENTS);
-
- /**
- * Specifies the scheduler which will be used for periodically reconnecting to offline connection factories. A
- * system-wide scheduler will be used by default.
- */
- Option<ScheduledExecutorService> LOAD_BALANCER_SCHEDULER = Option.of(ScheduledExecutorService.class, null);
-
- /**
- * Releases any resources associated with this algorithm, including any associated connection factories.
- */
- @Override
- void close();
-
- /**
- * Returns a connection factory which should be used in order to satisfy the next connection request.
- *
- * @return The connection factory.
- * @throws LdapException
- * If no connection factories are available for use.
- */
- ConnectionFactory getConnectionFactory() throws LdapException;
-}
diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/RoundRobinLoadBalancingAlgorithm.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/RoundRobinLoadBalancingAlgorithm.java
deleted file mode 100644
index 997fdaf..0000000
--- a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/RoundRobinLoadBalancingAlgorithm.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2010 Sun Microsystems, Inc.
- * Portions copyright 2013-2015 ForgeRock AS.
- */
-
-package org.forgerock.opendj.ldap;
-
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.forgerock.util.Options;
-
-/**
- * A round robin load balancing algorithm distributes connection requests across a list of
- * connection factories one at a time. When the end of the list is reached, the algorithm starts again from the
- * beginning.
- */
-final class RoundRobinLoadBalancingAlgorithm extends AbstractLoadBalancingAlgorithm {
- private final int maxIndex;
- private final AtomicInteger nextIndex = new AtomicInteger(-1);
-
- RoundRobinLoadBalancingAlgorithm(final Collection<? extends ConnectionFactory> factories, final Options options) {
- super(factories, options);
- this.maxIndex = factories.size();
- }
-
- @Override
- String getAlgorithmName() {
- return "RoundRobin";
- }
-
- @Override
- int getInitialConnectionFactoryIndex() {
- // A round robin pool of one connection factories is unlikely in
- // practice and requires special treatment.
- if (maxIndex == 1) {
- return 0;
- }
-
- // Determine the next factory to use: avoid blocking algorithm.
- int oldNextIndex;
- int newNextIndex;
- do {
- oldNextIndex = nextIndex.get();
- newNextIndex = oldNextIndex + 1;
- if (newNextIndex == maxIndex) {
- newNextIndex = 0;
- }
- } while (!nextIndex.compareAndSet(oldNextIndex, newNextIndex));
-
- // There's a potential, but benign, race condition here: other threads
- // could jump in and rotate through the list before we return the
- // connection factory.
- return newNextIndex;
- }
-
-}
diff --git a/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java b/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/LoadBalancerTestCase.java
similarity index 94%
rename from opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java
rename to opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/LoadBalancerTestCase.java
index d13164a..d7e6ec6 100644
--- a/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithmTestCase.java
+++ b/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/LoadBalancerTestCase.java
@@ -21,18 +21,16 @@
* CDDL HEADER END
*
*
- * Copyright 2014-2015 ForgeRock AS
+ * Copyright 2014-2016 ForgeRock AS
*/
package org.forgerock.opendj.ldap;
import static java.util.Arrays.asList;
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.Fail.fail;
+import static org.forgerock.opendj.ldap.Connections.*;
import static org.forgerock.opendj.ldap.Connections.newRoundRobinLoadBalancer;
import static org.forgerock.opendj.ldap.LdapException.newLdapException;
-import static org.forgerock.opendj.ldap.LoadBalancingAlgorithm.LOAD_BALANCER_EVENT_LISTENER;
-import static org.forgerock.opendj.ldap.LoadBalancingAlgorithm.LOAD_BALANCER_MONITORING_INTERVAL;
-import static org.forgerock.opendj.ldap.LoadBalancingAlgorithm.LOAD_BALANCER_SCHEDULER;
import static org.forgerock.util.Options.defaultOptions;
import static org.forgerock.util.promise.Promises.newExceptionPromise;
import static org.forgerock.util.promise.Promises.newResultPromise;
@@ -51,7 +49,7 @@
import org.testng.annotations.Test;
@SuppressWarnings("javadoc")
-public class AbstractLoadBalancingAlgorithmTestCase extends SdkTestCase {
+public class LoadBalancerTestCase extends SdkTestCase {
private static ConnectionFactory mockAsync(final ConnectionFactory mock) {
return new ConnectionFactory() {
@Override
diff --git a/opendj-sdk/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java b/opendj-sdk/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java
index 10b8ee0..8782d36 100644
--- a/opendj-sdk/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java
+++ b/opendj-sdk/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java
@@ -11,7 +11,7 @@
* Header, with the fields enclosed by brackets [] replaced by your own identifying
* information: "Portions copyright [year] [name of copyright owner]".
*
- * Copyright 2013-2015 ForgeRock AS.
+ * Copyright 2013-2016 ForgeRock AS.
*/
package org.forgerock.opendj.rest2ldap;
@@ -21,7 +21,7 @@
import static org.forgerock.opendj.ldap.Connections.newFailoverLoadBalancer;
import static org.forgerock.opendj.ldap.Connections.newRoundRobinLoadBalancer;
import static org.forgerock.opendj.ldap.LDAPConnectionFactory.*;
-import static org.forgerock.opendj.ldap.LoadBalancingAlgorithm.LOAD_BALANCER_MONITORING_INTERVAL;
+import static org.forgerock.opendj.ldap.Connections.LOAD_BALANCER_MONITORING_INTERVAL;
import static org.forgerock.opendj.ldap.requests.Requests.newSearchRequest;
import static org.forgerock.opendj.ldap.schema.CoreSchema.getEntryUUIDAttributeType;
import static org.forgerock.opendj.rest2ldap.ReadOnUpdatePolicy.CONTROLS;
--
Gitblit v1.10.0