From 5e9caa5055a78b930500690cb919effd5c8bc2b1 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Fri, 18 Nov 2016 10:31:28 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

---
 opendj-core/src/main/java/com/forgerock/reactive/Completable.java                           |   10 +
 opendj-core/clirr-ignored-api-changes.xml                                                   |    7 +
 opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java                  |    5 
 opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java                         |   15 ++
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java             |  207 +++++++++++++++++++++-------------
 opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java |   75 +++++++----
 6 files changed, 212 insertions(+), 107 deletions(-)

diff --git a/opendj-core/clirr-ignored-api-changes.xml b/opendj-core/clirr-ignored-api-changes.xml
index 6a629c1..1d6d8ce 100644
--- a/opendj-core/clirr-ignored-api-changes.xml
+++ b/opendj-core/clirr-ignored-api-changes.xml
@@ -232,4 +232,11 @@
     <method>org.forgerock.opendj.io.LDAPWriter getWriter(org.forgerock.opendj.io.ASN1Writer)</method>
     <justification>Add support for LdapV2 encoding</justification>
   </difference>
+  <difference>
+    <className>org/forgerock/opendj/ldap/LDAPClientContext</className>
+    <differenceType>7006</differenceType>
+    <method>void sendUnsolicitedNotification(org.forgerock.opendj.ldap.responses.ExtendedResult)</method>
+    <to>com.forgerock.reactive.Completable</to>
+    <justification>Return a completable so that operation can be chained (i.e: closing connection)</justification>
+  </difference>
 </differences>
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
index dabe352..3f12e71 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
@@ -63,6 +63,16 @@
     Completable onErrorResumeWith(Function<Throwable, Completable, Exception> function);
 
     /**
+     * Subscribe to the result of this {@link Completable}.
+     *
+     * @param completeAction
+     *            An {@link Action} which will be invoked on successful completion
+     * @param errorConsumer
+     *            A {@link Consumer} which will be invoked on error
+     */
+    void subscribe(Action completeAction, Consumer<Throwable> errorConsumer);
+
+    /**
      * Returns a {@link Single} which will complete with the provided value once this {@link Completable} completes.
      *
      * @param <V>
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
index de0d8bf..5944468 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -364,5 +364,20 @@
                         }
                     }));
         }
+
+        @Override
+        public void subscribe(final Action completeAction, final Consumer<Throwable> errorConsumer) {
+            impl.subscribe(new io.reactivex.functions.Action() {
+                @Override
+                public void run() throws Exception {
+                    completeAction.run();
+                }
+            }, new io.reactivex.functions.Consumer<Throwable>() {
+                @Override
+                public void accept(final Throwable error) throws Exception {
+                    errorConsumer.accept(error);
+                }
+            });
+        }
     }
 }
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
index 83d4cf5..47548ef 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
@@ -27,6 +27,8 @@
 import org.forgerock.opendj.ldap.requests.UnbindRequest;
 import org.forgerock.opendj.ldap.responses.ExtendedResult;
 
+import com.forgerock.reactive.Completable;
+
 /**
  * An LDAP client which has connected to a {@link ServerConnectionFactory}. An
  * LDAP client context can be used to query information about the client's
@@ -169,8 +171,9 @@
      *
      * @param notification
      *            The notification to send.
+     * @return A {@link Completable} which will be completed once the notification has been sent.
      */
