mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
06.42.2016 bdc5fa0dd0980eb9e077ae80644504705a24e035
OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

* Depreciate ServerConnectionFactory and ServerConnection in favors of
Function and ReactiveHandler

* Apply PR comments

* Use SDK's LDAPListener in LDAPConnectionListener2 rather than
directly referencing Grizzly.

* Fixed SDK's clirr (Thank you Jean Noel !)
1 files deleted
1 files added
37 files modified
1742 ■■■■ changed files
opendj-core/clirr-ignored-api-changes.xml 48 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/Action.java 2 ●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/Completable.java 20 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/Consumer.java 6 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java 26 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java 10 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java 317 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/Single.java 29 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/Stream.java 17 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/io/LDAP.java 14 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/CommonLDAPOptions.java 2 ●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java 30 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java 2 ●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPListener.java 83 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnection.java 3 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnectionFactory.java 3 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LdapMessages.java 30 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/TransportProvider.java 17 ●●●● patch | view | raw | blame | history
opendj-core/src/test/java/org/forgerock/opendj/ldap/LDAPServer.java 8 ●●●● patch | view | raw | blame | history
opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicLDAPListener.java 23 ●●●● patch | view | raw | blame | history
opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicTransportProvider.java 13 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java 148 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/ServerConnectionAdaptor.java 154 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java 24 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java 2 ●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java 222 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java 64 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java 71 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java 23 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactoryTestCase.java 16 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionTestCase.java 22 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java 116 ●●●●● patch | view | raw | blame | history
opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java 10 ●●●●● patch | view | raw | blame | history
opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java 9 ●●●●● patch | view | raw | blame | history
opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Server.java 11 ●●●● patch | view | raw | blame | history
opendj-ldap-toolkit/src/test/java/com/forgerock/opendj/ldap/tools/ToolLdapServer.java 14 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/adapter/server3x/Converters.java 11 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java 83 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java 39 ●●●● patch | view | raw | blame | history
opendj-core/clirr-ignored-api-changes.xml
@@ -153,6 +153,12 @@
  </difference>
  <difference>
    <className>org/forgerock/opendj/ldap/LDAPClientContext</className>
    <differenceType>7012</differenceType>
    <method>javax.security.sasl.SaslServer getSASLServer()</method>
    <justification>Simplify management of security layer</justification>
  </difference>
  <difference>
    <className>org/forgerock/opendj/ldap/LDAPClientContext</className>
    <differenceType>7004</differenceType>
    <method>void enableTLS(javax.net.ssl.SSLContext, java.lang.String[], java.lang.String[], boolean, boolean)</method>
    <justification>Simplify management of security layer</justification>
@@ -167,27 +173,15 @@
  <difference>
    <className>org/forgerock/opendj/ldap/LDAPClientContext</className>
    <differenceType>7012</differenceType>
    <method>void onDisconnect(org.forgerock.opendj.ldap.LDAPClientContext$DisconnectListener)</method>
    <method>void addConnectionEventListener(org.forgerock.opendj.ldap.LDAPClientContext$ConnectionEventListener)</method>
    <justification>Allows to register connection state listener</justification>
  </difference>
  <difference>
    <className>org/forgerock/opendj/ldap/LDAPListener</className>
    <differenceType>7005</differenceType>
    <method>LDAPListener(java.net.InetSocketAddress, org.forgerock.opendj.ldap.ServerConnectionFactory, org.forgerock.util.Options)</method>
    <to>%regex[LDAPListener\(java\.net\.SocketAddress,\s*org\.forgerock\.opendj\.ldap\.ServerConnectionFactory,\s*org\.forgerock\.util\.Options\)]</to>
    <justification>Accept multiple SocketAddress to bind to</justification>
  </difference>
  <difference>
    <className>org/forgerock/opendj/ldap/spi/LDAPListenerImpl</className>
    <differenceType>7002</differenceType>
    <method>java.net.InetSocketAddress getSocketAddress()</method>
    <justification>Accept multiple SocketAddress to bind to</justification>
  </difference>
  <difference>
    <className>org/forgerock/opendj/ldap/spi/LDAPListenerImpl</className>
    <differenceType>7012</differenceType>
    <method>java.util.Set getSocketAddresses()</method>
    <justification>Accept multiple SocketAddress to bind to</justification>
     <method>%regex[LDAPListener\((((java\.lang\.String, +)?int,)|(java\.net\.InetSocketAddress,))\s*org\.forgerock\.opendj\.ldap\.ServerConnectionFactory(, +org\.forgerock\.util\.Options)?\)]</method>
    <to>%regex[LDAPListener\((((java\.lang\.String, +)?int,)|(java\.util\.Set,)) +org\.forgerock\.util\.Function(, +org\.forgerock\.util\.Options)?\)]</to>
    <justification>Allow multiple bind addressses. Use function rather than specific interface</justification>
  </difference>
  <difference>
    <className>org/forgerock/opendj/ldap/LDAPListener</className>
@@ -197,12 +191,6 @@
  </difference>
   <difference>
    <className>org/forgerock/opendj/ldap/LDAPListener</className>
    <differenceType>7012</differenceType>
    <method>java.util.Set getSocketAddresses()</method>
    <justification>Accept multiple SocketAddress to bind to</justification>
  </difference>
    <difference>
    <className>org/forgerock/opendj/ldap/LDAPListener</className>
    <differenceType>7002</differenceType>
    <method>java.net.InetAddress getAddress()</method>
    <justification>Accept multiple SocketAddress to bind to</justification>
@@ -220,11 +208,23 @@
    <justification>Accept multiple SocketAddress to bind to</justification>
  </difference>
  <difference>
    <className>org/forgerock/opendj/ldap/spi/LDAPListenerImpl</className>
    <differenceType>7002</differenceType>
    <method>java.net.InetSocketAddress getSocketAddress()</method>
    <justification>Accept multiple SocketAddress to bind to</justification>
  </difference>
  <difference>
    <className>org/forgerock/opendj/ldap/spi/LDAPListenerImpl</className>
    <differenceType>7012</differenceType>
    <method>java.util.Set getSocketAddresses()</method>
    <justification>Accept multiple SocketAddress to bind to</justification>
  </difference>
  <difference>
    <className>org/forgerock/opendj/ldap/spi/TransportProvider</className>
    <differenceType>7005</differenceType>
    <method>org.forgerock.opendj.ldap.spi.LDAPListenerImpl getLDAPListener(java.net.InetSocketAddress, org.forgerock.opendj.ldap.ServerConnectionFactory, org.forgerock.util.Options)</method>
    <to>%regex[org\.forgerock\.opendj\.ldap\.spi\.LDAPListenerImpl\s*getLDAPListener\(java\.util\.Set,\s*org\.forgerock\.opendj\.ldap\.ServerConnectionFactory,\s*org\.forgerock\.util\.Options\)]</to>
    <justification>Accept multiple SocketAddress to bind to</justification>
    <to>%regex[org\.forgerock\.opendj\.ldap\.spi\.LDAPListenerImpl +getLDAPListener\(java\.util\.Set, +org\.forgerock\.util\.Function, +org\.forgerock\.util\.Options\)]</to>
    <justification>Accept multiple SocketAddress to bind to. Use Function rather than specific interface</justification>
  </difference>
  <difference>
    <className>org/forgerock/opendj/io/LDAP</className>
opendj-core/src/main/java/com/forgerock/reactive/Action.java
@@ -23,7 +23,7 @@
     * Runs the action and optionally throws a checked exception.
     *
     * @throws Exception
     *             if the implementation wishes to throw a checked exception
     *             If an error occurred while performing the Action
     */
    void run() throws Exception;
}
opendj-core/src/main/java/com/forgerock/reactive/Completable.java
@@ -18,16 +18,20 @@
import org.forgerock.util.Function;
import org.reactivestreams.Publisher;
/** {@link Completable} is used to communicates a terminated operation which doesn't produce a result. */
/**
 * {@link Completable} is used to communicates a terminated operation which doesn't produce a result. It extends
 * {@link Publisher} by providing additional reactive methods allowing to act on it. The goal of this interface is to
 * decouple ourself from reactive framework (RxJava/Reactor).
 */
