mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
03.09.2016 bb8d8ab8ac1bc14b26abf45c8fda5cf571c1c9bb
OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

Removed Single from ReactiveHandler/ReactiveFilter, use Stream directly
9 files modified
185 ■■■■ changed files
opendj-core/src/main/java/com/forgerock/reactive/Completable.java 11 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/ReactiveFilter.java 6 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java 2 ●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java 23 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java 9 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java 2 ●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java 95 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java 34 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java 3 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/Completable.java
@@ -15,6 +15,7 @@
 */
package com.forgerock.reactive;
import org.forgerock.util.Function;
import org.reactivestreams.Publisher;
/** {@link Completable} is used to communicate a terminated operation which doesn't produce a result. */
@@ -46,6 +47,16 @@
    }
    /**
     * When an error occurs in this completable, continue the processing with the new {@link Completable} provided by
     * the function.
     *
     * @param function
     *            Generates the {@link Completable} which will be used to resume the operation when this
     *            {@link Completable} fails. The function receives the error as its argument.
     * @return A new {@link Completable}
     */
    Completable onErrorResumeWith(Function<Throwable, Completable, Exception> function);
    /**
     * Creates a {@link Single} which will emit the specified value when this {@link Completable} completes.
     *
     * @param <V>
opendj-core/src/main/java/com/forgerock/reactive/ReactiveFilter.java
@@ -131,7 +131,7 @@
        final ReactiveFilter<C, I1, O1, I2, O2> parent = this;
        return new ReactiveHandler<C, I1, O1>() {
            @Override
            public Single<O1> handle(final C context, final I1 request) throws Exception {
            public O1 handle(final C context, final I1 request) throws Exception {
                return parent.filter(context, request, handler);
            }
        };
@@ -150,7 +150,7 @@
     * @throws Exception
     *             If the operation cannot be done
     */
    public abstract Single<O1> filter(final C context, final I1 request, final ReactiveHandler<C, I2, O2> next)
    public abstract O1 filter(final C context, final I1 request, final ReactiveHandler<C, I2, O2> next)
            throws Exception;
    private static final class ConcatenatedFilter<C, I1, O1, I2, O2> extends ReactiveFilter<C, I1, O1, I2, O2> {
@@ -178,7 +178,7 @@
        }
        @Override
        public Single<O1> filter(C context, I1 request, ReactiveHandler<C, I2, O2> handler) throws Exception {
        public O1 filter(C context, I1 request, ReactiveHandler<C, I2, O2> handler) throws Exception {
            return converter.apply(handler).handle(context, request);
        }
    }
opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java
@@ -37,5 +37,5 @@
     * @throws Exception
     *             if the request cannot be processed
     */
    Single<REP> handle(final CTX context, final REQ request) throws Exception;
    REP handle(final CTX context, final REQ request) throws Exception;
}
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -20,6 +20,7 @@
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableOnSubscribe;
import io.reactivex.CompletableSource;
import io.reactivex.Flowable;
import io.reactivex.SingleEmitter;
import io.reactivex.SingleOnSubscribe;
@@ -176,6 +177,17 @@
        }));
    }
    /**
     * Create a new {@link Completable} from the given error.
     *
     * @param error
     *            The error emitted by this {@link Completable}
     * @return A new {@link Completable}
     */
    public static Completable completableError(final Throwable error) {
        return new RxJavaCompletable(io.reactivex.Completable.error(error));
    }
    private static final class RxJavaStream<V> implements Stream<V> {
        private final Flowable<V> impl;
@@ -339,5 +351,16 @@
        public void subscribe(org.reactivestreams.Subscriber<? super Void> s) {
            impl.<Void>toFlowable().subscribe(s);
        }
        @Override
        public Completable onErrorResumeWith(final Function<Throwable, Completable, Exception> function) {
            return new RxJavaCompletable(
                    impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, CompletableSource>() {
                        @Override
                        public CompletableSource apply(Throwable error) throws Exception {
                            return io.reactivex.Completable.fromPublisher(function.apply(error));
                        }
                    }));
        }
    }
}
opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
@@ -15,7 +15,7 @@
 */
