From 08b3f1565a41877054575135de0dac83266e8dd8 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Mon, 24 Oct 2016 09:30:24 +0000
Subject: [PATCH] OPENDJ-2870 Add a least requests load balancer algorithm

---
 opendj-core/src/test/java/com/forgerock/opendj/ldap/controls/AffinityControlTestCase.java |   71 ++++
 opendj-core/src/main/java/org/forgerock/opendj/ldap/requests/Requests.java                |  138 ++++++++
 opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java              |  240 ++++++++++----
 opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties                  |   10 
 opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java              |   85 ++++
 opendj-core/src/main/java/com/forgerock/opendj/ldap/controls/AffinityControl.java         |  155 +++++++++
 opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java      |   18 
 opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java                      |  189 +++++++++++
 8 files changed, 807 insertions(+), 99 deletions(-)

diff --git a/opendj-core/src/main/java/com/forgerock/opendj/ldap/controls/AffinityControl.java b/opendj-core/src/main/java/com/forgerock/opendj/ldap/controls/AffinityControl.java
new file mode 100644
index 0000000..0a442b7
--- /dev/null
+++ b/opendj-core/src/main/java/com/forgerock/opendj/ldap/controls/AffinityControl.java
@@ -0,0 +1,155 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package com.forgerock.opendj.ldap.controls;
+
+import static com.forgerock.opendj.ldap.CoreMessages.*;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.DecodeException;
+import org.forgerock.opendj.ldap.DecodeOptions;
+import org.forgerock.opendj.ldap.controls.Control;
+import org.forgerock.opendj.ldap.controls.ControlDecoder;
+import org.forgerock.util.Reject;
+
+/**
+ * Control that provides a value for affinity.
+ * <p>
+ * As an example, this control can be used for connection affinity when using a load-balancer ({@link Connections}).
+ *
+ * @see Connections#newLeastRequestsLoadBalancer(Collection, Options)
+ */
+public final class AffinityControl implements Control {
+
+    /** OID for this control. */
+    public static final String OID = "1.3.6.1.4.1.36733.2.1.5.2";
+
+    /** A decoder which can be used for decoding the affinity control. */
+    public static final ControlDecoder<AffinityControl> DECODER =
+            new ControlDecoder<AffinityControl>() {
+
+                @Override
+                public AffinityControl decodeControl(
+                        final Control control, final DecodeOptions options) throws DecodeException {
+                    Reject.ifNull(control);
+
+                    if (control instanceof AffinityControl) {
+                        return (AffinityControl) control;
+                    }
+
+                    if (!control.getOID().equals(OID)) {
+                        throw DecodeException.error(ERR_CONNECTION_AFFINITY_CONTROL_BAD_OID.get(control.getOID(), OID));
+                    }
+
+                    if (!control.hasValue()) {
+                        // The control must always have a value.
+                        throw DecodeException.error(ERR_CONNECTION_AFFINITY_CONTROL_DECODE_NULL.get());
+                    }
+
+                    return new AffinityControl(control.getValue(), control.isCritical());
+                }
+
+                @Override
+                public String getOID() {
+                    return OID;
+                }
+            };
+
+    /** The affinity value is an arbitrary string. */
+    private final ByteString affinityValue;
+
+    /** Indicates if this control is critical. */
+    private final boolean isCritical;
+
+    /**
+     * Creates a new affinity control with provided value.
+     *
+     * @param affinityValue
+     *            The affinity value to use
+     * @param isCritical
+     *            Indicates if this control is critical
+     * @return The new control.
+     * @throws NullPointerException
+     *             If {@code transactionId} was {@code null}.
+     */
+    public static AffinityControl newControl(final ByteString affinityValue, final boolean isCritical) {
+        Reject.ifNull(affinityValue);
+        return new AffinityControl(affinityValue, isCritical);
+    }
+
+    /**
+     * Creates a new affinity control with a randomly generated affinity value.
+     *
+     * @param isCritical
+     *          Indicates if this control is critical
+     * @return The new control.
+     * @throws NullPointerException
+     *             If {@code transactionId} was {@code null}.
+     */
+    public static AffinityControl newControl(final boolean isCritical) {
+        byte[] randomValue = new byte[5];
+        ThreadLocalRandom.current().nextBytes(randomValue);
+        return new AffinityControl(ByteString.valueOfBytes(randomValue), isCritical);
+    }
+
+    private AffinityControl(final ByteString affinityValue, final boolean isCritical) {
+        this.affinityValue = affinityValue;
+        this.isCritical = isCritical;
+    }
+
+    /**
+     * Returns the affinity value.
+     *
+     * @return The affinity value.
+     */
+    public ByteString getAffinityValue() {
+        return affinityValue;
+    }
+
+    @Override
+    public String getOID() {
+        return OID;
+    }
+
+    @Override
+    public ByteString getValue() {
+        return affinityValue;
+    }
+
+    @Override
+    public boolean hasValue() {
+        return true;
+    }
+
+    @Override
+    public boolean isCritical() {
+        return isCritical;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder builder = new StringBuilder();
+        builder.append("AffinityControl(oid=");
+        builder.append(getOID());
+        builder.append(", affinityValue(hex)=");
+        builder.append(affinityValue.toHexString());
+        builder.append(", isCritical=");
+        builder.append(isCritical);
+        builder.append(")");
+        return builder.toString();
+    }
+}
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
index ef5656e..cadb312 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -27,7 +27,9 @@
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
+import java.util.concurrent.atomic.AtomicLongArray;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.ldap.RequestLoadBalancer.RequestWithIndex;
 import org.forgerock.opendj.ldap.requests.AddRequest;
 import org.forgerock.opendj.ldap.requests.CRAMMD5SASLBindRequest;
 import org.forgerock.opendj.ldap.requests.CompareRequest;
@@ -39,21 +41,29 @@
 import org.forgerock.opendj.ldap.requests.PasswordModifyExtendedRequest;
 import org.forgerock.opendj.ldap.requests.PlainSASLBindRequest;
 import org.forgerock.opendj.ldap.requests.Request;
+import org.forgerock.opendj.ldap.requests.Requests;
 import org.forgerock.opendj.ldap.requests.SearchRequest;
 import org.forgerock.opendj.ldap.requests.SimpleBindRequest;
 import org.forgerock.util.Function;
 import org.forgerock.util.Option;
 import org.forgerock.util.Options;
 import org.forgerock.util.Reject;
