mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
16.16.2016 5e9caa5055a78b930500690cb919effd5c8bc2b1
OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

* Fix bug where the server could drop the last messages of a response
(e.g. search entries), leaving the client side waiting forever.

* Fix bug that prevented exceptions from being reported to the registered
ConnectionEventListener
6 files modified
223 ■■■■ changed files
opendj-core/clirr-ignored-api-changes.xml 7 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/Completable.java 10 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java 15 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java 5 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java 149 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java 37 ●●●● patch | view | raw | blame | history
opendj-core/clirr-ignored-api-changes.xml
@@ -232,4 +232,11 @@
    <method>org.forgerock.opendj.io.LDAPWriter getWriter(org.forgerock.opendj.io.ASN1Writer)</method>
    <justification>Add support for LdapV2 encoding</justification>
  </difference>
  <!-- Clirr difference 7006: the return type of the method changed (here from void to Completable). -->
  <difference>
    <className>org/forgerock/opendj/ldap/LDAPClientContext</className>
    <differenceType>7006</differenceType>
    <method>void sendUnsolicitedNotification(org.forgerock.opendj.ldap.responses.ExtendedResult)</method>
    <to>com.forgerock.reactive.Completable</to>
    <justification>Return a completable so that operation can be chained (i.e: closing connection)</justification>
  </difference>
