mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
18.17.2016 2a88ad00862b2241f3b87ef8d4db383c69b54e3a
OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

* Change LDAPClientContext.ConnectionEventListener to
LDAPClientContextEventListener
* New algorithm for BlockingBackpressure to prevent too many recursion
* Cleanup.
1 files added
8 files modified
404 ■■■■■ changed files
opendj-core/clirr-ignored-api-changes.xml 2 ●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/Completable.java 27 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java 15 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java 4 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java 53 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContextEventListener.java 58 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java 55 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java 186 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java 4 ●●●● patch | view | raw | blame | history
opendj-core/clirr-ignored-api-changes.xml
@@ -173,7 +173,7 @@
  <difference>
    <className>org/forgerock/opendj/ldap/LDAPClientContext</className>
    <differenceType>7012</differenceType>
    <method>void addConnectionEventListener(org.forgerock.opendj.ldap.LDAPClientContext$ConnectionEventListener)</method>
    <method>void addConnectionEventListener(org.forgerock.opendj.ldap.LDAPClientContextEventListener)</method>
    <justification>Allows to register connection state listener</justification>
  </difference>
  <difference>
opendj-core/src/main/java/com/forgerock/reactive/Completable.java
@@ -63,14 +63,14 @@
    Completable onErrorResumeWith(Function<Throwable, Completable, Exception> function);
    /**
     * Subscribe to the result of this {@link Completable}.
     * Returns a {@link Completable} instance that calls the given onTerminate callback after this Completable completes
     * normally or with an exception.
     *
     * @param completeAction
     *            An {@link Action} which will be invoked on successful completion
     * @param errorConsumer
     *            A {@link Consumer} which will be invoked on error
     * @param onAfterTerminate
     *            the callback to call after this {@link Completable} terminates
     * @return the new {@link Completable} instance
     */
    void subscribe(Action completeAction, Consumer<Throwable> errorConsumer);
    Completable doAfterTerminate(Action onAfterTerminate);
    /**
     * Returns a {@link Single} which will complete with the provided value once this {@link Completable} completes.
@@ -82,4 +82,19 @@
     * @return A new {@link Single}
     */
    <V> Single<V> toSingle(V value);
    /**
     * Subscribe to the result of this {@link Completable}.
     *
     * @param completeAction
     *            An {@link Action} which will be invoked on successful completion
     * @param errorConsumer
     *            A {@link Consumer} which will be invoked on error
     */
    void subscribe(Action completeAction, Consumer<Throwable> errorConsumer);
    /**
     * Subscribes to this {@link Completable}.
     */
    void subscribe();
}
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -379,5 +379,20 @@
                }
            });
        }
        @Override
        public Completable doAfterTerminate(final Action onTerminate) {
            return new RxJavaCompletable(impl.doAfterTerminate(new io.reactivex.functions.Action() {
                @Override
                public void run() throws Exception {
                    onTerminate.run();
                }
            }));
        }
        @Override
        public void subscribe() {
            impl.subscribe();
        }
    }
}
opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
@@ -28,7 +28,7 @@
import org.forgerock.opendj.ldap.DecodeOptions;
import org.forgerock.opendj.ldap.IntermediateResponseHandler;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
import org.forgerock.opendj.ldap.LDAPClientContextEventListener;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.LdapResultHandler;
import org.forgerock.opendj.ldap.ResultCode;
@@ -121,7 +121,7 @@
                final ServerConnection<Integer> serverConnection) {
            this.decodeOptions = checkNotNull(decodeOptions, "decodeOptions must not be null");
            this.adaptee = checkNotNull(serverConnection, "serverConnection must not be null");
            clientContext.addConnectionEventListener(new ConnectionEventListener() {
            clientContext.addListener(new LDAPClientContextEventListener() {
                @Override
                public void handleConnectionError(final LDAPClientContext context, final Throwable error) {
                    adaptee.handleConnectionError(error);
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
@@ -18,13 +18,11 @@
package org.forgerock.opendj.ldap;
import java.net.InetSocketAddress;
import java.util.EventListener;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSession;
import javax.security.sasl.SaslServer;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import com.forgerock.reactive.Completable;
@@ -37,62 +35,25 @@
 */
public interface LDAPClientContext {
    /** Listens for disconnection event. */
    public interface ConnectionEventListener extends EventListener {
        /**
         * Invoked when the connection has been disconnected because of an error (e.g: message too big).
         *
         * @param context
         *            The {@link LDAPClientContext} which has failed
         * @param error
         *            The error
         */
        void handleConnectionError(LDAPClientContext context, Throwable error);
        /**
         * Invoked when the client closed the connection, possibly using an unbind request.
         *
         * @param context
         *            The {@link LDAPClientContext} which has been disconnected
         * @param unbindRequest
         *            The unbind request, which may be {@code null} if one was not sent before the connection was
         *            closed.
         */
        void handleConnectionClosed(LDAPClientContext context, UnbindRequest unbindRequest);
        /**
         * Invoked when the connection has been disconnected by the server.
         *
         * @param context
         *            The {@link LDAPClientContext} which has been disconnected
         * @param resultCode
         *            The result code which was included with the disconnect notification, or {@code null} if no
         *            disconnect notification was sent.
         * @param diagnosticMessage
         *            The diagnostic message, which may be empty or {@code null} indicating that none was provided.
         */
        void handleConnectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage);
    }
    /**
     * Register a listener which will be notified when this {@link LDAPClientContext} is disconnected.
     * Register a listener which will be notified when this {@link LDAPClientContext} changes state.
     *
     * @param listener The {@link ConnectionEventListener} to register.
     * @param listener The {@link LDAPClientContextEventListener} to register.
     */
    void addConnectionEventListener(ConnectionEventListener listener);
    void addListener(LDAPClientContextEventListener listener);
    /**
     * Disconnects the client without sending a disconnect notification. Invoking this method causes
     * {@link ConnectionEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be called
     * before this method returns.
     * {@link LDAPClientContextEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be
     * called before this method returns.
     */
    void disconnect();
    /**
     * Disconnects the client and sends a disconnect notification, containing the provided result code and diagnostic
     * message. Invoking this method causes
     * {@link ConnectionEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be called
     * before this method returns.
     * {@link LDAPClientContextEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be
     * called before this method returns.
     *
     * @param resultCode
     *            The result code to include with the disconnect notification
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContextEventListener.java
New file
@@ -0,0 +1,58 @@
/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2016 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
import java.util.EventListener;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
/** A listener interface for handling LDAPClientContext state changes. */
public interface LDAPClientContextEventListener extends EventListener {
    /**
     * Invoked when the connection has been disconnected because of an error (e.g: message too big).
     *
     * @param context
     *            The {@link LDAPClientContext} which has failed
     * @param error
     *            The error
     */
    void handleConnectionError(LDAPClientContext context, Throwable error);
    /**
     * Invoked when the client closed the connection, possibly using an unbind request.
     *
     * @param context
     *            The {@link LDAPClientContext} which has been disconnected
     * @param unbindRequest
     *            The unbind request, which may be {@code null} if one was not sent before the connection was
     *            closed.
     */
    void handleConnectionClosed(LDAPClientContext context, UnbindRequest unbindRequest);
    /**
     * Invoked when the connection has been disconnected by the server.
     *
     * @param context
     *            The {@link LDAPClientContext} which has been disconnected
     * @param resultCode
     *            The result code which was included with the disconnect notification, or {@code null} if no
     *            disconnect notification was sent.
     * @param diagnosticMessage
     *            The diagnostic message, which may be empty or {@code null} indicating that none was provided.
     */
    void handleConnectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage);
}
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -41,6 +41,7 @@
import org.forgerock.opendj.ldap.DecodeException;
import org.forgerock.opendj.ldap.DecodeOptions;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.LDAPClientContextEventListener;
import org.forgerock.opendj.ldap.LDAPListener;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.ResultCode;
@@ -76,7 +77,6 @@
import com.forgerock.reactive.Action;
import com.forgerock.reactive.Completable;
import com.forgerock.reactive.Consumer;
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Stream;
@@ -165,7 +165,7 @@
            @Override
            public Publisher<Void> apply(final LdapRequestEnvelope rawRequest) throws Exception {
                if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
                    clientContext.notifyConnectionClosedSilently(rawRequest);
                    clientContext.notifyConnectionClosedRawUnbind(rawRequest);
                    return emptyStream();
                }
                Stream<Response> response;
@@ -192,7 +192,7 @@
        }, maxConcurrentRequests).onErrorResumeWith(new Function<Throwable, Publisher<Void>, Exception>() {
            @Override
            public Publisher<Void> apply(Throwable error) throws Exception {
                clientContext.notifyErrorSilently(error);
                clientContext.notifyConnectionError(error);
                // Swallow the error to prevent the subscribe() below to report it on the console.
                return emptyStream();
            }
@@ -306,7 +306,7 @@
        private final Connection<?> connection;
        private volatile boolean isClosed;
        private final List<ConnectionEventListener> connectionEventListeners = new LinkedList<>();
        private final List<LDAPClientContextEventListener> connectionEventListeners = new LinkedList<>();
        private SaslServer saslServer;
        private GrizzlyBackpressureSubscription downstream;
@@ -341,7 +341,7 @@
            if (immutableRef != null) {
                immutableRef.onComplete();
            }
            notifyConnectionClosedSilently(null);
            notifyConnectionClosedRawUnbind(null);
            return ctx.getStopAction();
        }
@@ -487,7 +487,7 @@
        }
        @Override
        public void addConnectionEventListener(ConnectionEventListener listener) {
        public void addListener(final LDAPClientContextEventListener listener) {
            Reject.ifNull(listener, "listener must not be null");
            synchronized (connectionEventListeners) {
                connectionEventListeners.add(listener);
@@ -496,7 +496,7 @@
        @Override
        public void disconnect() {
            notifySilentlyConnectionDisconnected(null, null);
            notifyConnectionDisconnected(null, null);
            connection.closeSilently();
        }
@@ -507,46 +507,41 @@
        @Override
        public void disconnect(final ResultCode resultCode, final String diagnosticMessage) {
            notifySilentlyConnectionDisconnected(resultCode, diagnosticMessage);
            notifyConnectionDisconnected(resultCode, diagnosticMessage);
            sendUnsolicitedNotification(
                    newGenericExtendedResult(resultCode)
                        .setOID(OID_NOTICE_OF_DISCONNECTION)
                        .setDiagnosticMessage(diagnosticMessage)
            ).subscribe(new Action() {
            ).doAfterTerminate(new Action() {
                @Override
                public void run() throws Exception {
                    closeConnection();
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(final Throwable error) throws Exception {
                    closeConnection();
                }
            });
            }).subscribe();
        }
        private void notifyConnectionClosedSilently(final LdapRequestEnvelope unbindRequest) {
        private void notifyConnectionClosedRawUnbind(final LdapRequestEnvelope rawUnbindRequest) {
            // Close this connection context.
            if (unbindRequest == null) {
                notifySilentlyConnectionClosed(null);
            if (rawUnbindRequest == null) {
                notifyConnectionClosed(null);
            } else {
                try {
                    LDAP.getReader(unbindRequest.getContent(), new DecodeOptions())
                    LDAP.getReader(rawUnbindRequest.getContent(), new DecodeOptions())
                            .readMessage(new AbstractLDAPMessageHandler() {
                                @Override
                                public void unbindRequest(int messageID, UnbindRequest unbindRequest)
                                public void unbindRequest(final int messageID, final UnbindRequest unbindRequest)
                                        throws DecodeException, IOException {
                                    notifySilentlyConnectionClosed(unbindRequest);
                                    notifyConnectionClosed(unbindRequest);
                                }
                            });
                } catch (Exception e) {
                    notifySilentlyConnectionClosed(null);
                    notifyConnectionClosed(null);
                }
            }
        }
        private void notifySilentlyConnectionDisconnected(final ResultCode resultCode, final String diagnosticMessage) {
            for (final ConnectionEventListener listener : getAndClearListeners()) {
        private void notifyConnectionDisconnected(final ResultCode resultCode, final String diagnosticMessage) {
            for (final LDAPClientContextEventListener listener : getAndClearListeners()) {
                try {
                    listener.handleConnectionDisconnected(this, resultCode, diagnosticMessage);
                } catch (Exception e) {
@@ -555,8 +550,8 @@
            }
        }
        private void notifySilentlyConnectionClosed(final UnbindRequest unbindRequest) {
            for (final ConnectionEventListener listener : getAndClearListeners()) {
        private void notifyConnectionClosed(final UnbindRequest unbindRequest) {
            for (final LDAPClientContextEventListener listener : getAndClearListeners()) {
                try {
                    listener.handleConnectionClosed(this, unbindRequest);
                } catch (Exception e) {
@@ -565,8 +560,8 @@
            }
        }
        private void notifyErrorSilently(final Throwable error) {
            for (final ConnectionEventListener listener : getAndClearListeners()) {
        private void notifyConnectionError(final Throwable error) {
            for (final LDAPClientContextEventListener listener : getAndClearListeners()) {
                try {
                    listener.handleConnectionError(this, error);
                } catch (Exception e) {
@@ -575,9 +570,9 @@
            }
        }
        private ArrayList<ConnectionEventListener> getAndClearListeners() {
        private List<LDAPClientContextEventListener> getAndClearListeners() {
            synchronized (connectionEventListeners) {
                final ArrayList<ConnectionEventListener> listeners = new ArrayList<>(connectionEventListeners);
                final ArrayList<LDAPClientContextEventListener> listeners = new ArrayList<>(connectionEventListeners);
                connectionEventListeners.clear();
                return listeners;
            }
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -32,16 +32,14 @@
import java.security.cert.Certificate;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLPeerUnverifiedException;
@@ -56,7 +54,7 @@
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
import org.forgerock.opendj.ldap.LDAPClientContextEventListener;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.CompareResult;
@@ -64,6 +62,7 @@
import org.forgerock.opendj.ldap.responses.Responses;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.forgerock.util.Reject;
import org.opends.server.api.ClientConnection;
import org.opends.server.api.ConnectionHandler;
import org.opends.server.core.AbandonOperationBasis;
@@ -107,6 +106,7 @@
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchResultReference;
import org.opends.server.util.TimeThread;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@@ -229,7 +229,7 @@
        }
        connectionID = DirectoryServer.newConnectionAccepted(this);
        clientContext.addConnectionEventListener(new ConnectionEventListener() {
        clientContext.addListener(new LDAPClientContextEventListener() {
            @Override
            public void handleConnectionError(LDAPClientContext context, Throwable error) {
                if (error instanceof LocalizableException) {
@@ -1676,17 +1676,17 @@
    }
    /** Upstream -> BlockingBackpressureSubscription -> Downstream. */
    private final class BlockingBackpressureSubscription
            implements Subscription, Publisher<Response>, Subscriber<Response> {
        private long pendingRequests;
        private final Queue<Response> queue = new LinkedList<>();
        private final Lock lock = new ReentrantLock();
        private final Condition spaceAvailable = lock.newCondition();
    private final class BlockingBackpressureSubscription implements Subscription, Processor<Response, Response> {
        private final AtomicLong pendingRequests = new AtomicLong();
        private final AtomicInteger missedDrain = new AtomicInteger();
        private final BlockingQueue<Response> queue = new LinkedBlockingQueue<>(32);
        private final Publisher<Response> upstream;
        private final long writeTimeoutMillis;
        private boolean upstreamCompleted;
        private Subscription subscription;
        private Subscriber<? super Response> downstream;
        private volatile boolean done;
        private Throwable error;
        private volatile boolean cancelled;
        BlockingBackpressureSubscription(final long maxBlockedWriteTimeLimit, final Publisher<Response> upstream) {
            this.upstream = upstream;
@@ -1697,17 +1697,19 @@
        @Override
        public void subscribe(final Subscriber<? super Response> subscriber) {
            Reject.ifNull(subscriber);
            if (downstream != null) {
                // This publisher only support one subscriber.
                return;
            }
            downstream = subscriber;
            subscriber.onSubscribe(this);
            upstream.subscribe(this);
            subscriber.onSubscribe(/* Subscription */ this);
            upstream.subscribe(/* Subscriber */ this);
        }
        @Override
        public void onSubscribe(final Subscription s) {
            if ( subscription != null) {
            if (subscription != null) {
                s.cancel();
                return;
            }
@@ -1716,91 +1718,117 @@
        }
        @Override
        public void request(long n) {
            lock.lock();
            try {
                if (pendingRequests != Long.MIN_VALUE) {
                    pendingRequests += n;
                    drain();
        public void request(final long n) {
            if (n == Long.MAX_VALUE) {
                pendingRequests.set(Long.MAX_VALUE);
            } else {
                // There is a known and accepted problem here regarding reactive-stream contract in the sense that
                // we're not supporting pendingRequests overflow (pendingRequests + n > Long.MAX_VALUE) for performance
                // reason since this should never happen in the context we're using it.
                pendingRequests.addAndGet(n);
            }
            drain();
        }
        // Taken from
        // https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#backpressure-and-cancellation
        private void drain() {
            if (missedDrain.getAndIncrement() != 0) {
                // Another thread is already executing this drain method.
                return;
            }
            int missed = 1;
            for (;;) {
                final long immutablePendingRequests = pendingRequests.get();
                long emitted = 0L;
                while (emitted != immutablePendingRequests) {
                    // Check if we should early exit because of cancellation
                    if (cancelled) {
                        return;
                    }
                    final Response response = queue.poll();
                    if (response != null) {
                        downstream.onNext(response);
                        emitted++;
                    } else if (done) {
                        // queue is empty and we received a completion (onError/onComplete) notification from upstream
                        forwardDoneEvent();
                        return;
                    } else {
                        // Queue is empty but upstream is not done yet.
                        break;
                    }
                }
            } finally {
                lock.unlock();
                // Check if an onError/onComplete from upstream arrived.
                if (emitted == immutablePendingRequests) {
                    if (cancelled) {
                        return;
                    }
                    if (done && queue.isEmpty()) {
                        forwardDoneEvent();
                        return;
                    }
                }
                if (emitted != 0) {
                    pendingRequests.addAndGet(-emitted);
                }
                // Check to see if another thread asked for drain
                missed = missedDrain.addAndGet(-missed);
                if (missed == 0) {
                    // Nop, we can exit.
                    break;
                }
            }
        }
        private void drain() {
            Response response;
            try {
                while (pendingRequests > 0 && (response = queue.poll()) != null) {
                    downstream.onNext(response);
                    // Forward response
                    pendingRequests--;
                }
                if (upstreamCompleted && queue.isEmpty()) {
                    if (pendingRequests != Long.MIN_VALUE) {
                        downstream.onComplete();
                    }
                    cancel();
                }
            } finally {
                spaceAvailable.signalAll();
        private void forwardDoneEvent() {
            final Throwable immutableError = error;
            if (immutableError != null) {
                downstream.onError(immutableError);
            } else {
                downstream.onComplete();
            }
        }
        @Override
        public void onNext(final Response response) {
            lock.lock();
            try {
                while (queue.size() >= 32) {
                    try {
                        if (!spaceAvailable.await(writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
                            // If we've gotten here, then the write timed out.
                            downstream.onError(new ClosedChannelException());
                            cancel();
                            return;
                        }
                    } catch (InterruptedException e) {
                        downstream.onError(e);
                        cancel();
                        return;
                    }
                if (queue.offer(response, writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
                    drain();
                } else {
                    // If we've gotten here, then the write timed out.
                    onError(new ClosedChannelException().fillInStackTrace());
                    return;
                }
                queue.add(response);
                drain();
            } finally {
                lock.unlock();
            } catch (InterruptedException e) {
                onError(e);
            }
        }
        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
            cancel();
        public void onError(final Throwable error) {
            this.error = error;
            done = true;
            drain();
        }
        @Override
        public void onComplete() {
            lock.lock();
            try {
                // the onComplete() will be forwarded downstream once the pending messages have been flushed
                upstreamCompleted = true;
                drain();
            } finally {
                lock.unlock();
            }
            done = true;
            drain();
        }
        @Override
        public void cancel() {
            lock.lock();
            try {
                pendingRequests = Long.MIN_VALUE;
            } finally {
                lock.unlock();
            }
            if (subscription != null) {
                subscription.cancel();
            }
            cancelled = true;
            subscription.cancel();
        }
    }
}
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -53,7 +53,7 @@
import org.forgerock.opendj.ldap.AddressMask;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
import org.forgerock.opendj.ldap.LDAPClientContextEventListener;
import org.forgerock.opendj.ldap.LDAPListener;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.ResultCode;
@@ -641,7 +641,7 @@
                            LDAPClientContext clientContext) throws LdapException {
                        final LDAPClientConnection2 conn = canAccept(clientContext);
                        clientConnections.add(conn);
                        clientContext.addConnectionEventListener(new ConnectionEventListener() {
                        clientContext.addListener(new LDAPClientContextEventListener() {
                            @Override
                            public void handleConnectionError(final LDAPClientContext context, final Throwable error) {
                                clientConnections.remove(conn);