mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
29.33.2016 953657d6645a26d4ce33f21eeafa31ed4e61fd81
OPENDJ-3425: implement simple distribution load-balancer

Uses consistent hashing based on MD5 hash of target DNs. Can
be extended in future to support different weightings, as well
as dynamic addition and removal of partitions.
2 files modified
4 files added
1498 lines changed in 6 files:
opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java (179 lines)
opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancer.java (498 lines)
opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashMap.java (288 lines)
opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties (10 lines)
opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancerTest.java (382 lines)
opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashMapTest.java (141 lines)
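A minimal usage sketch of the API added by this commit (the partition IDs, factory variables and
partition base DN are illustrative):

    ConsistentHashMap<ConnectionFactory> partitions = new ConsistentHashMap<>();
    partitions.put("partition-1", firstPartitionFactory);
    partitions.put("partition-2", secondPartitionFactory);
    ConnectionFactory loadBalancer = Connections.newFixedSizeDistributionLoadBalancer(
            DN.valueOf("ou=people,dc=example,dc=com"), partitions, Options.defaultOptions());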
opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -569,67 +569,134 @@
             @Override
             public RequestWithIndex apply(final Request request) {
-                // Normalize the hash to a valid factory index, taking care of negative hash values and especially
-                // Integer.MIN_VALUE (see doc for Math.abs()).
-                final int index = computeIndexBasedOnDnHashCode(request);
-                return new RequestWithIndex(request, index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % maxIndex));
+                final int index = computePartitionIdFromDN(dnOfRequest(request), maxIndex);
+                return new RequestWithIndex(request, index);
             }
-            private int computeIndexBasedOnDnHashCode(final Request request) {
-                // The following conditions are ordered such that the most common operations appear first in order to
-                // reduce the average number of branches. A better solution would be to use a visitor, but a visitor
-                // would only apply to the core operations, not extended operations or SASL binds.
-                if (request instanceof SearchRequest) {
-                    return ((SearchRequest) request).getName().hashCode();
-                } else if (request instanceof ModifyRequest) {
-                    return ((ModifyRequest) request).getName().hashCode();
-                } else if (request instanceof SimpleBindRequest) {
-                    return hashCodeOfDnString(((SimpleBindRequest) request).getName());
-                } else if (request instanceof AddRequest) {
-                    return ((AddRequest) request).getName().hashCode();
-                } else if (request instanceof DeleteRequest) {
-                    return ((DeleteRequest) request).getName().hashCode();
-                } else if (request instanceof CompareRequest) {
-                    return ((CompareRequest) request).getName().hashCode();
-                } else if (request instanceof ModifyDNRequest) {
-                    return ((ModifyDNRequest) request).getName().hashCode();
-                } else if (request instanceof PasswordModifyExtendedRequest) {
-                    return hashCodeOfAuthzid(((PasswordModifyExtendedRequest) request).getUserIdentityAsString());
-                } else if (request instanceof PlainSASLBindRequest) {
-                    return hashCodeOfAuthzid(((PlainSASLBindRequest) request).getAuthenticationID());
-                } else if (request instanceof DigestMD5SASLBindRequest) {
-                    return hashCodeOfAuthzid(((DigestMD5SASLBindRequest) request).getAuthenticationID());
-                } else if (request instanceof GSSAPISASLBindRequest) {
-                    return hashCodeOfAuthzid(((GSSAPISASLBindRequest) request).getAuthenticationID());
-                } else if (request instanceof CRAMMD5SASLBindRequest) {
-                    return hashCodeOfAuthzid(((CRAMMD5SASLBindRequest) request).getAuthenticationID());
-                } else {
-                    return distributeRequestAtRandom();
-                }
-            }
-            private int hashCodeOfAuthzid(final String authzid) {
-                if (authzid != null && authzid.startsWith("dn:")) {
-                    return hashCodeOfDnString(authzid.substring(3));
-                }
-                return distributeRequestAtRandom();
-            }
-            private int hashCodeOfDnString(final String dnString) {
-                try {
-                    return DN.valueOf(dnString).hashCode();
-                } catch (final IllegalArgumentException ignored) {
-                    return distributeRequestAtRandom();
-                }
-            }
-            private int distributeRequestAtRandom() {
-                return ThreadLocalRandom.current().nextInt(0, maxIndex);
-            }
        };
    }
    /**
     * Returns the partition ID which should be selected based on the provided DN and number of partitions, taking care
     * of negative hash values and especially Integer.MIN_VALUE (see doc for Math.abs()). If the provided DN is
     * {@code null} then a random partition ID will be returned.
     *
     * @param dn
     *         The DN whose partition ID is to be determined, which may be {@code null}.
     * @param numberOfPartitions
     *         The total number of partitions.
     * @return A partition ID in the range {@code 0 <= partitionID < numberOfPartitions}.
     */
    private static int computePartitionIdFromDN(final DN dn, final int numberOfPartitions) {
        final int index = dn != null ? dn.hashCode() : ThreadLocalRandom.current().nextInt(0, numberOfPartitions);
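        // NB: Math.abs(Integer.MIN_VALUE) is itself negative (see Math.abs javadoc), hence the special case below.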
        return index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % numberOfPartitions);
    }
    /**
     * Returns the DN of the entry targeted by the provided request, or {@code null} if the target entry cannot be
     * determined. This method will return {@code null} for most extended operations and SASL bind requests which
     * specify a non-DN authorization ID.
     *
     * @param request
     *         The request whose target entry DN is to be determined.
     * @return The DN of the entry targeted by the provided request, or {@code null} if the target entry cannot be
     *         determined.
     */
    static DN dnOfRequest(final Request request) {
        // The following conditions are ordered such that the most common operations appear first in order to
        // reduce the average number of branches. A better solution would be to use a visitor, but a visitor
        // would only apply to the core operations, not extended operations or SASL binds.
        if (request instanceof SearchRequest) {
            return ((SearchRequest) request).getName();
        } else if (request instanceof ModifyRequest) {
            return ((ModifyRequest) request).getName();
        } else if (request instanceof SimpleBindRequest) {
            return dnOf(((SimpleBindRequest) request).getName());
        } else if (request instanceof AddRequest) {
            return ((AddRequest) request).getName();
        } else if (request instanceof DeleteRequest) {
            return ((DeleteRequest) request).getName();
        } else if (request instanceof CompareRequest) {
            return ((CompareRequest) request).getName();
        } else if (request instanceof ModifyDNRequest) {
            return ((ModifyDNRequest) request).getName();
        } else if (request instanceof PasswordModifyExtendedRequest) {
            return dnOfAuthzid(((PasswordModifyExtendedRequest) request).getUserIdentityAsString());
        } else if (request instanceof PlainSASLBindRequest) {
            return dnOfAuthzid(((PlainSASLBindRequest) request).getAuthenticationID());
        } else if (request instanceof DigestMD5SASLBindRequest) {
            return dnOfAuthzid(((DigestMD5SASLBindRequest) request).getAuthenticationID());
        } else if (request instanceof GSSAPISASLBindRequest) {
            return dnOfAuthzid(((GSSAPISASLBindRequest) request).getAuthenticationID());
        } else if (request instanceof CRAMMD5SASLBindRequest) {
            return dnOfAuthzid(((CRAMMD5SASLBindRequest) request).getAuthenticationID());
        } else {
            return null;
        }
    }
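    // For example (illustrative): a SASL bind with authentication ID
    // "dn:uid=bjensen,ou=people,dc=example,dc=com" resolves to that DN, whereas the "u:bjensen"
    // form cannot be mapped to a DN and yields null.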
    private static DN dnOfAuthzid(final String authzid) {
        if (authzid != null && authzid.startsWith("dn:")) {
            return dnOf(authzid.substring(3));
        }
        return null;
    }
    private static DN dnOf(final String dnString) {
        try {
            return DN.valueOf(dnString);
        } catch (final IllegalArgumentException ignored) {
            return null;
        }
    }
    /**
     * Creates a distribution load balancer which uses consistent hashing to distribute requests across a set of
     * partitions based on a hash of each request's target DN. More precisely, a partition is selected as follows:
     * <ul>
     * <li>if the targeted entry lies beneath the partition base DN then the partition is selected based on a hash of
     * the DN which is superior to the target DN and immediately subordinate to the partition base DN</li>
     * <li>otherwise, if the request is not a search then the request is routed to a random partition</li>
     * <li>otherwise, if the search request targets the partition base DN, or a superior thereof, then the search is
     * routed to a random partition or broadcast to all partitions, depending on the search scope. When broadcasting,
     * care is taken to re-scope sub-requests in order to avoid returning duplicate entries</li>
     * <li>otherwise, the search is routed to a random partition because its scope lies outside of the partition
     * space.</li>
     * </ul>
     * This load balancer allows client applications to linearly scale their deployment for write throughput as well
     * as total number of entries. For example, if a single replicated topology can support 10000 updates/s and a
     * total of 100M entries, then a 4 way distributed topology could support up to 40000 updates/s and 400M entries.
     * <p/>
     * <b>NOTE:</b> there are a number of assumptions in the design of this load balancer as well as a number of
     * limitations:
     * <ul>
     * <li>simple paged results, server side sorting, and VLV request controls are not supported for searches which
     * traverse all partitions.</li>
     * <li>persistent searches which traverse all partitions are only supported if they request changes only.</li>
     * <li>requests which target an entry which is not below the partition base DN will be routed to a partition
     * selected based on the request's DN, thereby providing affinity. Note that this behavior assumes that entries
     * which are not below the partition base DN are replicated across all partitions.</li>
     * <li>searches that traverse multiple partitions as well as entries above the partition base DN may return
     * results in a non-hierarchical order. Specifically, entries from a partition (below the partition base DN)
     * may be returned before entries above the partition base DN. Although not required by the LDAP standard, some
     * legacy clients expect entries to be returned in hierarchical order.
     * </li>
     * </ul>
     *
     * @param partitionBaseDN
     *         The DN beneath which data is partitioned. All other data is assumed to be shared across all partitions.
     * @param partitions
     *         The consistent hash map containing the partitions to be distributed.
     * @param options
     *         The configuration options for the load-balancer (no options are supported currently).
     * @return The new distribution load balancer.
     */
    @SuppressWarnings("unused")
    public static ConnectionFactory newFixedSizeDistributionLoadBalancer(final DN partitionBaseDN,
            final ConsistentHashMap<? extends ConnectionFactory> partitions, final Options options) {
        return new ConsistentHashDistributionLoadBalancer(partitionBaseDN, partitions);
    }
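    // Illustrative examples of the selection rules above, assuming
    // partitionBaseDN = "ou=people,dc=example,dc=com":
    //  - requests targeting "uid=bjensen,ou=people,dc=example,dc=com", or any entry below it, are
    //    routed using a hash of "uid=bjensen,ou=people,dc=example,dc=com";
    //  - a WHOLE_SUBTREE search from "dc=example,dc=com" is broadcast to all partitions, with the
    //    sub-requests sent to non-primary partitions re-scoped to avoid duplicate entries;
    //  - requests targeting "ou=groups,dc=example,dc=com" are routed to a partition selected by
    //    hashing that DN, providing affinity for entries replicated across all partitions.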
    /**
     * Creates a new "least requests" load-balancer which will load-balance individual requests across the provided
     * set of connection factories, each typically representing a single replica, using an algorithm that ensures that
     * requests are routed to the replica which has the minimum number of active requests.
opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancer.java
New file
@@ -0,0 +1,498 @@
/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2016 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
import static com.forgerock.opendj.ldap.CoreMessages.*;
import static org.forgerock.opendj.ldap.Connections.dnOfRequest;
import static org.forgerock.opendj.ldap.LdapException.newLdapException;
import static org.forgerock.opendj.ldap.ResultCode.CLIENT_SIDE_USER_CANCELLED;
import static org.forgerock.opendj.ldap.ResultCode.PROTOCOL_ERROR;
import static org.forgerock.opendj.ldap.ResultCode.UNWILLING_TO_PERFORM;
import static org.forgerock.opendj.ldap.SearchScope.SUBORDINATES;
import static org.forgerock.opendj.ldap.requests.Requests.copyOfSearchRequest;
import static org.forgerock.opendj.ldap.spi.LdapPromises.newFailedLdapPromise;
import static org.forgerock.util.Utils.closeSilently;
import static org.forgerock.util.Utils.joinAsString;
import static org.forgerock.util.promise.Promises.newResultPromise;
import static org.forgerock.util.promise.Promises.when;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.opendj.ldap.controls.PersistentSearchRequestControl;
import org.forgerock.opendj.ldap.controls.ServerSideSortRequestControl;
import org.forgerock.opendj.ldap.controls.SimplePagedResultsControl;
import org.forgerock.opendj.ldap.controls.VirtualListViewRequestControl;
import org.forgerock.opendj.ldap.requests.AbandonRequest;
import org.forgerock.opendj.ldap.requests.AddRequest;
import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.opendj.ldap.requests.CompareRequest;
import org.forgerock.opendj.ldap.requests.DeleteRequest;
import org.forgerock.opendj.ldap.requests.ExtendedRequest;
import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
import org.forgerock.opendj.ldap.requests.ModifyRequest;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.BindResult;
import org.forgerock.opendj.ldap.responses.CompareResult;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.IntermediateResponse;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.responses.SearchResultReference;
import org.forgerock.opendj.ldap.spi.ConnectionState;
import org.forgerock.opendj.ldap.spi.LdapPromises;
import org.forgerock.util.AsyncFunction;
import org.forgerock.util.promise.ExceptionHandler;
import org.forgerock.util.promise.Promise;
import org.forgerock.util.promise.PromiseImpl;
import org.forgerock.util.promise.ResultHandler;
import org.forgerock.util.promise.RuntimeExceptionHandler;
/**
 * A simple distribution algorithm which distributes requests equally across a fixed number of partitions based on a
 * hash of the request's target DN. See {@link Connections#newFixedSizeDistributionLoadBalancer}.
 */
final class ConsistentHashDistributionLoadBalancer implements ConnectionFactory {
    private static final String NAME = ConsistentHashDistributionLoadBalancer.class.getSimpleName();
    private final ConsistentHashMap<? extends ConnectionFactory> partitions;
    private final DN partitionBaseDN;
    ConsistentHashDistributionLoadBalancer(final DN partitionBaseDN,
                                           final ConsistentHashMap<? extends ConnectionFactory> partitions) {
        this.partitionBaseDN = partitionBaseDN;
        this.partitions = partitions;
    }
    @Override
    public final void close() {
        closeSilently(partitions.getAll());
    }
    @Override
    public final String toString() {
        return NAME + '(' + joinAsString(",", partitions) + ')';
    }
    @Override
    public final Connection getConnection() throws LdapException {
        return new ConnectionImpl();
    }
    @Override
    public final Promise<Connection, LdapException> getConnectionAsync() {
        return newResultPromise((Connection) new ConnectionImpl());
    }
    private class ConnectionImpl extends AbstractAsynchronousConnection {
        private final ConnectionState state = new ConnectionState();
        @Override
        public String toString() {
            return NAME + "Connection";
        }
        @Override
        public LdapPromise<Void> abandonAsync(final AbandonRequest request) {
            // We cannot possibly route these correctly, so just drop them.
            return LdapPromises.newSuccessfulLdapPromise(null);
        }
        @Override
        public LdapPromise<Result> addAsync(final AddRequest request,
                                            final IntermediateResponseHandler intermediateResponseHandler) {
            final ConnectionFactory partition = getPartition(request.getName());
            return connectAndSendRequest(partition, new AsyncFunction<Connection, Result, LdapException>() {
                @Override
                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.addAsync(request, intermediateResponseHandler);
                }
            });
        }
        @Override
        public void addConnectionEventListener(final ConnectionEventListener listener) {
            state.addConnectionEventListener(listener);
        }
        @Override
        public LdapPromise<BindResult> bindAsync(final BindRequest request,
                                                 final IntermediateResponseHandler intermediateResponseHandler) {
            final DN dn = dnOfRequest(request);
            if (dn == null) {
                return newFailedLdapPromise(newLdapException(UNWILLING_TO_PERFORM,
                                                             DISTRIBUTION_UNRESOLVABLE_PARTITION_ID_FOR_BIND.get()));
            }
            final ConnectionFactory partition = getPartition(dn);
            return connectAndSendRequest(partition, new AsyncFunction<Connection, BindResult, LdapException>() {
                @Override
                public Promise<BindResult, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.bindAsync(request, intermediateResponseHandler);
                }
            });
        }
        @Override
        public void close(final UnbindRequest request, final String reason) {
            state.notifyConnectionClosed();
        }
        @Override
        public LdapPromise<CompareResult> compareAsync(final CompareRequest request,
                                                       final IntermediateResponseHandler intermediateResponseHandler) {
            final ConnectionFactory partition = getPartition(request.getName());
            return connectAndSendRequest(partition, new AsyncFunction<Connection, CompareResult, LdapException>() {
                @Override
                public Promise<CompareResult, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.compareAsync(request, intermediateResponseHandler);
                }
            });
        }
        @Override
        public LdapPromise<Result> deleteAsync(final DeleteRequest request,
                                               final IntermediateResponseHandler intermediateResponseHandler) {
            // We could reject requests that attempt to delete entries superior to the partition base DN. However,
            // this is really the responsibility of the backend servers to enforce. In some cases such requests
            // should be allowed, e.g. if the partitions are all empty.
            final ConnectionFactory partition = getPartition(request.getName());
            return connectAndSendRequest(partition, new AsyncFunction<Connection, Result, LdapException>() {
                @Override
                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.deleteAsync(request, intermediateResponseHandler);
                }
            });
        }
        @Override
        public <R extends ExtendedResult> LdapPromise<R> extendedRequestAsync(final ExtendedRequest<R> request,
                                                                              final IntermediateResponseHandler
                                                                                      intermediateResponseHandler) {
            final DN dn = dnOfRequest(request);
            if (dn == null) {
                return newFailedLdapPromise(newLdapException(UNWILLING_TO_PERFORM,
                                                             DISTRIBUTION_UNRESOLVABLE_PARTITION_ID_FOR_EXT_OP.get()));
            }
            final ConnectionFactory partition = getPartition(dn);
            return connectAndSendRequest(partition, new AsyncFunction<Connection, R, LdapException>() {
                @Override
                public Promise<R, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.extendedRequestAsync(request, intermediateResponseHandler);
                }
            });
        }
        @Override
        public boolean isClosed() {
            return state.isClosed();
        }
        @Override
        public boolean isValid() {
            return state.isValid();
        }
        @Override
        public LdapPromise<Result> modifyAsync(final ModifyRequest request,
                                               final IntermediateResponseHandler intermediateResponseHandler) {
            final ConnectionFactory partition = getPartition(request.getName());
            return connectAndSendRequest(partition, new AsyncFunction<Connection, Result, LdapException>() {
                @Override
                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.modifyAsync(request, intermediateResponseHandler);
                }
            });
        }
        @Override
        public LdapPromise<Result> modifyDNAsync(final ModifyDNRequest request,
                                                 final IntermediateResponseHandler intermediateResponseHandler) {
            final DN oldDN = request.getName();
            final ConnectionFactory oldPartition = getPartition(oldDN);
            final DN newParent = request.getNewSuperior() != null ? request.getNewSuperior() : oldDN.parent();
            final DN newDN = newParent.child(request.getNewRDN());
            final ConnectionFactory newPartition = getPartition(newDN);
            // Acceptable if the request is completely outside of the partitions or entirely within a single partition.
            if (oldPartition != newPartition) {
                return unwillingToPerform(DISTRIBUTION_MODDN_SPANS_MULTIPLE_PARTITIONS.get());
            }
            return connectAndSendRequest(oldPartition, new AsyncFunction<Connection, Result, LdapException>() {
                @Override
                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.modifyDNAsync(request, intermediateResponseHandler);
                }
            });
        }
        @Override
        public void removeConnectionEventListener(final ConnectionEventListener listener) {
            state.removeConnectionEventListener(listener);
        }
        @Override
        public LdapPromise<Result> searchAsync(final SearchRequest request,
                                               final IntermediateResponseHandler intermediateResponseHandler,
                                               final SearchResultHandler entryHandler) {
            // Broadcast if the search targets an entry which is superior to the base DN and has a scope which
            // reaches into the partitions.
            final DN dn = request.getName();
            switch (request.getScope().asEnum()) {
            case BASE_OBJECT:
                // Request can always be handled by a single partition.
                return searchSinglePartition(request, intermediateResponseHandler, entryHandler);
            case SINGLE_LEVEL:
                // Needs broadcasting if the search DN is equal to the partition DN.
                if (dn.equals(partitionBaseDN)) {
                    return searchAllPartitions(request, intermediateResponseHandler, entryHandler);
                } else {
                    return searchSinglePartition(request, intermediateResponseHandler, entryHandler);
                }
            case WHOLE_SUBTREE:
                // Needs broadcasting if the search DN is superior or equal to the partition DN.
                if (dn.isSuperiorOrEqualTo(partitionBaseDN)) {
                    // Split into primary search against search DN and secondary searches against partition DN.
                    return splitAndSearchAllPartitions(request, intermediateResponseHandler, entryHandler);
                } else {
                    return searchSinglePartition(request, intermediateResponseHandler, entryHandler);
                }
            case SUBORDINATES:
                // Needs broadcasting if the search DN is superior or equal to the partition DN.
                if (dn.equals(partitionBaseDN)) {
                    return searchAllPartitions(request, intermediateResponseHandler, entryHandler);
                } else if (dn.isSuperiorOrEqualTo(partitionBaseDN)) {
                    // Split into primary search against search DN and secondary searches against partition DN.
                    return splitAndSearchAllPartitions(request, intermediateResponseHandler, entryHandler);
                } else {
                    return searchSinglePartition(request, intermediateResponseHandler, entryHandler);
                }
            default: // UNKNOWN
                return unwillingToPerform(DISTRIBUTION_UNSUPPORTED_SEARCH_SCOPE.get(request.getScope().intValue()));
            }
        }
        /**
         * Forwards a search request to each partition. The search request is scoped such that duplicate results are
         * not possible.
         */
        private LdapPromise<Result> searchAllPartitions(final SearchRequest request,
                                                        final IntermediateResponseHandler irh,
                                                        final SearchResultHandler srh) {
            return broadcastSearch(request, request, irh, srh);
        }
        /**
         * In order to avoid duplicates, search one partition using the request's DN and the remaining partitions using
         * the partition base DN with scope "SUBORDINATES".
         * <p>
         * TODO: may return results that are not in hierarchical order.
         */
        private LdapPromise<Result> splitAndSearchAllPartitions(final SearchRequest primarySearch,
                                                                final IntermediateResponseHandler irh,
                                                                final SearchResultHandler srh) {
            final SearchRequest secondarySearch =
                    copyOfSearchRequest(primarySearch).setName(partitionBaseDN).setScope(SUBORDINATES);
            return broadcastSearch(primarySearch, secondarySearch, irh, srh);
        }
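        // For example (illustrative): a WHOLE_SUBTREE search from "dc=example,dc=com" is sent
        // unchanged to one partition, while the remaining partitions receive a copy re-based at
        // the partition base DN with scope SUBORDINATES, so partitioned entries are returned
        // exactly once across all partitions.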
        /**
         * Take care to ensure that all searches are cancelled when one of them fails (important if the
         * searches are persistent).
         */
        private LdapPromise<Result> broadcastSearch(final SearchRequest primarySearch,
                                                    final SearchRequest secondarySearch,
                                                    final IntermediateResponseHandler irh,
                                                    final SearchResultHandler srh) {
            // First reject requests that contain controls that we do not support when broadcast.
            if (primarySearch.containsControl(VirtualListViewRequestControl.OID)) {
                return unwillingToPerform(DISTRIBUTION_VLV_CONTROL_NOT_SUPPORTED.get());
            } else if (primarySearch.containsControl(SimplePagedResultsControl.OID)) {
                return unwillingToPerform(DISTRIBUTION_SPR_CONTROL_NOT_SUPPORTED.get());
            } else if (primarySearch.containsControl(ServerSideSortRequestControl.OID)) {
                return unwillingToPerform(DISTRIBUTION_SSS_CONTROL_NOT_SUPPORTED.get());
            } else if (primarySearch.containsControl(PersistentSearchRequestControl.OID)) {
                // Persistent searches return results in two phases: the initial search results and then the changes.
                // Unfortunately, there's no way to determine when the first phase has completed, so it's not
                // possible to prevent the two phases from being interleaved when broadcast.
                try {
                    final PersistentSearchRequestControl control =
                            primarySearch.getControl(PersistentSearchRequestControl.DECODER, new DecodeOptions());
                    if (!control.isChangesOnly()) {
                        return unwillingToPerform(DISTRIBUTION_PSEARCH_CONTROL_NOT_SUPPORTED.get());
                    }
                } catch (final DecodeException e) {
                    return newFailedLdapPromise(newLdapException(PROTOCOL_ERROR, e.getMessage(), e));
                }
            }
            final List<Promise<Result, LdapException>> promises = new ArrayList<>(partitions.size());
            // Launch all searches with the primary search targeting a random partition.
            final ConnectionFactory primaryPartition = getPartition(primarySearch.getName());
            final IntermediateResponseHandler sirh = synchronize(irh);
            final SearchResultHandler ssrh = synchronize(srh);
            for (final ConnectionFactory partition : partitions.getAll()) {
                final SearchRequest searchRequest = partition == primaryPartition ? primarySearch : secondarySearch;
                promises.add(searchSinglePartition(searchRequest, sirh, ssrh, partition));
            }
            // FIXME: chained PromiseImpl and Promises.when() don't chain cancellation requests.
            final PromiseImpl<Result, LdapException> reducedPromise = new PromiseImpl<Result, LdapException>() {
                @Override
                protected LdapException tryCancel(final boolean mayInterruptIfRunning) {
                    for (Promise<Result, LdapException> promise : promises) {
                        promise.cancel(mayInterruptIfRunning);
                    }
                    return newLdapException(CLIENT_SIDE_USER_CANCELLED);
                }
            };
            when(promises).thenOnResult(new ResultHandler<List<Result>>() {
                @Override
                public void handleResult(final List<Result> results) {
                    // TODO: Depending on controls we may want to merge these results in some way.
                    reducedPromise.handleResult(results.get(0));
                }
            }).thenOnException(new ExceptionHandler<LdapException>() {
                @Override
                public void handleException(final LdapException exception) {
                    reducedPromise.handleException(exception);
                }
            }).thenOnRuntimeException(new RuntimeExceptionHandler() {
                @Override
                public void handleRuntimeException(final RuntimeException exception) {
                    reducedPromise.handleRuntimeException(exception);
                }
            }).thenFinally(new Runnable() {
                @Override
                public void run() {
                    // Ensure that any remaining searches are terminated.
                    for (Promise<Result, LdapException> promise : promises) {
                        promise.cancel(true);
                    }
                }
            });
            return LdapPromises.asPromise(reducedPromise);
        }
        private LdapPromise<Result> unwillingToPerform(final LocalizableMessage msg) {
            return newFailedLdapPromise(newLdapException(UNWILLING_TO_PERFORM, msg));
        }
        /**
         * Forwards a search request to a single partition based on the search's base DN. If the DN does not target a
         * specific partition then a random partition will be selected.
         */
        private LdapPromise<Result> searchSinglePartition(final SearchRequest request,
                                                          final IntermediateResponseHandler irh,
                                                          final SearchResultHandler srh) {
            final ConnectionFactory partition = getPartition(request.getName());
            return searchSinglePartition(request, irh, srh, partition);
        }
        private LdapPromise<Result> searchSinglePartition(final SearchRequest request,
                                                          final IntermediateResponseHandler irh,
                                                          final SearchResultHandler srh,
                                                          final ConnectionFactory partition) {
            return connectAndSendRequest(partition, new AsyncFunction<Connection, Result, LdapException>() {
                @Override
                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.searchAsync(request, irh, srh);
                }
            });
        }
        /** Returns a search result handler that ensures that only one response is processed at a time. */
        private SearchResultHandler synchronize(final SearchResultHandler srh) {
            return new SearchResultHandler() {
                @Override
                public synchronized boolean handleEntry(final SearchResultEntry entry) {
                    return srh.handleEntry(entry);
                }
                @Override
                public synchronized boolean handleReference(final SearchResultReference reference) {
                    return srh.handleReference(reference);
                }
            };
        }
        /** Returns an intermediate response handler that ensures that only one response is processed at a time. */
        private IntermediateResponseHandler synchronize(final IntermediateResponseHandler irh) {
            return new IntermediateResponseHandler() {
                @Override
                public synchronized boolean handleIntermediateResponse(final IntermediateResponse response) {
                    return irh.handleIntermediateResponse(response);
                }
            };
        }
        /**
         * Returns the partition for requests having the provided DN. If the DN is not associated with a specific
         * partition then a random partition will be returned.
         */
        private ConnectionFactory getPartition(final DN dn) {
            final DN partitionDN = getPartitionDN(dn);
            return partitions.get((partitionDN != null ? partitionDN : dn).toNormalizedUrlSafeString());
        }
        /**
         * Returns the DN which is a child of the partition base DN and which is equal to or superior to the provided
         * DN, otherwise {@code null}.
         */
        private DN getPartitionDN(final DN dn) {
            final int depthBelowBaseDN = dn.size() - partitionBaseDN.size();
            if (depthBelowBaseDN > 0 && dn.isSubordinateOrEqualTo(partitionBaseDN)) {
                return dn.parent(depthBelowBaseDN - 1);
            }
            return null;
        }
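        // For example (illustrative DNs): with partitionBaseDN = "ou=people,dc=example,dc=com",
        // getPartitionDN of "cn=prefs,uid=bjensen,ou=people,dc=example,dc=com" returns
        // "uid=bjensen,ou=people,dc=example,dc=com", whereas getPartitionDN of
        // "ou=people,dc=example,dc=com" or "dc=example,dc=com" returns null.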
        private <R> LdapPromise<R> connectAndSendRequest(final ConnectionFactory partition,
                                                         final AsyncFunction<Connection, R, LdapException> doRequest) {
            if (state.isClosed()) {
                throw new IllegalStateException("Connection is already closed");
            }
            final AtomicReference<Connection> connectionHolder = new AtomicReference<>();
            return connectAsync(partition).thenOnResult(new ResultHandler<Connection>() {
                @Override
                public void handleResult(final Connection connection) {
                    connectionHolder.set(connection);
                }
                // FIXME: how do we support cancellation of the request?
            }).thenAsync(doRequest).thenFinally(new Runnable() {
                @Override
                public void run() {
                    closeSilently(connectionHolder.get());
                }
            });
        }
        private LdapPromise<Connection> connectAsync(final ConnectionFactory partition) {
            return LdapPromises.asPromise(partition.getConnectionAsync()
                                                   .thenOnException(new ExceptionHandler<LdapException>() {
                                                       @Override
                                                       public void handleException(final LdapException e) {
                                                           state.notifyConnectionError(false, e);
                                                       }
                                                   }));
        }
    }
}
opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashMap.java
New file
@@ -0,0 +1,288 @@
/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2016 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;
import org.forgerock.util.Function;
import org.forgerock.util.Reject;
import org.forgerock.util.annotations.VisibleForTesting;
import org.forgerock.util.promise.NeverThrowsException;
/**
 * An implementation of "consistent hashing" supporting per-partition weighting. This implementation is thread safe
 * and allows partitions to be added and removed during use.
 * <p>
 * This implementation maps partitions to one or more points on a circle ranging from {@link Integer#MIN_VALUE} to
 * {@link Integer#MAX_VALUE}. The number of points per partition is dictated by the partition's weight. A partition
 * with a weight which is higher than another partition will receive a proportionally higher load.
 *
 * @param <P> The type of partition object.
 *
 * @see <a href="http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.23.3738">Consistent Hashing and Random
 * Trees</a>
 * @see <a href="http://www8.org/w8-papers/2a-webserver/caching/paper2.html">Web Caching with Consistent Hashing</a>
 */
public final class ConsistentHashMap<P> {
    // TODO: add methods for determining which partitions will need to be rebalanced when a partition is added or
    // removed.
    /** The default weight. The value is relatively high in order to minimize the risk of imbalances. */
    private static final int DEFAULT_WEIGHT = 200;
    /** Default hash function based on MD5. */
    @VisibleForTesting
    static final Function<Object, Integer, NeverThrowsException> MD5 =
            new Function<Object, Integer, NeverThrowsException>() {
                @Override
                public Integer apply(final Object key) {
                    final byte[] bytes = key.toString().getBytes(UTF_8);
                    final byte[] digest = getMD5Digest().digest(bytes);
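                    // toInt() interprets the first four bytes of the 16-byte MD5 digest as a
                    // signed 32-bit hash value.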
                    return ByteString.wrap(digest).toInt();
                }
                private MessageDigest getMD5Digest() {
                    // TODO: we may want to cache these.
                    try {
                        return MessageDigest.getInstance("MD5");
                    } catch (NoSuchAlgorithmException e) {
                        throw new RuntimeException(e);
                    }
                }
            };
    /** Synchronizes updates. Reads are protected by copy on write. */
    private final ReentrantLock writeLock = new ReentrantLock();
    /** Consistent hash map circle. */
    private volatile NavigableMap<Integer, Node<P>> circle = new TreeMap<>();
    /** Maps partition IDs to their partition. */
    private volatile Map<String, P> partitions = new LinkedHashMap<>();
    /** Function used for hashing keys. */
    private final Function<Object, Integer, NeverThrowsException> hashFunction;
    /** Creates a new consistent hash map which will hash keys using MD5. */
    public ConsistentHashMap() {
        this(MD5);
    }
    /**
     * Creates a new consistent hash map which will hash keys using the provided hash function.
     *
     * @param hashFunction
     *         The function which should be used for hashing keys.
     */
    public ConsistentHashMap(final Function<Object, Integer, NeverThrowsException> hashFunction) {
        this.hashFunction = hashFunction;
    }
    /**
     * Puts a partition into this consistent hash map using the default weight which is sufficiently high to ensure a
     * reasonably uniform distribution among all partitions having the same weight.
     *
     * @param partitionId
     *         The partition ID.
     * @param partition
     *         The partition.
     * @return This consistent hash map.
     */
    public ConsistentHashMap<P> put(final String partitionId, final P partition) {
        return put(partitionId, partition, DEFAULT_WEIGHT);
    }
    /**
     * Puts a partition into this consistent hash map using the specified weight. If all partitions have the same
     * weight then they will each receive a similar amount of load. A partition having a weight which is twice that
     * of another will receive twice the load. Weight values should generally be greater than 200 in order to minimize
     * the risk of unexpected imbalances due to the way in which logical partitions are mapped to real partitions.
     *
     * @param partitionId
     *         The partition ID.
     * @param partition
     *         The partition.
     * @param weight
     *         The partition's weight, which should typically be over 200 and never negative.
     * @return This consistent hash map.
     */
    public ConsistentHashMap<P> put(final String partitionId, final P partition, final int weight) {
        Reject.ifNull(partitionId, "partitionId must be non-null");
        Reject.ifNull(partition, "partition must be non-null");
        Reject.ifTrue(weight < 0, "weight must not be negative");
        final Node<P> node = new Node<>(partitionId, partition, weight);
        writeLock.lock();
        try {
            final TreeMap<Integer, Node<P>> newCircle = new TreeMap<>(circle);
            for (int i = 0; i < weight; i++) {
                newCircle.put(hashFunction.apply(partitionId + i), node);
            }
            final Map<String, P> newPartitions = new LinkedHashMap<>(partitions);
            newPartitions.put(partitionId, partition);
            // It doesn't matter that these assignments are not atomic.
            circle = newCircle;
            partitions = newPartitions;
        } finally {
            writeLock.unlock();
        }
        return this;
    }
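    // Illustrative example (p1 and p2 are assumed partitions): after put("p1", p1, 200) and
    // put("p2", p2, 400), p2 is mapped to twice as many points on the circle as p1, so get()
    // resolves to p2 for roughly two thirds of uniformly hashed keys.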
    /**
     * Removes the partition that was previously added using the provided partition ID.
     *
     * @param partitionId
     *         The partition ID.
     * @return This consistent hash map.
     */
    public ConsistentHashMap<P> remove(final String partitionId) {
        Reject.ifNull(partitionId, "partitionId must be non-null");
        writeLock.lock();
        try {
            if (partitions.containsKey(partitionId)) {
                final TreeMap<Integer, Node<P>> newCircle = new TreeMap<>(circle);
                final Node<P> node = newCircle.remove(hashFunction.apply(partitionId + 0));
                for (int i = 1; i < node.weight; i++) {
                    newCircle.remove(hashFunction.apply(partitionId + i));
                }
                final Map<String, P> newPartitions = new LinkedHashMap<>(partitions);
                newPartitions.remove(partitionId);
                // It doesn't matter that these assignments are not atomic.
                circle = newCircle;
                partitions = newPartitions;
            }
        } finally {
            writeLock.unlock();
        }
        return this;
    }
    /**
     * Returns the partition from this map corresponding to the provided key's hash, or {@code null} if this map is
     * empty.
     *
     * @param key
     *         The key for which a corresponding partition is to be returned.
     * @return The partition from this map corresponding to the provided key's hash, or {@code null} if this map is
     * empty.
     */
    P get(final Object key) {
        final NavigableMap<Integer, Node<P>> circleSnapshot = circle;
        final Map.Entry<Integer, Node<P>> ceilingEntry = circleSnapshot.ceilingEntry(hashFunction.apply(key));
        if (ceilingEntry != null) {
            return ceilingEntry.getValue().partition;
        }
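        // No point on the circle at or above the key's hash: wrap around and use the first (lowest) point.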
        final Map.Entry<Integer, Node<P>> firstEntry = circleSnapshot.firstEntry();
        return firstEntry != null ? firstEntry.getValue().partition : null;
    }
    /**
     * Returns a collection containing all of the partitions contained in this consistent hash map.
     *
     * @return A collection containing all of the partitions contained in this consistent hash map.
     */
    Collection<P> getAll() {
        return partitions.values();
    }
    /**
     * Returns the number of partitions in this consistent hash map.
     *
     * @return The number of partitions in this consistent hash map.
     */
    int size() {
        return partitions.size();
    }
    /**
     * Returns {@code true} if there are no partitions in this consistent hash map.
     *
     * @return {@code true} if there are no partitions in this consistent hash map.
     */
    boolean isEmpty() {
        return partitions.isEmpty();
    }
    /**
     * Returns a map whose keys are the partitions stored in this map and whose values are the actual weights associated
     * with each partition. The sum of the weights will be equal to 2^32.
     * <p/>
     * This method is intended for testing, but may one day be used in order to query the current status of the
     * load-balancer and, in particular, the weighting associated with each partition as a percentage.
     *
     * @return A map whose keys are the partitions stored in this map and whose values are the actual weights associated
     * with each partition.
     */
    @VisibleForTesting
    Map<P, Long> getWeights() {
        final NavigableMap<Integer, Node<P>> circleSnapshot = circle;
        final IdentityHashMap<P, Long> weights = new IdentityHashMap<>();
        Map.Entry<Integer, Node<P>> previousEntry = null;
        for (final Map.Entry<Integer, Node<P>> entry : circleSnapshot.entrySet()) {
            final long index = entry.getKey();
            final P partition = entry.getValue().partition;
            if (previousEntry == null) {
                // Special case for first value since the range begins with the last entry.
                final long range1 = (long) Integer.MAX_VALUE - circleSnapshot.lastEntry().getKey();
                final long range2 = index - Integer.MIN_VALUE;
                weights.put(partition, range1 + range2 + 1);
            } else {
                final long start = previousEntry.getKey();
                final long end = entry.getKey();
                if (weights.containsKey(partition)) {
                    weights.put(partition, weights.get(partition) + (end - start));
                } else {
                    weights.put(partition, end - start);
                }
            }
            previousEntry = entry;
        }
        return weights;
    }
    @Override
    public String toString() {
        return getWeights().toString();
    }
    /** A partition stored in the consistent hash map circle. */
    private static final class Node<P> {
        private final String partitionId;
        private final P partition;
        private final int weight;
        private Node(final String partitionId, final P partition, final int weight) {
            this.partitionId = partitionId;
            this.partition = partition;
            this.weight = weight;
        }
        @Override
        public String toString() {
            return partitionId;
        }
    }
}
opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties
@@ -1725,3 +1725,13 @@
ERR_UNKNOWN_REQUEST_TYPE=The following request has an unknown request type: %s
WARN_DECODING_AFFINITY_CONTROL=An error occurred while decoding an affinity control: %s
ERR_SASL_BIND_MULTI_STAGE=An error occurred during multi-stage authentication: '%s'
DISTRIBUTION_UNRESOLVABLE_PARTITION_ID_FOR_BIND=Unable to determine the partition ID from bind request
DISTRIBUTION_UNRESOLVABLE_PARTITION_ID_FOR_EXT_OP=Unable to determine the partition ID from extended request
DISTRIBUTION_DELETE_SPANS_MULTIPLE_PARTITIONS=The delete request spans multiple partitions
DISTRIBUTION_MODDN_SPANS_MULTIPLE_PARTITIONS=The modify DN request spans multiple partitions
DISTRIBUTION_UNSUPPORTED_SEARCH_SCOPE=The search scope %d is not supported by the distribution layer
DISTRIBUTION_VLV_CONTROL_NOT_SUPPORTED=The VLV request control is not supported across multiple partitions
DISTRIBUTION_SPR_CONTROL_NOT_SUPPORTED=The Simple Paged Results request control is not supported across multiple partitions
DISTRIBUTION_SSS_CONTROL_NOT_SUPPORTED=The Server Side Sort request control is not supported across multiple partitions
DISTRIBUTION_PSEARCH_CONTROL_NOT_SUPPORTED=The Persistent Search request control is not supported across multiple \
  partitions, except if it requests changes only
opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancerTest.java
New file
@@ -0,0 +1,382 @@
/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2016 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
import static org.assertj.core.api.Assertions.*;
import static org.forgerock.opendj.ldap.Connections.newFixedSizeDistributionLoadBalancer;
import static org.forgerock.opendj.ldap.Filter.alwaysTrue;
import static org.forgerock.opendj.ldap.SearchScope.BASE_OBJECT;
import static org.forgerock.opendj.ldap.SearchScope.SINGLE_LEVEL;
import static org.forgerock.opendj.ldap.SearchScope.SUBORDINATES;
import static org.forgerock.opendj.ldap.SearchScope.WHOLE_SUBTREE;
import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnectionFactory;
import static org.forgerock.opendj.ldap.requests.Requests.*;
import static org.forgerock.opendj.ldap.responses.Responses.newBindResult;
import static org.forgerock.opendj.ldap.responses.Responses.newCompareResult;
import static org.forgerock.opendj.ldap.responses.Responses.newResult;
import static org.forgerock.opendj.ldap.spi.LdapPromises.newSuccessfulLdapPromise;
import static org.forgerock.util.Options.defaultOptions;
import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;
import org.forgerock.opendj.ldap.requests.AddRequest;
import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.opendj.ldap.requests.CompareRequest;
import org.forgerock.opendj.ldap.requests.DeleteRequest;
import org.forgerock.opendj.ldap.requests.ExtendedRequest;
import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
import org.forgerock.opendj.ldap.requests.ModifyRequest;
import org.forgerock.opendj.ldap.requests.Requests;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.responses.BindResult;
import org.forgerock.opendj.ldap.responses.CompareResult;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.util.Function;
import org.forgerock.util.promise.NeverThrowsException;
import org.mockito.Mock;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@SuppressWarnings("javadoc")
public class ConsistentHashDistributionLoadBalancerTest extends SdkTestCase {
    private static final DN PARTITION_BASE_DN = DN.valueOf("ou=people,dc=example,dc=com");
    private static final DN DN_1_BELOW_PARTITION_BASE_DN = PARTITION_BASE_DN.child("uid=bjensen");
    private static final DN DN_2_BELOW_PARTITION_BASE_DN = DN_1_BELOW_PARTITION_BASE_DN.child("cn=prefs");
    private static final DN DN_ABOVE_PARTITION_BASE_DN = PARTITION_BASE_DN.parent();
    private static final DN UNPARTITIONED_DN = DN.valueOf("ou=groups,dc=example,dc=com");
    private static final LdapPromise<Result> SUCCESS = newSuccessfulLdapPromise(newResult(ResultCode.SUCCESS));
    private static final LdapPromise<BindResult> BIND_SUCCESS =
            newSuccessfulLdapPromise(newBindResult(ResultCode.SUCCESS));
    private static final LdapPromise<CompareResult> COMPARE_SUCCESS =
            newSuccessfulLdapPromise(newCompareResult(ResultCode.SUCCESS));
    private static final int P1_HASH = 0x00000000;
    private static final int P2_HASH = 0x80000000;
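    // With a weight of 1, each partition maps to a single point on the circle: P1 at hash 0 owns
    // keys hashing in (P2_HASH, P1_HASH], and P2 owns the remainder, so each covers half the space.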
    @Mock
    private Connection partition1Conn;
    @Mock
    private Connection partition2Conn;
    @Mock
    private Function<Object, Integer, NeverThrowsException> hashFunction;
    private ConnectionFactory partition1;
    private ConnectionFactory partition2;
    private ConnectionFactory loadBalancer;
    @BeforeMethod
    public void beforeMethod() {
        initMocks(this);
        partition1 = mockConnectionFactory(partition1Conn);
        partition2 = mockConnectionFactory(partition2Conn);
        when(hashFunction.apply(any())).thenReturn(P1_HASH, P2_HASH);
        final ConsistentHashMap<ConnectionFactory> partitions = new ConsistentHashMap<>(hashFunction);
        partitions.put("P1", partition1, 1);
        partitions.put("P2", partition2, 1);
        loadBalancer = newFixedSizeDistributionLoadBalancer(PARTITION_BASE_DN, partitions, defaultOptions());
        when(partition1Conn.addAsync(any(AddRequest.class),
                                     any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
        when(partition2Conn.addAsync(any(AddRequest.class),
                                     any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
        when(partition1Conn.bindAsync(any(BindRequest.class),
                                      any(IntermediateResponseHandler.class))).thenReturn(BIND_SUCCESS);
        when(partition2Conn.bindAsync(any(BindRequest.class),
                                      any(IntermediateResponseHandler.class))).thenReturn(BIND_SUCCESS);
        when(partition1Conn.compareAsync(any(CompareRequest.class),
                                         any(IntermediateResponseHandler.class))).thenReturn(COMPARE_SUCCESS);
        when(partition2Conn.compareAsync(any(CompareRequest.class),
                                         any(IntermediateResponseHandler.class))).thenReturn(COMPARE_SUCCESS);
        when(partition1Conn.deleteAsync(any(DeleteRequest.class),
                                        any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
        when(partition2Conn.deleteAsync(any(DeleteRequest.class),
                                        any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
        when(partition1Conn.extendedRequestAsync(any(ExtendedRequest.class),
                                                 any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
        when(partition2Conn.extendedRequestAsync(any(ExtendedRequest.class),
                                                 any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
        when(partition1Conn.modifyAsync(any(ModifyRequest.class),
                                        any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
        when(partition2Conn.modifyAsync(any(ModifyRequest.class),
                                        any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
        when(partition1Conn.modifyDNAsync(any(ModifyDNRequest.class),
                                          any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
        when(partition2Conn.modifyDNAsync(any(ModifyDNRequest.class),
                                          any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
        when(partition1Conn.searchAsync(any(SearchRequest.class),
                                        any(IntermediateResponseHandler.class),
                                        any(SearchResultHandler.class))).thenReturn(SUCCESS);
        when(partition2Conn.searchAsync(any(SearchRequest.class),
                                        any(IntermediateResponseHandler.class),
                                        any(SearchResultHandler.class))).thenReturn(SUCCESS);
    }
    @Test
    public void closeShouldCloseAllPartitions() throws Exception {
        loadBalancer.close();
        verify(partition1).close();
        verify(partition2).close();
    }
    @Test
    public void getConnectionShouldReturnLogicalConnection() throws Exception {
        try (final Connection connection = loadBalancer.getConnection()) {
            assertThat(connection).isNotNull();
            connection.close();
        }
        verifyZeroInteractions(partition1, partition2);
    }
    @Test
    public void getConnectionAsyncShouldReturnLogicalConnection() throws Exception {
        try (final Connection connection = loadBalancer.getConnectionAsync().get()) {
            assertThat(connection).isNotNull();
            connection.close();
        }
        verifyZeroInteractions(partition1, partition2);
    }
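    // Each row is { stubbed hash of the routing key, whether partition 2 should be hit,
    // request target DN, DN expected to be used as the routing key }. DNs at or above the
    // partition base DN, or outside it, are hashed as-is; entries nested more than one level
    // below it are routed by their ancestor immediately below the base DN (last two rows).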
    @DataProvider
    public Object[][] requests() {
        return new Object[][] { { P1_HASH, false, UNPARTITIONED_DN, UNPARTITIONED_DN },
                                { P2_HASH, true, UNPARTITIONED_DN, UNPARTITIONED_DN },
                                { P1_HASH, false, DN_ABOVE_PARTITION_BASE_DN, DN_ABOVE_PARTITION_BASE_DN },
                                { P2_HASH, true, DN_ABOVE_PARTITION_BASE_DN, DN_ABOVE_PARTITION_BASE_DN },
                                { P1_HASH, false, DN_1_BELOW_PARTITION_BASE_DN, DN_1_BELOW_PARTITION_BASE_DN },
                                { P2_HASH, true, DN_1_BELOW_PARTITION_BASE_DN, DN_1_BELOW_PARTITION_BASE_DN },
                                { P1_HASH, false, DN_2_BELOW_PARTITION_BASE_DN, DN_1_BELOW_PARTITION_BASE_DN },
                                { P2_HASH, true, DN_2_BELOW_PARTITION_BASE_DN, DN_1_BELOW_PARTITION_BASE_DN } };
    }
    @Test(dataProvider = "requests")
    public void addShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
                                        final DN partitionDN) throws Exception {
        when(hashFunction.apply(any())).thenReturn(hash);
        try (final Connection connection = loadBalancer.getConnection()) {
            connection.addAsync(newAddRequest(requestDN));
        }
        verify(hitPartition(isSecondPartition)).addAsync(any(AddRequest.class),
                                                         any(IntermediateResponseHandler.class));
        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
        verify(hitPartition(isSecondPartition)).close();
        verifyZeroInteractions(missPartition(isSecondPartition));
    }
    @Test(dataProvider = "requests")
    public void bindShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
                                         final DN partitionDN) throws Exception {
        when(hashFunction.apply(any())).thenReturn(hash);
        try (final Connection connection = loadBalancer.getConnection()) {
            connection.bindAsync(newSimpleBindRequest(requestDN.toString(), "password".toCharArray()));
        }
        verify(hitPartition(isSecondPartition)).bindAsync(any(BindRequest.class),
                                                          any(IntermediateResponseHandler.class));
        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
        verify(hitPartition(isSecondPartition)).close();
        verifyZeroInteractions(missPartition(isSecondPartition));
    }
    @Test(dataProvider = "requests")
    public void compareShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
                                            final DN partitionDN) throws Exception {
        when(hashFunction.apply(any())).thenReturn(hash);
        try (final Connection connection = loadBalancer.getConnection()) {
            connection.compareAsync(newCompareRequest(requestDN.toString(), "cn", "test"));
        }
        verify(hitPartition(isSecondPartition)).compareAsync(any(CompareRequest.class),
                                                             any(IntermediateResponseHandler.class));
        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
        verify(hitPartition(isSecondPartition)).close();
        verifyZeroInteractions(missPartition(isSecondPartition));
    }
    @Test(dataProvider = "requests")
    public void deleteShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
                                           final DN partitionDN) throws Exception {
        when(hashFunction.apply(any())).thenReturn(hash);
        try (final Connection connection = loadBalancer.getConnection()) {
            connection.deleteAsync(newDeleteRequest(requestDN));
        }
        verify(hitPartition(isSecondPartition)).deleteAsync(any(DeleteRequest.class),
                                                            any(IntermediateResponseHandler.class));
        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
        verify(hitPartition(isSecondPartition)).close();
        verifyZeroInteractions(missPartition(isSecondPartition));
    }
    @Test(dataProvider = "requests")
    public void extendedShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
                                             final DN partitionDN) throws Exception {
        when(hashFunction.apply(any())).thenReturn(hash);
        try (final Connection connection = loadBalancer.getConnection()) {
            connection.extendedRequestAsync(newPasswordModifyExtendedRequest().setUserIdentity("dn:" + requestDN));
        }
        verify(hitPartition(isSecondPartition)).extendedRequestAsync(any(ExtendedRequest.class),
                                                                     any(IntermediateResponseHandler.class));
        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
        verify(hitPartition(isSecondPartition)).close();
        verifyZeroInteractions(missPartition(isSecondPartition));
    }
    @Test(dataProvider = "requests")
    public void modifyShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
                                           final DN partitionDN) throws Exception {
        when(hashFunction.apply(any())).thenReturn(hash);
        try (final Connection connection = loadBalancer.getConnection()) {
            connection.modifyAsync(newModifyRequest(requestDN));
        }
        verify(hitPartition(isSecondPartition)).modifyAsync(any(ModifyRequest.class),
                                                            any(IntermediateResponseHandler.class));
        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
        verify(hitPartition(isSecondPartition)).close();
        verifyZeroInteractions(missPartition(isSecondPartition));
    }
    @Test(dataProvider = "requests")
    public void modifyDNShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
                                             final DN partitionDN) throws Exception {
        when(hashFunction.apply(any())).thenReturn(hash);
        try (final Connection connection = loadBalancer.getConnection()) {
            connection.modifyDNAsync(newModifyDNRequest(requestDN, RDN.valueOf("cn=changed")));
        }
        verify(hitPartition(isSecondPartition)).modifyDNAsync(any(ModifyDNRequest.class),
                                                              any(IntermediateResponseHandler.class));
        verify(hashFunction, atLeastOnce()).apply(partitionDN.toNormalizedUrlSafeString());
        verify(hitPartition(isSecondPartition)).close();
        verifyZeroInteractions(missPartition(isSecondPartition));
    }
    @Test(dataProvider = "requests")
    public void searchBaseShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
                                               final DN partitionDN) throws Exception {
        when(hashFunction.apply(any())).thenReturn(hash);
        try (final Connection connection = loadBalancer.getConnection()) {
            connection.searchAsync(newSearchRequest(requestDN, BASE_OBJECT, alwaysTrue()),
                                   mock(SearchResultHandler.class));
        }
        verify(hitPartition(isSecondPartition)).searchAsync(any(SearchRequest.class),
                                                            any(IntermediateResponseHandler.class),
                                                            any(SearchResultHandler.class));
        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
        verify(hitPartition(isSecondPartition)).close();
        verifyZeroInteractions(missPartition(isSecondPartition));
    }
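    // Scope-based routing: one-level searches fan out to every partition only when based at the
    // partition base DN; subtree and subordinate searches fan out whenever they are based at or
    // above it. All other searches route to the single partition owning the search base DN.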
    @Test
    public void searchSingleLevelBelowPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
        verifySearchAgainstSinglePartition(DN_1_BELOW_PARTITION_BASE_DN, SINGLE_LEVEL);
    }
    @Test
    public void searchSingleLevelAbovePartitionBaseDNShouldRouteToSinglePartition() throws Exception {
        verifySearchAgainstSinglePartition(DN_ABOVE_PARTITION_BASE_DN, SINGLE_LEVEL);
    }
    @Test
    public void searchSingleLevelAdjacentToPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
        verifySearchAgainstSinglePartition(UNPARTITIONED_DN, SINGLE_LEVEL);
    }
    @Test
    public void searchSingleLevelAtPartitionBaseDNShouldRouteToAllPartitions() throws Exception {
        verifySearchAgainstAllPartitions(PARTITION_BASE_DN, SINGLE_LEVEL);
    }
    @Test
    public void searchSubtreeBelowPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
        verifySearchAgainstSinglePartition(DN_1_BELOW_PARTITION_BASE_DN, WHOLE_SUBTREE);
    }
    @Test
    public void searchSubtreeAdjacentToPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
        verifySearchAgainstSinglePartition(UNPARTITIONED_DN, WHOLE_SUBTREE);
    }
    @Test
    public void searchSubtreeAbovePartitionBaseDNShouldRouteToAllPartitions() throws Exception {
        verifySearchAgainstAllPartitions(DN_ABOVE_PARTITION_BASE_DN, WHOLE_SUBTREE);
    }
    @Test
    public void searchSubtreeAtPartitionBaseDNShouldRouteToAllPartitions() throws Exception {
        verifySearchAgainstAllPartitions(PARTITION_BASE_DN, WHOLE_SUBTREE);
    }
    @Test
    public void searchSubordinatesBelowPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
        verifySearchAgainstSinglePartition(DN_1_BELOW_PARTITION_BASE_DN, SUBORDINATES);
    }
    @Test
    public void searchSubordinatesAdjacentToPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
        verifySearchAgainstSinglePartition(UNPARTITIONED_DN, SUBORDINATES);
    }
    @Test
    public void searchSubordinatesAbovePartitionBaseDNShouldRouteToAllPartitions() throws Exception {
        verifySearchAgainstAllPartitions(DN_ABOVE_PARTITION_BASE_DN, SUBORDINATES);
    }
    @Test
    public void searchSubordinatesAtPartitionBaseDNShouldRouteToAllPartitions() throws Exception {
        verifySearchAgainstAllPartitions(PARTITION_BASE_DN, SUBORDINATES);
    }
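    // Helpers below: run the search with the hash stubbed to P1_HASH and verify that it reaches
    // exactly one partition, or fans out to both, respectively.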
    private void verifySearchAgainstSinglePartition(final DN dn, final SearchScope scope) throws Exception {
        when(hashFunction.apply(any())).thenReturn(P1_HASH);
        try (final Connection connection = loadBalancer.getConnection()) {
            connection.searchAsync(newSearchRequest(dn, scope, alwaysTrue()),
                                   mock(SearchResultHandler.class));
        }
        verify(partition1Conn).searchAsync(any(SearchRequest.class),
                                           any(IntermediateResponseHandler.class),
                                           any(SearchResultHandler.class));
        verify(hashFunction).apply(dn.toNormalizedUrlSafeString());
        verify(partition1Conn).close();
        verifyZeroInteractions(partition2Conn);
    }
    private void verifySearchAgainstAllPartitions(final DN dn, final SearchScope scope) throws Exception {
        when(hashFunction.apply(any())).thenReturn(P1_HASH);
        try (final Connection connection = loadBalancer.getConnection()) {
            connection.searchAsync(newSearchRequest(dn, scope, alwaysTrue()), mock(SearchResultHandler.class));
        }
        verify(partition1Conn).searchAsync(any(SearchRequest.class),
                                           any(IntermediateResponseHandler.class),
                                           any(SearchResultHandler.class));
        verify(partition2Conn).searchAsync(any(SearchRequest.class),
                                           any(IntermediateResponseHandler.class),
                                           any(SearchResultHandler.class));
        verify(partition1Conn).close();
        verify(partition2Conn).close();
    }
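    // Map the data provider's boolean flag to the partition connection expected to receive
    // (hit) or not receive (miss) the request.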
    private Connection missPartition(final boolean isSecondPartition) {
        return isSecondPartition ? partition1Conn : partition2Conn;
    }
    private Connection hitPartition(final boolean isSecondPartition) {
        return isSecondPartition ? partition2Conn : partition1Conn;
    }
}
opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashMapTest.java
New file
@@ -0,0 +1,141 @@
/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2016 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Map;
import org.forgerock.util.Function;
import org.forgerock.util.promise.NeverThrowsException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
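/** Unit tests for {@link ConsistentHashMap}, using a stubbed hash function to fix ring positions. */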
@SuppressWarnings("javadoc")
public class ConsistentHashMapTest extends SdkTestCase {
    private static final String P1 = "partition-1";
    private static final String P2 = "partition-2";
    private static final String P3 = "partition-3";
    private static final String P4 = "partition-4";
    @DataProvider
    public static Object[][] md5HashFunction() {
        // @Checkstyle:off
        return new Object[][] {
            { P1, ByteString.valueOfHex("94f940e6c2a31703d6be067b87d67968").toInt() },
            { P2, ByteString.valueOfHex("1038f27a22f8b574670bfdd7f918ffb4").toInt() },
            { P3, ByteString.valueOfHex("a88f746bfae24480be40df47c679228a").toInt() },
            { P4, ByteString.valueOfHex("bdb89200d2a310f5afb345621f544183").toInt() },
        };
        // @Checkstyle:on
    }
    @Test(dataProvider = "md5HashFunction")
    public void testMD5HashFunction(final String s, final int expected) {
        assertThat(ConsistentHashMap.MD5.apply(s)).isEqualTo(expected);
    }
    @Test
    public void testConsistentHashMap() {
        // Create a hash function that returns predictable values.
        @SuppressWarnings({ "rawtypes", "unchecked" })
        final Function<Object, Integer, NeverThrowsException> hashFunction = mock(Function.class);
        // Hashes for the 16 logical partitions.
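        // Each put() below consumes four of these positions (one per unit of weight), so the
        // four partitions' virtual nodes interleave at even 0x10000000 intervals around the ring.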
        when(hashFunction.apply(any())).thenReturn(0x00000000, 0x40000000, 0x80000000, 0xC0000000,
                                                   0x10000000, 0x50000000, 0x90000000, 0xD0000000,
                                                   0x20000000, 0x60000000, 0xA0000000, 0xE0000000,
                                                   0x30000000, 0x70000000, 0xB0000000, 0xF0000000);
        final ConsistentHashMap<String> partitions = new ConsistentHashMap<>(hashFunction);
        partitions.put(P1, P1, 4);
        partitions.put(P2, P2, 4);
        partitions.put(P3, P3, 4);
        partitions.put(P4, P4, 4);
        // Check the structure of the CHM.
        assertThat(partitions.isEmpty()).isFalse();
        assertThat(partitions.size()).isEqualTo(4);
        assertThat(partitions.getAll()).containsOnly(P1, P2, P3, P4);
        final Map<String, Long> weights = partitions.getWeights();
        assertThat(weights.size()).isEqualTo(4);
        assertThat(weights).containsEntry(P1, 0x40000000L);
        assertThat(weights).containsEntry(P2, 0x40000000L);
        assertThat(weights).containsEntry(P3, 0x40000000L);
        assertThat(weights).containsEntry(P4, 0x40000000L);
        // Hashes for several key lookups.
        when(hashFunction.apply(any())).thenReturn(0x70000001, 0x7FFFFFFF, 0x80000000,          // P1
                                                   0xF0000001, 0xFFFFFFFF, 0x00000000,          // P1
                                                   0x00000001, 0x0FFFFFFF, 0x10000000,          // P2
                                                   0xE0000001, 0xEFFFFFFF, 0xF0000000);         // P4
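        // The key passed to get() is irrelevant here: the stubbed hash function replays the
        // queued positions, and each position resolves to the owner of the first virtual node
        // at or after it, wrapping around past 0xFFFFFFFF.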
        assertThat(partitions.get(P1)).isEqualTo(P1);
        assertThat(partitions.get(P1)).isEqualTo(P1);
        assertThat(partitions.get(P1)).isEqualTo(P1);
        assertThat(partitions.get(P1)).isEqualTo(P1);
        assertThat(partitions.get(P1)).isEqualTo(P1);
        assertThat(partitions.get(P1)).isEqualTo(P1);
        assertThat(partitions.get(P2)).isEqualTo(P2);
        assertThat(partitions.get(P2)).isEqualTo(P2);
        assertThat(partitions.get(P2)).isEqualTo(P2);
        assertThat(partitions.get(P4)).isEqualTo(P4);
        assertThat(partitions.get(P4)).isEqualTo(P4);
        assertThat(partitions.get(P4)).isEqualTo(P4);
        // Remove a partition and retry.
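        // remove() re-hashes the removed partition's four virtual node names in order to delete
        // them from the ring, so their positions are queued first.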
        when(hashFunction.apply(any())).thenReturn(0x10000000, 0x50000000, 0x90000000, 0xD0000000);
        partitions.remove(P2);
        // Check the structure of the CHM.
        assertThat(partitions.isEmpty()).isFalse();
        assertThat(partitions.size()).isEqualTo(3);
        assertThat(partitions.getAll()).containsOnly(P1, P3, P4);
        final Map<String, Long> newWeights = partitions.getWeights();
        assertThat(newWeights.size()).isEqualTo(3);
        assertThat(newWeights).containsEntry(P1, 0x40000000L);
        assertThat(newWeights).containsEntry(P3, 0x80000000L); // P2 now falls through to P3
        assertThat(newWeights).containsEntry(P4, 0x40000000L);
        // Hashes for several key lookups.
        when(hashFunction.apply(any())).thenReturn(0x70000001, 0x7FFFFFFF, 0x80000000,          // P1
                                                   0xF0000001, 0xFFFFFFFF, 0x00000000,          // P1
                                                   0x00000001, 0x0FFFFFFF, 0x10000000,          // P3 (was P2)
                                                   0xE0000001, 0xEFFFFFFF, 0xF0000000);         // P4
        assertThat(partitions.get(P1)).isEqualTo(P1);
        assertThat(partitions.get(P1)).isEqualTo(P1);
        assertThat(partitions.get(P1)).isEqualTo(P1);
        assertThat(partitions.get(P1)).isEqualTo(P1);
        assertThat(partitions.get(P1)).isEqualTo(P1);
        assertThat(partitions.get(P1)).isEqualTo(P1);
        assertThat(partitions.get(P2)).isEqualTo(P3);
        assertThat(partitions.get(P2)).isEqualTo(P3);
        assertThat(partitions.get(P2)).isEqualTo(P3);
        assertThat(partitions.get(P4)).isEqualTo(P4);
        assertThat(partitions.get(P4)).isEqualTo(P4);
        assertThat(partitions.get(P4)).isEqualTo(P4);
    }
}
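For reference, a minimal usage sketch of the distribution load balancer exercised by these tests. It assumes newFixedSizeDistributionLoadBalancer is the factory method this change adds to Connections, that LDAPConnectionFactory is used as the concrete per-partition factory, and that the hostnames, ports, and weights are hypothetical:

import static org.forgerock.opendj.ldap.Connections.newFixedSizeDistributionLoadBalancer;
import static org.forgerock.util.Options.defaultOptions;

import org.forgerock.opendj.ldap.Connection;
import org.forgerock.opendj.ldap.ConnectionFactory;
import org.forgerock.opendj.ldap.ConsistentHashMap;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.LDAPConnectionFactory;

public final class DistributionExample {
    public static void main(final String[] args) throws Exception {
        // Place each partition on the ring using the built-in MD5 hash function.
        final ConsistentHashMap<ConnectionFactory> partitions =
                new ConsistentHashMap<>(ConsistentHashMap.MD5);
        partitions.put("partition-1", new LDAPConnectionFactory("ldap1.example.com", 1389), 10);
        partitions.put("partition-2", new LDAPConnectionFactory("ldap2.example.com", 1389), 10);
        // Requests are routed by the hash of their target DN relative to the partition base DN.
        final ConnectionFactory factory = newFixedSizeDistributionLoadBalancer(
                DN.valueOf("ou=people,dc=example,dc=com"), partitions, defaultOptions());
        try (Connection connection = factory.getConnection()) {
            // This read is forwarded to whichever partition owns the hash of the uid=bjensen entry.
            connection.readEntry(DN.valueOf("uid=bjensen,ou=people,dc=example,dc=com"));
        }
    }
}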