From 5e9caa5055a78b930500690cb919effd5c8bc2b1 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Fri, 18 Nov 2016 10:31:28 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport
---
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java | 75 +++++++++++++++++++++++--------------
1 files changed, 47 insertions(+), 28 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index bab468d..a753eee 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -958,27 +958,29 @@
*/
@Override
public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRequestEnvelope message) {
- return streamFromPublisher(new BlockingBackpressureSubscription(
- Flowable.create(new FlowableOnSubscribe<Response>() {
- @Override
- public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
- try {
- processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
- } finally {
- // We don't need the ASN1Reader anymore.
- closeSilently(message.getContent());
- }
- }
- }, BackpressureStrategy.ERROR)))
- .onNext(new Consumer<Response>() {
- @Override
- public void accept(final Response response) throws Exception {
- if (keepStats) {
- statTracker.updateMessageWritten(
- toLdapResponseType(message, response), message.getMessageId());
- }
- }
- });
+ return streamFromPublisher(
+ new BlockingBackpressureSubscription(connectionHandler.getMaxBlockedWriteTimeLimit(),
+ Flowable.create(new FlowableOnSubscribe<Response>() {
+ @Override
+ public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
+ try {
+ processLDAPMessage(
+ queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
+ } finally {
+ // We don't need the ASN1Reader anymore.
+ closeSilently(message.getContent());
+ }
+ }
+ }, BackpressureStrategy.ERROR)))
+ .onNext(new Consumer<Response>() {
+ @Override
+ public void accept(final Response response) throws Exception {
+ if (keepStats) {
+ statTracker.updateMessageWritten(
+ toLdapResponseType(message, response), message.getMessageId());
+ }
+ }
+ });
}
private final byte toLdapResultType(final byte requestType) {
@@ -1682,14 +1684,15 @@
private final Condition spaceAvailable = lock.newCondition();
private final Publisher<Response> upstream;
private final long writeTimeoutMillis;
+ private boolean upstreamCompleted;
private Subscription subscription;
private Subscriber<? super Response> downstream;
- BlockingBackpressureSubscription(final Publisher<Response> upstream) {
+ BlockingBackpressureSubscription(final long maxBlockedWriteTimeLimit, final Publisher<Response> upstream) {
this.upstream = upstream;
- this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0
+ this.writeTimeoutMillis = maxBlockedWriteTimeLimit == 0
? 30000 // Do not wait indefinitely,
- : connectionHandler.getMaxBlockedWriteTimeLimit();
+ : maxBlockedWriteTimeLimit;
}
@Override
@@ -1733,6 +1736,12 @@
// Forward response
pendingRequests--;
}
+ if (upstreamCompleted && queue.isEmpty()) {
+ if (pendingRequests != Long.MIN_VALUE) {
+ downstream.onComplete();
+ }
+ cancel();
+ }
} finally {
spaceAvailable.signalAll();
}
@@ -1771,17 +1780,27 @@
@Override
public void onComplete() {
- downstream.onComplete();
- cancel();
+ lock.lock();
+ try {
+ // the onComplete() will be forwarded downstream once the pending messages have been flushed
+ upstreamCompleted = true;
+ drain();
+ } finally {
+ lock.unlock();
+ }
}
@Override
public void cancel() {
+ lock.lock();
+ try {
+ pendingRequests = Long.MIN_VALUE;
+ } finally {
+ lock.unlock();
+ }
if (subscription != null) {
subscription.cancel();
}
- queue.clear();
- pendingRequests = Long.MIN_VALUE;
}
}
}
--
Gitblit v1.10.0