From 6af1b61dec13bfbdd371672ef1e37eae63a079f7 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 17 Feb 2016 00:10:18 +0000
Subject: [PATCH] OPENDJSDK-16: implement sharded request load-balancer

---
 opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java         |  104 +++
 opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java                 |  134 +++++
 opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java                |    7 
 opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java |  959 ++++++++++++++++++++++++++++++++++++
 opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java         |  260 ++++++++++
 opendj-sdk/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java      |   87 ++
 6 files changed, 1,527 insertions(+), 24 deletions(-)

diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
index ccffb0d..7bc89f4 100644
--- a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
+++ b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -34,12 +34,28 @@
 import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.forgerock.opendj.ldap.requests.AddRequest;
+import org.forgerock.opendj.ldap.requests.CRAMMD5SASLBindRequest;
+import org.forgerock.opendj.ldap.requests.CompareRequest;
+import org.forgerock.opendj.ldap.requests.DeleteRequest;
+import org.forgerock.opendj.ldap.requests.DigestMD5SASLBindRequest;
+import org.forgerock.opendj.ldap.requests.GSSAPISASLBindRequest;
+import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
+import org.forgerock.opendj.ldap.requests.ModifyRequest;
+import org.forgerock.opendj.ldap.requests.PasswordModifyExtendedRequest;
+import org.forgerock.opendj.ldap.requests.PlainSASLBindRequest;
+import org.forgerock.opendj.ldap.requests.Request;
+import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.requests.SimpleBindRequest;
+import org.forgerock.util.Function;
 import org.forgerock.util.Option;
 import org.forgerock.util.Options;
 import org.forgerock.util.Reject;
+import org.forgerock.util.promise.NeverThrowsException;
 import org.forgerock.util.promise.Promise;
 import org.forgerock.util.time.Duration;
 
@@ -413,6 +429,7 @@
      * @param options
      *         This configuration options for the load-balancer.
      * @return The new round-robin load balancer.
+     * @see #newShardedRequestLoadBalancer(Collection, Options)
      * @see #newFailoverLoadBalancer(Collection, Options)
      * @see #LOAD_BALANCER_EVENT_LISTENER
      * @see #LOAD_BALANCER_MONITORING_INTERVAL
@@ -480,6 +497,7 @@
      *         This configuration options for the load-balancer.
      * @return The new fail-over load balancer.
      * @see #newRoundRobinLoadBalancer(Collection, Options)
+     * @see #newShardedRequestLoadBalancer(Collection, Options)
      * @see #LOAD_BALANCER_EVENT_LISTENER
      * @see #LOAD_BALANCER_MONITORING_INTERVAL
      * @see #LOAD_BALANCER_SCHEDULER
