From bdc5fa0dd0980eb9e077ae80644504705a24e035 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 07 Nov 2016 13:59:40 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

---
 opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java |   83 ++++++++++++++++++++---------------------
 1 files changed, 40 insertions(+), 43 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index af9a708..2cb5125 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -54,14 +54,14 @@
 import org.forgerock.opendj.ldap.ByteString;
 import org.forgerock.opendj.ldap.DN;
 import org.forgerock.opendj.ldap.LDAPClientContext;
-import org.forgerock.opendj.ldap.LDAPClientContext.DisconnectListener;
+import org.forgerock.opendj.ldap.LDAPClientContext.ConnectionEventListener;
 import org.forgerock.opendj.ldap.ResultCode;
 import org.forgerock.opendj.ldap.requests.UnbindRequest;
 import org.forgerock.opendj.ldap.responses.CompareResult;
 import org.forgerock.opendj.ldap.responses.Response;
 import org.forgerock.opendj.ldap.responses.Responses;
 import org.forgerock.opendj.ldap.responses.Result;
-import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
+import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRequestEnvelope;
 import org.opends.server.api.ClientConnection;
 import org.opends.server.api.ConnectionHandler;
 import org.opends.server.core.AbandonOperationBasis;
@@ -123,7 +123,7 @@
  * instance of the LDAP connection handler and have its requests decoded by an LDAP request handler.
  */
 public final class LDAPClientConnection2 extends ClientConnection implements TLSCapableConnection,
