opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -211,26 +211,6 @@ } /* Subscribes to impl, forwarding each item, each error and the completion signal to the supplied callbacks through io.reactivex adapter objects. */ @Override public void subscribe(final Consumer<V> onResult, final Consumer<Throwable> onError, final Action onComplete) { impl.subscribe(new io.reactivex.functions.Consumer<V>() { @Override public void accept(V t) throws Exception { onResult.accept(t); } }, new io.reactivex.functions.Consumer<Throwable>() { @Override public void accept(Throwable t) throws Exception { onError.accept(t); } }, new io.reactivex.functions.Action() { @Override public void run() throws Exception { onComplete.run(); } }); } /* Wraps impl.doOnError(...) in a new RxJavaStream; the remainder of this method lies outside the hunk. */ @Override public Stream<V> onErrorDo(final Consumer<Throwable> onError) { return new RxJavaStream<>(impl.doOnError(new io.reactivex.functions.Consumer<Throwable>() { @Override @@ -252,6 +232,16 @@ } /* Returns a new RxJavaStream wrapping impl.doOnComplete(...); the adapter passed to it runs the supplied action. */ @Override public Stream<V> onCompleteDo(final Action action) { return new RxJavaStream<>(impl.doOnComplete(new io.reactivex.functions.Action() { @Override public void run() throws Exception { action.run(); } })); } /* Subscribes to impl without supplying any callbacks. */ @Override public void subscribe() { impl.subscribe(); } opendj-core/src/main/java/com/forgerock/reactive/Stream.java
@@ -53,18 +53,6 @@ int maxConcurrency); /** * Subscribes to the data emitted by this {@link Stream}. * * @param onResult * The {@link Consumer} invoked for each item emitted by this stream. * @param onError * The {@link Consumer} invoked when an error occurs in this stream. * @param onComplete * The {@link Action} invoked once this {@link Stream} is completed. */ void subscribe(Consumer<V> onResult, Consumer<Throwable> onError, Action onComplete); /** * When an error occurs in this stream, continue the processing with the new {@link Stream} provided by the * function. * @@ -84,6 +72,15 @@ Stream<V> onErrorDo(Consumer<Throwable> onError); /** * Invokes the given {@link Action} when this stream completes. * * @param onComplete * The {@link Action} to invoke on stream completion. * @return a new {@link Stream} */ Stream<V> onCompleteDo(Action onComplete); /** * Subscribes to this stream and drops all data produced by it. */ void subscribe(); opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -69,6 +69,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import com.forgerock.reactive.Action; import com.forgerock.reactive.Completable; import com.forgerock.reactive.Consumer; import com.forgerock.reactive.ReactiveHandler; @@ -162,7 +163,7 @@ /* For an unbind request, notifies the client context and returns singleFrom(DUMMY); handling of other message types continues past this hunk. */ @Override public Publisher<Object> apply(final LdapRawMessage rawRequest) throws Exception { if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) { clientContext.notifyAndClose(rawRequest); clientContext.notifyConnectionClosed(rawRequest); return singleFrom(DUMMY); } Single<Stream<Response>> response; @@ -197,7 +198,13 @@ /* On a stream error, hand the error to the client context. */ .onErrorDo(new Consumer<Throwable>() { @Override public void accept(final Throwable error) throws Exception { clientContext.notifyAndCloseSilently(error); clientContext.notifyErrorAndCloseSilently(error); } }) /* On stream completion, notify the client context, passing null as the unbind request. */ .onCompleteDo(new Action() { @Override public void run() throws Exception { clientContext.notifyConnectionClosed(null); } }) .subscribe(); @@ -511,7 +518,7 @@ } } /* Both close helpers below act only when isClosed.compareAndSet(false, true) succeeds, i.e. for the call that flips the flag (isClosed is presumably an AtomicBoolean - confirm against its declaration). */ private void notifyAndClose(final LdapRawMessage unbindRequest) { private void notifyConnectionClosed(final LdapRawMessage unbindRequest) { // Close this connection context. if (isClosed.compareAndSet(false, true)) { if (unbindRequest == null) { @@ -547,7 +554,7 @@ } } private void notifyAndCloseSilently(final Throwable error) { private void notifyErrorAndCloseSilently(final Throwable error) { // Close this connection context. if (isClosed.compareAndSet(false, true)) { try { opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import org.forgerock.i18n.LocalizableException; import org.forgerock.i18n.LocalizableMessage; import org.forgerock.i18n.LocalizableMessageBuilder; import org.forgerock.i18n.slf4j.LocalizedLogger; @@ -41,7 +42,9 @@ import org.forgerock.opendj.ldap.ByteString; import org.forgerock.opendj.ldap.DN; import org.forgerock.opendj.ldap.LDAPClientContext; import org.forgerock.opendj.ldap.LDAPClientContext.DisconnectListener; import org.forgerock.opendj.ldap.ResultCode; import org.forgerock.opendj.ldap.requests.UnbindRequest; import org.forgerock.opendj.ldap.responses.CompareResult; import org.forgerock.opendj.ldap.responses.Response; import org.forgerock.opendj.ldap.responses.Responses; @@ -221,6 +224,33 @@ } connectionID = DirectoryServer.newConnectionAccepted(this); /* Register a listener that maps each client-context disconnect callback onto disconnect(...) with a matching DisconnectReason. */ clientContext.onDisconnect(new DisconnectListener() { /* Reports PROTOCOL_ERROR, passing the error's message object when the error is a LocalizableException and null otherwise. */ @Override public void exceptionOccurred(LDAPClientContext context, Throwable error) { if (error instanceof LocalizableException) { disconnect(DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject()); } else { disconnect(DisconnectReason.PROTOCOL_ERROR, true, null); } } /* Reports SERVER_ERROR; resultCode and diagnosticMessage are not forwarded. NOTE(review): confirm that dropping the diagnostic message is intended. */ @Override public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage) { disconnect(DisconnectReason.SERVER_ERROR, false, null); } /* Reports CLIENT_DISCONNECT; the unbind request itself is not used. */ @Override public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) { disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null); } }); } /**