+import org.forgerock.util.annotations.VisibleForTesting;
 import org.forgerock.util.promise.NeverThrowsException;
 import org.forgerock.util.promise.Promise;
 import org.forgerock.util.time.Duration;
 
+import com.forgerock.opendj.ldap.CoreMessages;
+import com.forgerock.opendj.ldap.controls.AffinityControl;
+
 /**
  * This class contains methods for creating and manipulating connection
  * factories and connections.
  */
 public final class Connections {
+
+    private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
     /**
      * Specifies the interval between successive attempts to reconnect to offline load-balanced connection factories.
      * The default configuration is to attempt to reconnect every second.
@@ -421,6 +431,7 @@
      * @return The new round-robin load balancer.
      * @see #newShardedRequestLoadBalancer(Collection, Options)
      * @see #newFailoverLoadBalancer(Collection, Options)
+     * @see #newLeastRequestsLoadBalancer(Collection, Options)
      * @see #LOAD_BALANCER_EVENT_LISTENER
      * @see #LOAD_BALANCER_MONITORING_INTERVAL
      * @see #LOAD_BALANCER_SCHEDULER
@@ -488,6 +499,7 @@
      * @return The new fail-over load balancer.
      * @see #newRoundRobinLoadBalancer(Collection, Options)
      * @see #newShardedRequestLoadBalancer(Collection, Options)
+     * @see #newLeastRequestsLoadBalancer(Collection, Options)
      * @see #LOAD_BALANCER_EVENT_LISTENER
      * @see #LOAD_BALANCER_MONITORING_INTERVAL
      * @see #LOAD_BALANCER_SCHEDULER
@@ -539,6 +551,7 @@
      * @return The new affinity load balancer.
      * @see #newRoundRobinLoadBalancer(Collection, Options)
      * @see #newFailoverLoadBalancer(Collection, Options)
+     * @see #newLeastRequestsLoadBalancer(Collection, Options)
      * @see #LOAD_BALANCER_EVENT_LISTENER
      * @see #LOAD_BALANCER_MONITORING_INTERVAL
      * @see #LOAD_BALANCER_SCHEDULER
@@ -548,21 +561,23 @@
         return new RequestLoadBalancer("ShardedRequestLoadBalancer",
                                        factories,
                                        options,
-                                       newShardedRequestLoadBalancerFunction(factories));
+                                       newShardedRequestLoadBalancerNextFunction(factories),
+                                       NOOP_END_OF_REQUEST_FUNCTION);
+
     }
 
-    // Package private for testing.
-    static Function<Request, Integer, NeverThrowsException> newShardedRequestLoadBalancerFunction(
+    @VisibleForTesting
+    static Function<Request, RequestWithIndex, NeverThrowsException> newShardedRequestLoadBalancerNextFunction(
             final Collection<? extends ConnectionFactory> factories) {
-        return new Function<Request, Integer, NeverThrowsException>() {
+        return new Function<Request, RequestWithIndex, NeverThrowsException>() {
             private final int maxIndex = factories.size();
 
             @Override
-            public Integer apply(final Request request) {
+            public RequestWithIndex apply(final Request request) {
                 // Normalize the hash to a valid factory index, taking care of negative hash values and especially
                 // Integer.MIN_VALUE (see doc for Math.abs()).
                 final int index = computeIndexBasedOnDnHashCode(request);
-                return index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % maxIndex);
+                return new RequestWithIndex(request, index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % maxIndex));
             }
 
             private int computeIndexBasedOnDnHashCode(final Request request) {
@@ -620,6 +635,166 @@
     }
 
     /**
+     * Creates a new "least requests" load-balancer which will load-balance individual requests across the provided
+     * set of connection factories, each typically representing a single replica, using an algorithm that ensures that
+     * requests are routed to the replica which has the minimum number of active requests.
+     * <p>
+     * In other words, this load-balancer provides availability and partition tolerance, but sacrifices consistency.
+     * When a replica is not available, its number of active requests will not decrease until the requests time out,
+     * which will have the effect of directing requests to the other replicas. Consistency is low compared to the
+     * "sharded" load-balancer, because there is no guarantee that requests for the same DN are directed to the same
+     * replica.
+     * <p/>
+     * It is possible to increase consistency by providing a {@link AffinityControl} with a
+     * request. The control value will then be used to compute a hash that will determine the connection to use. In that
+     * case, the "least requests" behavior is completely overridden, i.e. the most saturated connection may be chosen
+     * depending on the hash value.
+     * <p/>
+     * <b>NOTE:</b> this connection factory returns fake connections, since real connections are obtained for each
+     * request. Therefore, the returned fake connections have certain limitations: abandon requests will be ignored
+     * since they cannot be routed; connection event listeners can be registered, but will only be notified when the
+     * fake connection is closed or when all of the connection factories are unavailable.
+     * <p/>
+     * <b>NOTE:</b>Server selection is only based on information which is local to the client application. If other
+     * applications are accessing the same servers then their additional load is not taken into account. Therefore,
+     * this load balancer is only effective if all client applications access the servers in a similar way.
+     * <p/>
+     * The implementation periodically attempts to connect to failed connection factories in order to determine if they
+     * have become available again.
+     *
+     * @param factories
+     *            The connection factories.
+     * @param options
+     *            This configuration options for the load-balancer.
+     * @return The new least requests load balancer.
+     * @see #newRoundRobinLoadBalancer(Collection, Options)
+     * @see #newFailoverLoadBalancer(Collection, Options)
+     * @see #newShardedRequestLoadBalancer(Collection, Options)
+     * @see #LOAD_BALANCER_EVENT_LISTENER
+     * @see #LOAD_BALANCER_MONITORING_INTERVAL
+     * @see #LOAD_BALANCER_SCHEDULER
+     */
+    public static ConnectionFactory newLeastRequestsLoadBalancer(
+            final Collection<? extends ConnectionFactory> factories, final Options options) {
+        final LeastRequestsDispatcher dispatcher = new LeastRequestsDispatcher(factories.size());
+        return new RequestLoadBalancer("SaturationBasedRequestLoadBalancer", factories, options,
+                newLeastRequestsLoadBalancerNextFunction(dispatcher),
+                newLeastRequestsLoadBalancerEndOfRequestFunction(dispatcher));
+    }
+
+    private static final DecodeOptions CONTROL_DECODE_OPTIONS = new DecodeOptions();
+
+    @VisibleForTesting
+    static Function<Request, RequestWithIndex, NeverThrowsException> newLeastRequestsLoadBalancerNextFunction(
+            final LeastRequestsDispatcher dispatcher) {
+        return new Function<Request, RequestWithIndex, NeverThrowsException>() {
+            private final int maxIndex = dispatcher.size();
+
+            @Override
+            public RequestWithIndex apply(final Request request) {
+                int affinityBasedIndex = parseAffinityRequestControl(request);
+                int finalIndex = dispatcher.selectServer(affinityBasedIndex);
+                Request cleanedRequest = (affinityBasedIndex == -1)
+                        ? request : Requests.shallowCopyOfRequest(request, AffinityControl.OID);
+                return new RequestWithIndex(cleanedRequest, finalIndex);
+            }
+
+            private int parseAffinityRequestControl(final Request request) {
+                try {
+                    AffinityControl control = request.getControl(AffinityControl.DECODER, CONTROL_DECODE_OPTIONS);
+                    if (control != null) {
+                        int index = control.getAffinityValue().hashCode();
+                        return index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % maxIndex);
+                    }
+                } catch (DecodeException e) {
+                    logger.warn(CoreMessages.WARN_DECODING_AFFINITY_CONTROL.get(e.getMessage()));
+                }
+                return -1;
+            }
+        };
+    }
+
+    @VisibleForTesting
+    static Function<Integer, Void, NeverThrowsException> newLeastRequestsLoadBalancerEndOfRequestFunction(
+            final LeastRequestsDispatcher dispatcher) {
+        return new Function<Integer, Void, NeverThrowsException>() {
+            @Override
+            public Void apply(final Integer index) {
+                dispatcher.terminatedRequest(index);
+                return null;
+            }
+        };
+    }
+
+    /** No-op "end of request" function for the saturation-based request load balancer. */
+    static final Function<Integer, Void, NeverThrowsException> NOOP_END_OF_REQUEST_FUNCTION =
+            new Function<Integer, Void, NeverThrowsException>() {
+                @Override
+                public Void apply(Integer index) {
+                    return null;
+                }
+            };
+
+    /**
+     * Dispatch requests to the server index which has the least active requests.
+     * <p>
+     * A server is actually represented only by its index. Provided an initial number of servers, the requests are
+     * dispatched to the less saturated index, i.e. which corresponds to the server that has the lowest number of active
+     * requests.
+     */
+    @VisibleForTesting
+    static class LeastRequestsDispatcher {
+        /** Counter for each server. */
+        private final AtomicLongArray serversCounters;
+
+        LeastRequestsDispatcher(int numberOfServers) {
+            serversCounters = new AtomicLongArray(numberOfServers);
+        }
+
+        int size() {
+            return serversCounters.length();
+        }
+
+        /**
+         * Returns the server index to use.
+         *
+         * @param forceIndex
+         *            Forces a server index to use if different from -1. In that case, the default behavior of the
+         *            dispatcher is overridden. If -1 is provided, then the default behavior of the dispatcher applies.
+         * @return the server index
+         */
+        int selectServer(int forceIndex) {
+            int index = forceIndex == -1 ? getLessSaturatedIndex() : forceIndex;
+            serversCounters.incrementAndGet(index);
+            return index;
+        }
+
+        /**
+         * Signals to this dispatcher that a request has been finished for the provided server index.
+         *
+         * @param index
+         *            The index of server that processed the request.
+         */
+        void terminatedRequest(Integer index) {
+            serversCounters.decrementAndGet(index);
+        }
+
+        private int getLessSaturatedIndex() {
+            long min = Long.MAX_VALUE;
+            int minIndex = -1;
+            // Modifications during this loop are ok, effects on result is should not be dramatic
+            for (int i = 0; i < serversCounters.length(); i++) {
+                long count = serversCounters.get(i);
+                if (count < min) {
+                    min = count;
+                    minIndex = i;
+                }
+            }
+            return minIndex;
+        }
+    }
+
+    /**
      * Creates a new connection factory which forwards connection requests to
      * the provided factory, but whose {@code toString} method will always
      * return {@code name}.
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java
index 3e12e3c..dff8713 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/RequestLoadBalancer.java
@@ -59,15 +59,22 @@
      * A function which returns the index of the first connection factory which should be used in order to satisfy the
      * next request. Implementations may base the decision on properties of the provided request, such as the target DN,
      * whether the request is a read or update request, etc.
+     * Additionally a new request is returned with the index, because some modifications may be needed (removal of
+     * a control for example) but the original request should not be modified. The new request must be used
+     * for the actual LDAP operation.
      */
