mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
27.12.2013 e6d5d49cd785f726aaf7d530a86559cc832491c2
OPENDJ-832 Leverage the work queue for processing requests received on the HTTP connection handler

AsynchronousFutureResult.java:
Added a generic type parameter for the ResultHandler.
Added getRequestHandler() method.

Updated all classes that use AsynchronousFutureResult to accommodate the new type parameter.
7 files modified
91 ■■■■■ changed files
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/AbstractLDAPFutureResultImpl.java 8 ●●●● patch | view | raw | blame | history
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/AsynchronousFutureResult.java 25 ●●●● patch | view | raw | blame | history
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/RecursiveFutureResult.java 13 ●●●●● patch | view | raw | blame | history
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java 7 ●●●●● patch | view | raw | blame | history
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java 13 ●●●● patch | view | raw | blame | history
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java 10 ●●●●● patch | view | raw | blame | history
opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java 15 ●●●● patch | view | raw | blame | history
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/AbstractLDAPFutureResultImpl.java
@@ -44,8 +44,10 @@
 * @param <S>
 *            The type of result returned by this future.
 */
abstract class AbstractLDAPFutureResultImpl<S extends Result> extends AsynchronousFutureResult<S>
abstract class AbstractLDAPFutureResultImpl<S extends Result>
        extends AsynchronousFutureResult<S, ResultHandler<? super S>>
        implements IntermediateResponseHandler {
    private final Connection connection;
    private final int requestID;
@@ -54,7 +56,8 @@
    private volatile long timestamp;
    AbstractLDAPFutureResultImpl(final int requestID, final ResultHandler<? super S> resultHandler,
    AbstractLDAPFutureResultImpl(final int requestID,
        final ResultHandler<? super S> resultHandler,
            final IntermediateResponseHandler intermediateResponseHandler,
            final Connection connection) {
        super(resultHandler);
@@ -72,6 +75,7 @@
        return requestID;
    }
    /** {@inheritDoc} */
    @Override
    public final boolean handleIntermediateResponse(final IntermediateResponse response) {
        // FIXME: there's a potential race condition here - the future could
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/AsynchronousFutureResult.java
@@ -26,7 +26,7 @@
package com.forgerock.opendj.util;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import static org.forgerock.opendj.ldap.ErrorResultException.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -63,9 +63,13 @@
 * </ul>
 *
 * @param <M>
 *            The type of result returned by this completion future.
 *          The type of result returned by this future.
 * @param <H>
 *          The type of {@link ResultHandler} associated with this future.
 */
public class AsynchronousFutureResult<M> implements FutureResult<M>, ResultHandler<M> {
public class AsynchronousFutureResult<M, H extends ResultHandler<? super M>> implements
    FutureResult<M>, ResultHandler<M> {
    @SuppressWarnings("serial")
    private final class Sync extends AbstractQueuedSynchronizer {
        // State value representing the initial state before a result has
@@ -212,7 +216,7 @@
    private final Sync sync = new Sync();
    private final ResultHandler<? super M> handler;
    private final H handler;
    private final int requestID;
@@ -224,7 +228,7 @@
     *            A result handler which will be forwarded the result or error
     *            when it arrives, may be {@code null}.
     */
    public AsynchronousFutureResult(final ResultHandler<? super M> handler) {
    public AsynchronousFutureResult(final H handler) {
        this(handler, -1);
    }
@@ -239,7 +243,7 @@
     *            The request ID which will be returned by the default
     *            implementation of {@link #getRequestID}.
     */
    public AsynchronousFutureResult(final ResultHandler<? super M> handler, final int requestID) {
    public AsynchronousFutureResult(final H handler, final int requestID) {
        this.handler = handler;
        this.requestID = requestID;
    }
@@ -270,6 +274,15 @@
    }
    /**
     * Returns the handler that was passed to this future's constructor.
     * <p>
     * Despite the method name, the returned object is the {@link ResultHandler}
     * of type {@code H} supplied at construction time; it is returned as-is.
     *
     * @return the result handler associated with this future, which may be
     *         {@code null} if no handler was provided.
     */
    public H getRequestHandler() {
        return handler;
    }
    /**
     * {@inheritDoc}
     * <p>
     * The default implementation returns the request ID passed in during
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/util/RecursiveFutureResult.java
@@ -47,11 +47,14 @@
 *            The type of the outer result.
 */
public abstract class RecursiveFutureResult<M, N> implements FutureResult<N>, ResultHandler<M> {
    private final class FutureResultImpl extends AsynchronousFutureResult<N> {
    private final class FutureResultImpl extends AsynchronousFutureResult<N, ResultHandler<? super N>> {
        /**
         * Creates the inner future, passing the supplied handler straight
         * through to the {@code AsynchronousFutureResult} super constructor.
         *
         * @param handler
         *            The result handler forwarded to the superclass.
         */
        private FutureResultImpl(final ResultHandler<? super N> handler) {
            super(handler);
        }
        @Override
        public int getRequestID() {
            if (innerFuture instanceof FutureResult<?>) {
                final FutureResult<?> tmp = (FutureResult<?>) innerFuture;
@@ -97,6 +100,7 @@
    /**
     * {@inheritDoc}
     * <p>
     * This implementation forwards the call, including the
     * {@code mayInterruptIfRunning} flag, to {@code impl} and returns its
     * answer unchanged.
     */
    @Override
    public final boolean cancel(final boolean mayInterruptIfRunning) {
        return impl.cancel(mayInterruptIfRunning);
    }
@@ -104,6 +108,7 @@
    /**
     * {@inheritDoc}
     * <p>
     * This implementation returns whatever {@code impl.get()} returns; any
     * exception raised by that call propagates to the caller.
     */
    @Override
    public final N get() throws ErrorResultException, InterruptedException {
        return impl.get();
    }
@@ -111,6 +116,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public final N get(final long timeout, final TimeUnit unit) throws ErrorResultException,
            TimeoutException, InterruptedException {
        return impl.get(timeout, unit);
@@ -119,6 +125,7 @@
    /**
     * {@inheritDoc}
     * <p>
     * This implementation returns the request ID reported by {@code impl}.
     */
    @Override
    public final int getRequestID() {
        return impl.getRequestID();
    }
@@ -126,6 +133,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public final void handleErrorResult(final ErrorResultException error) {
        try {
            outerFuture = chainErrorResult(error, impl);
@@ -137,6 +145,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public final void handleResult(final M result) {
        try {
            outerFuture = chainResult(result, impl);
@@ -148,6 +157,7 @@
    /**
     * {@inheritDoc}
     * <p>
     * This implementation reports the cancellation state of {@code impl}.
     */
    @Override
    public final boolean isCancelled() {
        return impl.isCancelled();
    }
@@ -155,6 +165,7 @@
    /**
     * {@inheritDoc}
     * <p>
     * This implementation reports the completion state of {@code impl}.
     */
    @Override
    public final boolean isDone() {
        return impl.isDone();
    }
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
@@ -27,7 +27,7 @@
package org.forgerock.opendj.ldap;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import static org.forgerock.opendj.ldap.ErrorResultException.*;
import java.util.ArrayList;
import java.util.Collection;
@@ -70,6 +70,7 @@
        /**
         * {@inheritDoc}
         */
        @Override
        public Connection getConnection() throws ErrorResultException {
            final Connection connection;
            try {
@@ -92,8 +93,8 @@
        @Override
        public FutureResult<Connection> getConnectionAsync(
                final ResultHandler<? super Connection> resultHandler) {
            final AsynchronousFutureResult<Connection> future =
                    new AsynchronousFutureResult<Connection>(resultHandler);
            final AsynchronousFutureResult<Connection, ResultHandler<? super Connection>> future =
                   new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(resultHandler);
            final ResultHandler<Connection> failoverHandler = new ResultHandler<Connection>() {
                @Override
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
@@ -27,9 +27,10 @@
package org.forgerock.opendj.ldap;
import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
import static org.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_POOL_CLOSING;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import static com.forgerock.opendj.util.StaticUtils.*;
import static org.forgerock.opendj.ldap.CoreMessages.*;
import static org.forgerock.opendj.ldap.ErrorResultException.*;
import java.util.Collection;
import java.util.LinkedList;
@@ -536,7 +537,7 @@
        }
        QueueElement(final ResultHandler<? super Connection> handler) {
            this.value = new AsynchronousFutureResult<Connection>(handler);
            this.value = new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(handler);
        }
        @Override
@@ -553,8 +554,8 @@
        }
        @SuppressWarnings("unchecked")
        AsynchronousFutureResult<Connection> getWaitingFuture() {
            return (AsynchronousFutureResult<Connection>) value;
        AsynchronousFutureResult<Connection, ResultHandler<? super Connection>> getWaitingFuture() {
            return (AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>) value;
        }
        boolean isWaitingFuture() {
opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -27,9 +27,10 @@
package org.forgerock.opendj.ldap;
import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
import static java.lang.System.currentTimeMillis;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import static com.forgerock.opendj.util.StaticUtils.*;
import static java.lang.System.*;
import static org.forgerock.opendj.ldap.ErrorResultException.*;
import java.util.Collection;
import java.util.LinkedList;
@@ -85,7 +86,8 @@
         * @param <R>
         *            The type of result returned by the request.
         */
        private abstract class DelayedFuture<R extends Result> extends AsynchronousFutureResult<R>
        private abstract class DelayedFuture<R extends Result>
                extends AsynchronousFutureResult<R, ResultHandler<? super R>>
                implements Runnable {
            private volatile FutureResult<R> innerFuture = null;
opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java
@@ -24,12 +24,11 @@
 *      Copyright 2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2012 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.Fail.fail;
import static org.forgerock.opendj.ldap.TestCaseUtils.findFreeSocketAddress;
import static org.fest.assertions.Assertions.*;
import static org.fest.assertions.Fail.*;
import static org.forgerock.opendj.ldap.TestCaseUtils.*;
import java.net.InetSocketAddress;
import java.util.Arrays;
@@ -65,10 +64,10 @@
public class LDAPListenerTestCase extends SdkTestCase {
    private static class MockServerConnection implements ServerConnection<Integer> {
        final AsynchronousFutureResult<Throwable> connectionError =
                new AsynchronousFutureResult<Throwable>(null);
        final AsynchronousFutureResult<LDAPClientContext> context =
                new AsynchronousFutureResult<LDAPClientContext>(null);
        final AsynchronousFutureResult<Throwable, ResultHandler<? super Throwable>> connectionError =
                new AsynchronousFutureResult<Throwable, ResultHandler<? super Throwable>>(null);
        final AsynchronousFutureResult<LDAPClientContext, ResultHandler<? super LDAPClientContext>> context =
                new AsynchronousFutureResult<LDAPClientContext, ResultHandler<? super LDAPClientContext>>(null);
        final CountDownLatch isClosed = new CountDownLatch(1);
        MockServerConnection() {