| | |
| | | import org.forgerock.util.Function; |
| | | import org.reactivestreams.Publisher; |
| | | |
| | | import io.reactivex.CompletableEmitter; |
| | | import io.reactivex.CompletableOnSubscribe; |
| | | import io.reactivex.CompletableSource; |
| | | import io.reactivex.Flowable; |
| | | import io.reactivex.SingleEmitter; |
| | | import io.reactivex.SingleOnSubscribe; |
| | | import io.reactivex.SingleSource; |
| | | import io.reactivex.rxjava3.core.CompletableEmitter; |
| | | import io.reactivex.rxjava3.core.CompletableOnSubscribe; |
| | | import io.reactivex.rxjava3.core.CompletableSource; |
| | | import io.reactivex.rxjava3.core.Flowable; |
| | | import io.reactivex.rxjava3.core.SingleEmitter; |
| | | import io.reactivex.rxjava3.core.SingleOnSubscribe; |
| | | import io.reactivex.rxjava3.core.SingleSource; |
| | | |
| | | /** |
| | | * {@link Stream} and {@link Single} implementations based on RxJava. |
| | |
| | | * @return A new {@link Single} |
| | | */ |
| | | public static <V> Single<V> singleFromPublisher(final Publisher<V> publisher) { |
| | | // Adapt the publisher to an RxJava 3 Single, then expose it through the RxJavaSingle wrapper. |
| | | return new RxJavaSingle<>(io.reactivex.rxjava3.core.Single.fromPublisher(publisher)); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @return A new {@link Single} |
| | | */ |
| | | public static <V> Single<V> singleFrom(final V value) { |
| | | // Build an RxJava 3 Single that emits the supplied value, wrapped in the RxJavaSingle adapter. |
| | | return new RxJavaSingle<>(io.reactivex.rxjava3.core.Single.just(value)); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @return A new {@link Single} |
| | | */ |
| | | public static <V> Single<V> singleError(final Throwable error) { |
| | | return new RxJavaSingle<>(io.reactivex.Single.<V>error(error)); |
| | | return new RxJavaSingle<>(io.reactivex.rxjava3.core.Single.<V>error(error)); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @return A new {@link Single} |
| | | */ |
| | | public static <V> Single<V> newSingle(final Single.Emitter<V> emitter) { |
| | | return new RxJavaSingle<>(io.reactivex.Single.create(new SingleOnSubscribe<V>() { |
| | | return new RxJavaSingle<>(io.reactivex.rxjava3.core.Single.create(new SingleOnSubscribe<V>() { |
| | | @Override |
| | | public void subscribe(final SingleEmitter<V> e) throws Exception { |
| | | emitter.subscribe(new Single.Subscriber<V>() { |
| | |
| | | * @return A new {@link Completable} |
| | | */ |
| | | public static Completable newCompletable(final Completable.Emitter onSubscribe) { |
| | | return new RxJavaCompletable(io.reactivex.Completable.create(new CompletableOnSubscribe() { |
| | | return new RxJavaCompletable(io.reactivex.rxjava3.core.Completable.create(new CompletableOnSubscribe() { |
| | | @Override |
| | | public void subscribe(final CompletableEmitter e) throws Exception { |
| | | onSubscribe.subscribe(new Completable.Subscriber() { |
| | |
| | | * @return A new {@link Completable} |
| | | */ |
| | | public static Completable completableError(final Throwable error) { |
| | | return new RxJavaCompletable(io.reactivex.Completable.error(error)); |
| | | return new RxJavaCompletable(io.reactivex.rxjava3.core.Completable.error(error)); |
| | | } |
| | | |
| | | private static final class RxJavaStream<V> implements Stream<V> { |
| | |
| | | |
| | | @Override |
| | | public <O> Stream<O> map(final Function<V, O, Exception> function) { |
| | | return new RxJavaStream<>(impl.map(new io.reactivex.functions.Function<V, O>() { |
| | | return new RxJavaStream<>(impl.map(new io.reactivex.rxjava3.functions.Function<V, O>() { |
| | | @Override |
| | | public O apply(V t) throws Exception { |
| | | return function.apply(t); |
| | |
| | | @Override |
| | | public <O> Stream<O> flatMap(final Function<? super V, ? extends Publisher<? extends O>, Exception> function, |
| | | int maxConcurrency) { |
| | | return new RxJavaStream<>(impl.flatMap(new io.reactivex.functions.Function<V, Publisher<? extends O>>() { |
| | | return new RxJavaStream<>(impl.flatMap(new io.reactivex.rxjava3.functions.Function<V, Publisher<? extends O>>() { |
| | | @Override |
| | | public Publisher<? extends O> apply(V t) throws Exception { |
| | | return function.apply(t); |
| | |
| | | |
| | | @Override |
| | | public Stream<V> onNext(final Consumer<V> onNext) { |
| | | return new RxJavaStream<>(impl.doOnNext(new io.reactivex.functions.Consumer<V>() { |
| | | return new RxJavaStream<>(impl.doOnNext(new io.reactivex.rxjava3.functions.Consumer<V>() { |
| | | @Override |
| | | public void accept(V value) throws Exception { |
| | | onNext.accept(value); |
| | |
| | | |
| | | @Override |
| | | public Stream<V> onError(final Consumer<Throwable> onError) { |
| | | return new RxJavaStream<>(impl.doOnError(new io.reactivex.functions.Consumer<Throwable>() { |
| | | return new RxJavaStream<>(impl.doOnError(new io.reactivex.rxjava3.functions.Consumer<Throwable>() { |
| | | @Override |
| | | public void accept(Throwable t) throws Exception { |
| | | onError.accept(t); |
| | |
| | | @Override |
| | | public Stream<V> onErrorResumeWith(final Function<Throwable, Publisher<V>, Exception> function) { |
| | | return new RxJavaStream<>( |
| | | impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, Publisher<? extends V>>() { |
| | | impl.onErrorResumeNext(new io.reactivex.rxjava3.functions.Function<Throwable, Publisher<? extends V>>() { |
| | | @Override |
| | | public Publisher<? extends V> apply(Throwable t) throws Exception { |
| | | return function.apply(t); |
| | |
| | | |
| | | @Override |
| | | public Stream<V> onComplete(final Action action) { |
| | | return new RxJavaStream<>(impl.doOnComplete(new io.reactivex.functions.Action() { |
| | | return new RxJavaStream<>(impl.doOnComplete(new io.reactivex.rxjava3.functions.Action() { |
| | | @Override |
| | | public void run() throws Exception { |
| | | action.run(); |
| | |
| | | |
| | | private static final class RxJavaSingle<V> implements Single<V> { |
| | | |
| | | // Underlying RxJava 3 Single to which this adapter delegates. |
| | | private final io.reactivex.rxjava3.core.Single<V> impl; |
| | | |
| | | // Wraps the given RxJava 3 Single; the parameter type must match the field type above. |
| | | private RxJavaSingle(io.reactivex.rxjava3.core.Single<V> impl) { |
| | | this.impl = impl; |
| | | } |
| | | |
| | |
| | | |
| | | @Override |
| | | public <O> Single<O> map(final Function<V, O, Exception> function) { |
| | | return new RxJavaSingle<>(impl.map(new io.reactivex.functions.Function<V, O>() { |
| | | return new RxJavaSingle<>(impl.map(new io.reactivex.rxjava3.functions.Function<V, O>() { |
| | | @Override |
| | | public O apply(V t) throws Exception { |
| | | return function.apply(t); |
| | |
| | | |
| | | @Override |
| | | public void subscribe(final Consumer<? super V> resultConsumer, final Consumer<Throwable> errorConsumer) { |
| | | impl.subscribe(new io.reactivex.functions.Consumer<V>() { |
| | | impl.subscribe(new io.reactivex.rxjava3.functions.Consumer<V>() { |
| | | @Override |
| | | public void accept(V t) throws Exception { |
| | | resultConsumer.accept(t); |
| | | } |
| | | }, new io.reactivex.functions.Consumer<Throwable>() { |
| | | }, new io.reactivex.rxjava3.functions.Consumer<Throwable>() { |
| | | @Override |
| | | public void accept(Throwable t) throws Exception { |
| | | errorConsumer.accept(t); |
| | |
| | | |
| | | @Override |
| | | public <O> Single<O> flatMap(final Function<V, Single<O>, Exception> function) { |
| | | // Delegate to the wrapped RxJava 3 Single and re-wrap the result in the adapter type. |
| | | return new RxJavaSingle<>(impl.flatMap(new io.reactivex.rxjava3.functions.Function<V, SingleSource<O>>() { |
| | | @Override |
| | | public SingleSource<O> apply(V t) throws Exception { |
| | | // Convert the Single produced by the caller's function into an RxJava 3 SingleSource. |
| | | return io.reactivex.rxjava3.core.Single.fromPublisher(function.apply(t)); |
| | | } |
| | | })); |
| | | } |
| | |
| | | @Override |
| | | public Single<V> onErrorResumeWith(final Function<Throwable, Single<V>, Exception> function) { |
| | | return new RxJavaSingle<>( |
| | | impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, SingleSource<V>>() { |
| | | impl.onErrorResumeNext(new io.reactivex.rxjava3.functions.Function<Throwable, SingleSource<V>>() { |
| | | @Override |
| | | public SingleSource<V> apply(Throwable error) throws Exception { |
| | | return io.reactivex.Single.fromPublisher(function.apply(error)); |
| | | return io.reactivex.rxjava3.core.Single.fromPublisher(function.apply(error)); |
| | | } |
| | | })); |
| | | } |
| | |
| | | |
| | | private static final class RxJavaCompletable implements Completable { |
| | | |
| | | // Underlying RxJava 3 Completable to which this adapter delegates. |
| | | private final io.reactivex.rxjava3.core.Completable impl; |
| | | |
| | | // Wraps the given RxJava 3 Completable; the parameter type must match the field type above. |
| | | RxJavaCompletable(io.reactivex.rxjava3.core.Completable impl) { |
| | | this.impl = impl; |
| | | } |
| | | |
| | |
| | | @Override |
| | | public Completable onErrorResumeWith(final Function<Throwable, Completable, Exception> function) { |
| | | return new RxJavaCompletable( |
| | | impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, CompletableSource>() { |
| | | impl.onErrorResumeNext(new io.reactivex.rxjava3.functions.Function<Throwable, CompletableSource>() { |
| | | @Override |
| | | public CompletableSource apply(Throwable error) throws Exception { |
| | | return io.reactivex.Completable.fromPublisher(function.apply(error)); |
| | | return io.reactivex.rxjava3.core.Completable.fromPublisher(function.apply(error)); |
| | | } |
| | | })); |
| | | } |
| | | |
| | | @Override |
| | | public void subscribe(final Action completeAction, final Consumer<Throwable> errorConsumer) { |
| | | impl.subscribe(new io.reactivex.functions.Action() { |
| | | impl.subscribe(new io.reactivex.rxjava3.functions.Action() { |
| | | @Override |
| | | public void run() throws Exception { |
| | | completeAction.run(); |
| | | } |
| | | }, new io.reactivex.functions.Consumer<Throwable>() { |
| | | }, new io.reactivex.rxjava3.functions.Consumer<Throwable>() { |
| | | @Override |
| | | public void accept(final Throwable error) throws Exception { |
| | | errorConsumer.accept(error); |
| | |
| | | |
| | | @Override |
| | | public Completable doAfterTerminate(final Action onTerminate) { |
| | | return new RxJavaCompletable(impl.doAfterTerminate(new io.reactivex.functions.Action() { |
| | | return new RxJavaCompletable(impl.doAfterTerminate(new io.reactivex.rxjava3.functions.Action() { |
| | | @Override |
| | | public void run() throws Exception { |
| | | onTerminate.run(); |