From bdc5fa0dd0980eb9e077ae80644504705a24e035 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 07 Nov 2016 13:59:40 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport
---
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java | 83 +++++++++++++++++++++++++++--------------
1 files changed, 55 insertions(+), 28 deletions(-)
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
index 007ce33..adecbcf 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
@@ -15,7 +15,10 @@
*/
package org.forgerock.opendj.grizzly;
+import java.util.concurrent.CancellationException;
+
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage;
+import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.WriteHandler;
import org.reactivestreams.Subscriber;
@@ -23,55 +26,79 @@
import com.forgerock.reactive.Completable;
-final class LdapResponseMessageWriter implements Subscriber<LdapResponseMessage> {
+final class LdapResponseMessageWriter implements Subscriber<LdapResponseMessage>, CompletionHandler {
private final Connection<?> connection;
- private final Completable.Subscriber completable;
+ private final Completable.Subscriber downstream;
private Subscription upstream;
- LdapResponseMessageWriter(final Connection<?> connection, final Completable.Subscriber completable) {
+ LdapResponseMessageWriter(final Connection<?> connection, final Completable.Subscriber downstream) {
this.connection = connection;
- this.completable = completable;
+ this.downstream = downstream;
}
@Override
- public void onSubscribe(Subscription s) {
- this.upstream = s;
- requestMore();
+ public void onSubscribe(final Subscription s) {
+ if (upstream != null) {
+ s.cancel();
+ return;
+ }
+ upstream = s;
+ connection.notifyCanWrite(new WriteHandler() {
+ @Override
+ public void onWritePossible() throws Exception {
+ final Subscription sub = upstream;
+ if (sub != null) {
+ sub.request(1);
+ }
+ }
+
+ @Override
+ public void onError(final Throwable error) {
+ LdapResponseMessageWriter.this.onError(error);
+ }
+ });
}
- private void requestMore() {
- if (connection.canWrite()) {
- upstream.request(1);
- } else {
- connection.notifyCanWrite(new WriteHandler() {
- @Override
- public void onWritePossible() throws Exception {
- upstream.request(1);
- }
+ @Override
+ public void onNext(final LdapResponseMessage message) {
+ connection.write(message).addCompletionHandler(this);
+ }
- @Override
- public void onError(Throwable t) {
- upstream.cancel();
- completable.onError(t);
- }
- });
+ @Override
+ public void completed(final Object result) {
+ final Subscription sub = upstream;
+ if (sub != null) {
+ sub.request(1);
}
}
@Override
- public void onNext(LdapResponseMessage message) {
- connection.write(message);
- requestMore();
+ public void cancelled() {
+ failed(new CancellationException());
}
@Override
- public void onError(Throwable t) {
- completable.onError(t);
+ public void failed(final Throwable error) {
+ onError(error);
+ }
+
+ @Override
+ public void updated(final Object result) {
+ // Nothing to do
+ }
+
+ @Override
+ public void onError(final Throwable error) {
+ upstream.cancel();
+ upstream = null;
+ downstream.onError(error);
}
@Override
public void onComplete() {
- completable.onComplete();
+ upstream.cancel();
+ upstream = null;
+ downstream.onComplete();
}
}
--
Gitblit v1.10.0