From 5e9caa5055a78b930500690cb919effd5c8bc2b1 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Fri, 18 Nov 2016 10:31:28 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

---
 opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java |   75 +++++++++++++++++++++++--------------
 1 files changed, 47 insertions(+), 28 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index bab468d..a753eee 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -958,27 +958,29 @@
      */
     @Override
     public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRequestEnvelope message) {
-        return streamFromPublisher(new BlockingBackpressureSubscription(
-                Flowable.create(new FlowableOnSubscribe<Response>() {
-                    @Override
-                    public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
-                        try {
-                            processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
-                        } finally {
-                            // We don't need the ASN1Reader anymore.
-                            closeSilently(message.getContent());
-                        }
-                    }
-                }, BackpressureStrategy.ERROR)))
-                .onNext(new Consumer<Response>() {
-                    @Override
-                    public void accept(final Response response) throws Exception {
-                        if (keepStats) {
-                            statTracker.updateMessageWritten(
-                                    toLdapResponseType(message, response), message.getMessageId());
-                        }
-                    }
-                });
+        return streamFromPublisher(
+                new BlockingBackpressureSubscription(connectionHandler.getMaxBlockedWriteTimeLimit(),
+                        Flowable.create(new FlowableOnSubscribe<Response>() {
+                            @Override
+                            public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
+                                try {
+                                    processLDAPMessage(
+                                            queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
+                                } finally {
+                                    // We don't need the ASN1Reader anymore.
+                                    closeSilently(message.getContent());
+                                }
+                            }
+                        }, BackpressureStrategy.ERROR)))
+                        .onNext(new Consumer<Response>() {
+                            @Override
+                            public void accept(final Response response) throws Exception {
+                                if (keepStats) {
+                                    statTracker.updateMessageWritten(
+                                            toLdapResponseType(message, response), message.getMessageId());
+                                }
+                            }
+                        });
     }
 
     private final byte toLdapResultType(final byte requestType) {
@@ -1682,14 +1684,15 @@
         private final Condition spaceAvailable = lock.newCondition();
         private final Publisher<Response> upstream;
         private final long writeTimeoutMillis;
+        private boolean upstreamCompleted;
         private Subscription subscription;
         private Subscriber<? super Response> downstream;
 
-        BlockingBackpressureSubscription(final Publisher<Response> upstream) {
+        BlockingBackpressureSubscription(final long maxBlockedWriteTimeLimit, final Publisher<Response> upstream) {
             this.upstream = upstream;
-            this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0
+            this.writeTimeoutMillis = maxBlockedWriteTimeLimit == 0
                     ? 30000 // Do not wait indefinitely,
-                    : connectionHandler.getMaxBlockedWriteTimeLimit();
+                    : maxBlockedWriteTimeLimit;
         }
 
         @Override
@@ -1733,6 +1736,12 @@
                     // Forward response
                     pendingRequests--;
                 }
+                if (upstreamCompleted && queue.isEmpty()) {
+                    if (pendingRequests != Long.MIN_VALUE) {
+                        downstream.onComplete();
+                    }
+                    cancel();
+                }
             } finally {
                 spaceAvailable.signalAll();
             }
@@ -1771,17 +1780,27 @@
 
         @Override
         public void onComplete() {
-            downstream.onComplete();
-            cancel();
+            lock.lock();
+            try {
+                // the onComplete() will be forwarded downstream once the pending messages have been flushed
+                upstreamCompleted = true;
+                drain();
+            } finally {
+                lock.unlock();
+            }
         }
 
         @Override
         public void cancel() {
+            lock.lock();
+            try {
+                pendingRequests = Long.MIN_VALUE;
+            } finally {
+                lock.unlock();
+            }
             if (subscription != null) {
                 subscription.cancel();
             }
-            queue.clear();
-            pendingRequests = Long.MIN_VALUE;
         }
     }
 }

--
Gitblit v1.10.0