From 5511a94238385a30b5b516ee360b234ff56d7c3f Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 07 Nov 2016 13:59:40 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport
---
opendj-core/src/main/java/com/forgerock/reactive/Completable.java | 10
opendj-core/src/main/java/org/forgerock/opendj/ldap/ByteSequenceReader.java | 4
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java | 38 ++
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java | 63 +++--
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java | 63 ++++-
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java | 2
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java | 6
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java | 24 +-
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java | 322 ++++++++++++++++------------
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java | 21 +
opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java | 16 +
opendj-core/src/main/java/com/forgerock/reactive/Single.java | 20 +
opendj-core/src/main/java/com/forgerock/reactive/Stream.java | 9
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java | 4
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java | 4
15 files changed, 379 insertions(+), 227 deletions(-)
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
index a60df6b..73a62f9 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Completable.java
@@ -21,9 +21,9 @@
public interface Completable extends Publisher<Void> {
/** Emitter is used to notify when the operation has been completed, successfully or not. */
- public interface Emitter {
+ public interface Subscriber {
/** Notify that this {@link Completable} is now completed. */
- void complete();
+ void onComplete();
/**
* Notify that this {@link Completable} cannot be completed because of an error.
@@ -35,14 +35,14 @@
}
/** Adapts the streaming api to a callback one. */
- public interface OnSubscribe {
+ public interface Emitter {
/**
* Called when the streaming api has been subscribed.
*
* @param e
- * The {@link Emitter} to use to communicate the completeness of this {@link Completable}
+ * The {@link Subscriber} to use to communicate the completeness of this {@link Completable}
*/
- void onSubscribe(Emitter e);
+ void subscribe(Subscriber e);
}
/**
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
index 7831df5..a5a6231 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -17,7 +17,6 @@
import org.forgerock.util.Function;
import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableOnSubscribe;
@@ -112,21 +111,34 @@
}
/**
+ * Create a new {@link Single} from the given error.
+ *
+ * @param <V>
+ * Type of the datum emitted
+ * @param error
+ * The error emitted by this {@link Single}
+ * @return A new {@link Single}
+ */
+ public static <V> Single<V> singleError(final Throwable error) {
+ return new RxJavaSingle<>(io.reactivex.Single.<V>error(error));
+ }
+
+ /**
* Creates a bridge from callback world to {@link Single}.
*
* @param <V>
* Type of the datum emitted
- * @param onSubscribe
+ * @param emitter
* Action to perform once this {@link Single} has been subscribed to.
* @return A new {@link Single}
*/
- public static <V> Single<V> newSingle(final Single.OnSubscribe<V> onSubscribe) {
+ public static <V> Single<V> newSingle(final Single.Emitter<V> emitter) {
return new RxJavaSingle<>(io.reactivex.Single.create(new SingleOnSubscribe<V>() {
@Override
public void subscribe(final SingleEmitter<V> e) throws Exception {
- onSubscribe.onSubscribe(new Single.Emitter<V>() {
+ emitter.subscribe(new Single.Subscriber<V>() {
@Override
- public void onSuccess(V value) {
+ public void onComplete(V value) {
e.onSuccess(value);
}
@@ -146,18 +158,18 @@
* Action to perform once this {@link Completable} has been subscribed to.
* @return A new {@link Completable}
*/
- public static Completable newCompletable(final Completable.OnSubscribe onSubscribe) {
+ public static Completable newCompletable(final Completable.Emitter onSubscribe) {
return new RxJavaCompletable(io.reactivex.Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(final CompletableEmitter e) throws Exception {
- onSubscribe.onSubscribe(new Completable.Emitter() {
+ onSubscribe.subscribe(new Completable.Subscriber() {
@Override
- public void complete() {
+ public void onComplete() {
e.onComplete();
}
@Override
- public void onError(Throwable t) {
- e.onError(t);
+ public void onError(final Throwable error) {
+ e.onError(error);
}
});
}
@@ -166,14 +178,14 @@
private static final class RxJavaStream<V> implements Stream<V> {
- private Flowable<V> impl;
+ private final Flowable<V> impl;
private RxJavaStream(final Flowable<V> impl) {
this.impl = impl;
}
@Override
- public void subscribe(Subscriber<? super V> s) {
+ public void subscribe(org.reactivestreams.Subscriber<? super V> s) {
impl.subscribe(s);
}
@@ -219,6 +231,16 @@
}
@Override
+ public Stream<V> onErrorDo(final Consumer<Throwable> onError) {
+ return new RxJavaStream<>(impl.doOnError(new io.reactivex.functions.Consumer<Throwable>() {
+ @Override
+ public void accept(Throwable t) throws Exception {
+ onError.accept(t);
+ }
+ }));
+ }
+
+ @Override
public Stream<V> onErrorResumeWith(final Function<Throwable, Publisher<V>, Exception> function) {
return new RxJavaStream<>(
impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, Publisher<? extends V>>() {
@@ -259,7 +281,7 @@
}
@Override
- public void subscribe(Subscriber<? super V> s) {
+ public void subscribe(org.reactivestreams.Subscriber<? super V> s) {
impl.toFlowable().subscribe(s);
}
@@ -279,7 +301,7 @@
}
@Override
- public <O> Single<O> flatMap(final Function<V, Publisher<O>, Exception> function) {
+ public <O> Single<O> flatMap(final Function<V, Single<O>, Exception> function) {
return new RxJavaSingle<>(impl.flatMap(new io.reactivex.functions.Function<V, SingleSource<O>>() {
@Override
public SingleSource<O> apply(V t) throws Exception {
@@ -287,6 +309,17 @@
}
}));
}
+
+ @Override
+ public Single<V> onErrorResumeWith(final Function<Throwable, Single<V>, Exception> function) {
+ return new RxJavaSingle<>(
+ impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, SingleSource<V>>() {
+ @Override
+ public SingleSource<V> apply(Throwable error) throws Exception {
+ return io.reactivex.Single.fromPublisher(function.apply(error));
+ }
+ }));
+ }
}
private static final class RxJavaCompletable implements Completable {
@@ -303,7 +336,7 @@
}
@Override
- public void subscribe(Subscriber<? super Void> s) {
+ public void subscribe(org.reactivestreams.Subscriber<? super Void> s) {
impl.<Void>toFlowable().subscribe(s);
}
}
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Single.java b/opendj-core/src/main/java/com/forgerock/reactive/Single.java
index f5d7b25..a1f534f 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Single.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Single.java
@@ -27,14 +27,14 @@
public interface Single<V> extends Publisher<V> {
/** Emitter is used to notify when the operation has been completed, successfully or not. */
- public interface Emitter<V> {
+ public interface Subscriber<V> {
/**
* Signal a success value.
*
* @param t
* the value, not null
*/
- void onSuccess(V t);
+ void onComplete(V t);
/**
* Signal an exception.
@@ -46,13 +46,13 @@
}
/** Adapts the streaming api to a callback one. */
- public interface OnSubscribe<V> {
+ public interface Emitter<V> {
/**
* Called for each SingleObserver that subscribes.
* @param e the safe emitter instance, never null
* @throws Exception on error
*/
- void onSubscribe(Emitter<V> e) throws Exception;
+ void subscribe(Subscriber<V> e) throws Exception;
}
/**
@@ -82,7 +82,17 @@
* Function to apply to perform the asynchronous transformation
* @return A new {@link Single} transforming the datum emitted by this {@link Single}.
*/
- <O> Single<O> flatMap(Function<V, Publisher<O>, Exception> function);
+ <O> Single<O> flatMap(Function<V, Single<O>, Exception> function);
+
+ /**
+ * When an error occurs in this stream, continue the processing with the new {@link Single} provided by the
+ * function.
+ *
+ * @param function
+ * Generates the single which must will used to resume operation when this {@link Single} failed.
+ * @return A new {@link Single}
+ */
+ Single<V> onErrorResumeWith(Function<Throwable, Single<V>, Exception> function);
/**
* Subscribe to the single value emitted by this {@link Single}.
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/Stream.java b/opendj-core/src/main/java/com/forgerock/reactive/Stream.java
index b19ba37..68e6ae0 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/Stream.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/Stream.java
@@ -75,6 +75,15 @@
Stream<V> onErrorResumeWith(Function<Throwable, Publisher<V>, Exception> function);
/**
+ * Invokes the on error {@link Consumer} when an error occurs on this stream.
+ *
+ * @param onError
+ * The {@link Consumer} to invoke on error
+ * @return a new {@link Stream}
+ */
+ Stream<V> onErrorDo(Consumer<Throwable> onError);
+
+ /**
* Subscribe to this stream and drop all data produced by it.
*/
void subscribe();
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/ByteSequenceReader.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ByteSequenceReader.java
index a9ba54f..0f2531f 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/ByteSequenceReader.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/ByteSequenceReader.java
@@ -28,7 +28,7 @@
* {@code ByteSequenceReader} must be created using the associated
* {@code ByteSequence}'s {@code asReader()} method.
*/
-public final class ByteSequenceReader {
+public class ByteSequenceReader {
/** The current position in the byte sequence. */
private int pos;
@@ -55,7 +55,7 @@
* @param sequence
* The byte sequence to be read.
*/
- ByteSequenceReader(final ByteSequence sequence) {
+ public ByteSequenceReader(final ByteSequence sequence) {
this.sequence = sequence;
}
diff --git a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
index 9bb905b..b88edbb 100644
--- a/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
+++ b/opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
@@ -23,6 +23,7 @@
import javax.net.ssl.SSLSession;
import javax.security.sasl.SaslServer;
+import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
/**
@@ -36,17 +37,39 @@
/** Listens for disconnection event. */
public interface DisconnectListener {
/**
- * Invoked when the connection has been closed as a result of a client disconnect, a fatal connection error, or
- * a server-side {@link #disconnect}.
+ * Invoked when the connection has been disconnected because of an error (i.e: message too big).
+ *
+ * @param context
+ * The {@link LDAPClientContext} which has failed
+ * @param error
+ * The error
+ */
+ void exceptionOccurred(final LDAPClientContext context, final Throwable error);
+
+ /**
+ * Invoked when the client closes the connection, possibly using an unbind request.
+ *
+ * @param context
+ * The {@link LDAPClientContext} which has been disconnected
+ * @param unbindRequest
+ * The unbind request, which may be {@code null} if one was not sent before the connection was
+ * closed.
+ */
+ void connectionClosed(final LDAPClientContext context, final UnbindRequest unbindRequest);
+
+ /**
+ * Invoked when the connection has been disconnected by the server.
*
* @param context
* The {@link LDAPClientContext} which has been disconnected
* @param resultCode
- * The {@link ResultCode} of the notification sent, or null
- * @param message
- * The message of the notification sent, or null
+ * The result code which was included with the disconnect notification, or {@code null} if no
+ * disconnect notification was sent.
+ * @param diagnosticMessage
+ * The diagnostic message, which may be empty or {@code null} indicating that none was provided.
*/
- void connectionDisconnected(final LDAPClientContext context, final ResultCode resultCode, final String message);
+ void connectionDisconnected(final LDAPClientContext context, final ResultCode resultCode,
+ final String diagnosticMessage);
}
/**
@@ -57,30 +80,24 @@
void onDisconnect(final DisconnectListener listener);
/**
- * Disconnects the client without sending a disconnect notification.
- * <p>
- * <b>Server connections:</b> invoking this method causes
- * {@link ServerConnection#handleConnectionDisconnected
- * handleConnectionDisconnected} to be called before this method returns.
+ * Disconnects the client without sending a disconnect notification. Invoking this method causes
+ * {@link DisconnectListener#connectionDisconnected(LDAPClientContext, ResultCode, String)} to be called before this
+ * method returns.
*/
void disconnect();
/**
- * Disconnects the client and sends a disconnect notification, if possible,
- * containing the provided result code and diagnostic message.
- * <p>
- * <b>Server connections:</b> invoking this method causes
- * {@link ServerConnection#handleConnectionDisconnected
- * handleConnectionDisconnected} to be called before this method returns.
+ * Disconnects the client and sends a disconnect notification, containing the provided result code and diagnostic
+ * message. Invoking this method causes
+ * {@link DisconnectListener#connectionDisconnected(LDAPClientContext, ResultCode, String)} to be called before this
+ * method returns.
*
* @param resultCode
- * The result code which should be included with the disconnect
- * notification.
- * @param message
- * The diagnostic message, which may be empty or {@code null}
- * indicating that none was provided.
+ * The result code to include with the disconnect notification
+ * @param diagnosticMessage
+ * The diagnostic message to include with the disconnect notification
*/
- void disconnect(final ResultCode resultCode, final String message);
+ void disconnect(final ResultCode resultCode, final String diagnosticMessage);
/**
* Returns the {@code InetSocketAddress} associated with the local system.
diff --git a/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java b/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
index e67b849..c2dce7e 100644
--- a/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
+++ b/opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
@@ -101,10 +101,22 @@
final ServerConnectionAdaptor<Integer> adapter = new ServerConnectionAdaptor<>(serverConnection);
clientContext.onDisconnect(new DisconnectListener() {
@Override
- public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode, String message) {
- serverConnection.handleConnectionDisconnected(resultCode, message);
+ public void exceptionOccurred(final LDAPClientContext context, final Throwable error) {
+ serverConnection.handleConnectionError(error);
+ }
+
+ @Override
+ public void connectionClosed(final LDAPClientContext context, final UnbindRequest unbindRequest) {
+ serverConnection.handleConnectionClosed(0, unbindRequest);
+ }
+
+ @Override
+ public void connectionDisconnected(final LDAPClientContext context, final ResultCode resultCode,
+ final String diagnosticMessage) {
+ serverConnection.handleConnectionDisconnected(resultCode, diagnosticMessage);
}
});
+
return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
@Override
public Single<Stream<Response>> handle(final LDAPClientContext context,
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java
index 1de0ea0..1f77cca 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java
@@ -38,6 +38,7 @@
import org.forgerock.util.Function;
import org.forgerock.util.Options;
import org.glassfish.grizzly.filterchain.FilterChain;
+import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.nio.transport.TCPNIOBindingHandler;
import org.glassfish.grizzly.nio.transport.TCPNIOConnection;
import org.glassfish.grizzly.nio.transport.TCPNIOServerConnection;
@@ -66,7 +67,7 @@
* The addresses to listen on.
* @param options
* The LDAP listener options.
- * @param handler
+ * @param requestHandlerFactory
* The server connection factory which will be used to create server connections.
* @throws IOException
* If an error occurred while trying to listen on the provided address.
@@ -74,8 +75,8 @@
public GrizzlyLDAPListener(final Set<? extends SocketAddress> addresses, final Options options,
final Function<LDAPClientContext,
ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
- LdapException> handler) throws IOException {
- this(addresses, handler, options, null);
+ LdapException> requestHandlerFactory) throws IOException {
+ this(addresses, requestHandlerFactory, options, null);
}
/**
@@ -84,7 +85,7 @@
*
* @param addresses
* The addresses to listen on.
- * @param handler
+ * @param requestHandlerFactory
* The server connection factory which will be used to create server connections.
* @param options
* The LDAP listener options.
@@ -97,14 +98,20 @@
public GrizzlyLDAPListener(final Set<? extends SocketAddress> addresses,
final Function<LDAPClientContext,
ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
- LdapException> handler,
+ LdapException> requestHandlerFactory,
final Options options, TCPNIOTransport transport) throws IOException {
+
this.transport = DEFAULT_TRANSPORT.acquireIfNull(transport);
this.options = Options.copyOf(options);
- final LDAPServerFilter serverFilter = new LDAPServerFilter(handler, options,
+ final LDAPServerFilter serverFilter = new LDAPServerFilter(requestHandlerFactory, options,
options.get(LDAP_DECODE_OPTIONS), options.get(MAX_CONCURRENT_REQUESTS));
final FilterChain ldapChain = GrizzlyUtils.buildFilterChain(this.transport.get().getProcessor(),
- new LdapCodec(options.get(REQUEST_MAX_SIZE_IN_BYTES), options.get(LDAP_DECODE_OPTIONS)), serverFilter);
+ new LdapCodec(options.get(REQUEST_MAX_SIZE_IN_BYTES), options.get(LDAP_DECODE_OPTIONS)) {
+ @Override
+ protected void onLdapCodecError(FilterChainContext ctx, Throwable error) {
+ serverFilter.exceptionOccurred(ctx, error);
+ }
+ }, serverFilter);
final TCPNIOBindingHandler bindingHandler = TCPNIOBindingHandler.builder(this.transport.get())
.processor(ldapChain).build();
this.serverConnections = new ArrayList<>(addresses.size());
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index 787a336..8532b9d 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -34,14 +34,14 @@
import javax.security.sasl.SaslServer;
import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.io.AbstractLDAPMessageHandler;
import org.forgerock.opendj.io.LDAP;
+import org.forgerock.opendj.ldap.DecodeException;
import org.forgerock.opendj.ldap.DecodeOptions;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.ResultCode;
-import org.forgerock.opendj.ldap.controls.Control;
-import org.forgerock.opendj.ldap.responses.BindResult;
-import org.forgerock.opendj.ldap.responses.CompareResult;
+import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.IntermediateResponse;
import org.forgerock.opendj.ldap.responses.Response;
@@ -55,8 +55,6 @@
import org.forgerock.util.Function;
import org.forgerock.util.Options;
import org.forgerock.util.Reject;
-import org.glassfish.grizzly.CloseReason;
-import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.attributes.Attribute;
@@ -72,8 +70,9 @@
import org.reactivestreams.Subscription;
import com.forgerock.reactive.Completable;
-import com.forgerock.reactive.Completable.Emitter;
+import com.forgerock.reactive.Consumer;
import com.forgerock.reactive.ReactiveHandler;
+import com.forgerock.reactive.Single;
import com.forgerock.reactive.Stream;
import io.reactivex.internal.util.BackpressureHelper;
@@ -84,12 +83,14 @@
*/
final class LDAPServerFilter extends BaseFilter {
+ private static final Object DUMMY = new byte[0];
+
private static final Attribute<ClientConnectionImpl> LDAP_CONNECTION_ATTR = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
.createAttribute("LDAPServerConnection");
private final Function<LDAPClientContext,
ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
- LdapException> connectionHandler;
+ LdapException> connectionHandlerFactory;
/**
* Map of cipher phrases to effective key size (bits). Taken from the
@@ -132,11 +133,11 @@
* The maximum BER element size, or <code>0</code> to indicate that there is no limit.
*/
LDAPServerFilter(
- Function<LDAPClientContext,
- ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
- LdapException> connectionHandler,
+ final Function<LDAPClientContext,
+ ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
+ LdapException> connectionHandlerFactory,
final Options connectionOptions, final DecodeOptions options, final int maxPendingRequests) {
- this.connectionHandler = connectionHandler;
+ this.connectionHandlerFactory = connectionHandlerFactory;
this.connectionOptions = connectionOptions;
this.maxConcurrentRequests = maxPendingRequests;
}
@@ -152,98 +153,86 @@
configureConnection(connection, logger, connectionOptions);
final ClientConnectionImpl clientContext = new ClientConnectionImpl(connection);
LDAP_CONNECTION_ATTR.set(connection, clientContext);
- final ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> handler = connectionHandler
- .apply(clientContext);
+ final ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> requestHandler =
+ connectionHandlerFactory.apply(clientContext);
- streamFromPublisher(clientContext).flatMap(new Function<LdapRawMessage, Publisher<Integer>, Exception>() {
- @Override
- public Publisher<Integer> apply(final LdapRawMessage rawRequest) throws Exception {
- return handler
- .handle(clientContext, rawRequest)
- .flatMap(new Function<Stream<Response>, Publisher<Integer>, Exception>() {
- @Override
- public Publisher<Integer> apply(final Stream<Response> response) {
- return clientContext.write(response.onErrorResumeWith(toErrorMessage(rawRequest))
- .map(toResponseMessage(rawRequest))).toSingle(1);
- }
- });
- }
- }, maxConcurrentRequests).subscribe();
+ clientContext
+ .read()
+ .flatMap(new Function<LdapRawMessage, Publisher<Object>, Exception>() {
+ @Override
+ public Publisher<Object> apply(final LdapRawMessage rawRequest) throws Exception {
+ if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
+ clientContext.notifyAndClose(rawRequest);
+ return singleFrom(DUMMY);
+ }
+ Single<Stream<Response>> response;
+ try {
+ response = requestHandler.handle(clientContext, rawRequest);
+ } catch (Exception e) {
+ response = singleError(e);
+ }
+ return response
+ .flatMap(new Function<Stream<Response>, Single<Object>, Exception>() {
+ @Override
+ public Single<Object> apply(final Stream<Response> response) {
+ return clientContext.write(response.map(toLdapResponseMessage(rawRequest)))
+ .toSingle(DUMMY);
+ }
+ })
+ .onErrorResumeWith(new Function<Throwable, Single<Object>, Exception>() {
+ @Override
+ public Single<Object> apply(final Throwable error) throws Exception {
+ if (!(error instanceof LdapException)) {
+ // Unexpected error, propagate it.
+ return singleError(error);
+ }
+ final LdapException exception = (LdapException) error;
+ return clientContext
+ .write(singleFrom(toLdapResponseMessage(rawRequest, exception.getResult())))
+ .toSingle(DUMMY);
+ }
+ });
+ }
+ }, maxConcurrentRequests)
+ .onErrorDo(new Consumer<Throwable>() {
+ @Override
+ public void accept(final Throwable error) throws Exception {
+ clientContext.notifyAndCloseSilently(error);
+ }
+ })
+ .subscribe();
return ctx.getStopAction();
}
- private Function<Throwable, Publisher<Response>, Exception> toErrorMessage(final LdapRawMessage rawRequest) {
- return new Function<Throwable, Publisher<Response>, Exception>() {
- @Override
- public Publisher<Response> apply(Throwable error) throws Exception {
- if (!(error instanceof LdapException)) {
- // Propagate error
- return streamError(error);
- }
-
- final Result result = ((LdapException) error).getResult();
- switch (rawRequest.getMessageType()) {
- case OP_TYPE_BIND_REQUEST:
- if (result instanceof BindResult) {
- return streamFrom((Response) result);
- }
- return streamFrom((Response) populateNewResultFromResult(
- Responses.newBindResult(result.getResultCode()), result));
- case OP_TYPE_COMPARE_REQUEST:
- if (result instanceof CompareResult) {
- return streamFrom((Response) result);
- }
- return streamFrom((Response) populateNewResultFromResult(
- Responses.newCompareResult(result.getResultCode()), result));
- case OP_TYPE_EXTENDED_REQUEST:
- if (result instanceof ExtendedResult) {
- return streamFrom((Response) result);
- }
- return streamFrom((Response) populateNewResultFromResult(
- Responses.newGenericExtendedResult(result.getResultCode()), result));
- default:
- return streamFrom((Response) result);
- }
- }
- };
- }
-
- private Result populateNewResultFromResult(final Result newResult,
- final Result result) {
- newResult.setDiagnosticMessage(result.getDiagnosticMessage());
- newResult.setMatchedDN(result.getMatchedDN());
- newResult.setCause(result.getCause());
- for (final Control control : result.getControls()) {
- newResult.addControl(control);
+ private final LdapResponseMessage toLdapResponseMessage(final LdapRawMessage rawRequest, final Result result) {
+ switch (rawRequest.getMessageType()) {
+ case OP_TYPE_ADD_REQUEST:
+ return newResponseMessage(OP_TYPE_ADD_RESPONSE, rawRequest.getMessageId(), result);
+ case OP_TYPE_BIND_REQUEST:
+ return newResponseMessage(OP_TYPE_BIND_RESPONSE, rawRequest.getMessageId(), result);
+ case OP_TYPE_COMPARE_REQUEST:
+ return newResponseMessage(OP_TYPE_COMPARE_RESPONSE, rawRequest.getMessageId(), result);
+ case OP_TYPE_DELETE_REQUEST:
+ return newResponseMessage(OP_TYPE_DELETE_RESPONSE, rawRequest.getMessageId(), result);
+ case OP_TYPE_EXTENDED_REQUEST:
+ return newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, rawRequest.getMessageId(), result);
+ case OP_TYPE_MODIFY_DN_REQUEST:
+ return newResponseMessage(OP_TYPE_MODIFY_DN_RESPONSE, rawRequest.getMessageId(), result);
+ case OP_TYPE_MODIFY_REQUEST:
+ return newResponseMessage(OP_TYPE_MODIFY_RESPONSE, rawRequest.getMessageId(), result);
+ case OP_TYPE_SEARCH_REQUEST:
+ return newResponseMessage(OP_TYPE_SEARCH_RESULT_DONE, rawRequest.getMessageId(), result);
+ default:
+ throw new IllegalArgumentException("Unknown request: " + rawRequest.getMessageType());
}
- return newResult;
}
- private Function<Response, LdapResponseMessage, Exception> toResponseMessage(final LdapRawMessage rawRequest) {
+ private Function<Response, LdapResponseMessage, Exception> toLdapResponseMessage(final LdapRawMessage rawRequest) {
return new Function<Response, LdapResponseMessage, Exception>() {
@Override
public LdapResponseMessage apply(final Response response) {
if (response instanceof Result) {
- switch (rawRequest.getMessageType()) {
- case OP_TYPE_ADD_REQUEST:
- return newResponseMessage(OP_TYPE_ADD_RESPONSE, rawRequest.getMessageId(), response);
- case OP_TYPE_BIND_REQUEST:
- return newResponseMessage(OP_TYPE_BIND_RESPONSE, rawRequest.getMessageId(), response);
- case OP_TYPE_COMPARE_REQUEST:
- return newResponseMessage(OP_TYPE_COMPARE_RESPONSE, rawRequest.getMessageId(), response);
- case OP_TYPE_DELETE_REQUEST:
- return newResponseMessage(OP_TYPE_DELETE_RESPONSE, rawRequest.getMessageId(), response);
- case OP_TYPE_EXTENDED_REQUEST:
- return newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, rawRequest.getMessageId(), response);
- case OP_TYPE_MODIFY_DN_REQUEST:
- return newResponseMessage(OP_TYPE_MODIFY_DN_RESPONSE, rawRequest.getMessageId(), response);
- case OP_TYPE_MODIFY_REQUEST:
- return newResponseMessage(OP_TYPE_MODIFY_RESPONSE, rawRequest.getMessageId(), response);
- case OP_TYPE_SEARCH_REQUEST:
- return newResponseMessage(OP_TYPE_SEARCH_RESULT_DONE, rawRequest.getMessageId(), response);
- default:
- throw new IllegalArgumentException("Unknown request: " + rawRequest.getMessageType());
- }
+ return toLdapResponseMessage(rawRequest, (Result) response);
}
if (response instanceof IntermediateResponse) {
return newResponseMessage(OP_TYPE_INTERMEDIATE_RESPONSE, rawRequest.getMessageId(), response);
@@ -274,7 +263,7 @@
return ctx.getStopAction();
}
- final class ClientConnectionImpl implements LDAPClientContext, Publisher<LdapRawMessage> {
+ final class ClientConnectionImpl implements LDAPClientContext {
final class GrizzlyBackpressureSubscription implements Subscription {
private final AtomicLong pendingRequests = new AtomicLong();
@@ -339,38 +328,24 @@
private ClientConnectionImpl(final Connection<?> connection) {
this.connection = connection;
- connection.closeFuture().addCompletionHandler(new CompletionHandler<CloseReason>() {
- @Override
- public void completed(CloseReason result) {
- disconnect0(null, null);
- }
+ }
+ Stream<LdapRawMessage> read() {
+ return streamFromPublisher(new Publisher<LdapRawMessage>() {
@Override
- public void updated(CloseReason result) {
- }
-
- @Override
- public void failed(Throwable throwable) {
- }
-
- @Override
- public void cancelled() {
+ public void subscribe(final Subscriber<? super LdapRawMessage> subscriber) {
+ if (upstream != null) {
+ return;
+ }
+ upstream = new GrizzlyBackpressureSubscription(subscriber);
}
});
}
- @Override
- public void subscribe(final Subscriber<? super LdapRawMessage> s) {
- if (upstream != null) {
- return;
- }
- this.upstream = new GrizzlyBackpressureSubscription(s);
- }
-
Completable write(final Publisher<LdapResponseMessage> messages) {
- return newCompletable(new Completable.OnSubscribe() {
+ return newCompletable(new Completable.Emitter() {
@Override
- public void onSubscribe(Emitter e) {
+ public void subscribe(Completable.Subscriber e) {
messages.subscribe(new LdapResponseMessageWriter(connection, e));
}
});
@@ -378,9 +353,9 @@
@Override
public void enableTLS(final SSLEngine sslEngine) {
- Reject.ifNull(sslEngine);
+ Reject.ifNull(sslEngine, "sslEngine must not be null");
synchronized (this) {
- if (isTLSEnabled()) {
+ if (isFilterExists(SSLFilter.class)) {
throw new IllegalStateException("TLS already enabled");
}
SSLUtils.setSSLEngine(connection, sslEngine);
@@ -390,8 +365,14 @@
@Override
public void enableSASL(final SaslServer saslServer) {
- SaslUtils.setSaslServer(connection, saslServer);
- installFilter(new SaslFilter());
+ Reject.ifNull(saslServer, "saslServer must not be null");
+ synchronized (this) {
+ if (isFilterExists(SaslFilter.class)) {
+ throw new IllegalStateException("Sasl already enabled");
+ }
+ SaslUtils.setSaslServer(connection, saslServer);
+ installFilter(new SaslFilter());
+ }
}
@Override
@@ -474,16 +455,11 @@
GrizzlyUtils.addFilterToConnection(filter, connection);
}
- /**
- * Indicates whether TLS is enabled this provided connection.
- *
- * @return {@code true} if TLS is enabled on this connection, otherwise {@code false}.
- */
- private boolean isTLSEnabled() {
+ private boolean isFilterExists(Class<?> filterKlass) {
synchronized (this) {
final FilterChain currentFilterChain = (FilterChain) connection.getProcessor();
for (final Filter filter : currentFilterChain) {
- if (filter instanceof SSLFilter) {
+ if (filterKlass.isAssignableFrom(filter.getClass())) {
return true;
}
}
@@ -493,35 +469,97 @@
@Override
public void onDisconnect(DisconnectListener listener) {
+ Reject.ifNull(listener, "listener must not be null");
listeners.add(listener);
}
@Override
public void disconnect() {
- disconnect0(null, null);
+ if (isClosed.compareAndSet(false, true)) {
+ try {
+ for (DisconnectListener listener : listeners) {
+ listener.connectionDisconnected(this, null, null);
+ }
+ } finally {
+ closeConnection();
+ }
+ }
+ }
+
+ private void closeConnection() {
+ if (upstream != null) {
+ upstream.cancel();
+ upstream = null;
+ }
+ connection.closeSilently();
}
@Override
- public void disconnect(final ResultCode resultCode, final String message) {
- sendUnsolicitedNotification(
- Responses.newGenericExtendedResult(resultCode)
- .setOID(LDAP.OID_NOTICE_OF_DISCONNECTION)
- .setDiagnosticMessage(message));
- disconnect0(resultCode, message);
+ public void disconnect(final ResultCode resultCode, final String diagnosticMessage) {
+ // Close this connection context.
+ if (isClosed.compareAndSet(false, true)) {
+ sendUnsolicitedNotification(Responses.newGenericExtendedResult(resultCode)
+ .setOID(LDAP.OID_NOTICE_OF_DISCONNECTION)
+ .setDiagnosticMessage(diagnosticMessage));
+ try {
+ for (DisconnectListener listener : listeners) {
+ listener.connectionDisconnected(this, resultCode, diagnosticMessage);
+ }
+ } finally {
+ closeConnection();
+ }
+ }
}
- private void disconnect0(final ResultCode resultCode, final String message) {
+ private void notifyAndClose(final LdapRawMessage unbindRequest) {
+ // Close this connection context.
+ if (isClosed.compareAndSet(false, true)) {
+ if (unbindRequest == null) {
+ doNotifySilentlyConnectionClosed(null);
+ } else {
+ try {
+ LDAP.getReader(unbindRequest.getContent(), new DecodeOptions())
+ .readMessage(new AbstractLDAPMessageHandler() {
+ @Override
+ public void unbindRequest(int messageID, UnbindRequest unbindRequest)
+ throws DecodeException, IOException {
+ doNotifySilentlyConnectionClosed(unbindRequest);
+ }
+ });
+ } catch (Exception e) {
+ doNotifySilentlyConnectionClosed(null);
+ }
+ }
+ }
+ }
+
+ private void doNotifySilentlyConnectionClosed(final UnbindRequest unbindRequest) {
+ try {
+ for (final DisconnectListener listener : listeners) {
+ try {
+ listener.connectionClosed(this, unbindRequest);
+ } catch (Exception e) {
+ // TODO: Log as a warning ?
+ }
+ }
+ } finally {
+ closeConnection();
+ }
+ }
+
+ private void notifyAndCloseSilently(final Throwable error) {
// Close this connection context.
if (isClosed.compareAndSet(false, true)) {
try {
- // Notify the server connection: it may be null if disconnect is
- // invoked during accept.
- for (DisconnectListener listener : listeners) {
- listener.connectionDisconnected(this, resultCode, message);
+ for (final DisconnectListener listener : listeners) {
+ try {
+ listener.exceptionOccurred(this, error);
+ } catch (Exception e) {
+ // TODO: Log as a warning ?
+ }
}
} finally {
- // Close the connection.
- connection.closeSilently();
+ closeConnection();
}
}
}
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
index 730a7b1..b58139b 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
@@ -36,7 +36,7 @@
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
-final class LdapCodec extends LDAPBaseFilter {
+abstract class LdapCodec extends LDAPBaseFilter {
LdapCodec(final int maxElementSize, final DecodeOptions decodeOptions) {
super(decodeOptions, maxElementSize);
@@ -44,15 +44,29 @@
@Override
public NextAction handleRead(final FilterChainContext ctx) throws IOException {
- final Buffer buffer = ctx.getMessage();
- final LdapRawMessage message = readMessage(buffer);
- if (message != null) {
- ctx.setMessage(message);
- return ctx.getInvokeAction(getRemainingBuffer(buffer));
+ try {
+ final Buffer buffer = ctx.getMessage();
+ final LdapRawMessage message;
+
+ message = readMessage(buffer);
+ if (message != null) {
+ ctx.setMessage(message);
+ return ctx.getInvokeAction(getRemainingBuffer(buffer));
+ }
+ return ctx.getStopAction(getRemainingBuffer(buffer));
+ } catch (Exception e) {
+ onLdapCodecError(ctx, e);
+ // make the connection deaf to any following input
+ // onLdapDecodeError call will take care of error processing
+ // and closing the connection
+ final NextAction suspendAction = ctx.getSuspendAction();
+ ctx.completeAndRecycle();
+ return suspendAction;
}
- return ctx.getStopAction(getRemainingBuffer(buffer));
}
+ protected abstract void onLdapCodecError(FilterChainContext ctx, Throwable error);
+
private LdapRawMessage readMessage(final Buffer buffer) throws IOException {
try (final ASN1BufferReader reader = new ASN1BufferReader(maxASN1ElementSize, buffer)) {
final int packetStart = buffer.position();
@@ -123,10 +137,18 @@
try {
final Buffer buffer = toBuffer(writer, ctx.<LdapResponseMessage> getMessage());
ctx.setMessage(buffer);
+ return ctx.getInvokeAction();
+ } catch (Exception e) {
+ onLdapCodecError(ctx, e);
+ // make the connection deaf to any following input
+ // onLdapDecodeError call will take care of error processing
+ // and closing the connection
+ final NextAction suspendAction = ctx.getSuspendAction();
+ ctx.completeAndRecycle();
+ return suspendAction;
} finally {
GrizzlyUtils.recycleWriter(writer);
}
- return ctx.getInvokeAction();
}
private Buffer toBuffer(final LDAPWriter<ASN1BufferWriter> writer, final LdapResponseMessage message)
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
index 9e3fa30..007ce33 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
@@ -26,10 +26,10 @@
final class LdapResponseMessageWriter implements Subscriber<LdapResponseMessage> {
private final Connection<?> connection;
- private final Completable.Emitter completable;
+ private final Completable.Subscriber completable;
private Subscription upstream;
- LdapResponseMessageWriter(final Connection<?> connection, final Completable.Emitter completable) {
+ LdapResponseMessageWriter(final Connection<?> connection, final Completable.Subscriber completable) {
this.connection = connection;
this.completable = completable;
}
@@ -72,6 +72,6 @@
@Override
public void onComplete() {
- completable.complete();
+ completable.onComplete();
}
}
diff --git a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java
index a893b367..8890a14 100644
--- a/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java
+++ b/opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java
@@ -610,7 +610,7 @@
* @throws Exception
* If an unexpected error occurred.
*/
- @Test(enabled = false)
+ @Test
public void testMaxRequestSize() throws Exception {
final MockServerConnection serverConnection = new MockServerConnection();
final MockServerConnectionFactory factory =
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java
index 5bea25b..14db1c6 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java
@@ -171,58 +171,58 @@
public Single<Stream<Response>> filter(final LDAPClientConnection2 context,
final LdapRawMessage encodedRequestMessage,
final ReactiveHandler<LDAPClientConnection2, Request, Stream<Response>> next) throws Exception {
- return newSingle(new Single.OnSubscribe<Request>() {
+ return newSingle(new Single.Emitter<Request>() {
@Override
- public void onSubscribe(final Single.Emitter<Request> emitter) throws Exception {
+ public void subscribe(final Single.Subscriber<Request> subscriber) throws Exception {
LDAP.getReader(encodedRequestMessage.getContent(), decodeOptions)
.readMessage(new AbstractLDAPMessageHandler() {
@Override
public void abandonRequest(final int messageID, final AbandonRequest request)
throws DecodeException, IOException {
- emitter.onSuccess(request);
+ subscriber.onComplete(request);
}
@Override
public void addRequest(int messageID, AddRequest request)
throws DecodeException, IOException {
- emitter.onSuccess(request);
+ subscriber.onComplete(request);
}
@Override
public void bindRequest(int messageID, int version, GenericBindRequest request)
throws DecodeException, IOException {
- emitter.onSuccess(request);
+ subscriber.onComplete(request);
}
@Override
public void modifyDNRequest(int messageID, ModifyDNRequest request)
throws DecodeException, IOException {
- emitter.onSuccess(request);
+ subscriber.onComplete(request);
}
@Override
public void modifyRequest(int messageID, ModifyRequest request)
throws DecodeException, IOException {
- emitter.onSuccess(request);
+ subscriber.onComplete(request);
}
@Override
public void searchRequest(int messageID, SearchRequest request)
throws DecodeException, IOException {
- emitter.onSuccess(request);
+ subscriber.onComplete(request);
}
@Override
public void unbindRequest(int messageID, UnbindRequest request)
throws DecodeException, IOException {
- emitter.onSuccess(request);
+ subscriber.onComplete(request);
}
});
}
- }).flatMap(new Function<Request, Publisher<Stream<Response>>, Exception>() {
+ }).flatMap(new Function<Request, Single<Stream<Response>>, Exception>() {
@Override
- public Publisher<Stream<Response>> apply(Request t) throws Exception {
- return next.handle(context, t);
+ public Single<Stream<Response>> apply(final Request request) throws Exception {
+ return next.handle(context, request);
}
});
}
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index 2bff320..7341f9b 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -711,6 +711,10 @@
logger.traceException(e);
}
}
+ else
+ {
+ clientContext.disconnect();
+ }
// NYI -- Deregister the client connection from any server components that
// might know about it.
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
index af7326a..5e71d6c 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -747,8 +747,8 @@
LdapException>() {
@Override
public ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>
- apply(LDAPClientContext value) throws LdapException {
- final LDAPClientConnection2 conn = canAccept(value);
+ apply(LDAPClientContext clientContext) throws LdapException {
+ final LDAPClientConnection2 conn = canAccept(clientContext);
return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
@Override
public Single<Stream<Response>> handle(LDAPClientContext context,
--
Gitblit v1.10.0