From 6af1b61dec13bfbdd371672ef1e37eae63a079f7 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 17 Feb 2016 00:10:18 +0000
Subject: [PATCH] OPENDJSDK-16: implement sharded request load-balancer
---
opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java | 104 +++
opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java | 134 +++++
opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java | 7
opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java | 959 ++++++++++++++++++++++++++++++++++++
opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java | 260 ++++++++++
opendj-sdk/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java | 87 ++
6 files changed, 1,527 insertions(+), 24 deletions(-)
diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
index ccffb0d..7bc89f4 100644
--- a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
+++ b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -34,12 +34,28 @@
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.forgerock.opendj.ldap.requests.AddRequest;
+import org.forgerock.opendj.ldap.requests.CRAMMD5SASLBindRequest;
+import org.forgerock.opendj.ldap.requests.CompareRequest;
+import org.forgerock.opendj.ldap.requests.DeleteRequest;
+import org.forgerock.opendj.ldap.requests.DigestMD5SASLBindRequest;
+import org.forgerock.opendj.ldap.requests.GSSAPISASLBindRequest;
+import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
+import org.forgerock.opendj.ldap.requests.ModifyRequest;
+import org.forgerock.opendj.ldap.requests.PasswordModifyExtendedRequest;
+import org.forgerock.opendj.ldap.requests.PlainSASLBindRequest;
+import org.forgerock.opendj.ldap.requests.Request;
+import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.requests.SimpleBindRequest;
+import org.forgerock.util.Function;
import org.forgerock.util.Option;
import org.forgerock.util.Options;
import org.forgerock.util.Reject;
+import org.forgerock.util.promise.NeverThrowsException;
import org.forgerock.util.promise.Promise;
import org.forgerock.util.time.Duration;
@@ -413,6 +429,7 @@
* @param options
* This configuration options for the load-balancer.
* @return The new round-robin load balancer.
+ * @see #newShardedRequestLoadBalancer(Collection, Options)
* @see #newFailoverLoadBalancer(Collection, Options)
* @see #LOAD_BALANCER_EVENT_LISTENER
* @see #LOAD_BALANCER_MONITORING_INTERVAL
@@ -480,6 +497,7 @@
* This configuration options for the load-balancer.
* @return The new fail-over load balancer.
* @see #newRoundRobinLoadBalancer(Collection, Options)
+ * @see #newShardedRequestLoadBalancer(Collection, Options)
* @see #LOAD_BALANCER_EVENT_LISTENER
* @see #LOAD_BALANCER_MONITORING_INTERVAL
* @see #LOAD_BALANCER_SCHEDULER
@@ -496,6 +514,122 @@
}
/**
+ * Creates a new "sharded" load-balancer which will load-balance individual requests across the provided set of
+ * connection factories, each typically representing a single replica, using an algorithm that ensures that requests
+ * targeting a given DN will always be routed to the same replica. In other words, this load-balancer increases
+ * consistency whilst maintaining read-scalability by simulating a "single master" replication topology, where each
+ * replica is responsible for a subset of the entries. When a replica is unavailable the load-balancer "fails over"
+ * by performing a linear probe in order to find the next available replica thus ensuring high-availability when a
+ * network partition occurs while sacrificing consistency, since the unavailable replica may still be visible to
+ * other clients.
+ * <p/>
+ * This load-balancer distributes requests based on the hash of their target DN and handles all core operations, as
+ * well as any password modify extended requests and SASL bind requests which use authentication IDs having the
+ * "dn:" form. Note that subtree operations (searches, subtree deletes, and modify DN) are likely to include entries
+ * which are "mastered" on different replicas, so client applications should be more tolerant of inconsistencies.
+ * Requests that are either unrecognized or that do not have a parameter that may be considered to be a target DN
+ * will be routed randomly.
+ * <p/>
+ * <b>NOTE:</b> this connection factory returns fake connections, since real connections are obtained for each
+ * request. Therefore, the returned fake connections have certain limitations: abandon requests will be ignored
+ * since they cannot be routed; connection event listeners can be registered, but will only be notified when the
+ * fake connection is closed or when all of the connection factories are unavailable.
+ * <p/>
+ * <b>NOTE:</b> in deployments where there are multiple client applications, care should be taken to ensure that
+ * the factories are configured using the same ordering, otherwise requests will not be routed consistently
+ * across the client applications.
+ * <p/>
+ * The implementation periodically attempts to connect to failed connection factories in order to determine if they
+ * have become available again.
+ *
+ * @param factories
+ * The connection factories.
+ * @param options
+ * This configuration options for the load-balancer.
+ * @return The new affinity load balancer.
+ * @see #newRoundRobinLoadBalancer(Collection, Options)
+ * @see #newFailoverLoadBalancer(Collection, Options)
+ * @see #LOAD_BALANCER_EVENT_LISTENER
+ * @see #LOAD_BALANCER_MONITORING_INTERVAL
+ * @see #LOAD_BALANCER_SCHEDULER
+ */
+ public static ConnectionFactory newShardedRequestLoadBalancer(
+ final Collection<? extends ConnectionFactory> factories, final Options options) {
+ return new RequestLoadBalancer("ShardedRequestLoadBalancer",
+ factories,
+ options,
+ newShardedRequestLoadBalancerFunction(factories));
+ }
+
+ // Package private for testing.
+ static Function<Request, Integer, NeverThrowsException> newShardedRequestLoadBalancerFunction(
+ final Collection<? extends ConnectionFactory> factories) {
+ return new Function<Request, Integer, NeverThrowsException>() {
+ private final int maxIndex = factories.size();
+
+ @Override
+ public Integer apply(final Request request) {
+ // Normalize the hash to a valid factory index, taking care of negative hash values and especially
+ // Integer.MIN_VALUE (see doc for Math.abs()).
+ final int index = computeIndexBasedOnDnHashCode(request);
+ return index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % maxIndex);
+ }
+
+ private int computeIndexBasedOnDnHashCode(final Request request) {
+ // The following conditions are ordered such that the most common operations appear first in order to
+ // reduce the average number of branches. A better solution would be to use a visitor, but a visitor
+ // would only apply to the core operations, not extended operations or SASL binds.
+ if (request instanceof SearchRequest) {
+ return ((SearchRequest) request).getName().hashCode();
+ } else if (request instanceof ModifyRequest) {
+ return ((ModifyRequest) request).getName().hashCode();
+ } else if (request instanceof SimpleBindRequest) {
+ return hashCodeOfDnString(((SimpleBindRequest) request).getName());
+ } else if (request instanceof AddRequest) {
+ return ((AddRequest) request).getName().hashCode();
+ } else if (request instanceof DeleteRequest) {
+ return ((DeleteRequest) request).getName().hashCode();
+ } else if (request instanceof CompareRequest) {
+ return ((CompareRequest) request).getName().hashCode();
+ } else if (request instanceof ModifyDNRequest) {
+ return ((ModifyDNRequest) request).getName().hashCode();
+ } else if (request instanceof PasswordModifyExtendedRequest) {
+ return hashCodeOfAuthzid(((PasswordModifyExtendedRequest) request).getUserIdentityAsString());
+ } else if (request instanceof PlainSASLBindRequest) {
+ return hashCodeOfAuthzid(((PlainSASLBindRequest) request).getAuthenticationID());
+ } else if (request instanceof DigestMD5SASLBindRequest) {
+ return hashCodeOfAuthzid(((DigestMD5SASLBindRequest) request).getAuthenticationID());
+ } else if (request instanceof GSSAPISASLBindRequest) {
+ return hashCodeOfAuthzid(((GSSAPISASLBindRequest) request).getAuthenticationID());
+ } else if (request instanceof CRAMMD5SASLBindRequest) {
+ return hashCodeOfAuthzid(((CRAMMD5SASLBindRequest) request).getAuthenticationID());
+ } else {
+ return distributeRequestAtRandom();
+ }
+ }
+
+ private int hashCodeOfAuthzid(final String authzid) {
+ if (authzid != null && authzid.startsWith("dn:")) {
+ return hashCodeOfDnString(authzid.substring(3));
+ }
+ return distributeRequestAtRandom();
+ }
+
+ private int hashCodeOfDnString(final String dnString) {
+ try {
+ return DN.valueOf(dnString).hashCode();
+ } catch (final IllegalArgumentException ignored) {
+ return distributeRequestAtRandom();
+ }
+ }
+
+ private int distributeRequestAtRandom() {
+ return ThreadLocalRandom.current().nextInt(0, maxIndex);
+ }
+ };
+ }
+
+ /**
* Creates a new connection factory which forwards connection requests to
* the provided factory, but whose {@code toString} method will always
* return {@code name}.
diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java
index eaa3bc1..76ded9f 100644
--- a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java
+++ b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java
@@ -278,16 +278,13 @@
/**
* Return the first available connection factory starting from {@code initialIndex}.
*
- * @param initialIndex The index of the connection factory to be returned if operational. The index may be
- * greater than the number of factories in which case this method will perform a modulus
- * operation to bring it into range. NOTE: for performance reasons callers should attempt to
- * use valid indexes because the modulus operation is relatively expensive.
+ * @param initialIndex The index of the connection factory to be returned if operational.
* @return The first available connection factory starting from the initial index.
* @throws LdapException If no connection factories are available.
*/
final ConnectionFactory getMonitoredConnectionFactory(final int initialIndex) throws LdapException {
final int maxIndex = monitoredFactories.size();
- int index = initialIndex < maxIndex ? initialIndex : initialIndex % maxIndex;
+ int index = initialIndex;
do {
final MonitoredConnectionFactory factory = monitoredFactories.get(index);
if (factory.isOperational.get()) {
diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java
new file mode 100644
index 0000000..b00775b
--- /dev/null
+++ b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java
@@ -0,0 +1,260 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package org.forgerock.opendj.ldap;
+
+import static org.forgerock.opendj.ldap.spi.LdapPromises.newFailedLdapPromise;
+import static org.forgerock.util.Utils.closeSilently;
+import static org.forgerock.util.promise.Promises.newResultPromise;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.forgerock.opendj.ldap.requests.AbandonRequest;
+import org.forgerock.opendj.ldap.requests.AddRequest;
+import org.forgerock.opendj.ldap.requests.BindRequest;
+import org.forgerock.opendj.ldap.requests.CompareRequest;
+import org.forgerock.opendj.ldap.requests.DeleteRequest;
+import org.forgerock.opendj.ldap.requests.ExtendedRequest;
+import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
+import org.forgerock.opendj.ldap.requests.ModifyRequest;
+import org.forgerock.opendj.ldap.requests.Request;
+import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.requests.UnbindRequest;
+import org.forgerock.opendj.ldap.responses.BindResult;
+import org.forgerock.opendj.ldap.responses.CompareResult;
+import org.forgerock.opendj.ldap.responses.ExtendedResult;
+import org.forgerock.opendj.ldap.responses.Result;
+import org.forgerock.opendj.ldap.spi.ConnectionState;
+import org.forgerock.opendj.ldap.spi.LdapPromises;
+import org.forgerock.util.AsyncFunction;
+import org.forgerock.util.Function;
+import org.forgerock.util.Options;
+import org.forgerock.util.promise.ExceptionHandler;
+import org.forgerock.util.promise.NeverThrowsException;
+import org.forgerock.util.promise.Promise;
+import org.forgerock.util.promise.ResultHandler;
+
+/**
+ * A request based load balancer which load balances individual requests based on properties of the request, such as
+ * the target DN.
+ * <p>
+ * Implementations should override the method {@code getInitialConnectionFactoryIndex()} in order to provide the policy
+ * for selecting the first connection factory to use for each request.
+ */
+final class RequestLoadBalancer extends LoadBalancer {
+ /**
+ * A function which returns the index of the first connection factory which should be used in order to satisfy the
+ * next request. Implementations may base the decision on properties of the provided request, such as the target DN,
+ * whether the request is a read or update request, etc.
+ */
+ private final Function<Request, Integer, NeverThrowsException> nextFactoryFunction;
+
+ RequestLoadBalancer(final String loadBalancerName,
+ final Collection<? extends ConnectionFactory> factories,
+ final Options options,
+ final Function<Request, Integer, NeverThrowsException> nextFactoryFunction) {
+ super(loadBalancerName, factories, options);
+ this.nextFactoryFunction = nextFactoryFunction;
+ }
+
+ @Override
+ public final Connection getConnection() throws LdapException {
+ return new ConnectionImpl();
+ }
+
+ @Override
+ public final Promise<Connection, LdapException> getConnectionAsync() {
+ return newResultPromise((Connection) new ConnectionImpl());
+ }
+
+ private class ConnectionImpl extends AbstractAsynchronousConnection {
+ private final ConnectionState state = new ConnectionState();
+
+ @Override
+ public String toString() {
+ return getLoadBalancerName() + "Connection";
+ }
+
+ @Override
+ public LdapPromise<Void> abandonAsync(final AbandonRequest request) {
+ // We cannot possibly route these correctly, so just drop them.
+ return LdapPromises.newSuccessfulLdapPromise(null);
+ }
+
+ @Override
+ public LdapPromise<Result> addAsync(
+ final AddRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
+ return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
+ @Override
+ public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.addAsync(request, intermediateResponseHandler);
+ }
+ });
+ }
+
+ @Override
+ public void addConnectionEventListener(final ConnectionEventListener listener) {
+ state.addConnectionEventListener(listener);
+ }
+
+ @Override
+ public LdapPromise<BindResult> bindAsync(
+ final BindRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
+ return getConnectionAndSendRequest(request, new AsyncFunction<Connection, BindResult, LdapException>() {
+ @Override
+ public Promise<BindResult, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.bindAsync(request, intermediateResponseHandler);
+ }
+ });
+ }
+
+ @Override
+ public void close(final UnbindRequest request, final String reason) {
+ state.notifyConnectionClosed();
+ }
+
+ @Override
+ public LdapPromise<CompareResult> compareAsync(
+ final CompareRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
+ return getConnectionAndSendRequest(request, new AsyncFunction<Connection, CompareResult, LdapException>() {
+ @Override
+ public Promise<CompareResult, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.compareAsync(request, intermediateResponseHandler);
+ }
+ });
+ }
+
+ @Override
+ public LdapPromise<Result> deleteAsync(
+ final DeleteRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
+ return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
+ @Override
+ public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.deleteAsync(request, intermediateResponseHandler);
+ }
+ });
+ }
+
+ @Override
+ public <R extends ExtendedResult> LdapPromise<R> extendedRequestAsync(
+ final ExtendedRequest<R> request, final IntermediateResponseHandler intermediateResponseHandler) {
+ return getConnectionAndSendRequest(request, new AsyncFunction<Connection, R, LdapException>() {
+ @Override
+ public Promise<R, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.extendedRequestAsync(request, intermediateResponseHandler);
+ }
+ });
+ }
+
+ @Override
+ public boolean isClosed() {
+ return state.isClosed();
+ }
+
+ @Override
+ public boolean isValid() {
+ return state.isValid();
+ }
+
+ @Override
+ public LdapPromise<Result> modifyAsync(
+ final ModifyRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
+ return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
+ @Override
+ public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.modifyAsync(request, intermediateResponseHandler);
+ }
+ });
+ }
+
+ @Override
+ public LdapPromise<Result> modifyDNAsync(
+ final ModifyDNRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
+ return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
+ @Override
+ public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.modifyDNAsync(request, intermediateResponseHandler);
+ }
+ });
+ }
+
+ @Override
+ public void removeConnectionEventListener(final ConnectionEventListener listener) {
+ state.removeConnectionEventListener(listener);
+ }
+
+ @Override
+ public LdapPromise<Result> searchAsync(
+ final SearchRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final SearchResultHandler entryHandler) {
+ return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
+ @Override
+ public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.searchAsync(request, intermediateResponseHandler, entryHandler);
+ }
+ });
+ }
+
+ private <R> LdapPromise<R> getConnectionAndSendRequest(
+ final Request request, final AsyncFunction<Connection, R, LdapException> sendRequest) {
+ if (state.isClosed()) {
+ throw new IllegalStateException();
+ }
+ final AtomicReference<Connection> connectionHolder = new AtomicReference<>();
+ return getConnectionAsync(request)
+ .thenOnResult(new ResultHandler<Connection>() {
+ @Override
+ public void handleResult(final Connection connection) {
+ connectionHolder.set(connection);
+ }
+ })
+ .thenAsync(sendRequest)
+ .thenFinally(new Runnable() {
+ @Override
+ public void run() {
+ closeSilently(connectionHolder.get());
+ }
+ });
+ }
+
+ private LdapPromise<Connection> getConnectionAsync(final Request request) {
+ try {
+ final int index = nextFactoryFunction.apply(request);
+ final ConnectionFactory factory = getMonitoredConnectionFactory(index);
+ return LdapPromises.asPromise(factory.getConnectionAsync()
+ .thenOnException(new ExceptionHandler<LdapException>() {
+ @Override
+ public void handleException(final LdapException e) {
+ state.notifyConnectionError(false, e);
+ }
+ }));
+ } catch (final LdapException e) {
+ state.notifyConnectionError(false, e);
+ return newFailedLdapPromise(e);
+ }
+ }
+ }
+}
diff --git a/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java b/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java
index ff164c2..e589ce8 100644
--- a/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java
+++ b/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java
@@ -21,14 +21,34 @@
* CDDL HEADER END
*
*
- * Copyright 2013 ForgeRock AS
+ * Copyright 2013-2016 ForgeRock AS
*/
package org.forgerock.opendj.ldap;
+import static java.util.Arrays.asList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.forgerock.opendj.ldap.Connections.newShardedRequestLoadBalancerFunction;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+import org.forgerock.opendj.ldap.requests.AddRequest;
+import org.forgerock.opendj.ldap.requests.CRAMMD5SASLBindRequest;
+import org.forgerock.opendj.ldap.requests.CompareRequest;
+import org.forgerock.opendj.ldap.requests.DeleteRequest;
+import org.forgerock.opendj.ldap.requests.DigestMD5SASLBindRequest;
+import org.forgerock.opendj.ldap.requests.GSSAPISASLBindRequest;
+import org.forgerock.opendj.ldap.requests.GenericExtendedRequest;
+import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
+import org.forgerock.opendj.ldap.requests.ModifyRequest;
+import org.forgerock.opendj.ldap.requests.PasswordModifyExtendedRequest;
+import org.forgerock.opendj.ldap.requests.PlainSASLBindRequest;
+import org.forgerock.opendj.ldap.requests.Request;
+import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.requests.SimpleBindRequest;
+import org.forgerock.util.Function;
+import org.forgerock.util.promise.NeverThrowsException;
import org.testng.annotations.Test;
@SuppressWarnings("javadoc")
@@ -57,4 +77,86 @@
uncloseable.close(null, null);
verifyZeroInteractions(connection);
}
+
+ @Test
+ public void shardedRequestLoadBalancerUsesConsistentIndexing() {
+ final Function<Request, Integer, NeverThrowsException> f =
+ newShardedRequestLoadBalancerFunction(asList(mock(ConnectionFactory.class),
+ mock(ConnectionFactory.class)));
+
+ // These two DNs have a different hash code.
+ final DN dn1 = DN.valueOf("cn=target1,dc=example,dc=com");
+ final DN dn2 = DN.valueOf("cn=target2,dc=example,dc=com");
+
+ final AddRequest addRequest = mock(AddRequest.class);
+ when(addRequest.getName()).thenReturn(dn1, dn2);
+ final int dn1index = index(f, addRequest);
+ final int dn2index = index(f, addRequest);
+ assertThat(dn1index).isNotEqualTo(dn2index);
+
+ final SimpleBindRequest simpleBindRequest = mock(SimpleBindRequest.class);
+ when(simpleBindRequest.getName()).thenReturn(dn1.toString(), dn2.toString());
+ assertRequestsAreRoutedConsistently(f, simpleBindRequest, dn1index, dn2index);
+
+ final CompareRequest compareRequest = mock(CompareRequest.class);
+ when(compareRequest.getName()).thenReturn(dn1, dn2);
+ assertRequestsAreRoutedConsistently(f, compareRequest, dn1index, dn2index);
+
+ final DeleteRequest deleteRequest = mock(DeleteRequest.class);
+ when(deleteRequest.getName()).thenReturn(dn1, dn2);
+ assertRequestsAreRoutedConsistently(f, deleteRequest, dn1index, dn2index);
+
+ final ModifyRequest modifyRequest = mock(ModifyRequest.class);
+ when(modifyRequest.getName()).thenReturn(dn1, dn2);
+ assertRequestsAreRoutedConsistently(f, modifyRequest, dn1index, dn2index);
+
+ final ModifyDNRequest modifyDNRequest = mock(ModifyDNRequest.class);
+ when(modifyDNRequest.getName()).thenReturn(dn1, dn2);
+ assertRequestsAreRoutedConsistently(f, modifyDNRequest, dn1index, dn2index);
+
+ final SearchRequest searchRequest = mock(SearchRequest.class);
+ when(searchRequest.getName()).thenReturn(dn1, dn2);
+ assertRequestsAreRoutedConsistently(f, searchRequest, dn1index, dn2index);
+
+ // Authzid based operations.
+ final String authzid1 = "dn:" + dn1.toString();
+ final String authzid2 = "dn:" + dn2.toString();
+
+ final PasswordModifyExtendedRequest passwordModifyRequest = mock(PasswordModifyExtendedRequest.class);
+ when(passwordModifyRequest.getUserIdentityAsString()).thenReturn(authzid1, authzid2);
+ assertRequestsAreRoutedConsistently(f, passwordModifyRequest, dn1index, dn2index);
+
+ final PlainSASLBindRequest plainSASLBindRequest = mock(PlainSASLBindRequest.class);
+ when(plainSASLBindRequest.getAuthenticationID()).thenReturn(authzid1, authzid2);
+ assertRequestsAreRoutedConsistently(f, plainSASLBindRequest, dn1index, dn2index);
+
+ final CRAMMD5SASLBindRequest cramMD5SASLBindRequest = mock(CRAMMD5SASLBindRequest.class);
+ when(cramMD5SASLBindRequest.getAuthenticationID()).thenReturn(authzid1, authzid2);
+ assertRequestsAreRoutedConsistently(f, cramMD5SASLBindRequest, dn1index, dn2index);
+
+ final DigestMD5SASLBindRequest digestMD5SASLBindRequest = mock(DigestMD5SASLBindRequest.class);
+ when(digestMD5SASLBindRequest.getAuthenticationID()).thenReturn(authzid1, authzid2);
+ assertRequestsAreRoutedConsistently(f, digestMD5SASLBindRequest, dn1index, dn2index);
+
+ final GSSAPISASLBindRequest gssapiSASLBindRequest = mock(GSSAPISASLBindRequest.class);
+ when(gssapiSASLBindRequest.getAuthenticationID()).thenReturn(authzid1, authzid2);
+ assertRequestsAreRoutedConsistently(f, gssapiSASLBindRequest, dn1index, dn2index);
+
+ // Requests that have no target will return a random index, but since we only have one factory the index will
+ // always be 0.
+ final GenericExtendedRequest genericExtendedRequest = mock(GenericExtendedRequest.class);
+ assertThat(index(f, genericExtendedRequest)).isBetween(0, 1);
+ }
+
+ private void assertRequestsAreRoutedConsistently(final Function<Request, Integer, NeverThrowsException> f,
+ final Request r,
+ final int firstExpectedIndex,
+ final int secondExpectedIndex) {
+ assertThat(index(f, r)).isEqualTo(firstExpectedIndex);
+ assertThat(index(f, r)).isEqualTo(secondExpectedIndex);
+ }
+
+ private int index(final Function<Request, Integer, NeverThrowsException> function, final Request request) {
+ return function.apply(request);
+ }
}
diff --git a/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java b/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java
new file mode 100644
index 0000000..6500ff4
--- /dev/null
+++ b/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java
@@ -0,0 +1,959 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package org.forgerock.opendj.ldap;
+
+import static java.util.Arrays.asList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+import static org.forgerock.opendj.ldap.LdapException.newLdapException;
+import static org.forgerock.opendj.ldap.ResultCode.CLIENT_SIDE_CONNECT_ERROR;
+import static org.forgerock.opendj.ldap.responses.Responses.newBindResult;
+import static org.forgerock.opendj.ldap.responses.Responses.newCompareResult;
+import static org.forgerock.opendj.ldap.responses.Responses.newGenericExtendedResult;
+import static org.forgerock.opendj.ldap.responses.Responses.newResult;
+import static org.forgerock.opendj.ldap.spi.LdapPromises.newSuccessfulLdapPromise;
+import static org.forgerock.util.Options.defaultOptions;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNotNull;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import java.util.ArrayList;
+import java.util.logging.Level;
+
+import org.forgerock.opendj.ldap.requests.AbandonRequest;
+import org.forgerock.opendj.ldap.requests.AddRequest;
+import org.forgerock.opendj.ldap.requests.BindRequest;
+import org.forgerock.opendj.ldap.requests.CompareRequest;
+import org.forgerock.opendj.ldap.requests.DeleteRequest;
+import org.forgerock.opendj.ldap.requests.ExtendedRequest;
+import org.forgerock.opendj.ldap.requests.GenericExtendedRequest;
+import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
+import org.forgerock.opendj.ldap.requests.ModifyRequest;
+import org.forgerock.opendj.ldap.requests.Request;
+import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.util.Function;
+import org.forgerock.util.promise.NeverThrowsException;
+import org.forgerock.util.promise.Promises;
+import org.mockito.Mock;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@SuppressWarnings("javadoc")
+public class RequestLoadBalancerTestCase extends SdkTestCase {
+ @Test
+ public void closeLoadBalancerShouldCloseDelegateFactories() throws Exception {
+ configureAllFactoriesOnline();
+ loadBalancer.close();
+ verify(factory1).close();
+ verify(factory2).close();
+ verify(factory3).close();
+ }
+
+ @Test
+ public void getConnectionShouldNotInvokeDelegateFactory() throws Exception {
+ configureAllFactoriesOnline();
+ try (Connection connection = loadBalancer.getConnection()) {
+ assertThat(connection).isNotNull();
+ verifyZeroInteractions(factory1, factory2, factory3);
+ }
+ verifyZeroInteractions(factory1, factory2, factory3);
+ loadBalancer.close();
+ }
+
+ @Test
+ public void getConnectionReturnANewConnectionEachTime() throws Exception {
+ configureAllFactoriesOnline();
+ try (Connection connection1 = loadBalancer.getConnection();
+ Connection connection2 = loadBalancer.getConnection()) {
+ assertThat(connection1).isNotSameAs(connection2);
+ }
+ loadBalancer.close();
+ }
+
+ @Test
+ public void getConnectionAsyncReturnANewConnectionEachTime() throws Exception {
+ configureAllFactoriesOnline();
+ try (Connection connection1 = loadBalancer.getConnectionAsync().get();
+ Connection connection2 = loadBalancer.getConnectionAsync().get()) {
+ assertThat(connection1).isNotSameAs(connection2);
+ }
+ loadBalancer.close();
+ }
+
+ @Test
+ public void getConnectionAsyncShouldNotInvokeDelegateFactory() throws Exception {
+ configureAllFactoriesOnline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ assertThat(connection).isNotNull();
+ verifyZeroInteractions(factory1, factory2, factory3);
+ }
+ verifyZeroInteractions(factory1, factory2, factory3);
+ }
+
+ @Test
+ public void connectionEventListenersNotifiedOnClose() throws Exception {
+ configureAllFactoriesOnline();
+ final ConnectionEventListener listener = mock(ConnectionEventListener.class);
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.addConnectionEventListener(listener);
+ }
+ verify(listener).handleConnectionClosed();
+ }
+
+ @Test
+ public void connectionEventListenersNotifiedOnError() throws Exception {
+ configureAllFactoriesOffline();
+ final ConnectionEventListener listener = mock(ConnectionEventListener.class);
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.addConnectionEventListener(listener);
+ try {
+ connection.add(addRequest1);
+ fail("add unexpectedly succeeded");
+ } catch (LdapException ignored) {
+ // Ignore.
+ }
+ }
+ verify(listener).handleConnectionError(eq(false), any(LdapException.class));
+ verify(listener).handleConnectionClosed();
+ }
+
+ @Test
+ public void removedConnectionEventListenersShouldNotBeNotified() throws Exception {
+ configureAllFactoriesOnline();
+ final ConnectionEventListener listener = mock(ConnectionEventListener.class);
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.addConnectionEventListener(listener);
+ connection.removeConnectionEventListener(listener);
+ }
+ verifyZeroInteractions(listener);
+ }
+
+ @Test
+ public void validAndClosedStateShouldBeMaintained() throws Exception {
+ configureAllFactoriesOffline();
+ final Connection connection = loadBalancer.getConnectionAsync().get();
+ assertThat(connection.isValid()).isTrue();
+ assertThat(connection.isClosed()).isFalse();
+
+ try {
+ connection.add(addRequest1);
+ fail("add unexpectedly succeeded");
+ } catch (LdapException ignored) {
+ // Ignore.
+ }
+ assertThat(connection.isValid()).isFalse();
+ assertThat(connection.isClosed()).isFalse();
+
+ connection.close();
+ assertThat(connection.isValid()).isFalse();
+ assertThat(connection.isClosed()).isTrue();
+ }
+
+ @Test
+ public void factoryToStringShouldReturnANonEmptyString() throws Exception {
+ configureAllFactoriesOffline();
+ assertThat(loadBalancer.toString()).isNotEmpty();
+ }
+
+ @Test
+ public void connectionToStringShouldReturnANonEmptyString() throws Exception {
+ configureAllFactoriesOffline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ assertThat(connection.toString()).isNotEmpty();
+ }
+ }
+
+ @Test
+ public void abandonRequestShouldBeIgnored() throws Exception {
+ configureAllFactoriesOnline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.abandonAsync(mock(AbandonRequest.class));
+ }
+ verifyZeroInteractions(factory1, factory2, factory3);
+ }
+
+ // We can't use a DataProviders here because the mocks will be re-initialized for each test method call.
+
+ // ################## Add Requests ####################
+
+ @Test
+ public void addRequestsShouldBeRoutedCorrectly1() throws Exception {
+ addRequestsShouldBeRoutedCorrectlyImpl(addRequest1, factory1, connection1);
+ }
+
+ @Test
+ public void addRequestsShouldBeRoutedCorrectly2() throws Exception {
+ addRequestsShouldBeRoutedCorrectlyImpl(addRequest2, factory2, connection2);
+ }
+
+ @Test
+ public void addRequestsShouldBeRoutedCorrectly3() throws Exception {
+ addRequestsShouldBeRoutedCorrectlyImpl(addRequest3, factory3, connection3);
+ }
+
+ private void addRequestsShouldBeRoutedCorrectlyImpl(final AddRequest addRequest,
+ final ConnectionFactory expectedFactory,
+ final Connection expectedConnection) throws Exception {
+ configureAllFactoriesOnline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.add(addRequest);
+ }
+ verify(expectedFactory).getConnectionAsync();
+ verify(expectedConnection).addAsync(same(addRequest), isNull(IntermediateResponseHandler.class));
+ verify(expectedConnection).close();
+ verifyZeroInteractionsForRemainingFactories(expectedFactory);
+ }
+
+ @Test
+ public void addRequestsShouldLinearProbeOnFailure1() throws Exception {
+ addRequestsShouldLinearProbeOnFailureImpl(addRequest1);
+ }
+
+ @Test
+ public void addRequestsShouldLinearProbeOnFailure2() throws Exception {
+ addRequestsShouldLinearProbeOnFailureImpl(addRequest2);
+ }
+
+ @Test
+ public void addRequestsShouldLinearProbeOnFailure3() throws Exception {
+ addRequestsShouldLinearProbeOnFailureImpl(addRequest3);
+ }
+
+ private void addRequestsShouldLinearProbeOnFailureImpl(final AddRequest addRequest) throws Exception {
+ configureFactoriesOneAndTwoOffline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.add(addRequest);
+ }
+ verify(factory3).getConnectionAsync();
+ verify(connection3).addAsync(same(addRequest), isNull(IntermediateResponseHandler.class));
+ verify(connection3).close();
+ verifyZeroInteractions(connection1);
+ verifyZeroInteractions(connection2);
+ }
+
+ @Test
+ public void addRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
+ configureAllFactoriesOffline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ try {
+ connection.add(addRequest1);
+ } catch (ConnectionException e) {
+ assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
+ }
+ }
+ verify(factory1, atLeastOnce()).getConnectionAsync();
+ verify(factory2, atLeastOnce()).getConnectionAsync();
+ verify(factory3, atLeastOnce()).getConnectionAsync();
+ verifyZeroInteractions(connection1);
+ verifyZeroInteractions(connection2);
+ verifyZeroInteractions(connection3);
+ }
+
+ // ################## Bind Requests ####################
+
+ @Test
+ public void bindRequestsShouldBeRoutedCorrectly1() throws Exception {
+ bindRequestsShouldBeRoutedCorrectlyImpl(bindRequest1, factory1, connection1);
+ }
+
+ @Test
+ public void bindRequestsShouldBeRoutedCorrectly2() throws Exception {
+ bindRequestsShouldBeRoutedCorrectlyImpl(bindRequest2, factory2, connection2);
+ }
+
+ @Test
+ public void bindRequestsShouldBeRoutedCorrectly3() throws Exception {
+ bindRequestsShouldBeRoutedCorrectlyImpl(bindRequest3, factory3, connection3);
+ }
+
+ private void bindRequestsShouldBeRoutedCorrectlyImpl(final BindRequest bindRequest,
+ final ConnectionFactory expectedFactory,
+ final Connection expectedConnection) throws Exception {
+ configureAllFactoriesOnline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.bind(bindRequest);
+ }
+ verify(expectedFactory).getConnectionAsync();
+ verify(expectedConnection).bindAsync(same(bindRequest), isNull(IntermediateResponseHandler.class));
+ verify(expectedConnection).close();
+ verifyZeroInteractionsForRemainingFactories(expectedFactory);
+ }
+
+ @Test
+ public void bindRequestsShouldLinearProbeOnFailure1() throws Exception {
+ bindRequestsShouldLinearProbeOnFailureImpl(bindRequest1);
+ }
+
+ @Test
+ public void bindRequestsShouldLinearProbeOnFailure2() throws Exception {
+ bindRequestsShouldLinearProbeOnFailureImpl(bindRequest2);
+ }
+
+ @Test
+ public void bindRequestsShouldLinearProbeOnFailure3() throws Exception {
+ bindRequestsShouldLinearProbeOnFailureImpl(bindRequest3);
+ }
+
+ private void bindRequestsShouldLinearProbeOnFailureImpl(final BindRequest bindRequest) throws Exception {
+ configureFactoriesOneAndTwoOffline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.bind(bindRequest);
+ }
+ verify(factory3).getConnectionAsync();
+ verify(connection3).bindAsync(same(bindRequest), isNull(IntermediateResponseHandler.class));
+ verify(connection3).close();
+ verifyZeroInteractions(connection1);
+ verifyZeroInteractions(connection2);
+ }
+
+ @Test
+ public void bindRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
+ configureAllFactoriesOffline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ try {
+ connection.bind(bindRequest1);
+ } catch (ConnectionException e) {
+ assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
+ }
+ }
+ verify(factory1, atLeastOnce()).getConnectionAsync();
+ verify(factory2, atLeastOnce()).getConnectionAsync();
+ verify(factory3, atLeastOnce()).getConnectionAsync();
+ verifyZeroInteractions(connection1);
+ verifyZeroInteractions(connection2);
+ verifyZeroInteractions(connection3);
+ }
+
+ // ################## Compare Requests ####################
+
+ @Test
+ public void compareRequestsShouldBeRoutedCorrectly1() throws Exception {
+ compareRequestsShouldBeRoutedCorrectlyImpl(compareRequest1, factory1, connection1);
+ }
+
+ @Test
+ public void compareRequestsShouldBeRoutedCorrectly2() throws Exception {
+ compareRequestsShouldBeRoutedCorrectlyImpl(compareRequest2, factory2, connection2);
+ }
+
+ @Test
+ public void compareRequestsShouldBeRoutedCorrectly3() throws Exception {
+ compareRequestsShouldBeRoutedCorrectlyImpl(compareRequest3, factory3, connection3);
+ }
+
+ private void compareRequestsShouldBeRoutedCorrectlyImpl(final CompareRequest compareRequest,
+ final ConnectionFactory expectedFactory,
+ final Connection expectedConnection) throws Exception {
+ configureAllFactoriesOnline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.compare(compareRequest);
+ }
+ verify(expectedFactory).getConnectionAsync();
+ verify(expectedConnection).compareAsync(same(compareRequest), isNull(IntermediateResponseHandler.class));
+ verify(expectedConnection).close();
+ verifyZeroInteractionsForRemainingFactories(expectedFactory);
+ }
+
+ @Test
+ public void compareRequestsShouldLinearProbeOnFailure1() throws Exception {
+ compareRequestsShouldLinearProbeOnFailureImpl(compareRequest1);
+ }
+
+ @Test
+ public void compareRequestsShouldLinearProbeOnFailure2() throws Exception {
+ compareRequestsShouldLinearProbeOnFailureImpl(compareRequest2);
+ }
+
+ @Test
+ public void compareRequestsShouldLinearProbeOnFailure3() throws Exception {
+ compareRequestsShouldLinearProbeOnFailureImpl(compareRequest3);
+ }
+
+ private void compareRequestsShouldLinearProbeOnFailureImpl(final CompareRequest compareRequest) throws Exception {
+ configureFactoriesOneAndTwoOffline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.compare(compareRequest);
+ }
+ verify(factory3).getConnectionAsync();
+ verify(connection3).compareAsync(same(compareRequest), isNull(IntermediateResponseHandler.class));
+ verify(connection3).close();
+ verifyZeroInteractions(connection1);
+ verifyZeroInteractions(connection2);
+ }
+
+ @Test
+ public void compareRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
+ configureAllFactoriesOffline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ try {
+ connection.compare(compareRequest1);
+ } catch (ConnectionException e) {
+ assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
+ }
+ }
+ verify(factory1, atLeastOnce()).getConnectionAsync();
+ verify(factory2, atLeastOnce()).getConnectionAsync();
+ verify(factory3, atLeastOnce()).getConnectionAsync();
+ verifyZeroInteractions(connection1);
+ verifyZeroInteractions(connection2);
+ verifyZeroInteractions(connection3);
+ }
+
+ // ################## Delete Requests ####################
+
+ @Test
+ public void deleteRequestsShouldBeRoutedCorrectly1() throws Exception {
+ deleteRequestsShouldBeRoutedCorrectlyImpl(deleteRequest1, factory1, connection1);
+ }
+
+ @Test
+ public void deleteRequestsShouldBeRoutedCorrectly2() throws Exception {
+ deleteRequestsShouldBeRoutedCorrectlyImpl(deleteRequest2, factory2, connection2);
+ }
+
+ @Test
+ public void deleteRequestsShouldBeRoutedCorrectly3() throws Exception {
+ deleteRequestsShouldBeRoutedCorrectlyImpl(deleteRequest3, factory3, connection3);
+ }
+
+ private void deleteRequestsShouldBeRoutedCorrectlyImpl(final DeleteRequest deleteRequest,
+ final ConnectionFactory expectedFactory,
+ final Connection expectedConnection) throws Exception {
+ configureAllFactoriesOnline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.delete(deleteRequest);
+ }
+ verify(expectedFactory).getConnectionAsync();
+ verify(expectedConnection).deleteAsync(same(deleteRequest), isNull(IntermediateResponseHandler.class));
+ verify(expectedConnection).close();
+ verifyZeroInteractionsForRemainingFactories(expectedFactory);
+ }
+
+ @Test
+ public void deleteRequestsShouldLinearProbeOnFailure1() throws Exception {
+ deleteRequestsShouldLinearProbeOnFailureImpl(deleteRequest1);
+ }
+
+ @Test
+ public void deleteRequestsShouldLinearProbeOnFailure2() throws Exception {
+ deleteRequestsShouldLinearProbeOnFailureImpl(deleteRequest2);
+ }
+
+ @Test
+ public void deleteRequestsShouldLinearProbeOnFailure3() throws Exception {
+ deleteRequestsShouldLinearProbeOnFailureImpl(deleteRequest3);
+ }
+
+ private void deleteRequestsShouldLinearProbeOnFailureImpl(final DeleteRequest deleteRequest) throws Exception {
+ configureFactoriesOneAndTwoOffline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.delete(deleteRequest);
+ }
+ verify(factory3).getConnectionAsync();
+ verify(connection3).deleteAsync(same(deleteRequest), isNull(IntermediateResponseHandler.class));
+ verify(connection3).close();
+ verifyZeroInteractions(connection1);
+ verifyZeroInteractions(connection2);
+ }
+
+ @Test
+ public void deleteRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
+ configureAllFactoriesOffline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ try {
+ connection.delete(deleteRequest1);
+ } catch (ConnectionException e) {
+ assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
+ }
+ }
+ verify(factory1, atLeastOnce()).getConnectionAsync();
+ verify(factory2, atLeastOnce()).getConnectionAsync();
+ verify(factory3, atLeastOnce()).getConnectionAsync();
+ verifyZeroInteractions(connection1);
+ verifyZeroInteractions(connection2);
+ verifyZeroInteractions(connection3);
+ }
+
+ // ################## Extended Requests ####################
+
+ @Test
+ public void extendedRequestsShouldBeRoutedCorrectly1() throws Exception {
+ extendedRequestsShouldBeRoutedCorrectlyImpl(extendedRequest1, factory1, connection1);
+ }
+
+ @Test
+ public void extendedRequestsShouldBeRoutedCorrectly2() throws Exception {
+ extendedRequestsShouldBeRoutedCorrectlyImpl(extendedRequest2, factory2, connection2);
+ }
+
+ @Test
+ public void extendedRequestsShouldBeRoutedCorrectly3() throws Exception {
+ extendedRequestsShouldBeRoutedCorrectlyImpl(extendedRequest3, factory3, connection3);
+ }
+
+ private void extendedRequestsShouldBeRoutedCorrectlyImpl(final ExtendedRequest<?> extendedRequest,
+ final ConnectionFactory expectedFactory,
+ final Connection expectedConnection) throws Exception {
+ configureAllFactoriesOnline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.extendedRequest(extendedRequest);
+ }
+ verify(expectedFactory).getConnectionAsync();
+ verify(expectedConnection).extendedRequestAsync(same(extendedRequest),
+ isNull(IntermediateResponseHandler.class));
+ verify(expectedConnection).close();
+ verifyZeroInteractionsForRemainingFactories(expectedFactory);
+ }
+
+ @Test
+ public void extendedRequestsShouldLinearProbeOnFailure1() throws Exception {
+ extendedRequestsShouldLinearProbeOnFailureImpl(extendedRequest1);
+ }
+
+ @Test
+ public void extendedRequestsShouldLinearProbeOnFailure2() throws Exception {
+ extendedRequestsShouldLinearProbeOnFailureImpl(extendedRequest2);
+ }
+
+ @Test
+ public void extendedRequestsShouldLinearProbeOnFailure3() throws Exception {
+ extendedRequestsShouldLinearProbeOnFailureImpl(extendedRequest3);
+ }
+
+ private void extendedRequestsShouldLinearProbeOnFailureImpl(
+ final ExtendedRequest<?> extendedRequest) throws Exception {
+ configureFactoriesOneAndTwoOffline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.extendedRequest(extendedRequest);
+ }
+ verify(factory3).getConnectionAsync();
+ verify(connection3).extendedRequestAsync(same(extendedRequest), isNull(IntermediateResponseHandler.class));
+ verify(connection3).close();
+ verifyZeroInteractions(connection1);
+ verifyZeroInteractions(connection2);
+ }
+
+ @Test
+ public void extendedRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
+ configureAllFactoriesOffline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ try {
+ connection.extendedRequest(extendedRequest1);
+ } catch (ConnectionException e) {
+ assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
+ }
+ }
+ verify(factory1, atLeastOnce()).getConnectionAsync();
+ verify(factory2, atLeastOnce()).getConnectionAsync();
+ verify(factory3, atLeastOnce()).getConnectionAsync();
+ verifyZeroInteractions(connection1);
+ verifyZeroInteractions(connection2);
+ verifyZeroInteractions(connection3);
+ }
+
+ // ################## Modify Requests ####################
+
+ @Test
+ public void modifyRequestsShouldBeRoutedCorrectly1() throws Exception {
+ modifyRequestsShouldBeRoutedCorrectlyImpl(modifyRequest1, factory1, connection1);
+ }
+
+ @Test
+ public void modifyRequestsShouldBeRoutedCorrectly2() throws Exception {
+ modifyRequestsShouldBeRoutedCorrectlyImpl(modifyRequest2, factory2, connection2);
+ }
+
+ @Test
+ public void modifyRequestsShouldBeRoutedCorrectly3() throws Exception {
+ modifyRequestsShouldBeRoutedCorrectlyImpl(modifyRequest3, factory3, connection3);
+ }
+
+ private void modifyRequestsShouldBeRoutedCorrectlyImpl(final ModifyRequest modifyRequest,
+ final ConnectionFactory expectedFactory,
+ final Connection expectedConnection) throws Exception {
+ configureAllFactoriesOnline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.modify(modifyRequest);
+ }
+ verify(expectedFactory).getConnectionAsync();
+ verify(expectedConnection).modifyAsync(same(modifyRequest), isNull(IntermediateResponseHandler.class));
+ verify(expectedConnection).close();
+ verifyZeroInteractionsForRemainingFactories(expectedFactory);
+ }
+
+ @Test
+ public void modifyRequestsShouldLinearProbeOnFailure1() throws Exception {
+ modifyRequestsShouldLinearProbeOnFailureImpl(modifyRequest1);
+ }
+
+ @Test
+ public void modifyRequestsShouldLinearProbeOnFailure2() throws Exception {
+ modifyRequestsShouldLinearProbeOnFailureImpl(modifyRequest2);
+ }
+
+ @Test
+ public void modifyRequestsShouldLinearProbeOnFailure3() throws Exception {
+ modifyRequestsShouldLinearProbeOnFailureImpl(modifyRequest3);
+ }
+
+ private void modifyRequestsShouldLinearProbeOnFailureImpl(final ModifyRequest modifyRequest) throws Exception {
+ configureFactoriesOneAndTwoOffline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.modify(modifyRequest);
+ }
+ verify(factory3).getConnectionAsync();
+ verify(connection3).modifyAsync(same(modifyRequest), isNull(IntermediateResponseHandler.class));
+ verify(connection3).close();
+ verifyZeroInteractions(connection1);
+ verifyZeroInteractions(connection2);
+ }
+
+ @Test
+ public void modifyRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
+ configureAllFactoriesOffline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ try {
+ connection.modify(modifyRequest1);
+ } catch (ConnectionException e) {
+ assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
+ }
+ }
+ verify(factory1, atLeastOnce()).getConnectionAsync();
+ verify(factory2, atLeastOnce()).getConnectionAsync();
+ verify(factory3, atLeastOnce()).getConnectionAsync();
+ verifyZeroInteractions(connection1);
+ verifyZeroInteractions(connection2);
+ verifyZeroInteractions(connection3);
+ }
+
+ // ################## ModifyDN Requests ####################
+
+ @Test
+ public void modifyDNRequestsShouldBeRoutedCorrectly1() throws Exception {
+ modifyDNRequestsShouldBeRoutedCorrectlyImpl(modifyDNRequest1, factory1, connection1);
+ }
+
+ @Test
+ public void modifyDNRequestsShouldBeRoutedCorrectly2() throws Exception {
+ modifyDNRequestsShouldBeRoutedCorrectlyImpl(modifyDNRequest2, factory2, connection2);
+ }
+
+ @Test
+ public void modifyDNRequestsShouldBeRoutedCorrectly3() throws Exception {
+ modifyDNRequestsShouldBeRoutedCorrectlyImpl(modifyDNRequest3, factory3, connection3);
+ }
+
+ private void modifyDNRequestsShouldBeRoutedCorrectlyImpl(final ModifyDNRequest modifyDNRequest,
+ final ConnectionFactory expectedFactory,
+ final Connection expectedConnection) throws Exception {
+ configureAllFactoriesOnline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.modifyDN(modifyDNRequest);
+ }
+ verify(expectedFactory).getConnectionAsync();
+ verify(expectedConnection).modifyDNAsync(same(modifyDNRequest), isNull(IntermediateResponseHandler.class));
+ verify(expectedConnection).close();
+ verifyZeroInteractionsForRemainingFactories(expectedFactory);
+ }
+
+ @Test
+ public void modifyDNRequestsShouldLinearProbeOnFailure1() throws Exception {
+ modifyDNRequestsShouldLinearProbeOnFailureImpl(modifyDNRequest1);
+ }
+
+ @Test
+ public void modifyDNRequestsShouldLinearProbeOnFailure2() throws Exception {
+ modifyDNRequestsShouldLinearProbeOnFailureImpl(modifyDNRequest2);
+ }
+
+ @Test
+ public void modifyDNRequestsShouldLinearProbeOnFailure3() throws Exception {
+ modifyDNRequestsShouldLinearProbeOnFailureImpl(modifyDNRequest3);
+ }
+
+ private void modifyDNRequestsShouldLinearProbeOnFailureImpl(
+ final ModifyDNRequest modifyDNRequest) throws Exception {
+ configureFactoriesOneAndTwoOffline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.modifyDN(modifyDNRequest);
+ }
+ verify(factory3).getConnectionAsync();
+ verify(connection3).modifyDNAsync(same(modifyDNRequest), isNull(IntermediateResponseHandler.class));
+ verify(connection3).close();
+ verifyZeroInteractions(connection1);
+ verifyZeroInteractions(connection2);
+ }
+
+ @Test
+ public void modifyDNRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
+ configureAllFactoriesOffline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ try {
+ connection.modifyDN(modifyDNRequest1);
+ } catch (ConnectionException e) {
+ assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
+ }
+ }
+ verify(factory1, atLeastOnce()).getConnectionAsync();
+ verify(factory2, atLeastOnce()).getConnectionAsync();
+ verify(factory3, atLeastOnce()).getConnectionAsync();
+ verifyZeroInteractions(connection1);
+ verifyZeroInteractions(connection2);
+ verifyZeroInteractions(connection3);
+ }
+
+ // ################## Search Requests ####################
+
+ @Test
+ public void searchRequestsShouldBeRoutedCorrectly1() throws Exception {
+ searchRequestsShouldBeRoutedCorrectlyImpl(searchRequest1, factory1, connection1);
+ }
+
+ @Test
+ public void searchRequestsShouldBeRoutedCorrectly2() throws Exception {
+ searchRequestsShouldBeRoutedCorrectlyImpl(searchRequest2, factory2, connection2);
+ }
+
+ @Test
+ public void searchRequestsShouldBeRoutedCorrectly3() throws Exception {
+ searchRequestsShouldBeRoutedCorrectlyImpl(searchRequest3, factory3, connection3);
+ }
+
+ private void searchRequestsShouldBeRoutedCorrectlyImpl(final SearchRequest searchRequest,
+ final ConnectionFactory expectedFactory,
+ final Connection expectedConnection) throws Exception {
+ configureAllFactoriesOnline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.search(searchRequest, new ArrayList<>());
+ }
+ verify(expectedFactory).getConnectionAsync();
+ verify(expectedConnection).searchAsync(same(searchRequest),
+ isNull(IntermediateResponseHandler.class),
+ isNotNull(SearchResultHandler.class));
+ verify(expectedConnection).close();
+ verifyZeroInteractionsForRemainingFactories(expectedFactory);
+ }
+
+ @Test
+ public void searchRequestsShouldLinearProbeOnFailure1() throws Exception {
+ searchRequestsShouldLinearProbeOnFailureImpl(searchRequest1);
+ }
+
+ @Test
+ public void searchRequestsShouldLinearProbeOnFailure2() throws Exception {
+ searchRequestsShouldLinearProbeOnFailureImpl(searchRequest2);
+ }
+
+ @Test
+ public void searchRequestsShouldLinearProbeOnFailure3() throws Exception {
+ searchRequestsShouldLinearProbeOnFailureImpl(searchRequest3);
+ }
+
+ private void searchRequestsShouldLinearProbeOnFailureImpl(final SearchRequest searchRequest) throws Exception {
+ configureFactoriesOneAndTwoOffline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ connection.search(searchRequest, new ArrayList<>());
+ }
+ verify(factory3).getConnectionAsync();
+ verify(connection3).searchAsync(same(searchRequest),
+ isNull(IntermediateResponseHandler.class),
+ isNotNull(SearchResultHandler.class));
+ verify(connection3).close();
+ verifyZeroInteractions(connection1);
+ verifyZeroInteractions(connection2);
+ }
+
+ @Test
+ public void searchRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
+ configureAllFactoriesOffline();
+ try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+ try {
+ connection.search(searchRequest1, new ArrayList<>());
+ } catch (ConnectionException e) {
+ assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
+ }
+ }
+ verify(factory1, atLeastOnce()).getConnectionAsync();
+ verify(factory2, atLeastOnce()).getConnectionAsync();
+ verify(factory3, atLeastOnce()).getConnectionAsync();
+ verifyZeroInteractions(connection1);
+ verifyZeroInteractions(connection2);
+ verifyZeroInteractions(connection3);
+ }
+
+ @Mock private ConnectionFactory factory1;
+ @Mock private ConnectionFactory factory2;
+ @Mock private ConnectionFactory factory3;
+
+ @Mock private AbstractAsynchronousConnection connection1;
+ @Mock private AbstractAsynchronousConnection connection2;
+ @Mock private AbstractAsynchronousConnection connection3;
+
+ @Mock private AddRequest addRequest1;
+ @Mock private AddRequest addRequest2;
+ @Mock private AddRequest addRequest3;
+
+ @Mock private BindRequest bindRequest1;
+ @Mock private BindRequest bindRequest2;
+ @Mock private BindRequest bindRequest3;
+
+ @Mock private CompareRequest compareRequest1;
+ @Mock private CompareRequest compareRequest2;
+ @Mock private CompareRequest compareRequest3;
+
+ @Mock private DeleteRequest deleteRequest1;
+ @Mock private DeleteRequest deleteRequest2;
+ @Mock private DeleteRequest deleteRequest3;
+
+ @Mock private GenericExtendedRequest extendedRequest1;
+ @Mock private GenericExtendedRequest extendedRequest2;
+ @Mock private GenericExtendedRequest extendedRequest3;
+
+ @Mock private ModifyRequest modifyRequest1;
+ @Mock private ModifyRequest modifyRequest2;
+ @Mock private ModifyRequest modifyRequest3;
+
+ @Mock private ModifyDNRequest modifyDNRequest1;
+ @Mock private ModifyDNRequest modifyDNRequest2;
+ @Mock private ModifyDNRequest modifyDNRequest3;
+
+ @Mock private SearchRequest searchRequest1;
+ @Mock private SearchRequest searchRequest2;
+ @Mock private SearchRequest searchRequest3;
+
+ private ConnectionFactory loadBalancer;
+
+ @BeforeMethod
+ public void beforeMethod() {
+ TestCaseUtils.setDefaultLogLevel(Level.SEVERE);
+ initMocks(this);
+ stub(this.connection1);
+ stub(this.connection2);
+ stub(this.connection3);
+ loadBalancer = new RequestLoadBalancer("Test",
+ asList(factory1, factory2, factory3),
+ defaultOptions(), newNextFactoryFunction());
+ }
+
+ private Function<Request, Integer, NeverThrowsException> newNextFactoryFunction() {
+ return new Function<Request, Integer, NeverThrowsException>() {
+ @Override
+ public Integer apply(final Request request) {
+ if (request == addRequest1 || request == bindRequest1 || request == compareRequest1
+ || request == deleteRequest1 || request == extendedRequest1 || request == modifyRequest1
+ || request == modifyDNRequest1 || request == searchRequest1) {
+ return 0;
+ }
+
+ if (request == addRequest2 || request == bindRequest2 || request == compareRequest2
+ || request == deleteRequest2 || request == extendedRequest2 || request == modifyRequest2
+ || request == modifyDNRequest2 || request == searchRequest2) {
+ return 1;
+ }
+
+ if (request == addRequest3 || request == bindRequest3 || request == compareRequest3
+ || request == deleteRequest3 || request == extendedRequest3 || request == modifyRequest3
+ || request == modifyDNRequest3 || request == searchRequest3) {
+ return 2;
+ }
+
+ fail("Received unexpected request");
+ return -1; // Keep compiler happy.
+ }
+ };
+ }
+
+ private void stub(final Connection connection) {
+ when(connection.addAsync(any(AddRequest.class), any(IntermediateResponseHandler.class)))
+ .thenReturn(newSuccessfulLdapPromise(newResult(ResultCode.SUCCESS)));
+ when(connection.bindAsync(any(BindRequest.class), any(IntermediateResponseHandler.class)))
+ .thenReturn(newSuccessfulLdapPromise(newBindResult(ResultCode.SUCCESS)));
+ when(connection.compareAsync(any(CompareRequest.class), any(IntermediateResponseHandler.class)))
+ .thenReturn(newSuccessfulLdapPromise(newCompareResult(ResultCode.SUCCESS)));
+ when(connection.deleteAsync(any(DeleteRequest.class), any(IntermediateResponseHandler.class)))
+ .thenReturn(newSuccessfulLdapPromise(newResult(ResultCode.SUCCESS)));
+ when(connection.extendedRequestAsync(any(GenericExtendedRequest.class), any(IntermediateResponseHandler.class)))
+ .thenReturn(newSuccessfulLdapPromise(newGenericExtendedResult(ResultCode.SUCCESS)));
+ when(connection.modifyAsync(any(ModifyRequest.class), any(IntermediateResponseHandler.class)))
+ .thenReturn(newSuccessfulLdapPromise(newResult(ResultCode.SUCCESS)));
+ when(connection.modifyDNAsync(any(ModifyDNRequest.class), any(IntermediateResponseHandler.class)))
+ .thenReturn(newSuccessfulLdapPromise(newResult(ResultCode.SUCCESS)));
+ when(connection.searchAsync(any(SearchRequest.class), any(IntermediateResponseHandler.class),
+ any(SearchResultHandler.class)))
+ .thenReturn(newSuccessfulLdapPromise(newResult(ResultCode.SUCCESS)));
+ }
+
+ @AfterMethod
+ public void afterMethod() {
+ loadBalancer.close();
+ TestCaseUtils.setDefaultLogLevel(Level.INFO);
+ }
+
+ private void configureAllFactoriesOnline() {
+ when(factory1.getConnectionAsync())
+ .thenReturn(Promises.<Connection, LdapException>newResultPromise(connection1));
+ when(factory2.getConnectionAsync())
+ .thenReturn(Promises.<Connection, LdapException>newResultPromise(connection2));
+ when(factory3.getConnectionAsync())
+ .thenReturn(Promises.<Connection, LdapException>newResultPromise(connection3));
+ }
+
+ private void configureFactoriesOneAndTwoOffline() {
+ final LdapException connectionFailure = newLdapException(CLIENT_SIDE_CONNECT_ERROR);
+ when(factory1.getConnectionAsync())
+ .thenReturn(Promises.<Connection, LdapException>newExceptionPromise(connectionFailure));
+ when(factory2.getConnectionAsync())
+ .thenReturn(Promises.<Connection, LdapException>newExceptionPromise(connectionFailure));
+ when(factory3.getConnectionAsync())
+ .thenReturn(Promises.<Connection, LdapException>newResultPromise(connection3));
+ }
+
+ private void configureAllFactoriesOffline() {
+ final LdapException connectionFailure = newLdapException(CLIENT_SIDE_CONNECT_ERROR);
+ when(factory1.getConnectionAsync())
+ .thenReturn(Promises.<Connection, LdapException>newExceptionPromise(connectionFailure));
+ when(factory2.getConnectionAsync())
+ .thenReturn(Promises.<Connection, LdapException>newExceptionPromise(connectionFailure));
+ when(factory3.getConnectionAsync())
+ .thenReturn(Promises.<Connection, LdapException>newExceptionPromise(connectionFailure));
+ }
+
+ private void verifyZeroInteractionsForRemainingFactories(final ConnectionFactory expectedFactory) {
+ if (expectedFactory != factory1) {
+ verifyZeroInteractions(factory1);
+ }
+ if (expectedFactory != factory2) {
+ verifyZeroInteractions(factory2);
+ }
+ if (expectedFactory != factory3) {
+ verifyZeroInteractions(factory3);
+ }
+ }
+}
diff --git a/opendj-sdk/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java b/opendj-sdk/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
index 30177b6..661ef36 100644
--- a/opendj-sdk/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
+++ b/opendj-sdk/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
@@ -22,26 +22,27 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS.
+ * Portions Copyright 2011-2016 ForgeRock AS.
*/
package org.forgerock.opendj.examples;
import static org.forgerock.opendj.ldap.LDAPConnectionFactory.AUTHN_BIND_REQUEST;
import static org.forgerock.opendj.ldap.LDAPConnectionFactory.HEARTBEAT_ENABLED;
-import static org.forgerock.opendj.ldap.LDAPListener.*;
+import static org.forgerock.opendj.ldap.LDAPListener.CONNECT_MAX_BACKLOG;
import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest;
import java.io.IOException;
+import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import org.forgerock.opendj.ldap.ConnectionFactory;
import org.forgerock.opendj.ldap.Connections;
-import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.LDAPConnectionFactory;
import org.forgerock.opendj.ldap.LDAPListener;
+import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.RequestContext;
import org.forgerock.opendj.ldap.RequestHandlerFactory;
import org.forgerock.opendj.ldap.ServerConnectionFactory;
@@ -61,33 +62,44 @@
* This example takes the following command line parameters:
*
* <pre>
- * {@code <listenAddress> <listenPort> <proxyDN> <proxyPassword> <remoteAddress1> <remotePort1>
- * [<remoteAddress2> <remotePort2> ...]}
+ * {@code [--load-balancer <mode>] <listenAddress> <listenPort> <proxyDN> <proxyPassword>
+ * <remoteAddress1> <remotePort1> [<remoteAddress2> <remotePort2> ...]}
* </pre>
+ *
+ * Where {@code <mode>} is one of "round-robin", "fail-over", or "sharded". The default is round-robin.
*/
public final class Proxy {
/**
* Main method.
*
* @param args
- * The command line arguments: listen address, listen port,
+ * The command line arguments: [--load-balancer <mode>] listen address, listen port,
* remote address1, remote port1, remote address2, remote port2,
* ...
*/
public static void main(final String[] args) {
if (args.length < 6 || args.length % 2 != 0) {
- System.err.println("Usage: listenAddress listenPort "
- + "proxyDN proxyPassword remoteAddress1 remotePort1 "
- + "remoteAddress2 remotePort2 ...");
+ System.err.println("Usage: [--load-balancer <mode>] listenAddress listenPort "
+ + "proxyDN proxyPassword remoteAddress1 remotePort1 remoteAddress2 remotePort2 ...");
System.exit(1);
}
// Parse command line arguments.
- final String localAddress = args[0];
- final int localPort = Integer.parseInt(args[1]);
+ int i = 0;
- final String proxyDN = args[2];
- final String proxyPassword = args[3];
+ final LoadBalancingAlgorithm algorithm;
+ if ("--load-balancer".equals(args[i])) {
+ algorithm = getLoadBalancingAlgorithm(args[i + 1]);
+ i += 2;
+ } else {
+ algorithm = LoadBalancingAlgorithm.ROUND_ROBIN;
+ }
+
+ final String localAddress = args[i++];
+ final int localPort = Integer.parseInt(args[i++]);
+
+ final String proxyDN = args[i++];
+ final String proxyPassword = args[i++];
// Create load balancer.
// --- JCite pools ---
@@ -100,7 +112,7 @@
final List<ConnectionFactory> bindFactories = new LinkedList<>();
final Options bindFactoryOptions = Options.defaultOptions().set(HEARTBEAT_ENABLED, true);
- for (int i = 4; i < args.length; i += 2) {
+ for (; i < args.length; i += 2) {
final String remoteAddress = args[i];
final int remotePort = Integer.parseInt(args[i + 1]);
@@ -114,10 +126,8 @@
}
// --- JCite pools ---
- // --- JCite load balancer ---
- final ConnectionFactory factory = Connections.newRoundRobinLoadBalancer(factories, factoryOptions);
- final ConnectionFactory bindFactory = Connections.newRoundRobinLoadBalancer(bindFactories, bindFactoryOptions);
- // --- JCite load balancer ---
+ final ConnectionFactory factory = algorithm.newLoadBalancer(factories, factoryOptions);
+ final ConnectionFactory bindFactory = algorithm.newLoadBalancer(bindFactories, bindFactoryOptions);
// --- JCite backend ---
/*
@@ -156,6 +166,47 @@
// --- JCite listener ---
}
+ private static LoadBalancingAlgorithm getLoadBalancingAlgorithm(final String algorithmName) {
+ switch (algorithmName) {
+ case "round-robin":
+ return LoadBalancingAlgorithm.ROUND_ROBIN;
+ case "fail-over":
+ return LoadBalancingAlgorithm.FAIL_OVER;
+ case "sharded":
+ return LoadBalancingAlgorithm.SHARDED;
+ default:
+ System.err.println("Unrecognized load-balancing algorithm '" + algorithmName + "'. Should be one of "
+ + "'round-robin', 'fail-over', or 'sharded'.");
+ System.exit(1);
+ }
+ return LoadBalancingAlgorithm.ROUND_ROBIN; // keep compiler happy.
+ }
+
+ private enum LoadBalancingAlgorithm {
+ ROUND_ROBIN {
+ @Override
+ ConnectionFactory newLoadBalancer(final Collection<ConnectionFactory> factories, final Options options) {
+ // --- JCite load balancer ---
+ return Connections.newRoundRobinLoadBalancer(factories, options);
+ // --- JCite load balancer ---
+ }
+ },
+ FAIL_OVER {
+ @Override
+ ConnectionFactory newLoadBalancer(final Collection<ConnectionFactory> factories, final Options options) {
+ return Connections.newFailoverLoadBalancer(factories, options);
+ }
+ },
+ SHARDED {
+ @Override
+ ConnectionFactory newLoadBalancer(final Collection<ConnectionFactory> factories, final Options options) {
+ return Connections.newShardedRequestLoadBalancer(factories, options);
+ }
+ };
+
+ abstract ConnectionFactory newLoadBalancer(Collection<ConnectionFactory> factories, Options options);
+ }
+
private Proxy() {
// Not used.
}
--
Gitblit v1.10.0