mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
03.10.2016 86ad6a08499797f9b3204896caee947abb03394f
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -17,22 +17,53 @@
package org.forgerock.opendj.reactive;
import static com.forgerock.reactive.RxJavaStreams.*;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_INTERMEDIATE_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_DONE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_ENTRY;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_REFERENCE;
import static org.opends.messages.CoreMessages.*;
import static org.opends.messages.ProtocolMessages.*;
import static org.opends.server.loggers.AccessLogger.logDisconnect;
import static org.opends.server.protocols.ldap.LDAPConstants.*;
import static org.opends.server.util.ServerConstants.OID_START_TLS_REQUEST;
import static org.opends.server.util.StaticUtils.getExceptionMessage;
import static org.opends.server.util.StaticUtils.*;
import java.net.InetAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.security.cert.Certificate;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.security.sasl.SaslServer;
import org.forgerock.i18n.LocalizableException;
import org.forgerock.i18n.LocalizableMessage;
@@ -93,12 +124,15 @@
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchResultReference;
import org.opends.server.util.TimeThread;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import com.forgerock.reactive.Consumer;
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Single;
import com.forgerock.reactive.Stream;
import io.reactivex.BackpressureOverflowStrategy;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
@@ -502,7 +536,7 @@
        }
        // Controls are not allowed for LDAPv2 clients.
        if (ldapVersion != 2) {
        if (ldapVersion != 2 && operation.getResponseControls() != null) {
            for (Control control : operation.getResponseControls()) {
                result.addControl(Converters.from(control));
            }
@@ -529,7 +563,7 @@
    }
    // Builds a search-result-entry Response (via Responses.newSearchResultEntry) from the
    // partially wrapped server-side search entry.
    private Response toResponse(SearchResultEntry searchEntry) {
        // NOTE(review): two consecutive return statements cannot both exist in compilable
        // source; the line below looks like the pre-change version kept by the diff view,
        // and the one after it like its replacement — confirm against the commit.
        return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry));
        // This variant additionally hands the connection's ldapVersion to partiallyWrap.
        return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry, ldapVersion));
    }
    private FlowableEmitter<Response> getOut(Operation operation) {
@@ -949,12 +983,60 @@
     */
    @Override
    public Single<Stream<Response>> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
        // NOTE(review): this method shows two alternative return statements back to back,
        // which cannot compile together. The first looks like the pre-change body kept by
        // the diff view, the second like its replacement — confirm against the commit.
        //
        // First variant: the raw message is decoded and processed when the Flowable is
        // subscribed; responses are buffered through onBackpressureBuffer(64, ...) with the
        // ERROR overflow strategy.
        return singleFrom(streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
            @Override
            public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
                processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
            }
        }, BackpressureStrategy.NONE).onBackpressureBuffer(64, null, BackpressureOverflowStrategy.ERROR)));
        // Second variant: the same emitter-backed Flowable is wrapped in a
        // BlockingBackpressureSubscription, and every response passing through is reported
        // to the statistics tracker when keepStats is set.
        return singleFrom(streamFromPublisher(
                new BlockingBackpressureSubscription(Flowable.create(new FlowableOnSubscribe<Response>() {
                    @Override
                    public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
                        processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
                    }
                }, BackpressureStrategy.ERROR))).onNextDo(new Consumer<Response>() {
                    @Override
                    public void accept(final Response response) throws Exception {
                        if (keepStats) {
                            // The response type is derived from the request's message type
                            // and the concrete kind of response being written.
                            statTracker.updateMessageWritten(
                                    toLdapResponseType(message, response), message.getMessageId());
                        }
                    }
                }));
    }
    private final byte toLdapResultType(final byte requestType) {
        switch (requestType) {
        case OP_TYPE_ADD_REQUEST:
            return OP_TYPE_ADD_RESPONSE;
        case OP_TYPE_BIND_REQUEST:
            return OP_TYPE_BIND_RESPONSE;
        case OP_TYPE_COMPARE_REQUEST:
            return OP_TYPE_COMPARE_RESPONSE;
        case OP_TYPE_DELETE_REQUEST:
            return OP_TYPE_DELETE_RESPONSE;
        case OP_TYPE_EXTENDED_REQUEST:
            return OP_TYPE_EXTENDED_RESPONSE;
        case OP_TYPE_MODIFY_DN_REQUEST:
            return OP_TYPE_MODIFY_DN_RESPONSE;
        case OP_TYPE_MODIFY_REQUEST:
            return OP_TYPE_MODIFY_RESPONSE;
        case OP_TYPE_SEARCH_REQUEST:
            return OP_TYPE_SEARCH_RESULT_DONE;
        default:
            throw new IllegalArgumentException("Unknown request: " + requestType);
        }
    }
    private final byte toLdapResponseType(final LdapRawMessage rawRequest, final Response response) {
        if (response instanceof Result) {
            return toLdapResultType(rawRequest.getMessageType());
        }
        if (response instanceof org.forgerock.opendj.ldap.responses.IntermediateResponse) {
            return OP_TYPE_INTERMEDIATE_RESPONSE;
        }
        if (response instanceof org.forgerock.opendj.ldap.responses.SearchResultEntry) {
            return OP_TYPE_SEARCH_RESULT_ENTRY;
        }
        if (response instanceof org.forgerock.opendj.ldap.responses.SearchResultReference) {
            return OP_TYPE_SEARCH_RESULT_REFERENCE;
        }
        throw new IllegalArgumentException();
    }
    private boolean processLDAPMessage(final QueueingStrategy queueingStrategy, final LDAPMessage message,
@@ -1116,10 +1198,20 @@
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode())
                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
            for (String referral : de.getReferralURLs()) {
                result.addReferralURI(referral);
            final Result result = Responses.newResult(de.getResultCode());
            if (de.getLocalizedMessage() != null) {
                result.setDiagnosticMessage(de.getLocalizedMessage());
            }
            if (de.getMatchedDN() != null) {
                result.setMatchedDN(de.getMatchedDN().toString());
            }
            if (de.getReferralURLs() != null) {
                result.getReferralURIs().addAll(de.getReferralURLs());
            }
            if (ldapVersion != 2 && addOp.getResponseControls() != null) {
                for (Control control : addOp.getResponseControls()) {
                    result.addControl(Converters.from(control));
                }
            }
            out.onNext(result);
@@ -1216,12 +1308,21 @@
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newBindResult(de.getResultCode())
                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
            for (String referral : de.getReferralURLs()) {
                result.addReferralURI(referral);
            final Result result = Responses.newBindResult(de.getResultCode());
            if (de.getLocalizedMessage() != null) {
                result.setDiagnosticMessage(de.getLocalizedMessage());
            }
            if (de.getMatchedDN() != null) {
                result.setMatchedDN(de.getMatchedDN().toString());
            }
            if (de.getReferralURLs() != null) {
                result.getReferralURIs().addAll(de.getReferralURLs());
            }
            if (ldapVersion != 2 && bindOp.getResponseControls() != null) {
                for (Control control : bindOp.getResponseControls()) {
                    result.addControl(Converters.from(control));
                }
            }
            out.onNext(result);
            out.onComplete();
@@ -1252,8 +1353,8 @@
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onNext(Responses.newCompareResult(ResultCode.PROTOCOL_ERROR)
                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
@@ -1271,9 +1372,21 @@
        } catch (DirectoryException de) {
            logger.traceException(de);
            final CompareResult result = Responses.newCompareResult(de.getResultCode())
                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
            result.getReferralURIs().addAll(de.getReferralURLs());
            final CompareResult result = Responses.newCompareResult(de.getResultCode());
            if (de.getLocalizedMessage() != null) {
                result.setDiagnosticMessage(de.getLocalizedMessage());
            }
            if (de.getMatchedDN() != null) {
                result.setMatchedDN(de.getMatchedDN().toString());
            }
            if (de.getReferralURLs() != null) {
                result.getReferralURIs().addAll(de.getReferralURLs());
            }
            if (ldapVersion != 2 && compareOp.getResponseControls() != null) {
                for (Control control : compareOp.getResponseControls()) {
                    result.addControl(Converters.from(control));
                }
            }
            out.onNext(result);
            out.onComplete();
        }
@@ -1315,9 +1428,21 @@
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode())
                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
            result.getReferralURIs().addAll(de.getReferralURLs());
            final Result result = Responses.newResult(de.getResultCode());
            if (de.getLocalizedMessage() != null) {
                result.setDiagnosticMessage(de.getLocalizedMessage());
            }
            if (de.getMatchedDN() != null) {
                result.setMatchedDN(de.getMatchedDN().toString());
            }
            if (de.getReferralURLs() != null) {
                result.getReferralURIs().addAll(de.getReferralURLs());
            }
            if (ldapVersion != 2 && deleteOp.getResponseControls() != null) {
                for (Control control : deleteOp.getResponseControls()) {
                    result.addControl(Converters.from(control));
                }
            }
            out.onNext(result);
            out.onComplete();
@@ -1371,10 +1496,21 @@
            addOperationInProgress(queueingStrategy, extendedOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
                    .setMatchedDN(de.getMatchedDN().toString());
            result.getReferralURIs().addAll(de.getReferralURLs());
            final Result result = Responses.newGenericExtendedResult(de.getResultCode());
            if (de.getLocalizedMessage() != null) {
                result.setDiagnosticMessage(de.getLocalizedMessage());
            }
            if (de.getMatchedDN() != null) {
                result.setMatchedDN(de.getMatchedDN().toString());
            }
            if (de.getReferralURLs() != null) {
                result.getReferralURIs().addAll(de.getReferralURLs());
            }
            if (ldapVersion != 2 && extendedOp.getResponseControls() != null) {
                for (Control control : extendedOp.getResponseControls()) {
                    result.addControl(Converters.from(control));
                }
            }
            out.onNext(result);
            out.onComplete();
        }
@@ -1415,10 +1551,21 @@
            addOperationInProgress(queueingStrategy, modifyOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
                    .setMatchedDN(de.getMatchedDN().toString());
            result.getReferralURIs().addAll(de.getReferralURLs());
            final Result result = Responses.newResult(de.getResultCode());
            if (de.getLocalizedMessage() != null) {
                result.setDiagnosticMessage(de.getLocalizedMessage());
            }
            if (de.getMatchedDN() != null) {
                result.setMatchedDN(de.getMatchedDN().toString());
            }
            if (de.getReferralURLs() != null) {
                result.getReferralURIs().addAll(de.getReferralURLs());
            }
            if (ldapVersion != 2 && modifyOp.getResponseControls() != null) {
                for (Control control : modifyOp.getResponseControls()) {
                    result.addControl(Converters.from(control));
                }
            }
            out.onNext(result);
            out.onComplete();
        }
@@ -1461,11 +1608,20 @@
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
                    .setMatchedDN(de.getMatchedDN().toString());
            result.getReferralURIs().addAll(de.getReferralURLs());
            for (Control control : modifyDNOp.getResponseControls()) {
                result.addControl(Converters.from(control));
            final Result result = Responses.newResult(de.getResultCode());
            if (de.getLocalizedMessage() != null) {
                result.setDiagnosticMessage(de.getLocalizedMessage());
            }
            if (de.getMatchedDN() != null) {
                result.setMatchedDN(de.getMatchedDN().toString());
            }
            if (de.getReferralURLs() != null) {
                result.getReferralURIs().addAll(de.getReferralURLs());
            }
            if (ldapVersion != 2 && modifyDNOp.getResponseControls() != null) {
                for (Control control : modifyDNOp.getResponseControls()) {
                    result.addControl(Converters.from(control));
                }
            }
            out.onNext(result);
            out.onComplete();
@@ -1520,7 +1676,7 @@
            if (de.getReferralURLs() != null) {
                result.getReferralURIs().addAll(de.getReferralURLs());
            }
            if (searchOp.getResponseControls() != null) {
            if (ldapVersion != 2 && searchOp.getResponseControls() != null) {
                for (Control control : searchOp.getResponseControls()) {
                    result.addControl(Converters.from(control));
                }
@@ -1611,8 +1767,34 @@
    }
    @Override
    // NOTE(review): the next two lines (a signature immediately followed by an unconditional
    // throw, with no closing brace) look like the pre-change version kept by the diff view;
    // the second signature below appears to be its replacement — confirm against the commit.
    public boolean prepareTLS(LocalizableMessageBuilder unavailableReason) {
        throw new UnsupportedOperationException();
    // Attempts to enable TLS on this connection. Returns true on success; on every failure
    // path it appends an explanation to unavailableReason and returns false.
    public boolean prepareTLS(final LocalizableMessageBuilder unavailableReason) {
        // Make sure that the connection handler allows the use of the
        // StartTLS operation.
        if (!connectionHandler.allowStartTLS()) {
            unavailableReason.append(ERR_LDAP_TLS_STARTTLS_NOT_ALLOWED.get());
            return false;
        }
        try {
            // A false return from enableTLS is reported with the "existing security
            // provider" message, naming SSLEngine as the provider.
            if (!clientContext.enableTLS(connectionHandler.createSSLEngine(), true)) {
                unavailableReason.append(ERR_LDAP_TLS_EXISTING_SECURITY_PROVIDER.get(SSLEngine.class.getName()));
                return false;
            }
        } catch (DirectoryException de) {
            // The exception is traced and its single-line stack trace is reported as the reason.
            logger.traceException(de);
            unavailableReason.append(ERR_LDAP_TLS_CANNOT_CREATE_TLS_PROVIDER.get(stackTraceToSingleLineString(de)));
            return false;
        }
        return true;
    }
    /**
     * Installs the SASL security layer on the underlying connection by delegating to the
     * client context.
     *
     * @param saslServer
     *            The {@code SaslServer} which should be used to secure the connection.
     */
    public void enableSASL(final SaslServer saslServer) {
        clientContext.enableSASL(saslServer);
    }
    /**
@@ -1639,11 +1821,131 @@
     * @return The array of certificates associated with a connection.
     */
    public Certificate[] getClientCertificateChain() {
        final SSLSession sslSession = clientContext.getSSLSession();
        if (sslSession != null) {
            try {
                return sslSession.getPeerCertificates();
            } catch (SSLPeerUnverifiedException e) {
                logger.traceException(e);
            }
        }
        return new Certificate[0];
    }
    @Override
    public int getSSF() {
        // NOTE(review): two consecutive return statements cannot compile together; the
        // constant 0 looks like the pre-change line kept by the diff view, and the
        // delegation to the client context like its replacement — confirm against the commit.
        return 0;
        // Reports the security strength factor exposed by the client context.
        return clientContext.getSecurityStrengthFactor();
    }
    /** Upstream -> BlockingBackpressureSubscription -> Downstream */
    private final class BlockingBackpressureSubscription
            implements Subscription, Publisher<Response>, Subscriber<Response> {
        private long pendingRequests;
        private final Queue<Response> queue = new LinkedList<>();
        private final Lock lock = new ReentrantLock();
        private final Condition spaceAvailable = lock.newCondition();
        private final Publisher<Response> upstream;
        private final long writeTimeoutMillis;
        private Subscription subscription;
        private Subscriber<? super Response> downstream;
        BlockingBackpressureSubscription(final Publisher<Response> upstream) {
            this.upstream = upstream;
            this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0
                    ? 30000 // Do not wait indefinitely,
                    : connectionHandler.getMaxBlockedWriteTimeLimit();
        }
        @Override
        public void subscribe(final Subscriber<? super Response> subscriber) {
            if (downstream != null) {
                return;
            }
            downstream = subscriber;
            subscriber.onSubscribe(this);
            upstream.subscribe(this);
        }
        @Override
        public void onSubscribe(final Subscription s) {
            if ( subscription != null) {
                s.cancel();
                return;
            }
            subscription = s;
            subscription.request(Long.MAX_VALUE);
        }
        @Override
        public void request(long n) {
            lock.lock();
            try {
                if (pendingRequests != Long.MIN_VALUE) {
                    pendingRequests += n;
                    drain();
                }
            } finally {
                lock.unlock();
            }
        }
        private void drain() {
            Response response;
            try {
                while (pendingRequests > 0 && (response = queue.poll()) != null) {
                    downstream.onNext(response);
                    // Forward response
                    pendingRequests--;
                }
            } finally {
                spaceAvailable.signalAll();
            }
        }
        @Override
        public void onNext(final Response response) {
            lock.lock();
            try {
                while (queue.size() >= 32) {
                    try {
                        if (!spaceAvailable.await(writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
                            // If we've gotten here, then the write timed out.
                            downstream.onError(new ClosedChannelException());
                            cancel();
                            return;
                        }
                    } catch (InterruptedException e) {
                        downstream.onError(e);
                        cancel();
                        return;
                    }
                }
                queue.add(response);
                drain();
            } finally {
                lock.unlock();
            }
        }
        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
            cancel();
        }
        @Override
        public void onComplete() {
            downstream.onComplete();
            cancel();
        }
        @Override
        public void cancel() {
            if (subscription != null) {
                subscription.cancel();
            }
            queue.clear();
            pendingRequests = Long.MIN_VALUE;
        }
    }
}