OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport
Removed Single from ReactiveHandler/ReactiveFilter, use Stream directly
| | |
| | | */ |
| | | package com.forgerock.reactive; |
| | | |
| | | import org.forgerock.util.Function; |
| | | import org.reactivestreams.Publisher; |
| | | |
| | | /** {@link Completable} is used to communicates a terminated operation which doesn't produce a result. */ |
| | |
| | | } |
| | | |
| | | /** |
| | | * When an error occurs in this completable, continue the processing with the new {@link Completable} provided by |
| | | * the function. |
| | | * |
| | | * @param function |
| | | * Generates the stream which must will used to resume operation when this {@link Completable} failed. |
| | | * @return A new {@link Completable} |
| | | */ |
| | | Completable onErrorResumeWith(Function<Throwable, Completable, Exception> function); |
| | | |
| | | /** |
| | | * Creates a {@link Single} which will emit the specified value when this {@link Completable} complete. |
| | | * |
| | | * @param <V> |
| | |
| | | final ReactiveFilter<C, I1, O1, I2, O2> parent = this; |
| | | return new ReactiveHandler<C, I1, O1>() { |
| | | @Override |
| | | public Single<O1> handle(final C context, final I1 request) throws Exception { |
| | | public O1 handle(final C context, final I1 request) throws Exception { |
| | | return parent.filter(context, request, handler); |
| | | } |
| | | }; |
| | |
| | | * @throws Exception |
| | | * If the operation cannot be done |
| | | */ |
| | | public abstract Single<O1> filter(final C context, final I1 request, final ReactiveHandler<C, I2, O2> next) |
| | | public abstract O1 filter(final C context, final I1 request, final ReactiveHandler<C, I2, O2> next) |
| | | throws Exception; |
| | | |
| | | private static final class ConcatenatedFilter<C, I1, O1, I2, O2> extends ReactiveFilter<C, I1, O1, I2, O2> { |
| | |
| | | } |
| | | |
| | | @Override |
| | | public Single<O1> filter(C context, I1 request, ReactiveHandler<C, I2, O2> handler) throws Exception { |
| | | public O1 filter(C context, I1 request, ReactiveHandler<C, I2, O2> handler) throws Exception { |
| | | return converter.apply(handler).handle(context, request); |
| | | } |
| | | } |
| | |
| | | * @throws Exception |
| | | * if the request cannot be processed |
| | | */ |
| | | Single<REP> handle(final CTX context, final REQ request) throws Exception; |
| | | REP handle(final CTX context, final REQ request) throws Exception; |
| | | } |
| | |
| | | |
| | | import io.reactivex.CompletableEmitter; |
| | | import io.reactivex.CompletableOnSubscribe; |
| | | import io.reactivex.CompletableSource; |
| | | import io.reactivex.Flowable; |
| | | import io.reactivex.SingleEmitter; |
| | | import io.reactivex.SingleOnSubscribe; |
| | |
| | | })); |
| | | } |
| | | |
| | | /** |
| | | * Create a new {@link Completable} from the given error. |
| | | * |
| | | * @param error |
| | | * The error emitted by this {@link Completable} |
| | | * @return A new {@link Completable} |
| | | */ |
| | | public static Completable completableError(final Throwable error) { |
| | | return new RxJavaCompletable(io.reactivex.Completable.error(error)); |
| | | } |
| | | |
| | | private static final class RxJavaStream<V> implements Stream<V> { |
| | | |
| | | private final Flowable<V> impl; |
| | |
| | | public void subscribe(org.reactivestreams.Subscriber<? super Void> s) { |
| | | impl.<Void>toFlowable().subscribe(s); |
| | | } |
| | | |
| | | @Override |
| | | public Completable onErrorResumeWith(final Function<Throwable, Completable, Exception> function) { |
| | | return new RxJavaCompletable( |
| | | impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, CompletableSource>() { |
| | | @Override |
| | | public CompletableSource apply(Throwable error) throws Exception { |
| | | return io.reactivex.Completable.fromPublisher(function.apply(error)); |
| | | } |
| | | })); |
| | | } |
| | | } |
| | | } |
| | |
| | | */ |
| | | package com.forgerock.opendj.grizzly; |
| | | |
| | | import static com.forgerock.reactive.RxJavaStreams.*; |
| | | import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher; |
| | | |
| | | import java.io.IOException; |
| | | import java.net.InetSocketAddress; |
| | |
| | | import org.forgerock.util.Options; |
| | | |
| | | import com.forgerock.reactive.ReactiveHandler; |
| | | import com.forgerock.reactive.Single; |
| | | import com.forgerock.reactive.Stream; |
| | | |
| | | import io.reactivex.BackpressureStrategy; |
| | |
| | | final DecodeOptions decodeOptions = options.get(CommonLDAPOptions.LDAP_DECODE_OPTIONS); |
| | | return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() { |
| | | @Override |
| | | public Single<Stream<Response>> handle(final LDAPClientContext context, |
| | | public Stream<Response> handle(final LDAPClientContext context, |
| | | final LdapRawMessage rawRequest) throws Exception { |
| | | final LDAPReader<ASN1Reader> reader = LDAP.getReader(rawRequest.getContent(), decodeOptions); |
| | | return singleFrom(streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() { |
| | | return streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() { |
| | | @Override |
| | | public void subscribe(final FlowableEmitter<Response> emitter) throws Exception { |
| | | reader.readMessage(new AbstractLDAPMessageHandler() { |
| | |
| | | }); |
| | | emitter.onComplete(); |
| | | } |
| | | }, BackpressureStrategy.ERROR))); |
| | | }, BackpressureStrategy.ERROR)); |
| | | } |
| | | }; |
| | | } |
| | |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | /** Initial size of newly created buffers. */ |
| | | private static final int BUFFER_INIT_SIZE = 4096; |
| | | private static final int BUFFER_INIT_SIZE = 1024; |
| | | /** Default maximum size for cached protocol/entry encoding buffers. */ |
| | | private static final int DEFAULT_MAX_INTERNAL_BUFFER_SIZE = 32 * 1024; |
| | | |
| | |
| | | import com.forgerock.reactive.Action; |
| | | import com.forgerock.reactive.Completable; |
| | | import com.forgerock.reactive.ReactiveHandler; |
| | | import com.forgerock.reactive.Single; |
| | | import com.forgerock.reactive.Stream; |
| | | |
| | | import io.reactivex.internal.util.BackpressureHelper; |
| | |
| | | */ |
| | | final class LDAPServerFilter extends BaseFilter { |
| | | |
| | | private static final Object DUMMY = new byte[0]; |
| | | |
| | | private static final Attribute<ClientConnectionImpl> LDAP_CONNECTION_ATTR = Grizzly.DEFAULT_ATTRIBUTE_BUILDER |
| | | .createAttribute("LDAPServerConnection"); |
| | | |
| | |
| | | final ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> requestHandler = |
| | | connectionHandlerFactory.apply(clientContext); |
| | | |
| | | clientContext |
| | | .read() |
| | | .flatMap(new Function<LdapRawMessage, Publisher<Object>, Exception>() { |
| | | @Override |
| | | public Publisher<Object> apply(final LdapRawMessage rawRequest) throws Exception { |
| | | if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) { |
| | | clientContext.notifyConnectionClosed(rawRequest); |
| | | return singleFrom(DUMMY); |
| | | } |
| | | Single<Stream<Response>> response; |
| | | try { |
| | | response = requestHandler.handle(clientContext, rawRequest); |
| | | } catch (Exception e) { |
| | | response = singleError(e); |
| | | } |
| | | return response |
| | | .flatMap(new Function<Stream<Response>, Single<Object>, Exception>() { |
| | | @Override |
| | | public Single<Object> apply(final Stream<Response> response) { |
| | | return clientContext.write(response.map(toLdapResponseMessage(rawRequest))) |
| | | .toSingle(DUMMY); |
| | | clientContext.read().flatMap(new Function<LdapRawMessage, Publisher<Void>, Exception>() { |
| | | @Override |
| | | public Publisher<Void> apply(final LdapRawMessage rawRequest) throws Exception { |
| | | if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) { |
| | | clientContext.notifyConnectionClosed(rawRequest); |
| | | return emptyStream(); |
| | | } |
| | | Stream<Response> response; |
| | | try { |
| | | response = requestHandler.handle(clientContext, rawRequest); |
| | | } catch (Exception e) { |
| | | response = streamError(e); |
| | | } |
| | | return clientContext |
| | | .write(response.map(toLdapResponseMessage(rawRequest))) |
| | | .onErrorResumeWith(new Function<Throwable, Completable, Exception>() { |
| | | @Override |
| | | public Completable apply(final Throwable error) throws Exception { |
| | | if (!(error instanceof LdapException)) { |
| | | // Unexpected error, propagate it. |
| | | return completableError(error); |
| | | } |
| | | }) |
| | | .onErrorResumeWith(new Function<Throwable, Single<Object>, Exception>() { |
| | | @Override |
| | | public Single<Object> apply(final Throwable error) throws Exception { |
| | | if (!(error instanceof LdapException)) { |
| | | // Unexpected error, propagate it. |
| | | return singleError(error); |
| | | } |
| | | final LdapException exception = (LdapException) error; |
| | | return clientContext |
| | | .write(singleFrom(toLdapResponseMessage(rawRequest, exception.getResult()))) |
| | | .toSingle(DUMMY); |
| | | } |
| | | }); |
| | | } |
| | | }, maxConcurrentRequests) |
| | | .onErrorResumeWith(new Function<Throwable, Publisher<Object>, Exception>() { |
| | | @Override |
| | | public Publisher<Object> apply(Throwable error) throws Exception { |
| | | clientContext.notifyErrorAndCloseSilently(error); |
| | | // Swallow the error to prevent the subscribe() below to report it on the console. |
| | | return streamFrom(DUMMY); |
| | | } |
| | | }) |
| | | .onCompleteDo(new Action() { |
| | | @Override |
| | | public void run() throws Exception { |
| | | clientContext.notifyConnectionClosed(null); |
| | | } |
| | | }) |
| | | .subscribe(); |
| | | final LdapException exception = (LdapException) error; |
| | | return clientContext |
| | | .write(singleFrom(toLdapResponseMessage(rawRequest, exception.getResult()))); |
| | | } |
| | | }); |
| | | } |
| | | }, maxConcurrentRequests).onErrorResumeWith(new Function<Throwable, Publisher<Void>, Exception>() { |
| | | @Override |
| | | public Publisher<Void> apply(Throwable error) throws Exception { |
| | | clientContext.notifyErrorAndCloseSilently(error); |
| | | // Swallow the error to prevent the subscribe() below to report it on the console. |
| | | return emptyStream(); |
| | | } |
| | | }).onCompleteDo(new Action() { |
| | | @Override |
| | | public void run() throws Exception { |
| | | clientContext.notifyConnectionClosed(null); |
| | | } |
| | | }).subscribe(); |
| | | return ctx.getStopAction(); |
| | | } |
| | | |
| | |
| | | */ |
| | | package org.forgerock.opendj.reactive; |
| | | |
| | | import static com.forgerock.reactive.RxJavaStreams.*; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_INTERMEDIATE_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_DONE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_ENTRY; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_REFERENCE; |
| | | import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher; |
| | | import static org.forgerock.opendj.io.LDAP.*; |
| | | import static org.opends.messages.CoreMessages.*; |
| | | import static org.opends.messages.ProtocolMessages.*; |
| | | import static org.opends.server.loggers.AccessLogger.logDisconnect; |
| | | import static org.opends.server.protocols.ldap.LDAPConstants.*; |
| | | import static org.opends.server.util.ServerConstants.OID_START_TLS_REQUEST; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | |
| | | |
| | | import com.forgerock.reactive.Consumer; |
| | | import com.forgerock.reactive.ReactiveHandler; |
| | | import com.forgerock.reactive.Single; |
| | | import com.forgerock.reactive.Stream; |
| | | |
| | | import io.reactivex.BackpressureStrategy; |
| | |
| | | * a fatal error and the client has been disconnected as a result, or if the client unbound from the server. |
| | | */ |
| | | @Override |
| | | public Single<Stream<Response>> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) { |
| | | return singleFrom(streamFromPublisher( |
| | | public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) { |
| | | return streamFromPublisher( |
| | | new BlockingBackpressureSubscription(Flowable.create(new FlowableOnSubscribe<Response>() { |
| | | @Override |
| | | public void subscribe(FlowableEmitter<Response> emitter) throws Exception { |
| | |
| | | toLdapResponseType(message, response), message.getMessageId()); |
| | | } |
| | | } |
| | | })); |
| | | }); |
| | | } |
| | | |
| | | private final byte toLdapResultType(final byte requestType) { |
| | |
| | | |
| | | BlockingBackpressureSubscription(final Publisher<Response> upstream) { |
| | | this.upstream = upstream; |
| | | this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0 |
| | | ? 30000 // Do not wait indefinitely, |
| | | this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0 |
| | | ? 30000 // Do not wait indefinitely, |
| | | : connectionHandler.getMaxBlockedWriteTimeLimit(); |
| | | } |
| | | |
| | |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import com.forgerock.reactive.ReactiveHandler; |
| | | import com.forgerock.reactive.Single; |
| | | import com.forgerock.reactive.Stream; |
| | | |
| | | /** |
| | |
| | | }); |
| | | return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() { |
| | | @Override |
| | | public Single<Stream<Response>> handle(LDAPClientContext context, LdapRawMessage request) |
| | | public Stream<Response> handle(LDAPClientContext context, LdapRawMessage request) |
| | | throws Exception { |
| | | return conn.handle(queueingStrategy, request); |
| | | } |