From 953657d6645a26d4ce33f21eeafa31ed4e61fd81 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 10 Nov 2016 09:39:34 +0000
Subject: [PATCH] OPENDJ-3425: implement simple distribution load-balancer
---
opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashMapTest.java | 141 +++++
opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancer.java | 498 +++++++++++++++++++
opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashMap.java | 288 +++++++++++
opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties | 10
opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancerTest.java | 382 +++++++++++++++
opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java | 179 ++++--
6 files changed, 1,442 insertions(+), 56 deletions(-)
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
index bfd9999..43a3c3b 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -569,67 +569,134 @@
@Override
public RequestWithIndex apply(final Request request) {
- // Normalize the hash to a valid factory index, taking care of negative hash values and especially
- // Integer.MIN_VALUE (see doc for Math.abs()).
- final int index = computeIndexBasedOnDnHashCode(request);
- return new RequestWithIndex(request, index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % maxIndex));
- }
-
- private int computeIndexBasedOnDnHashCode(final Request request) {
- // The following conditions are ordered such that the most common operations appear first in order to
- // reduce the average number of branches. A better solution would be to use a visitor, but a visitor
- // would only apply to the core operations, not extended operations or SASL binds.
- if (request instanceof SearchRequest) {
- return ((SearchRequest) request).getName().hashCode();
- } else if (request instanceof ModifyRequest) {
- return ((ModifyRequest) request).getName().hashCode();
- } else if (request instanceof SimpleBindRequest) {
- return hashCodeOfDnString(((SimpleBindRequest) request).getName());
- } else if (request instanceof AddRequest) {
- return ((AddRequest) request).getName().hashCode();
- } else if (request instanceof DeleteRequest) {
- return ((DeleteRequest) request).getName().hashCode();
- } else if (request instanceof CompareRequest) {
- return ((CompareRequest) request).getName().hashCode();
- } else if (request instanceof ModifyDNRequest) {
- return ((ModifyDNRequest) request).getName().hashCode();
- } else if (request instanceof PasswordModifyExtendedRequest) {
- return hashCodeOfAuthzid(((PasswordModifyExtendedRequest) request).getUserIdentityAsString());
- } else if (request instanceof PlainSASLBindRequest) {
- return hashCodeOfAuthzid(((PlainSASLBindRequest) request).getAuthenticationID());
- } else if (request instanceof DigestMD5SASLBindRequest) {
- return hashCodeOfAuthzid(((DigestMD5SASLBindRequest) request).getAuthenticationID());
- } else if (request instanceof GSSAPISASLBindRequest) {
- return hashCodeOfAuthzid(((GSSAPISASLBindRequest) request).getAuthenticationID());
- } else if (request instanceof CRAMMD5SASLBindRequest) {
- return hashCodeOfAuthzid(((CRAMMD5SASLBindRequest) request).getAuthenticationID());
- } else {
- return distributeRequestAtRandom();
- }
- }
-
- private int hashCodeOfAuthzid(final String authzid) {
- if (authzid != null && authzid.startsWith("dn:")) {
- return hashCodeOfDnString(authzid.substring(3));
- }
- return distributeRequestAtRandom();
- }
-
- private int hashCodeOfDnString(final String dnString) {
- try {
- return DN.valueOf(dnString).hashCode();
- } catch (final IllegalArgumentException ignored) {
- return distributeRequestAtRandom();
- }
- }
-
- private int distributeRequestAtRandom() {
- return ThreadLocalRandom.current().nextInt(0, maxIndex);
+ final int index = computePartitionIdFromDN(dnOfRequest(request), maxIndex);
+ return new RequestWithIndex(request, index);
}
};
}
/**
+ * Returns the partition ID which should be selected based on the provided DN and number of partitions, taking care
+ * of negative hash values and especially Integer.MIN_VALUE (see doc for Math.abs()). If the provided DN is
+ * {@code null} then a random partition ID will be returned.
+ *
+ * @param dn
+ * The DN whose partition ID is to be determined, which may be {@code null}.
+ * @param numberOfPartitions
+ * The total number of partitions.
+ * @return A partition ID in the range 0 <= partitionID < numberOfPartitions.
+ */
+ private static int computePartitionIdFromDN(final DN dn, final int numberOfPartitions) {
+ final int index = dn != null ? dn.hashCode() : ThreadLocalRandom.current().nextInt(0, numberOfPartitions);
+ return index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % numberOfPartitions);
+ }
+
+ /**
+ * Returns the DN of the entry targeted by the provided request, or {@code null} if the target entry cannot be
+ * determined. This method will return {@code null} for most extended operations and SASL bind requests which
+ * specify a non-DN authorization ID.
+ *
+ * @param request
+ * The request whose target entry DN is to be determined.
+ * @return The DN of the entry targeted by the provided request, or {@code null} if the target entry cannot be
+ * determined.
+ */
+ static DN dnOfRequest(final Request request) {
+ // The following conditions are ordered such that the most common operations appear first in order to
+ // reduce the average number of branches. A better solution would be to use a visitor, but a visitor
+ // would only apply to the core operations, not extended operations or SASL binds.
+ if (request instanceof SearchRequest) {
+ return ((SearchRequest) request).getName();
+ } else if (request instanceof ModifyRequest) {
+ return ((ModifyRequest) request).getName();
+ } else if (request instanceof SimpleBindRequest) {
+ return dnOf(((SimpleBindRequest) request).getName());
+ } else if (request instanceof AddRequest) {
+ return ((AddRequest) request).getName();
+ } else if (request instanceof DeleteRequest) {
+ return ((DeleteRequest) request).getName();
+ } else if (request instanceof CompareRequest) {
+ return ((CompareRequest) request).getName();
+ } else if (request instanceof ModifyDNRequest) {
+ return ((ModifyDNRequest) request).getName();
+ } else if (request instanceof PasswordModifyExtendedRequest) {
+ return dnOfAuthzid(((PasswordModifyExtendedRequest) request).getUserIdentityAsString());
+ } else if (request instanceof PlainSASLBindRequest) {
+ return dnOfAuthzid(((PlainSASLBindRequest) request).getAuthenticationID());
+ } else if (request instanceof DigestMD5SASLBindRequest) {
+ return dnOfAuthzid(((DigestMD5SASLBindRequest) request).getAuthenticationID());
+ } else if (request instanceof GSSAPISASLBindRequest) {
+ return dnOfAuthzid(((GSSAPISASLBindRequest) request).getAuthenticationID());
+ } else if (request instanceof CRAMMD5SASLBindRequest) {
+ return dnOfAuthzid(((CRAMMD5SASLBindRequest) request).getAuthenticationID());
+ } else {
+ return null;
+ }
+ }
+
+ private static DN dnOfAuthzid(final String authzid) {
+ if (authzid != null && authzid.startsWith("dn:")) {
+ return dnOf(authzid.substring(3));
+ }
+ return null;
+ }
+
+ private static DN dnOf(final String dnString) {
+ try {
+ return DN.valueOf(dnString);
+ } catch (final IllegalArgumentException ignored) {
+ return null;
+ }
+ }
+
+ /**
+ * Creates a distribution load balancer which uses consistent hashing to distributes requests across a set of
+ * partitions based on a hash of each request's target DN. More precisely, a partition is selected as follows:
+ * <ul>
+ * <li>if the targeted entry lies beneath the partition base DN then the partition is selected based on a hash of
+ * the DN which is superior to the target DN and immediately subordinate to the partition base DN</li>
+ * <li>otherwise, if the request is not a search then the request is routed to a random partition</li>
+ * <li>otherwise, if the search request targets the partition base DN, or a superior thereof, then the search is
+ * routed to a random partition or broadcast to all partitions, depending on the search scope. When broadcasting,
+ * care is taken to re-scope sub-requests in order to avoid returning duplicate entries</li>
+ * <li>otherwise, the search is routed to a random partition because its scope lies outside of the partition
+ * space.</li>
+ * </ul>
+ * This load balancer allows client applications to linearly scale their deployment for write throughput as well
+ * as total number of entries. For example, if a single replicated topology can support 10000 updates/s and a
+ * total of 100M entries, then a 4 way distributed topology could support up to 40000 updates/s and 400M entries.
+ * <p/>
+ * <b>NOTE:</b> there are a number of assumptions in the design of this load balancer as well as a number of
+ * limitations:
+ * <ul>
+ * <li>simple paged results, server side sorting, and VLV request controls are not supported for searches which
+ * traverse all partitions. </li>
+ * <li>persistent searches which traverse all partitions are only supported if they request changes only. </li>
+ * <li>requests which target an entry which is not below the partition base DN will be routed to a partition
+ * selected based on the request's DN, thereby providing affinity. Note that this behavior assumes that entries
+ * which are not below the partition base DN are replicated across all partitions.</li>
+ * <li>searches that traverse multiple partitions as well as entries above the partition base DN may return
+ * results in a non-hierarchical order. Specifically, entries from a partition (below the partition base DN)
+ * may be returned before entries above the partition base DN. Although not required by the LDAP standard, some
+ * legacy clients expect entries to be returned in hierarchical order.
+ * </li>
+ * </ul>
+ *
+ * @param partitionBaseDN
+ * The DN beneath which data is partitioned. All other data is assumed to be shared across all partitions.
+ * @param partitions
+ * The consistent hash map containing the partitions to be distributed.
+ * @param options
+ * The configuration options for the load-balancer (no options are supported currently).
+ * @return The new distribution load balancer.
+ */
+ @SuppressWarnings("unused")
+ public static ConnectionFactory newFixedSizeDistributionLoadBalancer(final DN partitionBaseDN,
+ final ConsistentHashMap<? extends ConnectionFactory> partitions, final Options options) {
+ return new ConsistentHashDistributionLoadBalancer(partitionBaseDN, partitions);
+ }
+
+ /**
* Creates a new "least requests" load-balancer which will load-balance individual requests across the provided
* set of connection factories, each typically representing a single replica, using an algorithm that ensures that
* requests are routed to the replica which has the minimum number of active requests.
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancer.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancer.java
new file mode 100644
index 0000000..2441151
--- /dev/null
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancer.java
@@ -0,0 +1,498 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package org.forgerock.opendj.ldap;
+
+import static com.forgerock.opendj.ldap.CoreMessages.*;
+import static org.forgerock.opendj.ldap.Connections.dnOfRequest;
+import static org.forgerock.opendj.ldap.LdapException.newLdapException;
+import static org.forgerock.opendj.ldap.ResultCode.CLIENT_SIDE_USER_CANCELLED;
+import static org.forgerock.opendj.ldap.ResultCode.PROTOCOL_ERROR;
+import static org.forgerock.opendj.ldap.ResultCode.UNWILLING_TO_PERFORM;
+import static org.forgerock.opendj.ldap.SearchScope.SUBORDINATES;
+import static org.forgerock.opendj.ldap.requests.Requests.copyOfSearchRequest;
+import static org.forgerock.opendj.ldap.spi.LdapPromises.newFailedLdapPromise;
+import static org.forgerock.util.Utils.closeSilently;
+import static org.forgerock.util.Utils.joinAsString;
+import static org.forgerock.util.promise.Promises.newResultPromise;
+import static org.forgerock.util.promise.Promises.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.opendj.ldap.controls.PersistentSearchRequestControl;
+import org.forgerock.opendj.ldap.controls.ServerSideSortRequestControl;
+import org.forgerock.opendj.ldap.controls.SimplePagedResultsControl;
+import org.forgerock.opendj.ldap.controls.VirtualListViewRequestControl;
+import org.forgerock.opendj.ldap.requests.AbandonRequest;
+import org.forgerock.opendj.ldap.requests.AddRequest;
+import org.forgerock.opendj.ldap.requests.BindRequest;
+import org.forgerock.opendj.ldap.requests.CompareRequest;
+import org.forgerock.opendj.ldap.requests.DeleteRequest;
+import org.forgerock.opendj.ldap.requests.ExtendedRequest;
+import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
+import org.forgerock.opendj.ldap.requests.ModifyRequest;
+import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.requests.UnbindRequest;
+import org.forgerock.opendj.ldap.responses.BindResult;
+import org.forgerock.opendj.ldap.responses.CompareResult;
+import org.forgerock.opendj.ldap.responses.ExtendedResult;
+import org.forgerock.opendj.ldap.responses.IntermediateResponse;
+import org.forgerock.opendj.ldap.responses.Result;
+import org.forgerock.opendj.ldap.responses.SearchResultEntry;
+import org.forgerock.opendj.ldap.responses.SearchResultReference;
+import org.forgerock.opendj.ldap.spi.ConnectionState;
+import org.forgerock.opendj.ldap.spi.LdapPromises;
+import org.forgerock.util.AsyncFunction;
+import org.forgerock.util.promise.ExceptionHandler;
+import org.forgerock.util.promise.Promise;
+import org.forgerock.util.promise.PromiseImpl;
+import org.forgerock.util.promise.ResultHandler;
+import org.forgerock.util.promise.RuntimeExceptionHandler;
+
+/**
+ * A simple distribution algorithm which distributes requests equally across a fixed number of partitions based on a
+ * hash of the request's target DN. See {@link Connections#newFixedSizeDistributionLoadBalancer}.
+ */
+final class ConsistentHashDistributionLoadBalancer implements ConnectionFactory {
+ private static final String NAME = ConsistentHashDistributionLoadBalancer.class.getSimpleName();
+ private final ConsistentHashMap<? extends ConnectionFactory> partitions;
+ private final DN partitionBaseDN;
+
+ ConsistentHashDistributionLoadBalancer(final DN partitionBaseDN,
+ final ConsistentHashMap<? extends ConnectionFactory> partitions) {
+ this.partitionBaseDN = partitionBaseDN;
+ this.partitions = partitions;
+ }
+
+ @Override
+ public final void close() {
+ closeSilently(partitions.getAll());
+ }
+
+ @Override
+ public final String toString() {
+ return NAME + '(' + joinAsString(",", partitions) + ')';
+ }
+
+ @Override
+ public final Connection getConnection() throws LdapException {
+ return new ConnectionImpl();
+ }
+
+ @Override
+ public final Promise<Connection, LdapException> getConnectionAsync() {
+ return newResultPromise((Connection) new ConnectionImpl());
+ }
+
+ private class ConnectionImpl extends AbstractAsynchronousConnection {
+ private final ConnectionState state = new ConnectionState();
+
+ @Override
+ public String toString() {
+ return NAME + "Connection";
+ }
+
+ @Override
+ public LdapPromise<Void> abandonAsync(final AbandonRequest request) {
+ // We cannot possibly route these correctly, so just drop them.
+ return LdapPromises.newSuccessfulLdapPromise(null);
+ }
+
+ @Override
+ public LdapPromise<Result> addAsync(final AddRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler) {
+ final ConnectionFactory partition = getPartition(request.getName());
+ return connectAndSendRequest(partition, new AsyncFunction<Connection, Result, LdapException>() {
+ @Override
+ public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.addAsync(request, intermediateResponseHandler);
+ }
+ });
+ }
+
+ @Override
+ public void addConnectionEventListener(final ConnectionEventListener listener) {
+ state.addConnectionEventListener(listener);
+ }
+
+ @Override
+ public LdapPromise<BindResult> bindAsync(final BindRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler) {
+ final DN dn = dnOfRequest(request);
+ if (dn == null) {
+ return newFailedLdapPromise(newLdapException(UNWILLING_TO_PERFORM,
+ DISTRIBUTION_UNRESOLVABLE_PARTITION_ID_FOR_BIND.get()));
+ }
+ final ConnectionFactory partition = getPartition(dn);
+ return connectAndSendRequest(partition, new AsyncFunction<Connection, BindResult, LdapException>() {
+ @Override
+ public Promise<BindResult, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.bindAsync(request, intermediateResponseHandler);
+ }
+ });
+ }
+
+ @Override
+ public void close(final UnbindRequest request, final String reason) {
+ state.notifyConnectionClosed();
+ }
+
+ @Override
+ public LdapPromise<CompareResult> compareAsync(final CompareRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler) {
+ final ConnectionFactory partition = getPartition(request.getName());
+ return connectAndSendRequest(partition, new AsyncFunction<Connection, CompareResult, LdapException>() {
+ @Override
+ public Promise<CompareResult, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.compareAsync(request, intermediateResponseHandler);
+ }
+ });
+ }
+
+ @Override
+ public LdapPromise<Result> deleteAsync(final DeleteRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler) {
+ // We could reject requests that attempt to delete entries superior to the partition base DN. However,
+ // this is really the responsibility of the backend servers to enforce. In some cases such requests
+ // should be allowed, e.g. if the partitions are all empty.
+ final ConnectionFactory partition = getPartition(request.getName());
+ return connectAndSendRequest(partition, new AsyncFunction<Connection, Result, LdapException>() {
+ @Override
+ public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.deleteAsync(request, intermediateResponseHandler);
+ }
+ });
+ }
+
+ @Override
+ public <R extends ExtendedResult> LdapPromise<R> extendedRequestAsync(final ExtendedRequest<R> request,
+ final IntermediateResponseHandler
+ intermediateResponseHandler) {
+ final DN dn = dnOfRequest(request);
+ if (dn == null) {
+ return newFailedLdapPromise(newLdapException(UNWILLING_TO_PERFORM,
+ DISTRIBUTION_UNRESOLVABLE_PARTITION_ID_FOR_EXT_OP.get()));
+ }
+ final ConnectionFactory partition = getPartition(dn);
+ return connectAndSendRequest(partition, new AsyncFunction<Connection, R, LdapException>() {
+ @Override
+ public Promise<R, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.extendedRequestAsync(request, intermediateResponseHandler);
+ }
+ });
+ }
+
+ @Override
+ public boolean isClosed() {
+ return state.isClosed();
+ }
+
+ @Override
+ public boolean isValid() {
+ return state.isValid();
+ }
+
+ @Override
+ public LdapPromise<Result> modifyAsync(final ModifyRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler) {
+ final ConnectionFactory partition = getPartition(request.getName());
+ return connectAndSendRequest(partition, new AsyncFunction<Connection, Result, LdapException>() {
+ @Override
+ public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.modifyAsync(request, intermediateResponseHandler);
+ }
+ });
+ }
+
+ @Override
+ public LdapPromise<Result> modifyDNAsync(final ModifyDNRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler) {
+ final DN oldDN = request.getName();
+ final ConnectionFactory oldPartition = getPartition(oldDN);
+
+ final DN newParent = request.getNewSuperior() != null ? request.getNewSuperior() : oldDN.parent();
+ final DN newDN = newParent.child(request.getNewRDN());
+ final ConnectionFactory newPartition = getPartition(newDN);
+
+ // Acceptable if the request is completely outside of the partitions or entirely within a single partition.
+ if (oldPartition != newPartition) {
+ return unwillingToPerform(DISTRIBUTION_MODDN_SPANS_MULTIPLE_PARTITIONS.get());
+ }
+
+ return connectAndSendRequest(oldPartition, new AsyncFunction<Connection, Result, LdapException>() {
+ @Override
+ public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.modifyDNAsync(request, intermediateResponseHandler);
+ }
+ });
+ }
+
+ @Override
+ public void removeConnectionEventListener(final ConnectionEventListener listener) {
+ state.removeConnectionEventListener(listener);
+ }
+
+ @Override
+ public LdapPromise<Result> searchAsync(final SearchRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final SearchResultHandler entryHandler) {
+ // Broadcast if the search targets an entry which is superior to the base DN and has a scope which
+ // reaches into the partitions.
+ final DN dn = request.getName();
+ switch (request.getScope().asEnum()) {
+ case BASE_OBJECT:
+ // Request can always be handled by a single partition.
+ return searchSinglePartition(request, intermediateResponseHandler, entryHandler);
+ case SINGLE_LEVEL:
+ // Needs broadcasting if the search DN is equal to the partition DN.
+ if (dn.equals(partitionBaseDN)) {
+ return searchAllPartitions(request, intermediateResponseHandler, entryHandler);
+ } else {
+ return searchSinglePartition(request, intermediateResponseHandler, entryHandler);
+ }
+ case WHOLE_SUBTREE:
+ // Needs broadcasting if the search DN is superior or equal to the partition DN.
+ if (dn.isSuperiorOrEqualTo(partitionBaseDN)) {
+ // Split into primary search against search DN and secondary searches against partition DN.
+ return splitAndSearchAllPartitions(request, intermediateResponseHandler, entryHandler);
+ } else {
+ return searchSinglePartition(request, intermediateResponseHandler, entryHandler);
+ }
+ case SUBORDINATES:
+ // Needs broadcasting if the search DN is superior or equal to the partition DN.
+ if (dn.equals(partitionBaseDN)) {
+ return searchAllPartitions(request, intermediateResponseHandler, entryHandler);
+ } else if (dn.isSuperiorOrEqualTo(partitionBaseDN)) {
+ // Split into primary search against search DN and secondary searches against partition DN.
+ return splitAndSearchAllPartitions(request, intermediateResponseHandler, entryHandler);
+ } else {
+ return searchSinglePartition(request, intermediateResponseHandler, entryHandler);
+ }
+ default: // UNKNOWN
+ return unwillingToPerform(DISTRIBUTION_UNSUPPORTED_SEARCH_SCOPE.get(request.getScope().intValue()));
+ }
+ }
+
+ /**
+ * Forwards a search request to each partition. The search request is scoped such that duplicate results are
+ * not possible.
+ */
+ private LdapPromise<Result> searchAllPartitions(final SearchRequest request,
+ final IntermediateResponseHandler irh,
+ final SearchResultHandler srh) {
+ return broadcastSearch(request, request, irh, srh);
+ }
+
+ /**
+ * In order to avoid duplicates, search one partition using the request's DN and the remaining partitions using
+ * the partition base DN with scope "SUBORDINATES".
+ * <p>
+ * TODO: may return results that are not in hierarchical order.
+ */
+ private LdapPromise<Result> splitAndSearchAllPartitions(final SearchRequest primarySearch,
+ final IntermediateResponseHandler irh,
+ final SearchResultHandler srh) {
+ final SearchRequest secondarySearch =
+ copyOfSearchRequest(primarySearch).setName(partitionBaseDN).setScope(SUBORDINATES);
+ return broadcastSearch(primarySearch, secondarySearch, irh, srh);
+ }
+
+ /**
+ * Take care to ensure that all searches are cancelled when one of them fails (important if the
+ * searches are persistent).
+ */
+ private LdapPromise<Result> broadcastSearch(final SearchRequest primarySearch,
+ final SearchRequest secondarySearch,
+ final IntermediateResponseHandler irh,
+ final SearchResultHandler srh) {
+ // First reject requests that contain controls that we do not support when broadcast.
+ if (primarySearch.containsControl(VirtualListViewRequestControl.OID)) {
+ return unwillingToPerform(DISTRIBUTION_VLV_CONTROL_NOT_SUPPORTED.get());
+ } else if (primarySearch.containsControl(SimplePagedResultsControl.OID)) {
+ return unwillingToPerform(DISTRIBUTION_SPR_CONTROL_NOT_SUPPORTED.get());
+ } else if (primarySearch.containsControl(ServerSideSortRequestControl.OID)) {
+ return unwillingToPerform(DISTRIBUTION_SSS_CONTROL_NOT_SUPPORTED.get());
+ } else if (primarySearch.containsControl(PersistentSearchRequestControl.OID)) {
+ // Persistent searches return results in two phases: the initial search results and then the changes.
+ // Unfortunately, there's no way to determine when the first phase has completed, so it's not
+ // possible to prevent the two phases from being interleaved when broadcast.
+ try {
+ final PersistentSearchRequestControl control =
+ primarySearch.getControl(PersistentSearchRequestControl.DECODER, new DecodeOptions());
+ if (!control.isChangesOnly()) {
+ return unwillingToPerform(DISTRIBUTION_PSEARCH_CONTROL_NOT_SUPPORTED.get());
+ }
+ } catch (final DecodeException e) {
+ return newFailedLdapPromise(newLdapException(PROTOCOL_ERROR, e.getMessage(), e));
+ }
+ }
+
+ final List<Promise<Result, LdapException>> promises = new ArrayList<>(partitions.size());
+
+ // Launch all searches with the primary search targeting a random partition.
+ final ConnectionFactory primaryPartition = getPartition(primarySearch.getName());
+ final IntermediateResponseHandler sirh = synchronize(irh);
+ final SearchResultHandler ssrh = synchronize(srh);
+ for (final ConnectionFactory partition : partitions.getAll()) {
+ final SearchRequest searchRequest = partition == primaryPartition ? primarySearch : secondarySearch;
+ promises.add(searchSinglePartition(searchRequest, sirh, ssrh, partition));
+ }
+
+ // FIXME: chained PromiseImpl and Promises.when() don't chain cancellation requests.
+ final PromiseImpl<Result, LdapException> reducedPromise = new PromiseImpl<Result, LdapException>() {
+ @Override
+ protected LdapException tryCancel(final boolean mayInterruptIfRunning) {
+ for (Promise<Result, LdapException> promise : promises) {
+ promise.cancel(mayInterruptIfRunning);
+ }
+ return newLdapException(CLIENT_SIDE_USER_CANCELLED);
+ }
+ };
+ when(promises).thenOnResult(new ResultHandler<List<Result>>() {
+ @Override
+ public void handleResult(final List<Result> results) {
+ // TODO: Depending on controls we may want to merge these results in some way.
+ reducedPromise.handleResult(results.get(0));
+ }
+ }).thenOnException(new ExceptionHandler<LdapException>() {
+ @Override
+ public void handleException(final LdapException exception) {
+ reducedPromise.handleException(exception);
+ }
+ }).thenOnRuntimeException(new RuntimeExceptionHandler() {
+ @Override
+ public void handleRuntimeException(final RuntimeException exception) {
+ reducedPromise.handleRuntimeException(exception);
+ }
+ }).thenFinally(new Runnable() {
+ @Override
+ public void run() {
+ // Ensure that any remaining searches are terminated.
+ for (Promise<Result, LdapException> promise : promises) {
+ promise.cancel(true);
+ }
+ }
+ });
+
+ return LdapPromises.asPromise(reducedPromise);
+ }
+
+ private LdapPromise<Result> unwillingToPerform(final LocalizableMessage msg) {
+ return newFailedLdapPromise(newLdapException(UNWILLING_TO_PERFORM, msg));
+ }
+
+ /**
+ * Forwards a search request to a single partition based on the search's base DN. If the DN does not target a
+ * specific partition then a random partition will be selected.
+ */
+ private LdapPromise<Result> searchSinglePartition(final SearchRequest request,
+ final IntermediateResponseHandler irh,
+ final SearchResultHandler srh) {
+ final ConnectionFactory partition = getPartition(request.getName());
+ return searchSinglePartition(request, irh, srh, partition);
+ }
+
+ private LdapPromise<Result> searchSinglePartition(final SearchRequest request,
+ final IntermediateResponseHandler irh,
+ final SearchResultHandler srh,
+ final ConnectionFactory partition) {
+ return connectAndSendRequest(partition, new AsyncFunction<Connection, Result, LdapException>() {
+ @Override
+ public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.searchAsync(request, irh, srh);
+ }
+ });
+ }
+
+ /** Returns a search result handler that ensures that only one response is processed at a time. */
+ private SearchResultHandler synchronize(final SearchResultHandler srh) {
+ return new SearchResultHandler() {
+ @Override
+ public synchronized boolean handleEntry(final SearchResultEntry entry) {
+ return srh.handleEntry(entry);
+ }
+
+ @Override
+ public synchronized boolean handleReference(final SearchResultReference reference) {
+ return srh.handleReference(reference);
+ }
+ };
+ }
+
+ /** Returns an intermediate response handler that ensures that only one response is processed at a time. */
+ private IntermediateResponseHandler synchronize(final IntermediateResponseHandler irh) {
+ return new IntermediateResponseHandler() {
+ @Override
+ public synchronized boolean handleIntermediateResponse(final IntermediateResponse response) {
+ return irh.handleIntermediateResponse(response);
+ }
+ };
+ }
+
+ /**
+ * Returns the partition for requests having the provided DN. If the DN is not associated with a specific
+ * partition then a random partition will be returned.
+ */
+ private ConnectionFactory getPartition(final DN dn) {
+ final DN partitionDN = getPartitionDN(dn);
+ return partitions.get((partitionDN != null ? partitionDN : dn).toNormalizedUrlSafeString());
+ }
+
+ /**
+ * Returns the DN which is a child of the partition base DN and which is equal to or superior to the provided
+ * DN, otherwise {@code null}.
+ */
+ private DN getPartitionDN(final DN dn) {
+ final int depthBelowBaseDN = dn.size() - partitionBaseDN.size();
+ if (depthBelowBaseDN > 0 && dn.isSubordinateOrEqualTo(partitionBaseDN)) {
+ return dn.parent(depthBelowBaseDN - 1);
+ }
+ return null;
+ }
+
+ private <R> LdapPromise<R> connectAndSendRequest(final ConnectionFactory partition,
+ final AsyncFunction<Connection, R, LdapException> doRequest) {
+ if (state.isClosed()) {
+ throw new IllegalStateException("Connection is already closed");
+ }
+ final AtomicReference<Connection> connectionHolder = new AtomicReference<>();
+ return connectAsync(partition).thenOnResult(new ResultHandler<Connection>() {
+ @Override
+ public void handleResult(final Connection connection) {
+ connectionHolder.set(connection);
+ }
+ // FIXME: how do we support cancellation of the request?
+ }).thenAsync(doRequest).thenFinally(new Runnable() {
+ @Override
+ public void run() {
+ closeSilently(connectionHolder.get());
+ }
+ });
+ }
+
+ private LdapPromise<Connection> connectAsync(final ConnectionFactory partition) {
+ return LdapPromises.asPromise(partition.getConnectionAsync()
+ .thenOnException(new ExceptionHandler<LdapException>() {
+ @Override
+ public void handleException(final LdapException e) {
+ state.notifyConnectionError(false, e);
+ }
+ }));
+ }
+ }
+}
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashMap.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashMap.java
new file mode 100644
index 0000000..a43bf7b
--- /dev/null
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashMap.java
@@ -0,0 +1,288 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package org.forgerock.opendj.ldap;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.forgerock.util.Function;
+import org.forgerock.util.Reject;
+import org.forgerock.util.annotations.VisibleForTesting;
+import org.forgerock.util.promise.NeverThrowsException;
+
+/**
+ * An implementation of "consistent hashing" supporting per-partition weighting. This implementation is thread safe
+ * and allows partitions to be added and removed during use.
+ * <p>
+ * This implementation maps partitions to one or more points on a circle ranging from {@link Integer#MIN_VALUE} to
+ * {@link Integer#MAX_VALUE}. The number of points per partition is dictated by the partition's weight. A partition
+ * with a weight which is higher than another partition will receive a proportionally higher load.
+ *
+ * @param <P> The type of partition object.
+ *
+ * @see <a href="http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.23.3738">Consistent Hashing and Random
+ * Trees</a>
+ * @see <a href="http://www8.org/w8-papers/2a-webserver/caching/paper2.html">Web Caching with Consistent Hashing</a>
+ */
+public final class ConsistentHashMap<P> {
+ // TODO: add methods for determining which partitions will need to be rebalanced when a partition is added or
+ // removed.
+ /** The default weight. The value is relatively high in order to minimize the risk of imbalances. */
+ private static final int DEFAULT_WEIGHT = 200;
+ /** Default hash function based on MD5. */
+ @VisibleForTesting
+ static final Function<Object, Integer, NeverThrowsException> MD5 =
+ new Function<Object, Integer, NeverThrowsException>() {
+ @Override
+ public Integer apply(final Object key) {
+ final byte[] bytes = key.toString().getBytes(UTF_8);
+ final byte[] digest = getMD5Digest().digest(bytes);
+ return ByteString.wrap(digest).toInt();
+ }
+
+ private MessageDigest getMD5Digest() {
+ // TODO: we may want to cache these.
+ try {
+ return MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ /** Synchronizes updates. Reads are protected by copy on write. */
+ private final ReentrantLock writeLock = new ReentrantLock();
+ /** Consistent hash map circle. */
+ private volatile NavigableMap<Integer, Node<P>> circle = new TreeMap<>();
+ /** Maps partition IDs to their partition. */
+ private volatile Map<String, P> partitions = new LinkedHashMap<>();
+ /** Function used for hashing keys. */
+ private final Function<Object, Integer, NeverThrowsException> hashFunction;
+
+ /** Creates a new consistent hash map which will hash keys using MD5. */
+ public ConsistentHashMap() {
+ this(MD5);
+ }
+
+ /**
+ * Creates a new consistent hash map which will hash keys using the provided hash function.
+ *
+ * @param hashFunction
+ * The function which should be used for hashing keys.
+ */
+ public ConsistentHashMap(final Function<Object, Integer, NeverThrowsException> hashFunction) {
+ this.hashFunction = hashFunction;
+ }
+
+ /**
+ * Puts a partition into this consistent hash map using the default weight which is sufficiently high to ensure a
+ * reasonably uniform distribution among all partitions having the same weight.
+ *
+ * @param partitionId
+ * The partition ID.
+ * @param partition
+ * The partition.
+ * @return This consistent hash map.
+ */
+ public ConsistentHashMap<P> put(final String partitionId, final P partition) {
+ return put(partitionId, partition, DEFAULT_WEIGHT);
+ }
+
+ /**
+ * Puts a partition into this consistent hash map using the specified weight. If all partitions have the same
+ * weight then they will each receive a similar amount of load. A partition having a weight which is twice that
+ * of another will receive twice the load. Weight values should generally be great than 200 in order to minimize
+ * the risk of unexpected imbalances due to the way in which logical partitions are mapped to real partitions.
+ *
+ * @param partitionId
+ * The partition ID.
+ * @param partition
+ * The partition.
+ * @param weight
+ * The partition's weight, which should typically be over 200 and never negative.
+ * @return This consistent hash map.
+ */
+ public ConsistentHashMap<P> put(final String partitionId, final P partition, final int weight) {
+ Reject.ifNull(partitionId, "partitionId must be non-null");
+ Reject.ifNull(partition, "partition must be non-null");
+ Reject.ifTrue(weight < 0, "Weight must be a positive integer");
+
+ final Node<P> node = new Node<>(partitionId, partition, weight);
+ writeLock.lock();
+ try {
+ final TreeMap<Integer, Node<P>> newCircle = new TreeMap<>(circle);
+ for (int i = 0; i < weight; i++) {
+ newCircle.put(hashFunction.apply(partitionId + i), node);
+ }
+
+ final Map<String, P> newPartitions = new LinkedHashMap<>(partitions);
+ newPartitions.put(partitionId, partition);
+
+ // It doesn't matter that these assignments are not atomic.
+ circle = newCircle;
+ partitions = newPartitions;
+ } finally {
+ writeLock.unlock();
+ }
+ return this;
+ }
+
+ /**
+ * Removes the partition that was previously added using the provided partition ID.
+ *
+ * @param partitionId
+ * The partition ID.
+ * @return This consistent hash map.
+ */
+ public ConsistentHashMap<P> remove(final String partitionId) {
+ Reject.ifNull(partitionId, "partitionId must be non-null");
+
+ writeLock.lock();
+ try {
+ if (partitions.containsKey(partitionId)) {
+ final TreeMap<Integer, Node<P>> newCircle = new TreeMap<>(circle);
+ final Node<P> node = newCircle.remove(hashFunction.apply(partitionId + 0));
+ for (int i = 1; i < node.weight; i++) {
+ newCircle.remove(hashFunction.apply(partitionId + i));
+ }
+
+ final Map<String, P> newPartitions = new LinkedHashMap<>(partitions);
+ newPartitions.remove(partitionId);
+
+ // It doesn't matter that these assignments are not atomic.
+ circle = newCircle;
+ partitions = newPartitions;
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ return this;
+ }
+
+ /**
+ * Returns the partition from this map corresponding to the provided key's hash, or {@code null} if this map is
+ * empty.
+ *
+ * @param key
+ * The key for which a corresponding partition is to be returned.
+ * @return The partition from this map corresponding to the provided key's hash, or {@code null} if this map is
+ * empty.
+ */
+ P get(final Object key) {
+ final NavigableMap<Integer, Node<P>> circleSnapshot = circle;
+ final Map.Entry<Integer, Node<P>> ceilingEntry = circleSnapshot.ceilingEntry(hashFunction.apply(key));
+ if (ceilingEntry != null) {
+ return ceilingEntry.getValue().partition;
+ }
+ final Map.Entry<Integer, Node<P>> firstEntry = circleSnapshot.firstEntry();
+ return firstEntry != null ? firstEntry.getValue().partition : null;
+ }
+
+ /**
+ * Returns a collection containing all of the partitions contained in this consistent hash map.
+ *
+ * @return A collection containing all of the partitions contained in this consistent hash map.
+ */
+ Collection<P> getAll() {
+ return partitions.values();
+ }
+
+ /**
+ * Returns the number of partitions in this consistent hash map.
+ *
+ * @return The number of partitions in this consistent hash map.
+ */
+ int size() {
+ return partitions.size();
+ }
+
+ /**
+ * Returns {@code true} if there are no partitions in this consistent hash map.
+ *
+ * @return {@code true} if there are no partitions in this consistent hash map.
+ */
+ boolean isEmpty() {
+ return partitions.isEmpty();
+ }
+
+ /**
+ * Returns a map whose keys are the partitions stored in this map and whose values are the actual weights associated
+ * with each partition. The sum of the weights will be equal to 2^32.
+ * <p/>
+ * This method is intended for testing, but may one day be used in order to query the current status of the
+ * load-balancer and, in particular, the weighting associated with each partition as a percentage.
+ *
+ * @return A map whose keys are the partitions stored in this map and whose values are the actual weights associated
+ * with each partition.
+ */
+ @VisibleForTesting
+ Map<P, Long> getWeights() {
+ final NavigableMap<Integer, Node<P>> circleSnapshot = circle;
+ final IdentityHashMap<P, Long> weights = new IdentityHashMap<>();
+ Map.Entry<Integer, Node<P>> previousEntry = null;
+ for (final Map.Entry<Integer, Node<P>> entry : circleSnapshot.entrySet()) {
+ final long index = entry.getKey();
+ final P partition = entry.getValue().partition;
+ if (previousEntry == null) {
+ // Special case for first value since the range begins with the last entry.
+ final long range1 = (long) Integer.MAX_VALUE - circleSnapshot.lastEntry().getKey();
+ final long range2 = index - Integer.MIN_VALUE;
+ weights.put(partition, range1 + range2 + 1);
+ } else {
+ final long start = previousEntry.getKey();
+ final long end = entry.getKey();
+ if (weights.containsKey(partition)) {
+ weights.put(partition, weights.get(partition) + (end - start));
+ } else {
+ weights.put(partition, end - start);
+ }
+ }
+ previousEntry = entry;
+ }
+ return weights;
+ }
+
+ @Override
+ public String toString() {
+ return getWeights().toString();
+ }
+
+ /** A partition stored in the consistent hash map circle. */
+ private static final class Node<P> {
+ private final String partitionId;
+ private final P partition;
+ private final int weight;
+
+ private Node(final String partitionId, final P partition, final int weight) {
+ this.partitionId = partitionId;
+ this.partition = partition;
+ this.weight = weight;
+ }
+
+ @Override
+ public String toString() {
+ return partitionId;
+ }
+ }
+}
diff --git a/opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties b/opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties
index d3d8375..f5ec2c8 100644
--- a/opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties
+++ b/opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties
@@ -1725,3 +1725,13 @@
ERR_UNKNOWN_REQUEST_TYPE=The following request has a unknown request type: %s
WARN_DECODING_AFFINITY_CONTROL=An error occurred while decoding an affinity control: %s
ERR_SASL_BIND_MULTI_STAGE=An error occurred during multi-stage authentication: '%s'
+DISTRIBUTION_UNRESOLVABLE_PARTITION_ID_FOR_BIND=Unable to determine the partition ID from bind request
+DISTRIBUTION_UNRESOLVABLE_PARTITION_ID_FOR_EXT_OP=Unable to determine the partition ID from extended request
+DISTRIBUTION_DELETE_SPANS_MULTIPLE_PARTITIONS=The delete request spans multiple partitions
+DISTRIBUTION_MODDN_SPANS_MULTIPLE_PARTITIONS=The modify DN request spans multiple partitions
+DISTRIBUTION_UNSUPPORTED_SEARCH_SCOPE=The search scope %d is not supported by the distribution layer
+DISTRIBUTION_VLV_CONTROL_NOT_SUPPORTED=The VLV request control is not supported across multiple partitions
+DISTRIBUTION_SPR_CONTROL_NOT_SUPPORTED=The Simple Paged Results request control is not supported across multiple partitions
+DISTRIBUTION_SSS_CONTROL_NOT_SUPPORTED=The Server Side Sort request control is not supported across multiple partitions
+DISTRIBUTION_PSEARCH_CONTROL_NOT_SUPPORTED=The Persistent Search request control is not supported across multiple \
+ partitions, except if it requests changes only
diff --git a/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancerTest.java b/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancerTest.java
new file mode 100644
index 0000000..44f91a2
--- /dev/null
+++ b/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancerTest.java
@@ -0,0 +1,382 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package org.forgerock.opendj.ldap;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.forgerock.opendj.ldap.Connections.newFixedSizeDistributionLoadBalancer;
+import static org.forgerock.opendj.ldap.Filter.alwaysTrue;
+import static org.forgerock.opendj.ldap.SearchScope.BASE_OBJECT;
+import static org.forgerock.opendj.ldap.SearchScope.SINGLE_LEVEL;
+import static org.forgerock.opendj.ldap.SearchScope.SUBORDINATES;
+import static org.forgerock.opendj.ldap.SearchScope.WHOLE_SUBTREE;
+import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnectionFactory;
+import static org.forgerock.opendj.ldap.requests.Requests.*;
+import static org.forgerock.opendj.ldap.responses.Responses.newBindResult;
+import static org.forgerock.opendj.ldap.responses.Responses.newCompareResult;
+import static org.forgerock.opendj.ldap.responses.Responses.newResult;
+import static org.forgerock.opendj.ldap.spi.LdapPromises.newSuccessfulLdapPromise;
+import static org.forgerock.util.Options.defaultOptions;
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import org.forgerock.opendj.ldap.requests.AddRequest;
+import org.forgerock.opendj.ldap.requests.BindRequest;
+import org.forgerock.opendj.ldap.requests.CompareRequest;
+import org.forgerock.opendj.ldap.requests.DeleteRequest;
+import org.forgerock.opendj.ldap.requests.ExtendedRequest;
+import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
+import org.forgerock.opendj.ldap.requests.ModifyRequest;
+import org.forgerock.opendj.ldap.requests.Requests;
+import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.responses.BindResult;
+import org.forgerock.opendj.ldap.responses.CompareResult;
+import org.forgerock.opendj.ldap.responses.Result;
+import org.forgerock.util.Function;
+import org.forgerock.util.promise.NeverThrowsException;
+import org.mockito.Mock;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@SuppressWarnings("javadoc")
+public class ConsistentHashDistributionLoadBalancerTest extends SdkTestCase {
+ private static final DN PARTITION_BASE_DN = DN.valueOf("ou=people,dc=example,dc=com");
+ private static final DN DN_1_BELOW_PARTITION_BASE_DN = PARTITION_BASE_DN.child("uid=bjensen");
+ private static final DN DN_2_BELOW_PARTITION_BASE_DN = DN_1_BELOW_PARTITION_BASE_DN.child("cn=prefs");
+ private static final DN DN_ABOVE_PARTITION_BASE_DN = PARTITION_BASE_DN.parent();
+ private static final DN UNPARTITIONED_DN = DN.valueOf("ou=groups,dc=example,dc=com");
+ private static final LdapPromise<Result> SUCCESS = newSuccessfulLdapPromise(newResult(ResultCode.SUCCESS));
+ private static final LdapPromise<BindResult> BIND_SUCCESS =
+ newSuccessfulLdapPromise(newBindResult(ResultCode.SUCCESS));
+ private static final LdapPromise<CompareResult> COMPARE_SUCCESS =
+ newSuccessfulLdapPromise(newCompareResult(ResultCode.SUCCESS));
+ private static final int P1_HASH = 0x00000000;
+ private static final int P2_HASH = 0x80000000;
+ @Mock
+ private Connection partition1Conn;
+ @Mock
+ private Connection partition2Conn;
+ @Mock
+ private Function<Object, Integer, NeverThrowsException> hashFunction;
+ private ConnectionFactory partition1;
+ private ConnectionFactory partition2;
+ private ConnectionFactory loadBalancer;
+
+ @BeforeMethod
+ public void beforeMethod() {
+ initMocks(this);
+
+ partition1 = mockConnectionFactory(partition1Conn);
+ partition2 = mockConnectionFactory(partition2Conn);
+
+ when(hashFunction.apply(any())).thenReturn(P1_HASH, P2_HASH);
+
+ final ConsistentHashMap<ConnectionFactory> partitions = new ConsistentHashMap<>(hashFunction);
+ partitions.put("P1", partition1, 1);
+ partitions.put("P2", partition2, 1);
+
+ loadBalancer = newFixedSizeDistributionLoadBalancer(PARTITION_BASE_DN, partitions, defaultOptions());
+
+ when(partition1Conn.addAsync(any(AddRequest.class),
+ any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+ when(partition2Conn.addAsync(any(AddRequest.class),
+ any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+
+ when(partition1Conn.bindAsync(any(BindRequest.class),
+ any(IntermediateResponseHandler.class))).thenReturn(BIND_SUCCESS);
+ when(partition2Conn.bindAsync(any(BindRequest.class),
+ any(IntermediateResponseHandler.class))).thenReturn(BIND_SUCCESS);
+
+ when(partition1Conn.compareAsync(any(CompareRequest.class),
+ any(IntermediateResponseHandler.class))).thenReturn(COMPARE_SUCCESS);
+ when(partition2Conn.compareAsync(any(CompareRequest.class),
+ any(IntermediateResponseHandler.class))).thenReturn(COMPARE_SUCCESS);
+
+ when(partition1Conn.deleteAsync(any(DeleteRequest.class),
+ any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+ when(partition2Conn.deleteAsync(any(DeleteRequest.class),
+ any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+
+ when(partition1Conn.extendedRequestAsync(any(ExtendedRequest.class),
+ any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+ when(partition2Conn.extendedRequestAsync(any(ExtendedRequest.class),
+ any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+
+ when(partition1Conn.modifyAsync(any(ModifyRequest.class),
+ any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+ when(partition2Conn.modifyAsync(any(ModifyRequest.class),
+ any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+
+ when(partition1Conn.modifyDNAsync(any(ModifyDNRequest.class),
+ any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+ when(partition2Conn.modifyDNAsync(any(ModifyDNRequest.class),
+ any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
+
+ when(partition1Conn.searchAsync(any(SearchRequest.class),
+ any(IntermediateResponseHandler.class),
+ any(SearchResultHandler.class))) .thenReturn(SUCCESS);
+ when(partition2Conn.searchAsync(any(SearchRequest.class),
+ any(IntermediateResponseHandler.class),
+ any(SearchResultHandler.class))).thenReturn(SUCCESS);
+ }
+
+ @Test
+ public void closeShouldCloseAllPartitions() throws Exception {
+ loadBalancer.close();
+ verify(partition1).close();
+ verify(partition2).close();
+ }
+
+ @Test
+ public void getConnectionShouldReturnLogicalConnection() throws Exception {
+ try (final Connection connection = loadBalancer.getConnection()) {
+ assertThat(connection).isNotNull();
+ connection.close();
+ }
+ verifyZeroInteractions(partition1, partition2);
+ }
+
+ @Test
+ public void getConnectionAsyncShouldReturnLogicalConnection() throws Exception {
+ try (final Connection connection = loadBalancer.getConnectionAsync().get()) {
+ assertThat(connection).isNotNull();
+ connection.close();
+ }
+ verifyZeroInteractions(partition1, partition2);
+ }
+
+ @DataProvider
+ public Object[][] requests() {
+ return new Object[][] { { P1_HASH, false, UNPARTITIONED_DN, UNPARTITIONED_DN },
+ { P2_HASH, true, UNPARTITIONED_DN, UNPARTITIONED_DN },
+ { P1_HASH, false, DN_ABOVE_PARTITION_BASE_DN, DN_ABOVE_PARTITION_BASE_DN },
+ { P2_HASH, true, DN_ABOVE_PARTITION_BASE_DN, DN_ABOVE_PARTITION_BASE_DN },
+ { P1_HASH, false, DN_1_BELOW_PARTITION_BASE_DN, DN_1_BELOW_PARTITION_BASE_DN },
+ { P2_HASH, true, DN_1_BELOW_PARTITION_BASE_DN, DN_1_BELOW_PARTITION_BASE_DN },
+ { P1_HASH, false, DN_2_BELOW_PARTITION_BASE_DN, DN_1_BELOW_PARTITION_BASE_DN },
+ { P2_HASH, true, DN_2_BELOW_PARTITION_BASE_DN, DN_1_BELOW_PARTITION_BASE_DN } };
+ }
+
+ @Test(dataProvider = "requests")
+ public void addShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
+ final DN partitionDN) throws Exception {
+ when(hashFunction.apply(any())).thenReturn(hash);
+ try (final Connection connection = loadBalancer.getConnection()) {
+ connection.addAsync(newAddRequest(requestDN));
+ }
+ verify(hitPartition(isSecondPartition)).addAsync(any(AddRequest.class), any(IntermediateResponseHandler.class));
+ verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
+ verify(hitPartition(isSecondPartition)).close();
+ verifyZeroInteractions(missPartition(isSecondPartition));
+ }
+
+ @Test(dataProvider = "requests")
+ public void bindShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
+ final DN partitionDN) throws Exception {
+ when(hashFunction.apply(any())).thenReturn(hash);
+ try (final Connection connection = loadBalancer.getConnection()) {
+ connection.bindAsync(Requests.newSimpleBindRequest(requestDN.toString(), "password".toCharArray()));
+ }
+ verify(hitPartition(isSecondPartition)).bindAsync(any(BindRequest.class),
+ any(IntermediateResponseHandler.class));
+ verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
+ verify(hitPartition(isSecondPartition)).close();
+ verifyZeroInteractions(missPartition(isSecondPartition));
+ }
+
+ @Test(dataProvider = "requests")
+ public void compareShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
+ final DN partitionDN) throws Exception {
+ when(hashFunction.apply(any())).thenReturn(hash);
+ try (final Connection connection = loadBalancer.getConnection()) {
+ connection.compareAsync(newCompareRequest(requestDN.toString(), "cn", "test"));
+ }
+ verify(hitPartition(isSecondPartition)).compareAsync(any(CompareRequest.class),
+ any(IntermediateResponseHandler.class));
+ verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
+ verify(hitPartition(isSecondPartition)).close();
+ verifyZeroInteractions(missPartition(isSecondPartition));
+ }
+
+ @Test(dataProvider = "requests")
+ public void deleteShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
+ final DN partitionDN) throws Exception {
+ when(hashFunction.apply(any())).thenReturn(hash);
+ try (final Connection connection = loadBalancer.getConnection()) {
+ connection.deleteAsync(newDeleteRequest(requestDN));
+ }
+ verify(hitPartition(isSecondPartition)).deleteAsync(any(DeleteRequest.class),
+ any(IntermediateResponseHandler.class));
+ verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
+ verify(hitPartition(isSecondPartition)).close();
+ verifyZeroInteractions(missPartition(isSecondPartition));
+ }
+
+ @Test(dataProvider = "requests")
+ public void extendedShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
+ final DN partitionDN) throws Exception {
+ when(hashFunction.apply(any())).thenReturn(hash);
+ try (final Connection connection = loadBalancer.getConnection()) {
+ connection.extendedRequestAsync(newPasswordModifyExtendedRequest().setUserIdentity("dn:" + requestDN));
+ }
+ verify(hitPartition(isSecondPartition)).extendedRequestAsync(any(ExtendedRequest.class),
+ any(IntermediateResponseHandler.class));
+ verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
+ verify(hitPartition(isSecondPartition)).close();
+ verifyZeroInteractions(missPartition(isSecondPartition));
+ }
+
+ @Test(dataProvider = "requests")
+ public void modifyShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
+ final DN partitionDN) throws Exception {
+ when(hashFunction.apply(any())).thenReturn(hash);
+ try (final Connection connection = loadBalancer.getConnection()) {
+ connection.modifyAsync(newModifyRequest(requestDN));
+ }
+ verify(hitPartition(isSecondPartition)).modifyAsync(any(ModifyRequest.class),
+ any(IntermediateResponseHandler.class));
+ verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
+ verify(hitPartition(isSecondPartition)).close();
+ verifyZeroInteractions(missPartition(isSecondPartition));
+ }
+
+ @Test(dataProvider = "requests")
+ public void modifyDNShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
+ final DN partitionDN) throws Exception {
+ when(hashFunction.apply(any())).thenReturn(hash);
+ try (final Connection connection = loadBalancer.getConnection()) {
+ connection.modifyDNAsync(newModifyDNRequest(requestDN, RDN.valueOf("cn=changed")));
+ }
+ verify(hitPartition(isSecondPartition)).modifyDNAsync(any(ModifyDNRequest.class),
+ any(IntermediateResponseHandler.class));
+ verify(hashFunction, atLeastOnce()).apply(partitionDN.toNormalizedUrlSafeString());
+ verify(hitPartition(isSecondPartition)).close();
+ verifyZeroInteractions(missPartition(isSecondPartition));
+ }
+
+ @Test(dataProvider = "requests")
+ public void searchBaseShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
+ final DN partitionDN) throws Exception {
+ when(hashFunction.apply(any())).thenReturn(hash);
+ try (final Connection connection = loadBalancer.getConnection()) {
+ connection.searchAsync(newSearchRequest(requestDN, BASE_OBJECT, alwaysTrue()),
+ mock(SearchResultHandler.class));
+ }
+ verify(hitPartition(isSecondPartition)).searchAsync(any(SearchRequest.class),
+ any(IntermediateResponseHandler.class),
+ any(SearchResultHandler.class));
+ verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
+ verify(hitPartition(isSecondPartition)).close();
+ verifyZeroInteractions(missPartition(isSecondPartition));
+ }
+
+ @Test
+ public void searchSingleLevelBelowPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
+ verifySearchAgainstSinglePartition(DN_1_BELOW_PARTITION_BASE_DN, SINGLE_LEVEL);
+ }
+
+ @Test
+ public void searchSingleLevelAbovePartitionBaseDNShouldRouteToSinglePartition() throws Exception {
+ verifySearchAgainstSinglePartition(DN_ABOVE_PARTITION_BASE_DN, SINGLE_LEVEL);
+ }
+
+ @Test
+ public void searchSingleLevelAdjacentToPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
+ verifySearchAgainstSinglePartition(UNPARTITIONED_DN, SINGLE_LEVEL);
+ }
+
+ @Test
+ public void searchSingleLevelAtPartitionBaseDNShouldRouteToAllPartitions() throws Exception {
+ verifySearchAgainstAllPartitions(PARTITION_BASE_DN, SINGLE_LEVEL);
+ }
+
+ @Test
+ public void searchSubtreeBelowPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
+ verifySearchAgainstSinglePartition(DN_1_BELOW_PARTITION_BASE_DN, WHOLE_SUBTREE);
+ }
+
+ @Test
+ public void searchSubtreeAdjacentToPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
+ verifySearchAgainstSinglePartition(UNPARTITIONED_DN, WHOLE_SUBTREE);
+ }
+
+ @Test
+ public void searchSubtreeAbovePartitionBaseDNShouldRouteToAllPartitions() throws Exception {
+ verifySearchAgainstAllPartitions(DN_ABOVE_PARTITION_BASE_DN, WHOLE_SUBTREE);
+ }
+
+ @Test
+ public void searchSubtreeAtPartitionBaseDNShouldRouteToAllPartitions() throws Exception {
+ verifySearchAgainstAllPartitions(PARTITION_BASE_DN, WHOLE_SUBTREE);
+ }
+
+ @Test
+ public void searchSubordinatesBelowPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
+ verifySearchAgainstSinglePartition(DN_1_BELOW_PARTITION_BASE_DN, SUBORDINATES);
+ }
+
+ @Test
+ public void searchSubordinatesAdjacentToPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
+ verifySearchAgainstSinglePartition(UNPARTITIONED_DN, SUBORDINATES);
+ }
+
+ @Test
+ public void searchSubordinatesAbovePartitionBaseDNShouldRouteToAllPartitions() throws Exception {
+ verifySearchAgainstAllPartitions(DN_ABOVE_PARTITION_BASE_DN, SUBORDINATES);
+ }
+
+ @Test
+ public void searchSubordinatesAtPartitionBaseDNShouldRouteToAllPartitions() throws Exception {
+ verifySearchAgainstAllPartitions(PARTITION_BASE_DN, SUBORDINATES);
+ }
+
+ private void verifySearchAgainstSinglePartition(final DN dn, final SearchScope scope) throws Exception {
+ when(hashFunction.apply(any())).thenReturn(P1_HASH);
+ try (final Connection connection = loadBalancer.getConnection()) {
+ connection.searchAsync(newSearchRequest(dn, scope, alwaysTrue()),
+ mock(SearchResultHandler.class));
+ }
+ verify(partition1Conn).searchAsync(any(SearchRequest.class),
+ any(IntermediateResponseHandler.class),
+ any(SearchResultHandler.class));
+ verify(hashFunction).apply(dn.toNormalizedUrlSafeString());
+ verify(partition1Conn).close();
+ verifyZeroInteractions(partition2Conn);
+ }
+
+ private void verifySearchAgainstAllPartitions(final DN dn, final SearchScope scope) throws Exception {
+ when(hashFunction.apply(any())).thenReturn(P1_HASH);
+ try (final Connection connection = loadBalancer.getConnection()) {
+ connection.searchAsync(newSearchRequest(dn, scope, alwaysTrue()), mock(SearchResultHandler.class));
+ }
+ verify(partition1Conn).searchAsync(any(SearchRequest.class),
+ any(IntermediateResponseHandler.class),
+ any(SearchResultHandler.class));
+ verify(partition2Conn).searchAsync(any(SearchRequest.class),
+ any(IntermediateResponseHandler.class),
+ any(SearchResultHandler.class));
+ verify(partition1Conn).close();
+ verify(partition2Conn).close();
+ }
+
+ private Connection missPartition(final boolean isSecondPartition) {
+ return isSecondPartition ? partition1Conn : partition2Conn;
+ }
+
+ private Connection hitPartition(final boolean isSecondPartition) {
+ return isSecondPartition ? partition2Conn : partition1Conn;
+ }
+}
diff --git a/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashMapTest.java b/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashMapTest.java
new file mode 100644
index 0000000..c7316dc
--- /dev/null
+++ b/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashMapTest.java
@@ -0,0 +1,141 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package org.forgerock.opendj.ldap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+
+import org.forgerock.util.Function;
+import org.forgerock.util.promise.NeverThrowsException;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@SuppressWarnings("javadoc")
+public class ConsistentHashMapTest extends SdkTestCase {
+ private static final String P1 = "partition-1";
+ private static final String P2 = "partition-2";
+ private static final String P3 = "partition-3";
+ private static final String P4 = "partition-4";
+
+ @DataProvider
+ public static Object[][] md5HashFunction() {
+ // @Checkstyle:off
+ return new Object[][] {
+ { P1, ByteString.valueOfHex("94f940e6c2a31703d6be067b87d67968").toInt() },
+ { P2, ByteString.valueOfHex("1038f27a22f8b574670bfdd7f918ffb4").toInt() },
+ { P3, ByteString.valueOfHex("a88f746bfae24480be40df47c679228a").toInt() },
+ { P4, ByteString.valueOfHex("bdb89200d2a310f5afb345621f544183").toInt() },
+ };
+ // @Checkstyle:on
+ }
+
+ @Test(dataProvider = "md5HashFunction")
+ public void testMD5HashFunction(final String s, final int expected) {
+ assertThat(ConsistentHashMap.MD5.apply(s)).isEqualTo(expected);
+ }
+
+ @Test
+ public void testConsistentHashMap() {
+ // Create a hash function that returns predictable values.
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ final Function<Object, Integer, NeverThrowsException> hashFunction = mock(Function.class);
+
+ // Hashes for the 16 logical partitions.
+ when(hashFunction.apply(any())).thenReturn(0x00000000, 0x40000000, 0x80000000, 0xC0000000,
+ 0x10000000, 0x50000000, 0x90000000, 0xD0000000,
+ 0x20000000, 0x60000000, 0xA0000000, 0xE0000000,
+ 0x30000000, 0x70000000, 0xB0000000, 0xF0000000);
+
+ final ConsistentHashMap<String> partitions = new ConsistentHashMap<>(hashFunction);
+ partitions.put(P1, P1, 4);
+ partitions.put(P2, P2, 4);
+ partitions.put(P3, P3, 4);
+ partitions.put(P4, P4, 4);
+
+ // Check the structure of the CHM.
+ assertThat(partitions.isEmpty()).isFalse();
+ assertThat(partitions.size()).isEqualTo(4);
+ assertThat(partitions.getAll()).containsOnly(P1, P2, P3, P4);
+
+ final Map<String, Long> weights = partitions.getWeights();
+ assertThat(weights.size()).isEqualTo(4);
+ assertThat(weights).containsEntry(P1, 0x40000000L);
+ assertThat(weights).containsEntry(P2, 0x40000000L);
+ assertThat(weights).containsEntry(P3, 0x40000000L);
+ assertThat(weights).containsEntry(P4, 0x40000000L);
+
+ // Hashes for several key lookups.
+ when(hashFunction.apply(any())).thenReturn(0x70000001, 0x7FFFFFFF, 0x80000000, // P1
+ 0xF0000001, 0xFFFFFFFF, 0x00000000, // P1
+ 0x00000001, 0x0FFFFFFF, 0x10000000, // P2
+ 0xE0000001, 0xEFFFFFFF, 0xF0000000); // P4
+ assertThat(partitions.get(P1)).isEqualTo(P1);
+ assertThat(partitions.get(P1)).isEqualTo(P1);
+ assertThat(partitions.get(P1)).isEqualTo(P1);
+
+ assertThat(partitions.get(P1)).isEqualTo(P1);
+ assertThat(partitions.get(P1)).isEqualTo(P1);
+ assertThat(partitions.get(P1)).isEqualTo(P1);
+
+ assertThat(partitions.get(P2)).isEqualTo(P2);
+ assertThat(partitions.get(P2)).isEqualTo(P2);
+ assertThat(partitions.get(P2)).isEqualTo(P2);
+
+ assertThat(partitions.get(P4)).isEqualTo(P4);
+ assertThat(partitions.get(P4)).isEqualTo(P4);
+ assertThat(partitions.get(P4)).isEqualTo(P4);
+
+ // Remove a partition and retry.
+ when(hashFunction.apply(any())).thenReturn(0x10000000, 0x50000000, 0x90000000, 0xD0000000);
+ partitions.remove(P2);
+
+ // Check the structure of the CHM.
+ assertThat(partitions.isEmpty()).isFalse();
+ assertThat(partitions.size()).isEqualTo(3);
+ assertThat(partitions.getAll()).containsOnly(P1, P3, P4);
+
+ final Map<String, Long> newWeights = partitions.getWeights();
+ assertThat(newWeights.size()).isEqualTo(3);
+ assertThat(newWeights).containsEntry(P1, 0x40000000L);
+ assertThat(newWeights).containsEntry(P3, 0x80000000L); // P2 now falls through to P3
+ assertThat(newWeights).containsEntry(P4, 0x40000000L);
+
+ // Hashes for several key lookups.
+ when(hashFunction.apply(any())).thenReturn(0x70000001, 0x7FFFFFFF, 0x80000000, // P1
+ 0xF0000001, 0xFFFFFFFF, 0x00000000, // P1
+ 0x00000001, 0x0FFFFFFF, 0x10000000, // P3 (was P2)
+ 0xE0000001, 0xEFFFFFFF, 0xF0000000); // P4
+ assertThat(partitions.get(P1)).isEqualTo(P1);
+ assertThat(partitions.get(P1)).isEqualTo(P1);
+ assertThat(partitions.get(P1)).isEqualTo(P1);
+
+ assertThat(partitions.get(P1)).isEqualTo(P1);
+ assertThat(partitions.get(P1)).isEqualTo(P1);
+ assertThat(partitions.get(P1)).isEqualTo(P1);
+
+ assertThat(partitions.get(P2)).isEqualTo(P3);
+ assertThat(partitions.get(P2)).isEqualTo(P3);
+ assertThat(partitions.get(P2)).isEqualTo(P3);
+
+ assertThat(partitions.get(P4)).isEqualTo(P4);
+ assertThat(partitions.get(P4)).isEqualTo(P4);
+ assertThat(partitions.get(P4)).isEqualTo(P4);
+ }
+}
--
Gitblit v1.10.0