| | |
| | | */ |
| | | package org.forgerock.opendj.reactive; |
| | | |
| | | import static com.forgerock.reactive.RxJavaStreams.*; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_INTERMEDIATE_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_RESPONSE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_REQUEST; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_DONE; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_ENTRY; |
| | | import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_REFERENCE; |
| | | import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher; |
| | | import static org.forgerock.opendj.io.LDAP.*; |
| | | import static org.opends.messages.CoreMessages.*; |
| | | import static org.opends.messages.ProtocolMessages.*; |
| | | import static org.opends.server.loggers.AccessLogger.logDisconnect; |
| | | import static org.opends.server.protocols.ldap.LDAPConstants.*; |
| | | import static org.opends.server.util.ServerConstants.OID_START_TLS_REQUEST; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | |
| | | |
| | | import com.forgerock.reactive.Consumer; |
| | | import com.forgerock.reactive.ReactiveHandler; |
| | | import com.forgerock.reactive.Single; |
| | | import com.forgerock.reactive.Stream; |
| | | |
| | | import io.reactivex.BackpressureStrategy; |
| | |
| | | * a fatal error and the client has been disconnected as a result, or if the client unbound from the server. |
| | | */ |
| | | @Override |
| | | public Single<Stream<Response>> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) { |
| | | return singleFrom(streamFromPublisher( |
| | | public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) { |
| | | return streamFromPublisher( |
| | | new BlockingBackpressureSubscription(Flowable.create(new FlowableOnSubscribe<Response>() { |
| | | @Override |
| | | public void subscribe(FlowableEmitter<Response> emitter) throws Exception { |
| | |
| | | toLdapResponseType(message, response), message.getMessageId()); |
| | | } |
| | | } |
| | | })); |
| | | }); |
| | | } |
| | | |
| | | private final byte toLdapResultType(final byte requestType) { |
| | |
| | | |
| | | BlockingBackpressureSubscription(final Publisher<Response> upstream) { |
| | | this.upstream = upstream; |
| | | this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0 |
| | | ? 30000 // Do not wait indefinitely, |
| | | this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0 |
| | | ? 30000 // Do not wait indefinitely, |
| | | : connectionHandler.getMaxBlockedWriteTimeLimit(); |
| | | } |
| | | |