| opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java | ●●●●● patch | view | raw | blame | history | |
| opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancer.java | ●●●●● patch | view | raw | blame | history | |
| opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashMap.java | ●●●●● patch | view | raw | blame | history | |
| opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties | ●●●●● patch | view | raw | blame | history | |
| opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancerTest.java | ●●●●● patch | view | raw | blame | history | |
| opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashMapTest.java | ●●●●● patch | view | raw | blame | history |
opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -569,67 +569,134 @@
            // NOTE(review): this hunk appears to come from a rendered diff whose +/- markers were lost, so it
            // seems to interleave removed lines (the old hashCode-based helpers) with added lines (routing via
            // computePartitionIdFromDN/dnOfRequest). Confirm against the repository before treating this text
            // as compilable code.
            @Override
            public RequestWithIndex apply(final Request request) {
                // Normalize the hash to a valid factory index, taking care of negative hash values and especially
                // Integer.MIN_VALUE (see doc for Math.abs()).
                final int index = computeIndexBasedOnDnHashCode(request);
                return new RequestWithIndex(request, index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % maxIndex));
            }

            /**
             * Returns the hash code of the DN targeted by the request, or a random index when the request type is
             * not recognized or its DN cannot be determined.
             */
            private int computeIndexBasedOnDnHashCode(final Request request) {
                // The following conditions are ordered such that the most common operations appear first in order to
                // reduce the average number of branches. A better solution would be to use a visitor, but a visitor
                // would only apply to the core operations, not extended operations or SASL binds.
                if (request instanceof SearchRequest) {
                    return ((SearchRequest) request).getName().hashCode();
                } else if (request instanceof ModifyRequest) {
                    return ((ModifyRequest) request).getName().hashCode();
                } else if (request instanceof SimpleBindRequest) {
                    return hashCodeOfDnString(((SimpleBindRequest) request).getName());
                } else if (request instanceof AddRequest) {
                    return ((AddRequest) request).getName().hashCode();
                } else if (request instanceof DeleteRequest) {
                    return ((DeleteRequest) request).getName().hashCode();
                } else if (request instanceof CompareRequest) {
                    return ((CompareRequest) request).getName().hashCode();
                } else if (request instanceof ModifyDNRequest) {
                    return ((ModifyDNRequest) request).getName().hashCode();
                } else if (request instanceof PasswordModifyExtendedRequest) {
                    return hashCodeOfAuthzid(((PasswordModifyExtendedRequest) request).getUserIdentityAsString());
                } else if (request instanceof PlainSASLBindRequest) {
                    return hashCodeOfAuthzid(((PlainSASLBindRequest) request).getAuthenticationID());
                } else if (request instanceof DigestMD5SASLBindRequest) {
                    return hashCodeOfAuthzid(((DigestMD5SASLBindRequest) request).getAuthenticationID());
                } else if (request instanceof GSSAPISASLBindRequest) {
                    return hashCodeOfAuthzid(((GSSAPISASLBindRequest) request).getAuthenticationID());
                } else if (request instanceof CRAMMD5SASLBindRequest) {
                    return hashCodeOfAuthzid(((CRAMMD5SASLBindRequest) request).getAuthenticationID());
                } else {
                    return distributeRequestAtRandom();
                }
            }

            /** Hashes the DN of a "dn:"-prefixed authorization ID; any other value yields a random index. */
            private int hashCodeOfAuthzid(final String authzid) {
                if (authzid != null && authzid.startsWith("dn:")) {
                    // Skip the 3 character "dn:" prefix.
                    return hashCodeOfDnString(authzid.substring(3));
                }
                return distributeRequestAtRandom();
            }

            /** Hashes the parsed DN; falls back to a random index when DN.valueOf() rejects the string. */
            private int hashCodeOfDnString(final String dnString) {
                try {
                    return DN.valueOf(dnString).hashCode();
                } catch (final IllegalArgumentException ignored) {
                    // Deliberately ignored: an unparseable DN is simply distributed at random.
                    return distributeRequestAtRandom();
                }
            }

            private int distributeRequestAtRandom() {
                return ThreadLocalRandom.current().nextInt(0, maxIndex);
                // NOTE(review): the two statements below are unreachable here and use "request", which is not a
                // parameter of this method; they look like the added body of apply() from the diff - confirm.
                final int index = computePartitionIdFromDN(dnOfRequest(request), maxIndex);
                return new RequestWithIndex(request, index);
            }
        };
    }

    /**
     * Returns the partition ID which should be selected based on the provided DN and number of partitions, taking care
     * of negative hash values and especially Integer.MIN_VALUE (see doc for Math.abs()). If the provided DN is
     * {@code null} then a random partition ID will be returned.
     *
     * @param dn
     *         The DN whose partition ID is to be determined, which may be {@code null}.
     * @param numberOfPartitions
     *         The total number of partitions.
     * @return A partition ID in the range {@code 0 <= partitionID < numberOfPartitions}.
     */
    private static int computePartitionIdFromDN(final DN dn, final int numberOfPartitions) {
        final int index = dn != null ? dn.hashCode() : ThreadLocalRandom.current().nextInt(0, numberOfPartitions);
        // Math.abs(Integer.MIN_VALUE) is still negative, so that single value is mapped to partition 0 explicitly.
        return index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % numberOfPartitions);
    }

    /**
     * Returns the DN of the entry targeted by the provided request, or {@code null} if the target entry cannot be
     * determined. This method will return {@code null} for most extended operations and SASL bind requests which
     * specify a non-DN authorization ID.
     *
     * @param request
     *         The request whose target entry DN is to be determined.
     * @return The DN of the entry targeted by the provided request, or {@code null} if the target entry cannot be
     *         determined.
     */
    static DN dnOfRequest(final Request request) {
        // The following conditions are ordered such that the most common operations appear first in order to
        // reduce the average number of branches. A better solution would be to use a visitor, but a visitor
        // would only apply to the core operations, not extended operations or SASL binds.
        if (request instanceof SearchRequest) {
            return ((SearchRequest) request).getName();
        } else if (request instanceof ModifyRequest) {
            return ((ModifyRequest) request).getName();
        } else if (request instanceof SimpleBindRequest) {
            return dnOf(((SimpleBindRequest) request).getName());
        } else if (request instanceof AddRequest) {
            return ((AddRequest) request).getName();
        } else if (request instanceof DeleteRequest) {
            return ((DeleteRequest) request).getName();
        } else if (request instanceof CompareRequest) {
            return ((CompareRequest) request).getName();
        } else if (request instanceof ModifyDNRequest) {
            return ((ModifyDNRequest) request).getName();
        } else if (request instanceof PasswordModifyExtendedRequest) {
            return dnOfAuthzid(((PasswordModifyExtendedRequest) request).getUserIdentityAsString());
        } else if (request instanceof PlainSASLBindRequest) {
            return dnOfAuthzid(((PlainSASLBindRequest) request).getAuthenticationID());
        } else if (request instanceof DigestMD5SASLBindRequest) {
            return dnOfAuthzid(((DigestMD5SASLBindRequest) request).getAuthenticationID());
        } else if (request instanceof GSSAPISASLBindRequest) {
            return dnOfAuthzid(((GSSAPISASLBindRequest) request).getAuthenticationID());
        } else if (request instanceof CRAMMD5SASLBindRequest) {
            return dnOfAuthzid(((CRAMMD5SASLBindRequest) request).getAuthenticationID());
        } else {
            return null;
        }
    }

    /**
     * Returns the DN carried by an authorization ID of the form "dn:" followed by a DN, or {@code null} if the
     * authorization ID is {@code null}, does not start with "dn:", or its remainder cannot be parsed as a DN.
     */
    private static DN dnOfAuthzid(final String authzid) {
        if (authzid != null && authzid.startsWith("dn:")) {
            // Skip the 3 character "dn:" prefix.
            return dnOf(authzid.substring(3));
        }
        return null;
    }

    /** Parses the string as a DN, returning {@code null} when DN.valueOf() throws IllegalArgumentException. */
    private static DN dnOf(final String dnString) {
        try {
            return DN.valueOf(dnString);
        } catch (final IllegalArgumentException ignored) {
            // Deliberately ignored: callers treat an unparseable DN as "target entry unknown".
            return null;
        }
    }

    /**
     * Creates a distribution load balancer which uses consistent hashing to distribute requests across a set of
     * partitions based on a hash of each request's target DN. More precisely, a partition is selected as follows:
     * <ul>
     * <li>if the targeted entry lies beneath the partition base DN then the partition is selected based on a hash of
     * the DN which is superior or equal to the target DN and immediately subordinate to the partition base DN</li>
     * <li>otherwise, if the request is not a search then the request is routed to a single partition selected based
     * on a hash of the request's target DN</li>
     * <li>otherwise, if the search request targets the partition base DN, or a superior thereof, then the search is
     * routed to a single partition or broadcast to all partitions, depending on the search scope. When broadcasting,
     * care is taken to re-scope sub-requests in order to avoid returning duplicate entries</li>
     * <li>otherwise, the search is routed to a single partition, selected based on a hash of the search base DN,
     * because its scope lies outside of the partition space.</li>
     * </ul>
     * This load balancer allows client applications to linearly scale their deployment for write throughput as well
     * as total number of entries. For example, if a single replicated topology can support 10000 updates/s and a
     * total of 100M entries, then a 4 way distributed topology could support up to 40000 updates/s and 400M entries.
     * <p>
     * <b>NOTE:</b> there are a number of assumptions in the design of this load balancer as well as a number of
     * limitations:
     * <ul>
     * <li>simple paged results, server side sorting, and VLV request controls are not supported for searches which
     * traverse all partitions.</li>
     * <li>persistent searches which traverse all partitions are only supported if they request changes only.</li>
     * <li>requests which target an entry which is not below the partition base DN will be routed to a partition
     * selected based on the request's DN, thereby providing affinity. Note that this behavior assumes that entries
     * which are not below the partition base DN are replicated across all partitions.</li>
     * <li>searches that traverse multiple partitions as well as entries above the partition base DN may return
     * results in a non-hierarchical order. Specifically, entries from a partition (below the partition base DN)
     * may be returned before entries above the partition base DN. Although not required by the LDAP standard, some
     * legacy clients expect entries to be returned in hierarchical order.</li>
     * </ul>
     *
     * @param partitionBaseDN
     *         The DN beneath which data is partitioned. All other data is assumed to be shared across all partitions.
     * @param partitions
     *         The consistent hash map containing the partitions to be distributed.
     * @param options
     *         The configuration options for the load-balancer (no options are supported currently).
     * @return The new distribution load balancer.
     */
    // "unused": the options parameter is accepted but not read by this method.
    @SuppressWarnings("unused")
    public static ConnectionFactory newFixedSizeDistributionLoadBalancer(final DN partitionBaseDN,
            final ConsistentHashMap<? extends ConnectionFactory> partitions, final Options options) {
        return new ConsistentHashDistributionLoadBalancer(partitionBaseDN, partitions);
    }

    /**
     * Creates a new "least requests" load-balancer which will load-balance individual requests across the provided
     * set of connection factories, each typically representing a single replica, using an algorithm that ensures that
     * requests are routed to the replica which has the minimum number of active requests.
opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancer.java
New file @@ -0,0 +1,498 @@
/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2016 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;

import static com.forgerock.opendj.ldap.CoreMessages.*;
import static org.forgerock.opendj.ldap.Connections.dnOfRequest;
import static org.forgerock.opendj.ldap.LdapException.newLdapException;
import static org.forgerock.opendj.ldap.ResultCode.CLIENT_SIDE_USER_CANCELLED;
import static org.forgerock.opendj.ldap.ResultCode.PROTOCOL_ERROR;
import static org.forgerock.opendj.ldap.ResultCode.UNWILLING_TO_PERFORM;
import static org.forgerock.opendj.ldap.SearchScope.SUBORDINATES;
import static org.forgerock.opendj.ldap.requests.Requests.copyOfSearchRequest;
import static org.forgerock.opendj.ldap.spi.LdapPromises.newFailedLdapPromise;
import static org.forgerock.util.Utils.closeSilently;
import static org.forgerock.util.Utils.joinAsString;
import static org.forgerock.util.promise.Promises.newResultPromise;
import static org.forgerock.util.promise.Promises.when;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.opendj.ldap.controls.PersistentSearchRequestControl;
import org.forgerock.opendj.ldap.controls.ServerSideSortRequestControl;
import org.forgerock.opendj.ldap.controls.SimplePagedResultsControl;
import org.forgerock.opendj.ldap.controls.VirtualListViewRequestControl;
import org.forgerock.opendj.ldap.requests.AbandonRequest;
import org.forgerock.opendj.ldap.requests.AddRequest;
import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.opendj.ldap.requests.CompareRequest;
import org.forgerock.opendj.ldap.requests.DeleteRequest;
import org.forgerock.opendj.ldap.requests.ExtendedRequest;
import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
import org.forgerock.opendj.ldap.requests.ModifyRequest;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.BindResult;
import org.forgerock.opendj.ldap.responses.CompareResult;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.IntermediateResponse;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.responses.SearchResultReference;
import org.forgerock.opendj.ldap.spi.ConnectionState;
import org.forgerock.opendj.ldap.spi.LdapPromises;
import org.forgerock.util.AsyncFunction;
import org.forgerock.util.promise.ExceptionHandler;
import org.forgerock.util.promise.Promise;
import org.forgerock.util.promise.PromiseImpl;
import org.forgerock.util.promise.ResultHandler;
import org.forgerock.util.promise.RuntimeExceptionHandler;

/**
 * A simple distribution algorithm which distributes requests across the partitions of a consistent hash map based
 * on the request's target DN. See {@link Connections#newFixedSizeDistributionLoadBalancer}.
 * <p>
 * Connections returned by this factory are logical: each request obtains a connection from the selected partition
 * and closes it once the request has completed.
 */
