From 2a88ad00862b2241f3b87ef8d4db383c69b54e3a Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Thu, 24 Nov 2016 13:59:43 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport
---
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java | 186 ++++++++++++++++++++++++++-------------------
1 files changed, 107 insertions(+), 79 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index 70dc842..defc825 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -32,16 +32,14 @@
import java.security.cert.Certificate;
import java.util.Collection;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLPeerUnverifiedException;
@@ -56,7 +54,7 @@
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
+import org.forgerock.opendj.ldap.LDAPClientContextEventListener;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.CompareResult;
@@ -64,6 +62,7 @@
import org.forgerock.opendj.ldap.responses.Responses;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
+import org.forgerock.util.Reject;
import org.opends.server.api.ClientConnection;
import org.opends.server.api.ConnectionHandler;
import org.opends.server.core.AbandonOperationBasis;
@@ -107,6 +106,7 @@
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchResultReference;
import org.opends.server.util.TimeThread;
+import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@@ -229,7 +229,7 @@
}
connectionID = DirectoryServer.newConnectionAccepted(this);
- clientContext.addConnectionEventListener(new ConnectionEventListener() {
+ clientContext.addListener(new LDAPClientContextEventListener() {
@Override
public void handleConnectionError(LDAPClientContext context, Throwable error) {
if (error instanceof LocalizableException) {
@@ -1676,17 +1676,17 @@
}
/** Upstream -> BlockingBackpressureSubscription -> Downstream. */
- private final class BlockingBackpressureSubscription
- implements Subscription, Publisher<Response>, Subscriber<Response> {
- private long pendingRequests;
- private final Queue<Response> queue = new LinkedList<>();
- private final Lock lock = new ReentrantLock();
- private final Condition spaceAvailable = lock.newCondition();
+ private final class BlockingBackpressureSubscription implements Subscription, Processor<Response, Response> {
+ private final AtomicLong pendingRequests = new AtomicLong();
+ private final AtomicInteger missedDrain = new AtomicInteger();
+ private final BlockingQueue<Response> queue = new LinkedBlockingQueue<>(32);
private final Publisher<Response> upstream;
private final long writeTimeoutMillis;
- private boolean upstreamCompleted;
private Subscription subscription;
private Subscriber<? super Response> downstream;
+ private volatile boolean done;
+ private Throwable error;
+ private volatile boolean cancelled;
BlockingBackpressureSubscription(final long maxBlockedWriteTimeLimit, final Publisher<Response> upstream) {
this.upstream = upstream;
@@ -1697,17 +1697,19 @@
@Override
public void subscribe(final Subscriber<? super Response> subscriber) {
+ Reject.ifNull(subscriber);
if (downstream != null) {
+ // This publisher only support one subscriber.
return;
}
downstream = subscriber;
- subscriber.onSubscribe(this);
- upstream.subscribe(this);
+ subscriber.onSubscribe(/* Subscription */ this);
+ upstream.subscribe(/* Subscriber */ this);
}
@Override
public void onSubscribe(final Subscription s) {
- if ( subscription != null) {
+ if (subscription != null) {
s.cancel();
return;
}
@@ -1716,91 +1718,117 @@
}
@Override
- public void request(long n) {
- lock.lock();
- try {
- if (pendingRequests != Long.MIN_VALUE) {
- pendingRequests += n;
- drain();
+ public void request(final long n) {
+ if (n == Long.MAX_VALUE) {
+ pendingRequests.set(Long.MAX_VALUE);
+ } else {
+ // There is a known and accepted problem here regarding reactive-stream contract in the sense that
+ // we're not supporting pendingRequests overflow (pendingRequests + n > Long.MAX_VALUE) for performance
+ // reason since this should never happen in the context we're using it.
+ pendingRequests.addAndGet(n);
+ }
+ drain();
+ }
+
+ // Taken from
+ // https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#backpressure-and-cancellation
+ private void drain() {
+ if (missedDrain.getAndIncrement() != 0) {
+ // Another thread is already executing this drain method.
+ return;
+ }
+
+ int missed = 1;
+
+ for (;;) {
+ final long immutablePendingRequests = pendingRequests.get();
+ long emitted = 0L;
+ while (emitted != immutablePendingRequests) {
+ // Check if we should early exit because of cancellation
+ if (cancelled) {
+ return;
+ }
+
+ final Response response = queue.poll();
+ if (response != null) {
+ downstream.onNext(response);
+ emitted++;
+ } else if (done) {
+ // queue is empty and we received a completion (onError/onComplete) notification from upstream
+ forwardDoneEvent();
+ return;
+ } else {
+ // Queue is empty but upstream is not done yet.
+ break;
+ }
}
- } finally {
- lock.unlock();
+
+ // Check if an onError/onComplete from upstream arrived.
+ if (emitted == immutablePendingRequests) {
+ if (cancelled) {
+ return;
+ }
+
+ if (done && queue.isEmpty()) {
+ forwardDoneEvent();
+ return;
+ }
+ }
+
+ if (emitted != 0) {
+ pendingRequests.addAndGet(-emitted);
+ }
+
+ // Check to see if another thread asked for drain
+ missed = missedDrain.addAndGet(-missed);
+ if (missed == 0) {
+ // Nop, we can exit.
+ break;
+ }
}
}
- private void drain() {
- Response response;
- try {
- while (pendingRequests > 0 && (response = queue.poll()) != null) {
- downstream.onNext(response);
- // Forward response
- pendingRequests--;
- }
- if (upstreamCompleted && queue.isEmpty()) {
- if (pendingRequests != Long.MIN_VALUE) {
- downstream.onComplete();
- }
- cancel();
- }
- } finally {
- spaceAvailable.signalAll();
+ private void forwardDoneEvent() {
+ final Throwable immutableError = error;
+ if (immutableError != null) {
+ downstream.onError(immutableError);
+ } else {
+ downstream.onComplete();
}
}
@Override
public void onNext(final Response response) {
- lock.lock();
try {
- while (queue.size() >= 32) {
- try {
- if (!spaceAvailable.await(writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
- // If we've gotten here, then the write timed out.
- downstream.onError(new ClosedChannelException());
- cancel();
- return;
- }
- } catch (InterruptedException e) {
- downstream.onError(e);
- cancel();
- return;
- }
+ if (queue.offer(response, writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
+ drain();
+ } else {
+ // If we've gotten here, then the write timed out.
+ onError(new ClosedChannelException().fillInStackTrace());
+ return;
}
- queue.add(response);
- drain();
- } finally {
- lock.unlock();
+ } catch (InterruptedException e) {
+ onError(e);
}
}
@Override
- public void onError(Throwable t) {
- downstream.onError(t);
- cancel();
+ public void onError(final Throwable error) {
+ this.error = error;
+ done = true;
+ drain();
}
@Override
public void onComplete() {
- lock.lock();
- try {
- // the onComplete() will be forwarded downstream once the pending messages have been flushed
- upstreamCompleted = true;
- drain();
- } finally {
- lock.unlock();
- }
+ done = true;
+ drain();
}
@Override
public void cancel() {
- lock.lock();
- try {
- pendingRequests = Long.MIN_VALUE;
- } finally {
- lock.unlock();
- }
- if (subscription != null) {
- subscription.cancel();
- }
+ cancelled = true;
+ subscription.cancel();
}
}
}
--
Gitblit v1.10.0