From bdc5fa0dd0980eb9e077ae80644504705a24e035 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 07 Nov 2016 13:59:40 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

---
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java |   83 +++++++++++++++++++++++++++--------------
 1 files changed, 55 insertions(+), 28 deletions(-)

diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
index 007ce33..adecbcf 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
@@ -15,7 +15,10 @@
  */
 package org.forgerock.opendj.grizzly;
 
+import java.util.concurrent.CancellationException;
+
 import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage;
+import org.glassfish.grizzly.CompletionHandler;
 import org.glassfish.grizzly.Connection;
 import org.glassfish.grizzly.WriteHandler;
 import org.reactivestreams.Subscriber;
@@ -23,55 +26,79 @@
 
 import com.forgerock.reactive.Completable;
 
-final class LdapResponseMessageWriter implements Subscriber<LdapResponseMessage> {
+final class LdapResponseMessageWriter implements Subscriber<LdapResponseMessage>, CompletionHandler {
 
     private final Connection<?> connection;
-    private final Completable.Subscriber completable;
+    private final Completable.Subscriber downstream;
     private Subscription upstream;
 
-    LdapResponseMessageWriter(final Connection<?> connection, final Completable.Subscriber completable) {
+    LdapResponseMessageWriter(final Connection<?> connection, final Completable.Subscriber downstream) {
         this.connection = connection;
-        this.completable = completable;
+        this.downstream = downstream;
     }
 
     @Override
-    public void onSubscribe(Subscription s) {
-        this.upstream = s;
-        requestMore();
+    public void onSubscribe(final Subscription s) {
+        if (upstream != null) {
+            s.cancel();
+            return;
+        }
+        upstream = s;
+        connection.notifyCanWrite(new WriteHandler() {
+            @Override
+            public void onWritePossible() throws Exception {
+                final Subscription sub = upstream;
+                if (sub != null) {
+                    sub.request(1);
+                }
+            }
+
+            @Override
+            public void onError(final Throwable error) {
+                LdapResponseMessageWriter.this.onError(error);
+            }
+        });
     }
 
-    private void requestMore() {
-        if (connection.canWrite()) {
-            upstream.request(1);
-        } else {
-            connection.notifyCanWrite(new WriteHandler() {
-                @Override
-                public void onWritePossible() throws Exception {
-                    upstream.request(1);
-                }
+    @Override
+    public void onNext(final LdapResponseMessage message) {
+        connection.write(message).addCompletionHandler(this);
+    }
 
-                @Override
-                public void onError(Throwable t) {
-                    upstream.cancel();
-                    completable.onError(t);
-                }
-            });
+    @Override
+    public void completed(final Object result) {
+        final Subscription sub = upstream;
+        if (sub != null) {
+            sub.request(1);
         }
     }
 
     @Override
-    public void onNext(LdapResponseMessage message) {
-        connection.write(message);
-        requestMore();
+    public void cancelled() {
+        failed(new CancellationException());
     }
 
     @Override
-    public void onError(Throwable t) {
-        completable.onError(t);
+    public void failed(final Throwable error) {
+        onError(error);
+    }
+
+    @Override
+    public void updated(final Object result) {
+        // Nothing to do
+    }
+
+    @Override
+    public void onError(final Throwable error) {
+        upstream.cancel();
+        upstream = null;
+        downstream.onError(error);
     }
 
     @Override
     public void onComplete() {
-        completable.onComplete();
+        upstream.cancel();
+        upstream = null;
+        downstream.onComplete();
     }
 }

--
Gitblit v1.10.0