-    private final Function<Request, Integer, NeverThrowsException> nextFactoryFunction;
+    private final Function<Request, RequestWithIndex, NeverThrowsException> nextFactoryFunction;
+    /** A function which is called after a request is terminated. */
+    private final Function<Integer, Void, NeverThrowsException> endOfRequestFunction;
 
     RequestLoadBalancer(final String loadBalancerName,
                         final Collection<? extends ConnectionFactory> factories,
                         final Options options,
-                        final Function<Request, Integer, NeverThrowsException> nextFactoryFunction) {
+                        final Function<Request, RequestWithIndex, NeverThrowsException> nextFactoryFunction,
+                        final Function<Integer, Void, NeverThrowsException> endOfRequestFunction) {
         super(loadBalancerName, factories, options);
         this.nextFactoryFunction = nextFactoryFunction;
+        this.endOfRequestFunction = endOfRequestFunction;
     }
 
     @Override
@@ -97,12 +104,15 @@
         @Override
         public LdapPromise<Result> addAsync(
                 final AddRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
-            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
-                @Override
-                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
-                    return connection.addAsync(request, intermediateResponseHandler);
-                }
-            });
+            final ConnectionContext connectionContext = getConnection(request);
+            return executeRequest(connectionContext,
+                    new AsyncFunction<Connection, Result, LdapException>() {
+                        @Override
+                        public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+                            return connection.addAsync((AddRequest) connectionContext.getRequest(),
+                                                       intermediateResponseHandler);
+                        }
+                    });
         }
 
         @Override
