mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
10.29.2016 6af1b61dec13bfbdd371672ef1e37eae63a079f7
OPENDJSDK-16: implement sharded request load-balancer

Added RequestLoadBalancer as common base class for for request based
load-balancers. Sharded load-balancer is implemented in Connections via
a static factory method.
2 files added
4 files modified
1551 ■■■■■ changed files
opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java 134 ●●●●● patch | view | raw | blame | history
opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java 7 ●●●● patch | view | raw | blame | history
opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java 260 ●●●●● patch | view | raw | blame | history
opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java 104 ●●●●● patch | view | raw | blame | history
opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java 959 ●●●●● patch | view | raw | blame | history
opendj-sdk/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java 87 ●●●● patch | view | raw | blame | history
opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -34,12 +34,28 @@
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.forgerock.opendj.ldap.requests.AddRequest;
import org.forgerock.opendj.ldap.requests.CRAMMD5SASLBindRequest;
import org.forgerock.opendj.ldap.requests.CompareRequest;
import org.forgerock.opendj.ldap.requests.DeleteRequest;
import org.forgerock.opendj.ldap.requests.DigestMD5SASLBindRequest;
import org.forgerock.opendj.ldap.requests.GSSAPISASLBindRequest;
import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
import org.forgerock.opendj.ldap.requests.ModifyRequest;
import org.forgerock.opendj.ldap.requests.PasswordModifyExtendedRequest;
import org.forgerock.opendj.ldap.requests.PlainSASLBindRequest;
import org.forgerock.opendj.ldap.requests.Request;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.requests.SimpleBindRequest;
import org.forgerock.util.Function;
import org.forgerock.util.Option;
import org.forgerock.util.Options;
import org.forgerock.util.Reject;
import org.forgerock.util.promise.NeverThrowsException;
import org.forgerock.util.promise.Promise;
import org.forgerock.util.time.Duration;
@@ -413,6 +429,7 @@
     * @param options
     *         This configuration options for the load-balancer.
     * @return The new round-robin load balancer.
     * @see #newShardedRequestLoadBalancer(Collection, Options)
     * @see #newFailoverLoadBalancer(Collection, Options)
     * @see #LOAD_BALANCER_EVENT_LISTENER
     * @see #LOAD_BALANCER_MONITORING_INTERVAL
@@ -480,6 +497,7 @@
     *         This configuration options for the load-balancer.
     * @return The new fail-over load balancer.
     * @see #newRoundRobinLoadBalancer(Collection, Options)
     * @see #newShardedRequestLoadBalancer(Collection, Options)
     * @see #LOAD_BALANCER_EVENT_LISTENER
     * @see #LOAD_BALANCER_MONITORING_INTERVAL
     * @see #LOAD_BALANCER_SCHEDULER
