/* * The contents of this file are subject to the terms of the Common Development and * Distribution License (the License). You may not use this file except in compliance with the * License. * * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the * specific language governing permission and limitations under the License. * * When distributing Covered Software, include this CDDL Header Notice in each file and include * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL * Header, with the fields enclosed by brackets [] replaced by your own identifying * information: "Portions Copyright [year] [name of copyright owner]". * * Copyright 2016 ForgeRock AS. */ package org.forgerock.opendj.reactive; import static com.forgerock.reactive.RxJavaStreams.*; import static org.forgerock.util.Reject.checkNotNull; import java.io.IOException; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import org.forgerock.opendj.io.AbstractLDAPMessageHandler; import org.forgerock.opendj.io.LDAP; import org.forgerock.opendj.ldap.DecodeException; import org.forgerock.opendj.ldap.DecodeOptions; import org.forgerock.opendj.ldap.LDAPConnectionFactory; import org.forgerock.opendj.ldap.requests.AbandonRequest; import org.forgerock.opendj.ldap.requests.AddRequest; import org.forgerock.opendj.ldap.requests.GenericBindRequest; import org.forgerock.opendj.ldap.requests.ModifyDNRequest; import org.forgerock.opendj.ldap.requests.ModifyRequest; import org.forgerock.opendj.ldap.requests.Request; import org.forgerock.opendj.ldap.requests.SearchRequest; import org.forgerock.opendj.ldap.requests.UnbindRequest; import org.forgerock.opendj.ldap.responses.Response; import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage; import org.forgerock.util.Function; import org.reactivestreams.Publisher; import com.forgerock.reactive.ReactiveFilter; import com.forgerock.reactive.ReactiveFilter.SimpleReactiveFilter; import 
com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.RxJavaStreams;
import com.forgerock.reactive.Single;
import com.forgerock.reactive.Stream;

import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

/**
 * Maintains a set of standard {@link ReactiveHandler} / {@link ReactiveFilter} which can be used in ldap endpoint.
 * <p>
 * This is a static factory holder: it cannot be instantiated and every component is obtained through one of the
 * static methods below.
 */
public final class Components {

    /**
     * Routes incoming request to dedicated handler.
     * <p>
     * Routes are evaluated in the iteration order of {@code routes}; the first route whose predicate matches the
     * request and context handles the request and the remaining routes are not consulted.
     *
     * @param <CTX>
     *            Context type in which request are processed
     * @param <REQ>
     *            Type of routed request
     * @param <REP>
     *            Type of routed response
     * @param routes
     *            {@link Route} where request will be forwarded to
     * @param defaultRoute
     *            If no route can be applied for a specific request, it'll be forwarded to the defaultRoute
     * @return A new {@link ReactiveHandler} routing incoming requests
     */
    public static <CTX, REQ, REP> ReactiveHandler<CTX, REQ, REP> routeTo(
            final Iterable<Route<CTX, REQ, REP>> routes,
            final ReactiveHandler<CTX, REQ, REP> defaultRoute) {
        /** Routes requests. */
        final class RouterHandler implements ReactiveHandler<CTX, REQ, REP> {
            @Override
            public Single<REP> handle(final CTX context, final REQ request) throws Exception {
                for (final Route<CTX, REQ, REP> route : routes) {
                    if (route.predicate.matches(request, context)) {
                        // First match wins.
                        return route.handler.handle(context, request);
                    }
                }
                return defaultRoute.handle(context, request);
            }
        }
        return new RouterHandler();
    }

