From 5511a94238385a30b5b516ee360b234ff56d7c3f Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 07 Nov 2016 13:59:40 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport
---
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java | 24 ++++++++++++------------
1 files changed, 12 insertions(+), 12 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java
index 5bea25b..14db1c6 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/Components.java
@@ -171,58 +171,58 @@
public Single<Stream<Response>> filter(final LDAPClientConnection2 context,
final LdapRawMessage encodedRequestMessage,
final ReactiveHandler<LDAPClientConnection2, Request, Stream<Response>> next) throws Exception {
- return newSingle(new Single.OnSubscribe<Request>() {
+ return newSingle(new Single.Emitter<Request>() {
@Override
- public void onSubscribe(final Single.Emitter<Request> emitter) throws Exception {
+ public void subscribe(final Single.Subscriber<Request> subscriber) throws Exception {
LDAP.getReader(encodedRequestMessage.getContent(), decodeOptions)
.readMessage(new AbstractLDAPMessageHandler() {
@Override
public void abandonRequest(final int messageID, final AbandonRequest request)
throws DecodeException, IOException {
- emitter.onSuccess(request);
+ subscriber.onComplete(request);
}
@Override
public void addRequest(int messageID, AddRequest request)
throws DecodeException, IOException {
- emitter.onSuccess(request);
+ subscriber.onComplete(request);
}
@Override
public void bindRequest(int messageID, int version, GenericBindRequest request)
throws DecodeException, IOException {
- emitter.onSuccess(request);
+ subscriber.onComplete(request);
}
@Override
public void modifyDNRequest(int messageID, ModifyDNRequest request)
throws DecodeException, IOException {
- emitter.onSuccess(request);
+ subscriber.onComplete(request);
}
@Override
public void modifyRequest(int messageID, ModifyRequest request)
throws DecodeException, IOException {
- emitter.onSuccess(request);
+ subscriber.onComplete(request);
}
@Override
public void searchRequest(int messageID, SearchRequest request)
throws DecodeException, IOException {
- emitter.onSuccess(request);
+ subscriber.onComplete(request);
}
@Override
public void unbindRequest(int messageID, UnbindRequest request)
throws DecodeException, IOException {
- emitter.onSuccess(request);
+ subscriber.onComplete(request);
}
});
}
- }).flatMap(new Function<Request, Publisher<Stream<Response>>, Exception>() {
+ }).flatMap(new Function<Request, Single<Stream<Response>>, Exception>() {
@Override
- public Publisher<Stream<Response>> apply(Request t) throws Exception {
- return next.handle(context, t);
+ public Single<Stream<Response>> apply(final Request request) throws Exception {
+ return next.handle(context, request);
}
});
}
--
Gitblit v1.10.0