From 376465285de63517e1fad3d38cf671c7f2632221 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Wed, 16 Nov 2016 15:35:53 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

---
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java |   24 ++++++++++++++++--------
 1 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index 1ec8de7..0fcaa00 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -249,31 +249,39 @@
             NextAction handleRead(final FilterChainContext ctx) {
                 if (pendingRequests.get() == 1) {
                     subscriber.onNext((LdapRequestEnvelope) ctx.getMessage());
+                    // This synchronized ensure that the context suspend is done atomically with the fact that
+                    // pendingRequests is set to 0
                     synchronized (this) {
-                        if (BackpressureHelper.producedCancel(pendingRequests, 1) == 0) {
-                            this.suspendedCtx = ctx;
+                        // Another request() might have add some pendingRequests in the mean time
+                        if (pendingRequests.compareAndSet(1, 0)) {
                             ctx.suspend();
+                            this.suspendedCtx = ctx;
                             return ctx.getSuspendAction();
                         }
                     }
-                } else if (BackpressureHelper.producedCancel(pendingRequests, 1) >= 0) {
-                    subscriber.onNext((LdapRequestEnvelope) ctx.getMessage());
+                    return ctx.getStopAction();
                 }
+                if (BackpressureHelper.producedCancel(pendingRequests, 1) == Long.MIN_VALUE) {
+                    ctx.suspend();
+                    return ctx.getSuspendAction();
+                }
+                subscriber.onNext((LdapRequestEnvelope) ctx.getMessage());
                 return ctx.getStopAction();
             }
 
             @Override
             public void request(final long n) {
-                if (pendingRequests.get() == 0) {
+                if (BackpressureHelper.addCancel(pendingRequests, n) == 0) {
+                    // This synchronized, coupled to the previous one, ensure the atomicity by "waiting" until the
+                    // context has been suspended
                     synchronized (this) {
-                        if (BackpressureHelper.addCancel(pendingRequests, n) == 0) {
+                        // On startup, pendingRequests = 0 and suspendedCtx is null
+                        if (suspendedCtx != null) {
                             suspendedCtx.resume();
                             suspendedCtx = null;
-                            return;
                         }
                     }
                 }
-                BackpressureHelper.addCancel(pendingRequests, n);
             }
 
             public void onError(final Throwable error) {

--
Gitblit v1.10.0