mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
18.17.2016 2a88ad00862b2241f3b87ef8d4db383c69b54e3a
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -32,16 +32,14 @@
import java.security.cert.Certificate;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLPeerUnverifiedException;
@@ -56,7 +54,7 @@
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
import org.forgerock.opendj.ldap.LDAPClientContextEventListener;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.CompareResult;
@@ -64,6 +62,7 @@
import org.forgerock.opendj.ldap.responses.Responses;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.forgerock.util.Reject;
import org.opends.server.api.ClientConnection;
import org.opends.server.api.ConnectionHandler;
import org.opends.server.core.AbandonOperationBasis;
@@ -107,6 +106,7 @@
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchResultReference;
import org.opends.server.util.TimeThread;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@@ -229,7 +229,7 @@
        }
        connectionID = DirectoryServer.newConnectionAccepted(this);
        clientContext.addConnectionEventListener(new ConnectionEventListener() {
        clientContext.addListener(new LDAPClientContextEventListener() {
            @Override
            public void handleConnectionError(LDAPClientContext context, Throwable error) {
                if (error instanceof LocalizableException) {
@@ -1676,17 +1676,17 @@
    }
    /** Upstream -> BlockingBackpressureSubscription -> Downstream. */
    private final class BlockingBackpressureSubscription
            implements Subscription, Publisher<Response>, Subscriber<Response> {
        private long pendingRequests;
        private final Queue<Response> queue = new LinkedList<>();
        private final Lock lock = new ReentrantLock();
        private final Condition spaceAvailable = lock.newCondition();
    private final class BlockingBackpressureSubscription implements Subscription, Processor<Response, Response> {
        private final AtomicLong pendingRequests = new AtomicLong();
        private final AtomicInteger missedDrain = new AtomicInteger();
        private final BlockingQueue<Response> queue = new LinkedBlockingQueue<>(32);
        private final Publisher<Response> upstream;
        private final long writeTimeoutMillis;
        private boolean upstreamCompleted;
        private Subscription subscription;
        private Subscriber<? super Response> downstream;
        private volatile boolean done;
        private Throwable error;
        private volatile boolean cancelled;
        BlockingBackpressureSubscription(final long maxBlockedWriteTimeLimit, final Publisher<Response> upstream) {
            this.upstream = upstream;
@@ -1697,17 +1697,19 @@
        @Override
        public void subscribe(final Subscriber<? super Response> subscriber) {
            Reject.ifNull(subscriber);
            if (downstream != null) {
                // This publisher only support one subscriber.
                return;
            }
            downstream = subscriber;
            subscriber.onSubscribe(this);
            upstream.subscribe(this);
            subscriber.onSubscribe(/* Subscription */ this);
            upstream.subscribe(/* Subscriber */ this);
        }
        @Override
        public void onSubscribe(final Subscription s) {
            if ( subscription != null) {
            if (subscription != null) {
                s.cancel();
                return;
            }
@@ -1716,91 +1718,117 @@
        }
        @Override
        public void request(long n) {
            lock.lock();
            try {
                if (pendingRequests != Long.MIN_VALUE) {
                    pendingRequests += n;
                    drain();
        public void request(final long n) {
            if (n == Long.MAX_VALUE) {
                pendingRequests.set(Long.MAX_VALUE);
            } else {
                // There is a known and accepted problem here regarding reactive-stream contract in the sense that
                // we're not supporting pendingRequests overflow (pendingRequests + n > Long.MAX_VALUE) for performance
                // reason since this should never happen in the context we're using it.
                pendingRequests.addAndGet(n);
            }
            drain();
        }
        // Taken from
        // https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#backpressure-and-cancellation
        private void drain() {
            if (missedDrain.getAndIncrement() != 0) {
                // Another thread is already executing this drain method.
                return;
            }
            int missed = 1;
            for (;;) {
                final long immutablePendingRequests = pendingRequests.get();
                long emitted = 0L;
                while (emitted != immutablePendingRequests) {
                    // Check if we should early exit because of cancellation
                    if (cancelled) {
                        return;
                    }
                    final Response response = queue.poll();
                    if (response != null) {
                        downstream.onNext(response);
                        emitted++;
                    } else if (done) {
                        // queue is empty and we received a completion (onError/onComplete) notification from upstream
                        forwardDoneEvent();
                        return;
                    } else {
                        // Queue is empty but upstream is not done yet.
                        break;
                    }
                }
            } finally {
                lock.unlock();
                // Check if an onError/onComplete from upstream arrived.
                if (emitted == immutablePendingRequests) {
                    if (cancelled) {
                        return;
                    }
                    if (done && queue.isEmpty()) {
                        forwardDoneEvent();
                        return;
                    }
                }
                if (emitted != 0) {
                    pendingRequests.addAndGet(-emitted);
                }
                // Check to see if another thread asked for drain
                missed = missedDrain.addAndGet(-missed);
                if (missed == 0) {
                    // Nop, we can exit.
                    break;
                }
            }
        }
        private void drain() {
            Response response;
            try {
                while (pendingRequests > 0 && (response = queue.poll()) != null) {
                    downstream.onNext(response);
                    // Forward response
                    pendingRequests--;
                }
                if (upstreamCompleted && queue.isEmpty()) {
                    if (pendingRequests != Long.MIN_VALUE) {
                        downstream.onComplete();
                    }
                    cancel();
                }
            } finally {
                spaceAvailable.signalAll();
        private void forwardDoneEvent() {
            final Throwable immutableError = error;
            if (immutableError != null) {
                downstream.onError(immutableError);
            } else {
                downstream.onComplete();
            }
        }
        @Override
        public void onNext(final Response response) {
            lock.lock();
            try {
                while (queue.size() >= 32) {
                    try {
                        if (!spaceAvailable.await(writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
                            // If we've gotten here, then the write timed out.
                            downstream.onError(new ClosedChannelException());
                            cancel();
                            return;
                        }
                    } catch (InterruptedException e) {
                        downstream.onError(e);
                        cancel();
                        return;
                    }
                if (queue.offer(response, writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
                    drain();
                } else {
                    // If we've gotten here, then the write timed out.
                    onError(new ClosedChannelException().fillInStackTrace());
                    return;
                }
                queue.add(response);
                drain();
            } finally {
                lock.unlock();
            } catch (InterruptedException e) {
                onError(e);
            }
        }
        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
            cancel();
        public void onError(final Throwable error) {
            this.error = error;
            done = true;
            drain();
        }
        @Override
        public void onComplete() {
            lock.lock();
            try {
                // the onComplete() will be forwarded downstream once the pending messages have been flushed
                upstreamCompleted = true;
                drain();
            } finally {
                lock.unlock();
            }
            done = true;
            drain();
        }
        @Override
        public void cancel() {
            lock.lock();
            try {
                pendingRequests = Long.MIN_VALUE;
            } finally {
                lock.unlock();
            }
            if (subscription != null) {
                subscription.cancel();
            }
            cancelled = true;
            subscription.cancel();
        }
    }
}