package com.forgerock.opendj.grizzly;
import static com.forgerock.reactive.RxJavaStreams.*;
import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -56,7 +56,6 @@
import org.forgerock.util.Options;
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Single;
import com.forgerock.reactive.Stream;
import io.reactivex.BackpressureStrategy;
@@ -121,10 +120,10 @@
        final DecodeOptions decodeOptions = options.get(CommonLDAPOptions.LDAP_DECODE_OPTIONS);
        return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
            @Override
            public Single<Stream<Response>> handle(final LDAPClientContext context,
            public Stream<Response> handle(final LDAPClientContext context,
                    final LdapRawMessage rawRequest) throws Exception {
                final LDAPReader<ASN1Reader> reader = LDAP.getReader(rawRequest.getContent(), decodeOptions);
                return singleFrom(streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
                return streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
                    @Override
                    public void subscribe(final FlowableEmitter<Response> emitter) throws Exception {
                        reader.readMessage(new AbstractLDAPMessageHandler() {
@@ -190,7 +189,7 @@
                        });
                        emitter.onComplete();
                    }
                }, BackpressureStrategy.ERROR)));
                }, BackpressureStrategy.ERROR));
            }
        };
    }
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java
@@ -135,7 +135,7 @@
    private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
    /** Initial size of newly created buffers. */
    private static final int BUFFER_INIT_SIZE = 4096;
    private static final int BUFFER_INIT_SIZE = 1024;
    /** Default maximum size for cached protocol/entry encoding buffers. */
    private static final int DEFAULT_MAX_INTERNAL_BUFFER_SIZE = 32 * 1024;
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -72,7 +72,6 @@
import com.forgerock.reactive.Action;
import com.forgerock.reactive.Completable;
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Single;
import com.forgerock.reactive.Stream;
import io.reactivex.internal.util.BackpressureHelper;
@@ -83,8 +82,6 @@
 */
final class LDAPServerFilter extends BaseFilter {
    private static final Object DUMMY = new byte[0];
    private static final Attribute<ClientConnectionImpl> LDAP_CONNECTION_ATTR = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
            .createAttribute("LDAPServerConnection");
@@ -153,59 +150,47 @@
        final ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> requestHandler =
                connectionHandlerFactory.apply(clientContext);
        clientContext
            .read()
            .flatMap(new Function<LdapRawMessage, Publisher<Object>, Exception>() {
                @Override
                public Publisher<Object> apply(final LdapRawMessage rawRequest) throws Exception {
                    if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
                        clientContext.notifyConnectionClosed(rawRequest);
                        return singleFrom(DUMMY);
                    }
                    Single<Stream<Response>> response;
                    try {
                        response = requestHandler.handle(clientContext, rawRequest);
                    } catch (Exception e) {
                        response = singleError(e);
                    }
                    return response
                            .flatMap(new Function<Stream<Response>, Single<Object>, Exception>() {
                                @Override
                                public Single<Object> apply(final Stream<Response> response) {
                                    return clientContext.write(response.map(toLdapResponseMessage(rawRequest)))
                                                        .toSingle(DUMMY);
        clientContext.read().flatMap(new Function<LdapRawMessage, Publisher<Void>, Exception>() {
            @Override
            public Publisher<Void> apply(final LdapRawMessage rawRequest) throws Exception {
                if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
                    clientContext.notifyConnectionClosed(rawRequest);
                    return emptyStream();
                }
                Stream<Response> response;
                try {
                    response = requestHandler.handle(clientContext, rawRequest);
                } catch (Exception e) {
                    response = streamError(e);
                }
                return clientContext
                        .write(response.map(toLdapResponseMessage(rawRequest)))
                        .onErrorResumeWith(new Function<Throwable, Completable, Exception>() {
                            @Override
                            public Completable apply(final Throwable error) throws Exception {
                                if (!(error instanceof LdapException)) {
                                    // Unexpected error, propagate it.
                                    return completableError(error);
                                }
                            })
                            .onErrorResumeWith(new Function<Throwable, Single<Object>, Exception>() {
                                @Override
                                public Single<Object> apply(final Throwable error) throws Exception {
                                    if (!(error instanceof LdapException)) {
                                        // Unexpected error, propagate it.
                                        return singleError(error);
                                    }
                                    final LdapException exception = (LdapException) error;
                                    return clientContext
                                            .write(singleFrom(toLdapResponseMessage(rawRequest, exception.getResult())))
                                            .toSingle(DUMMY);
                                }
                            });
                }
            }, maxConcurrentRequests)
            .onErrorResumeWith(new Function<Throwable, Publisher<Object>, Exception>() {
                @Override
                public Publisher<Object> apply(Throwable error) throws Exception {
                    clientContext.notifyErrorAndCloseSilently(error);
                    // Swallow the error to prevent the subscribe() below from reporting it on the console.
                    return streamFrom(DUMMY);
                }
            })
            .onCompleteDo(new Action() {
                @Override
                public void run() throws Exception {
                    clientContext.notifyConnectionClosed(null);
                }
            })
            .subscribe();
                                final LdapException exception = (LdapException) error;
                                return clientContext
                                        .write(singleFrom(toLdapResponseMessage(rawRequest, exception.getResult())));
                            }
                        });
            }
        }, maxConcurrentRequests).onErrorResumeWith(new Function<Throwable, Publisher<Void>, Exception>() {
            @Override
            public Publisher<Void> apply(Throwable error) throws Exception {
                clientContext.notifyErrorAndCloseSilently(error);
                // Swallow the error to prevent the subscribe() below from reporting it on the console.
                return emptyStream();
            }
        }).onCompleteDo(new Action() {
            @Override
            public void run() throws Exception {
                clientContext.notifyConnectionClosed(null);
            }
        }).subscribe();
        return ctx.getStopAction();
    }
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -16,30 +16,11 @@
 */
