From a164e9cd3f00ca774e03b201accf521f23ab7da1 Mon Sep 17 00:00:00 2001
From: Prashant <prashant.thakre@gmail.com>
Date: Fri, 30 May 2025 09:18:26 +0000
Subject: [PATCH] Bump io.reactivex.rxjava to 3.x (#504)

---
 opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java        |    8 ++--
 opendj-grizzly/pom.xml                                                                      |    4 ++
 opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java                         |   70 +++++++++++++++++-----------------
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java    |    5 ++
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java             |   10 ++--
 opendj-core/pom.xml                                                                         |    4 +-
 opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java |    8 ++--
 7 files changed, 58 insertions(+), 51 deletions(-)

diff --git a/opendj-core/pom.xml b/opendj-core/pom.xml
index bf02860..2a76a7d 100644
--- a/opendj-core/pom.xml
+++ b/opendj-core/pom.xml
@@ -56,9 +56,9 @@
         </dependency>
 
         <dependency>
-            <groupId>io.reactivex.rxjava2</groupId>
+            <groupId>io.reactivex.rxjava3</groupId>
             <artifactId>rxjava</artifactId>
-            <version>2.2.21</version>
+            <version>3.1.10</version>
         </dependency>
 
         <dependency>
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
index 2ef0205..1bb350f 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -18,13 +18,13 @@
 import org.forgerock.util.Function;
 import org.reactivestreams.Publisher;
 
-import io.reactivex.CompletableEmitter;
-import io.reactivex.CompletableOnSubscribe;
-import io.reactivex.CompletableSource;
-import io.reactivex.Flowable;
-import io.reactivex.SingleEmitter;
-import io.reactivex.SingleOnSubscribe;
-import io.reactivex.SingleSource;
+import io.reactivex.rxjava3.core.CompletableEmitter;
+import io.reactivex.rxjava3.core.CompletableOnSubscribe;
+import io.reactivex.rxjava3.core.CompletableSource;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.SingleEmitter;
+import io.reactivex.rxjava3.core.SingleOnSubscribe;
+import io.reactivex.rxjava3.core.SingleSource;
 
 /**
  * {@link Stream} and {@link Single} implementations based on RxJava.
@@ -96,7 +96,7 @@
      * @return A new {@link Stream}
      */
     public static <V> Single<V> singleFromPublisher(final Publisher<V> publisher) {
-        return new RxJavaSingle<>(io.reactivex.Single.fromPublisher(publisher));
+        return new RxJavaSingle<>(io.reactivex.rxjava3.core.Single.fromPublisher(publisher));
     }
 
     /**
@@ -109,7 +109,7 @@
      * @return A new {@link Single}
      */
     public static <V> Single<V> singleFrom(final V value) {
-        return new RxJavaSingle<>(io.reactivex.Single.just(value));
+        return new RxJavaSingle<>(io.reactivex.rxjava3.core.Single.just(value));
     }
 
     /**
@@ -122,7 +122,7 @@
      * @return A new {@link Single}
      */
     public static <V> Single<V> singleError(final Throwable error) {
-        return new RxJavaSingle<>(io.reactivex.Single.<V>error(error));
+        return new RxJavaSingle<>(io.reactivex.rxjava3.core.Single.<V>error(error));
     }
 
     /**
@@ -135,7 +135,7 @@
      * @return A new {@link Single}
      */
     public static <V> Single<V> newSingle(final Single.Emitter<V> emitter) {
-        return new RxJavaSingle<>(io.reactivex.Single.create(new SingleOnSubscribe<V>() {
+        return new RxJavaSingle<>(io.reactivex.rxjava3.core.Single.create(new SingleOnSubscribe<V>() {
             @Override
             public void subscribe(final SingleEmitter<V> e) throws Exception {
                 emitter.subscribe(new Single.Subscriber<V>() {
@@ -161,7 +161,7 @@
      * @return A new {@link Completable}
      */
     public static Completable newCompletable(final Completable.Emitter onSubscribe) {
-        return new RxJavaCompletable(io.reactivex.Completable.create(new CompletableOnSubscribe() {
+        return new RxJavaCompletable(io.reactivex.rxjava3.core.Completable.create(new CompletableOnSubscribe() {
             @Override
             public void subscribe(final CompletableEmitter e) throws Exception {
                 onSubscribe.subscribe(new Completable.Subscriber() {
@@ -186,7 +186,7 @@
      * @return A new {@link Completable}
      */
     public static Completable completableError(final Throwable error) {
-        return new RxJavaCompletable(io.reactivex.Completable.error(error));
+        return new RxJavaCompletable(io.reactivex.rxjava3.core.Completable.error(error));
     }
 
     private static final class RxJavaStream<V> implements Stream<V> {
@@ -204,7 +204,7 @@
 
         @Override
         public <O> Stream<O> map(final Function<V, O, Exception> function) {
-            return new RxJavaStream<>(impl.map(new io.reactivex.functions.Function<V, O>() {
+            return new RxJavaStream<>(impl.map(new io.reactivex.rxjava3.functions.Function<V, O>() {
                 @Override
                 public O apply(V t) throws Exception {
                     return function.apply(t);
@@ -215,7 +215,7 @@
         @Override
         public <O> Stream<O> flatMap(final Function<? super V, ? extends Publisher<? extends O>, Exception> function,
                 int maxConcurrency) {
-            return new RxJavaStream<>(impl.flatMap(new io.reactivex.functions.Function<V, Publisher<? extends O>>() {
+            return new RxJavaStream<>(impl.flatMap(new io.reactivex.rxjava3.functions.Function<V, Publisher<? extends O>>() {
                 @Override
                 public Publisher<? extends O> apply(V t) throws Exception {
                     return function.apply(t);
@@ -225,7 +225,7 @@
 
         @Override
         public Stream<V> onNext(final Consumer<V> onNext) {
-            return new RxJavaStream<>(impl.doOnNext(new io.reactivex.functions.Consumer<V>() {
+            return new RxJavaStream<>(impl.doOnNext(new io.reactivex.rxjava3.functions.Consumer<V>() {
                 @Override
                 public void accept(V value) throws Exception {
                     onNext.accept(value);
@@ -235,7 +235,7 @@
 
         @Override
         public Stream<V> onError(final Consumer<Throwable> onError) {
-            return new RxJavaStream<>(impl.doOnError(new io.reactivex.functions.Consumer<Throwable>() {
+            return new RxJavaStream<>(impl.doOnError(new io.reactivex.rxjava3.functions.Consumer<Throwable>() {
                 @Override
                 public void accept(Throwable t) throws Exception {
                     onError.accept(t);
@@ -246,7 +246,7 @@
         @Override
         public Stream<V> onErrorResumeWith(final Function<Throwable, Publisher<V>, Exception> function) {
             return new RxJavaStream<>(
-                    impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, Publisher<? extends V>>() {
+                    impl.onErrorResumeNext(new io.reactivex.rxjava3.functions.Function<Throwable, Publisher<? extends V>>() {
                         @Override
                         public Publisher<? extends V> apply(Throwable t) throws Exception {
                             return function.apply(t);
@@ -256,7 +256,7 @@
 
         @Override
         public Stream<V> onComplete(final Action action) {
-            return new RxJavaStream<>(impl.doOnComplete(new io.reactivex.functions.Action() {
+            return new RxJavaStream<>(impl.doOnComplete(new io.reactivex.rxjava3.functions.Action() {
                 @Override
                 public void run() throws Exception {
                     action.run();
@@ -272,9 +272,9 @@
 
     private static final class RxJavaSingle<V> implements Single<V> {
 
-        private final io.reactivex.Single<V> impl;
+        private final io.reactivex.rxjava3.core.Single<V> impl;
 
-        private RxJavaSingle(io.reactivex.Single<V> impl) {
+        private RxJavaSingle(io.reactivex.rxjava3.core.Single<V> impl) {
             this.impl = impl;
         }
 
@@ -285,7 +285,7 @@
 
         @Override
         public <O> Single<O> map(final Function<V, O, Exception> function) {
-            return new RxJavaSingle<>(impl.map(new io.reactivex.functions.Function<V, O>() {
+            return new RxJavaSingle<>(impl.map(new io.reactivex.rxjava3.functions.Function<V, O>() {
                 @Override
                 public O apply(V t) throws Exception {
                     return function.apply(t);
@@ -300,12 +300,12 @@
 
         @Override
         public void subscribe(final Consumer<? super V> resultConsumer, final Consumer<Throwable> errorConsumer) {
-            impl.subscribe(new io.reactivex.functions.Consumer<V>() {
+            impl.subscribe(new io.reactivex.rxjava3.functions.Consumer<V>() {
                 @Override
                 public void accept(V t) throws Exception {
                     resultConsumer.accept(t);
                 }
-            }, new io.reactivex.functions.Consumer<Throwable>() {
+            }, new io.reactivex.rxjava3.functions.Consumer<Throwable>() {
                 @Override
                 public void accept(Throwable t) throws Exception {
                     errorConsumer.accept(t);
@@ -315,10 +315,10 @@
 
         @Override
         public <O> Single<O> flatMap(final Function<V, Single<O>, Exception> function) {
-            return new RxJavaSingle<>(impl.flatMap(new io.reactivex.functions.Function<V, SingleSource<O>>() {
+            return new RxJavaSingle<>(impl.flatMap(new io.reactivex.rxjava3.functions.Function<V, SingleSource<O>>() {
                 @Override
                 public SingleSource<O> apply(V t) throws Exception {
-                    return io.reactivex.Single.fromPublisher(function.apply(t));
+                    return io.reactivex.rxjava3.core.Single.fromPublisher(function.apply(t));
                 }
             }));
         }
@@ -326,10 +326,10 @@
         @Override
         public Single<V> onErrorResumeWith(final Function<Throwable, Single<V>, Exception> function) {
             return new RxJavaSingle<>(
-                    impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, SingleSource<V>>() {
+                    impl.onErrorResumeNext(new io.reactivex.rxjava3.functions.Function<Throwable, SingleSource<V>>() {
                         @Override
                         public SingleSource<V> apply(Throwable error) throws Exception {
-                            return io.reactivex.Single.fromPublisher(function.apply(error));
+                            return io.reactivex.rxjava3.core.Single.fromPublisher(function.apply(error));
                         }
                     }));
         }
@@ -337,9 +337,9 @@
 
     private static final class RxJavaCompletable implements Completable {
 
-        private final io.reactivex.Completable impl;
+        private final io.reactivex.rxjava3.core.Completable impl;
 
-        RxJavaCompletable(io.reactivex.Completable impl) {
+        RxJavaCompletable(io.reactivex.rxjava3.core.Completable impl) {
             this.impl = impl;
         }
 
@@ -356,22 +356,22 @@
         @Override
         public Completable onErrorResumeWith(final Function<Throwable, Completable, Exception> function) {
             return new RxJavaCompletable(
-                    impl.onErrorResumeNext(new io.reactivex.functions.Function<Throwable, CompletableSource>() {
+                    impl.onErrorResumeNext(new io.reactivex.rxjava3.functions.Function<Throwable, CompletableSource>() {
                         @Override
                         public CompletableSource apply(Throwable error) throws Exception {
-                            return io.reactivex.Completable.fromPublisher(function.apply(error));
+                            return io.reactivex.rxjava3.core.Completable.fromPublisher(function.apply(error));
                         }
                     }));
         }
 
         @Override
         public void subscribe(final Action completeAction, final Consumer<Throwable> errorConsumer) {
-            impl.subscribe(new io.reactivex.functions.Action() {
+            impl.subscribe(new io.reactivex.rxjava3.functions.Action() {
                 @Override
                 public void run() throws Exception {
                     completeAction.run();
                 }
-            }, new io.reactivex.functions.Consumer<Throwable>() {
+            }, new io.reactivex.rxjava3.functions.Consumer<Throwable>() {
                 @Override
                 public void accept(final Throwable error) throws Exception {
                     errorConsumer.accept(error);
@@ -381,7 +381,7 @@
 
         @Override
         public Completable doAfterTerminate(final Action onTerminate) {
-            return new RxJavaCompletable(impl.doAfterTerminate(new io.reactivex.functions.Action() {
+            return new RxJavaCompletable(impl.doAfterTerminate(new io.reactivex.rxjava3.functions.Action() {
                 @Override
                 public void run() throws Exception {
                     onTerminate.run();
diff --git a/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java b/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
index 0cb33bf..4edd2a2 100644
--- a/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
+++ b/opendj-core/src/main/java/com/forgerock/reactive/ServerConnectionFactoryAdapter.java
@@ -59,10 +59,10 @@
 import org.forgerock.util.Function;
 import org.forgerock.util.promise.RuntimeExceptionHandler;
 
-import io.reactivex.BackpressureStrategy;
-import io.reactivex.Flowable;
-import io.reactivex.FlowableEmitter;
-import io.reactivex.FlowableOnSubscribe;
+import io.reactivex.rxjava3.core.BackpressureStrategy;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.FlowableEmitter;
+import io.reactivex.rxjava3.core.FlowableOnSubscribe;
 
 /**
  * Adapt a {@link ServerConnectionFactory} to a {@link Function} compatible with
diff --git a/opendj-grizzly/pom.xml b/opendj-grizzly/pom.xml
index 7a6e175..0e1fc5f 100644
--- a/opendj-grizzly/pom.xml
+++ b/opendj-grizzly/pom.xml
@@ -51,6 +51,10 @@
 	            		<groupId>io.reactivex.rxjava2</groupId>
 	            		<artifactId>rxjava</artifactId>
 	            	</exclusion>
+	            	<exclusion>
+	            		<groupId>io.reactivex.rxjava3</groupId>
+	            		<artifactId>rxjava</artifactId>
+	            	</exclusion>
             </exclusions>
         </dependency>
 
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index 0af9bc6..5dd6e09 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -85,8 +85,8 @@
 import com.forgerock.reactive.ReactiveHandler;
 import com.forgerock.reactive.Stream;
 
-import io.reactivex.exceptions.OnErrorNotImplementedException;
-import io.reactivex.internal.util.BackpressureHelper;
+import io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException;
+import io.reactivex.rxjava3.internal.util.BackpressureHelper;
 
 /**
  * Grizzly filter implementation for decoding LDAP requests and handling server side logic for SSL and SASL operations
@@ -390,7 +390,7 @@
                     return false;
                 }
                 SSLUtils.setSSLEngine(connection, sslEngine);
-                
+
                 Properties props = System.getProperties();
 
                 // Workaround for PKCS11
@@ -398,7 +398,7 @@
                 if ("none".equalsIgnoreCase(keyStoreFile)) {
                 	System.setProperty(SSLContextConfigurator.TRUST_STORE_FILE, "NONE");
                 }
-                
+
                 SSLFilter sslFilter = new SSLFilter();
                 sslFilter.setHandshakeTimeout(getLongProperty("org.forgerock.opendj.grizzly.handshakeTimeout", 10000), TimeUnit.MILLISECONDS);
                 installFilter(startTls ? new StartTLSFilter(sslFilter) : sslFilter);
@@ -661,7 +661,7 @@
                     });
                 }
             });
-            
+
         }
     }
 }
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
index 138e371..47d90d8 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
@@ -18,10 +18,13 @@
 
 import java.util.concurrent.CancellationException;
 
-import io.reactivex.exceptions.UndeliverableException;
+import io.reactivex.rxjava3.exceptions.UndeliverableException;
+
 import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage;
+
 import org.glassfish.grizzly.CompletionHandler;
 import org.glassfish.grizzly.Connection;
+
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index ab47f5f..7779dc7 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -117,10 +117,10 @@
 import com.forgerock.reactive.ReactiveHandler;
 import com.forgerock.reactive.Stream;
 
-import io.reactivex.BackpressureStrategy;
-import io.reactivex.Flowable;
-import io.reactivex.FlowableEmitter;
-import io.reactivex.FlowableOnSubscribe;
+import io.reactivex.rxjava3.core.BackpressureStrategy;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.FlowableEmitter;
+import io.reactivex.rxjava3.core.FlowableOnSubscribe;
 
 /**
  * This class defines an LDAP client connection, which is a type of client connection that will be accepted by an

--
Gitblit v1.10.0