From ede52c8f25aa2b7815e006cdfd6fe09f0766b548 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Wed, 09 Nov 2016 14:54:10 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

---
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java                     |   14 ++--
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java     |   20 +-----
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java              |    5 +
 opendj-server-legacy/src/main/java/org/opends/server/core/BindOperationBasis.java            |    5 -
 pom.xml                                                                                      |    2 
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java              |    4 
 opendj-server-legacy/pom.xml                                                                 |   18 +++++
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/DefaultTCPNIOTransport.java        |    3 +
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java              |    8 +-
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ServerTCPNIOTransport.java         |   20 ++++++
 opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferReader.java              |    8 +-
 opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java |    9 ---
 opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java  |    8 ++
 13 files changed, 75 insertions(+), 49 deletions(-)

diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferReader.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferReader.java
index f67783c..86c5b54 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferReader.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferReader.java
@@ -145,24 +145,24 @@
      *            The maximum BER element size, or <code>0</code> to indicate
      *            that there is no limit.
      * @param buffer
-     *            The buffer where the content will be read from.
+     *            The buffer where the content will be read from. Note that
+     *            @{code {@link #close()} this reader will also dispose the buffer.
      */
     ASN1BufferReader(final int maxElementSize, final Buffer buffer) {
         this.readLimiter = new RootSequenceLimiter();
         this.buffer = buffer;
-        this.buffer.allowBufferDispose(false);
         this.maxElementSize = maxElementSize;
     }
 
     /**
-     * Closes this ASN.1 reader and the underlying stream.
+     * Closes this ASN.1 reader and the underlying {@link Buffer}.
      *
      * @throws IOException
      *             if an I/O error occurs
      */
     @Override
     public void close() throws IOException {
-        // Nothing to do
+        buffer.tryDispose();
     }
 
     /**
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java
index 9cc966e..5f72851 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java
@@ -158,8 +158,8 @@
 
     void ensureAdditionalCapacity(final int size) {
         final int newCount = outBuffer.position() + size;
-        if (newCount > outBuffer.capacity()) {
-            outBuffer = memoryManager.reallocate(outBuffer, Math.max(outBuffer.capacity() << 1, newCount));
+        if (newCount > outBuffer.limit()) {
+            outBuffer = memoryManager.reallocate(outBuffer, Math.max(outBuffer.limit() << 1, newCount));
         }
     }
 
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/DefaultTCPNIOTransport.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/DefaultTCPNIOTransport.java
index ce07e3a..e2760de 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/DefaultTCPNIOTransport.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/DefaultTCPNIOTransport.java
@@ -20,6 +20,7 @@
 
 import org.forgerock.i18n.LocalizableMessage;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.glassfish.grizzly.memory.HeapMemoryManager;
 import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
 import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder;
 import org.glassfish.grizzly.strategies.SameThreadIOStrategy;
@@ -134,6 +135,8 @@
             // Enabled by default.
             builder.setReuseAddress(Boolean.parseBoolean(reuseAddressStr));
         }
+        // Default heap manager has changed in grizzly 2.3.27 to the more memory consuming PooledMemoryManager
+        builder.setMemoryManager(new HeapMemoryManager());
 
         final TCPNIOTransport transport = builder.build();
 
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java
index a3ed311..f3636e3 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java
@@ -425,17 +425,17 @@
             buffer.mark();
             if (!reader.elementAvailable()) {
                 buffer.reset();
-                return ctx.getStopAction(buffer);
+                // We need to create a duplicate because buffer will be closed by the reader (try-with-resources)
+                return ctx.getStopAction(buffer.duplicate());
             }
             final int length = reader.peekLength();
             final Buffer remainder = buffer.remaining() > length ? buffer.split(buffer.position() + length) : null;
             buffer.reset();
-            try (final ASN1BufferReader packetReader = new ASN1BufferReader(maxASN1ElementSize, buffer)) {
+            try (final ASN1BufferReader packetReader =
+                    new ASN1BufferReader(maxASN1ElementSize, buffer.asReadOnlyBuffer())) {
                 final LDAPReader<? extends ASN1Reader> ldapReader = LDAP.getReader(packetReader, decodeOptions);
                 ctx.setMessage(null);
                 ldapReader.readMessage(handler);
-            } finally {
-                buffer.tryDispose();
             }
             return ctx.getInvokeAction(remainder);
         } catch (IOException e) {
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index 6e1b492..4b53681 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -264,8 +264,9 @@
 
             @Override
             public void request(long n) {
-                if (BackpressureHelper.add(pendingRequests, n) == 0 && ctx != null) {
-                    ctx.resumeNext();
+                final FilterChainContext immutableRef = ctx;
+                if (BackpressureHelper.add(pendingRequests, n) == 0 && immutableRef != null) {
+                    immutableRef.resumeNext();
                     ctx = null;
                 }
             }
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
index 2130e0a..c713819 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
@@ -57,22 +57,24 @@
         try {
             final Buffer buffer = ctx.getMessage();
             try (final ASN1BufferReader reader = new ASN1BufferReader(maxASN1ElementSize, buffer)) {
-                buffer.mark();
+                // Due to a bug in grizzly's ByteBufferWrapper.split(), we can't use byteBuffer.mark()
+                final int mark = buffer.position();
                 if (!reader.elementAvailable()) {
-                    buffer.reset();
-                    return ctx.getStopAction(buffer);
+                    buffer.position(mark);
+                    // We need to create a duplicate because buffer will be closed by the reader (try-with-resources)
+                    return ctx.getStopAction(buffer.duplicate());
                 }
                 final int length = reader.peekLength();
                 if (length > maxASN1ElementSize) {
-                    buffer.reset();
+                    buffer.position(mark);
                     throw DecodeException.fatalError(
                             ERR_LDAP_CLIENT_DECODE_MAX_REQUEST_SIZE_EXCEEDED.get(length, maxASN1ElementSize));
                 }
                 final Buffer remainder = (buffer.remaining() > length)
                         ? buffer.split(buffer.position() + length)
                         : null;
-                buffer.reset();
-                ctx.setMessage(decodePacket(new ASN1BufferReader(maxASN1ElementSize, buffer)));
+                buffer.position(mark);
+                ctx.setMessage(decodePacket(new ASN1BufferReader(maxASN1ElementSize, buffer.asReadOnlyBuffer())));
                 buffer.tryDispose();
                 return ctx.getInvokeAction(remainder);
             }
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
index adecbcf..3c8c20b 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
@@ -20,7 +20,6 @@
 import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage;
 import org.glassfish.grizzly.CompletionHandler;
 import org.glassfish.grizzly.Connection;
-import org.glassfish.grizzly.WriteHandler;
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 
@@ -44,25 +43,14 @@
             return;
         }
         upstream = s;
-        connection.notifyCanWrite(new WriteHandler() {
-            @Override
-            public void onWritePossible() throws Exception {
-                final Subscription sub = upstream;
-                if (sub != null) {
-                    sub.request(1);
-                }
-            }
-
-            @Override
-            public void onError(final Throwable error) {
-                LdapResponseMessageWriter.this.onError(error);
-            }
-        });
+        // We're requesting two response to allow overlap between async I/O and response computation.
+        // (allows to generate a response while we're waiting for the previous message to be written)
+        upstream.request(2);
     }
 
     @Override
     public void onNext(final LdapResponseMessage message) {
-        connection.write(message).addCompletionHandler(this);
+        connection.write(message, this);
     }
 
     @Override
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ServerTCPNIOTransport.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ServerTCPNIOTransport.java
index 7a3c849..5e6c1b6 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ServerTCPNIOTransport.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ServerTCPNIOTransport.java
@@ -20,6 +20,7 @@
 
 import org.forgerock.i18n.LocalizableMessage;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.glassfish.grizzly.memory.PooledMemoryManager;
 import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
 import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder;
 import org.glassfish.grizzly.strategies.SameThreadIOStrategy;
@@ -36,6 +37,7 @@
 
     private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
     static final ServerTCPNIOTransport SERVER_TRANSPORT = new ServerTCPNIOTransport();
+    private static final long MB = 1024 * 1024;
 
     private ServerTCPNIOTransport() {
         // Prevent instantiation.
@@ -96,6 +98,24 @@
             // Enabled by default.
             builder.setReuseAddress(Boolean.parseBoolean(reuseAddressStr));
         }
+        float heapPercent;
+        if (Runtime.getRuntime().maxMemory() < 1024 * MB) {
+            // Low heap
+            heapPercent = 0.01f;
+        } else {
+            // Compute a percentage to try to reach roughly 64Mb (big enough (tm))
+            heapPercent = 64f * MB / Runtime.getRuntime().maxMemory();
+        }
+        // Force usage of PooledMemoryManager which allows to use grizzly's buffers across threads.
+        builder.setMemoryManager(new PooledMemoryManager(
+                1024,  // Initial buffer size
+                3,     // Number of pools (with growing factor below this give us pools of 1K, 4K, 16k buffers)
+                4,     // Growing factor to apply on the size of the buffer polled by the next pool
+                selectorThreadCount,    // Number of pool slices that every pool will stripe allocation requests across
+                heapPercent, // The percentage of the heap that this manager will use when populating the pools (5%)
+                1f,    // The percentage of buffers to be pre-allocated during MemoryManager initialization (100%)
+                true   // true to use direct buffers or false to use heap buffers
+        ));
 
         final TCPNIOTransport transport = builder.build();
 
diff --git a/opendj-server-legacy/pom.xml b/opendj-server-legacy/pom.xml
index eb280dd..940f2f8 100644
--- a/opendj-server-legacy/pom.xml
+++ b/opendj-server-legacy/pom.xml
@@ -163,6 +163,16 @@
 
     <dependency>
       <groupId>org.forgerock.commons</groupId>
+      <artifactId>forgerock-audit-handler-splunk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.forgerock.commons</groupId>
+      <artifactId>forgerock-audit-handler-jms</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.forgerock.commons</groupId>
       <artifactId>forgerock-audit-json</artifactId>
     </dependency>
 
@@ -670,9 +680,15 @@
                 <Export-Package>org.forgerock.opendj.server.embedded</Export-Package>
                 <!-- Import je changelog since it is not shipped in the main jar -->
                 <Import-Package>
-                  org.opends.server.replication.server.changelog.je,
+                  org.opends.server.replication.server.changelog.je;resolution:=optional,
+                  com.sleepycat.je*;resolution:=optional,
                   ${opendj.osgi.import}
                 </Import-Package>
+                <Embed-Dependency>
+                  forgerock-persistit-core,
+                  <!--je,-->
+                  jcip-annotations
+                </Embed-Dependency>
               </instructions>
             </configuration>
           </execution>
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index 2cb5125..6e95797 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -18,6 +18,7 @@
 
 import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher;
 import static org.forgerock.opendj.io.LDAP.*;
+import static org.forgerock.util.Utils.closeSilently;
 import static org.opends.messages.CoreMessages.*;
 import static org.opends.messages.ProtocolMessages.*;
 import static org.opends.server.loggers.AccessLogger.logDisconnect;
@@ -963,7 +964,12 @@
                 Flowable.create(new FlowableOnSubscribe<Response>() {
                     @Override
                     public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
-                        processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
+                        try {
+                            processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
+                        } finally {
+                            // We don't need the ASN1Reader anymore.
+                            closeSilently(message.getContent());
+                        }
                     }
                 }, BackpressureStrategy.ERROR)))
                 .onNext(new Consumer<Response>() {
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
index 5b5d2ba..821e971 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -433,15 +433,6 @@
         return (int) currentConfig.getMaxRequestSize();
     }
 
-    /**
-     * Retrieves the size in bytes of the LDAP response message write buffer defined for this connection handler.
-     *
-     * @return The size in bytes of the LDAP response message write buffer.
-     */
-    public int getBufferSize() {
-        return (int) currentConfig.getBufferSize();
-    }
-
     @Override
     public String getProtocol() {
         return protocol;
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/core/BindOperationBasis.java b/opendj-server-legacy/src/main/java/org/opends/server/core/BindOperationBasis.java
index d2a9ea0..1787cfc 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/core/BindOperationBasis.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/core/BindOperationBasis.java
@@ -534,9 +534,6 @@
       setProcessingStopTime();
       logBindResponse(this);
 
-      // Send the bind response to the client.
-      clientConnection.sendResponse(this);
-
       // If the bind processing is finished, then unset the "bind in progress"
       // flag to allow other operations to be processed on the connection.
       if (getResultCode() != ResultCode.SASL_BIND_IN_PROGRESS)
@@ -545,6 +542,8 @@
       }
       clientConnection.finishBind();
 
+      clientConnection.sendResponse(this);
+
       invokePostResponsePlugins(workflowExecuted);
     }
   }
diff --git a/pom.xml b/pom.xml
index 0b452df..dc869f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,7 +48,7 @@
         <forgerock-build-tools.version>1.0.2</forgerock-build-tools.version>
         <forgerock-doc-plugin.version>3.2.2-SNAPSHOT</forgerock-doc-plugin.version>
         <freemarker.version>2.3.24-incubating</freemarker.version>
-        <grizzly-framework.version>2.3.24</grizzly-framework.version>
+        <grizzly-framework.version>2.3.28</grizzly-framework.version>
         <metrics-core.version>3.1.2</metrics-core.version>
 
         <!-- OSGi bundles properties -->

--
Gitblit v1.10.0