-        ReactiveHandler<QueueingStrategy, LdapRawMessage, Stream<Response>> {
+        ReactiveHandler<QueueingStrategy, LdapRequestEnvelope, Stream<Response>> {
     private static final String REACTIVE_OUT = "reactive.out";
 
     /** The tracer object for the debug logger. */
@@ -230,9 +230,9 @@
         }
 
         connectionID = DirectoryServer.newConnectionAccepted(this);
-        clientContext.onDisconnect(new DisconnectListener() {
+        clientContext.addConnectionEventListener(new ConnectionEventListener() {
             @Override
-            public void exceptionOccurred(LDAPClientContext context, Throwable error) {
+            public void handleConnectionError(LDAPClientContext context, Throwable error) {
                 if (error instanceof LocalizableException) {
                     disconnect(
                             DisconnectReason.PROTOCOL_ERROR, true, ((LocalizableException) error).getMessageObject());
@@ -242,13 +242,13 @@
             }
 
             @Override
-            public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode,
+            public void handleConnectionDisconnected(LDAPClientContext context, ResultCode resultCode,
                     String diagnosticMessage) {
                 disconnect(DisconnectReason.SERVER_ERROR, false, null);
             }
 
             @Override
-            public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
+            public void handleConnectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
                 disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
             }
         });
@@ -372,7 +372,7 @@
      */
     @Override
     public boolean isSecure() {
-        return false;
+        return clientContext.getSSLSession() != null || clientContext.getSASLServer() != null;
     }
 
     /**
@@ -405,7 +405,7 @@
         // an error result to the client indicating that a problem occurred.
         if (removeOperationInProgress(operation.getMessageID())) {
             final Response response = operationToResponse(operation);
-            final FlowableEmitter<Response> out = getOut(operation);
+            final FlowableEmitter<Response> out = getAttachedEmitter(operation);
             if (response != null) {
                 out.onNext(response);
             }
@@ -534,22 +534,18 @@
      *            The search result entry to be sent to the client
      */
     @Override
-    public void sendSearchEntry(SearchOperation searchOperation, SearchResultEntry searchEntry) {
-        getEmitter(searchOperation).onNext(toResponse(searchEntry));
+    public void sendSearchEntry(final SearchOperation searchOperation, final SearchResultEntry searchEntry) {
+        getAttachedEmitter(searchOperation).onNext(toResponse(searchEntry));
     }
 
-    private FlowableEmitter<Response> getEmitter(SearchOperation searchOperation) {
-        return getOut(searchOperation);
-    }
-
-    private Response toResponse(SearchResultEntry searchEntry) {
-        return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry, ldapVersion));
-    }
-
-    private FlowableEmitter<Response> getOut(Operation operation) {
+    private FlowableEmitter<Response> getAttachedEmitter(final Operation operation) {
         return ((FlowableEmitter<Response>) operation.getAttachment(REACTIVE_OUT));
     }
 
+    private Response toResponse(final SearchResultEntry searchEntry) {
+        return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry, ldapVersion));
+    }
+
     /**
      * Sends the provided search result reference to the client.
      *
@@ -572,7 +568,7 @@
             return false;
         }
 
-        final FlowableEmitter<Response> out = getOut(searchOperation);
+        final FlowableEmitter<Response> out = getAttachedEmitter(searchOperation);
         out.onNext(Converters.from(searchReference));
 
         return true;
@@ -589,7 +585,7 @@
     @Override
     protected boolean sendIntermediateResponseMessage(IntermediateResponse intermediateResponse) {
         final Operation operation = intermediateResponse.getOperation();
-        final FlowableEmitter<Response> out = getOut(operation);
+        final FlowableEmitter<Response> emitter = getAttachedEmitter(operation);
 
         final Response response = Responses.newGenericIntermediateResponse(intermediateResponse.getOID(),
                 intermediateResponse.getValue());
@@ -597,7 +593,7 @@
             response.addControl(Converters.from(control));
         }
 
-        out.onNext(response);
+        emitter.onNext(response);
 
         // The only reason we shouldn't continue processing is if the
         // connection is closed.
@@ -962,14 +958,15 @@
      *         a fatal error and the client has been disconnected as a result, or if the client unbound from the server.
      */
     @Override
-    public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
-        return streamFromPublisher(
-                new BlockingBackpressureSubscription(Flowable.create(new FlowableOnSubscribe<Response>() {
+    public Stream<Response> handle(final QueueingStrategy queueingStrategy, final LdapRequestEnvelope message) {
+        return streamFromPublisher(new BlockingBackpressureSubscription(
+                Flowable.create(new FlowableOnSubscribe<Response>() {
                     @Override
                     public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
                         processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
                     }
-                }, BackpressureStrategy.ERROR))).onNextDo(new Consumer<Response>() {
+                }, BackpressureStrategy.ERROR)))
+                .onNext(new Consumer<Response>() {
                     @Override
                     public void accept(final Response response) throws Exception {
                         if (keepStats) {
@@ -1003,7 +1000,7 @@
         }
     }
 
-    private final byte toLdapResponseType(final LdapRawMessage rawRequest, final Response response) {
+    private final byte toLdapResponseType(final LdapRequestEnvelope rawRequest, final Response response) {
         if (response instanceof Result) {
             return toLdapResultType(rawRequest.getMessageType());
         }
@@ -1160,8 +1157,8 @@
             final List<Control> controls, final FlowableEmitter<Response> out) {
         if (ldapVersion == 2 && !controls.isEmpty()) {
             // LDAPv2 clients aren't allowed to send controls.
-            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
-                    ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
+            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
             out.onComplete();
             disconnectControlsNotAllowed();
             return false;
@@ -1229,17 +1226,17 @@
             versionString = "2";
 
             if (!connectionHandler.allowLDAPv2()) {
-                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
-                        ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get().toString()));
+                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
+                                    .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
                 out.onComplete();
-                disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CLIENTS_NOT_ALLOWED.get());
+                disconnect(DisconnectReason.PROTOCOL_ERROR, false, ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get());
                 return false;
             }
 
             if (!controls.isEmpty()) {
                 // LDAPv2 clients aren't allowed to send controls.
-                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
-                        ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+                out.onNext(Responses.newBindResult(ResultCode.PROTOCOL_ERROR)
+                                    .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
                 out.onComplete();
                 disconnectControlsNotAllowed();
                 return false;
@@ -1390,8 +1387,8 @@
             final List<Control> controls, final FlowableEmitter<Response> out) {
         if (ldapVersion == 2 && !controls.isEmpty()) {
             // LDAPv2 clients aren't allowed to send controls.
-            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
-                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
             out.onComplete();
             disconnectControlsNotAllowed();
             return false;
@@ -1514,8 +1511,8 @@
             final List<Control> controls, final FlowableEmitter<Response> out) {
         if (ldapVersion == 2 && !controls.isEmpty()) {
             // LDAPv2 clients aren't allowed to send controls.
-            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
-                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
             out.onComplete();
             disconnectControlsNotAllowed();
             return false;
@@ -1569,8 +1566,8 @@
             final List<Control> controls, final FlowableEmitter<Response> out) {
         if (ldapVersion == 2 && !controls.isEmpty()) {
             // LDAPv2 clients aren't allowed to send controls.
-            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
-                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
             out.onComplete();
             disconnectControlsNotAllowed();
             return false;
@@ -1626,8 +1623,8 @@
             final List<Control> controls, final FlowableEmitter<Response> out) {
         if (ldapVersion == 2 && !controls.isEmpty()) {
             // LDAPv2 clients aren't allowed to send controls.
-            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
-                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
+            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR)
+                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
             out.onComplete();
             disconnectControlsNotAllowed();
             return false;

--
Gitblit v1.10.0