mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
28.32.2013 a33a4959bae9ae6b119d5049609e4b439f911ee1
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/RequestHandlerFactoryAdapter.java
@@ -21,12 +21,18 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2011-2012 ForgeRock AS
 *      Copyright 2011-2013 ForgeRock AS
 */
package org.forgerock.opendj.ldap;
import static org.forgerock.opendj.ldap.CoreMessages.*;
import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_ABANDON_REQUEST;
import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_CANCEL_REQUEST;
import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_CLIENT_DISCONNECT;
import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_CLIENT_ERROR;
import static org.forgerock.opendj.ldap.CoreMessages.INFO_CANCELED_BY_SERVER_DISCONNECT;
import static org.forgerock.opendj.ldap.CoreMessages.INFO_CLIENT_CONNECTION_CLOSING;
import static org.forgerock.opendj.ldap.CoreMessages.WARN_CLIENT_DUPLICATE_MESSAGE_ID;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import java.util.Iterator;
@@ -72,9 +78,10 @@
    private static class RequestContextImpl<S extends Result, H extends ResultHandler<? super S>>
            implements RequestContext, ResultHandler<S> {
        // Adapter class which invokes cancel result handlers with correct
        // result
        // type.
        /*
         * Adapter class which invokes cancel result handlers with correct
         * result type.
         */
        private static final class ExtendedResultHandlerHolder<R extends ExtendedResult> {
            private final ExtendedRequest<R> request;
            private final ResultHandler<? super R> resultHandler;
@@ -101,46 +108,46 @@
        }
        private static enum RequestState {
            // Request active
            PENDING,
            // Request active, cancel requested
            CANCEL_REQUESTED,
            // Request active, too late to cancel
            TOO_LATE,
            // Result sent, was cancelled
            CANCELLED,
            // Request active
            PENDING,
            // Result sent, not cancelled
            RESULT_SENT,
            // Result sent, was cancelled
            CANCELLED;
            // Request active, too late to cancel
            TOO_LATE;
        }
        private final int messageID;
        // Cancellation state guarded by lock.
        private final Object stateLock = new Object();
        protected final H resultHandler;
        // These should be notified when a cancel request arrives, at most once.
        private List<CancelRequestListener> cancelRequestListeners = null;
        private LocalizableMessage cancelRequestReason = null;
        // These should be notified when the result is set.
        private List<ExtendedResultHandlerHolder<?>> cancelResultHandlers = null;
        private RequestState state = RequestState.PENDING;
        private LocalizableMessage cancelRequestReason = null;
        private boolean sendResult = true;
        private final ServerConnectionImpl clientConnection;
        private final boolean isCancelSupported;
        private final ServerConnectionImpl<?> clientConnection;
        private final int messageID;
        protected final H resultHandler;
        private boolean sendResult = true;
        protected RequestContextImpl(final ServerConnectionImpl<?> clientConnection,
        private RequestState state = RequestState.PENDING;
        // Cancellation state guarded by lock.
        private final Object stateLock = new Object();
        protected RequestContextImpl(final ServerConnectionImpl clientConnection,
                final H resultHandler, final int messageID, final boolean isCancelSupported) {
            this.clientConnection = clientConnection;
            this.resultHandler = resultHandler;
@@ -171,9 +178,10 @@
                case TOO_LATE:
                case RESULT_SENT:
                case CANCELLED:
                    // No point in registering the callback since the request
                    // can never be
                    // cancelled now.
                    /*
                     * No point in registering the callback since the request
                     * can never be cancelled now.
                     */
                    break;
                }
            }
@@ -191,24 +199,28 @@
            synchronized (stateLock) {
                switch (state) {
                case PENDING:
                    // No cancel request, so no handlers, just switch state.
                    /* No cancel request, so no handlers, just switch state. */
                    if (signalTooLate) {
                        cancelRequestListeners = null;
                        state = RequestState.TOO_LATE;
                    }
                    break;
                case CANCEL_REQUESTED:
                    // Don't change state: let the handler ack the cancellation
                    // request.
                    /*
                     * Don't change state: let the handler ack the cancellation
                     * request.
                     */
                    throw (CancelledResultException) newErrorResult(ResultCode.CANCELLED,
                            cancelRequestReason.toString());
                case TOO_LATE:
                    // Already too late. Nothing to do.
                    /* Already too late. Nothing to do. */
                    break;
                case RESULT_SENT:
                case CANCELLED:
                    // This should not happen - could throw an illegal state
                    // exception?
                    /*
                     * This should not happen - could throw an illegal state
                     * exception?
                     */
                    break;
                }
            }
@@ -229,11 +241,12 @@
        public void handleErrorResult(final ErrorResultException error) {
            if (clientConnection.removePendingRequest(this)) {
                if (setResult(error.getResult())) {
                    // FIXME: we must invoke the result handler even when
                    // abandoned so
                    // that chained result handlers may clean up, log, etc. We
                    // really need
                    // to signal that the result must not be sent to the client.
                    /*
                     * FIXME: we must invoke the result handler even when
                     * abandoned so that chained result handlers may clean up,
                     * log, etc. We really need to signal that the result must
                     * not be sent to the client.
                     */
                }
                resultHandler.handleErrorResult(error);
            }
@@ -246,11 +259,12 @@
        public void handleResult(final S result) {
            if (clientConnection.removePendingRequest(this)) {
                if (setResult(result)) {
                    // FIXME: we must invoke the result handler even when
                    // abandoned so
                    // that chained result handlers may clean up, log, etc. We
                    // really need
                    // to signal that the result must not be sent to the client.
                    /*
                     * FIXME: we must invoke the result handler even when
                     * abandoned so that chained result handlers may clean up,
                     * log, etc. We really need to signal that the result must
                     * not be sent to the client.
                     */
                }
                resultHandler.handleResult(result);
            }
@@ -291,7 +305,7 @@
            synchronized (stateLock) {
                switch (state) {
                case PENDING:
                    // Switch to CANCEL_REQUESTED state.
                    /* Switch to CANCEL_REQUESTED state. */
                    cancelRequestReason = reason;
                    if (cancelResultHandler != null) {
                        cancelResultHandlers = new LinkedList<ExtendedResultHandlerHolder<?>>();
@@ -304,7 +318,9 @@
                    this.sendResult &= sendResult;
                    break;
                case CANCEL_REQUESTED:
                    // Cancel already request so listeners already invoked.
                    /*
                     * Cancel already request so listeners already invoked.
                     */
                    if (cancelResultHandler != null) {
                        if (cancelResultHandlers == null) {
                            cancelResultHandlers = new LinkedList<ExtendedResultHandlerHolder<?>>();
@@ -315,20 +331,22 @@
                    break;
                case TOO_LATE:
                case RESULT_SENT:
                    // Cannot cancel, so invoke result handler immediately
                    // outside of
                    // lock.
                    /*
                     * Cannot cancel, so invoke result handler immediately
                     * outside of lock.
                     */
                    if (cancelResultHandler != null) {
                        invokeResultHandler = true;
                        resultHandlerIsSuccess = false;
                    }
                    break;
                case CANCELLED:
                    // Multiple cancellation attempts. Clients should not do
                    // this, but the
                    // cancel will effectively succeed immediately, so invoke
                    // result
                    // handler immediately outside of lock.
                    /*
                     * Multiple cancellation attempts. Clients should not do
                     * this, but the cancel will effectively succeed
                     * immediately, so invoke result handler immediately outside
                     * of lock.
                     */
                    if (cancelResultHandler != null) {
                        invokeResultHandler = true;
                        resultHandlerIsSuccess = true;
@@ -337,7 +355,7 @@
                }
            }
            // Invoke listeners outside of lock.
            /* Invoke listeners outside of lock. */
            if (tmpListeners != null) {
                for (final CancelRequestListener listener : tmpListeners) {
                    listener.handleCancelRequest(reason);
@@ -376,7 +394,7 @@
                switch (state) {
                case PENDING:
                case TOO_LATE:
                    // Switch to appropriate final state.
                    /* Switch to appropriate final state. */
                    if (!result.getResultCode().equals(ResultCode.CANCELLED)) {
                        state = RequestState.RESULT_SENT;
                    } else {
@@ -384,9 +402,10 @@
                    }
                    break;
                case CANCEL_REQUESTED:
                    // Switch to appropriate final state and invoke any cancel
                    // request
                    // handlers.
                    /*
                     * Switch to appropriate final state and invoke any cancel
                     * request handlers.
                     */
                    if (!result.getResultCode().equals(ResultCode.CANCELLED)) {
                        state = RequestState.RESULT_SENT;
                    } else {
@@ -399,14 +418,16 @@
                    break;
                case RESULT_SENT:
                case CANCELLED:
                    // This should not happen - could throw an illegal state
                    // exception?
                    /*
                     * This should not happen - could throw an illegal state
                     * exception?
                     */
                    maySendResult = false; // Prevent sending multiple results.
                    break;
                }
            }
            // Invoke handlers outside of lock.
            /* Invoke handlers outside of lock. */
            if (tmpHandlers != null) {
                for (final ExtendedResultHandlerHolder<?> handler : tmpHandlers) {
                    if (isCancelled) {
@@ -427,7 +448,7 @@
    private final static class SearchRequestContextImpl extends
            RequestContextImpl<Result, SearchResultHandler> implements SearchResultHandler {
        private SearchRequestContextImpl(final ServerConnectionImpl<?> clientConnection,
        private SearchRequestContextImpl(final ServerConnectionImpl clientConnection,
                final SearchResultHandler resultHandler, final int messageID,
                final boolean isCancelSupported) {
            super(clientConnection, resultHandler, messageID, isCancelSupported);
@@ -450,11 +471,11 @@
        }
    }
    private static final class ServerConnectionImpl<C> implements ServerConnection<Integer> {
        private final RequestHandler<RequestContext> requestHandler;
    private static final class ServerConnectionImpl implements ServerConnection<Integer> {
        private final AtomicBoolean isClosed = new AtomicBoolean();
        private final ConcurrentHashMap<Integer, RequestContextImpl<?, ?>> pendingRequests =
                new ConcurrentHashMap<Integer, RequestContextImpl<?, ?>>();
        private final RequestHandler<RequestContext> requestHandler;
        private ServerConnectionImpl(final RequestHandler<RequestContext> requestHandler) {
            this.requestHandler = requestHandler;
@@ -588,11 +609,11 @@
                    return;
                }
                // Register the request in the pending requests table. Even
                // though
                // this request cannot be cancelled, it is important to do this
                // in
                // order to monitor the number of pending operations.
                /*
                 * Register the request in the pending requests table. Even
                 * though this request cannot be cancelled, it is important to
                 * do this in order to monitor the number of pending operations.
                 */
                final RequestContextImpl<R, ResultHandler<? super R>> requestContext =
                        new RequestContextImpl<R, ResultHandler<? super R>>(this, resultHandler,
                                messageID, false);
@@ -605,9 +626,10 @@
                                INFO_CANCELED_BY_CANCEL_REQUEST.get(messageID);
                        cancelledRequest.cancel(cancelReason, request, requestContext, true);
                    } else {
                        // Couldn't find the request. Invoke on context in order
                        // to remove
                        // pending request.
                        /*
                         * Couldn't find the request. Invoke on context in order
                         * to remove pending request.
                         */
                        requestContext
                                .handleErrorResult(newErrorResult(ResultCode.NO_SUCH_OPERATION));
                    }
@@ -694,9 +716,10 @@
                        .toString()));
                return false;
            } else if (isClosed.get()) {
                // A concurrent close may have already removed the pending
                // request but
                // it will have only been notified for cancellation.
                /*
                 * A concurrent close may have already removed the pending
                 * request but it will have only been notified for cancellation.
                 */
                pendingRequests.remove(messageID);
                final LocalizableMessage message = INFO_CLIENT_CONNECTION_CLOSING.get();
@@ -704,20 +727,21 @@
                        message.toString()));
                return false;
            } else {
                // If the connection is closed now then we just have to pay the
                // cost of
                // invoking the request in the request handler.
                /*
                 * If the connection is closed now then we just have to pay the
                 * cost of invoking the request in the request handler.
                 */
                return true;
            }
        }
        private void doClose(final LocalizableMessage cancelReason) {
            if (!isClosed.getAndSet(true)) {
                // At this point if any pending requests are added then we may
                // end up
                // cancelling them, but this does not matter since
                // addPendingRequest
                // will fail the request immediately.
                /*
                 * At this point if any pending requests are added then we may
                 * end up cancelling them, but this does not matter since
                 * addPendingRequest will fail the request immediately.
                 */
                final Iterator<RequestContextImpl<?, ?>> iterator =
                        pendingRequests.values().iterator();
                while (iterator.hasNext()) {
@@ -752,6 +776,19 @@
    }
    /**
     * Adapts the provided request handler as a {@code ServerConnection}.
     *
     * @param requestHandler
     *            The request handler.
     * @return The server connection which will forward requests to the provided
     *         request handler.
     */
    static ServerConnection<Integer> adaptRequestHandler(
            final RequestHandler<RequestContext> requestHandler) {
        return new ServerConnectionImpl(requestHandler);
    }
    private final RequestHandlerFactory<C, RequestContext> factory;
    /**
@@ -772,8 +809,7 @@
    @Override
    public ServerConnection<Integer> handleAccept(final C clientContext)
            throws ErrorResultException {
        final RequestHandler<RequestContext> requestHandler = factory.handleAccept(clientContext);
        return new ServerConnectionImpl<C>(requestHandler);
        return adaptRequestHandler(factory.handleAccept(clientContext));
    }
}