From ede52c8f25aa2b7815e006cdfd6fe09f0766b548 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Wed, 09 Nov 2016 14:54:10 +0000
Subject: [PATCH] OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport
---
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java | 14 ++--
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java | 20 +-----
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java | 5 +
opendj-server-legacy/src/main/java/org/opends/server/core/BindOperationBasis.java | 5 -
pom.xml | 2
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java | 4
opendj-server-legacy/pom.xml | 18 +++++
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/DefaultTCPNIOTransport.java | 3 +
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java | 8 +-
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ServerTCPNIOTransport.java | 20 ++++++
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferReader.java | 8 +-
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java | 9 ---
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java | 8 ++
13 files changed, 75 insertions(+), 49 deletions(-)
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferReader.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferReader.java
index f67783c..86c5b54 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferReader.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferReader.java
@@ -145,24 +145,24 @@
* The maximum BER element size, or <code>0</code> to indicate
* that there is no limit.
* @param buffer
- * The buffer where the content will be read from.
+ * The buffer where the content will be read from. Note that
+ * @{code {@link #close()} this reader will also dispose the buffer.
*/
ASN1BufferReader(final int maxElementSize, final Buffer buffer) {
this.readLimiter = new RootSequenceLimiter();
this.buffer = buffer;
- this.buffer.allowBufferDispose(false);
this.maxElementSize = maxElementSize;
}
/**
- * Closes this ASN.1 reader and the underlying stream.
+ * Closes this ASN.1 reader and the underlying {@link Buffer}.
*
* @throws IOException
* if an I/O error occurs
*/
@Override
public void close() throws IOException {
- // Nothing to do
+ buffer.tryDispose();
}
/**
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java
index 9cc966e..5f72851 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java
@@ -158,8 +158,8 @@
void ensureAdditionalCapacity(final int size) {
final int newCount = outBuffer.position() + size;
- if (newCount > outBuffer.capacity()) {
- outBuffer = memoryManager.reallocate(outBuffer, Math.max(outBuffer.capacity() << 1, newCount));
+ if (newCount > outBuffer.limit()) {
+ outBuffer = memoryManager.reallocate(outBuffer, Math.max(outBuffer.limit() << 1, newCount));
}
}
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/DefaultTCPNIOTransport.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/DefaultTCPNIOTransport.java
index ce07e3a..e2760de 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/DefaultTCPNIOTransport.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/DefaultTCPNIOTransport.java
@@ -20,6 +20,7 @@
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.glassfish.grizzly.memory.HeapMemoryManager;
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder;
import org.glassfish.grizzly.strategies.SameThreadIOStrategy;
@@ -134,6 +135,8 @@
// Enabled by default.
builder.setReuseAddress(Boolean.parseBoolean(reuseAddressStr));
}
+ // Default heap manager has changed in grizzly 2.3.27 to the more memory consuming PooledMemoryManager
+ builder.setMemoryManager(new HeapMemoryManager());
final TCPNIOTransport transport = builder.build();
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java
index a3ed311..f3636e3 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java
@@ -425,17 +425,17 @@
buffer.mark();
if (!reader.elementAvailable()) {
buffer.reset();
- return ctx.getStopAction(buffer);
+ // We need to create a duplicate because buffer will be closed by the reader (try-with-resources)
+ return ctx.getStopAction(buffer.duplicate());
}
final int length = reader.peekLength();
final Buffer remainder = buffer.remaining() > length ? buffer.split(buffer.position() + length) : null;
buffer.reset();
- try (final ASN1BufferReader packetReader = new ASN1BufferReader(maxASN1ElementSize, buffer)) {
+ try (final ASN1BufferReader packetReader =
+ new ASN1BufferReader(maxASN1ElementSize, buffer.asReadOnlyBuffer())) {
final LDAPReader<? extends ASN1Reader> ldapReader = LDAP.getReader(packetReader, decodeOptions);
ctx.setMessage(null);
ldapReader.readMessage(handler);
- } finally {
- buffer.tryDispose();
}
return ctx.getInvokeAction(remainder);
} catch (IOException e) {
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
index 6e1b492..4b53681 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -264,8 +264,9 @@
@Override
public void request(long n) {
- if (BackpressureHelper.add(pendingRequests, n) == 0 && ctx != null) {
- ctx.resumeNext();
+ final FilterChainContext immutableRef = ctx;
+ if (BackpressureHelper.add(pendingRequests, n) == 0 && immutableRef != null) {
+ immutableRef.resumeNext();
ctx = null;
}
}
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
index 2130e0a..c713819 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
@@ -57,22 +57,24 @@
try {
final Buffer buffer = ctx.getMessage();
try (final ASN1BufferReader reader = new ASN1BufferReader(maxASN1ElementSize, buffer)) {
- buffer.mark();
+ // Due to a bug in grizzly's ByteBufferWrapper.split(), we can't use byteBuffer.mark()
+ final int mark = buffer.position();
if (!reader.elementAvailable()) {
- buffer.reset();
- return ctx.getStopAction(buffer);
+ buffer.position(mark);
+ // We need to create a duplicate because buffer will be closed by the reader (try-with-resources)
+ return ctx.getStopAction(buffer.duplicate());
}
final int length = reader.peekLength();
if (length > maxASN1ElementSize) {
- buffer.reset();
+ buffer.position(mark);
throw DecodeException.fatalError(
ERR_LDAP_CLIENT_DECODE_MAX_REQUEST_SIZE_EXCEEDED.get(length, maxASN1ElementSize));
}
final Buffer remainder = (buffer.remaining() > length)
? buffer.split(buffer.position() + length)
: null;
- buffer.reset();
- ctx.setMessage(decodePacket(new ASN1BufferReader(maxASN1ElementSize, buffer)));
+ buffer.position(mark);
+ ctx.setMessage(decodePacket(new ASN1BufferReader(maxASN1ElementSize, buffer.asReadOnlyBuffer())));
buffer.tryDispose();
return ctx.getInvokeAction(remainder);
}
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
index adecbcf..3c8c20b 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
@@ -20,7 +20,6 @@
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Connection;
-import org.glassfish.grizzly.WriteHandler;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@@ -44,25 +43,14 @@
return;
}
upstream = s;
- connection.notifyCanWrite(new WriteHandler() {
- @Override
- public void onWritePossible() throws Exception {
- final Subscription sub = upstream;
- if (sub != null) {
- sub.request(1);
- }
- }
-
- @Override
- public void onError(final Throwable error) {
- LdapResponseMessageWriter.this.onError(error);
- }
- });
+ // We're requesting two response to allow overlap between async I/O and response computation.
+ // (allows to generate a response while we're waiting for the previous message to be written)
+ upstream.request(2);
}
@Override
public void onNext(final LdapResponseMessage message) {
- connection.write(message).addCompletionHandler(this);
+ connection.write(message, this);
}
@Override
diff --git a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ServerTCPNIOTransport.java b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ServerTCPNIOTransport.java
index 7a3c849..5e6c1b6 100644
--- a/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ServerTCPNIOTransport.java
+++ b/opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ServerTCPNIOTransport.java
@@ -20,6 +20,7 @@
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.glassfish.grizzly.memory.PooledMemoryManager;
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder;
import org.glassfish.grizzly.strategies.SameThreadIOStrategy;
@@ -36,6 +37,7 @@
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
static final ServerTCPNIOTransport SERVER_TRANSPORT = new ServerTCPNIOTransport();
+ private static final long MB = 1024 * 1024;
private ServerTCPNIOTransport() {
// Prevent instantiation.
@@ -96,6 +98,24 @@
// Enabled by default.
builder.setReuseAddress(Boolean.parseBoolean(reuseAddressStr));
}
+ float heapPercent;
+ if (Runtime.getRuntime().maxMemory() < 1024 * MB) {
+ // Low heap
+ heapPercent = 0.01f;
+ } else {
+ // Compute a percentage to try to reach roughly 64Mb (big enough (tm))
+ heapPercent = 64f * MB / Runtime.getRuntime().maxMemory();
+ }
+ // Force usage of PooledMemoryManager which allows to use grizzly's buffers across threads.
+ builder.setMemoryManager(new PooledMemoryManager(
+ 1024, // Initial buffer size
+ 3, // Number of pools (with growing factor below this give us pools of 1K, 4K, 16k buffers)
+ 4, // Growing factor to apply on the size of the buffer polled by the next pool
+ selectorThreadCount, // Number of pool slices that every pool will stripe allocation requests across
+ heapPercent, // The percentage of the heap that this manager will use when populating the pools (5%)
+ 1f, // The percentage of buffers to be pre-allocated during MemoryManager initialization (100%)
+ true // true to use direct buffers or false to use heap buffers
+ ));
final TCPNIOTransport transport = builder.build();
diff --git a/opendj-server-legacy/pom.xml b/opendj-server-legacy/pom.xml
index eb280dd..940f2f8 100644
--- a/opendj-server-legacy/pom.xml
+++ b/opendj-server-legacy/pom.xml
@@ -163,6 +163,16 @@
<dependency>
<groupId>org.forgerock.commons</groupId>
+ <artifactId>forgerock-audit-handler-splunk</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.forgerock.commons</groupId>
+ <artifactId>forgerock-audit-handler-jms</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.forgerock.commons</groupId>
<artifactId>forgerock-audit-json</artifactId>
</dependency>
@@ -670,9 +680,15 @@
<Export-Package>org.forgerock.opendj.server.embedded</Export-Package>
<!-- Import je changelog since it is not shipped in the main jar -->
<Import-Package>
- org.opends.server.replication.server.changelog.je,
+ org.opends.server.replication.server.changelog.je;resolution:=optional,
+ com.sleepycat.je*;resolution:=optional,
${opendj.osgi.import}
</Import-Package>
+ <Embed-Dependency>
+ forgerock-persistit-core,
+ <!--je,-->
+ jcip-annotations
+ </Embed-Dependency>
</instructions>
</configuration>
</execution>
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
index 2cb5125..6e95797 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -18,6 +18,7 @@
import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher;
import static org.forgerock.opendj.io.LDAP.*;
+import static org.forgerock.util.Utils.closeSilently;
import static org.opends.messages.CoreMessages.*;
import static org.opends.messages.ProtocolMessages.*;
import static org.opends.server.loggers.AccessLogger.logDisconnect;
@@ -963,7 +964,12 @@
Flowable.create(new FlowableOnSubscribe<Response>() {
@Override
public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
- processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
+ try {
+ processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
+ } finally {
+ // We don't need the ASN1Reader anymore.
+ closeSilently(message.getContent());
+ }
}
}, BackpressureStrategy.ERROR)))
.onNext(new Consumer<Response>() {
diff --git a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
index 5b5d2ba..821e971 100644
--- a/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
+++ b/opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -433,15 +433,6 @@
return (int) currentConfig.getMaxRequestSize();
}
- /**
- * Retrieves the size in bytes of the LDAP response message write buffer defined for this connection handler.
- *
- * @return The size in bytes of the LDAP response message write buffer.
- */
- public int getBufferSize() {
- return (int) currentConfig.getBufferSize();
- }
-
@Override
public String getProtocol() {
return protocol;
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/core/BindOperationBasis.java b/opendj-server-legacy/src/main/java/org/opends/server/core/BindOperationBasis.java
index d2a9ea0..1787cfc 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/core/BindOperationBasis.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/core/BindOperationBasis.java
@@ -534,9 +534,6 @@
setProcessingStopTime();
logBindResponse(this);
- // Send the bind response to the client.
- clientConnection.sendResponse(this);
-
// If the bind processing is finished, then unset the "bind in progress"
// flag to allow other operations to be processed on the connection.
if (getResultCode() != ResultCode.SASL_BIND_IN_PROGRESS)
@@ -545,6 +542,8 @@
}
clientConnection.finishBind();
+ clientConnection.sendResponse(this);
+
invokePostResponsePlugins(workflowExecuted);
}
}
diff --git a/pom.xml b/pom.xml
index 0b452df..dc869f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,7 +48,7 @@
<forgerock-build-tools.version>1.0.2</forgerock-build-tools.version>
<forgerock-doc-plugin.version>3.2.2-SNAPSHOT</forgerock-doc-plugin.version>
<freemarker.version>2.3.24-incubating</freemarker.version>
- <grizzly-framework.version>2.3.24</grizzly-framework.version>
+ <grizzly-framework.version>2.3.28</grizzly-framework.version>
<metrics-core.version>3.1.2</metrics-core.version>
<!-- OSGi bundles properties -->
--
Gitblit v1.10.0