@@ -496,6 +514,122 @@
    }
    /**
     * Creates a new "sharded" load-balancer which will load-balance individual requests across the provided set of
     * connection factories, each typically representing a single replica, using an algorithm that ensures that requests
     * targeting a given DN will always be routed to the same replica. In other words, this load-balancer increases
     * consistency whilst maintaining read-scalability by simulating a "single master" replication topology, where each
     * replica is responsible for a subset of the entries. When a replica is unavailable the load-balancer "fails over"
     * by performing a linear probe in order to find the next available replica thus ensuring high-availability when a
     * network partition occurs while sacrificing consistency, since the unavailable replica may still be visible to
     * other clients.
     * <p/>
     * This load-balancer distributes requests based on the hash of their target DN and handles all core operations, as
     * well as any password modify extended requests and SASL bind requests which use authentication IDs having the
     * "dn:" form. Note that subtree operations (searches, subtree deletes, and modify DN) are likely to include entries
     * which are "mastered" on different replicas, so client applications should be more tolerant of inconsistencies.
     * Requests that are either unrecognized or that do not have a parameter that may be considered to be a target DN
     * will be routed randomly.
     * <p/>
     * <b>NOTE:</b> this connection factory returns fake connections, since real connections are obtained for each
     * request. Therefore, the returned fake connections have certain limitations: abandon requests will be ignored
     * since they cannot be routed; connection event listeners can be registered, but will only be notified when the
     * fake connection is closed or when all of the connection factories are unavailable.
     * <p/>
     * <b>NOTE:</b> in deployments where there are multiple client applications, care should be taken to ensure that
     * the factories are configured using the same ordering, otherwise requests will not be routed consistently
     * across the client applications.
     * <p/>
     * The implementation periodically attempts to connect to failed connection factories in order to determine if they
     * have become available again.
     *
     * @param factories
     *         The connection factories.
     * @param options
     *         This configuration options for the load-balancer.
     * @return The new affinity load balancer.
     * @see #newRoundRobinLoadBalancer(Collection, Options)
     * @see #newFailoverLoadBalancer(Collection, Options)
     * @see #LOAD_BALANCER_EVENT_LISTENER
     * @see #LOAD_BALANCER_MONITORING_INTERVAL
     * @see #LOAD_BALANCER_SCHEDULER
     */
    public static ConnectionFactory newShardedRequestLoadBalancer(
            final Collection<? extends ConnectionFactory> factories, final Options options) {
        return new RequestLoadBalancer("ShardedRequestLoadBalancer",
                                       factories,
                                       options,
                                       newShardedRequestLoadBalancerFunction(factories));
    }
    // Package private for testing.
    static Function<Request, Integer, NeverThrowsException> newShardedRequestLoadBalancerFunction(
            final Collection<? extends ConnectionFactory> factories) {
        return new Function<Request, Integer, NeverThrowsException>() {
            private final int maxIndex = factories.size();
            @Override
            public Integer apply(final Request request) {
                // Normalize the hash to a valid factory index, taking care of negative hash values and especially
                // Integer.MIN_VALUE (see doc for Math.abs()).
                final int index = computeIndexBasedOnDnHashCode(request);
                return index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % maxIndex);
            }
            private int computeIndexBasedOnDnHashCode(final Request request) {
                // The following conditions are ordered such that the most common operations appear first in order to
                // reduce the average number of branches. A better solution would be to use a visitor, but a visitor
                // would only apply to the core operations, not extended operations or SASL binds.
                if (request instanceof SearchRequest) {
                    return ((SearchRequest) request).getName().hashCode();
                } else if (request instanceof ModifyRequest) {
                    return ((ModifyRequest) request).getName().hashCode();
                } else if (request instanceof SimpleBindRequest) {
                    return hashCodeOfDnString(((SimpleBindRequest) request).getName());
                } else if (request instanceof AddRequest) {
                    return ((AddRequest) request).getName().hashCode();
                } else if (request instanceof DeleteRequest) {
                    return ((DeleteRequest) request).getName().hashCode();
                } else if (request instanceof CompareRequest) {
                    return ((CompareRequest) request).getName().hashCode();
                } else if (request instanceof ModifyDNRequest) {
                    return ((ModifyDNRequest) request).getName().hashCode();
                } else if (request instanceof PasswordModifyExtendedRequest) {
                    return hashCodeOfAuthzid(((PasswordModifyExtendedRequest) request).getUserIdentityAsString());
                } else if (request instanceof PlainSASLBindRequest) {
                    return hashCodeOfAuthzid(((PlainSASLBindRequest) request).getAuthenticationID());
                } else if (request instanceof DigestMD5SASLBindRequest) {
                    return hashCodeOfAuthzid(((DigestMD5SASLBindRequest) request).getAuthenticationID());
                } else if (request instanceof GSSAPISASLBindRequest) {
                    return hashCodeOfAuthzid(((GSSAPISASLBindRequest) request).getAuthenticationID());
                } else if (request instanceof CRAMMD5SASLBindRequest) {
                    return hashCodeOfAuthzid(((CRAMMD5SASLBindRequest) request).getAuthenticationID());
                } else {
                    return distributeRequestAtRandom();
                }
            }
            private int hashCodeOfAuthzid(final String authzid) {
                if (authzid != null && authzid.startsWith("dn:")) {
                    return hashCodeOfDnString(authzid.substring(3));
                }
                return distributeRequestAtRandom();
            }
            private int hashCodeOfDnString(final String dnString) {
                try {
                    return DN.valueOf(dnString).hashCode();
                } catch (final IllegalArgumentException ignored) {
                    return distributeRequestAtRandom();
                }
            }
            private int distributeRequestAtRandom() {
                return ThreadLocalRandom.current().nextInt(0, maxIndex);
            }
        };
    }
    /**
     * Creates a new connection factory which forwards connection requests to
     * the provided factory, but whose {@code toString} method will always
     * return {@code name}.
opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/LoadBalancer.java
@@ -278,16 +278,13 @@
    /**
     * Return the first available connection factory starting from {@code initialIndex}.
     *
     * @param initialIndex The index of the connection factory to be returned if operational. The index may be
     *                     greater than the number of factories in which case this method will perform a modulus
     *                     operation to bring it into range. NOTE: for performance reasons callers should attempt to
     *                     use valid indexes because the modulus operation is relatively expensive.
     * @param initialIndex The index of the connection factory to be returned if operational.
     * @return The first available connection factory starting from the initial index.
     * @throws LdapException If no connection factories are available.
     */
    final ConnectionFactory getMonitoredConnectionFactory(final int initialIndex) throws LdapException {
        final int maxIndex = monitoredFactories.size();
        int index = initialIndex < maxIndex ? initialIndex : initialIndex % maxIndex;
        int index = initialIndex;
        do {
            final MonitoredConnectionFactory factory = monitoredFactories.get(index);
            if (factory.isOperational.get()) {
opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java
New file
@@ -0,0 +1,260 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2016 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
import static org.forgerock.opendj.ldap.spi.LdapPromises.newFailedLdapPromise;
import static org.forgerock.util.Utils.closeSilently;
import static org.forgerock.util.promise.Promises.newResultPromise;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;
import org.forgerock.opendj.ldap.requests.AbandonRequest;
import org.forgerock.opendj.ldap.requests.AddRequest;
import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.opendj.ldap.requests.CompareRequest;
import org.forgerock.opendj.ldap.requests.DeleteRequest;
import org.forgerock.opendj.ldap.requests.ExtendedRequest;
import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
import org.forgerock.opendj.ldap.requests.ModifyRequest;
import org.forgerock.opendj.ldap.requests.Request;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.BindResult;
import org.forgerock.opendj.ldap.responses.CompareResult;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.spi.ConnectionState;
import org.forgerock.opendj.ldap.spi.LdapPromises;
import org.forgerock.util.AsyncFunction;
import org.forgerock.util.Function;
import org.forgerock.util.Options;
import org.forgerock.util.promise.ExceptionHandler;
import org.forgerock.util.promise.NeverThrowsException;
import org.forgerock.util.promise.Promise;
import org.forgerock.util.promise.ResultHandler;
/**
 * A request based load balancer which load balances individual requests based on properties of the request, such as
 * the target DN.
 * <p>
 * Implementations should override the method {@code getInitialConnectionFactoryIndex()} in order to provide the policy
 * for selecting the first connection factory to use for each request.
 */
final class RequestLoadBalancer extends LoadBalancer {
    /**
     * A function which returns the index of the first connection factory which should be used in order to satisfy the
     * next request. Implementations may base the decision on properties of the provided request, such as the target DN,
     * whether the request is a read or update request, etc.
     */
    private final Function<Request, Integer, NeverThrowsException> nextFactoryFunction;
    RequestLoadBalancer(final String loadBalancerName,
                        final Collection<? extends ConnectionFactory> factories,
                        final Options options,
                        final Function<Request, Integer, NeverThrowsException> nextFactoryFunction) {
        super(loadBalancerName, factories, options);
        this.nextFactoryFunction = nextFactoryFunction;
    }
    @Override
    public final Connection getConnection() throws LdapException {
        return new ConnectionImpl();
    }
    @Override
    public final Promise<Connection, LdapException> getConnectionAsync() {
        return newResultPromise((Connection) new ConnectionImpl());
    }
    private class ConnectionImpl extends AbstractAsynchronousConnection {
        private final ConnectionState state = new ConnectionState();
        @Override
        public String toString() {
            return getLoadBalancerName() + "Connection";
        }
        @Override
        public LdapPromise<Void> abandonAsync(final AbandonRequest request) {
            // We cannot possibly route these correctly, so just drop them.
            return LdapPromises.newSuccessfulLdapPromise(null);
        }
        @Override
        public LdapPromise<Result> addAsync(
                final AddRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
                @Override
                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.addAsync(request, intermediateResponseHandler);
                }
            });
        }
        @Override
        public void addConnectionEventListener(final ConnectionEventListener listener) {
            state.addConnectionEventListener(listener);
        }
        @Override
        public LdapPromise<BindResult> bindAsync(
                final BindRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, BindResult, LdapException>() {
                @Override
                public Promise<BindResult, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.bindAsync(request, intermediateResponseHandler);
                }
            });
        }
        @Override
        public void close(final UnbindRequest request, final String reason) {
            state.notifyConnectionClosed();
        }
        @Override
        public LdapPromise<CompareResult> compareAsync(
                final CompareRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, CompareResult, LdapException>() {
                @Override
                public Promise<CompareResult, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.compareAsync(request, intermediateResponseHandler);
                }
            });
        }
        @Override
        public LdapPromise<Result> deleteAsync(
                final DeleteRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
                @Override
                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.deleteAsync(request, intermediateResponseHandler);
                }
            });
        }
        @Override
        public <R extends ExtendedResult> LdapPromise<R> extendedRequestAsync(
                final ExtendedRequest<R> request, final IntermediateResponseHandler intermediateResponseHandler) {
            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, R, LdapException>() {
                @Override
                public Promise<R, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.extendedRequestAsync(request, intermediateResponseHandler);
                }
            });
        }
        @Override
        public boolean isClosed() {
            return state.isClosed();
        }
        @Override
        public boolean isValid() {
            return state.isValid();
        }
        @Override
        public LdapPromise<Result> modifyAsync(
                final ModifyRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
                @Override
                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.modifyAsync(request, intermediateResponseHandler);
                }
            });
        }
        @Override
        public LdapPromise<Result> modifyDNAsync(
                final ModifyDNRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
                @Override
                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.modifyDNAsync(request, intermediateResponseHandler);
                }
            });
        }
        @Override
        public void removeConnectionEventListener(final ConnectionEventListener listener) {
            state.removeConnectionEventListener(listener);
        }
        @Override
        public LdapPromise<Result> searchAsync(
                final SearchRequest request,
                final IntermediateResponseHandler intermediateResponseHandler,
                final SearchResultHandler entryHandler) {
            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
                @Override
                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.searchAsync(request, intermediateResponseHandler, entryHandler);
                }
            });
        }
        private <R> LdapPromise<R> getConnectionAndSendRequest(
                final Request request, final AsyncFunction<Connection, R, LdapException> sendRequest) {
            if (state.isClosed()) {
                throw new IllegalStateException();
            }
            final AtomicReference<Connection> connectionHolder = new AtomicReference<>();
            return getConnectionAsync(request)
                    .thenOnResult(new ResultHandler<Connection>() {
                        @Override
                        public void handleResult(final Connection connection) {
                            connectionHolder.set(connection);
                        }
                    })
                    .thenAsync(sendRequest)
                    .thenFinally(new Runnable() {
                        @Override
                        public void run() {
                            closeSilently(connectionHolder.get());
                        }
                    });
        }
        private LdapPromise<Connection> getConnectionAsync(final Request request) {
            try {
                final int index = nextFactoryFunction.apply(request);
                final ConnectionFactory factory = getMonitoredConnectionFactory(index);
                return LdapPromises.asPromise(factory.getConnectionAsync()
                                                     .thenOnException(new ExceptionHandler<LdapException>() {
                                                         @Override
                                                         public void handleException(final LdapException e) {
                                                             state.notifyConnectionError(false, e);
                                                         }
                                                     }));
            } catch (final LdapException e) {
                state.notifyConnectionError(false, e);
                return newFailedLdapPromise(e);
            }
        }
    }
}
opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java
@@ -21,14 +21,34 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 *      Copyright 2013-2016 ForgeRock AS
 */