public interface Completable extends Publisher<Void> {
    /** Emitter is used to notify when the operation has been completed, successfully or not. */
    /** Subscriber is notified when the operation has been completed, successfully or not. */
    public interface Subscriber {
        /** Notify that this {@link Completable} is now completed. */
        /** Called once this {@link Completable} is completed. */
        void onComplete();
        /**
         * Notify that this {@link Completable} cannot be completed because of an error.
         * Called when this {@link Completable} cannot be completed because of an error.
         *
         * @param error
         *            The error preventing this {@link Completable} to complete
@@ -40,10 +44,12 @@
        /**
         * Called when the streaming api has been subscribed.
         *
         * @param e
         * @param s
         *            The {@link Subscriber} to use to communicate the completeness of this {@link Completable}
         * @throws Exception
         *             on error
         */
        void subscribe(Subscriber e);
        void subscribe(Subscriber s) throws Exception;
    }
    /**
@@ -57,7 +63,7 @@
    Completable onErrorResumeWith(Function<Throwable, Completable, Exception> function);
    /**
     * Creates a {@link Single} which will emit the specified value when this {@link Completable} complete.
     * Returns a {@link Single} which will complete with the provided value once this {@link Completable} completes.
     *
     * @param <V>
     *            Type of the value to emit
opendj-core/src/main/java/com/forgerock/reactive/Consumer.java
@@ -23,12 +23,12 @@
 */
public interface Consumer<V> {
    /**
     * Consume the given value.
     * Consumes a value.
     *
     * @param item
     * @param value
     *            the value
     * @throws Exception
     *             on error
     */
    void accept(V item) throws Exception;
    void accept(V value) throws Exception;
}
opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java
@@ -16,26 +16,26 @@
package com.forgerock.reactive;
/**
 * Handle the processing of a request in a given context and return the resulting response.
 * Handle the processing of an input in a given context and return the resulting output.
 *
 * @param <CTX>
 * @param <C>
 *            Type of the context
 * @param <REQ>
 *            Type of the request to process
 * @param <REP>
 *            Type of the response
 * @param <I>
 *            Type of the input to process
 * @param <O>
 *            Type of the output as a result of the process
 */
public interface ReactiveHandler<CTX, REQ, REP> {
public interface ReactiveHandler<C, I, O> {
    /**
     * Process the request given the context.
     * Process an input given the context.
     *
     * @param context
     *            Context in which the request must be processed
     * @param request
     *            The response to process
     * @return A {@link Single} response.
     *            Context in which the input must be processed
     * @param input
     *            The input to process
     * @return An output.
     * @throws Exception
     *             if the request cannot be processed
     */
    REP handle(final CTX context, final REQ request) throws Exception;
    O handle(final C context, final I input) throws Exception;
}
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -17,6 +17,7 @@
import org.forgerock.util.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableOnSubscribe;
@@ -86,7 +87,8 @@
    }
    /**
     * Create a new {@link Single} from the given {@link Publisher}.
     * Create a new {@link Single} from the given {@link Publisher}. If the {@link Publisher} produce more than one
     * result, they'll be dropped and the inner {@link Subscription} cancelled.
     *
     * @param <V>
     *            Type of the datum emitted
@@ -223,7 +225,7 @@
        }
        @Override
        public Stream<V> onNextDo(final Consumer<V> onNext) {
        public Stream<V> onNext(final Consumer<V> onNext) {
            return new RxJavaStream<>(impl.doOnNext(new io.reactivex.functions.Consumer<V>() {
                @Override
                public void accept(V value) throws Exception {
@@ -233,7 +235,7 @@
        }
        @Override
        public Stream<V> onErrorDo(final Consumer<Throwable> onError) {
        public Stream<V> onError(final Consumer<Throwable> onError) {
            return new RxJavaStream<>(impl.doOnError(new io.reactivex.functions.Consumer<Throwable>() {
                @Override
                public void accept(Throwable t) throws Exception {
@@ -254,7 +256,7 @@
        }
        @Override
        public Stream<V> onCompleteDo(final Action action) {
        public Stream<V> onComplete(final Action action) {
            return new RxJavaStream<>(impl.doOnComplete(new io.reactivex.functions.Action() {
                @Override
                public void run() throws Exception {
opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
New file
@@ -0,0 +1,317 @@
/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2016 ForgeRock AS.
 */
package com.forgerock.reactive;
import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher;
import static org.forgerock.util.Reject.checkNotNull;
import java.io.IOException;
import org.forgerock.opendj.io.ASN1Reader;
import org.forgerock.opendj.io.AbstractLDAPMessageHandler;
import org.forgerock.opendj.io.LDAP;
import org.forgerock.opendj.io.LDAPReader;
import org.forgerock.opendj.ldap.DecodeException;
import org.forgerock.opendj.ldap.DecodeOptions;
import org.forgerock.opendj.ldap.IntermediateResponseHandler;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.LdapResultHandler;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.SearchResultHandler;
import org.forgerock.opendj.ldap.ServerConnection;
import org.forgerock.opendj.ldap.ServerConnectionFactory;
import org.forgerock.opendj.ldap.requests.AbandonRequest;
import org.forgerock.opendj.ldap.requests.AddRequest;
import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.opendj.ldap.requests.CompareRequest;
import org.forgerock.opendj.ldap.requests.DeleteRequest;
import org.forgerock.opendj.ldap.requests.ExtendedRequest;
import org.forgerock.opendj.ldap.requests.GenericBindRequest;
import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
import org.forgerock.opendj.ldap.requests.ModifyRequest;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.BindResult;
import org.forgerock.opendj.ldap.responses.CompareResult;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.IntermediateResponse;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.responses.SearchResultReference;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.forgerock.opendj.ldap.spi.TransportProvider;
import org.forgerock.util.Function;
import org.forgerock.util.promise.RuntimeExceptionHandler;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
/**
 * Adapt a {@link ServerConnectionFactory} to a {@link Function} compatible with
 * {@link TransportProvider#getLDAPListener(java.util.Set, Function, org.forgerock.util.Options)}.
 */
@Deprecated
public final class ServerConnectionFactoryAdapter implements
        Function<LDAPClientContext,
                 ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                 LdapException> {
    private final ServerConnectionFactory<LDAPClientContext, Integer> adaptee;
    private final DecodeOptions decodeOptions;
    /**
     * Creates a new adapter.
     *
     * @param decodeOptions
     *            {@link DecodeOptions} used during request deserialization
     * @param serverConnectionFactory
     *            the {@link ServerConnectionFactory} to adapt
     */
    public ServerConnectionFactoryAdapter(final DecodeOptions decodeOptions,
            final ServerConnectionFactory<LDAPClientContext, Integer> serverConnectionFactory) {
        this.decodeOptions = checkNotNull(decodeOptions, "decodeOptions must not be null");
        this.adaptee = checkNotNull(serverConnectionFactory, "serverConnectionFactory must not be null");
    }
    @Override
    public ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>> apply(
            final LDAPClientContext clientContext) throws LdapException {
        return new ServerConnectionAdapter(clientContext, decodeOptions, adaptee.handleAccept(clientContext));
    }
    /**
     * Adapt a {@link ServerConnection} to a {@link Function} compatible with
     * {@link TransportProvider#getLDAPListener(java.util.Set, Function, org.forgerock.util.Options)}.
     */
    public static final class ServerConnectionAdapter
            implements ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>> {
        private final ServerConnection<Integer> adaptee;
        private final DecodeOptions decodeOptions;
        /**
         * Creates a new adapter.
         *
         * @param clientContext
         *            The {@link LDAPClientContext} which represents the client connection
         * @param decodeOptions
         *            The {@link DecodeOptions} to use during request deserialization
         * @param serverConnection
         *            the {@link ServerConnection} to adapt
         */
        public ServerConnectionAdapter(final LDAPClientContext clientContext, final DecodeOptions decodeOptions,
                final ServerConnection<Integer> serverConnection) {
            this.decodeOptions = checkNotNull(decodeOptions, "decodeOptions must not be null");
            this.adaptee = checkNotNull(serverConnection, "serverConnection must not be null");
            clientContext.addConnectionEventListener(new ConnectionEventListener() {
                @Override
                public void handleConnectionError(final LDAPClientContext context, final Throwable error) {
                    adaptee.handleConnectionError(error);
                }
                @Override
                public void handleConnectionClosed(final LDAPClientContext context, final UnbindRequest unbindRequest) {
                    if (unbindRequest == null) {
                        adaptee.handleConnectionClosed(null, null);
                    } else {
                        adaptee.handleConnectionClosed(0, unbindRequest);
                    }
                }
                @Override
                public void handleConnectionDisconnected(final LDAPClientContext context, final ResultCode resultCode,
                        final String diagnosticMessage) {
                    adaptee.handleConnectionDisconnected(resultCode, diagnosticMessage);
                }
            });
        }
        @Override
        public Stream<Response> handle(final LDAPClientContext context, final LdapRequestEnvelope request)
                throws Exception {
            final LDAPReader<ASN1Reader> reader = LDAP.getReader(request.getContent(), decodeOptions);
            return streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
                @Override
                public void subscribe(final FlowableEmitter<Response> emitter) throws Exception {
                    reader.readMessage(new AbstractLDAPMessageHandler() {
                        @Override
                        public void abandonRequest(int messageID, AbandonRequest request)
                                throws DecodeException, IOException {
                            handleAbandon(messageID, request, emitter);
                        }
                        @Override
                        public void addRequest(int messageID, AddRequest request) throws DecodeException, IOException {
                            handleAdd(messageID, request, emitter);
                        }
                        @Override
                        public void deleteRequest(final int messageID, final DeleteRequest request)
                                throws DecodeException, IOException {
                            handleDelete(messageID, request, emitter);
                        }
                        @Override
                        public void bindRequest(int messageID, int version, GenericBindRequest request)
                                throws DecodeException, IOException {
                            handleBind(messageID, version, request, emitter);
                        }
                        @Override
                        public void compareRequest(int messageID, CompareRequest request)
                                throws DecodeException, IOException {
                            handleCompare(messageID, request, emitter);
                        }
                        @Override
                        public <R extends ExtendedResult> void extendedRequest(int messageID,
                                ExtendedRequest<R> request) throws DecodeException, IOException {
                            handleExtendedRequest(messageID, request, emitter);
                        }
                        @Override
                        public void modifyDNRequest(int messageID, ModifyDNRequest request)
                                throws DecodeException, IOException {
                            handleModifyDN(messageID, request, emitter);
                        }
                        @Override
                        public void modifyRequest(int messageID, ModifyRequest request)
                                throws DecodeException, IOException {
                            handleModify(messageID, request, emitter);
                        }
                        @Override
                        public void searchRequest(int messageID, SearchRequest request)
                                throws DecodeException, IOException {
                            handleSearch(messageID, request, emitter);
                        }
                        @Override
                        public void unbindRequest(int messageID, UnbindRequest request)
                                throws DecodeException, IOException {
                            // Unbind request are received through ConnectionEventListener only
                        }
                    });
                    emitter.onComplete();
                }
            }, BackpressureStrategy.ERROR));
        }
        void handleAdd(final int requestId, final AddRequest request, final FlowableEmitter<Response> responseEmitter) {
            final ResultHandlerAdapter<Result> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
            adaptee.handleAdd(requestId, request, resultAdapter, resultAdapter);
        }
        void handleBind(final int requestId, final int version, final BindRequest request,
                final FlowableEmitter<Response> responseEmitter) {
            final ResultHandlerAdapter<BindResult> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
            adaptee.handleBind(requestId, version, request, resultAdapter, resultAdapter);
        }
        void handleCompare(final int requestId, final CompareRequest request,
                final FlowableEmitter<Response> responseEmitter) {
            final ResultHandlerAdapter<CompareResult> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
            adaptee.handleCompare(requestId, request, resultAdapter, resultAdapter);
        }
        void handleDelete(final int requestId, final DeleteRequest request,
                final FlowableEmitter<Response> responseEmitter) {
            final ResultHandlerAdapter<Result> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
            adaptee.handleDelete(requestId, request, resultAdapter, resultAdapter);
        }
        <R extends ExtendedResult> void handleExtendedRequest(final int requestId, final ExtendedRequest<R> request,
                final FlowableEmitter<Response> responseEmitter) {
            final ResultHandlerAdapter<R> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
            adaptee.handleExtendedRequest(requestId, request, resultAdapter, resultAdapter);
        }
        void handleModify(final int requestId, final ModifyRequest request,
                final FlowableEmitter<Response> responseEmitter) {
            final ResultHandlerAdapter<Result> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
            adaptee.handleModify(requestId, request, resultAdapter, resultAdapter);
        }
        void handleModifyDN(final int requestId, final ModifyDNRequest request,
                final FlowableEmitter<Response> responseEmitter) {
            final ResultHandlerAdapter<Result> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
            adaptee.handleModifyDN(requestId, request, resultAdapter, resultAdapter);
        }
        void handleSearch(final int requestId, final SearchRequest request,
                final FlowableEmitter<Response> responseEmitter) {
            final ResultHandlerAdapter<Result> resultAdapter = new ResultHandlerAdapter<>(responseEmitter);
            adaptee.handleSearch(requestId, request, resultAdapter, resultAdapter, resultAdapter);
        }
        void handleAbandon(int requestId, final AbandonRequest request, final FlowableEmitter<Response> emitter) {
            adaptee.handleAbandon(requestId, request);
        }
        /** Forward all responses received from handler to a {@link LdapResponse}. */
        private static final class ResultHandlerAdapter<R extends Response> implements IntermediateResponseHandler,
                SearchResultHandler, LdapResultHandler<R>, RuntimeExceptionHandler {
            private final FlowableEmitter<Response> adaptee;
            ResultHandlerAdapter(final FlowableEmitter<Response> emitter) {
                this.adaptee = emitter;
            }
            @Override
            public boolean handleEntry(final SearchResultEntry entry) {
                adaptee.onNext(entry);
                return true;
            }
            @Override
            public boolean handleReference(final SearchResultReference reference) {
                adaptee.onNext(reference);
                return true;
            }
            @Override
            public boolean handleIntermediateResponse(final IntermediateResponse intermediateResponse) {
                adaptee.onNext(intermediateResponse);
                return true;
            }
            @Override
            public void handleResult(R result) {
                if (result != null) {
                    adaptee.onNext(result);
                }
                adaptee.onComplete();
            }
            @Override
            public void handleRuntimeException(RuntimeException exception) {
                adaptee.onError(exception);
            }
            @Override
            public void handleException(LdapException exception) {
                adaptee.onError(exception);
            }
        }
    }
}
opendj-core/src/main/java/com/forgerock/reactive/Single.java
@@ -19,40 +19,45 @@
import org.reactivestreams.Publisher;
/**
 * Single is a reactive-streams compatible promise.
 * Single is a reactive-streams compatible promise. It extends {@link Publisher} by providing additional reactive
 * methods allowing to act on it. The goal of this interface is to decouple ourself from reactive framework
 * (RxJava/Reactor).
 *
 * @param <V>
 *            Type of the datum emitted
 */
public interface Single<V> extends Publisher<V> {
    /** Emitter is used to notify when the operation has been completed, successfully or not. */
    /** Subscriber is notified when the operation has been completed, successfully or not. */
    public interface Subscriber<V> {
        /**
         * Signal a success value.
         * Called once this {@link Single} is completed.
         *
         * @param t
         * @param value
         *            the value, not null
         */
        void onComplete(V t);
        void onComplete(V value);
        /**
         * Signal an exception.
         * Called when this {@link Single} cannot be completed because of an error.
         *
         * @param t
         *            the exception, not null
         * @param error
         *            The error preventing this {@link Single} to complete, not null
         */
        void onError(Throwable t);
        void onError(Throwable error);
    }
    /** Adapts the streaming api to a callback one. */
    public interface Emitter<V> {
        /**
         * Called for each SingleObserver that subscribes.
         * @param e the safe emitter instance, never null
         * @throws Exception on error
         *
         * @param s
         *            The {@link Subscriber} to use to communicate the completeness of this {@link Single}
         * @throws Exception
         *             on error
         */
        void subscribe(Subscriber<V> e) throws Exception;
        void subscribe(Subscriber<V> s) throws Exception;
    }
    /**
opendj-core/src/main/java/com/forgerock/reactive/Stream.java
@@ -19,7 +19,9 @@
import org.reactivestreams.Publisher;
/**
 * Stream is a reactive-streams compliant way to chain operations and transformation on a stream of data.
 * Stream is a reactive-streams compliant way to chain operations and transformation on a stream of data. It extends
 * {@link Publisher} by providing additional reactive methods allowing to act on it. The goal of this interface is to
 * decouple ourself from reactive framework (RxJava/Reactor).
 *
 * @param <V>
 *            Type of data emitted
@@ -27,7 +29,7 @@
public interface Stream<V> extends Publisher<V> {
    /**
     * Transform the data emitted by this stream.
     * Transforms the data emitted by this stream.
     *
     * @param <O>
     *            Type of data emitted after transformation
@@ -38,7 +40,8 @@
    <O> Stream<O> map(Function<V, O, Exception> function);
    /**
     * Transform each data emitted by this stream into a new stream of data. All these streams are then merged together.
     * Transforms each data emitted by this stream into a new stream of data. All these streams are then merged
     * together.
     *
     * @param <O>
     *            Type of data emitted after transformation
@@ -69,7 +72,7 @@
     *            The {@link Consumer} to invoke when a value is emitted by this stream
     * @return a new {@link Stream}
     */
    Stream<V> onNextDo(Consumer<V> onNext);
    Stream<V> onNext(Consumer<V> onNext);
    /**
     * Invokes the on error {@link Consumer} when an error occurs on this stream.
@@ -78,7 +81,7 @@
     *            The {@link Consumer} to invoke on error
     * @return a new {@link Stream}
     */
    Stream<V> onErrorDo(Consumer<Throwable> onError);
    Stream<V> onError(Consumer<Throwable> onError);
    /**
     * Invokes the on complete {@link Action} when this stream is completed.
@@ -87,10 +90,10 @@
     *            The {@link Action} to invoke on stream completion
     * @return a new {@link Stream}
     */
    Stream<V> onCompleteDo(Action onComplete);
    Stream<V> onComplete(Action onComplete);
    /**
     * Subscribe to this stream and drop all data produced by it.
     * Subscribes to this stream and drop all data produced by it.
     */
    void subscribe();
}
opendj-core/src/main/java/org/forgerock/opendj/io/LDAP.java
@@ -17,6 +17,7 @@
package org.forgerock.opendj.io;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -153,6 +154,19 @@
     * The protocol op type for unbind requests.
     */
    public static final byte OP_TYPE_UNBIND_REQUEST = 0x42;
    /** Mapping between request protocol op and their respecetive response protocol op. */
    public static final byte[] OP_TO_RESULT_TYPE = new byte[0xFF];
    static {
        Arrays.fill(OP_TO_RESULT_TYPE, (byte) 0x00);
        OP_TO_RESULT_TYPE[OP_TYPE_ADD_REQUEST] = OP_TYPE_ADD_RESPONSE;
        OP_TO_RESULT_TYPE[OP_TYPE_BIND_REQUEST] = OP_TYPE_BIND_RESPONSE;
        OP_TO_RESULT_TYPE[OP_TYPE_COMPARE_REQUEST] = OP_TYPE_COMPARE_RESPONSE;
        OP_TO_RESULT_TYPE[OP_TYPE_DELETE_REQUEST] = OP_TYPE_DELETE_RESPONSE;
        OP_TO_RESULT_TYPE[OP_TYPE_EXTENDED_REQUEST] = OP_TYPE_EXTENDED_RESPONSE;
        OP_TO_RESULT_TYPE[OP_TYPE_MODIFY_DN_REQUEST] = OP_TYPE_MODIFY_DN_RESPONSE;
        OP_TO_RESULT_TYPE[OP_TYPE_MODIFY_REQUEST] = OP_TYPE_MODIFY_RESPONSE;
        OP_TO_RESULT_TYPE[OP_TYPE_SEARCH_REQUEST] = OP_TYPE_SEARCH_RESULT_DONE;
    };
    /**
     * The BER type to use for the AuthenticationChoice element in a bind
     * request when SASL authentication is to be used.
opendj-core/src/main/java/org/forgerock/opendj/ldap/CommonLDAPOptions.java
@@ -24,7 +24,7 @@
/**
 * Common options for LDAP clients and listeners.
 */