final class ConsistentHashDistributionLoadBalancer implements ConnectionFactory {
    /** Simple class name, used as the prefix of the toString() representations. */
    private static final String NAME = ConsistentHashDistributionLoadBalancer.class.getSimpleName();
    /** The partitions across which requests are distributed. */
    private final ConsistentHashMap<? extends ConnectionFactory> partitions;
    /** The DN beneath which entries are assigned to individual partitions. */
    private final DN partitionBaseDN;

    /**
     * Creates a new load balancer.
     *
     * @param partitionBaseDN
     *         The DN beneath which data is partitioned.
     * @param partitions
     *         The consistent hash map containing the partition connection factories.
     */
    ConsistentHashDistributionLoadBalancer(final DN partitionBaseDN,
                                           final ConsistentHashMap<? extends ConnectionFactory> partitions) {
        this.partitionBaseDN = partitionBaseDN;
        this.partitions = partitions;
    }

    /** Closes every partition connection factory using closeSilently(). */
    @Override
    public final void close() {
        closeSilently(partitions.getAll());
    }

    @Override
    public final String toString() {
        // NOTE(review): partitions is passed to joinAsString() as a single ConsistentHashMap value, so the output
        // presumably relies on ConsistentHashMap.toString() - confirm that this is the intended representation.
        return NAME + '(' + joinAsString(",", partitions) + ')';
    }

    /** Returns a new logical connection; partition connections are only obtained when a request is sent. */
    @Override
    public final Connection getConnection() throws LdapException {
        return new ConnectionImpl();
    }

    /** Returns an already completed promise holding a new logical connection. */
    @Override
    public final Promise<Connection, LdapException> getConnectionAsync() {
        return newResultPromise((Connection) new ConnectionImpl());
    }

    /**
     * Logical connection which routes each request to the partition selected for the request's target DN. It holds
     * no partition connection of its own, see {@link #connectAndSendRequest}.
     */
    private class ConnectionImpl extends AbstractAsynchronousConnection {
        /** Tracks the closed/valid status of this logical connection and its event listeners. */
        private final ConnectionState state = new ConnectionState();

        @Override
        public String toString() {
            return NAME + "Connection";
        }

        @Override
        public LdapPromise<Void> abandonAsync(final AbandonRequest request) {
            // We cannot possibly route these correctly, so just drop them.
            return LdapPromises.newSuccessfulLdapPromise(null);
        }

        /** Routes the add request to the partition selected for the DN of the entry being added. */
        @Override
        public LdapPromise<Result> addAsync(final AddRequest request,
                                            final IntermediateResponseHandler intermediateResponseHandler) {
            final ConnectionFactory partition = getPartition(request.getName());
            return connectAndSendRequest(partition, new AsyncFunction<Connection, Result, LdapException>() {
                @Override
                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.addAsync(request, intermediateResponseHandler);
                }
            });
        }

        @Override
        public void addConnectionEventListener(final ConnectionEventListener listener) {
            state.addConnectionEventListener(listener);
        }

