From bb8d8ab8ac1bc14b26abf45c8fda5cf571c1c9bb Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 07 Nov 2016 13:59:40 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

---
 opendj-core/src/main/java/com/forgerock/reactive/Completable.java                            |   11 ++
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java              |    2 
 opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java      |    9 +-
 opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java                          |   23 +++++
 opendj-core/src/main/java/com/forgerock/reactive/ReactiveFilter.java                         |    6 
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java              |   95 ++++++++++-------------
 opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java                        |    2 
 opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java |    3 
 opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java  |   34 +------
 9 files changed, 91 insertions(+), 94 deletions(-)

diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
index 73a62f9..b8391d6 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
@@ -15,6 +15,7 @@
  */
 package com.forgerock.reactive;
 
+import org.forgerock.util.Function;
 import org.reactivestreams.Publisher;
 
 /** {@link Completable} is used to communicates a terminated operation which doesn't produce a result. */
@@ -46,6 +47,16 @@
     }
 
     /**
+     * When an error occurs in this completable, continue the processing with the new {@link Completable} provided by
+     * the function.
+     *
+     * @param function
+     *            Generates the stream which must will used to resume operation when this {@link Completable} failed.
+     * @return A new {@link Completable}
+     */
+    Completable onErrorResumeWith(Function<Throwable, Completable, Exception> function);
+
+    /**
      * Creates a {@link Single} which will emit the specified value when this {@link Completable} complete.
      *
      * @param <V>
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/ReactiveFilter.java b/opendj-core/src/main/java/com/forgerock/reactive/ReactiveFilter.java
index 7a23201..d04212c 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/ReactiveFilter.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/ReactiveFilter.java
@@ -131,7 +131,7 @@
         final ReactiveFilter<C, I1, O1, I2, O2> parent = this;
         return new ReactiveHandler<C, I1, O1>() {
             @Override
-            public Single<O1> handle(final C context, final I1 request) throws Exception {
+            public O1 handle(final C context, final I1 request) throws Exception {
                 return parent.filter(context, request, handler);
             }
         };
@@ -150,7 +150,7 @@
      * @throws Exception
      *             If the operation cannot be done
      */
-    public abstract Single<O1> filter(final C context, final I1 request, final ReactiveHandler<C, I2, O2> next)
+    public abstract O1 filter(final C context, final I1 request, final ReactiveHandler<C, I2, O2> next)
             throws Exception;
 
     private static final class ConcatenatedFilter<C, I1, O1, I2, O2> extends ReactiveFilter<C, I1, O1, I2, O2> {
@@ -178,7 +178,7 @@
         }
 
         @Override
-        public Single<O1> filter(C context, I1 request, ReactiveHandler<C, I2, O2> handler) throws Exception {
+        public O1 filter(C context, I1 request, ReactiveHandler<C, I2, O2> handler) throws Exception {
             return converter.apply(handler).handle(context, request);
         }
     }
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java b/opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java
index 849b511..689e54c 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java
@@ -37,5 +37,5 @@
      * @throws Exception
      *             if the request cannot be processed
      */
-    Single<REP> handle(final CTX context, final REQ request) throws Exception;
+    REP handle(final CTX context, final REQ request) throws Exception;
 }
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
index 6e6367c..35ed67c 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -20,6 +20,7 @@
 
 import io.reactivex.CompletableEmitter;
 import io.reactivex.CompletableOnSubscribe;
+import io.reactivex.CompletableSource;
 import io.reactivex.Flowable;
 import io.reactivex.SingleEmitter;
 import io.reactivex.SingleOnSubscribe;
@@ -176,6 +177,17 @@
         }));
     }
 
+    /**
+     * Create a new {@link Completable} from the given error.
+     *
+     * @param error
+     *            The error emitted by this {@link Completable}
+     * @return A new {@link Completable}
+     */
+    public static Completable completableError(final Throwable error) {
+        return new RxJavaCompletable(io.reactivex.Completable.error(error));
+    }
+
     private static final class RxJavaStream<V> implements Stream<V> {
 
         private final Flowable<V> impl;
@@ -339,5 +351,16 @@
         public void subscribe(org.reactivestreams.Subscriber<? super Void> s) {
             impl.<Void>toFlowable().subscribe(s);
         }
+
+        @Override
+        public Completable onErrorResumeWith(final Function<Throwable, Completable, Exception> function) {
+            return new RxJavaCompletable(
+                    impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, CompletableSource>() {
+                        @Override
+                        public CompletableSource apply(Throwable error) throws Exception {
+                            return io.reactivex.Completable.fromPublisher(function.apply(error));
+                        }
+                    }));
+        }
     }
 }
diff --git a/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java b/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
index f3ee127..126f97b 100644
--- a/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
+++ b/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
@@ -15,7 +15,7 @@
  */
 package com.forgerock.opendj.grizzly;
 