public abstract class CommonLDAPOptions {
abstract class CommonLDAPOptions {
    /**
     * Specifies the class loader which will be used to load the
     * {@link org.forgerock.opendj.ldap.spi.TransportProvider TransportProvider}.
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
@@ -18,6 +18,7 @@
package org.forgerock.opendj.ldap;
import java.net.InetSocketAddress;
import java.util.EventListener;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSession;
@@ -35,7 +36,7 @@
public interface LDAPClientContext {
    /** Listens for disconnection event. */
    public interface DisconnectListener {
    public interface ConnectionEventListener extends EventListener {
        /**
         * Invoked when the connection has been disconnected because of an error (e.g: message too big).
         *
@@ -44,7 +45,7 @@
         * @param error
         *            The error
         */
        void exceptionOccurred(LDAPClientContext context, Throwable error);
        void handleConnectionError(LDAPClientContext context, Throwable error);
        /**
         * Invoked when the client closed the connection, possibly using an unbind request.
@@ -55,7 +56,7 @@
         *            The unbind request, which may be {@code null} if one was not sent before the connection was
         *            closed.
         */
        void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest);
        void handleConnectionClosed(LDAPClientContext context, UnbindRequest unbindRequest);
        /**
         * Invoked when the connection has been disconnected by the server.
@@ -68,28 +69,28 @@
         * @param diagnosticMessage
         *            The diagnostic message, which may be empty or {@code null} indicating that none was provided.
         */
        void connectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage);
        void handleConnectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage);
    }
    /**
     * Register a listener which will be notified when this {@link LDAPClientContext} is disconnected.
     *
     * @param listener The {@link DisconnectListener} to register.
     * @param listener The {@link ConnectionEventListener} to register.
     */
    void onDisconnect(DisconnectListener listener);
    void addConnectionEventListener(ConnectionEventListener listener);
    /**
     * Disconnects the client without sending a disconnect notification. Invoking this method causes
     * {@link DisconnectListener#connectionDisconnected(LDAPClientContext, ResultCode, String)} to be called before this
     * method returns.
     * {@link ConnectionEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be called
     * before this method returns.
     */
    void disconnect();
    /**
     * Disconnects the client and sends a disconnect notification, containing the provided result code and diagnostic
     * message. Invoking this method causes
     * {@link DisconnectListener#connectionDisconnected(LDAPClientContext, ResultCode, String)} to be called before this
     * method returns.
     * {@link ConnectionEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be called
     * before this method returns.
     *
     * @param resultCode
     *            The result code to include with the disconnect notification
@@ -136,6 +137,15 @@
    SSLSession getSSLSession();
    /**
     * Returns the {@link SaslServer} currently in use by the underlying connection, or
     * {@code null} if SASL integrity and/or privacy protection is not enabled.
     *
     * @return The {@link SaslServer} currently in use by the underlying connection, or
     *         {@code null} if SASL integrity and/or privacy protection is not enabled.
     */
    SaslServer getSASLServer();
    /**
     * Returns {@code true} if the underlying connection has been closed as a
     * result of a client disconnect, a fatal connection error, or a server-side
     * {@link #disconnect}.
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java
@@ -17,8 +17,8 @@
package org.forgerock.opendj.ldap;
import static com.forgerock.opendj.ldap.CoreMessages.HBCF_CONNECTION_CLOSED_BY_CLIENT;
import static com.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_UNEXPECTED;
import static com.forgerock.opendj.ldap.CoreMessages.HBCF_HEARTBEAT_TIMEOUT;
import static com.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_UNEXPECTED;
import static com.forgerock.opendj.ldap.CoreMessages.LDAP_CONNECTION_CONNECT_TIMEOUT;
import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER;
import static java.util.concurrent.TimeUnit.*;
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPListener.java
@@ -22,12 +22,18 @@
import java.util.Collections;
import java.util.Set;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.spi.LDAPListenerImpl;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.forgerock.opendj.ldap.spi.TransportProvider;
import org.forgerock.util.Function;
import org.forgerock.util.Option;
import org.forgerock.util.Options;
import org.forgerock.util.Reject;
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Stream;
/**
 * An LDAP server connection listener which waits for LDAP connection requests
 * to come in over the network and binds them to a {@link ServerConnection}
@@ -119,8 +125,7 @@
     * @param port
     *            The port to listen on.
     * @param factory
     *            The server connection factory which will be used to create
     *            server connections.
     *            The handler factory which will be used to create handlers.
     * @throws IOException
     *             If an error occurred while trying to listen on the provided
     *             address.
@@ -128,7 +133,9 @@
     *             If {code factory} was {@code null}.
     */
    public LDAPListener(final int port,
            final ServerConnectionFactory<LDAPClientContext, Integer> factory) throws IOException {
            final Function<LDAPClientContext,
                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                           LdapException> factory) throws IOException {
        this(port, factory, Options.defaultOptions());
    }
@@ -139,8 +146,7 @@
     * @param port
     *            The port to listen on.
     * @param factory
     *            The server connection factory which will be used to create
     *            server connections.
     *            The handler factory which will be used to create handlers.
     * @param options
     *            The LDAP listener options.
     * @throws IOException
