From b59e161bd2d26df6023efea77353c8f3f61dd37b Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 07 Nov 2016 13:59:40 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport
---
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java | 30 +++++----------
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java | 15 +++++--
opendj-core/src/main/java/com/forgerock/reactive/Stream.java | 21 ++++------
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java | 30 +++++++++++++++
4 files changed, 60 insertions(+), 36 deletions(-)
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
index a5a6231..3fb0ec5 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -211,26 +211,6 @@
}
@Override
- public void subscribe(final Consumer<V> onResult, final Consumer<Throwable> onError, final Action onComplete) {
- impl.subscribe(new io.reactivex.functions.Consumer<V>() {
- @Override
- public void accept(V t) throws Exception {
- onResult.accept(t);
- }
- }, new io.reactivex.functions.Consumer<Throwable>() {
- @Override
- public void accept(Throwable t) throws Exception {
- onError.accept(t);
- }
- }, new io.reactivex.functions.Action() {
- @Override
- public void run() throws Exception {
- onComplete.run();
- }
- });
- }
-
- @Override
public Stream<V> onErrorDo(final Consumer<Throwable> onError) {
return new RxJavaStream<>(impl.doOnError(new io.reactivex.functions.Consumer<Throwable>() {
@Override
@@ -252,6 +232,16 @@
}
@Override
+ public Stream<V> onCompleteDo(final Action action) {
+ return new RxJavaStream<>(impl.doOnComplete(new io.reactivex.functions.Action() {
+ @Override
+ public void run() throws Exception {
+ action.run();
+ }
+ }));
+ }
+
+ @Override
public void subscribe() {
impl.subscribe();
}
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Stream.java b/opendj-core/src/main/java/com/forgerock/reactive/Stream.java
index 68e6ae0..ebeca68 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Stream.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Stream.java
@@ -53,18 +53,6 @@
int maxConcurrency);
/**
- * Subscribe to the data emitted by this {@link Stream}.
- *
- * @param onResult
- * The {@link Consumer} invoked for each data of this stream
- * @param onError
- * The {@link Consumer} invoked on error in this stream.
- * @param onComplete
- * The {@link Action} invoked once this {@link Stream} is completed.
- */
- void subscribe(Consumer<V> onResult, Consumer<Throwable> onError, Action onComplete);
-
- /**
* When an error occurs in this stream, continue the processing with the new {@link Stream} provided by the
* function.
*
@@ -84,6 +72,15 @@
Stream<V> onErrorDo(Consumer<Throwable> onError);
/**
+ * Invokes the on complete {@link Action} when this stream is completed.
+ *
+ * @param onComplete
+ * The {@link Action} to invoke on stream completion
+ * @return a new {@link Stream}
+ */
+ Stream<V> onCompleteDo(Action onComplete);
+
+ /**
* Subscribe to this stream and drop all data produced by it.
*/
void subscribe();
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index 8532b9d..d2f8c98 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -69,6 +69,7 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
+import com.forgerock.reactive.Action;
import com.forgerock.reactive.Completable;
import com.forgerock.reactive.Consumer;
import com.forgerock.reactive.ReactiveHandler;
@@ -162,7 +163,7 @@
@Override
public Publisher<Object> apply(final LdapRawMessage rawRequest) throws Exception {
if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
- clientContext.notifyAndClose(rawRequest);
+ clientContext.notifyConnectionClosed(rawRequest);
return singleFrom(DUMMY);
}
Single<Stream<Response>> response;
@@ -197,7 +198,13 @@
.onErrorDo(new Consumer<Throwable>() {
@Override
public void accept(final Throwable error) throws Exception {
- clientContext.notifyAndCloseSilently(error);
+ clientContext.notifyErrorAndCloseSilently(error);
+ }
+ })
+ .onCompleteDo(new Action() {
+ @Override
+ public void run() throws Exception {
+ clientContext.notifyConnectionClosed(null);
}
})
.subscribe();
@@ -511,7 +518,7 @@
}
}
- private void notifyAndClose(final LdapRawMessage unbindRequest) {
+ private void notifyConnectionClosed(final LdapRawMessage unbindRequest) {
// Close this connection context.
if (isClosed.compareAndSet(false, true)) {
if (unbindRequest == null) {
@@ -547,7 +554,7 @@
}
}
- private void notifyAndCloseSilently(final Throwable error) {
+ private void notifyErrorAndCloseSilently(final Throwable error) {
// Close this connection context.
if (isClosed.compareAndSet(false, true)) {
try {
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index 7341f9b..0e14bea 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -34,6 +34,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import org.forgerock.i18n.LocalizableException;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.LocalizableMessageBuilder;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -41,7 +42,9 @@
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.LDAPClientContext;
+import org.forgerock.opendj.ldap.LDAPClientContext.DisconnectListener;
import org.forgerock.opendj.ldap.ResultCode;
+import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.CompareResult;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.responses.Responses;
@@ -221,6 +224,33 @@
}
connectionID = DirectoryServer.newConnectionAccepted(this);
+ clientContext.onDisconnect(new DisconnectListener()
+ {
+ @Override
+ public void exceptionOccurred(LDAPClientContext context, Throwable error)
+ {
+ if (error instanceof LocalizableException)
+ {
+ disconnect(DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject());
+ }
+ else
+ {
+ disconnect(DisconnectReason.PROTOCOL_ERROR, true, null);
+ }
+ }
+
+ @Override
+ public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage)
+ {
+ disconnect(DisconnectReason.SERVER_ERROR, false, null);
+ }
+
+ @Override
+ public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest)
+ {
+ disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
+ }
+ });
}
/**
--
Gitblit v1.10.0