From bdc5fa0dd0980eb9e077ae80644504705a24e035 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 07 Nov 2016 13:59:40 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

---
 opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java                |  317 ++++++++++
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java                            |   64 -
 opendj-core/clirr-ignored-api-changes.xml                                                           |   50 
 opendj-ldap-toolkit/src/test/java/com/forgerock/opendj/ldap/tools/ToolLdapServer.java               |   14 
 opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java                                 |   10 
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java            |   83 +
 opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicTransportProvider.java                 |   13 
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java                  |   24 
 opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java                     |   10 
 opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactoryTestCase.java |   28 
 opendj-core/src/main/java/org/forgerock/opendj/io/LDAP.java                                         |   14 
 opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Server.java                    |   11 
 opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java                      |    2 
 opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicLDAPListener.java                      |   23 
 opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnection.java                           |    3 
 opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java                               |   26 
 opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java            |   23 
 opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java         |   83 +-
 opendj-core/src/main/java/com/forgerock/reactive/Completable.java                                   |   20 
 opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java                          |   30 
 opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java             |    9 
 opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java          |  116 ++-
 opendj-core/src/main/java/com/forgerock/reactive/Action.java                                        |    2 
 opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LdapMessages.java                           |   30 
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java                     |  222 +++----
 opendj-core/src/main/java/org/forgerock/opendj/ldap/CommonLDAPOptions.java                          |    2 
 opendj-core/src/test/java/org/forgerock/opendj/ldap/LDAPServer.java                                 |   10 
 opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/TransportProvider.java                      |   17 
 /dev/null                                                                                           |  154 -----
 opendj-core/src/main/java/com/forgerock/reactive/Consumer.java                                      |    6 
 opendj-server-legacy/src/main/java/org/forgerock/opendj/adapter/server3x/Converters.java            |   15 
 opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnectionFactory.java                    |    3 
 opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java             |  148 ----
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java                     |    2 
 opendj-core/src/main/java/com/forgerock/reactive/Single.java                                        |   29 
 opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionTestCase.java        |   22 
 opendj-core/src/main/java/com/forgerock/reactive/Stream.java                                        |   17 
 opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPListener.java                               |   83 +-
 opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java        |   39 
 39 files changed, 964 insertions(+), 810 deletions(-)

diff --git a/opendj-core/clirr-ignored-api-changes.xml b/opendj-core/clirr-ignored-api-changes.xml
index aeb2f55..6a629c1 100644
--- a/opendj-core/clirr-ignored-api-changes.xml
+++ b/opendj-core/clirr-ignored-api-changes.xml
@@ -153,6 +153,12 @@
   </difference>
   <difference>
     <className>org/forgerock/opendj/ldap/LDAPClientContext</className>
+    <differenceType>7012</differenceType>
+    <method>javax.security.sasl.SaslServer getSASLServer()</method>
+    <justification>Simplify management of security layer</justification>
+  </difference>
+  <difference>
+    <className>org/forgerock/opendj/ldap/LDAPClientContext</className>
     <differenceType>7004</differenceType>
     <method>void enableTLS(javax.net.ssl.SSLContext, java.lang.String[], java.lang.String[], boolean, boolean)</method>
     <justification>Simplify management of security layer</justification>
@@ -167,27 +173,15 @@
   <difference>
     <className>org/forgerock/opendj/ldap/LDAPClientContext</className>
     <differenceType>7012</differenceType>
-    <method>void onDisconnect(org.forgerock.opendj.ldap.LDAPClientContext$DisconnectListener)</method>
+    <method>void addConnectionEventListener(org.forgerock.opendj.ldap.LDAPClientContext$ConnectionEventListener)</method>
     <justification>Allows to register connection state listener</justification>
   </difference>
   <difference>
     <className>org/forgerock/opendj/ldap/LDAPListener</className>
     <differenceType>7005</differenceType>
-    <method>LDAPListener(java.net.InetSocketAddress, org.forgerock.opendj.ldap.ServerConnectionFactory, org.forgerock.util.Options)</method>
-    <to>%regex[LDAPListener\(java\.net\.SocketAddress,\s*org\.forgerock\.opendj\.ldap\.ServerConnectionFactory,\s*org\.forgerock\.util\.Options\)]</to>
-    <justification>Accept multiple SocketAddress to bind to</justification>
-  </difference>
-  <difference>
-    <className>org/forgerock/opendj/ldap/spi/LDAPListenerImpl</className>
-    <differenceType>7002</differenceType>
-    <method>java.net.InetSocketAddress getSocketAddress()</method>
-    <justification>Accept multiple SocketAddress to bind to</justification>
-  </difference>
-  <difference>
-    <className>org/forgerock/opendj/ldap/spi/LDAPListenerImpl</className>
-    <differenceType>7012</differenceType>
-    <method>java.util.Set getSocketAddresses()</method>
-    <justification>Accept multiple SocketAddress to bind to</justification>
+     <method>%regex[LDAPListener\((((java\.lang\.String, +)?int,)|(java\.net\.InetSocketAddress,))\s*org\.forgerock\.opendj\.ldap\.ServerConnectionFactory(, +org\.forgerock\.util\.Options)?\)]</method>
+    <to>%regex[LDAPListener\((((java\.lang\.String, +)?int,)|(java\.util\.Set,)) +org\.forgerock\.util\.Function(, +org\.forgerock\.util\.Options)?\)]</to>
+    <justification>Allow multiple bind addressses. Use function rather than specific interface</justification>
   </difference>
   <difference>
     <className>org/forgerock/opendj/ldap/LDAPListener</className>
@@ -195,13 +189,7 @@
     <method>java.net.InetSocketAddress getSocketAddress()</method>
     <justification>Accept multiple SocketAddress to bind to</justification>
   </difference>
-   <difference>
-    <className>org/forgerock/opendj/ldap/LDAPListener</className>
-    <differenceType>7012</differenceType>
-    <method>java.util.Set getSocketAddresses()</method>
-    <justification>Accept multiple SocketAddress to bind to</justification>
-  </difference>
-    <difference>
+  <difference>
     <className>org/forgerock/opendj/ldap/LDAPListener</className>
     <differenceType>7002</differenceType>
     <method>java.net.InetAddress getAddress()</method>
@@ -220,11 +208,23 @@
     <justification>Accept multiple SocketAddress to bind to</justification>
   </difference>
   <difference>
+    <className>org/forgerock/opendj/ldap/spi/LDAPListenerImpl</className>
+    <differenceType>7002</differenceType>
+    <method>java.net.InetSocketAddress getSocketAddress()</method>
+    <justification>Accept multiple SocketAddress to bind to</justification>
+  </difference>
+  <difference>
+    <className>org/forgerock/opendj/ldap/spi/LDAPListenerImpl</className>
+    <differenceType>7012</differenceType>
+    <method>java.util.Set getSocketAddresses()</method>
+    <justification>Accept multiple SocketAddress to bind to</justification>
+  </difference>
+  <difference>
     <className>org/forgerock/opendj/ldap/spi/TransportProvider</className>
     <differenceType>7005</differenceType>
     <method>org.forgerock.opendj.ldap.spi.LDAPListenerImpl getLDAPListener(java.net.InetSocketAddress, org.forgerock.opendj.ldap.ServerConnectionFactory, org.forgerock.util.Options)</method>
-    <to>%regex[org\.forgerock\.opendj\.ldap\.spi\.LDAPListenerImpl\s*getLDAPListener\(java\.util\.Set,\s*org\.forgerock\.opendj\.ldap\.ServerConnectionFactory,\s*org\.forgerock\.util\.Options\)]</to>
-    <justification>Accept multiple SocketAddress to bind to</justification>
+    <to>%regex[org\.forgerock\.opendj\.ldap\.spi\.LDAPListenerImpl +getLDAPListener\(java\.util\.Set, +org\.forgerock\.util\.Function, +org\.forgerock\.util\.Options\)]</to>
+    <justification>Accept multiple SocketAddress to bind to. Use Function rather than specific interface</justification>
   </difference>
   <difference>
     <className>org/forgerock/opendj/io/LDAP</className>
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Action.java b/opendj-core/src/main/java/com/forgerock/reactive/Action.java
index 72c541c..a7c8e96 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Action.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Action.java
@@ -23,7 +23,7 @@
      * Runs the action and optionally throws a checked exception.
      *
      * @throws Exception
-     *             if the implementation wishes to throw a checked exception
+     *             If an error occurred while performing the Action
      */
     void run() throws Exception;
 }
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
index b8391d6..dabe352 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
@@ -18,16 +18,20 @@
 import org.forgerock.util.Function;
 import org.reactivestreams.Publisher;
 