@@ -496,6 +514,122 @@
     }
 
     /**
+     * Creates a new "sharded" load-balancer which will load-balance individual requests across the provided set of
+     * connection factories, each typically representing a single replica, using an algorithm that ensures that requests
+     * targeting a given DN will always be routed to the same replica. In other words, this load-balancer increases
+     * consistency whilst maintaining read-scalability by simulating a "single master" replication topology, where each
+     * replica is responsible for a subset of the entries. When a replica is unavailable the load-balancer "fails over"
+     * by performing a linear probe in order to find the next available replica thus ensuring high-availability when a
+     * network partition occurs while sacrificing consistency, since the unavailable replica may still be visible to
+     * other clients.
+     * <p/>
+     * This load-balancer distributes requests based on the hash of their target DN and handles all core operations, as
+     * well as any password modify extended requests and SASL bind requests which use authentication IDs having the
+     * "dn:" form. Note that subtree operations (searches, subtree deletes, and modify DN) are likely to include entries
+     * which are "mastered" on different replicas, so client applications should be more tolerant of inconsistencies.
+     * Requests that are either unrecognized or that do not have a parameter that may be considered to be a target DN
+     * will be routed randomly.
+     * <p/>
+     * <b>NOTE:</b> this connection factory returns fake connections, since real connections are obtained for each
+     * request. Therefore, the returned fake connections have certain limitations: abandon requests will be ignored
+     * since they cannot be routed; connection event listeners can be registered, but will only be notified when the
+     * fake connection is closed or when all of the connection factories are unavailable.
+     * <p/>
+     * <b>NOTE:</b> in deployments where there are multiple client applications, care should be taken to ensure that
+     * the factories are configured using the same ordering, otherwise requests will not be routed consistently
+     * across the client applications.
+     * <p/>
+     * The implementation periodically attempts to connect to failed connection factories in order to determine if they
+     * have become available again.
+     *
+     * @param factories
+     *         The connection factories.
+     * @param options
+     *         This configuration options for the load-balancer.
+     * @return The new affinity load balancer.
+     * @see #newRoundRobinLoadBalancer(Collection, Options)
+     * @see #newFailoverLoadBalancer(Collection, Options)
+     * @see #LOAD_BALANCER_EVENT_LISTENER
+     * @see #LOAD_BALANCER_MONITORING_INTERVAL
+     * @see #LOAD_BALANCER_SCHEDULER
+     */
+    public static ConnectionFactory newShardedRequestLoadBalancer(
+            final Collection<? extends ConnectionFactory> factories, final Options options) {
+        return new RequestLoadBalancer("ShardedRequestLoadBalancer",
+                                       factories,
+                                       options,
+                                       newShardedRequestLoadBalancerFunction(factories));
+    }
+
+    // Package private for testing.
+    static Function<Request, Integer, NeverThrowsException> newShardedRequestLoadBalancerFunction(
+            final Collection<? extends ConnectionFactory> factories) {
+        return new Function<Request, Integer, NeverThrowsException>() {
+            private final int maxIndex = factories.size();
+
+            @Override
+            public Integer apply(final Request request) {
+                // Normalize the hash to a valid factory index, taking care of negative hash values and especially
+                // Integer.MIN_VALUE (see doc for Math.abs()).
+                final int index = computeIndexBasedOnDnHashCode(request);
+                return index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % maxIndex);
+            }
+
+            private int computeIndexBasedOnDnHashCode(final Request request) {
+                // The following conditions are ordered such that the most common operations appear first in order to
+                // reduce the average number of branches. A better solution would be to use a visitor, but a visitor
+                // would only apply to the core operations, not extended operations or SASL binds.
+                if (request instanceof SearchRequest) {
+                    return ((SearchRequest) request).getName().hashCode();
+                } else if (request instanceof ModifyRequest) {
+                    return ((ModifyRequest) request).getName().hashCode();
+                } else if (request instanceof SimpleBindRequest) {
+                    return hashCodeOfDnString(((SimpleBindRequest) request).getName());
+                } else if (request instanceof AddRequest) {
+                    return ((AddRequest) request).getName().hashCode();
+                } else if (request instanceof DeleteRequest) {
+                    return ((DeleteRequest) request).getName().hashCode();
+                } else if (request instanceof CompareRequest) {
+                    return ((CompareRequest) request).getName().hashCode();
+                } else if (request instanceof ModifyDNRequest) {
+                    return ((ModifyDNRequest) request).getName().hashCode();
+                } else if (request instanceof PasswordModifyExtendedRequest) {
+                    return hashCodeOfAuthzid(((PasswordModifyExtendedRequest) request).getUserIdentityAsString());
+                } else if (request instanceof PlainSASLBindRequest) {
+                    return hashCodeOfAuthzid(((PlainSASLBindRequest) request).getAuthenticationID());
+                } else if (request instanceof DigestMD5SASLBindRequest) {
+                    return hashCodeOfAuthzid(((DigestMD5SASLBindRequest) request).getAuthenticationID());
+                } else if (request instanceof GSSAPISASLBindRequest) {
+                    return hashCodeOfAuthzid(((GSSAPISASLBindRequest) request).getAuthenticationID());
+                } else if (request instanceof CRAMMD5SASLBindRequest) {
+                    return hashCodeOfAuthzid(((CRAMMD5SASLBindRequest) request).getAuthenticationID());
+                } else {
+                    return distributeRequestAtRandom();
+                }
+            }
+
+            private int hashCodeOfAuthzid(final String authzid) {
+                if (authzid != null && authzid.startsWith("dn:")) {
+                    return hashCodeOfDnString(authzid.substring(3));
+                }
+                return distributeRequestAtRandom();
+            }
+
+            private int hashCodeOfDnString(final String dnString) {
+                try {
+                    return DN.valueOf(dnString).hashCode();
+                } catch (final IllegalArgumentException ignored) {
+                    return distributeRequestAtRandom();
+                }
+            }
+
+            private int distributeRequestAtRandom() {
+                return ThreadLocalRandom.current().nextInt(0, maxIndex);
+            }
+        };
+    }
+
+    /**
      * Creates a new connection factory which forwards connection requests to
      * the provided factory, but whose {@code toString} method will always
      * return {@code name}.
diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java
index eaa3bc1..76ded9f 100644
--- a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java
+++ b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java
@@ -278,16 +278,13 @@
     /**
      * Return the first available connection factory starting from {@code initialIndex}.
      *
-     * @param initialIndex The index of the connection factory to be returned if operational. The index may be
-     *                     greater than the number of factories in which case this method will perform a modulus
-     *                     operation to bring it into range. NOTE: for performance reasons callers should attempt to
-     *                     use valid indexes because the modulus operation is relatively expensive.
+     * @param initialIndex The index of the connection factory to be returned if operational.
      * @return The first available connection factory starting from the initial index.
      * @throws LdapException If no connection factories are available.
      */
     final ConnectionFactory getMonitoredConnectionFactory(final int initialIndex) throws LdapException {
         final int maxIndex = monitoredFactories.size();
-        int index = initialIndex < maxIndex ? initialIndex : initialIndex % maxIndex;
+        int index = initialIndex;
         do {
             final MonitoredConnectionFactory factory = monitoredFactories.get(index);
             if (factory.isOperational.get()) {
diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java
new file mode 100644
index 0000000..b00775b
--- /dev/null
+++ b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java
@@ -0,0 +1,260 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2016 ForgeRock AS.
+ */
+package org.forgerock.opendj.ldap;
+
+import static org.forgerock.opendj.ldap.spi.LdapPromises.newFailedLdapPromise;
+import static org.forgerock.util.Utils.closeSilently;
+import static org.forgerock.util.promise.Promises.newResultPromise;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.forgerock.opendj.ldap.requests.AbandonRequest;
+import org.forgerock.opendj.ldap.requests.AddRequest;
+import org.forgerock.opendj.ldap.requests.BindRequest;
+import org.forgerock.opendj.ldap.requests.CompareRequest;
+import org.forgerock.opendj.ldap.requests.DeleteRequest;
+import org.forgerock.opendj.ldap.requests.ExtendedRequest;
+import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
+import org.forgerock.opendj.ldap.requests.ModifyRequest;
+import org.forgerock.opendj.ldap.requests.Request;
+import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.requests.UnbindRequest;
+import org.forgerock.opendj.ldap.responses.BindResult;
+import org.forgerock.opendj.ldap.responses.CompareResult;
+import org.forgerock.opendj.ldap.responses.ExtendedResult;
+import org.forgerock.opendj.ldap.responses.Result;
+import org.forgerock.opendj.ldap.spi.ConnectionState;
+import org.forgerock.opendj.ldap.spi.LdapPromises;
+import org.forgerock.util.AsyncFunction;
+import org.forgerock.util.Function;
+import org.forgerock.util.Options;
+import org.forgerock.util.promise.ExceptionHandler;
+import org.forgerock.util.promise.NeverThrowsException;
+import org.forgerock.util.promise.Promise;
+import org.forgerock.util.promise.ResultHandler;
+
+/**
+ * A request based load balancer which load balances individual requests based on properties of the request, such as
+ * the target DN.
+ * <p>
+ * Implementations should override the method {@code getInitialConnectionFactoryIndex()} in order to provide the policy
+ * for selecting the first connection factory to use for each request.
+ */
+final class RequestLoadBalancer extends LoadBalancer {
+    /**
+     * A function which returns the index of the first connection factory which should be used in order to satisfy the
+     * next request. Implementations may base the decision on properties of the provided request, such as the target DN,
+     * whether the request is a read or update request, etc.
+     */
+    private final Function<Request, Integer, NeverThrowsException> nextFactoryFunction;
+
+    RequestLoadBalancer(final String loadBalancerName,
+                        final Collection<? extends ConnectionFactory> factories,
+                        final Options options,
+                        final Function<Request, Integer, NeverThrowsException> nextFactoryFunction) {
+        super(loadBalancerName, factories, options);
+        this.nextFactoryFunction = nextFactoryFunction;
+    }
+
+    @Override
+    public final Connection getConnection() throws LdapException {
+        return new ConnectionImpl();
+    }
+
+    @Override
+    public final Promise<Connection, LdapException> getConnectionAsync() {
+        return newResultPromise((Connection) new ConnectionImpl());
+    }
+
+    private class ConnectionImpl extends AbstractAsynchronousConnection {
+        private final ConnectionState state = new ConnectionState();
+
+        @Override
+        public String toString() {
+            return getLoadBalancerName() + "Connection";
+        }
+
+        @Override
+        public LdapPromise<Void> abandonAsync(final AbandonRequest request) {
+            // We cannot possibly route these correctly, so just drop them.
+            return LdapPromises.newSuccessfulLdapPromise(null);
+        }
+
+        @Override
+        public LdapPromise<Result> addAsync(
+                final AddRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
+            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
+                @Override
+                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+                    return connection.addAsync(request, intermediateResponseHandler);
+                }
+            });
+        }
+
+        @Override
+        public void addConnectionEventListener(final ConnectionEventListener listener) {
+            state.addConnectionEventListener(listener);
+        }
+
+        @Override
+        public LdapPromise<BindResult> bindAsync(
+                final BindRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
+            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, BindResult, LdapException>() {
+                @Override
+                public Promise<BindResult, LdapException> apply(final Connection connection) throws LdapException {
+                    return connection.bindAsync(request, intermediateResponseHandler);
+                }
+            });
+        }
+
+        @Override
+        public void close(final UnbindRequest request, final String reason) {
+            state.notifyConnectionClosed();
+        }
+
+        @Override
+        public LdapPromise<CompareResult> compareAsync(
+                final CompareRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
+            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, CompareResult, LdapException>() {
+                @Override
+                public Promise<CompareResult, LdapException> apply(final Connection connection) throws LdapException {
+                    return connection.compareAsync(request, intermediateResponseHandler);
+                }
+            });
+        }
+
+        @Override
+        public LdapPromise<Result> deleteAsync(
+                final DeleteRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
+            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
+                @Override
+                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+                    return connection.deleteAsync(request, intermediateResponseHandler);
+                }
+            });
+        }
+
+        @Override
+        public <R extends ExtendedResult> LdapPromise<R> extendedRequestAsync(
+                final ExtendedRequest<R> request, final IntermediateResponseHandler intermediateResponseHandler) {
+            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, R, LdapException>() {
+                @Override
+                public Promise<R, LdapException> apply(final Connection connection) throws LdapException {
+                    return connection.extendedRequestAsync(request, intermediateResponseHandler);
+                }
+            });
+        }
+
+        @Override
+        public boolean isClosed() {
+            return state.isClosed();
+        }
+
+        @Override
+        public boolean isValid() {
+            return state.isValid();
+        }
+
+        @Override
+        public LdapPromise<Result> modifyAsync(
+                final ModifyRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
+            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
+                @Override
+                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+                    return connection.modifyAsync(request, intermediateResponseHandler);
+                }
+            });
+        }
+
+        @Override
+        public LdapPromise<Result> modifyDNAsync(
+                final ModifyDNRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
+            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
+                @Override
+                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+                    return connection.modifyDNAsync(request, intermediateResponseHandler);
+                }
+            });
+        }
+
+        @Override
+        public void removeConnectionEventListener(final ConnectionEventListener listener) {
+            state.removeConnectionEventListener(listener);
+        }
+
+        @Override
+        public LdapPromise<Result> searchAsync(
+                final SearchRequest request,
+                final IntermediateResponseHandler intermediateResponseHandler,
+                final SearchResultHandler entryHandler) {
+            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
+                @Override
+                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+                    return connection.searchAsync(request, intermediateResponseHandler, entryHandler);
+                }
+            });
+        }
+
+        private <R> LdapPromise<R> getConnectionAndSendRequest(
+                final Request request, final AsyncFunction<Connection, R, LdapException> sendRequest) {
+            if (state.isClosed()) {
+                throw new IllegalStateException();
+            }
+            final AtomicReference<Connection> connectionHolder = new AtomicReference<>();
+            return getConnectionAsync(request)
+                    .thenOnResult(new ResultHandler<Connection>() {
+                        @Override
+                        public void handleResult(final Connection connection) {
+                            connectionHolder.set(connection);
+                        }
+                    })
+                    .thenAsync(sendRequest)
+                    .thenFinally(new Runnable() {
+                        @Override
+                        public void run() {
+                            closeSilently(connectionHolder.get());
+                        }
+                    });
+        }
+
+        private LdapPromise<Connection> getConnectionAsync(final Request request) {
+            try {
+                final int index = nextFactoryFunction.apply(request);
+                final ConnectionFactory factory = getMonitoredConnectionFactory(index);
+                return LdapPromises.asPromise(factory.getConnectionAsync()
+                                                     .thenOnException(new ExceptionHandler<LdapException>() {
+                                                         @Override
+                                                         public void handleException(final LdapException e) {
+                                                             state.notifyConnectionError(false, e);
+                                                         }
+                                                     }));
+            } catch (final LdapException e) {
+                state.notifyConnectionError(false, e);
+                return newFailedLdapPromise(e);
+            }
+        }
+    }
+}
diff --git a/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java b/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java
index ff164c2..e589ce8 100644
--- a/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java
+++ b/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java
@@ -21,14 +21,34 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2013 ForgeRock AS
+ *      Copyright 2013-2016 ForgeRock AS
  */
 package org.forgerock.opendj.ldap;
 
