mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
28.32.2013 a33a4959bae9ae6b119d5049609e4b439f911ee1
Additional change for OPENDJ-354: Implement a RequestHandler which provides an in-memory backend

* add more internal connection factory methods to make it easier to bind connections to the memory backend during testing.
3 files modified
334 ■■■■■ changed files
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java 118 ●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RequestHandlerFactoryAdapter.java 206 ●●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MemoryBackendTestCase.java 10 ●●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -27,12 +27,15 @@
package org.forgerock.opendj.ldap;
import static org.forgerock.opendj.ldap.RequestHandlerFactoryAdapter.adaptRequestHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import com.forgerock.opendj.ldap.InternalConnection;
import com.forgerock.opendj.util.Validator;
/**
@@ -195,6 +198,94 @@
    }
    /**
     * Creates a new internal client connection which will route requests to the
     * provided {@code RequestHandler}.
     * <p>
     * When processing requests, {@code RequestHandler} implementations are
     * passed a {@code RequestContext} having a pseudo {@code requestID} which
     * is incremented for each successive internal request on a per client
     * connection basis. The request ID may be useful for logging purposes.
     * <p>
     * An internal connection does not require {@code RequestHandler}
     * implementations to return a result when processing requests. However, it
     * is recommended that implementations do always return results even for
     * abandoned requests. This is because application client threads may block
     * indefinitely waiting for results.
     *
     * @param requestHandler
     *            The request handler which will be used for all client
     *            connections.
     * @return The new internal connection.
     * @throws NullPointerException
     *             If {@code requestHandler} was {@code null}.
     */
    public static Connection newInternalConnection(
            final RequestHandler<RequestContext> requestHandler) {
        Validator.ensureNotNull(requestHandler);
        return newInternalConnection(adaptRequestHandler(requestHandler));
    }
    /**
     * Creates a new internal client connection which will route requests to the
     * provided {@code ServerConnection}.
     * <p>
     * When processing requests, {@code ServerConnection} implementations are
     * passed an integer as the first parameter. This integer represents a
     * pseudo {@code requestID} which is incremented for each successive
     * internal request on a per client connection basis. The request ID may be
     * useful for logging purposes.
     * <p>
     * An internal connection does not require {@code ServerConnection}
     * implementations to return a result when processing requests. However, it
     * is recommended that implementations do always return results even for
     * abandoned requests. This is because application client threads may block
     * indefinitely waiting for results.
     *
     * @param serverConnection
     *            The server connection.
     * @return The new internal connection.
     * @throws NullPointerException
     *             If {@code serverConnection} was {@code null}.
     */
    public static Connection newInternalConnection(final ServerConnection<Integer> serverConnection) {
        Validator.ensureNotNull(serverConnection);
        return new InternalConnection(serverConnection);
    }
    /**
     * Creates a new connection factory which binds internal client connections
     * to {@link RequestHandler}s created using the provided
     * {@link RequestHandlerFactory}.
     * <p>
     * When processing requests, {@code RequestHandler} implementations are
     * passed a {@code RequestContext} having a pseudo {@code requestID} which
     * is incremented for each successive internal request on a per client
     * connection basis. The request ID may be useful for logging purposes.
     * <p>
     * An internal connection factory does not require {@code RequestHandler}
     * implementations to return a result when processing requests. However, it
     * is recommended that implementations do always return results even for
     * abandoned requests. This is because application client threads may block
     * indefinitely waiting for results.
     *
     * @param <C>
     *            The type of client context.
     * @param factory
     *            The request handler factory to use for creating connections.
     * @param clientContext
     *            The client context.
     * @return The new internal connection factory.
     * @throws NullPointerException
     *             If {@code factory} was {@code null}.
     */
    public static <C> ConnectionFactory newInternalConnectionFactory(
            final RequestHandlerFactory<C, RequestContext> factory, final C clientContext) {
        Validator.ensureNotNull(factory);
        return new InternalConnectionFactory<C>(newServerConnectionFactory(factory), clientContext);
    }
    /**
     * Creates a new connection factory which binds internal client connections
     * to {@link ServerConnection}s created using the provided
     * {@link ServerConnectionFactory}.
@@ -265,14 +356,14 @@
        return new ConnectionFactory() {
            @Override
            public FutureResult<Connection> getConnectionAsync(
                    final ResultHandler<? super Connection> handler) {
                return factory.getConnectionAsync(handler);
            public Connection getConnection() throws ErrorResultException {
                return factory.getConnection();
            }
            @Override
            public Connection getConnection() throws ErrorResultException {
                return factory.getConnection();
            public FutureResult<Connection> getConnectionAsync(
                    final ResultHandler<? super Connection> handler) {
                return factory.getConnectionAsync(handler);
            }
            /**
@@ -313,17 +404,12 @@
    public static <C> ServerConnectionFactory<C, Integer> newServerConnectionFactory(
            final RequestHandler<RequestContext> requestHandler) {
        Validator.ensureNotNull(requestHandler);
        final RequestHandlerFactory<C, RequestContext> factory =
                new RequestHandlerFactory<C, RequestContext>() {
                    public RequestHandler<RequestContext> handleAccept(C clientContext)
                            throws ErrorResultException {
                        return requestHandler;
                    }
                };
        return new RequestHandlerFactoryAdapter<C>(factory);
        return new RequestHandlerFactoryAdapter<C>(new RequestHandlerFactory<C, RequestContext>() {
            @Override
            public RequestHandler<RequestContext> handleAccept(final C clientContext) {
                return requestHandler;
            }
        });
    }
    /**
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RequestHandlerFactoryAdapter.java
@@ -21,12 +21,18 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2011-2012 ForgeRock AS
 *      Copyright 2011-2013 ForgeRock AS
 */
