From bb8d8ab8ac1bc14b26abf45c8fda5cf571c1c9bb Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 07 Nov 2016 13:59:40 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport
---
opendj-core/src/main/java/com/forgerock/reactive/Completable.java | 11 ++
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java | 2
opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java | 9 +-
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java | 23 +++++
opendj-core/src/main/java/com/forgerock/reactive/ReactiveFilter.java | 6
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java | 95 ++++++++++-------------
opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java | 2
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java | 3
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java | 34 +------
9 files changed, 91 insertions(+), 94 deletions(-)
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
index 73a62f9..b8391d6 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
@@ -15,6 +15,7 @@
*/
package com.forgerock.reactive;
+import org.forgerock.util.Function;
import org.reactivestreams.Publisher;
/** {@link Completable} is used to communicates a terminated operation which doesn't produce a result. */
@@ -46,6 +47,16 @@
}
/**
+ * When an error occurs in this completable, continue the processing with the new {@link Completable} provided by
+ * the function.
+ *
+ * @param function
+ * Generates the stream which must will used to resume operation when this {@link Completable} failed.
+ * @return A new {@link Completable}
+ */
+ Completable onErrorResumeWith(Function<Throwable, Completable, Exception> function);
+
+ /**
* Creates a {@link Single} which will emit the specified value when this {@link Completable} complete.
*
* @param <V>
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/ReactiveFilter.java b/opendj-core/src/main/java/com/forgerock/reactive/ReactiveFilter.java
index 7a23201..d04212c 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/ReactiveFilter.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/ReactiveFilter.java
@@ -131,7 +131,7 @@
final ReactiveFilter<C, I1, O1, I2, O2> parent = this;
return new ReactiveHandler<C, I1, O1>() {
@Override
- public Single<O1> handle(final C context, final I1 request) throws Exception {
+ public O1 handle(final C context, final I1 request) throws Exception {
return parent.filter(context, request, handler);
}
};
@@ -150,7 +150,7 @@
* @throws Exception
* If the operation cannot be done
*/
- public abstract Single<O1> filter(final C context, final I1 request, final ReactiveHandler<C, I2, O2> next)
+ public abstract O1 filter(final C context, final I1 request, final ReactiveHandler<C, I2, O2> next)
throws Exception;
private static final class ConcatenatedFilter<C, I1, O1, I2, O2> extends ReactiveFilter<C, I1, O1, I2, O2> {
@@ -178,7 +178,7 @@
}
@Override
- public Single<O1> filter(C context, I1 request, ReactiveHandler<C, I2, O2> handler) throws Exception {
+ public O1 filter(C context, I1 request, ReactiveHandler<C, I2, O2> handler) throws Exception {
return converter.apply(handler).handle(context, request);
}
}
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java b/opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java
index 849b511..689e54c 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/ReactiveHandler.java
@@ -37,5 +37,5 @@
* @throws Exception
* if the request cannot be processed
*/
- Single<REP> handle(final CTX context, final REQ request) throws Exception;
+ REP handle(final CTX context, final REQ request) throws Exception;
}
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
index 6e6367c..35ed67c 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -20,6 +20,7 @@
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableOnSubscribe;
+import io.reactivex.CompletableSource;
import io.reactivex.Flowable;
import io.reactivex.SingleEmitter;
import io.reactivex.SingleOnSubscribe;
@@ -176,6 +177,17 @@
}));
}
+ /**
+ * Create a new {@link Completable} from the given error.
+ *
+ * @param error
+ * The error emitted by this {@link Completable}
+ * @return A new {@link Completable}
+ */
+ public static Completable completableError(final Throwable error) {
+ return new RxJavaCompletable(io.reactivex.Completable.error(error));
+ }
+
private static final class RxJavaStream<V> implements Stream<V> {
private final Flowable<V> impl;
@@ -339,5 +351,16 @@
public void subscribe(org.reactivestreams.Subscriber<? super Void> s) {
impl.<Void>toFlowable().subscribe(s);
}
+
+ @Override
+ public Completable onErrorResumeWith(final Function<Throwable, Completable, Exception> function) {
+ return new RxJavaCompletable(
+ impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, CompletableSource>() {
+ @Override
+ public CompletableSource apply(Throwable error) throws Exception {
+ return io.reactivex.Completable.fromPublisher(function.apply(error));
+ }
+ }));
+ }
}
}
diff --git a/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java b/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
index f3ee127..126f97b 100644
--- a/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
+++ b/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
@@ -15,7 +15,7 @@
*/
package com.forgerock.opendj.grizzly;
-import static com.forgerock.reactive.RxJavaStreams.*;
+import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -56,7 +56,6 @@
import org.forgerock.util.Options;
import com.forgerock.reactive.ReactiveHandler;
-import com.forgerock.reactive.Single;
import com.forgerock.reactive.Stream;
import io.reactivex.BackpressureStrategy;
@@ -121,10 +120,10 @@
final DecodeOptions decodeOptions = options.get(CommonLDAPOptions.LDAP_DECODE_OPTIONS);
return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
@Override
- public Single<Stream<Response>> handle(final LDAPClientContext context,
+ public Stream<Response> handle(final LDAPClientContext context,
final LdapRawMessage rawRequest) throws Exception {
final LDAPReader<ASN1Reader> reader = LDAP.getReader(rawRequest.getContent(), decodeOptions);
- return singleFrom(streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
+ return streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
@Override
public void subscribe(final FlowableEmitter<Response> emitter) throws Exception {
reader.readMessage(new AbstractLDAPMessageHandler() {
@@ -190,7 +189,7 @@
});
emitter.onComplete();
}
- }, BackpressureStrategy.ERROR)));
+ }, BackpressureStrategy.ERROR));
}
};
}
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java
index 9ff4040..d752449 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java
@@ -135,7 +135,7 @@
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
/** Initial size of newly created buffers. */
- private static final int BUFFER_INIT_SIZE = 4096;
+ private static final int BUFFER_INIT_SIZE = 1024;
/** Default maximum size for cached protocol/entry encoding buffers. */
private static final int DEFAULT_MAX_INTERNAL_BUFFER_SIZE = 32 * 1024;
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index 5650e98..189f31c 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -72,7 +72,6 @@
import com.forgerock.reactive.Action;
import com.forgerock.reactive.Completable;
import com.forgerock.reactive.ReactiveHandler;
-import com.forgerock.reactive.Single;
import com.forgerock.reactive.Stream;
import io.reactivex.internal.util.BackpressureHelper;
@@ -83,8 +82,6 @@
*/
final class LDAPServerFilter extends BaseFilter {
- private static final Object DUMMY = new byte[0];
-
private static final Attribute<ClientConnectionImpl> LDAP_CONNECTION_ATTR = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
.createAttribute("LDAPServerConnection");
@@ -153,59 +150,47 @@
final ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> requestHandler =
connectionHandlerFactory.apply(clientContext);
- clientContext
- .read()
- .flatMap(new Function<LdapRawMessage, Publisher<Object>, Exception>() {
- @Override
- public Publisher<Object> apply(final LdapRawMessage rawRequest) throws Exception {
- if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
- clientContext.notifyConnectionClosed(rawRequest);
- return singleFrom(DUMMY);
- }
- Single<Stream<Response>> response;
- try {
- response = requestHandler.handle(clientContext, rawRequest);
- } catch (Exception e) {
- response = singleError(e);
- }
- return response
- .flatMap(new Function<Stream<Response>, Single<Object>, Exception>() {
- @Override
- public Single<Object> apply(final Stream<Response> response) {
- return clientContext.write(response.map(toLdapResponseMessage(rawRequest)))
- .toSingle(DUMMY);
+ clientContext.read().flatMap(new Function<LdapRawMessage, Publisher<Void>, Exception>() {
+ @Override
+ public Publisher<Void> apply(final LdapRawMessage rawRequest) throws Exception {
+ if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
+ clientContext.notifyConnectionClosed(rawRequest);
+ return emptyStream();
+ }
+ Stream<Response> response;
+ try {
+ response = requestHandler.handle(clientContext, rawRequest);
+ } catch (Exception e) {
+ response = streamError(e);
+ }
+ return clientContext
+ .write(response.map(toLdapResponseMessage(rawRequest)))
+ .onErrorResumeWith(new Function<Throwable, Completable, Exception>() {
+ @Override
+ public Completable apply(final Throwable error) throws Exception {
+ if (!(error instanceof LdapException)) {
+ // Unexpected error, propagate it.
+ return completableError(error);
}
- })
- .onErrorResumeWith(new Function<Throwable, Single<Object>, Exception>() {
- @Override
- public Single<Object> apply(final Throwable error) throws Exception {
- if (!(error instanceof LdapException)) {
- // Unexpected error, propagate it.
- return singleError(error);
- }
- final LdapException exception = (LdapException) error;
- return clientContext
- .write(singleFrom(toLdapResponseMessage(rawRequest, exception.getResult())))
- .toSingle(DUMMY);
- }
- });
- }
- }, maxConcurrentRequests)
- .onErrorResumeWith(new Function<Throwable, Publisher<Object>, Exception>() {
- @Override
- public Publisher<Object> apply(Throwable error) throws Exception {
- clientContext.notifyErrorAndCloseSilently(error);
- // Swallow the error to prevent the subscribe() below to report it on the console.
- return streamFrom(DUMMY);
- }
- })
- .onCompleteDo(new Action() {
- @Override
- public void run() throws Exception {
- clientContext.notifyConnectionClosed(null);
- }
- })
- .subscribe();
+ final LdapException exception = (LdapException) error;
+ return clientContext
+ .write(singleFrom(toLdapResponseMessage(rawRequest, exception.getResult())));
+ }
+ });
+ }
+ }, maxConcurrentRequests).onErrorResumeWith(new Function<Throwable, Publisher<Void>, Exception>() {
+ @Override
+ public Publisher<Void> apply(Throwable error) throws Exception {
+ clientContext.notifyErrorAndCloseSilently(error);
+ // Swallow the error to prevent the subscribe() below to report it on the console.
+ return emptyStream();
+ }
+ }).onCompleteDo(new Action() {
+ @Override
+ public void run() throws Exception {
+ clientContext.notifyConnectionClosed(null);
+ }
+ }).subscribe();
return ctx.getStopAction();
}
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index efe9929..af9a708 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -16,30 +16,11 @@
*/
package org.forgerock.opendj.reactive;
-import static com.forgerock.reactive.RxJavaStreams.*;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_REQUEST;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_RESPONSE;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_REQUEST;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_RESPONSE;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_REQUEST;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_RESPONSE;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_REQUEST;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_RESPONSE;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_REQUEST;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_RESPONSE;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_INTERMEDIATE_RESPONSE;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_REQUEST;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_RESPONSE;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_REQUEST;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_RESPONSE;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_REQUEST;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_DONE;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_ENTRY;
-import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_REFERENCE;
+import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher;
+import static org.forgerock.opendj.io.LDAP.*;
import static org.opends.messages.CoreMessages.*;
import static org.opends.messages.ProtocolMessages.*;
import static org.opends.server.loggers.AccessLogger.logDisconnect;
-import static org.opends.server.protocols.ldap.LDAPConstants.*;
import static org.opends.server.util.ServerConstants.OID_START_TLS_REQUEST;
import static org.opends.server.util.StaticUtils.*;
@@ -130,7 +111,6 @@
import com.forgerock.reactive.Consumer;
import com.forgerock.reactive.ReactiveHandler;
-import com.forgerock.reactive.Single;
import com.forgerock.reactive.Stream;
import io.reactivex.BackpressureStrategy;
@@ -982,8 +962,8 @@
* a fatal error and the client has been disconnected as a result, or if the client unbound from the server.
*/
@Override
- public Single<Stream<Response>> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
- return singleFrom(streamFromPublisher(
+ public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
+ return streamFromPublisher(
new BlockingBackpressureSubscription(Flowable.create(new FlowableOnSubscribe<Response>() {
@Override
public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
@@ -997,7 +977,7 @@
toLdapResponseType(message, response), message.getMessageId());
}
}
- }));
+ });
}
private final byte toLdapResultType(final byte requestType) {
@@ -1851,8 +1831,8 @@
BlockingBackpressureSubscription(final Publisher<Response> upstream) {
this.upstream = upstream;
- this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0
- ? 30000 // Do not wait indefinitely,
+ this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0
+ ? 30000 // Do not wait indefinitely,
: connectionHandler.getMaxBlockedWriteTimeLimit();
}
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
index 8127e01..b7dc22f 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -90,7 +90,6 @@
import org.opends.server.util.StaticUtils;
import com.forgerock.reactive.ReactiveHandler;
-import com.forgerock.reactive.Single;
import com.forgerock.reactive.Stream;
/**
@@ -668,7 +667,7 @@
});
return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
@Override
- public Single<Stream<Response>> handle(LDAPClientContext context, LdapRawMessage request)
+ public Stream<Response> handle(LDAPClientContext context, LdapRawMessage request)
throws Exception {
return conn.handle(queueingStrategy, request);
}
--
Gitblit v1.10.0