+import static java.util.Arrays.asList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.forgerock.opendj.ldap.Connections.newShardedRequestLoadBalancerFunction;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
 
+import org.forgerock.opendj.ldap.requests.AddRequest;
+import org.forgerock.opendj.ldap.requests.CRAMMD5SASLBindRequest;
+import org.forgerock.opendj.ldap.requests.CompareRequest;
+import org.forgerock.opendj.ldap.requests.DeleteRequest;
+import org.forgerock.opendj.ldap.requests.DigestMD5SASLBindRequest;
+import org.forgerock.opendj.ldap.requests.GSSAPISASLBindRequest;
+import org.forgerock.opendj.ldap.requests.GenericExtendedRequest;
+import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
+import org.forgerock.opendj.ldap.requests.ModifyRequest;
+import org.forgerock.opendj.ldap.requests.PasswordModifyExtendedRequest;
+import org.forgerock.opendj.ldap.requests.PlainSASLBindRequest;
+import org.forgerock.opendj.ldap.requests.Request;
+import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.requests.SimpleBindRequest;
+import org.forgerock.util.Function;
+import org.forgerock.util.promise.NeverThrowsException;
 import org.testng.annotations.Test;
 
 @SuppressWarnings("javadoc")
@@ -57,4 +77,86 @@
         uncloseable.close(null, null);
         verifyZeroInteractions(connection);
     }
+
+    @Test
+    public void shardedRequestLoadBalancerUsesConsistentIndexing() {
+        final Function<Request, Integer, NeverThrowsException> f =
+                newShardedRequestLoadBalancerFunction(asList(mock(ConnectionFactory.class),
+                                                             mock(ConnectionFactory.class)));
+
+        // These two DNs have a different hash code.
+        final DN dn1 = DN.valueOf("cn=target1,dc=example,dc=com");
+        final DN dn2 = DN.valueOf("cn=target2,dc=example,dc=com");
+
+        final AddRequest addRequest = mock(AddRequest.class);
+        when(addRequest.getName()).thenReturn(dn1, dn2);
+        final int dn1index = index(f, addRequest);
+        final int dn2index = index(f, addRequest);
+        assertThat(dn1index).isNotEqualTo(dn2index);
+
+        final SimpleBindRequest simpleBindRequest = mock(SimpleBindRequest.class);
+        when(simpleBindRequest.getName()).thenReturn(dn1.toString(), dn2.toString());
+        assertRequestsAreRoutedConsistently(f, simpleBindRequest, dn1index, dn2index);
+
+        final CompareRequest compareRequest = mock(CompareRequest.class);
+        when(compareRequest.getName()).thenReturn(dn1, dn2);
+        assertRequestsAreRoutedConsistently(f, compareRequest, dn1index, dn2index);
+
+        final DeleteRequest deleteRequest = mock(DeleteRequest.class);
+        when(deleteRequest.getName()).thenReturn(dn1, dn2);
+        assertRequestsAreRoutedConsistently(f, deleteRequest, dn1index, dn2index);
+
+        final ModifyRequest modifyRequest = mock(ModifyRequest.class);
+        when(modifyRequest.getName()).thenReturn(dn1, dn2);
+        assertRequestsAreRoutedConsistently(f, modifyRequest, dn1index, dn2index);
+
+        final ModifyDNRequest modifyDNRequest = mock(ModifyDNRequest.class);
+        when(modifyDNRequest.getName()).thenReturn(dn1, dn2);
+        assertRequestsAreRoutedConsistently(f, modifyDNRequest, dn1index, dn2index);
+
+        final SearchRequest searchRequest = mock(SearchRequest.class);
+        when(searchRequest.getName()).thenReturn(dn1, dn2);
+        assertRequestsAreRoutedConsistently(f, searchRequest, dn1index, dn2index);
+
+        // Authzid based operations.
+        final String authzid1 = "dn:" + dn1.toString();
+        final String authzid2 = "dn:" + dn2.toString();
+
+        final PasswordModifyExtendedRequest passwordModifyRequest = mock(PasswordModifyExtendedRequest.class);
+        when(passwordModifyRequest.getUserIdentityAsString()).thenReturn(authzid1, authzid2);
+        assertRequestsAreRoutedConsistently(f, passwordModifyRequest, dn1index, dn2index);
+
+        final PlainSASLBindRequest plainSASLBindRequest = mock(PlainSASLBindRequest.class);
+        when(plainSASLBindRequest.getAuthenticationID()).thenReturn(authzid1, authzid2);
+        assertRequestsAreRoutedConsistently(f, plainSASLBindRequest, dn1index, dn2index);
+
+        final CRAMMD5SASLBindRequest cramMD5SASLBindRequest = mock(CRAMMD5SASLBindRequest.class);
+        when(cramMD5SASLBindRequest.getAuthenticationID()).thenReturn(authzid1, authzid2);
+        assertRequestsAreRoutedConsistently(f, cramMD5SASLBindRequest, dn1index, dn2index);
+
+        final DigestMD5SASLBindRequest digestMD5SASLBindRequest = mock(DigestMD5SASLBindRequest.class);
+        when(digestMD5SASLBindRequest.getAuthenticationID()).thenReturn(authzid1, authzid2);
+        assertRequestsAreRoutedConsistently(f, digestMD5SASLBindRequest, dn1index, dn2index);
+
+        final GSSAPISASLBindRequest gssapiSASLBindRequest = mock(GSSAPISASLBindRequest.class);
+        when(gssapiSASLBindRequest.getAuthenticationID()).thenReturn(authzid1, authzid2);
+        assertRequestsAreRoutedConsistently(f, gssapiSASLBindRequest, dn1index, dn2index);
+
+        // Requests that have no target will return a random index, but since we only have one factory the index will
+        // always be 0.
+        final GenericExtendedRequest genericExtendedRequest = mock(GenericExtendedRequest.class);
+        assertThat(index(f, genericExtendedRequest)).isBetween(0, 1);
+    }
+
+    private void assertRequestsAreRoutedConsistently(final Function<Request, Integer, NeverThrowsException> f,
+                                                     final Request r,
+                                                     final int firstExpectedIndex,
+                                                     final int secondExpectedIndex) {
+        assertThat(index(f, r)).isEqualTo(firstExpectedIndex);
+        assertThat(index(f, r)).isEqualTo(secondExpectedIndex);
+    }
+
+    private int index(final Function<Request, Integer, NeverThrowsException> function, final Request request) {
+        return function.apply(request);
+    }
 }