-import static com.forgerock.reactive.RxJavaStreams.*;
+import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -56,7 +56,6 @@
 import org.forgerock.util.Options;
 
 import com.forgerock.reactive.ReactiveHandler;
-import com.forgerock.reactive.Single;
 import com.forgerock.reactive.Stream;
 
 import io.reactivex.BackpressureStrategy;
@@ -121,10 +120,10 @@
         final DecodeOptions decodeOptions = options.get(CommonLDAPOptions.LDAP_DECODE_OPTIONS);
         return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
             @Override
-            public Single<Stream<Response>> handle(final LDAPClientContext context,
+            public Stream<Response> handle(final LDAPClientContext context,
                     final LdapRawMessage rawRequest) throws Exception {
                 final LDAPReader<ASN1Reader> reader = LDAP.getReader(rawRequest.getContent(), decodeOptions);
-                return singleFrom(streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
+                return streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
                     @Override
                     public void subscribe(final FlowableEmitter<Response> emitter) throws Exception {
                         reader.readMessage(new AbstractLDAPMessageHandler() {
@@ -190,7 +189,7 @@
                         });
                         emitter.onComplete();
                     }
-                }, BackpressureStrategy.ERROR)));
+                }, BackpressureStrategy.ERROR));
             }
         };
     }
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java
index 9ff4040..d752449 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java
@@ -135,7 +135,7 @@
     private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
 
     /** Initial size of newly created buffers. */
-    private static final int BUFFER_INIT_SIZE = 4096;
+    private static final int BUFFER_INIT_SIZE = 1024;
     /** Default maximum size for cached protocol/entry encoding buffers. */
     private static final int DEFAULT_MAX_INTERNAL_BUFFER_SIZE = 32 * 1024;
 
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index 5650e98..189f31c 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -72,7 +72,6 @@
 import com.forgerock.reactive.Action;
 import com.forgerock.reactive.Completable;
 import com.forgerock.reactive.ReactiveHandler;
-import com.forgerock.reactive.Single;
 import com.forgerock.reactive.Stream;
 
 import io.reactivex.internal.util.BackpressureHelper;
@@ -83,8 +82,6 @@
  */
 final class LDAPServerFilter extends BaseFilter {
 
-    private static final Object DUMMY = new byte[0];
-
     private static final Attribute<ClientConnectionImpl> LDAP_CONNECTION_ATTR = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
             .createAttribute("LDAPServerConnection");
 
@@ -153,59 +150,47 @@
         final ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> requestHandler =
                 connectionHandlerFactory.apply(clientContext);
 
-        clientContext
-            .read()
-            .flatMap(new Function<LdapRawMessage, Publisher<Object>, Exception>() {
-                @Override
-                public Publisher<Object> apply(final LdapRawMessage rawRequest) throws Exception {
-                    if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
-                        clientContext.notifyConnectionClosed(rawRequest);
-                        return singleFrom(DUMMY);
-                    }
-                    Single<Stream<Response>> response;
-                    try {
-                        response = requestHandler.handle(clientContext, rawRequest);
-                    } catch (Exception e) {
-                        response = singleError(e);
-                    }
-                    return response
-                            .flatMap(new Function<Stream<Response>, Single<Object>, Exception>() {
-                                @Override
-                                public Single<Object> apply(final Stream<Response> response) {
-                                    return clientContext.write(response.map(toLdapResponseMessage(rawRequest)))
-                                                        .toSingle(DUMMY);
+        clientContext.read().flatMap(new Function<LdapRawMessage, Publisher<Void>, Exception>() {
+            @Override
+            public Publisher<Void> apply(final LdapRawMessage rawRequest) throws Exception {
+                if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
+                    clientContext.notifyConnectionClosed(rawRequest);
+                    return emptyStream();
+                }
+                Stream<Response> response;
+                try {
+                    response = requestHandler.handle(clientContext, rawRequest);
+                } catch (Exception e) {
+                    response = streamError(e);
+                }
+                return clientContext
+                        .write(response.map(toLdapResponseMessage(rawRequest)))
+                        .onErrorResumeWith(new Function<Throwable, Completable, Exception>() {
+                            @Override
+                            public Completable apply(final Throwable error) throws Exception {
+                                if (!(error instanceof LdapException)) {
+                                    // Unexpected error, propagate it.
+                                    return completableError(error);
                                 }
-                            })
-                            .onErrorResumeWith(new Function<Throwable, Single<Object>, Exception>() {
-                                @Override
-                                public Single<Object> apply(final Throwable error) throws Exception {
-                                    if (!(error instanceof LdapException)) {
-                                        // Unexpected error, propagate it.
-                                        return singleError(error);
-                                    }
-                                    final LdapException exception = (LdapException) error;
-                                    return clientContext
-                                            .write(singleFrom(toLdapResponseMessage(rawRequest, exception.getResult())))
-                                            .toSingle(DUMMY);
-                                }
-                            });
-                }
-            }, maxConcurrentRequests)
-            .onErrorResumeWith(new Function<Throwable, Publisher<Object>, Exception>() {
-                @Override
-                public Publisher<Object> apply(Throwable error) throws Exception {
-                    clientContext.notifyErrorAndCloseSilently(error);
-                    // Swallow the error to prevent the subscribe() below to report it on the console.
-                    return streamFrom(DUMMY);
-                }
-            })
-            .onCompleteDo(new Action() {
-                @Override
-                public void run() throws Exception {
-                    clientContext.notifyConnectionClosed(null);
-                }
-            })
-            .subscribe();
+                                final LdapException exception = (LdapException) error;
+                                return clientContext
+                                        .write(singleFrom(toLdapResponseMessage(rawRequest, exception.getResult())));
+                            }
+                        });
+            }
+        }, maxConcurrentRequests).onErrorResumeWith(new Function<Throwable, Publisher<Void>, Exception>() {
+            @Override
+            public Publisher<Void> apply(Throwable error) throws Exception {
+                clientContext.notifyErrorAndCloseSilently(error);
+                // Swallow the error to prevent the subscribe() below to report it on the console.
+                return emptyStream();
+            }
+        }).onCompleteDo(new Action() {
+            @Override
+            public void run() throws Exception {
+                clientContext.notifyConnectionClosed(null);
+            }
+        }).subscribe();
         return ctx.getStopAction();
     }
 
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index efe9929..af9a708 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -16,30 +16,11 @@
  */
 package org.forgerock.opendj.reactive;
 
