From 953657d6645a26d4ce33f21eeafa31ed4e61fd81 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 10 Nov 2016 09:39:34 +0000
Subject: [PATCH] OPENDJ-3425: implement simple distribution load-balancer

---
 opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashMapTest.java                      |  141 +++++
 opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancer.java     |  498 +++++++++++++++++++
 opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashMap.java                          |  288 +++++++++++
 opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties                            |   10 
 opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancerTest.java |  382 +++++++++++++++
 opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java                                |  179 ++++--
 6 files changed, 1,442 insertions(+), 56 deletions(-)

diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
index bfd9999..43a3c3b 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -569,67 +569,134 @@
 
             @Override
             public RequestWithIndex apply(final Request request) {
-                // Normalize the hash to a valid factory index, taking care of negative hash values and especially
-                // Integer.MIN_VALUE (see doc for Math.abs()).
-                final int index = computeIndexBasedOnDnHashCode(request);
-                return new RequestWithIndex(request, index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % maxIndex));
-            }
-
-            private int computeIndexBasedOnDnHashCode(final Request request) {
-                // The following conditions are ordered such that the most common operations appear first in order to
-                // reduce the average number of branches. A better solution would be to use a visitor, but a visitor
-                // would only apply to the core operations, not extended operations or SASL binds.
-                if (request instanceof SearchRequest) {
-                    return ((SearchRequest) request).getName().hashCode();
-                } else if (request instanceof ModifyRequest) {
-                    return ((ModifyRequest) request).getName().hashCode();
-                } else if (request instanceof SimpleBindRequest) {
-                    return hashCodeOfDnString(((SimpleBindRequest) request).getName());
-                } else if (request instanceof AddRequest) {
-                    return ((AddRequest) request).getName().hashCode();
-                } else if (request instanceof DeleteRequest) {
-                    return ((DeleteRequest) request).getName().hashCode();
-                } else if (request instanceof CompareRequest) {
-                    return ((CompareRequest) request).getName().hashCode();
-                } else if (request instanceof ModifyDNRequest) {
-                    return ((ModifyDNRequest) request).getName().hashCode();
-                } else if (request instanceof PasswordModifyExtendedRequest) {
-                    return hashCodeOfAuthzid(((PasswordModifyExtendedRequest) request).getUserIdentityAsString());
-                } else if (request instanceof PlainSASLBindRequest) {
-                    return hashCodeOfAuthzid(((PlainSASLBindRequest) request).getAuthenticationID());
-                } else if (request instanceof DigestMD5SASLBindRequest) {
-                    return hashCodeOfAuthzid(((DigestMD5SASLBindRequest) request).getAuthenticationID());
-                } else if (request instanceof GSSAPISASLBindRequest) {
-                    return hashCodeOfAuthzid(((GSSAPISASLBindRequest) request).getAuthenticationID());
-                } else if (request instanceof CRAMMD5SASLBindRequest) {
-                    return hashCodeOfAuthzid(((CRAMMD5SASLBindRequest) request).getAuthenticationID());
-                } else {
-                    return distributeRequestAtRandom();
-                }
-            }
-
-            private int hashCodeOfAuthzid(final String authzid) {
-                if (authzid != null && authzid.startsWith("dn:")) {
-                    return hashCodeOfDnString(authzid.substring(3));
-                }
-                return distributeRequestAtRandom();
-            }
-
-            private int hashCodeOfDnString(final String dnString) {
-                try {
-                    return DN.valueOf(dnString).hashCode();
-                } catch (final IllegalArgumentException ignored) {
-                    return distributeRequestAtRandom();
-                }
-            }
-
-            private int distributeRequestAtRandom() {
-                return ThreadLocalRandom.current().nextInt(0, maxIndex);
+                final int index = computePartitionIdFromDN(dnOfRequest(request), maxIndex);
+                return new RequestWithIndex(request, index);
             }
         };
     }
 
     /**
+     * Returns the partition ID which should be selected based on the provided DN and number of partitions, taking care
+     * of negative hash values and especially Integer.MIN_VALUE (see doc for Math.abs()). If the provided DN is
+     * {@code null} then a random partition ID will be returned.
+     *
+     * @param dn
+     *         The DN whose partition ID is to be determined, which may be {@code null}.
+     * @param numberOfPartitions
+     *         The total number of partitions.
+     * @return A partition ID in the range 0 <= partitionID < numberOfPartitions.
+     */
+    private static int computePartitionIdFromDN(final DN dn, final int numberOfPartitions) {
+        final int index = dn != null ? dn.hashCode() : ThreadLocalRandom.current().nextInt(0, numberOfPartitions);
+        return index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % numberOfPartitions);
+    }
+
+    /**
+     * Returns the DN of the entry targeted by the provided request, or {@code null} if the target entry cannot be
+     * determined. This method will return {@code null} for most extended operations and SASL bind requests which
+     * specify a non-DN authorization ID.
+     *
+     * @param request
+     *         The request whose target entry DN is to be determined.
+     * @return The DN of the entry targeted by the provided request, or {@code null} if the target entry cannot be
+     *         determined.
+     */
+    static DN dnOfRequest(final Request request) {
+        // The following conditions are ordered such that the most common operations appear first in order to
+        // reduce the average number of branches. A better solution would be to use a visitor, but a visitor
+        // would only apply to the core operations, not extended operations or SASL binds.
+        if (request instanceof SearchRequest) {
+            return ((SearchRequest) request).getName();
+        } else if (request instanceof ModifyRequest) {
+            return ((ModifyRequest) request).getName();
+        } else if (request instanceof SimpleBindRequest) {
+            return dnOf(((SimpleBindRequest) request).getName());
+        } else if (request instanceof AddRequest) {
+            return ((AddRequest) request).getName();
+        } else if (request instanceof DeleteRequest) {
+            return ((DeleteRequest) request).getName();
+        } else if (request instanceof CompareRequest) {
+            return ((CompareRequest) request).getName();
+        } else if (request instanceof ModifyDNRequest) {
+            return ((ModifyDNRequest) request).getName();
+        } else if (request instanceof PasswordModifyExtendedRequest) {
+            return dnOfAuthzid(((PasswordModifyExtendedRequest) request).getUserIdentityAsString());
+        } else if (request instanceof PlainSASLBindRequest) {
+            return dnOfAuthzid(((PlainSASLBindRequest) request).getAuthenticationID());
+        } else if (request instanceof DigestMD5SASLBindRequest) {
+            return dnOfAuthzid(((DigestMD5SASLBindRequest) request).getAuthenticationID());
+        } else if (request instanceof GSSAPISASLBindRequest) {
+            return dnOfAuthzid(((GSSAPISASLBindRequest) request).getAuthenticationID());
+        } else if (request instanceof CRAMMD5SASLBindRequest) {
+            return dnOfAuthzid(((CRAMMD5SASLBindRequest) request).getAuthenticationID());
+        } else {
+            return null;
+        }
+    }
+
+    private static DN dnOfAuthzid(final String authzid) {
+        if (authzid != null && authzid.startsWith("dn:")) {
+            return dnOf(authzid.substring(3));
+        }
+        return null;
+    }
+
+    private static DN dnOf(final String dnString) {
+        try {
+            return DN.valueOf(dnString);
+        } catch (final IllegalArgumentException ignored) {
+            return null;
+        }
+    }
+
+    /**
+     * Creates a distribution load balancer which uses consistent hashing to distributes requests across a set of
+     * partitions based on a hash of each request's target DN. More precisely, a partition is selected as follows:
+     * <ul>
+     * <li>if the targeted entry lies beneath the partition base DN then the partition is selected based on a hash of
+     * the DN which is superior to the target DN and immediately subordinate to the partition base DN</li>
+     * <li>otherwise, if the request is not a search then the request is routed to a random partition</li>
+     * <li>otherwise, if the search request targets the partition base DN, or a superior thereof, then the search is
+     * routed to a random partition or broadcast to all partitions, depending on the search scope. When broadcasting,
+     * care is taken to re-scope sub-requests in order to avoid returning duplicate entries</li>
+     * <li>otherwise, the search is routed to a random partition because its scope lies outside of the partition
+     * space.</li>
+     * </ul>
+     * This load balancer allows client applications to linearly scale their deployment for write throughput as well
+     * as total number of entries. For example, if a single replicated topology can support 10000 updates/s and a
+     * total of 100M entries, then a 4 way distributed topology could support up to 40000 updates/s and 400M entries.
+     * <p/>
+     * <b>NOTE:</b> there are a number of assumptions in the design of this load balancer as well as a number of
+     * limitations:
+     * <ul>
+     * <li>simple paged results, server side sorting, and VLV request controls are not supported for searches which
+     * traverse all partitions. </li>
+     * <li>persistent searches which traverse all partitions are only supported if they request changes only. </li>
+     * <li>requests which target an entry which is not below the partition base DN will be routed to a partition
+     * selected based on the request's DN, thereby providing affinity. Note that this behavior assumes that entries
+     * which are not below the partition base DN are replicated across all partitions.</li>
+     * <li>searches that traverse multiple partitions as well as entries above the partition base DN may return
+     * results in a non-hierarchical order. Specifically, entries from a partition (below the partition base DN)
+     * may be returned before entries above the partition base DN. Although not required by the LDAP standard, some
+     * legacy clients expect entries to be returned in hierarchical order.
+     * </li>
+     * </ul>
+     *
+     * @param partitionBaseDN
+     *         The DN beneath which data is partitioned. All other data is assumed to be shared across all partitions.
+     * @param partitions
+     *         The consistent hash map containing the partitions to be distributed.
+     * @param options
+     *         The configuration options for the load-balancer (no options are supported currently).
+     * @return The new distribution load balancer.
+     */
+    @SuppressWarnings("unused")
+    public static ConnectionFactory newFixedSizeDistributionLoadBalancer(final DN partitionBaseDN,
+            final ConsistentHashMap<? extends ConnectionFactory> partitions, final Options options) {
+        return new ConsistentHashDistributionLoadBalancer(partitionBaseDN, partitions);
+    }
+
+    /**
      * Creates a new "least requests" load-balancer which will load-balance individual requests across the provided
      * set of connection factories, each typically representing a single replica, using an algorithm that ensures that
      * requests are routed to the replica which has the minimum number of active requests.
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancer.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancer.java
new file mode 100644
index 0000000..2441151
--- /dev/null
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancer.java
@@ -0,0 +1,498 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package org.forgerock.opendj.ldap;
+
+import static com.forgerock.opendj.ldap.CoreMessages.*;
+import static org.forgerock.opendj.ldap.Connections.dnOfRequest;
+import static org.forgerock.opendj.ldap.LdapException.newLdapException;
+import static org.forgerock.opendj.ldap.ResultCode.CLIENT_SIDE_USER_CANCELLED;
+import static org.forgerock.opendj.ldap.ResultCode.PROTOCOL_ERROR;
+import static org.forgerock.opendj.ldap.ResultCode.UNWILLING_TO_PERFORM;
+import static org.forgerock.opendj.ldap.SearchScope.SUBORDINATES;
+import static org.forgerock.opendj.ldap.requests.Requests.copyOfSearchRequest;
+import static org.forgerock.opendj.ldap.spi.LdapPromises.newFailedLdapPromise;
+import static org.forgerock.util.Utils.closeSilently;
+import static org.forgerock.util.Utils.joinAsString;
+import static org.forgerock.util.promise.Promises.newResultPromise;
+import static org.forgerock.util.promise.Promises.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.opendj.ldap.controls.PersistentSearchRequestControl;
+import org.forgerock.opendj.ldap.controls.ServerSideSortRequestControl;
+import org.forgerock.opendj.ldap.controls.SimplePagedResultsControl;
+import org.forgerock.opendj.ldap.controls.VirtualListViewRequestControl;
+import org.forgerock.opendj.ldap.requests.AbandonRequest;
+import org.forgerock.opendj.ldap.requests.AddRequest;
+import org.forgerock.opendj.ldap.requests.BindRequest;
+import org.forgerock.opendj.ldap.requests.CompareRequest;
+import org.forgerock.opendj.ldap.requests.DeleteRequest;
+import org.forgerock.opendj.ldap.requests.ExtendedRequest;
+import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
+import org.forgerock.opendj.ldap.requests.ModifyRequest;
+import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.requests.UnbindRequest;
+import org.forgerock.opendj.ldap.responses.BindResult;
+import org.forgerock.opendj.ldap.responses.CompareResult;
+import org.forgerock.opendj.ldap.responses.ExtendedResult;
+import org.forgerock.opendj.ldap.responses.IntermediateResponse;
+import org.forgerock.opendj.ldap.responses.Result;
+import org.forgerock.opendj.ldap.responses.SearchResultEntry;
+import org.forgerock.opendj.ldap.responses.SearchResultReference;
+import org.forgerock.opendj.ldap.spi.ConnectionState;
+import org.forgerock.opendj.ldap.spi.LdapPromises;
+import org.forgerock.util.AsyncFunction;
+import org.forgerock.util.promise.ExceptionHandler;
+import org.forgerock.util.promise.Promise;
+import org.forgerock.util.promise.PromiseImpl;
+import org.forgerock.util.promise.ResultHandler;
+import org.forgerock.util.promise.RuntimeExceptionHandler;
+
+/**
+ * A simple distribution algorithm which distributes requests equally across a fixed number of partitions based on a
+ * hash of the request's target DN. See {@link Connections#newFixedSizeDistributionLoadBalancer}.
+ */
+final class ConsistentHashDistributionLoadBalancer implements ConnectionFactory {
+    private static final String NAME = ConsistentHashDistributionLoadBalancer.class.getSimpleName();
+    private final ConsistentHashMap<? extends ConnectionFactory> partitions;
+    private final DN partitionBaseDN;
+
+    ConsistentHashDistributionLoadBalancer(final DN partitionBaseDN,
+                                           final ConsistentHashMap<? extends ConnectionFactory> partitions) {
+        this.partitionBaseDN = partitionBaseDN;
+        this.partitions = partitions;
+    }
+
+    @Override
+    public final void close() {
+        closeSilently(partitions.getAll());
+    }
+
+    @Override
+    public final String toString() {
+        return NAME + '(' + joinAsString(",", partitions) + ')';
+    }
+
+    @Override
+    public final Connection getConnection() throws LdapException {
+        return new ConnectionImpl();
+    }
+
+    @Override
+    public final Promise<Connection, LdapException> getConnectionAsync() {
+        return newResultPromise((Connection) new ConnectionImpl());
+    }
+
+    private class ConnectionImpl extends AbstractAsynchronousConnection {
+        private final ConnectionState state = new ConnectionState();
+
+        @Override
+        public String toString() {
+            return NAME + "Connection";
+        }
+
+        @Override
+        public LdapPromise<Void> abandonAsync(final AbandonRequest request) {
+            // We cannot possibly route these correctly, so just drop them.
+            return LdapPromises.newSuccessfulLdapPromise(null);
+        }
+
+        @Override
+        public LdapPromise<Result> addAsync(final AddRequest request,
+                                            final IntermediateResponseHandler intermediateResponseHandler) {
+            final ConnectionFactory partition = getPartition(request.getName());
+            return connectAndSendRequest(partition, new AsyncFunction<Connection, Result, LdapException>() {
+                @Override
+                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+                    return connection.addAsync(request, intermediateResponseHandler);
+                }
+            });
+        }
+
+        @Override
+        public void addConnectionEventListener(final ConnectionEventListener listener) {
+            state.addConnectionEventListener(listener);
+        }
+
+        @Override
+        public LdapPromise<BindResult> bindAsync(final BindRequest request,
+                                                 final IntermediateResponseHandler intermediateResponseHandler) {
+            final DN dn = dnOfRequest(request);
+            if (dn == null) {
+                return newFailedLdapPromise(newLdapException(UNWILLING_TO_PERFORM,
+                                                             DISTRIBUTION_UNRESOLVABLE_PARTITION_ID_FOR_BIND.get()));
+            }
+            final ConnectionFactory partition = getPartition(dn);
+            return connectAndSendRequest(partition, new AsyncFunction<Connection, BindResult, LdapException>() {
+                @Override
+                public Promise<BindResult, LdapException> apply(final Connection connection) throws LdapException {
+                    return connection.bindAsync(request, intermediateResponseHandler);
+                }
+            });
+        }
+
+        @Override
+        public void close(final UnbindRequest request, final String reason) {
+            state.notifyConnectionClosed();
+        }
+
+        @Override
+        public LdapPromise<CompareResult> compareAsync(final CompareRequest request,
+                                                       final IntermediateResponseHandler intermediateResponseHandler) {
+            final ConnectionFactory partition = getPartition(request.getName());
+            return connectAndSendRequest(partition, new AsyncFunction<Connection, CompareResult, LdapException>() {
+                @Override
+                public Promise<CompareResult, LdapException> apply(final Connection connection) throws LdapException {
+                    return connection.compareAsync(request, intermediateResponseHandler);
+                }
+            });
+        }
+
+        @Override
+        public LdapPromise<Result> deleteAsync(final DeleteRequest request,
+                                               final IntermediateResponseHandler intermediateResponseHandler) {
+            // We could reject requests that attempt to delete entries superior to the partition base DN. However,
+            // this is really the responsibility of the backend servers to enforce. In some cases such requests
+            // should be allowed, e.g. if the partitions are all empty.
+            final ConnectionFactory partition = getPartition(request.getName());
+            return connectAndSendRequest(partition, new AsyncFunction<Connection, Result, LdapException>() {
+                @Override
+                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+                    return connection.deleteAsync(request, intermediateResponseHandler);
+                }
+            });
+        }
+
+        @Override
+        public <R extends ExtendedResult> LdapPromise<R> extendedRequestAsync(final ExtendedRequest<R> request,
+                                                                              final IntermediateResponseHandler
+                                                                                      intermediateResponseHandler) {
+            final DN dn = dnOfRequest(request);
+            if (dn == null) {
+                return newFailedLdapPromise(newLdapException(UNWILLING_TO_PERFORM,
+                                                             DISTRIBUTION_UNRESOLVABLE_PARTITION_ID_FOR_EXT_OP.get()));
+            }
+            final ConnectionFactory partition = getPartition(dn);
+            return connectAndSendRequest(partition, new AsyncFunction<Connection, R, LdapException>() {
+                @Override
+                public Promise<R, LdapException> apply(final Connection connection) throws LdapException {
+                    return connection.extendedRequestAsync(request, intermediateResponseHandler);
+                }
+            });
+        }
+
+        @Override
+        public boolean isClosed() {
+            return state.isClosed();
+        }
+
+        @Override
+        public boolean isValid() {
+            return state.isValid();
+        }
+
+        @Override
+        public LdapPromise<Result> modifyAsync(final ModifyRequest request,
+                                               final IntermediateResponseHandler intermediateResponseHandler) {
+            final ConnectionFactory partition = getPartition(request.getName());
+            return connectAndSendRequest(partition, new AsyncFunction<Connection, Result, LdapException>() {
+                @Override
+                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+                    return connection.modifyAsync(request, intermediateResponseHandler);
+                }
+            });
+        }
+
+        @Override
+        public LdapPromise<Result> modifyDNAsync(final ModifyDNRequest request,
+                                                 final IntermediateResponseHandler intermediateResponseHandler) {
+            final DN oldDN = request.getName();
+            final ConnectionFactory oldPartition = getPartition(oldDN);
+
+            final DN newParent = request.getNewSuperior() != null ? request.getNewSuperior() : oldDN.parent();
+            final DN newDN = newParent.child(request.getNewRDN());
+            final ConnectionFactory newPartition = getPartition(newDN);
+
+            // Acceptable if the request is completely outside of the partitions or entirely within a single partition.
+            if (oldPartition != newPartition) {
+                return unwillingToPerform(DISTRIBUTION_MODDN_SPANS_MULTIPLE_PARTITIONS.get());
+            }
+
+            return connectAndSendRequest(oldPartition, new AsyncFunction<Connection, Result, LdapException>() {
+                @Override
+                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+                    return connection.modifyDNAsync(request, intermediateResponseHandler);
+                }
+            });
+        }
+
+        @Override
+        public void removeConnectionEventListener(final ConnectionEventListener listener) {
+            state.removeConnectionEventListener(listener);
+        }
+
+        @Override
+        public LdapPromise<Result> searchAsync(final SearchRequest request,
+                                               final IntermediateResponseHandler intermediateResponseHandler,
+                                               final SearchResultHandler entryHandler) {
+            // Broadcast if the search targets an entry which is superior to the base DN and has a scope which
+            // reaches into the partitions.
+            final DN dn = request.getName();
+            switch (request.getScope().asEnum()) {
+            case BASE_OBJECT:
+                // Request can always be handled by a single partition.
+                return searchSinglePartition(request, intermediateResponseHandler, entryHandler);
+            case SINGLE_LEVEL:
+                // Needs broadcasting if the search DN is equal to the partition DN.
+                if (dn.equals(partitionBaseDN)) {
+                    return searchAllPartitions(request, intermediateResponseHandler, entryHandler);
+                } else {
+                    return searchSinglePartition(request, intermediateResponseHandler, entryHandler);
+                }
+            case WHOLE_SUBTREE:
+                // Needs broadcasting if the search DN is superior or equal to the partition DN.
+                if (dn.isSuperiorOrEqualTo(partitionBaseDN)) {
+                    // Split into primary search against search DN and secondary searches against partition DN.
+                    return splitAndSearchAllPartitions(request, intermediateResponseHandler, entryHandler);
+                } else {
+                    return searchSinglePartition(request, intermediateResponseHandler, entryHandler);
+                }
+            case SUBORDINATES:
+                // Needs broadcasting if the search DN is superior or equal to the partition DN.
+                if (dn.equals(partitionBaseDN)) {
+                    return searchAllPartitions(request, intermediateResponseHandler, entryHandler);
+                } else if (dn.isSuperiorOrEqualTo(partitionBaseDN)) {
+                    // Split into primary search against search DN and secondary searches against partition DN.
+                    return splitAndSearchAllPartitions(request, intermediateResponseHandler, entryHandler);
+                } else {
+                    return searchSinglePartition(request, intermediateResponseHandler, entryHandler);
+                }
+            default: // UNKNOWN
+                return unwillingToPerform(DISTRIBUTION_UNSUPPORTED_SEARCH_SCOPE.get(request.getScope().intValue()));
+            }
+        }
+
+        /**
+         * Forwards a search request to each partition. The search request is scoped such that duplicate results are
+         * not possible.
+         */
+        private LdapPromise<Result> searchAllPartitions(final SearchRequest request,
+                                                        final IntermediateResponseHandler irh,
+                                                        final SearchResultHandler srh) {
+            return broadcastSearch(request, request, irh, srh);
+        }
+
+        /**
+         * In order to avoid duplicates, search one partition using the request's DN and the remaining partitions using
+         * the partition base DN with scope "SUBORDINATES".
+         * <p>
+         * TODO: may return results that are not in hierarchical order.
+         */
+        private LdapPromise<Result> splitAndSearchAllPartitions(final SearchRequest primarySearch,
+                                                                final IntermediateResponseHandler irh,
+                                                                final SearchResultHandler srh) {
+            final SearchRequest secondarySearch =
+                    copyOfSearchRequest(primarySearch).setName(partitionBaseDN).setScope(SUBORDINATES);
+            return broadcastSearch(primarySearch, secondarySearch, irh, srh);
+        }
+
+        /**
+         * Take care to ensure that all searches are cancelled when one of them fails (important if the
+         * searches are persistent).
+         */
+        private LdapPromise<Result> broadcastSearch(final SearchRequest primarySearch,
+                                                    final SearchRequest secondarySearch,
+                                                    final IntermediateResponseHandler irh,
+                                                    final SearchResultHandler srh) {
+            // First reject requests that contain controls that we do not support when broadcast.
+            if (primarySearch.containsControl(VirtualListViewRequestControl.OID)) {
+                return unwillingToPerform(DISTRIBUTION_VLV_CONTROL_NOT_SUPPORTED.get());
+            } else if (primarySearch.containsControl(SimplePagedResultsControl.OID)) {
+                return unwillingToPerform(DISTRIBUTION_SPR_CONTROL_NOT_SUPPORTED.get());
+            } else if (primarySearch.containsControl(ServerSideSortRequestControl.OID)) {
+                return unwillingToPerform(DISTRIBUTION_SSS_CONTROL_NOT_SUPPORTED.get());
+            } else if (primarySearch.containsControl(PersistentSearchRequestControl.OID)) {
+                // Persistent searches return results in two phases: the initial search results and then the changes.
+                // Unfortunately, there's no way to determine when the first phase has completed, so it's not
+                // possible to prevent the two phases from being interleaved when broadcast.
+                try {
+                    final PersistentSearchRequestControl control =
+                            primarySearch.getControl(PersistentSearchRequestControl.DECODER, new DecodeOptions());
+                    if (!control.isChangesOnly()) {
+                        return unwillingToPerform(DISTRIBUTION_PSEARCH_CONTROL_NOT_SUPPORTED.get());
+                    }
+                } catch (final DecodeException e) {
+                    return newFailedLdapPromise(newLdapException(PROTOCOL_ERROR, e.getMessage(), e));
+                }
+            }
+
+            final List<Promise<Result, LdapException>> promises = new ArrayList<>(partitions.size());
+
+            // Launch all searches with the primary search targeting a random partition.
+            final ConnectionFactory primaryPartition = getPartition(primarySearch.getName());
+            final IntermediateResponseHandler sirh = synchronize(irh);
+            final SearchResultHandler ssrh = synchronize(srh);
+            for (final ConnectionFactory partition : partitions.getAll()) {
+                final SearchRequest searchRequest = partition == primaryPartition ? primarySearch : secondarySearch;
+                promises.add(searchSinglePartition(searchRequest, sirh, ssrh, partition));
+            }
+
+            // FIXME: chained PromiseImpl and Promises.when() don't chain cancellation requests.
+            final PromiseImpl<Result, LdapException> reducedPromise = new PromiseImpl<Result, LdapException>() {
+                @Override
+                protected LdapException tryCancel(final boolean mayInterruptIfRunning) {
+                    for (Promise<Result, LdapException> promise : promises) {
+                        promise.cancel(mayInterruptIfRunning);
+                    }
+                    return newLdapException(CLIENT_SIDE_USER_CANCELLED);
+                }
+            };
+            when(promises).thenOnResult(new ResultHandler<List<Result>>() {
+                @Override
+                public void handleResult(final List<Result> results) {
+                    // TODO: Depending on controls we may want to merge these results in some way.
+                    reducedPromise.handleResult(results.get(0));
+                }
+            }).thenOnException(new ExceptionHandler<LdapException>() {
+                @Override
+                public void handleException(final LdapException exception) {
+                    reducedPromise.handleException(exception);
+                }
+            }).thenOnRuntimeException(new RuntimeExceptionHandler() {
+                @Override
+                public void handleRuntimeException(final RuntimeException exception) {
+                    reducedPromise.handleRuntimeException(exception);
+                }
+            }).thenFinally(new Runnable() {
+                @Override
+                public void run() {
+                    // Ensure that any remaining searches are terminated.
+                    for (Promise<Result, LdapException> promise : promises) {
+                        promise.cancel(true);
+                    }
+                }
+            });
+
+            return LdapPromises.asPromise(reducedPromise);
+        }
+
+        private LdapPromise<Result> unwillingToPerform(final LocalizableMessage msg) {
+            return newFailedLdapPromise(newLdapException(UNWILLING_TO_PERFORM, msg));
+        }
+
+        /**
+         * Forwards a search request to a single partition based on the search's base DN. If the DN does not target a
+         * specific partition then a random partition will be selected.
+         */
+        private LdapPromise<Result> searchSinglePartition(final SearchRequest request,
+                                                          final IntermediateResponseHandler irh,
+                                                          final SearchResultHandler srh) {
+            final ConnectionFactory partition = getPartition(request.getName());
+            return searchSinglePartition(request, irh, srh, partition);
+        }
+
+        private LdapPromise<Result> searchSinglePartition(final SearchRequest request,
+                                                          final IntermediateResponseHandler irh,
+                                                          final SearchResultHandler srh,
+                                                          final ConnectionFactory partition) {
+            return connectAndSendRequest(partition, new AsyncFunction<Connection, Result, LdapException>() {
+                @Override
+                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+                    return connection.searchAsync(request, irh, srh);
+                }
+            });
+        }
+
+        /** Returns a search result handler that ensures that only one response is processed at a time. */
+        private SearchResultHandler synchronize(final SearchResultHandler srh) {
+            return new SearchResultHandler() {
+                @Override
+                public synchronized boolean handleEntry(final SearchResultEntry entry) {
+                    return srh.handleEntry(entry);
+                }
+
+                @Override
+                public synchronized boolean handleReference(final SearchResultReference reference) {
+                    return srh.handleReference(reference);
+                }
+            };
+        }
+
+        /** Returns an intermediate response handler that ensures that only one response is processed at a time. */
+        private IntermediateResponseHandler synchronize(final IntermediateResponseHandler irh) {
+            return new IntermediateResponseHandler() {
+                @Override
+                public synchronized boolean handleIntermediateResponse(final IntermediateResponse response) {
+                    return irh.handleIntermediateResponse(response);
+                }
+            };
+        }
+
+        /**
+         * Returns the partition for requests having the provided DN. If the DN is not associated with a specific
+         * partition then a random partition will be returned.
+         */
+        private ConnectionFactory getPartition(final DN dn) {
+            final DN partitionDN = getPartitionDN(dn);
+            return partitions.get((partitionDN != null ? partitionDN : dn).toNormalizedUrlSafeString());
+        }
+
+        /**
+         * Returns the DN which is a child of the partition base DN and which is equal to or superior to the provided
+         * DN, otherwise {@code null}.
+         */
+        private DN getPartitionDN(final DN dn) {
+            final int depthBelowBaseDN = dn.size() - partitionBaseDN.size();
+            if (depthBelowBaseDN > 0 && dn.isSubordinateOrEqualTo(partitionBaseDN)) {
+                return dn.parent(depthBelowBaseDN - 1);
+            }
+            return null;
+        }
+
+        private <R> LdapPromise<R> connectAndSendRequest(final ConnectionFactory partition,
+                                                         final AsyncFunction<Connection, R, LdapException> doRequest) {
+            if (state.isClosed()) {
+                throw new IllegalStateException("Connection is already closed");
+            }
+            final AtomicReference<Connection> connectionHolder = new AtomicReference<>();
+            return connectAsync(partition).thenOnResult(new ResultHandler<Connection>() {
+                @Override
+                public void handleResult(final Connection connection) {
+                    connectionHolder.set(connection);
+                }
+                // FIXME: how do we support cancellation of the request?
+            }).thenAsync(doRequest).thenFinally(new Runnable() {
+                @Override
+                public void run() {
+                    closeSilently(connectionHolder.get());
+                }
+            });
+        }
+
+        private LdapPromise<Connection> connectAsync(final ConnectionFactory partition) {
+            return LdapPromises.asPromise(partition.getConnectionAsync()
+                                                   .thenOnException(new ExceptionHandler<LdapException>() {
+                                                       @Override
+                                                       public void handleException(final LdapException e) {
+                                                           state.notifyConnectionError(false, e);
+                                                       }
+                                                   }));
+        }
+    }
+}
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashMap.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashMap.java
new file mode 100644
index 0000000..a43bf7b
--- /dev/null
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashMap.java
@@ -0,0 +1,288 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package org.forgerock.opendj.ldap;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.forgerock.util.Function;
+import org.forgerock.util.Reject;
+import org.forgerock.util.annotations.VisibleForTesting;
+import org.forgerock.util.promise.NeverThrowsException;
+
+/**
+ * An implementation of "consistent hashing" supporting per-partition weighting. This implementation is thread safe
+ * and allows partitions to be added and removed during use.
+ * <p>
+ * This implementation maps partitions to one or more points on a circle ranging from {@link Integer#MIN_VALUE} to
+ * {@link Integer#MAX_VALUE}. The number of points per partition is dictated by the partition's weight. A partition
+ * with a weight which is higher than another partition will receive a proportionally higher load.
+ *
+ * @param <P> The type of partition object.
+ *
+ * @see <a href="http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.23.3738">Consistent Hashing and Random
+ * Trees</a>
+ * @see <a href="http://www8.org/w8-papers/2a-webserver/caching/paper2.html">Web Caching with Consistent Hashing</a>
+ */
+public final class ConsistentHashMap<P> {
+    // TODO: add methods for determining which partitions will need to be rebalanced when a partition is added or
+    // removed.
+    /** The default weight. The value is relatively high in order to minimize the risk of imbalances. */
+    private static final int DEFAULT_WEIGHT = 200;
+    /** Default hash function based on MD5. */
+    @VisibleForTesting
+    static final Function<Object, Integer, NeverThrowsException> MD5 =
+            new Function<Object, Integer, NeverThrowsException>() {
+                @Override
+                public Integer apply(final Object key) {
+                    final byte[] bytes = key.toString().getBytes(UTF_8);
+                    final byte[] digest = getMD5Digest().digest(bytes);
+                    return ByteString.wrap(digest).toInt();
+                }
+
+                private MessageDigest getMD5Digest() {
+                    // TODO: we may want to cache these.
+                    try {
+                        return MessageDigest.getInstance("MD5");
+                    } catch (NoSuchAlgorithmException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            };
+    /** Synchronizes updates. Reads are protected by copy on write. */
+    private final ReentrantLock writeLock = new ReentrantLock();
+    /** Consistent hash map circle. */
+    private volatile NavigableMap<Integer, Node<P>> circle = new TreeMap<>();
+    /** Maps partition IDs to their partition. */
+    private volatile Map<String, P> partitions = new LinkedHashMap<>();
+    /** Function used for hashing keys. */
+    private final Function<Object, Integer, NeverThrowsException> hashFunction;
+
+    /** Creates a new consistent hash map which will hash keys using MD5. */
+    public ConsistentHashMap() {
+        this(MD5);
+    }
+
+    /**
+     * Creates a new consistent hash map which will hash keys using the provided hash function.
+     *
+     * @param hashFunction
+     *         The function which should be used for hashing keys.
+     */
+    public ConsistentHashMap(final Function<Object, Integer, NeverThrowsException> hashFunction) {
+        this.hashFunction = hashFunction;
+    }
+
+    /**
+     * Puts a partition into this consistent hash map using the default weight which is sufficiently high to ensure a
+     * reasonably uniform distribution among all partitions having the same weight.
+     *
+     * @param partitionId
+     *         The partition ID.
+     * @param partition
+     *         The partition.
+     * @return This consistent hash map.
+     */
+    public ConsistentHashMap<P> put(final String partitionId, final P partition) {
+        return put(partitionId, partition, DEFAULT_WEIGHT);
+    }
+
+    /**
+     * Puts a partition into this consistent hash map using the specified weight. If all partitions have the same
+     * weight then they will each receive a similar amount of load. A partition having a weight which is twice that
+     * of another will receive twice the load. Weight values should generally be great than 200 in order to minimize
+     * the risk of unexpected imbalances due to the way in which logical partitions are mapped to real partitions.
+     *
+     * @param partitionId
+     *         The partition ID.
+     * @param partition
+     *         The partition.
+     * @param weight
+     *         The partition's weight, which should typically be over 200 and never negative.
+     * @return This consistent hash map.
+     */
+    public ConsistentHashMap<P> put(final String partitionId, final P partition, final int weight) {
+        Reject.ifNull(partitionId, "partitionId must be non-null");
+        Reject.ifNull(partition, "partition must be non-null");
+        Reject.ifTrue(weight < 0, "Weight must be a positive integer");
+
+        final Node<P> node = new Node<>(partitionId, partition, weight);
+        writeLock.lock();
+        try {
+            final TreeMap<Integer, Node<P>> newCircle = new TreeMap<>(circle);
+            for (int i = 0; i < weight; i++) {
+                newCircle.put(hashFunction.apply(partitionId + i), node);
+            }
+
+            final Map<String, P> newPartitions = new LinkedHashMap<>(partitions);
+            newPartitions.put(partitionId, partition);
+
+            // It doesn't matter that these assignments are not atomic.
+            circle = newCircle;
+            partitions = newPartitions;
+        } finally {
+            writeLock.unlock();
+        }
+        return this;
+    }
+
+    /**
+     * Removes the partition that was previously added using the provided partition ID.
+     *
+     * @param partitionId
+     *         The partition ID.
+     * @return This consistent hash map.
+     */
+    public ConsistentHashMap<P> remove(final String partitionId) {
+        Reject.ifNull(partitionId, "partitionId must be non-null");
+
+        writeLock.lock();
+        try {
+            if (partitions.containsKey(partitionId)) {
+                final TreeMap<Integer, Node<P>> newCircle = new TreeMap<>(circle);
+                final Node<P> node = newCircle.remove(hashFunction.apply(partitionId + 0));
+                for (int i = 1; i < node.weight; i++) {
+                    newCircle.remove(hashFunction.apply(partitionId + i));
+                }
+
+                final Map<String, P> newPartitions = new LinkedHashMap<>(partitions);
+                newPartitions.remove(partitionId);
+
+                // It doesn't matter that these assignments are not atomic.
+                circle = newCircle;
+                partitions = newPartitions;
+            }
+        } finally {
+            writeLock.unlock();
+        }
+        return this;
+    }
+
+    /**
+     * Returns the partition from this map corresponding to the provided key's hash, or {@code null} if this map is
+     * empty.
+     *
+     * @param key
+     *         The key for which a corresponding partition is to be returned.
+     * @return The partition from this map corresponding to the provided key's hash, or {@code null} if this map is
+     * empty.
+     */
+    P get(final Object key) {
+        final NavigableMap<Integer, Node<P>> circleSnapshot = circle;
+        final Map.Entry<Integer, Node<P>> ceilingEntry = circleSnapshot.ceilingEntry(hashFunction.apply(key));
+        if (ceilingEntry != null) {
+            return ceilingEntry.getValue().partition;
+        }
+        final Map.Entry<Integer, Node<P>> firstEntry = circleSnapshot.firstEntry();
+        return firstEntry != null ? firstEntry.getValue().partition : null;
+    }
+
+    /**
+     * Returns a collection containing all of the partitions contained in this consistent hash map.
+     *
+     * @return A collection containing all of the partitions contained in this consistent hash map.
+     */
+    Collection<P> getAll() {
+        return partitions.values();
+    }
+
+    /**
+     * Returns the number of partitions in this consistent hash map.
+     *
+     * @return The number of partitions in this consistent hash map.
+     */
+    int size() {
+        return partitions.size();
+    }
+
+    /**
+     * Returns {@code true} if there are no partitions in this consistent hash map.
+     *
+     * @return {@code true} if there are no partitions in this consistent hash map.
+     */
+    boolean isEmpty() {
+        return partitions.isEmpty();
+    }
+
+    /**
+     * Returns a map whose keys are the partitions stored in this map and whose values are the actual weights associated
+     * with each partition. The sum of the weights will be equal to 2^32.
+     * <p/>
+     * This method is intended for testing, but may one day be used in order to query the current status of the
+     * load-balancer and, in particular, the weighting associated with each partition as a percentage.
+     *
+     * @return A map whose keys are the partitions stored in this map and whose values are the actual weights associated
+     * with each partition.
+     */
+    @VisibleForTesting
+    Map<P, Long> getWeights() {
+        final NavigableMap<Integer, Node<P>> circleSnapshot = circle;
+        final IdentityHashMap<P, Long> weights = new IdentityHashMap<>();
+        Map.Entry<Integer, Node<P>> previousEntry = null;
+        for (final Map.Entry<Integer, Node<P>> entry : circleSnapshot.entrySet()) {
+            final long index = entry.getKey();
+            final P partition = entry.getValue().partition;
+            if (previousEntry == null) {
+                // Special case for first value since the range begins with the last entry.
+                final long range1 = (long) Integer.MAX_VALUE - circleSnapshot.lastEntry().getKey();
+                final long range2 = index - Integer.MIN_VALUE;
+                weights.put(partition, range1 + range2 + 1);
+            } else {
+                final long start = previousEntry.getKey();
+                final long end = entry.getKey();
+                if (weights.containsKey(partition)) {
+                    weights.put(partition, weights.get(partition) + (end - start));
+                } else {
+                    weights.put(partition, end - start);
+                }
+            }
+            previousEntry = entry;
+        }
+        return weights;
+    }
+
+    @Override
+    public String toString() {
+        return getWeights().toString();
+    }
+
+    /** A partition stored in the consistent hash map circle. */
+    private static final class Node<P> {
+        private final String partitionId;
+        private final P partition;
+        private final int weight;
+
+        private Node(final String partitionId, final P partition, final int weight) {
+            this.partitionId = partitionId;
+            this.partition = partition;
+            this.weight = weight;
+        }
+
+        @Override
+        public String toString() {
+            return partitionId;
+        }
+    }
+}
diff --git a/opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties b/opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties
index d3d8375..f5ec2c8 100644
--- a/opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties
+++ b/opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties
@@ -1725,3 +1725,13 @@
 ERR_UNKNOWN_REQUEST_TYPE=The following request has a unknown request type: %s
 WARN_DECODING_AFFINITY_CONTROL=An error occurred while decoding an affinity control: %s
 ERR_SASL_BIND_MULTI_STAGE=An error occurred during multi-stage authentication: '%s'
+DISTRIBUTION_UNRESOLVABLE_PARTITION_ID_FOR_BIND=Unable to determine the partition ID from bind request
+DISTRIBUTION_UNRESOLVABLE_PARTITION_ID_FOR_EXT_OP=Unable to determine the partition ID from extended request
+DISTRIBUTION_DELETE_SPANS_MULTIPLE_PARTITIONS=The delete request spans multiple partitions
+DISTRIBUTION_MODDN_SPANS_MULTIPLE_PARTITIONS=The modify DN request spans multiple partitions
+DISTRIBUTION_UNSUPPORTED_SEARCH_SCOPE=The search scope %d is not supported by the distribution layer
+DISTRIBUTION_VLV_CONTROL_NOT_SUPPORTED=The VLV request control is not supported across multiple partitions
+DISTRIBUTION_SPR_CONTROL_NOT_SUPPORTED=The Simple Paged Results request control is not supported across multiple partitions
+DISTRIBUTION_SSS_CONTROL_NOT_SUPPORTED=The Server Side Sort request control is not supported across multiple partitions
+DISTRIBUTION_PSEARCH_CONTROL_NOT_SUPPORTED=The Persistent Search request control is not supported across multiple \
+  partitions, except if it requests changes only
diff --git a/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancerTest.java b/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancerTest.java
new file mode 100644
index 0000000..44f91a2
--- /dev/null
+++ b/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancerTest.java
@@ -0,0 +1,382 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package org.forgerock.opendj.ldap;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.forgerock.opendj.ldap.Connections.newFixedSizeDistributionLoadBalancer;
+import static org.forgerock.opendj.ldap.Filter.alwaysTrue;
+import static org.forgerock.opendj.ldap.SearchScope.BASE_OBJECT;
+import static org.forgerock.opendj.ldap.SearchScope.SINGLE_LEVEL;
+import static org.forgerock.opendj.ldap.SearchScope.SUBORDINATES;
+import static org.forgerock.opendj.ldap.SearchScope.WHOLE_SUBTREE;
+import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnectionFactory;
+import static org.forgerock.opendj.ldap.requests.Requests.*;
+import static org.forgerock.opendj.ldap.responses.Responses.newBindResult;
+import static org.forgerock.opendj.ldap.responses.Responses.newCompareResult;
+import static org.forgerock.opendj.ldap.responses.Responses.newResult;
+import static org.forgerock.opendj.ldap.spi.LdapPromises.newSuccessfulLdapPromise;
+import static org.forgerock.util.Options.defaultOptions;
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import org.forgerock.opendj.ldap.requests.AddRequest;
+import org.forgerock.opendj.ldap.requests.BindRequest;
+import org.forgerock.opendj.ldap.requests.CompareRequest;
+import org.forgerock.opendj.ldap.requests.DeleteRequest;
+import org.forgerock.opendj.ldap.requests.ExtendedRequest;
+import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
+import org.forgerock.opendj.ldap.requests.ModifyRequest;
+import org.forgerock.opendj.ldap.requests.Requests;
+import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.responses.BindResult;
+import org.forgerock.opendj.ldap.responses.CompareResult;
+import org.forgerock.opendj.ldap.responses.Result;
+import org.forgerock.util.Function;
+import org.forgerock.util.promise.NeverThrowsException;
+import org.mockito.Mock;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@SuppressWarnings("javadoc")
+public class ConsistentHashDistributionLoadBalancerTest extends SdkTestCase {
+    private static final DN PARTITION_BASE_DN = DN.valueOf("ou=people,dc=example,dc=com");
+    private static final DN DN_1_BELOW_PARTITION_BASE_DN = PARTITION_BASE_DN.child("uid=bjensen");
+    private static final DN DN_2_BELOW_PARTITION_BASE_DN = DN_1_BELOW_PARTITION_BASE_DN.child("cn=prefs");
+    private static final DN DN_ABOVE_PARTITION_BASE_DN = PARTITION_BASE_DN.parent();
+    private static final DN UNPARTITIONED_DN = DN.valueOf("ou=groups,dc=example,dc=com");
+    private static final LdapPromise<Result> SUCCESS = newSuccessfulLdapPromise(newResult(ResultCode.SUCCESS));
+    private static final LdapPromise<BindResult> BIND_SUCCESS =
+            newSuccessfulLdapPromise(newBindResult(ResultCode.SUCCESS));
+    private static final LdapPromise<CompareResult> COMPARE_SUCCESS =
+            newSuccessfulLdapPromise(newCompareResult(ResultCode.SUCCESS));
+    private static final int P1_HASH = 0x00000000;
+    private static final int P2_HASH = 0x80000000;
+    @Mock
+    private Connection partition1Conn;
+    @Mock
+    private Connection partition2Conn;
+    @Mock
+    private Function<Object, Integer, NeverThrowsException> hashFunction;
+    private ConnectionFactory partition1;
+    private ConnectionFactory partition2;
+    private ConnectionFactory loadBalancer;
+
+    @BeforeMethod
+    public void beforeMethod() {
+        initMocks(this);
+
+        partition1 = mockConnectionFactory(partition1Conn);
+        partition2 = mockConnectionFactory(partition2Conn);
+
+        when(hashFunction.apply(any())).thenReturn(P1_HASH, P2_HASH);
+
+        final ConsistentHashMap<ConnectionFactory> partitions = new ConsistentHashMap<>(hashFunction);
+        partitions.put("P1", partition1, 1);
+        partitions.put("P2", partition2, 1);
+
+        loadBalancer = newFixedSizeDistributionLoadBalancer(PARTITION_BASE_DN, partitions, defaultOptions());
+
+        when(partition1Conn.addAsync(any(AddRequest.class),
+                                     any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+        when(partition2Conn.addAsync(any(AddRequest.class),
+                                     any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+
+        when(partition1Conn.bindAsync(any(BindRequest.class),
+                                      any(IntermediateResponseHandler.class))).thenReturn(BIND_SUCCESS);
+        when(partition2Conn.bindAsync(any(BindRequest.class),
+                                      any(IntermediateResponseHandler.class))).thenReturn(BIND_SUCCESS);
+
+        when(partition1Conn.compareAsync(any(CompareRequest.class),
+                                         any(IntermediateResponseHandler.class))).thenReturn(COMPARE_SUCCESS);
+        when(partition2Conn.compareAsync(any(CompareRequest.class),
+                                         any(IntermediateResponseHandler.class))).thenReturn(COMPARE_SUCCESS);
+
+        when(partition1Conn.deleteAsync(any(DeleteRequest.class),
+                                        any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+        when(partition2Conn.deleteAsync(any(DeleteRequest.class),
+                                        any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+
+        when(partition1Conn.extendedRequestAsync(any(ExtendedRequest.class),
+                                                 any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+        when(partition2Conn.extendedRequestAsync(any(ExtendedRequest.class),
+                                                 any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+
+        when(partition1Conn.modifyAsync(any(ModifyRequest.class),
+                                        any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+        when(partition2Conn.modifyAsync(any(ModifyRequest.class),
+                                        any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+
+        when(partition1Conn.modifyDNAsync(any(ModifyDNRequest.class),
+                                          any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+        when(partition2Conn.modifyDNAsync(any(ModifyDNRequest.class),
+                                          any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+
+        when(partition1Conn.searchAsync(any(SearchRequest.class),
+                                        any(IntermediateResponseHandler.class),
+                                        any(SearchResultHandler.class))) .thenReturn(SUCCESS);
+        when(partition2Conn.searchAsync(any(SearchRequest.class),
+                                        any(IntermediateResponseHandler.class),
+                                        any(SearchResultHandler.class))).thenReturn(SUCCESS);
+    }
+
+    @Test
+    public void closeShouldCloseAllPartitions() throws Exception {
+        loadBalancer.close();
+        verify(partition1).close();
+        verify(partition2).close();
+    }
+
+    @Test
+    public void getConnectionShouldReturnLogicalConnection() throws Exception {
+        try (final Connection connection = loadBalancer.getConnection()) {
+            assertThat(connection).isNotNull();
+            connection.close();
+        }
+        verifyZeroInteractions(partition1, partition2);
+    }
+
+    @Test
+    public void getConnectionAsyncShouldReturnLogicalConnection() throws Exception {
+        try (final Connection connection = loadBalancer.getConnectionAsync().get()) {
+            assertThat(connection).isNotNull();
+            connection.close();
+        }
+        verifyZeroInteractions(partition1, partition2);
+    }
+
+    @DataProvider
+    public Object[][] requests() {
+        return new Object[][] { { P1_HASH, false, UNPARTITIONED_DN, UNPARTITIONED_DN },
+                                { P2_HASH, true, UNPARTITIONED_DN, UNPARTITIONED_DN },
+                                { P1_HASH, false, DN_ABOVE_PARTITION_BASE_DN, DN_ABOVE_PARTITION_BASE_DN },
+                                { P2_HASH, true, DN_ABOVE_PARTITION_BASE_DN, DN_ABOVE_PARTITION_BASE_DN },
+                                { P1_HASH, false, DN_1_BELOW_PARTITION_BASE_DN, DN_1_BELOW_PARTITION_BASE_DN },
+                                { P2_HASH, true, DN_1_BELOW_PARTITION_BASE_DN, DN_1_BELOW_PARTITION_BASE_DN },
+                                { P1_HASH, false, DN_2_BELOW_PARTITION_BASE_DN, DN_1_BELOW_PARTITION_BASE_DN },
+                                { P2_HASH, true, DN_2_BELOW_PARTITION_BASE_DN, DN_1_BELOW_PARTITION_BASE_DN } };
+    }
+
+    @Test(dataProvider = "requests")
+    public void addShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
+                                        final DN partitionDN) throws Exception {
+        when(hashFunction.apply(any())).thenReturn(hash);
+        try (final Connection connection = loadBalancer.getConnection()) {
+            connection.addAsync(newAddRequest(requestDN));
+        }
+        verify(hitPartition(isSecondPartition)).addAsync(any(AddRequest.class), any(IntermediateResponseHandler.class));
+        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
+        verify(hitPartition(isSecondPartition)).close();
+        verifyZeroInteractions(missPartition(isSecondPartition));
+    }
+
+    @Test(dataProvider = "requests")
+    public void bindShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
+                                         final DN partitionDN) throws Exception {
+        when(hashFunction.apply(any())).thenReturn(hash);
+        try (final Connection connection = loadBalancer.getConnection()) {
+            connection.bindAsync(Requests.newSimpleBindRequest(requestDN.toString(), "password".toCharArray()));
+        }
+        verify(hitPartition(isSecondPartition)).bindAsync(any(BindRequest.class),
+                                                          any(IntermediateResponseHandler.class));
+        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
+        verify(hitPartition(isSecondPartition)).close();
+        verifyZeroInteractions(missPartition(isSecondPartition));
+    }
+
+    @Test(dataProvider = "requests")
+    public void compareShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
+                                            final DN partitionDN) throws Exception {
+        when(hashFunction.apply(any())).thenReturn(hash);
+        try (final Connection connection = loadBalancer.getConnection()) {
+            connection.compareAsync(newCompareRequest(requestDN.toString(), "cn", "test"));
+        }
+        verify(hitPartition(isSecondPartition)).compareAsync(any(CompareRequest.class),
+                                                             any(IntermediateResponseHandler.class));
+        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
+        verify(hitPartition(isSecondPartition)).close();
+        verifyZeroInteractions(missPartition(isSecondPartition));
+    }
+
+    @Test(dataProvider = "requests")
+    public void deleteShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
+                                           final DN partitionDN) throws Exception {
+        when(hashFunction.apply(any())).thenReturn(hash);
+        try (final Connection connection = loadBalancer.getConnection()) {
+            connection.deleteAsync(newDeleteRequest(requestDN));
+        }
+        verify(hitPartition(isSecondPartition)).deleteAsync(any(DeleteRequest.class),
+                                                            any(IntermediateResponseHandler.class));
+        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
+        verify(hitPartition(isSecondPartition)).close();
+        verifyZeroInteractions(missPartition(isSecondPartition));
+    }
+
+    @Test(dataProvider = "requests")
+    public void extendedShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
+                                             final DN partitionDN) throws Exception {
+        when(hashFunction.apply(any())).thenReturn(hash);
+        try (final Connection connection = loadBalancer.getConnection()) {
+            connection.extendedRequestAsync(newPasswordModifyExtendedRequest().setUserIdentity("dn:" + requestDN));
+        }
+        verify(hitPartition(isSecondPartition)).extendedRequestAsync(any(ExtendedRequest.class),
+                                                                     any(IntermediateResponseHandler.class));
+        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
+        verify(hitPartition(isSecondPartition)).close();
+        verifyZeroInteractions(missPartition(isSecondPartition));
+    }
+
+    @Test(dataProvider = "requests")
+    public void modifyShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
+                                           final DN partitionDN) throws Exception {
+        when(hashFunction.apply(any())).thenReturn(hash);
+        try (final Connection connection = loadBalancer.getConnection()) {
+            connection.modifyAsync(newModifyRequest(requestDN));
+        }
+        verify(hitPartition(isSecondPartition)).modifyAsync(any(ModifyRequest.class),
+                                                            any(IntermediateResponseHandler.class));
+        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
+        verify(hitPartition(isSecondPartition)).close();
+        verifyZeroInteractions(missPartition(isSecondPartition));
+    }
+
+    @Test(dataProvider = "requests")
+    public void modifyDNShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
+                                             final DN partitionDN) throws Exception {
+        when(hashFunction.apply(any())).thenReturn(hash);
+        try (final Connection connection = loadBalancer.getConnection()) {
+            connection.modifyDNAsync(newModifyDNRequest(requestDN, RDN.valueOf("cn=changed")));
+        }
+        verify(hitPartition(isSecondPartition)).modifyDNAsync(any(ModifyDNRequest.class),
+                                                              any(IntermediateResponseHandler.class));
+        verify(hashFunction, atLeastOnce()).apply(partitionDN.toNormalizedUrlSafeString());
+        verify(hitPartition(isSecondPartition)).close();
+        verifyZeroInteractions(missPartition(isSecondPartition));
+    }
+
+    @Test(dataProvider = "requests")
+    public void searchBaseShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
+                                               final DN partitionDN) throws Exception {
+        when(hashFunction.apply(any())).thenReturn(hash);
+        try (final Connection connection = loadBalancer.getConnection()) {
+            connection.searchAsync(newSearchRequest(requestDN, BASE_OBJECT, alwaysTrue()),
+                                   mock(SearchResultHandler.class));
+        }
+        verify(hitPartition(isSecondPartition)).searchAsync(any(SearchRequest.class),
+                                                            any(IntermediateResponseHandler.class),
+                                                            any(SearchResultHandler.class));
+        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
+        verify(hitPartition(isSecondPartition)).close();
+        verifyZeroInteractions(missPartition(isSecondPartition));
+    }
+
+    @Test
+    public void searchSingleLevelBelowPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
+        verifySearchAgainstSinglePartition(DN_1_BELOW_PARTITION_BASE_DN, SINGLE_LEVEL);
+    }
+
+    @Test
+    public void searchSingleLevelAbovePartitionBaseDNShouldRouteToSinglePartition() throws Exception {
+        verifySearchAgainstSinglePartition(DN_ABOVE_PARTITION_BASE_DN, SINGLE_LEVEL);
+    }
+
+    @Test
+    public void searchSingleLevelAdjacentToPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
+        verifySearchAgainstSinglePartition(UNPARTITIONED_DN, SINGLE_LEVEL);
+    }
+
+    @Test
+    public void searchSingleLevelAtPartitionBaseDNShouldRouteToAllPartitions() throws Exception {
+        verifySearchAgainstAllPartitions(PARTITION_BASE_DN, SINGLE_LEVEL);
+    }
+
+    @Test
+    public void searchSubtreeBelowPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
+        verifySearchAgainstSinglePartition(DN_1_BELOW_PARTITION_BASE_DN, WHOLE_SUBTREE);
+    }
+
+    @Test
+    public void searchSubtreeAdjacentToPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
+        verifySearchAgainstSinglePartition(UNPARTITIONED_DN, WHOLE_SUBTREE);
+    }
+
+    @Test
+    public void searchSubtreeAbovePartitionBaseDNShouldRouteToAllPartitions() throws Exception {
+        verifySearchAgainstAllPartitions(DN_ABOVE_PARTITION_BASE_DN, WHOLE_SUBTREE);
+    }
+
+    @Test
+    public void searchSubtreeAtPartitionBaseDNShouldRouteToAllPartitions() throws Exception {
+        verifySearchAgainstAllPartitions(PARTITION_BASE_DN, WHOLE_SUBTREE);
+    }
+
+    @Test
+    public void searchSubordinatesBelowPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
+        verifySearchAgainstSinglePartition(DN_1_BELOW_PARTITION_BASE_DN, SUBORDINATES);
+    }
+
+    @Test
+    public void searchSubordinatesAdjacentToPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
+        verifySearchAgainstSinglePartition(UNPARTITIONED_DN, SUBORDINATES);
+    }
+
+    @Test
+    public void searchSubordinatesAbovePartitionBaseDNShouldRouteToAllPartitions() throws Exception {
+        verifySearchAgainstAllPartitions(DN_ABOVE_PARTITION_BASE_DN, SUBORDINATES);
+    }
+
+    @Test
+    public void searchSubordinatesAtPartitionBaseDNShouldRouteToAllPartitions() throws Exception {
+        verifySearchAgainstAllPartitions(PARTITION_BASE_DN, SUBORDINATES);
+    }
+
+    private void verifySearchAgainstSinglePartition(final DN dn, final SearchScope scope) throws Exception {
+        when(hashFunction.apply(any())).thenReturn(P1_HASH);
+        try (final Connection connection = loadBalancer.getConnection()) {
+            connection.searchAsync(newSearchRequest(dn, scope, alwaysTrue()),
+                                   mock(SearchResultHandler.class));
+        }
+        verify(partition1Conn).searchAsync(any(SearchRequest.class),
+                                           any(IntermediateResponseHandler.class),
+                                           any(SearchResultHandler.class));
+        verify(hashFunction).apply(dn.toNormalizedUrlSafeString());
+        verify(partition1Conn).close();
+        verifyZeroInteractions(partition2Conn);
+    }
+
+    private void verifySearchAgainstAllPartitions(final DN dn, final SearchScope scope) throws Exception {
+        when(hashFunction.apply(any())).thenReturn(P1_HASH);
+        try (final Connection connection = loadBalancer.getConnection()) {
+            connection.searchAsync(newSearchRequest(dn, scope, alwaysTrue()), mock(SearchResultHandler.class));
+        }
+        verify(partition1Conn).searchAsync(any(SearchRequest.class),
+                                           any(IntermediateResponseHandler.class),
+                                           any(SearchResultHandler.class));
+        verify(partition2Conn).searchAsync(any(SearchRequest.class),
+                                           any(IntermediateResponseHandler.class),
+                                           any(SearchResultHandler.class));
+        verify(partition1Conn).close();
+        verify(partition2Conn).close();
+    }
+
+    private Connection missPartition(final boolean isSecondPartition) {
+        return isSecondPartition ? partition1Conn : partition2Conn;
+    }
+
+    private Connection hitPartition(final boolean isSecondPartition) {
+        return isSecondPartition ? partition2Conn : partition1Conn;
+    }
+}
diff --git a/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashMapTest.java b/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashMapTest.java
new file mode 100644
index 0000000..c7316dc
--- /dev/null
+++ b/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashMapTest.java
@@ -0,0 +1,141 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package org.forgerock.opendj.ldap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+
+import org.forgerock.util.Function;
+import org.forgerock.util.promise.NeverThrowsException;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@SuppressWarnings("javadoc")
+public class ConsistentHashMapTest extends SdkTestCase {
+    private static final String P1 = "partition-1";
+    private static final String P2 = "partition-2";
+    private static final String P3 = "partition-3";
+    private static final String P4 = "partition-4";
+
+    @DataProvider
+    public static Object[][] md5HashFunction() {
+        // @Checkstyle:off
+        return new Object[][] {
+            { P1, ByteString.valueOfHex("94f940e6c2a31703d6be067b87d67968").toInt() },
+            { P2, ByteString.valueOfHex("1038f27a22f8b574670bfdd7f918ffb4").toInt() },
+            { P3, ByteString.valueOfHex("a88f746bfae24480be40df47c679228a").toInt() },
+            { P4, ByteString.valueOfHex("bdb89200d2a310f5afb345621f544183").toInt() },
+        };
+        // @Checkstyle:on
+    }
+
+    @Test(dataProvider = "md5HashFunction")
+    public void testMD5HashFunction(final String s, final int expected) {
+        assertThat(ConsistentHashMap.MD5.apply(s)).isEqualTo(expected);
+    }
+
+    @Test
+    public void testConsistentHashMap() {
+        // Create a hash function that returns predictable values.
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        final Function<Object, Integer, NeverThrowsException> hashFunction = mock(Function.class);
+
+        // Hashes for the 16 logical partitions.
+        when(hashFunction.apply(any())).thenReturn(0x00000000, 0x40000000, 0x80000000, 0xC0000000,
+                                                   0x10000000, 0x50000000, 0x90000000, 0xD0000000,
+                                                   0x20000000, 0x60000000, 0xA0000000, 0xE0000000,
+                                                   0x30000000, 0x70000000, 0xB0000000, 0xF0000000);
+
+        final ConsistentHashMap<String> partitions = new ConsistentHashMap<>(hashFunction);
+        partitions.put(P1, P1, 4);
+        partitions.put(P2, P2, 4);
+        partitions.put(P3, P3, 4);
+        partitions.put(P4, P4, 4);
+
+        // Check the structure of the CHM.
+        assertThat(partitions.isEmpty()).isFalse();
+        assertThat(partitions.size()).isEqualTo(4);
+        assertThat(partitions.getAll()).containsOnly(P1, P2, P3, P4);
+
+        final Map<String, Long> weights = partitions.getWeights();
+        assertThat(weights.size()).isEqualTo(4);
+        assertThat(weights).containsEntry(P1, 0x40000000L);
+        assertThat(weights).containsEntry(P2, 0x40000000L);
+        assertThat(weights).containsEntry(P3, 0x40000000L);
+        assertThat(weights).containsEntry(P4, 0x40000000L);
+
+        // Hashes for several key lookups.
+        when(hashFunction.apply(any())).thenReturn(0x70000001, 0x7FFFFFFF, 0x80000000,          // P1
+                                                   0xF0000001, 0xFFFFFFFF, 0x00000000,          // P1
+                                                   0x00000001, 0x0FFFFFFF, 0x10000000,          // P2
+                                                   0xE0000001, 0xEFFFFFFF, 0xF0000000);         // P4
+        assertThat(partitions.get(P1)).isEqualTo(P1);
+        assertThat(partitions.get(P1)).isEqualTo(P1);
+        assertThat(partitions.get(P1)).isEqualTo(P1);
+
+        assertThat(partitions.get(P1)).isEqualTo(P1);
+        assertThat(partitions.get(P1)).isEqualTo(P1);
+        assertThat(partitions.get(P1)).isEqualTo(P1);
+
+        assertThat(partitions.get(P2)).isEqualTo(P2);
+        assertThat(partitions.get(P2)).isEqualTo(P2);
+        assertThat(partitions.get(P2)).isEqualTo(P2);
+
+        assertThat(partitions.get(P4)).isEqualTo(P4);
+        assertThat(partitions.get(P4)).isEqualTo(P4);
+        assertThat(partitions.get(P4)).isEqualTo(P4);
+
+        // Remove a partition and retry.
+        when(hashFunction.apply(any())).thenReturn(0x10000000, 0x50000000, 0x90000000, 0xD0000000);
+        partitions.remove(P2);
+
+        // Check the structure of the CHM.
+        assertThat(partitions.isEmpty()).isFalse();
+        assertThat(partitions.size()).isEqualTo(3);
+        assertThat(partitions.getAll()).containsOnly(P1, P3, P4);
+
+        final Map<String, Long> newWeights = partitions.getWeights();
+        assertThat(newWeights.size()).isEqualTo(3);
+        assertThat(newWeights).containsEntry(P1, 0x40000000L);
+        assertThat(newWeights).containsEntry(P3, 0x80000000L); // P2 now falls through to P3
+        assertThat(newWeights).containsEntry(P4, 0x40000000L);
+
+        // Hashes for several key lookups.
+        when(hashFunction.apply(any())).thenReturn(0x70000001, 0x7FFFFFFF, 0x80000000,          // P1
+                                                   0xF0000001, 0xFFFFFFFF, 0x00000000,          // P1
+                                                   0x00000001, 0x0FFFFFFF, 0x10000000,          // P3 (was P2)
+                                                   0xE0000001, 0xEFFFFFFF, 0xF0000000);         // P4
+        assertThat(partitions.get(P1)).isEqualTo(P1);
+        assertThat(partitions.get(P1)).isEqualTo(P1);
+        assertThat(partitions.get(P1)).isEqualTo(P1);
+
+        assertThat(partitions.get(P1)).isEqualTo(P1);
+        assertThat(partitions.get(P1)).isEqualTo(P1);
+        assertThat(partitions.get(P1)).isEqualTo(P1);
+
+        assertThat(partitions.get(P2)).isEqualTo(P3);
+        assertThat(partitions.get(P2)).isEqualTo(P3);
+        assertThat(partitions.get(P2)).isEqualTo(P3);
+
+        assertThat(partitions.get(P4)).isEqualTo(P4);
+        assertThat(partitions.get(P4)).isEqualTo(P4);
+        assertThat(partitions.get(P4)).isEqualTo(P4);
+    }
+}

--
Gitblit v1.10.0