        /**
         * Routes the bind request to the partition selected for the DN resolved by
         * {@link Connections#dnOfRequest}. The request fails with UNWILLING_TO_PERFORM when no DN can be resolved.
         */
        @Override
        public LdapPromise<BindResult> bindAsync(final BindRequest request,
                                                 final IntermediateResponseHandler intermediateResponseHandler) {
            final DN dn = dnOfRequest(request);
            if (dn == null) {
                return newFailedLdapPromise(newLdapException(UNWILLING_TO_PERFORM,
                                                             DISTRIBUTION_UNRESOLVABLE_PARTITION_ID_FOR_BIND.get()));
            }
            final ConnectionFactory partition = getPartition(dn);
            return connectAndSendRequest(partition, new AsyncFunction<Connection, BindResult, LdapException>() {
                @Override
                public Promise<BindResult, LdapException> apply(final Connection connection)
                        throws LdapException {
                    return connection.bindAsync(request, intermediateResponseHandler);
                }
            });
        }

        /** Marks this logical connection as closed; the unbind request and reason are not used. */
        @Override
        public void close(final UnbindRequest request, final String reason) {
            state.notifyConnectionClosed();
        }

        /** Routes the compare request to the partition selected for the DN of the compared entry. */
        @Override
        public LdapPromise<CompareResult> compareAsync(final CompareRequest request,
                                                       final IntermediateResponseHandler intermediateResponseHandler) {
            final ConnectionFactory partition = getPartition(request.getName());
            return connectAndSendRequest(partition, new AsyncFunction<Connection, CompareResult, LdapException>() {
                @Override
                public Promise<CompareResult, LdapException> apply(final Connection connection)
                        throws LdapException {
                    return connection.compareAsync(request, intermediateResponseHandler);
                }
            });
        }

