From 5e424f9a2e8e3e48a898b02ed85cb6fe9cc1fd35 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Wed, 16 Nov 2016 15:35:53 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

---
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java    |    7 ---
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java             |   59 +++++++++++++++--------------
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactory.java |    3 +
 3 files changed, 33 insertions(+), 36 deletions(-)

diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactory.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactory.java
index be856b2..4d1a042 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactory.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactory.java
@@ -12,7 +12,7 @@
  * information: "Portions Copyright [year] [name of copyright owner]".
  *
  * Copyright 2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS.
+ * Portions Copyright 2011-2016 ForgeRock AS.
  */
 
 package org.forgerock.opendj.grizzly;
@@ -105,6 +105,7 @@
 
         private GrizzlyLDAPConnection adaptConnection(final Connection<?> connection) {
             configureConnection(connection, logger, options);
+            connection.configureBlocking(true);
 
             final GrizzlyLDAPConnection ldapConnection =
                     new GrizzlyLDAPConnection(connection, GrizzlyLDAPConnectionFactory.this);
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index 4b53681..1ec8de7 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -238,58 +238,59 @@
 
         final class GrizzlyBackpressureSubscription implements Subscription {
             private final AtomicLong pendingRequests = new AtomicLong();
-            private Subscriber<? super LdapRequestEnvelope> subscriber;
-            volatile FilterChainContext ctx;
+            private final Subscriber<? super LdapRequestEnvelope> subscriber;
+            private FilterChainContext suspendedCtx;
 
-            GrizzlyBackpressureSubscription(Subscriber<? super LdapRequestEnvelope> subscriber) {
+            GrizzlyBackpressureSubscription(final Subscriber<? super LdapRequestEnvelope> subscriber) {
                 this.subscriber = subscriber;
                 subscriber.onSubscribe(this);
             }
 
             NextAction handleRead(final FilterChainContext ctx) {
-                final Subscriber<? super LdapRequestEnvelope> sub = subscriber;
-                if (sub == null) {
-                    // Subscription cancelled. Stop reading
-                    ctx.suspend();
-                    return ctx.getSuspendAction();
+                if (pendingRequests.get() == 1) {
+                    subscriber.onNext((LdapRequestEnvelope) ctx.getMessage());
+                    synchronized (this) {
+                        if (BackpressureHelper.producedCancel(pendingRequests, 1) == 0) {
+                            this.suspendedCtx = ctx;
+                            ctx.suspend();
+                            return ctx.getSuspendAction();
+                        }
+                    }
+                } else if (BackpressureHelper.producedCancel(pendingRequests, 1) >= 0) {
+                    subscriber.onNext((LdapRequestEnvelope) ctx.getMessage());
                 }
-                sub.onNext((LdapRequestEnvelope) ctx.getMessage());
-                if (BackpressureHelper.produced(pendingRequests, 1) > 0) {
-                    return ctx.getStopAction();
-                }
-                this.ctx = ctx;
-                ctx.suspend();
-                return ctx.getSuspendAction();
+                return ctx.getStopAction();
             }
 
             @Override
-            public void request(long n) {
-                final FilterChainContext immutableRef = ctx;
-                if (BackpressureHelper.add(pendingRequests, n) == 0 && immutableRef != null) {
-                    immutableRef.resumeNext();
-                    ctx = null;
+            public void request(final long n) {
+                if (pendingRequests.get() == 0) {
+                    synchronized (this) {
+                        if (BackpressureHelper.addCancel(pendingRequests, n) == 0) {
+                            suspendedCtx.resume();
+                            suspendedCtx = null;
+                            return;
+                        }
+                    }
                 }
+                BackpressureHelper.addCancel(pendingRequests, n);
             }
 
             public void onError(final Throwable error) {
-                final Subscriber<? super LdapRequestEnvelope> sub = subscriber;
-                if (sub != null) {
-                    subscriber = null;
-                    sub.onError(error);
+                if (pendingRequests.getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) {
+                    subscriber.onError(error);
                 }
             }
 
             public void onComplete() {
-                final Subscriber<? super LdapRequestEnvelope> sub = subscriber;
-                if (sub != null) {
-                    subscriber = null;
-                    sub.onComplete();
+                if (pendingRequests.getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) {
+                    subscriber.onComplete();
                 }
             }
 
             @Override
             public void cancel() {
-                subscriber = null;
+                pendingRequests.set(Long.MIN_VALUE);
             }
         }
 
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
index 3c8c20b..12e7ab1 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
@@ -55,10 +55,7 @@
 
     @Override
     public void completed(final Object result) {
-        final Subscription sub = upstream;
-        if (sub != null) {
-            sub.request(1);
-        }
+        upstream.request(1);
     }
 
     @Override
@@ -79,14 +76,12 @@
     @Override
     public void onError(final Throwable error) {
         upstream.cancel();
-        upstream = null;
         downstream.onError(error);
     }
 
     @Override
     public void onComplete() {
         upstream.cancel();
-        upstream = null;
         downstream.onComplete();
     }
 }

--
Gitblit v1.10.0