diff --git a/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java b/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java
new file mode 100644
index 0000000..6500ff4
--- /dev/null
+++ b/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java
@@ -0,0 +1,959 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2016 ForgeRock AS.
+ */
+package org.forgerock.opendj.ldap;
+
+import static java.util.Arrays.asList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+import static org.forgerock.opendj.ldap.LdapException.newLdapException;
+import static org.forgerock.opendj.ldap.ResultCode.CLIENT_SIDE_CONNECT_ERROR;
+import static org.forgerock.opendj.ldap.responses.Responses.newBindResult;
+import static org.forgerock.opendj.ldap.responses.Responses.newCompareResult;
+import static org.forgerock.opendj.ldap.responses.Responses.newGenericExtendedResult;
+import static org.forgerock.opendj.ldap.responses.Responses.newResult;
+import static org.forgerock.opendj.ldap.spi.LdapPromises.newSuccessfulLdapPromise;
+import static org.forgerock.util.Options.defaultOptions;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNotNull;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import java.util.ArrayList;
+import java.util.logging.Level;
+
+import org.forgerock.opendj.ldap.requests.AbandonRequest;
+import org.forgerock.opendj.ldap.requests.AddRequest;
+import org.forgerock.opendj.ldap.requests.BindRequest;
+import org.forgerock.opendj.ldap.requests.CompareRequest;
+import org.forgerock.opendj.ldap.requests.DeleteRequest;
+import org.forgerock.opendj.ldap.requests.ExtendedRequest;
+import org.forgerock.opendj.ldap.requests.GenericExtendedRequest;
+import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
+import org.forgerock.opendj.ldap.requests.ModifyRequest;
+import org.forgerock.opendj.ldap.requests.Request;
+import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.util.Function;
+import org.forgerock.util.promise.NeverThrowsException;
+import org.forgerock.util.promise.Promises;
+import org.mockito.Mock;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@SuppressWarnings("javadoc")
+public class RequestLoadBalancerTestCase extends SdkTestCase {
+    @Test
+    public void closeLoadBalancerShouldCloseDelegateFactories() throws Exception {
+        configureAllFactoriesOnline();
+        loadBalancer.close();
+        verify(factory1).close();
+        verify(factory2).close();
+        verify(factory3).close();
+    }
+
+    @Test
+    public void getConnectionShouldNotInvokeDelegateFactory() throws Exception {
+        configureAllFactoriesOnline();
+        try (Connection connection = loadBalancer.getConnection()) {
+            assertThat(connection).isNotNull();
+            verifyZeroInteractions(factory1, factory2, factory3);
+        }
+        verifyZeroInteractions(factory1, factory2, factory3);
+        loadBalancer.close();
+    }
+
+    @Test
+    public void getConnectionReturnANewConnectionEachTime() throws Exception {
+        configureAllFactoriesOnline();
+        try (Connection connection1 = loadBalancer.getConnection();
+             Connection connection2 = loadBalancer.getConnection()) {
+            assertThat(connection1).isNotSameAs(connection2);
+        }
+        loadBalancer.close();
+    }
+
+    @Test
+    public void getConnectionAsyncReturnANewConnectionEachTime() throws Exception {
+        configureAllFactoriesOnline();
+        try (Connection connection1 = loadBalancer.getConnectionAsync().get();
+             Connection connection2 = loadBalancer.getConnectionAsync().get()) {
+            assertThat(connection1).isNotSameAs(connection2);
+        }
+        loadBalancer.close();
+    }
+
+    @Test
+    public void getConnectionAsyncShouldNotInvokeDelegateFactory() throws Exception {
+        configureAllFactoriesOnline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            assertThat(connection).isNotNull();
+            verifyZeroInteractions(factory1, factory2, factory3);
+        }
+        verifyZeroInteractions(factory1, factory2, factory3);
+    }
+
+    @Test
+    public void connectionEventListenersNotifiedOnClose() throws Exception {
+        configureAllFactoriesOnline();
+        final ConnectionEventListener listener = mock(ConnectionEventListener.class);
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.addConnectionEventListener(listener);
+        }
+        verify(listener).handleConnectionClosed();
+    }
+
+    @Test
+    public void connectionEventListenersNotifiedOnError() throws Exception {
+        configureAllFactoriesOffline();
+        final ConnectionEventListener listener = mock(ConnectionEventListener.class);
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.addConnectionEventListener(listener);
+            try {
+                connection.add(addRequest1);
+                fail("add unexpectedly succeeded");
+            } catch (LdapException ignored) {
+                // Ignore.
+            }
+        }
+        verify(listener).handleConnectionError(eq(false), any(LdapException.class));
+        verify(listener).handleConnectionClosed();
+    }
+
+    @Test
+    public void removedConnectionEventListenersShouldNotBeNotified() throws Exception {
+        configureAllFactoriesOnline();
+        final ConnectionEventListener listener = mock(ConnectionEventListener.class);
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.addConnectionEventListener(listener);
+            connection.removeConnectionEventListener(listener);
+        }
+        verifyZeroInteractions(listener);
+    }
+
+    @Test
+    public void validAndClosedStateShouldBeMaintained() throws Exception {
+        configureAllFactoriesOffline();
+        final Connection connection = loadBalancer.getConnectionAsync().get();
+        assertThat(connection.isValid()).isTrue();
+        assertThat(connection.isClosed()).isFalse();
+
+        try {
+            connection.add(addRequest1);
+            fail("add unexpectedly succeeded");
+        } catch (LdapException ignored) {
+            // Ignore.
+        }
+        assertThat(connection.isValid()).isFalse();
+        assertThat(connection.isClosed()).isFalse();
+
+        connection.close();
+        assertThat(connection.isValid()).isFalse();
+        assertThat(connection.isClosed()).isTrue();
+    }
+
+    @Test
+    public void factoryToStringShouldReturnANonEmptyString() throws Exception {
+        configureAllFactoriesOffline();
+        assertThat(loadBalancer.toString()).isNotEmpty();
+    }
+
+    @Test
+    public void connectionToStringShouldReturnANonEmptyString() throws Exception {
+        configureAllFactoriesOffline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            assertThat(connection.toString()).isNotEmpty();
+        }
+    }
+
+    @Test
+    public void abandonRequestShouldBeIgnored() throws Exception {
+        configureAllFactoriesOnline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.abandonAsync(mock(AbandonRequest.class));
+        }
+        verifyZeroInteractions(factory1, factory2, factory3);
+    }
+
+    // We can't use a DataProviders here because the mocks will be re-initialized for each test method call.
+
+    // ################## Add Requests ####################
+
+    @Test
+    public void addRequestsShouldBeRoutedCorrectly1() throws Exception {
+        addRequestsShouldBeRoutedCorrectlyImpl(addRequest1, factory1, connection1);
+    }
+
+    @Test
+    public void addRequestsShouldBeRoutedCorrectly2() throws Exception {
+        addRequestsShouldBeRoutedCorrectlyImpl(addRequest2, factory2, connection2);
+    }
+
+    @Test
+    public void addRequestsShouldBeRoutedCorrectly3() throws Exception {
+        addRequestsShouldBeRoutedCorrectlyImpl(addRequest3, factory3, connection3);
+    }
+
+    private void addRequestsShouldBeRoutedCorrectlyImpl(final AddRequest addRequest,
+                                                        final ConnectionFactory expectedFactory,
+                                                        final Connection expectedConnection) throws Exception {
+        configureAllFactoriesOnline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.add(addRequest);
+        }
+        verify(expectedFactory).getConnectionAsync();
+        verify(expectedConnection).addAsync(same(addRequest), isNull(IntermediateResponseHandler.class));
+        verify(expectedConnection).close();
+        verifyZeroInteractionsForRemainingFactories(expectedFactory);
+    }
+
+    @Test
+    public void addRequestsShouldLinearProbeOnFailure1() throws Exception {
+        addRequestsShouldLinearProbeOnFailureImpl(addRequest1);
+    }
+
+    @Test
+    public void addRequestsShouldLinearProbeOnFailure2() throws Exception {
+        addRequestsShouldLinearProbeOnFailureImpl(addRequest2);
+    }
+
+    @Test
+    public void addRequestsShouldLinearProbeOnFailure3() throws Exception {
+        addRequestsShouldLinearProbeOnFailureImpl(addRequest3);
+    }
+
+    private void addRequestsShouldLinearProbeOnFailureImpl(final AddRequest addRequest) throws Exception {
+        configureFactoriesOneAndTwoOffline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.add(addRequest);
+        }
+        verify(factory3).getConnectionAsync();
+        verify(connection3).addAsync(same(addRequest), isNull(IntermediateResponseHandler.class));
+        verify(connection3).close();
+        verifyZeroInteractions(connection1);
+        verifyZeroInteractions(connection2);
+    }
+
+    @Test
+    public void addRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
+        configureAllFactoriesOffline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            try {
+                connection.add(addRequest1);
+            } catch (ConnectionException e) {
+                assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
+            }
+        }
+        verify(factory1, atLeastOnce()).getConnectionAsync();
+        verify(factory2, atLeastOnce()).getConnectionAsync();
+        verify(factory3, atLeastOnce()).getConnectionAsync();
+        verifyZeroInteractions(connection1);
+        verifyZeroInteractions(connection2);
+        verifyZeroInteractions(connection3);
+    }
+
+    // ################## Bind Requests ####################
+
+    @Test
+    public void bindRequestsShouldBeRoutedCorrectly1() throws Exception {
+        bindRequestsShouldBeRoutedCorrectlyImpl(bindRequest1, factory1, connection1);
+    }
+
+    @Test
+    public void bindRequestsShouldBeRoutedCorrectly2() throws Exception {
+        bindRequestsShouldBeRoutedCorrectlyImpl(bindRequest2, factory2, connection2);
+    }
+
+    @Test
+    public void bindRequestsShouldBeRoutedCorrectly3() throws Exception {
+        bindRequestsShouldBeRoutedCorrectlyImpl(bindRequest3, factory3, connection3);
+    }
+
+    private void bindRequestsShouldBeRoutedCorrectlyImpl(final BindRequest bindRequest,
+                                                         final ConnectionFactory expectedFactory,
+                                                         final Connection expectedConnection) throws Exception {
+        configureAllFactoriesOnline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.bind(bindRequest);
+        }
+        verify(expectedFactory).getConnectionAsync();
+        verify(expectedConnection).bindAsync(same(bindRequest), isNull(IntermediateResponseHandler.class));
+        verify(expectedConnection).close();
+        verifyZeroInteractionsForRemainingFactories(expectedFactory);
+    }
+
+    @Test
+    public void bindRequestsShouldLinearProbeOnFailure1() throws Exception {
+        bindRequestsShouldLinearProbeOnFailureImpl(bindRequest1);
+    }
+
+    @Test
+    public void bindRequestsShouldLinearProbeOnFailure2() throws Exception {
+        bindRequestsShouldLinearProbeOnFailureImpl(bindRequest2);
+    }
+
+    @Test
+    public void bindRequestsShouldLinearProbeOnFailure3() throws Exception {
+        bindRequestsShouldLinearProbeOnFailureImpl(bindRequest3);
+    }
+
+    private void bindRequestsShouldLinearProbeOnFailureImpl(final BindRequest bindRequest) throws Exception {
+        configureFactoriesOneAndTwoOffline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.bind(bindRequest);
+        }
+        verify(factory3).getConnectionAsync();
+        verify(connection3).bindAsync(same(bindRequest), isNull(IntermediateResponseHandler.class));
+        verify(connection3).close();
+        verifyZeroInteractions(connection1);
+        verifyZeroInteractions(connection2);
+    }
+
+    @Test
+    public void bindRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
+        configureAllFactoriesOffline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            try {
+                connection.bind(bindRequest1);
+            } catch (ConnectionException e) {
+                assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
+            }
+        }
+        verify(factory1, atLeastOnce()).getConnectionAsync();
+        verify(factory2, atLeastOnce()).getConnectionAsync();
+        verify(factory3, atLeastOnce()).getConnectionAsync();
+        verifyZeroInteractions(connection1);
+        verifyZeroInteractions(connection2);
+        verifyZeroInteractions(connection3);
+    }
+
+    // ################## Compare Requests ####################
+
+    @Test
+    public void compareRequestsShouldBeRoutedCorrectly1() throws Exception {
+        compareRequestsShouldBeRoutedCorrectlyImpl(compareRequest1, factory1, connection1);
+    }
+
+    @Test
+    public void compareRequestsShouldBeRoutedCorrectly2() throws Exception {
+        compareRequestsShouldBeRoutedCorrectlyImpl(compareRequest2, factory2, connection2);
+    }
+
+    @Test
+    public void compareRequestsShouldBeRoutedCorrectly3() throws Exception {
+        compareRequestsShouldBeRoutedCorrectlyImpl(compareRequest3, factory3, connection3);
+    }
+
+    private void compareRequestsShouldBeRoutedCorrectlyImpl(final CompareRequest compareRequest,
+                                                            final ConnectionFactory expectedFactory,
+                                                            final Connection expectedConnection) throws Exception {
+        configureAllFactoriesOnline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.compare(compareRequest);
+        }
+        verify(expectedFactory).getConnectionAsync();
+        verify(expectedConnection).compareAsync(same(compareRequest), isNull(IntermediateResponseHandler.class));
+        verify(expectedConnection).close();
+        verifyZeroInteractionsForRemainingFactories(expectedFactory);
+    }
+
+    @Test
+    public void compareRequestsShouldLinearProbeOnFailure1() throws Exception {
+        compareRequestsShouldLinearProbeOnFailureImpl(compareRequest1);
+    }
+
+    @Test
+    public void compareRequestsShouldLinearProbeOnFailure2() throws Exception {
+        compareRequestsShouldLinearProbeOnFailureImpl(compareRequest2);
+    }
+
+    @Test
+    public void compareRequestsShouldLinearProbeOnFailure3() throws Exception {
+        compareRequestsShouldLinearProbeOnFailureImpl(compareRequest3);
+    }
+
+    private void compareRequestsShouldLinearProbeOnFailureImpl(final CompareRequest compareRequest) throws Exception {
+        configureFactoriesOneAndTwoOffline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.compare(compareRequest);
+        }
+        verify(factory3).getConnectionAsync();
+        verify(connection3).compareAsync(same(compareRequest), isNull(IntermediateResponseHandler.class));
+        verify(connection3).close();
+        verifyZeroInteractions(connection1);
+        verifyZeroInteractions(connection2);
+    }
+
+    @Test
+    public void compareRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
+        configureAllFactoriesOffline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            try {
+                connection.compare(compareRequest1);
+            } catch (ConnectionException e) {
+                assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
+            }
+        }
+        verify(factory1, atLeastOnce()).getConnectionAsync();
+        verify(factory2, atLeastOnce()).getConnectionAsync();
+        verify(factory3, atLeastOnce()).getConnectionAsync();
+        verifyZeroInteractions(connection1);
+        verifyZeroInteractions(connection2);
+        verifyZeroInteractions(connection3);
+    }
+
+    // ################## Delete Requests ####################
+
+    @Test
+    public void deleteRequestsShouldBeRoutedCorrectly1() throws Exception {
+        deleteRequestsShouldBeRoutedCorrectlyImpl(deleteRequest1, factory1, connection1);
+    }
+
+    @Test
+    public void deleteRequestsShouldBeRoutedCorrectly2() throws Exception {
+        deleteRequestsShouldBeRoutedCorrectlyImpl(deleteRequest2, factory2, connection2);
+    }
+
+    @Test
+    public void deleteRequestsShouldBeRoutedCorrectly3() throws Exception {
+        deleteRequestsShouldBeRoutedCorrectlyImpl(deleteRequest3, factory3, connection3);
+    }
+
+    private void deleteRequestsShouldBeRoutedCorrectlyImpl(final DeleteRequest deleteRequest,
+                                                           final ConnectionFactory expectedFactory,
+                                                           final Connection expectedConnection) throws Exception {
+        configureAllFactoriesOnline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.delete(deleteRequest);
+        }
+        verify(expectedFactory).getConnectionAsync();
+        verify(expectedConnection).deleteAsync(same(deleteRequest), isNull(IntermediateResponseHandler.class));
+        verify(expectedConnection).close();
+        verifyZeroInteractionsForRemainingFactories(expectedFactory);
+    }
+
+    @Test
+    public void deleteRequestsShouldLinearProbeOnFailure1() throws Exception {
+        deleteRequestsShouldLinearProbeOnFailureImpl(deleteRequest1);
+    }
+
+    @Test
+    public void deleteRequestsShouldLinearProbeOnFailure2() throws Exception {
+        deleteRequestsShouldLinearProbeOnFailureImpl(deleteRequest2);
+    }
+
+    @Test
+    public void deleteRequestsShouldLinearProbeOnFailure3() throws Exception {
+        deleteRequestsShouldLinearProbeOnFailureImpl(deleteRequest3);
+    }
+
+    private void deleteRequestsShouldLinearProbeOnFailureImpl(final DeleteRequest deleteRequest) throws Exception {
+        configureFactoriesOneAndTwoOffline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.delete(deleteRequest);
+        }
+        verify(factory3).getConnectionAsync();
+        verify(connection3).deleteAsync(same(deleteRequest), isNull(IntermediateResponseHandler.class));
+        verify(connection3).close();
+        verifyZeroInteractions(connection1);
+        verifyZeroInteractions(connection2);
+    }
+
+    @Test
+    public void deleteRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
+        configureAllFactoriesOffline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            try {
+                connection.delete(deleteRequest1);
+            } catch (ConnectionException e) {
+                assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
+            }
+        }
+        verify(factory1, atLeastOnce()).getConnectionAsync();
+        verify(factory2, atLeastOnce()).getConnectionAsync();
+        verify(factory3, atLeastOnce()).getConnectionAsync();
+        verifyZeroInteractions(connection1);
+        verifyZeroInteractions(connection2);
+        verifyZeroInteractions(connection3);
+    }
+
+    // ################## Extended Requests ####################
+
+    @Test
+    public void extendedRequestsShouldBeRoutedCorrectly1() throws Exception {
+        extendedRequestsShouldBeRoutedCorrectlyImpl(extendedRequest1, factory1, connection1);
+    }
+
+    @Test
+    public void extendedRequestsShouldBeRoutedCorrectly2() throws Exception {
+        extendedRequestsShouldBeRoutedCorrectlyImpl(extendedRequest2, factory2, connection2);
+    }
+
+    @Test
+    public void extendedRequestsShouldBeRoutedCorrectly3() throws Exception {
+        extendedRequestsShouldBeRoutedCorrectlyImpl(extendedRequest3, factory3, connection3);
+    }
+
+    private void extendedRequestsShouldBeRoutedCorrectlyImpl(final ExtendedRequest<?> extendedRequest,
+                                                             final ConnectionFactory expectedFactory,
+                                                             final Connection expectedConnection) throws Exception {
+        configureAllFactoriesOnline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.extendedRequest(extendedRequest);
+        }
+        verify(expectedFactory).getConnectionAsync();
+        verify(expectedConnection).extendedRequestAsync(same(extendedRequest),
+                                                        isNull(IntermediateResponseHandler.class));
+        verify(expectedConnection).close();
+        verifyZeroInteractionsForRemainingFactories(expectedFactory);
+    }
+
+    @Test
+    public void extendedRequestsShouldLinearProbeOnFailure1() throws Exception {
+        extendedRequestsShouldLinearProbeOnFailureImpl(extendedRequest1);
+    }
+
+    @Test
+    public void extendedRequestsShouldLinearProbeOnFailure2() throws Exception {
+        extendedRequestsShouldLinearProbeOnFailureImpl(extendedRequest2);
+    }
+
+    @Test
+    public void extendedRequestsShouldLinearProbeOnFailure3() throws Exception {
+        extendedRequestsShouldLinearProbeOnFailureImpl(extendedRequest3);
+    }
+
+    private void extendedRequestsShouldLinearProbeOnFailureImpl(
+            final ExtendedRequest<?> extendedRequest) throws Exception {
+        configureFactoriesOneAndTwoOffline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.extendedRequest(extendedRequest);
+        }
+        verify(factory3).getConnectionAsync();
+        verify(connection3).extendedRequestAsync(same(extendedRequest), isNull(IntermediateResponseHandler.class));
+        verify(connection3).close();
+        verifyZeroInteractions(connection1);
+        verifyZeroInteractions(connection2);
+    }
+
+    @Test
+    public void extendedRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
+        configureAllFactoriesOffline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            try {
+                connection.extendedRequest(extendedRequest1);
+            } catch (ConnectionException e) {
+                assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
+            }
+        }
+        verify(factory1, atLeastOnce()).getConnectionAsync();
+        verify(factory2, atLeastOnce()).getConnectionAsync();
+        verify(factory3, atLeastOnce()).getConnectionAsync();
+        verifyZeroInteractions(connection1);
+        verifyZeroInteractions(connection2);
+        verifyZeroInteractions(connection3);
+    }
+
+    // ################## Modify Requests ####################
+
+    @Test
+    public void modifyRequestsShouldBeRoutedCorrectly1() throws Exception {
+        modifyRequestsShouldBeRoutedCorrectlyImpl(modifyRequest1, factory1, connection1);
+    }
+
+    @Test
+    public void modifyRequestsShouldBeRoutedCorrectly2() throws Exception {
+        modifyRequestsShouldBeRoutedCorrectlyImpl(modifyRequest2, factory2, connection2);
+    }
+
+    @Test
+    public void modifyRequestsShouldBeRoutedCorrectly3() throws Exception {
+        modifyRequestsShouldBeRoutedCorrectlyImpl(modifyRequest3, factory3, connection3);
+    }
+
+    private void modifyRequestsShouldBeRoutedCorrectlyImpl(final ModifyRequest modifyRequest,
+                                                           final ConnectionFactory expectedFactory,
+                                                           final Connection expectedConnection) throws Exception {
+        configureAllFactoriesOnline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.modify(modifyRequest);
+        }
+        verify(expectedFactory).getConnectionAsync();
+        verify(expectedConnection).modifyAsync(same(modifyRequest), isNull(IntermediateResponseHandler.class));
+        verify(expectedConnection).close();
+        verifyZeroInteractionsForRemainingFactories(expectedFactory);
+    }
+
+    @Test
+    public void modifyRequestsShouldLinearProbeOnFailure1() throws Exception {
+        modifyRequestsShouldLinearProbeOnFailureImpl(modifyRequest1);
+    }
+
+    @Test
+    public void modifyRequestsShouldLinearProbeOnFailure2() throws Exception {
+        modifyRequestsShouldLinearProbeOnFailureImpl(modifyRequest2);
+    }
+
+    @Test
+    public void modifyRequestsShouldLinearProbeOnFailure3() throws Exception {
+        modifyRequestsShouldLinearProbeOnFailureImpl(modifyRequest3);
+    }
+
+    private void modifyRequestsShouldLinearProbeOnFailureImpl(final ModifyRequest modifyRequest) throws Exception {
+        configureFactoriesOneAndTwoOffline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.modify(modifyRequest);
+        }
+        verify(factory3).getConnectionAsync();
+        verify(connection3).modifyAsync(same(modifyRequest), isNull(IntermediateResponseHandler.class));
+        verify(connection3).close();
+        verifyZeroInteractions(connection1);
+        verifyZeroInteractions(connection2);
+    }
+
+    @Test
+    public void modifyRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
+        configureAllFactoriesOffline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            try {
+                connection.modify(modifyRequest1);
+            } catch (ConnectionException e) {
+                assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
+            }
+        }
+        verify(factory1, atLeastOnce()).getConnectionAsync();
+        verify(factory2, atLeastOnce()).getConnectionAsync();
+        verify(factory3, atLeastOnce()).getConnectionAsync();
+        verifyZeroInteractions(connection1);
+        verifyZeroInteractions(connection2);
+        verifyZeroInteractions(connection3);
+    }
+
+    // ################## ModifyDN Requests ####################
+
+    @Test
+    public void modifyDNRequestsShouldBeRoutedCorrectly1() throws Exception {
+        modifyDNRequestsShouldBeRoutedCorrectlyImpl(modifyDNRequest1, factory1, connection1);
+    }
+
+    @Test
+    public void modifyDNRequestsShouldBeRoutedCorrectly2() throws Exception {
+        modifyDNRequestsShouldBeRoutedCorrectlyImpl(modifyDNRequest2, factory2, connection2);
+    }
+
+    @Test
+    public void modifyDNRequestsShouldBeRoutedCorrectly3() throws Exception {
+        modifyDNRequestsShouldBeRoutedCorrectlyImpl(modifyDNRequest3, factory3, connection3);
+    }
+
+    private void modifyDNRequestsShouldBeRoutedCorrectlyImpl(final ModifyDNRequest modifyDNRequest,
+                                                             final ConnectionFactory expectedFactory,
+                                                             final Connection expectedConnection) throws Exception {
+        configureAllFactoriesOnline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.modifyDN(modifyDNRequest);
+        }
+        verify(expectedFactory).getConnectionAsync();
+        verify(expectedConnection).modifyDNAsync(same(modifyDNRequest), isNull(IntermediateResponseHandler.class));
+        verify(expectedConnection).close();
+        verifyZeroInteractionsForRemainingFactories(expectedFactory);
+    }
+
+    @Test
+    public void modifyDNRequestsShouldLinearProbeOnFailure1() throws Exception {
+        modifyDNRequestsShouldLinearProbeOnFailureImpl(modifyDNRequest1);
+    }
+
+    @Test
+    public void modifyDNRequestsShouldLinearProbeOnFailure2() throws Exception {
+        modifyDNRequestsShouldLinearProbeOnFailureImpl(modifyDNRequest2);
+    }
+
+    @Test
+    public void modifyDNRequestsShouldLinearProbeOnFailure3() throws Exception {
+        modifyDNRequestsShouldLinearProbeOnFailureImpl(modifyDNRequest3);
+    }
+
+    private void modifyDNRequestsShouldLinearProbeOnFailureImpl(
+            final ModifyDNRequest modifyDNRequest) throws Exception {
+        configureFactoriesOneAndTwoOffline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.modifyDN(modifyDNRequest);
+        }
+        verify(factory3).getConnectionAsync();
+        verify(connection3).modifyDNAsync(same(modifyDNRequest), isNull(IntermediateResponseHandler.class));
+        verify(connection3).close();
+        verifyZeroInteractions(connection1);
+        verifyZeroInteractions(connection2);
+    }
+
+    @Test
+    public void modifyDNRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
+        configureAllFactoriesOffline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            try {
+                connection.modifyDN(modifyDNRequest1);
+            } catch (ConnectionException e) {
+                assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
+            }
+        }
+        verify(factory1, atLeastOnce()).getConnectionAsync();
+        verify(factory2, atLeastOnce()).getConnectionAsync();
+        verify(factory3, atLeastOnce()).getConnectionAsync();
+        verifyZeroInteractions(connection1);
+        verifyZeroInteractions(connection2);
+        verifyZeroInteractions(connection3);
+    }
+
+    // ################## Search Requests ####################
+
+    @Test
+    public void searchRequestsShouldBeRoutedCorrectly1() throws Exception {
+        searchRequestsShouldBeRoutedCorrectlyImpl(searchRequest1, factory1, connection1);
+    }
+
+    @Test
+    public void searchRequestsShouldBeRoutedCorrectly2() throws Exception {
+        searchRequestsShouldBeRoutedCorrectlyImpl(searchRequest2, factory2, connection2);
+    }
+
+    @Test
+    public void searchRequestsShouldBeRoutedCorrectly3() throws Exception {
+        searchRequestsShouldBeRoutedCorrectlyImpl(searchRequest3, factory3, connection3);
+    }
+
+    private void searchRequestsShouldBeRoutedCorrectlyImpl(final SearchRequest searchRequest,
+                                                           final ConnectionFactory expectedFactory,
+                                                           final Connection expectedConnection) throws Exception {
+        configureAllFactoriesOnline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.search(searchRequest, new ArrayList<>());
+        }
+        verify(expectedFactory).getConnectionAsync();
+        verify(expectedConnection).searchAsync(same(searchRequest),
+                                               isNull(IntermediateResponseHandler.class),
+                                               isNotNull(SearchResultHandler.class));
+        verify(expectedConnection).close();
+        verifyZeroInteractionsForRemainingFactories(expectedFactory);
+    }
+
+    @Test
+    public void searchRequestsShouldLinearProbeOnFailure1() throws Exception {
+        searchRequestsShouldLinearProbeOnFailureImpl(searchRequest1);
+    }
+
+    @Test
+    public void searchRequestsShouldLinearProbeOnFailure2() throws Exception {
+        searchRequestsShouldLinearProbeOnFailureImpl(searchRequest2);
+    }
+
+    @Test
+    public void searchRequestsShouldLinearProbeOnFailure3() throws Exception {
+        searchRequestsShouldLinearProbeOnFailureImpl(searchRequest3);
+    }
+
+    private void searchRequestsShouldLinearProbeOnFailureImpl(final SearchRequest searchRequest) throws Exception {
+        configureFactoriesOneAndTwoOffline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            connection.search(searchRequest, new ArrayList<>());
+        }
+        verify(factory3).getConnectionAsync();
+        verify(connection3).searchAsync(same(searchRequest),
+                                        isNull(IntermediateResponseHandler.class),
+                                        isNotNull(SearchResultHandler.class));
+        verify(connection3).close();
+        verifyZeroInteractions(connection1);
+        verifyZeroInteractions(connection2);
+    }
+
+    @Test
+    public void searchRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
+        configureAllFactoriesOffline();
+        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
+            try {
+                connection.search(searchRequest1, new ArrayList<>());
+            } catch (ConnectionException e) {
+                assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
+            }
+        }
+        verify(factory1, atLeastOnce()).getConnectionAsync();
+        verify(factory2, atLeastOnce()).getConnectionAsync();
+        verify(factory3, atLeastOnce()).getConnectionAsync();
+        verifyZeroInteractions(connection1);
+        verifyZeroInteractions(connection2);
+        verifyZeroInteractions(connection3);
+    }
+
+    @Mock private ConnectionFactory factory1;
+    @Mock private ConnectionFactory factory2;
+    @Mock private ConnectionFactory factory3;
+
+    @Mock private AbstractAsynchronousConnection connection1;
+    @Mock private AbstractAsynchronousConnection connection2;
+    @Mock private AbstractAsynchronousConnection connection3;
+
+    @Mock private AddRequest addRequest1;
+    @Mock private AddRequest addRequest2;
+    @Mock private AddRequest addRequest3;
+
+    @Mock private BindRequest bindRequest1;
+    @Mock private BindRequest bindRequest2;
+    @Mock private BindRequest bindRequest3;
+
+    @Mock private CompareRequest compareRequest1;
+    @Mock private CompareRequest compareRequest2;
+    @Mock private CompareRequest compareRequest3;
+
+    @Mock private DeleteRequest deleteRequest1;
+    @Mock private DeleteRequest deleteRequest2;
+    @Mock private DeleteRequest deleteRequest3;
+
+    @Mock private GenericExtendedRequest extendedRequest1;
+    @Mock private GenericExtendedRequest extendedRequest2;
+    @Mock private GenericExtendedRequest extendedRequest3;
+
+    @Mock private ModifyRequest modifyRequest1;
+    @Mock private ModifyRequest modifyRequest2;
+    @Mock private ModifyRequest modifyRequest3;
+
+    @Mock private ModifyDNRequest modifyDNRequest1;
+    @Mock private ModifyDNRequest modifyDNRequest2;
+    @Mock private ModifyDNRequest modifyDNRequest3;
+
+    @Mock private SearchRequest searchRequest1;
+    @Mock private SearchRequest searchRequest2;
+    @Mock private SearchRequest searchRequest3;
+
+    private ConnectionFactory loadBalancer;
+
+    @BeforeMethod
+    public void beforeMethod() {
+        TestCaseUtils.setDefaultLogLevel(Level.SEVERE);
+        initMocks(this);
+        stub(this.connection1);
+        stub(this.connection2);
+        stub(this.connection3);
+        loadBalancer = new RequestLoadBalancer("Test",
+                                               asList(factory1, factory2, factory3),
+                                               defaultOptions(), newNextFactoryFunction());
+    }
+
+    private Function<Request, Integer, NeverThrowsException> newNextFactoryFunction() {
+        return new Function<Request, Integer, NeverThrowsException>() {
+            @Override
+            public Integer apply(final Request request) {
+                if (request == addRequest1 || request == bindRequest1 || request == compareRequest1
+                        || request == deleteRequest1 || request == extendedRequest1 || request == modifyRequest1
+                        || request == modifyDNRequest1 || request == searchRequest1) {
+                    return 0;
+                }
+
+                if (request == addRequest2 || request == bindRequest2 || request == compareRequest2
+                        || request == deleteRequest2 || request == extendedRequest2 || request == modifyRequest2
+                        || request == modifyDNRequest2 || request == searchRequest2) {
+                    return 1;
+                }
+
+                if (request == addRequest3 || request == bindRequest3 || request == compareRequest3
+                        || request == deleteRequest3 || request == extendedRequest3 || request == modifyRequest3
+                        || request == modifyDNRequest3 || request == searchRequest3) {
+                    return 2;
+                }
+
+                fail("Received unexpected request");
+                return -1; // Keep compiler happy.
+            }
+        };
+    }
+
+    private void stub(final Connection connection) {
+        when(connection.addAsync(any(AddRequest.class), any(IntermediateResponseHandler.class)))
+                .thenReturn(newSuccessfulLdapPromise(newResult(ResultCode.SUCCESS)));
+        when(connection.bindAsync(any(BindRequest.class), any(IntermediateResponseHandler.class)))
+                .thenReturn(newSuccessfulLdapPromise(newBindResult(ResultCode.SUCCESS)));
+        when(connection.compareAsync(any(CompareRequest.class), any(IntermediateResponseHandler.class)))
+                .thenReturn(newSuccessfulLdapPromise(newCompareResult(ResultCode.SUCCESS)));
+        when(connection.deleteAsync(any(DeleteRequest.class), any(IntermediateResponseHandler.class)))
+                .thenReturn(newSuccessfulLdapPromise(newResult(ResultCode.SUCCESS)));
+        when(connection.extendedRequestAsync(any(GenericExtendedRequest.class), any(IntermediateResponseHandler.class)))
+                .thenReturn(newSuccessfulLdapPromise(newGenericExtendedResult(ResultCode.SUCCESS)));
+        when(connection.modifyAsync(any(ModifyRequest.class), any(IntermediateResponseHandler.class)))
+                .thenReturn(newSuccessfulLdapPromise(newResult(ResultCode.SUCCESS)));
+        when(connection.modifyDNAsync(any(ModifyDNRequest.class), any(IntermediateResponseHandler.class)))
+                .thenReturn(newSuccessfulLdapPromise(newResult(ResultCode.SUCCESS)));
+        when(connection.searchAsync(any(SearchRequest.class), any(IntermediateResponseHandler.class),
+                                    any(SearchResultHandler.class)))
+                .thenReturn(newSuccessfulLdapPromise(newResult(ResultCode.SUCCESS)));
+    }
+
+    @AfterMethod
+    public void afterMethod() {
+        loadBalancer.close();
+        TestCaseUtils.setDefaultLogLevel(Level.INFO);
+    }
+
+    private void configureAllFactoriesOnline() {
+        when(factory1.getConnectionAsync())
+                .thenReturn(Promises.<Connection, LdapException>newResultPromise(connection1));
+        when(factory2.getConnectionAsync())
+                .thenReturn(Promises.<Connection, LdapException>newResultPromise(connection2));
+        when(factory3.getConnectionAsync())
+                .thenReturn(Promises.<Connection, LdapException>newResultPromise(connection3));
+    }
+
+    private void configureFactoriesOneAndTwoOffline() {
+        final LdapException connectionFailure = newLdapException(CLIENT_SIDE_CONNECT_ERROR);
+        when(factory1.getConnectionAsync())
+                .thenReturn(Promises.<Connection, LdapException>newExceptionPromise(connectionFailure));
+        when(factory2.getConnectionAsync())
+                .thenReturn(Promises.<Connection, LdapException>newExceptionPromise(connectionFailure));
+        when(factory3.getConnectionAsync())
+                .thenReturn(Promises.<Connection, LdapException>newResultPromise(connection3));
+    }
+
+    private void configureAllFactoriesOffline() {
+        final LdapException connectionFailure = newLdapException(CLIENT_SIDE_CONNECT_ERROR);
+        when(factory1.getConnectionAsync())
+                .thenReturn(Promises.<Connection, LdapException>newExceptionPromise(connectionFailure));
+        when(factory2.getConnectionAsync())
+                .thenReturn(Promises.<Connection, LdapException>newExceptionPromise(connectionFailure));
+        when(factory3.getConnectionAsync())
+                .thenReturn(Promises.<Connection, LdapException>newExceptionPromise(connectionFailure));
+    }
+
+    private void verifyZeroInteractionsForRemainingFactories(final ConnectionFactory expectedFactory) {
+        if (expectedFactory != factory1) {
+            verifyZeroInteractions(factory1);
+        }
+        if (expectedFactory != factory2) {
+            verifyZeroInteractions(factory2);
+        }
+        if (expectedFactory != factory3) {
+            verifyZeroInteractions(factory3);
+        }
+    }
+}
diff --git a/opendj-sdk/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java b/opendj-sdk/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
index 30177b6..661ef36 100644
--- a/opendj-sdk/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
+++ b/opendj-sdk/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
@@ -22,26 +22,27 @@
  *
  *
  *      Copyright 2009-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2015 ForgeRock AS.