        /** Routes the delete request to the partition selected for the DN of the entry being deleted. */
        @Override
        public LdapPromise<Result> deleteAsync(final DeleteRequest request,
                                               final IntermediateResponseHandler intermediateResponseHandler) {
            // We could reject requests that attempt to delete entries superior to the partition base DN. However,
            // this is really the responsibility of the backend servers to enforce. In some cases such requests
            // should be allowed, e.g. if the partitions are all empty.
            final ConnectionFactory partition = getPartition(request.getName());
            return connectAndSendRequest(partition, new AsyncFunction<Connection, Result, LdapException>() {
                @Override
                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.deleteAsync(request, intermediateResponseHandler);
                }
            });
        }

        /**
         * Routes the extended request to the partition selected for the DN resolved by
         * {@link Connections#dnOfRequest}. The request fails with UNWILLING_TO_PERFORM when no DN can be resolved.
         */
        @Override
        public <R extends ExtendedResult> LdapPromise<R> extendedRequestAsync(final ExtendedRequest<R> request,
                final IntermediateResponseHandler intermediateResponseHandler) {
            final DN dn = dnOfRequest(request);
            if (dn == null) {
                return newFailedLdapPromise(newLdapException(UNWILLING_TO_PERFORM,
                                                             DISTRIBUTION_UNRESOLVABLE_PARTITION_ID_FOR_EXT_OP.get()));
            }
            final ConnectionFactory partition = getPartition(dn);
            return connectAndSendRequest(partition, new AsyncFunction<Connection, R, LdapException>() {
                @Override
                public Promise<R, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.extendedRequestAsync(request, intermediateResponseHandler);
                }
            });
        }

        @Override
        public boolean isClosed() {
            return state.isClosed();
        }

        @Override
        public boolean isValid() {
            return state.isValid();
        }

        /** Routes the modify request to the partition selected for the DN of the entry being modified. */
        @Override
        public LdapPromise<Result> modifyAsync(final ModifyRequest request,
                                               final IntermediateResponseHandler intermediateResponseHandler) {
            final ConnectionFactory partition = getPartition(request.getName());
            return connectAndSendRequest(partition, new AsyncFunction<Connection, Result, LdapException>() {
                @Override
                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.modifyAsync(request, intermediateResponseHandler);
                }
            });
        }

        /**
         * Forwards the modify DN request when the entry's current DN and its DN after the rename select the same
         * partition; otherwise the request fails with UNWILLING_TO_PERFORM.
         */
        @Override
        public LdapPromise<Result> modifyDNAsync(final ModifyDNRequest request,
                                                 final IntermediateResponseHandler intermediateResponseHandler) {
            final DN oldDN = request.getName();
            final ConnectionFactory oldPartition = getPartition(oldDN);

            // The renamed entry keeps its current parent unless a new superior was requested.
            // NOTE(review): oldDN.parent() is presumably null for the root DN, in which case newParent.child()
            // would fail - confirm whether such a request can reach this point.
            final DN newParent = request.getNewSuperior() != null ? request.getNewSuperior() : oldDN.parent();
            final DN newDN = newParent.child(request.getNewRDN());
            final ConnectionFactory newPartition = getPartition(newDN);

            // Acceptable if the request is completely outside of the partitions or entirely within a single partition.
            // The partitions are compared by reference.
            if (oldPartition != newPartition) {
                return unwillingToPerform(DISTRIBUTION_MODDN_SPANS_MULTIPLE_PARTITIONS.get());
            }
            return connectAndSendRequest(oldPartition, new AsyncFunction<Connection, Result, LdapException>() {
                @Override
                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.modifyDNAsync(request, intermediateResponseHandler);
                }
            });
        }

        @Override
        public void removeConnectionEventListener(final ConnectionEventListener listener) {
            state.removeConnectionEventListener(listener);
        }

        /**
         * Sends the search to a single partition, or to all partitions when the base DN and scope of the search
         * reach into the partitioned data. Unknown search scopes are rejected with UNWILLING_TO_PERFORM.
         */
        @Override
        public LdapPromise<Result> searchAsync(final SearchRequest request,
                                               final IntermediateResponseHandler intermediateResponseHandler,
                                               final SearchResultHandler entryHandler) {
            // Broadcast if the search targets an entry which is superior to the base DN and has a scope which
            // reaches into the partitions.
            final DN dn = request.getName();
            switch (request.getScope().asEnum()) {
            case BASE_OBJECT:
                // Request can always be handled by a single partition.
                return searchSinglePartition(request, intermediateResponseHandler, entryHandler);
            case SINGLE_LEVEL:
                // Needs broadcasting if the search DN is equal to the partition DN.
                if (dn.equals(partitionBaseDN)) {
                    return searchAllPartitions(request, intermediateResponseHandler, entryHandler);
                } else {
                    return searchSinglePartition(request, intermediateResponseHandler, entryHandler);
                }
            case WHOLE_SUBTREE:
                // Needs broadcasting if the search DN is superior or equal to the partition DN.
                if (dn.isSuperiorOrEqualTo(partitionBaseDN)) {
                    // Split into primary search against search DN and secondary searches against partition DN.
                    return splitAndSearchAllPartitions(request, intermediateResponseHandler, entryHandler);
                } else {
                    return searchSinglePartition(request, intermediateResponseHandler, entryHandler);
                }
            case SUBORDINATES:
                // Needs broadcasting if the search DN is superior or equal to the partition DN.
                if (dn.equals(partitionBaseDN)) {
                    // Every partition can run the unmodified request when the base DN is the partition DN itself.
                    return searchAllPartitions(request, intermediateResponseHandler, entryHandler);
                } else if (dn.isSuperiorOrEqualTo(partitionBaseDN)) {
                    // Split into primary search against search DN and secondary searches against partition DN.
                    return splitAndSearchAllPartitions(request, intermediateResponseHandler, entryHandler);
                } else {
                    return searchSinglePartition(request, intermediateResponseHandler, entryHandler);
                }
            default: // UNKNOWN
                return unwillingToPerform(DISTRIBUTION_UNSUPPORTED_SEARCH_SCOPE.get(request.getScope().intValue()));
            }
        }

        /**
         * Forwards a search request to each partition. The search request is scoped such that duplicate results are
         * not possible.
         */
        private LdapPromise<Result> searchAllPartitions(final SearchRequest request,
                                                        final IntermediateResponseHandler irh,
                                                        final SearchResultHandler srh) {
            // The same request serves as both the primary and the secondary search.
            return broadcastSearch(request, request, irh, srh);
        }

        /**
         * In order to avoid duplicates, search one partition using the request's DN and the remaining partitions using
         * the partition base DN with scope "SUBORDINATES".
         * <p>
         * TODO: may return results that are not in hierarchical order.
         */
        private LdapPromise<Result> splitAndSearchAllPartitions(final SearchRequest primarySearch,
                                                                final IntermediateResponseHandler irh,
                                                                final SearchResultHandler srh) {
            // The secondary search is a modified copy; the caller's request is left untouched.
            final SearchRequest secondarySearch =
                    copyOfSearchRequest(primarySearch).setName(partitionBaseDN).setScope(SUBORDINATES);
            return broadcastSearch(primarySearch, secondarySearch, irh, srh);
        }

        /**
         * Sends the primary search to the partition selected for its base DN and the secondary search to every other
         * partition, returning a single promise for the combined operation.
         * <p>
         * Take care to ensure that all searches are cancelled when one of them fails (important if the
         * searches are persistent).
         */
        private LdapPromise<Result> broadcastSearch(final SearchRequest primarySearch,
                                                    final SearchRequest secondarySearch,
                                                    final IntermediateResponseHandler irh,
                                                    final SearchResultHandler srh) {
            // First reject requests that contain controls that we do not support when broadcast.
            if (primarySearch.containsControl(VirtualListViewRequestControl.OID)) {
                return unwillingToPerform(DISTRIBUTION_VLV_CONTROL_NOT_SUPPORTED.get());
            } else if (primarySearch.containsControl(SimplePagedResultsControl.OID)) {
                return unwillingToPerform(DISTRIBUTION_SPR_CONTROL_NOT_SUPPORTED.get());
            } else if (primarySearch.containsControl(ServerSideSortRequestControl.OID)) {
                return unwillingToPerform(DISTRIBUTION_SSS_CONTROL_NOT_SUPPORTED.get());
            } else if (primarySearch.containsControl(PersistentSearchRequestControl.OID)) {
                // Persistent searches return results in two phases: the initial search results and then the changes.
                // Unfortunately, there's no way to determine when the first phase has completed, so it's not
                // possible to prevent the two phases from being interleaved when broadcast.
                try {
                    final PersistentSearchRequestControl control =
                            primarySearch.getControl(PersistentSearchRequestControl.DECODER, new DecodeOptions());
                    if (!control.isChangesOnly()) {
                        return unwillingToPerform(DISTRIBUTION_PSEARCH_CONTROL_NOT_SUPPORTED.get());
                    }
                } catch (final DecodeException e) {
                    return newFailedLdapPromise(newLdapException(PROTOCOL_ERROR, e.getMessage(), e));
                }
            }

            final List<Promise<Result, LdapException>> promises = new ArrayList<>(partitions.size());

            // Launch all searches with the primary search targeting the partition selected for its base DN.
            final ConnectionFactory primaryPartition = getPartition(primarySearch.getName());
            // The sub-searches share the caller's handlers, so wrap them to process one response at a time.
            final IntermediateResponseHandler sirh = synchronize(irh);
            final SearchResultHandler ssrh = synchronize(srh);
            for (final ConnectionFactory partition : partitions.getAll()) {
                final SearchRequest searchRequest = partition == primaryPartition ? primarySearch : secondarySearch;
                promises.add(searchSinglePartition(searchRequest, sirh, ssrh, partition));
            }

            // FIXME: chained PromiseImpl and Promises.when() don't chain cancellation requests.
            final PromiseImpl<Result, LdapException> reducedPromise = new PromiseImpl<Result, LdapException>() {
                @Override
                protected LdapException tryCancel(final boolean mayInterruptIfRunning) {
                    // Propagate the cancellation to every sub-search.
                    for (Promise<Result, LdapException> promise : promises) {
                        promise.cancel(mayInterruptIfRunning);
                    }
                    return newLdapException(CLIENT_SIDE_USER_CANCELLED);
                }
            };

            when(promises).thenOnResult(new ResultHandler<List<Result>>() {
                @Override
                public void handleResult(final List<Result> results) {
                    // TODO: Depending on controls we may want to merge these results in some way.
                    // Only the first sub-search's result is propagated to the caller.
                    // NOTE(review): assumes at least one partition, otherwise the list is presumably empty - confirm.
                    reducedPromise.handleResult(results.get(0));
                }
            }).thenOnException(new ExceptionHandler<LdapException>() {
                @Override
                public void handleException(final LdapException exception) {
                    reducedPromise.handleException(exception);
                }
            }).thenOnRuntimeException(new RuntimeExceptionHandler() {
                @Override
                public void handleRuntimeException(final RuntimeException exception) {
                    reducedPromise.handleRuntimeException(exception);
                }
            }).thenFinally(new Runnable() {
                @Override
                public void run() {
                    // Ensure that any remaining searches are terminated.
                    for (Promise<Result, LdapException> promise : promises) {
                        promise.cancel(true);
                    }
                }
            });

            return LdapPromises.asPromise(reducedPromise);
        }

        /** Returns a failed promise carrying an UNWILLING_TO_PERFORM exception with the provided message. */
        private LdapPromise<Result> unwillingToPerform(final LocalizableMessage msg) {
            return newFailedLdapPromise(newLdapException(UNWILLING_TO_PERFORM, msg));
        }

        /**
         * Forwards a search request to a single partition based on the search's base DN. If the DN does not lie
         * beneath the partition base DN then the partition is selected using the search base DN itself.
         */
        private LdapPromise<Result> searchSinglePartition(final SearchRequest request,
                                                          final IntermediateResponseHandler irh,
                                                          final SearchResultHandler srh) {
            final ConnectionFactory partition = getPartition(request.getName());
            return searchSinglePartition(request, irh, srh, partition);
        }

        /** Forwards a search request to the provided partition. */
        private LdapPromise<Result> searchSinglePartition(final SearchRequest request,
                                                          final IntermediateResponseHandler irh,
                                                          final SearchResultHandler srh,
                                                          final ConnectionFactory partition) {
            return connectAndSendRequest(partition, new AsyncFunction<Connection, Result, LdapException>() {
                @Override
                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
                    return connection.searchAsync(request, irh, srh);
                }
            });
        }

        /** Returns a search result handler that ensures that only one response is processed at a time. */
        private SearchResultHandler synchronize(final SearchResultHandler srh) {
            return new SearchResultHandler() {
                @Override
                public synchronized boolean handleEntry(final SearchResultEntry entry) {
                    return srh.handleEntry(entry);
                }

                @Override
                public synchronized boolean handleReference(final SearchResultReference reference) {
                    return srh.handleReference(reference);
                }
            };
        }

        /** Returns an intermediate response handler that ensures that only one response is processed at a time. */
        private IntermediateResponseHandler synchronize(final IntermediateResponseHandler irh) {
            return new IntermediateResponseHandler() {
                @Override
                public synchronized boolean handleIntermediateResponse(final IntermediateResponse response) {
                    return irh.handleIntermediateResponse(response);
                }
            };
        }

        /**
         * Returns the partition for requests having the provided DN. The lookup key is the normalized URL-safe
         * string of the partition DN (see {@link #getPartitionDN}) or, when the DN does not lie beneath the
         * partition base DN, of the provided DN itself.
         */
        private ConnectionFactory getPartition(final DN dn) {
            final DN partitionDN = getPartitionDN(dn);
            return partitions.get((partitionDN != null ? partitionDN : dn).toNormalizedUrlSafeString());
        }

        /**
         * Returns the DN which is a child of the partition base DN and which is equal to or superior to the provided
         * DN, otherwise {@code null}.
         */
        private DN getPartitionDN(final DN dn) {
            final int depthBelowBaseDN = dn.size() - partitionBaseDN.size();
            if (depthBelowBaseDN > 0 && dn.isSubordinateOrEqualTo(partitionBaseDN)) {
                // presumably parent(n) returns the ancestor n levels up, leaving one RDN below the base DN - confirm
                return dn.parent(depthBelowBaseDN - 1);
            }
            return null;
        }

        /**
         * Obtains a connection from the provided partition, invokes the request on it, and closes the connection
         * once the request has completed.
         *
         * @throws IllegalStateException
         *         If this logical connection has already been closed.
         */
        private <R> LdapPromise<R> connectAndSendRequest(final ConnectionFactory partition,
                final AsyncFunction<Connection, R, LdapException> doRequest) {
            if (state.isClosed()) {
                throw new IllegalStateException("Connection is already closed");
            }
            // Remembers the partition connection so that the final step can close it.
            final AtomicReference<Connection> connectionHolder = new AtomicReference<>();
            return connectAsync(partition).thenOnResult(new ResultHandler<Connection>() {
                @Override
                public void handleResult(final Connection connection) {
                    connectionHolder.set(connection);
                }
                // FIXME: how do we support cancellation of the request?
            }).thenAsync(doRequest).thenFinally(new Runnable() {
                @Override
                public void run() {
                    // The holder is still empty if the connection attempt itself failed.
                    closeSilently(connectionHolder.get());
                }
            });
        }

        /** Requests a connection from the partition, reporting failures to this connection's state. */
        private LdapPromise<Connection> connectAsync(final ConnectionFactory partition) {
            return LdapPromises.asPromise(partition.getConnectionAsync()
                    .thenOnException(new ExceptionHandler<LdapException>() {
                        @Override
                        public void handleException(final LdapException e) {
                            state.notifyConnectionError(false, e);
                        }
                    }));
        }
    }
}
opendj-core/src/main/java/org/forgerock/opendj/ldap/ConsistentHashMap.java
New file @@ -0,0 +1,288 @@
/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2016 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;