-/** {@link Completable} is used to communicates a terminated operation which doesn't produce a result. */
+/**
+ * {@link Completable} is used to communicates a terminated operation which doesn't produce a result. It extends
+ * {@link Publisher} by providing additional reactive methods allowing to act on it. The goal of this interface is to
+ * decouple ourself from reactive framework (RxJava/Reactor).
+ */
 public interface Completable extends Publisher<Void> {
 
-    /** Emitter is used to notify when the operation has been completed, successfully or not. */
+    /** Subscriber is notified when the operation has been completed, successfully or not. */
     public interface Subscriber {
-        /** Notify that this {@link Completable} is now completed. */
+        /** Called once this {@link Completable} is completed. */
         void onComplete();
 
         /**
-         * Notify that this {@link Completable} cannot be completed because of an error.
+         * Called when this {@link Completable} cannot be completed because of an error.
          *
          * @param error
          *            The error preventing this {@link Completable} to complete
@@ -40,10 +44,12 @@
         /**
          * Called when the streaming api has been subscribed.
          *
-         * @param e
+         * @param s
          *            The {@link Subscriber} to use to communicate the completeness of this {@link Completable}
+         * @throws Exception
+         *             on error
          */
-        void subscribe(Subscriber e);
+        void subscribe(Subscriber s) throws Exception;
     }
 
     /**
@@ -57,7 +63,7 @@
     Completable onErrorResumeWith(Function<Throwable, Completable, Exception> function);
 
     /**
-     * Creates a {@link Single} which will emit the specified value when this {@link Completable} complete.
+     * Returns a {@link Single} which will complete with the provided value once this {@link Completable} completes.
      *
      * @param <V>
      *            Type of the value to emit
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Consumer.java b/opendj-core/src/main/java/com/forgerock/reactive/Consumer.java
index e3c3949..026bca8 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Consumer.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Consumer.java
@@ -23,12 +23,12 @@
  */
 public interface Consumer<V> {
     /**
-     * Consume the given value.
+     * Consumes a value.
      *
-     * @param item
+     * @param value
      *            the value
      * @throws Exception
      *             on error
      */
-    void accept(V item) throws Exception;
+    void accept(V value) throws Exception;
 }
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java b/opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java
index 689e54c..d98989d 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java
@@ -16,26 +16,26 @@
 package com.forgerock.reactive;
 
 /**
- * Handle the processing of a request in a given context and return the resulting response.
+ * Handle the processing of an input in a given context and return the resulting output.
  *
- * @param <CTX>
+ * @param <C>
  *            Type of the context
- * @param <REQ>
- *            Type of the request to process
- * @param <REP>
- *            Type of the response
+ * @param <I>
+ *            Type of the input to process
+ * @param <O>
+ *            Type of the output as a result of the process
  */
-public interface ReactiveHandler<CTX, REQ, REP> {
+public interface ReactiveHandler<C, I, O> {
     /**
-     * Process the request given the context.
+     * Process an input given the context.
      *
      * @param context
-     *            Context in which the request must be processed
-     * @param request
-     *            The response to process
-     * @return A {@link Single} response.
+     *            Context in which the input must be processed
+     * @param input
+     *            The input to process
+     * @return An output.
      * @throws Exception
      *             if the request cannot be processed
      */
-    REP handle(final CTX context, final REQ request) throws Exception;
+    O handle(final C context, final I input) throws Exception;
 }
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
index 35ed67c..de0d8bf 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -17,6 +17,7 @@
 
 import org.forgerock.util.Function;
 import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscription;
 
 import io.reactivex.CompletableEmitter;
 import io.reactivex.CompletableOnSubscribe;
@@ -86,7 +87,8 @@
     }
 
     /**
-     * Create a new {@link Single} from the given {@link Publisher}.
+     * Create a new {@link Single} from the given {@link Publisher}. If the {@link Publisher} produce more than one
+     * result, they'll be dropped and the inner {@link Subscription} cancelled.
      *
      * @param <V>
      *            Type of the datum emitted
@@ -223,7 +225,7 @@
         }
 
         @Override
-        public Stream<V> onNextDo(final Consumer<V> onNext) {
+        public Stream<V> onNext(final Consumer<V> onNext) {
             return new RxJavaStream<>(impl.doOnNext(new io.reactivex.functions.Consumer<V>() {
                 @Override
                 public void accept(V value) throws Exception {
@@ -233,7 +235,7 @@
         }
 
         @Override
-        public Stream<V> onErrorDo(final Consumer<Throwable> onError) {
+        public Stream<V> onError(final Consumer<Throwable> onError) {
             return new RxJavaStream<>(impl.doOnError(new io.reactivex.functions.Consumer<Throwable>() {
                 @Override
                 public void accept(Throwable t) throws Exception {
@@ -254,7 +256,7 @@
         }
 
         @Override
-        public Stream<V> onCompleteDo(final Action action) {
+        public Stream<V> onComplete(final Action action) {
             return new RxJavaStream<>(impl.doOnComplete(new io.reactivex.functions.Action() {
                 @Override
                 public void run() throws Exception {
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java b/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
new file mode 100644
index 0000000..dad0634
--- /dev/null
+++ b/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
@@ -0,0 +1,317 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package com.forgerock.reactive;
+
+import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher;
+import static org.forgerock.util.Reject.checkNotNull;
+
+import java.io.IOException;
+
+import org.forgerock.opendj.io.ASN1Reader;
+import org.forgerock.opendj.io.AbstractLDAPMessageHandler;
+import org.forgerock.opendj.io.LDAP;
+import org.forgerock.opendj.io.LDAPReader;
+import org.forgerock.opendj.ldap.DecodeException;
+import org.forgerock.opendj.ldap.DecodeOptions;
+import org.forgerock.opendj.ldap.IntermediateResponseHandler;
+import org.forgerock.opendj.ldap.LDAPClientContext;
+import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
+import org.forgerock.opendj.ldap.LdapException;
+import org.forgerock.opendj.ldap.LdapResultHandler;
+import org.forgerock.opendj.ldap.ResultCode;
+import org.forgerock.opendj.ldap.SearchResultHandler;
+import org.forgerock.opendj.ldap.ServerConnection;
+import org.forgerock.opendj.ldap.ServerConnectionFactory;
+import org.forgerock.opendj.ldap.requests.AbandonRequest;
+import org.forgerock.opendj.ldap.requests.AddRequest;
+import org.forgerock.opendj.ldap.requests.BindRequest;
+import org.forgerock.opendj.ldap.requests.CompareRequest;
+import org.forgerock.opendj.ldap.requests.DeleteRequest;
+import org.forgerock.opendj.ldap.requests.ExtendedRequest;
+import org.forgerock.opendj.ldap.requests.GenericBindRequest;
+import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
+import org.forgerock.opendj.ldap.requests.ModifyRequest;
+import org.forgerock.opendj.ldap.requests.SearchRequest;
+import org.forgerock.opendj.ldap.requests.UnbindRequest;
+import org.forgerock.opendj.ldap.responses.BindResult;
+import org.forgerock.opendj.ldap.responses.CompareResult;
+import org.forgerock.opendj.ldap.responses.ExtendedResult;
+import org.forgerock.opendj.ldap.responses.IntermediateResponse;
+import org.forgerock.opendj.ldap.responses.Response;
+import org.forgerock.opendj.ldap.responses.Result;
+import org.forgerock.opendj.ldap.responses.SearchResultEntry;
+import org.forgerock.opendj.ldap.responses.SearchResultReference;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
+import org.forgerock.opendj.ldap.spi.TransportProvider;
+import org.forgerock.util.Function;
+import org.forgerock.util.promise.RuntimeExceptionHandler;
+
+import io.reactivex.BackpressureStrategy;
+import io.reactivex.Flowable;
+import io.reactivex.FlowableEmitter;
+import io.reactivex.FlowableOnSubscribe;
+
+/**
+ * Adapt a {@link ServerConnectionFactory} to a {@link Function} compatible with
+ * {@link TransportProvider#getLDAPListener(java.util.Set, Function, org.forgerock.util.Options)}.
+ */
+@Deprecated
+public final class ServerConnectionFactoryAdapter implements
+        Function<LDAPClientContext,
+                 ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+                 LdapException> {
+
+    private final ServerConnectionFactory<LDAPClientContext, Integer> adaptee;
+    private final DecodeOptions decodeOptions;
+
+    /**
+     * Creates a new adapter.
+     *
+     * @param decodeOptions
+     *            {@link DecodeOptions} used during request deserialization
+     * @param serverConnectionFactory
+     *            the {@link ServerConnectionFactory} to adapt
+     */
+    public ServerConnectionFactoryAdapter(final DecodeOptions decodeOptions,
+            final ServerConnectionFactory<LDAPClientContext, Integer> serverConnectionFactory) {
+        this.decodeOptions = checkNotNull(decodeOptions, "decodeOptions must not be null");
+        this.adaptee = checkNotNull(serverConnectionFactory, "serverConnectionFactory must not be null");
+    }
+
+    @Override
+    public ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>> apply(
+            final LDAPClientContext clientContext) throws LdapException {
+        return new ServerConnectionAdapter(clientContext, decodeOptions, adaptee.handleAccept(clientContext));
+    }
+
+    /**
+     * Adapt a {@link ServerConnection} to a {@link Function} compatible with
+     * {@link TransportProvider#getLDAPListener(java.util.Set, Function, org.forgerock.util.Options)}.
+     */
+    public static final class ServerConnectionAdapter
+            implements ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>> {
+
+        private final ServerConnection<Integer> adaptee;
+        private final DecodeOptions decodeOptions;
+
+        /**
+         * Creates a new adapter.
+         *
+         * @param clientContext
+         *            The {@link LDAPClientContext} which represents the client connection
+         * @param decodeOptions
+         *            The {@link DecodeOptions} to use during request deserialization
+         * @param serverConnection
+         *            the {@link ServerConnection} to adapt
+         */
+        public ServerConnectionAdapter(final LDAPClientContext clientContext, final DecodeOptions decodeOptions,
+                final ServerConnection<Integer> serverConnection) {
+            this.decodeOptions = checkNotNull(decodeOptions, "decodeOptions must not be null");
+            this.adaptee = checkNotNull(serverConnection, "serverConnection must not be null");
+            clientContext.addConnectionEventListener(new ConnectionEventListener() {
+                @Override
+                public void handleConnectionError(final LDAPClientContext context, final Throwable error) {
+                    adaptee.handleConnectionError(error);
+                }
+
+                @Override
+                public void handleConnectionClosed(final LDAPClientContext context, final UnbindRequest unbindRequest) {
+                    if (unbindRequest == null) {
+                        adaptee.handleConnectionClosed(null, null);
+                    } else {
+                        adaptee.handleConnectionClosed(0, unbindRequest);
+                    }
+                }
+
+                @Override
+                public void handleConnectionDisconnected(final LDAPClientContext context, final ResultCode resultCode,
+                        final String diagnosticMessage) {
+                    adaptee.handleConnectionDisconnected(resultCode, diagnosticMessage);
+                }
+            });
+        }
+
+        @Override
+        public Stream<Response> handle(final LDAPClientContext context, final LdapRequestEnvelope request)
+                throws Exception {
+            final LDAPReader<ASN1Reader> reader = LDAP.getReader(request.getContent(), decodeOptions);
+            return streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
+                @Override
+                public void subscribe(final FlowableEmitter<Response> emitter) throws Exception {
+                    reader.readMessage(new AbstractLDAPMessageHandler() {
+                        @Override
+                        public void abandonRequest(int messageID, AbandonRequest request)
+                                throws DecodeException, IOException {
+                            handleAbandon(messageID, request, emitter);
+                        }
+
+                        @Override
+                        public void addRequest(int messageID, AddRequest request) throws DecodeException, IOException {
+                            handleAdd(messageID, request, emitter);
+                        }
+
+                        @Override
+                        public void deleteRequest(final int messageID, final DeleteRequest request)
+                                throws DecodeException, IOException {
+                            handleDelete(messageID, request, emitter);
+                        }
+
+                        @Override
+                        public void bindRequest(int messageID, int version, GenericBindRequest request)
+                                throws DecodeException, IOException {
+                            handleBind(messageID, version, request, emitter);
+                        }
+
+                        @Override
+                        public void compareRequest(int messageID, CompareRequest request)
+                                throws DecodeException, IOException {
+                            handleCompare(messageID, request, emitter);
+                        }
+
+                        @Override
+                        public <R extends ExtendedResult> void extendedRequest(int messageID,
+                                ExtendedRequest<R> request) throws DecodeException, IOException {
+                            handleExtendedRequest(messageID, request, emitter);
+                        }
+
+                        @Override
+                        public void modifyDNRequest(int messageID, ModifyDNRequest request)
+                                throws DecodeException, IOException {
+                            handleModifyDN(messageID, request, emitter);
+                        }
+
+                        @Override
+                        public void modifyRequest(int messageID, ModifyRequest request)
+                                throws DecodeException, IOException {
+                            handleModify(messageID, request, emitter);
+                        }
+
+                        @Override
+                        public void searchRequest(int messageID, SearchRequest request)
+                                throws DecodeException, IOException {
+                            handleSearch(messageID, request, emitter);
+                        }
+
+                        @Override
+                        public void unbindRequest(int messageID, UnbindRequest request)
+                                throws DecodeException, IOException {
+                            // Unbind request are received through ConnectionEventListener only
+                        }
+                    });
+                    emitter.onComplete();
+                }
+            }, BackpressureStrategy.ERROR));
+        }
+
+        void handleAdd(final int requestId, final AddRequest request, final FlowableEmitter<Response> responseEmitter) {
+            final ResultHandlerAdapter<Result> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
+            adaptee.handleAdd(requestId, request, resultAdapter, resultAdapter);
+        }
+
+        void handleBind(final int requestId, final int version, final BindRequest request,
+                final FlowableEmitter<Response> responseEmitter) {
+            final ResultHandlerAdapter<BindResult> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
+            adaptee.handleBind(requestId, version, request, resultAdapter, resultAdapter);
+        }
+
+        void handleCompare(final int requestId, final CompareRequest request,
+                final FlowableEmitter<Response> responseEmitter) {
+            final ResultHandlerAdapter<CompareResult> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
+            adaptee.handleCompare(requestId, request, resultAdapter, resultAdapter);
+        }
+
+        void handleDelete(final int requestId, final DeleteRequest request,
+                final FlowableEmitter<Response> responseEmitter) {
+            final ResultHandlerAdapter<Result> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
+            adaptee.handleDelete(requestId, request, resultAdapter, resultAdapter);
+        }
+
+        <R extends ExtendedResult> void handleExtendedRequest(final int requestId, final ExtendedRequest<R> request,
+                final FlowableEmitter<Response> responseEmitter) {
+            final ResultHandlerAdapter<R> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
+            adaptee.handleExtendedRequest(requestId, request, resultAdapter, resultAdapter);
+        }
+
+        void handleModify(final int requestId, final ModifyRequest request,
+                final FlowableEmitter<Response> responseEmitter) {
+            final ResultHandlerAdapter<Result> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
+            adaptee.handleModify(requestId, request, resultAdapter, resultAdapter);
+        }
+
+        void handleModifyDN(final int requestId, final ModifyDNRequest request,
+                final FlowableEmitter<Response> responseEmitter) {
+            final ResultHandlerAdapter<Result> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
+            adaptee.handleModifyDN(requestId, request, resultAdapter, resultAdapter);
+        }
+
+        void handleSearch(final int requestId, final SearchRequest request,
+                final FlowableEmitter<Response> responseEmitter) {
+            final ResultHandlerAdapter<Result> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
+            adaptee.handleSearch(requestId, request, resultAdapter, resultAdapter, resultAdapter);
+        }
+
+        void handleAbandon(int requestId, final AbandonRequest request, final FlowableEmitter<Response> emitter) {
+            adaptee.handleAbandon(requestId, request);
+        }
+
+        /** Forward all responses received from handler to a {@link LdapResponse}. */
+        private static final class ResultHandlerAdapter<R extends Response> implements IntermediateResponseHandler,
+                SearchResultHandler, LdapResultHandler<R>, RuntimeExceptionHandler {
+
+            private final FlowableEmitter<Response> adaptee;
+
+            ResultHandlerAdapter(final FlowableEmitter<Response> emitter) {
+                this.adaptee = emitter;
+            }
+
+            @Override
+            public boolean handleEntry(final SearchResultEntry entry) {
+                adaptee.onNext(entry);
+                return true;
+            }
+
+            @Override
+            public boolean handleReference(final SearchResultReference reference) {
+                adaptee.onNext(reference);
+                return true;
+            }
+
+            @Override
+            public boolean handleIntermediateResponse(final IntermediateResponse intermediateResponse) {
+                adaptee.onNext(intermediateResponse);
+                return true;
+            }
+
+            @Override
+            public void handleResult(R result) {
+                if (result != null) {
+                    adaptee.onNext(result);
+                }
+                adaptee.onComplete();
+            }
+
+            @Override
+            public void handleRuntimeException(RuntimeException exception) {
+                adaptee.onError(exception);
+            }
+
+            @Override
+            public void handleException(LdapException exception) {
+                adaptee.onError(exception);
+            }
+        }
+    }
+}
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Single.java b/opendj-core/src/main/java/com/forgerock/reactive/Single.java
index a1f534f..2d8c678 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Single.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Single.java
@@ -19,40 +19,45 @@
 import org.reactivestreams.Publisher;
 
 /**
- * Single is a reactive-streams compatible promise.
+ * Single is a reactive-streams compatible promise. It extends {@link Publisher} by providing additional reactive
+ * methods allowing to act on it. The goal of this interface is to decouple ourself from reactive framework
+ * (RxJava/Reactor).
  *
  * @param <V>
  *            Type of the datum emitted
  */
 public interface Single<V> extends Publisher<V> {
 
-    /** Emitter is used to notify when the operation has been completed, successfully or not. */
+    /** Subscriber is notified when the operation has been completed, successfully or not. */
     public interface Subscriber<V> {
         /**
-         * Signal a success value.
+         * Called once this {@link Single} is completed.
          *
-         * @param t
+         * @param value
          *            the value, not null
          */
-        void onComplete(V t);
+        void onComplete(V value);
 
         /**
-         * Signal an exception.
+         * Called when this {@link Single} cannot be completed because of an error.
          *
-         * @param t
-         *            the exception, not null
+         * @param error
+         *            The error preventing this {@link Single} to complete, not null
          */
-        void onError(Throwable t);
+        void onError(Throwable error);
     }
 
     /** Adapts the streaming api to a callback one. */
     public interface Emitter<V> {
         /**
          * Called for each SingleObserver that subscribes.
-         * @param e the safe emitter instance, never null
-         * @throws Exception on error
+         *
+         * @param s
+         *            The {@link Subscriber} to use to communicate the completeness of this {@link Single}
+         * @throws Exception
+         *             on error
          */
-        void subscribe(Subscriber<V> e) throws Exception;
+        void subscribe(Subscriber<V> s) throws Exception;
     }
 
     /**
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Stream.java b/opendj-core/src/main/java/com/forgerock/reactive/Stream.java
index 7138548..106f26f 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Stream.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Stream.java
@@ -19,7 +19,9 @@
 import org.reactivestreams.Publisher;
 
 /**
- * Stream is a reactive-streams compliant way to chain operations and transformation on a stream of data.
+ * Stream is a reactive-streams compliant way to chain operations and transformation on a stream of data. It extends
+ * {@link Publisher} by providing additional reactive methods allowing to act on it. The goal of this interface is to
+ * decouple ourself from reactive framework (RxJava/Reactor).
  *
  * @param <V>
  *            Type of data emitted
@@ -27,7 +29,7 @@
 public interface Stream<V> extends Publisher<V> {
 
     /**
-     * Transform the data emitted by this stream.
+     * Transforms the data emitted by this stream.
      *
      * @param <O>
      *            Type of data emitted after transformation
@@ -38,7 +40,8 @@
     <O> Stream<O> map(Function<V, O, Exception> function);
 
     /**
-     * Transform each data emitted by this stream into a new stream of data. All these streams are then merged together.
+     * Transforms each data emitted by this stream into a new stream of data. All these streams are then merged
+     * together.
      *
      * @param <O>
      *            Type of data emitted after transformation
@@ -69,7 +72,7 @@
      *            The {@link Consumer} to invoke when a value is emitted by this stream
      * @return a new {@link Stream}
      */
-    Stream<V> onNextDo(Consumer<V> onNext);
+    Stream<V> onNext(Consumer<V> onNext);
 
     /**
      * Invokes the on error {@link Consumer} when an error occurs on this stream.
@@ -78,7 +81,7 @@
      *            The {@link Consumer} to invoke on error
      * @return a new {@link Stream}
      */
-    Stream<V> onErrorDo(Consumer<Throwable> onError);
+    Stream<V> onError(Consumer<Throwable> onError);
 
     /**
      * Invokes the on complete {@link Action} when this stream is completed.
@@ -87,10 +90,10 @@
      *            The {@link Action} to invoke on stream completion
      * @return a new {@link Stream}
      */
-    Stream<V> onCompleteDo(Action onComplete);
+    Stream<V> onComplete(Action onComplete);
 
     /**
-     * Subscribe to this stream and drop all data produced by it.
+     * Subscribes to this stream and drop all data produced by it.
      */
     void subscribe();
 }
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/io/LDAP.java b/opendj-core/src/main/java/org/forgerock/opendj/io/LDAP.java
index 1720efc..31c9248 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/io/LDAP.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/io/LDAP.java
@@ -17,6 +17,7 @@
 package org.forgerock.opendj.io;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -153,6 +154,19 @@
      * The protocol op type for unbind requests.
      */
     public static final byte OP_TYPE_UNBIND_REQUEST = 0x42;
+    /** Mapping between request protocol op and their respecetive response protocol op. */
+    public static final byte[] OP_TO_RESULT_TYPE = new byte[0xFF];
+    static {
+        Arrays.fill(OP_TO_RESULT_TYPE, (byte) 0x00);
+        OP_TO_RESULT_TYPE[OP_TYPE_ADD_REQUEST] = OP_TYPE_ADD_RESPONSE;
+        OP_TO_RESULT_TYPE[OP_TYPE_BIND_REQUEST] = OP_TYPE_BIND_RESPONSE;
+        OP_TO_RESULT_TYPE[OP_TYPE_COMPARE_REQUEST] = OP_TYPE_COMPARE_RESPONSE;
+        OP_TO_RESULT_TYPE[OP_TYPE_DELETE_REQUEST] = OP_TYPE_DELETE_RESPONSE;
+        OP_TO_RESULT_TYPE[OP_TYPE_EXTENDED_REQUEST] = OP_TYPE_EXTENDED_RESPONSE;
+        OP_TO_RESULT_TYPE[OP_TYPE_MODIFY_DN_REQUEST] = OP_TYPE_MODIFY_DN_RESPONSE;
+        OP_TO_RESULT_TYPE[OP_TYPE_MODIFY_REQUEST] = OP_TYPE_MODIFY_RESPONSE;
+        OP_TO_RESULT_TYPE[OP_TYPE_SEARCH_REQUEST] = OP_TYPE_SEARCH_RESULT_DONE;
+    };
     /**
      * The BER type to use for the AuthenticationChoice element in a bind
      * request when SASL authentication is to be used.
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/CommonLDAPOptions.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/CommonLDAPOptions.java
index 1ea5b1d..2354990 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/CommonLDAPOptions.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/CommonLDAPOptions.java
@@ -24,7 +24,7 @@
 /**
  * Common options for LDAP clients and listeners.
  */
-public abstract class CommonLDAPOptions {
+abstract class CommonLDAPOptions {
     /**
      * Specifies the class loader which will be used to load the
      * {@link org.forgerock.opendj.ldap.spi.TransportProvider TransportProvider}.
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
index bd212cb..83d4cf5 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
@@ -18,6 +18,7 @@
 package org.forgerock.opendj.ldap;
 
 import java.net.InetSocketAddress;
+import java.util.EventListener;
 
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLSession;
@@ -35,7 +36,7 @@
 public interface LDAPClientContext {
 
     /** Listens for disconnection event. */
-    public interface DisconnectListener {
+    public interface ConnectionEventListener extends EventListener {
         /**
          * Invoked when the connection has been disconnected because of an error (e.g: message too big).
          *
@@ -44,7 +45,7 @@
          * @param error
          *            The error
          */
-        void exceptionOccurred(LDAPClientContext context, Throwable error);
+        void handleConnectionError(LDAPClientContext context, Throwable error);
 
         /**
          * Invoked when the client closed the connection, possibly using an unbind request.
@@ -55,7 +56,7 @@
          *            The unbind request, which may be {@code null} if one was not sent before the connection was
          *            closed.
          */
-        void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest);
+        void handleConnectionClosed(LDAPClientContext context, UnbindRequest unbindRequest);
 
         /**
          * Invoked when the connection has been disconnected by the server.
@@ -68,28 +69,28 @@
          * @param diagnosticMessage
          *            The diagnostic message, which may be empty or {@code null} indicating that none was provided.
          */
-        void connectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage);
+        void handleConnectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage);
     }
 
     /**
      * Register a listener which will be notified when this {@link LDAPClientContext} is disconnected.
      *
-     * @param listener The {@link DisconnectListener} to register.
+     * @param listener The {@link ConnectionEventListener} to register.
      */
