/* * The contents of this file are subject to the terms of the Common Development and * Distribution License (the License). You may not use this file except in compliance with the * License. * * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the * specific language governing permission and limitations under the License. * * When distributing Covered Software, include this CDDL Header Notice in each file and include * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL * Header, with the fields enclosed by brackets [] replaced by your own identifying * information: "Portions Copyright [year] [name of copyright owner]". * * Copyright 2016 ForgeRock AS. */ package com.forgerock.reactive; import org.forgerock.util.Function; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import io.reactivex.CompletableEmitter; import io.reactivex.CompletableOnSubscribe; import io.reactivex.CompletableSource; import io.reactivex.Flowable; import io.reactivex.SingleEmitter; import io.reactivex.SingleOnSubscribe; import io.reactivex.SingleSource; /** * {@link Stream} and {@link Single} implementations based on RxJava. */ public final class RxJavaStreams { private RxJavaStreams() { // Hide } /** * Create a new {@link Stream} from the given {@link Publisher}. * * @param * Type of data emitted * @param publisher * The {@link Publisher} to convert * @return A new {@link Stream} */ public static Stream streamFromPublisher(final Publisher publisher) { return new RxJavaStream<>(Flowable.fromPublisher(publisher)); } /** * Create a new {@link Stream} composed only of the given value. * * @param * Type of data emitted * @param value * The value emitted by this stream * @return A new {@link Stream} */ public static Stream streamFrom(final V value) { return new RxJavaStream<>(Flowable.just(value)); } /** * Create a new {@link Stream} composed only of the given error. * * @param * Type of data emitted * @param error * The error emitted by this stream * @return A new {@link Stream} */ public static Stream streamError(final Throwable error) { return new RxJavaStream<>(Flowable. error(error)); } /** * Create a new empty {@link Stream}. * * @param * Type of data emitted * @return An empty {@link Stream} */ public static Stream emptyStream() { return new RxJavaStream<>(Flowable. empty()); } /** * Create a new {@link Single} from the given {@link Publisher}. If the {@link Publisher} produce more than one * result, they'll be dropped and the inner {@link Subscription} cancelled. * * @param * Type of the datum emitted * @param publisher * The {@link Publisher} to convert * @return A new {@link Stream} */ public static Single singleFromPublisher(final Publisher publisher) { return new RxJavaSingle<>(io.reactivex.Single.fromPublisher(publisher)); } /** * Create a new {@link Single} from the given value. * * @param * Type of the datum emitted * @param value * The value contained by this {@link Single} * @return A new {@link Single} */ public static Single singleFrom(final V value) { return new RxJavaSingle<>(io.reactivex.Single.just(value)); } /** * Create a new {@link Single} from the given error. * * @param * Type of the datum emitted * @param error * The error emitted by this {@link Single} * @return A new {@link Single} */ public static Single singleError(final Throwable error) { return new RxJavaSingle<>(io.reactivex.Single.error(error)); } /** * Creates a bridge from callback world to {@link Single}. * * @param * Type of the datum emitted * @param emitter * Action to perform once this {@link Single} has been subscribed to. * @return A new {@link Single} */ public static Single newSingle(final Single.Emitter emitter) { return new RxJavaSingle<>(io.reactivex.Single.create(new SingleOnSubscribe() { @Override public void subscribe(final SingleEmitter e) throws Exception { emitter.subscribe(new Single.Subscriber() { @Override public void onComplete(V value) { e.onSuccess(value); } @Override public void onError(Throwable error) { e.onError(error); } }); } })); } /** * Creates a bridge from callback world to {@link Completable}. * * @param onSubscribe * Action to perform once this {@link Completable} has been subscribed to. * @return A new {@link Completable} */ public static Completable newCompletable(final Completable.Emitter onSubscribe) { return new RxJavaCompletable(io.reactivex.Completable.create(new CompletableOnSubscribe() { @Override public void subscribe(final CompletableEmitter e) throws Exception { onSubscribe.subscribe(new Completable.Subscriber() { @Override public void onComplete() { e.onComplete(); } @Override public void onError(final Throwable error) { e.onError(error); } }); } })); } /** * Create a new {@link Completable} from the given error. * * @param error * The error emitted by this {@link Completable} * @return A new {@link Completable} */ public static Completable completableError(final Throwable error) { return new RxJavaCompletable(io.reactivex.Completable.error(error)); } private static final class RxJavaStream implements Stream { private final Flowable impl; private RxJavaStream(final Flowable impl) { this.impl = impl; } @Override public void subscribe(org.reactivestreams.Subscriber s) { impl.subscribe(s); } @Override public Stream map(final Function function) { return new RxJavaStream<>(impl.map(new io.reactivex.functions.Function() { @Override public O apply(V t) throws Exception { return function.apply(t); } })); } @Override public Stream flatMap(final Function, Exception> function, int maxConcurrency) { return new RxJavaStream<>(impl.flatMap(new io.reactivex.functions.Function>() { @Override public Publisher apply(V t) throws Exception { return function.apply(t); } }, maxConcurrency)); } @Override public Stream onNext(final Consumer onNext) { return new RxJavaStream<>(impl.doOnNext(new io.reactivex.functions.Consumer() { @Override public void accept(V value) throws Exception { onNext.accept(value); } })); } @Override public Stream onError(final Consumer onError) { return new RxJavaStream<>(impl.doOnError(new io.reactivex.functions.Consumer() { @Override public void accept(Throwable t) throws Exception { onError.accept(t); } })); } @Override public Stream onErrorResumeWith(final Function, Exception> function) { return new RxJavaStream<>( impl.onErrorResumeNext(new io.reactivex.functions.Function>() { @Override public Publisher apply(Throwable t) throws Exception { return function.apply(t); } })); } @Override public Stream onComplete(final Action action) { return new RxJavaStream<>(impl.doOnComplete(new io.reactivex.functions.Action() { @Override public void run() throws Exception { action.run(); } })); } @Override public void subscribe() { impl.subscribe(); } } private static final class RxJavaSingle implements Single { private final io.reactivex.Single impl; private RxJavaSingle(io.reactivex.Single impl) { this.impl = impl; } @Override public Stream toStream() { return new RxJavaStream<>(impl.toFlowable()); } @Override public Single map(final Function function) { return new RxJavaSingle<>(impl.map(new io.reactivex.functions.Function() { @Override public O apply(V t) throws Exception { return function.apply(t); } })); } @Override public void subscribe(org.reactivestreams.Subscriber s) { impl.toFlowable().subscribe(s); } @Override public void subscribe(final Consumer resultConsumer, final Consumer errorConsumer) { impl.subscribe(new io.reactivex.functions.Consumer() { @Override public void accept(V t) throws Exception { resultConsumer.accept(t); } }, new io.reactivex.functions.Consumer() { @Override public void accept(Throwable t) throws Exception { errorConsumer.accept(t); } }); } @Override public Single flatMap(final Function, Exception> function) { return new RxJavaSingle<>(impl.flatMap(new io.reactivex.functions.Function>() { @Override public SingleSource apply(V t) throws Exception { return io.reactivex.Single.fromPublisher(function.apply(t)); } })); } @Override public Single onErrorResumeWith(final Function, Exception> function) { return new RxJavaSingle<>( impl.onErrorResumeNext(new io.reactivex.functions.Function>() { @Override public SingleSource apply(Throwable error) throws Exception { return io.reactivex.Single.fromPublisher(function.apply(error)); } })); } } private static final class RxJavaCompletable implements Completable { private final io.reactivex.Completable impl; RxJavaCompletable(io.reactivex.Completable impl) { this.impl = impl; } @Override public Single toSingle(V value) { return new RxJavaSingle<>(impl.toSingleDefault(value)); } @Override public void subscribe(org.reactivestreams.Subscriber s) { impl.toFlowable().subscribe(s); } @Override public Completable onErrorResumeWith(final Function function) { return new RxJavaCompletable( impl.onErrorResumeNext(new io.reactivex.functions.Function() { @Override public CompletableSource apply(Throwable error) throws Exception { return io.reactivex.Completable.fromPublisher(function.apply(error)); } })); } @Override public void subscribe(final Action completeAction, final Consumer errorConsumer) { impl.subscribe(new io.reactivex.functions.Action() { @Override public void run() throws Exception { completeAction.run(); } }, new io.reactivex.functions.Consumer() { @Override public void accept(final Throwable error) throws Exception { errorConsumer.accept(error); } }); } } }