From a164e9cd3f00ca774e03b201accf521f23ab7da1 Mon Sep 17 00:00:00 2001
From: Prashant <prashant.thakre@gmail.com>
Date: Fri, 30 May 2025 09:18:26 +0000
Subject: [PATCH] Bump io.reactivex.rxjava to 3.x (#504)
---
opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java | 8 ++--
opendj-grizzly/pom.xml | 4 ++
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java | 70 +++++++++++++++++-----------------
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java | 5 ++
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java | 10 ++--
opendj-core/pom.xml | 4 +-
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java | 8 ++--
7 files changed, 58 insertions(+), 51 deletions(-)
diff --git a/opendj-core/pom.xml b/opendj-core/pom.xml
index bf02860..2a76a7d 100644
--- a/opendj-core/pom.xml
+++ b/opendj-core/pom.xml
@@ -56,9 +56,9 @@
</dependency>
<dependency>
- <groupId>io.reactivex.rxjava2</groupId>
+ <groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
- <version>2.2.21</version>
+ <version>3.1.10</version>
</dependency>
<dependency>
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
index 2ef0205..1bb350f 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -18,13 +18,13 @@
import org.forgerock.util.Function;
import org.reactivestreams.Publisher;
-import io.reactivex.CompletableEmitter;
-import io.reactivex.CompletableOnSubscribe;
-import io.reactivex.CompletableSource;
-import io.reactivex.Flowable;
-import io.reactivex.SingleEmitter;
-import io.reactivex.SingleOnSubscribe;
-import io.reactivex.SingleSource;
+import io.reactivex.rxjava3.core.CompletableEmitter;
+import io.reactivex.rxjava3.core.CompletableOnSubscribe;
+import io.reactivex.rxjava3.core.CompletableSource;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.SingleEmitter;
+import io.reactivex.rxjava3.core.SingleOnSubscribe;
+import io.reactivex.rxjava3.core.SingleSource;
/**
* {@link Stream} and {@link Single} implementations based on RxJava.
@@ -96,7 +96,7 @@
* @return A new {@link Stream}
*/
public static <V> Single<V> singleFromPublisher(final Publisher<V> publisher) {
- return new RxJavaSingle<>(io.reactivex.Single.fromPublisher(publisher));
+ return new RxJavaSingle<>(io.reactivex.rxjava3.core.Single.fromPublisher(publisher));
}
/**
@@ -109,7 +109,7 @@
* @return A new {@link Single}
*/
public static <V> Single<V> singleFrom(final V value) {
- return new RxJavaSingle<>(io.reactivex.Single.just(value));
+ return new RxJavaSingle<>(io.reactivex.rxjava3.core.Single.just(value));
}
/**
@@ -122,7 +122,7 @@
* @return A new {@link Single}
*/
public static <V> Single<V> singleError(final Throwable error) {
- return new RxJavaSingle<>(io.reactivex.Single.<V>error(error));
+ return new RxJavaSingle<>(io.reactivex.rxjava3.core.Single.<V>error(error));
}
/**
@@ -135,7 +135,7 @@
* @return A new {@link Single}
*/
public static <V> Single<V> newSingle(final Single.Emitter<V> emitter) {
- return new RxJavaSingle<>(io.reactivex.Single.create(new SingleOnSubscribe<V>() {
+ return new RxJavaSingle<>(io.reactivex.rxjava3.core.Single.create(new SingleOnSubscribe<V>() {
@Override
public void subscribe(final SingleEmitter<V> e) throws Exception {
emitter.subscribe(new Single.Subscriber<V>() {
@@ -161,7 +161,7 @@
* @return A new {@link Completable}
*/
public static Completable newCompletable(final Completable.Emitter onSubscribe) {
- return new RxJavaCompletable(io.reactivex.Completable.create(new CompletableOnSubscribe() {
+ return new RxJavaCompletable(io.reactivex.rxjava3.core.Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(final CompletableEmitter e) throws Exception {
onSubscribe.subscribe(new Completable.Subscriber() {
@@ -186,7 +186,7 @@
* @return A new {@link Completable}
*/
public static Completable completableError(final Throwable error) {
- return new RxJavaCompletable(io.reactivex.Completable.error(error));
+ return new RxJavaCompletable(io.reactivex.rxjava3.core.Completable.error(error));
}
private static final class RxJavaStream<V> implements Stream<V> {
@@ -204,7 +204,7 @@
@Override
public <O> Stream<O> map(final Function<V, O, Exception> function) {
- return new RxJavaStream<>(impl.map(new io.reactivex.functions.Function<V, O>() {
+ return new RxJavaStream<>(impl.map(new io.reactivex.rxjava3.functions.Function<V, O>() {
@Override
public O apply(V t) throws Exception {
return function.apply(t);
@@ -215,7 +215,7 @@
@Override
public <O> Stream<O> flatMap(final Function<? super V, ? extends Publisher<? extends O>, Exception> function,
int maxConcurrency) {
- return new RxJavaStream<>(impl.flatMap(new io.reactivex.functions.Function<V, Publisher<? extends O>>() {
+ return new RxJavaStream<>(impl.flatMap(new io.reactivex.rxjava3.functions.Function<V, Publisher<? extends O>>() {
@Override
public Publisher<? extends O> apply(V t) throws Exception {
return function.apply(t);
@@ -225,7 +225,7 @@
@Override
public Stream<V> onNext(final Consumer<V> onNext) {
- return new RxJavaStream<>(impl.doOnNext(new io.reactivex.functions.Consumer<V>() {
+ return new RxJavaStream<>(impl.doOnNext(new io.reactivex.rxjava3.functions.Consumer<V>() {
@Override
public void accept(V value) throws Exception {
onNext.accept(value);
@@ -235,7 +235,7 @@
@Override
public Stream<V> onError(final Consumer<Throwable> onError) {
- return new RxJavaStream<>(impl.doOnError(new io.reactivex.functions.Consumer<Throwable>() {
+ return new RxJavaStream<>(impl.doOnError(new io.reactivex.rxjava3.functions.Consumer<Throwable>() {
@Override
public void accept(Throwable t) throws Exception {
onError.accept(t);
@@ -246,7 +246,7 @@
@Override
public Stream<V> onErrorResumeWith(final Function<Throwable, Publisher<V>, Exception> function) {
return new RxJavaStream<>(
- impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, Publisher<? extends V>>() {
+ impl.onErrorResumeNext(new io.reactivex.rxjava3.functions.Function<Throwable, Publisher<? extends V>>() {
@Override
public Publisher<? extends V> apply(Throwable t) throws Exception {
return function.apply(t);
@@ -256,7 +256,7 @@
@Override
public Stream<V> onComplete(final Action action) {
- return new RxJavaStream<>(impl.doOnComplete(new io.reactivex.functions.Action() {
+ return new RxJavaStream<>(impl.doOnComplete(new io.reactivex.rxjava3.functions.Action() {
@Override
public void run() throws Exception {
action.run();
@@ -272,9 +272,9 @@
private static final class RxJavaSingle<V> implements Single<V> {
- private final io.reactivex.Single<V> impl;
+ private final io.reactivex.rxjava3.core.Single<V> impl;
- private RxJavaSingle(io.reactivex.Single<V> impl) {
+ private RxJavaSingle(io.reactivex.rxjava3.core.Single<V> impl) {
this.impl = impl;
}
@@ -285,7 +285,7 @@
@Override
public <O> Single<O> map(final Function<V, O, Exception> function) {
- return new RxJavaSingle<>(impl.map(new io.reactivex.functions.Function<V, O>() {
+ return new RxJavaSingle<>(impl.map(new io.reactivex.rxjava3.functions.Function<V, O>() {
@Override
public O apply(V t) throws Exception {
return function.apply(t);
@@ -300,12 +300,12 @@
@Override
public void subscribe(final Consumer<? super V> resultConsumer, final Consumer<Throwable> errorConsumer) {
- impl.subscribe(new io.reactivex.functions.Consumer<V>() {
+ impl.subscribe(new io.reactivex.rxjava3.functions.Consumer<V>() {
@Override
public void accept(V t) throws Exception {
resultConsumer.accept(t);
}
- }, new io.reactivex.functions.Consumer<Throwable>() {
+ }, new io.reactivex.rxjava3.functions.Consumer<Throwable>() {
@Override
public void accept(Throwable t) throws Exception {
errorConsumer.accept(t);
@@ -315,10 +315,10 @@
@Override
public <O> Single<O> flatMap(final Function<V, Single<O>, Exception> function) {
- return new RxJavaSingle<>(impl.flatMap(new io.reactivex.functions.Function<V, SingleSource<O>>() {
+ return new RxJavaSingle<>(impl.flatMap(new io.reactivex.rxjava3.functions.Function<V, SingleSource<O>>() {
@Override
public SingleSource<O> apply(V t) throws Exception {
- return io.reactivex.Single.fromPublisher(function.apply(t));
+ return io.reactivex.rxjava3.core.Single.fromPublisher(function.apply(t));
}
}));
}
@@ -326,10 +326,10 @@
@Override
public Single<V> onErrorResumeWith(final Function<Throwable, Single<V>, Exception> function) {
return new RxJavaSingle<>(
- impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, SingleSource<V>>() {
+ impl.onErrorResumeNext(new io.reactivex.rxjava3.functions.Function<Throwable, SingleSource<V>>() {
@Override
public SingleSource<V> apply(Throwable error) throws Exception {
- return io.reactivex.Single.fromPublisher(function.apply(error));
+ return io.reactivex.rxjava3.core.Single.fromPublisher(function.apply(error));
}
}));
}
@@ -337,9 +337,9 @@
private static final class RxJavaCompletable implements Completable {
- private final io.reactivex.Completable impl;
+ private final io.reactivex.rxjava3.core.Completable impl;
- RxJavaCompletable(io.reactivex.Completable impl) {
+ RxJavaCompletable(io.reactivex.rxjava3.core.Completable impl) {
this.impl = impl;
}
@@ -356,22 +356,22 @@
@Override
public Completable onErrorResumeWith(final Function<Throwable, Completable, Exception> function) {
return new RxJavaCompletable(
- impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, CompletableSource>() {
+ impl.onErrorResumeNext(new io.reactivex.rxjava3.functions.Function<Throwable, CompletableSource>() {
@Override
public CompletableSource apply(Throwable error) throws Exception {
- return io.reactivex.Completable.fromPublisher(function.apply(error));
+ return io.reactivex.rxjava3.core.Completable.fromPublisher(function.apply(error));
}
}));
}
@Override
public void subscribe(final Action completeAction, final Consumer<Throwable> errorConsumer) {
- impl.subscribe(new io.reactivex.functions.Action() {
+ impl.subscribe(new io.reactivex.rxjava3.functions.Action() {
@Override
public void run() throws Exception {
completeAction.run();
}
- }, new io.reactivex.functions.Consumer<Throwable>() {
+ }, new io.reactivex.rxjava3.functions.Consumer<Throwable>() {
@Override
public void accept(final Throwable error) throws Exception {
errorConsumer.accept(error);
@@ -381,7 +381,7 @@
@Override
public Completable doAfterTerminate(final Action onTerminate) {
- return new RxJavaCompletable(impl.doAfterTerminate(new io.reactivex.functions.Action() {
+ return new RxJavaCompletable(impl.doAfterTerminate(new io.reactivex.rxjava3.functions.Action() {
@Override
public void run() throws Exception {
onTerminate.run();
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java b/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
index 0cb33bf..4edd2a2 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
@@ -59,10 +59,10 @@
import org.forgerock.util.Function;
import org.forgerock.util.promise.RuntimeExceptionHandler;
-import io.reactivex.BackpressureStrategy;
-import io.reactivex.Flowable;
-import io.reactivex.FlowableEmitter;
-import io.reactivex.FlowableOnSubscribe;
+import io.reactivex.rxjava3.core.BackpressureStrategy;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.FlowableEmitter;
+import io.reactivex.rxjava3.core.FlowableOnSubscribe;
/**
* Adapt a {@link ServerConnectionFactory} to a {@link Function} compatible with
diff --git a/opendj-grizzly/pom.xml b/opendj-grizzly/pom.xml
index 7a6e175..0e1fc5f 100644
--- a/opendj-grizzly/pom.xml
+++ b/opendj-grizzly/pom.xml
@@ -51,6 +51,10 @@
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.reactivex.rxjava3</groupId>
+ <artifactId>rxjava</artifactId>
+ </exclusion>
</exclusions>
</dependency>
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index 0af9bc6..5dd6e09 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -85,8 +85,8 @@
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Stream;
-import io.reactivex.exceptions.OnErrorNotImplementedException;
-import io.reactivex.internal.util.BackpressureHelper;
+import io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException;
+import io.reactivex.rxjava3.internal.util.BackpressureHelper;
/**
* Grizzly filter implementation for decoding LDAP requests and handling server side logic for SSL and SASL operations
@@ -390,7 +390,7 @@
return false;
}
SSLUtils.setSSLEngine(connection, sslEngine);
-
+
Properties props = System.getProperties();
// Workaround for PKCS11
@@ -398,7 +398,7 @@
if ("none".equalsIgnoreCase(keyStoreFile)) {
System.setProperty(SSLContextConfigurator.TRUST_STORE_FILE, "NONE");
}
-
+
SSLFilter sslFilter = new SSLFilter();
sslFilter.setHandshakeTimeout(getLongProperty("org.forgerock.opendj.grizzly.handshakeTimeout", 10000), TimeUnit.MILLISECONDS);
installFilter(startTls ? new StartTLSFilter(sslFilter) : sslFilter);
@@ -661,7 +661,7 @@
});
}
});
-
+
}
}
}
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
index 138e371..47d90d8 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
@@ -18,10 +18,13 @@
import java.util.concurrent.CancellationException;
-import io.reactivex.exceptions.UndeliverableException;
+import io.reactivex.rxjava3.exceptions.UndeliverableException;
+
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage;
+
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Connection;
+
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index ab47f5f..7779dc7 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -117,10 +117,10 @@
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Stream;
-import io.reactivex.BackpressureStrategy;
-import io.reactivex.Flowable;
-import io.reactivex.FlowableEmitter;
-import io.reactivex.FlowableOnSubscribe;
+import io.reactivex.rxjava3.core.BackpressureStrategy;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.FlowableEmitter;
+import io.reactivex.rxjava3.core.FlowableOnSubscribe;
/**
* This class defines an LDAP client connection, which is a type of client connection that will be accepted by an
--
Gitblit v1.10.0