@@ -150,40 +156,42 @@
     *             If {code factory} or {@code options} was {@code null}.
     */
    public LDAPListener(final int port,
            final ServerConnectionFactory<LDAPClientContext, Integer> factory,
            final Function<LDAPClientContext,
                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                           LdapException> factory,
            final Options options) throws IOException {
        this(new InetSocketAddress(port), factory, options);
        this(Collections.singleton(new InetSocketAddress(port)), factory, options);
    }
    /**
     * Creates a new LDAP listener implementation which will listen for LDAP
     * client connections at the provided address.
     *
     * @param address
     *            The address to listen on.
     * @param addresses
     *            The addresses to listen on.
     * @param factory
     *            The server connection factory which will be used to create
     *            server connections.
     *            The handler factory which will be used to create handlers.
     * @throws IOException
     *             If an error occurred while trying to listen on the provided
     *             address.
     * @throws NullPointerException
     *             If {@code address} or {code factory} was {@code null}.
     */
    public LDAPListener(final InetSocketAddress address,
            final ServerConnectionFactory<LDAPClientContext, Integer> factory) throws IOException {
        this(address, factory, Options.defaultOptions());
    public LDAPListener(final Set<InetSocketAddress> addresses,
            final Function<LDAPClientContext,
                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                           LdapException> factory) throws IOException {
        this(addresses, factory, Options.defaultOptions());
    }
    /**
     * Creates a new LDAP listener implementation which will listen for LDAP
     * client connections at the provided address.
     *
     * @param address
     *            The address to listen on.
     * @param addresses
     *            The addresses to listen on.
     * @param factory
     *            The server connection factory which will be used to create
     *            server connections.
     *            The handler factory which will be used to create handlers.
     * @param options
     *            The LDAP listener options.
     * @throws IOException
@@ -193,12 +201,14 @@
     *             If {@code address}, {code factory}, or {@code options} was
     *             {@code null}.
     */
    public LDAPListener(final InetSocketAddress address,
            final ServerConnectionFactory<LDAPClientContext, Integer> factory,
    public LDAPListener(final Set<InetSocketAddress> addresses,
            final Function<LDAPClientContext,
                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                           LdapException> factory,
            final Options options) throws IOException {
        Reject.ifNull(address, factory, options);
        Reject.ifNull(addresses, factory, options);
        this.provider = getTransportProvider(options);
        this.impl = provider.getLDAPListener(Collections.singleton(address), factory, options);
        this.impl = provider.getLDAPListener(addresses, factory, options);
    }
    /**
@@ -210,8 +220,7 @@
     * @param port
     *            The port to listen on.
     * @param factory
     *            The server connection factory which will be used to create
     *            server connections.
     *            The handler factory which will be used to create handlers.
     * @throws IOException
     *             If an error occurred while trying to listen on the provided
     *             address.
@@ -219,7 +228,9 @@
     *             If {@code host} or {code factory} was {@code null}.
     */
    public LDAPListener(final String host, final int port,
            final ServerConnectionFactory<LDAPClientContext, Integer> factory) throws IOException {
            final Function<LDAPClientContext,
                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                           LdapException> factory) throws IOException {
        this(host, port, factory, Options.defaultOptions());
    }
@@ -232,8 +243,7 @@
     * @param port
     *            The port to listen on.
     * @param factory
     *            The server connection factory which will be used to create
     *            server connections.
     *            The handler factory which will be used to create handlers.
     * @param options
     *            The LDAP listener options.
     * @throws IOException
@@ -244,9 +254,11 @@
     *             {@code null}.
     */
    public LDAPListener(final String host, final int port,
            final ServerConnectionFactory<LDAPClientContext, Integer> factory,
            final Function<LDAPClientContext,
                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                           LdapException> factory,
            final Options options) throws IOException {
        this(new InetSocketAddress(host, port), factory, options);
        this(Collections.singleton(new InetSocketAddress(host, port)), factory, options);
    }
    /** Closes this LDAP connection listener. */
@@ -256,15 +268,24 @@
    }
    /**
     * Returns the address that this LDAP listener is listening on.
     * Returns the addresses that this LDAP listener is listening on.
     *
     * @return The address that this LDAP listener is listening on.
     * @return The addresses that this LDAP listener is listening on.
     */
    public Set<InetSocketAddress> getSocketAddresses() {
        return impl.getSocketAddresses();
    }
    /**
     * Returns the first address that his LDAP listener is listening on.
     *
     * @return The addresses that this LDAP listener is listening on.
     */
    public InetSocketAddress firstSocketAddress() {
        return impl.getSocketAddresses().iterator().next();
    }
    /**
     * Returns the name of the transport provider, which provides the implementation
     * of this factory.
     *
opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnection.java
@@ -12,7 +12,7 @@
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2010 Sun Microsystems, Inc.
 * Portions copyright 2012 ForgeRock AS.
 * Portions copyright 2012-2016 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
@@ -35,6 +35,7 @@
 *            The type of request context.
 * @see ServerConnectionFactory
 */
@Deprecated
public interface ServerConnection<C> extends RequestHandler<C> {
    /**
opendj-core/src/main/java/org/forgerock/opendj/ldap/ServerConnectionFactory.java
@@ -12,7 +12,7 @@
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2010 Sun Microsystems, Inc.
 * Portions Copyright 2014 ForgeRock AS.
 * Portions Copyright 2014-2016 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
@@ -33,6 +33,7 @@
 * @see Connections#newInternalConnectionFactory(ServerConnectionFactory,
 *      Object) newInternalConnectionFactory
 */
@Deprecated
public interface ServerConnectionFactory<C, R> {
    /**
     * Invoked when a new client connection is accepted by the associated
opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LdapMessages.java
@@ -21,7 +21,7 @@
import org.forgerock.opendj.ldap.responses.Response;
/**
 * Contains statics methods to create ldap messages.
 * Contains static methods to create ldap messages.
 */
public final class LdapMessages {
@@ -30,23 +30,23 @@
    }
    /**
     * Creates a new {@link LdapRawMessage} containing a partially decoded LDAP message.
     * Creates a new {@link     } containing a partially decoded LDAP message.
     *
     * @param messageType
     *            Operation code of the message
     * @param messageId
     *            Unique identifier of this message
     * @param protocolVersion
     * @param ldapVersion
     *            Protocol version to use (only for Bind requests)
     * @param rawDn
     *            Unparsed name contained in the request (or null if DN is not applicable)
     * @param reader
     *            An {@link ASN1Reader} containing the full encoded ldap message packet.
     * @return A new {@link LdapRawMessage}
     * @return A new {@link LdapRequestEnvelope}
     */
    public static LdapRawMessage newRawMessage(final byte messageType, final int messageId, final int protocolVersion,
            final ByteString rawDn, final ASN1Reader reader) {
        return new LdapRawMessage(messageType, messageId, protocolVersion, rawDn, reader);
    public static LdapRequestEnvelope newRequestEnvelope(final byte messageType, final int messageId,
            final int ldapVersion, final ByteString rawDn, final ASN1Reader reader) {
        return new LdapRequestEnvelope(messageType, messageId, ldapVersion, rawDn, reader);
    }
    /**
@@ -67,16 +67,16 @@
    }
    /**
     * Represents an encoded LDAP message with it's envelope.
     * Represents a Ldap Request envelope containing an encoded Request.
     */
    public static final class LdapRawMessage extends LdapMessageEnvelope<ASN1Reader> {
    public static final class LdapRequestEnvelope extends LdapMessageEnvelope<ASN1Reader> {
        private final ByteString rawDn;
        private final int version;
        private final int ldapVersion;
        private LdapRawMessage(final byte messageType, final int messageId, final int version, final ByteString rawDn,
                final ASN1Reader content) {
        private LdapRequestEnvelope(final byte messageType, final int messageId, final int ldapVersion,
                final ByteString rawDn, final ASN1Reader content) {
            super(messageType, messageId, content);
            this.version = version;
            this.ldapVersion = ldapVersion;
            this.rawDn = rawDn;
        }
@@ -85,8 +85,8 @@
         *
         * @return The ldap protocol version
         */
        public int getVersion() {
            return version;
        public int getLdapVersion() {
            return ldapVersion;
        }
        /**
opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/TransportProvider.java
@@ -20,9 +20,15 @@
import java.util.Set;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.ServerConnectionFactory;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.forgerock.util.Function;
import org.forgerock.util.Options;
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Stream;
/**
 * Interface for transport providers, which provide implementation
 * for {@code LDAPConnectionFactory} and {@code LDAPListener} classes,
@@ -55,8 +61,7 @@
     * @param addresses
     *            The addresses to listen on.
     * @param factory
     *            The server connection factory which will be used to create
     *            server connections.
     *            The handler factory which will be used to create handlers.
     * @param options
     *            The LDAP listener options.
     * @return an implementation of {@code LDAPListener}
@@ -65,6 +70,8 @@
     *             address.
     */
    LDAPListenerImpl getLDAPListener(Set<InetSocketAddress> addresses,
            ServerConnectionFactory<LDAPClientContext, Integer> factory, Options options)
            throws IOException;
            Function<LDAPClientContext,
                     ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                     LdapException> factory,
            Options options) throws IOException;
}
opendj-core/src/test/java/org/forgerock/opendj/ldap/LDAPServer.java
@@ -17,12 +17,13 @@
package org.forgerock.opendj.ldap;
import static org.forgerock.opendj.ldap.LDAPListener.CONNECT_MAX_BACKLOG;
import static org.forgerock.opendj.ldap.LDAPListener.*;
import static org.forgerock.opendj.ldap.LdapException.newLdapException;
import static org.forgerock.opendj.ldap.TestCaseUtils.loopbackWithDynamicPort;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -71,6 +72,7 @@
import com.forgerock.opendj.ldap.controls.AccountUsabilityRequestControl;
import com.forgerock.opendj.ldap.controls.AccountUsabilityResponseControl;
import com.forgerock.reactive.ServerConnectionFactoryAdapter;
/**
 * A simple ldap server that manages 1000 entries and used for running
@@ -517,7 +519,9 @@
            return;
        }
        sslContext = new SSLContextBuilder().getSSLContext();
        listener = new LDAPListener(loopbackWithDynamicPort(), getInstance(),
        listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
                        getInstance()),
                        Options.defaultOptions().set(CONNECT_MAX_BACKLOG, 4096));
        isRunning = true;
    }
opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicLDAPListener.java
@@ -21,14 +21,22 @@
import java.util.Set;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.ServerConnectionFactory;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.forgerock.util.Function;
import org.forgerock.util.Options;
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Stream;
/**
 * Basic LDAP listener implementation to use for tests only.
 */
public final class BasicLDAPListener implements LDAPListenerImpl {
    private final ServerConnectionFactory<LDAPClientContext, Integer> connectionFactory;
    private final Function<LDAPClientContext,
                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                           LdapException> connectionFactory;
    private final Set<InetSocketAddress> socketAddresses;
    /**
@@ -37,15 +45,16 @@
     * @param address
     *            The address to listen on.
     * @param factory
     *            The server connection factory can be used to create
     *            server connections.
     *            The server connection factory can be used to create server connections.
     * @param options
     *            The LDAP listener options.
     * @throws IOException
     *             is never thrown with this do-nothing implementation
     */
    public BasicLDAPListener(final Set<InetSocketAddress> addresses,
            final ServerConnectionFactory<LDAPClientContext, Integer> factory,
            final Function<LDAPClientContext,
                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                           LdapException> factory,
            final Options options) throws IOException {
        this.connectionFactory = factory;
        this.socketAddresses = addresses;
@@ -70,7 +79,9 @@
        return builder.toString();
    }
    ServerConnectionFactory<LDAPClientContext, Integer> getConnectionFactory() {
    Function<LDAPClientContext,
             ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
             LdapException> getConnectionFactory() {
        return connectionFactory;
    }
}
opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicTransportProvider.java
@@ -20,9 +20,15 @@
import java.util.Set;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.ServerConnectionFactory;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.forgerock.util.Function;
import org.forgerock.util.Options;
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Stream;
/**
 * Provides an basic implementation of a transport provider doing nothing.
 * This should be used for tests only.
@@ -45,7 +51,10 @@
    @Override
    public LDAPListenerImpl getLDAPListener(Set<InetSocketAddress> addresses,
            ServerConnectionFactory<LDAPClientContext, Integer> factory, Options options) throws IOException {
            Function<LDAPClientContext,
                     ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                     LdapException> factory,
            Options options) throws IOException {
        return new BasicLDAPListener(addresses, factory, options);
    }
opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
@@ -15,42 +15,18 @@
 */
package com.forgerock.opendj.grizzly;
import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Set;
import org.forgerock.opendj.grizzly.GrizzlyLDAPConnectionFactory;
import org.forgerock.opendj.grizzly.GrizzlyLDAPListener;
import org.forgerock.opendj.io.ASN1Reader;
import org.forgerock.opendj.io.AbstractLDAPMessageHandler;
import org.forgerock.opendj.io.LDAP;
import org.forgerock.opendj.io.LDAPReader;
import org.forgerock.opendj.ldap.CommonLDAPOptions;
import org.forgerock.opendj.ldap.DecodeException;
import org.forgerock.opendj.ldap.DecodeOptions;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.LDAPClientContext.DisconnectListener;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.ServerConnection;
import org.forgerock.opendj.ldap.ServerConnectionFactory;
import org.forgerock.opendj.ldap.requests.AbandonRequest;
import org.forgerock.opendj.ldap.requests.AddRequest;
import org.forgerock.opendj.ldap.requests.CompareRequest;
import org.forgerock.opendj.ldap.requests.DeleteRequest;
import org.forgerock.opendj.ldap.requests.ExtendedRequest;
import org.forgerock.opendj.ldap.requests.GenericBindRequest;
import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
import org.forgerock.opendj.ldap.requests.ModifyRequest;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.spi.LDAPConnectionFactoryImpl;
import org.forgerock.opendj.ldap.spi.LDAPListenerImpl;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.forgerock.opendj.ldap.spi.TransportProvider;
import org.forgerock.util.Function;
import org.forgerock.util.Options;
@@ -58,11 +34,6 @@
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Stream;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
/**
 * Grizzly transport provider implementation.
 */
@@ -75,122 +46,15 @@
    @Override
    public LDAPListenerImpl getLDAPListener(final Set<InetSocketAddress> addresses,
            final ServerConnectionFactory<LDAPClientContext, Integer> factory, final Options options)
            throws IOException {
        return new GrizzlyLDAPListener(addresses, options,
                new Function<LDAPClientContext,
                             ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
                             LdapException>() {
                    @Override
                    public ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> apply(
                            final LDAPClientContext clientContext) throws LdapException {
                        return newHandler(clientContext, factory, options);
                    }
                });
            final Function<LDAPClientContext,
                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                           LdapException> factory,
            final Options options) throws IOException {
        return new GrizzlyLDAPListener(addresses, options, factory);
    }
    @Override
    public String getName() {
        return "Grizzly";
    }
    private ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> newHandler(
            final LDAPClientContext clientContext, final ServerConnectionFactory<LDAPClientContext, Integer> factory,
            final Options options) throws LdapException {
        final ServerConnection<Integer> serverConnection = factory.handleAccept(clientContext);
        final ServerConnectionAdaptor<Integer> adapter = new ServerConnectionAdaptor<>(serverConnection);
        clientContext.onDisconnect(new DisconnectListener() {
            @Override
            public void exceptionOccurred(final LDAPClientContext context, final Throwable error) {
                serverConnection.handleConnectionError(error);
            }
            @Override
            public void connectionClosed(final LDAPClientContext context, final UnbindRequest unbindRequest) {
                serverConnection.handleConnectionClosed(0, unbindRequest);
            }
            @Override
            public void connectionDisconnected(final LDAPClientContext context, final ResultCode resultCode,
                    final String diagnosticMessage) {
                serverConnection.handleConnectionDisconnected(resultCode, diagnosticMessage);
            }
        });
        final DecodeOptions decodeOptions = options.get(CommonLDAPOptions.LDAP_DECODE_OPTIONS);
        return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
            @Override
            public Stream<Response> handle(final LDAPClientContext context,
                    final LdapRawMessage rawRequest) throws Exception {
                final LDAPReader<ASN1Reader> reader = LDAP.getReader(rawRequest.getContent(), decodeOptions);
                return streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
                    @Override
                    public void subscribe(final FlowableEmitter<Response> emitter) throws Exception {
                        reader.readMessage(new AbstractLDAPMessageHandler() {
                            @Override
                            public void abandonRequest(int messageID, AbandonRequest request)
                                    throws DecodeException, IOException {
                                adapter.handleAbandon(messageID, request, emitter);
                            }
                            @Override
                            public void addRequest(int messageID, AddRequest request)
                                    throws DecodeException, IOException {
                                adapter.handleAdd(messageID, request, emitter);
                            }
                            @Override
                            public void deleteRequest(final int messageID, final DeleteRequest request)
                                    throws DecodeException, IOException {
                                adapter.handleDelete(messageID, request, emitter);
                            }
                            @Override
                            public void bindRequest(int messageID, int version, GenericBindRequest request)
                                    throws DecodeException, IOException {
                                adapter.handleBind(messageID, version, request, emitter);
                            }
                            @Override
                            public void compareRequest(int messageID, CompareRequest request)
                                    throws DecodeException, IOException {
                                adapter.handleCompare(messageID, request, emitter);
                            }
                            @Override
                            public <R extends ExtendedResult> void extendedRequest(int messageID,
                                    ExtendedRequest<R> request) throws DecodeException, IOException {
                                adapter.handleExtendedRequest(messageID, request, emitter);
                            }
                            @Override
                            public void modifyDNRequest(int messageID, ModifyDNRequest request)
                                    throws DecodeException, IOException {
                                adapter.handleModifyDN(messageID, request, emitter);
                            }
                            @Override
                            public void modifyRequest(int messageID, ModifyRequest request)
                                    throws DecodeException, IOException {
                                adapter.handleModify(messageID, request, emitter);
                            }
                            @Override
                            public void searchRequest(int messageID, SearchRequest request)
                                    throws DecodeException, IOException {
                                adapter.handleSearch(messageID, request, emitter);
                            }
                            @Override
                            public void unbindRequest(int messageID, UnbindRequest request)
                                    throws DecodeException, IOException {
                                serverConnection.handleConnectionClosed(messageID, request);
                            }
                        });
                        emitter.onComplete();
                    }
                }, BackpressureStrategy.ERROR));
            }
        };
    }
}
opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/ServerConnectionAdaptor.java
File was deleted
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java
@@ -17,7 +17,6 @@
package org.forgerock.opendj.grizzly;
import static org.forgerock.opendj.grizzly.ServerTCPNIOTransport.SERVER_TRANSPORT;
import static org.forgerock.opendj.ldap.CommonLDAPOptions.LDAP_DECODE_OPTIONS;
import static org.forgerock.opendj.ldap.LDAPListener.*;
import java.io.IOException;
@@ -34,11 +33,10 @@
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.spi.LDAPListenerImpl;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.forgerock.util.Function;
import org.forgerock.util.Options;
import org.glassfish.grizzly.filterchain.FilterChain;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.nio.transport.TCPNIOBindingHandler;
import org.glassfish.grizzly.nio.transport.TCPNIOConnection;
import org.glassfish.grizzly.nio.transport.TCPNIOServerConnection;
@@ -74,7 +72,7 @@
     */
    public GrizzlyLDAPListener(final Set<InetSocketAddress> addresses, final Options options,
            final Function<LDAPClientContext,
                           ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                           LdapException> requestHandlerFactory) throws IOException {
        this(addresses, requestHandlerFactory, options, null);
    }
@@ -97,7 +95,7 @@
     */
    public GrizzlyLDAPListener(final Set<InetSocketAddress> addresses,
            final Function<LDAPClientContext,
                           ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                           LdapException> requestHandlerFactory,
            final Options options, TCPNIOTransport transport) throws IOException {
@@ -105,13 +103,7 @@
        this.options = Options.copyOf(options);
        final LDAPServerFilter serverFilter = new LDAPServerFilter(requestHandlerFactory, options,
                options.get(LDAP_DECODE_OPTIONS), options.get(MAX_CONCURRENT_REQUESTS));
        final FilterChain ldapChain = GrizzlyUtils.buildFilterChain(this.transport.get().getProcessor(),
                new LdapCodec(options.get(REQUEST_MAX_SIZE_IN_BYTES), options.get(LDAP_DECODE_OPTIONS)) {
                    @Override
                    protected void onLdapCodecError(FilterChainContext ctx, Throwable error) {
                        serverFilter.exceptionOccurred(ctx, error);
                    }
                }, serverFilter);
        final FilterChain ldapChain = GrizzlyUtils.buildFilterChain(this.transport.get().getProcessor(), serverFilter);
        final TCPNIOBindingHandler bindingHandler = TCPNIOBindingHandler.builder(this.transport.get())
                .processor(ldapChain).build();
        this.serverConnections = new ArrayList<>(addresses.size());
@@ -128,18 +120,16 @@
        if (isClosed.compareAndSet(false, true)) {
            try {
                for (TCPNIOConnection serverConnection : serverConnections) {
                    serverConnection.close().get();
                    serverConnection.closeSilently();
                }
            } catch (final InterruptedException e) {
                // Cannot handle here.
                Thread.currentThread().interrupt();
            } catch (final Exception e) {
                // TODO: I18N
                logger.warn(LocalizableMessage.raw("Exception occurred while closing listener", e));
            }
            } finally {
            transport.release();
        }
    }
    }
    @Override
    public Set<InetSocketAddress> getSocketAddresses() {
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java
@@ -17,7 +17,7 @@
package org.forgerock.opendj.grizzly;
import static org.forgerock.opendj.ldap.CommonLDAPOptions.LDAP_DECODE_OPTIONS;
import static org.forgerock.opendj.ldap.LDAPListener.LDAP_DECODE_OPTIONS;
import static org.forgerock.opendj.ldap.ResultCode.CLIENT_SIDE_LOCAL_ERROR;
import static org.forgerock.opendj.ldap.responses.Responses.newResult;
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -39,6 +39,7 @@
import org.forgerock.opendj.ldap.DecodeException;
import org.forgerock.opendj.ldap.DecodeOptions;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.LDAPListener;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
@@ -49,18 +50,16 @@
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.responses.SearchResultReference;
import org.forgerock.opendj.ldap.spi.LdapMessages;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage;
import org.forgerock.util.Function;
import org.forgerock.util.Options;
import org.forgerock.util.Reject;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.attributes.Attribute;
import org.glassfish.grizzly.filterchain.BaseFilter;
import org.glassfish.grizzly.filterchain.Filter;
import org.glassfish.grizzly.filterchain.FilterChain;
import org.glassfish.grizzly.filterchain.FilterChainBuilder;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
import org.glassfish.grizzly.ssl.SSLFilter;
@@ -82,11 +81,8 @@
 */
final class LDAPServerFilter extends BaseFilter {
    private static final Attribute<ClientConnectionImpl> LDAP_CONNECTION_ATTR = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
            .createAttribute("LDAPServerConnection");
    private final Function<LDAPClientContext,
                           ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                           LdapException> connectionHandlerFactory;
    /**
@@ -119,6 +115,8 @@
    private final int maxConcurrentRequests;
    private final Options connectionOptions;
    private DecodeOptions decodeOptions;
    /**
     * Creates a server filter with provided listener, options and max size of ASN1 element.
     *
@@ -131,11 +129,12 @@
     */
    LDAPServerFilter(
            final Function<LDAPClientContext,
                           ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
                           ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                           LdapException> connectionHandlerFactory,
            final Options connectionOptions, final DecodeOptions options, final int maxPendingRequests) {
        this.connectionHandlerFactory = connectionHandlerFactory;
        this.connectionOptions = connectionOptions;
        this.decodeOptions = options;
        this.maxConcurrentRequests = maxPendingRequests;
    }
@@ -146,13 +145,23 @@
        connection.configureBlocking(false);
        final ClientConnectionImpl clientContext = new ClientConnectionImpl(connection);
        LDAP_CONNECTION_ATTR.set(connection, clientContext);
        final ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> requestHandler =
                connectionHandlerFactory.apply(clientContext);
        clientContext.read().flatMap(new Function<LdapRawMessage, Publisher<Void>, Exception>() {
        connection.setProcessor(FilterChainBuilder
                .stateless()
                .addAll((FilterChain) connection.getProcessor())
                .add(new LdapCodec(connectionOptions.get(LDAPListener.REQUEST_MAX_SIZE_IN_BYTES), decodeOptions) {
            @Override
            public Publisher<Void> apply(final LdapRawMessage rawRequest) throws Exception {
                    protected void onLdapCodecError(final FilterChainContext ctx, final Throwable error) {
                        clientContext.exceptionOccurred(ctx, error);
                    }
                })
                .add(clientContext)
                .build());
        final ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>> requestHandler =
                connectionHandlerFactory.apply(clientContext);
        clientContext.read().flatMap(new Function<LdapRequestEnvelope, Publisher<Void>, Exception>() {
            @Override
            public Publisher<Void> apply(final LdapRequestEnvelope rawRequest) throws Exception {
                if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
                    clientContext.notifyConnectionClosed(rawRequest);
                    return emptyStream();
@@ -185,7 +194,7 @@
                // Swallow the error to prevent the subscribe() below to report it on the console.
                return emptyStream();
            }
        }).onCompleteDo(new Action() {
        }).onComplete(new Action() {
            @Override
            public void run() throws Exception {
                clientContext.notifyConnectionClosed(null);
@@ -194,30 +203,16 @@
        return ctx.getStopAction();
    }
    private final LdapResponseMessage toLdapResponseMessage(final LdapRawMessage rawRequest, final Result result) {
        switch (rawRequest.getMessageType()) {
        case OP_TYPE_ADD_REQUEST:
            return newResponseMessage(OP_TYPE_ADD_RESPONSE, rawRequest.getMessageId(), result);
        case OP_TYPE_BIND_REQUEST:
            return newResponseMessage(OP_TYPE_BIND_RESPONSE, rawRequest.getMessageId(), result);
        case OP_TYPE_COMPARE_REQUEST:
            return newResponseMessage(OP_TYPE_COMPARE_RESPONSE, rawRequest.getMessageId(), result);
        case OP_TYPE_DELETE_REQUEST:
            return newResponseMessage(OP_TYPE_DELETE_RESPONSE, rawRequest.getMessageId(), result);
        case OP_TYPE_EXTENDED_REQUEST:
            return newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, rawRequest.getMessageId(), result);
        case OP_TYPE_MODIFY_DN_REQUEST:
            return newResponseMessage(OP_TYPE_MODIFY_DN_RESPONSE, rawRequest.getMessageId(), result);
        case OP_TYPE_MODIFY_REQUEST:
            return newResponseMessage(OP_TYPE_MODIFY_RESPONSE, rawRequest.getMessageId(), result);
        case OP_TYPE_SEARCH_REQUEST:
            return newResponseMessage(OP_TYPE_SEARCH_RESULT_DONE, rawRequest.getMessageId(), result);
        default:
    private final LdapResponseMessage toLdapResponseMessage(final LdapRequestEnvelope rawRequest, final Result result) {
        final byte resultType = OP_TO_RESULT_TYPE[rawRequest.getMessageType()];
        if (resultType == 0) {
            throw new IllegalArgumentException("Unknown request: " + rawRequest.getMessageType());
        }
        return newResponseMessage(resultType, rawRequest.getMessageId(), result);
    }
    private Function<Response, LdapResponseMessage, Exception> toLdapResponseMessage(final LdapRawMessage rawRequest) {
    private Function<Response, LdapResponseMessage, Exception> toLdapResponseMessage(
            final LdapRequestEnvelope rawRequest) {
        return new Function<Response, LdapResponseMessage, Exception>() {
            @Override
            public LdapResponseMessage apply(final Response response) {
@@ -233,58 +228,32 @@
                if (response instanceof SearchResultReference) {
                    return newResponseMessage(OP_TYPE_SEARCH_RESULT_REFERENCE, rawRequest.getMessageId(), response);
                }
                throw new IllegalArgumentException();
                throw new IllegalArgumentException(
                        "Not implemented for a response of type " + (response != null ? response.getClass() : null));
            }
        };
    }
    @Override
    public NextAction handleRead(final FilterChainContext ctx) throws IOException {
        final ClientConnectionImpl clientContext = LDAP_CONNECTION_ATTR.get(ctx.getConnection());
        if (clientContext != null) {
            return clientContext.handleRead(ctx);
        }
        ctx.suspend();
        return ctx.getSuspendAction();
    }
    @Override
    public void exceptionOccurred(final FilterChainContext ctx, final Throwable error) {
        final ClientConnectionImpl clientContext = LDAP_CONNECTION_ATTR.remove(ctx.getConnection());
        if (clientContext != null) {
            clientContext.exceptionOccurred(ctx, error);
        }
    }
    @Override
    public NextAction handleClose(final FilterChainContext ctx) throws IOException {
        final ClientConnectionImpl clientContext = LDAP_CONNECTION_ATTR.remove(ctx.getConnection());
        if (clientContext != null) {
            clientContext.handleClose(ctx);
        }
        return ctx.getStopAction();
    }
    final class ClientConnectionImpl implements LDAPClientContext {
    final class ClientConnectionImpl extends BaseFilter implements LDAPClientContext {
        final class GrizzlyBackpressureSubscription implements Subscription {
            private final AtomicLong pendingRequests = new AtomicLong();
            private Subscriber<? super LdapRawMessage> subscriber;
            private Subscriber<? super LdapRequestEnvelope> subscriber;
            volatile FilterChainContext ctx;
            GrizzlyBackpressureSubscription(Subscriber<? super LdapRawMessage> subscriber) {
            GrizzlyBackpressureSubscription(Subscriber<? super LdapRequestEnvelope> subscriber) {
                this.subscriber = subscriber;
                subscriber.onSubscribe(this);
            }
            NextAction handleRead(final FilterChainContext ctx) {
                final Subscriber<? super LdapRawMessage> sub = subscriber;
                final Subscriber<? super LdapRequestEnvelope> sub = subscriber;
                if (sub == null) {
                    // Subscription cancelled. Stop reading
                    ctx.suspend();
                    return ctx.getSuspendAction();
                }
                sub.onNext((LdapRawMessage) ctx.getMessage());
                sub.onNext((LdapRequestEnvelope) ctx.getMessage());
                if (BackpressureHelper.produced(pendingRequests, 1) > 0) {
                    return ctx.getStopAction();
                }
@@ -302,7 +271,7 @@
            }
            public void onError(final Throwable error) {
                final Subscriber<? super LdapRawMessage> sub = subscriber;
                final Subscriber<? super LdapRequestEnvelope> sub = subscriber;
                if (sub != null) {
                    subscriber = null;
                    sub.onError(error);
@@ -310,7 +279,7 @@
            }
            public void onComplete() {
                final Subscriber<? super LdapRawMessage> sub = subscriber;
                final Subscriber<? super LdapRequestEnvelope> sub = subscriber;
                if (sub != null) {
                    subscriber = null;
                    sub.onComplete();
@@ -325,16 +294,17 @@
        private final Connection<?> connection;
        private final AtomicBoolean isClosed = new AtomicBoolean(false);
        private final List<DisconnectListener> listeners = new LinkedList<>();
        private final List<ConnectionEventListener> listeners = new LinkedList<>();
        private SaslServer saslServer;
        GrizzlyBackpressureSubscription upstream;
        private GrizzlyBackpressureSubscription downstream;
        private ClientConnectionImpl(final Connection<?> connection) {
            this.connection = connection;
        }
        NextAction handleRead(final FilterChainContext ctx)  {
            final GrizzlyBackpressureSubscription immutableRef = upstream;
        @Override
        public NextAction handleRead(final FilterChainContext ctx)  {
            final GrizzlyBackpressureSubscription immutableRef = downstream;
            if (immutableRef != null) {
                return immutableRef.handleRead(ctx);
            }
@@ -342,29 +312,35 @@
            return ctx.getSuspendAction();
        }
        void exceptionOccurred(final FilterChainContext ctx, final Throwable error) {
            final GrizzlyBackpressureSubscription immutableRef = upstream;
            if (immutableRef != null) {
        @Override
        public void exceptionOccurred(final FilterChainContext ctx, final Throwable error) {
            final GrizzlyBackpressureSubscription immutableRef = downstream;
            if (immutableRef == null) {
                ctx.getConnection().closeSilently();
            } else {
                immutableRef.onError(error);
            }
        }
        NextAction handleClose(final FilterChainContext ctx) {
            final GrizzlyBackpressureSubscription immutableRef = upstream;
        @Override
        public NextAction handleClose(final FilterChainContext ctx) {
            if (isClosed.compareAndSet(false, true)) {
                final GrizzlyBackpressureSubscription immutableRef = downstream;
            if (immutableRef != null) {
                immutableRef.onComplete();
                    downstream.onComplete();
                }
            }
            return ctx.getStopAction();
        }
        Stream<LdapRawMessage> read() {
            return streamFromPublisher(new Publisher<LdapRawMessage>() {
        Stream<LdapRequestEnvelope> read() {
            return streamFromPublisher(new Publisher<LdapRequestEnvelope>() {
                @Override
                public void subscribe(final Subscriber<? super LdapRawMessage> subscriber) {
                    if (upstream != null) {
                public void subscribe(final Subscriber<? super LdapRequestEnvelope> subscriber) {
                    if (downstream != null) {
                        return;
                    }
                    upstream = new GrizzlyBackpressureSubscription(subscriber);
                    downstream = new GrizzlyBackpressureSubscription(subscriber);
                }
            });
        }
@@ -382,7 +358,7 @@
        public boolean enableTLS(final SSLEngine sslEngine, final boolean startTls) {
            Reject.ifNull(sslEngine, "sslEngine must not be null");
            synchronized (this) {
                if (isFilterExists(SSLFilter.class)) {
                if (filterExists(SSLFilter.class)) {
                    return false;
                }
                SSLUtils.setSSLEngine(connection, sslEngine);
@@ -392,10 +368,17 @@
        }
        @Override
        public SSLSession getSSLSession() {
            final SSLEngine sslEngine = SSLUtils.getSSLEngine(connection);
            return sslEngine != null ? sslEngine.getSession() : null;
        }
        @Override
        public void enableSASL(final SaslServer saslServer) {
            Reject.ifNull(saslServer, "saslServer must not be null");
            synchronized (this) {
                if (isFilterExists(SaslFilter.class)) {
                if (filterExists(SaslFilter.class)) {
                    // FIXME: The current saslServer must be replaced with the new one
                    return;
                }
                this.saslServer = saslServer;
@@ -404,6 +387,11 @@
        }
        @Override
        public SaslServer getSASLServer() {
            return saslServer;
        }
        @Override
        public InetSocketAddress getLocalAddress() {
            return (InetSocketAddress) connection.getLocalAddress();
        }
@@ -438,15 +426,15 @@
            int ssf = 0;
            final String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
            if (SaslFilter.SASL_AUTH_INTEGRITY.equalsIgnoreCase(qop)) {
                ssf = 1;
                return 1;
            } else if (SaslFilter.SASL_AUTH_CONFIDENTIALITY.equalsIgnoreCase(qop)) {
                final String negStrength = (String) saslServer.getNegotiatedProperty(Sasl.STRENGTH);
                if ("low".equalsIgnoreCase(negStrength)) {
                    ssf = 40;
                    return 40;
                } else if ("medium".equalsIgnoreCase(negStrength)) {
                    ssf = 56;
                    return 56;
                } else if ("high".equalsIgnoreCase(negStrength)) {
                    ssf = 128;
                    return 128;
                }
                /*
                 * Treat anything else as if not security is provided and keep the server running
@@ -456,12 +444,6 @@
        }
        @Override
        public SSLSession getSSLSession() {
            final SSLEngine sslEngine = SSLUtils.getSSLEngine(connection);
            return sslEngine != null ? sslEngine.getSession() : null;
        }
        @Override
        public String toString() {
            final StringBuilder builder = new StringBuilder();
            builder.append("LDAPClientContext(");
@@ -482,7 +464,7 @@
            GrizzlyUtils.addFilterToConnection(filter, connection);
        }
        private boolean isFilterExists(Class<?> filterKlass) {
        private boolean filterExists(Class<?> filterKlass) {
            synchronized (this) {
                final FilterChain currentFilterChain = (FilterChain) connection.getProcessor();
                for (final Filter filter : currentFilterChain) {
@@ -495,7 +477,7 @@
        }
        @Override
        public void onDisconnect(DisconnectListener listener) {
        public void addConnectionEventListener(ConnectionEventListener listener) {
            Reject.ifNull(listener, "listener must not be null");
            listeners.add(listener);
        }
@@ -504,8 +486,8 @@
        public void disconnect() {
            if (isClosed.compareAndSet(false, true)) {
                try {
                    for (DisconnectListener listener : listeners) {
                        listener.connectionDisconnected(this, null, null);
                    for (ConnectionEventListener listener : listeners) {
                        listener.handleConnectionDisconnected(this, null, null);
                    }
                } finally {
                    closeConnection();
@@ -514,9 +496,9 @@
        }
        private void closeConnection() {
            if (upstream != null) {
                upstream.cancel();
                upstream = null;
            if (downstream != null) {
                downstream.cancel();
                downstream = null;
            }
            connection.closeSilently();
        }
@@ -526,11 +508,11 @@
            // Close this connection context.
            if (isClosed.compareAndSet(false, true)) {
                sendUnsolicitedNotification(Responses.newGenericExtendedResult(resultCode)
                                                     .setOID(LDAP.OID_NOTICE_OF_DISCONNECTION)
                                                     .setOID(OID_NOTICE_OF_DISCONNECTION)
                                                     .setDiagnosticMessage(diagnosticMessage));
                try {
                    for (DisconnectListener listener : listeners) {
                        listener.connectionDisconnected(this, resultCode, diagnosticMessage);
                    for (ConnectionEventListener listener : listeners) {
                        listener.handleConnectionDisconnected(this, resultCode, diagnosticMessage);
                    }
                } finally {
                    closeConnection();
@@ -538,11 +520,11 @@
            }
        }
        private void notifyConnectionClosed(final LdapRawMessage unbindRequest) {
        private void notifyConnectionClosed(final LdapRequestEnvelope unbindRequest) {
            // Close this connection context.
            if (isClosed.compareAndSet(false, true)) {
                if (unbindRequest == null) {
                    doNotifySilentlyConnectionClosed(null);
                    notifySilentlyConnectionClosed(null);
                } else {
                    try {
                        LDAP.getReader(unbindRequest.getContent(), new DecodeOptions())
@@ -550,23 +532,23 @@
                                    @Override
                                    public void unbindRequest(int messageID, UnbindRequest unbindRequest)
                                            throws DecodeException, IOException {
                                        doNotifySilentlyConnectionClosed(unbindRequest);
                                        notifySilentlyConnectionClosed(unbindRequest);
                                    }
                                });
                    } catch (Exception e) {
                        doNotifySilentlyConnectionClosed(null);
                        notifySilentlyConnectionClosed(null);
                    }
                }
            }
        }
        private void doNotifySilentlyConnectionClosed(final UnbindRequest unbindRequest) {
        private void notifySilentlyConnectionClosed(final UnbindRequest unbindRequest) {
            try {
                for (final DisconnectListener listener : listeners) {
                for (final ConnectionEventListener listener : listeners) {
                    try {
                        listener.connectionClosed(this, unbindRequest);
                        listener.handleConnectionClosed(this, unbindRequest);
                    } catch (Exception e) {
                        // TODO: Log as a warning ?
                        logger.traceException(e);
                    }
                }
            } finally {
@@ -578,11 +560,11 @@
            // Close this connection context.
            if (isClosed.compareAndSet(false, true)) {
                try {
                    for (final DisconnectListener listener : listeners) {
                    for (final ConnectionEventListener listener : listeners) {
                        try {
                            listener.exceptionOccurred(this, error);
                            listener.handleConnectionError(this, error);
                        } catch (Exception e) {
                            // TODO: Log as a warning ?
                            logger.traceException(e);
                        }
                    }
                } finally {
@@ -598,7 +580,7 @@
        @Override
        public void sendUnsolicitedNotification(final ExtendedResult notification) {
            connection.write(LdapMessages.newResponseMessage(LDAP.OP_TYPE_EXTENDED_RESPONSE, 0, notification));
            connection.write(newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, 0, notification));
        }
    }
}
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
@@ -15,12 +15,15 @@
 */
package org.forgerock.opendj.grizzly;
import static com.forgerock.opendj.ldap.CoreMessages.ERR_LDAP_CLIENT_DECODE_MAX_REQUEST_SIZE_EXCEEDED;
import static org.forgerock.opendj.io.LDAP.*;
import static org.forgerock.opendj.ldap.spi.LdapMessages.newRequestEnvelope;
import java.io.IOException;
import org.forgerock.opendj.io.LDAPWriter;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.DecodeException;
import org.forgerock.opendj.ldap.DecodeOptions;
import org.forgerock.opendj.ldap.responses.BindResult;
import org.forgerock.opendj.ldap.responses.CompareResult;
@@ -30,40 +33,31 @@
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.responses.SearchResultReference;
import org.forgerock.opendj.ldap.spi.LdapMessages;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.attributes.Attribute;
import org.glassfish.grizzly.attributes.AttributeStorage;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
/**
 * Decodes {@link LdapRequestEnvelope} and encodes {@link Response}. This class keeps a state to handler the Ldap V2
 * detection, a LdapCodec instance cannot be shared accross different connection
 */
abstract class LdapCodec extends LDAPBaseFilter {
    private static final Attribute<Boolean> IS_LDAP_V2 = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
            .createAttribute(LdapCodec.class.getName() + ".IsLdapV2", Boolean.FALSE);
    private static final Attribute<Boolean> IS_LDAP_V2_PENDING = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
            .createAttribute(LdapCodec.class.getName() + ".PendingLdapV2", Boolean.FALSE);
    private boolean isLdapV2Pending = false;
    private boolean isLdapV2 = false;
    LdapCodec(final int maxElementSize, final DecodeOptions decodeOptions) {
        super(decodeOptions, maxElementSize);
    }
    @Override
    public NextAction handleAccept(FilterChainContext ctx) throws IOException {
        // Default value mechanism of Grizzly's attribute doesn't seems to work.
        IS_LDAP_V2.set(ctx.getConnection(), Boolean.FALSE);
        return ctx.getInvokeAction();
    }
    @Override
    public NextAction handleRead(final FilterChainContext ctx) throws IOException {
        try {
            final Buffer buffer = ctx.getMessage();
            final LdapRawMessage message;
            final LdapRequestEnvelope message;
            message = readMessage(buffer, ctx.getConnection());
            if (message != null) {
@@ -73,9 +67,7 @@
            return ctx.getStopAction(getRemainingBuffer(buffer));
        } catch (Exception e) {
            onLdapCodecError(ctx, e);
            // make the connection deaf to any following input
            // onLdapDecodeError call will take care of error processing
            // and closing the connection
            ctx.getConnection().closeSilently();
            final NextAction suspendAction = ctx.getSuspendAction();
            ctx.completeAndRecycle();
            return suspendAction;
@@ -84,7 +76,7 @@
    protected abstract void onLdapCodecError(FilterChainContext ctx, Throwable error);
    private LdapRawMessage readMessage(final Buffer buffer, final AttributeStorage attributeStorage)
    private LdapRequestEnvelope readMessage(final Buffer buffer, final AttributeStorage attributeStorage)
            throws IOException {
        try (final ASN1BufferReader reader = new ASN1BufferReader(maxASN1ElementSize, buffer)) {
            final int packetStart = buffer.position();
@@ -95,7 +87,8 @@
            final int length = reader.peekLength();
            if (length > maxASN1ElementSize) {
                buffer.position(packetStart);
                throw new IllegalStateException();
                throw DecodeException.fatalError(
                        ERR_LDAP_CLIENT_DECODE_MAX_REQUEST_SIZE_EXCEEDED.get(length, maxASN1ElementSize));
            }
            final Buffer packet = buffer.slice(packetStart, buffer.position() + length);
@@ -105,7 +98,7 @@
        }
    }
    private LdapRawMessage decodePacket(final ASN1BufferReader reader, final AttributeStorage attributeStorage)
    private LdapRequestEnvelope decodePacket(final ASN1BufferReader reader, final AttributeStorage attributeStorage)
            throws IOException {
        reader.mark();
        try {
@@ -114,17 +107,17 @@
            final byte messageType = reader.peekType();
            final ByteString rawDn;
            final int protocolVersion;
            final int ldapVersion;
            switch (messageType) {
            case OP_TYPE_BIND_REQUEST:
                reader.readStartSequence(messageType);
                protocolVersion = (int) reader.readInteger();
                ldapVersion = (int) reader.readInteger();
                rawDn = reader.readOctetString();
                IS_LDAP_V2_PENDING.set(attributeStorage, protocolVersion == 2);
                isLdapV2Pending = ldapVersion == 2;
                break;
            case OP_TYPE_DELETE_REQUEST:
                rawDn = reader.readOctetString(messageType);
                protocolVersion = -1;
                ldapVersion = -1;
                break;
            case OP_TYPE_ADD_REQUEST:
            case OP_TYPE_COMPARE_REQUEST:
@@ -133,13 +126,13 @@
            case OP_TYPE_SEARCH_REQUEST:
                reader.readStartSequence(messageType);
                rawDn = reader.readOctetString();
                protocolVersion = -1;
                ldapVersion = -1;
                break;
            default:
                rawDn = null;
                protocolVersion = -1;
                ldapVersion = -1;
            }
            return LdapMessages.newRawMessage(messageType, messageId, protocolVersion, rawDn, reader);
            return newRequestEnvelope(messageType, messageId, ldapVersion, rawDn, reader);
        } finally {
            reader.reset();
        }
@@ -152,11 +145,10 @@
    @Override
    public NextAction handleWrite(final FilterChainContext ctx) throws IOException {
        final LdapResponseMessage response = ctx.<LdapResponseMessage>getMessage();
        if (response.getMessageType() == OP_TYPE_BIND_RESPONSE) {
            final Boolean isLdapV2 = IS_LDAP_V2_PENDING.remove(ctx.getConnection());
            IS_LDAP_V2.set(ctx.getConnection(), isLdapV2);
        if (response.getMessageType() == OP_TYPE_BIND_RESPONSE && ((BindResult) response.getContent()).isSuccess()) {
            isLdapV2 = isLdapV2Pending;
        }
        final int protocolVersion = IS_LDAP_V2.get(ctx.getConnection()) ? 2 : 3;
        final int protocolVersion = isLdapV2 ? 2 : 3;
        final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter(ctx.getMemoryManager(), protocolVersion);
        try {
@@ -165,9 +157,7 @@
            return ctx.getInvokeAction();
        } catch (Exception e) {
            onLdapCodecError(ctx, e);
            // make the connection deaf to any following input
            // onLdapDecodeError call will take care of error processing
            // and closing the connection
            ctx.getConnection().closeSilently();
            final NextAction suspendAction = ctx.getSuspendAction();
            ctx.completeAndRecycle();
            return suspendAction;
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
@@ -15,7 +15,10 @@
 */
package org.forgerock.opendj.grizzly;
import java.util.concurrent.CancellationException;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.WriteHandler;
import org.reactivestreams.Subscriber;
@@ -23,55 +26,79 @@
import com.forgerock.reactive.Completable;
final class LdapResponseMessageWriter implements Subscriber<LdapResponseMessage> {
final class LdapResponseMessageWriter implements Subscriber<LdapResponseMessage>, CompletionHandler {
    private final Connection<?> connection;
    private final Completable.Subscriber completable;
    private final Completable.Subscriber downstream;
    private Subscription upstream;
    LdapResponseMessageWriter(final Connection<?> connection, final Completable.Subscriber completable) {
    LdapResponseMessageWriter(final Connection<?> connection, final Completable.Subscriber downstream) {
        this.connection = connection;
        this.completable = completable;
        this.downstream = downstream;
    }
    @Override
    public void onSubscribe(Subscription s) {
        this.upstream = s;
        requestMore();
    public void onSubscribe(final Subscription s) {
        if (upstream != null) {
            s.cancel();
            return;
    }
    private void requestMore() {
        if (connection.canWrite()) {
            upstream.request(1);
        } else {
        upstream = s;
            connection.notifyCanWrite(new WriteHandler() {
                @Override
                public void onWritePossible() throws Exception {
                    upstream.request(1);
                final Subscription sub = upstream;
                if (sub != null) {
                    sub.request(1);
                }
                }
                @Override
                public void onError(Throwable t) {
                    upstream.cancel();
                    completable.onError(t);
            public void onError(final Throwable error) {
                LdapResponseMessageWriter.this.onError(error);
                }
            });
        }
    @Override
    public void onNext(final LdapResponseMessage message) {
        connection.write(message).addCompletionHandler(this);
    }
    @Override
    public void onNext(LdapResponseMessage message) {
        connection.write(message);
        requestMore();
    public void completed(final Object result) {
        final Subscription sub = upstream;
        if (sub != null) {
            sub.request(1);
        }
    }
    @Override
    public void onError(Throwable t) {
        completable.onError(t);
    public void cancelled() {
        failed(new CancellationException());
    }
    @Override
    public void failed(final Throwable error) {
        onError(error);
    }
    @Override
    public void updated(final Object result) {
        // Nothing to do
    }
    @Override
    public void onError(final Throwable error) {
        upstream.cancel();
        upstream = null;
        downstream.onError(error);
    }
    @Override
    public void onComplete() {
        completable.onComplete();
        upstream.cancel();
        upstream = null;
        downstream.onComplete();
    }
}
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java
@@ -20,6 +20,7 @@
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.fest.assertions.Assertions.assertThat;
import static org.forgerock.opendj.ldap.LDAPListener.*;
import static org.forgerock.opendj.ldap.Connections.*;
import static org.forgerock.opendj.ldap.LDAPConnectionFactory.*;
import static org.forgerock.opendj.ldap.LdapException.newLdapException;
@@ -34,6 +35,7 @@
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -90,6 +92,8 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import com.forgerock.reactive.ServerConnectionFactoryAdapter;
/**
 * Tests the {@code ConnectionFactory} classes.
 */
@@ -544,11 +548,12 @@
                    }
                });
        LDAPListener listener = new LDAPListener(new InetSocketAddress("127.0.0.1", 0), mockServer);
        LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS), mockServer));
        final InetSocketAddress listenerAddr = listener.getSocketAddresses().iterator().next();
        try {
            LDAPConnectionFactory clientFactory = new LDAPConnectionFactory(
                    ((InetSocketAddress) listener.getSocketAddresses().iterator().next()).getHostName(),
                    ((InetSocketAddress) listener.getSocketAddresses().iterator().next()).getPort());
            LDAPConnectionFactory clientFactory = new LDAPConnectionFactory(listenerAddr.getHostName(),
                    listenerAddr.getPort());
            final Connection client = clientFactory.getConnection();
            connectLatch.await(TEST_TIMEOUT, TimeUnit.SECONDS);
            MockConnectionEventListener mockListener = null;
@@ -627,12 +632,12 @@
                    }
                });
        LDAPListener listener = new LDAPListener(new InetSocketAddress("127.0.0.1", 0), mockServer);
        LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS), mockServer));
        final InetSocketAddress listenerAddr = listener.getSocketAddresses().iterator().next();
        try {
            LDAPConnectionFactory clientFactory =
                    new LDAPConnectionFactory(
                            listener.getSocketAddresses().iterator().next().getHostName(),
                            listener.getSocketAddresses().iterator().next().getPort());
            LDAPConnectionFactory clientFactory = new LDAPConnectionFactory(listenerAddr.getHostName(),
                    listenerAddr.getPort());
            final Connection client = clientFactory.getConnection();
            connectLatch.await(TEST_TIMEOUT, TimeUnit.SECONDS);
            try {
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactoryTestCase.java
@@ -17,7 +17,7 @@
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.Fail.fail;
import static org.forgerock.opendj.ldap.CommonLDAPOptions.*;
import static org.forgerock.opendj.ldap.LDAPListener.*;
import static org.forgerock.opendj.ldap.LDAPConnectionFactory.*;
import static org.forgerock.opendj.ldap.TestCaseUtils.*;
import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest;
@@ -28,6 +28,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -71,6 +72,8 @@
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
import com.forgerock.reactive.ServerConnectionFactoryAdapter;
/**
 * Tests the {@link LDAPConnectionFactory} class.
 */
@@ -349,16 +352,19 @@
    private LDAPListener createServer() {
        try {
            return new LDAPListener(loopbackWithDynamicPort(),
            return new LDAPListener(
                    Collections.singleton(loopbackWithDynamicPort()),
                    new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
                    new ServerConnectionFactory<LDAPClientContext, Integer>() {
                        @Override
                        public ServerConnection<Integer> handleAccept(
                                final LDAPClientContext clientContext) throws LdapException {
                            public ServerConnection<Integer> handleAccept(final LDAPClientContext clientContext)
                                    throws LdapException {
                            context.set(clientContext);
                            connectLatch.release();
                            return serverConnection;
                        }
                    });
                        })
            );
        } catch (IOException e) {
            fail("Unable to create LDAP listener", e);
            return null;
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionTestCase.java
@@ -11,23 +11,23 @@
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2013-2015 ForgeRock AS.
 * Copyright 2013-2016 ForgeRock AS.
 */
package org.forgerock.opendj.grizzly;
import static org.fest.assertions.Assertions.assertThat;
import static org.forgerock.util.time.Duration.duration;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.forgerock.opendj.ldap.LDAPListener.LDAP_DECODE_OPTIONS;
import static org.forgerock.opendj.ldap.LDAPConnectionFactory.REQUEST_TIMEOUT;
import static org.forgerock.util.time.Duration.duration;
import static org.mockito.Mockito.*;
import java.net.InetSocketAddress;
import java.util.Collections;
import org.forgerock.opendj.ldap.Connections;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.LDAPListener;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.RequestHandler;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.SdkTestCase;
@@ -43,6 +43,8 @@
import org.mockito.ArgumentCaptor;
import org.testng.annotations.Test;
import com.forgerock.reactive.ServerConnectionFactoryAdapter;
/**
 * Tests LDAP connection implementation class.
 */
@@ -74,9 +76,9 @@
         * and leave the client waiting forever for a response.
         */
        @SuppressWarnings("unchecked")
        LDAPListener listener =
                new LDAPListener(address, Connections
                        .newServerConnectionFactory(mock(RequestHandler.class)));
        LDAPListener listener = new LDAPListener(Collections.singleton(address),
                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
                        Connections.newServerConnectionFactory(mock(RequestHandler.class))));
        /*
         * Use a very long time out in order to prevent the timeout thread from
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java
@@ -16,8 +16,20 @@
 */
package org.forgerock.opendj.grizzly;
import static java.util.Arrays.asList;
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.Fail.fail;
import static org.forgerock.opendj.ldap.LDAPListener.*;
import static org.forgerock.opendj.ldap.Connections.*;
import static org.forgerock.opendj.ldap.LDAPListener.REQUEST_MAX_SIZE_IN_BYTES;
import static org.forgerock.opendj.ldap.LdapException.newLdapException;
import static org.forgerock.opendj.ldap.TestCaseUtils.*;
import static org.forgerock.util.Options.defaultOptions;
import static org.mockito.Mockito.mock;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
@@ -32,9 +44,9 @@
import org.forgerock.opendj.ldap.LDAPConnectionFactory;
import org.forgerock.opendj.ldap.LDAPListener;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.LdapResultHandler;
import org.forgerock.opendj.ldap.ProviderNotFoundException;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.LdapResultHandler;
import org.forgerock.opendj.ldap.SdkTestCase;
import org.forgerock.opendj.ldap.SearchResultHandler;
import org.forgerock.opendj.ldap.ServerConnection;
@@ -61,16 +73,7 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static java.util.Arrays.asList;
import static org.fest.assertions.Assertions.*;
import static org.fest.assertions.Fail.*;
import static org.forgerock.opendj.ldap.Connections.newFixedConnectionPool;
import static org.forgerock.opendj.ldap.Connections.newRoundRobinLoadBalancer;
import static org.forgerock.opendj.ldap.LdapException.*;
import static org.forgerock.opendj.ldap.LDAPListener.*;
import static org.forgerock.opendj.ldap.TestCaseUtils.*;
import static org.forgerock.util.Options.defaultOptions;
import static org.mockito.Mockito.*;
import com.forgerock.reactive.ServerConnectionFactoryAdapter;
/** Tests the LDAPListener class. */
@SuppressWarnings("javadoc")
@@ -201,7 +204,9 @@
    public void testCreateLDAPListener() throws Exception {
        // test no exception is thrown, which means transport provider is
        // correctly loaded
        LDAPListener listener = new LDAPListener(loopbackWithDynamicPort(), mock(ServerConnectionFactory.class));
        LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
                        mock(ServerConnectionFactory.class)));
        listener.close();
    }
@@ -212,8 +217,10 @@
        // test no exception is thrown, which means transport provider is correctly loaded
        Options options = defaultOptions().set(TRANSPORT_PROVIDER_CLASS_LOADER,
                                                       Thread.currentThread().getContextClassLoader());
        LDAPListener listener =
                new LDAPListener(loopbackWithDynamicPort(), mock(ServerConnectionFactory.class), options);
        LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
                new ServerConnectionFactoryAdapter(options.get(LDAP_DECODE_OPTIONS),
                        mock(ServerConnectionFactory.class)),
                options);
        listener.close();
    }
@@ -223,8 +230,10 @@
        expectedExceptionsMessageRegExp = "^The requested provider 'unknown' .*")
    public void testCreateLDAPListenerFailureProviderNotFound() throws Exception {
        Options options = defaultOptions().set(TRANSPORT_PROVIDER, "unknown");
        LDAPListener listener
            = new LDAPListener(loopbackWithDynamicPort(), mock(ServerConnectionFactory.class), options);
        LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
                new ServerConnectionFactoryAdapter(options.get(LDAP_DECODE_OPTIONS),
                        mock(ServerConnectionFactory.class)),
                options);
        listener.close();
    }
@@ -239,8 +248,10 @@
        final MockServerConnection serverConnection = new MockServerConnection();
        final MockServerConnectionFactory serverConnectionFactory =
                new MockServerConnectionFactory(serverConnection);
        final LDAPListener listener = new LDAPListener(loopbackWithDynamicPort(), serverConnectionFactory);
        final InetSocketAddress addr = (InetSocketAddress) listener.getSocketAddresses().iterator().next();
        final LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
                        serverConnectionFactory));
        final InetSocketAddress addr = listener.firstSocketAddress();
        try {
            // Connect and close.
            final Connection connection =
@@ -268,9 +279,10 @@
        final MockServerConnection onlineServerConnection = new MockServerConnection();
        final MockServerConnectionFactory onlineServerConnectionFactory =
                new MockServerConnectionFactory(onlineServerConnection);
        final LDAPListener onlineServerListener =
                new LDAPListener(loopbackWithDynamicPort(), onlineServerConnectionFactory);
        final InetSocketAddress onlineAddr = onlineServerListener.getSocketAddresses().iterator().next();
        final LDAPListener onlineServerListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
                        onlineServerConnectionFactory));
        final InetSocketAddress onlineAddr = onlineServerListener.firstSocketAddress();
        try {
            // Connection pool and load balancing tests.
@@ -312,9 +324,10 @@
                    };
            final LDAPListener proxyListener =
                    new LDAPListener(loopbackWithDynamicPort(), proxyServerConnectionFactory);
            final InetSocketAddress proxyAddr = proxyListener.getSocketAddresses().iterator().next();
            final LDAPListener proxyListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
                    new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
                            proxyServerConnectionFactory));
            final InetSocketAddress proxyAddr = proxyListener.firstSocketAddress();
            try {
                // Connect and close.
                final Connection connection =
@@ -349,9 +362,10 @@
        final MockServerConnection onlineServerConnection = new MockServerConnection();
        final MockServerConnectionFactory onlineServerConnectionFactory =
                new MockServerConnectionFactory(onlineServerConnection);
        final LDAPListener onlineServerListener =
                new LDAPListener(loopbackWithDynamicPort(), onlineServerConnectionFactory);
        final InetSocketAddress onlineServerAddr = onlineServerListener.getSocketAddresses().iterator().next();
        final LDAPListener onlineServerListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
                        onlineServerConnectionFactory));
        final InetSocketAddress onlineServerAddr = onlineServerListener.firstSocketAddress();
        try {
            // Connection pool and load balancing tests.
@@ -401,10 +415,10 @@
            final MockServerConnectionFactory proxyServerConnectionFactory =
                    new MockServerConnectionFactory(proxyServerConnection);
            final LDAPListener proxyListener =
                    new LDAPListener(loopbackWithDynamicPort(), proxyServerConnectionFactory);
            final InetSocketAddress proxyAddr =
                    (InetSocketAddress) proxyListener.getSocketAddresses().iterator().next();
            final LDAPListener proxyListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
                    new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
                            proxyServerConnectionFactory));
            final InetSocketAddress proxyAddr = (InetSocketAddress) proxyListener.firstSocketAddress();
            try {
                // Connect, bind, and close.
@@ -444,9 +458,10 @@
        final MockServerConnection onlineServerConnection = new MockServerConnection();
        final MockServerConnectionFactory onlineServerConnectionFactory =
                new MockServerConnectionFactory(onlineServerConnection);
        final LDAPListener onlineServerListener =
                new LDAPListener(loopbackWithDynamicPort(), onlineServerConnectionFactory);
        final InetSocketAddress onlineServerAddr = onlineServerListener.getSocketAddresses().iterator().next();
        final LDAPListener onlineServerListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
                        onlineServerConnectionFactory));
        final InetSocketAddress onlineServerAddr = onlineServerListener.firstSocketAddress();
        try {
            final MockServerConnection proxyServerConnection = new MockServerConnection();
@@ -494,8 +509,9 @@
                    };
            final LDAPListener proxyListener =
                    new LDAPListener(loopbackWithDynamicPort(), proxyServerConnectionFactory);
            final LDAPListener proxyListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
                    new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
                            proxyServerConnectionFactory));
            try {
                // Connect and close.
                final Connection connection =
@@ -529,9 +545,10 @@
        final MockServerConnection onlineServerConnection = new MockServerConnection();
        final MockServerConnectionFactory onlineServerConnectionFactory =
                new MockServerConnectionFactory(onlineServerConnection);
        final LDAPListener onlineServerListener =
                new LDAPListener(loopbackWithDynamicPort(), onlineServerConnectionFactory);
        final InetSocketAddress onlineServerAddr = onlineServerListener.getSocketAddresses().iterator().next();
        final LDAPListener onlineServerListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
                        onlineServerConnectionFactory));
        final InetSocketAddress onlineServerAddr = onlineServerListener.firstSocketAddress();
        try {
            final MockServerConnection proxyServerConnection = new MockServerConnection() {
@@ -574,11 +591,12 @@
                }
            };
            final InetSocketAddress proxyAddr = findFreeSocketAddress();
            final MockServerConnectionFactory proxyServerConnectionFactory =
                    new MockServerConnectionFactory(proxyServerConnection);
            final LDAPListener proxyListener =
                    new LDAPListener(proxyAddr, proxyServerConnectionFactory);
            final LDAPListener proxyListener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
                    new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS),
                            proxyServerConnectionFactory));
            final InetSocketAddress proxyAddr = proxyListener.firstSocketAddress();
            try {
                // Connect, bind, and close.
                final Connection connection =
@@ -617,8 +635,9 @@
        final MockServerConnectionFactory factory =
                new MockServerConnectionFactory(serverConnection);
        final Options options = defaultOptions().set(REQUEST_MAX_SIZE_IN_BYTES, 2048);
        final InetSocketAddress addr = findFreeSocketAddress();
        final LDAPListener listener = new LDAPListener(addr, factory, options);
        final LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
                new ServerConnectionFactoryAdapter(options.get(LDAP_DECODE_OPTIONS), factory), options);
        final InetSocketAddress addr = listener.firstSocketAddress();
        Connection connection = null;
        try {
@@ -674,11 +693,10 @@
    @Test
    public void testServerDisconnect() throws Exception {
        final MockServerConnection serverConnection = new MockServerConnection();
        final MockServerConnectionFactory factory =
                new MockServerConnectionFactory(serverConnection);
        final LDAPListener listener =
                new LDAPListener(loopbackWithDynamicPort(), factory);
        final InetSocketAddress listenerAddr = listener.getSocketAddresses().iterator().next();
        final MockServerConnectionFactory factory = new MockServerConnectionFactory(serverConnection);
        final LDAPListener listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS), factory));
        final InetSocketAddress listenerAddr = listener.firstSocketAddress();
        final Connection connection;
        try {
opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
@@ -17,9 +17,8 @@
package org.forgerock.opendj.examples;
import static org.forgerock.opendj.ldap.LDAPConnectionFactory.AUTHN_BIND_REQUEST;
import static org.forgerock.opendj.ldap.LDAPConnectionFactory.HEARTBEAT_ENABLED;
import static org.forgerock.opendj.ldap.LDAPListener.CONNECT_MAX_BACKLOG;
import static org.forgerock.opendj.ldap.LDAPConnectionFactory.*;
import static org.forgerock.opendj.ldap.LDAPListener.*;
import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest;
import java.io.IOException;
@@ -39,6 +38,8 @@
import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.util.Options;
import com.forgerock.reactive.ServerConnectionFactoryAdapter;
/**
 * An LDAP load balancing proxy which forwards requests to one or more remote
 * Directory Servers. This is implementation is very simple and is only intended
@@ -142,7 +143,8 @@
        final Options options = Options.defaultOptions().set(CONNECT_MAX_BACKLOG, 4096);
        LDAPListener listener = null;
        try {
            listener = new LDAPListener(localAddress, localPort, connectionHandler, options);
            listener = new LDAPListener(localAddress, localPort, new ServerConnectionFactoryAdapter(
                    options.get(LDAP_DECODE_OPTIONS), connectionHandler), options);
            System.out.println("Press any key to stop the server...");
            System.in.read();
        } catch (final IOException e) {
opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java
@@ -12,13 +12,13 @@
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2009-2010 Sun Microsystems, Inc.
 * Portions Copyright 2011-2015 ForgeRock AS.
 * Portions Copyright 2011-2016 ForgeRock AS.
 */
package org.forgerock.opendj.examples;
import static org.forgerock.opendj.ldap.Connections.newCachedConnectionPool;
import static org.forgerock.opendj.ldap.LDAPListener.CONNECT_MAX_BACKLOG;
import static org.forgerock.opendj.ldap.LDAPListener.*;
import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest;
import java.io.IOException;
@@ -64,6 +64,8 @@
import org.forgerock.opendj.ldap.schema.AttributeType;
import org.forgerock.util.Options;
import com.forgerock.reactive.ServerConnectionFactoryAdapter;
/**
 * This example is based on the {@link Proxy}. This example does no load
 * balancing, but instead rewrites attribute descriptions and DN suffixes in
@@ -419,7 +421,8 @@
        final Options listenerOptions = Options.defaultOptions().set(CONNECT_MAX_BACKLOG, 4096);
        LDAPListener listener = null;
        try {
            listener = new LDAPListener(localAddress, localPort, connectionHandler, listenerOptions);
            listener = new LDAPListener(localAddress, localPort, new ServerConnectionFactoryAdapter(
                    listenerOptions.get(LDAP_DECODE_OPTIONS), connectionHandler), listenerOptions);
            System.out.println("Press any key to stop the server...");
            System.in.read();
        } catch (final IOException e) {
opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Server.java
@@ -17,7 +17,7 @@
package org.forgerock.opendj.examples;
import static org.forgerock.opendj.ldap.LDAPListener.CONNECT_MAX_BACKLOG;
import static org.forgerock.opendj.ldap.LDAPListener.*;
import java.io.FileInputStream;
import java.io.IOException;
@@ -38,6 +38,8 @@
import org.forgerock.opendj.ldif.LDIFEntryReader;
import org.forgerock.util.Options;
import com.forgerock.reactive.ServerConnectionFactoryAdapter;
/**
 * An LDAP directory server which exposes data contained in an LDIF file. This
 * is implementation is very simple and is only intended as an example:
@@ -116,10 +118,13 @@
                            }
                        };
                listener = new LDAPListener(localAddress, localPort, sslWrapper, options);
                listener = new LDAPListener(localAddress, localPort,
                        new ServerConnectionFactoryAdapter(options.get(LDAP_DECODE_OPTIONS), sslWrapper), options);
            } else {
                // No SSL.
                listener = new LDAPListener(localAddress, localPort, connectionHandler, options);
                listener = new LDAPListener(localAddress, localPort,
                        new ServerConnectionFactoryAdapter(options.get(LDAP_DECODE_OPTIONS), connectionHandler),
                        options);
            }
            System.out.println("Press any key to stop the server...");
            System.in.read();
opendj-ldap-toolkit/src/test/java/com/forgerock/opendj/ldap/tools/ToolLdapServer.java
@@ -16,9 +16,11 @@
package com.forgerock.opendj.ldap.tools;
import static org.fest.assertions.Assertions.assertThat;
import static org.forgerock.opendj.ldap.TestCaseUtils.findFreeSocketAddress;
import static org.forgerock.opendj.ldap.TestCaseUtils.loopbackWithDynamicPort;
import static org.forgerock.opendj.ldap.LDAPListener.LDAP_DECODE_OPTIONS;
import java.io.IOException;
import java.util.Collections;
import org.forgerock.opendj.ldap.IntermediateResponseHandler;
import org.forgerock.opendj.ldap.LDAPClientContext;
@@ -44,6 +46,9 @@
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.Responses;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.util.Options;
import com.forgerock.reactive.ServerConnectionFactoryAdapter;
/** Mocked request handler sketelon for tools test cases. */
class ToolLdapServer implements ServerConnectionFactory<LDAPClientContext, Integer> {
@@ -153,7 +158,8 @@
    }
    void start() throws IOException {
        listener = new LDAPListener(findFreeSocketAddress(), this);
        listener = new LDAPListener(Collections.singleton(loopbackWithDynamicPort()),
                new ServerConnectionFactoryAdapter(Options.defaultOptions().get(LDAP_DECODE_OPTIONS), this));
    }
    void stop() {
@@ -161,10 +167,10 @@
    }
    String getHostName() {
        return listener.getSocketAddress().getHostName();
        return listener.getSocketAddresses().iterator().next().getHostName();
    }
    String getPort() {
        return Integer.toString(listener.getSocketAddress().getPort());
        return Integer.toString(listener.getSocketAddresses().iterator().next().getPort());
    }
}
opendj-server-legacy/src/main/java/org/forgerock/opendj/adapter/server3x/Converters.java
@@ -460,7 +460,9 @@
    /**
     * Converts from OpenDJ server {@link org.opends.server.types.Attribute} to OpenDJ LDAP SDK
     * {@link org.forgerock.opendj.ldap.Attribute}.
     * {@link org.forgerock.opendj.ldap.Attribute}. This method is an optimization of the equivalent from() method. It's
     * only used in encoding/decoding and as such only wrap the required methods: the returned object partially wrap the
     * incoming one.
     *
     * @param attribute
     *            value to convert
@@ -625,9 +627,10 @@
    }
    /**
     * Converts from OpenDJ server
     * {@link org.opends.server.types.SearchResultEntry} to OpenDJ LDAP SDK
     * {@link org.forgerock.opendj.ldap.responses.SearchResultEntry}.
     * Converts from OpenDJ server {@link org.opends.server.types.SearchResultEntry} to OpenDJ LDAP SDK
     * {@link org.forgerock.opendj.ldap.responses.SearchResultEntry}. This method is an optimization of the equivalent
     * from() method. It's only used in encoding/decoding and as such only wrap the required methods: the returned
     * object partially wrap the incoming one.
     *
     * @param srvResultEntry
     *          value to convert
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -54,14 +54,14 @@
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.LDAPClientContext.DisconnectListener;
import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.CompareResult;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.responses.Responses;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.opends.server.api.ClientConnection;
import org.opends.server.api.ConnectionHandler;
import org.opends.server.core.AbandonOperationBasis;
@@ -123,7 +123,7 @@
 * instance of the LDAP connection handler and have its requests decoded by an LDAP request handler.
 */
public final class LDAPClientConnection2 extends ClientConnection implements TLSCapableConnection,
        ReactiveHandler<QueueingStrategy, LdapRawMessage, Stream<Response>> {
        ReactiveHandler<QueueingStrategy, LdapRequestEnvelope, Stream<Response>> {
    private static final String REACTIVE_OUT = "reactive.out";
    /** The tracer object for the debug logger. */
@@ -230,9 +230,9 @@
        }
        connectionID = DirectoryServer.newConnectionAccepted(this);
        clientContext.onDisconnect(new DisconnectListener() {
        clientContext.addConnectionEventListener(new ConnectionEventListener() {
            @Override
            public void exceptionOccurred(LDAPClientContext context, Throwable error) {
            public void handleConnectionError(LDAPClientContext context, Throwable error) {
                if (error instanceof LocalizableException) {
                    disconnect(
                            DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject());
@@ -242,13 +242,13 @@
            }
            @Override
            public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode,
            public void handleConnectionDisconnected(LDAPClientContext context, ResultCode resultCode,
                    String diagnosticMessage) {
                disconnect(DisconnectReason.SERVER_ERROR, false, null);
            }
            @Override
            public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
            public void handleConnectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
                disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
            }
        });
@@ -372,7 +372,7 @@
     */
    @Override
    public boolean isSecure() {
        return false;
        return clientContext.getSSLSession() != null || clientContext.getSASLServer() != null;
    }
    /**
@@ -405,7 +405,7 @@
        // an error result to the client indicating that a problem occurred.
        if (removeOperationInProgress(operation.getMessageID())) {
            final Response response = operationToResponse(operation);
            final FlowableEmitter<Response> out = getOut(operation);
            final FlowableEmitter<Response> out = getAttachedEmitter(operation);
            if (response != null) {
                out.onNext(response);
            }
@@ -534,22 +534,18 @@
     *            The search result entry to be sent to the client
     */
    @Override
    public void sendSearchEntry(SearchOperation searchOperation, SearchResultEntry searchEntry) {
        getEmitter(searchOperation).onNext(toResponse(searchEntry));
    public void sendSearchEntry(final SearchOperation searchOperation, final SearchResultEntry searchEntry) {
        getAttachedEmitter(searchOperation).onNext(toResponse(searchEntry));
    }
    private FlowableEmitter<Response> getEmitter(SearchOperation searchOperation) {
        return getOut(searchOperation);
    }
    private Response toResponse(SearchResultEntry searchEntry) {
        return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry, ldapVersion));
    }
    private FlowableEmitter<Response> getOut(Operation operation) {
    private FlowableEmitter<Response> getAttachedEmitter(final Operation operation) {
        return ((FlowableEmitter<Response>) operation.getAttachment(REACTIVE_OUT));
    }
    private Response toResponse(final SearchResultEntry searchEntry) {
        return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry, ldapVersion));
    }
    /**
     * Sends the provided search result reference to the client.
     *
@@ -572,7 +568,7 @@
            return false;
        }
        final FlowableEmitter<Response> out = getOut(searchOperation);
        final FlowableEmitter<Response> out = getAttachedEmitter(searchOperation);
        out.onNext(Converters.from(searchReference));
        return true;
@@ -589,7 +585,7 @@
    @Override
    protected boolean sendIntermediateResponseMessage(IntermediateResponse intermediateResponse) {
        final Operation operation = intermediateResponse.getOperation();
        final FlowableEmitter<Response> out = getOut(operation);
        final FlowableEmitter<Response> emitter = getAttachedEmitter(operation);
        final Response response = Responses.newGenericIntermediateResponse(intermediateResponse.getOID(),
                intermediateResponse.getValue());
@@ -597,7 +593,7 @@
            response.addControl(Converters.from(control));
        }
        out.onNext(response);
        emitter.onNext(response);
        // The only reason we shouldn't continue processing is if the
        // connection is closed.
@@ -962,14 +958,15 @@
     *         a fatal error and the client has been disconnected as a result, or if the client unbound from the server.
     */
    @Override
    public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
        return streamFromPublisher(
                new BlockingBackpressureSubscription(Flowable.create(new FlowableOnSubscribe<Response>() {
    public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRequestEnvelope message) {
        return streamFromPublisher(new BlockingBackpressureSubscription(
                Flowable.create(new FlowableOnSubscribe<Response>() {
                    @Override
                    public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
                        processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
                    }
                }, BackpressureStrategy.ERROR))).onNextDo(new Consumer<Response>() {
                }, BackpressureStrategy.ERROR)))
                .onNext(new Consumer<Response>() {
                    @Override
                    public void accept(final Response response) throws Exception {
                        if (keepStats) {
@@ -1003,7 +1000,7 @@
        }
    }
    private final byte toLdapResponseType(final LdapRawMessage rawRequest, final Response response) {
    private final byte toLdapResponseType(final LdapRequestEnvelope rawRequest, final Response response) {
        if (response instanceof Result) {
            return toLdapResultType(rawRequest.getMessageType());
        }
@@ -1160,8 +1157,8 @@
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
@@ -1229,17 +1226,17 @@
            versionString = "2";
            if (!connectionHandler.allowLDAPv2()) {
                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                        ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
                                    .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
                out.onComplete();
                disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get());
                disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get());
                return false;
            }
            if (!controls.isEmpty()) {
                // LDAPv2 clients aren't allowed to send controls.
                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                        ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
                                    .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
                out.onComplete();
                disconnectControlsNotAllowed();
                return false;
@@ -1390,8 +1387,8 @@
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
@@ -1514,8 +1511,8 @@
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
@@ -1569,8 +1566,8 @@
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
@@ -1626,8 +1623,8 @@
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -49,17 +49,16 @@
import org.forgerock.opendj.config.server.ConfigChangeResult;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.config.server.ConfigurationChangeListener;
import org.forgerock.opendj.grizzly.GrizzlyLDAPListener;
import org.forgerock.opendj.ldap.AddressMask;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.LDAPClientContext.DisconnectListener;
import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
import org.forgerock.opendj.ldap.LDAPListener;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.forgerock.opendj.server.config.server.ConnectionHandlerCfg;
import org.forgerock.opendj.server.config.server.LDAPConnectionHandlerCfg;
import org.forgerock.util.Function;
@@ -129,7 +128,7 @@
    /** SSL instance name used in context creation. */
    private static final String SSL_CONTEXT_INSTANCE_NAME = "TLS";
    private GrizzlyLDAPListener listener;
    private LDAPListener listener;
    /** The current configuration state. */
    private LDAPConnectionHandlerCfg currentConfig;
@@ -636,44 +635,45 @@
    }
    private void startListener() throws IOException {
        listener = new GrizzlyLDAPListener(
        listener = new LDAPListener(
                listenAddresses,
                Options.defaultOptions().set(LDAPListener.CONNECT_MAX_BACKLOG, backlog)
                        .set(LDAPListener.REQUEST_MAX_SIZE_IN_BYTES, (int) currentConfig.getMaxRequestSize()),
                new Function<LDAPClientContext,
                             ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
                             ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>,
                             LdapException>() {
                    @Override
                    public ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> apply(
                    public ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>> apply(
                            LDAPClientContext clientContext) throws LdapException {
                        final LDAPClientConnection2 conn = canAccept(clientContext);
                        connectionList.add(conn);
                        clientContext.onDisconnect(new DisconnectListener() {
                        clientContext.addConnectionEventListener(new ConnectionEventListener() {
                            @Override
                            public void exceptionOccurred(LDAPClientContext context, Throwable error) {
                            public void handleConnectionError(final LDAPClientContext context, final Throwable error) {
                                connectionList.remove(conn);
                            }
                            @Override
                            public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode,
                                    String diagnosticMessage) {
                            public void handleConnectionDisconnected(final LDAPClientContext context,
                                    final ResultCode resultCode, String diagnosticMessage) {
                                connectionList.remove(conn);
                            }
                            @Override
                            public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
                            public void handleConnectionClosed(final LDAPClientContext context,
                                    final UnbindRequest unbindRequest) {
                                connectionList.remove(conn);
                            }
                        });
                        return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
                        return new ReactiveHandler<LDAPClientContext, LdapRequestEnvelope, Stream<Response>>() {
                            @Override
                            public Stream<Response> handle(LDAPClientContext context, LdapRawMessage request)
                                    throws Exception {
                            public Stream<Response> handle(final LDAPClientContext context,
                                    final LdapRequestEnvelope request) throws Exception {
                                return conn.handle(queueingStrategy, request);
                            }
                        };
                    }
                });
                }, Options.defaultOptions()
                          .set(LDAPListener.CONNECT_MAX_BACKLOG, backlog)
                          .set(LDAPListener.REQUEST_MAX_SIZE_IN_BYTES, (int) currentConfig.getMaxRequestSize()));
    }
    /**
@@ -684,8 +684,6 @@
    public void run() {
        setName(handlerName);
        boolean starting = true;
        setName(handlerName);
        boolean lastIterationFailed = false;
        while (!shutdownRequested) {
@@ -724,7 +722,6 @@
                // If we have gotten here, then we are about to start listening
                // for the first time since startup or since we were previously disabled.
                // Start the embedded HTTP server
                startListener();
                lastIterationFailed = false;
            } catch (Exception e) {