| | |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.concurrent.atomic.AtomicLongArray; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.opendj.ldap.RequestLoadBalancer.RequestWithIndex; |
| | | import org.forgerock.opendj.ldap.RequestLoadBalancer.PartitionedRequest; |
| | | import org.forgerock.opendj.ldap.requests.AddRequest; |
| | | import org.forgerock.opendj.ldap.requests.CRAMMD5SASLBindRequest; |
| | | import org.forgerock.opendj.ldap.requests.CompareRequest; |
| | |
| | | NOOP_END_OF_REQUEST_FUNCTION); |
| | | } |
| | | |
| | | static Function<Request, RequestWithIndex, NeverThrowsException> newAffinityRequestLoadBalancerNextFunction( |
| | | static Function<Request, PartitionedRequest, NeverThrowsException> newAffinityRequestLoadBalancerNextFunction( |
| | | final Collection<? extends ConnectionFactory> factories) { |
| | | return new Function<Request, RequestWithIndex, NeverThrowsException>() { |
| | | private final int maxIndex = factories.size(); |
| | | return new Function<Request, PartitionedRequest, NeverThrowsException>() { |
| | | private final int maxPartitionId = factories.size(); |
| | | |
| | | @Override |
| | | public RequestWithIndex apply(final Request request) { |
| | | final int index = computePartitionIdFromDN(dnOfRequest(request), maxIndex); |
| | | return new RequestWithIndex(request, index); |
| | | public PartitionedRequest apply(final Request request) { |
| | | final int partitionId = computePartitionIdFromDN(dnOfRequest(request), maxPartitionId); |
| | | return new PartitionedRequest(request, partitionId); |
| | | } |
| | | }; |
| | | } |
| | |
| | | * @return A partition ID in the range {@code 0 <= partitionId < numberOfPartitions}. |
| | | */ |
| | | private static int computePartitionIdFromDN(final DN dn, final int numberOfPartitions) { |
| | | final int index = dn != null ? dn.hashCode() : ThreadLocalRandom.current().nextInt(0, numberOfPartitions); |
| | | return index == Integer.MIN_VALUE ? 0 : (Math.abs(index) % numberOfPartitions); |
| | | final int partitionId = dn != null ? dn.hashCode() : ThreadLocalRandom.current().nextInt(0, numberOfPartitions); |
| | | return partitionId == Integer.MIN_VALUE ? 0 : (Math.abs(partitionId) % numberOfPartitions); |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | private static final DecodeOptions CONTROL_DECODE_OPTIONS = new DecodeOptions(); |
| | | |
| | | static Function<Request, RequestWithIndex, NeverThrowsException> newLeastRequestsLoadBalancerNextFunction( |
| | | static Function<Request, PartitionedRequest, NeverThrowsException> newLeastRequestsLoadBalancerNextFunction( |
| | | final LeastRequestsDispatcher dispatcher) { |
| | | return new Function<Request, RequestWithIndex, NeverThrowsException>() { |
| | | return new Function<Request, PartitionedRequest, NeverThrowsException>() { |
| | | private final int maxIndex = dispatcher.size(); |
| | | |
| | | @Override |
| | | public RequestWithIndex apply(final Request request) { |
| | | public PartitionedRequest apply(final Request request) { |
| | | int affinityBasedIndex = parseAffinityRequestControl(request); |
| | | int finalIndex = dispatcher.selectServer(affinityBasedIndex); |
| | | Request cleanedRequest = (affinityBasedIndex == -1) |
| | | ? request : Requests.shallowCopyOfRequest(request, AffinityControl.OID); |
| | | return new RequestWithIndex(cleanedRequest, finalIndex); |
| | | return new PartitionedRequest(cleanedRequest, finalIndex); |
| | | } |
| | | |
| | | private int parseAffinityRequestControl(final Request request) { |
| | |
| | | * a control for example) but the original request should not be modified. The new request must be used |
| | | * for the actual LDAP operation. |
| | | */ |
| | | private final Function<Request, RequestWithIndex, NeverThrowsException> nextFactoryFunction; |
| | | private final Function<Request, PartitionedRequest, NeverThrowsException> nextFactoryFunction; |
| | | /** A function which is called after a request is terminated. */ |
| | | private final Function<Integer, Void, NeverThrowsException> endOfRequestFunction; |
| | | |
| | | RequestLoadBalancer(final String loadBalancerName, |
| | | final Collection<? extends ConnectionFactory> factories, |
| | | final Options options, |
| | | final Function<Request, RequestWithIndex, NeverThrowsException> nextFactoryFunction, |
| | | final Function<Request, PartitionedRequest, NeverThrowsException> nextFactoryFunction, |
| | | final Function<Integer, Void, NeverThrowsException> endOfRequestFunction) { |
| | | super(loadBalancerName, factories, options); |
| | | this.nextFactoryFunction = nextFactoryFunction; |
| | |
| | | }); |
| | | } |
| | | |
| | | @SuppressWarnings("unchecked") |
| | | @Override |
| | | public <R extends ExtendedResult> LdapPromise<R> extendedRequestAsync( |
| | | final ExtendedRequest<R> request, final IntermediateResponseHandler intermediateResponseHandler) { |
| | |
| | | throw new IllegalStateException(); |
| | | } |
| | | try { |
| | | final RequestWithIndex requestWithIndex = nextFactoryFunction.apply(request); |
| | | final ConnectionFactory factory = getMonitoredConnectionFactory(requestWithIndex.getServerIndex()); |
| | | final PartitionedRequest partitionedRequest = nextFactoryFunction.apply(request); |
| | | final ConnectionFactory factory = getMonitoredConnectionFactory(partitionedRequest.getServerIndex()); |
| | | return new ConnectionContext( |
| | | LdapPromises.asPromise(factory.getConnectionAsync() |
| | | .thenOnException(new ExceptionHandler<LdapException>() { |
| | |
| | | public void handleException(final LdapException e) { |
| | | state.notifyConnectionError(false, e); |
| | | } |
| | | })), |
| | | requestWithIndex); |
| | | })), partitionedRequest); |
| | | } catch (final LdapException e) { |
| | | state.notifyConnectionError(false, e); |
| | | LdapPromise<Connection> failedLdapPromise = newFailedLdapPromise(e); |
| | | return new ConnectionContext(failedLdapPromise, new RequestWithIndex(request, -1)); |
| | | return new ConnectionContext(failedLdapPromise, new PartitionedRequest(request, -1)); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** Holder pairing a request with the index of the server chosen for its connection. */ |
| | | static class RequestWithIndex { |
| | | static class PartitionedRequest { |
| | | private final Request request; |
| | | /** The index of the server chosen for the connection. */ |
| | | private final int serverIndex; |
| | | |
| | | RequestWithIndex(Request request, int serverIndex) { |
| | | PartitionedRequest(Request request, int serverIndex) { |
| | | this.serverIndex = serverIndex; |
| | | this.request = request; |
| | | } |
| | |
| | | private static class ConnectionContext { |
| | | private final AtomicReference<Connection> connectionHolder = new AtomicReference<>(); |
| | | private final LdapPromise<Connection> connectionPromise; |
| | | private final RequestWithIndex requestWithIndex; |
| | | private final PartitionedRequest partitionedRequest; |
| | | |
| | | ConnectionContext(LdapPromise<Connection> connectionPromise, RequestWithIndex requestWithIndex) { |
| | | this.requestWithIndex = requestWithIndex; |
| | | ConnectionContext(LdapPromise<Connection> connectionPromise, PartitionedRequest partitionedRequest) { |
| | | this.partitionedRequest = partitionedRequest; |
| | | this.connectionPromise = connectionPromise; |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | int getServerIndex() { |
| | | return requestWithIndex.getServerIndex(); |
| | | return partitionedRequest.getServerIndex(); |
| | | } |
| | | |
| | | Request getRequest() { |
| | | return requestWithIndex.getRequest(); |
| | | return partitionedRequest.getRequest(); |
| | | } |
| | | } |
| | | } |
| | |
| | | import static org.mockito.Mockito.when; |
| | | |
| | | import org.forgerock.opendj.ldap.Connections.LeastRequestsDispatcher; |
| | | import org.forgerock.opendj.ldap.RequestLoadBalancer.RequestWithIndex; |
| | | import org.forgerock.opendj.ldap.RequestLoadBalancer.PartitionedRequest; |
| | | import org.forgerock.opendj.ldap.requests.AddRequest; |
| | | import org.forgerock.opendj.ldap.requests.CRAMMD5SASLBindRequest; |
| | | import org.forgerock.opendj.ldap.requests.CompareRequest; |
| | |
| | | |
| | | @Test |
| | | public void affinityRequestLoadBalancerUsesConsistentIndexing() { |
| | | final Function<Request, RequestWithIndex, NeverThrowsException> f = |
| | | final Function<Request, PartitionedRequest, NeverThrowsException> f = |
| | | newAffinityRequestLoadBalancerNextFunction(asList(mock(ConnectionFactory.class), |
| | | mock(ConnectionFactory.class))); |
| | | |
| | |
| | | @Test |
| | | public void leastRequestsDispatcherMustChooseTheLessSaturatedServer() { |
| | | LeastRequestsDispatcher dispatcher = new Connections.LeastRequestsDispatcher(3); |
| | | Function<Request, RequestWithIndex, NeverThrowsException> next = |
| | | Function<Request, PartitionedRequest, NeverThrowsException> next = |
| | | newLeastRequestsLoadBalancerNextFunction(dispatcher); |
| | | Function<Integer, Void, NeverThrowsException> end = |
| | | newLeastRequestsLoadBalancerEndOfRequestFunction(dispatcher); |
| | |
| | | @Test |
| | | public void leastRequestsDispatcherMustTakeConnectionAffinityControlIntoAccount() { |
| | | LeastRequestsDispatcher dispatcher = new Connections.LeastRequestsDispatcher(3); |
| | | Function<Request, RequestWithIndex, NeverThrowsException> next = |
| | | Function<Request, PartitionedRequest, NeverThrowsException> next = |
| | | newLeastRequestsLoadBalancerNextFunction(dispatcher); |
| | | |
| | | final Request[] reqs = new Request[11]; |
| | |
| | | for (int i = 1; i <= 3; i++) { |
| | | reqs[i].addControl(AffinityControl.newControl(ByteString.valueOfUtf8("val"), false)); |
| | | } |
| | | RequestWithIndex req1 = next.apply(reqs[1]); |
| | | PartitionedRequest req1 = next.apply(reqs[1]); |
| | | assertThat(req1.getServerIndex()).isEqualTo(0); // number of reqs = [2, 0, 0] |
| | | assertThat(req1.getRequest().getControls()).isEmpty(); |
| | | assertThat(reqs[1].getControls()).hasSize(1); |
| | | |
| | | RequestWithIndex req2 = next.apply(reqs[2]); |
| | | PartitionedRequest req2 = next.apply(reqs[2]); |
| | | assertThat(req2.getServerIndex()).isEqualTo(0); // number of reqs = [3, 0, 0] |
| | | assertThat(req2.getRequest().getControls()).isEmpty(); |
| | | assertThat(reqs[2].getControls()).hasSize(1); |
| | | |
| | | RequestWithIndex req3 = next.apply(reqs[3]); |
| | | PartitionedRequest req3 = next.apply(reqs[3]); |
| | | assertThat(req3.getServerIndex()).isEqualTo(0); // number of reqs = [4, 0, 0] |
| | | assertThat(req3.getRequest().getControls()).isEmpty(); |
| | | assertThat(reqs[3].getControls()).hasSize(1); |
| | |
| | | assertThat(next.apply(reqs[9]).getServerIndex()).isEqualTo(2); // number of reqs = [4, 3, 3] |
| | | } |
| | | |
| | | private void assertRequestsAreRoutedConsistently(final Function<Request, RequestWithIndex, NeverThrowsException> f, |
| | | final Request r, |
| | | final int firstExpectedIndex, |
| | | final int secondExpectedIndex) { |
| | | private void assertRequestsAreRoutedConsistently( |
| | | final Function<Request, PartitionedRequest, NeverThrowsException> f, final Request r, |
| | | final int firstExpectedIndex, final int secondExpectedIndex) { |
| | | assertThat(index(f, r)).isEqualTo(firstExpectedIndex); |
| | | assertThat(index(f, r)).isEqualTo(secondExpectedIndex); |
| | | } |
| | | |
| | | private int index(final Function<Request, RequestWithIndex, NeverThrowsException> function, final Request request) { |
| | | private int index(final Function<Request, PartitionedRequest, NeverThrowsException> function, |
| | | final Request request) { |
| | | return function.apply(request).getServerIndex(); |
| | | } |
| | | } |
| | |
| | | import java.util.ArrayList; |
| | | import java.util.logging.Level; |
| | | |
| | | import org.forgerock.opendj.ldap.RequestLoadBalancer.RequestWithIndex; |
| | | import org.forgerock.opendj.ldap.RequestLoadBalancer.PartitionedRequest; |
| | | import org.forgerock.opendj.ldap.requests.AbandonRequest; |
| | | import org.forgerock.opendj.ldap.requests.AddRequest; |
| | | import org.forgerock.opendj.ldap.requests.BindRequest; |
| | |
| | | Connections.NOOP_END_OF_REQUEST_FUNCTION); |
| | | } |
| | | |
| | | private Function<Request, RequestWithIndex, NeverThrowsException> newNextFactoryFunction() { |
| | | return new Function<Request, RequestWithIndex, NeverThrowsException>() { |
| | | private Function<Request, PartitionedRequest, NeverThrowsException> newNextFactoryFunction() { |
| | | return new Function<Request, PartitionedRequest, NeverThrowsException>() { |
| | | @Override |
| | | public RequestWithIndex apply(final Request request) { |
| | | public PartitionedRequest apply(final Request request) { |
| | | if (request == addRequest1 || request == bindRequest1 || request == compareRequest1 |
| | | || request == deleteRequest1 || request == extendedRequest1 || request == modifyRequest1 |
| | | || request == modifyDNRequest1 || request == searchRequest1) { |
| | | return new RequestWithIndex(request, 0); |
| | | return new PartitionedRequest(request, 0); |
| | | } |
| | | |
| | | if (request == addRequest2 || request == bindRequest2 || request == compareRequest2 |
| | | || request == deleteRequest2 || request == extendedRequest2 || request == modifyRequest2 |
| | | || request == modifyDNRequest2 || request == searchRequest2) { |
| | | return new RequestWithIndex(request, 1); |
| | | return new PartitionedRequest(request, 1); |
| | | } |
| | | |
| | | if (request == addRequest3 || request == bindRequest3 || request == compareRequest3 |
| | | || request == deleteRequest3 || request == extendedRequest3 || request == modifyRequest3 |
| | | || request == modifyDNRequest3 || request == searchRequest3) { |
| | | return new RequestWithIndex(request, 2); |
| | | return new PartitionedRequest(request, 2); |
| | | } |
| | | |
| | | fail("Received unexpected request"); |
| | | return new RequestWithIndex(request, -1); // Keep compiler happy. |
| | | return new PartitionedRequest(request, -1); // Keep compiler happy. |
| | | } |
| | | }; |
| | | } |