From 86ad6a08499797f9b3204896caee947abb03394f Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 07 Nov 2016 13:59:40 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

---
 opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java |  390 +++++++++++++++++++++++++++++++++++++++++++++++++------
 1 files changed, 346 insertions(+), 44 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index 593a92b..efe9929 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -17,22 +17,53 @@
 package org.forgerock.opendj.reactive;
 
 import static com.forgerock.reactive.RxJavaStreams.*;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_REQUEST;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_RESPONSE;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_REQUEST;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_RESPONSE;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_REQUEST;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_RESPONSE;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_REQUEST;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_RESPONSE;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_REQUEST;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_RESPONSE;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_INTERMEDIATE_RESPONSE;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_REQUEST;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_RESPONSE;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_REQUEST;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_RESPONSE;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_REQUEST;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_DONE;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_ENTRY;
+import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_REFERENCE;
 import static org.opends.messages.CoreMessages.*;
 import static org.opends.messages.ProtocolMessages.*;
 import static org.opends.server.loggers.AccessLogger.logDisconnect;
 import static org.opends.server.protocols.ldap.LDAPConstants.*;
 import static org.opends.server.util.ServerConstants.OID_START_TLS_REQUEST;
-import static org.opends.server.util.StaticUtils.getExceptionMessage;
+import static org.opends.server.util.StaticUtils.*;
 
 import java.net.InetAddress;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 import java.security.cert.Certificate;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.security.sasl.SaslServer;
 
 import org.forgerock.i18n.LocalizableException;
 import org.forgerock.i18n.LocalizableMessage;
@@ -93,12 +124,15 @@
 import org.opends.server.types.SearchResultEntry;
 import org.opends.server.types.SearchResultReference;
 import org.opends.server.util.TimeThread;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
 
+import com.forgerock.reactive.Consumer;
 import com.forgerock.reactive.ReactiveHandler;
 import com.forgerock.reactive.Single;
 import com.forgerock.reactive.Stream;
 
-import io.reactivex.BackpressureOverflowStrategy;
 import io.reactivex.BackpressureStrategy;
 import io.reactivex.Flowable;
 import io.reactivex.FlowableEmitter;
@@ -502,7 +536,7 @@
         }
 
         // Controls are not allowed for LDAPv2 clients.