package org.forgerock.opendj.reactive;
import static com.forgerock.reactive.RxJavaStreams.*;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_INTERMEDIATE_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_DONE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_ENTRY;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_REFERENCE;
import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher;
import static org.forgerock.opendj.io.LDAP.*;
import static org.opends.messages.CoreMessages.*;
import static org.opends.messages.ProtocolMessages.*;
import static org.opends.server.loggers.AccessLogger.logDisconnect;
import static org.opends.server.protocols.ldap.LDAPConstants.*;
import static org.opends.server.util.ServerConstants.OID_START_TLS_REQUEST;
import static org.opends.server.util.StaticUtils.*;
@@ -130,7 +111,6 @@
import com.forgerock.reactive.Consumer;
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Single;
import com.forgerock.reactive.Stream;
import io.reactivex.BackpressureStrategy;
@@ -982,8 +962,8 @@
     *         a fatal error and the client has been disconnected as a result, or if the client unbound from the server.
     */
    @Override
    public Single<Stream<Response>> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
        return singleFrom(streamFromPublisher(
    public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
        return streamFromPublisher(
                new BlockingBackpressureSubscription(Flowable.create(new FlowableOnSubscribe<Response>() {
                    @Override
                    public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
@@ -997,7 +977,7 @@
                                    toLdapResponseType(message, response), message.getMessageId());
                        }
                    }
                }));
                });
    }
    private final byte toLdapResultType(final byte requestType) {
@@ -1851,8 +1831,8 @@
        BlockingBackpressureSubscription(final Publisher<Response> upstream) {
            this.upstream = upstream;
            this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0
                    ? 30000 // Do not wait indefinitely,
            this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0
                    ? 30000 // Do not wait indefinitely,
                    : connectionHandler.getMaxBlockedWriteTimeLimit();
        }
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -90,7 +90,6 @@
import org.opends.server.util.StaticUtils;
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Single;
import com.forgerock.reactive.Stream;
/**
@@ -668,7 +667,7 @@
                        });
                        return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
                            @Override
                            public Single<Stream<Response>> handle(LDAPClientContext context, LdapRawMessage request)
                            public Stream<Response> handle(LDAPClientContext context, LdapRawMessage request)
                                    throws Exception {
                                return conn.handle(queueingStrategy, request);
                            }