@@ -113,10 +123,12 @@
         @Override
         public LdapPromise<BindResult> bindAsync(
                 final BindRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
-            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, BindResult, LdapException>() {
+            final ConnectionContext connectionContext = getConnection(request);
+            return executeRequest(connectionContext, new AsyncFunction<Connection, BindResult, LdapException>() {
                 @Override
                 public Promise<BindResult, LdapException> apply(final Connection connection) throws LdapException {
-                    return connection.bindAsync(request, intermediateResponseHandler);
+                    return connection.bindAsync((BindRequest) connectionContext.getRequest(),
+                                                intermediateResponseHandler);
                 }
             });
         }
@@ -129,34 +141,44 @@
         @Override
         public LdapPromise<CompareResult> compareAsync(
                 final CompareRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
-            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, CompareResult, LdapException>() {
-                @Override
-                public Promise<CompareResult, LdapException> apply(final Connection connection) throws LdapException {
-                    return connection.compareAsync(request, intermediateResponseHandler);
-                }
-            });
+            final ConnectionContext connectionContext = getConnection(request);
+            return executeRequest(connectionContext,
+                    new AsyncFunction<Connection, CompareResult, LdapException>() {
+                        @Override
+                        public Promise<CompareResult, LdapException> apply(final Connection connection)
+                                throws LdapException {
+                            return connection.compareAsync((CompareRequest) connectionContext.getRequest(),
+                                                           intermediateResponseHandler);
+                        }
+                    });
         }
 
         @Override
         public LdapPromise<Result> deleteAsync(
                 final DeleteRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
-            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
-                @Override
-                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
-                    return connection.deleteAsync(request, intermediateResponseHandler);
-                }
-            });
+            final ConnectionContext connectionContext = getConnection(request);
+            return executeRequest(connectionContext,
+                    new AsyncFunction<Connection, Result, LdapException>() {
+                        @Override
+                        public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+                            return connection.deleteAsync((DeleteRequest) connectionContext.getRequest(),
+                                                          intermediateResponseHandler);
+                        }
+                    });
         }
 
         @Override
         public <R extends ExtendedResult> LdapPromise<R> extendedRequestAsync(
                 final ExtendedRequest<R> request, final IntermediateResponseHandler intermediateResponseHandler) {
-            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, R, LdapException>() {
-                @Override
-                public Promise<R, LdapException> apply(final Connection connection) throws LdapException {
-                    return connection.extendedRequestAsync(request, intermediateResponseHandler);
-                }
-            });
+            final ConnectionContext connectionContext = getConnection(request);
+            return executeRequest(connectionContext,
+                    new AsyncFunction<Connection, R, LdapException>() {
+                        @Override
+                        public Promise<R, LdapException> apply(final Connection connection) throws LdapException {
+                            return connection.extendedRequestAsync((ExtendedRequest<R>) connectionContext.getRequest(),
+                                                                   intermediateResponseHandler);
+                        }
+                    });
         }
 
         @Override
@@ -172,23 +194,29 @@
         @Override
         public LdapPromise<Result> modifyAsync(
                 final ModifyRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
-            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
-                @Override
-                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
-                    return connection.modifyAsync(request, intermediateResponseHandler);
-                }
-            });
+            final ConnectionContext connectionContext = getConnection(request);
+            return executeRequest(connectionContext,
+                    new AsyncFunction<Connection, Result, LdapException>() {
+                        @Override
+                        public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+                            return connection.modifyAsync((ModifyRequest) connectionContext.getRequest(),
+                                                          intermediateResponseHandler);
+                        }
+                    });
         }
 
         @Override
         public LdapPromise<Result> modifyDNAsync(
                 final ModifyDNRequest request, final IntermediateResponseHandler intermediateResponseHandler) {
-            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
-                @Override
-                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
-                    return connection.modifyDNAsync(request, intermediateResponseHandler);
-                }
-            });
+            final ConnectionContext connectionContext = getConnection(request);
+            return executeRequest(connectionContext,
+                    new AsyncFunction<Connection, Result, LdapException>() {
+                        @Override
+                        public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+                            return connection.modifyDNAsync((ModifyDNRequest) connectionContext.getRequest(),
+                                                            intermediateResponseHandler);
+                        }
+                    });
         }
 
         @Override
