| | |
| | | |
| | | import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher; |
| | | import static org.forgerock.opendj.io.LDAP.*; |
| | | import static org.forgerock.util.Utils.closeSilently; |
| | | import static org.opends.messages.CoreMessages.*; |
| | | import static org.opends.messages.ProtocolMessages.*; |
| | | import static org.opends.server.loggers.AccessLogger.logDisconnect; |
| | |
| | | Flowable.create(new FlowableOnSubscribe<Response>() { |
| | | @Override |
| | | public void subscribe(FlowableEmitter<Response> emitter) throws Exception { |
| | | processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter); |
| | | try { |
| | | processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter); |
| | | } finally { |
| | | // We don't need the ASN1Reader anymore. |
| | | closeSilently(message.getContent()); |
| | | } |
| | | } |
| | | }, BackpressureStrategy.ERROR))) |
| | | .onNext(new Consumer<Response>() { |