mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
20.44.2016 5511a94238385a30b5b516ee360b234ff56d7c3f
OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

* Better exception handling
* Rename some misleading class names in sdk reactive package
* Fix some bugs
15 files modified
564 ■■■■■ changed files
opendj-core/src/main/java/com/forgerock/reactive/Completable.java 10 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java 63 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/Single.java 20 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/Stream.java 9 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/ByteSequenceReader.java 4 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java 63 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java 16 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java 21 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java 290 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java 28 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java 6 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java 2 ●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java 24 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java 4 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java 4 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/Completable.java
@@ -21,9 +21,9 @@
public interface Completable extends Publisher<Void> {
    /** Emitter is used to notify when the operation has been completed, successfully or not. */
    public interface Emitter {
    public interface Subscriber {
        /** Notify that this {@link Completable} is now completed. */
        void complete();
        void onComplete();
        /**
         * Notify that this {@link Completable} cannot be completed because of an error.
@@ -35,14 +35,14 @@
    }
    /** Adapts the streaming api to a callback one. */
    public interface OnSubscribe {
    public interface Emitter {
        /**
         * Called when the streaming api has been subscribed.
         *
         * @param e
         *            The {@link Emitter} to use to communicate the completeness of this {@link Completable}
         *            The {@link Subscriber} to use to communicate the completeness of this {@link Completable}
         */
        void onSubscribe(Emitter e);
        void subscribe(Subscriber e);
    }
    /**
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -17,7 +17,6 @@
import org.forgerock.util.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableOnSubscribe;
@@ -112,21 +111,34 @@
    }
    /**
     * Create a new {@link Single} from the given error.
     *
     * @param <V>
     *            Type of the datum emitted
     * @param error
     *            The error emitted by this {@link Single}
     * @return A new {@link Single}
     */
    public static <V> Single<V> singleError(final Throwable error) {
        return new RxJavaSingle<>(io.reactivex.Single.<V>error(error));
    }
    /**
     * Creates a bridge from callback world to {@link Single}.
     *
     * @param <V>
     *            Type of the datum emitted
     * @param onSubscribe
     * @param emitter
     *            Action to perform once this {@link Single} has been subscribed to.
     * @return A new {@link Single}
     */
    public static <V> Single<V> newSingle(final Single.OnSubscribe<V> onSubscribe) {
    public static <V> Single<V> newSingle(final Single.Emitter<V> emitter) {
        return new RxJavaSingle<>(io.reactivex.Single.create(new SingleOnSubscribe<V>() {
            @Override
            public void subscribe(final SingleEmitter<V> e) throws Exception {
                onSubscribe.onSubscribe(new Single.Emitter<V>() {
                emitter.subscribe(new Single.Subscriber<V>() {
                    @Override
                    public void onSuccess(V value) {
                    public void onComplete(V value) {
                        e.onSuccess(value);
                    }
@@ -146,18 +158,18 @@
     *            Action to perform once this {@link Completable} has been subscribed to.
     * @return A new {@link Completable}
     */
    public static Completable newCompletable(final Completable.OnSubscribe onSubscribe) {
    public static Completable newCompletable(final Completable.Emitter onSubscribe) {
        return new RxJavaCompletable(io.reactivex.Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(final CompletableEmitter e) throws Exception {
                onSubscribe.onSubscribe(new Completable.Emitter() {
                onSubscribe.subscribe(new Completable.Subscriber() {
                    @Override
                    public void complete() {
                    public void onComplete() {
                        e.onComplete();
                    }
                    @Override
                    public void onError(Throwable t) {
                        e.onError(t);
                    public void onError(final Throwable error) {
                        e.onError(error);
                    }
                });
            }
@@ -166,14 +178,14 @@
    private static final class RxJavaStream<V> implements Stream<V> {
        private Flowable<V> impl;
        private final Flowable<V> impl;
        private RxJavaStream(final Flowable<V> impl) {
            this.impl = impl;
        }
        @Override
        public void subscribe(Subscriber<? super V> s) {
        public void subscribe(org.reactivestreams.Subscriber<? super V> s) {
            impl.subscribe(s);
        }
@@ -219,6 +231,16 @@
        }
        @Override
        public Stream<V> onErrorDo(final Consumer<Throwable> onError) {
            return new RxJavaStream<>(impl.doOnError(new io.reactivex.functions.Consumer<Throwable>() {
                @Override
                public void accept(Throwable t) throws Exception {
                    onError.accept(t);
                }
            }));
        }
        @Override
        public Stream<V> onErrorResumeWith(final Function<Throwable, Publisher<V>, Exception> function) {
            return new RxJavaStream<>(
                    impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, Publisher<? extends V>>() {
@@ -259,7 +281,7 @@
        }
        @Override
        public void subscribe(Subscriber<? super V> s) {
        public void subscribe(org.reactivestreams.Subscriber<? super V> s) {
            impl.toFlowable().subscribe(s);
        }
@@ -279,7 +301,7 @@
        }
        @Override
        public <O> Single<O> flatMap(final Function<V, Publisher<O>, Exception> function) {
        public <O> Single<O> flatMap(final Function<V, Single<O>, Exception> function) {
            return new RxJavaSingle<>(impl.flatMap(new io.reactivex.functions.Function<V, SingleSource<O>>() {
                @Override
                public SingleSource<O> apply(V t) throws Exception {
@@ -287,6 +309,17 @@
                }
            }));
        }
        @Override
        public Single<V> onErrorResumeWith(final Function<Throwable, Single<V>, Exception> function) {
            return new RxJavaSingle<>(
                    impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, SingleSource<V>>() {
                        @Override
                        public SingleSource<V> apply(Throwable error) throws Exception {
                            return io.reactivex.Single.fromPublisher(function.apply(error));
                        }
                    }));
        }
    }
    private static final class RxJavaCompletable implements Completable {
@@ -303,7 +336,7 @@
        }
        @Override
        public void subscribe(Subscriber<? super Void> s) {
        public void subscribe(org.reactivestreams.Subscriber<? super Void> s) {
            impl.<Void>toFlowable().subscribe(s);
        }
    }
opendj-core/src/main/java/com/forgerock/reactive/Single.java
@@ -27,14 +27,14 @@
public interface Single<V> extends Publisher<V> {
    /** Emitter is used to notify when the operation has been completed, successfully or not. */
    public interface Emitter<V> {
    public interface Subscriber<V> {
        /**
         * Signal a success value.
         *
         * @param t
         *            the value, not null
         */
        void onSuccess(V t);
        void onComplete(V t);
        /**
         * Signal an exception.
@@ -46,13 +46,13 @@
    }
    /** Adapts the streaming api to a callback one. */
    public interface OnSubscribe<V> {
    public interface Emitter<V> {
        /**
         * Called for each SingleObserver that subscribes.
         * @param e the safe emitter instance, never null
         * @throws Exception on error
         */
        void onSubscribe(Emitter<V> e) throws Exception;
        void subscribe(Subscriber<V> e) throws Exception;
    }
    /**
@@ -82,7 +82,17 @@
     *            Function to apply to perform the asynchronous transformation
     * @return A new {@link Single} transforming the datum emitted by this {@link Single}.
     */
    <O> Single<O> flatMap(Function<V, Publisher<O>, Exception> function);
    <O> Single<O> flatMap(Function<V, Single<O>, Exception> function);
    /**
     * When an error occurs in this stream, continue the processing with the new {@link Single} provided by the
     * function.
     *
     * @param function
     *            Generates the single which must will used to resume operation when this {@link Single} failed.
     * @return A new {@link Single}
     */
    Single<V> onErrorResumeWith(Function<Throwable, Single<V>, Exception> function);
    /**
     * Subscribe to the single value emitted by this {@link Single}.
opendj-core/src/main/java/com/forgerock/reactive/Stream.java
@@ -75,6 +75,15 @@
    Stream<V> onErrorResumeWith(Function<Throwable, Publisher<V>, Exception> function);
    /**
     * Invokes the on error {@link Consumer} when an error occurs on this stream.
     *
     * @param onError
     *            The {@link Consumer} to invoke on error
     * @return a new {@link Stream}
     */
    Stream<V> onErrorDo(Consumer<Throwable> onError);
    /**
     * Subscribe to this stream and drop all data produced by it.
     */
    void subscribe();
opendj-core/src/main/java/org/forgerock/opendj/ldap/ByteSequenceReader.java
@@ -28,7 +28,7 @@
 * {@code ByteSequenceReader} must be created using the associated
 * {@code ByteSequence}'s {@code asReader()} method.
 */
public final class ByteSequenceReader {
public class ByteSequenceReader {
    /** The current position in the byte sequence. */
    private int pos;
@@ -55,7 +55,7 @@
     * @param sequence
     *            The byte sequence to be read.
     */
    ByteSequenceReader(final ByteSequence sequence) {
    public ByteSequenceReader(final ByteSequence sequence) {
        this.sequence = sequence;
    }
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
@@ -23,6 +23,7 @@
import javax.net.ssl.SSLSession;
import javax.security.sasl.SaslServer;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
/**
@@ -36,17 +37,39 @@
    /** Listens for disconnection event. */
    public interface DisconnectListener {
        /**
         * Invoked when the connection has been closed as a result of a client disconnect, a fatal connection error, or
         * a server-side {@link #disconnect}.
         * Invoked when the connection has been disconnected because of an error (i.e: message too big).
         *
         * @param context
         *            The {@link LDAPClientContext} which has failed
         * @param error
         *            The error
         */
        void exceptionOccurred(final LDAPClientContext context, final Throwable error);
        /**
         * Invoked when the client closes the connection, possibly using an unbind request.
         *
         * @param context
         *            The {@link LDAPClientContext} which has been disconnected
         * @param unbindRequest
         *            The unbind request, which may be {@code null} if one was not sent before the connection was
         *            closed.
         */
        void connectionClosed(final LDAPClientContext context, final UnbindRequest unbindRequest);
        /**
         * Invoked when the connection has been disconnected by the server.
         *
         * @param context
         *            The {@link LDAPClientContext} which has been disconnected
         * @param resultCode
         *            The {@link ResultCode} of the notification sent, or null
         * @param message
         *            The message of the notification sent, or null
         *            The result code which was included with the disconnect notification, or {@code null} if no
         *            disconnect notification was sent.
         * @param diagnosticMessage
         *            The diagnostic message, which may be empty or {@code null} indicating that none was provided.
         */
        void connectionDisconnected(final LDAPClientContext context, final ResultCode resultCode, final String message);
        void connectionDisconnected(final LDAPClientContext context, final ResultCode resultCode,
                final String diagnosticMessage);
    }
    /**
@@ -57,30 +80,24 @@
    void onDisconnect(final DisconnectListener listener);
    /**
     * Disconnects the client without sending a disconnect notification.
     * <p>
     * <b>Server connections:</b> invoking this method causes
     * {@link ServerConnection#handleConnectionDisconnected
     * handleConnectionDisconnected} to be called before this method returns.
     * Disconnects the client without sending a disconnect notification. Invoking this method causes
     * {@link DisconnectListener#connectionDisconnected(LDAPClientContext, ResultCode, String)} to be called before this
     * method returns.
     */
    void disconnect();
    /**
     * Disconnects the client and sends a disconnect notification, if possible,
     * containing the provided result code and diagnostic message.
     * <p>
     * <b>Server connections:</b> invoking this method causes
     * {@link ServerConnection#handleConnectionDisconnected
     * handleConnectionDisconnected} to be called before this method returns.
     * Disconnects the client and sends a disconnect notification, containing the provided result code and diagnostic
     * message. Invoking this method causes
     * {@link DisconnectListener#connectionDisconnected(LDAPClientContext, ResultCode, String)} to be called before this
     * method returns.
     *
     * @param resultCode
     *            The result code which should be included with the disconnect
     *            notification.
     * @param message
     *            The diagnostic message, which may be empty or {@code null}
     *            indicating that none was provided.
     *            The result code to include with the disconnect notification
     * @param diagnosticMessage
     *            The diagnostic message to include with the disconnect notification
     */
    void disconnect(final ResultCode resultCode, final String message);
    void disconnect(final ResultCode resultCode, final String diagnosticMessage);
    /**
     * Returns the {@code InetSocketAddress} associated with the local system.
opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
@@ -101,10 +101,22 @@
        final ServerConnectionAdaptor<Integer> adapter = new ServerConnectionAdaptor<>(serverConnection);
        clientContext.onDisconnect(new DisconnectListener() {
            @Override
            public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode, String message) {
                serverConnection.handleConnectionDisconnected(resultCode, message);
            public void exceptionOccurred(final LDAPClientContext context, final Throwable error) {
                serverConnection.handleConnectionError(error);
            }
            @Override
            public void connectionClosed(final LDAPClientContext context, final UnbindRequest unbindRequest) {
                serverConnection.handleConnectionClosed(0, unbindRequest);
            }
            @Override
            public void connectionDisconnected(final LDAPClientContext context, final ResultCode resultCode,
                    final String diagnosticMessage) {
                serverConnection.handleConnectionDisconnected(resultCode, diagnosticMessage);
            }
        });
        return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
            @Override
            public Single<Stream<Response>> handle(final LDAPClientContext context,
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java
@@ -38,6 +38,7 @@
import org.forgerock.util.Function;
import org.forgerock.util.Options;
import org.glassfish.grizzly.filterchain.FilterChain;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.nio.transport.TCPNIOBindingHandler;
import org.glassfish.grizzly.nio.transport.TCPNIOConnection;
import org.glassfish.grizzly.nio.transport.TCPNIOServerConnection;
@@ -66,7 +67,7 @@
     *            The addresses to listen on.
     * @param options
     *            The LDAP listener options.
     * @param handler
     * @param requestHandlerFactory
     *            The server connection factory which will be used to create server connections.
     * @throws IOException
     *             If an error occurred while trying to listen on the provided address.
@@ -74,8 +75,8 @@
    public GrizzlyLDAPListener(final Set<? extends SocketAddress> addresses, final Options options,
            final Function<LDAPClientContext,
                           ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
                           LdapException> handler) throws IOException {
        this(addresses, handler, options, null);
                           LdapException> requestHandlerFactory) throws IOException {
        this(addresses, requestHandlerFactory, options, null);
    }
    /**
@@ -84,7 +85,7 @@
     *
     * @param addresses
     *            The addresses to listen on.
     * @param handler
     * @param requestHandlerFactory
     *            The server connection factory which will be used to create server connections.
     * @param options
     *            The LDAP listener options.
@@ -97,14 +98,20 @@
    public GrizzlyLDAPListener(final Set<? extends SocketAddress> addresses,
            final Function<LDAPClientContext,
                           ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
                           LdapException> handler,
                           LdapException> requestHandlerFactory,
            final Options options, TCPNIOTransport transport) throws IOException {
        this.transport = DEFAULT_TRANSPORT.acquireIfNull(transport);
        this.options = Options.copyOf(options);
        final LDAPServerFilter serverFilter = new LDAPServerFilter(handler, options,
        final LDAPServerFilter serverFilter = new LDAPServerFilter(requestHandlerFactory, options,
                options.get(LDAP_DECODE_OPTIONS), options.get(MAX_CONCURRENT_REQUESTS));
        final FilterChain ldapChain = GrizzlyUtils.buildFilterChain(this.transport.get().getProcessor(),
                new LdapCodec(options.get(REQUEST_MAX_SIZE_IN_BYTES), options.get(LDAP_DECODE_OPTIONS)), serverFilter);
                new LdapCodec(options.get(REQUEST_MAX_SIZE_IN_BYTES), options.get(LDAP_DECODE_OPTIONS)) {
                    @Override
                    protected void onLdapCodecError(FilterChainContext ctx, Throwable error) {
                        serverFilter.exceptionOccurred(ctx, error);
                    }
                }, serverFilter);
        final TCPNIOBindingHandler bindingHandler = TCPNIOBindingHandler.builder(this.transport.get())
                .processor(ldapChain).build();
        this.serverConnections = new ArrayList<>(addresses.size());
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -34,14 +34,14 @@
import javax.security.sasl.SaslServer;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.io.AbstractLDAPMessageHandler;
import org.forgerock.opendj.io.LDAP;
import org.forgerock.opendj.ldap.DecodeException;
import org.forgerock.opendj.ldap.DecodeOptions;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.controls.Control;
import org.forgerock.opendj.ldap.responses.BindResult;
import org.forgerock.opendj.ldap.responses.CompareResult;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.IntermediateResponse;
import org.forgerock.opendj.ldap.responses.Response;
@@ -55,8 +55,6 @@
import org.forgerock.util.Function;
import org.forgerock.util.Options;
import org.forgerock.util.Reject;
import org.glassfish.grizzly.CloseReason;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.attributes.Attribute;
@@ -72,8 +70,9 @@
import org.reactivestreams.Subscription;
import com.forgerock.reactive.Completable;
import com.forgerock.reactive.Completable.Emitter;
import com.forgerock.reactive.Consumer;
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Single;
import com.forgerock.reactive.Stream;
import io.reactivex.internal.util.BackpressureHelper;
@@ -84,12 +83,14 @@
 */
final class LDAPServerFilter extends BaseFilter {
    private static final Object DUMMY = new byte[0];
    private static final Attribute<ClientConnectionImpl> LDAP_CONNECTION_ATTR = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
            .createAttribute("LDAPServerConnection");
    private final Function<LDAPClientContext,
                           ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
                           LdapException> connectionHandler;
                           LdapException> connectionHandlerFactory;
    /**
     * Map of cipher phrases to effective key size (bits). Taken from the
@@ -132,11 +133,11 @@
     *            The maximum BER element size, or <code>0</code> to indicate that there is no limit.
     */
    LDAPServerFilter(
            Function<LDAPClientContext,
            final Function<LDAPClientContext,
                     ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
                     LdapException> connectionHandler,
                           LdapException> connectionHandlerFactory,
            final Options connectionOptions, final DecodeOptions options, final int maxPendingRequests) {
        this.connectionHandler = connectionHandler;
        this.connectionHandlerFactory = connectionHandlerFactory;
        this.connectionOptions = connectionOptions;
        this.maxConcurrentRequests = maxPendingRequests;
    }
@@ -152,98 +153,86 @@
        configureConnection(connection, logger, connectionOptions);
        final ClientConnectionImpl clientContext = new ClientConnectionImpl(connection);
        LDAP_CONNECTION_ATTR.set(connection, clientContext);
        final ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> handler = connectionHandler
                .apply(clientContext);
        final ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> requestHandler =
                connectionHandlerFactory.apply(clientContext);
        streamFromPublisher(clientContext).flatMap(new Function<LdapRawMessage, Publisher<Integer>, Exception>() {
        clientContext
            .read()
            .flatMap(new Function<LdapRawMessage, Publisher<Object>, Exception>() {
            @Override
            public Publisher<Integer> apply(final LdapRawMessage rawRequest) throws Exception {
                return handler
                        .handle(clientContext, rawRequest)
                        .flatMap(new Function<Stream<Response>, Publisher<Integer>, Exception>() {
                public Publisher<Object> apply(final LdapRawMessage rawRequest) throws Exception {
                    if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
                        clientContext.notifyAndClose(rawRequest);
                        return singleFrom(DUMMY);
                    }
                    Single<Stream<Response>> response;
                    try {
                        response = requestHandler.handle(clientContext, rawRequest);
                    } catch (Exception e) {
                        response = singleError(e);
                    }
                    return response
                            .flatMap(new Function<Stream<Response>, Single<Object>, Exception>() {
                            @Override
                            public Publisher<Integer> apply(final Stream<Response> response) {
                                return clientContext.write(response.onErrorResumeWith(toErrorMessage(rawRequest))
                                                                   .map(toResponseMessage(rawRequest))).toSingle(1);
                                public Single<Object> apply(final Stream<Response> response) {
                                    return clientContext.write(response.map(toLdapResponseMessage(rawRequest)))
                                                        .toSingle(DUMMY);
                                }
                            })
                            .onErrorResumeWith(new Function<Throwable, Single<Object>, Exception>() {
                                @Override
                                public Single<Object> apply(final Throwable error) throws Exception {
                                    if (!(error instanceof LdapException)) {
                                        // Unexpected error, propagate it.
                                        return singleError(error);
                                    }
                                    final LdapException exception = (LdapException) error;
                                    return clientContext
                                            .write(singleFrom(toLdapResponseMessage(rawRequest, exception.getResult())))
                                            .toSingle(DUMMY);
                            }
                        });
            }
        }, maxConcurrentRequests).subscribe();
            }, maxConcurrentRequests)
            .onErrorDo(new Consumer<Throwable>() {
                @Override
                public void accept(final Throwable error) throws Exception {
                    clientContext.notifyAndCloseSilently(error);
                }
            })
            .subscribe();
        return ctx.getStopAction();
    }
    private Function<Throwable, Publisher<Response>, Exception> toErrorMessage(final LdapRawMessage rawRequest) {
        return new Function<Throwable, Publisher<Response>, Exception>() {
            @Override
            public Publisher<Response> apply(Throwable error) throws Exception {
                if (!(error instanceof LdapException)) {
                    // Propagate error
                    return streamError(error);
                }
                final Result result = ((LdapException) error).getResult();
    private final LdapResponseMessage toLdapResponseMessage(final LdapRawMessage rawRequest, final Result result) {
                switch (rawRequest.getMessageType()) {
        case OP_TYPE_ADD_REQUEST:
            return newResponseMessage(OP_TYPE_ADD_RESPONSE, rawRequest.getMessageId(), result);
                case OP_TYPE_BIND_REQUEST:
                    if (result instanceof BindResult) {
                        return streamFrom((Response) result);
                    }
                    return streamFrom((Response) populateNewResultFromResult(
                            Responses.newBindResult(result.getResultCode()), result));
            return newResponseMessage(OP_TYPE_BIND_RESPONSE, rawRequest.getMessageId(), result);
                case OP_TYPE_COMPARE_REQUEST:
                    if (result instanceof CompareResult) {
                        return streamFrom((Response) result);
                    }
                    return streamFrom((Response) populateNewResultFromResult(
                            Responses.newCompareResult(result.getResultCode()), result));
            return newResponseMessage(OP_TYPE_COMPARE_RESPONSE, rawRequest.getMessageId(), result);
        case OP_TYPE_DELETE_REQUEST:
            return newResponseMessage(OP_TYPE_DELETE_RESPONSE, rawRequest.getMessageId(), result);
                case OP_TYPE_EXTENDED_REQUEST:
                    if (result instanceof ExtendedResult) {
                        return streamFrom((Response) result);
                    }
                    return streamFrom((Response) populateNewResultFromResult(
                            Responses.newGenericExtendedResult(result.getResultCode()), result));
            return newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, rawRequest.getMessageId(), result);
        case OP_TYPE_MODIFY_DN_REQUEST:
            return newResponseMessage(OP_TYPE_MODIFY_DN_RESPONSE, rawRequest.getMessageId(), result);
        case OP_TYPE_MODIFY_REQUEST:
            return newResponseMessage(OP_TYPE_MODIFY_RESPONSE, rawRequest.getMessageId(), result);
        case OP_TYPE_SEARCH_REQUEST:
            return newResponseMessage(OP_TYPE_SEARCH_RESULT_DONE, rawRequest.getMessageId(), result);
                default:
                    return streamFrom((Response) result);
            throw new IllegalArgumentException("Unknown request: " + rawRequest.getMessageType());
                }
            }
        };
    }
    private Result populateNewResultFromResult(final Result newResult,
            final Result result) {
        newResult.setDiagnosticMessage(result.getDiagnosticMessage());
        newResult.setMatchedDN(result.getMatchedDN());
        newResult.setCause(result.getCause());
        for (final Control control : result.getControls()) {
            newResult.addControl(control);
        }
        return newResult;
    }
    private Function<Response, LdapResponseMessage, Exception> toResponseMessage(final LdapRawMessage rawRequest) {
    private Function<Response, LdapResponseMessage, Exception> toLdapResponseMessage(final LdapRawMessage rawRequest) {
        return new Function<Response, LdapResponseMessage, Exception>() {
            @Override
            public LdapResponseMessage apply(final Response response) {
                if (response instanceof Result) {
                    switch (rawRequest.getMessageType()) {
                    case OP_TYPE_ADD_REQUEST:
                        return newResponseMessage(OP_TYPE_ADD_RESPONSE, rawRequest.getMessageId(), response);
                    case OP_TYPE_BIND_REQUEST:
                        return newResponseMessage(OP_TYPE_BIND_RESPONSE, rawRequest.getMessageId(), response);
                    case OP_TYPE_COMPARE_REQUEST:
                        return newResponseMessage(OP_TYPE_COMPARE_RESPONSE, rawRequest.getMessageId(), response);
                    case OP_TYPE_DELETE_REQUEST:
                        return newResponseMessage(OP_TYPE_DELETE_RESPONSE, rawRequest.getMessageId(), response);
                    case OP_TYPE_EXTENDED_REQUEST:
                        return newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, rawRequest.getMessageId(), response);
                    case OP_TYPE_MODIFY_DN_REQUEST:
                        return newResponseMessage(OP_TYPE_MODIFY_DN_RESPONSE, rawRequest.getMessageId(), response);
                    case OP_TYPE_MODIFY_REQUEST:
                        return newResponseMessage(OP_TYPE_MODIFY_RESPONSE, rawRequest.getMessageId(), response);
                    case OP_TYPE_SEARCH_REQUEST:
                        return newResponseMessage(OP_TYPE_SEARCH_RESULT_DONE, rawRequest.getMessageId(), response);
                    default:
                        throw new IllegalArgumentException("Unknown request: " + rawRequest.getMessageType());
                    }
                    return toLdapResponseMessage(rawRequest, (Result) response);
                }
                if (response instanceof IntermediateResponse) {
                    return newResponseMessage(OP_TYPE_INTERMEDIATE_RESPONSE, rawRequest.getMessageId(), response);
@@ -274,7 +263,7 @@
        return ctx.getStopAction();
    }
    final class ClientConnectionImpl implements LDAPClientContext, Publisher<LdapRawMessage> {
    final class ClientConnectionImpl implements LDAPClientContext {
        final class GrizzlyBackpressureSubscription implements Subscription {
            private final AtomicLong pendingRequests = new AtomicLong();
@@ -339,38 +328,24 @@
        private ClientConnectionImpl(final Connection<?> connection) {
            this.connection = connection;
            connection.closeFuture().addCompletionHandler(new CompletionHandler<CloseReason>() {
                @Override
                public void completed(CloseReason result) {
                    disconnect0(null, null);
                }
        Stream<LdapRawMessage> read() {
            return streamFromPublisher(new Publisher<LdapRawMessage>() {
                @Override
                public void updated(CloseReason result) {
                public void subscribe(final Subscriber<? super LdapRawMessage> subscriber) {
                    if (upstream != null) {
                        return;
                }
                @Override
                public void failed(Throwable throwable) {
                }
                @Override
                public void cancelled() {
                    upstream = new GrizzlyBackpressureSubscription(subscriber);
                }
            });
        }
        @Override
        public void subscribe(final Subscriber<? super LdapRawMessage> s) {
            if (upstream != null) {
                return;
            }
            this.upstream = new GrizzlyBackpressureSubscription(s);
        }
        Completable write(final Publisher<LdapResponseMessage> messages) {
            return newCompletable(new Completable.OnSubscribe() {
            return newCompletable(new Completable.Emitter() {
                @Override
                public void onSubscribe(Emitter e) {
                public void subscribe(Completable.Subscriber e) {
                    messages.subscribe(new LdapResponseMessageWriter(connection, e));
                }
            });
@@ -378,9 +353,9 @@
        @Override
        public void enableTLS(final SSLEngine sslEngine) {
            Reject.ifNull(sslEngine);
            Reject.ifNull(sslEngine, "sslEngine must not be null");
            synchronized (this) {
                if (isTLSEnabled()) {
                if (isFilterExists(SSLFilter.class)) {
                    throw new IllegalStateException("TLS already enabled");
                }
                SSLUtils.setSSLEngine(connection, sslEngine);
@@ -390,9 +365,15 @@
        @Override
        public void enableSASL(final SaslServer saslServer) {
            Reject.ifNull(saslServer, "saslServer must not be null");
            synchronized (this) {
                if (isFilterExists(SaslFilter.class)) {
                    throw new IllegalStateException("Sasl already enabled");
                }
            SaslUtils.setSaslServer(connection, saslServer);
            installFilter(new SaslFilter());
        }
        }
        @Override
        public InetSocketAddress getLocalAddress() {
@@ -474,16 +455,11 @@
            GrizzlyUtils.addFilterToConnection(filter, connection);
        }
        /**
         * Indicates whether TLS is enabled this provided connection.
         *
         * @return {@code true} if TLS is enabled on this connection, otherwise {@code false}.
         */
        private boolean isTLSEnabled() {
        private boolean isFilterExists(Class<?> filterKlass) {
            synchronized (this) {
                final FilterChain currentFilterChain = (FilterChain) connection.getProcessor();
                for (final Filter filter : currentFilterChain) {
                    if (filter instanceof SSLFilter) {
                    if (filterKlass.isAssignableFrom(filter.getClass())) {
                        return true;
                    }
                }
@@ -493,35 +469,97 @@
        @Override
        public void onDisconnect(DisconnectListener listener) {
            Reject.ifNull(listener, "listener must not be null");
            listeners.add(listener);
        }
        @Override
        public void disconnect() {
            disconnect0(null, null);
            if (isClosed.compareAndSet(false, true)) {
                try {
                    for (DisconnectListener listener : listeners) {
                        listener.connectionDisconnected(this, null, null);
                    }
                } finally {
                    closeConnection();
                }
            }
        }
        private void closeConnection() {
            if (upstream != null) {
                upstream.cancel();
                upstream = null;
            }
            connection.closeSilently();
        }
        @Override
        public void disconnect(final ResultCode resultCode, final String message) {
            sendUnsolicitedNotification(
                    Responses.newGenericExtendedResult(resultCode)
        public void disconnect(final ResultCode resultCode, final String diagnosticMessage) {
            // Close this connection context.
            if (isClosed.compareAndSet(false, true)) {
                sendUnsolicitedNotification(Responses.newGenericExtendedResult(resultCode)
                             .setOID(LDAP.OID_NOTICE_OF_DISCONNECTION)
                             .setDiagnosticMessage(message));
            disconnect0(resultCode, message);
                                                     .setDiagnosticMessage(diagnosticMessage));
                try {
                    for (DisconnectListener listener : listeners) {
                        listener.connectionDisconnected(this, resultCode, diagnosticMessage);
                    }
                } finally {
                    closeConnection();
                }
            }
        }
        private void disconnect0(final ResultCode resultCode, final String message) {
        private void notifyAndClose(final LdapRawMessage unbindRequest) {
            // Close this connection context.
            if (isClosed.compareAndSet(false, true)) {
                if (unbindRequest == null) {
                    doNotifySilentlyConnectionClosed(null);
                } else {
                    try {
                        LDAP.getReader(unbindRequest.getContent(), new DecodeOptions())
                                .readMessage(new AbstractLDAPMessageHandler() {
                                    @Override
                                    public void unbindRequest(int messageID, UnbindRequest unbindRequest)
                                            throws DecodeException, IOException {
                                        doNotifySilentlyConnectionClosed(unbindRequest);
                                    }
                                });
                    } catch (Exception e) {
                        doNotifySilentlyConnectionClosed(null);
                    }
                }
            }
        }
        private void doNotifySilentlyConnectionClosed(final UnbindRequest unbindRequest) {
            try {
                for (final DisconnectListener listener : listeners) {
                    try {
                        listener.connectionClosed(this, unbindRequest);
                    } catch (Exception e) {
                        // TODO: Log as a warning ?
                    }
                }
            } finally {
                closeConnection();
            }
        }
        private void notifyAndCloseSilently(final Throwable error) {
            // Close this connection context.
            if (isClosed.compareAndSet(false, true)) {
                try {
                    // Notify the server connection: it may be null if disconnect is
                    // invoked during accept.
                    for (DisconnectListener listener : listeners) {
                        listener.connectionDisconnected(this, resultCode, message);
                    for (final DisconnectListener listener : listeners) {
                        try {
                            listener.exceptionOccurred(this, error);
                        } catch (Exception e) {
                            // TODO: Log as a warning ?
                        }
                    }
                } finally {
                    // Close the connection.
                    connection.closeSilently();
                    closeConnection();
                }
            }
        }
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
@@ -36,7 +36,7 @@
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
final class LdapCodec extends LDAPBaseFilter {
abstract class LdapCodec extends LDAPBaseFilter {
    LdapCodec(final int maxElementSize, final DecodeOptions decodeOptions) {
        super(decodeOptions, maxElementSize);
@@ -44,14 +44,28 @@
    @Override
    public NextAction handleRead(final FilterChainContext ctx) throws IOException {
        try {
        final Buffer buffer = ctx.getMessage();
        final LdapRawMessage message = readMessage(buffer);
            final LdapRawMessage message;
            message = readMessage(buffer);
        if (message != null) {
            ctx.setMessage(message);
            return ctx.getInvokeAction(getRemainingBuffer(buffer));
        }
        return ctx.getStopAction(getRemainingBuffer(buffer));
        } catch (Exception e) {
            onLdapCodecError(ctx, e);
            // make the connection deaf to any following input
            // onLdapDecodeError call will take care of error processing
            // and closing the connection
            final NextAction suspendAction = ctx.getSuspendAction();
            ctx.completeAndRecycle();
            return suspendAction;
    }
    }
    protected abstract void onLdapCodecError(FilterChainContext ctx, Throwable error);
    private LdapRawMessage readMessage(final Buffer buffer) throws IOException {
        try (final ASN1BufferReader reader = new ASN1BufferReader(maxASN1ElementSize, buffer)) {
@@ -123,10 +137,18 @@
        try {
            final Buffer buffer = toBuffer(writer, ctx.<LdapResponseMessage> getMessage());
            ctx.setMessage(buffer);
            return ctx.getInvokeAction();
        } catch (Exception e) {
            onLdapCodecError(ctx, e);
            // make the connection deaf to any following input
            // onLdapDecodeError call will take care of error processing
            // and closing the connection
            final NextAction suspendAction = ctx.getSuspendAction();
            ctx.completeAndRecycle();
            return suspendAction;
        } finally {
            GrizzlyUtils.recycleWriter(writer);
        }
        return ctx.getInvokeAction();
    }
    private Buffer toBuffer(final LDAPWriter<ASN1BufferWriter> writer, final LdapResponseMessage message)
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
@@ -26,10 +26,10 @@
final class LdapResponseMessageWriter implements Subscriber<LdapResponseMessage> {
    private final Connection<?> connection;
    private final Completable.Emitter completable;
    private final Completable.Subscriber completable;
    private Subscription upstream;
    LdapResponseMessageWriter(final Connection<?> connection, final Completable.Emitter completable) {
    LdapResponseMessageWriter(final Connection<?> connection, final Completable.Subscriber completable) {
        this.connection = connection;
        this.completable = completable;
    }
@@ -72,6 +72,6 @@
    @Override
    public void onComplete() {
        completable.complete();
        completable.onComplete();
    }
}
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java
@@ -610,7 +610,7 @@
     * @throws Exception
     *             If an unexpected error occurred.
     */
    @Test(enabled = false)
    @Test
    public void testMaxRequestSize() throws Exception {
        final MockServerConnection serverConnection = new MockServerConnection();
        final MockServerConnectionFactory factory =
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java
@@ -171,58 +171,58 @@
            public Single<Stream<Response>> filter(final LDAPClientConnection2 context,
                    final LdapRawMessage encodedRequestMessage,
                    final ReactiveHandler<LDAPClientConnection2, Request, Stream<Response>> next) throws Exception {
                return newSingle(new Single.OnSubscribe<Request>() {
                return newSingle(new Single.Emitter<Request>() {
                    @Override
                    public void onSubscribe(final Single.Emitter<Request> emitter) throws Exception {
                    public void subscribe(final Single.Subscriber<Request> subscriber) throws Exception {
                        LDAP.getReader(encodedRequestMessage.getContent(), decodeOptions)
                                .readMessage(new AbstractLDAPMessageHandler() {
                            @Override
                            public void abandonRequest(final int messageID, final AbandonRequest request)
                                    throws DecodeException, IOException {
                                emitter.onSuccess(request);
                                subscriber.onComplete(request);
                            }
                            @Override
                            public void addRequest(int messageID, AddRequest request)
                                    throws DecodeException, IOException {
                                emitter.onSuccess(request);
                                subscriber.onComplete(request);
                            }
                            @Override
                            public void bindRequest(int messageID, int version, GenericBindRequest request)
                                    throws DecodeException, IOException {
                                emitter.onSuccess(request);
                                subscriber.onComplete(request);
                            }
                            @Override
                            public void modifyDNRequest(int messageID, ModifyDNRequest request)
                                    throws DecodeException, IOException {
                                emitter.onSuccess(request);
                                subscriber.onComplete(request);
                            }
                            @Override
                            public void modifyRequest(int messageID, ModifyRequest request)
                                    throws DecodeException, IOException {
                                emitter.onSuccess(request);
                                subscriber.onComplete(request);
                            }
                            @Override
                            public void searchRequest(int messageID, SearchRequest request)
                                    throws DecodeException, IOException {
                                emitter.onSuccess(request);
                                subscriber.onComplete(request);
                            }
                            @Override
                            public void unbindRequest(int messageID, UnbindRequest request)
                                    throws DecodeException, IOException {
                                emitter.onSuccess(request);
                                subscriber.onComplete(request);
                            }
                        });
                    }
                }).flatMap(new Function<Request, Publisher<Stream<Response>>, Exception>() {
                }).flatMap(new Function<Request, Single<Stream<Response>>, Exception>() {
                    @Override
                    public Publisher<Stream<Response>> apply(Request t) throws Exception {
                        return next.handle(context, t);
                    public Single<Stream<Response>> apply(final Request request) throws Exception {
                        return next.handle(context, request);
                    }
                });
            }
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -711,6 +711,10 @@
        logger.traceException(e);
      }
    }
    else
    {
        clientContext.disconnect();
    }
    // NYI -- Deregister the client connection from any server components that
    // might know about it.
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -747,8 +747,8 @@
                                LdapException>() {
                    @Override
                    public ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>
                        apply(LDAPClientContext value) throws LdapException {
                        final LDAPClientConnection2 conn = canAccept(value);
                        apply(LDAPClientContext clientContext) throws LdapException {
                        final LDAPClientConnection2 conn = canAccept(clientContext);
                        return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
                            @Override
                            public Single<Stream<Response>> handle(LDAPClientContext context,