</differences>
opendj-core/src/main/java/com/forgerock/reactive/Completable.java
@@ -63,6 +63,16 @@
    Completable onErrorResumeWith(Function<Throwable, Completable, Exception> function);
    /**
     * Subscribes to the result of this {@link Completable}.
     *
     * @param completeAction
     *            An {@link Action} which will be invoked on successful completion.
     * @param errorConsumer
     *            A {@link Consumer} which will be invoked with the failure cause on error.
     */
    void subscribe(Action completeAction, Consumer<Throwable> errorConsumer);
    /**
     * Returns a {@link Single} which will complete with the provided value once this {@link Completable} completes.
     *
     * @param <V>
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -364,5 +364,20 @@
                        }
                    }));
        }
        @Override
        public void subscribe(final Action completeAction, final Consumer<Throwable> errorConsumer) {
            impl.subscribe(new io.reactivex.functions.Action() {
                @Override
                public void run() throws Exception {
                    completeAction.run();
                }
            }, new io.reactivex.functions.Consumer<Throwable>() {
                @Override
                public void accept(final Throwable error) throws Exception {
                    errorConsumer.accept(error);
                }
            });
        }
    }
}
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
@@ -27,6 +27,8 @@
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import com.forgerock.reactive.Completable;
/**
 * An LDAP client which has connected to a {@link ServerConnectionFactory}. An
 * LDAP client context can be used to query information about the client's
@@ -169,8 +171,9 @@
     *
     * @param notification
     *            The notification to send.
     * @return A {@link Completable} which will be completed once the notification has been sent.
     */
    void sendUnsolicitedNotification(ExtendedResult notification);
    Completable sendUnsolicitedNotification(ExtendedResult notification);
    /**
     * Installs the TLS/SSL security layer on the underlying connection. The TLS/SSL security layer will be installed
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -19,13 +19,15 @@
import static com.forgerock.reactive.RxJavaStreams.*;
import static org.forgerock.opendj.grizzly.GrizzlyUtils.configureConnection;
import static org.forgerock.opendj.io.LDAP.*;
import static org.forgerock.opendj.ldap.responses.Responses.newGenericExtendedResult;
import static org.forgerock.opendj.ldap.spi.LdapMessages.newResponseMessage;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLEngine;
@@ -46,7 +48,6 @@
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.IntermediateResponse;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.responses.Responses;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.responses.SearchResultReference;
@@ -55,7 +56,12 @@
import org.forgerock.util.Function;
import org.forgerock.util.Options;
import org.forgerock.util.Reject;
import org.forgerock.util.promise.ExceptionHandler;
import org.forgerock.util.promise.PromiseImpl;
import org.forgerock.util.promise.ResultHandler;
import org.forgerock.util.promise.RuntimeExceptionHandler;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.EmptyCompletionHandler;
import org.glassfish.grizzly.filterchain.BaseFilter;
import org.glassfish.grizzly.filterchain.Filter;
import org.glassfish.grizzly.filterchain.FilterChain;
@@ -70,6 +76,7 @@
import com.forgerock.reactive.Action;
import com.forgerock.reactive.Completable;
import com.forgerock.reactive.Consumer;
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Stream;
@@ -163,7 +170,7 @@
            @Override
            public Publisher<Void> apply(final LdapRequestEnvelope rawRequest) throws Exception {
                if (rawRequest.getMessageType() == OP_TYPE_UNBIND_REQUEST) {
                    clientContext.notifyConnectionClosed(rawRequest);
                    clientContext.notifyConnectionClosedSilently(rawRequest);
                    return emptyStream();
                }
                Stream<Response> response;
@@ -190,14 +197,14 @@
        }, maxConcurrentRequests).onErrorResumeWith(new Function<Throwable, Publisher<Void>, Exception>() {
            @Override
            public Publisher<Void> apply(Throwable error) throws Exception {
                clientContext.notifyErrorAndCloseSilently(error);
                clientContext.notifyErrorSilently(error);
                // Swallow the error to prevent the subscribe() below from reporting it on the console.
                return emptyStream();
            }
        }).onComplete(new Action() {
            @Override
            public void run() throws Exception {
                clientContext.notifyConnectionClosed(null);
                connection.closeSilently();
            }
        }).subscribe();
        return ctx.getStopAction();
@@ -303,8 +310,8 @@
        }
        private final Connection<?> connection;
        private final AtomicBoolean isClosed = new AtomicBoolean(false);
        private final List<ConnectionEventListener> listeners = new LinkedList<>();
        private volatile boolean isClosed;
        private final List<ConnectionEventListener> connectionEventListeners = new LinkedList<>();
        private SaslServer saslServer;
        private GrizzlyBackpressureSubscription downstream;
@@ -334,12 +341,12 @@
        @Override
        public NextAction handleClose(final FilterChainContext ctx) {
            if (isClosed.compareAndSet(false, true)) {
            isClosed = true;
                final GrizzlyBackpressureSubscription immutableRef = downstream;
                if (immutableRef != null) {
                    downstream.onComplete();
                immutableRef.onComplete();
                }
            }
            notifyConnectionClosedSilently(null);
            return ctx.getStopAction();
        }
@@ -489,50 +496,44 @@
        @Override
        public void addConnectionEventListener(ConnectionEventListener listener) {
            Reject.ifNull(listener, "listener must not be null");
            listeners.add(listener);
            synchronized (connectionEventListeners) {
                connectionEventListeners.add(listener);
            }
        }
        @Override
        public void disconnect() {
            if (isClosed.compareAndSet(false, true)) {
                try {
                    for (ConnectionEventListener listener : listeners) {
                        listener.handleConnectionDisconnected(this, null, null);
                    }
                } finally {
                    closeConnection();
                }
            }
            notifySilentlyConnectionDisconnected(null, null);
            connection.closeSilently();
        }
        /**
         * Cancels the downstream subscription, if one is set, and then silently closes the
         * underlying connection.
         */
        private void closeConnection() {
            // NOTE(review): 'downstream' is read more than once without any visible synchronization;
            // confirm that no other thread can reset it between the null check and cancel().
            if (downstream != null) {
                downstream.cancel();
                // Drop the reference so that a later call does not cancel the same subscription again.
                downstream = null;
            }
            connection.closeSilently();
        }
        @Override
        public void disconnect(final ResultCode resultCode, final String diagnosticMessage) {
            // Close this connection context.
            if (isClosed.compareAndSet(false, true)) {
                sendUnsolicitedNotification(Responses.newGenericExtendedResult(resultCode)
            notifySilentlyConnectionDisconnected(resultCode, diagnosticMessage);
            sendUnsolicitedNotification(
                    newGenericExtendedResult(resultCode)
                                                     .setOID(OID_NOTICE_OF_DISCONNECTION)
                                                     .setDiagnosticMessage(diagnosticMessage));
                try {
                    for (ConnectionEventListener listener : listeners) {
                        listener.handleConnectionDisconnected(this, resultCode, diagnosticMessage);
                    }
                } finally {
                        .setDiagnosticMessage(diagnosticMessage)
            ).subscribe(new Action() {
                @Override
                public void run() throws Exception {
                    closeConnection();
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(final Throwable error) throws Exception {
                    closeConnection();
            }
            });
        }
        private void notifyConnectionClosed(final LdapRequestEnvelope unbindRequest) {
        private void notifyConnectionClosedSilently(final LdapRequestEnvelope unbindRequest) {
            // Close this connection context.
            if (isClosed.compareAndSet(false, true)) {
                if (unbindRequest == null) {
                    notifySilentlyConnectionClosed(null);
                } else {
@@ -550,47 +551,97 @@
                    }
                }
            }
        private void notifySilentlyConnectionDisconnected(final ResultCode resultCode, final String diagnosticMessage) {
            for (final ConnectionEventListener listener : getAndClearListeners()) {
                try {
                    listener.handleConnectionDisconnected(this, resultCode, diagnosticMessage);
                } catch (Exception e) {
                    logger.traceException(e);
                }
            }
        }
        private void notifySilentlyConnectionClosed(final UnbindRequest unbindRequest) {
            try {
                for (final ConnectionEventListener listener : listeners) {
            for (final ConnectionEventListener listener : getAndClearListeners()) {
                    try {
                        listener.handleConnectionClosed(this, unbindRequest);
                    } catch (Exception e) {
                        logger.traceException(e);
                    }
                }
            } finally {
                closeConnection();
            }
        }
        private void notifyErrorAndCloseSilently(final Throwable error) {
            // Close this connection context.
            if (isClosed.compareAndSet(false, true)) {
                try {
                    for (final ConnectionEventListener listener : listeners) {
        private void notifyErrorSilently(final Throwable error) {
            for (final ConnectionEventListener listener : getAndClearListeners()) {
                        try {
                            listener.handleConnectionError(this, error);
                        } catch (Exception e) {
                            logger.traceException(e);
                        }
                    }
                } finally {
                    closeConnection();
                }
        private ArrayList<ConnectionEventListener> getAndClearListeners() {
            synchronized (connectionEventListeners) {
                final ArrayList<ConnectionEventListener> listeners = new ArrayList<>(connectionEventListeners);
                connectionEventListeners.clear();
                return listeners;
            }
        }
        @Override
        public boolean isClosed() {
            return isClosed.get();
            return isClosed;
        }
        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Override
        public Completable sendUnsolicitedNotification(final ExtendedResult notification) {
            // We use a promise so that the notification is sent even if the Completable is not subscribed.
            final PromiseImpl<Boolean, Exception> promise = PromiseImpl.create();
            connection.write(newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, 0, notification),
                    new EmptyCompletionHandler() {
                        @Override
                        public void cancelled() {
                            promise.handleException(new CancellationException());
        }
        @Override
        public void sendUnsolicitedNotification(final ExtendedResult notification) {
            connection.write(newResponseMessage(OP_TYPE_EXTENDED_RESPONSE, 0, notification));
                        public void failed(Throwable throwable) {
                            if (throwable instanceof Exception) {
                                promise.handleException((Exception) throwable);
                            } else {
                                promise.handleException(new Exception(throwable));
                            }
                        }
                        @Override
                        public void completed(Object result) {
                            promise.handleResult(Boolean.TRUE);
                        }
                    });
            return newCompletable(new Completable.Emitter() {
                @Override
                public void subscribe(final Completable.Subscriber s) throws Exception {
                    promise.thenOnResult(new ResultHandler<Boolean>() {
                        @Override
                        public void handleResult(Boolean result) {
                            s.onComplete();
                        }
                    }).thenOnException(new ExceptionHandler<Exception>() {
                        @Override
                        public void handleException(Exception exception) {
                            s.onError(exception);
                        }
                    }).thenOnRuntimeException(new RuntimeExceptionHandler() {
                        @Override
                        public void handleRuntimeException(RuntimeException exception) {
                            s.onError(exception);
                        }
                    });
                }
            });
        }
    }
}
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -958,12 +958,14 @@
     */
    @Override
    public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRequestEnvelope message) {
        return streamFromPublisher(new BlockingBackpressureSubscription(
        return streamFromPublisher(
                new BlockingBackpressureSubscription(connectionHandler.getMaxBlockedWriteTimeLimit(),
                Flowable.create(new FlowableOnSubscribe<Response>() {
                    @Override
                    public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
                        try {
                            processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
                                    processLDAPMessage(
                                            queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
                        } finally {
                            // We don't need the ASN1Reader anymore.
                            closeSilently(message.getContent());
@@ -1682,14 +1684,15 @@
        private final Condition spaceAvailable = lock.newCondition();
        private final Publisher<Response> upstream;
        private final long writeTimeoutMillis;
        private boolean upstreamCompleted;
        private Subscription subscription;
        private Subscriber<? super Response> downstream;
        BlockingBackpressureSubscription(final Publisher<Response> upstream) {
        BlockingBackpressureSubscription(final long maxBlockedWriteTimeLimit, final Publisher<Response> upstream) {
            this.upstream = upstream;
            this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0
            this.writeTimeoutMillis = maxBlockedWriteTimeLimit == 0
                    ? 30000 // Do not wait indefinitely,
                    : connectionHandler.getMaxBlockedWriteTimeLimit();
                    : maxBlockedWriteTimeLimit;
        }
        @Override
@@ -1733,6 +1736,12 @@
                    // Forward response
                    pendingRequests--;
                }
                if (upstreamCompleted && queue.isEmpty()) {
                    if (pendingRequests != Long.MIN_VALUE) {
                        downstream.onComplete();
                    }
                    cancel();
                }
            } finally {
                spaceAvailable.signalAll();
            }
@@ -1771,17 +1780,27 @@
        @Override
        public void onComplete() {
            downstream.onComplete();
            cancel();
            lock.lock();
            try {
                // the onComplete() will be forwarded downstream once the pending messages have been flushed
                upstreamCompleted = true;
                drain();
            } finally {
                lock.unlock();
            }
        }
        @Override
        public void cancel() {
            lock.lock();
            try {
                pendingRequests = Long.MIN_VALUE;
            } finally {
                lock.unlock();
            }
            if (subscription != null) {
                subscription.cancel();
            }
            queue.clear();
            pendingRequests = Long.MIN_VALUE;
        }
    }
}