mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Prashant
30.18.2025 a164e9cd3f00ca774e03b201accf521f23ab7da1
Bump io.reactivex.rxjava to 3.x (#504)

Co-authored-by: Prashant <Prashant.Thakre@vida.id>
7 files modified
103 ■■■■ changed files
opendj-core/pom.xml 4 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java 70 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java 8 ●●●● patch | view | raw | blame | history
opendj-grizzly/pom.xml 4 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java 4 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java 5 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java 8 ●●●● patch | view | raw | blame | history
opendj-core/pom.xml
@@ -56,9 +56,9 @@
        </dependency>
        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <groupId>io.reactivex.rxjava3</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.2.21</version>
            <version>3.1.10</version>
        </dependency>
        <dependency>
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -18,13 +18,13 @@
import org.forgerock.util.Function;
import org.reactivestreams.Publisher;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableOnSubscribe;
import io.reactivex.CompletableSource;
import io.reactivex.Flowable;
import io.reactivex.SingleEmitter;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.SingleSource;
import io.reactivex.rxjava3.core.CompletableEmitter;
import io.reactivex.rxjava3.core.CompletableOnSubscribe;
import io.reactivex.rxjava3.core.CompletableSource;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.SingleEmitter;
import io.reactivex.rxjava3.core.SingleOnSubscribe;
import io.reactivex.rxjava3.core.SingleSource;
/**
 * {@link Stream} and {@link Single} implementations based on RxJava.
@@ -96,7 +96,7 @@
     * @return A new {@link Stream}
     */
    public static <V> Single<V> singleFromPublisher(final Publisher<V> publisher) {
        return new RxJavaSingle<>(io.reactivex.Single.fromPublisher(publisher));
        return new RxJavaSingle<>(io.reactivex.rxjava3.core.Single.fromPublisher(publisher));
    }
    /**
@@ -109,7 +109,7 @@
     * @return A new {@link Single}
     */
    public static <V> Single<V> singleFrom(final V value) {
        return new RxJavaSingle<>(io.reactivex.Single.just(value));
        return new RxJavaSingle<>(io.reactivex.rxjava3.core.Single.just(value));
    }
    /**
@@ -122,7 +122,7 @@
     * @return A new {@link Single}
     */
    public static <V> Single<V> singleError(final Throwable error) {
        return new RxJavaSingle<>(io.reactivex.Single.<V>error(error));
        return new RxJavaSingle<>(io.reactivex.rxjava3.core.Single.<V>error(error));
    }
    /**
@@ -135,7 +135,7 @@
     * @return A new {@link Single}
     */
    public static <V> Single<V> newSingle(final Single.Emitter<V> emitter) {
        return new RxJavaSingle<>(io.reactivex.Single.create(new SingleOnSubscribe<V>() {
        return new RxJavaSingle<>(io.reactivex.rxjava3.core.Single.create(new SingleOnSubscribe<V>() {
            @Override
            public void subscribe(final SingleEmitter<V> e) throws Exception {
                emitter.subscribe(new Single.Subscriber<V>() {
@@ -161,7 +161,7 @@
     * @return A new {@link Completable}
     */
    public static Completable newCompletable(final Completable.Emitter onSubscribe) {
        return new RxJavaCompletable(io.reactivex.Completable.create(new CompletableOnSubscribe() {
        return new RxJavaCompletable(io.reactivex.rxjava3.core.Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(final CompletableEmitter e) throws Exception {
                onSubscribe.subscribe(new Completable.Subscriber() {
@@ -186,7 +186,7 @@
     * @return A new {@link Completable}
     */
    public static Completable completableError(final Throwable error) {
        return new RxJavaCompletable(io.reactivex.Completable.error(error));
        return new RxJavaCompletable(io.reactivex.rxjava3.core.Completable.error(error));
    }
    private static final class RxJavaStream<V> implements Stream<V> {
@@ -204,7 +204,7 @@
        @Override
        public <O> Stream<O> map(final Function<V, O, Exception> function) {
            return new RxJavaStream<>(impl.map(new io.reactivex.functions.Function<V, O>() {
            return new RxJavaStream<>(impl.map(new io.reactivex.rxjava3.functions.Function<V, O>() {
                @Override
                public O apply(V t) throws Exception {
                    return function.apply(t);
@@ -215,7 +215,7 @@
        @Override
        public <O> Stream<O> flatMap(final Function<? super V, ? extends Publisher<? extends O>, Exception> function,
                int maxConcurrency) {
            return new RxJavaStream<>(impl.flatMap(new io.reactivex.functions.Function<V, Publisher<? extends O>>() {
            return new RxJavaStream<>(impl.flatMap(new io.reactivex.rxjava3.functions.Function<V, Publisher<? extends O>>() {
                @Override
                public Publisher<? extends O> apply(V t) throws Exception {
                    return function.apply(t);
@@ -225,7 +225,7 @@
        @Override
        public Stream<V> onNext(final Consumer<V> onNext) {
            return new RxJavaStream<>(impl.doOnNext(new io.reactivex.functions.Consumer<V>() {
            return new RxJavaStream<>(impl.doOnNext(new io.reactivex.rxjava3.functions.Consumer<V>() {
                @Override
                public void accept(V value) throws Exception {
                    onNext.accept(value);
@@ -235,7 +235,7 @@
        @Override
        public Stream<V> onError(final Consumer<Throwable> onError) {
            return new RxJavaStream<>(impl.doOnError(new io.reactivex.functions.Consumer<Throwable>() {
            return new RxJavaStream<>(impl.doOnError(new io.reactivex.rxjava3.functions.Consumer<Throwable>() {
                @Override
                public void accept(Throwable t) throws Exception {
                    onError.accept(t);
@@ -246,7 +246,7 @@
        @Override
        public Stream<V> onErrorResumeWith(final Function<Throwable, Publisher<V>, Exception> function) {
            return new RxJavaStream<>(
                    impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, Publisher<? extends V>>() {
                    impl.onErrorResumeNext(new io.reactivex.rxjava3.functions.Function<Throwable, Publisher<? extends V>>() {
                        @Override
                        public Publisher<? extends V> apply(Throwable t) throws Exception {
                            return function.apply(t);
@@ -256,7 +256,7 @@
        @Override
        public Stream<V> onComplete(final Action action) {
            return new RxJavaStream<>(impl.doOnComplete(new io.reactivex.functions.Action() {
            return new RxJavaStream<>(impl.doOnComplete(new io.reactivex.rxjava3.functions.Action() {
                @Override
                public void run() throws Exception {
                    action.run();
@@ -272,9 +272,9 @@
    private static final class RxJavaSingle<V> implements Single<V> {
        private final io.reactivex.Single<V> impl;
        private final io.reactivex.rxjava3.core.Single<V> impl;
        private RxJavaSingle(io.reactivex.Single<V> impl) {
        private RxJavaSingle(io.reactivex.rxjava3.core.Single<V> impl) {
            this.impl = impl;
        }
@@ -285,7 +285,7 @@
        @Override
        public <O> Single<O> map(final Function<V, O, Exception> function) {
            return new RxJavaSingle<>(impl.map(new io.reactivex.functions.Function<V, O>() {
            return new RxJavaSingle<>(impl.map(new io.reactivex.rxjava3.functions.Function<V, O>() {
                @Override
                public O apply(V t) throws Exception {
                    return function.apply(t);
@@ -300,12 +300,12 @@
        @Override
        public void subscribe(final Consumer<? super V> resultConsumer, final Consumer<Throwable> errorConsumer) {
            impl.subscribe(new io.reactivex.functions.Consumer<V>() {
            impl.subscribe(new io.reactivex.rxjava3.functions.Consumer<V>() {
                @Override
                public void accept(V t) throws Exception {
                    resultConsumer.accept(t);
                }
            }, new io.reactivex.functions.Consumer<Throwable>() {
            }, new io.reactivex.rxjava3.functions.Consumer<Throwable>() {
                @Override
                public void accept(Throwable t) throws Exception {
                    errorConsumer.accept(t);
@@ -315,10 +315,10 @@
        @Override
        public <O> Single<O> flatMap(final Function<V, Single<O>, Exception> function) {
            return new RxJavaSingle<>(impl.flatMap(new io.reactivex.functions.Function<V, SingleSource<O>>() {
            return new RxJavaSingle<>(impl.flatMap(new io.reactivex.rxjava3.functions.Function<V, SingleSource<O>>() {
                @Override
                public SingleSource<O> apply(V t) throws Exception {
                    return io.reactivex.Single.fromPublisher(function.apply(t));
                    return io.reactivex.rxjava3.core.Single.fromPublisher(function.apply(t));
                }
            }));
        }
@@ -326,10 +326,10 @@
        @Override
        public Single<V> onErrorResumeWith(final Function<Throwable, Single<V>, Exception> function) {
            return new RxJavaSingle<>(
                    impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, SingleSource<V>>() {
                    impl.onErrorResumeNext(new io.reactivex.rxjava3.functions.Function<Throwable, SingleSource<V>>() {
                        @Override
                        public SingleSource<V> apply(Throwable error) throws Exception {
                            return io.reactivex.Single.fromPublisher(function.apply(error));
                            return io.reactivex.rxjava3.core.Single.fromPublisher(function.apply(error));
                        }
                    }));
        }
@@ -337,9 +337,9 @@
    private static final class RxJavaCompletable implements Completable {
        private final io.reactivex.Completable impl;
        private final io.reactivex.rxjava3.core.Completable impl;
        RxJavaCompletable(io.reactivex.Completable impl) {
        RxJavaCompletable(io.reactivex.rxjava3.core.Completable impl) {
            this.impl = impl;
        }
@@ -356,22 +356,22 @@
        @Override
        public Completable onErrorResumeWith(final Function<Throwable, Completable, Exception> function) {
            return new RxJavaCompletable(
                    impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, CompletableSource>() {
                    impl.onErrorResumeNext(new io.reactivex.rxjava3.functions.Function<Throwable, CompletableSource>() {
                        @Override
                        public CompletableSource apply(Throwable error) throws Exception {
                            return io.reactivex.Completable.fromPublisher(function.apply(error));
                            return io.reactivex.rxjava3.core.Completable.fromPublisher(function.apply(error));
                        }
                    }));
        }
        @Override
        public void subscribe(final Action completeAction, final Consumer<Throwable> errorConsumer) {
            impl.subscribe(new io.reactivex.functions.Action() {
            impl.subscribe(new io.reactivex.rxjava3.functions.Action() {
                @Override
                public void run() throws Exception {
                    completeAction.run();
                }
            }, new io.reactivex.functions.Consumer<Throwable>() {
            }, new io.reactivex.rxjava3.functions.Consumer<Throwable>() {
                @Override
                public void accept(final Throwable error) throws Exception {
                    errorConsumer.accept(error);
@@ -381,7 +381,7 @@
        @Override
        public Completable doAfterTerminate(final Action onTerminate) {
            return new RxJavaCompletable(impl.doAfterTerminate(new io.reactivex.functions.Action() {
            return new RxJavaCompletable(impl.doAfterTerminate(new io.reactivex.rxjava3.functions.Action() {
                @Override
                public void run() throws Exception {
                    onTerminate.run();
opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
@@ -59,10 +59,10 @@
import org.forgerock.util.Function;
import org.forgerock.util.promise.RuntimeExceptionHandler;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableEmitter;
import io.reactivex.rxjava3.core.FlowableOnSubscribe;
/**
 * Adapt a {@link ServerConnectionFactory} to a {@link Function} compatible with
opendj-grizzly/pom.xml
@@ -51,6 +51,10 @@
                        <groupId>io.reactivex.rxjava2</groupId>
                        <artifactId>rxjava</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>io.reactivex.rxjava3</groupId>
                        <artifactId>rxjava</artifactId>
                    </exclusion>
            </exclusions>
        </dependency>
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -85,8 +85,8 @@
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Stream;
import io.reactivex.exceptions.OnErrorNotImplementedException;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException;
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
/**
 * Grizzly filter implementation for decoding LDAP requests and handling server side logic for SSL and SASL operations
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
@@ -18,10 +18,13 @@
import java.util.concurrent.CancellationException;
import io.reactivex.exceptions.UndeliverableException;
import io.reactivex.rxjava3.exceptions.UndeliverableException;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Connection;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -117,10 +117,10 @@
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Stream;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableEmitter;
import io.reactivex.rxjava3.core.FlowableOnSubscribe;
/**
 * This class defines an LDAP client connection, which is a type of client connection that will be accepted by an