mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
06.42.2016 bdc5fa0dd0980eb9e077ae80644504705a24e035
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -54,14 +54,14 @@
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.LDAPClientContext.DisconnectListener;
import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.CompareResult;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.responses.Responses;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
import org.opends.server.api.ClientConnection;
import org.opends.server.api.ConnectionHandler;
import org.opends.server.core.AbandonOperationBasis;
@@ -123,7 +123,7 @@
 * instance of the LDAP connection handler and have its requests decoded by an LDAP request handler.
 */
public final class LDAPClientConnection2 extends ClientConnection implements TLSCapableConnection,
        ReactiveHandler<QueueingStrategy, LdapRawMessage, Stream<Response>> {
        ReactiveHandler<QueueingStrategy, LdapRequestEnvelope, Stream<Response>> {
    private static final String REACTIVE_OUT = "reactive.out";
    /** The tracer object for the debug logger. */
@@ -230,9 +230,9 @@
        }
        connectionID = DirectoryServer.newConnectionAccepted(this);
        clientContext.onDisconnect(new DisconnectListener() {
        clientContext.addConnectionEventListener(new ConnectionEventListener() {
            @Override
            public void exceptionOccurred(LDAPClientContext context, Throwable error) {
            public void handleConnectionError(LDAPClientContext context, Throwable error) {
                if (error instanceof LocalizableException) {
                    disconnect(
                            DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject());
@@ -242,13 +242,13 @@
            }
            @Override
            public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode,
            public void handleConnectionDisconnected(LDAPClientContext context, ResultCode resultCode,
                    String diagnosticMessage) {
                disconnect(DisconnectReason.SERVER_ERROR, false, null);
            }
            @Override
            public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
            public void handleConnectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
                disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
            }
        });
@@ -372,7 +372,7 @@
     */
    @Override
    public boolean isSecure() {
        return false;
        return clientContext.getSSLSession() != null || clientContext.getSASLServer() != null;
    }
    /**
@@ -405,7 +405,7 @@
        // an error result to the client indicating that a problem occurred.
        if (removeOperationInProgress(operation.getMessageID())) {
            final Response response = operationToResponse(operation);
            final FlowableEmitter<Response> out = getOut(operation);
            final FlowableEmitter<Response> out = getAttachedEmitter(operation);
            if (response != null) {
                out.onNext(response);
            }
@@ -534,22 +534,18 @@
     *            The search result entry to be sent to the client
     */
    @Override
    public void sendSearchEntry(SearchOperation searchOperation, SearchResultEntry searchEntry) {
        getEmitter(searchOperation).onNext(toResponse(searchEntry));
    public void sendSearchEntry(final SearchOperation searchOperation, final SearchResultEntry searchEntry) {
        getAttachedEmitter(searchOperation).onNext(toResponse(searchEntry));
    }
    private FlowableEmitter<Response> getEmitter(SearchOperation searchOperation) {
        return getOut(searchOperation);
    }
    private Response toResponse(SearchResultEntry searchEntry) {
        return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry, ldapVersion));
    }
    private FlowableEmitter<Response> getOut(Operation operation) {
    private FlowableEmitter<Response> getAttachedEmitter(final Operation operation) {
        return ((FlowableEmitter<Response>) operation.getAttachment(REACTIVE_OUT));
    }
    private Response toResponse(final SearchResultEntry searchEntry) {
        return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry, ldapVersion));
    }
    /**
     * Sends the provided search result reference to the client.
     *
@@ -572,7 +568,7 @@
            return false;
        }
        final FlowableEmitter<Response> out = getOut(searchOperation);
        final FlowableEmitter<Response> out = getAttachedEmitter(searchOperation);
        out.onNext(Converters.from(searchReference));
        return true;
@@ -589,7 +585,7 @@
    @Override
    protected boolean sendIntermediateResponseMessage(IntermediateResponse intermediateResponse) {
        final Operation operation = intermediateResponse.getOperation();
        final FlowableEmitter<Response> out = getOut(operation);
        final FlowableEmitter<Response> emitter = getAttachedEmitter(operation);
        final Response response = Responses.newGenericIntermediateResponse(intermediateResponse.getOID(),
                intermediateResponse.getValue());
@@ -597,7 +593,7 @@
            response.addControl(Converters.from(control));
        }
        out.onNext(response);
        emitter.onNext(response);
        // The only reason we shouldn't continue processing is if the
        // connection is closed.
@@ -962,14 +958,15 @@
     *         a fatal error and the client has been disconnected as a result, or if the client unbound from the server.
     */
    @Override
    public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
        return streamFromPublisher(
                new BlockingBackpressureSubscription(Flowable.create(new FlowableOnSubscribe<Response>() {
    public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRequestEnvelope message) {
        return streamFromPublisher(new BlockingBackpressureSubscription(
                Flowable.create(new FlowableOnSubscribe<Response>() {
                    @Override
                    public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
                        processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
                    }
                }, BackpressureStrategy.ERROR))).onNextDo(new Consumer<Response>() {
                }, BackpressureStrategy.ERROR)))
                .onNext(new Consumer<Response>() {
                    @Override
                    public void accept(final Response response) throws Exception {
                        if (keepStats) {
@@ -1003,7 +1000,7 @@
        }
    }
    private final byte toLdapResponseType(final LdapRawMessage rawRequest, final Response response) {
    private final byte toLdapResponseType(final LdapRequestEnvelope rawRequest, final Response response) {
        if (response instanceof Result) {
            return toLdapResultType(rawRequest.getMessageType());
        }
@@ -1160,8 +1157,8 @@
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
@@ -1229,17 +1226,17 @@
            versionString = "2";
            if (!connectionHandler.allowLDAPv2()) {
                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                        ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
                                    .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
                out.onComplete();
                disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get());
                disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get());
                return false;
            }
            if (!controls.isEmpty()) {
                // LDAPv2 clients aren't allowed to send controls.
                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                        ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
                                    .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
                out.onComplete();
                disconnectControlsNotAllowed();
                return false;
@@ -1390,8 +1387,8 @@
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
@@ -1514,8 +1511,8 @@
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
@@ -1569,8 +1566,8 @@
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
@@ -1626,8 +1623,8 @@
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;