package org.forgerock.opendj.ldap;
import static org.forgerock.opendj.ldap.CoreMessages.*;
import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_ABANDON_REQUEST;
import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_CANCEL_REQUEST;
import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_CLIENT_DISCONNECT;
import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_CLIENT_ERROR;
import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_SERVER_DISCONNECT;
import static org.forgerock.opendj.ldap.CoreMessages.INFO_CLIENT_CONNECTION_CLOSING;
import static org.forgerock.opendj.ldap.CoreMessages.WARN_CLIENT_DUPLICATE_MESSAGE_ID;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import java.util.Iterator;
@@ -72,9 +78,10 @@
    private static class RequestContextImpl<S extends Result, H extends ResultHandler<? super S>>
            implements RequestContext, ResultHandler<S> {
        // Adapter class which invokes cancel result handlers with correct
        // result
        // type.
        /*
         * Adapter class which invokes cancel result handlers with correct
         * result type.
         */
        private static final class ExtendedResultHandlerHolder<R extends ExtendedResult> {
            private final ExtendedRequest<R> request;
            private final ResultHandler<? super R> resultHandler;
@@ -101,46 +108,46 @@
        }
        private static enum RequestState {
            // Request active
            PENDING,
            // Request active, cancel requested
            CANCEL_REQUESTED,
            // Request active, too late to cancel
            TOO_LATE,
            // Result sent, was cancelled
            CANCELLED,
            // Request active
            PENDING,
            // Result sent, not cancelled
            RESULT_SENT,
            // Result sent, was cancelled
            CANCELLED;
            // Request active, too late to cancel
            TOO_LATE;
        }
        private final int messageID;
        // Cancellation state guarded by lock.
        private final Object stateLock = new Object();
        protected final H resultHandler;
        // These should be notified when a cancel request arrives, at most once.
        private List<CancelRequestListener> cancelRequestListeners = null;
        private LocalizableMessage cancelRequestReason = null;
        // These should be notified when the result is set.
        private List<ExtendedResultHandlerHolder<?>> cancelResultHandlers = null;
        private RequestState state = RequestState.PENDING;
        private LocalizableMessage cancelRequestReason = null;
        private boolean sendResult = true;
        private final ServerConnectionImpl clientConnection;
        private final boolean isCancelSupported;
        private final ServerConnectionImpl<?> clientConnection;
        private final int messageID;
        protected final H resultHandler;
        private boolean sendResult = true;
        protected RequestContextImpl(final ServerConnectionImpl<?> clientConnection,
        private RequestState state = RequestState.PENDING;
        // Cancellation state guarded by lock.
        private final Object stateLock = new Object();
        protected RequestContextImpl(final ServerConnectionImpl clientConnection,
                final H resultHandler, final int messageID, final boolean isCancelSupported) {
            this.clientConnection = clientConnection;
            this.resultHandler = resultHandler;
@@ -171,9 +178,10 @@
                case TOO_LATE:
                case RESULT_SENT:
                case CANCELLED:
                    // No point in registering the callback since the request
                    // can never be
                    // cancelled now.
                    /*
                     * No point in registering the callback since the request
                     * can never be cancelled now.
                     */
                    break;
                }
            }
@@ -191,24 +199,28 @@
            synchronized (stateLock) {
                switch (state) {
                case PENDING:
                    // No cancel request, so no handlers, just switch state.
                    /* No cancel request, so no handlers, just switch state. */
                    if (signalTooLate) {
                        cancelRequestListeners = null;
                        state = RequestState.TOO_LATE;
                    }
                    break;
                case CANCEL_REQUESTED:
                    // Don't change state: let the handler ack the cancellation
                    // request.
                    /*
                     * Don't change state: let the handler ack the cancellation
                     * request.
                     */
                    throw (CancelledResultException) newErrorResult(ResultCode.CANCELLED,
                            cancelRequestReason.toString());
                case TOO_LATE:
                    // Already too late. Nothing to do.
                    /* Already too late. Nothing to do. */
                    break;
                case RESULT_SENT:
                case CANCELLED:
                    // This should not happen - could throw an illegal state
                    // exception?
                    /*
                     * This should not happen - could throw an illegal state
                     * exception?
                     */
                    break;
                }
            }
@@ -229,11 +241,12 @@
        public void handleErrorResult(final ErrorResultException error) {
            if (clientConnection.removePendingRequest(this)) {
                if (setResult(error.getResult())) {
                    // FIXME: we must invoke the result handler even when
                    // abandoned so
                    // that chained result handlers may clean up, log, etc. We
                    // really need
                    // to signal that the result must not be sent to the client.
                    /*
                     * FIXME: we must invoke the result handler even when
                     * abandoned so that chained result handlers may clean up,
                     * log, etc. We really need to signal that the result must
                     * not be sent to the client.
                     */
                }
                resultHandler.handleErrorResult(error);
            }
@@ -246,11 +259,12 @@
        public void handleResult(final S result) {
            if (clientConnection.removePendingRequest(this)) {
                if (setResult(result)) {
                    // FIXME: we must invoke the result handler even when
                    // abandoned so
                    // that chained result handlers may clean up, log, etc. We
                    // really need
                    // to signal that the result must not be sent to the client.
                    /*
                     * FIXME: we must invoke the result handler even when
                     * abandoned so that chained result handlers may clean up,
                     * log, etc. We really need to signal that the result must
                     * not be sent to the client.
                     */
                }
                resultHandler.handleResult(result);
            }
@@ -291,7 +305,7 @@
            synchronized (stateLock) {
                switch (state) {
                case PENDING:
                    // Switch to CANCEL_REQUESTED state.
                    /* Switch to CANCEL_REQUESTED state. */
                    cancelRequestReason = reason;
                    if (cancelResultHandler != null) {
                        cancelResultHandlers = new LinkedList<ExtendedResultHandlerHolder<?>>();
@@ -304,7 +318,9 @@
                    this.sendResult &= sendResult;
                    break;
                case CANCEL_REQUESTED:
                    // Cancel already requested so listeners already invoked.
                    /*
                     * Cancel already requested so listeners already invoked.
                     */
                    if (cancelResultHandler != null) {
                        if (cancelResultHandlers == null) {
                            cancelResultHandlers = new LinkedList<ExtendedResultHandlerHolder<?>>();
@@ -315,20 +331,22 @@
                    break;
                case TOO_LATE:
                case RESULT_SENT:
                    // Cannot cancel, so invoke result handler immediately
                    // outside of
                    // lock.
                    /*
                     * Cannot cancel, so invoke result handler immediately
                     * outside of lock.
                     */
                    if (cancelResultHandler != null) {
                        invokeResultHandler = true;
                        resultHandlerIsSuccess = false;
                    }
                    break;
                case CANCELLED:
                    // Multiple cancellation attempts. Clients should not do
                    // this, but the
                    // cancel will effectively succeed immediately, so invoke
                    // result
                    // handler immediately outside of lock.
                    /*
                     * Multiple cancellation attempts. Clients should not do
                     * this, but the cancel will effectively succeed
                     * immediately, so invoke result handler immediately outside
                     * of lock.
                     */
                    if (cancelResultHandler != null) {
                        invokeResultHandler = true;
                        resultHandlerIsSuccess = true;
@@ -337,7 +355,7 @@
                }
            }
            // Invoke listeners outside of lock.
            /* Invoke listeners outside of lock. */
            if (tmpListeners != null) {
                for (final CancelRequestListener listener : tmpListeners) {
                    listener.handleCancelRequest(reason);
@@ -376,7 +394,7 @@
                switch (state) {
                case PENDING:
                case TOO_LATE:
                    // Switch to appropriate final state.
                    /* Switch to appropriate final state. */
                    if (!result.getResultCode().equals(ResultCode.CANCELLED)) {
                        state = RequestState.RESULT_SENT;
                    } else {
@@ -384,9 +402,10 @@
                    }
                    break;
                case CANCEL_REQUESTED:
                    // Switch to appropriate final state and invoke any cancel
                    // request
                    // handlers.
                    /*
                     * Switch to appropriate final state and invoke any cancel
                     * request handlers.
                     */
                    if (!result.getResultCode().equals(ResultCode.CANCELLED)) {
                        state = RequestState.RESULT_SENT;
                    } else {
@@ -399,14 +418,16 @@
                    break;
                case RESULT_SENT:
                case CANCELLED:
                    // This should not happen - could throw an illegal state
                    // exception?
                    /*
                     * This should not happen - could throw an illegal state
                     * exception?
                     */
                    maySendResult = false; // Prevent sending multiple results.
                    break;
                }
            }
            // Invoke handlers outside of lock.
            /* Invoke handlers outside of lock. */
            if (tmpHandlers != null) {
                for (final ExtendedResultHandlerHolder<?> handler : tmpHandlers) {
                    if (isCancelled) {
@@ -427,7 +448,7 @@
    private final static class SearchRequestContextImpl extends
            RequestContextImpl<Result, SearchResultHandler> implements SearchResultHandler {
        private SearchRequestContextImpl(final ServerConnectionImpl<?> clientConnection,
        private SearchRequestContextImpl(final ServerConnectionImpl clientConnection,
                final SearchResultHandler resultHandler, final int messageID,
                final boolean isCancelSupported) {
            super(clientConnection, resultHandler, messageID, isCancelSupported);
@@ -450,11 +471,11 @@
        }
    }
    private static final class ServerConnectionImpl<C> implements ServerConnection<Integer> {
        private final RequestHandler<RequestContext> requestHandler;
    private static final class ServerConnectionImpl implements ServerConnection<Integer> {
        private final AtomicBoolean isClosed = new AtomicBoolean();
        private final ConcurrentHashMap<Integer, RequestContextImpl<?, ?>> pendingRequests =
                new ConcurrentHashMap<Integer, RequestContextImpl<?, ?>>();
        private final RequestHandler<RequestContext> requestHandler;
        private ServerConnectionImpl(final RequestHandler<RequestContext> requestHandler) {
            this.requestHandler = requestHandler;
@@ -588,11 +609,11 @@
                    return;
                }
                // Register the request in the pending requests table. Even
                // though
                // this request cannot be cancelled, it is important to do this
                // in
                // order to monitor the number of pending operations.
                /*
                 * Register the request in the pending requests table. Even
                 * though this request cannot be cancelled, it is important to
                 * do this in order to monitor the number of pending operations.
                 */
                final RequestContextImpl<R, ResultHandler<? super R>> requestContext =
                        new RequestContextImpl<R, ResultHandler<? super R>>(this, resultHandler,
                                messageID, false);
@@ -605,9 +626,10 @@
                                INFO_CANCELED_BY_CANCEL_REQUEST.get(messageID);
                        cancelledRequest.cancel(cancelReason, request, requestContext, true);
                    } else {
                        // Couldn't find the request. Invoke on context in order
                        // to remove
                        // pending request.
                        /*
                         * Couldn't find the request. Invoke on context in order
                         * to remove pending request.
                         */
                        requestContext
                                .handleErrorResult(newErrorResult(ResultCode.NO_SUCH_OPERATION));
                    }
@@ -694,9 +716,10 @@
                        .toString()));
                return false;
            } else if (isClosed.get()) {
                // A concurrent close may have already removed the pending
                // request but
                // it will have only been notified for cancellation.
                /*
                 * A concurrent close may have already removed the pending
                 * request but it will have only been notified for cancellation.
                 */
                pendingRequests.remove(messageID);
                final LocalizableMessage message = INFO_CLIENT_CONNECTION_CLOSING.get();
@@ -704,20 +727,21 @@
                        message.toString()));
                return false;
            } else {
                // If the connection is closed now then we just have to pay the
                // cost of
                // invoking the request in the request handler.
                /*
                 * If the connection is closed now then we just have to pay the
                 * cost of invoking the request in the request handler.
                 */
                return true;
            }
        }
        private void doClose(final LocalizableMessage cancelReason) {
            if (!isClosed.getAndSet(true)) {
                // At this point if any pending requests are added then we may
                // end up
                // cancelling them, but this does not matter since
                // addPendingRequest
                // will fail the request immediately.
                /*
                 * At this point if any pending requests are added then we may
                 * end up cancelling them, but this does not matter since
                 * addPendingRequest will fail the request immediately.
                 */
                final Iterator<RequestContextImpl<?, ?>> iterator =
                        pendingRequests.values().iterator();
                while (iterator.hasNext()) {
@@ -752,6 +776,19 @@
    }
    /**
     * Adapts the provided request handler as a {@code ServerConnection}.
     *
     * @param requestHandler
     *            The request handler.
     * @return The server connection which will forward requests to the provided
     *         request handler.
     */
    static ServerConnection<Integer> adaptRequestHandler(
            final RequestHandler<RequestContext> requestHandler) {
        return new ServerConnectionImpl(requestHandler);
    }
    private final RequestHandlerFactory<C, RequestContext> factory;
    /**
@@ -772,8 +809,7 @@
    @Override
    public ServerConnection<Integer> handleAccept(final C clientContext)
            throws ErrorResultException {
        final RequestHandler<RequestContext> requestHandler = factory.handleAccept(clientContext);
        return new ServerConnectionImpl<C>(requestHandler);
        return adaptRequestHandler(factory.handleAccept(clientContext));
    }
}
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MemoryBackendTestCase.java
@@ -26,8 +26,7 @@
package org.forgerock.opendj.ldap;
import static org.fest.assertions.Assertions.assertThat;
import static org.forgerock.opendj.ldap.Connections.newInternalConnectionFactory;
import static org.forgerock.opendj.ldap.Connections.newServerConnectionFactory;
import static org.forgerock.opendj.ldap.Connections.newInternalConnection;
import static org.forgerock.opendj.ldap.requests.Requests.newAddRequest;
import static org.forgerock.opendj.ldap.requests.Requests.newDeleteRequest;
import static org.forgerock.opendj.ldap.requests.Requests.newModifyRequest;
@@ -398,7 +397,7 @@
                getUser1Entry());
    }
    private Connection getConnection() throws IOException, ErrorResultException {
    private Connection getConnection() throws IOException {
        // @formatter:off
        final MemoryBackend backend =
                new MemoryBackend(new LDIFEntryReader(
@@ -440,10 +439,7 @@
                ));
        // @formatter:on
        final Connection connection =
                newInternalConnectionFactory(newServerConnectionFactory(backend), null)
                        .getConnection();
        return connection;
        return newInternalConnection(backend);
    }
    private Entry getUser1Entry() {