From 5511a94238385a30b5b516ee360b234ff56d7c3f Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 07 Nov 2016 13:59:40 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

---
 opendj-core/src/main/java/com/forgerock/reactive/Completable.java                            |   10 
 opendj-core/src/main/java/org/forgerock/opendj/ldap/ByteSequenceReader.java                  |    4 
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java                     |   38 ++
 opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java                   |   63 +++--
 opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java                          |   63 ++++-
 opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java   |    2 
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java     |    6 
 opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java             |   24 +-
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java              |  322 ++++++++++++++++------------
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java           |   21 +
 opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java      |   16 +
 opendj-core/src/main/java/com/forgerock/reactive/Single.java                                 |   20 +
 opendj-core/src/main/java/com/forgerock/reactive/Stream.java                                 |    9 
 opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java |    4 
 opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java  |    4 
 15 files changed, 379 insertions(+), 227 deletions(-)

diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
index a60df6b..73a62f9 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
@@ -21,9 +21,9 @@
 public interface Completable extends Publisher<Void> {
 
     /** Emitter is used to notify when the operation has been completed, successfully or not. */
-    public interface Emitter {
+    public interface Subscriber {
         /** Notify that this {@link Completable} is now completed. */
-        void complete();
+        void onComplete();
 
         /**
          * Notify that this {@link Completable} cannot be completed because of an error.
@@ -35,14 +35,14 @@
     }
 
     /** Adapts the streaming api to a callback one. */
-    public interface OnSubscribe {
+    public interface Emitter {
         /**
          * Called when the streaming api has been subscribed.
          *
          * @param e
-         *            The {@link Emitter} to use to communicate the completeness of this {@link Completable}
+         *            The {@link Subscriber} to use to communicate the completeness of this {@link Completable}
          */
-        void onSubscribe(Emitter e);
+        void subscribe(Subscriber e);
     }
 
     /**
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
index 7831df5..a5a6231 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -17,7 +17,6 @@
 
 import org.forgerock.util.Function;
 import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
 
 import io.reactivex.CompletableEmitter;
 import io.reactivex.CompletableOnSubscribe;
@@ -112,21 +111,34 @@
     }
 
     /**
+     * Create a new {@link Single} from the given error.
+     *
+     * @param <V>
+     *            Type of the datum emitted
+     * @param error
+     *            The error emitted by this {@link Single}
+     * @return A new {@link Single}
+     */
+    public static <V> Single<V> singleError(final Throwable error) {
+        return new RxJavaSingle<>(io.reactivex.Single.<V>error(error));
+    }
+
+    /**
      * Creates a bridge from callback world to {@link Single}.
      *
      * @param <V>
      *            Type of the datum emitted
-     * @param onSubscribe
+     * @param emitter
      *            Action to perform once this {@link Single} has been subscribed to.
      * @return A new {@link Single}
      */
-    public static <V> Single<V> newSingle(final Single.OnSubscribe<V> onSubscribe) {
+    public static <V> Single<V> newSingle(final Single.Emitter<V> emitter) {
         return new RxJavaSingle<>(io.reactivex.Single.create(new SingleOnSubscribe<V>() {
             @Override
             public void subscribe(final SingleEmitter<V> e) throws Exception {
-                onSubscribe.onSubscribe(new Single.Emitter<V>() {
+                emitter.subscribe(new Single.Subscriber<V>() {
                     @Override
-                    public void onSuccess(V value) {
+                    public void onComplete(V value) {
                         e.onSuccess(value);
                     }
 
@@ -146,18 +158,18 @@
      *            Action to perform once this {@link Completable} has been subscribed to.
      * @return A new {@link Completable}
      */
-    public static Completable newCompletable(final Completable.OnSubscribe onSubscribe) {
+    public static Completable newCompletable(final Completable.Emitter onSubscribe) {
         return new RxJavaCompletable(io.reactivex.Completable.create(new CompletableOnSubscribe() {
             @Override
             public void subscribe(final CompletableEmitter e) throws Exception {
-                onSubscribe.onSubscribe(new Completable.Emitter() {
+                onSubscribe.subscribe(new Completable.Subscriber() {
                     @Override
-                    public void complete() {
+                    public void onComplete() {
                         e.onComplete();
                     }
                     @Override
-                    public void onError(Throwable t) {
-                        e.onError(t);
+                    public void onError(final Throwable error) {
+                        e.onError(error);
                     }
                 });
             }
@@ -166,14 +178,14 @@
 
     private static final class RxJavaStream<V> implements Stream<V> {
 
-        private Flowable<V> impl;
+        private final Flowable<V> impl;
 
         private RxJavaStream(final Flowable<V> impl) {
             this.impl = impl;
         }
 
         @Override
-        public void subscribe(Subscriber<? super V> s) {
+        public void subscribe(org.reactivestreams.Subscriber<? super V> s) {
             impl.subscribe(s);
         }
 
@@ -219,6 +231,16 @@
         }
 
         @Override
+        public Stream<V> onErrorDo(final Consumer<Throwable> onError) {
+            return new RxJavaStream<>(impl.doOnError(new io.reactivex.functions.Consumer<Throwable>() {
+                @Override
+                public void accept(Throwable t) throws Exception {
+                    onError.accept(t);
+                }
+            }));
+        }
+
+        @Override
         public Stream<V> onErrorResumeWith(final Function<Throwable, Publisher<V>, Exception> function) {
             return new RxJavaStream<>(
                     impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, Publisher<? extends V>>() {
@@ -259,7 +281,7 @@
         }
 
         @Override
-        public void subscribe(Subscriber<? super V> s) {
+        public void subscribe(org.reactivestreams.Subscriber<? super V> s) {
             impl.toFlowable().subscribe(s);
         }
 
@@ -279,7 +301,7 @@
         }
 
         @Override
-        public <O> Single<O> flatMap(final Function<V, Publisher<O>, Exception> function) {
+        public <O> Single<O> flatMap(final Function<V, Single<O>, Exception> function) {
             return new RxJavaSingle<>(impl.flatMap(new io.reactivex.functions.Function<V, SingleSource<O>>() {
                 @Override
                 public SingleSource<O> apply(V t) throws Exception {
@@ -287,6 +309,17 @@
                 }
             }));
         }
+
+        @Override
+        public Single<V> onErrorResumeWith(final Function<Throwable, Single<V>, Exception> function) {
+            return new RxJavaSingle<>(
+                    impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, SingleSource<V>>() {
+                        @Override
+                        public SingleSource<V> apply(Throwable error) throws Exception {
+                            return io.reactivex.Single.fromPublisher(function.apply(error));
+                        }
+                    }));
+        }
     }
 
     private static final class RxJavaCompletable implements Completable {
@@ -303,7 +336,7 @@
         }
 
         @Override
-        public void subscribe(Subscriber<? super Void> s) {
+        public void subscribe(org.reactivestreams.Subscriber<? super Void> s) {
             impl.<Void>toFlowable().subscribe(s);
         }
     }
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Single.java b/opendj-core/src/main/java/com/forgerock/reactive/Single.java
index f5d7b25..a1f534f 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Single.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Single.java
@@ -27,14 +27,14 @@
 public interface Single<V> extends Publisher<V> {
 
     /** Emitter is used to notify when the operation has been completed, successfully or not. */
-    public interface Emitter<V> {
+    public interface Subscriber<V> {
         /**
          * Signal a success value.
          *
          * @param t
          *            the value, not null
          */
-        void onSuccess(V t);
+        void onComplete(V t);
 
         /**
          * Signal an exception.
@@ -46,13 +46,13 @@
     }
 
     /** Adapts the streaming api to a callback one. */
-    public interface OnSubscribe<V> {
+    public interface Emitter<V> {
         /**
          * Called for each SingleObserver that subscribes.
          * @param e the safe emitter instance, never null
          * @throws Exception on error
          */
-        void onSubscribe(Emitter<V> e) throws Exception;
+        void subscribe(Subscriber<V> e) throws Exception;
     }
 
     /**
@@ -82,7 +82,17 @@
      *            Function to apply to perform the asynchronous transformation
      * @return A new {@link Single} transforming the datum emitted by this {@link Single}.
      */
-    <O> Single<O> flatMap(Function<V, Publisher<O>, Exception> function);
+    <O> Single<O> flatMap(Function<V, Single<O>, Exception> function);
+
+    /**
+     * When an error occurs in this stream, continue the processing with the new {@link Single} provided by the
+     * function.
+     *
+     * @param function
+     *            Generates the single which must will used to resume operation when this {@link Single} failed.
+     * @return A new {@link Single}
+     */
+    Single<V> onErrorResumeWith(Function<Throwable, Single<V>, Exception> function);
 
     /**
      * Subscribe to the single value emitted by this {@link Single}.
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Stream.java b/opendj-core/src/main/java/com/forgerock/reactive/Stream.java
index b19ba37..68e6ae0 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Stream.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Stream.java
@@ -75,6 +75,15 @@
     Stream<V> onErrorResumeWith(Function<Throwable, Publisher<V>, Exception> function);
 
     /**
+     * Invokes the on error {@link Consumer} when an error occurs on this stream.
+     *
+     * @param onError
+     *            The {@link Consumer} to invoke on error
+     * @return a new {@link Stream}
+     */
+    Stream<V> onErrorDo(Consumer<Throwable> onError);
+
+    /**
      * Subscribe to this stream and drop all data produced by it.
      */
     void subscribe();
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/ByteSequenceReader.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ByteSequenceReader.java
index a9ba54f..0f2531f 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/ByteSequenceReader.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ByteSequenceReader.java
@@ -28,7 +28,7 @@
  * {@code ByteSequenceReader} must be created using the associated
  * {@code ByteSequence}'s {@code asReader()} method.
  */
-public final class ByteSequenceReader {
+public class ByteSequenceReader {
 
     /** The current position in the byte sequence. */
     private int pos;
@@ -55,7 +55,7 @@
      * @param sequence
      *            The byte sequence to be read.
      */
-    ByteSequenceReader(final ByteSequence sequence) {
+    public ByteSequenceReader(final ByteSequence sequence) {
         this.sequence = sequence;
     }
 
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
index 9bb905b..b88edbb 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
@@ -23,6 +23,7 @@
 import javax.net.ssl.SSLSession;
 import javax.security.sasl.SaslServer;
 
+import org.forgerock.opendj.ldap.requests.UnbindRequest;
 import org.forgerock.opendj.ldap.responses.ExtendedResult;
 
 /**
@@ -36,17 +37,39 @@
     /** Listens for disconnection event. */
     public interface DisconnectListener {
         /**
-         * Invoked when the connection has been closed as a result of a client disconnect, a fatal connection error, or
-         * a server-side {@link #disconnect}.
+         * Invoked when the connection has been disconnected because of an error (i.e: message too big).
+         *
+         * @param context
+         *            The {@link LDAPClientContext} which has failed
+         * @param error
+         *            The error
+         */
+        void exceptionOccurred(final LDAPClientContext context, final Throwable error);
+
+        /**
+         * Invoked when the client closes the connection, possibly using an unbind request.
+         *
+         * @param context
+         *            The {@link LDAPClientContext} which has been disconnected
+         * @param unbindRequest
+         *            The unbind request, which may be {@code null} if one was not sent before the connection was
+         *            closed.
+         */
+        void connectionClosed(final LDAPClientContext context, final UnbindRequest unbindRequest);
+
+        /**
+         * Invoked when the connection has been disconnected by the server.
          *
          * @param context
          *            The {@link LDAPClientContext} which has been disconnected
          * @param resultCode
-         *            The {@link ResultCode} of the notification sent, or null
-         * @param message
-         *            The message of the notification sent, or null
+         *            The result code which was included with the disconnect notification, or {@code null} if no
+         *            disconnect notification was sent.
+         * @param diagnosticMessage
+         *            The diagnostic message, which may be empty or {@code null} indicating that none was provided.
          */
-        void connectionDisconnected(final LDAPClientContext context, final ResultCode resultCode, final String message);
+        void connectionDisconnected(final LDAPClientContext context, final ResultCode resultCode,
+                final String diagnosticMessage);
     }
 
     /**
@@ -57,30 +80,24 @@
     void onDisconnect(final DisconnectListener listener);
 
     /**
-     * Disconnects the client without sending a disconnect notification.
-     * <p>
-     * <b>Server connections:</b> invoking this method causes
-     * {@link ServerConnection#handleConnectionDisconnected
-     * handleConnectionDisconnected} to be called before this method returns.
+     * Disconnects the client without sending a disconnect notification. Invoking this method causes
+     * {@link DisconnectListener#connectionDisconnected(LDAPClientContext, ResultCode, String)} to be called before this
+     * method returns.
      */
     void disconnect();
 
     /**
-     * Disconnects the client and sends a disconnect notification, if possible,
-     * containing the provided result code and diagnostic message.
-     * <p>
-     * <b>Server connections:</b> invoking this method causes
-     * {@link ServerConnection#handleConnectionDisconnected
-     * handleConnectionDisconnected} to be called before this method returns.
+     * Disconnects the client and sends a disconnect notification, containing the provided result code and diagnostic
+     * message. Invoking this method causes
+     * {@link DisconnectListener#connectionDisconnected(LDAPClientContext, ResultCode, String)} to be called before this
+     * method returns.
      *
      * @param resultCode
-     *            The result code which should be included with the disconnect
-     *            notification.
-     * @param message
-     *            The diagnostic message, which may be empty or {@code null}
-     *            indicating that none was provided.
+     *            The result code to include with the disconnect notification
+     * @param diagnosticMessage
+     *            The diagnostic message to include with the disconnect notification
      */
-    void disconnect(final ResultCode resultCode, final String message);
+    void disconnect(final ResultCode resultCode, final String diagnosticMessage);
 
     /**
      * Returns the {@code InetSocketAddress} associated with the local system.
diff --git a/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java b/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
index e67b849..c2dce7e 100644
--- a/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
+++ b/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
@@ -101,10 +101,22 @@
         final ServerConnectionAdaptor<Integer> adapter = new ServerConnectionAdaptor<>(serverConnection);
         clientContext.onDisconnect(new DisconnectListener() {
             @Override
-            public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode, String message) {
-                serverConnection.handleConnectionDisconnected(resultCode, message);
+            public void exceptionOccurred(final LDAPClientContext context, final Throwable error) {
+                serverConnection.handleConnectionError(error);
+            }
+
+            @Override
+            public void connectionClosed(final LDAPClientContext context, final UnbindRequest unbindRequest) {
+                serverConnection.handleConnectionClosed(0, unbindRequest);
+            }
+
+            @Override
+            public void connectionDisconnected(final LDAPClientContext context, final ResultCode resultCode,
+                    final String diagnosticMessage) {
+                serverConnection.handleConnectionDisconnected(resultCode, diagnosticMessage);
             }
         });
+
         return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
             @Override
             public Single<Stream<Response>> handle(final LDAPClientContext context,
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java
index 1de0ea0..1f77cca 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java
@@ -38,6 +38,7 @@
 import org.forgerock.util.Function;
 import org.forgerock.util.Options;
 import org.glassfish.grizzly.filterchain.FilterChain;
+import org.glassfish.grizzly.filterchain.FilterChainContext;
 import org.glassfish.grizzly.nio.transport.TCPNIOBindingHandler;
 import org.glassfish.grizzly.nio.transport.TCPNIOConnection;
 import org.glassfish.grizzly.nio.transport.TCPNIOServerConnection;
@@ -66,7 +67,7 @@
      *            The addresses to listen on.
      * @param options
      *            The LDAP listener options.
-     * @param handler
+     * @param requestHandlerFactory
      *            The server connection factory which will be used to create server connections.
      * @throws IOException
      *             If an error occurred while trying to listen on the provided address.
@@ -74,8 +75,8 @@
     public GrizzlyLDAPListener(final Set<? extends SocketAddress> addresses, final Options options,
             final Function<LDAPClientContext,
                            ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
-                           LdapException> handler) throws IOException {
-        this(addresses, handler, options, null);
+                           LdapException> requestHandlerFactory) throws IOException {
+        this(addresses, requestHandlerFactory, options, null);
     }
 
     /**
@@ -84,7 +85,7 @@
      *
      * @param addresses
      *            The addresses to listen on.
-     * @param handler
+     * @param requestHandlerFactory
      *            The server connection factory which will be used to create server connections.
      * @param options
      *            The LDAP listener options.
@@ -97,14 +98,20 @@
     public GrizzlyLDAPListener(final Set<? extends SocketAddress> addresses,
             final Function<LDAPClientContext,
                            ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
-                           LdapException> handler,
+                           LdapException> requestHandlerFactory,
             final Options options, TCPNIOTransport transport) throws IOException {
+
         this.transport = DEFAULT_TRANSPORT.acquireIfNull(transport);
         this.options = Options.copyOf(options);
-        final LDAPServerFilter serverFilter = new LDAPServerFilter(handler, options,
+        final LDAPServerFilter serverFilter = new LDAPServerFilter(requestHandlerFactory, options,
                 options.get(LDAP_DECODE_OPTIONS), options.get(MAX_CONCURRENT_REQUESTS));
         final FilterChain ldapChain = GrizzlyUtils.buildFilterChain(this.transport.get().getProcessor(),
-                new LdapCodec(options.get(REQUEST_MAX_SIZE_IN_BYTES), options.get(LDAP_DECODE_OPTIONS)), serverFilter);
+                new LdapCodec(options.get(REQUEST_MAX_SIZE_IN_BYTES), options.get(LDAP_DECODE_OPTIONS)) {
+                    @Override
+                    protected void onLdapCodecError(FilterChainContext ctx, Throwable error) {
+                        serverFilter.exceptionOccurred(ctx, error);
+                    }
+                }, serverFilter);
         final TCPNIOBindingHandler bindingHandler = TCPNIOBindingHandler.builder(this.transport.get())
                 .processor(ldapChain).build();
         this.serverConnections = new ArrayList<>(addresses.size());
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index 787a336..8532b9d 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -34,14 +34,14 @@
 import javax.security.sasl.SaslServer;
 
 import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.io.AbstractLDAPMessageHandler;
 import org.forgerock.opendj.io.LDAP;
+import org.forgerock.opendj.ldap.DecodeException;
 import org.forgerock.opendj.ldap.DecodeOptions;
 import org.forgerock.opendj.ldap.LDAPClientContext;
 import org.forgerock.opendj.ldap.LdapException;
 import org.forgerock.opendj.ldap.ResultCode;
-import org.forgerock.opendj.ldap.controls.Control;
-import org.forgerock.opendj.ldap.responses.BindResult;
-import org.forgerock.opendj.ldap.responses.CompareResult;
+import org.forgerock.opendj.ldap.requests.UnbindRequest;
 import org.forgerock.opendj.ldap.responses.ExtendedResult;
 import org.forgerock.opendj.ldap.responses.IntermediateResponse;
 import org.forgerock.opendj.ldap.responses.Response;
@@ -55,8 +55,6 @@
 import org.forgerock.util.Function;
 import org.forgerock.util.Options;
 import org.forgerock.util.Reject;
-import org.glassfish.grizzly.CloseReason;
-import org.glassfish.grizzly.CompletionHandler;
 import org.glassfish.grizzly.Connection;
 import org.glassfish.grizzly.Grizzly;
 import org.glassfish.grizzly.attributes.Attribute;
@@ -72,8 +70,9 @@
 import org.reactivestreams.Subscription;
 
 import com.forgerock.reactive.Completable;
-import com.forgerock.reactive.Completable.Emitter;
+import com.forgerock.reactive.Consumer;
 import com.forgerock.reactive.ReactiveHandler;
+import com.forgerock.reactive.Single;
 import com.forgerock.reactive.Stream;
 
 import io.reactivex.internal.util.BackpressureHelper;
@@ -84,12 +83,14 @@
  */
 final class LDAPServerFilter extends BaseFilter {
 
+    private static final Object DUMMY = new byte[0];
+
     private static final Attribute<ClientConnectionImpl> LDAP_CONNECTION_ATTR = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
             .createAttribute("LDAPServerConnection");
 
     private final Function<LDAPClientContext,
                            ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
-                           LdapException> connectionHandler;
+                           LdapException> connectionHandlerFactory;
 
     /**
      * Map of cipher phrases to effective key size (bits). Taken from the
@@ -132,11 +133,11 @@
      *            The maximum BER element size, or <code>0</code> to indicate that there is no limit.
      */
     LDAPServerFilter(
-            Function<LDAPClientContext,
-                     ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
-                     LdapException> connectionHandler,
+            final Function<LDAPClientContext,
+                           ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
+                           LdapException> connectionHandlerFactory,
             final Options connectionOptions, final DecodeOptions options, final int maxPendingRequests) {
-        this.connectionHandler = connectionHandler;
+        this.connectionHandlerFactory = connectionHandlerFactory;
         this.connectionOptions = connectionOptions;
         this.maxConcurrentRequests = maxPendingRequests;
     }
@@ -152,98 +153,86 @@
         configureConnection(connection, logger, connectionOptions);
         final ClientConnectionImpl clientContext = new ClientConnectionImpl(connection);
         LDAP_CONNECTION_ATTR.set(connection, clientContext);
-        final ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> handler = connectionHandler
-                .apply(clientContext);
+        final ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> requestHandler =
+                connectionHandlerFactory.apply(clientContext);
 
-        streamFromPublisher(clientContext).flatMap(new Function<LdapRawMessage, Publisher<Integer>, Exception>() {
-            @Override
-            public Publisher<Integer> apply(final LdapRawMessage rawRequest) throws Exception {
-                return handler
-                        .handle(clientContext, rawRequest)
-                        .flatMap(new Function<Stream<Response>, Publisher<Integer>, Exception>() {
-                            @Override
-                            public Publisher<Integer> apply(final Stream<Response> response) {
-                                return clientContext.write(response.onErrorResumeWith(toErrorMessage(rawRequest))
-                                                                   .map(toResponseMessage(rawRequest))).toSingle(1);
-                            }
-                        });
-            }
-        }, maxConcurrentRequests).subscribe();
+        clientContext
+            .read()
+            .flatMap(new Function<LdapRawMessage, Publisher<Object>, Exception>() {
+                @Override
+                public Publisher<Object> apply(final LdapRawMessage rawRequest) throws Exception {
+                    if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
+                        clientContext.notifyAndClose(rawRequest);
+                        return singleFrom(DUMMY);
+                    }
+                    Single<Stream<Response>> response;
+                    try {
+                        response = requestHandler.handle(clientContext, rawRequest);
+                    } catch (Exception e) {
+                        response = singleError(e);
+                    }
+                    return response
+                            .flatMap(new Function<Stream<Response>, Single<Object>, Exception>() {
+                                @Override
+                                public Single<Object> apply(final Stream<Response> response) {
+                                    return clientContext.write(response.map(toLdapResponseMessage(rawRequest)))
+                                                        .toSingle(DUMMY);
+                                }
+                            })
+                            .onErrorResumeWith(new Function<Throwable, Single<Object>, Exception>() {
+                                @Override
+                                public Single<Object> apply(final Throwable error) throws Exception {
+                                    if (!(error instanceof LdapException)) {
+                                        // Unexpected error, propagate it.
+                                        return singleError(error);
+                                    }
+                                    final LdapException exception = (LdapException) error;
+                                    return clientContext
+                                            .write(singleFrom(toLdapResponseMessage(rawRequest, exception.getResult())))
+                                            .toSingle(DUMMY);
+                                }
+                            });
+                }
+            }, maxConcurrentRequests)
+            .onErrorDo(new Consumer<Throwable>() {
+                @Override
+                public void accept(final Throwable error) throws Exception {
+                    clientContext.notifyAndCloseSilently(error);
+                }
+            })
+            .subscribe();
         return ctx.getStopAction();
     }
 
-    private Function<Throwable, Publisher<Response>, Exception> toErrorMessage(final LdapRawMessage rawRequest) {
-        return new Function<Throwable, Publisher<Response>, Exception>() {
-            @Override
-            public Publisher<Response> apply(Throwable error) throws Exception {
-                if (!(error instanceof LdapException)) {
-                    // Propagate error
-                    return streamError(error);
-                }
-
-                final Result result = ((LdapException) error).getResult();
-                switch (rawRequest.getMessageType()) {
-                case OP_TYPE_BIND_REQUEST:
-                    if (result instanceof BindResult) {
-                        return streamFrom((Response) result);
-                    }
-                    return streamFrom((Response) populateNewResultFromResult(
-                            Responses.newBindResult(result.getResultCode()), result));
-                case OP_TYPE_COMPARE_REQUEST:
-                    if (result instanceof CompareResult) {
-                        return streamFrom((Response) result);
-                    }
-                    return streamFrom((Response) populateNewResultFromResult(
-                            Responses.newCompareResult(result.getResultCode()), result));
-                case OP_TYPE_EXTENDED_REQUEST:
-                    if (result instanceof ExtendedResult) {
-                        return streamFrom((Response) result);
-                    }
-                    return streamFrom((Response) populateNewResultFromResult(
-                            Responses.newGenericExtendedResult(result.getResultCode()), result));
-                default:
-                    return streamFrom((Response) result);
-                }
-            }
-        };
-    }
-
-    private Result populateNewResultFromResult(final Result newResult,
-            final Result result) {
-        newResult.setDiagnosticMessage(result.getDiagnosticMessage());
-        newResult.setMatchedDN(result.getMatchedDN());
-        newResult.setCause(result.getCause());
-        for (final Control control : result.getControls()) {
-            newResult.addControl(control);
+    private final LdapResponseMessage toLdapResponseMessage(final LdapRawMessage rawRequest, final Result result) {
+        switch (rawRequest.getMessageType()) {
+        case OP_TYPE_ADD_REQUEST:
+            return newResponseMessage(OP_TYPE_ADD_RESPONSE, rawRequest.getMessageId(), result);
+        case OP_TYPE_BIND_REQUEST:
+            return newResponseMessage(OP_TYPE_BIND_RESPONSE, rawRequest.getMessageId(), result);
+        case OP_TYPE_COMPARE_REQUEST:
+            return newResponseMessage(OP_TYPE_COMPARE_RESPONSE, rawRequest.getMessageId(), result);
+        case OP_TYPE_DELETE_REQUEST:
+            return newResponseMessage(OP_TYPE_DELETE_RESPONSE, rawRequest.getMessageId(), result);
+        case OP_TYPE_EXTENDED_REQUEST:
+            return newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, rawRequest.getMessageId(), result);
+        case OP_TYPE_MODIFY_DN_REQUEST:
+            return newResponseMessage(OP_TYPE_MODIFY_DN_RESPONSE, rawRequest.getMessageId(), result);
+        case OP_TYPE_MODIFY_REQUEST:
+            return newResponseMessage(OP_TYPE_MODIFY_RESPONSE, rawRequest.getMessageId(), result);
+        case OP_TYPE_SEARCH_REQUEST:
+            return newResponseMessage(OP_TYPE_SEARCH_RESULT_DONE, rawRequest.getMessageId(), result);
+        default:
+            throw new IllegalArgumentException("Unknown request: " + rawRequest.getMessageType());
         }
-        return newResult;
     }
 
-    private Function<Response, LdapResponseMessage, Exception> toResponseMessage(final LdapRawMessage rawRequest) {
+    private Function<Response, LdapResponseMessage, Exception> toLdapResponseMessage(final LdapRawMessage rawRequest) {
         return new Function<Response, LdapResponseMessage, Exception>() {
             @Override
             public LdapResponseMessage apply(final Response response) {
                 if (response instanceof Result) {
-                    switch (rawRequest.getMessageType()) {
-                    case OP_TYPE_ADD_REQUEST:
-                        return newResponseMessage(OP_TYPE_ADD_RESPONSE, rawRequest.getMessageId(), response);
-                    case OP_TYPE_BIND_REQUEST:
-                        return newResponseMessage(OP_TYPE_BIND_RESPONSE, rawRequest.getMessageId(), response);
-                    case OP_TYPE_COMPARE_REQUEST:
-                        return newResponseMessage(OP_TYPE_COMPARE_RESPONSE, rawRequest.getMessageId(), response);
-                    case OP_TYPE_DELETE_REQUEST:
-                        return newResponseMessage(OP_TYPE_DELETE_RESPONSE, rawRequest.getMessageId(), response);
-                    case OP_TYPE_EXTENDED_REQUEST:
-                        return newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, rawRequest.getMessageId(), response);
-                    case OP_TYPE_MODIFY_DN_REQUEST:
-                        return newResponseMessage(OP_TYPE_MODIFY_DN_RESPONSE, rawRequest.getMessageId(), response);
-                    case OP_TYPE_MODIFY_REQUEST:
-                        return newResponseMessage(OP_TYPE_MODIFY_RESPONSE, rawRequest.getMessageId(), response);
-                    case OP_TYPE_SEARCH_REQUEST:
-                        return newResponseMessage(OP_TYPE_SEARCH_RESULT_DONE, rawRequest.getMessageId(), response);
-                    default:
-                        throw new IllegalArgumentException("Unknown request: " + rawRequest.getMessageType());
-                    }
+                    return toLdapResponseMessage(rawRequest, (Result) response);
                 }
                 if (response instanceof IntermediateResponse) {
                     return newResponseMessage(OP_TYPE_INTERMEDIATE_RESPONSE, rawRequest.getMessageId(), response);
@@ -274,7 +263,7 @@
         return ctx.getStopAction();
     }
 
-    final class ClientConnectionImpl implements LDAPClientContext, Publisher<LdapRawMessage> {
+    final class ClientConnectionImpl implements LDAPClientContext {
 
         final class GrizzlyBackpressureSubscription implements Subscription {
             private final AtomicLong pendingRequests = new AtomicLong();
@@ -339,38 +328,24 @@
 
         private ClientConnectionImpl(final Connection<?> connection) {
             this.connection = connection;
-            connection.closeFuture().addCompletionHandler(new CompletionHandler<CloseReason>() {
-                @Override
-                public void completed(CloseReason result) {
-                    disconnect0(null, null);
-                }
+        }
 
+        Stream<LdapRawMessage> read() {
+            return streamFromPublisher(new Publisher<LdapRawMessage>() {
                 @Override
-                public void updated(CloseReason result) {
-                }
-
-                @Override
-                public void failed(Throwable throwable) {
-                }
-
-                @Override
-                public void cancelled() {
+                public void subscribe(final Subscriber<? super LdapRawMessage> subscriber) {
+                    if (upstream != null) {
+                        return;
+                    }
+                    upstream = new GrizzlyBackpressureSubscription(subscriber);
                 }
             });
         }
 
-        @Override
-        public void subscribe(final Subscriber<? super LdapRawMessage> s) {
-            if (upstream != null) {
-                return;
-            }
-            this.upstream = new GrizzlyBackpressureSubscription(s);
-        }
-
         Completable write(final Publisher<LdapResponseMessage> messages) {
-            return newCompletable(new Completable.OnSubscribe() {
+            return newCompletable(new Completable.Emitter() {
                 @Override
-                public void onSubscribe(Emitter e) {
+                public void subscribe(Completable.Subscriber e) {
                     messages.subscribe(new LdapResponseMessageWriter(connection, e));
                 }
             });
@@ -378,9 +353,9 @@
 
         @Override
         public void enableTLS(final SSLEngine sslEngine) {
-            Reject.ifNull(sslEngine);
+            Reject.ifNull(sslEngine, "sslEngine must not be null");
             synchronized (this) {
-                if (isTLSEnabled()) {
+                if (isFilterExists(SSLFilter.class)) {
                     throw new IllegalStateException("TLS already enabled");
                 }
                 SSLUtils.setSSLEngine(connection, sslEngine);
@@ -390,8 +365,14 @@
 
         @Override
         public void enableSASL(final SaslServer saslServer) {
-            SaslUtils.setSaslServer(connection, saslServer);
-            installFilter(new SaslFilter());
+            Reject.ifNull(saslServer, "saslServer must not be null");
+            synchronized (this) {
+                if (isFilterExists(SaslFilter.class)) {
+                    throw new IllegalStateException("Sasl already enabled");
+                }
+                SaslUtils.setSaslServer(connection, saslServer);
+                installFilter(new SaslFilter());
+            }
         }
 
         @Override
@@ -474,16 +455,11 @@
             GrizzlyUtils.addFilterToConnection(filter, connection);
         }
 
-        /**
-         * Indicates whether TLS is enabled this provided connection.
-         *
-         * @return {@code true} if TLS is enabled on this connection, otherwise {@code false}.
-         */
-        private boolean isTLSEnabled() {
+        private boolean isFilterExists(Class<?> filterKlass) {
             synchronized (this) {
                 final FilterChain currentFilterChain = (FilterChain) connection.getProcessor();
                 for (final Filter filter : currentFilterChain) {
-                    if (filter instanceof SSLFilter) {
+                    if (filterKlass.isAssignableFrom(filter.getClass())) {
                         return true;
                     }
                 }
@@ -493,35 +469,97 @@
 
         @Override
         public void onDisconnect(DisconnectListener listener) {
+            Reject.ifNull(listener, "listener must not be null");
             listeners.add(listener);
         }
 
         @Override
         public void disconnect() {
-            disconnect0(null, null);
+            if (isClosed.compareAndSet(false, true)) {
+                try {
+                    for (DisconnectListener listener : listeners) {
+                        listener.connectionDisconnected(this, null, null);
+                    }
+                } finally {
+                    closeConnection();
+                }
+            }
+        }
+
+        private void closeConnection() {
+            if (upstream != null) {
+                upstream.cancel();
+                upstream = null;
+            }
+            connection.closeSilently();
         }
 
         @Override
-        public void disconnect(final ResultCode resultCode, final String message) {
-            sendUnsolicitedNotification(
-                    Responses.newGenericExtendedResult(resultCode)
-                             .setOID(LDAP.OID_NOTICE_OF_DISCONNECTION)
-                             .setDiagnosticMessage(message));
-            disconnect0(resultCode, message);
+        public void disconnect(final ResultCode resultCode, final String diagnosticMessage) {
+            // Close this connection context.
+            if (isClosed.compareAndSet(false, true)) {
+                sendUnsolicitedNotification(Responses.newGenericExtendedResult(resultCode)
+                                                     .setOID(LDAP.OID_NOTICE_OF_DISCONNECTION)
+                                                     .setDiagnosticMessage(diagnosticMessage));
+                try {
+                    for (DisconnectListener listener : listeners) {
+                        listener.connectionDisconnected(this, resultCode, diagnosticMessage);
+                    }
+                } finally {
+                    closeConnection();
+                }
+            }
         }
 
-        private void disconnect0(final ResultCode resultCode, final String message) {
+        private void notifyAndClose(final LdapRawMessage unbindRequest) {
+            // Close this connection context.
+            if (isClosed.compareAndSet(false, true)) {
+                if (unbindRequest == null) {
+                    doNotifySilentlyConnectionClosed(null);
+                } else {
+                    try {
+                        LDAP.getReader(unbindRequest.getContent(), new DecodeOptions())
+                                .readMessage(new AbstractLDAPMessageHandler() {
+                                    @Override
+                                    public void unbindRequest(int messageID, UnbindRequest unbindRequest)
+                                            throws DecodeException, IOException {
+                                        doNotifySilentlyConnectionClosed(unbindRequest);
+                                    }
+                                });
+                    } catch (Exception e) {
+                        doNotifySilentlyConnectionClosed(null);
+                    }
+                }
+            }
+        }
+
+        private void doNotifySilentlyConnectionClosed(final UnbindRequest unbindRequest) {
+            try {
+                for (final DisconnectListener listener : listeners) {
+                    try {
+                        listener.connectionClosed(this, unbindRequest);
+                    } catch (Exception e) {
+                        // TODO: Log as a warning ?
+                    }
+                }
+            } finally {
+                closeConnection();
+            }
+        }
+
+        private void notifyAndCloseSilently(final Throwable error) {
             // Close this connection context.
             if (isClosed.compareAndSet(false, true)) {
                 try {
-                    // Notify the server connection: it may be null if disconnect is
-                    // invoked during accept.
-                    for (DisconnectListener listener : listeners) {
-                        listener.connectionDisconnected(this, resultCode, message);
+                    for (final DisconnectListener listener : listeners) {
+                        try {
+                            listener.exceptionOccurred(this, error);
+                        } catch (Exception e) {
+                            // TODO: Log as a warning ?
+                        }
                     }
                 } finally {
-                    // Close the connection.
-                    connection.closeSilently();
+                    closeConnection();
                 }
             }
         }
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
index 730a7b1..b58139b 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
@@ -36,7 +36,7 @@
 import org.glassfish.grizzly.filterchain.FilterChainContext;
 import org.glassfish.grizzly.filterchain.NextAction;
 
-final class LdapCodec extends LDAPBaseFilter {
+abstract class LdapCodec extends LDAPBaseFilter {
 
     LdapCodec(final int maxElementSize, final DecodeOptions decodeOptions) {
         super(decodeOptions, maxElementSize);
@@ -44,15 +44,29 @@
 
     @Override
     public NextAction handleRead(final FilterChainContext ctx) throws IOException {
-        final Buffer buffer = ctx.getMessage();
-        final LdapRawMessage message = readMessage(buffer);
-        if (message != null) {
-            ctx.setMessage(message);
-            return ctx.getInvokeAction(getRemainingBuffer(buffer));
+        try {
+            final Buffer buffer = ctx.getMessage();
+            final LdapRawMessage message;
+
+            message = readMessage(buffer);
+            if (message != null) {
+                ctx.setMessage(message);
+                return ctx.getInvokeAction(getRemainingBuffer(buffer));
+            }
+            return ctx.getStopAction(getRemainingBuffer(buffer));
+        } catch (Exception e) {
+            onLdapCodecError(ctx, e);
+            // make the connection deaf to any following input
+            // onLdapDecodeError call will take care of error processing
+            // and closing the connection
+            final NextAction suspendAction = ctx.getSuspendAction();
+            ctx.completeAndRecycle();
+            return suspendAction;
         }
-        return ctx.getStopAction(getRemainingBuffer(buffer));
     }
 
+    protected abstract void onLdapCodecError(FilterChainContext ctx, Throwable error);
+
     private LdapRawMessage readMessage(final Buffer buffer) throws IOException {
         try (final ASN1BufferReader reader = new ASN1BufferReader(maxASN1ElementSize, buffer)) {
             final int packetStart = buffer.position();
@@ -123,10 +137,18 @@
         try {
             final Buffer buffer = toBuffer(writer, ctx.<LdapResponseMessage> getMessage());
             ctx.setMessage(buffer);
+            return ctx.getInvokeAction();
+        } catch (Exception e) {
+            onLdapCodecError(ctx, e);
+            // make the connection deaf to any following input
+            // onLdapDecodeError call will take care of error processing
+            // and closing the connection
+            final NextAction suspendAction = ctx.getSuspendAction();
+            ctx.completeAndRecycle();
+            return suspendAction;
         } finally {
             GrizzlyUtils.recycleWriter(writer);
         }
-        return ctx.getInvokeAction();
     }
 
     private Buffer toBuffer(final LDAPWriter<ASN1BufferWriter> writer, final LdapResponseMessage message)
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
index 9e3fa30..007ce33 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
@@ -26,10 +26,10 @@
 final class LdapResponseMessageWriter implements Subscriber<LdapResponseMessage> {
 
     private final Connection<?> connection;
-    private final Completable.Emitter completable;
+    private final Completable.Subscriber completable;
     private Subscription upstream;
 
-    LdapResponseMessageWriter(final Connection<?> connection, final Completable.Emitter completable) {
+    LdapResponseMessageWriter(final Connection<?> connection, final Completable.Subscriber completable) {
         this.connection = connection;
         this.completable = completable;
     }
@@ -72,6 +72,6 @@
 
     @Override
     public void onComplete() {
-        completable.complete();
+        completable.onComplete();
     }
 }
diff --git a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java
index a893b367..8890a14 100644
--- a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java
+++ b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java
@@ -610,7 +610,7 @@
      * @throws Exception
      *             If an unexpected error occurred.
      */
-    @Test(enabled = false)
+    @Test
     public void testMaxRequestSize() throws Exception {
         final MockServerConnection serverConnection = new MockServerConnection();
         final MockServerConnectionFactory factory =
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java
index 5bea25b..14db1c6 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java
@@ -171,58 +171,58 @@
             public Single<Stream<Response>> filter(final LDAPClientConnection2 context,
                     final LdapRawMessage encodedRequestMessage,
                     final ReactiveHandler<LDAPClientConnection2, Request, Stream<Response>> next) throws Exception {
-                return newSingle(new Single.OnSubscribe<Request>() {
+                return newSingle(new Single.Emitter<Request>() {
                     @Override
-                    public void onSubscribe(final Single.Emitter<Request> emitter) throws Exception {
+                    public void subscribe(final Single.Subscriber<Request> subscriber) throws Exception {
                         LDAP.getReader(encodedRequestMessage.getContent(), decodeOptions)
                                 .readMessage(new AbstractLDAPMessageHandler() {
                             @Override
                             public void abandonRequest(final int messageID, final AbandonRequest request)
                                     throws DecodeException, IOException {
-                                emitter.onSuccess(request);
+                                subscriber.onComplete(request);
                             }
 
                             @Override
                             public void addRequest(int messageID, AddRequest request)
                                     throws DecodeException, IOException {
-                                emitter.onSuccess(request);
+                                subscriber.onComplete(request);
                             }
 
                             @Override
                             public void bindRequest(int messageID, int version, GenericBindRequest request)
                                     throws DecodeException, IOException {
-                                emitter.onSuccess(request);
+                                subscriber.onComplete(request);
                             }
 
                             @Override
                             public void modifyDNRequest(int messageID, ModifyDNRequest request)
                                     throws DecodeException, IOException {
-                                emitter.onSuccess(request);
+                                subscriber.onComplete(request);
                             }
 
                             @Override
                             public void modifyRequest(int messageID, ModifyRequest request)
                                     throws DecodeException, IOException {
-                                emitter.onSuccess(request);
+                                subscriber.onComplete(request);
                             }
 
                             @Override
                             public void searchRequest(int messageID, SearchRequest request)
                                     throws DecodeException, IOException {
-                                emitter.onSuccess(request);
+                                subscriber.onComplete(request);
                             }
 
                             @Override
                             public void unbindRequest(int messageID, UnbindRequest request)
                                     throws DecodeException, IOException {
-                                emitter.onSuccess(request);
+                                subscriber.onComplete(request);
                             }
                         });
                     }
-                }).flatMap(new Function<Request, Publisher<Stream<Response>>, Exception>() {
+                }).flatMap(new Function<Request, Single<Stream<Response>>, Exception>() {
                     @Override
-                    public Publisher<Stream<Response>> apply(Request t) throws Exception {
-                        return next.handle(context, t);
+                    public Single<Stream<Response>> apply(final Request request) throws Exception {
+                        return next.handle(context, request);
                     }
                 });
             }
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index 2bff320..7341f9b 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -711,6 +711,10 @@
         logger.traceException(e);
       }
     }
+    else
+    {
+        clientContext.disconnect();
+    }
 
     // NYI -- Deregister the client connection from any server components that
     // might know about it.
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
index af7326a..5e71d6c 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -747,8 +747,8 @@
                                 LdapException>() {
                     @Override
                     public ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>
-                        apply(LDAPClientContext value) throws LdapException {
-                        final LDAPClientConnection2 conn = canAccept(value);
+                        apply(LDAPClientContext clientContext) throws LdapException {
+                        final LDAPClientConnection2 conn = canAccept(clientContext);
                         return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
                             @Override
                             public Single<Stream<Response>> handle(LDAPClientContext context,

--
Gitblit v1.10.0