import org.forgerock.util.Function;
import org.forgerock.util.Reject;
import org.forgerock.util.annotations.VisibleForTesting;
import org.forgerock.util.promise.NeverThrowsException;

/**
 * An implementation of "consistent hashing" supporting per-partition weighting. This implementation is thread safe
 * and allows partitions to be added and removed during use.
 * <p>
 * This implementation maps partitions to one or more points on a circle ranging from {@link Integer#MIN_VALUE} to
 * {@link Integer#MAX_VALUE}. The number of points per partition is dictated by the partition's weight. A partition
 * with a weight which is higher than another partition will receive a proportionally higher load.
 * <p>
 * Updates are serialized by a lock and performed on copies of the internal maps which are then published through
 * volatile fields; readers therefore always observe an immutable snapshot and never block.
 *
 * @param <P> The type of partition object.
 *
 * @see <a href="http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.23.3738">Consistent Hashing and Random
 * Trees</a>
 * @see <a href="http://www8.org/w8-papers/2a-webserver/caching/paper2.html">Web Caching with Consistent Hashing</a>
 */
public final class ConsistentHashMap<P> {
    // TODO: add methods for determining which partitions will need to be rebalanced when a partition is added or
    // removed.

    /** The default weight. The value is relatively high in order to minimize the risk of imbalances. */
    private static final int DEFAULT_WEIGHT = 200;

    /** Default hash function based on MD5. */
    @VisibleForTesting
    static final Function<Object, Integer, NeverThrowsException> MD5 =
            new Function<Object, Integer, NeverThrowsException>() {
                @Override
                public Integer apply(final Object key) {
                    // The hash is derived from the digest of the UTF-8 encoded string form of the key.
                    final byte[] bytes = key.toString().getBytes(UTF_8);
                    final byte[] digest = getMD5Digest().digest(bytes);
                    return ByteString.wrap(digest).toInt();
                }

                private MessageDigest getMD5Digest() {
                    // TODO: we may want to cache these.
                    try {
                        return MessageDigest.getInstance("MD5");
                    } catch (NoSuchAlgorithmException e) {
                        throw new RuntimeException(e);
                    }
                }
            };

    /** Synchronizes updates. Reads are protected by copy on write. */
    private final ReentrantLock writeLock = new ReentrantLock();

    /** Consistent hash map circle. Replaced, never mutated, once published. */
    private volatile NavigableMap<Integer, Node<P>> circle = new TreeMap<>();

    /** Maps partition IDs to their partition. Replaced, never mutated, once published. */
    private volatile Map<String, P> partitions = new LinkedHashMap<>();

    /** Function used for hashing keys. */
    private final Function<Object, Integer, NeverThrowsException> hashFunction;

    /** Creates a new consistent hash map which will hash keys using MD5. */
    public ConsistentHashMap() {
        this(MD5);
    }

    /**
     * Creates a new consistent hash map which will hash keys using the provided hash function.
     *
     * @param hashFunction
     *         The function which should be used for hashing keys.
     */
    public ConsistentHashMap(final Function<Object, Integer, NeverThrowsException> hashFunction) {
        this.hashFunction = hashFunction;
    }

    /**
     * Puts a partition into this consistent hash map using the default weight which is sufficiently high to ensure a
     * reasonably uniform distribution among all partitions having the same weight.
     *
     * @param partitionId
     *         The partition ID.
     * @param partition
     *         The partition.
     * @return This consistent hash map.
     */
    public ConsistentHashMap<P> put(final String partitionId, final P partition) {
        return put(partitionId, partition, DEFAULT_WEIGHT);
    }

    /**
     * Puts a partition into this consistent hash map using the specified weight. If all partitions have the same
     * weight then they will each receive a similar amount of load. A partition having a weight which is twice that
     * of another will receive twice the load. Weight values should generally be greater than 200 in order to minimize
     * the risk of unexpected imbalances due to the way in which logical partitions are mapped to real partitions.
     * <p>
     * If a partition is already registered with the same ID then it is replaced: all of the points previously
     * owned by that ID are removed from the circle before the new points are added. A weight of zero registers the
     * partition without assigning it any point on the circle, so it will never be returned by {@link #get(Object)}.
     *
     * @param partitionId
     *         The partition ID.
     * @param partition
     *         The partition.
     * @param weight
     *         The partition's weight, which should typically be over 200 and never negative.
     * @return This consistent hash map.
     * @throws NullPointerException
     *         If {@code partitionId} or {@code partition} is {@code null}.
     * @throws IllegalArgumentException
     *         If {@code weight} is negative.
     */
    public ConsistentHashMap<P> put(final String partitionId, final P partition, final int weight) {
        Reject.ifNull(partitionId, "partitionId must be non-null");
        Reject.ifNull(partition, "partition must be non-null");
        Reject.ifTrue(weight < 0, "weight must be non-negative");

        final Node<P> node = new Node<>(partitionId, partition);
        writeLock.lock();
        try {
            final TreeMap<Integer, Node<P>> newCircle = new TreeMap<>(circle);
            if (partitions.containsKey(partitionId)) {
                // Re-registration: discard the points of the previous registration, otherwise a smaller weight
                // would leave stale points behind which could never be removed afterwards.
                removePoints(newCircle, partitionId);
            }
            for (int i = 0; i < weight; i++) {
                newCircle.put(hashFunction.apply(partitionId + i), node);
            }

            final Map<String, P> newPartitions = new LinkedHashMap<>(partitions);
            newPartitions.put(partitionId, partition);

            // It doesn't matter that these assignments are not atomic.
            circle = newCircle;
            partitions = newPartitions;
        } finally {
            writeLock.unlock();
        }
        return this;
    }

    /**
     * Removes the partition that was previously added using the provided partition ID. This method has no effect
     * if no partition is registered with the provided ID.
     *
     * @param partitionId
     *         The partition ID.
     * @return This consistent hash map.
     * @throws NullPointerException
     *         If {@code partitionId} is {@code null}.
     */
    public ConsistentHashMap<P> remove(final String partitionId) {
        Reject.ifNull(partitionId, "partitionId must be non-null");

        writeLock.lock();
        try {
            if (partitions.containsKey(partitionId)) {
                final TreeMap<Integer, Node<P>> newCircle = new TreeMap<>(circle);
                // Remove by ownership rather than by re-computing hashes: this is safe for partitions having a
                // weight of zero (no points at all) and never removes a point which, because of a hash collision,
                // is now owned by another partition.
                removePoints(newCircle, partitionId);

                final Map<String, P> newPartitions = new LinkedHashMap<>(partitions);
                newPartitions.remove(partitionId);

                // It doesn't matter that these assignments are not atomic.
                circle = newCircle;
                partitions = newPartitions;
            }
        } finally {
            writeLock.unlock();
        }
        return this;
    }

    /** Removes from the provided (unpublished) circle every point owned by the partition having the provided ID. */
    private static <P> void removePoints(final NavigableMap<Integer, Node<P>> circle, final String partitionId) {
        final Iterator<Node<P>> points = circle.values().iterator();
        while (points.hasNext()) {
            if (points.next().partitionId.equals(partitionId)) {
                points.remove();
            }
        }
    }

    /**
     * Returns the partition from this map corresponding to the provided key's hash, or {@code null} if this map is
     * empty.
     *
     * @param key
     *         The key for which a corresponding partition is to be returned.
     * @return The partition from this map corresponding to the provided key's hash, or {@code null} if this map is
     * empty.
     */
    P get(final Object key) {
        final NavigableMap<Integer, Node<P>> circleSnapshot = circle;
        final Map.Entry<Integer, Node<P>> ceilingEntry = circleSnapshot.ceilingEntry(hashFunction.apply(key));
        if (ceilingEntry != null) {
            return ceilingEntry.getValue().partition;
        }
        // No point at or after the hash: wrap around to the first point of the circle.
        final Map.Entry<Integer, Node<P>> firstEntry = circleSnapshot.firstEntry();
        return firstEntry != null ? firstEntry.getValue().partition : null;
    }

    /**
     * Returns a collection containing all of the partitions contained in this consistent hash map. The returned
     * collection is an unmodifiable snapshot which is not affected by subsequent updates to this map.
     *
     * @return A collection containing all of the partitions contained in this consistent hash map.
     */
    Collection<P> getAll() {
        // The snapshot is shared with other readers, so it must not be modifiable through the returned view.
        return Collections.unmodifiableCollection(partitions.values());
    }

    /**
     * Returns the number of partitions in this consistent hash map.
     *
     * @return The number of partitions in this consistent hash map.
     */
    int size() {
        return partitions.size();
    }