package org.forgerock.opendj.ldap;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.forgerock.opendj.ldap.Connections.newShardedRequestLoadBalancerFunction;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import org.forgerock.opendj.ldap.requests.AddRequest;
import org.forgerock.opendj.ldap.requests.CRAMMD5SASLBindRequest;
import org.forgerock.opendj.ldap.requests.CompareRequest;
import org.forgerock.opendj.ldap.requests.DeleteRequest;
import org.forgerock.opendj.ldap.requests.DigestMD5SASLBindRequest;
import org.forgerock.opendj.ldap.requests.GSSAPISASLBindRequest;
import org.forgerock.opendj.ldap.requests.GenericExtendedRequest;
import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
import org.forgerock.opendj.ldap.requests.ModifyRequest;
import org.forgerock.opendj.ldap.requests.PasswordModifyExtendedRequest;
import org.forgerock.opendj.ldap.requests.PlainSASLBindRequest;
import org.forgerock.opendj.ldap.requests.Request;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.requests.SimpleBindRequest;
import org.forgerock.util.Function;
import org.forgerock.util.promise.NeverThrowsException;
import org.testng.annotations.Test;
@SuppressWarnings("javadoc")
@@ -57,4 +77,86 @@
        uncloseable.close(null, null);
        verifyZeroInteractions(connection);
    }
    @Test
    public void shardedRequestLoadBalancerUsesConsistentIndexing() {
        final Function<Request, Integer, NeverThrowsException> f =
                newShardedRequestLoadBalancerFunction(asList(mock(ConnectionFactory.class),
                                                             mock(ConnectionFactory.class)));
        // These two DNs have a different hash code.
        final DN dn1 = DN.valueOf("cn=target1,dc=example,dc=com");
        final DN dn2 = DN.valueOf("cn=target2,dc=example,dc=com");
        final AddRequest addRequest = mock(AddRequest.class);
        when(addRequest.getName()).thenReturn(dn1, dn2);
        final int dn1index = index(f, addRequest);
        final int dn2index = index(f, addRequest);
        assertThat(dn1index).isNotEqualTo(dn2index);
        final SimpleBindRequest simpleBindRequest = mock(SimpleBindRequest.class);
        when(simpleBindRequest.getName()).thenReturn(dn1.toString(), dn2.toString());
        assertRequestsAreRoutedConsistently(f, simpleBindRequest, dn1index, dn2index);
        final CompareRequest compareRequest = mock(CompareRequest.class);
        when(compareRequest.getName()).thenReturn(dn1, dn2);
        assertRequestsAreRoutedConsistently(f, compareRequest, dn1index, dn2index);
        final DeleteRequest deleteRequest = mock(DeleteRequest.class);
        when(deleteRequest.getName()).thenReturn(dn1, dn2);
        assertRequestsAreRoutedConsistently(f, deleteRequest, dn1index, dn2index);
        final ModifyRequest modifyRequest = mock(ModifyRequest.class);
        when(modifyRequest.getName()).thenReturn(dn1, dn2);
        assertRequestsAreRoutedConsistently(f, modifyRequest, dn1index, dn2index);
        final ModifyDNRequest modifyDNRequest = mock(ModifyDNRequest.class);
        when(modifyDNRequest.getName()).thenReturn(dn1, dn2);
        assertRequestsAreRoutedConsistently(f, modifyDNRequest, dn1index, dn2index);
        final SearchRequest searchRequest = mock(SearchRequest.class);
        when(searchRequest.getName()).thenReturn(dn1, dn2);
        assertRequestsAreRoutedConsistently(f, searchRequest, dn1index, dn2index);
        // Authzid based operations.
        final String authzid1 = "dn:" + dn1.toString();
        final String authzid2 = "dn:" + dn2.toString();
        final PasswordModifyExtendedRequest passwordModifyRequest = mock(PasswordModifyExtendedRequest.class);
        when(passwordModifyRequest.getUserIdentityAsString()).thenReturn(authzid1, authzid2);
        assertRequestsAreRoutedConsistently(f, passwordModifyRequest, dn1index, dn2index);
        final PlainSASLBindRequest plainSASLBindRequest = mock(PlainSASLBindRequest.class);
        when(plainSASLBindRequest.getAuthenticationID()).thenReturn(authzid1, authzid2);
        assertRequestsAreRoutedConsistently(f, plainSASLBindRequest, dn1index, dn2index);
        final CRAMMD5SASLBindRequest cramMD5SASLBindRequest = mock(CRAMMD5SASLBindRequest.class);
        when(cramMD5SASLBindRequest.getAuthenticationID()).thenReturn(authzid1, authzid2);
        assertRequestsAreRoutedConsistently(f, cramMD5SASLBindRequest, dn1index, dn2index);
        final DigestMD5SASLBindRequest digestMD5SASLBindRequest = mock(DigestMD5SASLBindRequest.class);
        when(digestMD5SASLBindRequest.getAuthenticationID()).thenReturn(authzid1, authzid2);
        assertRequestsAreRoutedConsistently(f, digestMD5SASLBindRequest, dn1index, dn2index);
        final GSSAPISASLBindRequest gssapiSASLBindRequest = mock(GSSAPISASLBindRequest.class);
        when(gssapiSASLBindRequest.getAuthenticationID()).thenReturn(authzid1, authzid2);
        assertRequestsAreRoutedConsistently(f, gssapiSASLBindRequest, dn1index, dn2index);
        // Requests that have no target will return a random index, but since we only have one factory the index will
        // always be 0.
        final GenericExtendedRequest genericExtendedRequest = mock(GenericExtendedRequest.class);
        assertThat(index(f, genericExtendedRequest)).isBetween(0, 1);
    }
    private void assertRequestsAreRoutedConsistently(final Function<Request, Integer, NeverThrowsException> f,
                                                     final Request r,
                                                     final int firstExpectedIndex,
                                                     final int secondExpectedIndex) {
        assertThat(index(f, r)).isEqualTo(firstExpectedIndex);
        assertThat(index(f, r)).isEqualTo(secondExpectedIndex);
    }
    private int index(final Function<Request, Integer, NeverThrowsException> function, final Request request) {
        return function.apply(request);
    }
}
opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java
New file
@@ -0,0 +1,959 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2016 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.forgerock.opendj.ldap.LdapException.newLdapException;
import static org.forgerock.opendj.ldap.ResultCode.CLIENT_SIDE_CONNECT_ERROR;
import static org.forgerock.opendj.ldap.responses.Responses.newBindResult;
import static org.forgerock.opendj.ldap.responses.Responses.newCompareResult;
import static org.forgerock.opendj.ldap.responses.Responses.newGenericExtendedResult;
import static org.forgerock.opendj.ldap.responses.Responses.newResult;
import static org.forgerock.opendj.ldap.spi.LdapPromises.newSuccessfulLdapPromise;
import static org.forgerock.util.Options.defaultOptions;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNotNull;
import static org.mockito.Matchers.isNull;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;
import java.util.ArrayList;
import java.util.logging.Level;
import org.forgerock.opendj.ldap.requests.AbandonRequest;
import org.forgerock.opendj.ldap.requests.AddRequest;
import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.opendj.ldap.requests.CompareRequest;
import org.forgerock.opendj.ldap.requests.DeleteRequest;
import org.forgerock.opendj.ldap.requests.ExtendedRequest;
import org.forgerock.opendj.ldap.requests.GenericExtendedRequest;
import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
import org.forgerock.opendj.ldap.requests.ModifyRequest;
import org.forgerock.opendj.ldap.requests.Request;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.util.Function;
import org.forgerock.util.promise.NeverThrowsException;
import org.forgerock.util.promise.Promises;
import org.mockito.Mock;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@SuppressWarnings("javadoc")
public class RequestLoadBalancerTestCase extends SdkTestCase {
    @Test
    public void closeLoadBalancerShouldCloseDelegateFactories() throws Exception {
        configureAllFactoriesOnline();
        loadBalancer.close();
        verify(factory1).close();
        verify(factory2).close();
        verify(factory3).close();
    }
    @Test
    public void getConnectionShouldNotInvokeDelegateFactory() throws Exception {
        configureAllFactoriesOnline();
        try (Connection connection = loadBalancer.getConnection()) {
            assertThat(connection).isNotNull();
            verifyZeroInteractions(factory1, factory2, factory3);
        }
        verifyZeroInteractions(factory1, factory2, factory3);
        loadBalancer.close();
    }
    @Test
    public void getConnectionReturnANewConnectionEachTime() throws Exception {
        configureAllFactoriesOnline();
        try (Connection connection1 = loadBalancer.getConnection();
             Connection connection2 = loadBalancer.getConnection()) {
            assertThat(connection1).isNotSameAs(connection2);
        }
        loadBalancer.close();
    }
    @Test
    public void getConnectionAsyncReturnANewConnectionEachTime() throws Exception {
        configureAllFactoriesOnline();
        try (Connection connection1 = loadBalancer.getConnectionAsync().get();
             Connection connection2 = loadBalancer.getConnectionAsync().get()) {
            assertThat(connection1).isNotSameAs(connection2);
        }
        loadBalancer.close();
    }
    @Test
    public void getConnectionAsyncShouldNotInvokeDelegateFactory() throws Exception {
        configureAllFactoriesOnline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            assertThat(connection).isNotNull();
            verifyZeroInteractions(factory1, factory2, factory3);
        }
        verifyZeroInteractions(factory1, factory2, factory3);
    }
    @Test
    public void connectionEventListenersNotifiedOnClose() throws Exception {
        configureAllFactoriesOnline();
        final ConnectionEventListener listener = mock(ConnectionEventListener.class);
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.addConnectionEventListener(listener);
        }
        verify(listener).handleConnectionClosed();
    }
    @Test
    public void connectionEventListenersNotifiedOnError() throws Exception {
        configureAllFactoriesOffline();
        final ConnectionEventListener listener = mock(ConnectionEventListener.class);
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.addConnectionEventListener(listener);
            try {
                connection.add(addRequest1);
                fail("add unexpectedly succeeded");
            } catch (LdapException ignored) {
                // Ignore.
            }
        }
        verify(listener).handleConnectionError(eq(false), any(LdapException.class));
        verify(listener).handleConnectionClosed();
    }
    @Test
    public void removedConnectionEventListenersShouldNotBeNotified() throws Exception {
        configureAllFactoriesOnline();
        final ConnectionEventListener listener = mock(ConnectionEventListener.class);
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.addConnectionEventListener(listener);
            connection.removeConnectionEventListener(listener);
        }
        verifyZeroInteractions(listener);
    }
    @Test
    public void validAndClosedStateShouldBeMaintained() throws Exception {
        configureAllFactoriesOffline();
        final Connection connection = loadBalancer.getConnectionAsync().get();
        assertThat(connection.isValid()).isTrue();
        assertThat(connection.isClosed()).isFalse();
        try {
            connection.add(addRequest1);
            fail("add unexpectedly succeeded");
        } catch (LdapException ignored) {
            // Ignore.
        }
        assertThat(connection.isValid()).isFalse();
        assertThat(connection.isClosed()).isFalse();
        connection.close();
        assertThat(connection.isValid()).isFalse();
        assertThat(connection.isClosed()).isTrue();
    }
    @Test
    public void factoryToStringShouldReturnANonEmptyString() throws Exception {
        configureAllFactoriesOffline();
        assertThat(loadBalancer.toString()).isNotEmpty();
    }
    @Test
    public void connectionToStringShouldReturnANonEmptyString() throws Exception {
        configureAllFactoriesOffline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            assertThat(connection.toString()).isNotEmpty();
        }
    }
    @Test
    public void abandonRequestShouldBeIgnored() throws Exception {
        configureAllFactoriesOnline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.abandonAsync(mock(AbandonRequest.class));
        }
        verifyZeroInteractions(factory1, factory2, factory3);
    }
    // We can't use a DataProviders here because the mocks will be re-initialized for each test method call.
    // ################## Add Requests ####################
    @Test
    public void addRequestsShouldBeRoutedCorrectly1() throws Exception {
        addRequestsShouldBeRoutedCorrectlyImpl(addRequest1, factory1, connection1);
    }
    @Test
    public void addRequestsShouldBeRoutedCorrectly2() throws Exception {
        addRequestsShouldBeRoutedCorrectlyImpl(addRequest2, factory2, connection2);
    }
    @Test
    public void addRequestsShouldBeRoutedCorrectly3() throws Exception {
        addRequestsShouldBeRoutedCorrectlyImpl(addRequest3, factory3, connection3);
    }
    private void addRequestsShouldBeRoutedCorrectlyImpl(final AddRequest addRequest,
                                                        final ConnectionFactory expectedFactory,
                                                        final Connection expectedConnection) throws Exception {
        configureAllFactoriesOnline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.add(addRequest);
        }
        verify(expectedFactory).getConnectionAsync();
        verify(expectedConnection).addAsync(same(addRequest), isNull(IntermediateResponseHandler.class));
        verify(expectedConnection).close();
        verifyZeroInteractionsForRemainingFactories(expectedFactory);
    }
    @Test
    public void addRequestsShouldLinearProbeOnFailure1() throws Exception {
        addRequestsShouldLinearProbeOnFailureImpl(addRequest1);
    }
    @Test
    public void addRequestsShouldLinearProbeOnFailure2() throws Exception {
        addRequestsShouldLinearProbeOnFailureImpl(addRequest2);
    }
    @Test
    public void addRequestsShouldLinearProbeOnFailure3() throws Exception {
        addRequestsShouldLinearProbeOnFailureImpl(addRequest3);
    }
    private void addRequestsShouldLinearProbeOnFailureImpl(final AddRequest addRequest) throws Exception {
        configureFactoriesOneAndTwoOffline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.add(addRequest);
        }
        verify(factory3).getConnectionAsync();
        verify(connection3).addAsync(same(addRequest), isNull(IntermediateResponseHandler.class));
        verify(connection3).close();
        verifyZeroInteractions(connection1);
        verifyZeroInteractions(connection2);
    }
    @Test
    public void addRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
        configureAllFactoriesOffline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            try {
                connection.add(addRequest1);
            } catch (ConnectionException e) {
                assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
            }
        }
        verify(factory1, atLeastOnce()).getConnectionAsync();
        verify(factory2, atLeastOnce()).getConnectionAsync();
        verify(factory3, atLeastOnce()).getConnectionAsync();
        verifyZeroInteractions(connection1);
        verifyZeroInteractions(connection2);
        verifyZeroInteractions(connection3);
    }
    // ################## Bind Requests ####################
    @Test
    public void bindRequestsShouldBeRoutedCorrectly1() throws Exception {
        bindRequestsShouldBeRoutedCorrectlyImpl(bindRequest1, factory1, connection1);
    }
    @Test
    public void bindRequestsShouldBeRoutedCorrectly2() throws Exception {
        bindRequestsShouldBeRoutedCorrectlyImpl(bindRequest2, factory2, connection2);
    }
    @Test
    public void bindRequestsShouldBeRoutedCorrectly3() throws Exception {
        bindRequestsShouldBeRoutedCorrectlyImpl(bindRequest3, factory3, connection3);
    }
    private void bindRequestsShouldBeRoutedCorrectlyImpl(final BindRequest bindRequest,
                                                         final ConnectionFactory expectedFactory,
                                                         final Connection expectedConnection) throws Exception {
        configureAllFactoriesOnline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.bind(bindRequest);
        }
        verify(expectedFactory).getConnectionAsync();
        verify(expectedConnection).bindAsync(same(bindRequest), isNull(IntermediateResponseHandler.class));
        verify(expectedConnection).close();
        verifyZeroInteractionsForRemainingFactories(expectedFactory);
    }
    @Test
    public void bindRequestsShouldLinearProbeOnFailure1() throws Exception {
        bindRequestsShouldLinearProbeOnFailureImpl(bindRequest1);
    }
    @Test
    public void bindRequestsShouldLinearProbeOnFailure2() throws Exception {
        bindRequestsShouldLinearProbeOnFailureImpl(bindRequest2);
    }
    @Test
    public void bindRequestsShouldLinearProbeOnFailure3() throws Exception {
        bindRequestsShouldLinearProbeOnFailureImpl(bindRequest3);
    }
    private void bindRequestsShouldLinearProbeOnFailureImpl(final BindRequest bindRequest) throws Exception {
        configureFactoriesOneAndTwoOffline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.bind(bindRequest);
        }
        verify(factory3).getConnectionAsync();
        verify(connection3).bindAsync(same(bindRequest), isNull(IntermediateResponseHandler.class));
        verify(connection3).close();
        verifyZeroInteractions(connection1);
        verifyZeroInteractions(connection2);
    }
    @Test
    public void bindRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
        configureAllFactoriesOffline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            try {
                connection.bind(bindRequest1);
            } catch (ConnectionException e) {
                assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
            }
        }
        verify(factory1, atLeastOnce()).getConnectionAsync();
        verify(factory2, atLeastOnce()).getConnectionAsync();
        verify(factory3, atLeastOnce()).getConnectionAsync();
        verifyZeroInteractions(connection1);
        verifyZeroInteractions(connection2);
        verifyZeroInteractions(connection3);
    }
    // ################## Compare Requests ####################
    @Test
    public void compareRequestsShouldBeRoutedCorrectly1() throws Exception {
        compareRequestsShouldBeRoutedCorrectlyImpl(compareRequest1, factory1, connection1);
    }
    @Test
    public void compareRequestsShouldBeRoutedCorrectly2() throws Exception {
        compareRequestsShouldBeRoutedCorrectlyImpl(compareRequest2, factory2, connection2);
    }
    @Test
    public void compareRequestsShouldBeRoutedCorrectly3() throws Exception {
        compareRequestsShouldBeRoutedCorrectlyImpl(compareRequest3, factory3, connection3);
    }
    private void compareRequestsShouldBeRoutedCorrectlyImpl(final CompareRequest compareRequest,
                                                            final ConnectionFactory expectedFactory,
                                                            final Connection expectedConnection) throws Exception {
        configureAllFactoriesOnline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.compare(compareRequest);
        }
        verify(expectedFactory).getConnectionAsync();
        verify(expectedConnection).compareAsync(same(compareRequest), isNull(IntermediateResponseHandler.class));
        verify(expectedConnection).close();
        verifyZeroInteractionsForRemainingFactories(expectedFactory);
    }
    @Test
    public void compareRequestsShouldLinearProbeOnFailure1() throws Exception {
        compareRequestsShouldLinearProbeOnFailureImpl(compareRequest1);
    }
    @Test
    public void compareRequestsShouldLinearProbeOnFailure2() throws Exception {
        compareRequestsShouldLinearProbeOnFailureImpl(compareRequest2);
    }
    @Test
    public void compareRequestsShouldLinearProbeOnFailure3() throws Exception {
        compareRequestsShouldLinearProbeOnFailureImpl(compareRequest3);
    }
    private void compareRequestsShouldLinearProbeOnFailureImpl(final CompareRequest compareRequest) throws Exception {
        configureFactoriesOneAndTwoOffline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.compare(compareRequest);
        }
        verify(factory3).getConnectionAsync();
        verify(connection3).compareAsync(same(compareRequest), isNull(IntermediateResponseHandler.class));
        verify(connection3).close();
        verifyZeroInteractions(connection1);
        verifyZeroInteractions(connection2);
    }
    @Test
    public void compareRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
        configureAllFactoriesOffline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            try {
                connection.compare(compareRequest1);
            } catch (ConnectionException e) {
                assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
            }
        }
        verify(factory1, atLeastOnce()).getConnectionAsync();
        verify(factory2, atLeastOnce()).getConnectionAsync();
        verify(factory3, atLeastOnce()).getConnectionAsync();
        verifyZeroInteractions(connection1);
        verifyZeroInteractions(connection2);
        verifyZeroInteractions(connection3);
    }
    // ################## Delete Requests ####################
    @Test
    public void deleteRequestsShouldBeRoutedCorrectly1() throws Exception {
        deleteRequestsShouldBeRoutedCorrectlyImpl(deleteRequest1, factory1, connection1);
    }
    @Test
    public void deleteRequestsShouldBeRoutedCorrectly2() throws Exception {
        deleteRequestsShouldBeRoutedCorrectlyImpl(deleteRequest2, factory2, connection2);
    }
    @Test
    public void deleteRequestsShouldBeRoutedCorrectly3() throws Exception {
        deleteRequestsShouldBeRoutedCorrectlyImpl(deleteRequest3, factory3, connection3);
    }
    private void deleteRequestsShouldBeRoutedCorrectlyImpl(final DeleteRequest deleteRequest,
                                                           final ConnectionFactory expectedFactory,
                                                           final Connection expectedConnection) throws Exception {
        configureAllFactoriesOnline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.delete(deleteRequest);
        }
        verify(expectedFactory).getConnectionAsync();
        verify(expectedConnection).deleteAsync(same(deleteRequest), isNull(IntermediateResponseHandler.class));
        verify(expectedConnection).close();
        verifyZeroInteractionsForRemainingFactories(expectedFactory);
    }
    @Test
    public void deleteRequestsShouldLinearProbeOnFailure1() throws Exception {
        deleteRequestsShouldLinearProbeOnFailureImpl(deleteRequest1);
    }
    @Test
    public void deleteRequestsShouldLinearProbeOnFailure2() throws Exception {
        deleteRequestsShouldLinearProbeOnFailureImpl(deleteRequest2);
    }
    @Test
    public void deleteRequestsShouldLinearProbeOnFailure3() throws Exception {
        deleteRequestsShouldLinearProbeOnFailureImpl(deleteRequest3);
    }
    private void deleteRequestsShouldLinearProbeOnFailureImpl(final DeleteRequest deleteRequest) throws Exception {
        configureFactoriesOneAndTwoOffline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.delete(deleteRequest);
        }
        verify(factory3).getConnectionAsync();
        verify(connection3).deleteAsync(same(deleteRequest), isNull(IntermediateResponseHandler.class));
        verify(connection3).close();
        verifyZeroInteractions(connection1);
        verifyZeroInteractions(connection2);
    }
    @Test
    public void deleteRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
        configureAllFactoriesOffline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            try {
                connection.delete(deleteRequest1);
            } catch (ConnectionException e) {
                assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
            }
        }
        verify(factory1, atLeastOnce()).getConnectionAsync();
        verify(factory2, atLeastOnce()).getConnectionAsync();
        verify(factory3, atLeastOnce()).getConnectionAsync();
        verifyZeroInteractions(connection1);
        verifyZeroInteractions(connection2);
        verifyZeroInteractions(connection3);
    }
    // ################## Extended Requests ####################
    @Test
    public void extendedRequestsShouldBeRoutedCorrectly1() throws Exception {
        extendedRequestsShouldBeRoutedCorrectlyImpl(extendedRequest1, factory1, connection1);
    }
    @Test
    public void extendedRequestsShouldBeRoutedCorrectly2() throws Exception {
        extendedRequestsShouldBeRoutedCorrectlyImpl(extendedRequest2, factory2, connection2);
    }
    @Test
    public void extendedRequestsShouldBeRoutedCorrectly3() throws Exception {
        extendedRequestsShouldBeRoutedCorrectlyImpl(extendedRequest3, factory3, connection3);
    }
    private void extendedRequestsShouldBeRoutedCorrectlyImpl(final ExtendedRequest<?> extendedRequest,
                                                             final ConnectionFactory expectedFactory,
                                                             final Connection expectedConnection) throws Exception {
        configureAllFactoriesOnline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.extendedRequest(extendedRequest);
        }
        verify(expectedFactory).getConnectionAsync();
        verify(expectedConnection).extendedRequestAsync(same(extendedRequest),
                                                        isNull(IntermediateResponseHandler.class));
        verify(expectedConnection).close();
        verifyZeroInteractionsForRemainingFactories(expectedFactory);
    }
    @Test
    public void extendedRequestsShouldLinearProbeOnFailure1() throws Exception {
        extendedRequestsShouldLinearProbeOnFailureImpl(extendedRequest1);
    }
    @Test
    public void extendedRequestsShouldLinearProbeOnFailure2() throws Exception {
        extendedRequestsShouldLinearProbeOnFailureImpl(extendedRequest2);
    }
    @Test
    public void extendedRequestsShouldLinearProbeOnFailure3() throws Exception {
        extendedRequestsShouldLinearProbeOnFailureImpl(extendedRequest3);
    }
    private void extendedRequestsShouldLinearProbeOnFailureImpl(
            final ExtendedRequest<?> extendedRequest) throws Exception {
        configureFactoriesOneAndTwoOffline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.extendedRequest(extendedRequest);
        }
        verify(factory3).getConnectionAsync();
        verify(connection3).extendedRequestAsync(same(extendedRequest), isNull(IntermediateResponseHandler.class));
        verify(connection3).close();
        verifyZeroInteractions(connection1);
        verifyZeroInteractions(connection2);
    }
    @Test
    public void extendedRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
        configureAllFactoriesOffline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            try {
                connection.extendedRequest(extendedRequest1);
            } catch (ConnectionException e) {
                assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
            }
        }
        verify(factory1, atLeastOnce()).getConnectionAsync();
        verify(factory2, atLeastOnce()).getConnectionAsync();
        verify(factory3, atLeastOnce()).getConnectionAsync();
        verifyZeroInteractions(connection1);
        verifyZeroInteractions(connection2);
        verifyZeroInteractions(connection3);
    }
    // ################## Modify Requests ####################
    @Test
    public void modifyRequestsShouldBeRoutedCorrectly1() throws Exception {
        modifyRequestsShouldBeRoutedCorrectlyImpl(modifyRequest1, factory1, connection1);
    }
    @Test
    public void modifyRequestsShouldBeRoutedCorrectly2() throws Exception {
        modifyRequestsShouldBeRoutedCorrectlyImpl(modifyRequest2, factory2, connection2);
    }
    @Test
    public void modifyRequestsShouldBeRoutedCorrectly3() throws Exception {
        modifyRequestsShouldBeRoutedCorrectlyImpl(modifyRequest3, factory3, connection3);
    }
    private void modifyRequestsShouldBeRoutedCorrectlyImpl(final ModifyRequest modifyRequest,
                                                           final ConnectionFactory expectedFactory,
                                                           final Connection expectedConnection) throws Exception {
        configureAllFactoriesOnline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.modify(modifyRequest);
        }
        verify(expectedFactory).getConnectionAsync();
        verify(expectedConnection).modifyAsync(same(modifyRequest), isNull(IntermediateResponseHandler.class));
        verify(expectedConnection).close();
        verifyZeroInteractionsForRemainingFactories(expectedFactory);
    }
    @Test
    public void modifyRequestsShouldLinearProbeOnFailure1() throws Exception {
        modifyRequestsShouldLinearProbeOnFailureImpl(modifyRequest1);
    }
    @Test
    public void modifyRequestsShouldLinearProbeOnFailure2() throws Exception {
        modifyRequestsShouldLinearProbeOnFailureImpl(modifyRequest2);
    }
    @Test
    public void modifyRequestsShouldLinearProbeOnFailure3() throws Exception {
        modifyRequestsShouldLinearProbeOnFailureImpl(modifyRequest3);
    }
    private void modifyRequestsShouldLinearProbeOnFailureImpl(final ModifyRequest modifyRequest) throws Exception {
        configureFactoriesOneAndTwoOffline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.modify(modifyRequest);
        }
        verify(factory3).getConnectionAsync();
        verify(connection3).modifyAsync(same(modifyRequest), isNull(IntermediateResponseHandler.class));
        verify(connection3).close();
        verifyZeroInteractions(connection1);
        verifyZeroInteractions(connection2);
    }
    @Test
    public void modifyRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
        configureAllFactoriesOffline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            try {
                connection.modify(modifyRequest1);
            } catch (ConnectionException e) {
                assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
            }
        }
        verify(factory1, atLeastOnce()).getConnectionAsync();
        verify(factory2, atLeastOnce()).getConnectionAsync();
        verify(factory3, atLeastOnce()).getConnectionAsync();
        verifyZeroInteractions(connection1);
        verifyZeroInteractions(connection2);
        verifyZeroInteractions(connection3);
    }
    // ################## ModifyDN Requests ####################
    @Test
    public void modifyDNRequestsShouldBeRoutedCorrectly1() throws Exception {
        modifyDNRequestsShouldBeRoutedCorrectlyImpl(modifyDNRequest1, factory1, connection1);
    }
    @Test
    public void modifyDNRequestsShouldBeRoutedCorrectly2() throws Exception {
        modifyDNRequestsShouldBeRoutedCorrectlyImpl(modifyDNRequest2, factory2, connection2);
    }
    @Test
    public void modifyDNRequestsShouldBeRoutedCorrectly3() throws Exception {
        modifyDNRequestsShouldBeRoutedCorrectlyImpl(modifyDNRequest3, factory3, connection3);
    }
    private void modifyDNRequestsShouldBeRoutedCorrectlyImpl(final ModifyDNRequest modifyDNRequest,
                                                             final ConnectionFactory expectedFactory,
                                                             final Connection expectedConnection) throws Exception {
        configureAllFactoriesOnline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.modifyDN(modifyDNRequest);
        }
        verify(expectedFactory).getConnectionAsync();
        verify(expectedConnection).modifyDNAsync(same(modifyDNRequest), isNull(IntermediateResponseHandler.class));
        verify(expectedConnection).close();
        verifyZeroInteractionsForRemainingFactories(expectedFactory);
    }
    @Test
    public void modifyDNRequestsShouldLinearProbeOnFailure1() throws Exception {
        modifyDNRequestsShouldLinearProbeOnFailureImpl(modifyDNRequest1);
    }
    @Test
    public void modifyDNRequestsShouldLinearProbeOnFailure2() throws Exception {
        modifyDNRequestsShouldLinearProbeOnFailureImpl(modifyDNRequest2);
    }
    @Test
    public void modifyDNRequestsShouldLinearProbeOnFailure3() throws Exception {
        modifyDNRequestsShouldLinearProbeOnFailureImpl(modifyDNRequest3);
    }
    private void modifyDNRequestsShouldLinearProbeOnFailureImpl(
            final ModifyDNRequest modifyDNRequest) throws Exception {
        configureFactoriesOneAndTwoOffline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.modifyDN(modifyDNRequest);
        }
        verify(factory3).getConnectionAsync();
        verify(connection3).modifyDNAsync(same(modifyDNRequest), isNull(IntermediateResponseHandler.class));
        verify(connection3).close();
        verifyZeroInteractions(connection1);
        verifyZeroInteractions(connection2);
    }
    @Test
    public void modifyDNRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
        configureAllFactoriesOffline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            try {
                connection.modifyDN(modifyDNRequest1);
            } catch (ConnectionException e) {
                assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
            }
        }
        verify(factory1, atLeastOnce()).getConnectionAsync();
        verify(factory2, atLeastOnce()).getConnectionAsync();
        verify(factory3, atLeastOnce()).getConnectionAsync();
        verifyZeroInteractions(connection1);
        verifyZeroInteractions(connection2);
        verifyZeroInteractions(connection3);
    }
    // ################## Search Requests ####################
    @Test
    public void searchRequestsShouldBeRoutedCorrectly1() throws Exception {
        searchRequestsShouldBeRoutedCorrectlyImpl(searchRequest1, factory1, connection1);
    }
    @Test
    public void searchRequestsShouldBeRoutedCorrectly2() throws Exception {
        searchRequestsShouldBeRoutedCorrectlyImpl(searchRequest2, factory2, connection2);
    }
    @Test
    public void searchRequestsShouldBeRoutedCorrectly3() throws Exception {
        searchRequestsShouldBeRoutedCorrectlyImpl(searchRequest3, factory3, connection3);
    }
    private void searchRequestsShouldBeRoutedCorrectlyImpl(final SearchRequest searchRequest,
                                                           final ConnectionFactory expectedFactory,
                                                           final Connection expectedConnection) throws Exception {
        configureAllFactoriesOnline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.search(searchRequest, new ArrayList<>());
        }
        verify(expectedFactory).getConnectionAsync();
        verify(expectedConnection).searchAsync(same(searchRequest),
                                               isNull(IntermediateResponseHandler.class),
                                               isNotNull(SearchResultHandler.class));
        verify(expectedConnection).close();
        verifyZeroInteractionsForRemainingFactories(expectedFactory);
    }
    @Test
    public void searchRequestsShouldLinearProbeOnFailure1() throws Exception {
        searchRequestsShouldLinearProbeOnFailureImpl(searchRequest1);
    }
    @Test
    public void searchRequestsShouldLinearProbeOnFailure2() throws Exception {
        searchRequestsShouldLinearProbeOnFailureImpl(searchRequest2);
    }
    @Test
    public void searchRequestsShouldLinearProbeOnFailure3() throws Exception {
        searchRequestsShouldLinearProbeOnFailureImpl(searchRequest3);
    }
    private void searchRequestsShouldLinearProbeOnFailureImpl(final SearchRequest searchRequest) throws Exception {
        configureFactoriesOneAndTwoOffline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            connection.search(searchRequest, new ArrayList<>());
        }
        verify(factory3).getConnectionAsync();
        verify(connection3).searchAsync(same(searchRequest),
                                        isNull(IntermediateResponseHandler.class),
                                        isNotNull(SearchResultHandler.class));
        verify(connection3).close();
        verifyZeroInteractions(connection1);
        verifyZeroInteractions(connection2);
    }
    @Test
    public void searchRequestsShouldFailWhenAllFactoriesOffline() throws Exception {
        configureAllFactoriesOffline();
        try (Connection connection = loadBalancer.getConnectionAsync().get()) {
            try {
                connection.search(searchRequest1, new ArrayList<>());
            } catch (ConnectionException e) {
                assertThat(e.getResult().getResultCode()).isEqualTo(CLIENT_SIDE_CONNECT_ERROR);
            }
        }
        verify(factory1, atLeastOnce()).getConnectionAsync();
        verify(factory2, atLeastOnce()).getConnectionAsync();
        verify(factory3, atLeastOnce()).getConnectionAsync();
        verifyZeroInteractions(connection1);
        verifyZeroInteractions(connection2);
        verifyZeroInteractions(connection3);
    }
    @Mock private ConnectionFactory factory1;
    @Mock private ConnectionFactory factory2;
    @Mock private ConnectionFactory factory3;
    @Mock private AbstractAsynchronousConnection connection1;
    @Mock private AbstractAsynchronousConnection connection2;
    @Mock private AbstractAsynchronousConnection connection3;
    @Mock private AddRequest addRequest1;
    @Mock private AddRequest addRequest2;
    @Mock private AddRequest addRequest3;
    @Mock private BindRequest bindRequest1;
    @Mock private BindRequest bindRequest2;
    @Mock private BindRequest bindRequest3;
    @Mock private CompareRequest compareRequest1;
    @Mock private CompareRequest compareRequest2;
    @Mock private CompareRequest compareRequest3;
    @Mock private DeleteRequest deleteRequest1;
    @Mock private DeleteRequest deleteRequest2;
    @Mock private DeleteRequest deleteRequest3;
    @Mock private GenericExtendedRequest extendedRequest1;
    @Mock private GenericExtendedRequest extendedRequest2;
    @Mock private GenericExtendedRequest extendedRequest3;
    @Mock private ModifyRequest modifyRequest1;
    @Mock private ModifyRequest modifyRequest2;
    @Mock private ModifyRequest modifyRequest3;
    @Mock private ModifyDNRequest modifyDNRequest1;
    @Mock private ModifyDNRequest modifyDNRequest2;
    @Mock private ModifyDNRequest modifyDNRequest3;
    @Mock private SearchRequest searchRequest1;
    @Mock private SearchRequest searchRequest2;
    @Mock private SearchRequest searchRequest3;
    private ConnectionFactory loadBalancer;
    @BeforeMethod
    public void beforeMethod() {
        TestCaseUtils.setDefaultLogLevel(Level.SEVERE);
        initMocks(this);
        stub(this.connection1);
        stub(this.connection2);
        stub(this.connection3);
        loadBalancer = new RequestLoadBalancer("Test",
                                               asList(factory1, factory2, factory3),
                                               defaultOptions(), newNextFactoryFunction());
    }
    private Function<Request, Integer, NeverThrowsException> newNextFactoryFunction() {
        return new Function<Request, Integer, NeverThrowsException>() {
            @Override
            public Integer apply(final Request request) {
                if (request == addRequest1 || request == bindRequest1 || request == compareRequest1
                        || request == deleteRequest1 || request == extendedRequest1 || request == modifyRequest1
                        || request == modifyDNRequest1 || request == searchRequest1) {
                    return 0;
                }
                if (request == addRequest2 || request == bindRequest2 || request == compareRequest2
                        || request == deleteRequest2 || request == extendedRequest2 || request == modifyRequest2
                        || request == modifyDNRequest2 || request == searchRequest2) {
                    return 1;
                }
                if (request == addRequest3 || request == bindRequest3 || request == compareRequest3
                        || request == deleteRequest3 || request == extendedRequest3 || request == modifyRequest3
                        || request == modifyDNRequest3 || request == searchRequest3) {
                    return 2;
                }
                fail("Received unexpected request");
                return -1; // Keep compiler happy.
            }
        };
    }
    private void stub(final Connection connection) {
        when(connection.addAsync(any(AddRequest.class), any(IntermediateResponseHandler.class)))
                .thenReturn(newSuccessfulLdapPromise(newResult(ResultCode.SUCCESS)));
        when(connection.bindAsync(any(BindRequest.class), any(IntermediateResponseHandler.class)))
                .thenReturn(newSuccessfulLdapPromise(newBindResult(ResultCode.SUCCESS)));
        when(connection.compareAsync(any(CompareRequest.class), any(IntermediateResponseHandler.class)))
                .thenReturn(newSuccessfulLdapPromise(newCompareResult(ResultCode.SUCCESS)));
        when(connection.deleteAsync(any(DeleteRequest.class), any(IntermediateResponseHandler.class)))
                .thenReturn(newSuccessfulLdapPromise(newResult(ResultCode.SUCCESS)));
        when(connection.extendedRequestAsync(any(GenericExtendedRequest.class), any(IntermediateResponseHandler.class)))
                .thenReturn(newSuccessfulLdapPromise(newGenericExtendedResult(ResultCode.SUCCESS)));
        when(connection.modifyAsync(any(ModifyRequest.class), any(IntermediateResponseHandler.class)))
                .thenReturn(newSuccessfulLdapPromise(newResult(ResultCode.SUCCESS)));
        when(connection.modifyDNAsync(any(ModifyDNRequest.class), any(IntermediateResponseHandler.class)))
                .thenReturn(newSuccessfulLdapPromise(newResult(ResultCode.SUCCESS)));
        when(connection.searchAsync(any(SearchRequest.class), any(IntermediateResponseHandler.class),
                                    any(SearchResultHandler.class)))
                .thenReturn(newSuccessfulLdapPromise(newResult(ResultCode.SUCCESS)));
    }
    @AfterMethod
    public void afterMethod() {
        loadBalancer.close();
        TestCaseUtils.setDefaultLogLevel(Level.INFO);
    }
    private void configureAllFactoriesOnline() {
        when(factory1.getConnectionAsync())
                .thenReturn(Promises.<Connection, LdapException>newResultPromise(connection1));
        when(factory2.getConnectionAsync())
                .thenReturn(Promises.<Connection, LdapException>newResultPromise(connection2));
        when(factory3.getConnectionAsync())
                .thenReturn(Promises.<Connection, LdapException>newResultPromise(connection3));
    }
    private void configureFactoriesOneAndTwoOffline() {
        final LdapException connectionFailure = newLdapException(CLIENT_SIDE_CONNECT_ERROR);
        when(factory1.getConnectionAsync())
                .thenReturn(Promises.<Connection, LdapException>newExceptionPromise(connectionFailure));
        when(factory2.getConnectionAsync())
                .thenReturn(Promises.<Connection, LdapException>newExceptionPromise(connectionFailure));
        when(factory3.getConnectionAsync())
                .thenReturn(Promises.<Connection, LdapException>newResultPromise(connection3));
    }
    private void configureAllFactoriesOffline() {
        final LdapException connectionFailure = newLdapException(CLIENT_SIDE_CONNECT_ERROR);
        when(factory1.getConnectionAsync())
                .thenReturn(Promises.<Connection, LdapException>newExceptionPromise(connectionFailure));
        when(factory2.getConnectionAsync())
                .thenReturn(Promises.<Connection, LdapException>newExceptionPromise(connectionFailure));
        when(factory3.getConnectionAsync())
                .thenReturn(Promises.<Connection, LdapException>newExceptionPromise(connectionFailure));
    }
    private void verifyZeroInteractionsForRemainingFactories(final ConnectionFactory expectedFactory) {
        if (expectedFactory != factory1) {
            verifyZeroInteractions(factory1);
        }
        if (expectedFactory != factory2) {
            verifyZeroInteractions(factory2);
        }
        if (expectedFactory != factory3) {
            verifyZeroInteractions(factory3);
        }
    }
}
opendj-sdk/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
@@ -22,26 +22,27 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2015 ForgeRock AS.
 *      Portions Copyright 2011-2016 ForgeRock AS.
 */