+ *      Portions Copyright 2011-2016 ForgeRock AS.
  */
 
 package org.forgerock.opendj.examples;
 
 import static org.forgerock.opendj.ldap.LDAPConnectionFactory.AUTHN_BIND_REQUEST;
 import static org.forgerock.opendj.ldap.LDAPConnectionFactory.HEARTBEAT_ENABLED;
-import static org.forgerock.opendj.ldap.LDAPListener.*;
+import static org.forgerock.opendj.ldap.LDAPListener.CONNECT_MAX_BACKLOG;
 import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 
 import org.forgerock.opendj.ldap.ConnectionFactory;
 import org.forgerock.opendj.ldap.Connections;
-import org.forgerock.opendj.ldap.LdapException;
 import org.forgerock.opendj.ldap.LDAPClientContext;
 import org.forgerock.opendj.ldap.LDAPConnectionFactory;
 import org.forgerock.opendj.ldap.LDAPListener;
+import org.forgerock.opendj.ldap.LdapException;
 import org.forgerock.opendj.ldap.RequestContext;
 import org.forgerock.opendj.ldap.RequestHandlerFactory;
 import org.forgerock.opendj.ldap.ServerConnectionFactory;
@@ -61,33 +62,44 @@
  * This example takes the following command line parameters:
  *
  * <pre>
