From 5e424f9a2e8e3e48a898b02ed85cb6fe9cc1fd35 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Wed, 16 Nov 2016 15:35:53 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport
---
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java | 7 ---
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java | 59 +++++++++++++++--------------
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactory.java | 3 +
3 files changed, 33 insertions(+), 36 deletions(-)
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactory.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactory.java
index be856b2..4d1a042 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactory.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactory.java
@@ -12,7 +12,7 @@
* information: "Portions Copyright [year] [name of copyright owner]".
*
* Copyright 2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS.
+ * Portions Copyright 2011-2016 ForgeRock AS.
*/
package org.forgerock.opendj.grizzly;
@@ -105,6 +105,7 @@
private GrizzlyLDAPConnection adaptConnection(final Connection<?> connection) {
configureConnection(connection, logger, options);
+ connection.configureBlocking(true);
final GrizzlyLDAPConnection ldapConnection =
new GrizzlyLDAPConnection(connection, GrizzlyLDAPConnectionFactory.this);
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index 4b53681..1ec8de7 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -238,58 +238,59 @@
final class GrizzlyBackpressureSubscription implements Subscription {
private final AtomicLong pendingRequests = new AtomicLong();
- private Subscriber<? super LdapRequestEnvelope> subscriber;
- volatile FilterChainContext ctx;
+ private final Subscriber<? super LdapRequestEnvelope> subscriber;
+ private FilterChainContext suspendedCtx;
- GrizzlyBackpressureSubscription(Subscriber<? super LdapRequestEnvelope> subscriber) {
+ GrizzlyBackpressureSubscription(final Subscriber<? super LdapRequestEnvelope> subscriber) {
this.subscriber = subscriber;
subscriber.onSubscribe(this);
}
NextAction handleRead(final FilterChainContext ctx) {
- final Subscriber<? super LdapRequestEnvelope> sub = subscriber;
- if (sub == null) {
- // Subscription cancelled. Stop reading
- ctx.suspend();
- return ctx.getSuspendAction();
+ if (pendingRequests.get() == 1) {
+ subscriber.onNext((LdapRequestEnvelope) ctx.getMessage());
+ synchronized (this) {
+ if (BackpressureHelper.producedCancel(pendingRequests, 1) == 0) {
+ this.suspendedCtx = ctx;
+ ctx.suspend();
+ return ctx.getSuspendAction();
+ }
+ }
+ } else if (BackpressureHelper.producedCancel(pendingRequests, 1) >= 0) {
+ subscriber.onNext((LdapRequestEnvelope) ctx.getMessage());
}
- sub.onNext((LdapRequestEnvelope) ctx.getMessage());
- if (BackpressureHelper.produced(pendingRequests, 1) > 0) {
- return ctx.getStopAction();
- }
- this.ctx = ctx;
- ctx.suspend();
- return ctx.getSuspendAction();
+ return ctx.getStopAction();
}
@Override
- public void request(long n) {
- final FilterChainContext immutableRef = ctx;
- if (BackpressureHelper.add(pendingRequests, n) == 0 && immutableRef != null) {
- immutableRef.resumeNext();
- ctx = null;
+ public void request(final long n) {
+ if (pendingRequests.get() == 0) {
+ synchronized (this) {
+ if (BackpressureHelper.addCancel(pendingRequests, n) == 0) {
+ suspendedCtx.resume();
+ suspendedCtx = null;
+ return;
+ }
+ }
}
+ BackpressureHelper.addCancel(pendingRequests, n);
}
public void onError(final Throwable error) {
- final Subscriber<? super LdapRequestEnvelope> sub = subscriber;
- if (sub != null) {
- subscriber = null;
- sub.onError(error);
+ if (pendingRequests.getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) {
+ subscriber.onError(error);
}
}
public void onComplete() {
- final Subscriber<? super LdapRequestEnvelope> sub = subscriber;
- if (sub != null) {
- subscriber = null;
- sub.onComplete();
+ if (pendingRequests.getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) {
+ subscriber.onComplete();
}
}
@Override
public void cancel() {
- subscriber = null;
+ pendingRequests.set(Long.MIN_VALUE);
}
}
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
index 3c8c20b..12e7ab1 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
@@ -55,10 +55,7 @@
@Override
public void completed(final Object result) {
- final Subscription sub = upstream;
- if (sub != null) {
- sub.request(1);
- }
+ upstream.request(1);
}
@Override
@@ -79,14 +76,12 @@
@Override
public void onError(final Throwable error) {
upstream.cancel();
- upstream = null;
downstream.onError(error);
}
@Override
public void onComplete() {
upstream.cancel();
- upstream = null;
downstream.onComplete();
}
}
--
Gitblit v1.10.0