    /**
     * Returns {@code true} if there are no partitions in this consistent hash map.
     *
     * @return {@code true} if there are no partitions in this consistent hash map.
     */
    boolean isEmpty() {
        return partitions.isEmpty();
    }

    /**
     * Returns a map whose keys are the partitions stored in this map and whose values are the actual weights associated
     * with each partition. The sum of the weights will be equal to 2^32 unless the circle is empty, in which case
     * the returned map is empty. Partitions are compared by identity.
     * <p>
     * This method is intended for testing, but may one day be used in order to query the current status of the
     * load-balancer and, in particular, the weighting associated with each partition as a percentage.
     *
     * @return A map whose keys are the partitions stored in this map and whose values are the actual weights associated
     * with each partition.
     */
    @VisibleForTesting
    Map<P, Long> getWeights() {
        final NavigableMap<Integer, Node<P>> circleSnapshot = circle;
        final IdentityHashMap<P, Long> weights = new IdentityHashMap<>();
        Map.Entry<Integer, Node<P>> previousEntry = null;
        for (final Map.Entry<Integer, Node<P>> entry : circleSnapshot.entrySet()) {
            final long index = entry.getKey();
            final P partition = entry.getValue().partition;
            final long range;
            if (previousEntry == null) {
                // Special case for first value since the range begins with the last entry.
                final long range1 = (long) Integer.MAX_VALUE - circleSnapshot.lastEntry().getKey();
                final long range2 = index - Integer.MIN_VALUE;
                range = range1 + range2 + 1;
            } else {
                range = index - previousEntry.getKey();
            }
            final Long accumulated = weights.get(partition);
            weights.put(partition, accumulated != null ? accumulated + range : range);
            previousEntry = entry;
        }
        return weights;
    }

    @Override
    public String toString() {
        return getWeights().toString();
    }

    /** A partition stored in the consistent hash map circle. */
    private static final class Node<P> {
        private final String partitionId;
        private final P partition;

        private Node(final String partitionId, final P partition) {
            this.partitionId = partitionId;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return partitionId;
        }
    }
}
opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties
@@ -1725,3 +1725,13 @@
ERR_UNKNOWN_REQUEST_TYPE=The following request has an unknown request type: %s
WARN_DECODING_AFFINITY_CONTROL=An error occurred while decoding an affinity control: %s
ERR_SASL_BIND_MULTI_STAGE=An error occurred during multi-stage authentication: '%s'
DISTRIBUTION_UNRESOLVABLE_PARTITION_ID_FOR_BIND=Unable to determine the partition ID from bind request
DISTRIBUTION_UNRESOLVABLE_PARTITION_ID_FOR_EXT_OP=Unable to determine the partition ID from extended request
DISTRIBUTION_DELETE_SPANS_MULTIPLE_PARTITIONS=The delete request spans multiple partitions
DISTRIBUTION_MODDN_SPANS_MULTIPLE_PARTITIONS=The modify DN request spans multiple partitions
DISTRIBUTION_UNSUPPORTED_SEARCH_SCOPE=The search scope %d is not supported by the distribution layer
DISTRIBUTION_VLV_CONTROL_NOT_SUPPORTED=The VLV request control is not supported across multiple partitions
DISTRIBUTION_SPR_CONTROL_NOT_SUPPORTED=The Simple Paged Results request control is not supported across multiple partitions
DISTRIBUTION_SSS_CONTROL_NOT_SUPPORTED=The Server Side Sort request control is not supported across multiple partitions
DISTRIBUTION_PSEARCH_CONTROL_NOT_SUPPORTED=The Persistent Search request control is not supported across multiple \
 partitions, except if it requests changes only
opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashDistributionLoadBalancerTest.java
New file @@ -0,0 +1,382 @@
/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2016 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;

import static org.assertj.core.api.Assertions.*;
import static org.forgerock.opendj.ldap.Connections.newFixedSizeDistributionLoadBalancer;
import static org.forgerock.opendj.ldap.Filter.alwaysTrue;
import static org.forgerock.opendj.ldap.SearchScope.BASE_OBJECT;
import static org.forgerock.opendj.ldap.SearchScope.SINGLE_LEVEL;
import static org.forgerock.opendj.ldap.SearchScope.SUBORDINATES;
import static org.forgerock.opendj.ldap.SearchScope.WHOLE_SUBTREE;
import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnectionFactory;
import static org.forgerock.opendj.ldap.requests.Requests.*;
import static org.forgerock.opendj.ldap.responses.Responses.newBindResult;
import static org.forgerock.opendj.ldap.responses.Responses.newCompareResult;
import static org.forgerock.opendj.ldap.responses.Responses.newResult;
import static org.forgerock.opendj.ldap.spi.LdapPromises.newSuccessfulLdapPromise;
import static org.forgerock.util.Options.defaultOptions;
import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;

import org.forgerock.opendj.ldap.requests.AddRequest;
import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.opendj.ldap.requests.CompareRequest;
import org.forgerock.opendj.ldap.requests.DeleteRequest;
import org.forgerock.opendj.ldap.requests.ExtendedRequest;
import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
import org.forgerock.opendj.ldap.requests.ModifyRequest;
import org.forgerock.opendj.ldap.requests.Requests;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.responses.BindResult;
import org.forgerock.opendj.ldap.responses.CompareResult;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.util.Function;
import org.forgerock.util.promise.NeverThrowsException;
import org.mockito.Mock;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Routing tests for the fixed size distribution load balancer: two mocked partitions are registered with one point
 * each and a mocked hash function decides which of them a request is dispatched to.
 */
@SuppressWarnings("javadoc")
public class ConsistentHashDistributionLoadBalancerTest extends SdkTestCase {
    private static final DN PARTITION_BASE_DN = DN.valueOf("ou=people,dc=example,dc=com");
    private static final DN DN_1_BELOW_PARTITION_BASE_DN = PARTITION_BASE_DN.child("uid=bjensen");
    private static final DN DN_2_BELOW_PARTITION_BASE_DN = DN_1_BELOW_PARTITION_BASE_DN.child("cn=prefs");
    private static final DN DN_ABOVE_PARTITION_BASE_DN = PARTITION_BASE_DN.parent();
    private static final DN UNPARTITIONED_DN = DN.valueOf("ou=groups,dc=example,dc=com");

    private static final LdapPromise<Result> SUCCESS = newSuccessfulLdapPromise(newResult(ResultCode.SUCCESS));
    private static final LdapPromise<BindResult> BIND_SUCCESS =
            newSuccessfulLdapPromise(newBindResult(ResultCode.SUCCESS));
    private static final LdapPromise<CompareResult> COMPARE_SUCCESS =
            newSuccessfulLdapPromise(newCompareResult(ResultCode.SUCCESS));

    /** Hash of the single point owned by the first partition. */
    private static final int P1_HASH = 0x00000000;
    /** Hash of the single point owned by the second partition. */
    private static final int P2_HASH = 0x80000000;

    @Mock
    private Connection partition1Conn;
    @Mock
    private Connection partition2Conn;
    @Mock
    private Function<Object, Integer, NeverThrowsException> hashFunction;

    private ConnectionFactory partition1;
    private ConnectionFactory partition2;
    private ConnectionFactory loadBalancer;

    @BeforeMethod
    public void beforeMethod() {
        initMocks(this);

        partition1 = mockConnectionFactory(partition1Conn);
        partition2 = mockConnectionFactory(partition2Conn);

        // The two put() calls below consume these hashes in order, one point per partition.
        when(hashFunction.apply(any())).thenReturn(P1_HASH, P2_HASH);
        final ConsistentHashMap<ConnectionFactory> partitions = new ConsistentHashMap<>(hashFunction);
        partitions.put("P1", partition1, 1);
        partitions.put("P2", partition2, 1);
        loadBalancer = newFixedSizeDistributionLoadBalancer(PARTITION_BASE_DN, partitions, defaultOptions());

        stubSuccessfulResponses(partition1Conn);
        stubSuccessfulResponses(partition2Conn);
    }