-    void sendUnsolicitedNotification(ExtendedResult notification);
+    Completable sendUnsolicitedNotification(ExtendedResult notification);
 
     /**
      * Installs the TLS/SSL security layer on the underlying connection. The TLS/SSL security layer will be installed
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index 0fcaa00..7849e9e 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -19,13 +19,15 @@
 import static com.forgerock.reactive.RxJavaStreams.*;
 import static org.forgerock.opendj.grizzly.GrizzlyUtils.configureConnection;
 import static org.forgerock.opendj.io.LDAP.*;
+import static org.forgerock.opendj.ldap.responses.Responses.newGenericExtendedResult;
 import static org.forgerock.opendj.ldap.spi.LdapMessages.newResponseMessage;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.net.ssl.SSLEngine;
@@ -46,7 +48,6 @@
 import org.forgerock.opendj.ldap.responses.ExtendedResult;
 import org.forgerock.opendj.ldap.responses.IntermediateResponse;
 import org.forgerock.opendj.ldap.responses.Response;
-import org.forgerock.opendj.ldap.responses.Responses;
 import org.forgerock.opendj.ldap.responses.Result;
 import org.forgerock.opendj.ldap.responses.SearchResultEntry;
 import org.forgerock.opendj.ldap.responses.SearchResultReference;
@@ -55,7 +56,12 @@
 import org.forgerock.util.Function;
 import org.forgerock.util.Options;
 import org.forgerock.util.Reject;
+import org.forgerock.util.promise.ExceptionHandler;
+import org.forgerock.util.promise.PromiseImpl;
+import org.forgerock.util.promise.ResultHandler;
+import org.forgerock.util.promise.RuntimeExceptionHandler;
 import org.glassfish.grizzly.Connection;
+import org.glassfish.grizzly.EmptyCompletionHandler;
 import org.glassfish.grizzly.filterchain.BaseFilter;
 import org.glassfish.grizzly.filterchain.Filter;
 import org.glassfish.grizzly.filterchain.FilterChain;
@@ -70,6 +76,7 @@
 
 import com.forgerock.reactive.Action;
 import com.forgerock.reactive.Completable;
+import com.forgerock.reactive.Consumer;
 import com.forgerock.reactive.ReactiveHandler;
 import com.forgerock.reactive.Stream;
 
@@ -163,7 +170,7 @@
             @Override
             public Publisher<Void> apply(final LdapRequestEnvelope rawRequest) throws Exception {
                 if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
-                    clientContext.notifyConnectionClosed(rawRequest);
+                    clientContext.notifyConnectionClosedSilently(rawRequest);
                     return emptyStream();
                 }
                 Stream<Response> response;
@@ -190,14 +197,14 @@
         }, maxConcurrentRequests).onErrorResumeWith(new Function<Throwable, Publisher<Void>, Exception>() {
             @Override
             public Publisher<Void> apply(Throwable error) throws Exception {
-                clientContext.notifyErrorAndCloseSilently(error);
+                clientContext.notifyErrorSilently(error);
                 // Swallow the error to prevent the subscribe() below to report it on the console.
                 return emptyStream();
             }
         }).onComplete(new Action() {
             @Override
             public void run() throws Exception {
-                clientContext.notifyConnectionClosed(null);
+                connection.closeSilently();
             }
         }).subscribe();
         return ctx.getStopAction();
@@ -303,8 +310,8 @@
         }
 
         private final Connection<?> connection;
-        private final AtomicBoolean isClosed = new AtomicBoolean(false);
-        private final List<ConnectionEventListener> listeners = new LinkedList<>();
+        private volatile boolean isClosed;
+        private final List<ConnectionEventListener> connectionEventListeners = new LinkedList<>();
         private SaslServer saslServer;
         private GrizzlyBackpressureSubscription downstream;
 
@@ -334,12 +341,12 @@
 
         @Override
         public NextAction handleClose(final FilterChainContext ctx) {
-            if (isClosed.compareAndSet(false, true)) {
-                final GrizzlyBackpressureSubscription immutableRef = downstream;
-                if (immutableRef != null) {
-                    downstream.onComplete();
-                }
+            isClosed = true;
+            final GrizzlyBackpressureSubscription immutableRef = downstream;
+            if (immutableRef != null) {
+                immutableRef.onComplete();
             }
+            notifyConnectionClosedSilently(null);
             return ctx.getStopAction();
         }
 
@@ -489,108 +496,152 @@
         @Override
         public void addConnectionEventListener(ConnectionEventListener listener) {
             Reject.ifNull(listener, "listener must not be null");
-            listeners.add(listener);
+            synchronized (connectionEventListeners) {
+                connectionEventListeners.add(listener);
+            }
         }
 
         @Override
         public void disconnect() {
-            if (isClosed.compareAndSet(false, true)) {
-                try {
-                    for (ConnectionEventListener listener : listeners) {
-                        listener.handleConnectionDisconnected(this, null, null);
-                    }
-                } finally {
-                    closeConnection();
-                }
-            }
+            notifySilentlyConnectionDisconnected(null, null);
+            connection.closeSilently();
         }
 
         private void closeConnection() {
-            if (downstream != null) {
-                downstream.cancel();
-                downstream = null;
-            }
+            downstream.cancel();
             connection.closeSilently();
         }
 
         @Override
         public void disconnect(final ResultCode resultCode, final String diagnosticMessage) {
-            // Close this connection context.
-            if (isClosed.compareAndSet(false, true)) {
-                sendUnsolicitedNotification(Responses.newGenericExtendedResult(resultCode)
-                                                     .setOID(OID_NOTICE_OF_DISCONNECTION)
-                                                     .setDiagnosticMessage(diagnosticMessage));
-                try {
-                    for (ConnectionEventListener listener : listeners) {
-                        listener.handleConnectionDisconnected(this, resultCode, diagnosticMessage);
-                    }
-                } finally {
+            notifySilentlyConnectionDisconnected(resultCode, diagnosticMessage);
+            sendUnsolicitedNotification(
+                    newGenericExtendedResult(resultCode)
+                        .setOID(OID_NOTICE_OF_DISCONNECTION)
+                        .setDiagnosticMessage(diagnosticMessage)
+            ).subscribe(new Action() {
+                @Override
+                public void run() throws Exception {
                     closeConnection();
                 }
+            }, new Consumer<Throwable>() {
+                @Override
+                public void accept(final Throwable error) throws Exception {
+                    closeConnection();
+                }
+            });
+        }
+
+        private void notifyConnectionClosedSilently(final LdapRequestEnvelope unbindRequest) {
+            // Close this connection context.
+            if (unbindRequest == null) {
+                notifySilentlyConnectionClosed(null);
+            } else {
+                try {
+                    LDAP.getReader(unbindRequest.getContent(), new DecodeOptions())
+                            .readMessage(new AbstractLDAPMessageHandler() {
+                                @Override
+                                public void unbindRequest(int messageID, UnbindRequest unbindRequest)
+                                        throws DecodeException, IOException {
+                                    notifySilentlyConnectionClosed(unbindRequest);
+                                }
+                            });
+                } catch (Exception e) {
+                    notifySilentlyConnectionClosed(null);
+                }
             }
         }
 
-        private void notifyConnectionClosed(final LdapRequestEnvelope unbindRequest) {
-            // Close this connection context.
-            if (isClosed.compareAndSet(false, true)) {
-                if (unbindRequest == null) {
-                    notifySilentlyConnectionClosed(null);
-                } else {
-                    try {
-                        LDAP.getReader(unbindRequest.getContent(), new DecodeOptions())
-                                .readMessage(new AbstractLDAPMessageHandler() {
-                                    @Override
-                                    public void unbindRequest(int messageID, UnbindRequest unbindRequest)
-                                            throws DecodeException, IOException {
-                                        notifySilentlyConnectionClosed(unbindRequest);
-                                    }
-                                });
-                    } catch (Exception e) {
-                        notifySilentlyConnectionClosed(null);
-                    }
+        private void notifySilentlyConnectionDisconnected(final ResultCode resultCode, final String diagnosticMessage) {
+            for (final ConnectionEventListener listener : getAndClearListeners()) {
+                try {
+                    listener.handleConnectionDisconnected(this, resultCode, diagnosticMessage);
+                } catch (Exception e) {
+                    logger.traceException(e);
                 }
             }
         }
 
         private void notifySilentlyConnectionClosed(final UnbindRequest unbindRequest) {
-            try {
-                for (final ConnectionEventListener listener : listeners) {
-                    try {
-                        listener.handleConnectionClosed(this, unbindRequest);
-                    } catch (Exception e) {
-                        logger.traceException(e);
-                    }
+            for (final ConnectionEventListener listener : getAndClearListeners()) {
+                try {
+                    listener.handleConnectionClosed(this, unbindRequest);
+                } catch (Exception e) {
+                    logger.traceException(e);
                 }
-            } finally {
-                closeConnection();
             }
         }
 
-        private void notifyErrorAndCloseSilently(final Throwable error) {
-            // Close this connection context.
-            if (isClosed.compareAndSet(false, true)) {
+        private void notifyErrorSilently(final Throwable error) {
+            for (final ConnectionEventListener listener : getAndClearListeners()) {
                 try {
-                    for (final ConnectionEventListener listener : listeners) {
-                        try {
-                            listener.handleConnectionError(this, error);
-                        } catch (Exception e) {
-                            logger.traceException(e);
-                        }
-                    }
-                } finally {
-                    closeConnection();
+                    listener.handleConnectionError(this, error);
+                } catch (Exception e) {
+                    logger.traceException(e);
                 }
             }
         }
 
+        private ArrayList<ConnectionEventListener> getAndClearListeners() {
+            synchronized (connectionEventListeners) {
+                final ArrayList<ConnectionEventListener> listeners = new ArrayList<>(connectionEventListeners);
+                connectionEventListeners.clear();
+                return listeners;
+            }
+        }
+
         @Override
         public boolean isClosed() {
-            return isClosed.get();
+            return isClosed;
         }
 
+        @SuppressWarnings({ "unchecked", "rawtypes" })
         @Override
-        public void sendUnsolicitedNotification(final ExtendedResult notification) {
-            connection.write(newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, 0, notification));
+        public Completable sendUnsolicitedNotification(final ExtendedResult notification) {
+            // We use a promise so that the notification is sent even if the Completable is not subscribed.
+            final PromiseImpl<Boolean, Exception> promise = PromiseImpl.create();
+            connection.write(newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, 0, notification),
+                    new EmptyCompletionHandler() {
+                        @Override
+                        public void cancelled() {
+                            promise.handleException(new CancellationException());
+                        }
+
+                        @Override
+                        public void failed(Throwable throwable) {
+                            if (throwable instanceof Exception) {
+                                promise.handleException((Exception) throwable);
+                            } else {
+                                promise.handleException(new Exception(throwable));
+                            }
+                        }
+
+                        @Override
+                        public void completed(Object result) {
+                            promise.handleResult(Boolean.TRUE);
+                        }
+                    });
+            return newCompletable(new Completable.Emitter() {
+                @Override
+                public void subscribe(final Completable.Subscriber s) throws Exception {
+                    promise.thenOnResult(new ResultHandler<Boolean>() {
+                        @Override
+                        public void handleResult(Boolean result) {
+                            s.onComplete();
+                        }
+                    }).thenOnException(new ExceptionHandler<Exception>() {
+                        @Override
+                        public void handleException(Exception exception) {
+                            s.onError(exception);
+                        }
+                    }).thenOnRuntimeException(new RuntimeExceptionHandler() {
+                        @Override
+                        public void handleRuntimeException(RuntimeException exception) {
+                            s.onError(exception);
+                        }
+                    });
+                }
+            });
         }
     }
 }
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index bab468d..a753eee 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -958,27 +958,29 @@
      */
     @Override
     public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRequestEnvelope message) {
-        return streamFromPublisher(new BlockingBackpressureSubscription(
-                Flowable.create(new FlowableOnSubscribe<Response>() {
-                    @Override
-                    public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
-                        try {
-                            processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
-                        } finally {
-                            // We don't need the ASN1Reader anymore.
-                            closeSilently(message.getContent());
-                        }
-                    }
-                }, BackpressureStrategy.ERROR)))
-                .onNext(new Consumer<Response>() {
-                    @Override
-                    public void accept(final Response response) throws Exception {
-                        if (keepStats) {
-                            statTracker.updateMessageWritten(
-                                    toLdapResponseType(message, response), message.getMessageId());
-                        }
-                    }
-                });
+        return streamFromPublisher(
+                new BlockingBackpressureSubscription(connectionHandler.getMaxBlockedWriteTimeLimit(),
+                        Flowable.create(new FlowableOnSubscribe<Response>() {
+                            @Override
+                            public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
+                                try {
+                                    processLDAPMessage(
+                                            queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
+                                } finally {
+                                    // We don't need the ASN1Reader anymore.
+                                    closeSilently(message.getContent());
+                                }
+                            }
+                        }, BackpressureStrategy.ERROR)))
+                        .onNext(new Consumer<Response>() {
+                            @Override
+                            public void accept(final Response response) throws Exception {
+                                if (keepStats) {
+                                    statTracker.updateMessageWritten(
+                                            toLdapResponseType(message, response), message.getMessageId());
+                                }
+                            }
+                        });
     }
 
     private final byte toLdapResultType(final byte requestType) {
@@ -1682,14 +1684,15 @@
         private final Condition spaceAvailable = lock.newCondition();
         private final Publisher<Response> upstream;
         private final long writeTimeoutMillis;
+        private boolean upstreamCompleted;
         private Subscription subscription;
         private Subscriber<? super Response> downstream;
 
-        BlockingBackpressureSubscription(final Publisher<Response> upstream) {
+        BlockingBackpressureSubscription(final long maxBlockedWriteTimeLimit, final Publisher<Response> upstream) {
             this.upstream = upstream;
-            this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0
+            this.writeTimeoutMillis = maxBlockedWriteTimeLimit == 0
                     ? 30000 // Do not wait indefinitely,
-                    : connectionHandler.getMaxBlockedWriteTimeLimit();
+                    : maxBlockedWriteTimeLimit;
         }
 
         @Override
@@ -1733,6 +1736,12 @@
                     // Forward response
                     pendingRequests--;
                 }
+                if (upstreamCompleted && queue.isEmpty()) {
+                    if (pendingRequests != Long.MIN_VALUE) {
+                        downstream.onComplete();
+                    }
+                    cancel();
+                }
             } finally {
                 spaceAvailable.signalAll();
             }
@@ -1771,17 +1780,27 @@
 
         @Override
         public void onComplete() {
-            downstream.onComplete();
-            cancel();
+            lock.lock();
+            try {
+                // the onComplete() will be forwarded downstream once the pending messages have been flushed
+                upstreamCompleted = true;
+                drain();
+            } finally {
+                lock.unlock();
+            }
         }
 
         @Override
         public void cancel() {
+            lock.lock();
+            try {
+                pendingRequests = Long.MIN_VALUE;
+            } finally {
+                lock.unlock();
+            }
             if (subscription != null) {
                 subscription.cancel();
             }
-            queue.clear();
-            pendingRequests = Long.MIN_VALUE;
         }
     }
 }

--
Gitblit v1.10.0