From 5e9caa5055a78b930500690cb919effd5c8bc2b1 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Fri, 18 Nov 2016 10:31:28 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport
---
opendj-core/src/main/java/com/forgerock/reactive/Completable.java | 10 +
opendj-core/clirr-ignored-api-changes.xml | 7 +
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java | 5
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java | 15 ++
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java | 207 +++++++++++++++++++++-------------
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java | 75 +++++++----
6 files changed, 212 insertions(+), 107 deletions(-)
diff --git a/opendj-core/clirr-ignored-api-changes.xml b/opendj-core/clirr-ignored-api-changes.xml
index 6a629c1..1d6d8ce 100644
--- a/opendj-core/clirr-ignored-api-changes.xml
+++ b/opendj-core/clirr-ignored-api-changes.xml
@@ -232,4 +232,11 @@
<method>org.forgerock.opendj.io.LDAPWriter getWriter(org.forgerock.opendj.io.ASN1Writer)</method>
<justification>Add support for LdapV2 encoding</justification>
</difference>
+ <difference>
+ <className>org/forgerock/opendj/ldap/LDAPClientContext</className>
+ <differenceType>7006</differenceType>
+ <method>void sendUnsolicitedNotification(org.forgerock.opendj.ldap.responses.ExtendedResult)</method>
+ <to>com.forgerock.reactive.Completable</to>
+ <justification>Return a completable so that operation can be chained (i.e: closing connection)</justification>
+ </difference>
</differences>
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
index dabe352..3f12e71 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
@@ -63,6 +63,16 @@
Completable onErrorResumeWith(Function<Throwable, Completable, Exception> function);
/**
+ * Subscribe to the result of this {@link Completable}.
+ *
+ * @param completeAction
+ * An {@link Action} which will be invoked on successful completion
+ * @param errorConsumer
+ * A {@link Consumer} which will be invoked on error
+ */
+ void subscribe(Action completeAction, Consumer<Throwable> errorConsumer);
+
+ /**
* Returns a {@link Single} which will complete with the provided value once this {@link Completable} completes.
*
* @param <V>
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
index de0d8bf..5944468 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -364,5 +364,20 @@
}
}));
}
+
+ @Override
+ public void subscribe(final Action completeAction, final Consumer<Throwable> errorConsumer) {
+ impl.subscribe(new io.reactivex.functions.Action() {
+ @Override
+ public void run() throws Exception {
+ completeAction.run();
+ }
+ }, new io.reactivex.functions.Consumer<Throwable>() {
+ @Override
+ public void accept(final Throwable error) throws Exception {
+ errorConsumer.accept(error);
+ }
+ });
+ }
}
}
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
index 83d4cf5..47548ef 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
@@ -27,6 +27,8 @@
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
+import com.forgerock.reactive.Completable;
+
/**
* An LDAP client which has connected to a {@link ServerConnectionFactory}. An
* LDAP client context can be used to query information about the client's
@@ -169,8 +171,9 @@
*
* @param notification
* The notification to send.
+ * @return A {@link Completable} which will be completed once the notification has been sent.
*/
- void sendUnsolicitedNotification(ExtendedResult notification);
+ Completable sendUnsolicitedNotification(ExtendedResult notification);
/**
* Installs the TLS/SSL security layer on the underlying connection. The TLS/SSL security layer will be installed
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index 0fcaa00..7849e9e 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -19,13 +19,15 @@
import static com.forgerock.reactive.RxJavaStreams.*;
import static org.forgerock.opendj.grizzly.GrizzlyUtils.configureConnection;
import static org.forgerock.opendj.io.LDAP.*;
+import static org.forgerock.opendj.ldap.responses.Responses.newGenericExtendedResult;
import static org.forgerock.opendj.ldap.spi.LdapMessages.newResponseMessage;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLEngine;
@@ -46,7 +48,6 @@
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.IntermediateResponse;
import org.forgerock.opendj.ldap.responses.Response;
-import org.forgerock.opendj.ldap.responses.Responses;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.responses.SearchResultReference;
@@ -55,7 +56,12 @@
import org.forgerock.util.Function;
import org.forgerock.util.Options;
import org.forgerock.util.Reject;
+import org.forgerock.util.promise.ExceptionHandler;
+import org.forgerock.util.promise.PromiseImpl;
+import org.forgerock.util.promise.ResultHandler;
+import org.forgerock.util.promise.RuntimeExceptionHandler;
import org.glassfish.grizzly.Connection;
+import org.glassfish.grizzly.EmptyCompletionHandler;
import org.glassfish.grizzly.filterchain.BaseFilter;
import org.glassfish.grizzly.filterchain.Filter;
import org.glassfish.grizzly.filterchain.FilterChain;
@@ -70,6 +76,7 @@
import com.forgerock.reactive.Action;
import com.forgerock.reactive.Completable;
+import com.forgerock.reactive.Consumer;
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Stream;
@@ -163,7 +170,7 @@
@Override
public Publisher<Void> apply(final LdapRequestEnvelope rawRequest) throws Exception {
if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
- clientContext.notifyConnectionClosed(rawRequest);
+ clientContext.notifyConnectionClosedSilently(rawRequest);
return emptyStream();
}
Stream<Response> response;
@@ -190,14 +197,14 @@
}, maxConcurrentRequests).onErrorResumeWith(new Function<Throwable, Publisher<Void>, Exception>() {
@Override
public Publisher<Void> apply(Throwable error) throws Exception {
- clientContext.notifyErrorAndCloseSilently(error);
+ clientContext.notifyErrorSilently(error);
// Swallow the error to prevent the subscribe() below to report it on the console.
return emptyStream();
}
}).onComplete(new Action() {
@Override
public void run() throws Exception {
- clientContext.notifyConnectionClosed(null);
+ connection.closeSilently();
}
}).subscribe();
return ctx.getStopAction();
@@ -303,8 +310,8 @@
}
private final Connection<?> connection;
- private final AtomicBoolean isClosed = new AtomicBoolean(false);
- private final List<ConnectionEventListener> listeners = new LinkedList<>();
+ private volatile boolean isClosed;
+ private final List<ConnectionEventListener> connectionEventListeners = new LinkedList<>();
private SaslServer saslServer;
private GrizzlyBackpressureSubscription downstream;
@@ -334,12 +341,12 @@
@Override
public NextAction handleClose(final FilterChainContext ctx) {
- if (isClosed.compareAndSet(false, true)) {
- final GrizzlyBackpressureSubscription immutableRef = downstream;
- if (immutableRef != null) {
- downstream.onComplete();
- }
+ isClosed = true;
+ final GrizzlyBackpressureSubscription immutableRef = downstream;
+ if (immutableRef != null) {
+ immutableRef.onComplete();
}
+ notifyConnectionClosedSilently(null);
return ctx.getStopAction();
}
@@ -489,108 +496,152 @@
@Override
public void addConnectionEventListener(ConnectionEventListener listener) {
Reject.ifNull(listener, "listener must not be null");
- listeners.add(listener);
+ synchronized (connectionEventListeners) {
+ connectionEventListeners.add(listener);
+ }
}
@Override
public void disconnect() {
- if (isClosed.compareAndSet(false, true)) {
- try {
- for (ConnectionEventListener listener : listeners) {
- listener.handleConnectionDisconnected(this, null, null);
- }
- } finally {
- closeConnection();
- }
- }
+ notifySilentlyConnectionDisconnected(null, null);
+ connection.closeSilently();
}
private void closeConnection() {
- if (downstream != null) {
- downstream.cancel();
- downstream = null;
- }
+ downstream.cancel();
connection.closeSilently();
}
@Override
public void disconnect(final ResultCode resultCode, final String diagnosticMessage) {
- // Close this connection context.
- if (isClosed.compareAndSet(false, true)) {
- sendUnsolicitedNotification(Responses.newGenericExtendedResult(resultCode)
- .setOID(OID_NOTICE_OF_DISCONNECTION)
- .setDiagnosticMessage(diagnosticMessage));
- try {
- for (ConnectionEventListener listener : listeners) {
- listener.handleConnectionDisconnected(this, resultCode, diagnosticMessage);
- }
- } finally {
+ notifySilentlyConnectionDisconnected(resultCode, diagnosticMessage);
+ sendUnsolicitedNotification(
+ newGenericExtendedResult(resultCode)
+ .setOID(OID_NOTICE_OF_DISCONNECTION)
+ .setDiagnosticMessage(diagnosticMessage)
+ ).subscribe(new Action() {
+ @Override
+ public void run() throws Exception {
closeConnection();
}
+ }, new Consumer<Throwable>() {
+ @Override
+ public void accept(final Throwable error) throws Exception {
+ closeConnection();
+ }
+ });
+ }
+
+ private void notifyConnectionClosedSilently(final LdapRequestEnvelope unbindRequest) {
+ // Close this connection context.
+ if (unbindRequest == null) {
+ notifySilentlyConnectionClosed(null);
+ } else {
+ try {
+ LDAP.getReader(unbindRequest.getContent(), new DecodeOptions())
+ .readMessage(new AbstractLDAPMessageHandler() {
+ @Override
+ public void unbindRequest(int messageID, UnbindRequest unbindRequest)
+ throws DecodeException, IOException {
+ notifySilentlyConnectionClosed(unbindRequest);
+ }
+ });
+ } catch (Exception e) {
+ notifySilentlyConnectionClosed(null);
+ }
}
}
- private void notifyConnectionClosed(final LdapRequestEnvelope unbindRequest) {
- // Close this connection context.
- if (isClosed.compareAndSet(false, true)) {
- if (unbindRequest == null) {
- notifySilentlyConnectionClosed(null);
- } else {
- try {
- LDAP.getReader(unbindRequest.getContent(), new DecodeOptions())
- .readMessage(new AbstractLDAPMessageHandler() {
- @Override
- public void unbindRequest(int messageID, UnbindRequest unbindRequest)
- throws DecodeException, IOException {
- notifySilentlyConnectionClosed(unbindRequest);
- }
- });
- } catch (Exception e) {
- notifySilentlyConnectionClosed(null);
- }
+ private void notifySilentlyConnectionDisconnected(final ResultCode resultCode, final String diagnosticMessage) {
+ for (final ConnectionEventListener listener : getAndClearListeners()) {
+ try {
+ listener.handleConnectionDisconnected(this, resultCode, diagnosticMessage);
+ } catch (Exception e) {
+ logger.traceException(e);
}
}
}
private void notifySilentlyConnectionClosed(final UnbindRequest unbindRequest) {
- try {
- for (final ConnectionEventListener listener : listeners) {
- try {
- listener.handleConnectionClosed(this, unbindRequest);
- } catch (Exception e) {
- logger.traceException(e);
- }
+ for (final ConnectionEventListener listener : getAndClearListeners()) {
+ try {
+ listener.handleConnectionClosed(this, unbindRequest);
+ } catch (Exception e) {
+ logger.traceException(e);
}
- } finally {
- closeConnection();
}
}
- private void notifyErrorAndCloseSilently(final Throwable error) {
- // Close this connection context.
- if (isClosed.compareAndSet(false, true)) {
+ private void notifyErrorSilently(final Throwable error) {
+ for (final ConnectionEventListener listener : getAndClearListeners()) {
try {
- for (final ConnectionEventListener listener : listeners) {
- try {
- listener.handleConnectionError(this, error);
- } catch (Exception e) {
- logger.traceException(e);
- }
- }
- } finally {
- closeConnection();
+ listener.handleConnectionError(this, error);
+ } catch (Exception e) {
+ logger.traceException(e);
}
}
}
+ private ArrayList<ConnectionEventListener> getAndClearListeners() {
+ synchronized (connectionEventListeners) {
+ final ArrayList<ConnectionEventListener> listeners = new ArrayList<>(connectionEventListeners);
+ connectionEventListeners.clear();
+ return listeners;
+ }
+ }
+
@Override
public boolean isClosed() {
- return isClosed.get();
+ return isClosed;
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
- public void sendUnsolicitedNotification(final ExtendedResult notification) {
- connection.write(newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, 0, notification));
+ public Completable sendUnsolicitedNotification(final ExtendedResult notification) {
+ // We use a promise so that the notification is sent even if the Completable is not subscribed.
+ final PromiseImpl<Boolean, Exception> promise = PromiseImpl.create();
+ connection.write(newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, 0, notification),
+ new EmptyCompletionHandler() {
+ @Override
+ public void cancelled() {
+ promise.handleException(new CancellationException());
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ if (throwable instanceof Exception) {
+ promise.handleException((Exception) throwable);
+ } else {
+ promise.handleException(new Exception(throwable));
+ }
+ }
+
+ @Override
+ public void completed(Object result) {
+ promise.handleResult(Boolean.TRUE);
+ }
+ });
+ return newCompletable(new Completable.Emitter() {
+ @Override
+ public void subscribe(final Completable.Subscriber s) throws Exception {
+ promise.thenOnResult(new ResultHandler<Boolean>() {
+ @Override
+ public void handleResult(Boolean result) {
+ s.onComplete();
+ }
+ }).thenOnException(new ExceptionHandler<Exception>() {
+ @Override
+ public void handleException(Exception exception) {
+ s.onError(exception);
+ }
+ }).thenOnRuntimeException(new RuntimeExceptionHandler() {
+ @Override
+ public void handleRuntimeException(RuntimeException exception) {
+ s.onError(exception);
+ }
+ });
+ }
+ });
}
}
}
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index bab468d..a753eee 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -958,27 +958,29 @@
*/
@Override
public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRequestEnvelope message) {
- return streamFromPublisher(new BlockingBackpressureSubscription(
- Flowable.create(new FlowableOnSubscribe<Response>() {
- @Override
- public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
- try {
- processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
- } finally {
- // We don't need the ASN1Reader anymore.
- closeSilently(message.getContent());
- }
- }
- }, BackpressureStrategy.ERROR)))
- .onNext(new Consumer<Response>() {
- @Override
- public void accept(final Response response) throws Exception {
- if (keepStats) {
- statTracker.updateMessageWritten(
- toLdapResponseType(message, response), message.getMessageId());
- }
- }
- });
+ return streamFromPublisher(
+ new BlockingBackpressureSubscription(connectionHandler.getMaxBlockedWriteTimeLimit(),
+ Flowable.create(new FlowableOnSubscribe<Response>() {
+ @Override
+ public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
+ try {
+ processLDAPMessage(
+ queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
+ } finally {
+ // We don't need the ASN1Reader anymore.
+ closeSilently(message.getContent());
+ }
+ }
+ }, BackpressureStrategy.ERROR)))
+ .onNext(new Consumer<Response>() {
+ @Override
+ public void accept(final Response response) throws Exception {
+ if (keepStats) {
+ statTracker.updateMessageWritten(
+ toLdapResponseType(message, response), message.getMessageId());
+ }
+ }
+ });
}
private final byte toLdapResultType(final byte requestType) {
@@ -1682,14 +1684,15 @@
private final Condition spaceAvailable = lock.newCondition();
private final Publisher<Response> upstream;
private final long writeTimeoutMillis;
+ private boolean upstreamCompleted;
private Subscription subscription;
private Subscriber<? super Response> downstream;
- BlockingBackpressureSubscription(final Publisher<Response> upstream) {
+ BlockingBackpressureSubscription(final long maxBlockedWriteTimeLimit, final Publisher<Response> upstream) {
this.upstream = upstream;
- this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0
+ this.writeTimeoutMillis = maxBlockedWriteTimeLimit == 0
? 30000 // Do not wait indefinitely,
- : connectionHandler.getMaxBlockedWriteTimeLimit();
+ : maxBlockedWriteTimeLimit;
}
@Override
@@ -1733,6 +1736,12 @@
// Forward response
pendingRequests--;
}
+ if (upstreamCompleted && queue.isEmpty()) {
+ if (pendingRequests != Long.MIN_VALUE) {
+ downstream.onComplete();
+ }
+ cancel();
+ }
} finally {
spaceAvailable.signalAll();
}
@@ -1771,17 +1780,27 @@
@Override
public void onComplete() {
- downstream.onComplete();
- cancel();
+ lock.lock();
+ try {
+ // the onComplete() will be forwarded downstream once the pending messages have been flushed
+ upstreamCompleted = true;
+ drain();
+ } finally {
+ lock.unlock();
+ }
}
@Override
public void cancel() {
+ lock.lock();
+ try {
+ pendingRequests = Long.MIN_VALUE;
+ } finally {
+ lock.unlock();
+ }
if (subscription != null) {
subscription.cancel();
}
- queue.clear();
- pendingRequests = Long.MIN_VALUE;
}
}
}
--
Gitblit v1.10.0