OPENDJ-2870 Add a least requests load balancer algorithm
- the default behavior can be overridden by providing
a new AffinityControl with a request, which
guarantees connection affinity based on a byte string value
2 files added
6 files modified
| New file |
| | |
| | | /* |
| | | * The contents of this file are subject to the terms of the Common Development and |
| | | * Distribution License (the License). You may not use this file except in compliance with the |
| | | * License. |
| | | * |
| | | * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the |
| | | * specific language governing permission and limitations under the License. |
| | | * |
| | | * When distributing Covered Software, include this CDDL Header Notice in each file and include |
| | | * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL |
| | | * Header, with the fields enclosed by brackets [] replaced by your own identifying |
| | | * information: "Portions Copyright [year] [name of copyright owner]". |
| | | * |
| | | * Copyright 2016 ForgeRock AS. |
| | | */ |
| | | package com.forgerock.opendj.ldap.controls; |
| | | |
| | | import static com.forgerock.opendj.ldap.CoreMessages.*; |
| | | |
| | | import java.util.concurrent.ThreadLocalRandom; |
| | | |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.forgerock.opendj.ldap.DecodeException; |
| | | import org.forgerock.opendj.ldap.DecodeOptions; |
| | | import org.forgerock.opendj.ldap.controls.Control; |
| | | import org.forgerock.opendj.ldap.controls.ControlDecoder; |
| | | import org.forgerock.util.Reject; |
| | | |
| | | /** |
| | | * Control that provides a value for affinity. |
| | | * <p> |
| | | * As an example, this control can be used for connection affinity when using a load-balancer ({@link Connections}). |
| | | * |
| | | * @see Connections#newLeastRequestsLoadBalancer(Collection, Options) |
| | | */ |
| | | public final class AffinityControl implements Control { |
| | | |
| | | /** OID for this control. */ |
| | | public static final String OID = "1.3.6.1.4.1.36733.2.1.5.2"; |
| | | |
| | | /** A decoder which can be used for decoding the affinity control. */ |
| | | public static final ControlDecoder<AffinityControl> DECODER = |
| | | new ControlDecoder<AffinityControl>() { |
| | | |
| | | @Override |
| | | public AffinityControl decodeControl( |
| | | final Control control, final DecodeOptions options) throws DecodeException { |
| | | Reject.ifNull(control); |
| | | |
| | | if (control instanceof AffinityControl) { |
| | | return (AffinityControl) control; |
| | | } |
| | | |
| | | if (!control.getOID().equals(OID)) { |
| | | throw DecodeException.error(ERR_CONNECTION_AFFINITY_CONTROL_BAD_OID.get(control.getOID(), OID)); |
| | | } |
| | | |
| | | if (!control.hasValue()) { |
| | | // The control must always have a value. |
| | | throw DecodeException.error(ERR_CONNECTION_AFFINITY_CONTROL_DECODE_NULL.get()); |
| | | } |
| | | |
| | | return new AffinityControl(control.getValue(), control.isCritical()); |
| | | } |
| | | |
| | | @Override |
| | | public String getOID() { |
| | | return OID; |
| | | } |
| | | }; |
| | | |
| | | /** The affinity value is an arbitrary string. */ |
| | | private final ByteString affinityValue; |
| | | |
| | | /** Indicates if this control is critical. */ |
| | | private final boolean isCritical; |
| | | |
| | | /** |
| | | * Creates a new affinity control with provided value. |
| | | * |
| | | * @param affinityValue |
| | | * The affinity value to use |
| | | * @param isCritical |
| | | * Indicates if this control is critical |
| | | * @return The new control. |
| | | * @throws NullPointerException |
| | | * If {@code transactionId} was {@code null}. |
| | | */ |
| | | public static AffinityControl newControl(final ByteString affinityValue, final boolean isCritical) { |
| | | Reject.ifNull(affinityValue); |
| | | return new AffinityControl(affinityValue, isCritical); |
| | | } |
| | | |
| | | /** |
| | | * Creates a new affinity control with a randomly generated affinity value. |
| | | * |
| | | * @param isCritical |
| | | * Indicates if this control is critical |
| | | * @return The new control. |
| | | * @throws NullPointerException |
| | | * If {@code transactionId} was {@code null}. |
| | | */ |
| | | public static AffinityControl newControl(final boolean isCritical) { |
| | | byte[] randomValue = new byte[5]; |
| | | ThreadLocalRandom.current().nextBytes(randomValue); |
| | | return new AffinityControl(ByteString.valueOfBytes(randomValue), isCritical); |
| | | } |
| | | |
| | | private AffinityControl(final ByteString affinityValue, final boolean isCritical) { |
| | | this.affinityValue = affinityValue; |
| | | this.isCritical = isCritical; |
| | | } |
| | | |
| | | /** |
| | | * Returns the affinity value. |
| | | * |
| | | * @return The affinity value. |
| | | */ |
| | | public ByteString getAffinityValue() { |
| | | return affinityValue; |
| | | } |
| | | |
| | | @Override |
| | | public String getOID() { |
| | | return OID; |
| | | } |
| | | |
| | | @Override |
| | | public ByteString getValue() { |
| | | return affinityValue; |
| | | } |
| | | |
| | | @Override |
| | | public boolean hasValue() { |
| | | return true; |
| | | } |
| | | |
| | | @Override |
| | | public boolean isCritical() { |
| | | return isCritical; |
| | | } |
| | | |
| | | @Override |
| | | public String toString() { |
| | | final StringBuilder builder = new StringBuilder(); |
| | | builder.append("AffinityControl(oid="); |
| | | builder.append(getOID()); |
| | | builder.append(", affinityValue(hex)="); |
| | | builder.append(affinityValue.toHexString()); |
| | | builder.append(", isCritical="); |
| | | builder.append(isCritical); |
| | | builder.append(")"); |
| | | return builder.toString(); |
| | | } |
| | | } |
| | |
| | | import java.util.concurrent.ThreadLocalRandom; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | |
| | | import java.util.concurrent.atomic.AtomicLongArray; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.opendj.ldap.RequestLoadBalancer.RequestWithIndex; |
| | | import org.forgerock.opendj.ldap.requests.AddRequest; |
| | | import org.forgerock.opendj.ldap.requests.CRAMMD5SASLBindRequest; |
| | | import org.forgerock.opendj.ldap.requests.CompareRequest; |
| | |
| | | import org.forgerock.opendj.ldap.requests.PasswordModifyExtendedRequest; |
| | | import org.forgerock.opendj.ldap.requests.PlainSASLBindRequest; |
| | | import org.forgerock.opendj.ldap.requests.Request; |
| | | import org.forgerock.opendj.ldap.requests.Requests; |
| | | import org.forgerock.opendj.ldap.requests.SearchRequest; |
| | | import org.forgerock.opendj.ldap.requests.SimpleBindRequest; |
| | | import org.forgerock.util.Function; |
| | | import org.forgerock.util.Option; |
| | | import org.forgerock.util.Options; |
| | | import org.forgerock.util.Reject; |
| | | import org.forgerock.util.annotations.VisibleForTesting; |
| | | import org.forgerock.util.promise.NeverThrowsException; |
| | | import org.forgerock.util.promise.Promise; |
| | | import org.forgerock.util.time.Duration; |
| | | |
| | | import com.forgerock.opendj.ldap.CoreMessages; |
| | | import com.forgerock.opendj.ldap.controls.AffinityControl; |
| | | |
| | | /** |
| | | * This class contains methods for creating and manipulating connection |
| | | * factories and connections. |
| | | */ |
| | | public final class Connections { |
| | | |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | /** |
| | | * Specifies the interval between successive attempts to reconnect to offline load-balanced connection factories. |
| | | * The default configuration is to attempt to reconnect every second. |
| | |
| | | * @return The new round-robin load balancer. |
| | | * @see #newShardedRequestLoadBalancer(Collection, Options) |
| | | * @see #newFailoverLoadBalancer(Collection, Options) |
| | | * @see #newLeastRequestsLoadBalancer(Collection, Options) |
| | | * @see #LOAD_BALANCER_EVENT_LISTENER |
| | | * @see #LOAD_BALANCER_MONITORING_INTERVAL |
| | | * @see #LOAD_BALANCER_SCHEDULER |
| | |
| | | * @return The new fail-over load balancer. |
| | | * @see #newRoundRobinLoadBalancer(Collection, Options) |
| | | * @see #newShardedRequestLoadBalancer(Collection, Options) |
| | | * @see #newLeastRequestsLoadBalancer(Collection, Options) |
| | | * @see #LOAD_BALANCER_EVENT_LISTENER |
| | | * @see #LOAD_BALANCER_MONITORING_INTERVAL |
| | | * @see #LOAD_BALANCER_SCHEDULER |
| | |
| | | * @return The new affinity load balancer. |
| | | * @see #newRoundRobinLoadBalancer(Collection, Options) |
| | | * @see #newFailoverLoadBalancer(Collection, Options) |
| | | * @see #newLeastRequestsLoadBalancer(Collection, Options) |
| | | * @see #LOAD_BALANCER_EVENT_LISTENER |
| | | * @see #LOAD_BALANCER_MONITORING_INTERVAL |
| | | * @see #LOAD_BALANCER_SCHEDULER |
| | |
| | | return new RequestLoadBalancer("ShardedRequestLoadBalancer", |
| | | factories, |
| | | options, |
| | | newShardedRequestLoadBalancerFunction(factories)); |
| | | newShardedRequestLoadBalancerNextFunction(factories), |
| | | NOOP_END_OF_REQUEST_FUNCTION); |
| | | |
| | | } |
| | | |
| | | // Package private for testing. |
| | | static Function<Request, Integer, NeverThrowsException> newShardedRequestLoadBalancerFunction( |
| | | @VisibleForTesting |
| | | static Function<Request, RequestWithIndex, NeverThrowsException> newShardedRequestLoadBalancerNextFunction( |
| | | final Collection<? extends ConnectionFactory> factories) { |
| | | return new Function<Request, Integer, NeverThrowsException>() { |
| | | return new Function<Request, RequestWithIndex, NeverThrowsException>() { |
| | | private final int maxIndex = factories.size(); |
| | | |
| | | @Override |
| | | public Integer apply(final Request request) { |
| | | public RequestWithIndex apply(final Request request) { |
| | | // Normalize the hash to a valid factory index, taking care of negative hash values and especially |
| | | // Integer.MIN_VALUE (see doc for Math.abs()). |
| | | final int index = computeIndexBasedOnDnHashCode(request); |
| | | return index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % maxIndex); |
| | | return new RequestWithIndex(request, index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % maxIndex)); |
| | | } |
| | | |
| | | private int computeIndexBasedOnDnHashCode(final Request request) { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Creates a new "least requests" load-balancer which will load-balance individual requests across the provided |
| | | * set of connection factories, each typically representing a single replica, using an algorithm that ensures that |
| | | * requests are routed to the replica which has the minimum number of active requests. |
| | | * <p> |
| | | * In other words, this load-balancer provides availability and partition tolerance, but sacrifices consistency. |
| | | * When a replica is not available, its number of active requests will not decrease until the requests time out, |
| | | * which will have the effect of directing requests to the other replicas. Consistency is low compared to the |
| | | * "sharded" load-balancer, because there is no guarantee that requests for the same DN are directed to the same |
| | | * replica. |
| | | * <p/> |
| | | * It is possible to increase consistency by providing a {@link AffinityControl} with a |
| | | * request. The control value will then be used to compute a hash that will determine the connection to use. In that |
| | | * case, the "least requests" behavior is completely overridden, i.e. the most saturated connection may be chosen |
| | | * depending on the hash value. |
| | | * <p/> |
| | | * <b>NOTE:</b> this connection factory returns fake connections, since real connections are obtained for each |
| | | * request. Therefore, the returned fake connections have certain limitations: abandon requests will be ignored |
| | | * since they cannot be routed; connection event listeners can be registered, but will only be notified when the |
| | | * fake connection is closed or when all of the connection factories are unavailable. |
| | | * <p/> |
| | | * <b>NOTE:</b>Server selection is only based on information which is local to the client application. If other |
| | | * applications are accessing the same servers then their additional load is not taken into account. Therefore, |
| | | * this load balancer is only effective if all client applications access the servers in a similar way. |
| | | * <p/> |
| | | * The implementation periodically attempts to connect to failed connection factories in order to determine if they |
| | | * have become available again. |
| | | * |
| | | * @param factories |
| | | * The connection factories. |
| | | * @param options |
| | | * This configuration options for the load-balancer. |
| | | * @return The new least requests load balancer. |
| | | * @see #newRoundRobinLoadBalancer(Collection, Options) |
| | | * @see #newFailoverLoadBalancer(Collection, Options) |
| | | * @see #newShardedRequestLoadBalancer(Collection, Options) |
| | | * @see #LOAD_BALANCER_EVENT_LISTENER |
| | | * @see #LOAD_BALANCER_MONITORING_INTERVAL |
| | | * @see #LOAD_BALANCER_SCHEDULER |
| | | */ |
| | | public static ConnectionFactory newLeastRequestsLoadBalancer( |
| | | final Collection<? extends ConnectionFactory> factories, final Options options) { |
| | | final LeastRequestsDispatcher dispatcher = new LeastRequestsDispatcher(factories.size()); |
| | | return new RequestLoadBalancer("SaturationBasedRequestLoadBalancer", factories, options, |
| | | newLeastRequestsLoadBalancerNextFunction(dispatcher), |
| | | newLeastRequestsLoadBalancerEndOfRequestFunction(dispatcher)); |
| | | } |
| | | |
| | | private static final DecodeOptions CONTROL_DECODE_OPTIONS = new DecodeOptions(); |
| | | |
| | | @VisibleForTesting |
| | | static Function<Request, RequestWithIndex, NeverThrowsException> newLeastRequestsLoadBalancerNextFunction( |
| | | final LeastRequestsDispatcher dispatcher) { |
| | | return new Function<Request, RequestWithIndex, NeverThrowsException>() { |
| | | private final int maxIndex = dispatcher.size(); |
| | | |
| | | @Override |
| | | public RequestWithIndex apply(final Request request) { |
| | | int affinityBasedIndex = parseAffinityRequestControl(request); |
| | | int finalIndex = dispatcher.selectServer(affinityBasedIndex); |
| | | Request cleanedRequest = (affinityBasedIndex == -1) |
| | | ? request : Requests.shallowCopyOfRequest(request, AffinityControl.OID); |
| | | return new RequestWithIndex(cleanedRequest, finalIndex); |
| | | } |
| | | |
| | | private int parseAffinityRequestControl(final Request request) { |
| | | try { |
| | | AffinityControl control = request.getControl(AffinityControl.DECODER, CONTROL_DECODE_OPTIONS); |
| | | if (control != null) { |
| | | int index = control.getAffinityValue().hashCode(); |
| | | return index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % maxIndex); |
| | | } |
| | | } catch (DecodeException e) { |
| | | logger.warn(CoreMessages.WARN_DECODING_AFFINITY_CONTROL.get(e.getMessage())); |
| | | } |
| | | return -1; |
| | | } |
| | | }; |
| | | } |
| | | |
| | | @VisibleForTesting |
| | | static Function<Integer, Void, NeverThrowsException> newLeastRequestsLoadBalancerEndOfRequestFunction( |
| | | final LeastRequestsDispatcher dispatcher) { |
| | | return new Function<Integer, Void, NeverThrowsException>() { |
| | | @Override |
| | | public Void apply(final Integer index) { |
| | | dispatcher.terminatedRequest(index); |
| | | return null; |
| | | } |
| | | }; |
| | | } |
| | | |
| | | /** No-op "end of request" function for the saturation-based request load balancer. */ |
| | | static final Function<Integer, Void, NeverThrowsException> NOOP_END_OF_REQUEST_FUNCTION = |
| | | new Function<Integer, Void, NeverThrowsException>() { |
| | | @Override |
| | | public Void apply(Integer index) { |
| | | return null; |
| | | } |
| | | }; |
| | | |
| | | /** |
| | | * Dispatch requests to the server index which has the least active requests. |
| | | * <p> |
| | | * A server is actually represented only by its index. Provided an initial number of servers, the requests are |
| | | * dispatched to the less saturated index, i.e. which corresponds to the server that has the lowest number of active |
| | | * requests. |
| | | */ |
| | | @VisibleForTesting |
| | | static class LeastRequestsDispatcher { |
| | | /** Counter for each server. */ |
| | | private final AtomicLongArray serversCounters; |
| | | |
| | | LeastRequestsDispatcher(int numberOfServers) { |
| | | serversCounters = new AtomicLongArray(numberOfServers); |
| | | } |
| | | |
| | | int size() { |
| | | return serversCounters.length(); |
| | | } |
| | | |
| | | /** |
| | | * Returns the server index to use. |
| | | * |
| | | * @param forceIndex |
| | | * Forces a server index to use if different from -1. In that case, the default behavior of the |
| | | * dispatcher is overridden. If -1 is provided, then the default behavior of the dispatcher applies. |
| | | * @return the server index |
| | | */ |
| | | int selectServer(int forceIndex) { |
| | | int index = forceIndex == -1 ? getLessSaturatedIndex() : forceIndex; |
| | | serversCounters.incrementAndGet(index); |
| | | return index; |
| | | } |
| | | |
| | | /** |
| | | * Signals to this dispatcher that a request has been finished for the provided server index. |
| | | * |
| | | * @param index |
| | | * The index of server that processed the request. |
| | | */ |
| | | void terminatedRequest(Integer index) { |
| | | serversCounters.decrementAndGet(index); |
| | | } |
| | | |
| | | private int getLessSaturatedIndex() { |
| | | long min = Long.MAX_VALUE; |
| | | int minIndex = -1; |
| | | // Modifications during this loop are ok, effects on result is should not be dramatic |
| | | for (int i = 0; i < serversCounters.length(); i++) { |
| | | long count = serversCounters.get(i); |
| | | if (count < min) { |
| | | min = count; |
| | | minIndex = i; |
| | | } |
| | | } |
| | | return minIndex; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Creates a new connection factory which forwards connection requests to |
| | | * the provided factory, but whose {@code toString} method will always |
| | | * return {@code name}. |
| | |
| | | * A function which returns the index of the first connection factory which should be used in order to satisfy the |
| | | * next request. Implementations may base the decision on properties of the provided request, such as the target DN, |
| | | * whether the request is a read or update request, etc. |
| | | * Additionally a new request is returned with the index, because some modifications may be needed (removal of |
| | | * a control for example) but the original request should not be modified. The new request must be used |
| | | * for the actual LDAP operation. |
| | | */ |
| | | private final Function<Request, Integer, NeverThrowsException> nextFactoryFunction; |
| | | private final Function<Request, RequestWithIndex, NeverThrowsException> nextFactoryFunction; |
| | | /** A function which is called after a request is terminated. */ |
| | | private final Function<Integer, Void, NeverThrowsException> endOfRequestFunction; |
| | | |
| | | RequestLoadBalancer(final String loadBalancerName, |
| | | final Collection<? extends ConnectionFactory> factories, |
| | | final Options options, |
| | | final Function<Request, Integer, NeverThrowsException> nextFactoryFunction) { |
| | | final Function<Request, RequestWithIndex, NeverThrowsException> nextFactoryFunction, |
| | | final Function<Integer, Void, NeverThrowsException> endOfRequestFunction) { |
| | | super(loadBalancerName, factories, options); |
| | | this.nextFactoryFunction = nextFactoryFunction; |
| | | this.endOfRequestFunction = endOfRequestFunction; |
| | | } |
| | | |
| | | @Override |
| | |
| | | @Override |
| | | public LdapPromise<Result> addAsync( |
| | | final AddRequest request, final IntermediateResponseHandler intermediateResponseHandler) { |
| | | return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() { |
| | | final ConnectionContext connectionContext = getConnection(request); |
| | | return executeRequest(connectionContext, |
| | | new AsyncFunction<Connection, Result, LdapException>() { |
| | | @Override |
| | | public Promise<Result, LdapException> apply(final Connection connection) throws LdapException { |
| | | return connection.addAsync(request, intermediateResponseHandler); |
| | | return connection.addAsync((AddRequest) connectionContext.getRequest(), |
| | | intermediateResponseHandler); |
| | | } |
| | | }); |
| | | } |
| | |
| | | @Override |
| | | public LdapPromise<BindResult> bindAsync( |
| | | final BindRequest request, final IntermediateResponseHandler intermediateResponseHandler) { |
| | | return getConnectionAndSendRequest(request, new AsyncFunction<Connection, BindResult, LdapException>() { |
| | | final ConnectionContext connectionContext = getConnection(request); |
| | | return executeRequest(connectionContext, new AsyncFunction<Connection, BindResult, LdapException>() { |
| | | @Override |
| | | public Promise<BindResult, LdapException> apply(final Connection connection) throws LdapException { |
| | | return connection.bindAsync(request, intermediateResponseHandler); |
| | | return connection.bindAsync((BindRequest) connectionContext.getRequest(), |
| | | intermediateResponseHandler); |
| | | } |
| | | }); |
| | | } |
| | |
| | | @Override |
| | | public LdapPromise<CompareResult> compareAsync( |
| | | final CompareRequest request, final IntermediateResponseHandler intermediateResponseHandler) { |
| | | return getConnectionAndSendRequest(request, new AsyncFunction<Connection, CompareResult, LdapException>() { |
| | | final ConnectionContext connectionContext = getConnection(request); |
| | | return executeRequest(connectionContext, |
| | | new AsyncFunction<Connection, CompareResult, LdapException>() { |
| | | @Override |
| | | public Promise<CompareResult, LdapException> apply(final Connection connection) throws LdapException { |
| | | return connection.compareAsync(request, intermediateResponseHandler); |
| | | public Promise<CompareResult, LdapException> apply(final Connection connection) |
| | | throws LdapException { |
| | | return connection.compareAsync((CompareRequest) connectionContext.getRequest(), |
| | | intermediateResponseHandler); |
| | | } |
| | | }); |
| | | } |
| | |
| | | @Override |
| | | public LdapPromise<Result> deleteAsync( |
| | | final DeleteRequest request, final IntermediateResponseHandler intermediateResponseHandler) { |
| | | return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() { |
| | | final ConnectionContext connectionContext = getConnection(request); |
| | | return executeRequest(connectionContext, |
| | | new AsyncFunction<Connection, Result, LdapException>() { |
| | | @Override |
| | | public Promise<Result, LdapException> apply(final Connection connection) throws LdapException { |
| | | return connection.deleteAsync(request, intermediateResponseHandler); |
| | | return connection.deleteAsync((DeleteRequest) connectionContext.getRequest(), |
| | | intermediateResponseHandler); |
| | | } |
| | | }); |
| | | } |
| | |
| | | @Override |
| | | public <R extends ExtendedResult> LdapPromise<R> extendedRequestAsync( |
| | | final ExtendedRequest<R> request, final IntermediateResponseHandler intermediateResponseHandler) { |
| | | return getConnectionAndSendRequest(request, new AsyncFunction<Connection, R, LdapException>() { |
| | | final ConnectionContext connectionContext = getConnection(request); |
| | | return executeRequest(connectionContext, |
| | | new AsyncFunction<Connection, R, LdapException>() { |
| | | @Override |
| | | public Promise<R, LdapException> apply(final Connection connection) throws LdapException { |
| | | return connection.extendedRequestAsync(request, intermediateResponseHandler); |
| | | return connection.extendedRequestAsync((ExtendedRequest<R>) connectionContext.getRequest(), |
| | | intermediateResponseHandler); |
| | | } |
| | | }); |
| | | } |
| | |
| | | @Override |
| | | public LdapPromise<Result> modifyAsync( |
| | | final ModifyRequest request, final IntermediateResponseHandler intermediateResponseHandler) { |
| | | return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() { |
| | | final ConnectionContext connectionContext = getConnection(request); |
| | | return executeRequest(connectionContext, |
| | | new AsyncFunction<Connection, Result, LdapException>() { |
| | | @Override |
| | | public Promise<Result, LdapException> apply(final Connection connection) throws LdapException { |
| | | return connection.modifyAsync(request, intermediateResponseHandler); |
| | | return connection.modifyAsync((ModifyRequest) connectionContext.getRequest(), |
| | | intermediateResponseHandler); |
| | | } |
| | | }); |
| | | } |
| | |
| | | @Override |
| | | public LdapPromise<Result> modifyDNAsync( |
| | | final ModifyDNRequest request, final IntermediateResponseHandler intermediateResponseHandler) { |
| | | return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() { |
| | | final ConnectionContext connectionContext = getConnection(request); |
| | | return executeRequest(connectionContext, |
| | | new AsyncFunction<Connection, Result, LdapException>() { |
| | | @Override |
| | | public Promise<Result, LdapException> apply(final Connection connection) throws LdapException { |
| | | return connection.modifyDNAsync(request, intermediateResponseHandler); |
| | | return connection.modifyDNAsync((ModifyDNRequest) connectionContext.getRequest(), |
| | | intermediateResponseHandler); |
| | | } |
| | | }); |
| | | } |
| | |
| | | final SearchRequest request, |
| | | final IntermediateResponseHandler intermediateResponseHandler, |
| | | final SearchResultHandler entryHandler) { |
| | | return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() { |
| | | final ConnectionContext connectionContext = getConnection(request); |
| | | return executeRequest(connectionContext, |
| | | new AsyncFunction<Connection, Result, LdapException>() { |
| | | @Override |
| | | public Promise<Result, LdapException> apply(final Connection connection) throws LdapException { |
| | | return connection.searchAsync(request, intermediateResponseHandler, entryHandler); |
| | | return connection.searchAsync((SearchRequest) connectionContext.getRequest(), |
| | | intermediateResponseHandler, |
| | | entryHandler); |
| | | } |
| | | }); |
| | | } |
| | | |
| | | private <R> LdapPromise<R> getConnectionAndSendRequest( |
| | | final Request request, final AsyncFunction<Connection, R, LdapException> sendRequest) { |
| | | private ConnectionContext getConnection(final Request request) { |
| | | if (state.isClosed()) { |
| | | throw new IllegalStateException(); |
| | | } |
| | | final AtomicReference<Connection> connectionHolder = new AtomicReference<>(); |
| | | return getConnectionAsync(request) |
| | | .thenOnResult(new ResultHandler<Connection>() { |
| | | @Override |
| | | public void handleResult(final Connection connection) { |
| | | connectionHolder.set(connection); |
| | | } |
| | | }) |
| | | .thenAsync(sendRequest) |
| | | .thenFinally(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | closeSilently(connectionHolder.get()); |
| | | } |
| | | }); |
| | | } |
| | | |
| | | private LdapPromise<Connection> getConnectionAsync(final Request request) { |
| | | try { |
| | | final int index = nextFactoryFunction.apply(request); |
| | | final ConnectionFactory factory = getMonitoredConnectionFactory(index); |
| | | return LdapPromises.asPromise(factory.getConnectionAsync() |
| | | final RequestWithIndex requestWithIndex = nextFactoryFunction.apply(request); |
| | | final ConnectionFactory factory = getMonitoredConnectionFactory(requestWithIndex.getServerIndex()); |
| | | return new ConnectionContext( |
| | | LdapPromises.asPromise(factory.getConnectionAsync() |
| | | .thenOnException(new ExceptionHandler<LdapException>() { |
| | | @Override |
| | | public void handleException(final LdapException e) { |
| | | state.notifyConnectionError(false, e); |
| | | } |
| | | })); |
| | | })), |
| | | requestWithIndex); |
| | | } catch (final LdapException e) { |
| | | state.notifyConnectionError(false, e); |
| | | return newFailedLdapPromise(e); |
| | | LdapPromise<Connection> failedLdapPromise = newFailedLdapPromise(e); |
| | | return new ConnectionContext(failedLdapPromise, new RequestWithIndex(request, -1)); |
| | | } |
| | | } |
| | | |
| | | private <R> LdapPromise<R> executeRequest(final ConnectionContext connectionContext, |
| | | final AsyncFunction<Connection, R, LdapException> requestSender) { |
| | | return connectionContext.getConnectionPromise() |
| | | .thenOnResult(new ResultHandler<Connection>() { |
| | | @Override |
| | | public void handleResult(final Connection connection) { |
| | | connectionContext.setConnection(connection); |
| | | } |
| | | }) |
| | | .thenAsync(requestSender) |
| | | .thenFinally(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | closeSilently(connectionContext.getConnection()); |
| | | endOfRequestFunction.apply(connectionContext.getServerIndex()); |
| | | } |
| | | }); |
| | | } |
| | | } |
| | | |
| | | /** Utility class for a request and a server index. */ |
| | | static class RequestWithIndex { |
| | | private final Request request; |
| | | /** The index of server chosen for the connection. */ |
| | | private final int serverIndex; |
| | | |
| | | RequestWithIndex(Request request, int serverIndex) { |
| | | this.serverIndex = serverIndex; |
| | | this.request = request; |
| | | } |
| | | |
| | | Request getRequest() { |
| | | return request; |
| | | } |
| | | |
| | | int getServerIndex() { |
| | | return serverIndex; |
| | | } |
| | | } |
| | | |
| | | /** Utility class to hold together parameters for a request and the connection used to perform it. */ |
| | | private static class ConnectionContext { |
| | | private final AtomicReference<Connection> connectionHolder = new AtomicReference<>(); |
| | | private final LdapPromise<Connection> connectionPromise; |
| | | private final RequestWithIndex requestWithIndex; |
| | | |
| | | ConnectionContext(LdapPromise<Connection> connectionPromise, RequestWithIndex requestWithIndex) { |
| | | this.requestWithIndex = requestWithIndex; |
| | | this.connectionPromise = connectionPromise; |
| | | } |
| | | |
| | | |
| | | Connection getConnection() { |
| | | return connectionHolder.get(); |
| | | } |
| | | |
| | | void setConnection(Connection connection) { |
| | | connectionHolder.set(connection); |
| | | } |
| | | |
| | | LdapPromise<Connection> getConnectionPromise() { |
| | | return connectionPromise; |
| | | } |
| | | |
| | | int getServerIndex() { |
| | | return requestWithIndex.getServerIndex(); |
| | | } |
| | | |
| | | Request getRequest() { |
| | | return requestWithIndex.getRequest(); |
| | | } |
| | | } |
| | | } |
| | |
| | | * information: "Portions Copyright [year] [name of copyright owner]". |
| | | * |
| | | * Copyright 2010 Sun Microsystems, Inc. |
| | | * Portions copyright 2011-2015 ForgeRock AS. |
| | | * Portions copyright 2011-2016 ForgeRock AS. |
| | | */ |
| | | |
| | | package org.forgerock.opendj.ldap.requests; |
| | | |
| | | import static com.forgerock.opendj.util.StaticUtils.EMPTY_BYTES; |
| | | import static com.forgerock.opendj.util.StaticUtils.getBytes; |
| | | |
| | | import java.util.Arrays; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | |
| | | import static com.forgerock.opendj.ldap.CoreMessages.WARN_READ_LDIF_RECORD_CHANGE_RECORD_WRONG_TYPE; |
| | | |
| | | import javax.net.ssl.SSLContext; |
| | |
| | | import org.forgerock.opendj.ldap.Entry; |
| | | import org.forgerock.opendj.ldap.Filter; |
| | | import org.forgerock.opendj.ldap.LinkedHashMapEntry; |
| | | import org.forgerock.opendj.ldap.Modification; |
| | | import org.forgerock.opendj.ldap.ModificationType; |
| | | import org.forgerock.opendj.ldap.RDN; |
| | | import org.forgerock.opendj.ldap.SearchScope; |
| | | import org.forgerock.opendj.ldap.controls.Control; |
| | | import org.forgerock.opendj.ldif.ChangeRecord; |
| | | import org.forgerock.opendj.ldif.LDIFChangeRecordReader; |
| | | import org.forgerock.util.Reject; |
| | | |
| | | import com.forgerock.opendj.ldap.CoreMessages; |
| | | |
| | | /** |
| | | * This class contains various methods for creating and manipulating requests. |
| | | * <p> |
| | |
| | | // TODO: synchronized requests? |
| | | |
| | | /** |
| | | * Creates a new request that is a shallow copy of the provided request, except for |
| | | * controls list which is a new list containing the original controls (and not the original list |
| | | * of controls) possibly filtered by the provided exclusion parameter. |
| | | * <p> |
| | | * The intended usage is to be able to perform modification of the controls of a request without |
| | | * affecting the original request. |
| | | * |
| | | * @param request |
| | | * the original request |
| | | * @param excludeControlOids |
| | | * OIDs of controls to exclude from the new request |
| | | * @return the new request |
| | | */ |
| | | public static Request shallowCopyOfRequest(Request request, String... excludeControlOids) { |
| | | List<String> exclusions = Arrays.asList(excludeControlOids); |
| | | if (request instanceof AbandonRequest) { |
| | | AbandonRequest req = copyOfAbandonRequest((AbandonRequest) request); |
| | | filterControls(req.getControls(), exclusions); |
| | | return req; |
| | | } else if (request instanceof AddRequest) { |
| | | AddRequest req = (AddRequest) request; |
| | | AddRequest newReq = newAddRequest(new LinkedHashMapEntry(req)); |
| | | return copyControls(req, newReq, exclusions); |
| | | } else if (request instanceof AnonymousSASLBindRequest) { |
| | | AnonymousSASLBindRequest req = copyOfAnonymousSASLBindRequest((AnonymousSASLBindRequest) request); |
| | | filterControls(req.getControls(), exclusions); |
| | | return req; |
| | | } else if (request instanceof CancelExtendedRequest) { |
| | | CancelExtendedRequest req = copyOfCancelExtendedRequest((CancelExtendedRequest) request); |
| | | filterControls(req.getControls(), exclusions); |
| | | return req; |
| | | } else if (request instanceof CompareRequest) { |
| | | CompareRequest req = copyOfCompareRequest((CompareRequest) request); |
| | | filterControls(req.getControls(), exclusions); |
| | | return req; |
| | | } else if (request instanceof CRAMMD5SASLBindRequest) { |
| | | CRAMMD5SASLBindRequest req = copyOfCRAMMD5SASLBindRequest((CRAMMD5SASLBindRequest) request); |
| | | filterControls(req.getControls(), exclusions); |
| | | return req; |
| | | } else if (request instanceof DeleteRequest) { |
| | | DeleteRequest req = copyOfDeleteRequest((DeleteRequest) request); |
| | | filterControls(req.getControls(), exclusions); |
| | | return req; |
| | | } else if (request instanceof DigestMD5SASLBindRequest) { |
| | | DigestMD5SASLBindRequest req = copyOfDigestMD5SASLBindRequest((DigestMD5SASLBindRequest) request); |
| | | filterControls(req.getControls(), exclusions); |
| | | return req; |
| | | } else if (request instanceof ExternalSASLBindRequest) { |
| | | ExternalSASLBindRequest req = copyOfExternalSASLBindRequest((ExternalSASLBindRequest) request); |
| | | filterControls(req.getControls(), exclusions); |
| | | return req; |
| | | } else if (request instanceof GenericBindRequest) { |
| | | GenericBindRequest req = copyOfGenericBindRequest((GenericBindRequest) request); |
| | | filterControls(req.getControls(), exclusions); |
| | | return req; |
| | | } else if (request instanceof GenericExtendedRequest) { |
| | | GenericExtendedRequest req = copyOfGenericExtendedRequest((GenericExtendedRequest) request); |
| | | filterControls(req.getControls(), exclusions); |
| | | return req; |
| | | } else if (request instanceof GSSAPISASLBindRequest) { |
| | | GSSAPISASLBindRequest req = copyOfGSSAPISASLBindRequest((GSSAPISASLBindRequest) request); |
| | | filterControls(req.getControls(), exclusions); |
| | | return req; |
| | | } else if (request instanceof ModifyDNRequest) { |
| | | ModifyDNRequest req = copyOfModifyDNRequest((ModifyDNRequest) request); |
| | | filterControls(req.getControls(), exclusions); |
| | | return req; |
| | | } else if (request instanceof ModifyRequest) { |
| | | ModifyRequest req = (ModifyRequest) request; |
| | | ModifyRequest newReq = newModifyRequest(req.getName()); |
| | | for (Modification mod : req.getModifications()) { |
| | | newReq.addModification(mod); |
| | | } |
| | | return copyControls(req, newReq, exclusions); |
| | | } else if (request instanceof PasswordModifyExtendedRequest) { |
| | | PasswordModifyExtendedRequest req = |
| | | copyOfPasswordModifyExtendedRequest((PasswordModifyExtendedRequest) request); |
| | | filterControls(req.getControls(), exclusions); |
| | | return req; |
| | | } else if (request instanceof PlainSASLBindRequest) { |
| | | PlainSASLBindRequest req = copyOfPlainSASLBindRequest((PlainSASLBindRequest) request); |
| | | filterControls(req.getControls(), exclusions); |
| | | return req; |
| | | } else if (request instanceof SearchRequest) { |
| | | SearchRequest req = copyOfSearchRequest((SearchRequest) request); |
| | | filterControls(req.getControls(), exclusions); |
| | | return req; |
| | | } else if (request instanceof SimpleBindRequest) { |
| | | SimpleBindRequest req = copyOfSimpleBindRequest((SimpleBindRequest) request); |
| | | filterControls(req.getControls(), exclusions); |
| | | return req; |
| | | } else if (request instanceof StartTLSExtendedRequest) { |
| | | StartTLSExtendedRequest req = copyOfStartTLSExtendedRequest((StartTLSExtendedRequest) request); |
| | | filterControls(req.getControls(), exclusions); |
| | | return req; |
| | | } else if (request instanceof UnbindRequest) { |
| | | UnbindRequest req = copyOfUnbindRequest((UnbindRequest) request); |
| | | filterControls(req.getControls(), exclusions); |
| | | return req; |
| | | } else if (request instanceof WhoAmIExtendedRequest) { |
| | | WhoAmIExtendedRequest req = copyOfWhoAmIExtendedRequest((WhoAmIExtendedRequest) request); |
| | | filterControls(req.getControls(), exclusions); |
| | | return req; |
| | | } else { |
| | | throw new LocalizedIllegalArgumentException(CoreMessages.ERR_UNKNOWN_REQUEST_TYPE.get(request)); |
| | | } |
| | | } |
| | | |
| | | private static Request copyControls(Request source, Request destination, List<String> excludeControlOids) { |
| | | for (final Control control : source.getControls()) { |
| | | if (!excludeControlOids.contains(control.getOID())) { |
| | | destination.addControl(control); |
| | | } |
| | | } |
| | | return destination; |
| | | } |
| | | |
| | | private static void filterControls(List<Control> controls, List<String> excludedControlOids) { |
| | | for (Iterator<Control> it = controls.iterator(); it.hasNext();) { |
| | | Control control = it.next(); |
| | | if (excludedControlOids.contains(control.getOID())) { |
| | | it.remove(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Creates a new abandon request that is an exact copy of the provided |
| | | * request. |
| | | * |
| | |
| | | does not match the host name '%s' |
| | | ERR_ATTRIBUTE_PARSER_MISSING_ATTRIBUTE=The entry could not be parsed because the '%s' is missing |
| | | ERR_CONNECTION_UNEXPECTED=An error occurred during establishment of a connection: %s |
| | | ERR_CONNECTION_AFFINITY_CONTROL_BAD_OID=Cannot decode the provided \ |
| | | control as a connection affinity control because it contained the OID '%s', \ |
| | | when '%s' was expected |
| | | ERR_CONNECTION_AFFINITY_CONTROL_DECODE_NULL=Cannot decode the provided ASN.1 \ |
| | | element as a connection affinity control because it did not have a value, when \ |
| | | a value must always be provided |
| | | ERR_UNKNOWN_REQUEST_TYPE=The following request has a unknown request type: %s |
| | | WARN_DECODING_AFFINITY_CONTROL=An error occurred while decoding an affinity control: %s |
| New file |
| | |
| | | /* |
| | | * The contents of this file are subject to the terms of the Common Development and |
| | | * Distribution License (the License). You may not use this file except in compliance with the |
| | | * License. |
| | | * |
| | | * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the |
| | | * specific language governing permission and limitations under the License. |
| | | * |
| | | * When distributing Covered Software, include this CDDL Header Notice in each file and include |
| | | * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL |
| | | * Header, with the fields enclosed by brackets [] replaced by your own identifying |
| | | * information: "Portions Copyright [year] [name of copyright owner]". |
| | | * |
| | | * Copyright 2016 ForgeRock AS. |
| | | */ |
| | | package com.forgerock.opendj.ldap.controls; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.forgerock.opendj.ldap.Connection; |
| | | import org.forgerock.opendj.ldap.DN; |
| | | import org.forgerock.opendj.ldap.Filter; |
| | | import org.forgerock.opendj.ldap.SearchScope; |
| | | import org.forgerock.opendj.ldap.TestCaseUtils; |
| | | import org.forgerock.opendj.ldap.controls.ControlsTestCase; |
| | | import org.forgerock.opendj.ldap.requests.Requests; |
| | | import org.forgerock.opendj.ldap.requests.SearchRequest; |
| | | import org.forgerock.opendj.ldap.responses.SearchResultEntry; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.testng.Assert.*; |
| | | import static org.assertj.core.api.Assertions.assertThat; |
| | | |
| | | @SuppressWarnings("javadoc") |
| | | public class AffinityControlTestCase extends ControlsTestCase { |
| | | |
| | | @Test |
| | | public void testControl() throws Exception { |
| | | // Send this control with a search request to ensure it is encoded/decoded correctly |
| | | final SearchRequest req = Requests.newSearchRequest(DN.valueOf("uid=user.1,ou=people,o=test"), |
| | | SearchScope.BASE_OBJECT, Filter.objectClassPresent()); |
| | | final AffinityControl control = AffinityControl.newControl(ByteString.valueOfUtf8("value"), false); |
| | | req.addControl(control); |
| | | try (Connection con = TestCaseUtils.getInternalConnection()) { |
| | | final List<SearchResultEntry> entries = new ArrayList<>(); |
| | | con.search(req, entries); |
| | | assertThat(entries).hasAtLeastOneElementOfType(SearchResultEntry.class); |
| | | final SearchResultEntry entry = entries.get(0); |
| | | assertThat(entry.getControls()).hasSize(0); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testControlGeneratesRandomValue() throws Exception { |
| | | final AffinityControl control = AffinityControl.newControl(true); |
| | | assertTrue(control.isCritical()); |
| | | ByteString value1 = control.getAffinityValue(); |
| | | assertThat(value1).isNotNull(); |
| | | assertThat(value1.length()).isNotZero(); |
| | | |
| | | final AffinityControl control2 = AffinityControl.newControl(false); |
| | | ByteString value2 = control2.getAffinityValue(); |
| | | assertFalse(control2.isCritical()); |
| | | assertThat(value2).isNotNull(); |
| | | assertThat(value2.length()).isNotZero(); |
| | | |
| | | assertThat(value1).isNotEqualTo(value2); |
| | | } |
| | | } |
| | |
| | | |
| | | import static java.util.Arrays.asList; |
| | | import static org.assertj.core.api.Assertions.assertThat; |
| | | import static org.forgerock.opendj.ldap.Connections.newShardedRequestLoadBalancerFunction; |
| | | import static org.forgerock.opendj.ldap.Connections.*; |
| | | import static org.mockito.Mockito.mock; |
| | | import static org.mockito.Mockito.verify; |
| | | import static org.mockito.Mockito.verifyZeroInteractions; |
| | | import static org.mockito.Mockito.when; |
| | | |
| | | import org.forgerock.opendj.ldap.Connections.LeastRequestsDispatcher; |
| | | import org.forgerock.opendj.ldap.RequestLoadBalancer.RequestWithIndex; |
| | | import org.forgerock.opendj.ldap.requests.AddRequest; |
| | | import org.forgerock.opendj.ldap.requests.CRAMMD5SASLBindRequest; |
| | | import org.forgerock.opendj.ldap.requests.CompareRequest; |
| | |
| | | import org.forgerock.opendj.ldap.requests.PasswordModifyExtendedRequest; |
| | | import org.forgerock.opendj.ldap.requests.PlainSASLBindRequest; |
| | | import org.forgerock.opendj.ldap.requests.Request; |
| | | import org.forgerock.opendj.ldap.requests.Requests; |
| | | import org.forgerock.opendj.ldap.requests.SearchRequest; |
| | | import org.forgerock.opendj.ldap.requests.SimpleBindRequest; |
| | | import org.forgerock.util.Function; |
| | | import org.forgerock.util.promise.NeverThrowsException; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import com.forgerock.opendj.ldap.controls.AffinityControl; |
| | | |
| | | @SuppressWarnings("javadoc") |
| | | public class ConnectionsTestCase extends SdkTestCase { |
| | | |
| | |
| | | |
| | | @Test |
| | | public void shardedRequestLoadBalancerUsesConsistentIndexing() { |
| | | final Function<Request, Integer, NeverThrowsException> f = |
| | | newShardedRequestLoadBalancerFunction(asList(mock(ConnectionFactory.class), |
| | | final Function<Request, RequestWithIndex, NeverThrowsException> f = |
| | | newShardedRequestLoadBalancerNextFunction(asList(mock(ConnectionFactory.class), |
| | | mock(ConnectionFactory.class))); |
| | | |
| | | // These two DNs have a different hash code. |
| | |
| | | assertThat(index(f, genericExtendedRequest)).isBetween(0, 1); |
| | | } |
| | | |
| | | private void assertRequestsAreRoutedConsistently(final Function<Request, Integer, NeverThrowsException> f, |
| | | @Test |
| | | public void leastRequestsDispatcherMustChooseTheLessSaturatedServer() { |
| | | LeastRequestsDispatcher dispatcher = new Connections.LeastRequestsDispatcher(3); |
| | | Function<Request, RequestWithIndex, NeverThrowsException> next = |
| | | newLeastRequestsLoadBalancerNextFunction(dispatcher); |
| | | Function<Integer, Void, NeverThrowsException> end = |
| | | newLeastRequestsLoadBalancerEndOfRequestFunction(dispatcher); |
| | | |
| | | final SearchRequest[] reqs = new SearchRequest[11]; |
| | | for (int i = 0; i < reqs.length; i++) { |
| | | reqs[i] = mock(SearchRequest.class); |
| | | } |
| | | assertThat(next.apply(reqs[0]).getServerIndex()).isEqualTo(0); // number of reqs = [1, 0, 0] |
| | | assertThat(next.apply(reqs[1]).getServerIndex()).isEqualTo(1); // number of reqs = [1, 1, 0] |
| | | assertThat(next.apply(reqs[2]).getServerIndex()).isEqualTo(2); // number of reqs = [1, 1, 1] |
| | | end.apply(1); // number of reqs = [1, 0, 1] |
| | | assertThat(next.apply(reqs[3]).getServerIndex()).isEqualTo(1); // number of reqs = [1, 1, 1] |
| | | end.apply(1); // number of reqs = [1, 0, 1] |
| | | assertThat(next.apply(reqs[5]).getServerIndex()).isEqualTo(1); // number of reqs = [1, 1, 1] |
| | | assertThat(next.apply(reqs[6]).getServerIndex()).isEqualTo(0); // number of reqs = [2, 1, 1] |
| | | assertThat(next.apply(reqs[7]).getServerIndex()).isEqualTo(1); // number of reqs = [2, 2, 1] |
| | | assertThat(next.apply(reqs[8]).getServerIndex()).isEqualTo(2); // number of reqs = [2, 2, 2] |
| | | assertThat(next.apply(reqs[9]).getServerIndex()).isEqualTo(0); // number of reqs = [3, 2, 2] |
| | | end.apply(2); // number of reqs = [3, 2, 1] |
| | | assertThat(next.apply(reqs[10]).getServerIndex()).isEqualTo(2); // number of reqs = [3, 2, 2] |
| | | } |
| | | |
| | | @Test |
| | | public void leastRequestsDispatcherMustTakeConnectionAffinityControlIntoAccount() { |
| | | LeastRequestsDispatcher dispatcher = new Connections.LeastRequestsDispatcher(3); |
| | | Function<Request, RequestWithIndex, NeverThrowsException> next = |
| | | newLeastRequestsLoadBalancerNextFunction(dispatcher); |
| | | |
| | | final Request[] reqs = new Request[11]; |
| | | for (int i = 0; i < reqs.length; i++) { |
| | | reqs[i] = Requests.newDeleteRequest("o=example"); |
| | | } |
| | | assertThat(next.apply(reqs[0]).getServerIndex()).isEqualTo(0); // number of reqs = [1, 0, 0] |
| | | |
| | | // control should now force the same connection three times |
| | | // control should be removed from the RequestWithIndex object used to perform the actual query |
| | | for (int i = 1; i <= 3; i++) { |
| | | reqs[i].addControl(AffinityControl.newControl(ByteString.valueOfUtf8("val"), false)); |
| | | } |
| | | RequestWithIndex req1 = next.apply(reqs[1]); |
| | | assertThat(req1.getServerIndex()).isEqualTo(0); // number of reqs = [2, 0, 0] |
| | | assertThat(req1.getRequest().getControls()).isEmpty(); |
| | | assertThat(reqs[1].getControls()).hasSize(1); |
| | | |
| | | RequestWithIndex req2 = next.apply(reqs[2]); |
| | | assertThat(req2.getServerIndex()).isEqualTo(0); // number of reqs = [3, 0, 0] |
| | | assertThat(req2.getRequest().getControls()).isEmpty(); |
| | | assertThat(reqs[2].getControls()).hasSize(1); |
| | | |
| | | RequestWithIndex req3 = next.apply(reqs[3]); |
| | | assertThat(req3.getServerIndex()).isEqualTo(0); // number of reqs = [4, 0, 0] |
| | | assertThat(req3.getRequest().getControls()).isEmpty(); |
| | | assertThat(reqs[3].getControls()).hasSize(1); |
| | | |
| | | // back to default "saturation-based" behavior |
| | | assertThat(next.apply(reqs[4]).getServerIndex()).isEqualTo(1); // number of reqs = [4, 1, 0] |
| | | assertThat(next.apply(reqs[5]).getServerIndex()).isEqualTo(2); // number of reqs = [4, 1, 1] |
| | | assertThat(next.apply(reqs[6]).getServerIndex()).isEqualTo(1); // number of reqs = [4, 2, 1] |
| | | assertThat(next.apply(reqs[7]).getServerIndex()).isEqualTo(2); // number of reqs = [4, 2, 2] |
| | | assertThat(next.apply(reqs[8]).getServerIndex()).isEqualTo(1); // number of reqs = [4, 3, 2] |
| | | assertThat(next.apply(reqs[9]).getServerIndex()).isEqualTo(2); // number of reqs = [4, 3, 3] |
| | | } |
| | | |
| | | private void assertRequestsAreRoutedConsistently(final Function<Request, RequestWithIndex, NeverThrowsException> f, |
| | | final Request r, |
| | | final int firstExpectedIndex, |
| | | final int secondExpectedIndex) { |
| | |
| | | assertThat(index(f, r)).isEqualTo(secondExpectedIndex); |
| | | } |
| | | |
| | | private int index(final Function<Request, Integer, NeverThrowsException> function, final Request request) { |
| | | return function.apply(request); |
| | | private int index(final Function<Request, RequestWithIndex, NeverThrowsException> function, final Request request) { |
| | | return function.apply(request).getServerIndex(); |
| | | } |
| | | } |
| | |
| | | import java.util.ArrayList; |
| | | import java.util.logging.Level; |
| | | |
| | | import org.forgerock.opendj.ldap.RequestLoadBalancer.RequestWithIndex; |
| | | import org.forgerock.opendj.ldap.requests.AbandonRequest; |
| | | import org.forgerock.opendj.ldap.requests.AddRequest; |
| | | import org.forgerock.opendj.ldap.requests.BindRequest; |
| | |
| | | stub(this.connection3); |
| | | loadBalancer = new RequestLoadBalancer("Test", |
| | | asList(factory1, factory2, factory3), |
| | | defaultOptions(), newNextFactoryFunction()); |
| | | defaultOptions(), newNextFactoryFunction(), |
| | | Connections.NOOP_END_OF_REQUEST_FUNCTION); |
| | | } |
| | | |
| | | private Function<Request, Integer, NeverThrowsException> newNextFactoryFunction() { |
| | | return new Function<Request, Integer, NeverThrowsException>() { |
| | | private Function<Request, RequestWithIndex, NeverThrowsException> newNextFactoryFunction() { |
| | | return new Function<Request, RequestWithIndex, NeverThrowsException>() { |
| | | @Override |
| | | public Integer apply(final Request request) { |
| | | public RequestWithIndex apply(final Request request) { |
| | | if (request == addRequest1 || request == bindRequest1 || request == compareRequest1 |
| | | || request == deleteRequest1 || request == extendedRequest1 || request == modifyRequest1 |
| | | || request == modifyDNRequest1 || request == searchRequest1) { |
| | | return 0; |
| | | return new RequestWithIndex(request, 0); |
| | | } |
| | | |
| | | if (request == addRequest2 || request == bindRequest2 || request == compareRequest2 |
| | | || request == deleteRequest2 || request == extendedRequest2 || request == modifyRequest2 |
| | | || request == modifyDNRequest2 || request == searchRequest2) { |
| | | return 1; |
| | | return new RequestWithIndex(request, 1); |
| | | } |
| | | |
| | | if (request == addRequest3 || request == bindRequest3 || request == compareRequest3 |
| | | || request == deleteRequest3 || request == extendedRequest3 || request == modifyRequest3 |
| | | || request == modifyDNRequest3 || request == searchRequest3) { |
| | | return 2; |
| | | return new RequestWithIndex(request, 2); |
| | | } |
| | | |
| | | fail("Received unexpected request"); |
| | | return -1; // Keep compiler happy. |
| | | return new RequestWithIndex(request, -1); // Keep compiler happy. |
| | | } |
| | | }; |
| | | } |