From 2a88ad00862b2241f3b87ef8d4db383c69b54e3a Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Thu, 24 Nov 2016 13:59:43 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

---
 opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContextEventListener.java      |   58 ++++++++
 opendj-core/src/main/java/com/forgerock/reactive/Completable.java                            |   27 +++
 opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java         |    4 
 opendj-core/clirr-ignored-api-changes.xml                                                    |    2 
 opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java                   |   53 +------
 opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java                          |   15 ++
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java              |   55 +++----
 opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java |    4 
 opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java  |  186 +++++++++++++++-----------
 9 files changed, 238 insertions(+), 166 deletions(-)

diff --git a/opendj-core/clirr-ignored-api-changes.xml b/opendj-core/clirr-ignored-api-changes.xml
index 1d6d8ce..ab682cc 100644
--- a/opendj-core/clirr-ignored-api-changes.xml
+++ b/opendj-core/clirr-ignored-api-changes.xml
@@ -173,7 +173,7 @@
   <difference>
     <className>org/forgerock/opendj/ldap/LDAPClientContext</className>
     <differenceType>7012</differenceType>
-    <method>void addConnectionEventListener(org.forgerock.opendj.ldap.LDAPClientContext$ConnectionEventListener)</method>
+    <method>void addConnectionEventListener(org.forgerock.opendj.ldap.LDAPClientContextEventListener)</method>
     <justification>Allows to register connection state listener</justification>
   </difference>
   <difference>
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
index 3f12e71..638faba 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
@@ -63,14 +63,14 @@
     Completable onErrorResumeWith(Function<Throwable, Completable, Exception> function);
 
     /**
-     * Subscribe to the result of this {@link Completable}.
+     * Returns a {@link Completable} instance that calls the given onTerminate callback after this Completable completes
+     * normally or with an exception.
      *
-     * @param completeAction
-     *            An {@link Action} which will be invoked on successful completion
-     * @param errorConsumer
-     *            A {@link Consumer} which will be invoked on error
+     * @param onAfterTerminate
+     *            the callback to call after this {@link Completable} terminates
+     * @return the new {@link Completable} instance
      */
-    void subscribe(Action completeAction, Consumer<Throwable> errorConsumer);
+    Completable doAfterTerminate(Action onAfterTerminate);
 
     /**
      * Returns a {@link Single} which will complete with the provided value once this {@link Completable} completes.
@@ -82,4 +82,19 @@
      * @return A new {@link Single}
      */
     <V> Single<V> toSingle(V value);
+
+    /**
+     * Subscribe to the result of this {@link Completable}.
+     *
+     * @param completeAction
+     *            An {@link Action} which will be invoked on successful completion
+     * @param errorConsumer
+     *            A {@link Consumer} which will be invoked on error
+     */
+    void subscribe(Action completeAction, Consumer<Throwable> errorConsumer);
+
+    /**
+     * Subscribes to this {@link Completable}.
+     */
+    void subscribe();
 }
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
index 5944468..b3370c3 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -379,5 +379,20 @@
                 }
             });
         }
+
+        @Override
+        public Completable doAfterTerminate(final Action onTerminate) {
+            return new RxJavaCompletable(impl.doAfterTerminate(new io.reactivex.functions.Action() {
+                @Override
+                public void run() throws Exception {
+                    onTerminate.run();
+                }
+            }));
+        }
+
+        @Override
+        public void subscribe() {
+            impl.subscribe();
+        }
     }
 }
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java b/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
index e185627..0cb33bf 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
@@ -28,7 +28,7 @@
 import org.forgerock.opendj.ldap.DecodeOptions;
 import org.forgerock.opendj.ldap.IntermediateResponseHandler;
 import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
+import org.forgerock.opendj.ldap.LDAPClientContextEventListener;
 import org.forgerock.opendj.ldap.LdapException;
 import org.forgerock.opendj.ldap.LdapResultHandler;
 import org.forgerock.opendj.ldap.ResultCode;
