From b59e161bd2d26df6023efea77353c8f3f61dd37b Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 07 Nov 2016 13:59:40 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

---
 opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java                         |   30 +++++----------
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java             |   15 +++++--
 opendj-core/src/main/java/com/forgerock/reactive/Stream.java                                |   21 ++++------
 opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java |   30 +++++++++++++++
 4 files changed, 60 insertions(+), 36 deletions(-)

diff --git a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
index a5a6231..3fb0ec5 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -211,26 +211,6 @@
         }
 
         @Override
-        public void subscribe(final Consumer<V> onResult, final Consumer<Throwable> onError, final Action onComplete) {
-            impl.subscribe(new io.reactivex.functions.Consumer<V>() {
-                @Override
-                public void accept(V t) throws Exception {
-                    onResult.accept(t);
-                }
-            }, new io.reactivex.functions.Consumer<Throwable>() {
-                @Override
-                public void accept(Throwable t) throws Exception {
-                    onError.accept(t);
-                }
-            }, new io.reactivex.functions.Action() {
-                @Override
-                public void run() throws Exception {
-                    onComplete.run();
-                }
-            });
-        }
-
-        @Override
         public Stream<V> onErrorDo(final Consumer<Throwable> onError) {
             return new RxJavaStream<>(impl.doOnError(new io.reactivex.functions.Consumer<Throwable>() {
                 @Override
@@ -252,6 +232,16 @@
         }
 
         @Override
+        public Stream<V> onCompleteDo(final Action action) {
+            return new RxJavaStream<>(impl.doOnComplete(new io.reactivex.functions.Action() {
+                @Override
+                public void run() throws Exception {
+                    action.run();
+                }
+            }));
+        }
+
+        @Override
         public void subscribe() {
             impl.subscribe();
         }
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Stream.java b/opendj-core/src/main/java/com/forgerock/reactive/Stream.java
index 68e6ae0..ebeca68 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Stream.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Stream.java
@@ -53,18 +53,6 @@
             int maxConcurrency);
 
     /**
-     * Subscribe to the data emitted by this {@link Stream}.
-     *
-     * @param onResult
-     *            The {@link Consumer} invoked for each data of this stream
-     * @param onError
-     *            The {@link Consumer} invoked on error in this stream.
-     * @param onComplete
-     *            The {@link Action} invoked once this {@link Stream} is completed.
-     */
-    void subscribe(Consumer<V> onResult, Consumer<Throwable> onError, Action onComplete);
-
-    /**
      * When an error occurs in this stream, continue the processing with the new {@link Stream} provided by the
      * function.
      *
@@ -84,6 +72,15 @@
     Stream<V> onErrorDo(Consumer<Throwable> onError);
 
     /**
+     * Invokes the on complete {@link Action} when this stream is completed.
+     *
+     * @param onComplete
+     *            The {@link Action} to invoke on stream completion
+     * @return a new {@link Stream}
+     */
+    Stream<V> onCompleteDo(Action onComplete);
+
+    /**
      * Subscribe to this stream and drop all data produced by it.
      */
     void subscribe();
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index 8532b9d..d2f8c98 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -69,6 +69,7 @@
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 
+import com.forgerock.reactive.Action;
 import com.forgerock.reactive.Completable;
 import com.forgerock.reactive.Consumer;
 import com.forgerock.reactive.ReactiveHandler;
@@ -162,7 +163,7 @@
                 @Override
                 public Publisher<Object> apply(final LdapRawMessage rawRequest) throws Exception {
                     if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
-                        clientContext.notifyAndClose(rawRequest);
+                        clientContext.notifyConnectionClosed(rawRequest);
                         return singleFrom(DUMMY);
                     }
                     Single<Stream<Response>> response;
@@ -197,7 +198,13 @@
             .onErrorDo(new Consumer<Throwable>() {
                 @Override
                 public void accept(final Throwable error) throws Exception {
-                    clientContext.notifyAndCloseSilently(error);
+                    clientContext.notifyErrorAndCloseSilently(error);
+                }
+            })
+            .onCompleteDo(new Action() {
+                @Override
+                public void run() throws Exception {
+                    clientContext.notifyConnectionClosed(null);
                 }
             })
             .subscribe();
@@ -511,7 +518,7 @@
             }
         }
 
-        private void notifyAndClose(final LdapRawMessage unbindRequest) {
+        private void notifyConnectionClosed(final LdapRawMessage unbindRequest) {
             // Close this connection context.
             if (isClosed.compareAndSet(false, true)) {
                 if (unbindRequest == null) {
@@ -547,7 +554,7 @@
             }
         }
 
-        private void notifyAndCloseSilently(final Throwable error) {
+        private void notifyErrorAndCloseSilently(final Throwable error) {
             // Close this connection context.
             if (isClosed.compareAndSet(false, true)) {
                 try {
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index 7341f9b..0e14bea 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -34,6 +34,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.forgerock.i18n.LocalizableException;
 import org.forgerock.i18n.LocalizableMessage;
 import org.forgerock.i18n.LocalizableMessageBuilder;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -41,7 +42,9 @@
 import org.forgerock.opendj.ldap.ByteString;
 import org.forgerock.opendj.ldap.DN;
 import org.forgerock.opendj.ldap.LDAPClientContext;
+import org.forgerock.opendj.ldap.LDAPClientContext.DisconnectListener;
 import org.forgerock.opendj.ldap.ResultCode;
+import org.forgerock.opendj.ldap.requests.UnbindRequest;
 import org.forgerock.opendj.ldap.responses.CompareResult;
 import org.forgerock.opendj.ldap.responses.Response;
 import org.forgerock.opendj.ldap.responses.Responses;
@@ -221,6 +224,33 @@
     }
 
     connectionID = DirectoryServer.newConnectionAccepted(this);
+    clientContext.onDisconnect(new DisconnectListener()
+    {
+      @Override
+      public void exceptionOccurred(LDAPClientContext context, Throwable error)
+      {
+        if (error instanceof LocalizableException)
+        {
+          disconnect(DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject());
+        }
+        else
+        {
+          disconnect(DisconnectReason.PROTOCOL_ERROR, true, null);
+        }
+      }
+
+      @Override
+      public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage)
+      {
+        disconnect(DisconnectReason.SERVER_ERROR, false, null);
+      }
+
+      @Override
+      public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest)
+      {
+        disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
+      }
+    });
   }
 
   /**

--
Gitblit v1.10.0