| | |
| | | |
| | | final class GrizzlyBackpressureSubscription implements Subscription { |
| | | private final AtomicLong pendingRequests = new AtomicLong(); |
| | | private Subscriber<? super LdapRequestEnvelope> subscriber; |
| | | volatile FilterChainContext ctx; |
| | | private final Subscriber<? super LdapRequestEnvelope> subscriber; |
| | | private FilterChainContext suspendedCtx; |
| | | |
| | | GrizzlyBackpressureSubscription(Subscriber<? super LdapRequestEnvelope> subscriber) { |
| | | GrizzlyBackpressureSubscription(final Subscriber<? super LdapRequestEnvelope> subscriber) { |
| | | this.subscriber = subscriber; |
| | | subscriber.onSubscribe(this); |
| | | } |
| | | |
| | | NextAction handleRead(final FilterChainContext ctx) { |
| | | final Subscriber<? super LdapRequestEnvelope> sub = subscriber; |
| | | if (sub == null) { |
| | | // Subscription cancelled. Stop reading |
| | | ctx.suspend(); |
| | | return ctx.getSuspendAction(); |
| | | if (pendingRequests.get() == 1) { |
| | | subscriber.onNext((LdapRequestEnvelope) ctx.getMessage()); |
| | | synchronized (this) { |
| | | if (BackpressureHelper.producedCancel(pendingRequests, 1) == 0) { |
| | | this.suspendedCtx = ctx; |
| | | ctx.suspend(); |
| | | return ctx.getSuspendAction(); |
| | | } |
| | | } |
| | | } else if (BackpressureHelper.producedCancel(pendingRequests, 1) >= 0) { |
| | | subscriber.onNext((LdapRequestEnvelope) ctx.getMessage()); |
| | | } |
| | | sub.onNext((LdapRequestEnvelope) ctx.getMessage()); |
| | | if (BackpressureHelper.produced(pendingRequests, 1) > 0) { |
| | | return ctx.getStopAction(); |
| | | } |
| | | this.ctx = ctx; |
| | | ctx.suspend(); |
| | | return ctx.getSuspendAction(); |
| | | return ctx.getStopAction(); |
| | | } |
| | | |
| | | @Override |
| | | public void request(long n) { |
| | | final FilterChainContext immutableRef = ctx; |
| | | if (BackpressureHelper.add(pendingRequests, n) == 0 && immutableRef != null) { |
| | | immutableRef.resumeNext(); |
| | | ctx = null; |
| | | public void request(final long n) { |
| | | if (pendingRequests.get() == 0) { |
| | | synchronized (this) { |
| | | if (BackpressureHelper.addCancel(pendingRequests, n) == 0) { |
| | | suspendedCtx.resume(); |
| | | suspendedCtx = null; |
| | | return; |
| | | } |
| | | } |
| | | } |
| | | BackpressureHelper.addCancel(pendingRequests, n); |
| | | } |
| | | |
| | | public void onError(final Throwable error) { |
| | | final Subscriber<? super LdapRequestEnvelope> sub = subscriber; |
| | | if (sub != null) { |
| | | subscriber = null; |
| | | sub.onError(error); |
| | | if (pendingRequests.getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) { |
| | | subscriber.onError(error); |
| | | } |
| | | } |
| | | |
| | | public void onComplete() { |
| | | final Subscriber<? super LdapRequestEnvelope> sub = subscriber; |
| | | if (sub != null) { |
| | | subscriber = null; |
| | | sub.onComplete(); |
| | | if (pendingRequests.getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) { |
| | | subscriber.onComplete(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void cancel() { |
| | | subscriber = null; |
| | | pendingRequests.set(Long.MIN_VALUE); |
| | | } |
| | | } |
| | | |