    /**
     * Allows to transform all aspects of a {@link Request}.
     * <p>
     * The request transformer is applied before the request is handed to the next handler; the response
     * transformer is then mapped over the {@link Stream} of responses produced by that handler.
     *
     * @param <CTX>
     *            Context type in which request are processed
     * @param requestTransformer
     *            Function in charge of performing the {@link Request} transformation
     * @param responseTransformer
     *            Function in charge of performing the {@link Response} transformation
     * @return A new filter performing the {@link Request} and {@link Response} transformation.
     */
    public static <CTX> SimpleReactiveFilter<CTX, Request, Stream<Response>> transform(
            final Function<Request, Request, Exception> requestTransformer,
            final Function<Response, Response, Exception> responseTransformer) {
        /** Transforms {@link Request} and {@link Response}. */
        final class TransformFilter extends SimpleReactiveFilter<CTX, Request, Stream<Response>> {
            @Override
            public Single<Stream<Response>> filter(final CTX context, final Request request,
                    final ReactiveHandler<CTX, Request, Stream<Response>> next) throws Exception {
                return next.handle(context, requestTransformer.apply(request))
                        .map(new Function<Stream<Response>, Stream<Response>, Exception>() {
                            @Override
                            public Stream<Response> apply(final Stream<Response> responses) throws Exception {
                                return responses.map(new Function<Response, Response, Exception>() {
                                    @Override
                                    public Response apply(final Response value) throws Exception {
                                        return responseTransformer.apply(value);
                                    }
                                });
                            }
                        });
            }
        }
        return new TransformFilter();
    }

    /**
     * Forward {@link Request} to the provided {@link LDAPConnectionFactory}.
     *
     * @param connectionFactory
     *            The {@link LDAPConnectionFactory} used to send the forwarded {@link Request}
     * @return a {@link ReactiveHandler} forwarding {@link Request} to the {@link LDAPConnectionFactory}
     */
    public static ReactiveHandler<LDAPClientConnection2, Request, Stream<Response>> proxyTo(
            final LDAPConnectionFactory connectionFactory) {
        // NOTE(review): the context type argument is assumed to be LDAPClientConnection2, matching the handler
        // type that decodeRequest() forwards to; confirm against ProxyToHandler's declaration.
        return new ProxyToHandler(connectionFactory);
    }

    /**
     * {@link ReactiveHandler} responding with the provided Response for all incoming requests.
     * <p>
     * An {@link UnbindRequest} is the exception: it is answered with an empty stream instead of {@code response}
     * (the LDAP unbind operation defines no response).
     *
     * @param <CTX>
     *            Context type in which request are processed
     * @param response
     *            The {@link Response} to send as response for all {@link Request}
     * @return a {@link ReactiveHandler} replying {@link Response} for all {@link Request}
     */
    public static <CTX> ReactiveHandler<CTX, Request, Stream<Response>> respondWith(final Response response) {
        return new ReactiveHandler<CTX, Request, Stream<Response>>() {
            @Override
            public Single<Stream<Response>> handle(final CTX context, final Request request) {
                if (request instanceof UnbindRequest) {
                    return singleFrom(RxJavaStreams.<Response>emptyStream());
                }
                return singleFrom(streamFrom(response));
            }
        };
    }