-    void onDisconnect(DisconnectListener listener);
+    void addConnectionEventListener(ConnectionEventListener listener);
 
     /**
      * Disconnects the client without sending a disconnect notification. Invoking this method causes
-     * {@link DisconnectListener#connectionDisconnected(LDAPClientContext, ResultCode, String)} to be called before this
-     * method returns.
+     * {@link ConnectionEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be called
+     * before this method returns.
      */
     void disconnect();
 
     /**
      * Disconnects the client and sends a disconnect notification, containing the provided result code and diagnostic
      * message. Invoking this method causes
-     * {@link DisconnectListener#connectionDisconnected(LDAPClientContext, ResultCode, String)} to be called before this
-     * method returns.
+     * {@link ConnectionEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be called
+     * before this method returns.
      *
      * @param resultCode
      *            The result code to include with the disconnect notification
@@ -136,6 +137,15 @@
     SSLSession getSSLSession();
 
     /**
+     * Returns the {@link SaslServer} currently in use by the underlying connection, or
+     * {@code null} if SASL integrity and/or privacy protection is not enabled.
+     *
+     * @return The {@link SaslServer} currently in use by the underlying connection, or
+     *         {@code null} if SASL integrity and/or privacy protection is not enabled.
+     */
+    SaslServer getSASLServer();
+
+    /**
      * Returns {@code true} if the underlying connection has been closed as a
      * result of a client disconnect, a fatal connection error, or a server-side
      * {@link #disconnect}.
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java
index 2bb4497..dac0421 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java
@@ -17,8 +17,8 @@
 package org.forgerock.opendj.ldap;
 
 import static com.forgerock.opendj.ldap.CoreMessages.HBCF_CONNECTION_CLOSED_BY_CLIENT;
-import static com.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_UNEXPECTED;
 import static com.forgerock.opendj.ldap.CoreMessages.HBCF_HEARTBEAT_TIMEOUT;
+import static com.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_UNEXPECTED;
 import static com.forgerock.opendj.ldap.CoreMessages.LDAP_CONNECTION_CONNECT_TIMEOUT;
 import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER;
 import static java.util.concurrent.TimeUnit.*;
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPListener.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPListener.java
index 6b9bee7..647d445 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPListener.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPListener.java
@@ -22,12 +22,18 @@
 import java.util.Collections;
 import java.util.Set;
 
+import org.forgerock.opendj.ldap.responses.Response;
 import org.forgerock.opendj.ldap.spi.LDAPListenerImpl;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
 import org.forgerock.opendj.ldap.spi.TransportProvider;
+import org.forgerock.util.Function;
 import org.forgerock.util.Option;
 import org.forgerock.util.Options;
 import org.forgerock.util.Reject;
 
+import com.forgerock.reactive.ReactiveHandler;
+import com.forgerock.reactive.Stream;
+
 /**
  * An LDAP server connection listener which waits for LDAP connection requests
  * to come in over the network and binds them to a {@link ServerConnection}
@@ -119,8 +125,7 @@
      * @param port
      *            The port to listen on.
      * @param factory
-     *            The server connection factory which will be used to create
-     *            server connections.
+     *            The handler factory which will be used to create handlers.
      * @throws IOException
      *             If an error occurred while trying to listen on the provided
      *             address.
@@ -128,7 +133,9 @@
      *             If {code factory} was {@code null}.
      */
     public LDAPListener(final int port,
-            final ServerConnectionFactory<LDAPClientContext, Integer> factory) throws IOException {
+            final Function<LDAPClientContext,
+                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+                           LdapException> factory) throws IOException {
         this(port, factory, Options.defaultOptions());
     }
 
@@ -139,8 +146,7 @@
      * @param port
      *            The port to listen on.
      * @param factory
-     *            The server connection factory which will be used to create
-     *            server connections.
+     *            The handler factory which will be used to create handlers.
      * @param options
      *            The LDAP listener options.
      * @throws IOException
@@ -150,40 +156,42 @@
      *             If {code factory} or {@code options} was {@code null}.
      */
     public LDAPListener(final int port,
-            final ServerConnectionFactory<LDAPClientContext, Integer> factory,
+            final Function<LDAPClientContext,
+                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+                           LdapException> factory,
             final Options options) throws IOException {
-        this(new InetSocketAddress(port), factory, options);
+        this(Collections.singleton(new InetSocketAddress(port)), factory, options);
     }
 
     /**
      * Creates a new LDAP listener implementation which will listen for LDAP
      * client connections at the provided address.
      *
-     * @param address
-     *            The address to listen on.
+     * @param addresses
+     *            The addresses to listen on.
      * @param factory
-     *            The server connection factory which will be used to create
-     *            server connections.
+     *            The handler factory which will be used to create handlers.
      * @throws IOException
      *             If an error occurred while trying to listen on the provided
      *             address.
      * @throws NullPointerException
      *             If {@code address} or {code factory} was {@code null}.
      */