    /** Makes every asynchronous operation of the provided mocked connection complete successfully. */
    private void stubSuccessfulResponses(final Connection conn) {
        when(conn.addAsync(any(AddRequest.class),
                           any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
        when(conn.bindAsync(any(BindRequest.class),
                            any(IntermediateResponseHandler.class))).thenReturn(BIND_SUCCESS);
        when(conn.compareAsync(any(CompareRequest.class),
                               any(IntermediateResponseHandler.class))).thenReturn(COMPARE_SUCCESS);
        when(conn.deleteAsync(any(DeleteRequest.class),
                              any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
        when(conn.extendedRequestAsync(any(ExtendedRequest.class),
                                       any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
        when(conn.modifyAsync(any(ModifyRequest.class),
                              any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
        when(conn.modifyDNAsync(any(ModifyDNRequest.class),
                                any(IntermediateResponseHandler.class))).thenReturn(SUCCESS);
        when(conn.searchAsync(any(SearchRequest.class),
                              any(IntermediateResponseHandler.class),
                              any(SearchResultHandler.class))).thenReturn(SUCCESS);
    }

    @Test
    public void closeShouldCloseAllPartitions() throws Exception {
        loadBalancer.close();

        verify(partition1).close();
        verify(partition2).close();
    }

    @Test
    public void getConnectionShouldReturnLogicalConnection() throws Exception {
        try (Connection conn = loadBalancer.getConnection()) {
            assertThat(conn).isNotNull();
            conn.close();
        }
        verifyZeroInteractions(partition1, partition2);
    }

    @Test
    public void getConnectionAsyncShouldReturnLogicalConnection() throws Exception {
        try (Connection conn = loadBalancer.getConnectionAsync().get()) {
            assertThat(conn).isNotNull();
            conn.close();
        }
        verifyZeroInteractions(partition1, partition2);
    }

    /**
     * Rows are { hash returned by the hash function, whether the second partition is expected to be hit,
     * request DN, DN expected to be hashed }. Each DN pair is tried against both partitions.
     */
    @DataProvider
    public Object[][] requests() {
        final DN[][] dnPairs = {
            { UNPARTITIONED_DN, UNPARTITIONED_DN },
            { DN_ABOVE_PARTITION_BASE_DN, DN_ABOVE_PARTITION_BASE_DN },
            { DN_1_BELOW_PARTITION_BASE_DN, DN_1_BELOW_PARTITION_BASE_DN },
            { DN_2_BELOW_PARTITION_BASE_DN, DN_1_BELOW_PARTITION_BASE_DN }
        };
        final Object[][] rows = new Object[dnPairs.length * 2][];
        int next = 0;
        for (final DN[] pair : dnPairs) {
            rows[next++] = new Object[] { P1_HASH, false, pair[0], pair[1] };
            rows[next++] = new Object[] { P2_HASH, true, pair[0], pair[1] };
        }
        return rows;
    }

    @Test(dataProvider = "requests")
    public void addShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
                                        final DN partitionDN) throws Exception {
        when(hashFunction.apply(any())).thenReturn(hash);

        try (Connection conn = loadBalancer.getConnection()) {
            conn.addAsync(newAddRequest(requestDN));
        }

        final Connection target = hitPartition(isSecondPartition);
        verify(target).addAsync(any(AddRequest.class), any(IntermediateResponseHandler.class));
        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
        verify(target).close();
        verifyZeroInteractions(missPartition(isSecondPartition));
    }

    @Test(dataProvider = "requests")
    public void bindShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
                                         final DN partitionDN) throws Exception {
        when(hashFunction.apply(any())).thenReturn(hash);

        try (Connection conn = loadBalancer.getConnection()) {
            conn.bindAsync(Requests.newSimpleBindRequest(requestDN.toString(), "password".toCharArray()));
        }

        final Connection target = hitPartition(isSecondPartition);
        verify(target).bindAsync(any(BindRequest.class), any(IntermediateResponseHandler.class));
        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
        verify(target).close();
        verifyZeroInteractions(missPartition(isSecondPartition));
    }

    @Test(dataProvider = "requests")
    public void compareShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
                                            final DN partitionDN) throws Exception {
        when(hashFunction.apply(any())).thenReturn(hash);

        try (Connection conn = loadBalancer.getConnection()) {
            conn.compareAsync(newCompareRequest(requestDN.toString(), "cn", "test"));
        }

        final Connection target = hitPartition(isSecondPartition);
        verify(target).compareAsync(any(CompareRequest.class), any(IntermediateResponseHandler.class));
        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
        verify(target).close();
        verifyZeroInteractions(missPartition(isSecondPartition));
    }

    @Test(dataProvider = "requests")
    public void deleteShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
                                           final DN partitionDN) throws Exception {
        when(hashFunction.apply(any())).thenReturn(hash);

        try (Connection conn = loadBalancer.getConnection()) {
            conn.deleteAsync(newDeleteRequest(requestDN));
        }

        final Connection target = hitPartition(isSecondPartition);
        verify(target).deleteAsync(any(DeleteRequest.class), any(IntermediateResponseHandler.class));
        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
        verify(target).close();
        verifyZeroInteractions(missPartition(isSecondPartition));
    }

    @Test(dataProvider = "requests")
    public void extendedShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
                                             final DN partitionDN) throws Exception {
        when(hashFunction.apply(any())).thenReturn(hash);

        try (Connection conn = loadBalancer.getConnection()) {
            conn.extendedRequestAsync(newPasswordModifyExtendedRequest().setUserIdentity("dn:" + requestDN));
        }

        final Connection target = hitPartition(isSecondPartition);
        verify(target).extendedRequestAsync(any(ExtendedRequest.class), any(IntermediateResponseHandler.class));
        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
        verify(target).close();
        verifyZeroInteractions(missPartition(isSecondPartition));
    }

    @Test(dataProvider = "requests")
    public void modifyShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
                                           final DN partitionDN) throws Exception {
        when(hashFunction.apply(any())).thenReturn(hash);

        try (Connection conn = loadBalancer.getConnection()) {
            conn.modifyAsync(newModifyRequest(requestDN));
        }

        final Connection target = hitPartition(isSecondPartition);
        verify(target).modifyAsync(any(ModifyRequest.class), any(IntermediateResponseHandler.class));
        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
        verify(target).close();
        verifyZeroInteractions(missPartition(isSecondPartition));
    }

    @Test(dataProvider = "requests")
    public void modifyDNShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
                                             final DN partitionDN) throws Exception {
        when(hashFunction.apply(any())).thenReturn(hash);

        try (Connection conn = loadBalancer.getConnection()) {
            conn.modifyDNAsync(newModifyDNRequest(requestDN, RDN.valueOf("cn=changed")));
        }

        final Connection target = hitPartition(isSecondPartition);
        verify(target).modifyDNAsync(any(ModifyDNRequest.class), any(IntermediateResponseHandler.class));
        // The hash function may be consulted more than once for a modify DN request.
        verify(hashFunction, atLeastOnce()).apply(partitionDN.toNormalizedUrlSafeString());
        verify(target).close();
        verifyZeroInteractions(missPartition(isSecondPartition));
    }

    @Test(dataProvider = "requests")
    public void searchBaseShouldRouteCorrectly(final int hash, final boolean isSecondPartition, final DN requestDN,
                                               final DN partitionDN) throws Exception {
        when(hashFunction.apply(any())).thenReturn(hash);

        try (Connection conn = loadBalancer.getConnection()) {
            conn.searchAsync(newSearchRequest(requestDN, BASE_OBJECT, alwaysTrue()),
                             mock(SearchResultHandler.class));
        }

        final Connection target = hitPartition(isSecondPartition);
        verify(target).searchAsync(any(SearchRequest.class),
                                   any(IntermediateResponseHandler.class),
                                   any(SearchResultHandler.class));
        verify(hashFunction).apply(partitionDN.toNormalizedUrlSafeString());
        verify(target).close();
        verifyZeroInteractions(missPartition(isSecondPartition));
    }

    @Test
    public void searchSingleLevelBelowPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
        verifySearchAgainstSinglePartition(DN_1_BELOW_PARTITION_BASE_DN, SINGLE_LEVEL);
    }

    @Test
    public void searchSingleLevelAbovePartitionBaseDNShouldRouteToSinglePartition() throws Exception {
        verifySearchAgainstSinglePartition(DN_ABOVE_PARTITION_BASE_DN, SINGLE_LEVEL);
    }

