mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
08.42.2016 ede52c8f25aa2b7815e006cdfd6fe09f0766b548
OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

* Upgrade grizzly to 2.3.28
* Use HeapBufferManager for client SDK (ThreadLocal based)
* Use PooledMemoryManager for server listener (allows to pool buffer
acrros threads)
* Fix Buffer management in LdapCodec / LDAPClientFilter / ASN1Buffer*
* Fix race-condition on bind in progress detection
13 files modified
124 ■■■■■ changed files
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferReader.java 8 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java 4 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/DefaultTCPNIOTransport.java 3 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java 8 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java 5 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java 14 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java 20 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ServerTCPNIOTransport.java 20 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/pom.xml 18 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java 8 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java 9 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/core/BindOperationBasis.java 5 ●●●●● patch | view | raw | blame | history
pom.xml 2 ●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferReader.java
@@ -145,24 +145,24 @@
     *            The maximum BER element size, or <code>0</code> to indicate
     *            that there is no limit.
     * @param buffer
     *            The buffer where the content will be read from.
     *            The buffer where the content will be read from. Note that
     *            @{code {@link #close()} this reader will also dispose the buffer.
     */
    ASN1BufferReader(final int maxElementSize, final Buffer buffer) {
        this.readLimiter = new RootSequenceLimiter();
        this.buffer = buffer;
        this.buffer.allowBufferDispose(false);
        this.maxElementSize = maxElementSize;
    }
    /**
     * Closes this ASN.1 reader and the underlying stream.
     * Closes this ASN.1 reader and the underlying {@link Buffer}.
     *
     * @throws IOException
     *             if an I/O error occurs
     */
    @Override
    public void close() throws IOException {
        // Nothing to do
        buffer.tryDispose();
    }
    /**
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java
@@ -158,8 +158,8 @@
    void ensureAdditionalCapacity(final int size) {
        final int newCount = outBuffer.position() + size;
        if (newCount > outBuffer.capacity()) {
            outBuffer = memoryManager.reallocate(outBuffer, Math.max(outBuffer.capacity() << 1, newCount));
        if (newCount > outBuffer.limit()) {
            outBuffer = memoryManager.reallocate(outBuffer, Math.max(outBuffer.limit() << 1, newCount));
        }
    }
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/DefaultTCPNIOTransport.java
@@ -20,6 +20,7 @@
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.glassfish.grizzly.memory.HeapMemoryManager;
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder;
import org.glassfish.grizzly.strategies.SameThreadIOStrategy;
@@ -134,6 +135,8 @@
            // Enabled by default.
            builder.setReuseAddress(Boolean.parseBoolean(reuseAddressStr));
        }
        // Default heap manager has changed in grizzly 2.3.27 to the more memory consuming PooledMemoryManager
        builder.setMemoryManager(new HeapMemoryManager());
        final TCPNIOTransport transport = builder.build();
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java
@@ -425,17 +425,17 @@
            buffer.mark();
            if (!reader.elementAvailable()) {
                buffer.reset();
                return ctx.getStopAction(buffer);
                // We need to create a duplicate because buffer will be closed by the reader (try-with-resources)
                return ctx.getStopAction(buffer.duplicate());
            }
            final int length = reader.peekLength();
            final Buffer remainder = buffer.remaining() > length ? buffer.split(buffer.position() + length) : null;
            buffer.reset();
            try (final ASN1BufferReader packetReader = new ASN1BufferReader(maxASN1ElementSize, buffer)) {
            try (final ASN1BufferReader packetReader =
                    new ASN1BufferReader(maxASN1ElementSize, buffer.asReadOnlyBuffer())) {
                final LDAPReader<? extends ASN1Reader> ldapReader = LDAP.getReader(packetReader, decodeOptions);
                ctx.setMessage(null);
                ldapReader.readMessage(handler);
            } finally {
                buffer.tryDispose();
            }
            return ctx.getInvokeAction(remainder);
        } catch (IOException e) {
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -264,8 +264,9 @@
            @Override
            public void request(long n) {
                if (BackpressureHelper.add(pendingRequests, n) == 0 && ctx != null) {
                    ctx.resumeNext();
                final FilterChainContext immutableRef = ctx;
                if (BackpressureHelper.add(pendingRequests, n) == 0 && immutableRef != null) {
                    immutableRef.resumeNext();
                    ctx = null;
                }
            }
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
@@ -57,22 +57,24 @@
        try {
            final Buffer buffer = ctx.getMessage();
            try (final ASN1BufferReader reader = new ASN1BufferReader(maxASN1ElementSize, buffer)) {
                buffer.mark();
                // Due to a bug in grizzly's ByteBufferWrapper.split(), we can't use byteBuffer.mark()
                final int mark = buffer.position();
                if (!reader.elementAvailable()) {
                    buffer.reset();
                    return ctx.getStopAction(buffer);
                    buffer.position(mark);
                    // We need to create a duplicate because buffer will be closed by the reader (try-with-resources)
                    return ctx.getStopAction(buffer.duplicate());
                }
                final int length = reader.peekLength();
                if (length > maxASN1ElementSize) {
                    buffer.reset();
                    buffer.position(mark);
                    throw DecodeException.fatalError(
                            ERR_LDAP_CLIENT_DECODE_MAX_REQUEST_SIZE_EXCEEDED.get(length, maxASN1ElementSize));
                }
                final Buffer remainder = (buffer.remaining() > length)
                        ? buffer.split(buffer.position() + length)
                        : null;
                buffer.reset();
                ctx.setMessage(decodePacket(new ASN1BufferReader(maxASN1ElementSize, buffer)));
                buffer.position(mark);
                ctx.setMessage(decodePacket(new ASN1BufferReader(maxASN1ElementSize, buffer.asReadOnlyBuffer())));
                buffer.tryDispose();
                return ctx.getInvokeAction(remainder);
            }
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapResponseMessageWriter.java
@@ -20,7 +20,6 @@
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.WriteHandler;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@@ -44,25 +43,14 @@
            return;
        }
        upstream = s;
        connection.notifyCanWrite(new WriteHandler() {
            @Override
            public void onWritePossible() throws Exception {
                final Subscription sub = upstream;
                if (sub != null) {
                    sub.request(1);
                }
            }
            @Override
            public void onError(final Throwable error) {
                LdapResponseMessageWriter.this.onError(error);
            }
        });
        // We're requesting two response to allow overlap between async I/O and response computation.
        // (allows to generate a response while we're waiting for the previous message to be written)
        upstream.request(2);
    }
    @Override
    public void onNext(final LdapResponseMessage message) {
        connection.write(message).addCompletionHandler(this);
        connection.write(message, this);
    }
    @Override
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ServerTCPNIOTransport.java
@@ -20,6 +20,7 @@
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.glassfish.grizzly.memory.PooledMemoryManager;
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder;
import org.glassfish.grizzly.strategies.SameThreadIOStrategy;
@@ -36,6 +37,7 @@
    private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
    static final ServerTCPNIOTransport SERVER_TRANSPORT = new ServerTCPNIOTransport();
    private static final long MB = 1024 * 1024;
    private ServerTCPNIOTransport() {
        // Prevent instantiation.
@@ -96,6 +98,24 @@
            // Enabled by default.
            builder.setReuseAddress(Boolean.parseBoolean(reuseAddressStr));
        }
        float heapPercent;
        if (Runtime.getRuntime().maxMemory() < 1024 * MB) {
            // Low heap
            heapPercent = 0.01f;
        } else {
            // Compute a percentage to try to reach roughly 64Mb (big enough (tm))
            heapPercent = 64f * MB / Runtime.getRuntime().maxMemory();
        }
        // Force usage of PooledMemoryManager which allows to use grizzly's buffers across threads.
        builder.setMemoryManager(new PooledMemoryManager(
                1024,  // Initial buffer size
                3,     // Number of pools (with growing factor below this give us pools of 1K, 4K, 16k buffers)
                4,     // Growing factor to apply on the size of the buffer polled by the next pool
                selectorThreadCount,    // Number of pool slices that every pool will stripe allocation requests across
                heapPercent, // The percentage of the heap that this manager will use when populating the pools (5%)
                1f,    // The percentage of buffers to be pre-allocated during MemoryManager initialization (100%)
                true   // true to use direct buffers or false to use heap buffers
        ));
        final TCPNIOTransport transport = builder.build();
opendj-server-legacy/pom.xml
@@ -163,6 +163,16 @@
    <dependency>
      <groupId>org.forgerock.commons</groupId>
      <artifactId>forgerock-audit-handler-splunk</artifactId>
    </dependency>
    <dependency>
      <groupId>org.forgerock.commons</groupId>
      <artifactId>forgerock-audit-handler-jms</artifactId>
    </dependency>
    <dependency>
      <groupId>org.forgerock.commons</groupId>
      <artifactId>forgerock-audit-json</artifactId>
    </dependency>
@@ -670,9 +680,15 @@
                <Export-Package>org.forgerock.opendj.server.embedded</Export-Package>
                <!-- Import je changelog since it is not shipped in the main jar -->
                <Import-Package>
                  org.opends.server.replication.server.changelog.je,
                  org.opends.server.replication.server.changelog.je;resolution:=optional,
                  com.sleepycat.je*;resolution:=optional,
                  ${opendj.osgi.import}
                </Import-Package>
                <Embed-Dependency>
                  forgerock-persistit-core,
                  <!--je,-->
                  jcip-annotations
                </Embed-Dependency>
              </instructions>
            </configuration>
          </execution>
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -18,6 +18,7 @@
import static com.forgerock.reactive.RxJavaStreams.streamFromPublisher;
import static org.forgerock.opendj.io.LDAP.*;
import static org.forgerock.util.Utils.closeSilently;
import static org.opends.messages.CoreMessages.*;
import static org.opends.messages.ProtocolMessages.*;
import static org.opends.server.loggers.AccessLogger.logDisconnect;
@@ -963,7 +964,12 @@
                Flowable.create(new FlowableOnSubscribe<Response>() {
                    @Override
                    public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
                        processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
                        try {
                            processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
                        } finally {
                            // We don't need the ASN1Reader anymore.
                            closeSilently(message.getContent());
                        }
                    }
                }, BackpressureStrategy.ERROR)))
                .onNext(new Consumer<Response>() {
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -433,15 +433,6 @@
        return (int) currentConfig.getMaxRequestSize();
    }
    /**
     * Retrieves the size in bytes of the LDAP response message write buffer defined for this connection handler.
     *
     * @return The size in bytes of the LDAP response message write buffer.
     */
    public int getBufferSize() {
        return (int) currentConfig.getBufferSize();
    }
    @Override
    public String getProtocol() {
        return protocol;
opendj-server-legacy/src/main/java/org/opends/server/core/BindOperationBasis.java
@@ -534,9 +534,6 @@
      setProcessingStopTime();
      logBindResponse(this);
      // Send the bind response to the client.
      clientConnection.sendResponse(this);
      // If the bind processing is finished, then unset the "bind in progress"
      // flag to allow other operations to be processed on the connection.
      if (getResultCode() != ResultCode.SASL_BIND_IN_PROGRESS)
@@ -545,6 +542,8 @@
      }
      clientConnection.finishBind();
      clientConnection.sendResponse(this);
      invokePostResponsePlugins(workflowExecuted);
    }
  }
pom.xml
@@ -48,7 +48,7 @@
        <forgerock-build-tools.version>1.0.2</forgerock-build-tools.version>
        <forgerock-doc-plugin.version>3.2.2-SNAPSHOT</forgerock-doc-plugin.version>
        <freemarker.version>2.3.24-incubating</freemarker.version>
        <grizzly-framework.version>2.3.24</grizzly-framework.version>
        <grizzly-framework.version>2.3.28</grizzly-framework.version>
        <metrics-core.version>3.1.2</metrics-core.version>
        <!-- OSGi bundles properties -->