-    public LDAPListener(final InetSocketAddress address,
-            final ServerConnectionFactory<LDAPClientContext, Integer> factory) throws IOException {
-        this(address, factory, Options.defaultOptions());
+    public LDAPListener(final Set<InetSocketAddress> addresses,
+            final Function<LDAPClientContext,
+                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+                           LdapException> factory) throws IOException {
+        this(addresses, factory, Options.defaultOptions());
     }
 
     /**
      * Creates a new LDAP listener implementation which will listen for LDAP
      * client connections at the provided address.
      *
-     * @param address
-     *            The address to listen on.
+     * @param addresses
+     *            The addresses to listen on.
      * @param factory
-     *            The server connection factory which will be used to create
-     *            server connections.
+     *            The handler factory which will be used to create handlers.
      * @param options
      *            The LDAP listener options.
      * @throws IOException
@@ -193,12 +201,14 @@
      *             If {@code address}, {code factory}, or {@code options} was
      *             {@code null}.
      */
-    public LDAPListener(final InetSocketAddress address,
-            final ServerConnectionFactory<LDAPClientContext, Integer> factory,
+    public LDAPListener(final Set<InetSocketAddress> addresses,
+            final Function<LDAPClientContext,
+                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+                           LdapException> factory,
             final Options options) throws IOException {
-        Reject.ifNull(address, factory, options);
+        Reject.ifNull(addresses, factory, options);
         this.provider = getTransportProvider(options);
-        this.impl = provider.getLDAPListener(Collections.singleton(address), factory, options);
+        this.impl = provider.getLDAPListener(addresses, factory, options);
     }
 
     /**
@@ -210,8 +220,7 @@
      * @param port
      *            The port to listen on.
      * @param factory
-     *            The server connection factory which will be used to create
-     *            server connections.
+     *            The handler factory which will be used to create handlers.
      * @throws IOException
      *             If an error occurred while trying to listen on the provided
      *             address.
@@ -219,7 +228,9 @@
      *             If {@code host} or {code factory} was {@code null}.
      */
     public LDAPListener(final String host, final int port,
-            final ServerConnectionFactory<LDAPClientContext, Integer> factory) throws IOException {
+            final Function<LDAPClientContext,
+                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+                           LdapException> factory) throws IOException {
         this(host, port, factory, Options.defaultOptions());
     }
 
@@ -232,8 +243,7 @@
      * @param port
      *            The port to listen on.
      * @param factory
-     *            The server connection factory which will be used to create
-     *            server connections.
+     *            The handler factory which will be used to create handlers.
      * @param options
      *            The LDAP listener options.
      * @throws IOException
@@ -244,9 +254,11 @@
      *             {@code null}.
      */
     public LDAPListener(final String host, final int port,
-            final ServerConnectionFactory<LDAPClientContext, Integer> factory,
+            final Function<LDAPClientContext,
+                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+                           LdapException> factory,
             final Options options) throws IOException {
-        this(new InetSocketAddress(host, port), factory, options);
+        this(Collections.singleton(new InetSocketAddress(host, port)), factory, options);
     }
 
     /** Closes this LDAP connection listener. */
@@ -256,15 +268,24 @@
     }
 
     /**
-     * Returns the address that this LDAP listener is listening on.
+     * Returns the addresses that this LDAP listener is listening on.
      *
-     * @return The address that this LDAP listener is listening on.
+     * @return The addresses that this LDAP listener is listening on.
      */
     public Set<InetSocketAddress> getSocketAddresses() {
         return impl.getSocketAddresses();
     }
 
     /**
+     * Returns the first address that his LDAP listener is listening on.
+     *
+     * @return The addresses that this LDAP listener is listening on.
+     */
+    public InetSocketAddress firstSocketAddress() {
+        return impl.getSocketAddresses().iterator().next();
+    }
+
+    /**
      * Returns the name of the transport provider, which provides the implementation
      * of this factory.
      *
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnection.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnection.java
index a8eab85..09690dc 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnection.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnection.java
@@ -12,7 +12,7 @@
  * information: "Portions Copyright [year] [name of copyright owner]".
  *
  * Copyright 2010 Sun Microsystems, Inc.
- * Portions copyright 2012 ForgeRock AS.
+ * Portions copyright 2012-2016 ForgeRock AS.
  */
 
 package org.forgerock.opendj.ldap;
@@ -35,6 +35,7 @@
  *            The type of request context.
  * @see ServerConnectionFactory
  */
+@Deprecated
 public interface ServerConnection<C> extends RequestHandler<C> {
 
     /**
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnectionFactory.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnectionFactory.java
index 996b216..0037bb9 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnectionFactory.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnectionFactory.java
@@ -12,7 +12,7 @@
  * information: "Portions Copyright [year] [name of copyright owner]".
  *
  * Copyright 2010 Sun Microsystems, Inc.
- * Portions Copyright 2014 ForgeRock AS.
+ * Portions Copyright 2014-2016 ForgeRock AS.
  */
 
 package org.forgerock.opendj.ldap;
@@ -33,6 +33,7 @@
  * @see Connections#newInternalConnectionFactory(ServerConnectionFactory,
  *      Object) newInternalConnectionFactory
  */
+@Deprecated
 public interface ServerConnectionFactory<C, R> {
     /**
      * Invoked when a new client connection is accepted by the associated
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LdapMessages.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LdapMessages.java
index a2efe6a..b4603a2 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LdapMessages.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LdapMessages.java
@@ -21,7 +21,7 @@
 import org.forgerock.opendj.ldap.responses.Response;
 
 /**
- * Contains statics methods to create ldap messages.
+ * Contains static methods to create ldap messages.
  */
 public final class LdapMessages {
 
@@ -30,23 +30,23 @@
     }
 
     /**
-     * Creates a new {@link LdapRawMessage} containing a partially decoded LDAP message.
+     * Creates a new {@link     } containing a partially decoded LDAP message.
      *
      * @param messageType
      *            Operation code of the message
      * @param messageId
      *            Unique identifier of this message
-     * @param protocolVersion
+     * @param ldapVersion
      *            Protocol version to use (only for Bind requests)
      * @param rawDn
      *            Unparsed name contained in the request (or null if DN is not applicable)
      * @param reader
      *            An {@link ASN1Reader} containing the full encoded ldap message packet.
-     * @return A new {@link LdapRawMessage}
+     * @return A new {@link LdapRequestEnvelope}
      */
-    public static LdapRawMessage newRawMessage(final byte messageType, final int messageId, final int protocolVersion,
-            final ByteString rawDn, final ASN1Reader reader) {
-        return new LdapRawMessage(messageType, messageId, protocolVersion, rawDn, reader);
+    public static LdapRequestEnvelope newRequestEnvelope(final byte messageType, final int messageId,
+            final int ldapVersion, final ByteString rawDn, final ASN1Reader reader) {
+        return new LdapRequestEnvelope(messageType, messageId, ldapVersion, rawDn, reader);
     }
 
     /**
@@ -67,16 +67,16 @@
     }
 
     /**
-     * Represents an encoded LDAP message with it's envelope.
+     * Represents a Ldap Request envelope containing an encoded Request.
      */
-    public static final class LdapRawMessage extends LdapMessageEnvelope<ASN1Reader> {
+    public static final class LdapRequestEnvelope extends LdapMessageEnvelope<ASN1Reader> {
         private final ByteString rawDn;
-        private final int version;
+        private final int ldapVersion;
 
-        private LdapRawMessage(final byte messageType, final int messageId, final int version, final ByteString rawDn,
-                final ASN1Reader content) {
+        private LdapRequestEnvelope(final byte messageType, final int messageId, final int ldapVersion,
+                final ByteString rawDn, final ASN1Reader content) {
             super(messageType, messageId, content);
-            this.version = version;
+            this.ldapVersion = ldapVersion;
             this.rawDn = rawDn;
         }
 
@@ -85,8 +85,8 @@
          *
          * @return The ldap protocol version
          */
-        public int getVersion() {
-            return version;
+        public int getLdapVersion() {
+            return ldapVersion;
         }
 
         /**
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/TransportProvider.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/TransportProvider.java
index 37453be..9803230 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/TransportProvider.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/TransportProvider.java
@@ -20,9 +20,15 @@
 import java.util.Set;
 
 import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.ServerConnectionFactory;
+import org.forgerock.opendj.ldap.LdapException;
+import org.forgerock.opendj.ldap.responses.Response;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
+import org.forgerock.util.Function;
 import org.forgerock.util.Options;
 
+import com.forgerock.reactive.ReactiveHandler;
+import com.forgerock.reactive.Stream;
+
 /**
  * Interface for transport providers, which provide implementation
  * for {@code LDAPConnectionFactory} and {@code LDAPListener} classes,
@@ -55,8 +61,7 @@
      * @param addresses
      *            The addresses to listen on.
      * @param factory
-     *            The server connection factory which will be used to create
-     *            server connections.
+     *            The handler factory which will be used to create handlers.
      * @param options
      *            The LDAP listener options.
      * @return an implementation of {@code LDAPListener}
@@ -65,6 +70,8 @@
      *             address.
      */
     LDAPListenerImpl getLDAPListener(Set<InetSocketAddress> addresses,
-            ServerConnectionFactory<LDAPClientContext, Integer> factory, Options options)
-            throws IOException;
+            Function<LDAPClientContext,
+                     ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+                     LdapException> factory,
+            Options options) throws IOException;
 }
diff --git a/opendj-core/src/test/java/org/forgerock/opendj/ldap/LDAPServer.java b/opendj-core/src/test/java/org/forgerock/opendj/ldap/LDAPServer.java
index 86f16c1..7c8a436 100644
--- a/opendj-core/src/test/java/org/forgerock/opendj/ldap/LDAPServer.java
+++ b/opendj-core/src/test/java/org/forgerock/opendj/ldap/LDAPServer.java
@@ -17,12 +17,13 @@
 
 package org.forgerock.opendj.ldap;
 
-import static org.forgerock.opendj.ldap.LDAPListener.CONNECT_MAX_BACKLOG;
+import static org.forgerock.opendj.ldap.LDAPListener.*;
 import static org.forgerock.opendj.ldap.LdapException.newLdapException;
 import static org.forgerock.opendj.ldap.TestCaseUtils.loopbackWithDynamicPort;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -71,6 +72,7 @@
 
 import com.forgerock.opendj.ldap.controls.AccountUsabilityRequestControl;
 import com.forgerock.opendj.ldap.controls.AccountUsabilityResponseControl;
+import com.forgerock.reactive.ServerConnectionFactoryAdapter;
 
 /**
  * A simple ldap server that manages 1000 entries and used for running
@@ -517,8 +519,10 @@
             return;
         }
         sslContext = new SSLContextBuilder().getSSLContext();
-        listener = new LDAPListener(loopbackWithDynamicPort(), getInstance(),
-                        Options.defaultOptions().set(CONNECT_MAX_BACKLOG, 4096));
+        listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+                        getInstance()),
+                Options.defaultOptions().set(CONNECT_MAX_BACKLOG, 4096));
         isRunning = true;
     }
 
diff --git a/opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicLDAPListener.java b/opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicLDAPListener.java
index bc01991..d98bdd9 100644
--- a/opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicLDAPListener.java
+++ b/opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicLDAPListener.java
@@ -21,14 +21,22 @@
 import java.util.Set;
 
 import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.ServerConnectionFactory;
+import org.forgerock.opendj.ldap.LdapException;
+import org.forgerock.opendj.ldap.responses.Response;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
+import org.forgerock.util.Function;
 import org.forgerock.util.Options;
 
+import com.forgerock.reactive.ReactiveHandler;
+import com.forgerock.reactive.Stream;
+
 /**
  * Basic LDAP listener implementation to use for tests only.
  */
 public final class BasicLDAPListener implements LDAPListenerImpl {
-    private final ServerConnectionFactory<LDAPClientContext, Integer> connectionFactory;
+    private final Function<LDAPClientContext,
+                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+                           LdapException> connectionFactory;
     private final Set<InetSocketAddress> socketAddresses;
 
     /**
@@ -37,15 +45,16 @@
      * @param address
      *            The address to listen on.
      * @param factory
-     *            The server connection factory can be used to create
-     *            server connections.
+     *            The server connection factory can be used to create server connections.
      * @param options
      *            The LDAP listener options.
      * @throws IOException
      *             is never thrown with this do-nothing implementation
      */
     public BasicLDAPListener(final Set<InetSocketAddress> addresses,
-            final ServerConnectionFactory<LDAPClientContext, Integer> factory,
+            final Function<LDAPClientContext,
+                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+                           LdapException> factory,
             final Options options) throws IOException {
         this.connectionFactory = factory;
         this.socketAddresses = addresses;
@@ -70,7 +79,9 @@
         return builder.toString();
     }
 
-    ServerConnectionFactory<LDAPClientContext, Integer> getConnectionFactory() {
+    Function<LDAPClientContext,
+             ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+             LdapException> getConnectionFactory() {
         return connectionFactory;
     }
 }
diff --git a/opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicTransportProvider.java b/opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicTransportProvider.java
index f397c80..0410ab5 100644
--- a/opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicTransportProvider.java
+++ b/opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicTransportProvider.java
@@ -20,9 +20,15 @@
 import java.util.Set;
 
 import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.ServerConnectionFactory;
+import org.forgerock.opendj.ldap.LdapException;
+import org.forgerock.opendj.ldap.responses.Response;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
+import org.forgerock.util.Function;
 import org.forgerock.util.Options;
 
+import com.forgerock.reactive.ReactiveHandler;
+import com.forgerock.reactive.Stream;
+
 /**
  * Provides an basic implementation of a transport provider doing nothing.
  * This should be used for tests only.
@@ -45,7 +51,10 @@
 
     @Override
     public LDAPListenerImpl getLDAPListener(Set<InetSocketAddress> addresses,
-            ServerConnectionFactory<LDAPClientContext, Integer> factory, Options options) throws IOException {
+            Function<LDAPClientContext,
+                     ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+                     LdapException> factory,
+            Options options) throws IOException {
         return new BasicLDAPListener(addresses, factory, options);
     }
 
diff --git a/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java b/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
index 126f97b..6aa495a 100644
--- a/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
+++ b/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
@@ -15,42 +15,18 @@
  */
 package com.forgerock.opendj.grizzly;
 
-import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Set;
 
 import org.forgerock.opendj.grizzly.GrizzlyLDAPConnectionFactory;
 import org.forgerock.opendj.grizzly.GrizzlyLDAPListener;
-import org.forgerock.opendj.io.ASN1Reader;
-import org.forgerock.opendj.io.AbstractLDAPMessageHandler;
-import org.forgerock.opendj.io.LDAP;
-import org.forgerock.opendj.io.LDAPReader;
-import org.forgerock.opendj.ldap.CommonLDAPOptions;
-import org.forgerock.opendj.ldap.DecodeException;
-import org.forgerock.opendj.ldap.DecodeOptions;
 import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.LDAPClientContext.DisconnectListener;
 import org.forgerock.opendj.ldap.LdapException;
-import org.forgerock.opendj.ldap.ResultCode;
-import org.forgerock.opendj.ldap.ServerConnection;
-import org.forgerock.opendj.ldap.ServerConnectionFactory;
-import org.forgerock.opendj.ldap.requests.AbandonRequest;
-import org.forgerock.opendj.ldap.requests.AddRequest;
-import org.forgerock.opendj.ldap.requests.CompareRequest;
-import org.forgerock.opendj.ldap.requests.DeleteRequest;
-import org.forgerock.opendj.ldap.requests.ExtendedRequest;
-import org.forgerock.opendj.ldap.requests.GenericBindRequest;
-import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
-import org.forgerock.opendj.ldap.requests.ModifyRequest;
-import org.forgerock.opendj.ldap.requests.SearchRequest;
-import org.forgerock.opendj.ldap.requests.UnbindRequest;
-import org.forgerock.opendj.ldap.responses.ExtendedResult;
 import org.forgerock.opendj.ldap.responses.Response;
 import org.forgerock.opendj.ldap.spi.LDAPConnectionFactoryImpl;
 import org.forgerock.opendj.ldap.spi.LDAPListenerImpl;
-import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
 import org.forgerock.opendj.ldap.spi.TransportProvider;
 import org.forgerock.util.Function;
 import org.forgerock.util.Options;
@@ -58,11 +34,6 @@
 import com.forgerock.reactive.ReactiveHandler;
 import com.forgerock.reactive.Stream;
 
-import io.reactivex.BackpressureStrategy;
-import io.reactivex.Flowable;
-import io.reactivex.FlowableEmitter;
-import io.reactivex.FlowableOnSubscribe;
-
 /**
  * Grizzly transport provider implementation.
  */
@@ -75,122 +46,15 @@
 
     @Override
     public LDAPListenerImpl getLDAPListener(final Set<InetSocketAddress> addresses,
-            final ServerConnectionFactory<LDAPClientContext, Integer> factory, final Options options)
-            throws IOException {
-        return new GrizzlyLDAPListener(addresses, options,
-                new Function<LDAPClientContext,
-                             ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
-                             LdapException>() {
-                    @Override
-                    public ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> apply(
-                            final LDAPClientContext clientContext) throws LdapException {
-                        return newHandler(clientContext, factory, options);
-                    }
-                });
+            final Function<LDAPClientContext,
+                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
+                           LdapException> factory,
+            final Options options) throws IOException {
+        return new GrizzlyLDAPListener(addresses, options, factory);
     }
 
     @Override
     public String getName() {
         return "Grizzly";
     }
-
-    private ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> newHandler(
-            final LDAPClientContext clientContext, final ServerConnectionFactory<LDAPClientContext, Integer> factory,
-            final Options options) throws LdapException {
-        final ServerConnection<Integer> serverConnection = factory.handleAccept(clientContext);
-        final ServerConnectionAdaptor<Integer> adapter = new ServerConnectionAdaptor<>(serverConnection);
-        clientContext.onDisconnect(new DisconnectListener() {
-            @Override
-            public void exceptionOccurred(final LDAPClientContext context, final Throwable error) {
-                serverConnection.handleConnectionError(error);
-            }
-
-            @Override
-            public void connectionClosed(final LDAPClientContext context, final UnbindRequest unbindRequest) {
-                serverConnection.handleConnectionClosed(0, unbindRequest);
-            }
-
-            @Override
-            public void connectionDisconnected(final LDAPClientContext context, final ResultCode resultCode,
-                    final String diagnosticMessage) {
-                serverConnection.handleConnectionDisconnected(resultCode, diagnosticMessage);
-            }
-        });
-
-        final DecodeOptions decodeOptions = options.get(CommonLDAPOptions.LDAP_DECODE_OPTIONS);
-        return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
-            @Override
-            public Stream<Response> handle(final LDAPClientContext context,
-                    final LdapRawMessage rawRequest) throws Exception {
-                final LDAPReader<ASN1Reader> reader = LDAP.getReader(rawRequest.getContent(), decodeOptions);
-                return streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
-                    @Override
-                    public void subscribe(final FlowableEmitter<Response> emitter) throws Exception {
-                        reader.readMessage(new AbstractLDAPMessageHandler() {
-                            @Override
-                            public void abandonRequest(int messageID, AbandonRequest request)
-                                    throws DecodeException, IOException {
-                                adapter.handleAbandon(messageID, request, emitter);
-                            }
-
-                            @Override
-                            public void addRequest(int messageID, AddRequest request)
-                                    throws DecodeException, IOException {
-                                adapter.handleAdd(messageID, request, emitter);
-                            }
-
-                            @Override
-                            public void deleteRequest(final int messageID, final DeleteRequest request)
-                                    throws DecodeException, IOException {
-                                adapter.handleDelete(messageID, request, emitter);
-                            }
-
-                            @Override
-                            public void bindRequest(int messageID, int version, GenericBindRequest request)
-                                    throws DecodeException, IOException {
-                                adapter.handleBind(messageID, version, request, emitter);
-                            }
-
-                            @Override
-                            public void compareRequest(int messageID, CompareRequest request)
-                                    throws DecodeException, IOException {
-                                adapter.handleCompare(messageID, request, emitter);
-                            }
-
-                            @Override
-                            public <R extends ExtendedResult> void extendedRequest(int messageID,
-                                    ExtendedRequest<R> request) throws DecodeException, IOException {
-                                adapter.handleExtendedRequest(messageID, request, emitter);
-                            }
-
-                            @Override
-                            public void modifyDNRequest(int messageID, ModifyDNRequest request)
-                                    throws DecodeException, IOException {
-                                adapter.handleModifyDN(messageID, request, emitter);
-                            }
-
-                            @Override
-                            public void modifyRequest(int messageID, ModifyRequest request)
-                                    throws DecodeException, IOException {
-                                adapter.handleModify(messageID, request, emitter);
-                            }
-
-                            @Override
-                            public void searchRequest(int messageID, SearchRequest request)
-                                    throws DecodeException, IOException {
-                                adapter.handleSearch(messageID, request, emitter);
-                            }
-
-                            @Override
-                            public void unbindRequest(int messageID, UnbindRequest request)
-                                    throws DecodeException, IOException {
-                                serverConnection.handleConnectionClosed(messageID, request);
-                            }
-                        });
-                        emitter.onComplete();
-                    }
-                }, BackpressureStrategy.ERROR));
-            }
-        };
-    }
 }
diff --git a/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/ServerConnectionAdaptor.java b/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/ServerConnectionAdaptor.java
deleted file mode 100644
index 9318052..0000000
--- a/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/ServerConnectionAdaptor.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * The contents of this file are subject to the terms of the Common Development and
- * Distribution License (the License). You may not use this file except in compliance with the
- * License.
- *
- * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
- * specific language governing permission and limitations under the License.
- *
- * When distributing Covered Software, include this CDDL Header Notice in each file and include
- * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
- * Header, with the fields enclosed by brackets [] replaced by your own identifying
- * information: "Portions Copyright [year] [name of copyright owner]".
- *
- * Copyright 2016 ForgeRock AS.
- */
-package com.forgerock.opendj.grizzly;
-
-import static org.forgerock.util.Reject.checkNotNull;
-
-import org.forgerock.opendj.ldap.IntermediateResponseHandler;
-import org.forgerock.opendj.ldap.LdapException;
-import org.forgerock.opendj.ldap.LdapResultHandler;
-import org.forgerock.opendj.ldap.SearchResultHandler;
-import org.forgerock.opendj.ldap.ServerConnection;
-import org.forgerock.opendj.ldap.requests.AbandonRequest;
-import org.forgerock.opendj.ldap.requests.AddRequest;
-import org.forgerock.opendj.ldap.requests.BindRequest;
-import org.forgerock.opendj.ldap.requests.CompareRequest;
-import org.forgerock.opendj.ldap.requests.DeleteRequest;
-import org.forgerock.opendj.ldap.requests.ExtendedRequest;
-import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
-import org.forgerock.opendj.ldap.requests.ModifyRequest;
-import org.forgerock.opendj.ldap.requests.SearchRequest;
-import org.forgerock.opendj.ldap.responses.BindResult;
-import org.forgerock.opendj.ldap.responses.CompareResult;
-import org.forgerock.opendj.ldap.responses.ExtendedResult;
-import org.forgerock.opendj.ldap.responses.IntermediateResponse;
-import org.forgerock.opendj.ldap.responses.Response;
-import org.forgerock.opendj.ldap.responses.Result;
-import org.forgerock.opendj.ldap.responses.SearchResultEntry;
-import org.forgerock.opendj.ldap.responses.SearchResultReference;
-import org.forgerock.util.promise.RuntimeExceptionHandler;
-
-import io.reactivex.FlowableEmitter;
-
-final class ServerConnectionAdaptor<C> {
-
-    private final ServerConnection<C> adaptee;
-
-    public ServerConnectionAdaptor(final ServerConnection<C> handler) {
-        this.adaptee = checkNotNull(handler, "handler must not be null");
-    }
-
-    public void handleAdd(final C requestContext, final AddRequest request,
-            final FlowableEmitter<Response> response) {
-        final ResultHandlerAdaptor<Result> resultAdapter = new ResultHandlerAdaptor<>(response);
-        adaptee.handleAdd(requestContext, request, resultAdapter, resultAdapter);
-    }
-
-    public void handleBind(final C requestContext, final int version, final BindRequest request,
-            final FlowableEmitter<Response> response) {
-        final ResultHandlerAdaptor<BindResult> resultAdapter = new ResultHandlerAdaptor<>(response);
-        adaptee.handleBind(requestContext, version, request, resultAdapter, resultAdapter);
-    }
-
-    public void handleCompare(final C requestContext, final CompareRequest request,
-            final FlowableEmitter<Response> response) {
-        final ResultHandlerAdaptor<CompareResult> resultAdapter = new ResultHandlerAdaptor<>(response);
-        adaptee.handleCompare(requestContext, request, resultAdapter, resultAdapter);
-    }
-
-    public void handleDelete(final C requestContext, final DeleteRequest request,
-            final FlowableEmitter<Response> response) {
-        final ResultHandlerAdaptor<Result> resultAdapter = new ResultHandlerAdaptor<>(response);
-        adaptee.handleDelete(requestContext, request, resultAdapter, resultAdapter);
-    }
-
-    public <R extends ExtendedResult> void handleExtendedRequest(final C requestContext,
-            final ExtendedRequest<R> request, final FlowableEmitter<Response> response) {
-        final ResultHandlerAdaptor<R> resultAdapter = new ResultHandlerAdaptor<>(response);
-        adaptee.handleExtendedRequest(requestContext, request, resultAdapter, resultAdapter);
-    }
-
-    public void handleModify(final C requestContext, final ModifyRequest request,
-            final FlowableEmitter<Response> response) {
-        final ResultHandlerAdaptor<Result> resultAdapter = new ResultHandlerAdaptor<>(response);
-        adaptee.handleModify(requestContext, request, resultAdapter, resultAdapter);
-    }
-
-    public void handleModifyDN(final C requestContext, final ModifyDNRequest request,
-            final FlowableEmitter<Response> response) {
-        final ResultHandlerAdaptor<Result> resultAdapter = new ResultHandlerAdaptor<>(response);
-        adaptee.handleModifyDN(requestContext, request, resultAdapter, resultAdapter);
-    }
-
-    public void handleSearch(final C requestContext, final SearchRequest request,
-            final FlowableEmitter<Response> response) {
-        final ResultHandlerAdaptor<Result> resultAdapter = new ResultHandlerAdaptor<>(response);
-        adaptee.handleSearch(requestContext, request, resultAdapter, resultAdapter, resultAdapter);
-    }
-
-    public void handleAbandon(C requestContext, final AbandonRequest request, final FlowableEmitter<Response> out) {
-        adaptee.handleAbandon(requestContext, request);
-    }
-
-    /**
-     * Forward all response received from handler to a {@link LdapResponse}.
-     */
-    private static final class ResultHandlerAdaptor<R extends Response>
-            implements IntermediateResponseHandler, SearchResultHandler, LdapResultHandler<R>, RuntimeExceptionHandler {
-
-        private final FlowableEmitter<Response> adaptee;
-
-        ResultHandlerAdaptor(final FlowableEmitter<Response> emitter) {
-            this.adaptee = emitter;
-        }
-
-        @Override
-        public boolean handleEntry(final SearchResultEntry entry) {
-            adaptee.onNext(entry);
-            return true;
-        }
-
-        @Override
-        public boolean handleReference(final SearchResultReference reference) {
-            adaptee.onNext(reference);
-            return true;
-        }
-
-        @Override
-        public boolean handleIntermediateResponse(final IntermediateResponse intermediateResponse) {
-            adaptee.onNext(intermediateResponse);
-            return true;
-        }
-
-        @Override
-        public void handleResult(R result) {
-            if (result != null) {
-                adaptee.onNext(result);
-            }
-            adaptee.onComplete();
-        }
-
-        @Override
-        public void handleRuntimeException(RuntimeException exception) {
-            adaptee.onError(exception);
-        }
-
-        @Override
-        public void handleException(LdapException exception) {
-            adaptee.onError(exception);
-        }
-    }
-}
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java
index fe1df91..2ebbf43 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java
@@ -17,7 +17,6 @@
 package org.forgerock.opendj.grizzly;
 
 import static org.forgerock.opendj.grizzly.ServerTCPNIOTransport.SERVER_TRANSPORT;
-import static org.forgerock.opendj.ldap.CommonLDAPOptions.LDAP_DECODE_OPTIONS;
 import static org.forgerock.opendj.ldap.LDAPListener.*;
 
 import java.io.IOException;
@@ -34,11 +33,10 @@
 import org.forgerock.opendj.ldap.LdapException;
 import org.forgerock.opendj.ldap.responses.Response;
 import org.forgerock.opendj.ldap.spi.LDAPListenerImpl;
-import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
 import org.forgerock.util.Function;
 import org.forgerock.util.Options;
 import org.glassfish.grizzly.filterchain.FilterChain;
-import org.glassfish.grizzly.filterchain.FilterChainContext;
 import org.glassfish.grizzly.nio.transport.TCPNIOBindingHandler;
 import org.glassfish.grizzly.nio.transport.TCPNIOConnection;
 import org.glassfish.grizzly.nio.transport.TCPNIOServerConnection;
@@ -74,7 +72,7 @@
      */
     public GrizzlyLDAPListener(final Set<InetSocketAddress> addresses, final Options options,
             final Function<LDAPClientContext,
-                           ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
+                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                            LdapException> requestHandlerFactory) throws IOException {
         this(addresses, requestHandlerFactory, options, null);
     }
@@ -97,7 +95,7 @@
      */
     public GrizzlyLDAPListener(final Set<InetSocketAddress> addresses,
             final Function<LDAPClientContext,
-                           ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
+                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                            LdapException> requestHandlerFactory,
             final Options options, TCPNIOTransport transport) throws IOException {
 
@@ -105,13 +103,7 @@
         this.options = Options.copyOf(options);
         final LDAPServerFilter serverFilter = new LDAPServerFilter(requestHandlerFactory, options,
                 options.get(LDAP_DECODE_OPTIONS), options.get(MAX_CONCURRENT_REQUESTS));
-        final FilterChain ldapChain = GrizzlyUtils.buildFilterChain(this.transport.get().getProcessor(),
-                new LdapCodec(options.get(REQUEST_MAX_SIZE_IN_BYTES), options.get(LDAP_DECODE_OPTIONS)) {
-                    @Override
-                    protected void onLdapCodecError(FilterChainContext ctx, Throwable error) {
-                        serverFilter.exceptionOccurred(ctx, error);
-                    }
-                }, serverFilter);
+        final FilterChain ldapChain = GrizzlyUtils.buildFilterChain(this.transport.get().getProcessor(), serverFilter);
         final TCPNIOBindingHandler bindingHandler = TCPNIOBindingHandler.builder(this.transport.get())
                 .processor(ldapChain).build();
         this.serverConnections = new ArrayList<>(addresses.size());
@@ -128,16 +120,14 @@
         if (isClosed.compareAndSet(false, true)) {
             try {
                 for (TCPNIOConnection serverConnection : serverConnections) {
-                    serverConnection.close().get();
+                    serverConnection.closeSilently();
                 }
-            } catch (final InterruptedException e) {
-                // Cannot handle here.
-                Thread.currentThread().interrupt();
             } catch (final Exception e) {
                 // TODO: I18N
                 logger.warn(LocalizableMessage.raw("Exception occurred while closing listener", e));
+            } finally {
+                transport.release();
             }
-            transport.release();
         }
     }
 
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java
index dcae91c..b6f059d 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java
@@ -17,7 +17,7 @@
 
 package org.forgerock.opendj.grizzly;
 
-import static org.forgerock.opendj.ldap.CommonLDAPOptions.LDAP_DECODE_OPTIONS;
+import static org.forgerock.opendj.ldap.LDAPListener.LDAP_DECODE_OPTIONS;
 import static org.forgerock.opendj.ldap.ResultCode.CLIENT_SIDE_LOCAL_ERROR;
 import static org.forgerock.opendj.ldap.responses.Responses.newResult;
 
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index 189f31c..6e1b492 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -39,6 +39,7 @@
 import org.forgerock.opendj.ldap.DecodeException;
 import org.forgerock.opendj.ldap.DecodeOptions;
 import org.forgerock.opendj.ldap.LDAPClientContext;
+import org.forgerock.opendj.ldap.LDAPListener;
 import org.forgerock.opendj.ldap.LdapException;
 import org.forgerock.opendj.ldap.ResultCode;
 import org.forgerock.opendj.ldap.requests.UnbindRequest;
@@ -49,18 +50,16 @@
 import org.forgerock.opendj.ldap.responses.Result;
 import org.forgerock.opendj.ldap.responses.SearchResultEntry;
 import org.forgerock.opendj.ldap.responses.SearchResultReference;
-import org.forgerock.opendj.ldap.spi.LdapMessages;
-import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
 import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage;
 import org.forgerock.util.Function;
 import org.forgerock.util.Options;
 import org.forgerock.util.Reject;
 import org.glassfish.grizzly.Connection;
-import org.glassfish.grizzly.Grizzly;
-import org.glassfish.grizzly.attributes.Attribute;
 import org.glassfish.grizzly.filterchain.BaseFilter;
 import org.glassfish.grizzly.filterchain.Filter;
 import org.glassfish.grizzly.filterchain.FilterChain;
+import org.glassfish.grizzly.filterchain.FilterChainBuilder;
 import org.glassfish.grizzly.filterchain.FilterChainContext;
 import org.glassfish.grizzly.filterchain.NextAction;
 import org.glassfish.grizzly.ssl.SSLFilter;
@@ -82,11 +81,8 @@
  */
 final class LDAPServerFilter extends BaseFilter {
 
-    private static final Attribute<ClientConnectionImpl> LDAP_CONNECTION_ATTR = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
-            .createAttribute("LDAPServerConnection");
-
     private final Function<LDAPClientContext,
-                           ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
+                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                            LdapException> connectionHandlerFactory;
 
     /**
@@ -119,6 +115,8 @@
     private final int maxConcurrentRequests;
     private final Options connectionOptions;
 
+    private DecodeOptions decodeOptions;
+
     /**
      * Creates a server filter with provided listener, options and max size of ASN1 element.
      *
@@ -131,11 +129,12 @@
      */
     LDAPServerFilter(
             final Function<LDAPClientContext,
-                           ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
+                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                            LdapException> connectionHandlerFactory,
             final Options connectionOptions, final DecodeOptions options, final int maxPendingRequests) {
         this.connectionHandlerFactory = connectionHandlerFactory;
         this.connectionOptions = connectionOptions;
+        this.decodeOptions = options;
         this.maxConcurrentRequests = maxPendingRequests;
     }
 
@@ -146,13 +145,23 @@
         connection.configureBlocking(false);
 
         final ClientConnectionImpl clientContext = new ClientConnectionImpl(connection);
-        LDAP_CONNECTION_ATTR.set(connection, clientContext);
-        final ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> requestHandler =
-                connectionHandlerFactory.apply(clientContext);
+        connection.setProcessor(FilterChainBuilder
+                .stateless()
+                .addAll((FilterChain) connection.getProcessor())
+                .add(new LdapCodec(connectionOptions.get(LDAPListener.REQUEST_MAX_SIZE_IN_BYTES), decodeOptions) {
+                    @Override
+                    protected void onLdapCodecError(final FilterChainContext ctx, final Throwable error) {
+                        clientContext.exceptionOccurred(ctx, error);
+                    }
+                })
+                .add(clientContext)
+                .build());
 
-        clientContext.read().flatMap(new Function<LdapRawMessage, Publisher<Void>, Exception>() {
+        final ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>> requestHandler =
+                connectionHandlerFactory.apply(clientContext);
+        clientContext.read().flatMap(new Function<LdapRequestEnvelope, Publisher<Void>, Exception>() {
             @Override
-            public Publisher<Void> apply(final LdapRawMessage rawRequest) throws Exception {
+            public Publisher<Void> apply(final LdapRequestEnvelope rawRequest) throws Exception {
                 if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
                     clientContext.notifyConnectionClosed(rawRequest);
                     return emptyStream();
@@ -185,7 +194,7 @@
                 // Swallow the error to prevent the subscribe() below to report it on the console.
                 return emptyStream();
             }
-        }).onCompleteDo(new Action() {
+        }).onComplete(new Action() {
             @Override
             public void run() throws Exception {
                 clientContext.notifyConnectionClosed(null);
@@ -194,30 +203,16 @@
         return ctx.getStopAction();
     }
 
-    private final LdapResponseMessage toLdapResponseMessage(final LdapRawMessage rawRequest, final Result result) {
-        switch (rawRequest.getMessageType()) {
-        case OP_TYPE_ADD_REQUEST:
-            return newResponseMessage(OP_TYPE_ADD_RESPONSE, rawRequest.getMessageId(), result);
-        case OP_TYPE_BIND_REQUEST:
-            return newResponseMessage(OP_TYPE_BIND_RESPONSE, rawRequest.getMessageId(), result);
-        case OP_TYPE_COMPARE_REQUEST:
-            return newResponseMessage(OP_TYPE_COMPARE_RESPONSE, rawRequest.getMessageId(), result);
-        case OP_TYPE_DELETE_REQUEST:
-            return newResponseMessage(OP_TYPE_DELETE_RESPONSE, rawRequest.getMessageId(), result);
-        case OP_TYPE_EXTENDED_REQUEST:
-            return newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, rawRequest.getMessageId(), result);
-        case OP_TYPE_MODIFY_DN_REQUEST:
-            return newResponseMessage(OP_TYPE_MODIFY_DN_RESPONSE, rawRequest.getMessageId(), result);
-        case OP_TYPE_MODIFY_REQUEST:
-            return newResponseMessage(OP_TYPE_MODIFY_RESPONSE, rawRequest.getMessageId(), result);
-        case OP_TYPE_SEARCH_REQUEST:
-            return newResponseMessage(OP_TYPE_SEARCH_RESULT_DONE, rawRequest.getMessageId(), result);
-        default:
+    private final LdapResponseMessage toLdapResponseMessage(final LdapRequestEnvelope rawRequest, final Result result) {
+        final byte resultType = OP_TO_RESULT_TYPE[rawRequest.getMessageType()];
+        if (resultType == 0) {
             throw new IllegalArgumentException("Unknown request: " + rawRequest.getMessageType());
         }
+        return newResponseMessage(resultType, rawRequest.getMessageId(), result);
     }
 
-    private Function<Response, LdapResponseMessage, Exception> toLdapResponseMessage(final LdapRawMessage rawRequest) {
+    private Function<Response, LdapResponseMessage, Exception> toLdapResponseMessage(
+            final LdapRequestEnvelope rawRequest) {
         return new Function<Response, LdapResponseMessage, Exception>() {
             @Override
             public LdapResponseMessage apply(final Response response) {
@@ -233,58 +228,32 @@
                 if (response instanceof SearchResultReference) {
                     return newResponseMessage(OP_TYPE_SEARCH_RESULT_REFERENCE, rawRequest.getMessageId(), response);
                 }
-                throw new IllegalArgumentException();
+                throw new IllegalArgumentException(
+                        "Not implemented for a response of type " + (response != null ? response.getClass() : null));
             }
         };
     }
 
-    @Override
-    public NextAction handleRead(final FilterChainContext ctx) throws IOException {
-        final ClientConnectionImpl clientContext = LDAP_CONNECTION_ATTR.get(ctx.getConnection());
-        if (clientContext != null) {
-            return clientContext.handleRead(ctx);
-        }
-        ctx.suspend();
-        return ctx.getSuspendAction();
-    }
-
-    @Override
-    public void exceptionOccurred(final FilterChainContext ctx, final Throwable error) {
-        final ClientConnectionImpl clientContext = LDAP_CONNECTION_ATTR.remove(ctx.getConnection());
-        if (clientContext != null) {
-            clientContext.exceptionOccurred(ctx, error);
-        }
-    }
-
-    @Override
-    public NextAction handleClose(final FilterChainContext ctx) throws IOException {
-        final ClientConnectionImpl clientContext = LDAP_CONNECTION_ATTR.remove(ctx.getConnection());
-        if (clientContext != null) {
-            clientContext.handleClose(ctx);
-        }
-        return ctx.getStopAction();
-    }
-
-    final class ClientConnectionImpl implements LDAPClientContext {
+    final class ClientConnectionImpl extends BaseFilter implements LDAPClientContext {
 
         final class GrizzlyBackpressureSubscription implements Subscription {
             private final AtomicLong pendingRequests = new AtomicLong();
-            private Subscriber<? super LdapRawMessage> subscriber;
+            private Subscriber<? super LdapRequestEnvelope> subscriber;
             volatile FilterChainContext ctx;
 
-            GrizzlyBackpressureSubscription(Subscriber<? super LdapRawMessage> subscriber) {
+            GrizzlyBackpressureSubscription(Subscriber<? super LdapRequestEnvelope> subscriber) {
                 this.subscriber = subscriber;
                 subscriber.onSubscribe(this);
             }
 
             NextAction handleRead(final FilterChainContext ctx) {
-                final Subscriber<? super LdapRawMessage> sub = subscriber;
+                final Subscriber<? super LdapRequestEnvelope> sub = subscriber;
                 if (sub == null) {
                     // Subscription cancelled. Stop reading
                     ctx.suspend();
                     return ctx.getSuspendAction();
                 }
-                sub.onNext((LdapRawMessage) ctx.getMessage());
+                sub.onNext((LdapRequestEnvelope) ctx.getMessage());
                 if (BackpressureHelper.produced(pendingRequests, 1) > 0) {
                     return ctx.getStopAction();
                 }
@@ -302,7 +271,7 @@
             }
 
             public void onError(final Throwable error) {
-                final Subscriber<? super LdapRawMessage> sub = subscriber;
+                final Subscriber<? super LdapRequestEnvelope> sub = subscriber;
                 if (sub != null) {
                     subscriber = null;
                     sub.onError(error);
@@ -310,7 +279,7 @@
             }
 
             public void onComplete() {
-                final Subscriber<? super LdapRawMessage> sub = subscriber;
+                final Subscriber<? super LdapRequestEnvelope> sub = subscriber;
                 if (sub != null) {
                     subscriber = null;
                     sub.onComplete();
@@ -325,16 +294,17 @@
 
         private final Connection<?> connection;
         private final AtomicBoolean isClosed = new AtomicBoolean(false);
-        private final List<DisconnectListener> listeners = new LinkedList<>();
+        private final List<ConnectionEventListener> listeners = new LinkedList<>();
         private SaslServer saslServer;
-        GrizzlyBackpressureSubscription upstream;
+        private GrizzlyBackpressureSubscription downstream;
 
         private ClientConnectionImpl(final Connection<?> connection) {
             this.connection = connection;
         }
 
-        NextAction handleRead(final FilterChainContext ctx)  {
-            final GrizzlyBackpressureSubscription immutableRef = upstream;
+        @Override
+        public NextAction handleRead(final FilterChainContext ctx)  {
+            final GrizzlyBackpressureSubscription immutableRef = downstream;
             if (immutableRef != null) {
                 return immutableRef.handleRead(ctx);
             }
@@ -342,29 +312,35 @@
             return ctx.getSuspendAction();
         }
 
-        void exceptionOccurred(final FilterChainContext ctx, final Throwable error) {
-            final GrizzlyBackpressureSubscription immutableRef = upstream;
-            if (immutableRef != null) {
+        @Override
+        public void exceptionOccurred(final FilterChainContext ctx, final Throwable error) {
+            final GrizzlyBackpressureSubscription immutableRef = downstream;
+            if (immutableRef == null) {
+                ctx.getConnection().closeSilently();
+            } else {
                 immutableRef.onError(error);
             }
         }
 
-        NextAction handleClose(final FilterChainContext ctx) {
-            final GrizzlyBackpressureSubscription immutableRef = upstream;
-            if (immutableRef != null) {
-                immutableRef.onComplete();
+        @Override
+        public NextAction handleClose(final FilterChainContext ctx) {
+            if (isClosed.compareAndSet(false, true)) {
+                final GrizzlyBackpressureSubscription immutableRef = downstream;
+                if (immutableRef != null) {
+                    downstream.onComplete();
+                }
             }
             return ctx.getStopAction();
         }
 
-        Stream<LdapRawMessage> read() {
-            return streamFromPublisher(new Publisher<LdapRawMessage>() {
+        Stream<LdapRequestEnvelope> read() {
+            return streamFromPublisher(new Publisher<LdapRequestEnvelope>() {
                 @Override
-                public void subscribe(final Subscriber<? super LdapRawMessage> subscriber) {
-                    if (upstream != null) {
+                public void subscribe(final Subscriber<? super LdapRequestEnvelope> subscriber) {
+                    if (downstream != null) {
                         return;
                     }
-                    upstream = new GrizzlyBackpressureSubscription(subscriber);
+                    downstream = new GrizzlyBackpressureSubscription(subscriber);
                 }
             });
         }
@@ -382,7 +358,7 @@
         public boolean enableTLS(final SSLEngine sslEngine, final boolean startTls) {
             Reject.ifNull(sslEngine, "sslEngine must not be null");
             synchronized (this) {
-                if (isFilterExists(SSLFilter.class)) {
+                if (filterExists(SSLFilter.class)) {
                     return false;
                 }
                 SSLUtils.setSSLEngine(connection, sslEngine);
@@ -392,10 +368,17 @@
         }
 
         @Override
+        public SSLSession getSSLSession() {
+            final SSLEngine sslEngine = SSLUtils.getSSLEngine(connection);
+            return sslEngine != null ? sslEngine.getSession() : null;
+        }
+
+        @Override
         public void enableSASL(final SaslServer saslServer) {
             Reject.ifNull(saslServer, "saslServer must not be null");
             synchronized (this) {
-                if (isFilterExists(SaslFilter.class)) {
+                if (filterExists(SaslFilter.class)) {
+                    // FIXME: The current saslServer must be replaced with the new one
                     return;
                 }
                 this.saslServer = saslServer;
@@ -404,6 +387,11 @@
         }
 
         @Override
+        public SaslServer getSASLServer() {
+            return saslServer;
+        }
+
+        @Override
         public InetSocketAddress getLocalAddress() {
             return (InetSocketAddress) connection.getLocalAddress();
         }
@@ -438,15 +426,15 @@
             int ssf = 0;
             final String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
             if (SaslFilter.SASL_AUTH_INTEGRITY.equalsIgnoreCase(qop)) {
-                ssf = 1;
+                return 1;
             } else if (SaslFilter.SASL_AUTH_CONFIDENTIALITY.equalsIgnoreCase(qop)) {
                 final String negStrength = (String) saslServer.getNegotiatedProperty(Sasl.STRENGTH);
                 if ("low".equalsIgnoreCase(negStrength)) {
-                    ssf = 40;
+                    return 40;
                 } else if ("medium".equalsIgnoreCase(negStrength)) {
-                    ssf = 56;
+                    return 56;
                 } else if ("high".equalsIgnoreCase(negStrength)) {
-                    ssf = 128;
+                    return 128;
                 }
                 /*
                  * Treat anything else as if not security is provided and keep the server running
@@ -456,12 +444,6 @@
         }
 
         @Override
-        public SSLSession getSSLSession() {
-            final SSLEngine sslEngine = SSLUtils.getSSLEngine(connection);
-            return sslEngine != null ? sslEngine.getSession() : null;
-        }
-
-        @Override
         public String toString() {
             final StringBuilder builder = new StringBuilder();
             builder.append("LDAPClientContext(");
@@ -482,7 +464,7 @@
             GrizzlyUtils.addFilterToConnection(filter, connection);
         }
 
-        private boolean isFilterExists(Class<?> filterKlass) {
+        private boolean filterExists(Class<?> filterKlass) {
             synchronized (this) {
                 final FilterChain currentFilterChain = (FilterChain) connection.getProcessor();
                 for (final Filter filter : currentFilterChain) {
@@ -495,7 +477,7 @@
         }
 
         @Override
-        public void onDisconnect(DisconnectListener listener) {
+        public void addConnectionEventListener(ConnectionEventListener listener) {
             Reject.ifNull(listener, "listener must not be null");
             listeners.add(listener);
         }
@@ -504,8 +486,8 @@
         public void disconnect() {
             if (isClosed.compareAndSet(false, true)) {
                 try {
-                    for (DisconnectListener listener : listeners) {
-                        listener.connectionDisconnected(this, null, null);
+                    for (ConnectionEventListener listener : listeners) {
+                        listener.handleConnectionDisconnected(this, null, null);
                     }
                 } finally {
                     closeConnection();
@@ -514,9 +496,9 @@
         }
 
         private void closeConnection() {
-            if (upstream != null) {
-                upstream.cancel();
-                upstream = null;
+            if (downstream != null) {
+                downstream.cancel();
+                downstream = null;
             }
             connection.closeSilently();
         }
@@ -526,11 +508,11 @@
             // Close this connection context.
             if (isClosed.compareAndSet(false, true)) {
                 sendUnsolicitedNotification(Responses.newGenericExtendedResult(resultCode)
-                                                     .setOID(LDAP.OID_NOTICE_OF_DISCONNECTION)
+                                                     .setOID(OID_NOTICE_OF_DISCONNECTION)
                                                      .setDiagnosticMessage(diagnosticMessage));
                 try {
-                    for (DisconnectListener listener : listeners) {
-                        listener.connectionDisconnected(this, resultCode, diagnosticMessage);
+                    for (ConnectionEventListener listener : listeners) {
+                        listener.handleConnectionDisconnected(this, resultCode, diagnosticMessage);
                     }
                 } finally {
                     closeConnection();
@@ -538,11 +520,11 @@
             }
         }
 
-        private void notifyConnectionClosed(final LdapRawMessage unbindRequest) {
+        private void notifyConnectionClosed(final LdapRequestEnvelope unbindRequest) {
             // Close this connection context.
             if (isClosed.compareAndSet(false, true)) {
                 if (unbindRequest == null) {
-                    doNotifySilentlyConnectionClosed(null);
+                    notifySilentlyConnectionClosed(null);
                 } else {
                     try {
                         LDAP.getReader(unbindRequest.getContent(), new DecodeOptions())
@@ -550,23 +532,23 @@
                                     @Override
                                     public void unbindRequest(int messageID, UnbindRequest unbindRequest)
                                             throws DecodeException, IOException {
-                                        doNotifySilentlyConnectionClosed(unbindRequest);
+                                        notifySilentlyConnectionClosed(unbindRequest);
                                     }
                                 });
                     } catch (Exception e) {
-                        doNotifySilentlyConnectionClosed(null);
+                        notifySilentlyConnectionClosed(null);
                     }
                 }
             }
         }
 
-        private void doNotifySilentlyConnectionClosed(final UnbindRequest unbindRequest) {
+        private void notifySilentlyConnectionClosed(final UnbindRequest unbindRequest) {
             try {
-                for (final DisconnectListener listener : listeners) {
+                for (final ConnectionEventListener listener : listeners) {
                     try {
-                        listener.connectionClosed(this, unbindRequest);
+                        listener.handleConnectionClosed(this, unbindRequest);
                     } catch (Exception e) {
-                        // TODO: Log as a warning ?
+                        logger.traceException(e);
                     }
                 }
             } finally {
@@ -578,11 +560,11 @@
             // Close this connection context.
             if (isClosed.compareAndSet(false, true)) {
                 try {
-                    for (final DisconnectListener listener : listeners) {
+                    for (final ConnectionEventListener listener : listeners) {
                         try {
-                            listener.exceptionOccurred(this, error);
+                            listener.handleConnectionError(this, error);
                         } catch (Exception e) {
-                            // TODO: Log as a warning ?
+                            logger.traceException(e);
                         }
                     }
                 } finally {
@@ -598,7 +580,7 @@
 
         @Override
         public void sendUnsolicitedNotification(final ExtendedResult notification) {
-            connection.write(LdapMessages.newResponseMessage(LDAP.OP_TYPE_EXTENDED_RESPONSE, 0, notification));
+            connection.write(newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, 0, notification));
         }
     }
 }
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
index ed2e6f7..b20e71c 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
@@ -15,12 +15,15 @@
  */
 package org.forgerock.opendj.grizzly;
 
+import static com.forgerock.opendj.ldap.CoreMessages.ERR_LDAP_CLIENT_DECODE_MAX_REQUEST_SIZE_EXCEEDED;
 import static org.forgerock.opendj.io.LDAP.*;
+import static org.forgerock.opendj.ldap.spi.LdapMessages.newRequestEnvelope;
 
 import java.io.IOException;
 
 import org.forgerock.opendj.io.LDAPWriter;
 import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.DecodeException;
 import org.forgerock.opendj.ldap.DecodeOptions;
 import org.forgerock.opendj.ldap.responses.BindResult;
 import org.forgerock.opendj.ldap.responses.CompareResult;
@@ -30,40 +33,31 @@
 import org.forgerock.opendj.ldap.responses.Result;
 import org.forgerock.opendj.ldap.responses.SearchResultEntry;
 import org.forgerock.opendj.ldap.responses.SearchResultReference;
-import org.forgerock.opendj.ldap.spi.LdapMessages;
-import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
 import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage;
 import org.glassfish.grizzly.Buffer;
-import org.glassfish.grizzly.Grizzly;
-import org.glassfish.grizzly.attributes.Attribute;
 import org.glassfish.grizzly.attributes.AttributeStorage;
 import org.glassfish.grizzly.filterchain.FilterChainContext;
 import org.glassfish.grizzly.filterchain.NextAction;
 
+/**
+ * Decodes {@link LdapRequestEnvelope} and encodes {@link Response}. This class keeps a state to handler the Ldap V2
+ * detection, a LdapCodec instance cannot be shared accross different connection
+ */
 abstract class LdapCodec extends LDAPBaseFilter {
 
-    private static final Attribute<Boolean> IS_LDAP_V2 = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
-            .createAttribute(LdapCodec.class.getName() + ".IsLdapV2", Boolean.FALSE);
-
-    private static final Attribute<Boolean> IS_LDAP_V2_PENDING = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
-            .createAttribute(LdapCodec.class.getName() + ".PendingLdapV2", Boolean.FALSE);
+    private boolean isLdapV2Pending = false;
+    private boolean isLdapV2 = false;
 
     LdapCodec(final int maxElementSize, final DecodeOptions decodeOptions) {
         super(decodeOptions, maxElementSize);
     }
 
     @Override
-    public NextAction handleAccept(FilterChainContext ctx) throws IOException {
-        // Default value mechanism of Grizzly's attribute doesn't seems to work.
-        IS_LDAP_V2.set(ctx.getConnection(), Boolean.FALSE);
-        return ctx.getInvokeAction();
-    }
-
-    @Override
     public NextAction handleRead(final FilterChainContext ctx) throws IOException {
         try {
             final Buffer buffer = ctx.getMessage();
-            final LdapRawMessage message;
+            final LdapRequestEnvelope message;
 
             message = readMessage(buffer, ctx.getConnection());
             if (message != null) {
@@ -73,9 +67,7 @@
             return ctx.getStopAction(getRemainingBuffer(buffer));
         } catch (Exception e) {
             onLdapCodecError(ctx, e);
-            // make the connection deaf to any following input
-            // onLdapDecodeError call will take care of error processing
-            // and closing the connection
+            ctx.getConnection().closeSilently();
             final NextAction suspendAction = ctx.getSuspendAction();
             ctx.completeAndRecycle();
             return suspendAction;
@@ -84,7 +76,7 @@
 
     protected abstract void onLdapCodecError(FilterChainContext ctx, Throwable error);
 
-    private LdapRawMessage readMessage(final Buffer buffer, final AttributeStorage attributeStorage)
+    private LdapRequestEnvelope readMessage(final Buffer buffer, final AttributeStorage attributeStorage)
             throws IOException {
         try (final ASN1BufferReader reader = new ASN1BufferReader(maxASN1ElementSize, buffer)) {
             final int packetStart = buffer.position();
@@ -95,7 +87,8 @@
             final int length = reader.peekLength();
             if (length > maxASN1ElementSize) {
                 buffer.position(packetStart);
-                throw new IllegalStateException();
+                throw DecodeException.fatalError(
+                        ERR_LDAP_CLIENT_DECODE_MAX_REQUEST_SIZE_EXCEEDED.get(length, maxASN1ElementSize));
             }
 
             final Buffer packet = buffer.slice(packetStart, buffer.position() + length);
@@ -105,7 +98,7 @@
         }
     }
 
-    private LdapRawMessage decodePacket(final ASN1BufferReader reader, final AttributeStorage attributeStorage)
+    private LdapRequestEnvelope decodePacket(final ASN1BufferReader reader, final AttributeStorage attributeStorage)
             throws IOException {
         reader.mark();
         try {
@@ -114,17 +107,17 @@
             final byte messageType = reader.peekType();
 
             final ByteString rawDn;
-            final int protocolVersion;
+            final int ldapVersion;
             switch (messageType) {
             case OP_TYPE_BIND_REQUEST:
                 reader.readStartSequence(messageType);
-                protocolVersion = (int) reader.readInteger();
+                ldapVersion = (int) reader.readInteger();
                 rawDn = reader.readOctetString();
-                IS_LDAP_V2_PENDING.set(attributeStorage, protocolVersion == 2);
+                isLdapV2Pending = ldapVersion == 2;
                 break;
             case OP_TYPE_DELETE_REQUEST:
                 rawDn = reader.readOctetString(messageType);
-                protocolVersion = -1;
+                ldapVersion = -1;
                 break;
             case OP_TYPE_ADD_REQUEST:
             case OP_TYPE_COMPARE_REQUEST:
@@ -133,13 +126,13 @@
             case OP_TYPE_SEARCH_REQUEST:
                 reader.readStartSequence(messageType);
                 rawDn = reader.readOctetString();
-                protocolVersion = -1;
+                ldapVersion = -1;
                 break;
             default:
                 rawDn = null;
-                protocolVersion = -1;
+                ldapVersion = -1;
             }
-            return LdapMessages.newRawMessage(messageType, messageId, protocolVersion, rawDn, reader);
+            return newRequestEnvelope(messageType, messageId, ldapVersion, rawDn, reader);
         } finally {
             reader.reset();
         }
@@ -152,11 +145,10 @@
     @Override
     public NextAction handleWrite(final FilterChainContext ctx) throws IOException {
         final LdapResponseMessage response = ctx.<LdapResponseMessage>getMessage();
-        if (response.getMessageType() == OP_TYPE_BIND_RESPONSE) {
-            final Boolean isLdapV2 = IS_LDAP_V2_PENDING.remove(ctx.getConnection());
-            IS_LDAP_V2.set(ctx.getConnection(), isLdapV2);
+        if (response.getMessageType() == OP_TYPE_BIND_RESPONSE && ((BindResult) response.getContent()).isSuccess()) {
+            isLdapV2 = isLdapV2Pending;
         }
-        final int protocolVersion = IS_LDAP_V2.get(ctx.getConnection()) ? 2 : 3;
+        final int protocolVersion = isLdapV2 ? 2 : 3;
 
         final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter(ctx.getMemoryManager(), protocolVersion);
         try {
@@ -165,9 +157,7 @@
             return ctx.getInvokeAction();
         } catch (Exception e) {
             onLdapCodecError(ctx, e);
-            // make the connection deaf to any following input
-            // onLdapDecodeError call will take care of error processing
-            // and closing the connection
+            ctx.getConnection().closeSilently();
             final NextAction suspendAction = ctx.getSuspendAction();
             ctx.completeAndRecycle();
             return suspendAction;
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
index 007ce33..adecbcf 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
@@ -15,7 +15,10 @@
  */
 package org.forgerock.opendj.grizzly;
 
+import java.util.concurrent.CancellationException;
+
 import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage;
+import org.glassfish.grizzly.CompletionHandler;
 import org.glassfish.grizzly.Connection;
 import org.glassfish.grizzly.WriteHandler;
 import org.reactivestreams.Subscriber;
@@ -23,55 +26,79 @@
 
 import com.forgerock.reactive.Completable;
 
-final class LdapResponseMessageWriter implements Subscriber<LdapResponseMessage> {
+final class LdapResponseMessageWriter implements Subscriber<LdapResponseMessage>, CompletionHandler {
 
     private final Connection<?> connection;
-    private final Completable.Subscriber completable;
+    private final Completable.Subscriber downstream;
     private Subscription upstream;
 
-    LdapResponseMessageWriter(final Connection<?> connection, final Completable.Subscriber completable) {
+    LdapResponseMessageWriter(final Connection<?> connection, final Completable.Subscriber downstream) {
         this.connection = connection;
-        this.completable = completable;
+        this.downstream = downstream;
     }
 
     @Override
-    public void onSubscribe(Subscription s) {
-        this.upstream = s;
-        requestMore();
+    public void onSubscribe(final Subscription s) {
+        if (upstream != null) {
+            s.cancel();
+            return;
+        }
+        upstream = s;
+        connection.notifyCanWrite(new WriteHandler() {
+            @Override
+            public void onWritePossible() throws Exception {
+                final Subscription sub = upstream;
+                if (sub != null) {
+                    sub.request(1);
+                }
+            }
+
+            @Override
+            public void onError(final Throwable error) {
+                LdapResponseMessageWriter.this.onError(error);
+            }
+        });
     }
 
-    private void requestMore() {
-        if (connection.canWrite()) {
-            upstream.request(1);
-        } else {
-            connection.notifyCanWrite(new WriteHandler() {
-                @Override
-                public void onWritePossible() throws Exception {
-                    upstream.request(1);
-                }
+    @Override
+    public void onNext(final LdapResponseMessage message) {
+        connection.write(message).addCompletionHandler(this);
+    }
 
-                @Override
-                public void onError(Throwable t) {
-                    upstream.cancel();
-                    completable.onError(t);
-                }
-            });
+    @Override
+    public void completed(final Object result) {
+        final Subscription sub = upstream;
+        if (sub != null) {
+            sub.request(1);
         }
     }
 
     @Override
-    public void onNext(LdapResponseMessage message) {
-        connection.write(message);
-        requestMore();
+    public void cancelled() {
+        failed(new CancellationException());
     }
 
     @Override
-    public void onError(Throwable t) {
-        completable.onError(t);
+    public void failed(final Throwable error) {
+        onError(error);
+    }
+
+    @Override
+    public void updated(final Object result) {
+        // Nothing to do
+    }
+
+    @Override
+    public void onError(final Throwable error) {
+        upstream.cancel();
+        upstream = null;
+        downstream.onError(error);
     }
 
     @Override
     public void onComplete() {
-        completable.onComplete();
+        upstream.cancel();
+        upstream = null;
+        downstream.onComplete();
     }
 }
diff --git a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java
index 0d1933b..7660605 100644
--- a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java
+++ b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java
@@ -20,6 +20,7 @@
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
 import static org.fest.assertions.Assertions.assertThat;
+import static org.forgerock.opendj.ldap.LDAPListener.*;
 import static org.forgerock.opendj.ldap.Connections.*;
 import static org.forgerock.opendj.ldap.LDAPConnectionFactory.*;
 import static org.forgerock.opendj.ldap.LdapException.newLdapException;
@@ -34,6 +35,7 @@
 
 import java.net.InetSocketAddress;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -90,6 +92,8 @@
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import com.forgerock.reactive.ServerConnectionFactoryAdapter;
+
 /**
  * Tests the {@code ConnectionFactory} classes.
  */
@@ -544,11 +548,12 @@
                     }
                 });
 
-        LDAPListener listener = new LDAPListener(new InetSocketAddress("127.0.0.1", 0), mockServer);
+        LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS), mockServer));
+        final InetSocketAddress listenerAddr = listener.getSocketAddresses().iterator().next();
         try {
-            LDAPConnectionFactory clientFactory = new LDAPConnectionFactory(
-                    ((InetSocketAddress) listener.getSocketAddresses().iterator().next()).getHostName(),
-                    ((InetSocketAddress) listener.getSocketAddresses().iterator().next()).getPort());
+            LDAPConnectionFactory clientFactory = new LDAPConnectionFactory(listenerAddr.getHostName(),
+                    listenerAddr.getPort());
             final Connection client = clientFactory.getConnection();
             connectLatch.await(TEST_TIMEOUT, TimeUnit.SECONDS);
             MockConnectionEventListener mockListener = null;
@@ -627,12 +632,12 @@
                     }
                 });
 
-        LDAPListener listener = new LDAPListener(new InetSocketAddress("127.0.0.1", 0), mockServer);
+        LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS), mockServer));
+        final InetSocketAddress listenerAddr = listener.getSocketAddresses().iterator().next();
         try {
-            LDAPConnectionFactory clientFactory =
-                    new LDAPConnectionFactory(
-                            listener.getSocketAddresses().iterator().next().getHostName(),
-                            listener.getSocketAddresses().iterator().next().getPort());
+            LDAPConnectionFactory clientFactory = new LDAPConnectionFactory(listenerAddr.getHostName(),
+                    listenerAddr.getPort());
             final Connection client = clientFactory.getConnection();
             connectLatch.await(TEST_TIMEOUT, TimeUnit.SECONDS);
             try {
diff --git a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactoryTestCase.java b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactoryTestCase.java
index 454b641..59f3ab3 100644
--- a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactoryTestCase.java
+++ b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactoryTestCase.java
@@ -17,7 +17,7 @@
 
 import static org.fest.assertions.Assertions.assertThat;
 import static org.fest.assertions.Fail.fail;
-import static org.forgerock.opendj.ldap.CommonLDAPOptions.*;
+import static org.forgerock.opendj.ldap.LDAPListener.*;
 import static org.forgerock.opendj.ldap.LDAPConnectionFactory.*;
 import static org.forgerock.opendj.ldap.TestCaseUtils.*;
 import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest;
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -71,6 +72,8 @@
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 
+import com.forgerock.reactive.ServerConnectionFactoryAdapter;
+
 /**
  * Tests the {@link LDAPConnectionFactory} class.
  */
@@ -349,16 +352,19 @@
 
     private LDAPListener createServer() {
         try {
-            return new LDAPListener(loopbackWithDynamicPort(),
-                    new ServerConnectionFactory<LDAPClientContext, Integer>() {
-                        @Override
-                        public ServerConnection<Integer> handleAccept(
-                                final LDAPClientContext clientContext) throws LdapException {
-                            context.set(clientContext);
-                            connectLatch.release();
-                            return serverConnection;
-                        }
-                    });
+            return new LDAPListener(
+                    Collections.singleton(loopbackWithDynamicPort()),
+                    new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+                        new ServerConnectionFactory<LDAPClientContext, Integer>() {
+                            @Override
+                            public ServerConnection<Integer> handleAccept(final LDAPClientContext clientContext)
+                                    throws LdapException {
+                                context.set(clientContext);
+                                connectLatch.release();
+                                return serverConnection;
+                            }
+                        })
+            );
         } catch (IOException e) {
             fail("Unable to create LDAP listener", e);
             return null;
diff --git a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionTestCase.java b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionTestCase.java
index 9e44e68..297cdc5 100644
--- a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionTestCase.java
+++ b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionTestCase.java
@@ -11,23 +11,23 @@
  * Header, with the fields enclosed by brackets [] replaced by your own identifying
  * information: "Portions Copyright [year] [name of copyright owner]".
  *
- * Copyright 2013-2015 ForgeRock AS.
+ * Copyright 2013-2016 ForgeRock AS.
  */
 
 package org.forgerock.opendj.grizzly;
 
 import static org.fest.assertions.Assertions.assertThat;
-import static org.forgerock.util.time.Duration.duration;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-
+import static org.forgerock.opendj.ldap.LDAPListener.LDAP_DECODE_OPTIONS;
 import static org.forgerock.opendj.ldap.LDAPConnectionFactory.REQUEST_TIMEOUT;
+import static org.forgerock.util.time.Duration.duration;
+import static org.mockito.Mockito.*;
+
 import java.net.InetSocketAddress;
+import java.util.Collections;
 
 import org.forgerock.opendj.ldap.Connections;
-import org.forgerock.opendj.ldap.LdapException;
 import org.forgerock.opendj.ldap.LDAPListener;
+import org.forgerock.opendj.ldap.LdapException;
 import org.forgerock.opendj.ldap.RequestHandler;
 import org.forgerock.opendj.ldap.ResultCode;
 import org.forgerock.opendj.ldap.SdkTestCase;
@@ -43,6 +43,8 @@
 import org.mockito.ArgumentCaptor;
 import org.testng.annotations.Test;
 
+import com.forgerock.reactive.ServerConnectionFactoryAdapter;
+
 /**
  * Tests LDAP connection implementation class.
  */
@@ -74,9 +76,9 @@
          * and leave the client waiting forever for a response.
          */
         @SuppressWarnings("unchecked")
-        LDAPListener listener =
-                new LDAPListener(address, Connections
-                        .newServerConnectionFactory(mock(RequestHandler.class)));
+        LDAPListener listener = new LDAPListener(Collections.singleton(address),
+                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+                        Connections.newServerConnectionFactory(mock(RequestHandler.class))));
 
         /*
          * Use a very long time out in order to prevent the timeout thread from
diff --git a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java
index 6a2cedb..459eebd 100644
--- a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java
+++ b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java
@@ -16,8 +16,20 @@
  */
 package org.forgerock.opendj.grizzly;
 
+import static java.util.Arrays.asList;
+import static org.fest.assertions.Assertions.assertThat;
+import static org.fest.assertions.Fail.fail;
+import static org.forgerock.opendj.ldap.LDAPListener.*;
+import static org.forgerock.opendj.ldap.Connections.*;
+import static org.forgerock.opendj.ldap.LDAPListener.REQUEST_MAX_SIZE_IN_BYTES;
+import static org.forgerock.opendj.ldap.LdapException.newLdapException;
+import static org.forgerock.opendj.ldap.TestCaseUtils.*;
+import static org.forgerock.util.Options.defaultOptions;
+import static org.mockito.Mockito.mock;
+
 import java.net.InetSocketAddress;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
@@ -32,9 +44,9 @@
 import org.forgerock.opendj.ldap.LDAPConnectionFactory;
 import org.forgerock.opendj.ldap.LDAPListener;
 import org.forgerock.opendj.ldap.LdapException;
+import org.forgerock.opendj.ldap.LdapResultHandler;
 import org.forgerock.opendj.ldap.ProviderNotFoundException;
 import org.forgerock.opendj.ldap.ResultCode;
-import org.forgerock.opendj.ldap.LdapResultHandler;
 import org.forgerock.opendj.ldap.SdkTestCase;
 import org.forgerock.opendj.ldap.SearchResultHandler;
 import org.forgerock.opendj.ldap.ServerConnection;
@@ -61,16 +73,7 @@
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static java.util.Arrays.asList;
-import static org.fest.assertions.Assertions.*;
-import static org.fest.assertions.Fail.*;
-import static org.forgerock.opendj.ldap.Connections.newFixedConnectionPool;
-import static org.forgerock.opendj.ldap.Connections.newRoundRobinLoadBalancer;
-import static org.forgerock.opendj.ldap.LdapException.*;
-import static org.forgerock.opendj.ldap.LDAPListener.*;
-import static org.forgerock.opendj.ldap.TestCaseUtils.*;
-import static org.forgerock.util.Options.defaultOptions;
-import static org.mockito.Mockito.*;
+import com.forgerock.reactive.ServerConnectionFactoryAdapter;
 
 /** Tests the LDAPListener class. */
 @SuppressWarnings("javadoc")
@@ -201,7 +204,9 @@
     public void testCreateLDAPListener() throws Exception {
         // test no exception is thrown, which means transport provider is
         // correctly loaded
-        LDAPListener listener = new LDAPListener(loopbackWithDynamicPort(), mock(ServerConnectionFactory.class));
+        LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+                        mock(ServerConnectionFactory.class)));
         listener.close();
     }
 
@@ -212,8 +217,10 @@
         // test no exception is thrown, which means transport provider is correctly loaded
         Options options = defaultOptions().set(TRANSPORT_PROVIDER_CLASS_LOADER,
                                                        Thread.currentThread().getContextClassLoader());
-        LDAPListener listener =
-                new LDAPListener(loopbackWithDynamicPort(), mock(ServerConnectionFactory.class), options);
+        LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+                new ServerConnectionFactoryAdapter(options.get(LDAP_DECODE_OPTIONS),
+                        mock(ServerConnectionFactory.class)),
+                options);
         listener.close();
     }
 
@@ -223,8 +230,10 @@
         expectedExceptionsMessageRegExp = "^The requested provider 'unknown' .*")
     public void testCreateLDAPListenerFailureProviderNotFound() throws Exception {
         Options options = defaultOptions().set(TRANSPORT_PROVIDER, "unknown");
-        LDAPListener listener
-            = new LDAPListener(loopbackWithDynamicPort(), mock(ServerConnectionFactory.class), options);
+        LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+                new ServerConnectionFactoryAdapter(options.get(LDAP_DECODE_OPTIONS),
+                        mock(ServerConnectionFactory.class)),
+                options);
         listener.close();
     }
 
@@ -239,8 +248,10 @@
         final MockServerConnection serverConnection = new MockServerConnection();
         final MockServerConnectionFactory serverConnectionFactory =
                 new MockServerConnectionFactory(serverConnection);
-        final LDAPListener listener = new LDAPListener(loopbackWithDynamicPort(), serverConnectionFactory);
-        final InetSocketAddress addr = (InetSocketAddress) listener.getSocketAddresses().iterator().next();
+        final LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+                        serverConnectionFactory));
+        final InetSocketAddress addr = listener.firstSocketAddress();
         try {
             // Connect and close.
             final Connection connection =
@@ -268,9 +279,10 @@
         final MockServerConnection onlineServerConnection = new MockServerConnection();
         final MockServerConnectionFactory onlineServerConnectionFactory =
                 new MockServerConnectionFactory(onlineServerConnection);
-        final LDAPListener onlineServerListener =
-                new LDAPListener(loopbackWithDynamicPort(), onlineServerConnectionFactory);
-        final InetSocketAddress onlineAddr = onlineServerListener.getSocketAddresses().iterator().next();
+        final LDAPListener onlineServerListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+                        onlineServerConnectionFactory));
+        final InetSocketAddress onlineAddr = onlineServerListener.firstSocketAddress();
 
         try {
             // Connection pool and load balancing tests.
@@ -312,9 +324,10 @@
 
                     };
 
-            final LDAPListener proxyListener =
-                    new LDAPListener(loopbackWithDynamicPort(), proxyServerConnectionFactory);
-            final InetSocketAddress proxyAddr = proxyListener.getSocketAddresses().iterator().next();
+            final LDAPListener proxyListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+                    new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+                            proxyServerConnectionFactory));
+            final InetSocketAddress proxyAddr = proxyListener.firstSocketAddress();
             try {
                 // Connect and close.
                 final Connection connection =
@@ -349,9 +362,10 @@
         final MockServerConnection onlineServerConnection = new MockServerConnection();
         final MockServerConnectionFactory onlineServerConnectionFactory =
                 new MockServerConnectionFactory(onlineServerConnection);
-        final LDAPListener onlineServerListener =
-                new LDAPListener(loopbackWithDynamicPort(), onlineServerConnectionFactory);
-        final InetSocketAddress onlineServerAddr = onlineServerListener.getSocketAddresses().iterator().next();
+        final LDAPListener onlineServerListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+                        onlineServerConnectionFactory));
+        final InetSocketAddress onlineServerAddr = onlineServerListener.firstSocketAddress();
 
         try {
             // Connection pool and load balancing tests.
@@ -401,10 +415,10 @@
             final MockServerConnectionFactory proxyServerConnectionFactory =
                     new MockServerConnectionFactory(proxyServerConnection);
 
-            final LDAPListener proxyListener =
-                    new LDAPListener(loopbackWithDynamicPort(), proxyServerConnectionFactory);
-            final InetSocketAddress proxyAddr =
-                    (InetSocketAddress) proxyListener.getSocketAddresses().iterator().next();
+            final LDAPListener proxyListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+                    new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+                            proxyServerConnectionFactory));
+            final InetSocketAddress proxyAddr = (InetSocketAddress) proxyListener.firstSocketAddress();
 
             try {
                 // Connect, bind, and close.
@@ -444,9 +458,10 @@
         final MockServerConnection onlineServerConnection = new MockServerConnection();
         final MockServerConnectionFactory onlineServerConnectionFactory =
                 new MockServerConnectionFactory(onlineServerConnection);
-        final LDAPListener onlineServerListener =
-                new LDAPListener(loopbackWithDynamicPort(), onlineServerConnectionFactory);
-        final InetSocketAddress onlineServerAddr = onlineServerListener.getSocketAddresses().iterator().next();
+        final LDAPListener onlineServerListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+                        onlineServerConnectionFactory));
+        final InetSocketAddress onlineServerAddr = onlineServerListener.firstSocketAddress();
 
         try {
             final MockServerConnection proxyServerConnection = new MockServerConnection();
@@ -494,8 +509,9 @@
 
                     };
 
-            final LDAPListener proxyListener =
-                    new LDAPListener(loopbackWithDynamicPort(), proxyServerConnectionFactory);
+            final LDAPListener proxyListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+                    new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+                            proxyServerConnectionFactory));
             try {
                 // Connect and close.
                 final Connection connection =
@@ -529,9 +545,10 @@
         final MockServerConnection onlineServerConnection = new MockServerConnection();
         final MockServerConnectionFactory onlineServerConnectionFactory =
                 new MockServerConnectionFactory(onlineServerConnection);
-        final LDAPListener onlineServerListener =
-                new LDAPListener(loopbackWithDynamicPort(), onlineServerConnectionFactory);
-        final InetSocketAddress onlineServerAddr = onlineServerListener.getSocketAddresses().iterator().next();
+        final LDAPListener onlineServerListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+                        onlineServerConnectionFactory));
+        final InetSocketAddress onlineServerAddr = onlineServerListener.firstSocketAddress();
 
         try {
             final MockServerConnection proxyServerConnection = new MockServerConnection() {
@@ -574,11 +591,12 @@
                 }
 
             };
-            final InetSocketAddress proxyAddr = findFreeSocketAddress();
             final MockServerConnectionFactory proxyServerConnectionFactory =
                     new MockServerConnectionFactory(proxyServerConnection);
-            final LDAPListener proxyListener =
-                    new LDAPListener(proxyAddr, proxyServerConnectionFactory);
+            final LDAPListener proxyListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+                    new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
+                            proxyServerConnectionFactory));
+            final InetSocketAddress proxyAddr = proxyListener.firstSocketAddress();
             try {
                 // Connect, bind, and close.
                 final Connection connection =
@@ -617,8 +635,9 @@
         final MockServerConnectionFactory factory =
                 new MockServerConnectionFactory(serverConnection);
         final Options options = defaultOptions().set(REQUEST_MAX_SIZE_IN_BYTES, 2048);
-        final InetSocketAddress addr = findFreeSocketAddress();
-        final LDAPListener listener = new LDAPListener(addr, factory, options);
+        final LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+                new ServerConnectionFactoryAdapter(options.get(LDAP_DECODE_OPTIONS), factory), options);
+        final InetSocketAddress addr = listener.firstSocketAddress();
 
         Connection connection = null;
         try {
@@ -674,11 +693,10 @@
     @Test
     public void testServerDisconnect() throws Exception {
         final MockServerConnection serverConnection = new MockServerConnection();
-        final MockServerConnectionFactory factory =
-                new MockServerConnectionFactory(serverConnection);
-        final LDAPListener listener =
-                new LDAPListener(loopbackWithDynamicPort(), factory);
-        final InetSocketAddress listenerAddr = listener.getSocketAddresses().iterator().next();
+        final MockServerConnectionFactory factory = new MockServerConnectionFactory(serverConnection);
+        final LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS), factory));
+        final InetSocketAddress listenerAddr = listener.firstSocketAddress();
 
         final Connection connection;
         try {
diff --git a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
index da1f1f8..75f9c42 100644
--- a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
+++ b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
@@ -17,9 +17,8 @@
 
 package org.forgerock.opendj.examples;
 
-import static org.forgerock.opendj.ldap.LDAPConnectionFactory.AUTHN_BIND_REQUEST;
-import static org.forgerock.opendj.ldap.LDAPConnectionFactory.HEARTBEAT_ENABLED;
-import static org.forgerock.opendj.ldap.LDAPListener.CONNECT_MAX_BACKLOG;
+import static org.forgerock.opendj.ldap.LDAPConnectionFactory.*;
+import static org.forgerock.opendj.ldap.LDAPListener.*;
 import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest;
 
 import java.io.IOException;
@@ -39,6 +38,8 @@
 import org.forgerock.opendj.ldap.requests.BindRequest;
 import org.forgerock.util.Options;
 
+import com.forgerock.reactive.ServerConnectionFactoryAdapter;
+
 /**
  * An LDAP load balancing proxy which forwards requests to one or more remote
  * Directory Servers. This is implementation is very simple and is only intended
@@ -142,7 +143,8 @@
         final Options options = Options.defaultOptions().set(CONNECT_MAX_BACKLOG, 4096);
         LDAPListener listener = null;
         try {
-            listener = new LDAPListener(localAddress, localPort, connectionHandler, options);
+            listener = new LDAPListener(localAddress, localPort, new ServerConnectionFactoryAdapter(
+                    options.get(LDAP_DECODE_OPTIONS), connectionHandler), options);
             System.out.println("Press any key to stop the server...");
             System.in.read();
         } catch (final IOException e) {
diff --git a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java
index 108b307..65ae330 100644
--- a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java
+++ b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java
@@ -12,13 +12,13 @@
  * information: "Portions Copyright [year] [name of copyright owner]".
  *
  * Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS.
+ * Portions Copyright 2011-2016 ForgeRock AS.
  */
 
 package org.forgerock.opendj.examples;
 
 import static org.forgerock.opendj.ldap.Connections.newCachedConnectionPool;
-import static org.forgerock.opendj.ldap.LDAPListener.CONNECT_MAX_BACKLOG;
+import static org.forgerock.opendj.ldap.LDAPListener.*;
 import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest;
 
 import java.io.IOException;
@@ -64,6 +64,8 @@
 import org.forgerock.opendj.ldap.schema.AttributeType;
 import org.forgerock.util.Options;
 
+import com.forgerock.reactive.ServerConnectionFactoryAdapter;
+
 /**
  * This example is based on the {@link Proxy}. This example does no load
  * balancing, but instead rewrites attribute descriptions and DN suffixes in
@@ -419,7 +421,8 @@
         final Options listenerOptions = Options.defaultOptions().set(CONNECT_MAX_BACKLOG, 4096);
         LDAPListener listener = null;
         try {
-            listener = new LDAPListener(localAddress, localPort, connectionHandler, listenerOptions);
+            listener = new LDAPListener(localAddress, localPort, new ServerConnectionFactoryAdapter(
+                    listenerOptions.get(LDAP_DECODE_OPTIONS), connectionHandler), listenerOptions);
             System.out.println("Press any key to stop the server...");
             System.in.read();
         } catch (final IOException e) {
diff --git a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Server.java b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Server.java
index e2c4a2a..5288836 100644
--- a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Server.java
+++ b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Server.java
@@ -17,7 +17,7 @@
 
 package org.forgerock.opendj.examples;
 
-import static org.forgerock.opendj.ldap.LDAPListener.CONNECT_MAX_BACKLOG;
+import static org.forgerock.opendj.ldap.LDAPListener.*;
 
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -38,6 +38,8 @@
 import org.forgerock.opendj.ldif.LDIFEntryReader;
 import org.forgerock.util.Options;
 
+import com.forgerock.reactive.ServerConnectionFactoryAdapter;
+
 /**
  * An LDAP directory server which exposes data contained in an LDIF file. This
  * is implementation is very simple and is only intended as an example:
@@ -116,10 +118,13 @@
                             }
                         };
 
-                listener = new LDAPListener(localAddress, localPort, sslWrapper, options);
+                listener = new LDAPListener(localAddress, localPort,
+                        new ServerConnectionFactoryAdapter(options.get(LDAP_DECODE_OPTIONS), sslWrapper), options);
             } else {
                 // No SSL.
-                listener = new LDAPListener(localAddress, localPort, connectionHandler, options);
+                listener = new LDAPListener(localAddress, localPort,
+                        new ServerConnectionFactoryAdapter(options.get(LDAP_DECODE_OPTIONS), connectionHandler),
+                        options);
             }
             System.out.println("Press any key to stop the server...");
             System.in.read();
diff --git a/opendj-ldap-toolkit/src/test/java/com/forgerock/opendj/ldap/tools/ToolLdapServer.java b/opendj-ldap-toolkit/src/test/java/com/forgerock/opendj/ldap/tools/ToolLdapServer.java
index 7e49c4a..f3836af 100644
--- a/opendj-ldap-toolkit/src/test/java/com/forgerock/opendj/ldap/tools/ToolLdapServer.java
+++ b/opendj-ldap-toolkit/src/test/java/com/forgerock/opendj/ldap/tools/ToolLdapServer.java
@@ -16,9 +16,11 @@
 package com.forgerock.opendj.ldap.tools;
 
 import static org.fest.assertions.Assertions.assertThat;
-import static org.forgerock.opendj.ldap.TestCaseUtils.findFreeSocketAddress;
+import static org.forgerock.opendj.ldap.TestCaseUtils.loopbackWithDynamicPort;
+import static org.forgerock.opendj.ldap.LDAPListener.LDAP_DECODE_OPTIONS;
 
 import java.io.IOException;
+import java.util.Collections;
 
 import org.forgerock.opendj.ldap.IntermediateResponseHandler;
 import org.forgerock.opendj.ldap.LDAPClientContext;
@@ -44,6 +46,9 @@
 import org.forgerock.opendj.ldap.responses.ExtendedResult;
 import org.forgerock.opendj.ldap.responses.Responses;
 import org.forgerock.opendj.ldap.responses.Result;
+import org.forgerock.util.Options;
+
+import com.forgerock.reactive.ServerConnectionFactoryAdapter;
 
 /** Mocked request handler sketelon for tools test cases. */
 class ToolLdapServer implements ServerConnectionFactory<LDAPClientContext, Integer> {
@@ -153,7 +158,8 @@
     }
 
     void start() throws IOException {
-        listener = new LDAPListener(findFreeSocketAddress(), this);
+        listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
+                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS), this));
     }
 
     void stop() {
@@ -161,10 +167,10 @@
     }
 
     String getHostName() {
-        return listener.getSocketAddress().getHostName();
+        return listener.getSocketAddresses().iterator().next().getHostName();
     }
 
     String getPort() {
-        return Integer.toString(listener.getSocketAddress().getPort());
+        return Integer.toString(listener.getSocketAddresses().iterator().next().getPort());
     }
 }
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/adapter/server3x/Converters.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/adapter/server3x/Converters.java
index 6c56560..d96d06c 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/adapter/server3x/Converters.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/adapter/server3x/Converters.java
@@ -460,7 +460,9 @@
 
     /**
      * Converts from OpenDJ server {@link org.opends.server.types.Attribute} to OpenDJ LDAP SDK
-     * {@link org.forgerock.opendj.ldap.Attribute}.
+     * {@link org.forgerock.opendj.ldap.Attribute}. This method is an optimization of the equivalent from() method. It's
+     * only used in encoding/decoding and as such only wrap the required methods: the returned object partially wrap the
+     * incoming one.
      *
      * @param attribute
      *            value to convert
@@ -625,14 +627,15 @@
     }
 
     /**
-     * Converts from OpenDJ server
-     * {@link org.opends.server.types.SearchResultEntry} to OpenDJ LDAP SDK
-     * {@link org.forgerock.opendj.ldap.responses.SearchResultEntry}.
+     * Converts from OpenDJ server {@link org.opends.server.types.SearchResultEntry} to OpenDJ LDAP SDK
+     * {@link org.forgerock.opendj.ldap.responses.SearchResultEntry}. This method is an optimization of the equivalent
+     * from() method. It's only used in encoding/decoding and as such only wrap the required methods: the returned
+     * object partially wrap the incoming one.
      *
      * @param srvResultEntry
-     *          value to convert
+     *            value to convert
      * @param ldapVersion
-     *         Version of the ldap protocol
+     *            Version of the ldap protocol
      * @return the converted value
      */
     public static org.forgerock.opendj.ldap.responses.SearchResultEntry partiallyWrap(
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index af9a708..2cb5125 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -54,14 +54,14 @@
 import org.forgerock.opendj.ldap.ByteString;
 import org.forgerock.opendj.ldap.DN;
 import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.LDAPClientContext.DisconnectListener;
+import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
 import org.forgerock.opendj.ldap.ResultCode;
 import org.forgerock.opendj.ldap.requests.UnbindRequest;
 import org.forgerock.opendj.ldap.responses.CompareResult;
 import org.forgerock.opendj.ldap.responses.Response;
 import org.forgerock.opendj.ldap.responses.Responses;
 import org.forgerock.opendj.ldap.responses.Result;
-import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
 import org.opends.server.api.ClientConnection;
 import org.opends.server.api.ConnectionHandler;
 import org.opends.server.core.AbandonOperationBasis;
@@ -123,7 +123,7 @@
  * instance of the LDAP connection handler and have its requests decoded by an LDAP request handler.
  */
 public final class LDAPClientConnection2 extends ClientConnection implements TLSCapableConnection,
-        ReactiveHandler<QueueingStrategy, LdapRawMessage, Stream<Response>> {
+        ReactiveHandler<QueueingStrategy, LdapRequestEnvelope, Stream<Response>> {
     private static final String REACTIVE_OUT = "reactive.out";
 
     /** The tracer object for the debug logger. */
@@ -230,9 +230,9 @@
         }
 
         connectionID = DirectoryServer.newConnectionAccepted(this);
-        clientContext.onDisconnect(new DisconnectListener() {
+        clientContext.addConnectionEventListener(new ConnectionEventListener() {
             @Override
-            public void exceptionOccurred(LDAPClientContext context, Throwable error) {
+            public void handleConnectionError(LDAPClientContext context, Throwable error) {
                 if (error instanceof LocalizableException) {
                     disconnect(
                             DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject());
@@ -242,13 +242,13 @@
             }
 
             @Override
-            public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode,
+            public void handleConnectionDisconnected(LDAPClientContext context, ResultCode resultCode,
                     String diagnosticMessage) {
                 disconnect(DisconnectReason.SERVER_ERROR, false, null);
             }
 
             @Override
-            public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
+            public void handleConnectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
                 disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
             }
         });
@@ -372,7 +372,7 @@
      */
     @Override
     public boolean isSecure() {
-        return false;
+        return clientContext.getSSLSession() != null || clientContext.getSASLServer() != null;
     }
 
     /**
@@ -405,7 +405,7 @@
         // an error result to the client indicating that a problem occurred.
         if (removeOperationInProgress(operation.getMessageID())) {
             final Response response = operationToResponse(operation);
-            final FlowableEmitter<Response> out = getOut(operation);
+            final FlowableEmitter<Response> out = getAttachedEmitter(operation);
             if (response != null) {
                 out.onNext(response);
             }
@@ -534,22 +534,18 @@
      *            The search result entry to be sent to the client
      */
     @Override
-    public void sendSearchEntry(SearchOperation searchOperation, SearchResultEntry searchEntry) {
-        getEmitter(searchOperation).onNext(toResponse(searchEntry));
+    public void sendSearchEntry(final SearchOperation searchOperation, final SearchResultEntry searchEntry) {
+        getAttachedEmitter(searchOperation).onNext(toResponse(searchEntry));
     }
 
-    private FlowableEmitter<Response> getEmitter(SearchOperation searchOperation) {
-        return getOut(searchOperation);
-    }
-
-    private Response toResponse(SearchResultEntry searchEntry) {
-        return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry, ldapVersion));
-    }
-
-    private FlowableEmitter<Response> getOut(Operation operation) {
+    private FlowableEmitter<Response> getAttachedEmitter(final Operation operation) {
         return ((FlowableEmitter<Response>) operation.getAttachment(REACTIVE_OUT));
     }
 
+    private Response toResponse(final SearchResultEntry searchEntry) {
+        return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry, ldapVersion));
+    }
+
     /**
      * Sends the provided search result reference to the client.
      *
@@ -572,7 +568,7 @@
             return false;
         }
 
-        final FlowableEmitter<Response> out = getOut(searchOperation);
+        final FlowableEmitter<Response> out = getAttachedEmitter(searchOperation);
         out.onNext(Converters.from(searchReference));
 
         return true;
@@ -589,7 +585,7 @@
     @Override
     protected boolean sendIntermediateResponseMessage(IntermediateResponse intermediateResponse) {
         final Operation operation = intermediateResponse.getOperation();
-        final FlowableEmitter<Response> out = getOut(operation);
+        final FlowableEmitter<Response> emitter = getAttachedEmitter(operation);
 
         final Response response = Responses.newGenericIntermediateResponse(intermediateResponse.getOID(),
                 intermediateResponse.getValue());
@@ -597,7 +593,7 @@
             response.addControl(Converters.from(control));
         }
 
-        out.onNext(response);
+        emitter.onNext(response);
 
         // The only reason we shouldn't continue processing is if the
         // connection is closed.
@@ -962,14 +958,15 @@
      *         a fatal error and the client has been disconnected as a result, or if the client unbound from the server.
      */
     @Override
-    public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
-        return streamFromPublisher(
-                new BlockingBackpressureSubscription(Flowable.create(new FlowableOnSubscribe<Response>() {
+    public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRequestEnvelope message) {
+        return streamFromPublisher(new BlockingBackpressureSubscription(
+                Flowable.create(new FlowableOnSubscribe<Response>() {
                     @Override
                     public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
                         processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
                     }
-                }, BackpressureStrategy.ERROR))).onNextDo(new Consumer<Response>() {
+                }, BackpressureStrategy.ERROR)))
+                .onNext(new Consumer<Response>() {
                     @Override
                     public void accept(final Response response) throws Exception {
                         if (keepStats) {
@@ -1003,7 +1000,7 @@
         }
     }
 
-    private final byte toLdapResponseType(final LdapRawMessage rawRequest, final Response response) {
+    private final byte toLdapResponseType(final LdapRequestEnvelope rawRequest, final Response response) {
         if (response instanceof Result) {
             return toLdapResultType(rawRequest.getMessageType());
         }
@@ -1160,8 +1157,8 @@
             final List<Control> controls, final FlowableEmitter<Response> out) {
         if (ldapVersion == 2 && !controls.isEmpty()) {
             // LDAPv2 clients aren't allowed to send controls.
-            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
-                    ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
+            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
             out.onComplete();
             disconnectControlsNotAllowed();
             return false;
@@ -1229,17 +1226,17 @@
             versionString = "2";
 
             if (!connectionHandler.allowLDAPv2()) {
-                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
-                        ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
+                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
+                                    .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
                 out.onComplete();
-                disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get());
+                disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get());
                 return false;
             }
 
             if (!controls.isEmpty()) {
                 // LDAPv2 clients aren't allowed to send controls.
-                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
-                        ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
+                                    .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
                 out.onComplete();
                 disconnectControlsNotAllowed();
                 return false;
@@ -1390,8 +1387,8 @@
             final List<Control> controls, final FlowableEmitter<Response> out) {
         if (ldapVersion == 2 && !controls.isEmpty()) {
             // LDAPv2 clients aren't allowed to send controls.
-            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
-                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
             out.onComplete();
             disconnectControlsNotAllowed();
             return false;
@@ -1514,8 +1511,8 @@
             final List<Control> controls, final FlowableEmitter<Response> out) {
         if (ldapVersion == 2 && !controls.isEmpty()) {
             // LDAPv2 clients aren't allowed to send controls.
-            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
-                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
             out.onComplete();
             disconnectControlsNotAllowed();
             return false;
@@ -1569,8 +1566,8 @@
             final List<Control> controls, final FlowableEmitter<Response> out) {
         if (ldapVersion == 2 && !controls.isEmpty()) {
             // LDAPv2 clients aren't allowed to send controls.
-            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
-                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
             out.onComplete();
             disconnectControlsNotAllowed();
             return false;
@@ -1626,8 +1623,8 @@
             final List<Control> controls, final FlowableEmitter<Response> out) {
         if (ldapVersion == 2 && !controls.isEmpty()) {
             // LDAPv2 clients aren't allowed to send controls.
-            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
-                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
             out.onComplete();
             disconnectControlsNotAllowed();
             return false;
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
index b7dc22f..5b5d2ba 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -49,17 +49,16 @@
 import org.forgerock.opendj.config.server.ConfigChangeResult;
 import org.forgerock.opendj.config.server.ConfigException;
 import org.forgerock.opendj.config.server.ConfigurationChangeListener;
-import org.forgerock.opendj.grizzly.GrizzlyLDAPListener;
 import org.forgerock.opendj.ldap.AddressMask;
 import org.forgerock.opendj.ldap.DN;
 import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.LDAPClientContext.DisconnectListener;
+import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
 import org.forgerock.opendj.ldap.LDAPListener;
 import org.forgerock.opendj.ldap.LdapException;
 import org.forgerock.opendj.ldap.ResultCode;
 import org.forgerock.opendj.ldap.requests.UnbindRequest;
 import org.forgerock.opendj.ldap.responses.Response;
-import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
 import org.forgerock.opendj.server.config.server.ConnectionHandlerCfg;
 import org.forgerock.opendj.server.config.server.LDAPConnectionHandlerCfg;
 import org.forgerock.util.Function;
@@ -129,7 +128,7 @@
     /** SSL instance name used in context creation. */
     private static final String SSL_CONTEXT_INSTANCE_NAME = "TLS";
 
-    private GrizzlyLDAPListener listener;
+    private LDAPListener listener;
 
     /** The current configuration state. */
     private LDAPConnectionHandlerCfg currentConfig;
@@ -636,44 +635,45 @@
     }
 
     private void startListener() throws IOException {
-        listener = new GrizzlyLDAPListener(
+        listener = new LDAPListener(
                 listenAddresses,
-                Options.defaultOptions().set(LDAPListener.CONNECT_MAX_BACKLOG, backlog)
-                        .set(LDAPListener.REQUEST_MAX_SIZE_IN_BYTES, (int) currentConfig.getMaxRequestSize()),
                 new Function<LDAPClientContext,
-                             ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
+                             ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                              LdapException>() {
                     @Override
-                    public ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> apply(
+                    public ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>> apply(
                             LDAPClientContext clientContext) throws LdapException {
                         final LDAPClientConnection2 conn = canAccept(clientContext);
                         connectionList.add(conn);
-                        clientContext.onDisconnect(new DisconnectListener() {
+                        clientContext.addConnectionEventListener(new ConnectionEventListener() {
                             @Override
-                            public void exceptionOccurred(LDAPClientContext context, Throwable error) {
+                            public void handleConnectionError(final LDAPClientContext context, final Throwable error) {
                                 connectionList.remove(conn);
                             }
 
                             @Override
-                            public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode,
-                                    String diagnosticMessage) {
+                            public void handleConnectionDisconnected(final LDAPClientContext context,
+                                    final ResultCode resultCode, String diagnosticMessage) {
                                 connectionList.remove(conn);
                             }
 
                             @Override
-                            public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
+                            public void handleConnectionClosed(final LDAPClientContext context,
+                                    final UnbindRequest unbindRequest) {
                                 connectionList.remove(conn);
                             }
                         });
-                        return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
+                        return new ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>() {
                             @Override
-                            public Stream<Response> handle(LDAPClientContext context, LdapRawMessage request)
-                                    throws Exception {
+                            public Stream<Response> handle(final LDAPClientContext context,
+                                    final LdapRequestEnvelope request) throws Exception {
                                 return conn.handle(queueingStrategy, request);
                             }
                         };
                     }
-                });
+                }, Options.defaultOptions()
+                          .set(LDAPListener.CONNECT_MAX_BACKLOG, backlog)
+                          .set(LDAPListener.REQUEST_MAX_SIZE_IN_BYTES, (int) currentConfig.getMaxRequestSize()));
     }
 
     /**
@@ -684,8 +684,6 @@
     public void run() {
         setName(handlerName);
         boolean starting = true;
-        setName(handlerName);
-
         boolean lastIterationFailed = false;
 
         while (!shutdownRequested) {
@@ -724,7 +722,6 @@
 
                 // If we have gotten here, then we are about to start listening
                 // for the first time since startup or since we were previously disabled.
-                // Start the embedded HTTP server
                 startListener();
                 lastIterationFailed = false;
             } catch (Exception e) {

--
Gitblit v1.10.0