From 08b3f1565a41877054575135de0dac83266e8dd8 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Mon, 24 Oct 2016 09:30:24 +0000
Subject: [PATCH] OPENDJ-2870 Add a least requests load balancer algorithm
---
opendj-core/src/test/java/com/forgerock/opendj/ldap/controls/AffinityControlTestCase.java | 71 ++++
opendj-core/src/main/java/org/forgerock/opendj/ldap/requests/Requests.java | 138 ++++++++
opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java | 240 ++++++++++----
opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties | 10
opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java | 85 ++++
opendj-core/src/main/java/com/forgerock/opendj/ldap/controls/AffinityControl.java | 155 +++++++++
opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java | 18
opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java | 189 +++++++++++
8 files changed, 807 insertions(+), 99 deletions(-)
diff --git a/opendj-core/src/main/java/com/forgerock/opendj/ldap/controls/AffinityControl.java b/opendj-core/src/main/java/com/forgerock/opendj/ldap/controls/AffinityControl.java
new file mode 100644
index 0000000..0a442b7
--- /dev/null
+++ b/opendj-core/src/main/java/com/forgerock/opendj/ldap/controls/AffinityControl.java
@@ -0,0 +1,155 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package com.forgerock.opendj.ldap.controls;
+
+import static com.forgerock.opendj.ldap.CoreMessages.*;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.DecodeException;
+import org.forgerock.opendj.ldap.DecodeOptions;
+import org.forgerock.opendj.ldap.controls.Control;
+import org.forgerock.opendj.ldap.controls.ControlDecoder;
+import org.forgerock.util.Reject;
+
+/**
+ * Control that provides a value for affinity.
+ * <p>
+ * As an example, this control can be used for connection affinity when using a load-balancer ({@link Connections}).
+ *
+ * @see Connections#newLeastRequestsLoadBalancer(Collection, Options)
+ */
+public final class AffinityControl implements Control {
+
+ /** OID for this control. */
+ public static final String OID = "1.3.6.1.4.1.36733.2.1.5.2";
+
+ /** A decoder which can be used for decoding the affinity control. */
+ public static final ControlDecoder<AffinityControl> DECODER =
+ new ControlDecoder<AffinityControl>() {
+
+ @Override
+ public AffinityControl decodeControl(
+ final Control control, final DecodeOptions options) throws DecodeException {
+ Reject.ifNull(control);
+
+ if (control instanceof AffinityControl) {
+ return (AffinityControl) control;
+ }
+
+ if (!control.getOID().equals(OID)) {
+ throw DecodeException.error(ERR_CONNECTION_AFFINITY_CONTROL_BAD_OID.get(control.getOID(), OID));
+ }
+
+ if (!control.hasValue()) {
+ // The control must always have a value.
+ throw DecodeException.error(ERR_CONNECTION_AFFINITY_CONTROL_DECODE_NULL.get());
+ }
+
+ return new AffinityControl(control.getValue(), control.isCritical());
+ }
+
+ @Override
+ public String getOID() {
+ return OID;
+ }
+ };
+
+ /** The affinity value is an arbitrary string. */
+ private final ByteString affinityValue;
+
+ /** Indicates if this control is critical. */
+ private final boolean isCritical;
+
+ /**
+ * Creates a new affinity control with provided value.
+ *
+ * @param affinityValue
+ * The affinity value to use
+ * @param isCritical
+ * Indicates if this control is critical
+ * @return The new control.
+ * @throws NullPointerException
+ * If {@code transactionId} was {@code null}.
+ */
+ public static AffinityControl newControl(final ByteString affinityValue, final boolean isCritical) {
+ Reject.ifNull(affinityValue);
+ return new AffinityControl(affinityValue, isCritical);
+ }
+
+ /**
+ * Creates a new affinity control with a randomly generated affinity value.
+ *
+ * @param isCritical
+ * Indicates if this control is critical
+ * @return The new control.
+ * @throws NullPointerException
+ * If {@code transactionId} was {@code null}.
+ */
+ public static AffinityControl newControl(final boolean isCritical) {
+ byte[] randomValue = new byte[5];
+ ThreadLocalRandom.current().nextBytes(randomValue);
+ return new AffinityControl(ByteString.valueOfBytes(randomValue), isCritical);
+ }
+
+ private AffinityControl(final ByteString affinityValue, final boolean isCritical) {
+ this.affinityValue = affinityValue;
+ this.isCritical = isCritical;
+ }
+
+ /**
+ * Returns the affinity value.
+ *
+ * @return The affinity value.
+ */
+ public ByteString getAffinityValue() {
+ return affinityValue;
+ }
+
+ @Override
+ public String getOID() {
+ return OID;
+ }
+
+ @Override
+ public ByteString getValue() {
+ return affinityValue;
+ }
+
+ @Override
+ public boolean hasValue() {
+ return true;
+ }
+
+ @Override
+ public boolean isCritical() {
+ return isCritical;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder builder = new StringBuilder();
+ builder.append("AffinityControl(oid=");
+ builder.append(getOID());
+ builder.append(", affinityValue(hex)=");
+ builder.append(affinityValue.toHexString());
+ builder.append(", isCritical=");
+ builder.append(isCritical);
+ builder.append(")");
+ return builder.toString();
+ }
+}
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
index ef5656e..cadb312 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -27,7 +27,9 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
+import java.util.concurrent.atomic.AtomicLongArray;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.ldap.RequestLoadBalancer.RequestWithIndex;
import org.forgerock.opendj.ldap.requests.AddRequest;
import org.forgerock.opendj.ldap.requests.CRAMMD5SASLBindRequest;
import org.forgerock.opendj.ldap.requests.CompareRequest;
@@ -39,21 +41,29 @@
import org.forgerock.opendj.ldap.requests.PasswordModifyExtendedRequest;
import org.forgerock.opendj.ldap.requests.PlainSASLBindRequest;
import org.forgerock.opendj.ldap.requests.Request;
+import org.forgerock.opendj.ldap.requests.Requests;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.requests.SimpleBindRequest;
import org.forgerock.util.Function;
import org.forgerock.util.Option;
import org.forgerock.util.Options;
import org.forgerock.util.Reject;
+import org.forgerock.util.annotations.VisibleForTesting;
import org.forgerock.util.promise.NeverThrowsException;
import org.forgerock.util.promise.Promise;
import org.forgerock.util.time.Duration;
+import com.forgerock.opendj.ldap.CoreMessages;
+import com.forgerock.opendj.ldap.controls.AffinityControl;
+
/**
* This class contains methods for creating and manipulating connection
* factories and connections.
*/
public final class Connections {
+
+ private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
/**
* Specifies the interval between successive attempts to reconnect to offline load-balanced connection factories.
* The default configuration is to attempt to reconnect every second.
@@ -421,6 +431,7 @@
* @return The new round-robin load balancer.
* @see #newShardedRequestLoadBalancer(Collection, Options)
* @see #newFailoverLoadBalancer(Collection, Options)
+ * @see #newLeastRequestsLoadBalancer(Collection, Options)
* @see #LOAD_BALANCER_EVENT_LISTENER
* @see #LOAD_BALANCER_MONITORING_INTERVAL
* @see #LOAD_BALANCER_SCHEDULER
@@ -488,6 +499,7 @@
* @return The new fail-over load balancer.
* @see #newRoundRobinLoadBalancer(Collection, Options)
* @see #newShardedRequestLoadBalancer(Collection, Options)
+ * @see #newLeastRequestsLoadBalancer(Collection, Options)
* @see #LOAD_BALANCER_EVENT_LISTENER
* @see #LOAD_BALANCER_MONITORING_INTERVAL
* @see #LOAD_BALANCER_SCHEDULER
@@ -539,6 +551,7 @@
* @return The new affinity load balancer.
* @see #newRoundRobinLoadBalancer(Collection, Options)
* @see #newFailoverLoadBalancer(Collection, Options)
+ * @see #newLeastRequestsLoadBalancer(Collection, Options)
* @see #LOAD_BALANCER_EVENT_LISTENER
* @see #LOAD_BALANCER_MONITORING_INTERVAL
* @see #LOAD_BALANCER_SCHEDULER
@@ -548,21 +561,23 @@
return new RequestLoadBalancer("ShardedRequestLoadBalancer",
factories,
options,
- newShardedRequestLoadBalancerFunction(factories));
+ newShardedRequestLoadBalancerNextFunction(factories),
+ NOOP_END_OF_REQUEST_FUNCTION);
+
}
- // Package private for testing.
- static Function<Request, Integer, NeverThrowsException> newShardedRequestLoadBalancerFunction(
+ @VisibleForTesting
+ static Function<Request, RequestWithIndex, NeverThrowsException> newShardedRequestLoadBalancerNextFunction(
final Collection<? extends ConnectionFactory> factories) {
- return new Function<Request, Integer, NeverThrowsException>() {
+ return new Function<Request, RequestWithIndex, NeverThrowsException>() {
private final int maxIndex = factories.size();
@Override
- public Integer apply(final Request request) {
+ public RequestWithIndex apply(final Request request) {
// Normalize the hash to a valid factory index, taking care of negative hash values and especially
// Integer.MIN_VALUE (see doc for Math.abs()).
final int index = computeIndexBasedOnDnHashCode(request);
- return index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % maxIndex);
+ return new RequestWithIndex(request, index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % maxIndex));
}
private int computeIndexBasedOnDnHashCode(final Request request) {
@@ -620,6 +635,166 @@
}
/**
+ * Creates a new "least requests" load-balancer which will load-balance individual requests across the provided
+ * set of connection factories, each typically representing a single replica, using an algorithm that ensures that
+ * requests are routed to the replica which has the minimum number of active requests.
+ * <p>
+ * In other words, this load-balancer provides availability and partition tolerance, but sacrifices consistency.
+ * When a replica is not available, its number of active requests will not decrease until the requests time out,
+ * which will have the effect of directing requests to the other replicas. Consistency is low compared to the
+ * "sharded" load-balancer, because there is no guarantee that requests for the same DN are directed to the same
+ * replica.
+ * <p/>
+ * It is possible to increase consistency by providing a {@link AffinityControl} with a
+ * request. The control value will then be used to compute a hash that will determine the connection to use. In that
+ * case, the "least requests" behavior is completely overridden, i.e. the most saturated connection may be chosen
+ * depending on the hash value.
+ * <p/>
+ * <b>NOTE:</b> this connection factory returns fake connections, since real connections are obtained for each
+ * request. Therefore, the returned fake connections have certain limitations: abandon requests will be ignored
+ * since they cannot be routed; connection event listeners can be registered, but will only be notified when the
+ * fake connection is closed or when all of the connection factories are unavailable.
+ * <p/>
+ * <b>NOTE:</b>Server selection is only based on information which is local to the client application. If other
+ * applications are accessing the same servers then their additional load is not taken into account. Therefore,
+ * this load balancer is only effective if all client applications access the servers in a similar way.
+ * <p/>
+ * The implementation periodically attempts to connect to failed connection factories in order to determine if they
+ * have become available again.
+ *
+ * @param factories
+ * The connection factories.
+ * @param options
+ * This configuration options for the load-balancer.
+ * @return The new least requests load balancer.
+ * @see #newRoundRobinLoadBalancer(Collection, Options)
+ * @see #newFailoverLoadBalancer(Collection, Options)
+ * @see #newShardedRequestLoadBalancer(Collection, Options)
+ * @see #LOAD_BALANCER_EVENT_LISTENER
+ * @see #LOAD_BALANCER_MONITORING_INTERVAL
+ * @see #LOAD_BALANCER_SCHEDULER
+ */
+ public static ConnectionFactory newLeastRequestsLoadBalancer(
+ final Collection<? extends ConnectionFactory> factories, final Options options) {
+ final LeastRequestsDispatcher dispatcher = new LeastRequestsDispatcher(factories.size());
+ return new RequestLoadBalancer("SaturationBasedRequestLoadBalancer", factories, options,
+ newLeastRequestsLoadBalancerNextFunction(dispatcher),
+ newLeastRequestsLoadBalancerEndOfRequestFunction(dispatcher));
+ }
+
+ private static final DecodeOptions CONTROL_DECODE_OPTIONS = new DecodeOptions();
+
+ @VisibleForTesting
+ static Function<Request, RequestWithIndex, NeverThrowsException> newLeastRequestsLoadBalancerNextFunction(
+ final LeastRequestsDispatcher dispatcher) {
+ return new Function<Request, RequestWithIndex, NeverThrowsException>() {
+ private final int maxIndex = dispatcher.size();
+
+ @Override
+ public RequestWithIndex apply(final Request request) {
+ int affinityBasedIndex = parseAffinityRequestControl(request);
+ int finalIndex = dispatcher.selectServer(affinityBasedIndex);
+ Request cleanedRequest = (affinityBasedIndex == -1)
+ ? request : Requests.shallowCopyOfRequest(request, AffinityControl.OID);
+ return new RequestWithIndex(cleanedRequest, finalIndex);
+ }
+
+ private int parseAffinityRequestControl(final Request request) {
+ try {
+ AffinityControl control = request.getControl(AffinityControl.DECODER, CONTROL_DECODE_OPTIONS);
+ if (control != null) {
+ int index = control.getAffinityValue().hashCode();
+ return index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % maxIndex);
+ }
+ } catch (DecodeException e) {
+ logger.warn(CoreMessages.WARN_DECODING_AFFINITY_CONTROL.get(e.getMessage()));
+ }
+ return -1;
+ }
+ };
+ }
+
+ @VisibleForTesting
+ static Function<Integer, Void, NeverThrowsException> newLeastRequestsLoadBalancerEndOfRequestFunction(
+ final LeastRequestsDispatcher dispatcher) {
+ return new Function<Integer, Void, NeverThrowsException>() {
+ @Override
+ public Void apply(final Integer index) {
+ dispatcher.terminatedRequest(index);
+ return null;
+ }
+ };
+ }
+
+ /** No-op "end of request" function for the saturation-based request load balancer. */
+ static final Function<Integer, Void, NeverThrowsException> NOOP_END_OF_REQUEST_FUNCTION =
+ new Function<Integer, Void, NeverThrowsException>() {
+ @Override
+ public Void apply(Integer index) {
+ return null;
+ }
+ };
+
+ /**
+ * Dispatch requests to the server index which has the least active requests.
+ * <p>
+ * A server is actually represented only by its index. Provided an initial number of servers, the requests are
+ * dispatched to the less saturated index, i.e. which corresponds to the server that has the lowest number of active
+ * requests.
+ */
+ @VisibleForTesting
+ static class LeastRequestsDispatcher {
+ /** Counter for each server. */
+ private final AtomicLongArray serversCounters;
+
+ LeastRequestsDispatcher(int numberOfServers) {
+ serversCounters = new AtomicLongArray(numberOfServers);
+ }
+
+ int size() {
+ return serversCounters.length();
+ }
+
+ /**
+ * Returns the server index to use.
+ *
+ * @param forceIndex
+ * Forces a server index to use if different from -1. In that case, the default behavior of the
+ * dispatcher is overridden. If -1 is provided, then the default behavior of the dispatcher applies.
+ * @return the server index
+ */
+ int selectServer(int forceIndex) {
+ int index = forceIndex == -1 ? getLessSaturatedIndex() : forceIndex;
+ serversCounters.incrementAndGet(index);
+ return index;
+ }
+
+ /**
+ * Signals to this dispatcher that a request has been finished for the provided server index.
+ *
+ * @param index
+ * The index of server that processed the request.
+ */
+ void terminatedRequest(Integer index) {
+ serversCounters.decrementAndGet(index);
+ }
+
+ private int getLessSaturatedIndex() {
+ long min = Long.MAX_VALUE;
+ int minIndex = -1;
+ // Modifications during this loop are ok, effects on result is should not be dramatic
+ for (int i = 0; i < serversCounters.length(); i++) {
+ long count = serversCounters.get(i);
+ if (count < min) {
+ min = count;
+ minIndex = i;
+ }
+ }
+ return minIndex;
+ }
+ }
+
+ /**
* Creates a new connection factory which forwards connection requests to
* the provided factory, but whose {@code toString} method will always
* return {@code name}.
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java
index 3e12e3c..dff8713 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java
@@ -59,15 +59,22 @@
* A function which returns the index of the first connection factory which should be used in order to satisfy the
* next request. Implementations may base the decision on properties of the provided request, such as the target DN,
* whether the request is a read or update request, etc.
+ * Additionally a new request is returned with the index, because some modifications may be needed (removal of
+ * a control for example) but the original request should not be modified. The new request must be used
+ * for the actual LDAP operation.
*/
- private final Function<Request, Integer, NeverThrowsException> nextFactoryFunction;
+ private final Function<Request, RequestWithIndex, NeverThrowsException> nextFactoryFunction;
+ /** A function which is called after a request is terminated. */
+ private final Function<Integer, Void, NeverThrowsException> endOfRequestFunction;
RequestLoadBalancer(final String loadBalancerName,
final Collection<? extends ConnectionFactory> factories,
final Options options,
- final Function<Request, Integer, NeverThrowsException> nextFactoryFunction) {
+ final Function<Request, RequestWithIndex, NeverThrowsException> nextFactoryFunction,
+ final Function<Integer, Void, NeverThrowsException> endOfRequestFunction) {
super(loadBalancerName, factories, options);
this.nextFactoryFunction = nextFactoryFunction;
+ this.endOfRequestFunction = endOfRequestFunction;
}
@Override
@@ -97,12 +104,15 @@
@Override
public LdapPromise<Result> addAsync(
final AddRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
- return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
- @Override
- public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
- return connection.addAsync(request, intermediateResponseHandler);
- }
- });
+ final ConnectionContext connectionContext = getConnection(request);
+ return executeRequest(connectionContext,
+ new AsyncFunction<Connection, Result, LdapException>() {
+ @Override
+ public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.addAsync((AddRequest) connectionContext.getRequest(),
+ intermediateResponseHandler);
+ }
+ });
}
@Override
@@ -113,10 +123,12 @@
@Override
public LdapPromise<BindResult> bindAsync(
final BindRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
- return getConnectionAndSendRequest(request, new AsyncFunction<Connection, BindResult, LdapException>() {
+ final ConnectionContext connectionContext = getConnection(request);
+ return executeRequest(connectionContext, new AsyncFunction<Connection, BindResult, LdapException>() {
@Override
public Promise<BindResult, LdapException> apply(final Connection connection) throws LdapException {
- return connection.bindAsync(request, intermediateResponseHandler);
+ return connection.bindAsync((BindRequest) connectionContext.getRequest(),
+ intermediateResponseHandler);
}
});
}
@@ -129,34 +141,44 @@
@Override
public LdapPromise<CompareResult> compareAsync(
final CompareRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
- return getConnectionAndSendRequest(request, new AsyncFunction<Connection, CompareResult, LdapException>() {
- @Override
- public Promise<CompareResult, LdapException> apply(final Connection connection) throws LdapException {
- return connection.compareAsync(request, intermediateResponseHandler);
- }
- });
+ final ConnectionContext connectionContext = getConnection(request);
+ return executeRequest(connectionContext,
+ new AsyncFunction<Connection, CompareResult, LdapException>() {
+ @Override
+ public Promise<CompareResult, LdapException> apply(final Connection connection)
+ throws LdapException {
+ return connection.compareAsync((CompareRequest) connectionContext.getRequest(),
+ intermediateResponseHandler);
+ }
+ });
}
@Override
public LdapPromise<Result> deleteAsync(
final DeleteRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
- return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
- @Override
- public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
- return connection.deleteAsync(request, intermediateResponseHandler);
- }
- });
+ final ConnectionContext connectionContext = getConnection(request);
+ return executeRequest(connectionContext,
+ new AsyncFunction<Connection, Result, LdapException>() {
+ @Override
+ public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.deleteAsync((DeleteRequest) connectionContext.getRequest(),
+ intermediateResponseHandler);
+ }
+ });
}
@Override
public <R extends ExtendedResult> LdapPromise<R> extendedRequestAsync(
final ExtendedRequest<R> request, final IntermediateResponseHandler intermediateResponseHandler) {
- return getConnectionAndSendRequest(request, new AsyncFunction<Connection, R, LdapException>() {
- @Override
- public Promise<R, LdapException> apply(final Connection connection) throws LdapException {
- return connection.extendedRequestAsync(request, intermediateResponseHandler);
- }
- });
+ final ConnectionContext connectionContext = getConnection(request);
+ return executeRequest(connectionContext,
+ new AsyncFunction<Connection, R, LdapException>() {
+ @Override
+ public Promise<R, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.extendedRequestAsync((ExtendedRequest<R>) connectionContext.getRequest(),
+ intermediateResponseHandler);
+ }
+ });
}
@Override
@@ -172,23 +194,29 @@
@Override
public LdapPromise<Result> modifyAsync(
final ModifyRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
- return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
- @Override
- public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
- return connection.modifyAsync(request, intermediateResponseHandler);
- }
- });
+ final ConnectionContext connectionContext = getConnection(request);
+ return executeRequest(connectionContext,
+ new AsyncFunction<Connection, Result, LdapException>() {
+ @Override
+ public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.modifyAsync((ModifyRequest) connectionContext.getRequest(),
+ intermediateResponseHandler);
+ }
+ });
}
@Override
public LdapPromise<Result> modifyDNAsync(
final ModifyDNRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
- return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
- @Override
- public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
- return connection.modifyDNAsync(request, intermediateResponseHandler);
- }
- });
+ final ConnectionContext connectionContext = getConnection(request);
+ return executeRequest(connectionContext,
+ new AsyncFunction<Connection, Result, LdapException>() {
+ @Override
+ public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.modifyDNAsync((ModifyDNRequest) connectionContext.getRequest(),
+ intermediateResponseHandler);
+ }
+ });
}
@Override
@@ -201,51 +229,111 @@
final SearchRequest request,
final IntermediateResponseHandler intermediateResponseHandler,
final SearchResultHandler entryHandler) {
- return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
- @Override
- public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
- return connection.searchAsync(request, intermediateResponseHandler, entryHandler);
- }
- });
- }
-
- private <R> LdapPromise<R> getConnectionAndSendRequest(
- final Request request, final AsyncFunction<Connection, R, LdapException> sendRequest) {
- if (state.isClosed()) {
- throw new IllegalStateException();
- }
- final AtomicReference<Connection> connectionHolder = new AtomicReference<>();
- return getConnectionAsync(request)
- .thenOnResult(new ResultHandler<Connection>() {
+ final ConnectionContext connectionContext = getConnection(request);
+ return executeRequest(connectionContext,
+ new AsyncFunction<Connection, Result, LdapException>() {
@Override
- public void handleResult(final Connection connection) {
- connectionHolder.set(connection);
- }
- })
- .thenAsync(sendRequest)
- .thenFinally(new Runnable() {
- @Override
- public void run() {
- closeSilently(connectionHolder.get());
+ public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+ return connection.searchAsync((SearchRequest) connectionContext.getRequest(),
+ intermediateResponseHandler,
+ entryHandler);
}
});
}
- private LdapPromise<Connection> getConnectionAsync(final Request request) {
+ private ConnectionContext getConnection(final Request request) {
+ if (state.isClosed()) {
+ throw new IllegalStateException();
+ }
try {
- final int index = nextFactoryFunction.apply(request);
- final ConnectionFactory factory = getMonitoredConnectionFactory(index);
- return LdapPromises.asPromise(factory.getConnectionAsync()
- .thenOnException(new ExceptionHandler<LdapException>() {
- @Override
- public void handleException(final LdapException e) {
- state.notifyConnectionError(false, e);
- }
- }));
+ final RequestWithIndex requestWithIndex = nextFactoryFunction.apply(request);
+ final ConnectionFactory factory = getMonitoredConnectionFactory(requestWithIndex.getServerIndex());
+ return new ConnectionContext(
+ LdapPromises.asPromise(factory.getConnectionAsync()
+ .thenOnException(new ExceptionHandler<LdapException>() {
+ @Override
+ public void handleException(final LdapException e) {
+ state.notifyConnectionError(false, e);
+ }
+ })),
+ requestWithIndex);
} catch (final LdapException e) {
state.notifyConnectionError(false, e);
- return newFailedLdapPromise(e);
+ LdapPromise<Connection> failedLdapPromise = newFailedLdapPromise(e);
+ return new ConnectionContext(failedLdapPromise, new RequestWithIndex(request, -1));
}
}
+
+ private <R> LdapPromise<R> executeRequest(final ConnectionContext connectionContext,
+ final AsyncFunction<Connection, R, LdapException> requestSender) {
+ return connectionContext.getConnectionPromise()
+ .thenOnResult(new ResultHandler<Connection>() {
+ @Override
+ public void handleResult(final Connection connection) {
+ connectionContext.setConnection(connection);
+ }
+ })
+ .thenAsync(requestSender)
+ .thenFinally(new Runnable() {
+ @Override
+ public void run() {
+ closeSilently(connectionContext.getConnection());
+ endOfRequestFunction.apply(connectionContext.getServerIndex());
+ }
+ });
+ }
+ }
+
+ /** Utility class for a request and a server index. */
+ static class RequestWithIndex {
+ private final Request request;
+ /** The index of server chosen for the connection. */
+ private final int serverIndex;
+
+ RequestWithIndex(Request request, int serverIndex) {
+ this.serverIndex = serverIndex;
+ this.request = request;
+ }
+
+ Request getRequest() {
+ return request;
+ }
+
+ int getServerIndex() {
+ return serverIndex;
+ }
+ }
+
+ /** Utility class to hold together parameters for a request and the connection used to perform it. */
+ private static class ConnectionContext {
+ private final AtomicReference<Connection> connectionHolder = new AtomicReference<>();
+ private final LdapPromise<Connection> connectionPromise;
+ private final RequestWithIndex requestWithIndex;
+
+ ConnectionContext(LdapPromise<Connection> connectionPromise, RequestWithIndex requestWithIndex) {
+ this.requestWithIndex = requestWithIndex;
+ this.connectionPromise = connectionPromise;
+ }
+
+
+ Connection getConnection() {
+ return connectionHolder.get();
+ }
+
+ void setConnection(Connection connection) {
+ connectionHolder.set(connection);
+ }
+
+ LdapPromise<Connection> getConnectionPromise() {
+ return connectionPromise;
+ }
+
+ int getServerIndex() {
+ return requestWithIndex.getServerIndex();
+ }
+
+ Request getRequest() {
+ return requestWithIndex.getRequest();
+ }
}
}
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/requests/Requests.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/requests/Requests.java
index 0963615..4691fdb 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/requests/Requests.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/requests/Requests.java
@@ -12,13 +12,18 @@
* information: "Portions Copyright [year] [name of copyright owner]".
*
* Copyright 2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2015 ForgeRock AS.
+ * Portions copyright 2011-2016 ForgeRock AS.
*/
package org.forgerock.opendj.ldap.requests;
import static com.forgerock.opendj.util.StaticUtils.EMPTY_BYTES;
import static com.forgerock.opendj.util.StaticUtils.getBytes;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
import static com.forgerock.opendj.ldap.CoreMessages.WARN_READ_LDIF_RECORD_CHANGE_RECORD_WRONG_TYPE;
import javax.net.ssl.SSLContext;
@@ -33,13 +38,17 @@
import org.forgerock.opendj.ldap.Entry;
import org.forgerock.opendj.ldap.Filter;
import org.forgerock.opendj.ldap.LinkedHashMapEntry;
+import org.forgerock.opendj.ldap.Modification;
import org.forgerock.opendj.ldap.ModificationType;
import org.forgerock.opendj.ldap.RDN;
import org.forgerock.opendj.ldap.SearchScope;
+import org.forgerock.opendj.ldap.controls.Control;
import org.forgerock.opendj.ldif.ChangeRecord;
import org.forgerock.opendj.ldif.LDIFChangeRecordReader;
import org.forgerock.util.Reject;
+import com.forgerock.opendj.ldap.CoreMessages;
+
/**
* This class contains various methods for creating and manipulating requests.
* <p>
@@ -62,6 +71,133 @@
// TODO: synchronized requests?
/**
+ * Creates a new request that is a shallow copy of the provided request, except for
+ * controls list which is a new list containing the original controls (and not the original list
+ * of controls) possibly filtered by the provided exclusion parameter.
+ * <p>
+ * The intended usage is to be able to perform modification of the controls of a request without
+ * affecting the original request.
+ *
+ * @param request
+ * the original request
+ * @param excludeControlOids
+ * OIDs of controls to exclude from the new request
+ * @return the new request
+ */
+ public static Request shallowCopyOfRequest(Request request, String... excludeControlOids) {
+ List<String> exclusions = Arrays.asList(excludeControlOids);
+ if (request instanceof AbandonRequest) {
+ AbandonRequest req = copyOfAbandonRequest((AbandonRequest) request);
+ filterControls(req.getControls(), exclusions);
+ return req;
+ } else if (request instanceof AddRequest) {
+ AddRequest req = (AddRequest) request;
+ AddRequest newReq = newAddRequest(new LinkedHashMapEntry(req));
+ return copyControls(req, newReq, exclusions);
+ } else if (request instanceof AnonymousSASLBindRequest) {
+ AnonymousSASLBindRequest req = copyOfAnonymousSASLBindRequest((AnonymousSASLBindRequest) request);
+ filterControls(req.getControls(), exclusions);
+ return req;
+ } else if (request instanceof CancelExtendedRequest) {
+ CancelExtendedRequest req = copyOfCancelExtendedRequest((CancelExtendedRequest) request);
+ filterControls(req.getControls(), exclusions);
+ return req;
+ } else if (request instanceof CompareRequest) {
+ CompareRequest req = copyOfCompareRequest((CompareRequest) request);
+ filterControls(req.getControls(), exclusions);
+ return req;
+ } else if (request instanceof CRAMMD5SASLBindRequest) {
+ CRAMMD5SASLBindRequest req = copyOfCRAMMD5SASLBindRequest((CRAMMD5SASLBindRequest) request);
+ filterControls(req.getControls(), exclusions);
+ return req;
+ } else if (request instanceof DeleteRequest) {
+ DeleteRequest req = copyOfDeleteRequest((DeleteRequest) request);
+ filterControls(req.getControls(), exclusions);
+ return req;
+ } else if (request instanceof DigestMD5SASLBindRequest) {
+ DigestMD5SASLBindRequest req = copyOfDigestMD5SASLBindRequest((DigestMD5SASLBindRequest) request);
+ filterControls(req.getControls(), exclusions);
+ return req;
+ } else if (request instanceof ExternalSASLBindRequest) {
+ ExternalSASLBindRequest req = copyOfExternalSASLBindRequest((ExternalSASLBindRequest) request);
+ filterControls(req.getControls(), exclusions);
+ return req;
+ } else if (request instanceof GenericBindRequest) {
+ GenericBindRequest req = copyOfGenericBindRequest((GenericBindRequest) request);
+ filterControls(req.getControls(), exclusions);
+ return req;
+ } else if (request instanceof GenericExtendedRequest) {
+ GenericExtendedRequest req = copyOfGenericExtendedRequest((GenericExtendedRequest) request);
+ filterControls(req.getControls(), exclusions);
+ return req;
+ } else if (request instanceof GSSAPISASLBindRequest) {
+ GSSAPISASLBindRequest req = copyOfGSSAPISASLBindRequest((GSSAPISASLBindRequest) request);
+ filterControls(req.getControls(), exclusions);
+ return req;
+ } else if (request instanceof ModifyDNRequest) {
+ ModifyDNRequest req = copyOfModifyDNRequest((ModifyDNRequest) request);
+ filterControls(req.getControls(), exclusions);
+ return req;
+ } else if (request instanceof ModifyRequest) {
+ ModifyRequest req = (ModifyRequest) request;
+ ModifyRequest newReq = newModifyRequest(req.getName());
+ for (Modification mod : req.getModifications()) {
+ newReq.addModification(mod);
+ }
+ return copyControls(req, newReq, exclusions);
+ } else if (request instanceof PasswordModifyExtendedRequest) {
+ PasswordModifyExtendedRequest req =
+ copyOfPasswordModifyExtendedRequest((PasswordModifyExtendedRequest) request);
+ filterControls(req.getControls(), exclusions);
+ return req;
+ } else if (request instanceof PlainSASLBindRequest) {
+ PlainSASLBindRequest req = copyOfPlainSASLBindRequest((PlainSASLBindRequest) request);
+ filterControls(req.getControls(), exclusions);
+ return req;
+ } else if (request instanceof SearchRequest) {
+ SearchRequest req = copyOfSearchRequest((SearchRequest) request);
+ filterControls(req.getControls(), exclusions);
+ return req;
+ } else if (request instanceof SimpleBindRequest) {
+ SimpleBindRequest req = copyOfSimpleBindRequest((SimpleBindRequest) request);
+ filterControls(req.getControls(), exclusions);
+ return req;
+ } else if (request instanceof StartTLSExtendedRequest) {
+ StartTLSExtendedRequest req = copyOfStartTLSExtendedRequest((StartTLSExtendedRequest) request);
+ filterControls(req.getControls(), exclusions);
+ return req;
+ } else if (request instanceof UnbindRequest) {
+ UnbindRequest req = copyOfUnbindRequest((UnbindRequest) request);
+ filterControls(req.getControls(), exclusions);
+ return req;
+ } else if (request instanceof WhoAmIExtendedRequest) {
+ WhoAmIExtendedRequest req = copyOfWhoAmIExtendedRequest((WhoAmIExtendedRequest) request);
+ filterControls(req.getControls(), exclusions);
+ return req;
+ } else {
+ throw new LocalizedIllegalArgumentException(CoreMessages.ERR_UNKNOWN_REQUEST_TYPE.get(request));
+ }
+ }
+
+ private static Request copyControls(Request source, Request destination, List<String> excludeControlOids) {
+ for (final Control control : source.getControls()) {
+ if (!excludeControlOids.contains(control.getOID())) {
+ destination.addControl(control);
+ }
+ }
+ return destination;
+ }
+
+ private static void filterControls(List<Control> controls, List<String> excludedControlOids) {
+ for (Iterator<Control> it = controls.iterator(); it.hasNext();) {
+ Control control = it.next();
+ if (excludedControlOids.contains(control.getOID())) {
+ it.remove();
+ }
+ }
+ }
+
+ /**
* Creates a new abandon request that is an exact copy of the provided
* request.
*
diff --git a/opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties b/opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties
index c23e254..92c89d6 100644
--- a/opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties
+++ b/opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties
@@ -1715,4 +1715,12 @@
ERR_CERT_NO_MATCH_SUBJECT=The host name contained in the subject DN '%s' \
does not match the host name '%s'
ERR_ATTRIBUTE_PARSER_MISSING_ATTRIBUTE=The entry could not be parsed because the '%s' is missing
-ERR_CONNECTION_UNEXPECTED=An error occurred during establishment of a connection: %s
\ No newline at end of file
+ERR_CONNECTION_UNEXPECTED=An error occurred during establishment of a connection: %s
+ERR_CONNECTION_AFFINITY_CONTROL_BAD_OID=Cannot decode the provided \
+ control as a connection affinity control because it contained the OID '%s', \
+ when '%s' was expected
+ERR_CONNECTION_AFFINITY_CONTROL_DECODE_NULL=Cannot decode the provided ASN.1 \
+ element as a connection affinity control because it did not have a value, when \
+ a value must always be provided
+ERR_UNKNOWN_REQUEST_TYPE=The following request has a unknown request type: %s
+WARN_DECODING_AFFINITY_CONTROL=An error occurred while decoding an affinity control: %s
\ No newline at end of file
diff --git a/opendj-core/src/test/java/com/forgerock/opendj/ldap/controls/AffinityControlTestCase.java b/opendj-core/src/test/java/com/forgerock/opendj/ldap/controls/AffinityControlTestCase.java
new file mode 100644
index 0000000..2b0b077
--- /dev/null
+++ b/opendj-core/src/test/java/com/forgerock/opendj/ldap/controls/AffinityControlTestCase.java
@@ -0,0 +1,71 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package com.forgerock.opendj.ldap.controls;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.Connection;
+import org.forgerock.opendj.ldap.DN;
+import org.forgerock.opendj.ldap.Filter;
+import org.forgerock.opendj.ldap.SearchScope;
+import org.forgerock.opendj.ldap.TestCaseUtils;
+import org.forgerock.opendj.ldap.controls.ControlsTestCase;
+import org.forgerock.opendj.ldap.requests.Requests;
+import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.responses.SearchResultEntry;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@SuppressWarnings("javadoc")
+public class AffinityControlTestCase extends ControlsTestCase {
+
+ @Test
+ public void testControl() throws Exception {
+ // Send this control with a search request to ensure it is encoded/decoded correctly
+ final SearchRequest req = Requests.newSearchRequest(DN.valueOf("uid=user.1,ou=people,o=test"),
+ SearchScope.BASE_OBJECT, Filter.objectClassPresent());
+ final AffinityControl control = AffinityControl.newControl(ByteString.valueOfUtf8("value"), false);
+ req.addControl(control);
+ try (Connection con = TestCaseUtils.getInternalConnection()) {
+ final List<SearchResultEntry> entries = new ArrayList<>();
+ con.search(req, entries);
+ assertThat(entries).hasAtLeastOneElementOfType(SearchResultEntry.class);
+ final SearchResultEntry entry = entries.get(0);
+ assertThat(entry.getControls()).hasSize(0);
+ }
+ }
+
+ @Test
+ public void testControlGeneratesRandomValue() throws Exception {
+ final AffinityControl control = AffinityControl.newControl(true);
+ assertTrue(control.isCritical());
+ ByteString value1 = control.getAffinityValue();
+ assertThat(value1).isNotNull();
+ assertThat(value1.length()).isNotZero();
+
+ final AffinityControl control2 = AffinityControl.newControl(false);
+ ByteString value2 = control2.getAffinityValue();
+ assertFalse(control2.isCritical());
+ assertThat(value2).isNotNull();
+ assertThat(value2.length()).isNotZero();
+
+ assertThat(value1).isNotEqualTo(value2);
+ }
+}
diff --git a/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java b/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java
index 5d72977..c1bbae9 100644
--- a/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java
+++ b/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java
@@ -17,12 +17,14 @@
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.forgerock.opendj.ldap.Connections.newShardedRequestLoadBalancerFunction;
+import static org.forgerock.opendj.ldap.Connections.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
+import org.forgerock.opendj.ldap.Connections.LeastRequestsDispatcher;
+import org.forgerock.opendj.ldap.RequestLoadBalancer.RequestWithIndex;
import org.forgerock.opendj.ldap.requests.AddRequest;
import org.forgerock.opendj.ldap.requests.CRAMMD5SASLBindRequest;
import org.forgerock.opendj.ldap.requests.CompareRequest;
@@ -35,12 +37,15 @@
import org.forgerock.opendj.ldap.requests.PasswordModifyExtendedRequest;
import org.forgerock.opendj.ldap.requests.PlainSASLBindRequest;
import org.forgerock.opendj.ldap.requests.Request;
+import org.forgerock.opendj.ldap.requests.Requests;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.requests.SimpleBindRequest;
import org.forgerock.util.Function;
import org.forgerock.util.promise.NeverThrowsException;
import org.testng.annotations.Test;
+import com.forgerock.opendj.ldap.controls.AffinityControl;
+
@SuppressWarnings("javadoc")
public class ConnectionsTestCase extends SdkTestCase {
@@ -70,8 +75,8 @@
@Test
public void shardedRequestLoadBalancerUsesConsistentIndexing() {
- final Function<Request, Integer, NeverThrowsException> f =
- newShardedRequestLoadBalancerFunction(asList(mock(ConnectionFactory.class),
+ final Function<Request, RequestWithIndex, NeverThrowsException> f =
+ newShardedRequestLoadBalancerNextFunction(asList(mock(ConnectionFactory.class),
mock(ConnectionFactory.class)));
// These two DNs have a different hash code.
@@ -138,7 +143,75 @@
assertThat(index(f, genericExtendedRequest)).isBetween(0, 1);
}
- private void assertRequestsAreRoutedConsistently(final Function<Request, Integer, NeverThrowsException> f,
+ @Test
+ public void leastRequestsDispatcherMustChooseTheLessSaturatedServer() {
+ LeastRequestsDispatcher dispatcher = new Connections.LeastRequestsDispatcher(3);
+ Function<Request, RequestWithIndex, NeverThrowsException> next =
+ newLeastRequestsLoadBalancerNextFunction(dispatcher);
+ Function<Integer, Void, NeverThrowsException> end =
+ newLeastRequestsLoadBalancerEndOfRequestFunction(dispatcher);
+
+ final SearchRequest[] reqs = new SearchRequest[11];
+ for (int i = 0; i < reqs.length; i++) {
+ reqs[i] = mock(SearchRequest.class);
+ }
+ assertThat(next.apply(reqs[0]).getServerIndex()).isEqualTo(0); // number of reqs = [1, 0, 0]
+ assertThat(next.apply(reqs[1]).getServerIndex()).isEqualTo(1); // number of reqs = [1, 1, 0]
+ assertThat(next.apply(reqs[2]).getServerIndex()).isEqualTo(2); // number of reqs = [1, 1, 1]
+ end.apply(1); // number of reqs = [1, 0, 1]
+ assertThat(next.apply(reqs[3]).getServerIndex()).isEqualTo(1); // number of reqs = [1, 1, 1]
+ end.apply(1); // number of reqs = [1, 0, 1]
+ assertThat(next.apply(reqs[5]).getServerIndex()).isEqualTo(1); // number of reqs = [1, 1, 1]
+ assertThat(next.apply(reqs[6]).getServerIndex()).isEqualTo(0); // number of reqs = [2, 1, 1]
+ assertThat(next.apply(reqs[7]).getServerIndex()).isEqualTo(1); // number of reqs = [2, 2, 1]
+ assertThat(next.apply(reqs[8]).getServerIndex()).isEqualTo(2); // number of reqs = [2, 2, 2]
+ assertThat(next.apply(reqs[9]).getServerIndex()).isEqualTo(0); // number of reqs = [3, 2, 2]
+ end.apply(2); // number of reqs = [3, 2, 1]
+ assertThat(next.apply(reqs[10]).getServerIndex()).isEqualTo(2); // number of reqs = [3, 2, 2]
+ }
+
+ @Test
+ public void leastRequestsDispatcherMustTakeConnectionAffinityControlIntoAccount() {
+ LeastRequestsDispatcher dispatcher = new Connections.LeastRequestsDispatcher(3);
+ Function<Request, RequestWithIndex, NeverThrowsException> next =
+ newLeastRequestsLoadBalancerNextFunction(dispatcher);
+
+ final Request[] reqs = new Request[11];
+ for (int i = 0; i < reqs.length; i++) {
+ reqs[i] = Requests.newDeleteRequest("o=example");
+ }
+ assertThat(next.apply(reqs[0]).getServerIndex()).isEqualTo(0); // number of reqs = [1, 0, 0]
+
+ // control should now force the same connection three times
+ // control should be removed from the RequestWithIndex object used to perform the actual query
+ for (int i = 1; i <= 3; i++) {
+ reqs[i].addControl(AffinityControl.newControl(ByteString.valueOfUtf8("val"), false));
+ }
+ RequestWithIndex req1 = next.apply(reqs[1]);
+ assertThat(req1.getServerIndex()).isEqualTo(0); // number of reqs = [2, 0, 0]
+ assertThat(req1.getRequest().getControls()).isEmpty();
+ assertThat(reqs[1].getControls()).hasSize(1);
+
+ RequestWithIndex req2 = next.apply(reqs[2]);
+ assertThat(req2.getServerIndex()).isEqualTo(0); // number of reqs = [3, 0, 0]
+ assertThat(req2.getRequest().getControls()).isEmpty();
+ assertThat(reqs[2].getControls()).hasSize(1);
+
+ RequestWithIndex req3 = next.apply(reqs[3]);
+ assertThat(req3.getServerIndex()).isEqualTo(0); // number of reqs = [4, 0, 0]
+ assertThat(req3.getRequest().getControls()).isEmpty();
+ assertThat(reqs[3].getControls()).hasSize(1);
+
+ // back to default "saturation-based" behavior
+ assertThat(next.apply(reqs[4]).getServerIndex()).isEqualTo(1); // number of reqs = [4, 1, 0]
+ assertThat(next.apply(reqs[5]).getServerIndex()).isEqualTo(2); // number of reqs = [4, 1, 1]
+ assertThat(next.apply(reqs[6]).getServerIndex()).isEqualTo(1); // number of reqs = [4, 2, 1]
+ assertThat(next.apply(reqs[7]).getServerIndex()).isEqualTo(2); // number of reqs = [4, 2, 2]
+ assertThat(next.apply(reqs[8]).getServerIndex()).isEqualTo(1); // number of reqs = [4, 3, 2]
+ assertThat(next.apply(reqs[9]).getServerIndex()).isEqualTo(2); // number of reqs = [4, 3, 3]
+ }
+
+ private void assertRequestsAreRoutedConsistently(final Function<Request, RequestWithIndex, NeverThrowsException> f,
final Request r,
final int firstExpectedIndex,
final int secondExpectedIndex) {
@@ -146,7 +219,7 @@
assertThat(index(f, r)).isEqualTo(secondExpectedIndex);
}
- private int index(final Function<Request, Integer, NeverThrowsException> function, final Request request) {
- return function.apply(request);
+ private int index(final Function<Request, RequestWithIndex, NeverThrowsException> function, final Request request) {
+ return function.apply(request).getServerIndex();
}
}
diff --git a/opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java b/opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java
index e1e6d0b..6aaa060 100644
--- a/opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java
+++ b/opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java
@@ -37,6 +37,7 @@
import java.util.ArrayList;
import java.util.logging.Level;
+import org.forgerock.opendj.ldap.RequestLoadBalancer.RequestWithIndex;
import org.forgerock.opendj.ldap.requests.AbandonRequest;
import org.forgerock.opendj.ldap.requests.AddRequest;
import org.forgerock.opendj.ldap.requests.BindRequest;
@@ -850,33 +851,34 @@
stub(this.connection3);
loadBalancer = new RequestLoadBalancer("Test",
asList(factory1, factory2, factory3),
- defaultOptions(), newNextFactoryFunction());
+ defaultOptions(), newNextFactoryFunction(),
+ Connections.NOOP_END_OF_REQUEST_FUNCTION);
}
- private Function<Request, Integer, NeverThrowsException> newNextFactoryFunction() {
- return new Function<Request, Integer, NeverThrowsException>() {
+ private Function<Request, RequestWithIndex, NeverThrowsException> newNextFactoryFunction() {
+ return new Function<Request, RequestWithIndex, NeverThrowsException>() {
@Override
- public Integer apply(final Request request) {
+ public RequestWithIndex apply(final Request request) {
if (request == addRequest1 || request == bindRequest1 || request == compareRequest1
|| request == deleteRequest1 || request == extendedRequest1 || request == modifyRequest1
|| request == modifyDNRequest1 || request == searchRequest1) {
- return 0;
+ return new RequestWithIndex(request, 0);
}
if (request == addRequest2 || request == bindRequest2 || request == compareRequest2
|| request == deleteRequest2 || request == extendedRequest2 || request == modifyRequest2
|| request == modifyDNRequest2 || request == searchRequest2) {
- return 1;
+ return new RequestWithIndex(request, 1);
}
if (request == addRequest3 || request == bindRequest3 || request == compareRequest3
|| request == deleteRequest3 || request == extendedRequest3 || request == modifyRequest3
|| request == modifyDNRequest3 || request == searchRequest3) {
- return 2;
+ return new RequestWithIndex(request, 2);
}
fail("Received unexpected request");
- return -1; // Keep compiler happy.
+ return new RequestWithIndex(request, -1); // Keep compiler happy.
}
};
}
--
Gitblit v1.10.0