| | |
| | | */ |
| | | package org.forgerock.opendj.grizzly; |
| | | |
| | | import java.util.concurrent.CancellationException; |
| | | |
| | | import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage; |
| | | import org.glassfish.grizzly.CompletionHandler; |
| | | import org.glassfish.grizzly.Connection; |
| | | import org.glassfish.grizzly.WriteHandler; |
| | | import org.reactivestreams.Subscriber; |
| | |
| | | |
| | | import com.forgerock.reactive.Completable; |
| | | |
| | | final class LdapResponseMessageWriter implements Subscriber<LdapResponseMessage> { |
| | | final class LdapResponseMessageWriter implements Subscriber<LdapResponseMessage>, CompletionHandler { |
| | | |
| | | private final Connection<?> connection; |
| | | private final Completable.Subscriber completable; |
| | | private final Completable.Subscriber downstream; |
| | | private Subscription upstream; |
| | | |
| | | LdapResponseMessageWriter(final Connection<?> connection, final Completable.Subscriber completable) { |
| | | LdapResponseMessageWriter(final Connection<?> connection, final Completable.Subscriber downstream) { |
| | | this.connection = connection; |
| | | this.completable = completable; |
| | | this.downstream = downstream; |
| | | } |
| | | |
| | | @Override |
| | | public void onSubscribe(Subscription s) { |
| | | this.upstream = s; |
| | | requestMore(); |
| | | public void onSubscribe(final Subscription s) { |
| | | if (upstream != null) { |
| | | s.cancel(); |
| | | return; |
| | | } |
| | | upstream = s; |
| | | connection.notifyCanWrite(new WriteHandler() { |
| | | @Override |
| | | public void onWritePossible() throws Exception { |
| | | final Subscription sub = upstream; |
| | | if (sub != null) { |
| | | sub.request(1); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void onError(final Throwable error) { |
| | | LdapResponseMessageWriter.this.onError(error); |
| | | } |
| | | }); |
| | | } |
| | | |
| | | private void requestMore() { |
| | | if (connection.canWrite()) { |
| | | upstream.request(1); |
| | | } else { |
| | | connection.notifyCanWrite(new WriteHandler() { |
| | | @Override |
| | | public void onWritePossible() throws Exception { |
| | | upstream.request(1); |
| | | } |
| | | @Override |
| | | public void onNext(final LdapResponseMessage message) { |
| | | connection.write(message).addCompletionHandler(this); |
| | | } |
| | | |
| | | @Override |
| | | public void onError(Throwable t) { |
| | | upstream.cancel(); |
| | | completable.onError(t); |
| | | } |
| | | }); |
| | | @Override |
| | | public void completed(final Object result) { |
| | | final Subscription sub = upstream; |
| | | if (sub != null) { |
| | | sub.request(1); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void onNext(LdapResponseMessage message) { |
| | | connection.write(message); |
| | | requestMore(); |
| | | public void cancelled() { |
| | | failed(new CancellationException()); |
| | | } |
| | | |
| | | @Override |
| | | public void onError(Throwable t) { |
| | | completable.onError(t); |
| | | public void failed(final Throwable error) { |
| | | onError(error); |
| | | } |
| | | |
| | | @Override |
| | | public void updated(final Object result) { |
| | | // Nothing to do |
| | | } |
| | | |
| | | @Override |
| | | public void onError(final Throwable error) { |
| | | upstream.cancel(); |
| | | upstream = null; |
| | | downstream.onError(error); |
| | | } |
| | | |
| | | @Override |
| | | public void onComplete() { |
| | | completable.onComplete(); |
| | | upstream.cancel(); |
| | | upstream = null; |
| | | downstream.onComplete(); |
| | | } |
| | | } |