-        if (ldapVersion != 2) {
+        if (ldapVersion != 2 && operation.getResponseControls() != null) {
             for (Control control : operation.getResponseControls()) {
                 result.addControl(Converters.from(control));
             }
@@ -529,7 +563,7 @@
     }
 
     private Response toResponse(SearchResultEntry searchEntry) {
-        return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry));
+        return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry, ldapVersion));
     }
 
     private FlowableEmitter<Response> getOut(Operation operation) {
@@ -949,12 +983,60 @@
      */
     @Override
     public Single<Stream<Response>> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
-        return singleFrom(streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
-            @Override
-            public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
-                processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
-            }
-        }, BackpressureStrategy.NONE).onBackpressureBuffer(64, null, BackpressureOverflowStrategy.ERROR)));
+        return singleFrom(streamFromPublisher(
+                new BlockingBackpressureSubscription(Flowable.create(new FlowableOnSubscribe<Response>() {
+                    @Override
+                    public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
+                        processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
+                    }
+                }, BackpressureStrategy.ERROR))).onNextDo(new Consumer<Response>() {
+                    @Override
+                    public void accept(final Response response) throws Exception {
+                        if (keepStats) {
+                            statTracker.updateMessageWritten(
+                                    toLdapResponseType(message, response), message.getMessageId());
+                        }
+                    }
+                }));
+    }
+
+    private final byte toLdapResultType(final byte requestType) {
+        switch (requestType) {
+        case OP_TYPE_ADD_REQUEST:
+            return OP_TYPE_ADD_RESPONSE;
+        case OP_TYPE_BIND_REQUEST:
+            return OP_TYPE_BIND_RESPONSE;
+        case OP_TYPE_COMPARE_REQUEST:
+            return OP_TYPE_COMPARE_RESPONSE;
+        case OP_TYPE_DELETE_REQUEST:
+            return OP_TYPE_DELETE_RESPONSE;
+        case OP_TYPE_EXTENDED_REQUEST:
+            return OP_TYPE_EXTENDED_RESPONSE;
+        case OP_TYPE_MODIFY_DN_REQUEST:
+            return OP_TYPE_MODIFY_DN_RESPONSE;
+        case OP_TYPE_MODIFY_REQUEST:
+            return OP_TYPE_MODIFY_RESPONSE;
+        case OP_TYPE_SEARCH_REQUEST:
+            return OP_TYPE_SEARCH_RESULT_DONE;
+        default:
+            throw new IllegalArgumentException("Unknown request: " + requestType);
+        }
+    }
+
+    private final byte toLdapResponseType(final LdapRawMessage rawRequest, final Response response) {
+        if (response instanceof Result) {
+            return toLdapResultType(rawRequest.getMessageType());
+        }
+        if (response instanceof org.forgerock.opendj.ldap.responses.IntermediateResponse) {
+            return OP_TYPE_INTERMEDIATE_RESPONSE;
+        }
+        if (response instanceof org.forgerock.opendj.ldap.responses.SearchResultEntry) {
+            return OP_TYPE_SEARCH_RESULT_ENTRY;
+        }
+        if (response instanceof org.forgerock.opendj.ldap.responses.SearchResultReference) {
+            return OP_TYPE_SEARCH_RESULT_REFERENCE;
+        }
+        throw new IllegalArgumentException();
     }
 
     private boolean processLDAPMessage(final QueueingStrategy queueingStrategy, final LDAPMessage message,
@@ -1116,10 +1198,20 @@
         } catch (DirectoryException de) {
             logger.traceException(de);
 
-            final Result result = Responses.newResult(de.getResultCode())
-                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
-            for (String referral : de.getReferralURLs()) {
-                result.addReferralURI(referral);
+            final Result result = Responses.newResult(de.getResultCode());
+            if (de.getLocalizedMessage() != null) {
+                result.setDiagnosticMessage(de.getLocalizedMessage());
+            }
+            if (de.getMatchedDN() != null) {
+                result.setMatchedDN(de.getMatchedDN().toString());
+            }
+            if (de.getReferralURLs() != null) {
+                result.getReferralURIs().addAll(de.getReferralURLs());
+            }
+            if (ldapVersion != 2 && addOp.getResponseControls() != null) {
+                for (Control control : addOp.getResponseControls()) {
+                    result.addControl(Converters.from(control));
+                }
             }
 
             out.onNext(result);
@@ -1216,12 +1308,21 @@
         } catch (DirectoryException de) {
             logger.traceException(de);
 
-            final Result result = Responses.newBindResult(de.getResultCode())
-                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
-            for (String referral : de.getReferralURLs()) {
-                result.addReferralURI(referral);
+            final Result result = Responses.newBindResult(de.getResultCode());
+            if (de.getLocalizedMessage() != null) {
+                result.setDiagnosticMessage(de.getLocalizedMessage());
             }
-
+            if (de.getMatchedDN() != null) {
+                result.setMatchedDN(de.getMatchedDN().toString());
+            }
+            if (de.getReferralURLs() != null) {
+                result.getReferralURIs().addAll(de.getReferralURLs());
+            }
+            if (ldapVersion != 2 && bindOp.getResponseControls() != null) {
+                for (Control control : bindOp.getResponseControls()) {
+                    result.addControl(Converters.from(control));
+                }
+            }
             out.onNext(result);
             out.onComplete();
 
@@ -1252,8 +1353,8 @@
             final List<Control> controls, final FlowableEmitter<Response> out) {
         if (ldapVersion == 2 && !controls.isEmpty()) {
             // LDAPv2 clients aren't allowed to send controls.
-            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
-                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+            out.onNext(Responses.newCompareResult(ResultCode.PROTOCOL_ERROR)
+                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
             out.onComplete();
             disconnectControlsNotAllowed();
             return false;
@@ -1271,9 +1372,21 @@
         } catch (DirectoryException de) {
             logger.traceException(de);
 
-            final CompareResult result = Responses.newCompareResult(de.getResultCode())
-                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
-            result.getReferralURIs().addAll(de.getReferralURLs());
+            final CompareResult result = Responses.newCompareResult(de.getResultCode());
+            if (de.getLocalizedMessage() != null) {
+                result.setDiagnosticMessage(de.getLocalizedMessage());
+            }
+            if (de.getMatchedDN() != null) {
+                result.setMatchedDN(de.getMatchedDN().toString());
+            }
+            if (de.getReferralURLs() != null) {
+                result.getReferralURIs().addAll(de.getReferralURLs());
+            }
+            if (ldapVersion != 2 && compareOp.getResponseControls() != null) {
+                for (Control control : compareOp.getResponseControls()) {
+                    result.addControl(Converters.from(control));
+                }
+            }
             out.onNext(result);
             out.onComplete();
         }
@@ -1315,9 +1428,21 @@
         } catch (DirectoryException de) {
             logger.traceException(de);
 
-            final Result result = Responses.newResult(de.getResultCode())
-                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
-            result.getReferralURIs().addAll(de.getReferralURLs());
+            final Result result = Responses.newResult(de.getResultCode());
+            if (de.getLocalizedMessage() != null) {
+                result.setDiagnosticMessage(de.getLocalizedMessage());
+            }
+            if (de.getMatchedDN() != null) {
+                result.setMatchedDN(de.getMatchedDN().toString());
+            }
+            if (de.getReferralURLs() != null) {
+                result.getReferralURIs().addAll(de.getReferralURLs());
+            }
+            if (ldapVersion != 2 && deleteOp.getResponseControls() != null) {
+                for (Control control : deleteOp.getResponseControls()) {
+                    result.addControl(Converters.from(control));
+                }
+            }
 
             out.onNext(result);
             out.onComplete();
@@ -1371,10 +1496,21 @@
             addOperationInProgress(queueingStrategy, extendedOp);
         } catch (DirectoryException de) {
             logger.traceException(de);
-            final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
-                    .setMatchedDN(de.getMatchedDN().toString());
-            result.getReferralURIs().addAll(de.getReferralURLs());
-
+            final Result result = Responses.newGenericExtendedResult(de.getResultCode());
+            if (de.getLocalizedMessage() != null) {
+                result.setDiagnosticMessage(de.getLocalizedMessage());
+            }
+            if (de.getMatchedDN() != null) {
+                result.setMatchedDN(de.getMatchedDN().toString());
+            }
+            if (de.getReferralURLs() != null) {
+                result.getReferralURIs().addAll(de.getReferralURLs());
+            }
+            if (ldapVersion != 2 && extendedOp.getResponseControls() != null) {
+                for (Control control : extendedOp.getResponseControls()) {
+                    result.addControl(Converters.from(control));
+                }
+            }
             out.onNext(result);
             out.onComplete();
         }
@@ -1415,10 +1551,21 @@
             addOperationInProgress(queueingStrategy, modifyOp);
         } catch (DirectoryException de) {
             logger.traceException(de);
-            final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
-                    .setMatchedDN(de.getMatchedDN().toString());
-            result.getReferralURIs().addAll(de.getReferralURLs());
-
+            final Result result = Responses.newResult(de.getResultCode());
+            if (de.getLocalizedMessage() != null) {
+                result.setDiagnosticMessage(de.getLocalizedMessage());
+            }
+            if (de.getMatchedDN() != null) {
+                result.setMatchedDN(de.getMatchedDN().toString());
+            }
+            if (de.getReferralURLs() != null) {
+                result.getReferralURIs().addAll(de.getReferralURLs());
+            }
+            if (ldapVersion != 2 && modifyOp.getResponseControls() != null) {
+                for (Control control : modifyOp.getResponseControls()) {
+                    result.addControl(Converters.from(control));
+                }
+            }
             out.onNext(result);
             out.onComplete();
         }
@@ -1461,11 +1608,20 @@
         } catch (DirectoryException de) {
             logger.traceException(de);
 
-            final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
-                    .setMatchedDN(de.getMatchedDN().toString());
-            result.getReferralURIs().addAll(de.getReferralURLs());
-            for (Control control : modifyDNOp.getResponseControls()) {
-                result.addControl(Converters.from(control));
+            final Result result = Responses.newResult(de.getResultCode());
+            if (de.getLocalizedMessage() != null) {
+                result.setDiagnosticMessage(de.getLocalizedMessage());
+            }
+            if (de.getMatchedDN() != null) {
+                result.setMatchedDN(de.getMatchedDN().toString());
+            }
+            if (de.getReferralURLs() != null) {
+                result.getReferralURIs().addAll(de.getReferralURLs());
+            }
+            if (ldapVersion != 2 && modifyDNOp.getResponseControls() != null) {
+                for (Control control : modifyDNOp.getResponseControls()) {
+                    result.addControl(Converters.from(control));
+                }
             }
             out.onNext(result);
             out.onComplete();
@@ -1520,7 +1676,7 @@
             if (de.getReferralURLs() != null) {
                 result.getReferralURIs().addAll(de.getReferralURLs());
             }
-            if (searchOp.getResponseControls() != null) {
+            if (ldapVersion != 2 && searchOp.getResponseControls() != null) {
                 for (Control control : searchOp.getResponseControls()) {
                     result.addControl(Converters.from(control));
                 }
@@ -1611,8 +1767,34 @@
     }
 
     @Override
-    public boolean prepareTLS(LocalizableMessageBuilder unavailableReason) {
-        throw new UnsupportedOperationException();
+    public boolean prepareTLS(final LocalizableMessageBuilder unavailableReason) {
+        // Make sure that the connection handler allows the use of the
+        // StartTLS operation.
+        if (!connectionHandler.allowStartTLS()) {
+            unavailableReason.append(ERR_LDAP_TLS_STARTTLS_NOT_ALLOWED.get());
+            return false;
+        }
+        try {
+            if (!clientContext.enableTLS(connectionHandler.createSSLEngine(), true)) {
+                unavailableReason.append(ERR_LDAP_TLS_EXISTING_SECURITY_PROVIDER.get(SSLEngine.class.getName()));
+                return false;
+            }
+        } catch (DirectoryException de) {
+            logger.traceException(de);
+            unavailableReason.append(ERR_LDAP_TLS_CANNOT_CREATE_TLS_PROVIDER.get(stackTraceToSingleLineString(de)));
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Installs the SASL security layer on the underlying connection.
+     *
+     * @param saslServer
+     *            The {@code SaslServer} which should be used to secure the conneciton.
+     */
+    public void enableSASL(final SaslServer saslServer) {
+        clientContext.enableSASL(saslServer);
     }
 
     /**
@@ -1639,11 +1821,131 @@
      * @return The array of certificates associated with a connection.
      */
     public Certificate[] getClientCertificateChain() {
+        final SSLSession sslSession = clientContext.getSSLSession();
+        if (sslSession != null) {
+            try {
+                return sslSession.getPeerCertificates();
+            } catch (SSLPeerUnverifiedException e) {
+                logger.traceException(e);
+            }
+        }
         return new Certificate[0];
     }
 
     @Override
     public int getSSF() {
-        return 0;
+        return clientContext.getSecurityStrengthFactor();
+    }
+
+    /** Upstream -> BlockingBackpressureSubscription -> Downstream */
+    private final class BlockingBackpressureSubscription
+            implements Subscription, Publisher<Response>, Subscriber<Response> {
+        private long pendingRequests;
+        private final Queue<Response> queue = new LinkedList<>();
+        private final Lock lock = new ReentrantLock();
+        private final Condition spaceAvailable = lock.newCondition();
+        private final Publisher<Response> upstream;
+        private final long writeTimeoutMillis;
+        private Subscription subscription;
+        private Subscriber<? super Response> downstream;
+
+        BlockingBackpressureSubscription(final Publisher<Response> upstream) {
+            this.upstream = upstream;
+            this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0 
+                    ? 30000 // Do not wait indefinitely, 
+                    : connectionHandler.getMaxBlockedWriteTimeLimit();
+        }
+
+        @Override
+        public void subscribe(final Subscriber<? super Response> subscriber) {
+            if (downstream != null) {
+                return;
+            }
+            downstream = subscriber;
+            subscriber.onSubscribe(this);
+            upstream.subscribe(this);
+        }
+
+        @Override
+        public void onSubscribe(final Subscription s) {
+            if ( subscription != null) {
+                s.cancel();
+                return;
+            }
+            subscription = s;
+            subscription.request(Long.MAX_VALUE);
+        }
+
+        @Override
+        public void request(long n) {
+            lock.lock();
+            try {
+                if (pendingRequests != Long.MIN_VALUE) {
+                    pendingRequests += n;
+                    drain();
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        private void drain() {
+            Response response;
+            try {
+                while (pendingRequests > 0 && (response = queue.poll()) != null) {
+                    downstream.onNext(response);
+                    // Forward response
+                    pendingRequests--;
+                }
+            } finally {
+                spaceAvailable.signalAll();
+            }
+        }
+
+        @Override
+        public void onNext(final Response response) {
+            lock.lock();
+            try {
+                while (queue.size() >= 32) {
+                    try {
+                        if (!spaceAvailable.await(writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
+                            // If we've gotten here, then the write timed out.
+                            downstream.onError(new ClosedChannelException());
+                            cancel();
+                            return;
+                        }
+                    } catch (InterruptedException e) {
+                        downstream.onError(e);
+                        cancel();
+                        return;
+                    }
+                }
+                queue.add(response);
+                drain();
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        @Override
+        public void onError(Throwable t) {
+            downstream.onError(t);
+            cancel();
+        }
+
+        @Override
+        public void onComplete() {
+            downstream.onComplete();
+            cancel();
+        }
+
+        @Override
+        public void cancel() {
+            if (subscription != null) {
+                subscription.cancel();
+            }
+            queue.clear();
+            pendingRequests = Long.MIN_VALUE;
+        }
     }
 }

--
Gitblit v1.10.0