mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
16.16.2016 5e9caa5055a78b930500690cb919effd5c8bc2b1
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -958,27 +958,29 @@
     */
    @Override
    public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRequestEnvelope message) {
        // Builds the response stream for one request envelope: a Flowable that reads the LDAP message
        // from the envelope and processes it, wrapped in a BlockingBackpressureSubscription, with an
        // onNext hook that updates statistics for every response.
        //
        // NOTE(review): two complete return statements follow. This text looks like a diff hunk shown
        // without +/- markers, so the first is presumably the pre-change statement and the second its
        // replacement; read as plain Java the second one would be unreachable. Confirm against the commit.

        // Variant 1: BlockingBackpressureSubscription is constructed from the publisher only.
        return streamFromPublisher(new BlockingBackpressureSubscription(
                Flowable.create(new FlowableOnSubscribe<Response>() {
                    @Override
                    public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
                        try {
                            // Read the LDAP message from the envelope content and pass it, together with
                            // the emitter, to processLDAPMessage.
                            processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
                        } finally {
                            // We don't need the ASN1Reader anymore.
                            // Closed in finally so it happens whether or not processing threw.
                            closeSilently(message.getContent());
                        }
                    }
                }, BackpressureStrategy.ERROR)))
                .onNext(new Consumer<Response>() {
                    @Override
                    public void accept(final Response response) throws Exception {
                        // Statistics are only updated when keepStats is set.
                        if (keepStats) {
                            statTracker.updateMessageWritten(
                                    toLdapResponseType(message, response), message.getMessageId());
                        }
                    }
                });
        // Variant 2: same pipeline, but the value of connectionHandler.getMaxBlockedWriteTimeLimit()
        // is passed as the first constructor argument of BlockingBackpressureSubscription.
        return streamFromPublisher(
                new BlockingBackpressureSubscription(connectionHandler.getMaxBlockedWriteTimeLimit(),
                        Flowable.create(new FlowableOnSubscribe<Response>() {
                            @Override
                            public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
                                try {
                                    // Read the LDAP message from the envelope content and pass it,
                                    // together with the emitter, to processLDAPMessage.
                                    processLDAPMessage(
                                            queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
                                } finally {
                                    // We don't need the ASN1Reader anymore.
                                    // Closed in finally so it happens whether or not processing threw.
                                    closeSilently(message.getContent());
                                }
                            }
                        }, BackpressureStrategy.ERROR)))
                        .onNext(new Consumer<Response>() {
                            @Override
                            public void accept(final Response response) throws Exception {
                                // Statistics are only updated when keepStats is set.
                                if (keepStats) {
                                    statTracker.updateMessageWritten(
                                            toLdapResponseType(message, response), message.getMessageId());
                                }
                            }
                        });
    }
    private final byte toLdapResultType(final byte requestType) {
@@ -1682,14 +1684,15 @@
        // Condition created from `lock`; signalAll() is called on it elsewhere in this class.
        // Presumably awaited by writers blocked on a full queue -- the waiting side is not visible here.
        private final Condition spaceAvailable = lock.newCondition();
        // Publisher handed to the constructor.
        private final Publisher<Response> upstream;
        // Write timeout, in milliseconds per its name; the constructor substitutes 30000 for a limit of 0.
        private final long writeTimeoutMillis;
        // Set to true by onComplete(); checked together with queue.isEmpty() before completion is
        // forwarded downstream.
        private boolean upstreamCompleted;
        // Cancelled by cancel() when non-null. Its assignment is not visible in this excerpt.
        private Subscription subscription;
        // Subscriber that receives onComplete(). Its assignment is not visible in this excerpt.
        private Subscriber<? super Response> downstream;
        // Constructor: stores the upstream publisher and chooses the write timeout. A limit of 0 is
        // replaced by 30000; any other value is used unchanged.
        //
        // NOTE(review): two constructor headers and two forms of the writeTimeoutMillis assignment appear
        // back to back, and the braces do not balance as shown. This looks like a diff hunk without +/-
        // markers: presumably the one-argument header and the connectionHandler-based lines are the
        // pre-change text, and the two-argument header and the maxBlockedWriteTimeLimit-based lines are
        // the replacement. Confirm against the commit.
        BlockingBackpressureSubscription(final Publisher<Response> upstream) {
        BlockingBackpressureSubscription(final long maxBlockedWriteTimeLimit, final Publisher<Response> upstream) {
            this.upstream = upstream;
            // Form 1: the limit is read from connectionHandler.
            this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0
            // Form 2: the limit is taken from the constructor parameter.
            this.writeTimeoutMillis = maxBlockedWriteTimeLimit == 0
                    ? 30000 // Do not wait indefinitely: a limit of 0 falls back to this fixed value.
                    : connectionHandler.getMaxBlockedWriteTimeLimit();
                    : maxBlockedWriteTimeLimit;
        }
        @Override
@@ -1733,6 +1736,12 @@
                    // Forward response
                    pendingRequests--;
                }
                if (upstreamCompleted && queue.isEmpty()) {
                    if (pendingRequests != Long.MIN_VALUE) {
                        downstream.onComplete();
                    }
                    cancel();
                }
            } finally {
                spaceAvailable.signalAll();
            }
@@ -1771,17 +1780,27 @@
        // Called when upstream has completed.
        //
        // NOTE(review): the body shows two behaviours in sequence. The first two statements forward
        // onComplete() downstream immediately and cancel; the locked section instead only records the
        // completion and calls drain(). This looks like a diff hunk without +/- markers, so the first
        // two statements are presumably the pre-change lines. Confirm against the commit.
        @Override
        public void onComplete() {
            downstream.onComplete();
            cancel();
            lock.lock();
            try {
                // the onComplete() will be forwarded downstream once the pending messages have been flushed
                upstreamCompleted = true;
                drain();
            } finally {
                // Always release the lock, even if drain() throws.
                lock.unlock();
            }
        }
        // Cancels this subscription: sets pendingRequests to Long.MIN_VALUE, cancels the upstream
        // subscription if there is one, and clears the queue.
        //
        // NOTE(review): pendingRequests is assigned Long.MIN_VALUE twice, once under the lock and once at
        // the end outside it. This looks like a diff hunk without +/- markers, so one of the two
        // (presumably the unlocked one) is a pre-change line. Confirm against the commit.
        @Override
        public void cancel() {
            lock.lock();
            try {
                // Long.MIN_VALUE is the marker checked elsewhere in this class before onComplete() is
                // forwarded downstream.
                pendingRequests = Long.MIN_VALUE;
            } finally {
                lock.unlock();
            }
            // subscription may be null, hence the guard.
            if (subscription != null) {
                subscription.cancel();
            }
            queue.clear();
            pendingRequests = Long.MIN_VALUE;
        }
    }
}