From 2a88ad00862b2241f3b87ef8d4db383c69b54e3a Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Thu, 24 Nov 2016 13:59:43 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport
---
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContextEventListener.java | 58 ++++++++
opendj-core/src/main/java/com/forgerock/reactive/Completable.java | 27 +++
opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java | 4
opendj-core/clirr-ignored-api-changes.xml | 2
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java | 53 +------
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java | 15 ++
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java | 55 +++----
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java | 4
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java | 186 +++++++++++++++-----------
9 files changed, 238 insertions(+), 166 deletions(-)
diff --git a/opendj-core/clirr-ignored-api-changes.xml b/opendj-core/clirr-ignored-api-changes.xml
index 1d6d8ce..ab682cc 100644
--- a/opendj-core/clirr-ignored-api-changes.xml
+++ b/opendj-core/clirr-ignored-api-changes.xml
@@ -173,7 +173,7 @@
<difference>
<className>org/forgerock/opendj/ldap/LDAPClientContext</className>
<differenceType>7012</differenceType>
- <method>void addConnectionEventListener(org.forgerock.opendj.ldap.LDAPClientContext$ConnectionEventListener)</method>
+ <method>void addConnectionEventListener(org.forgerock.opendj.ldap.LDAPClientContextEventListener)</method>
<justification>Allows to register connection state listener</justification>
</difference>
<difference>
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
index 3f12e71..638faba 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
@@ -63,14 +63,14 @@
Completable onErrorResumeWith(Function<Throwable, Completable, Exception> function);
/**
- * Subscribe to the result of this {@link Completable}.
+ * Returns a {@link Completable} instance that calls the given onTerminate callback after this Completable completes
+ * normally or with an exception.
*
- * @param completeAction
- * An {@link Action} which will be invoked on successful completion
- * @param errorConsumer
- * A {@link Consumer} which will be invoked on error
+ * @param onAfterTerminate
+ * the callback to call after this {@link Completable} terminates
+ * @return the new {@link Completable} instance
*/
- void subscribe(Action completeAction, Consumer<Throwable> errorConsumer);
+ Completable doAfterTerminate(Action onAfterTerminate);
/**
* Returns a {@link Single} which will complete with the provided value once this {@link Completable} completes.
@@ -82,4 +82,19 @@
* @return A new {@link Single}
*/
<V> Single<V> toSingle(V value);
+
+ /**
+ * Subscribe to the result of this {@link Completable}.
+ *
+ * @param completeAction
+ * An {@link Action} which will be invoked on successful completion
+ * @param errorConsumer
+ * A {@link Consumer} which will be invoked on error
+ */
+ void subscribe(Action completeAction, Consumer<Throwable> errorConsumer);
+
+ /**
+ * Subscribes to this {@link Completable}.
+ */
+ void subscribe();
}
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
index 5944468..b3370c3 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -379,5 +379,20 @@
}
});
}
+
+ @Override
+ public Completable doAfterTerminate(final Action onTerminate) {
+ return new RxJavaCompletable(impl.doAfterTerminate(new io.reactivex.functions.Action() {
+ @Override
+ public void run() throws Exception {
+ onTerminate.run();
+ }
+ }));
+ }
+
+ @Override
+ public void subscribe() {
+ impl.subscribe();
+ }
}
}
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java b/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
index e185627..0cb33bf 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
@@ -28,7 +28,7 @@
import org.forgerock.opendj.ldap.DecodeOptions;
import org.forgerock.opendj.ldap.IntermediateResponseHandler;
import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
+import org.forgerock.opendj.ldap.LDAPClientContextEventListener;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.LdapResultHandler;
import org.forgerock.opendj.ldap.ResultCode;
@@ -121,7 +121,7 @@
final ServerConnection<Integer> serverConnection) {
this.decodeOptions = checkNotNull(decodeOptions, "decodeOptions must not be null");
this.adaptee = checkNotNull(serverConnection, "serverConnection must not be null");
- clientContext.addConnectionEventListener(new ConnectionEventListener() {
+ clientContext.addListener(new LDAPClientContextEventListener() {
@Override
public void handleConnectionError(final LDAPClientContext context, final Throwable error) {
adaptee.handleConnectionError(error);
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
index 47548ef..3fec4e2 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
@@ -18,13 +18,11 @@
package org.forgerock.opendj.ldap;
import java.net.InetSocketAddress;
-import java.util.EventListener;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSession;
import javax.security.sasl.SaslServer;
-import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import com.forgerock.reactive.Completable;
@@ -37,62 +35,25 @@
*/
public interface LDAPClientContext {
- /** Listens for disconnection event. */
- public interface ConnectionEventListener extends EventListener {
- /**
- * Invoked when the connection has been disconnected because of an error (e.g: message too big).
- *
- * @param context
- * The {@link LDAPClientContext} which has failed
- * @param error
- * The error
- */
- void handleConnectionError(LDAPClientContext context, Throwable error);
-
- /**
- * Invoked when the client closed the connection, possibly using an unbind request.
- *
- * @param context
- * The {@link LDAPClientContext} which has been disconnected
- * @param unbindRequest
- * The unbind request, which may be {@code null} if one was not sent before the connection was
- * closed.
- */
- void handleConnectionClosed(LDAPClientContext context, UnbindRequest unbindRequest);
-
- /**
- * Invoked when the connection has been disconnected by the server.
- *
- * @param context
- * The {@link LDAPClientContext} which has been disconnected
- * @param resultCode
- * The result code which was included with the disconnect notification, or {@code null} if no
- * disconnect notification was sent.
- * @param diagnosticMessage
- * The diagnostic message, which may be empty or {@code null} indicating that none was provided.
- */
- void handleConnectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage);
- }
-
/**
- * Register a listener which will be notified when this {@link LDAPClientContext} is disconnected.
+ * Register a listener which will be notified when this {@link LDAPClientContext} changes state.
*
- * @param listener The {@link ConnectionEventListener} to register.
+ * @param listener The {@link LDAPClientContextEventListener} to register.
*/
- void addConnectionEventListener(ConnectionEventListener listener);
+ void addListener(LDAPClientContextEventListener listener);
/**
* Disconnects the client without sending a disconnect notification. Invoking this method causes
- * {@link ConnectionEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be called
- * before this method returns.
+ * {@link LDAPClientContextEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be
+ * called before this method returns.
*/
void disconnect();
/**
* Disconnects the client and sends a disconnect notification, containing the provided result code and diagnostic
* message. Invoking this method causes
- * {@link ConnectionEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be called
- * before this method returns.
+ * {@link LDAPClientContextEventListener#handleConnectionDisconnected(LDAPClientContext, ResultCode, String)} to be
+ * called before this method returns.
*
* @param resultCode
* The result code to include with the disconnect notification
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContextEventListener.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContextEventListener.java
new file mode 100644
index 0000000..30e0116
--- /dev/null
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContextEventListener.java
@@ -0,0 +1,58 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+
+package org.forgerock.opendj.ldap;
+
+import java.util.EventListener;
+
+import org.forgerock.opendj.ldap.requests.UnbindRequest;
+
+/** A listener interface for handling LDAPClientContext state changes. */
+public interface LDAPClientContextEventListener extends EventListener {
+ /**
+ * Invoked when the connection has been disconnected because of an error (e.g: message too big).
+ *
+ * @param context
+ * The {@link LDAPClientContext} which has failed
+ * @param error
+ * The error
+ */
+ void handleConnectionError(LDAPClientContext context, Throwable error);
+
+ /**
+ * Invoked when the client closed the connection, possibly using an unbind request.
+ *
+ * @param context
+ * The {@link LDAPClientContext} which has been disconnected
+ * @param unbindRequest
+ * The unbind request, which may be {@code null} if one was not sent before the connection was
+ * closed.
+ */
+ void handleConnectionClosed(LDAPClientContext context, UnbindRequest unbindRequest);
+
+ /**
+ * Invoked when the connection has been disconnected by the server.
+ *
+ * @param context
+ * The {@link LDAPClientContext} which has been disconnected
+ * @param resultCode
+ * The result code which was included with the disconnect notification, or {@code null} if no
+ * disconnect notification was sent.
+ * @param diagnosticMessage
+ * The diagnostic message, which may be empty or {@code null} indicating that none was provided.
+ */
+ void handleConnectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage);
+}
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index d749ddd..5bb9307 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -41,6 +41,7 @@
import org.forgerock.opendj.ldap.DecodeException;
import org.forgerock.opendj.ldap.DecodeOptions;
import org.forgerock.opendj.ldap.LDAPClientContext;
+import org.forgerock.opendj.ldap.LDAPClientContextEventListener;
import org.forgerock.opendj.ldap.LDAPListener;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.ResultCode;
@@ -76,7 +77,6 @@
import com.forgerock.reactive.Action;
import com.forgerock.reactive.Completable;
-import com.forgerock.reactive.Consumer;
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Stream;
@@ -165,7 +165,7 @@
@Override
public Publisher<Void> apply(final LdapRequestEnvelope rawRequest) throws Exception {
if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
- clientContext.notifyConnectionClosedSilently(rawRequest);
+ clientContext.notifyConnectionClosedRawUnbind(rawRequest);
return emptyStream();
}
Stream<Response> response;
@@ -192,7 +192,7 @@
}, maxConcurrentRequests).onErrorResumeWith(new Function<Throwable, Publisher<Void>, Exception>() {
@Override
public Publisher<Void> apply(Throwable error) throws Exception {
- clientContext.notifyErrorSilently(error);
+ clientContext.notifyConnectionError(error);
// Swallow the error to prevent the subscribe() below to report it on the console.
return emptyStream();
}
@@ -306,7 +306,7 @@
private final Connection<?> connection;
private volatile boolean isClosed;
- private final List<ConnectionEventListener> connectionEventListeners = new LinkedList<>();
+ private final List<LDAPClientContextEventListener> connectionEventListeners = new LinkedList<>();
private SaslServer saslServer;
private GrizzlyBackpressureSubscription downstream;
@@ -341,7 +341,7 @@
if (immutableRef != null) {
immutableRef.onComplete();
}
- notifyConnectionClosedSilently(null);
+ notifyConnectionClosedRawUnbind(null);
return ctx.getStopAction();
}
@@ -487,7 +487,7 @@
}
@Override
- public void addConnectionEventListener(ConnectionEventListener listener) {
+ public void addListener(final LDAPClientContextEventListener listener) {
Reject.ifNull(listener, "listener must not be null");
synchronized (connectionEventListeners) {
connectionEventListeners.add(listener);
@@ -496,7 +496,7 @@
@Override
public void disconnect() {
- notifySilentlyConnectionDisconnected(null, null);
+ notifyConnectionDisconnected(null, null);
connection.closeSilently();
}
@@ -507,46 +507,41 @@
@Override
public void disconnect(final ResultCode resultCode, final String diagnosticMessage) {
- notifySilentlyConnectionDisconnected(resultCode, diagnosticMessage);
+ notifyConnectionDisconnected(resultCode, diagnosticMessage);
sendUnsolicitedNotification(
newGenericExtendedResult(resultCode)
.setOID(OID_NOTICE_OF_DISCONNECTION)
.setDiagnosticMessage(diagnosticMessage)
- ).subscribe(new Action() {
+ ).doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
closeConnection();
}
- }, new Consumer<Throwable>() {
- @Override
- public void accept(final Throwable error) throws Exception {
- closeConnection();
- }
- });
+ }).subscribe();
}
- private void notifyConnectionClosedSilently(final LdapRequestEnvelope unbindRequest) {
+ private void notifyConnectionClosedRawUnbind(final LdapRequestEnvelope rawUnbindRequest) {
// Close this connection context.
- if (unbindRequest == null) {
- notifySilentlyConnectionClosed(null);
+ if (rawUnbindRequest == null) {
+ notifyConnectionClosed(null);
} else {
try {
- LDAP.getReader(unbindRequest.getContent(), new DecodeOptions())
+ LDAP.getReader(rawUnbindRequest.getContent(), new DecodeOptions())
.readMessage(new AbstractLDAPMessageHandler() {
@Override
- public void unbindRequest(int messageID, UnbindRequest unbindRequest)
+ public void unbindRequest(final int messageID, final UnbindRequest unbindRequest)
throws DecodeException, IOException {
- notifySilentlyConnectionClosed(unbindRequest);
+ notifyConnectionClosed(unbindRequest);
}
});
} catch (Exception e) {
- notifySilentlyConnectionClosed(null);
+ notifyConnectionClosed(null);
}
}
}
- private void notifySilentlyConnectionDisconnected(final ResultCode resultCode, final String diagnosticMessage) {
- for (final ConnectionEventListener listener : getAndClearListeners()) {
+ private void notifyConnectionDisconnected(final ResultCode resultCode, final String diagnosticMessage) {
+ for (final LDAPClientContextEventListener listener : getAndClearListeners()) {
try {
listener.handleConnectionDisconnected(this, resultCode, diagnosticMessage);
} catch (Exception e) {
@@ -555,8 +550,8 @@
}
}
- private void notifySilentlyConnectionClosed(final UnbindRequest unbindRequest) {
- for (final ConnectionEventListener listener : getAndClearListeners()) {
+ private void notifyConnectionClosed(final UnbindRequest unbindRequest) {
+ for (final LDAPClientContextEventListener listener : getAndClearListeners()) {
try {
listener.handleConnectionClosed(this, unbindRequest);
} catch (Exception e) {
@@ -565,8 +560,8 @@
}
}
- private void notifyErrorSilently(final Throwable error) {
- for (final ConnectionEventListener listener : getAndClearListeners()) {
+ private void notifyConnectionError(final Throwable error) {
+ for (final LDAPClientContextEventListener listener : getAndClearListeners()) {
try {
listener.handleConnectionError(this, error);
} catch (Exception e) {
@@ -575,9 +570,9 @@
}
}
- private ArrayList<ConnectionEventListener> getAndClearListeners() {
+ private List<LDAPClientContextEventListener> getAndClearListeners() {
synchronized (connectionEventListeners) {
- final ArrayList<ConnectionEventListener> listeners = new ArrayList<>(connectionEventListeners);
+ final ArrayList<LDAPClientContextEventListener> listeners = new ArrayList<>(connectionEventListeners);
connectionEventListeners.clear();
return listeners;
}
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index 70dc842..defc825 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -32,16 +32,14 @@
import java.security.cert.Certificate;
import java.util.Collection;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLPeerUnverifiedException;
@@ -56,7 +54,7 @@
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
+import org.forgerock.opendj.ldap.LDAPClientContextEventListener;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.CompareResult;
@@ -64,6 +62,7 @@
import org.forgerock.opendj.ldap.responses.Responses;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
+import org.forgerock.util.Reject;
import org.opends.server.api.ClientConnection;
import org.opends.server.api.ConnectionHandler;
import org.opends.server.core.AbandonOperationBasis;
@@ -107,6 +106,7 @@
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchResultReference;
import org.opends.server.util.TimeThread;
+import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@@ -229,7 +229,7 @@
}
connectionID = DirectoryServer.newConnectionAccepted(this);
- clientContext.addConnectionEventListener(new ConnectionEventListener() {
+ clientContext.addListener(new LDAPClientContextEventListener() {
@Override
public void handleConnectionError(LDAPClientContext context, Throwable error) {
if (error instanceof LocalizableException) {
@@ -1676,17 +1676,17 @@
}
/** Upstream -> BlockingBackpressureSubscription -> Downstream. */
- private final class BlockingBackpressureSubscription
- implements Subscription, Publisher<Response>, Subscriber<Response> {
- private long pendingRequests;
- private final Queue<Response> queue = new LinkedList<>();
- private final Lock lock = new ReentrantLock();
- private final Condition spaceAvailable = lock.newCondition();
+ private final class BlockingBackpressureSubscription implements Subscription, Processor<Response, Response> {
+ private final AtomicLong pendingRequests = new AtomicLong();
+ private final AtomicInteger missedDrain = new AtomicInteger();
+ private final BlockingQueue<Response> queue = new LinkedBlockingQueue<>(32);
private final Publisher<Response> upstream;
private final long writeTimeoutMillis;
- private boolean upstreamCompleted;
private Subscription subscription;
private Subscriber<? super Response> downstream;
+ private volatile boolean done;
+ private Throwable error;
+ private volatile boolean cancelled;
BlockingBackpressureSubscription(final long maxBlockedWriteTimeLimit, final Publisher<Response> upstream) {
this.upstream = upstream;
@@ -1697,17 +1697,19 @@
@Override
public void subscribe(final Subscriber<? super Response> subscriber) {
+ Reject.ifNull(subscriber);
if (downstream != null) {
+ // This publisher only support one subscriber.
return;
}
downstream = subscriber;
- subscriber.onSubscribe(this);
- upstream.subscribe(this);
+ subscriber.onSubscribe(/* Subscription */ this);
+ upstream.subscribe(/* Subscriber */ this);
}
@Override
public void onSubscribe(final Subscription s) {
- if ( subscription != null) {
+ if (subscription != null) {
s.cancel();
return;
}
@@ -1716,91 +1718,117 @@
}
@Override
- public void request(long n) {
- lock.lock();
- try {
- if (pendingRequests != Long.MIN_VALUE) {
- pendingRequests += n;
- drain();
+ public void request(final long n) {
+ if (n == Long.MAX_VALUE) {
+ pendingRequests.set(Long.MAX_VALUE);
+ } else {
+ // There is a known and accepted problem here regarding reactive-stream contract in the sense that
+ // we're not supporting pendingRequests overflow (pendingRequests + n > Long.MAX_VALUE) for performance
+ // reason since this should never happen in the context we're using it.
+ pendingRequests.addAndGet(n);
+ }
+ drain();
+ }
+
+ // Taken from
+ // https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#backpressure-and-cancellation
+ private void drain() {
+ if (missedDrain.getAndIncrement() != 0) {
+ // Another thread is already executing this drain method.
+ return;
+ }
+
+ int missed = 1;
+
+ for (;;) {
+ final long immutablePendingRequests = pendingRequests.get();
+ long emitted = 0L;
+ while (emitted != immutablePendingRequests) {
+ // Check if we should early exit because of cancellation
+ if (cancelled) {
+ return;
+ }
+
+ final Response response = queue.poll();
+ if (response != null) {
+ downstream.onNext(response);
+ emitted++;
+ } else if (done) {
+ // queue is empty and we received a completion (onError/onComplete) notification from upstream
+ forwardDoneEvent();
+ return;
+ } else {
+ // Queue is empty but upstream is not done yet.
+ break;
+ }
}
- } finally {
- lock.unlock();
+
+ // Check if an onError/onComplete from upstream arrived.
+ if (emitted == immutablePendingRequests) {
+ if (cancelled) {
+ return;
+ }
+
+ if (done && queue.isEmpty()) {
+ forwardDoneEvent();
+ return;
+ }
+ }
+
+ if (emitted != 0) {
+ pendingRequests.addAndGet(-emitted);
+ }
+
+ // Check to see if another thread asked for drain
+ missed = missedDrain.addAndGet(-missed);
+ if (missed == 0) {
+ // Nop, we can exit.
+ break;
+ }
}
}
- private void drain() {
- Response response;
- try {
- while (pendingRequests > 0 && (response = queue.poll()) != null) {
- downstream.onNext(response);
- // Forward response
- pendingRequests--;
- }
- if (upstreamCompleted && queue.isEmpty()) {
- if (pendingRequests != Long.MIN_VALUE) {
- downstream.onComplete();
- }
- cancel();
- }
- } finally {
- spaceAvailable.signalAll();
+ private void forwardDoneEvent() {
+ final Throwable immutableError = error;
+ if (immutableError != null) {
+ downstream.onError(immutableError);
+ } else {
+ downstream.onComplete();
}
}
@Override
public void onNext(final Response response) {
- lock.lock();
try {
- while (queue.size() >= 32) {
- try {
- if (!spaceAvailable.await(writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
- // If we've gotten here, then the write timed out.
- downstream.onError(new ClosedChannelException());
- cancel();
- return;
- }
- } catch (InterruptedException e) {
- downstream.onError(e);
- cancel();
- return;
- }
+ if (queue.offer(response, writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
+ drain();
+ } else {
+ // If we've gotten here, then the write timed out.
+ onError(new ClosedChannelException().fillInStackTrace());
+ return;
}
- queue.add(response);
- drain();
- } finally {
- lock.unlock();
+ } catch (InterruptedException e) {
+ onError(e);
}
}
@Override
- public void onError(Throwable t) {
- downstream.onError(t);
- cancel();
+ public void onError(final Throwable error) {
+ this.error = error;
+ done = true;
+ drain();
}
@Override
public void onComplete() {
- lock.lock();
- try {
- // the onComplete() will be forwarded downstream once the pending messages have been flushed
- upstreamCompleted = true;
- drain();
- } finally {
- lock.unlock();
- }
+ done = true;
+ drain();
}
@Override
public void cancel() {
- lock.lock();
- try {
- pendingRequests = Long.MIN_VALUE;
- } finally {
- lock.unlock();
- }
- if (subscription != null) {
- subscription.cancel();
- }
+ cancelled = true;
+ subscription.cancel();
}
}
}
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
index 16421e9..07a8cd4 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -53,7 +53,7 @@
import org.forgerock.opendj.ldap.AddressMask;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
+import org.forgerock.opendj.ldap.LDAPClientContextEventListener;
import org.forgerock.opendj.ldap.LDAPListener;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.ResultCode;
@@ -641,7 +641,7 @@
LDAPClientContext clientContext) throws LdapException {
final LDAPClientConnection2 conn = canAccept(clientContext);
clientConnections.add(conn);
- clientContext.addConnectionEventListener(new ConnectionEventListener() {
+ clientContext.addListener(new LDAPClientContextEventListener() {
@Override
public void handleConnectionError(final LDAPClientContext context, final Throwable error) {
clientConnections.remove(conn);
--
Gitblit v1.10.0