-import static com.forgerock.reactive.RxJavaStreams.*;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_REQUEST;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_RESPONSE;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_REQUEST;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_RESPONSE;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_REQUEST;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_RESPONSE;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_REQUEST;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_RESPONSE;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_REQUEST;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_RESPONSE;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_INTERMEDIATE_RESPONSE;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_REQUEST;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_RESPONSE;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_REQUEST;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_RESPONSE;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_REQUEST;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_DONE;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_ENTRY;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_REFERENCE;
+import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher;
+import static org.forgerock.opendj.io.LDAP.*;
 import static org.opends.messages.CoreMessages.*;
 import static org.opends.messages.ProtocolMessages.*;
 import static org.opends.server.loggers.AccessLogger.logDisconnect;
-import static org.opends.server.protocols.ldap.LDAPConstants.*;
 import static org.opends.server.util.ServerConstants.OID_START_TLS_REQUEST;
 import static org.opends.server.util.StaticUtils.*;
 
@@ -130,7 +111,6 @@
 
 import com.forgerock.reactive.Consumer;
 import com.forgerock.reactive.ReactiveHandler;
-import com.forgerock.reactive.Single;
 import com.forgerock.reactive.Stream;
 
 import io.reactivex.BackpressureStrategy;
@@ -982,8 +962,8 @@
      *         a fatal error and the client has been disconnected as a result, or if the client unbound from the server.
      */
     @Override
-    public Single<Stream<Response>> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
-        return singleFrom(streamFromPublisher(
+    public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
+        return streamFromPublisher(
                 new BlockingBackpressureSubscription(Flowable.create(new FlowableOnSubscribe<Response>() {
                     @Override
                     public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
@@ -997,7 +977,7 @@
                                     toLdapResponseType(message, response), message.getMessageId());
                         }
                     }
-                }));
+                });
     }
 
     private final byte toLdapResultType(final byte requestType) {
@@ -1851,8 +1831,8 @@
 
         BlockingBackpressureSubscription(final Publisher<Response> upstream) {
             this.upstream = upstream;
-            this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0 
-                    ? 30000 // Do not wait indefinitely, 
+            this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0
+                    ? 30000 // Do not wait indefinitely,
                     : connectionHandler.getMaxBlockedWriteTimeLimit();
         }
 
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
index 8127e01..b7dc22f 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -90,7 +90,6 @@
 import org.opends.server.util.StaticUtils;
 
 import com.forgerock.reactive.ReactiveHandler;
-import com.forgerock.reactive.Single;
 import com.forgerock.reactive.Stream;
 
 /**
@@ -668,7 +667,7 @@
                         });
                         return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
                             @Override
-                            public Single<Stream<Response>> handle(LDAPClientContext context, LdapRawMessage request)
+                            public Stream<Response> handle(LDAPClientContext context, LdapRawMessage request)
                                     throws Exception {
                                 return conn.handle(queueingStrategy, request);
                             }

--
Gitblit v1.10.0