package org.forgerock.opendj.examples;
import static org.forgerock.opendj.ldap.LDAPConnectionFactory.AUTHN_BIND_REQUEST;
import static org.forgerock.opendj.ldap.LDAPConnectionFactory.HEARTBEAT_ENABLED;
import static org.forgerock.opendj.ldap.LDAPListener.*;
import static org.forgerock.opendj.ldap.LDAPListener.CONNECT_MAX_BACKLOG;
import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import org.forgerock.opendj.ldap.ConnectionFactory;
import org.forgerock.opendj.ldap.Connections;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.LDAPConnectionFactory;
import org.forgerock.opendj.ldap.LDAPListener;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.RequestContext;
import org.forgerock.opendj.ldap.RequestHandlerFactory;
import org.forgerock.opendj.ldap.ServerConnectionFactory;
@@ -61,33 +62,44 @@
 * This example takes the following command line parameters:
 *
 * <pre>
 *     {@code <listenAddress> <listenPort> <proxyDN> <proxyPassword> <remoteAddress1> <remotePort1>
 *      [<remoteAddress2> <remotePort2> ...]}
 *     {@code [--load-balancer <mode>] <listenAddress> <listenPort> <proxyDN> <proxyPassword>
 *         <remoteAddress1> <remotePort1> [<remoteAddress2> <remotePort2> ...]}
 * </pre>
 *
 * Where {@code <mode>} is one of "round-robin", "fail-over", or "sharded". The default is round-robin.
 */
