mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
21.44.2016 b59e161bd2d26df6023efea77353c8f3f61dd37b
OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

Bugfix: Register disconnection handler in LDAPClientConnection2
4 files modified
96 ■■■■■ changed files
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java 30 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/Stream.java 21 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java 15 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java 30 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -211,26 +211,6 @@
        }
        @Override
        public void subscribe(final Consumer<V> onResult, final Consumer<Throwable> onError, final Action onComplete) {
            impl.subscribe(new io.reactivex.functions.Consumer<V>() {
                @Override
                public void accept(V t) throws Exception {
                    onResult.accept(t);
                }
            }, new io.reactivex.functions.Consumer<Throwable>() {
                @Override
                public void accept(Throwable t) throws Exception {
                    onError.accept(t);
                }
            }, new io.reactivex.functions.Action() {
                @Override
                public void run() throws Exception {
                    onComplete.run();
                }
            });
        }
        @Override
        public Stream<V> onErrorDo(final Consumer<Throwable> onError) {
            return new RxJavaStream<>(impl.doOnError(new io.reactivex.functions.Consumer<Throwable>() {
                @Override
@@ -252,6 +232,16 @@
        }
        @Override
        public Stream<V> onCompleteDo(final Action action) {
            return new RxJavaStream<>(impl.doOnComplete(new io.reactivex.functions.Action() {
                @Override
                public void run() throws Exception {
                    action.run();
                }
            }));
        }
        @Override
        public void subscribe() {
            // Delegates to the wrapped implementation's no-argument subscribe(); no callbacks are registered here.
            impl.subscribe();
        }
opendj-core/src/main/java/com/forgerock/reactive/Stream.java
@@ -53,18 +53,6 @@
            int maxConcurrency);
    /**
     * Subscribes to the data emitted by this {@link Stream}.
     *
     * @param onResult
     *            The {@link Consumer} invoked for each item emitted by this stream.
     * @param onError
     *            The {@link Consumer} invoked when an error occurs in this stream.
     * @param onComplete
     *            The {@link Action} invoked once this {@link Stream} has completed.
     */
    void subscribe(Consumer<V> onResult, Consumer<Throwable> onError, Action onComplete);
    /**
     * When an error occurs in this stream, continue the processing with the new {@link Stream} provided by the
     * function.
     *
@@ -84,6 +72,15 @@
    Stream<V> onErrorDo(Consumer<Throwable> onError);
    /**
     * Invokes the given on-complete {@link Action} when this stream completes.
     *
     * @param onComplete
     *            The {@link Action} to invoke on stream completion.
     * @return a new {@link Stream}
     */
    Stream<V> onCompleteDo(Action onComplete);
    /**
     * Subscribes to this stream and drops all data produced by it.
     */
    void subscribe();
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -69,6 +69,7 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import com.forgerock.reactive.Action;
import com.forgerock.reactive.Completable;
import com.forgerock.reactive.Consumer;
import com.forgerock.reactive.ReactiveHandler;
@@ -162,7 +163,7 @@
                @Override
                public Publisher<Object> apply(final LdapRawMessage rawRequest) throws Exception {
                    if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
                        clientContext.notifyAndClose(rawRequest);
                        clientContext.notifyConnectionClosed(rawRequest);
                        return singleFrom(DUMMY);
                    }
                    Single<Stream<Response>> response;
@@ -197,7 +198,13 @@
            .onErrorDo(new Consumer<Throwable>() {
                @Override
                public void accept(final Throwable error) throws Exception {
                    clientContext.notifyAndCloseSilently(error);
                    clientContext.notifyErrorAndCloseSilently(error);
                }
            })
            .onCompleteDo(new Action() {
                @Override
                public void run() throws Exception {
                    clientContext.notifyConnectionClosed(null);
                }
            })
            .subscribe();
@@ -511,7 +518,7 @@
            }
        }
        private void notifyAndClose(final LdapRawMessage unbindRequest) {
        private void notifyConnectionClosed(final LdapRawMessage unbindRequest) {
            // Close this connection context.
            if (isClosed.compareAndSet(false, true)) {
                if (unbindRequest == null) {
@@ -547,7 +554,7 @@
            }
        }
        private void notifyAndCloseSilently(final Throwable error) {
        private void notifyErrorAndCloseSilently(final Throwable error) {
            // Close this connection context.
            if (isClosed.compareAndSet(false, true)) {
                try {
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -34,6 +34,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.forgerock.i18n.LocalizableException;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.LocalizableMessageBuilder;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -41,7 +42,9 @@
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.LDAPClientContext.DisconnectListener;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.CompareResult;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.responses.Responses;
@@ -221,6 +224,33 @@
    }
    connectionID = DirectoryServer.newConnectionAccepted(this);
    clientContext.onDisconnect(new DisconnectListener()
    {
      @Override
      public void exceptionOccurred(LDAPClientContext context, Throwable error)
      {
        if (error instanceof LocalizableException)
        {
          disconnect(DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject());
        }
        else
        {
          disconnect(DisconnectReason.PROTOCOL_ERROR, true, null);
        }
      }
      @Override
      public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage)
      {
        disconnect(DisconnectReason.SERVER_ERROR, false, null);
      }
      @Override
      public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest)
      {
        disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
      }
    });
  }
  /**