@@ -201,51 +229,111 @@
                 final SearchRequest request,
                 final IntermediateResponseHandler intermediateResponseHandler,
                 final SearchResultHandler entryHandler) {
-            return getConnectionAndSendRequest(request, new AsyncFunction<Connection, Result, LdapException>() {
-                @Override
-                public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
-                    return connection.searchAsync(request, intermediateResponseHandler, entryHandler);
-                }
-            });
-        }
-
-        private <R> LdapPromise<R> getConnectionAndSendRequest(
-                final Request request, final AsyncFunction<Connection, R, LdapException> sendRequest) {
-            if (state.isClosed()) {
-                throw new IllegalStateException();
-            }
-            final AtomicReference<Connection> connectionHolder = new AtomicReference<>();
-            return getConnectionAsync(request)
-                    .thenOnResult(new ResultHandler<Connection>() {
+            final ConnectionContext connectionContext = getConnection(request);
+            return executeRequest(connectionContext,
+                    new AsyncFunction<Connection, Result, LdapException>() {
                         @Override
-                        public void handleResult(final Connection connection) {
-                            connectionHolder.set(connection);
-                        }
-                    })
-                    .thenAsync(sendRequest)
-                    .thenFinally(new Runnable() {
-                        @Override
-                        public void run() {
-                            closeSilently(connectionHolder.get());
+                        public Promise<Result, LdapException> apply(final Connection connection) throws LdapException {
+                            return connection.searchAsync((SearchRequest) connectionContext.getRequest(),
+                                                          intermediateResponseHandler,
+                                                          entryHandler);
                         }
                     });
         }
 
-        private LdapPromise<Connection> getConnectionAsync(final Request request) {
+        private ConnectionContext getConnection(final Request request) {
+            if (state.isClosed()) {
+                throw new IllegalStateException();
+            }
             try {
-                final int index = nextFactoryFunction.apply(request);
-                final ConnectionFactory factory = getMonitoredConnectionFactory(index);
-                return LdapPromises.asPromise(factory.getConnectionAsync()
-                                                     .thenOnException(new ExceptionHandler<LdapException>() {
-                                                         @Override
-                                                         public void handleException(final LdapException e) {
-                                                             state.notifyConnectionError(false, e);
-                                                         }
-                                                     }));
+                final RequestWithIndex requestWithIndex = nextFactoryFunction.apply(request);
+                final ConnectionFactory factory = getMonitoredConnectionFactory(requestWithIndex.getServerIndex());
+                return new ConnectionContext(
+                        LdapPromises.asPromise(factory.getConnectionAsync()
+                                .thenOnException(new ExceptionHandler<LdapException>() {
+                                    @Override
+                                    public void handleException(final LdapException e) {
+                                        state.notifyConnectionError(false, e);
+                                    }
+                                })),
+                        requestWithIndex);
             } catch (final LdapException e) {
                 state.notifyConnectionError(false, e);
-                return newFailedLdapPromise(e);
+                LdapPromise<Connection> failedLdapPromise = newFailedLdapPromise(e);
+                return new ConnectionContext(failedLdapPromise, new RequestWithIndex(request, -1));
             }
         }
+
+        private <R> LdapPromise<R> executeRequest(final ConnectionContext connectionContext,
+                final AsyncFunction<Connection, R, LdapException> requestSender) {
+            return connectionContext.getConnectionPromise()
+                    .thenOnResult(new ResultHandler<Connection>() {
+                        @Override
+                        public void handleResult(final Connection connection) {
+                            connectionContext.setConnection(connection);
+                        }
+                    })
+                    .thenAsync(requestSender)
+                    .thenFinally(new Runnable() {
+                        @Override
+                        public void run() {
+                            closeSilently(connectionContext.getConnection());
+                            endOfRequestFunction.apply(connectionContext.getServerIndex());
+                        }
+                    });
+        }
+    }
+
+    /** Utility class for a request and a server index. */
+    static class RequestWithIndex {
+        private final Request request;
+        /** The index of server chosen for the connection. */
+        private final int serverIndex;
+
+        RequestWithIndex(Request request, int serverIndex) {
+            this.serverIndex = serverIndex;
+            this.request = request;
+        }
+
+        Request getRequest() {
+            return request;
+        }
+
+        int getServerIndex() {
+            return serverIndex;
+        }
+    }
+
+    /** Utility class to hold together parameters for a request and the connection used to perform it. */
+    private static class ConnectionContext {
+        private final AtomicReference<Connection> connectionHolder = new AtomicReference<>();
+        private final LdapPromise<Connection> connectionPromise;
+        private final RequestWithIndex requestWithIndex;
+
+        ConnectionContext(LdapPromise<Connection> connectionPromise, RequestWithIndex requestWithIndex) {
+            this.requestWithIndex = requestWithIndex;
+            this.connectionPromise = connectionPromise;
+        }
+
+
+        Connection getConnection() {
+            return connectionHolder.get();
+        }
+
+        void setConnection(Connection connection) {
+            connectionHolder.set(connection);
+        }
+
+        LdapPromise<Connection> getConnectionPromise() {
+            return connectionPromise;
+        }
+
+        int getServerIndex() {
+            return requestWithIndex.getServerIndex();
+        }
+
+        Request getRequest() {
+            return requestWithIndex.getRequest();
+        }
     }
 }
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/requests/Requests.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/requests/Requests.java
index 0963615..4691fdb 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/requests/Requests.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/requests/Requests.java
@@ -12,13 +12,18 @@
  * information: "Portions Copyright [year] [name of copyright owner]".
  *
  * Copyright 2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2015 ForgeRock AS.
+ * Portions copyright 2011-2016 ForgeRock AS.
  */
 
 package org.forgerock.opendj.ldap.requests;
 
 import static com.forgerock.opendj.util.StaticUtils.EMPTY_BYTES;
 import static com.forgerock.opendj.util.StaticUtils.getBytes;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
 import static com.forgerock.opendj.ldap.CoreMessages.WARN_READ_LDIF_RECORD_CHANGE_RECORD_WRONG_TYPE;
 
 import javax.net.ssl.SSLContext;
@@ -33,13 +38,17 @@
 import org.forgerock.opendj.ldap.Entry;
 import org.forgerock.opendj.ldap.Filter;
 import org.forgerock.opendj.ldap.LinkedHashMapEntry;
+import org.forgerock.opendj.ldap.Modification;
 import org.forgerock.opendj.ldap.ModificationType;
 import org.forgerock.opendj.ldap.RDN;
 import org.forgerock.opendj.ldap.SearchScope;
+import org.forgerock.opendj.ldap.controls.Control;
 import org.forgerock.opendj.ldif.ChangeRecord;
 import org.forgerock.opendj.ldif.LDIFChangeRecordReader;
 import org.forgerock.util.Reject;
 
+import com.forgerock.opendj.ldap.CoreMessages;
+
 /**
  * This class contains various methods for creating and manipulating requests.
  * <p>
@@ -62,6 +71,133 @@
     // TODO: synchronized requests?
 
     /**
+     * Creates a new request that is a shallow copy of the provided request, except for
+     * controls list which is a new list containing the original controls (and not the original list
+     * of controls) possibly filtered by the provided exclusion parameter.
+     * <p>
+     * The intended usage is to be able to perform modification of the controls of a request without
+     * affecting the original request.
+     *
+     * @param request
+     *          the original request
+     * @param excludeControlOids
+     *          OIDs of controls to exclude from the new request
+     * @return the new request
+     */
+    public static Request shallowCopyOfRequest(Request request, String... excludeControlOids) {
+        List<String> exclusions = Arrays.asList(excludeControlOids);
+        if (request instanceof AbandonRequest) {
+            AbandonRequest req = copyOfAbandonRequest((AbandonRequest) request);
+            filterControls(req.getControls(), exclusions);
+            return req;
+        } else if (request instanceof AddRequest) {
+            AddRequest req = (AddRequest) request;
+            AddRequest newReq = newAddRequest(new LinkedHashMapEntry(req));
+            return copyControls(req, newReq, exclusions);
+        } else if (request instanceof AnonymousSASLBindRequest) {
+            AnonymousSASLBindRequest req = copyOfAnonymousSASLBindRequest((AnonymousSASLBindRequest) request);
+            filterControls(req.getControls(), exclusions);
+            return req;
+        } else if (request instanceof CancelExtendedRequest) {
+            CancelExtendedRequest req = copyOfCancelExtendedRequest((CancelExtendedRequest) request);
+            filterControls(req.getControls(), exclusions);
+            return req;
+        } else if (request instanceof CompareRequest) {
+            CompareRequest req = copyOfCompareRequest((CompareRequest) request);
+            filterControls(req.getControls(), exclusions);
+            return req;
+        } else if (request instanceof CRAMMD5SASLBindRequest) {
+            CRAMMD5SASLBindRequest req = copyOfCRAMMD5SASLBindRequest((CRAMMD5SASLBindRequest) request);
+            filterControls(req.getControls(), exclusions);
+            return req;
+        } else if (request instanceof DeleteRequest) {
+            DeleteRequest req = copyOfDeleteRequest((DeleteRequest) request);
+            filterControls(req.getControls(), exclusions);
+            return req;
+        } else if (request instanceof DigestMD5SASLBindRequest) {
+            DigestMD5SASLBindRequest req = copyOfDigestMD5SASLBindRequest((DigestMD5SASLBindRequest) request);
+            filterControls(req.getControls(), exclusions);
+            return req;
+        } else if (request instanceof ExternalSASLBindRequest) {
+            ExternalSASLBindRequest req = copyOfExternalSASLBindRequest((ExternalSASLBindRequest) request);
+            filterControls(req.getControls(), exclusions);
+            return req;
+        } else if (request instanceof GenericBindRequest) {
+            GenericBindRequest req = copyOfGenericBindRequest((GenericBindRequest) request);
+            filterControls(req.getControls(), exclusions);
+            return req;
+        } else if (request instanceof GenericExtendedRequest) {
+            GenericExtendedRequest req = copyOfGenericExtendedRequest((GenericExtendedRequest) request);
+            filterControls(req.getControls(), exclusions);
+            return req;
+        } else if (request instanceof GSSAPISASLBindRequest) {
+            GSSAPISASLBindRequest req = copyOfGSSAPISASLBindRequest((GSSAPISASLBindRequest) request);
+            filterControls(req.getControls(), exclusions);
+            return req;
+        } else if (request instanceof ModifyDNRequest) {
+            ModifyDNRequest req = copyOfModifyDNRequest((ModifyDNRequest) request);
+            filterControls(req.getControls(), exclusions);
+            return req;
+        } else if (request instanceof ModifyRequest) {
+            ModifyRequest req = (ModifyRequest) request;
+            ModifyRequest newReq = newModifyRequest(req.getName());
+            for (Modification mod : req.getModifications()) {
+                newReq.addModification(mod);
+            }
+            return copyControls(req, newReq, exclusions);
+        } else if (request instanceof PasswordModifyExtendedRequest) {
+            PasswordModifyExtendedRequest req =
+                    copyOfPasswordModifyExtendedRequest((PasswordModifyExtendedRequest) request);
+            filterControls(req.getControls(), exclusions);
+            return req;
+        } else if (request instanceof PlainSASLBindRequest) {
+            PlainSASLBindRequest req = copyOfPlainSASLBindRequest((PlainSASLBindRequest) request);
+            filterControls(req.getControls(), exclusions);
+            return req;
+        } else if (request instanceof SearchRequest) {
+            SearchRequest req = copyOfSearchRequest((SearchRequest) request);
+            filterControls(req.getControls(), exclusions);
+            return req;
+        } else if (request instanceof SimpleBindRequest) {
+            SimpleBindRequest req = copyOfSimpleBindRequest((SimpleBindRequest) request);
+            filterControls(req.getControls(), exclusions);
+            return req;
+        } else if (request instanceof StartTLSExtendedRequest) {
+            StartTLSExtendedRequest req = copyOfStartTLSExtendedRequest((StartTLSExtendedRequest) request);
+            filterControls(req.getControls(), exclusions);
+            return req;
+        } else if (request instanceof UnbindRequest) {
+            UnbindRequest req = copyOfUnbindRequest((UnbindRequest) request);
+            filterControls(req.getControls(), exclusions);
+            return req;
+        } else if (request instanceof WhoAmIExtendedRequest) {
+            WhoAmIExtendedRequest req = copyOfWhoAmIExtendedRequest((WhoAmIExtendedRequest) request);
+            filterControls(req.getControls(), exclusions);
+            return req;
+        } else {
+            throw new LocalizedIllegalArgumentException(CoreMessages.ERR_UNKNOWN_REQUEST_TYPE.get(request));
+        }
+    }
+
+    private static Request copyControls(Request source, Request destination, List<String> excludeControlOids) {
+        for (final Control control : source.getControls()) {
+            if (!excludeControlOids.contains(control.getOID())) {
+                destination.addControl(control);
+            }
+        }
+        return destination;
+    }
+
+    private static void filterControls(List<Control> controls, List<String> excludedControlOids) {
+        for (Iterator<Control> it = controls.iterator(); it.hasNext();) {
+            Control control = it.next();
+            if (excludedControlOids.contains(control.getOID())) {
+                it.remove();
+            }
+        }
+    }
+
+    /**
      * Creates a new abandon request that is an exact copy of the provided
      * request.
      *
diff --git a/opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties b/opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties
index c23e254..92c89d6 100644
--- a/opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties
+++ b/opendj-core/src/main/resources/com/forgerock/opendj/ldap/core.properties
@@ -1715,4 +1715,12 @@
 ERR_CERT_NO_MATCH_SUBJECT=The host name contained in the subject DN '%s' \
   does not match the host name '%s'
 ERR_ATTRIBUTE_PARSER_MISSING_ATTRIBUTE=The entry could not be parsed because the '%s' is missing
-ERR_CONNECTION_UNEXPECTED=An error occurred during establishment of a connection: %s
\ No newline at end of file
+ERR_CONNECTION_UNEXPECTED=An error occurred during establishment of a connection: %s
+ERR_CONNECTION_AFFINITY_CONTROL_BAD_OID=Cannot decode the provided \
+ control as a connection affinity control because it contained the OID '%s', \
+ when '%s' was expected
+ERR_CONNECTION_AFFINITY_CONTROL_DECODE_NULL=Cannot decode the provided ASN.1 \
+ element as a connection affinity control because it did not have a value, when \
+ a value must always be provided
+ERR_UNKNOWN_REQUEST_TYPE=The following request has a unknown request type: %s
+WARN_DECODING_AFFINITY_CONTROL=An error occurred while decoding an affinity control: %s
\ No newline at end of file
diff --git a/opendj-core/src/test/java/com/forgerock/opendj/ldap/controls/AffinityControlTestCase.java b/opendj-core/src/test/java/com/forgerock/opendj/ldap/controls/AffinityControlTestCase.java
new file mode 100644
index 0000000..2b0b077
--- /dev/null
+++ b/opendj-core/src/test/java/com/forgerock/opendj/ldap/controls/AffinityControlTestCase.java
@@ -0,0 +1,71 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package com.forgerock.opendj.ldap.controls;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.Connection;
+import org.forgerock.opendj.ldap.DN;
+import org.forgerock.opendj.ldap.Filter;
+import org.forgerock.opendj.ldap.SearchScope;
+import org.forgerock.opendj.ldap.TestCaseUtils;
+import org.forgerock.opendj.ldap.controls.ControlsTestCase;
+import org.forgerock.opendj.ldap.requests.Requests;
+import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.responses.SearchResultEntry;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@SuppressWarnings("javadoc")
+public class AffinityControlTestCase extends ControlsTestCase {
+
+    @Test
+    public void testControl() throws Exception {
+        // Send this control with a search request to ensure it is encoded/decoded correctly
+        final SearchRequest req = Requests.newSearchRequest(DN.valueOf("uid=user.1,ou=people,o=test"),
+                SearchScope.BASE_OBJECT, Filter.objectClassPresent());
+        final AffinityControl control = AffinityControl.newControl(ByteString.valueOfUtf8("value"), false);
+        req.addControl(control);
+        try (Connection con = TestCaseUtils.getInternalConnection()) {
+            final List<SearchResultEntry> entries = new ArrayList<>();
+            con.search(req, entries);
+            assertThat(entries).hasAtLeastOneElementOfType(SearchResultEntry.class);
+            final SearchResultEntry entry = entries.get(0);
+            assertThat(entry.getControls()).hasSize(0);
+        }
+    }
+
+    @Test
+    public void testControlGeneratesRandomValue() throws Exception {
+        final AffinityControl control = AffinityControl.newControl(true);
+        assertTrue(control.isCritical());
+        ByteString value1 = control.getAffinityValue();
+        assertThat(value1).isNotNull();
+        assertThat(value1.length()).isNotZero();
+
+        final AffinityControl control2 = AffinityControl.newControl(false);
+        ByteString value2 = control2.getAffinityValue();
+        assertFalse(control2.isCritical());
+        assertThat(value2).isNotNull();
+        assertThat(value2.length()).isNotZero();
+
+        assertThat(value1).isNotEqualTo(value2);
+    }
+}
diff --git a/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java b/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java
index 5d72977..c1bbae9 100644
--- a/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java
+++ b/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionsTestCase.java
@@ -17,12 +17,14 @@
 
 import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.forgerock.opendj.ldap.Connections.newShardedRequestLoadBalancerFunction;
+import static org.forgerock.opendj.ldap.Connections.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
+import org.forgerock.opendj.ldap.Connections.LeastRequestsDispatcher;
+import org.forgerock.opendj.ldap.RequestLoadBalancer.RequestWithIndex;
 import org.forgerock.opendj.ldap.requests.AddRequest;
 import org.forgerock.opendj.ldap.requests.CRAMMD5SASLBindRequest;
 import org.forgerock.opendj.ldap.requests.CompareRequest;
@@ -35,12 +37,15 @@
 import org.forgerock.opendj.ldap.requests.PasswordModifyExtendedRequest;
 import org.forgerock.opendj.ldap.requests.PlainSASLBindRequest;
 import org.forgerock.opendj.ldap.requests.Request;
+import org.forgerock.opendj.ldap.requests.Requests;
 import org.forgerock.opendj.ldap.requests.SearchRequest;
 import org.forgerock.opendj.ldap.requests.SimpleBindRequest;
 import org.forgerock.util.Function;
 import org.forgerock.util.promise.NeverThrowsException;
 import org.testng.annotations.Test;
 
+import com.forgerock.opendj.ldap.controls.AffinityControl;
+
 @SuppressWarnings("javadoc")
 public class ConnectionsTestCase extends SdkTestCase {
 
@@ -70,8 +75,8 @@
 
     @Test
     public void shardedRequestLoadBalancerUsesConsistentIndexing() {
-        final Function<Request, Integer, NeverThrowsException> f =
-                newShardedRequestLoadBalancerFunction(asList(mock(ConnectionFactory.class),
+        final Function<Request, RequestWithIndex, NeverThrowsException> f =
+                newShardedRequestLoadBalancerNextFunction(asList(mock(ConnectionFactory.class),
                                                              mock(ConnectionFactory.class)));
 
         // These two DNs have a different hash code.
@@ -138,7 +143,75 @@
         assertThat(index(f, genericExtendedRequest)).isBetween(0, 1);
     }
 
-    private void assertRequestsAreRoutedConsistently(final Function<Request, Integer, NeverThrowsException> f,
+    @Test
+    public void leastRequestsDispatcherMustChooseTheLessSaturatedServer() {
+        LeastRequestsDispatcher dispatcher = new Connections.LeastRequestsDispatcher(3);
+        Function<Request, RequestWithIndex, NeverThrowsException> next =
+                newLeastRequestsLoadBalancerNextFunction(dispatcher);
+        Function<Integer, Void, NeverThrowsException> end =
+                newLeastRequestsLoadBalancerEndOfRequestFunction(dispatcher);
+
+        final SearchRequest[] reqs = new SearchRequest[11];
+        for (int i = 0; i < reqs.length; i++) {
+            reqs[i] = mock(SearchRequest.class);
+        }
+        assertThat(next.apply(reqs[0]).getServerIndex()).isEqualTo(0);  // number of reqs = [1, 0, 0]
+        assertThat(next.apply(reqs[1]).getServerIndex()).isEqualTo(1);  // number of reqs = [1, 1, 0]
+        assertThat(next.apply(reqs[2]).getServerIndex()).isEqualTo(2);  // number of reqs = [1, 1, 1]
+        end.apply(1);                                                   // number of reqs = [1, 0, 1]
+        assertThat(next.apply(reqs[3]).getServerIndex()).isEqualTo(1);  // number of reqs = [1, 1, 1]
+        end.apply(1);                                                   // number of reqs = [1, 0, 1]
+        assertThat(next.apply(reqs[5]).getServerIndex()).isEqualTo(1);  // number of reqs = [1, 1, 1]
+        assertThat(next.apply(reqs[6]).getServerIndex()).isEqualTo(0);  // number of reqs = [2, 1, 1]
+        assertThat(next.apply(reqs[7]).getServerIndex()).isEqualTo(1);  // number of reqs = [2, 2, 1]
+        assertThat(next.apply(reqs[8]).getServerIndex()).isEqualTo(2);  // number of reqs = [2, 2, 2]
+        assertThat(next.apply(reqs[9]).getServerIndex()).isEqualTo(0);  // number of reqs = [3, 2, 2]
+        end.apply(2);                                                   // number of reqs = [3, 2, 1]
+        assertThat(next.apply(reqs[10]).getServerIndex()).isEqualTo(2); // number of reqs = [3, 2, 2]
+    }
+
+    @Test
+    public void leastRequestsDispatcherMustTakeConnectionAffinityControlIntoAccount() {
+        LeastRequestsDispatcher dispatcher = new Connections.LeastRequestsDispatcher(3);
+        Function<Request, RequestWithIndex, NeverThrowsException> next =
+                newLeastRequestsLoadBalancerNextFunction(dispatcher);
+
+        final Request[] reqs = new Request[11];
+        for (int i = 0; i < reqs.length; i++) {
+            reqs[i] = Requests.newDeleteRequest("o=example");
+        }
+        assertThat(next.apply(reqs[0]).getServerIndex()).isEqualTo(0); // number of reqs = [1, 0, 0]
+
+        // control should now force the same connection three times
+        // control should be removed from the RequestWithIndex object used to perform the actual query
+        for (int i = 1; i <= 3; i++) {
+            reqs[i].addControl(AffinityControl.newControl(ByteString.valueOfUtf8("val"), false));
+        }
+        RequestWithIndex req1 = next.apply(reqs[1]);
+        assertThat(req1.getServerIndex()).isEqualTo(0); // number of reqs = [2, 0, 0]
+        assertThat(req1.getRequest().getControls()).isEmpty();
+        assertThat(reqs[1].getControls()).hasSize(1);
+
+        RequestWithIndex req2 = next.apply(reqs[2]);
+        assertThat(req2.getServerIndex()).isEqualTo(0); // number of reqs = [3, 0, 0]
+        assertThat(req2.getRequest().getControls()).isEmpty();
+        assertThat(reqs[2].getControls()).hasSize(1);
+
+        RequestWithIndex req3 = next.apply(reqs[3]);
+        assertThat(req3.getServerIndex()).isEqualTo(0); // number of reqs = [4, 0, 0]
+        assertThat(req3.getRequest().getControls()).isEmpty();
+        assertThat(reqs[3].getControls()).hasSize(1);
+
+        // back to default "saturation-based" behavior
+        assertThat(next.apply(reqs[4]).getServerIndex()).isEqualTo(1); // number of reqs = [4, 1, 0]
+        assertThat(next.apply(reqs[5]).getServerIndex()).isEqualTo(2); // number of reqs = [4, 1, 1]
+        assertThat(next.apply(reqs[6]).getServerIndex()).isEqualTo(1); // number of reqs = [4, 2, 1]
+        assertThat(next.apply(reqs[7]).getServerIndex()).isEqualTo(2); // number of reqs = [4, 2, 2]
+        assertThat(next.apply(reqs[8]).getServerIndex()).isEqualTo(1); // number of reqs = [4, 3, 2]
+        assertThat(next.apply(reqs[9]).getServerIndex()).isEqualTo(2); // number of reqs = [4, 3, 3]
+    }
+
+    private void assertRequestsAreRoutedConsistently(final Function<Request, RequestWithIndex, NeverThrowsException> f,
                                                      final Request r,
                                                      final int firstExpectedIndex,
                                                      final int secondExpectedIndex) {
@@ -146,7 +219,7 @@
         assertThat(index(f, r)).isEqualTo(secondExpectedIndex);
     }
 
-    private int index(final Function<Request, Integer, NeverThrowsException> function, final Request request) {
-        return function.apply(request);
+    private int index(final Function<Request, RequestWithIndex, NeverThrowsException> function, final Request request) {
+        return function.apply(request).getServerIndex();
     }
 }
diff --git a/opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java b/opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java
index e1e6d0b..6aaa060 100644
--- a/opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java
+++ b/opendj-core/src/test/java/org/forgerock/opendj/ldap/RequestLoadBalancerTestCase.java
@@ -37,6 +37,7 @@
 import java.util.ArrayList;
 import java.util.logging.Level;
 
+import org.forgerock.opendj.ldap.RequestLoadBalancer.RequestWithIndex;
 import org.forgerock.opendj.ldap.requests.AbandonRequest;
 import org.forgerock.opendj.ldap.requests.AddRequest;
 import org.forgerock.opendj.ldap.requests.BindRequest;
@@ -850,33 +851,34 @@
         stub(this.connection3);
         loadBalancer = new RequestLoadBalancer("Test",
                                                asList(factory1, factory2, factory3),
-                                               defaultOptions(), newNextFactoryFunction());
+                                               defaultOptions(), newNextFactoryFunction(),
+                                               Connections.NOOP_END_OF_REQUEST_FUNCTION);
     }
 
-    private Function<Request, Integer, NeverThrowsException> newNextFactoryFunction() {
-        return new Function<Request, Integer, NeverThrowsException>() {
+    private Function<Request, RequestWithIndex, NeverThrowsException> newNextFactoryFunction() {
+        return new Function<Request, RequestWithIndex, NeverThrowsException>() {
             @Override
-            public Integer apply(final Request request) {
+            public RequestWithIndex apply(final Request request) {
                 if (request == addRequest1 || request == bindRequest1 || request == compareRequest1
                         || request == deleteRequest1 || request == extendedRequest1 || request == modifyRequest1
                         || request == modifyDNRequest1 || request == searchRequest1) {
-                    return 0;
+                    return new RequestWithIndex(request, 0);
                 }
 
                 if (request == addRequest2 || request == bindRequest2 || request == compareRequest2
                         || request == deleteRequest2 || request == extendedRequest2 || request == modifyRequest2
                         || request == modifyDNRequest2 || request == searchRequest2) {
-                    return 1;
+                    return new RequestWithIndex(request, 1);
                 }
 
                 if (request == addRequest3 || request == bindRequest3 || request == compareRequest3
                         || request == deleteRequest3 || request == extendedRequest3 || request == modifyRequest3
                         || request == modifyDNRequest3 || request == searchRequest3) {
-                    return 2;
+                    return new RequestWithIndex(request, 2);
                 }
 
                 fail("Received unexpected request");
-                return -1; // Keep compiler happy.
+                return new RequestWithIndex(request, -1); // Keep compiler happy.
             }
         };
     }

--
Gitblit v1.10.0