| | |
| | | */ |
| | | @Override |
| | | public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRequestEnvelope message) { |
| | | return streamFromPublisher(new BlockingBackpressureSubscription( |
| | | Flowable.create(new FlowableOnSubscribe<Response>() { |
| | | @Override |
| | | public void subscribe(FlowableEmitter<Response> emitter) throws Exception { |
| | | try { |
| | | processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter); |
| | | } finally { |
| | | // We don't need the ASN1Reader anymore. |
| | | closeSilently(message.getContent()); |
| | | } |
| | | } |
| | | }, BackpressureStrategy.ERROR))) |
| | | .onNext(new Consumer<Response>() { |
| | | @Override |
| | | public void accept(final Response response) throws Exception { |
| | | if (keepStats) { |
| | | statTracker.updateMessageWritten( |
| | | toLdapResponseType(message, response), message.getMessageId()); |
| | | } |
| | | } |
| | | }); |
| | | return streamFromPublisher( |
| | | new BlockingBackpressureSubscription(connectionHandler.getMaxBlockedWriteTimeLimit(), |
| | | Flowable.create(new FlowableOnSubscribe<Response>() { |
| | | @Override |
| | | public void subscribe(FlowableEmitter<Response> emitter) throws Exception { |
| | | try { |
| | | processLDAPMessage( |
| | | queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter); |
| | | } finally { |
| | | // We don't need the ASN1Reader anymore. |
| | | closeSilently(message.getContent()); |
| | | } |
| | | } |
| | | }, BackpressureStrategy.ERROR))) |
| | | .onNext(new Consumer<Response>() { |
| | | @Override |
| | | public void accept(final Response response) throws Exception { |
| | | if (keepStats) { |
| | | statTracker.updateMessageWritten( |
| | | toLdapResponseType(message, response), message.getMessageId()); |
| | | } |
| | | } |
| | | }); |
| | | } |
| | | |
| | | private final byte toLdapResultType(final byte requestType) { |
| | |
| | | private final Condition spaceAvailable = lock.newCondition(); /* condition bound to lock; signalAll() is invoked on it from a finally block further down */ |
| | | private final Publisher<Response> upstream; /* publisher handed to the constructor */ |
| | | private final long writeTimeoutMillis; /* set once in the constructor; a configured limit of 0 is replaced by 30000 */ |
| | | private boolean upstreamCompleted; /* set to true by onComplete() while holding lock */ |
| | | private Subscription subscription; /* may be null: cancel() checks before calling subscription.cancel() */ |
| | | private Subscriber<? super Response> downstream; /* receives onComplete() once upstream completed and the queue is empty */ |
| | | |
| | | // Convenience overload: uses the connection handler's configured maximum blocked write time limit. |
| | | BlockingBackpressureSubscription(final Publisher<Response> upstream) { |
| | | this(connectionHandler.getMaxBlockedWriteTimeLimit(), upstream); |
| | | } |
| | | |
| | | // Wraps the given upstream publisher. maxBlockedWriteTimeLimit becomes the write timeout |
| | | // (presumably milliseconds, going by the field name); 0 is replaced by a 30000 default. |
| | | BlockingBackpressureSubscription(final long maxBlockedWriteTimeLimit, final Publisher<Response> upstream) { |
| | | this.upstream = upstream; |
| | | this.writeTimeoutMillis = maxBlockedWriteTimeLimit == 0 |
| | | ? 30000 // Do not wait indefinitely. |
| | | : maxBlockedWriteTimeLimit; |
| | | } |
| | | |
| | | @Override |
| | |
| | | // Forward response |
| | | pendingRequests--; |
| | | } |
| | | if (upstreamCompleted && queue.isEmpty()) { |
| | | if (pendingRequests != Long.MIN_VALUE) { |
| | | downstream.onComplete(); |
| | | } |
| | | cancel(); |
| | | } |
| | | } finally { |
| | | spaceAvailable.signalAll(); |
| | | } |
| | |
| | | |
| | | @Override |
| | | public void onComplete() { |
| | | lock.lock(); |
| | | try { |
| | | // Do not complete or cancel eagerly here: cancel() clears the queue, which would drop responses that |
| | | // are still pending. Only record the completion; onComplete() is forwarded downstream once the |
| | | // pending messages have been flushed. |
| | | upstreamCompleted = true; |
| | | drain(); |
| | | } finally { |
| | | lock.unlock(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void cancel() { /* marks this subscription as cancelled, cancels the upstream subscription if there is one, and discards queued items */ |
| | | lock.lock(); |
| | | try { |
| | | pendingRequests = Long.MIN_VALUE; /* Long.MIN_VALUE is the value tested (pendingRequests != Long.MIN_VALUE) before completing downstream */ |
| | | } finally { |
| | | lock.unlock(); |
| | | } |
| | | if (subscription != null) { /* subscription may not have been set yet */ |
| | | subscription.cancel(); |
| | | } |
| | | queue.clear(); /* NOTE(review): executed without holding lock - confirm this is intended */ |
| | | pendingRequests = Long.MIN_VALUE; /* NOTE(review): repeats the locked assignment above, this time outside the lock - confirm it is still needed */ |
| | | } |
| | | } |
| | | } |