From 2a88ad00862b2241f3b87ef8d4db383c69b54e3a Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Thu, 24 Nov 2016 13:59:43 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

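LDAPClientConnection2 is adapted to the new SDK API (LDAPClientContextEventListener
replaces LDAPClientContext.ConnectionEventListener) and its
BlockingBackpressureSubscription no longer relies on a ReentrantLock/Condition
pair: pending downstream requests are tracked with an AtomicLong, responses are
buffered in a bounded LinkedBlockingQueue, and emission is serialized with the
queue-drain ("missed drain") pattern from the RxJava operator-writing guide
linked in the code. When the queue stays full for longer than
maxBlockedWriteTimeLimit, onNext() fails the stream with a
ClosedChannelException.

A minimal, hypothetical sketch of the queue-drain pattern (illustration only;
the class and names below are not part of this patch):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    final class QueueDrainExample {
        private final AtomicInteger missedDrain = new AtomicInteger();
        private final Queue<String> queue = new ConcurrentLinkedQueue<>();

        // Producers enqueue and then ask for a drain; only one thread at a
        // time runs the emission loop, concurrent callers merely record that
        // another pass is needed.
        void publish(final String item) {
            queue.offer(item);
            drain();
        }

        private void drain() {
            if (missedDrain.getAndIncrement() != 0) {
                return; // another thread is already draining
            }
            int missed = 1;
            for (;;) {
                String item;
                while ((item = queue.poll()) != null) {
                    System.out.println(item); // stand-in for downstream.onNext()
                }
                missed = missedDrain.addAndGet(-missed);
                if (missed == 0) {
                    break; // no new drain request arrived while emitting
                }
            }
        }
    }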
---
 opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java |  186 ++++++++++++++++++++++++++-------------------
 1 file changed, 107 insertions(+), 79 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index 70dc842..defc825 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -32,16 +32,14 @@
 import java.security.cert.Certificate;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLPeerUnverifiedException;
@@ -56,7 +54,7 @@
 import org.forgerock.opendj.ldap.ByteString;
 import org.forgerock.opendj.ldap.DN;
 import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
+import org.forgerock.opendj.ldap.LDAPClientContextEventListener;
 import org.forgerock.opendj.ldap.ResultCode;
 import org.forgerock.opendj.ldap.requests.UnbindRequest;
 import org.forgerock.opendj.ldap.responses.CompareResult;
@@ -64,6 +62,7 @@
 import org.forgerock.opendj.ldap.responses.Responses;
 import org.forgerock.opendj.ldap.responses.Result;
 import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
+import org.forgerock.util.Reject;
 import org.opends.server.api.ClientConnection;
 import org.opends.server.api.ConnectionHandler;
 import org.opends.server.core.AbandonOperationBasis;
@@ -107,6 +106,7 @@
 import org.opends.server.types.SearchResultEntry;
 import org.opends.server.types.SearchResultReference;
 import org.opends.server.util.TimeThread;
+import org.reactivestreams.Processor;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
@@ -229,7 +229,7 @@
         }
 
         connectionID = DirectoryServer.newConnectionAccepted(this);
-        clientContext.addConnectionEventListener(new ConnectionEventListener() {
+        clientContext.addListener(new LDAPClientContextEventListener() {
             @Override
             public void handleConnectionError(LDAPClientContext context, Throwable error) {
                 if (error instanceof LocalizableException) {
@@ -1676,17 +1676,17 @@
     }
 
     /** Upstream -> BlockingBackpressureSubscription -> Downstream. */
-    private final class BlockingBackpressureSubscription
-            implements Subscription, Publisher<Response>, Subscriber<Response> {
-        private long pendingRequests;
-        private final Queue<Response> queue = new LinkedList<>();
-        private final Lock lock = new ReentrantLock();
-        private final Condition spaceAvailable = lock.newCondition();
+    private final class BlockingBackpressureSubscription implements Subscription, Processor<Response, Response> {
+        private final AtomicLong pendingRequests = new AtomicLong();
+        private final AtomicInteger missedDrain = new AtomicInteger();
+        private final BlockingQueue<Response> queue = new LinkedBlockingQueue<>(32);
         private final Publisher<Response> upstream;
         private final long writeTimeoutMillis;
-        private boolean upstreamCompleted;
         private Subscription subscription;
         private Subscriber<? super Response> downstream;
+        private volatile boolean done;
+        private Throwable error;
+        private volatile boolean cancelled;
 
         BlockingBackpressureSubscription(final long maxBlockedWriteTimeLimit, final Publisher<Response> upstream) {
             this.upstream = upstream;
@@ -1697,17 +1697,19 @@
 
         @Override
         public void subscribe(final Subscriber<? super Response> subscriber) {
+            Reject.ifNull(subscriber);
             if (downstream != null) {
+                // This publisher only supports one subscriber.
                 return;
             }
             downstream = subscriber;
-            subscriber.onSubscribe(this);
-            upstream.subscribe(this);
+            subscriber.onSubscribe(/* Subscription */ this);
+            upstream.subscribe(/* Subscriber */ this);
         }
 
         @Override
         public void onSubscribe(final Subscription s) {
-            if ( subscription != null) {
+            if (subscription != null) {
                 s.cancel();
                 return;
             }
@@ -1716,91 +1718,117 @@
         }
 
         @Override
-        public void request(long n) {
-            lock.lock();
-            try {
-                if (pendingRequests != Long.MIN_VALUE) {
-                    pendingRequests += n;
-                    drain();
+        public void request(final long n) {
+            if (n == Long.MAX_VALUE) {
+                pendingRequests.set(Long.MAX_VALUE);
+            } else {
+                // Known and accepted deviation from the reactive-streams contract: for performance reasons we do not
+                // handle overflow of the request counter (pendingRequests + n > Long.MAX_VALUE), since this should
+                // never happen in the context in which this class is used.
+                pendingRequests.addAndGet(n);
+            }
+            drain();
+        }
+
+        // Taken from
+        // https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#backpressure-and-cancellation
+        private void drain() {
+            if (missedDrain.getAndIncrement() != 0) {
+                // Another thread is already executing this drain method.
+                return;
+            }
+
+            int missed = 1;
+
+            for (;;) {
+                final long immutablePendingRequests = pendingRequests.get();
+                long emitted = 0L;
+                while (emitted != immutablePendingRequests) {
+                    // Check whether we should exit early because of cancellation
+                    if (cancelled) {
+                        return;
+                    }
+
+                    final Response response = queue.poll();
+                    if (response != null) {
+                        downstream.onNext(response);
+                        emitted++;
+                    } else if (done) {
+                        // queue is empty and we received a completion (onError/onComplete) notification from upstream
+                        forwardDoneEvent();
+                        return;
+                    } else {
+                        // Queue is empty but upstream is not done yet.
+                        break;
+                    }
                 }
-            } finally {
-                lock.unlock();
+
+                // Check if an onError/onComplete from upstream arrived.
+                if (emitted == immutablePendingRequests) {
+                    if (cancelled) {
+                        return;
+                    }
+
+                    if (done && queue.isEmpty()) {
+                        forwardDoneEvent();
+                        return;
+                    }
+                }
+
+                if (emitted != 0) {
+                    pendingRequests.addAndGet(-emitted);
+                }
+
+                // Check to see if another thread asked for drain
+                missed = missedDrain.addAndGet(-missed);
+                if (missed == 0) {
+                    // No further drain was requested, we can exit.
+                    break;
+                }
             }
         }
 
-        private void drain() {
-            Response response;
-            try {
-                while (pendingRequests > 0 && (response = queue.poll()) != null) {
-                    downstream.onNext(response);
-                    // Forward response
-                    pendingRequests--;
-                }
-                if (upstreamCompleted && queue.isEmpty()) {
-                    if (pendingRequests != Long.MIN_VALUE) {
-                        downstream.onComplete();
-                    }
-                    cancel();
-                }
-            } finally {
-                spaceAvailable.signalAll();
+        private void forwardDoneEvent() {
+            final Throwable immutableError = error;
+            if (immutableError != null) {
+                downstream.onError(immutableError);
+            } else {
+                downstream.onComplete();
             }
         }
 
         @Override
         public void onNext(final Response response) {
-            lock.lock();
             try {
-                while (queue.size() >= 32) {
-                    try {
-                        if (!spaceAvailable.await(writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
-                            // If we've gotten here, then the write timed out.
-                            downstream.onError(new ClosedChannelException());
-                            cancel();
-                            return;
-                        }
-                    } catch (InterruptedException e) {
-                        downstream.onError(e);
-                        cancel();
-                        return;
-                    }
+                if (queue.offer(response, writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
+                    drain();
+                } else {
+                    // If we've gotten here, then the write timed out.
+                    onError(new ClosedChannelException().fillInStackTrace());
+                    return;
                 }
-                queue.add(response);
-                drain();
-            } finally {
-                lock.unlock();
+            } catch (InterruptedException e) {
+                onError(e);
             }
         }
 
         @Override
-        public void onError(Throwable t) {
-            downstream.onError(t);
-            cancel();
+        public void onError(final Throwable error) {
+            this.error = error;
+            done = true;
+            drain();
         }
 
         @Override
         public void onComplete() {
-            lock.lock();
-            try {
-                // the onComplete() will be forwarded downstream once the pending messages have been flushed
-                upstreamCompleted = true;
-                drain();
-            } finally {
-                lock.unlock();
-            }
+            done = true;
+            drain();
         }
 
         @Override
         public void cancel() {
-            lock.lock();
-            try {
-                pendingRequests = Long.MIN_VALUE;
-            } finally {
-                lock.unlock();
-            }
-            if (subscription != null) {
-                subscription.cancel();
-            }
+            cancelled = true;
+            subscription.cancel();
         }
     }
 }

--
Gitblit v1.10.0