| | |
| | | public Single<Stream<Response>> filter(final LDAPClientConnection2 context, |
| | | final LdapRawMessage encodedRequestMessage, |
| | | final ReactiveHandler<LDAPClientConnection2, Request, Stream<Response>> next) throws Exception { |
| | | return newSingle(new Single.OnSubscribe<Request>() { |
| | | return newSingle(new Single.Emitter<Request>() { |
| | | @Override |
| | | public void onSubscribe(final Single.Emitter<Request> emitter) throws Exception { |
| | | public void subscribe(final Single.Subscriber<Request> subscriber) throws Exception { |
| | | LDAP.getReader(encodedRequestMessage.getContent(), decodeOptions) |
| | | .readMessage(new AbstractLDAPMessageHandler() { |
| | | @Override |
| | | public void abandonRequest(final int messageID, final AbandonRequest request) |
| | | throws DecodeException, IOException { |
| | | emitter.onSuccess(request); |
| | | subscriber.onComplete(request); |
| | | } |
| | | |
| | | @Override |
| | | public void addRequest(int messageID, AddRequest request) |
| | | throws DecodeException, IOException { |
| | | emitter.onSuccess(request); |
| | | subscriber.onComplete(request); |
| | | } |
| | | |
| | | @Override |
| | | public void bindRequest(int messageID, int version, GenericBindRequest request) |
| | | throws DecodeException, IOException { |
| | | emitter.onSuccess(request); |
| | | subscriber.onComplete(request); |
| | | } |
| | | |
| | | @Override |
| | | public void modifyDNRequest(int messageID, ModifyDNRequest request) |
| | | throws DecodeException, IOException { |
| | | emitter.onSuccess(request); |
| | | subscriber.onComplete(request); |
| | | } |
| | | |
| | | @Override |
| | | public void modifyRequest(int messageID, ModifyRequest request) |
| | | throws DecodeException, IOException { |
| | | emitter.onSuccess(request); |
| | | subscriber.onComplete(request); |
| | | } |
| | | |
| | | @Override |
| | | public void searchRequest(int messageID, SearchRequest request) |
| | | throws DecodeException, IOException { |
| | | emitter.onSuccess(request); |
| | | subscriber.onComplete(request); |
| | | } |
| | | |
| | | @Override |
| | | public void unbindRequest(int messageID, UnbindRequest request) |
| | | throws DecodeException, IOException { |
| | | emitter.onSuccess(request); |
| | | subscriber.onComplete(request); |
| | | } |
| | | }); |
| | | } |
| | | }).flatMap(new Function<Request, Publisher<Stream<Response>>, Exception>() { |
| | | }).flatMap(new Function<Request, Single<Stream<Response>>, Exception>() { |
| | | @Override |
| | | public Publisher<Stream<Response>> apply(Request t) throws Exception { |
| | | return next.handle(context, t); |
| | | public Single<Stream<Response>> apply(final Request request) throws Exception { |
| | | return next.handle(context, request); |
| | | } |
| | | }); |
| | | } |