@@ -121,7 +121,7 @@
                 final ServerConnection<Integer> serverConnection) {
             this.decodeOptions = checkNotNull(decodeOptions, "decodeOptions must not be null");
             this.adaptee = checkNotNull(serverConnection, "serverConnection must not be null");
-            clientContext.addConnectionEventListener(new ConnectionEventListener() {
+            clientContext.addListener(new LDAPClientContextEventListener() {
                 @Override
                 public void handleConnectionError(final LDAPClientContext context, final Throwable error) {
                     adaptee.handleConnectionError(error);
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
index 47548ef..3fec4e2 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
@@ -18,13 +18,11 @@
 package org.forgerock.opendj.ldap;
 
 import java.net.InetSocketAddress;
-import java.util.EventListener;
 
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLSession;
 import javax.security.sasl.SaslServer;
 
-import org.forgerock.opendj.ldap.requests.UnbindRequest;
 import org.forgerock.opendj.ldap.responses.ExtendedResult;
 
 import com.forgerock.reactive.Completable;
@@ -37,62 +35,25 @@
  */
 public interface LDAPClientContext {
 
-    /** Listens for disconnection event. */
-    public interface ConnectionEventListener extends EventListener {
-        /**
-         * Invoked when the connection has been disconnected because of an error (e.g: message too big).
-         *
-         * @param context
-         *            The {@link LDAPClientContext} which has failed
-         * @param error
-         *            The error
-         */
-        void handleConnectionError(LDAPClientContext context, Throwable error);
-
-        /**
-         * Invoked when the client closed the connection, possibly using an unbind request.
-         *
-         * @param context
-         *            The {@link LDAPClientContext} which has been disconnected
-         * @param unbindRequest
-         *            The unbind request, which may be {@code null} if one was not sent before the connection was
-         *            closed.
-         */
-        void handleConnectionClosed(LDAPClientContext context, UnbindRequest unbindRequest);
-
-        /**
-         * Invoked when the connection has been disconnected by the server.
-         *
-         * @param context
-         *            The {@link LDAPClientContext} which has been disconnected
-         * @param resultCode
-         *            The result code which was included with the disconnect notification, or {@code null} if no
-         *            disconnect notification was sent.
-         * @param diagnosticMessage
-         *            The diagnostic message, which may be empty or {@code null} indicating that none was provided.
-         */
-        void handleConnectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage);
-    }
-
     /**
-     * Register a listener which will be notified when this {@link LDAPClientContext} is disconnected.
+     * Register a listener which will be notified when this {@link LDAPClientContext} changes state.
      *
-     * @param listener The {@link ConnectionEventListener} to register.
+     * @param listener The {@link LDAPClientContextEventListener} to register.
      */
-    void addConnectionEventListener(ConnectionEventListener listener);
+    void addListener(LDAPClientContextEventListener listener);
 
     /**
      * Disconnects the client without sending a disconnect notification. Invoking this method causes
-     * {@link ConnectionEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be called
-     * before this method returns.
+     * {@link LDAPClientContextEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be
+     * called before this method returns.
      */
     void disconnect();
 
     /**
      * Disconnects the client and sends a disconnect notification, containing the provided result code and diagnostic
      * message. Invoking this method causes
-     * {@link ConnectionEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be called
-     * before this method returns.
+     * {@link LDAPClientContextEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be
+     * called before this method returns.
      *
      * @param resultCode
      *            The result code to include with the disconnect notification
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContextEventListener.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContextEventListener.java
new file mode 100644
index 0000000..30e0116
--- /dev/null
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContextEventListener.java
@@ -0,0 +1,58 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+
+package org.forgerock.opendj.ldap;
+
+import java.util.EventListener;
+
+import org.forgerock.opendj.ldap.requests.UnbindRequest;
+
+/** A listener interface for handling LDAPClientContext state changes. */
+public interface LDAPClientContextEventListener extends EventListener {
+    /**
+     * Invoked when the connection has been disconnected because of an error (e.g: message too big).
+     *
+     * @param context
+     *            The {@link LDAPClientContext} which has failed
+     * @param error
+     *            The error
+     */
+    void handleConnectionError(LDAPClientContext context, Throwable error);
+
+    /**
+     * Invoked when the client closed the connection, possibly using an unbind request.
+     *
+     * @param context
+     *            The {@link LDAPClientContext} which has been disconnected
+     * @param unbindRequest
+     *            The unbind request, which may be {@code null} if one was not sent before the connection was
+     *            closed.
+     */
+    void handleConnectionClosed(LDAPClientContext context, UnbindRequest unbindRequest);
+
+    /**
+     * Invoked when the connection has been disconnected by the server.
+     *
+     * @param context
+     *            The {@link LDAPClientContext} which has been disconnected
+     * @param resultCode
+     *            The result code which was included with the disconnect notification, or {@code null} if no
+     *            disconnect notification was sent.
+     * @param diagnosticMessage
+     *            The diagnostic message, which may be empty or {@code null} indicating that none was provided.
+     */
+    void handleConnectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage);
+}
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index d749ddd..5bb9307 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -41,6 +41,7 @@
 import org.forgerock.opendj.ldap.DecodeException;
 import org.forgerock.opendj.ldap.DecodeOptions;
 import org.forgerock.opendj.ldap.LDAPClientContext;
+import org.forgerock.opendj.ldap.LDAPClientContextEventListener;
 import org.forgerock.opendj.ldap.LDAPListener;
 import org.forgerock.opendj.ldap.LdapException;
 import org.forgerock.opendj.ldap.ResultCode;
@@ -76,7 +77,6 @@
 
 import com.forgerock.reactive.Action;
 import com.forgerock.reactive.Completable;
-import com.forgerock.reactive.Consumer;
 import com.forgerock.reactive.ReactiveHandler;
 import com.forgerock.reactive.Stream;
 
@@ -165,7 +165,7 @@
             @Override
             public Publisher<Void> apply(final LdapRequestEnvelope rawRequest) throws Exception {
                 if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
-                    clientContext.notifyConnectionClosedSilently(rawRequest);
+                    clientContext.notifyConnectionClosedRawUnbind(rawRequest);
                     return emptyStream();
                 }
                 Stream<Response> response;
@@ -192,7 +192,7 @@
         }, maxConcurrentRequests).onErrorResumeWith(new Function<Throwable, Publisher<Void>, Exception>() {
             @Override
             public Publisher<Void> apply(Throwable error) throws Exception {
-                clientContext.notifyErrorSilently(error);
+                clientContext.notifyConnectionError(error);
                 // Swallow the error to prevent the subscribe() below to report it on the console.
                 return emptyStream();
             }
@@ -306,7 +306,7 @@
 
         private final Connection<?> connection;
         private volatile boolean isClosed;
-        private final List<ConnectionEventListener> connectionEventListeners = new LinkedList<>();
+        private final List<LDAPClientContextEventListener> connectionEventListeners = new LinkedList<>();
         private SaslServer saslServer;
         private GrizzlyBackpressureSubscription downstream;
 
@@ -341,7 +341,7 @@
             if (immutableRef != null) {
                 immutableRef.onComplete();
             }
-            notifyConnectionClosedSilently(null);
+            notifyConnectionClosedRawUnbind(null);
             return ctx.getStopAction();
         }
 
@@ -487,7 +487,7 @@
         }
 
         @Override
-        public void addConnectionEventListener(ConnectionEventListener listener) {
+        public void addListener(final LDAPClientContextEventListener listener) {
             Reject.ifNull(listener, "listener must not be null");
             synchronized (connectionEventListeners) {
                 connectionEventListeners.add(listener);
@@ -496,7 +496,7 @@
 
         @Override
         public void disconnect() {
-            notifySilentlyConnectionDisconnected(null, null);
+            notifyConnectionDisconnected(null, null);
             connection.closeSilently();
         }
 
@@ -507,46 +507,41 @@
 
         @Override
         public void disconnect(final ResultCode resultCode, final String diagnosticMessage) {
-            notifySilentlyConnectionDisconnected(resultCode, diagnosticMessage);
+            notifyConnectionDisconnected(resultCode, diagnosticMessage);
             sendUnsolicitedNotification(
                     newGenericExtendedResult(resultCode)
                         .setOID(OID_NOTICE_OF_DISCONNECTION)
                         .setDiagnosticMessage(diagnosticMessage)
-            ).subscribe(new Action() {
+            ).doAfterTerminate(new Action() {
                 @Override
                 public void run() throws Exception {
                     closeConnection();
                 }
-            }, new Consumer<Throwable>() {
-                @Override
-                public void accept(final Throwable error) throws Exception {
-                    closeConnection();
-                }
-            });
+            }).subscribe();
         }
 
-        private void notifyConnectionClosedSilently(final LdapRequestEnvelope unbindRequest) {
+        private void notifyConnectionClosedRawUnbind(final LdapRequestEnvelope rawUnbindRequest) {
             // Close this connection context.
-            if (unbindRequest == null) {
-                notifySilentlyConnectionClosed(null);
+            if (rawUnbindRequest == null) {
+                notifyConnectionClosed(null);
             } else {
                 try {
-                    LDAP.getReader(unbindRequest.getContent(), new DecodeOptions())
+                    LDAP.getReader(rawUnbindRequest.getContent(), new DecodeOptions())
                             .readMessage(new AbstractLDAPMessageHandler() {
                                 @Override
-                                public void unbindRequest(int messageID, UnbindRequest unbindRequest)
+                                public void unbindRequest(final int messageID, final UnbindRequest unbindRequest)
                                         throws DecodeException, IOException {
-                                    notifySilentlyConnectionClosed(unbindRequest);
+                                    notifyConnectionClosed(unbindRequest);
                                 }
                             });
                 } catch (Exception e) {
-                    notifySilentlyConnectionClosed(null);
+                    notifyConnectionClosed(null);
                 }
             }
         }
 
-        private void notifySilentlyConnectionDisconnected(final ResultCode resultCode, final String diagnosticMessage) {
-            for (final ConnectionEventListener listener : getAndClearListeners()) {
+        private void notifyConnectionDisconnected(final ResultCode resultCode, final String diagnosticMessage) {
+            for (final LDAPClientContextEventListener listener : getAndClearListeners()) {
                 try {
                     listener.handleConnectionDisconnected(this, resultCode, diagnosticMessage);
                 } catch (Exception e) {
@@ -555,8 +550,8 @@
             }
         }
 
-        private void notifySilentlyConnectionClosed(final UnbindRequest unbindRequest) {
-            for (final ConnectionEventListener listener : getAndClearListeners()) {
+        private void notifyConnectionClosed(final UnbindRequest unbindRequest) {
+            for (final LDAPClientContextEventListener listener : getAndClearListeners()) {
                 try {
                     listener.handleConnectionClosed(this, unbindRequest);
                 } catch (Exception e) {
@@ -565,8 +560,8 @@
             }
         }
 
-        private void notifyErrorSilently(final Throwable error) {
-            for (final ConnectionEventListener listener : getAndClearListeners()) {
+        private void notifyConnectionError(final Throwable error) {
+            for (final LDAPClientContextEventListener listener : getAndClearListeners()) {
                 try {
                     listener.handleConnectionError(this, error);
                 } catch (Exception e) {
@@ -575,9 +570,9 @@
             }
         }
 
-        private ArrayList<ConnectionEventListener> getAndClearListeners() {
+        private List<LDAPClientContextEventListener> getAndClearListeners() {
             synchronized (connectionEventListeners) {
-                final ArrayList<ConnectionEventListener> listeners = new ArrayList<>(connectionEventListeners);
+                final ArrayList<LDAPClientContextEventListener> listeners = new ArrayList<>(connectionEventListeners);
                 connectionEventListeners.clear();
                 return listeners;
             }
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index 70dc842..defc825 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -32,16 +32,14 @@
 import java.security.cert.Certificate;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLPeerUnverifiedException;
@@ -56,7 +54,7 @@
 import org.forgerock.opendj.ldap.ByteString;
 import org.forgerock.opendj.ldap.DN;
 import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
+import org.forgerock.opendj.ldap.LDAPClientContextEventListener;
 import org.forgerock.opendj.ldap.ResultCode;
 import org.forgerock.opendj.ldap.requests.UnbindRequest;
 import org.forgerock.opendj.ldap.responses.CompareResult;
@@ -64,6 +62,7 @@
 import org.forgerock.opendj.ldap.responses.Responses;
 import org.forgerock.opendj.ldap.responses.Result;
 import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
+import org.forgerock.util.Reject;
 import org.opends.server.api.ClientConnection;
 import org.opends.server.api.ConnectionHandler;
 import org.opends.server.core.AbandonOperationBasis;
@@ -107,6 +106,7 @@
 import org.opends.server.types.SearchResultEntry;
 import org.opends.server.types.SearchResultReference;
 import org.opends.server.util.TimeThread;
+import org.reactivestreams.Processor;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
@@ -229,7 +229,7 @@
         }
 
         connectionID = DirectoryServer.newConnectionAccepted(this);
-        clientContext.addConnectionEventListener(new ConnectionEventListener() {
+        clientContext.addListener(new LDAPClientContextEventListener() {
             @Override
             public void handleConnectionError(LDAPClientContext context, Throwable error) {
                 if (error instanceof LocalizableException) {
@@ -1676,17 +1676,17 @@
     }
 
     /** Upstream -> BlockingBackpressureSubscription -> Downstream. */
-    private final class BlockingBackpressureSubscription
-            implements Subscription, Publisher<Response>, Subscriber<Response> {
-        private long pendingRequests;
-        private final Queue<Response> queue = new LinkedList<>();
-        private final Lock lock = new ReentrantLock();
-        private final Condition spaceAvailable = lock.newCondition();
+    private final class BlockingBackpressureSubscription implements Subscription, Processor<Response, Response> {
+        private final AtomicLong pendingRequests = new AtomicLong();
+        private final AtomicInteger missedDrain = new AtomicInteger();
+        private final BlockingQueue<Response> queue = new LinkedBlockingQueue<>(32);
         private final Publisher<Response> upstream;
         private final long writeTimeoutMillis;
-        private boolean upstreamCompleted;
         private Subscription subscription;
         private Subscriber<? super Response> downstream;
+        private volatile boolean done;
+        private Throwable error;
+        private volatile boolean cancelled;
 
         BlockingBackpressureSubscription(final long maxBlockedWriteTimeLimit, final Publisher<Response> upstream) {
             this.upstream = upstream;
@@ -1697,17 +1697,19 @@
 
         @Override
         public void subscribe(final Subscriber<? super Response> subscriber) {
+            Reject.ifNull(subscriber);
             if (downstream != null) {
+                // This publisher only support one subscriber.
                 return;
             }
             downstream = subscriber;
-            subscriber.onSubscribe(this);
-            upstream.subscribe(this);
+            subscriber.onSubscribe(/* Subscription */ this);
+            upstream.subscribe(/* Subscriber */ this);
         }
 
         @Override
         public void onSubscribe(final Subscription s) {
-            if ( subscription != null) {
+            if (subscription != null) {
                 s.cancel();
                 return;
             }
@@ -1716,91 +1718,117 @@
         }
 
         @Override
-        public void request(long n) {
-            lock.lock();
-            try {
-                if (pendingRequests != Long.MIN_VALUE) {
-                    pendingRequests += n;
-                    drain();
+        public void request(final long n) {
+            if (n == Long.MAX_VALUE) {
+                pendingRequests.set(Long.MAX_VALUE);
+            } else {
+                // There is a known and accepted problem here regarding reactive-stream contract in the sense that
+                // we're not supporting pendingRequests overflow (pendingRequests + n > Long.MAX_VALUE) for performance
+                // reason since this should never happen in the context we're using it.
+                pendingRequests.addAndGet(n);
+            }
+            drain();
+        }
+
+        // Taken from
+        // https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#backpressure-and-cancellation
+        private void drain() {
+            if (missedDrain.getAndIncrement() != 0) {
+                // Another thread is already executing this drain method.
+                return;
+            }
+
+            int missed = 1;
+
+            for (;;) {
+                final long immutablePendingRequests = pendingRequests.get();
+                long emitted = 0L;
+                while (emitted != immutablePendingRequests) {
+                    // Check if we should early exit because of cancellation
+                    if (cancelled) {
+                        return;
+                    }
+
+                    final Response response = queue.poll();
+                    if (response != null) {
+                        downstream.onNext(response);
+                        emitted++;
+                    } else if (done) {
+                        // queue is empty and we received a completion (onError/onComplete) notification from upstream
+                        forwardDoneEvent();
+                        return;
+                    } else {
+                        // Queue is empty but upstream is not done yet.
+                        break;
+                    }
                 }
-            } finally {
-                lock.unlock();
+
+                // Check if an onError/onComplete from upstream arrived.
+                if (emitted == immutablePendingRequests) {
+                    if (cancelled) {
+                        return;
+                    }
+
+                    if (done && queue.isEmpty()) {
+                        forwardDoneEvent();
+                        return;
+                    }
+                }
+
+                if (emitted != 0) {
+                    pendingRequests.addAndGet(-emitted);
+                }
+
+                // Check to see if another thread asked for drain
+                missed = missedDrain.addAndGet(-missed);
+                if (missed == 0) {
+                    // Nop, we can exit.
+                    break;
+                }
             }
         }
 
-        private void drain() {
-            Response response;
-            try {
-                while (pendingRequests > 0 && (response = queue.poll()) != null) {
-                    downstream.onNext(response);
-                    // Forward response
-                    pendingRequests--;
-                }
-                if (upstreamCompleted && queue.isEmpty()) {
-                    if (pendingRequests != Long.MIN_VALUE) {
-                        downstream.onComplete();
-                    }
-                    cancel();
-                }
-            } finally {
-                spaceAvailable.signalAll();
+        private void forwardDoneEvent() {
+            final Throwable immutableError = error;
+            if (immutableError != null) {
+                downstream.onError(immutableError);
+            } else {
+                downstream.onComplete();
             }
         }
 
         @Override
         public void onNext(final Response response) {
-            lock.lock();
             try {
-                while (queue.size() >= 32) {
-                    try {
-                        if (!spaceAvailable.await(writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
-                            // If we've gotten here, then the write timed out.
-                            downstream.onError(new ClosedChannelException());
-                            cancel();
-                            return;
-                        }
-                    } catch (InterruptedException e) {
-                        downstream.onError(e);
-                        cancel();
-                        return;
-                    }
+                if (queue.offer(response, writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
+                    drain();
+                } else {
+                    // If we've gotten here, then the write timed out.
+                    onError(new ClosedChannelException().fillInStackTrace());
+                    return;
                 }
-                queue.add(response);
-                drain();
-            } finally {
-                lock.unlock();
+            } catch (InterruptedException e) {
+                onError(e);
             }
         }
 
         @Override
-        public void onError(Throwable t) {
-            downstream.onError(t);
-            cancel();
+        public void onError(final Throwable error) {
+            this.error = error;
+            done = true;
+            drain();
         }
 
         @Override
         public void onComplete() {
-            lock.lock();
-            try {
-                // the onComplete() will be forwarded downstream once the pending messages have been flushed
-                upstreamCompleted = true;
-                drain();
-            } finally {
-                lock.unlock();
-            }
+            done = true;
+            drain();
         }
 
         @Override
         public void cancel() {
-            lock.lock();
-            try {
-                pendingRequests = Long.MIN_VALUE;
-            } finally {
-                lock.unlock();
-            }
-            if (subscription != null) {
-                subscription.cancel();
-            }
+            cancelled = true;
+            subscription.cancel();
         }
     }
 }
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
index 16421e9..07a8cd4 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -53,7 +53,7 @@
 import org.forgerock.opendj.ldap.AddressMask;
 import org.forgerock.opendj.ldap.DN;
 import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
+import org.forgerock.opendj.ldap.LDAPClientContextEventListener;
 import org.forgerock.opendj.ldap.LDAPListener;
 import org.forgerock.opendj.ldap.LdapException;
 import org.forgerock.opendj.ldap.ResultCode;
@@ -641,7 +641,7 @@
                             LDAPClientContext clientContext) throws LdapException {
                         final LDAPClientConnection2 conn = canAccept(clientContext);
                         clientConnections.add(conn);
-                        clientContext.addConnectionEventListener(new ConnectionEventListener() {
+                        clientContext.addListener(new LDAPClientContextEventListener() {
                             @Override
                             public void handleConnectionError(final LDAPClientContext context, final Throwable error) {
                                 clientConnections.remove(conn);

--
Gitblit v1.10.0