From 376465285de63517e1fad3d38cf671c7f2632221 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Wed, 16 Nov 2016 15:35:53 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport
---
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java | 24 ++++++++++++++++--------
1 files changed, 16 insertions(+), 8 deletions(-)
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index 1ec8de7..0fcaa00 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -249,31 +249,39 @@
NextAction handleRead(final FilterChainContext ctx) {
if (pendingRequests.get() == 1) {
subscriber.onNext((LdapRequestEnvelope) ctx.getMessage());
+ // This synchronized ensure that the context suspend is done atomically with the fact that
+ // pendingRequests is set to 0
synchronized (this) {
- if (BackpressureHelper.producedCancel(pendingRequests, 1) == 0) {
- this.suspendedCtx = ctx;
+ // Another request() might have add some pendingRequests in the mean time
+ if (pendingRequests.compareAndSet(1, 0)) {
ctx.suspend();
+ this.suspendedCtx = ctx;
return ctx.getSuspendAction();
}
}
- } else if (BackpressureHelper.producedCancel(pendingRequests, 1) >= 0) {
- subscriber.onNext((LdapRequestEnvelope) ctx.getMessage());
+ return ctx.getStopAction();
}
+ if (BackpressureHelper.producedCancel(pendingRequests, 1) == Long.MIN_VALUE) {
+ ctx.suspend();
+ return ctx.getSuspendAction();
+ }
+ subscriber.onNext((LdapRequestEnvelope) ctx.getMessage());
return ctx.getStopAction();
}
@Override
public void request(final long n) {
- if (pendingRequests.get() == 0) {
+ if (BackpressureHelper.addCancel(pendingRequests, n) == 0) {
+ // This synchronized, coupled to the previous one, ensure the atomicity by "waiting" until the
+ // context has been suspended
synchronized (this) {
- if (BackpressureHelper.addCancel(pendingRequests, n) == 0) {
+ // On startup, pendingRequests = 0 and suspendedCtx is null
+ if (suspendedCtx != null) {
suspendedCtx.resume();
suspendedCtx = null;
- return;
}
}
}
- BackpressureHelper.addCancel(pendingRequests, n);
}
public void onError(final Throwable error) {
--
Gitblit v1.10.0