mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
06.42.2016 bdc5fa0dd0980eb9e077ae80644504705a24e035
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
@@ -15,7 +15,10 @@
 */
package org.forgerock.opendj.grizzly;
import java.util.concurrent.CancellationException;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.WriteHandler;
import org.reactivestreams.Subscriber;
@@ -23,55 +26,79 @@
import com.forgerock.reactive.Completable;
final class LdapResponseMessageWriter implements Subscriber<LdapResponseMessage> {
final class LdapResponseMessageWriter implements Subscriber<LdapResponseMessage>, CompletionHandler {
    private final Connection<?> connection;
    private final Completable.Subscriber completable;
    private final Completable.Subscriber downstream;
    private Subscription upstream;
    LdapResponseMessageWriter(final Connection<?> connection, final Completable.Subscriber completable) {
    LdapResponseMessageWriter(final Connection<?> connection, final Completable.Subscriber downstream) {
        this.connection = connection;
        this.completable = completable;
        this.downstream = downstream;
    }
    @Override
    public void onSubscribe(Subscription s) {
        this.upstream = s;
        requestMore();
    public void onSubscribe(final Subscription s) {
        if (upstream != null) {
            s.cancel();
            return;
        }
        upstream = s;
        connection.notifyCanWrite(new WriteHandler() {
            @Override
            public void onWritePossible() throws Exception {
                final Subscription sub = upstream;
                if (sub != null) {
                    sub.request(1);
                }
            }
            @Override
            public void onError(final Throwable error) {
                LdapResponseMessageWriter.this.onError(error);
            }
        });
    }
    private void requestMore() {
        if (connection.canWrite()) {
            upstream.request(1);
        } else {
            connection.notifyCanWrite(new WriteHandler() {
                @Override
                public void onWritePossible() throws Exception {
                    upstream.request(1);
                }
    @Override
    public void onNext(final LdapResponseMessage message) {
        connection.write(message).addCompletionHandler(this);
    }
                @Override
                public void onError(Throwable t) {
                    upstream.cancel();
                    completable.onError(t);
                }
            });
    @Override
    public void completed(final Object result) {
        final Subscription sub = upstream;
        if (sub != null) {
            sub.request(1);
        }
    }
    @Override
    public void onNext(LdapResponseMessage message) {
        connection.write(message);
        requestMore();
    public void cancelled() {
        failed(new CancellationException());
    }
    @Override
    public void onError(Throwable t) {
        completable.onError(t);
    public void failed(final Throwable error) {
        onError(error);
    }
    @Override
    public void updated(final Object result) {
        // Nothing to do
    }
    @Override
    public void onError(final Throwable error) {
        upstream.cancel();
        upstream = null;
        downstream.onError(error);
    }
    @Override
    public void onComplete() {
        completable.onComplete();
        upstream.cancel();
        upstream = null;
        downstream.onComplete();
    }
}