opendj-core/src/main/java/com/forgerock/reactive/Completable.java
@@ -21,9 +21,9 @@ public interface Completable extends Publisher<Void> { /** Emitter is used to notify when the operation has been completed, successfully or not. */ public interface Emitter { public interface Subscriber { /** Notify that this {@link Completable} is now completed. */ void complete(); void onComplete(); /** * Notify that this {@link Completable} cannot be completed because of an error. @@ -35,14 +35,14 @@ } /** Adapts the streaming api to a callback one. */ public interface OnSubscribe { public interface Emitter { /** * Called when the streaming api has been subscribed. * * @param e * The {@link Emitter} to use to communicate the completeness of this {@link Completable} * The {@link Subscriber} to use to communicate the completeness of this {@link Completable} */ void onSubscribe(Emitter e); void subscribe(Subscriber e); } /** opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -17,7 +17,6 @@ import org.forgerock.util.Function; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import io.reactivex.CompletableEmitter; import io.reactivex.CompletableOnSubscribe; @@ -112,21 +111,34 @@ } /** * Create a new {@link Single} from the given error. * * @param <V> * Type of the datum emitted * @param error * The error emitted by this {@link Single} * @return A new {@link Single} */ public static <V> Single<V> singleError(final Throwable error) { return new RxJavaSingle<>(io.reactivex.Single.<V>error(error)); } /** * Creates a bridge from callback world to {@link Single}. * * @param <V> * Type of the datum emitted * @param onSubscribe * @param emitter * Action to perform once this {@link Single} has been subscribed to. * @return A new {@link Single} */ public static <V> Single<V> newSingle(final Single.OnSubscribe<V> onSubscribe) { public static <V> Single<V> newSingle(final Single.Emitter<V> emitter) { return new RxJavaSingle<>(io.reactivex.Single.create(new SingleOnSubscribe<V>() { @Override public void subscribe(final SingleEmitter<V> e) throws Exception { onSubscribe.onSubscribe(new Single.Emitter<V>() { emitter.subscribe(new Single.Subscriber<V>() { @Override public void onSuccess(V value) { public void onComplete(V value) { e.onSuccess(value); } @@ -146,18 +158,18 @@ * Action to perform once this {@link Completable} has been subscribed to. * @return A new {@link Completable} */ public static Completable newCompletable(final Completable.OnSubscribe onSubscribe) { public static Completable newCompletable(final Completable.Emitter onSubscribe) { return new RxJavaCompletable(io.reactivex.Completable.create(new CompletableOnSubscribe() { @Override public void subscribe(final CompletableEmitter e) throws Exception { onSubscribe.onSubscribe(new Completable.Emitter() { onSubscribe.subscribe(new Completable.Subscriber() { @Override public void complete() { public void onComplete() { e.onComplete(); } @Override public void onError(Throwable t) { e.onError(t); public void onError(final Throwable error) { e.onError(error); } }); } @@ -166,14 +178,14 @@ private static final class RxJavaStream<V> implements Stream<V> { private Flowable<V> impl; private final Flowable<V> impl; private RxJavaStream(final Flowable<V> impl) { this.impl = impl; } @Override public void subscribe(Subscriber<? super V> s) { public void subscribe(org.reactivestreams.Subscriber<? super V> s) { impl.subscribe(s); } @@ -219,6 +231,16 @@ } @Override public Stream<V> onErrorDo(final Consumer<Throwable> onError) { return new RxJavaStream<>(impl.doOnError(new io.reactivex.functions.Consumer<Throwable>() { @Override public void accept(Throwable t) throws Exception { onError.accept(t); } })); } @Override public Stream<V> onErrorResumeWith(final Function<Throwable, Publisher<V>, Exception> function) { return new RxJavaStream<>( impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, Publisher<? extends V>>() { @@ -259,7 +281,7 @@ } @Override public void subscribe(Subscriber<? super V> s) { public void subscribe(org.reactivestreams.Subscriber<? super V> s) { impl.toFlowable().subscribe(s); } @@ -279,7 +301,7 @@ } @Override public <O> Single<O> flatMap(final Function<V, Publisher<O>, Exception> function) { public <O> Single<O> flatMap(final Function<V, Single<O>, Exception> function) { return new RxJavaSingle<>(impl.flatMap(new io.reactivex.functions.Function<V, SingleSource<O>>() { @Override public SingleSource<O> apply(V t) throws Exception { @@ -287,6 +309,17 @@ } })); } @Override public Single<V> onErrorResumeWith(final Function<Throwable, Single<V>, Exception> function) { return new RxJavaSingle<>( impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, SingleSource<V>>() { @Override public SingleSource<V> apply(Throwable error) throws Exception { return io.reactivex.Single.fromPublisher(function.apply(error)); } })); } } private static final class RxJavaCompletable implements Completable { @@ -303,7 +336,7 @@ } @Override public void subscribe(Subscriber<? super Void> s) { public void subscribe(org.reactivestreams.Subscriber<? super Void> s) { impl.<Void>toFlowable().subscribe(s); } } opendj-core/src/main/java/com/forgerock/reactive/Single.java
@@ -27,14 +27,14 @@ public interface Single<V> extends Publisher<V> { /** Emitter is used to notify when the operation has been completed, successfully or not. */ public interface Emitter<V> { public interface Subscriber<V> { /** * Signal a success value. * * @param t * the value, not null */ void onSuccess(V t); void onComplete(V t); /** * Signal an exception. @@ -46,13 +46,13 @@ } /** Adapts the streaming api to a callback one. */ public interface OnSubscribe<V> { public interface Emitter<V> { /** * Called for each SingleObserver that subscribes. * @param e the safe emitter instance, never null * @throws Exception on error */ void onSubscribe(Emitter<V> e) throws Exception; void subscribe(Subscriber<V> e) throws Exception; } /** @@ -82,7 +82,17 @@ * Function to apply to perform the asynchronous transformation * @return A new {@link Single} transforming the datum emitted by this {@link Single}. */ <O> Single<O> flatMap(Function<V, Publisher<O>, Exception> function); <O> Single<O> flatMap(Function<V, Single<O>, Exception> function); /** * When an error occurs in this stream, continue the processing with the new {@link Single} provided by the * function. * * @param function * Generates the single which must will used to resume operation when this {@link Single} failed. * @return A new {@link Single} */ Single<V> onErrorResumeWith(Function<Throwable, Single<V>, Exception> function); /** * Subscribe to the single value emitted by this {@link Single}. opendj-core/src/main/java/com/forgerock/reactive/Stream.java
@@ -75,6 +75,15 @@ Stream<V> onErrorResumeWith(Function<Throwable, Publisher<V>, Exception> function); /** * Invokes the on error {@link Consumer} when an error occurs on this stream. * * @param onError * The {@link Consumer} to invoke on error * @return a new {@link Stream} */ Stream<V> onErrorDo(Consumer<Throwable> onError); /** * Subscribe to this stream and drop all data produced by it. */ void subscribe(); opendj-core/src/main/java/org/forgerock/opendj/ldap/ByteSequenceReader.java
@@ -28,7 +28,7 @@ * {@code ByteSequenceReader} must be created using the associated * {@code ByteSequence}'s {@code asReader()} method. */ public final class ByteSequenceReader { public class ByteSequenceReader { /** The current position in the byte sequence. */ private int pos; @@ -55,7 +55,7 @@ * @param sequence * The byte sequence to be read. */ ByteSequenceReader(final ByteSequence sequence) { public ByteSequenceReader(final ByteSequence sequence) { this.sequence = sequence; } opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
@@ -23,6 +23,7 @@ import javax.net.ssl.SSLSession; import javax.security.sasl.SaslServer; import org.forgerock.opendj.ldap.requests.UnbindRequest; import org.forgerock.opendj.ldap.responses.ExtendedResult; /** @@ -36,17 +37,39 @@ /** Listens for disconnection event. */ public interface DisconnectListener { /** * Invoked when the connection has been closed as a result of a client disconnect, a fatal connection error, or * a server-side {@link #disconnect}. * Invoked when the connection has been disconnected because of an error (i.e: message too big). * * @param context * The {@link LDAPClientContext} which has failed * @param error * The error */ void exceptionOccurred(final LDAPClientContext context, final Throwable error); /** * Invoked when the client closes the connection, possibly using an unbind request. * * @param context * The {@link LDAPClientContext} which has been disconnected * @param unbindRequest * The unbind request, which may be {@code null} if one was not sent before the connection was * closed. */ void connectionClosed(final LDAPClientContext context, final UnbindRequest unbindRequest); /** * Invoked when the connection has been disconnected by the server. * * @param context * The {@link LDAPClientContext} which has been disconnected * @param resultCode * The {@link ResultCode} of the notification sent, or null * @param message * The message of the notification sent, or null * The result code which was included with the disconnect notification, or {@code null} if no * disconnect notification was sent. * @param diagnosticMessage * The diagnostic message, which may be empty or {@code null} indicating that none was provided. */ void connectionDisconnected(final LDAPClientContext context, final ResultCode resultCode, final String message); void connectionDisconnected(final LDAPClientContext context, final ResultCode resultCode, final String diagnosticMessage); } /** @@ -57,30 +80,24 @@ void onDisconnect(final DisconnectListener listener); /** * Disconnects the client without sending a disconnect notification. * <p> * <b>Server connections:</b> invoking this method causes * {@link ServerConnection#handleConnectionDisconnected * handleConnectionDisconnected} to be called before this method returns. * Disconnects the client without sending a disconnect notification. Invoking this method causes * {@link DisconnectListener#connectionDisconnected(LDAPClientContext, ResultCode, String)} to be called before this * method returns. */ void disconnect(); /** * Disconnects the client and sends a disconnect notification, if possible, * containing the provided result code and diagnostic message. * <p> * <b>Server connections:</b> invoking this method causes * {@link ServerConnection#handleConnectionDisconnected * handleConnectionDisconnected} to be called before this method returns. * Disconnects the client and sends a disconnect notification, containing the provided result code and diagnostic * message. Invoking this method causes * {@link DisconnectListener#connectionDisconnected(LDAPClientContext, ResultCode, String)} to be called before this * method returns. * * @param resultCode * The result code which should be included with the disconnect * notification. * @param message * The diagnostic message, which may be empty or {@code null} * indicating that none was provided. * The result code to include with the disconnect notification * @param diagnosticMessage * The diagnostic message to include with the disconnect notification */ void disconnect(final ResultCode resultCode, final String message); void disconnect(final ResultCode resultCode, final String diagnosticMessage); /** * Returns the {@code InetSocketAddress} associated with the local system. opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
@@ -101,10 +101,22 @@ final ServerConnectionAdaptor<Integer> adapter = new ServerConnectionAdaptor<>(serverConnection); clientContext.onDisconnect(new DisconnectListener() { @Override public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode, String message) { serverConnection.handleConnectionDisconnected(resultCode, message); public void exceptionOccurred(final LDAPClientContext context, final Throwable error) { serverConnection.handleConnectionError(error); } @Override public void connectionClosed(final LDAPClientContext context, final UnbindRequest unbindRequest) { serverConnection.handleConnectionClosed(0, unbindRequest); } @Override public void connectionDisconnected(final LDAPClientContext context, final ResultCode resultCode, final String diagnosticMessage) { serverConnection.handleConnectionDisconnected(resultCode, diagnosticMessage); } }); return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() { @Override public Single<Stream<Response>> handle(final LDAPClientContext context, opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java
@@ -38,6 +38,7 @@ import org.forgerock.util.Function; import org.forgerock.util.Options; import org.glassfish.grizzly.filterchain.FilterChain; import org.glassfish.grizzly.filterchain.FilterChainContext; import org.glassfish.grizzly.nio.transport.TCPNIOBindingHandler; import org.glassfish.grizzly.nio.transport.TCPNIOConnection; import org.glassfish.grizzly.nio.transport.TCPNIOServerConnection; @@ -66,7 +67,7 @@ * The addresses to listen on. * @param options * The LDAP listener options. * @param handler * @param requestHandlerFactory * The server connection factory which will be used to create server connections. * @throws IOException * If an error occurred while trying to listen on the provided address. @@ -74,8 +75,8 @@ public GrizzlyLDAPListener(final Set<? extends SocketAddress> addresses, final Options options, final Function<LDAPClientContext, ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>, LdapException> handler) throws IOException { this(addresses, handler, options, null); LdapException> requestHandlerFactory) throws IOException { this(addresses, requestHandlerFactory, options, null); } /** @@ -84,7 +85,7 @@ * * @param addresses * The addresses to listen on. * @param handler * @param requestHandlerFactory * The server connection factory which will be used to create server connections. * @param options * The LDAP listener options. @@ -97,14 +98,20 @@ public GrizzlyLDAPListener(final Set<? extends SocketAddress> addresses, final Function<LDAPClientContext, ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>, LdapException> handler, LdapException> requestHandlerFactory, final Options options, TCPNIOTransport transport) throws IOException { this.transport = DEFAULT_TRANSPORT.acquireIfNull(transport); this.options = Options.copyOf(options); final LDAPServerFilter serverFilter = new LDAPServerFilter(handler, options, final LDAPServerFilter serverFilter = new LDAPServerFilter(requestHandlerFactory, options, options.get(LDAP_DECODE_OPTIONS), options.get(MAX_CONCURRENT_REQUESTS)); final FilterChain ldapChain = GrizzlyUtils.buildFilterChain(this.transport.get().getProcessor(), new LdapCodec(options.get(REQUEST_MAX_SIZE_IN_BYTES), options.get(LDAP_DECODE_OPTIONS)), serverFilter); new LdapCodec(options.get(REQUEST_MAX_SIZE_IN_BYTES), options.get(LDAP_DECODE_OPTIONS)) { @Override protected void onLdapCodecError(FilterChainContext ctx, Throwable error) { serverFilter.exceptionOccurred(ctx, error); } }, serverFilter); final TCPNIOBindingHandler bindingHandler = TCPNIOBindingHandler.builder(this.transport.get()) .processor(ldapChain).build(); this.serverConnections = new ArrayList<>(addresses.size()); opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -34,14 +34,14 @@ import javax.security.sasl.SaslServer; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.forgerock.opendj.io.AbstractLDAPMessageHandler; import org.forgerock.opendj.io.LDAP; import org.forgerock.opendj.ldap.DecodeException; import org.forgerock.opendj.ldap.DecodeOptions; import org.forgerock.opendj.ldap.LDAPClientContext; import org.forgerock.opendj.ldap.LdapException; import org.forgerock.opendj.ldap.ResultCode; import org.forgerock.opendj.ldap.controls.Control; import org.forgerock.opendj.ldap.responses.BindResult; import org.forgerock.opendj.ldap.responses.CompareResult; import org.forgerock.opendj.ldap.requests.UnbindRequest; import org.forgerock.opendj.ldap.responses.ExtendedResult; import org.forgerock.opendj.ldap.responses.IntermediateResponse; import org.forgerock.opendj.ldap.responses.Response; @@ -55,8 +55,6 @@ import org.forgerock.util.Function; import org.forgerock.util.Options; import org.forgerock.util.Reject; import org.glassfish.grizzly.CloseReason; import org.glassfish.grizzly.CompletionHandler; import org.glassfish.grizzly.Connection; import org.glassfish.grizzly.Grizzly; import org.glassfish.grizzly.attributes.Attribute; @@ -72,8 +70,9 @@ import org.reactivestreams.Subscription; import com.forgerock.reactive.Completable; import com.forgerock.reactive.Completable.Emitter; import com.forgerock.reactive.Consumer; import com.forgerock.reactive.ReactiveHandler; import com.forgerock.reactive.Single; import com.forgerock.reactive.Stream; import io.reactivex.internal.util.BackpressureHelper; @@ -84,12 +83,14 @@ */ final class LDAPServerFilter extends BaseFilter { private static final Object DUMMY = new byte[0]; private static final Attribute<ClientConnectionImpl> LDAP_CONNECTION_ATTR = Grizzly.DEFAULT_ATTRIBUTE_BUILDER .createAttribute("LDAPServerConnection"); private final Function<LDAPClientContext, ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>, LdapException> connectionHandler; LdapException> connectionHandlerFactory; /** * Map of cipher phrases to effective key size (bits). Taken from the @@ -132,11 +133,11 @@ * The maximum BER element size, or <code>0</code> to indicate that there is no limit. */ LDAPServerFilter( Function<LDAPClientContext, ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>, LdapException> connectionHandler, final Function<LDAPClientContext, ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>, LdapException> connectionHandlerFactory, final Options connectionOptions, final DecodeOptions options, final int maxPendingRequests) { this.connectionHandler = connectionHandler; this.connectionHandlerFactory = connectionHandlerFactory; this.connectionOptions = connectionOptions; this.maxConcurrentRequests = maxPendingRequests; } @@ -152,98 +153,86 @@ configureConnection(connection, logger, connectionOptions); final ClientConnectionImpl clientContext = new ClientConnectionImpl(connection); LDAP_CONNECTION_ATTR.set(connection, clientContext); final ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> handler = connectionHandler .apply(clientContext); final ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> requestHandler = connectionHandlerFactory.apply(clientContext); streamFromPublisher(clientContext).flatMap(new Function<LdapRawMessage, Publisher<Integer>, Exception>() { @Override public Publisher<Integer> apply(final LdapRawMessage rawRequest) throws Exception { return handler .handle(clientContext, rawRequest) .flatMap(new Function<Stream<Response>, Publisher<Integer>, Exception>() { @Override public Publisher<Integer> apply(final Stream<Response> response) { return clientContext.write(response.onErrorResumeWith(toErrorMessage(rawRequest)) .map(toResponseMessage(rawRequest))).toSingle(1); } }); } }, maxConcurrentRequests).subscribe(); clientContext .read() .flatMap(new Function<LdapRawMessage, Publisher<Object>, Exception>() { @Override public Publisher<Object> apply(final LdapRawMessage rawRequest) throws Exception { if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) { clientContext.notifyAndClose(rawRequest); return singleFrom(DUMMY); } Single<Stream<Response>> response; try { response = requestHandler.handle(clientContext, rawRequest); } catch (Exception e) { response = singleError(e); } return response .flatMap(new Function<Stream<Response>, Single<Object>, Exception>() { @Override public Single<Object> apply(final Stream<Response> response) { return clientContext.write(response.map(toLdapResponseMessage(rawRequest))) .toSingle(DUMMY); } }) .onErrorResumeWith(new Function<Throwable, Single<Object>, Exception>() { @Override public Single<Object> apply(final Throwable error) throws Exception { if (!(error instanceof LdapException)) { // Unexpected error, propagate it. return singleError(error); } final LdapException exception = (LdapException) error; return clientContext .write(singleFrom(toLdapResponseMessage(rawRequest, exception.getResult()))) .toSingle(DUMMY); } }); } }, maxConcurrentRequests) .onErrorDo(new Consumer<Throwable>() { @Override public void accept(final Throwable error) throws Exception { clientContext.notifyAndCloseSilently(error); } }) .subscribe(); return ctx.getStopAction(); } private Function<Throwable, Publisher<Response>, Exception> toErrorMessage(final LdapRawMessage rawRequest) { return new Function<Throwable, Publisher<Response>, Exception>() { @Override public Publisher<Response> apply(Throwable error) throws Exception { if (!(error instanceof LdapException)) { // Propagate error return streamError(error); } final Result result = ((LdapException) error).getResult(); switch (rawRequest.getMessageType()) { case OP_TYPE_BIND_REQUEST: if (result instanceof BindResult) { return streamFrom((Response) result); } return streamFrom((Response) populateNewResultFromResult( Responses.newBindResult(result.getResultCode()), result)); case OP_TYPE_COMPARE_REQUEST: if (result instanceof CompareResult) { return streamFrom((Response) result); } return streamFrom((Response) populateNewResultFromResult( Responses.newCompareResult(result.getResultCode()), result)); case OP_TYPE_EXTENDED_REQUEST: if (result instanceof ExtendedResult) { return streamFrom((Response) result); } return streamFrom((Response) populateNewResultFromResult( Responses.newGenericExtendedResult(result.getResultCode()), result)); default: return streamFrom((Response) result); } } }; } private Result populateNewResultFromResult(final Result newResult, final Result result) { newResult.setDiagnosticMessage(result.getDiagnosticMessage()); newResult.setMatchedDN(result.getMatchedDN()); newResult.setCause(result.getCause()); for (final Control control : result.getControls()) { newResult.addControl(control); private final LdapResponseMessage toLdapResponseMessage(final LdapRawMessage rawRequest, final Result result) { switch (rawRequest.getMessageType()) { case OP_TYPE_ADD_REQUEST: return newResponseMessage(OP_TYPE_ADD_RESPONSE, rawRequest.getMessageId(), result); case OP_TYPE_BIND_REQUEST: return newResponseMessage(OP_TYPE_BIND_RESPONSE, rawRequest.getMessageId(), result); case OP_TYPE_COMPARE_REQUEST: return newResponseMessage(OP_TYPE_COMPARE_RESPONSE, rawRequest.getMessageId(), result); case OP_TYPE_DELETE_REQUEST: return newResponseMessage(OP_TYPE_DELETE_RESPONSE, rawRequest.getMessageId(), result); case OP_TYPE_EXTENDED_REQUEST: return newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, rawRequest.getMessageId(), result); case OP_TYPE_MODIFY_DN_REQUEST: return newResponseMessage(OP_TYPE_MODIFY_DN_RESPONSE, rawRequest.getMessageId(), result); case OP_TYPE_MODIFY_REQUEST: return newResponseMessage(OP_TYPE_MODIFY_RESPONSE, rawRequest.getMessageId(), result); case OP_TYPE_SEARCH_REQUEST: return newResponseMessage(OP_TYPE_SEARCH_RESULT_DONE, rawRequest.getMessageId(), result); default: throw new IllegalArgumentException("Unknown request: " + rawRequest.getMessageType()); } return newResult; } private Function<Response, LdapResponseMessage, Exception> toResponseMessage(final LdapRawMessage rawRequest) { private Function<Response, LdapResponseMessage, Exception> toLdapResponseMessage(final LdapRawMessage rawRequest) { return new Function<Response, LdapResponseMessage, Exception>() { @Override public LdapResponseMessage apply(final Response response) { if (response instanceof Result) { switch (rawRequest.getMessageType()) { case OP_TYPE_ADD_REQUEST: return newResponseMessage(OP_TYPE_ADD_RESPONSE, rawRequest.getMessageId(), response); case OP_TYPE_BIND_REQUEST: return newResponseMessage(OP_TYPE_BIND_RESPONSE, rawRequest.getMessageId(), response); case OP_TYPE_COMPARE_REQUEST: return newResponseMessage(OP_TYPE_COMPARE_RESPONSE, rawRequest.getMessageId(), response); case OP_TYPE_DELETE_REQUEST: return newResponseMessage(OP_TYPE_DELETE_RESPONSE, rawRequest.getMessageId(), response); case OP_TYPE_EXTENDED_REQUEST: return newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, rawRequest.getMessageId(), response); case OP_TYPE_MODIFY_DN_REQUEST: return newResponseMessage(OP_TYPE_MODIFY_DN_RESPONSE, rawRequest.getMessageId(), response); case OP_TYPE_MODIFY_REQUEST: return newResponseMessage(OP_TYPE_MODIFY_RESPONSE, rawRequest.getMessageId(), response); case OP_TYPE_SEARCH_REQUEST: return newResponseMessage(OP_TYPE_SEARCH_RESULT_DONE, rawRequest.getMessageId(), response); default: throw new IllegalArgumentException("Unknown request: " + rawRequest.getMessageType()); } return toLdapResponseMessage(rawRequest, (Result) response); } if (response instanceof IntermediateResponse) { return newResponseMessage(OP_TYPE_INTERMEDIATE_RESPONSE, rawRequest.getMessageId(), response); @@ -274,7 +263,7 @@ return ctx.getStopAction(); } final class ClientConnectionImpl implements LDAPClientContext, Publisher<LdapRawMessage> { final class ClientConnectionImpl implements LDAPClientContext { final class GrizzlyBackpressureSubscription implements Subscription { private final AtomicLong pendingRequests = new AtomicLong(); @@ -339,38 +328,24 @@ private ClientConnectionImpl(final Connection<?> connection) { this.connection = connection; connection.closeFuture().addCompletionHandler(new CompletionHandler<CloseReason>() { @Override public void completed(CloseReason result) { disconnect0(null, null); } } Stream<LdapRawMessage> read() { return streamFromPublisher(new Publisher<LdapRawMessage>() { @Override public void updated(CloseReason result) { } @Override public void failed(Throwable throwable) { } @Override public void cancelled() { public void subscribe(final Subscriber<? super LdapRawMessage> subscriber) { if (upstream != null) { return; } upstream = new GrizzlyBackpressureSubscription(subscriber); } }); } @Override public void subscribe(final Subscriber<? super LdapRawMessage> s) { if (upstream != null) { return; } this.upstream = new GrizzlyBackpressureSubscription(s); } Completable write(final Publisher<LdapResponseMessage> messages) { return newCompletable(new Completable.OnSubscribe() { return newCompletable(new Completable.Emitter() { @Override public void onSubscribe(Emitter e) { public void subscribe(Completable.Subscriber e) { messages.subscribe(new LdapResponseMessageWriter(connection, e)); } }); @@ -378,9 +353,9 @@ @Override public void enableTLS(final SSLEngine sslEngine) { Reject.ifNull(sslEngine); Reject.ifNull(sslEngine, "sslEngine must not be null"); synchronized (this) { if (isTLSEnabled()) { if (isFilterExists(SSLFilter.class)) { throw new IllegalStateException("TLS already enabled"); } SSLUtils.setSSLEngine(connection, sslEngine); @@ -390,8 +365,14 @@ @Override public void enableSASL(final SaslServer saslServer) { SaslUtils.setSaslServer(connection, saslServer); installFilter(new SaslFilter()); Reject.ifNull(saslServer, "saslServer must not be null"); synchronized (this) { if (isFilterExists(SaslFilter.class)) { throw new IllegalStateException("Sasl already enabled"); } SaslUtils.setSaslServer(connection, saslServer); installFilter(new SaslFilter()); } } @Override @@ -474,16 +455,11 @@ GrizzlyUtils.addFilterToConnection(filter, connection); } /** * Indicates whether TLS is enabled this provided connection. * * @return {@code true} if TLS is enabled on this connection, otherwise {@code false}. */ private boolean isTLSEnabled() { private boolean isFilterExists(Class<?> filterKlass) { synchronized (this) { final FilterChain currentFilterChain = (FilterChain) connection.getProcessor(); for (final Filter filter : currentFilterChain) { if (filter instanceof SSLFilter) { if (filterKlass.isAssignableFrom(filter.getClass())) { return true; } } @@ -493,35 +469,97 @@ @Override public void onDisconnect(DisconnectListener listener) { Reject.ifNull(listener, "listener must not be null"); listeners.add(listener); } @Override public void disconnect() { disconnect0(null, null); if (isClosed.compareAndSet(false, true)) { try { for (DisconnectListener listener : listeners) { listener.connectionDisconnected(this, null, null); } } finally { closeConnection(); } } } private void closeConnection() { if (upstream != null) { upstream.cancel(); upstream = null; } connection.closeSilently(); } @Override public void disconnect(final ResultCode resultCode, final String message) { sendUnsolicitedNotification( Responses.newGenericExtendedResult(resultCode) .setOID(LDAP.OID_NOTICE_OF_DISCONNECTION) .setDiagnosticMessage(message)); disconnect0(resultCode, message); public void disconnect(final ResultCode resultCode, final String diagnosticMessage) { // Close this connection context. if (isClosed.compareAndSet(false, true)) { sendUnsolicitedNotification(Responses.newGenericExtendedResult(resultCode) .setOID(LDAP.OID_NOTICE_OF_DISCONNECTION) .setDiagnosticMessage(diagnosticMessage)); try { for (DisconnectListener listener : listeners) { listener.connectionDisconnected(this, resultCode, diagnosticMessage); } } finally { closeConnection(); } } } private void disconnect0(final ResultCode resultCode, final String message) { private void notifyAndClose(final LdapRawMessage unbindRequest) { // Close this connection context. if (isClosed.compareAndSet(false, true)) { if (unbindRequest == null) { doNotifySilentlyConnectionClosed(null); } else { try { LDAP.getReader(unbindRequest.getContent(), new DecodeOptions()) .readMessage(new AbstractLDAPMessageHandler() { @Override public void unbindRequest(int messageID, UnbindRequest unbindRequest) throws DecodeException, IOException { doNotifySilentlyConnectionClosed(unbindRequest); } }); } catch (Exception e) { doNotifySilentlyConnectionClosed(null); } } } } private void doNotifySilentlyConnectionClosed(final UnbindRequest unbindRequest) { try { for (final DisconnectListener listener : listeners) { try { listener.connectionClosed(this, unbindRequest); } catch (Exception e) { // TODO: Log as a warning ? } } } finally { closeConnection(); } } private void notifyAndCloseSilently(final Throwable error) { // Close this connection context. if (isClosed.compareAndSet(false, true)) { try { // Notify the server connection: it may be null if disconnect is // invoked during accept. for (DisconnectListener listener : listeners) { listener.connectionDisconnected(this, resultCode, message); for (final DisconnectListener listener : listeners) { try { listener.exceptionOccurred(this, error); } catch (Exception e) { // TODO: Log as a warning ? } } } finally { // Close the connection. connection.closeSilently(); closeConnection(); } } } opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
@@ -36,7 +36,7 @@ import org.glassfish.grizzly.filterchain.FilterChainContext; import org.glassfish.grizzly.filterchain.NextAction; final class LdapCodec extends LDAPBaseFilter { abstract class LdapCodec extends LDAPBaseFilter { LdapCodec(final int maxElementSize, final DecodeOptions decodeOptions) { super(decodeOptions, maxElementSize); @@ -44,15 +44,29 @@ @Override public NextAction handleRead(final FilterChainContext ctx) throws IOException { final Buffer buffer = ctx.getMessage(); final LdapRawMessage message = readMessage(buffer); if (message != null) { ctx.setMessage(message); return ctx.getInvokeAction(getRemainingBuffer(buffer)); try { final Buffer buffer = ctx.getMessage(); final LdapRawMessage message; message = readMessage(buffer); if (message != null) { ctx.setMessage(message); return ctx.getInvokeAction(getRemainingBuffer(buffer)); } return ctx.getStopAction(getRemainingBuffer(buffer)); } catch (Exception e) { onLdapCodecError(ctx, e); // make the connection deaf to any following input // onLdapDecodeError call will take care of error processing // and closing the connection final NextAction suspendAction = ctx.getSuspendAction(); ctx.completeAndRecycle(); return suspendAction; } return ctx.getStopAction(getRemainingBuffer(buffer)); } protected abstract void onLdapCodecError(FilterChainContext ctx, Throwable error); private LdapRawMessage readMessage(final Buffer buffer) throws IOException { try (final ASN1BufferReader reader = new ASN1BufferReader(maxASN1ElementSize, buffer)) { final int packetStart = buffer.position(); @@ -123,10 +137,18 @@ try { final Buffer buffer = toBuffer(writer, ctx.<LdapResponseMessage> getMessage()); ctx.setMessage(buffer); return ctx.getInvokeAction(); } catch (Exception e) { onLdapCodecError(ctx, e); // make the connection deaf to any following input // onLdapDecodeError call will take care of error processing // and closing the connection final NextAction suspendAction = ctx.getSuspendAction(); ctx.completeAndRecycle(); return suspendAction; } finally { GrizzlyUtils.recycleWriter(writer); } return ctx.getInvokeAction(); } private Buffer toBuffer(final LDAPWriter<ASN1BufferWriter> writer, final LdapResponseMessage message) opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
@@ -26,10 +26,10 @@ final class LdapResponseMessageWriter implements Subscriber<LdapResponseMessage> { private final Connection<?> connection; private final Completable.Emitter completable; private final Completable.Subscriber completable; private Subscription upstream; LdapResponseMessageWriter(final Connection<?> connection, final Completable.Emitter completable) { LdapResponseMessageWriter(final Connection<?> connection, final Completable.Subscriber completable) { this.connection = connection; this.completable = completable; } @@ -72,6 +72,6 @@ @Override public void onComplete() { completable.complete(); completable.onComplete(); } } opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java
@@ -610,7 +610,7 @@ * @throws Exception * If an unexpected error occurred. */ @Test(enabled = false) @Test public void testMaxRequestSize() throws Exception { final MockServerConnection serverConnection = new MockServerConnection(); final MockServerConnectionFactory factory = opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java
@@ -171,58 +171,58 @@ public Single<Stream<Response>> filter(final LDAPClientConnection2 context, final LdapRawMessage encodedRequestMessage, final ReactiveHandler<LDAPClientConnection2, Request, Stream<Response>> next) throws Exception { return newSingle(new Single.OnSubscribe<Request>() { return newSingle(new Single.Emitter<Request>() { @Override public void onSubscribe(final Single.Emitter<Request> emitter) throws Exception { public void subscribe(final Single.Subscriber<Request> subscriber) throws Exception { LDAP.getReader(encodedRequestMessage.getContent(), decodeOptions) .readMessage(new AbstractLDAPMessageHandler() { @Override public void abandonRequest(final int messageID, final AbandonRequest request) throws DecodeException, IOException { emitter.onSuccess(request); subscriber.onComplete(request); } @Override public void addRequest(int messageID, AddRequest request) throws DecodeException, IOException { emitter.onSuccess(request); subscriber.onComplete(request); } @Override public void bindRequest(int messageID, int version, GenericBindRequest request) throws DecodeException, IOException { emitter.onSuccess(request); subscriber.onComplete(request); } @Override public void modifyDNRequest(int messageID, ModifyDNRequest request) throws DecodeException, IOException { emitter.onSuccess(request); subscriber.onComplete(request); } @Override public void modifyRequest(int messageID, ModifyRequest request) throws DecodeException, IOException { emitter.onSuccess(request); subscriber.onComplete(request); } @Override public void searchRequest(int messageID, SearchRequest request) throws DecodeException, IOException { emitter.onSuccess(request); subscriber.onComplete(request); } @Override public void unbindRequest(int messageID, UnbindRequest request) throws DecodeException, IOException { emitter.onSuccess(request); subscriber.onComplete(request); } }); } }).flatMap(new Function<Request, Publisher<Stream<Response>>, Exception>() { }).flatMap(new Function<Request, Single<Stream<Response>>, Exception>() { @Override public Publisher<Stream<Response>> apply(Request t) throws Exception { return next.handle(context, t); public Single<Stream<Response>> apply(final Request request) throws Exception { return next.handle(context, request); } }); } opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -711,6 +711,10 @@ logger.traceException(e); } } else { clientContext.disconnect(); } // NYI -- Deregister the client connection from any server components that // might know about it. opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -747,8 +747,8 @@ LdapException>() { @Override public ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> apply(LDAPClientContext value) throws LdapException { final LDAPClientConnection2 conn = canAccept(value); apply(LDAPClientContext clientContext) throws LdapException { final LDAPClientConnection2 conn = canAccept(clientContext); return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() { @Override public Single<Stream<Response>> handle(LDAPClientContext context,