    /**
     * Decodes incoming {@link LdapRawMessage} into a {@link Request}.
     * <p>
     * The raw message content is read with the supplied {@link DecodeOptions}; the decoded {@link Request} is then
     * forwarded to the next handler together with the unchanged context. Decoding happens when the returned
     * {@link Single} is subscribed to, and any exception raised by the reader propagates out of the subscription
     * callback.
     * <p>
     * Only abandon, add, bind, modify DN, modify, search and unbind requests are emitted by this filter; any other
     * message falls through to the default behaviour of {@link AbstractLDAPMessageHandler}.
     *
     * @param decodeOptions
     *            {@link DecodeOptions} used during {@link Request} decoding
     * @return a {@link ReactiveFilter} decoding {@link LdapRawMessage} into {@link Request}
     */
    public static ReactiveFilter<LDAPClientConnection2, LdapRawMessage, Stream<Response>, Request, Stream<Response>>
            decodeRequest(final DecodeOptions decodeOptions) {
        return new ReactiveFilter<LDAPClientConnection2, LdapRawMessage, Stream<Response>,
                                  Request, Stream<Response>>() {
            @Override
            public Single<Stream<Response>> filter(final LDAPClientConnection2 context,
                    final LdapRawMessage encodedRequestMessage,
                    final ReactiveHandler<LDAPClientConnection2, Request, Stream<Response>> next)
                    throws Exception {
                return newSingle(new Single.OnSubscribe<Request>() {
                    @Override
                    public void onSubscribe(final Single.Emitter<Request> emitter) throws Exception {
                        // NOTE(review): compare, delete and extended requests are not overridden below;
                        // confirm that leaving them to the base handler is intentional.
                        LDAP.getReader(encodedRequestMessage.getContent(), decodeOptions)
                                .readMessage(new AbstractLDAPMessageHandler() {
                                    @Override
                                    public void abandonRequest(final int messageID, final AbandonRequest request)
                                            throws DecodeException, IOException {
                                        emitter.onSuccess(request);
                                    }

                                    @Override
                                    public void addRequest(final int messageID, final AddRequest request)
                                            throws DecodeException, IOException {
                                        emitter.onSuccess(request);
                                    }

                                    @Override
                                    public void bindRequest(final int messageID, final int version,
                                            final GenericBindRequest request)
                                            throws DecodeException, IOException {
                                        emitter.onSuccess(request);
                                    }

                                    @Override
                                    public void modifyDNRequest(final int messageID, final ModifyDNRequest request)
                                            throws DecodeException, IOException {
                                        emitter.onSuccess(request);
                                    }

                                    @Override
                                    public void modifyRequest(final int messageID, final ModifyRequest request)
                                            throws DecodeException, IOException {
                                        emitter.onSuccess(request);
                                    }

                                    @Override
                                    public void searchRequest(final int messageID, final SearchRequest request)
                                            throws DecodeException, IOException {
                                        emitter.onSuccess(request);
                                    }

                                    @Override
                                    public void unbindRequest(final int messageID, final UnbindRequest request)
                                            throws DecodeException, IOException {
                                        emitter.onSuccess(request);
                                    }
                                });
                    }
                }).flatMap(new Function<Request, Publisher<Stream<Response>>, Exception>() {
                    @Override
                    public Publisher<Stream<Response>> apply(final Request t) throws Exception {
                        return next.handle(context, t);
                    }
                });
            }
        };
    }

    /**
     * Invoke the following {@link ReactiveFilter} from the given {@link Executor}.
     * <p>
     * The call to the next handler is deferred until the returned {@link Single} is subscribed to, and that
     * subscription is performed on a {@link Scheduler} backed by {@code executor}.
     *
     * @param <CTX>
     *            Context type in which request are processed
     * @param <REQ>
     *            Type of dispatched request
     * @param <REP>
     *            Type of dispatched response
     * @param executor
     *            The {@link Executor} used to forward the request, must not be {@code null}
     * @return A {@link ReactiveFilter} forwarding {@link Request} through the {@link Executor}
     */
    public static <CTX, REQ, REP> ReactiveFilter<CTX, REQ, REP, REQ, REP> dispatch(final Executor executor) {
        /** Dispatches request into an {@link Executor}. */
        final class DispatchFilter extends SimpleReactiveFilter<CTX, REQ, REP> {
            private final Scheduler executor;

            DispatchFilter(final Executor executor) {
                this.executor = Schedulers.from(checkNotNull(executor, "executor must not be null"));
            }

            @Override
            public Single<REP> filter(final CTX context, final REQ request,
                    final ReactiveHandler<CTX, REQ, REP> next) {
                return singleFromPublisher(Flowable.defer(new Callable<Publisher<REP>>() {
                    @Override
                    public Publisher<REP> call() throws Exception {
                        return next.handle(context, request);
                    }
                }).subscribeOn(executor));
            }
        }
        return new DispatchFilter(executor);
    }

    private Components() {
        // Prevent instantiation
    }
}