- *     {@code <listenAddress> <listenPort> <proxyDN> <proxyPassword> <remoteAddress1> <remotePort1>
- *      [<remoteAddress2> <remotePort2> ...]}
+ *     {@code [--load-balancer <mode>] <listenAddress> <listenPort> <proxyDN> <proxyPassword>
+ *         <remoteAddress1> <remotePort1> [<remoteAddress2> <remotePort2> ...]}
  * </pre>
+ *
+ * Where {@code <mode>} is one of "round-robin", "fail-over", or "sharded". The default is round-robin.
  */
 public final class Proxy {
     /**
      * Main method.
      *
      * @param args
-     *            The command line arguments: listen address, listen port,
+     *            The command line arguments: [--load-balancer <mode>] listen address, listen port,
      *            remote address1, remote port1, remote address2, remote port2,
      *            ...
      */
     public static void main(final String[] args) {
         if (args.length < 6 || args.length % 2 != 0) {
-            System.err.println("Usage: listenAddress listenPort "
-                    + "proxyDN proxyPassword remoteAddress1 remotePort1 "
-                    + "remoteAddress2 remotePort2 ...");
+            System.err.println("Usage: [--load-balancer <mode>] listenAddress listenPort "
+                    + "proxyDN proxyPassword remoteAddress1 remotePort1 remoteAddress2 remotePort2 ...");
             System.exit(1);
         }
 
         // Parse command line arguments.
-        final String localAddress = args[0];
-        final int localPort = Integer.parseInt(args[1]);
+        int i = 0;
 
-        final String proxyDN = args[2];
-        final String proxyPassword = args[3];
+        final LoadBalancingAlgorithm algorithm;
+        if ("--load-balancer".equals(args[i])) {
+            algorithm = getLoadBalancingAlgorithm(args[i + 1]);
+            i += 2;
+        } else {
+            algorithm = LoadBalancingAlgorithm.ROUND_ROBIN;
+        }
+
+        final String localAddress = args[i++];
+        final int localPort = Integer.parseInt(args[i++]);
+
+        final String proxyDN = args[i++];
+        final String proxyPassword = args[i++];
 
         // Create load balancer.
         // --- JCite pools ---
@@ -100,7 +112,7 @@
         final List<ConnectionFactory> bindFactories = new LinkedList<>();
         final Options bindFactoryOptions = Options.defaultOptions().set(HEARTBEAT_ENABLED, true);
 
-        for (int i = 4; i < args.length; i += 2) {
+        for (; i < args.length; i += 2) {
             final String remoteAddress = args[i];
             final int remotePort = Integer.parseInt(args[i + 1]);
 
@@ -114,10 +126,8 @@
         }
         // --- JCite pools ---
 
-        // --- JCite load balancer ---
-        final ConnectionFactory factory = Connections.newRoundRobinLoadBalancer(factories, factoryOptions);
-        final ConnectionFactory bindFactory = Connections.newRoundRobinLoadBalancer(bindFactories, bindFactoryOptions);
-        // --- JCite load balancer ---
+        final ConnectionFactory factory = algorithm.newLoadBalancer(factories, factoryOptions);
+        final ConnectionFactory bindFactory = algorithm.newLoadBalancer(bindFactories, bindFactoryOptions);
 
         // --- JCite backend ---
         /*
@@ -156,6 +166,47 @@
         // --- JCite listener ---
     }
 
+    private static LoadBalancingAlgorithm getLoadBalancingAlgorithm(final String algorithmName) {
+        switch (algorithmName) {
+        case "round-robin":
+            return LoadBalancingAlgorithm.ROUND_ROBIN;
+        case "fail-over":
+            return LoadBalancingAlgorithm.FAIL_OVER;
+        case "sharded":
+            return LoadBalancingAlgorithm.SHARDED;
+        default:
+            System.err.println("Unrecognized load-balancing algorithm '" + algorithmName + "'. Should be one of "
+                                       + "'round-robin', 'fail-over', or 'sharded'.");
+            System.exit(1);
+        }
+        return LoadBalancingAlgorithm.ROUND_ROBIN; // keep compiler happy.
+    }
+
+    private enum LoadBalancingAlgorithm {
+        ROUND_ROBIN {
+            @Override
+            ConnectionFactory newLoadBalancer(final Collection<ConnectionFactory> factories, final Options options) {
+                // --- JCite load balancer ---
+                return Connections.newRoundRobinLoadBalancer(factories, options);
+                // --- JCite load balancer ---
+            }
+        },
+        FAIL_OVER {
+            @Override
+            ConnectionFactory newLoadBalancer(final Collection<ConnectionFactory> factories, final Options options) {
+                return Connections.newFailoverLoadBalancer(factories, options);
+            }
+        },
+        SHARDED {
+            @Override
+            ConnectionFactory newLoadBalancer(final Collection<ConnectionFactory> factories, final Options options) {
+                return Connections.newShardedRequestLoadBalancer(factories, options);
+            }
+        };
+
+        abstract ConnectionFactory newLoadBalancer(Collection<ConnectionFactory> factories, Options options);
+    }
+
     private Proxy() {
         // Not used.
     }

--
Gitblit v1.10.0