| | |
| | | NextAction handleRead(final FilterChainContext ctx) { |
| | | if (pendingRequests.get() == 1) { |
| | | subscriber.onNext((LdapRequestEnvelope) ctx.getMessage()); |
| | | // This synchronized ensure that the context suspend is done atomically with the fact that |
| | | // pendingRequests is set to 0 |
| | | synchronized (this) { |
| | | if (BackpressureHelper.producedCancel(pendingRequests, 1) == 0) { |
| | | this.suspendedCtx = ctx; |
| | | // Another request() might have add some pendingRequests in the mean time |
| | | if (pendingRequests.compareAndSet(1, 0)) { |
| | | ctx.suspend(); |
| | | this.suspendedCtx = ctx; |
| | | return ctx.getSuspendAction(); |
| | | } |
| | | } |
| | | } else if (BackpressureHelper.producedCancel(pendingRequests, 1) >= 0) { |
| | | subscriber.onNext((LdapRequestEnvelope) ctx.getMessage()); |
| | | return ctx.getStopAction(); |
| | | } |
| | | if (BackpressureHelper.producedCancel(pendingRequests, 1) == Long.MIN_VALUE) { |
| | | ctx.suspend(); |
| | | return ctx.getSuspendAction(); |
| | | } |
| | | subscriber.onNext((LdapRequestEnvelope) ctx.getMessage()); |
| | | return ctx.getStopAction(); |
| | | } |
| | | |
| | | @Override |
| | | public void request(final long n) { |
| | | if (pendingRequests.get() == 0) { |
| | | synchronized (this) { |
| | | if (BackpressureHelper.addCancel(pendingRequests, n) == 0) { |
| | | // This synchronized, coupled to the previous one, ensure the atomicity by "waiting" until the |
| | | // context has been suspended |
| | | synchronized (this) { |
| | | // On startup, pendingRequests = 0 and suspendedCtx is null |
| | | if (suspendedCtx != null) { |
| | | suspendedCtx.resume(); |
| | | suspendedCtx = null; |
| | | return; |
| | | } |
| | | } |
| | | } |
| | | BackpressureHelper.addCancel(pendingRequests, n); |
| | | } |
| | | |
| | | public void onError(final Throwable error) { |