mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
20.44.2016 5511a94238385a30b5b516ee360b234ff56d7c3f
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java
@@ -171,58 +171,58 @@
            /**
             * Decodes a raw LDAP message into a {@link Request} and forwards it to the next handler.
             * <p>
             * The content of {@code encodedRequestMessage} is read with
             * {@code LDAP.getReader(..., decodeOptions).readMessage(...)}; each overridden callback of the
             * {@code AbstractLDAPMessageHandler} passes the decoded request on to the single created by
             * {@code newSingle(...)}. The resulting request is then flat-mapped to
             * {@code next.handle(context, request)}.
             * <p>
             * NOTE(review): this text appears to be a rendered diff hunk without +/- markers: several
             * statements occur as adjacent pairs that look like the "before" and "after" versions of the
             * same line (OnSubscribe/Emitter, emitter.onSuccess/subscriber.onComplete,
             * Publisher/Single). Only one line of each pair can be live code - confirm against the
             * repository before relying on either.
             *
             * @param context
             *            the client connection, passed unchanged to {@code next.handle(...)}
             * @param encodedRequestMessage
             *            the raw message whose {@code getContent()} is decoded
             * @param next
             *            the handler invoked with the decoded request
             * @return the value produced by {@code next.handle(context, request)} for the decoded request
             * @throws Exception
             *             declared by the signature; the visible body does not throw explicitly
             */
            public Single<Stream<Response>> filter(final LDAPClientConnection2 context,
                    final LdapRawMessage encodedRequestMessage,
                    final ReactiveHandler<LDAPClientConnection2, Request, Stream<Response>> next) throws Exception {
                // NOTE(review): the next two lines look like the old/new variants of one statement.
                return newSingle(new Single.OnSubscribe<Request>() {
                return newSingle(new Single.Emitter<Request>() {
                    @Override
                    // NOTE(review): same here - two alternative declarations of the callback method.
                    public void onSubscribe(final Single.Emitter<Request> emitter) throws Exception {
                    public void subscribe(final Single.Subscriber<Request> subscriber) throws Exception {
                        // Decode the message; exactly which callback fires depends on the decoded message type.
                        // NOTE(review): no overrides for compare, delete or extended requests are visible
                        // here - confirm what AbstractLDAPMessageHandler does for those by default.
                        LDAP.getReader(encodedRequestMessage.getContent(), decodeOptions)
                                .readMessage(new AbstractLDAPMessageHandler() {
                            // Every callback below does the same thing: hand the decoded request to the
                            // emitter/subscriber. The message ID (and bind version) are not used.
                            @Override
                            public void abandonRequest(final int messageID, final AbandonRequest request)
                                    throws DecodeException, IOException {
                                emitter.onSuccess(request);
                                subscriber.onComplete(request);
                            }
                            @Override
                            public void addRequest(int messageID, AddRequest request)
                                    throws DecodeException, IOException {
                                emitter.onSuccess(request);
                                subscriber.onComplete(request);
                            }
                            @Override
                            public void bindRequest(int messageID, int version, GenericBindRequest request)
                                    throws DecodeException, IOException {
                                emitter.onSuccess(request);
                                subscriber.onComplete(request);
                            }
                            @Override
                            public void modifyDNRequest(int messageID, ModifyDNRequest request)
                                    throws DecodeException, IOException {
                                emitter.onSuccess(request);
                                subscriber.onComplete(request);
                            }
                            @Override
                            public void modifyRequest(int messageID, ModifyRequest request)
                                    throws DecodeException, IOException {
                                emitter.onSuccess(request);
                                subscriber.onComplete(request);
                            }
                            @Override
                            public void searchRequest(int messageID, SearchRequest request)
                                    throws DecodeException, IOException {
                                emitter.onSuccess(request);
                                subscriber.onComplete(request);
                            }
                            @Override
                            public void unbindRequest(int messageID, UnbindRequest request)
                                    throws DecodeException, IOException {
                                emitter.onSuccess(request);
                                subscriber.onComplete(request);
                            }
                        });
                    }
                // Chain the decoded request into the next handler.
                // NOTE(review): the two flatMap lines and the two apply(...) variants below again look
                // like old/new sides of the diff (Publisher-returning vs Single-returning function).
                }).flatMap(new Function<Request, Publisher<Stream<Response>>, Exception>() {
                }).flatMap(new Function<Request, Single<Stream<Response>>, Exception>() {
                    @Override
                    public Publisher<Stream<Response>> apply(Request t) throws Exception {
                        return next.handle(context, t);
                    public Single<Stream<Response>> apply(final Request request) throws Exception {
                        return next.handle(context, request);
                    }
                });
            }