public final class Proxy {
    /**
     * Main method.
     *
     * @param args
     *            The command line arguments: listen address, listen port,
     *            The command line arguments: [--load-balancer <mode>] listen address, listen port,
     *            remote address1, remote port1, remote address2, remote port2,
     *            ...
     */
    public static void main(final String[] args) {
        if (args.length < 6 || args.length % 2 != 0) {
            System.err.println("Usage: listenAddress listenPort "
                    + "proxyDN proxyPassword remoteAddress1 remotePort1 "
                    + "remoteAddress2 remotePort2 ...");
            System.err.println("Usage: [--load-balancer <mode>] listenAddress listenPort "
                    + "proxyDN proxyPassword remoteAddress1 remotePort1 remoteAddress2 remotePort2 ...");
            System.exit(1);
        }
        // Parse command line arguments.
        final String localAddress = args[0];
        final int localPort = Integer.parseInt(args[1]);
        int i = 0;
        final String proxyDN = args[2];
        final String proxyPassword = args[3];
        final LoadBalancingAlgorithm algorithm;
        if ("--load-balancer".equals(args[i])) {
            algorithm = getLoadBalancingAlgorithm(args[i + 1]);
            i += 2;
        } else {
            algorithm = LoadBalancingAlgorithm.ROUND_ROBIN;
        }
        final String localAddress = args[i++];
        final int localPort = Integer.parseInt(args[i++]);
        final String proxyDN = args[i++];
        final String proxyPassword = args[i++];
        // Create load balancer.
        // --- JCite pools ---
@@ -100,7 +112,7 @@
        final List<ConnectionFactory> bindFactories = new LinkedList<>();
        final Options bindFactoryOptions = Options.defaultOptions().set(HEARTBEAT_ENABLED, true);
        for (int i = 4; i < args.length; i += 2) {
        for (; i < args.length; i += 2) {
            final String remoteAddress = args[i];
            final int remotePort = Integer.parseInt(args[i + 1]);
@@ -114,10 +126,8 @@
        }
        // --- JCite pools ---
        // --- JCite load balancer ---
        final ConnectionFactory factory = Connections.newRoundRobinLoadBalancer(factories, factoryOptions);
        final ConnectionFactory bindFactory = Connections.newRoundRobinLoadBalancer(bindFactories, bindFactoryOptions);
        // --- JCite load balancer ---
        final ConnectionFactory factory = algorithm.newLoadBalancer(factories, factoryOptions);
        final ConnectionFactory bindFactory = algorithm.newLoadBalancer(bindFactories, bindFactoryOptions);
        // --- JCite backend ---
        /*
@@ -156,6 +166,47 @@
        // --- JCite listener ---
    }
    private static LoadBalancingAlgorithm getLoadBalancingAlgorithm(final String algorithmName) {
        switch (algorithmName) {
        case "round-robin":
            return LoadBalancingAlgorithm.ROUND_ROBIN;
        case "fail-over":
            return LoadBalancingAlgorithm.FAIL_OVER;
        case "sharded":
            return LoadBalancingAlgorithm.SHARDED;
        default:
            System.err.println("Unrecognized load-balancing algorithm '" + algorithmName + "'. Should be one of "
                                       + "'round-robin', 'fail-over', or 'sharded'.");
            System.exit(1);
        }
        return LoadBalancingAlgorithm.ROUND_ROBIN; // keep compiler happy.
    }
    private enum LoadBalancingAlgorithm {
        ROUND_ROBIN {
            @Override
            ConnectionFactory newLoadBalancer(final Collection<ConnectionFactory> factories, final Options options) {
                // --- JCite load balancer ---
                return Connections.newRoundRobinLoadBalancer(factories, options);
                // --- JCite load balancer ---
            }
        },
        FAIL_OVER {
            @Override
            ConnectionFactory newLoadBalancer(final Collection<ConnectionFactory> factories, final Options options) {
                return Connections.newFailoverLoadBalancer(factories, options);
            }
        },
        SHARDED {
            @Override
            ConnectionFactory newLoadBalancer(final Collection<ConnectionFactory> factories, final Options options) {
                return Connections.newShardedRequestLoadBalancer(factories, options);
            }
        };
        abstract ConnectionFactory newLoadBalancer(Collection<ConnectionFactory> factories, Options options);
    }
    private Proxy() {
        // Not used.
    }