    @Test
    public void searchSingleLevelAdjacentToPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
        verifySearchAgainstSinglePartition(UNPARTITIONED_DN, SINGLE_LEVEL);
    }

    @Test
    public void searchSingleLevelAtPartitionBaseDNShouldRouteToAllPartitions() throws Exception {
        verifySearchAgainstAllPartitions(PARTITION_BASE_DN, SINGLE_LEVEL);
    }

    @Test
    public void searchSubtreeBelowPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
        verifySearchAgainstSinglePartition(DN_1_BELOW_PARTITION_BASE_DN, WHOLE_SUBTREE);
    }

    @Test
    public void searchSubtreeAdjacentToPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
        verifySearchAgainstSinglePartition(UNPARTITIONED_DN, WHOLE_SUBTREE);
    }

    @Test
    public void searchSubtreeAbovePartitionBaseDNShouldRouteToAllPartitions() throws Exception {
        verifySearchAgainstAllPartitions(DN_ABOVE_PARTITION_BASE_DN, WHOLE_SUBTREE);
    }

    @Test
    public void searchSubtreeAtPartitionBaseDNShouldRouteToAllPartitions() throws Exception {
        verifySearchAgainstAllPartitions(PARTITION_BASE_DN, WHOLE_SUBTREE);
    }

    @Test
    public void searchSubordinatesBelowPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
        verifySearchAgainstSinglePartition(DN_1_BELOW_PARTITION_BASE_DN, SUBORDINATES);
    }

    @Test
    public void searchSubordinatesAdjacentToPartitionBaseDNShouldRouteToSinglePartition() throws Exception {
        verifySearchAgainstSinglePartition(UNPARTITIONED_DN, SUBORDINATES);
    }

    @Test
    public void searchSubordinatesAbovePartitionBaseDNShouldRouteToAllPartitions() throws Exception {
        verifySearchAgainstAllPartitions(DN_ABOVE_PARTITION_BASE_DN, SUBORDINATES);
    }

    @Test
    public void searchSubordinatesAtPartitionBaseDNShouldRouteToAllPartitions() throws Exception {
        verifySearchAgainstAllPartitions(PARTITION_BASE_DN, SUBORDINATES);
    }

    /** Sends a search while the hash function is pinned to the first partition's hash. */
    private void searchWithFirstPartitionHash(final DN dn, final SearchScope scope) throws Exception {
        when(hashFunction.apply(any())).thenReturn(P1_HASH);
        try (Connection conn = loadBalancer.getConnection()) {
            conn.searchAsync(newSearchRequest(dn, scope, alwaysTrue()), mock(SearchResultHandler.class));
        }
    }

    private void verifySearchAgainstSinglePartition(final DN dn, final SearchScope scope) throws Exception {
        searchWithFirstPartitionHash(dn, scope);

        verify(partition1Conn).searchAsync(any(SearchRequest.class),
                                           any(IntermediateResponseHandler.class),
                                           any(SearchResultHandler.class));
        verify(hashFunction).apply(dn.toNormalizedUrlSafeString());
        verify(partition1Conn).close();
        verifyZeroInteractions(partition2Conn);
    }

    private void verifySearchAgainstAllPartitions(final DN dn, final SearchScope scope) throws Exception {
        searchWithFirstPartitionHash(dn, scope);

        verify(partition1Conn).searchAsync(any(SearchRequest.class),
                                           any(IntermediateResponseHandler.class),
                                           any(SearchResultHandler.class));
        verify(partition2Conn).searchAsync(any(SearchRequest.class),
                                           any(IntermediateResponseHandler.class),
                                           any(SearchResultHandler.class));
        verify(partition1Conn).close();
        verify(partition2Conn).close();
    }

    /** Returns the mocked connection of the partition which must not receive the request. */
    private Connection missPartition(final boolean isSecondPartition) {
        if (isSecondPartition) {
            return partition1Conn;
        }
        return partition2Conn;
    }

    /** Returns the mocked connection of the partition which is expected to receive the request. */
    private Connection hitPartition(final boolean isSecondPartition) {
        if (isSecondPartition) {
            return partition2Conn;
        }
        return partition1Conn;
    }
}
opendj-core/src/test/java/org/forgerock/opendj/ldap/ConsistentHashMapTest.java
New file @@ -0,0 +1,141 @@
/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2016 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Map;

import org.forgerock.util.Function;
import org.forgerock.util.promise.NeverThrowsException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/** Tests the default MD5 hash function and the structure of the circle as partitions are added and removed. */
@SuppressWarnings("javadoc")
public class ConsistentHashMapTest extends SdkTestCase {
    private static final String P1 = "partition-1";
    private static final String P2 = "partition-2";
    private static final String P3 = "partition-3";
    private static final String P4 = "partition-4";

    /** Rows are { input string, expected hash derived from the hex encoded MD5 digest of that string }. */
    @DataProvider
    public static Object[][] md5HashFunction() {
        // @Checkstyle:off
        return new Object[][] {
            { P1, hexToInt("94f940e6c2a31703d6be067b87d67968") },
            { P2, hexToInt("1038f27a22f8b574670bfdd7f918ffb4") },
            { P3, hexToInt("a88f746bfae24480be40df47c679228a") },
            { P4, hexToInt("bdb89200d2a310f5afb345621f544183") },
        };
        // @Checkstyle:on
    }

    private static int hexToInt(final String hex) {
        return ByteString.valueOfHex(hex).toInt();
    }

    @Test(dataProvider = "md5HashFunction")
    public void testMD5HashFunction(final String s, final int expected) {
        final Integer actual = ConsistentHashMap.MD5.apply(s);
        assertThat(actual).isEqualTo(expected);
    }

    @Test
    public void testConsistentHashMap() {
        // Create a hash function that returns predictable values.
        @SuppressWarnings({ "rawtypes", "unchecked" })
        final Function<Object, Integer, NeverThrowsException> hashFunction = mock(Function.class);

        // Hashes for the 16 logical partitions: four consecutive values per put() call below.
        when(hashFunction.apply(any())).thenReturn(0x00000000, 0x40000000, 0x80000000, 0xC0000000,
                                                   0x10000000, 0x50000000, 0x90000000, 0xD0000000,
                                                   0x20000000, 0x60000000, 0xA0000000, 0xE0000000,
                                                   0x30000000, 0x70000000, 0xB0000000, 0xF0000000);

        final ConsistentHashMap<String> partitions = new ConsistentHashMap<>(hashFunction);
        partitions.put(P1, P1, 4);
        partitions.put(P2, P2, 4);
        partitions.put(P3, P3, 4);
        partitions.put(P4, P4, 4);

        // Check the structure of the CHM.
        assertThat(partitions.isEmpty()).isFalse();
        assertThat(partitions.size()).isEqualTo(4);
        assertThat(partitions.getAll()).containsOnly(P1, P2, P3, P4);

        final Map<String, Long> weights = partitions.getWeights();
        assertThat(weights.size()).isEqualTo(4);
        assertThat(weights).containsEntry(P1, 0x40000000L);
        assertThat(weights).containsEntry(P2, 0x40000000L);
        assertThat(weights).containsEntry(P3, 0x40000000L);
        assertThat(weights).containsEntry(P4, 0x40000000L);

        // Several key lookups: the key itself is irrelevant, only the stubbed hash matters.
        stubLookupHashes(hashFunction);
        assertLookups(partitions, P1, P1, 6);
        assertLookups(partitions, P2, P2, 3);
        assertLookups(partitions, P4, P4, 3);

        // Remove a partition and retry.
        when(hashFunction.apply(any())).thenReturn(0x10000000, 0x50000000, 0x90000000, 0xD0000000);
        partitions.remove(P2);

        // Check the structure of the CHM.
        assertThat(partitions.isEmpty()).isFalse();
        assertThat(partitions.size()).isEqualTo(3);
        assertThat(partitions.getAll()).containsOnly(P1, P3, P4);

        final Map<String, Long> newWeights = partitions.getWeights();
        assertThat(newWeights.size()).isEqualTo(3);
        assertThat(newWeights).containsEntry(P1, 0x40000000L);
        assertThat(newWeights).containsEntry(P3, 0x80000000L); // P2 now falls through to P3
        assertThat(newWeights).containsEntry(P4, 0x40000000L);

        // Same lookups again: hashes which used to resolve to P2 now resolve to P3.
        stubLookupHashes(hashFunction);
        assertLookups(partitions, P1, P1, 6);
        assertLookups(partitions, P2, P3, 3);
        assertLookups(partitions, P4, P4, 3);
    }

    /** Stubs the hashes returned, in order, for the twelve key lookups performed after each structural check. */
    private static void stubLookupHashes(final Function<Object, Integer, NeverThrowsException> hashFunction) {
        when(hashFunction.apply(any())).thenReturn(0x70000001, 0x7FFFFFFF, 0x80000000,  // wraps around to P1
                                                   0xF0000001, 0xFFFFFFFF, 0x00000000,  // P1
                                                   0x00000001, 0x0FFFFFFF, 0x10000000,  // P2 while it is registered
                                                   0xE0000001, 0xEFFFFFFF, 0xF0000000); // P4
    }

    /** Performs {@code times} consecutive lookups of {@code key} and checks that each one yields {@code expected}. */
    private static void assertLookups(final ConsistentHashMap<String> partitions, final String key,
                                      final String expected, final int times) {
        for (int i = 0; i < times; i++) {
            assertThat(partitions.get(key)).isEqualTo(expected);
        }
    }
}