mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
03.09.2016 bb8d8ab8ac1bc14b26abf45c8fda5cf571c1c9bb
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -16,30 +16,11 @@
 */
package org.forgerock.opendj.reactive;
import static com.forgerock.reactive.RxJavaStreams.*;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_INTERMEDIATE_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_DONE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_ENTRY;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_REFERENCE;
import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher;
import static org.forgerock.opendj.io.LDAP.*;
import static org.opends.messages.CoreMessages.*;
import static org.opends.messages.ProtocolMessages.*;
import static org.opends.server.loggers.AccessLogger.logDisconnect;
import static org.opends.server.protocols.ldap.LDAPConstants.*;
import static org.opends.server.util.ServerConstants.OID_START_TLS_REQUEST;
import static org.opends.server.util.StaticUtils.*;
@@ -130,7 +111,6 @@
import com.forgerock.reactive.Consumer;
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Single;
import com.forgerock.reactive.Stream;
import io.reactivex.BackpressureStrategy;
@@ -982,8 +962,8 @@
     *         a fatal error and the client has been disconnected as a result, or if the client unbound from the server.
     */
    @Override
    public Single<Stream<Response>> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
        return singleFrom(streamFromPublisher(
    public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
        return streamFromPublisher(
                new BlockingBackpressureSubscription(Flowable.create(new FlowableOnSubscribe<Response>() {
                    @Override
                    public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
@@ -997,7 +977,7 @@
                                    toLdapResponseType(message, response), message.getMessageId());
                        }
                    }
                }));
                });
    }
    private final byte toLdapResultType(final byte requestType) {
@@ -1851,8 +1831,8 @@
        /**
         * Creates a subscription around the given upstream publisher and selects the write timeout.
         * <p>
         * The timeout is read from {@code connectionHandler.getMaxBlockedWriteTimeLimit()}; when that limit is
         * {@code 0} the fallback value {@code 30000} is used instead (presumably milliseconds, going by the
         * {@code writeTimeoutMillis} field name - to be confirmed against the connection handler).
         *
         * @param upstream
         *            the publisher of responses stored in the {@code upstream} field
         */
        BlockingBackpressureSubscription(final Publisher<Response> upstream) {
            this.upstream = upstream;
            // NOTE(review): the first two lines of the assignment below appear twice in this listing; they look
            // like the before/after copies of a diff hunk. Only one copy belongs in compilable source - confirm
            // against the repository.
            this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0
                    ? 30000 // Do not wait indefinitely,
            this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0
                    ? 30000 // Do not wait indefinitely,
                    : connectionHandler.getMaxBlockedWriteTimeLimit();
        }