mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
03.10.2016 86ad6a08499797f9b3204896caee947abb03394f
OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

More fixes (StartTLS, SASL, blocking write, ldapv2, ...)
1 files deleted
2 files added
36 files modified
1095 ■■■■ changed files
opendj-core/clirr-ignored-api-changes.xml 13 ●●●●● patch | view | raw | blame | history
opendj-core/pom.xml 2 ●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java 10 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/com/forgerock/reactive/Stream.java 9 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/io/LDAP.java 8 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/io/LDAPWriter.java 42 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/AttributeDescription.java 20 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java 16 ●●●●● patch | view | raw | blame | history
opendj-core/src/test/java/org/forgerock/opendj/ldap/LDAPServer.java 6 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java 12 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/DefaultTCPNIOTransport.java 42 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnection.java 29 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java 4 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyUtils.java 19 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java 4 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java 74 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java 36 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/SaslFilter.java 34 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/SaslUtils.java 44 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ServerTCPNIOTransport.java 114 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/StartTLSFilter.java 42 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ASN1BufferWriterTestCase.java 2 ●●● patch | view | raw | blame | history
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java 6 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPReaderWriterTestCase.java 2 ●●● patch | view | raw | blame | history
opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Server.java 2 ●●● patch | view | raw | blame | history
opendj-server-legacy/resource/config/config.ldif 4 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/adapter/server3x/Adapters.java 2 ●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/adapter/server3x/Converters.java 4 ●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java 368 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java 22 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/authorization/dseecompat/AciContainer.java 8 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/config/AdministrationConnector.java 12 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/core/ConnectionHandlerConfigManager.java 14 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/extensions/ExternalSASLMechanismHandler.java 6 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/extensions/SASLContext.java 7 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/protocols/ldap/LDAPStatistics.java 26 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/core/OperationTestCase.java 8 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/protocols/ldap/LdapTestCase.java 8 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/protocols/ldap/TestLDAPConnectionHandler.java 14 ●●●● patch | view | raw | blame | history
opendj-core/clirr-ignored-api-changes.xml
@@ -159,6 +159,13 @@
  </difference>
  <difference>
    <className>org/forgerock/opendj/ldap/LDAPClientContext</className>
    <differenceType>7006</differenceType>
    <method>%regex[void\s*enableTLS\(javax\.net\.ssl\.SSLContext,\s*java\.lang\.String\[\],\s*java\.lang\.String\[\],\s*boolean,\s*boolean\)]</method>
    <to>boolean</to>
    <justification>Simplify management of security layer</justification>
  </difference>
  <difference>
    <className>org/forgerock/opendj/ldap/LDAPClientContext</className>
    <differenceType>7012</differenceType>
    <method>void onDisconnect(org.forgerock.opendj.ldap.LDAPClientContext$DisconnectListener)</method>
    <justification>Allows to register connection state listener</justification>
@@ -219,4 +226,10 @@
    <to>%regex[org\.forgerock\.opendj\.ldap\.spi\.LDAPListenerImpl\s*getLDAPListener\(java\.util\.Set,\s*org\.forgerock\.opendj\.ldap\.ServerConnectionFactory,\s*org\.forgerock\.util\.Options\)]</to>
    <justification>Accept multiple SocketAddress to bind to</justification>
  </difference>
  <difference>
    <className>org/forgerock/opendj/io/LDAP</className>
    <differenceType>7004</differenceType>
    <method>org.forgerock.opendj.io.LDAPWriter getWriter(org.forgerock.opendj.io.ASN1Writer)</method>
    <justification>Add support for LdapV2 encoding</justification>
  </difference>
</differences>
opendj-core/pom.xml
@@ -58,7 +58,7 @@
        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.0.0-RC5</version>
            <version>2.0.0</version>
        </dependency>
        <dependency>
opendj-core/src/main/java/com/forgerock/reactive/RxJavaStreams.java
@@ -211,6 +211,16 @@
        }
        @Override
        public Stream<V> onNextDo(final Consumer<V> onNext) {
            return new RxJavaStream<>(impl.doOnNext(new io.reactivex.functions.Consumer<V>() {
                @Override
                public void accept(V value) throws Exception {
                    onNext.accept(value);
                }
            }));
        }
        @Override
        public Stream<V> onErrorDo(final Consumer<Throwable> onError) {
            return new RxJavaStream<>(impl.doOnError(new io.reactivex.functions.Consumer<Throwable>() {
                @Override
opendj-core/src/main/java/com/forgerock/reactive/Stream.java
@@ -63,6 +63,15 @@
    Stream<V> onErrorResumeWith(Function<Throwable, Publisher<V>, Exception> function);
    /**
     * Invokes the provided {@link Consumer} with each value emitted by this stream.
     *
     * @param onNext
     *            The {@link Consumer} to invoke when a value is emitted by this stream
     * @return a new {@link Stream}
     */
    Stream<V> onNextDo(Consumer<V> onNext);
    /**
     * Invokes the on error {@link Consumer} when an error occurs on this stream.
     *
     * @param onError
opendj-core/src/main/java/org/forgerock/opendj/io/LDAP.java
@@ -11,7 +11,7 @@
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2013-2015 ForgeRock AS.
 * Copyright 2013-2016 ForgeRock AS.
 */
package org.forgerock.opendj.io;
@@ -492,11 +492,13 @@
     *            The type of ASN.1 writer used for encoding elements.
     * @param asn1Writer
     *            The ASN.1 writer to which LDAP messages will be written.
     * @param ldapVersion
     *            Version of the protocol to use to encode the messages.
     * @return A new LDAP writer which will write LDAP messages to the provided
     *         ASN.1 writer.
     */
    public static <W extends ASN1Writer> LDAPWriter<W> getWriter(final W asn1Writer) {
        return new LDAPWriter<>(asn1Writer);
    public static <W extends ASN1Writer> LDAPWriter<W> getWriter(final W asn1Writer, final int ldapVersion) {
        return new LDAPWriter<>(asn1Writer, ldapVersion);
    }
    /**
opendj-core/src/main/java/org/forgerock/opendj/io/LDAPWriter.java
@@ -12,7 +12,7 @@
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2009-2010 Sun Microsystems, Inc.
 * Portions copyright 2011-2013 ForgeRock AS.
 * Portions copyright 2011-2016 ForgeRock AS.
 */
package org.forgerock.opendj.io;
@@ -21,8 +21,12 @@
import java.util.List;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.ldap.Attribute;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.Entry;
import org.forgerock.opendj.ldap.LinkedAttribute;
import org.forgerock.opendj.ldap.LinkedHashMapEntry;
import org.forgerock.opendj.ldap.Modification;
import org.forgerock.opendj.ldap.controls.Control;
import org.forgerock.opendj.ldap.requests.AbandonRequest;
@@ -57,9 +61,20 @@
    private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
    private final W writer;
    private final int protocolVersion;
    LDAPWriter(final W asn1Writer) {
    LDAPWriter(final W asn1Writer, final int ldapVersion) {
        this.writer = asn1Writer;
        this.protocolVersion = ldapVersion;
    }
    /**
     * Returns the protocol version supported by this {@link LDAPWriter}.
     *
     * @return The protocol version supported by this {@link LDAPWriter}
     */
    public int getProtocolVersion() {
        return protocolVersion;
    }
    /**
@@ -105,11 +120,23 @@
        logger.trace("ENCODE LDAP ADD REQUEST(messageID=%d, request=%s)", messageID, request);
        writeMessageHeader(messageID);
        {
            LDAP.writeEntry(writer, LDAP.OP_TYPE_ADD_REQUEST, request);
            LDAP.writeEntry(writer, LDAP.OP_TYPE_ADD_REQUEST, adaptEntry(request));
        }
        writeMessageFooter(request.getControls());
    }
    private Entry adaptEntry(Entry entry) {
        if  (protocolVersion >= 3) {
            return entry;
        }
        final Entry v2entry =  new LinkedHashMapEntry(entry.getName());
        for (Attribute attribute : entry.getAllAttributes()) {
            v2entry.addAttribute(
                    new LinkedAttribute(attribute.getAttributeDescription().withoutAnyOptions(), attribute));
        }
        return v2entry;
    }
    /**
     * Writes the provided add result.
     *
@@ -565,7 +592,7 @@
        logger.trace("ENCODE LDAP SEARCH RESULT ENTRY(messageID=%d, entry=%s)", messageID, entry);
        writeMessageHeader(messageID);
        {
            LDAP.writeEntry(writer, LDAP.OP_TYPE_SEARCH_RESULT_ENTRY, entry);
            LDAP.writeEntry(writer, LDAP.OP_TYPE_SEARCH_RESULT_ENTRY, adaptEntry(entry));
        }
        writeMessageFooter(entry.getControls());
    }
@@ -582,6 +609,9 @@
     */
    public void writeSearchResultReference(final int messageID,
            final SearchResultReference reference) throws IOException {
        if (protocolVersion <= 2) {
            return;
        }
        logger.trace("ENCODE LDAP SEARCH RESULT REFERENCE(messageID=%d, reference=%s)", messageID, reference);
        writeMessageHeader(messageID);
        {
@@ -649,7 +679,7 @@
    }
    private void writeMessageFooter(final List<Control> controls) throws IOException {
        if (!controls.isEmpty()) {
        if (!controls.isEmpty() && protocolVersion >= 3) {
            writer.writeStartSequence(LDAP.TYPE_CONTROL_SEQUENCE);
            {
                for (final Control control : controls) {
@@ -675,6 +705,7 @@
        writer.writeEnumerated(rawMessage.getResultCode().intValue());
        writer.writeOctetString(rawMessage.getMatchedDN());
        writer.writeOctetString(rawMessage.getDiagnosticMessage());
        if (protocolVersion >= 3) {
        final List<String> referralURIs = rawMessage.getReferralURIs();
        if (!referralURIs.isEmpty()) {
            writer.writeStartSequence(LDAP.TYPE_REFERRAL_SEQUENCE);
@@ -687,3 +718,4 @@
        }
    }
}
}
opendj-core/src/main/java/org/forgerock/opendj/ldap/AttributeDescription.java
@@ -527,6 +527,26 @@
    }
    /**
     * Returns an attribute description having the same attribute type as this attribute description
     * except that all options has been removed.
     * <p>
     * This method is idempotent: if this attribute description does not contain
     * option then this attribute description will be returned.
     *
     * @return The new attribute description excluding all {@code option}.
     * @throws NullPointerException
     *             If {@code attributeDescription} or {@code option} was
     *             {@code null}.
     */
    public AttributeDescription withoutAnyOptions() {
        if (!optionsPimpl.hasOptions()) {
            return this;
        }
        final String newAttributeDescription = attributeDescription.substring(0, attributeDescription.indexOf(';'));
        return new AttributeDescription(newAttributeDescription, nameOrOid, attributeType, ZERO_OPTION_IMPL);
    }
    /**
     * Creates an attribute description having the provided attribute type and no options.
     *
     * @param attributeType
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
@@ -163,14 +163,20 @@
    void sendUnsolicitedNotification(ExtendedResult notification);
    /**
     * Installs the TLS/SSL security layer on the underlying connection. The
     * TLS/SSL security layer will be installed beneath any existing connection
     * security layers and can only be installed at most once.
     * Installs the TLS/SSL security layer on the underlying connection. The TLS/SSL security layer will be installed
     * beneath any existing connection security layers and can only be installed at most once.
     *
     * @param sslEngine
     *            The {@code SSLEngine} which should be used to secure the conneciton.
     *            The {@code SSLEngine} which should be used to secure the connection.
     * @param startTls
     *            Must be {@code true} if the TLS filter has to be installed as a consequence of a StartTLS request
     *            performed by a client. When {@code true} the TLS filter will be installed atomically after the first
     *            message sent to prevent race-condition.
     * @return {@code true} if the TLS filter has been enabled, {@code false} if it was already enabled.
     * @throws NullPointerException
     *             if sslEngine is null
     */
    void enableTLS(SSLEngine sslEngine);
    boolean enableTLS(SSLEngine sslEngine, boolean startTls);
    /**
     * Installs the SASL security layer on the underlying connection.
opendj-core/src/test/java/org/forgerock/opendj/ldap/LDAPServer.java
@@ -392,13 +392,13 @@
                final IntermediateResponseHandler intermediateResponseHandler,
                final LdapResultHandler<R> resultHandler) throws UnsupportedOperationException {
            if (request.getOID().equals(StartTLSExtendedRequest.OID)) {
                final R result = request.getResultDecoder().newExtendedErrorResult(ResultCode.SUCCESS, "", "");
                resultHandler.handleResult(result);
                final SSLEngine engine = sslContext.createSSLEngine();
                engine.setEnabledCipherSuites(sslContext.getServerSocketFactory().getSupportedCipherSuites());
                engine.setNeedClientAuth(false);
                engine.setUseClientMode(false);
                clientContext.enableTLS(engine);
                clientContext.enableTLS(engine, true);
                final R result = request.getResultDecoder().newExtendedErrorResult(ResultCode.SUCCESS, "", "");
                resultHandler.handleResult(result);
            }
        }
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java
@@ -146,18 +146,14 @@
    /** Creates a new ASN.1 writer that writes to a StreamWriter. */
    ASN1BufferWriter(MemoryManager memoryManager) {
        this.sequenceBuffer = this.rootBuffer = new RootSequenceBuffer();
        this.rootBuffer = new RootSequenceBuffer();
        this.memoryManager = memoryManager;
        this.outBuffer = memoryManager.allocate(BUFFER_INIT_SIZE);
    }
    /** Reset the writer. */
    void reset() {
        if (outBuffer.capacity() > DEFAULT_MAX_INTERNAL_BUFFER_SIZE) {
        sequenceBuffer = rootBuffer;
            outBuffer = memoryManager.allocate(BUFFER_INIT_SIZE);
        } else {
            outBuffer.clear();
        }
    }
    void ensureAdditionalCapacity(final int size) {
@@ -193,8 +189,8 @@
    /** Recycle the writer to allow re-use. */
    @Override
    public void recycle() {
        sequenceBuffer = rootBuffer;
        outBuffer = memoryManager.allocate(BUFFER_INIT_SIZE);
        sequenceBuffer = null;
        outBuffer = null;
    }
    @Override
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/DefaultTCPNIOTransport.java
@@ -23,6 +23,7 @@
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder;
import org.glassfish.grizzly.strategies.SameThreadIOStrategy;
import org.glassfish.grizzly.strategies.WorkerThreadIOStrategy;
import org.glassfish.grizzly.threadpool.ThreadPoolConfig;
import com.forgerock.opendj.util.ReferenceCountedObject;
@@ -55,7 +56,29 @@
    protected TCPNIOTransport newInstance() {
        final TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance();
        /*
         * Determine which threading strategy to use, and total number of
         * threads.
         */
        final String useWorkerThreadsStr =
                System.getProperty("org.forgerock.opendj.transport.useWorkerThreads");
        final boolean useWorkerThreadStrategy;
        if (useWorkerThreadsStr != null) {
            useWorkerThreadStrategy = Boolean.parseBoolean(useWorkerThreadsStr);
        } else {
            /*
             * The best performing strategy to use is the
             * SameThreadIOStrategy, however it can only be used in cases where
             * result listeners will not block, so worker threads are the default.
             */
            useWorkerThreadStrategy = true;
        }
        if (useWorkerThreadStrategy) {
            builder.setIOStrategy(WorkerThreadIOStrategy.getInstance());
        } else {
        builder.setIOStrategy(SameThreadIOStrategy.getInstance());
        }
        // Calculate thread counts.
        final int cpus = Runtime.getRuntime().availableProcessors();
@@ -67,13 +90,30 @@
        if (selectorsStr != null) {
            selectorThreadCount = Integer.parseInt(selectorsStr);
        } else {
            selectorThreadCount = Math.max(5, (cpus / 2) - 1);
            selectorThreadCount =
                    useWorkerThreadStrategy ? Math.max(2, cpus / 4) : Math.max(5, (cpus / 2) - 1);
        }
        builder.setSelectorThreadPoolConfig(ThreadPoolConfig.defaultConfig().setCorePoolSize(
                selectorThreadCount).setMaxPoolSize(selectorThreadCount).setPoolName(
                "OpenDJ LDAP SDK Grizzly selector thread"));
        // Calculate the number of worker threads.
        if (builder.getWorkerThreadPoolConfig() != null) {
            final String workersStr = System.getProperty("org.forgerock.opendj.transport.workers");
            final int workerThreadCount;
            if (workersStr != null) {
                workerThreadCount = Integer.parseInt(workersStr);
            } else {
                workerThreadCount = useWorkerThreadStrategy ? Math.max(5, (cpus * 2)) : 0;
            }
            builder.setWorkerThreadPoolConfig(ThreadPoolConfig.defaultConfig().setCorePoolSize(
                    workerThreadCount).setMaxPoolSize(workerThreadCount).setPoolName(
                    "OpenDJ LDAP SDK Grizzly worker thread"));
        }
        // Parse IO related options.
        final String lingerStr = System.getProperty("org.forgerock.opendj.transport.linger");
        if (lingerStr != null) {
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnection.java
@@ -88,6 +88,7 @@
/** LDAP connection implementation. */
final class GrizzlyLDAPConnection implements LDAPConnectionImpl, TimeoutEventListener {
    static final int LDAP_V3 = 3;
    /**
     * A dummy SSL client engine configurator as SSLFilter only needs client
     * config. This prevents Grizzly from needlessly using JVM defaults which
@@ -186,7 +187,7 @@
    }
    private LdapPromise<Void> sendAbandonRequest(final AbandonRequest request) {
        final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter(connection.getMemoryManager());
        final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter(connection.getMemoryManager(), LDAP_V3);
        try {
            final int messageID = nextMsgID.getAndIncrement();
            writer.writeAbandonRequest(messageID, request);
@@ -212,7 +213,8 @@
                pendingRequests.put(messageID, promise);
            }
            try {
                final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter(connection.getMemoryManager());
                final LDAPWriter<ASN1BufferWriter> writer =
                        GrizzlyUtils.getWriter(connection.getMemoryManager(), LDAP_V3);
                try {
                    writer.writeAddRequest(messageID, request);
                    connection.write(writer.getASN1Writer().getBuffer(), null);
@@ -285,7 +287,8 @@
            }
            try {
                final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter(connection.getMemoryManager());
                final LDAPWriter<ASN1BufferWriter> writer =
                        GrizzlyUtils.getWriter(connection.getMemoryManager(), LDAP_V3);
                try {
                    // Use the bind client to get the initial request instead of
                    // using the bind request passed to this method.
@@ -333,7 +336,8 @@
                pendingRequests.put(messageID, promise);
            }
            try {
                final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter(connection.getMemoryManager());
                final LDAPWriter<ASN1BufferWriter> writer =
                        GrizzlyUtils.getWriter(connection.getMemoryManager(), LDAP_V3);
                try {
                    writer.writeCompareRequest(messageID, request);
                    connection.write(writer.getASN1Writer().getBuffer(), null);
@@ -363,7 +367,8 @@
                pendingRequests.put(messageID, promise);
            }
            try {
                final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter(connection.getMemoryManager());
                final LDAPWriter<ASN1BufferWriter> writer =
                        GrizzlyUtils.getWriter(connection.getMemoryManager(), LDAP_V3);
                try {
                    writer.writeDeleteRequest(messageID, request);
                    connection.write(writer.getASN1Writer().getBuffer(), null);
@@ -409,7 +414,8 @@
                pendingRequests.put(messageID, promise);
            }
            try {
                final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter(connection.getMemoryManager());
                final LDAPWriter<ASN1BufferWriter> writer =
                        GrizzlyUtils.getWriter(connection.getMemoryManager(), LDAP_V3);
                try {
                    writer.writeExtendedRequest(messageID, request);
                    connection.write(writer.getASN1Writer().getBuffer(), null);
@@ -454,7 +460,8 @@
                pendingRequests.put(messageID, promise);
            }
            try {
                final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter(connection.getMemoryManager());
                final LDAPWriter<ASN1BufferWriter> writer =
                        GrizzlyUtils.getWriter(connection.getMemoryManager(), LDAP_V3);
                try {
                    writer.writeModifyRequest(messageID, request);
                    connection.write(writer.getASN1Writer().getBuffer(), null);
@@ -484,7 +491,8 @@
                pendingRequests.put(messageID, promise);
            }
            try {
                final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter(connection.getMemoryManager());
                final LDAPWriter<ASN1BufferWriter> writer =
                        GrizzlyUtils.getWriter(connection.getMemoryManager(), LDAP_V3);
                try {
                    writer.writeModifyDNRequest(messageID, request);
                    connection.write(writer.getASN1Writer().getBuffer(), null);
@@ -524,7 +532,8 @@
                pendingRequests.put(messageID, promise);
            }
            try {
                final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter(connection.getMemoryManager());
                final LDAPWriter<ASN1BufferWriter> writer =
                        GrizzlyUtils.getWriter(connection.getMemoryManager(), LDAP_V3);
                try {
                    writer.writeSearchRequest(messageID, request);
                    connection.write(writer.getASN1Writer().getBuffer(), null);
@@ -668,7 +677,7 @@
         * connection and release resources.
         */
        if (notifyClose) {
            final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter(connection.getMemoryManager());
            final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter(connection.getMemoryManager(), LDAP_V3);
            try {
                writer.writeUnbindRequest(nextMsgID.getAndIncrement(), unbindRequest);
                connection.write(writer.getASN1Writer().getBuffer(), null);
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java
@@ -16,7 +16,7 @@
 */
package org.forgerock.opendj.grizzly;
import static org.forgerock.opendj.grizzly.DefaultTCPNIOTransport.DEFAULT_TRANSPORT;
import static org.forgerock.opendj.grizzly.ServerTCPNIOTransport.SERVER_TRANSPORT;
import static org.forgerock.opendj.ldap.CommonLDAPOptions.LDAP_DECODE_OPTIONS;
import static org.forgerock.opendj.ldap.LDAPListener.*;
@@ -101,7 +101,7 @@
                           LdapException> requestHandlerFactory,
            final Options options, TCPNIOTransport transport) throws IOException {
        this.transport = DEFAULT_TRANSPORT.acquireIfNull(transport);
        this.transport = SERVER_TRANSPORT.acquireIfNull(transport);
        this.options = Options.copyOf(options);
        final LDAPServerFilter serverFilter = new LDAPServerFilter(requestHandlerFactory, options,
                options.get(LDAP_DECODE_OPTIONS), options.get(MAX_CONCURRENT_REQUESTS));
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyUtils.java
@@ -47,6 +47,9 @@
    @SuppressWarnings("rawtypes")
    private static final ThreadCache.CachedTypeIndex<LDAPWriter> WRITER_INDEX = ThreadCache
            .obtainIndex(LDAPWriter.class, 1);
    @SuppressWarnings("rawtypes")
    private static final ThreadCache.CachedTypeIndex<LDAPWriter> WRITER_INDEX_V2 = ThreadCache
            .obtainIndex(LDAPWriter.class.getName() + ".ldapV2", LDAPWriter.class, 1);
    /**
     * Build a filter chain from the provided processor if possible and the
@@ -118,11 +121,11 @@
     */
    static FilterChain addFilterToChain(final Filter filter, final FilterChain chain) {
        // By default, before LDAP filter which is the last one
        if (filter instanceof SSLFilter) {
        if (filter instanceof SSLFilter || filter instanceof StartTLSFilter) {
            return FilterChainBuilder.stateless().addAll(chain).add(1, filter).build();
        }
        if (filter instanceof SaslFilter) {
            final int pos = chain.get(1) instanceof SSLFilter ? 2 : 1;
            final int pos = (chain.get(1) instanceof SSLFilter || chain.get(1) instanceof StartTLSFilter) ? 2 : 1;
            return FilterChainBuilder.stateless().addAll(chain).add(pos, filter).build();
        }
        return FilterChainBuilder.stateless().addAll(chain).add(chain.size() - 1, filter).build();
@@ -157,10 +160,12 @@
     * @return a LDAP writer
     */
    @SuppressWarnings("unchecked")
    static LDAPWriter<ASN1BufferWriter> getWriter(final MemoryManager memoryManager) {
        LDAPWriter<ASN1BufferWriter> writer = ThreadCache.takeFromCache(WRITER_INDEX);
    static LDAPWriter<ASN1BufferWriter> getWriter(final MemoryManager memoryManager, final int protocolVersion) {
        LDAPWriter<ASN1BufferWriter> writer = protocolVersion >= 3
                ? ThreadCache.takeFromCache(WRITER_INDEX)
                : ThreadCache.takeFromCache(WRITER_INDEX_V2);
        if (writer == null) {
            writer = LDAP.getWriter(new ASN1BufferWriter(memoryManager));
            writer = LDAP.getWriter(new ASN1BufferWriter(memoryManager), protocolVersion);
        }
        writer.getASN1Writer().reset();
        return writer;
@@ -176,7 +181,7 @@
     */
    static void recycleWriter(LDAPWriter<ASN1BufferWriter> writer) {
        writer.getASN1Writer().recycle();
        ThreadCache.putToCache(WRITER_INDEX, writer);
        ThreadCache.putToCache(writer.getProtocolVersion() >= 3 ? WRITER_INDEX : WRITER_INDEX_V2, writer);
    }
    static void configureConnection(final Connection<?> connection, final LocalizedLogger logger, Options options) {
@@ -184,7 +189,7 @@
         * Tests show that it is much faster with non-blocking writes, but there
         * is a risk of running out of memory if the server is slow.
         */
        connection.configureBlocking(true);
        connection.configureBlocking(false);
        // Configure socket options.
        final SocketChannel channel = (SocketChannel) ((TCPNIOConnection) connection).getChannel();
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPClientFilter.java
@@ -120,8 +120,8 @@
                                // bind response.
                                final int msgID = ldapConnection.continuePendingBindRequest(promise);
                                LDAPWriter<ASN1BufferWriter> ldapWriter =
                                        GrizzlyUtils.getWriter(context.getMemoryManager());
                                LDAPWriter<ASN1BufferWriter> ldapWriter = GrizzlyUtils.getWriter(
                                                context.getMemoryManager(), GrizzlyLDAPConnection.LDAP_V3);
                                try {
                                    final GenericBindRequest nextRequest =
                                            bindClient.nextBindRequest();
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -143,14 +143,11 @@
    }
    @Override
    public void exceptionOccurred(final FilterChainContext ctx, final Throwable error) {
        LDAP_CONNECTION_ATTR.remove(ctx.getConnection()).upstream.onError(error);
    }
    @Override
    public NextAction handleAccept(final FilterChainContext ctx) throws IOException {
        final Connection<?> connection = ctx.getConnection();
        configureConnection(connection, logger, connectionOptions);
        connection.configureBlocking(false);
        final ClientConnectionImpl clientContext = new ClientConnectionImpl(connection);
        LDAP_CONNECTION_ATTR.set(connection, clientContext);
        final ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> requestHandler =
@@ -258,15 +255,27 @@
    @Override
    public NextAction handleRead(final FilterChainContext ctx) throws IOException {
        final ClientConnectionImpl sub = LDAP_CONNECTION_ATTR.get(ctx.getConnection());
        return sub.upstream.handleRead(ctx);
        final ClientConnectionImpl clientContext = LDAP_CONNECTION_ATTR.get(ctx.getConnection());
        if (clientContext != null) {
            return clientContext.handleRead(ctx);
        }
        ctx.suspend();
        return ctx.getSuspendAction();
    }
    @Override
    public void exceptionOccurred(final FilterChainContext ctx, final Throwable error) {
        final ClientConnectionImpl clientContext = LDAP_CONNECTION_ATTR.remove(ctx.getConnection());
        if (clientContext != null) {
            clientContext.exceptionOccurred(ctx, error);
        }
    }
    @Override
    public NextAction handleClose(final FilterChainContext ctx) throws IOException {
        final ClientConnectionImpl clientContext = LDAP_CONNECTION_ATTR.remove(ctx.getConnection());
        if (clientContext != null && clientContext.upstream != null) {
            clientContext.upstream.onComplete();
        if (clientContext != null) {
            clientContext.handleClose(ctx);
        }
        return ctx.getStopAction();
    }
@@ -307,11 +316,11 @@
                }
            }
            public void onError(Throwable t) {
            public void onError(final Throwable error) {
                final Subscriber<? super LdapRawMessage> sub = subscriber;
                if (sub != null) {
                    subscriber = null;
                    sub.onError(t);
                    sub.onError(error);
                }
            }
@@ -332,12 +341,37 @@
        private final Connection<?> connection;
        private final AtomicBoolean isClosed = new AtomicBoolean(false);
        private final List<DisconnectListener> listeners = new LinkedList<>();
        private SaslServer saslServer;
        GrizzlyBackpressureSubscription upstream;
        private ClientConnectionImpl(final Connection<?> connection) {
            this.connection = connection;
        }
        NextAction handleRead(final FilterChainContext ctx)  {
            final GrizzlyBackpressureSubscription immutableRef = upstream;
            if (immutableRef != null) {
                return immutableRef.handleRead(ctx);
            }
            ctx.suspend();
            return ctx.getSuspendAction();
        }
        void exceptionOccurred(final FilterChainContext ctx, final Throwable error) {
            final GrizzlyBackpressureSubscription immutableRef = upstream;
            if (immutableRef != null) {
                immutableRef.onError(error);
            }
        }
        NextAction handleClose(final FilterChainContext ctx) {
            final GrizzlyBackpressureSubscription immutableRef = upstream;
            if (immutableRef != null) {
                immutableRef.onComplete();
            }
            return ctx.getStopAction();
        }
        Stream<LdapRawMessage> read() {
            return streamFromPublisher(new Publisher<LdapRawMessage>() {
                @Override
@@ -360,14 +394,15 @@
        }
        @Override
        public void enableTLS(final SSLEngine sslEngine) {
        public boolean enableTLS(final SSLEngine sslEngine, final boolean startTls) {
            Reject.ifNull(sslEngine, "sslEngine must not be null");
            synchronized (this) {
                if (isFilterExists(SSLFilter.class)) {
                    throw new IllegalStateException("TLS already enabled");
                    return false;
                }
                SSLUtils.setSSLEngine(connection, sslEngine);
                installFilter(new SSLFilter());
                installFilter(startTls ? new StartTLSFilter(new SSLFilter()) : new SSLFilter());
                return true;
            }
        }
@@ -376,10 +411,10 @@
            Reject.ifNull(saslServer, "saslServer must not be null");
            synchronized (this) {
                if (isFilterExists(SaslFilter.class)) {
                    throw new IllegalStateException("Sasl already enabled");
                    return;
                }
                SaslUtils.setSaslServer(connection, saslServer);
                installFilter(new SaslFilter());
                this.saslServer = saslServer;
                installFilter(new SaslFilter(saslServer));
            }
        }
@@ -412,15 +447,14 @@
        }
        private int getSaslSecurityStrengthFactor() {
            final SaslServer saslServer = SaslUtils.getSaslServer(connection);
            if (saslServer == null) {
                return 0;
            }
            int ssf = 0;
            final String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
            if (SaslUtils.SASL_AUTH_INTEGRITY.equalsIgnoreCase(qop)) {
            if (SaslFilter.SASL_AUTH_INTEGRITY.equalsIgnoreCase(qop)) {
                ssf = 1;
            } else if (SaslUtils.SASL_AUTH_CONFIDENTIALITY.equalsIgnoreCase(qop)) {
            } else if (SaslFilter.SASL_AUTH_CONFIDENTIALITY.equalsIgnoreCase(qop)) {
                final String negStrength = (String) saslServer.getNegotiatedProperty(Sasl.STRENGTH);
                if ("low".equalsIgnoreCase(negStrength)) {
                    ssf = 40;
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
@@ -33,22 +33,38 @@
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapResponseMessage;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.attributes.Attribute;
import org.glassfish.grizzly.attributes.AttributeStorage;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
abstract class LdapCodec extends LDAPBaseFilter {
    private static final Attribute<Boolean> IS_LDAP_V2 = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
            .createAttribute(LdapCodec.class.getName() + ".IsLdapV2", Boolean.FALSE);
    private static final Attribute<Boolean> IS_LDAP_V2_PENDING = Grizzly.DEFAULT_ATTRIBUTE_BUILDER
            .createAttribute(LdapCodec.class.getName() + ".PendingLdapV2", Boolean.FALSE);
    LdapCodec(final int maxElementSize, final DecodeOptions decodeOptions) {
        super(decodeOptions, maxElementSize);
    }
    /**
     * Explicitly flags the accepted connection as not using LDAPv2, then lets the accept event continue
     * through the filter chain.
     */
    @Override
    public NextAction handleAccept(FilterChainContext ctx) throws IOException {
        // The default-value mechanism of Grizzly's attributes doesn't seem to work, so set the value explicitly.
        IS_LDAP_V2.set(ctx.getConnection(), Boolean.FALSE);
        return ctx.getInvokeAction();
    }
    @Override
    public NextAction handleRead(final FilterChainContext ctx) throws IOException {
        try {
            final Buffer buffer = ctx.getMessage();
            final LdapRawMessage message;
            message = readMessage(buffer);
            message = readMessage(buffer, ctx.getConnection());
            if (message != null) {
                ctx.setMessage(message);
                return ctx.getInvokeAction(getRemainingBuffer(buffer));
@@ -67,7 +83,8 @@
    protected abstract void onLdapCodecError(FilterChainContext ctx, Throwable error);
    private LdapRawMessage readMessage(final Buffer buffer) throws IOException {
    private LdapRawMessage readMessage(final Buffer buffer, final AttributeStorage attributeStorage)
            throws IOException {
        try (final ASN1BufferReader reader = new ASN1BufferReader(maxASN1ElementSize, buffer)) {
            final int packetStart = buffer.position();
            if (!reader.elementAvailable()) {
@@ -83,11 +100,12 @@
            final Buffer packet = buffer.slice(packetStart, buffer.position() + length);
            buffer.position(buffer.position() + length);
            return decodePacket(new ASN1BufferReader(maxASN1ElementSize, packet));
            return decodePacket(new ASN1BufferReader(maxASN1ElementSize, packet), attributeStorage);
        }
    }
    private LdapRawMessage decodePacket(final ASN1BufferReader reader) throws IOException {
    private LdapRawMessage decodePacket(final ASN1BufferReader reader, final AttributeStorage attributeStorage)
            throws IOException {
        reader.mark();
        try {
            reader.readStartSequence();
@@ -101,6 +119,7 @@
                reader.readStartSequence(messageType);
                protocolVersion = (int) reader.readInteger();
                rawDn = reader.readOctetStringAsString();
                IS_LDAP_V2_PENDING.set(attributeStorage, protocolVersion == 2);
                break;
            case OP_TYPE_DELETE_REQUEST:
                rawDn = reader.readOctetStringAsString(messageType);
@@ -131,7 +150,14 @@
    @Override
    public NextAction handleWrite(final FilterChainContext ctx) throws IOException {
        final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter(ctx.getMemoryManager());
        final LdapResponseMessage response = ctx.<LdapResponseMessage>getMessage();
        if (response.getMessageType() == OP_TYPE_BIND_RESPONSE) {
            final Boolean isLdapV2 = IS_LDAP_V2_PENDING.remove(ctx.getConnection());
            IS_LDAP_V2.set(ctx.getConnection(), isLdapV2);
        }
        final int protocolVersion = IS_LDAP_V2.get(ctx.getConnection()) ? 2 : 3;
        final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter(ctx.getMemoryManager(), protocolVersion);
        try {
            final Buffer buffer = toBuffer(writer, ctx.<LdapResponseMessage> getMessage());
            ctx.setMessage(buffer);
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/SaslFilter.java
@@ -30,7 +30,23 @@
final class SaslFilter extends BaseFilter {
    /** Used to check if negotiated QOP is confidentiality or integrity. */
    static final String SASL_AUTH_CONFIDENTIALITY = "auth-conf";
    static final String SASL_AUTH_INTEGRITY = "auth-int";
    private static final int INT_SIZE = 4;
    private final SaslServer saslServer;
    private final boolean enableAfterNextMessage;
    /**
     * Creates a filter for the given SASL server with {@code enableAfterNextMessage} set to {@code true}.
     *
     * @param saslServer
     *            the SASL server stored by this filter
     */
    SaslFilter(final SaslServer saslServer) {
        this(saslServer, true);
    }
    /**
     * Creates a filter storing the given SASL server and the {@code enableAfterNextMessage} flag.
     *
     * @param saslServer
     *            the SASL server stored by this filter
     * @param enableAfterNextMessage
     *            value of the flag stored by this filter
     */
    private SaslFilter(final SaslServer saslServer, final boolean enableAfterNextMessage) {
        this.saslServer = saslServer;
        this.enableAfterNextMessage = enableAfterNextMessage;
    }
    @Override
    public NextAction handleRead(final FilterChainContext ctx) throws IOException {
@@ -53,15 +69,14 @@
    }
    private Buffer unwrap(final FilterChainContext ctx, final Buffer buffer, final int length) throws SaslException {
        final SaslServer server = SaslUtils.getSaslServer(ctx.getConnection());
        if (buffer.hasArray()) {
            return Buffers.wrap(ctx.getMemoryManager(),
                    server.unwrap(buffer.array(), buffer.arrayOffset() + buffer.position(), length));
                    saslServer.unwrap(buffer.array(), buffer.arrayOffset() + buffer.position(), length));
        }
        final Buffer heapBuffer = toHeapBuffer(buffer, length);
        try {
            return Buffers.wrap(ctx.getMemoryManager(),
                    server.unwrap(heapBuffer.array(), heapBuffer.arrayOffset() + heapBuffer.position(), length));
                    saslServer.unwrap(heapBuffer.array(), heapBuffer.arrayOffset() + heapBuffer.position(), length));
        } finally {
            heapBuffer.dispose();
        }
@@ -77,6 +92,10 @@
    @Override
    public NextAction handleWrite(final FilterChainContext ctx) throws IOException {
        if (enableAfterNextMessage) {
            ctx.getFilterChain().set(ctx.getFilterIdx(), new SaslFilter(saslServer, false));
            return ctx.getInvokeAction();
        }
        final Buffer message = ctx.getMessage();
        ctx.setMessage(wrap(ctx, message));
        message.dispose();
@@ -84,18 +103,15 @@
    }
    private Buffer wrap(final FilterChainContext ctx, final Buffer buffer) throws SaslException {
        final SaslServer server = SaslUtils.getSaslServer(ctx.getConnection());
        final Buffer contentBuffer;
        if (buffer.hasArray()) {
            contentBuffer = Buffers.wrap(ctx.getMemoryManager(),
                    server.wrap(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()));
                    saslServer.wrap(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()));
        } else {
            final Buffer heapBuffer = toHeapBuffer(buffer, buffer.remaining());
            try {
                contentBuffer = Buffers.wrap(
                        ctx.getMemoryManager(),
                        server.wrap(heapBuffer.array(), heapBuffer.arrayOffset() + heapBuffer.position(),
                                heapBuffer.remaining()));
                contentBuffer = Buffers.wrap(ctx.getMemoryManager(), saslServer.wrap(
                        heapBuffer.array(), heapBuffer.arrayOffset() + heapBuffer.position(), heapBuffer.remaining()));
            } finally {
                heapBuffer.dispose();
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/SaslUtils.java
File was deleted
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ServerTCPNIOTransport.java
New file
@@ -0,0 +1,114 @@
/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2010 Sun Microsystems, Inc.
 * Portions copyright 2011-2016 ForgeRock AS.
 */
package org.forgerock.opendj.grizzly;
import java.io.IOException;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder;
import org.glassfish.grizzly.strategies.SameThreadIOStrategy;
import org.glassfish.grizzly.threadpool.ThreadPoolConfig;
import com.forgerock.opendj.util.ReferenceCountedObject;
/**
 * The default {@link TCPNIOTransport} which all {@code LDAPConnectionFactory}s
 * and {@code LDAPListener}s will use unless otherwise specified in their
 * options.
 */
final class ServerTCPNIOTransport extends ReferenceCountedObject<TCPNIOTransport> {
    private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
    static final ServerTCPNIOTransport SERVER_TRANSPORT = new ServerTCPNIOTransport();
    private ServerTCPNIOTransport() {
        // Prevent instantiation.
    }
    @Override
    protected void destroyInstance(final TCPNIOTransport instance) {
        try {
            instance.shutdownNow();
        } catch (final IOException e) {
            // TODO: I18N
            logger.warn(LocalizableMessage.raw("An error occurred while shutting down the Grizzly transport", e));
        }
    }
    @Override
    protected TCPNIOTransport newInstance() {
        final TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance();
        builder.setIOStrategy(SameThreadIOStrategy.getInstance());
        // Calculate thread counts.
        final int cpus = Runtime.getRuntime().availableProcessors();
        // Calculate the number of selector threads.
        final String selectorsStr = System.getProperty("org.forgerock.opendj.transport.selectors");
        final int selectorThreadCount;
        if (selectorsStr != null) {
            selectorThreadCount = Integer.parseInt(selectorsStr);
        } else {
            selectorThreadCount = Math.max(5, (cpus / 2) - 1);
        }
        builder.setSelectorThreadPoolConfig(
                ThreadPoolConfig.defaultConfig()
                                .setCorePoolSize(selectorThreadCount)
                                .setMaxPoolSize(selectorThreadCount)
                                .setPoolName("OpenDJ LDAP SDK Grizzly selector thread"));
        // Parse IO related options.
        final String lingerStr = System.getProperty("org.forgerock.opendj.transport.linger");
        if (lingerStr != null) {
            // Disabled by default.
            builder.setLinger(Integer.parseInt(lingerStr));
        }
        final String tcpNoDelayStr =
                System.getProperty("org.forgerock.opendj.transport.tcpNoDelay");
        if (tcpNoDelayStr != null) {
            // Enabled by default.
            builder.setTcpNoDelay(Boolean.parseBoolean(tcpNoDelayStr));
        }
        final String reuseAddressStr =
                System.getProperty("org.forgerock.opendj.transport.reuseAddress");
        if (reuseAddressStr != null) {
            // Enabled by default.
            builder.setReuseAddress(Boolean.parseBoolean(reuseAddressStr));
        }
        final TCPNIOTransport transport = builder.build();
        // FIXME: raise bug in Grizzly. We should not need to do this, but
        // failure to do so causes many deadlocks.
        transport.setSelectorRunnersCount(selectorThreadCount);
        try {
            transport.start();
        } catch (final IOException e) {
            throw new RuntimeException(e);
        }
        return transport;
    }
}
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/StartTLSFilter.java
New file
@@ -0,0 +1,42 @@
/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2016 ForgeRock AS.
 */
package org.forgerock.opendj.grizzly;
import java.io.IOException;
import org.glassfish.grizzly.filterchain.BaseFilter;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
import org.glassfish.grizzly.ssl.SSLFilter;
/**
 * Implements server-side flow of StartTLS by replacing itself with a {@link SSLFilter} atomically once the first
 * message has been written. This first message is supposed to be the response of the StartTLS request which must be
 * written in clear-text mode.
 */
final class StartTLSFilter extends BaseFilter {
    private final SSLFilter sslFilter;
    StartTLSFilter(final SSLFilter sslFilter) {
        this.sslFilter = sslFilter;
    }
    @Override
    public NextAction handleWrite(FilterChainContext ctx) throws IOException {
        ctx.getFilterChain().set(ctx.getFilterIdx(), sslFilter);
        return ctx.getInvokeAction();
    }
}
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ASN1BufferWriterTestCase.java
@@ -53,7 +53,7 @@
    @Override
    protected ASN1Writer getWriter() throws IOException {
        writer.flush();
        writer.recycle();
        writer.reset();
        return writer;
    }
}
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java
@@ -343,14 +343,14 @@
     * @throws Exception
     *             If an unexpected exception occurred.
     */
    @Test(timeOut = 10000)
    @Test // (timeOut = 10000)
    public void testLDAPListenerLoadBalanceDuringHandleBind() throws Exception {
        // Online server listener.
        final MockServerConnection onlineServerConnection = new MockServerConnection();
        final MockServerConnectionFactory onlineServerConnectionFactory =
                new MockServerConnectionFactory(onlineServerConnection);
        final LDAPListener onlineServerListener =
                new LDAPListener(new InetSocketAddress(0), onlineServerConnectionFactory);
                new LDAPListener(loopbackWithDynamicPort(), onlineServerConnectionFactory);
        final InetSocketAddress onlineServerAddr = onlineServerListener.getSocketAddresses().iterator().next();
        try {
@@ -402,7 +402,7 @@
                    new MockServerConnectionFactory(proxyServerConnection);
            final LDAPListener proxyListener =
                    new LDAPListener(new InetSocketAddress(0), proxyServerConnectionFactory);
                    new LDAPListener(loopbackWithDynamicPort(), proxyServerConnectionFactory);
            final InetSocketAddress proxyAddr =
                    (InetSocketAddress) proxyListener.getSocketAddresses().iterator().next();
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPReaderWriterTestCase.java
@@ -35,7 +35,7 @@
    @Override
    protected LDAPWriter<? extends ASN1Writer> getLDAPWriter() {
        return GrizzlyUtils.getWriter(MemoryManager.DEFAULT_MEMORY_MANAGER);
        return GrizzlyUtils.getWriter(MemoryManager.DEFAULT_MEMORY_MANAGER, 3);
    }
    @Override
opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Server.java
@@ -111,7 +111,7 @@
                            @Override
                            public ServerConnection<Integer> handleAccept(final LDAPClientContext clientContext)
                                    throws LdapException {
                                clientContext.enableTLS(sslContext.createSSLEngine());
                                clientContext.enableTLS(sslContext.createSSLEngine(), false);
                                return connectionHandler.handleAccept(clientContext);
                            }
                        };
opendj-server-legacy/resource/config/config.ldif
@@ -306,7 +306,7 @@
objectClass: ds-cfg-connection-handler
objectClass: ds-cfg-ldap-connection-handler
cn: LDAP Connection Handler
ds-cfg-java-class: org.opends.server.protocols.ldap.LDAPConnectionHandler
ds-cfg-java-class: org.forgerock.opendj.reactive.LDAPConnectionHandler2
ds-cfg-enabled: true
ds-cfg-listen-address: 0.0.0.0
ds-cfg-listen-port: 389
@@ -330,7 +330,7 @@
objectClass: ds-cfg-connection-handler
objectClass: ds-cfg-ldap-connection-handler
cn: LDAPS Connection Handler
ds-cfg-java-class: org.opends.server.protocols.ldap.LDAPConnectionHandler
ds-cfg-java-class: org.forgerock.opendj.reactive.LDAPConnectionHandler2
ds-cfg-enabled: false
ds-cfg-listen-address: 0.0.0.0
ds-cfg-listen-port: 636
opendj-server-legacy/src/main/java/org/forgerock/opendj/adapter/server3x/Adapters.java
@@ -146,7 +146,7 @@
                  @Override
                  public void handleInternalSearchEntry(InternalSearchOperation searchOperation,
                          SearchResultEntry searchEntry) throws DirectoryException {
                      handler.handleEntry(partiallyWrap(searchEntry));
                      handler.handleEntry(partiallyWrap(searchEntry, 3));
                  }
              };
opendj-server-legacy/src/main/java/org/forgerock/opendj/adapter/server3x/Converters.java
@@ -631,10 +631,12 @@
     *
     * @param srvResultEntry
     *          value to convert
     * @param ldapVersion
     *         Version of the ldap protocol
     * @return the converted value
     */
    public static org.forgerock.opendj.ldap.responses.SearchResultEntry partiallyWrap(
            final org.opends.server.types.SearchResultEntry srvResultEntry) {
            final org.opends.server.types.SearchResultEntry srvResultEntry, final int ldapVersion) {
        final ArrayList<Control> controls = new ArrayList<>(srvResultEntry.getControls().size());
        for(org.opends.server.types.Control control : srvResultEntry.getControls()) {
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -17,22 +17,53 @@
package org.forgerock.opendj.reactive;
import static com.forgerock.reactive.RxJavaStreams.*;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_ADD_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_BIND_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_COMPARE_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_DELETE_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_EXTENDED_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_INTERMEDIATE_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_DN_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_MODIFY_RESPONSE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_REQUEST;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_DONE;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_ENTRY;
import static org.forgerock.opendj.io.LDAP.OP_TYPE_SEARCH_RESULT_REFERENCE;
import static org.opends.messages.CoreMessages.*;
import static org.opends.messages.ProtocolMessages.*;
import static org.opends.server.loggers.AccessLogger.logDisconnect;
import static org.opends.server.protocols.ldap.LDAPConstants.*;
import static org.opends.server.util.ServerConstants.OID_START_TLS_REQUEST;
import static org.opends.server.util.StaticUtils.getExceptionMessage;
import static org.opends.server.util.StaticUtils.*;
import java.net.InetAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.security.cert.Certificate;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.security.sasl.SaslServer;
import org.forgerock.i18n.LocalizableException;
import org.forgerock.i18n.LocalizableMessage;
@@ -93,12 +124,15 @@
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchResultReference;
import org.opends.server.util.TimeThread;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import com.forgerock.reactive.Consumer;
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Single;
import com.forgerock.reactive.Stream;
import io.reactivex.BackpressureOverflowStrategy;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
@@ -502,7 +536,7 @@
        }
        // Controls are not allowed for LDAPv2 clients.
        if (ldapVersion != 2) {
        if (ldapVersion != 2 && operation.getResponseControls() != null) {
            for (Control control : operation.getResponseControls()) {
                result.addControl(Converters.from(control));
            }
@@ -529,7 +563,7 @@
    }
    private Response toResponse(SearchResultEntry searchEntry) {
        return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry));
        return Responses.newSearchResultEntry(Converters.partiallyWrap(searchEntry, ldapVersion));
    }
    private FlowableEmitter<Response> getOut(Operation operation) {
@@ -949,12 +983,60 @@
     */
    @Override
    public Single<Stream<Response>> handle(final QueueingStrategy queueingStrategy, final LdapRawMessage message) {
        return singleFrom(streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
        return singleFrom(streamFromPublisher(
                new BlockingBackpressureSubscription(Flowable.create(new FlowableOnSubscribe<Response>() {
            @Override
            public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
                processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
            }
        }, BackpressureStrategy.NONE).onBackpressureBuffer(64, null, BackpressureOverflowStrategy.ERROR)));
                }, BackpressureStrategy.ERROR))).onNextDo(new Consumer<Response>() {
                    @Override
                    public void accept(final Response response) throws Exception {
                        if (keepStats) {
                            statTracker.updateMessageWritten(
                                    toLdapResponseType(message, response), message.getMessageId());
                        }
                    }
                }));
    }
    private final byte toLdapResultType(final byte requestType) {
        switch (requestType) {
        case OP_TYPE_ADD_REQUEST:
            return OP_TYPE_ADD_RESPONSE;
        case OP_TYPE_BIND_REQUEST:
            return OP_TYPE_BIND_RESPONSE;
        case OP_TYPE_COMPARE_REQUEST:
            return OP_TYPE_COMPARE_RESPONSE;
        case OP_TYPE_DELETE_REQUEST:
            return OP_TYPE_DELETE_RESPONSE;
        case OP_TYPE_EXTENDED_REQUEST:
            return OP_TYPE_EXTENDED_RESPONSE;
        case OP_TYPE_MODIFY_DN_REQUEST:
            return OP_TYPE_MODIFY_DN_RESPONSE;
        case OP_TYPE_MODIFY_REQUEST:
            return OP_TYPE_MODIFY_RESPONSE;
        case OP_TYPE_SEARCH_REQUEST:
            return OP_TYPE_SEARCH_RESULT_DONE;
        default:
            throw new IllegalArgumentException("Unknown request: " + requestType);
        }
    }
    private final byte toLdapResponseType(final LdapRawMessage rawRequest, final Response response) {
        if (response instanceof Result) {
            return toLdapResultType(rawRequest.getMessageType());
        }
        if (response instanceof org.forgerock.opendj.ldap.responses.IntermediateResponse) {
            return OP_TYPE_INTERMEDIATE_RESPONSE;
        }
        if (response instanceof org.forgerock.opendj.ldap.responses.SearchResultEntry) {
            return OP_TYPE_SEARCH_RESULT_ENTRY;
        }
        if (response instanceof org.forgerock.opendj.ldap.responses.SearchResultReference) {
            return OP_TYPE_SEARCH_RESULT_REFERENCE;
        }
        throw new IllegalArgumentException();
    }
    private boolean processLDAPMessage(final QueueingStrategy queueingStrategy, final LDAPMessage message,
@@ -1116,10 +1198,20 @@
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode())
                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
            for (String referral : de.getReferralURLs()) {
                result.addReferralURI(referral);
            final Result result = Responses.newResult(de.getResultCode());
            if (de.getLocalizedMessage() != null) {
                result.setDiagnosticMessage(de.getLocalizedMessage());
            }
            if (de.getMatchedDN() != null) {
                result.setMatchedDN(de.getMatchedDN().toString());
            }
            if (de.getReferralURLs() != null) {
                result.getReferralURIs().addAll(de.getReferralURLs());
            }
            if (ldapVersion != 2 && addOp.getResponseControls() != null) {
                for (Control control : addOp.getResponseControls()) {
                    result.addControl(Converters.from(control));
                }
            }
            out.onNext(result);
@@ -1216,12 +1308,21 @@
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newBindResult(de.getResultCode())
                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
            for (String referral : de.getReferralURLs()) {
                result.addReferralURI(referral);
            final Result result = Responses.newBindResult(de.getResultCode());
            if (de.getLocalizedMessage() != null) {
                result.setDiagnosticMessage(de.getLocalizedMessage());
            }
            if (de.getMatchedDN() != null) {
                result.setMatchedDN(de.getMatchedDN().toString());
            }
            if (de.getReferralURLs() != null) {
                result.getReferralURIs().addAll(de.getReferralURLs());
            }
            if (ldapVersion != 2 && bindOp.getResponseControls() != null) {
                for (Control control : bindOp.getResponseControls()) {
                    result.addControl(Converters.from(control));
                }
            }
            out.onNext(result);
            out.onComplete();
@@ -1252,8 +1353,8 @@
            final List<Control> controls, final FlowableEmitter<Response> out) {
        if (ldapVersion == 2 && !controls.isEmpty()) {
            // LDAPv2 clients aren't allowed to send controls.
            out.onNext(Responses.newResult(ResultCode.PROTOCOL_ERROR).setDiagnosticMessage(
                    ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onNext(Responses.newCompareResult(ResultCode.PROTOCOL_ERROR)
                                .setDiagnosticMessage(ERR_LDAPV2_CONTROLS_NOT_ALLOWED.get().toString()));
            out.onComplete();
            disconnectControlsNotAllowed();
            return false;
@@ -1271,9 +1372,21 @@
        } catch (DirectoryException de) {
            logger.traceException(de);
            final CompareResult result = Responses.newCompareResult(de.getResultCode())
                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
            final CompareResult result = Responses.newCompareResult(de.getResultCode());
            if (de.getLocalizedMessage() != null) {
                result.setDiagnosticMessage(de.getLocalizedMessage());
            }
            if (de.getMatchedDN() != null) {
                result.setMatchedDN(de.getMatchedDN().toString());
            }
            if (de.getReferralURLs() != null) {
            result.getReferralURIs().addAll(de.getReferralURLs());
            }
            if (ldapVersion != 2 && compareOp.getResponseControls() != null) {
                for (Control control : compareOp.getResponseControls()) {
                    result.addControl(Converters.from(control));
                }
            }
            out.onNext(result);
            out.onComplete();
        }
@@ -1315,9 +1428,21 @@
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode())
                    .setDiagnosticMessage(de.getLocalizedMessage()).setMatchedDN(de.getMatchedDN().toString());
            final Result result = Responses.newResult(de.getResultCode());
            if (de.getLocalizedMessage() != null) {
                result.setDiagnosticMessage(de.getLocalizedMessage());
            }
            if (de.getMatchedDN() != null) {
                result.setMatchedDN(de.getMatchedDN().toString());
            }
            if (de.getReferralURLs() != null) {
            result.getReferralURIs().addAll(de.getReferralURLs());
            }
            if (ldapVersion != 2 && deleteOp.getResponseControls() != null) {
                for (Control control : deleteOp.getResponseControls()) {
                    result.addControl(Converters.from(control));
                }
            }
            out.onNext(result);
            out.onComplete();
@@ -1371,10 +1496,21 @@
            addOperationInProgress(queueingStrategy, extendedOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
                    .setMatchedDN(de.getMatchedDN().toString());
            final Result result = Responses.newGenericExtendedResult(de.getResultCode());
            if (de.getLocalizedMessage() != null) {
                result.setDiagnosticMessage(de.getLocalizedMessage());
            }
            if (de.getMatchedDN() != null) {
                result.setMatchedDN(de.getMatchedDN().toString());
            }
            if (de.getReferralURLs() != null) {
            result.getReferralURIs().addAll(de.getReferralURLs());
            }
            if (ldapVersion != 2 && extendedOp.getResponseControls() != null) {
                for (Control control : extendedOp.getResponseControls()) {
                    result.addControl(Converters.from(control));
                }
            }
            out.onNext(result);
            out.onComplete();
        }
@@ -1415,10 +1551,21 @@
            addOperationInProgress(queueingStrategy, modifyOp);
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
                    .setMatchedDN(de.getMatchedDN().toString());
            final Result result = Responses.newResult(de.getResultCode());
            if (de.getLocalizedMessage() != null) {
                result.setDiagnosticMessage(de.getLocalizedMessage());
            }
            if (de.getMatchedDN() != null) {
                result.setMatchedDN(de.getMatchedDN().toString());
            }
            if (de.getReferralURLs() != null) {
            result.getReferralURIs().addAll(de.getReferralURLs());
            }
            if (ldapVersion != 2 && modifyOp.getResponseControls() != null) {
                for (Control control : modifyOp.getResponseControls()) {
                    result.addControl(Converters.from(control));
                }
            }
            out.onNext(result);
            out.onComplete();
        }
@@ -1461,12 +1608,21 @@
        } catch (DirectoryException de) {
            logger.traceException(de);
            final Result result = Responses.newResult(de.getResultCode()).setDiagnosticMessage(de.getMessage())
                    .setMatchedDN(de.getMatchedDN().toString());
            final Result result = Responses.newResult(de.getResultCode());
            if (de.getLocalizedMessage() != null) {
                result.setDiagnosticMessage(de.getLocalizedMessage());
            }
            if (de.getMatchedDN() != null) {
                result.setMatchedDN(de.getMatchedDN().toString());
            }
            if (de.getReferralURLs() != null) {
            result.getReferralURIs().addAll(de.getReferralURLs());
            }
            if (ldapVersion != 2 && modifyDNOp.getResponseControls() != null) {
            for (Control control : modifyDNOp.getResponseControls()) {
                result.addControl(Converters.from(control));
            }
            }
            out.onNext(result);
            out.onComplete();
        }
@@ -1520,7 +1676,7 @@
            if (de.getReferralURLs() != null) {
                result.getReferralURIs().addAll(de.getReferralURLs());
            }
            if (searchOp.getResponseControls() != null) {
            if (ldapVersion != 2 && searchOp.getResponseControls() != null) {
                for (Control control : searchOp.getResponseControls()) {
                    result.addControl(Converters.from(control));
                }
@@ -1611,8 +1767,34 @@
    }
    @Override
    public boolean prepareTLS(LocalizableMessageBuilder unavailableReason) {
        throw new UnsupportedOperationException();
    public boolean prepareTLS(final LocalizableMessageBuilder unavailableReason) {
        // Make sure that the connection handler allows the use of the
        // StartTLS operation.
        if (!connectionHandler.allowStartTLS()) {
            unavailableReason.append(ERR_LDAP_TLS_STARTTLS_NOT_ALLOWED.get());
            return false;
        }
        try {
            if (!clientContext.enableTLS(connectionHandler.createSSLEngine(), true)) {
                unavailableReason.append(ERR_LDAP_TLS_EXISTING_SECURITY_PROVIDER.get(SSLEngine.class.getName()));
                return false;
            }
        } catch (DirectoryException de) {
            logger.traceException(de);
            unavailableReason.append(ERR_LDAP_TLS_CANNOT_CREATE_TLS_PROVIDER.get(stackTraceToSingleLineString(de)));
            return false;
        }
        return true;
    }
    /**
     * Installs the SASL security layer on the underlying connection by handing the given
     * {@code SaslServer} to the client context.
     *
     * @param saslServer
     *            The {@code SaslServer} which should be used to secure the connection.
     */
    public void enableSASL(final SaslServer saslServer) {
        clientContext.enableSASL(saslServer);
    }
    /**
@@ -1639,11 +1821,131 @@
     * @return The array of certificates associated with a connection.
     */
    public Certificate[] getClientCertificateChain() {
        final SSLSession sslSession = clientContext.getSSLSession();
        if (sslSession != null) {
            try {
                return sslSession.getPeerCertificates();
            } catch (SSLPeerUnverifiedException e) {
                logger.traceException(e);
            }
        }
        return new Certificate[0];
    }
    @Override
    public int getSSF() {
        return 0;
        return clientContext.getSecurityStrengthFactor();
    }
    /** Upstream -> BlockingBackpressureSubscription -> Downstream */
    private final class BlockingBackpressureSubscription
            implements Subscription, Publisher<Response>, Subscriber<Response> {
        private long pendingRequests;
        private final Queue<Response> queue = new LinkedList<>();
        private final Lock lock = new ReentrantLock();
        private final Condition spaceAvailable = lock.newCondition();
        private final Publisher<Response> upstream;
        private final long writeTimeoutMillis;
        private Subscription subscription;
        private Subscriber<? super Response> downstream;
        BlockingBackpressureSubscription(final Publisher<Response> upstream) {
            this.upstream = upstream;
            this.writeTimeoutMillis = connectionHandler.getMaxBlockedWriteTimeLimit() == 0
                    ? 30000 // Do not wait indefinitely,
                    : connectionHandler.getMaxBlockedWriteTimeLimit();
        }
        @Override
        public void subscribe(final Subscriber<? super Response> subscriber) {
            if (downstream != null) {
                return;
            }
            downstream = subscriber;
            subscriber.onSubscribe(this);
            upstream.subscribe(this);
        }
        @Override
        public void onSubscribe(final Subscription s) {
            if ( subscription != null) {
                s.cancel();
                return;
            }
            subscription = s;
            subscription.request(Long.MAX_VALUE);
        }
        @Override
        public void request(long n) {
            lock.lock();
            try {
                if (pendingRequests != Long.MIN_VALUE) {
                    pendingRequests += n;
                    drain();
                }
            } finally {
                lock.unlock();
            }
        }
        private void drain() {
            Response response;
            try {
                while (pendingRequests > 0 && (response = queue.poll()) != null) {
                    downstream.onNext(response);
                    // Forward response
                    pendingRequests--;
                }
            } finally {
                spaceAvailable.signalAll();
            }
        }
        @Override
        public void onNext(final Response response) {
            lock.lock();
            try {
                while (queue.size() >= 32) {
                    try {
                        if (!spaceAvailable.await(writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
                            // If we've gotten here, then the write timed out.
                            downstream.onError(new ClosedChannelException());
                            cancel();
                            return;
                        }
                    } catch (InterruptedException e) {
                        downstream.onError(e);
                        cancel();
                        return;
                    }
                }
                queue.add(response);
                drain();
            } finally {
                lock.unlock();
            }
        }
        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
            cancel();
        }
        @Override
        public void onComplete() {
            downstream.onComplete();
            cancel();
        }
        @Override
        public void cancel() {
            if (subscription != null) {
                subscription.cancel();
            }
            queue.clear();
            pendingRequests = Long.MIN_VALUE;
        }
    }
}
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPConnectionHandler2.java
@@ -53,9 +53,11 @@
import org.forgerock.opendj.ldap.AddressMask;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.LDAPClientContext.DisconnectListener;
import org.forgerock.opendj.ldap.LDAPListener;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.spi.LdapMessages.LdapRawMessage;
import org.forgerock.opendj.server.config.server.ConnectionHandlerCfg;
@@ -646,6 +648,24 @@
                    public ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>> apply(
                            LDAPClientContext clientContext) throws LdapException {
                        final LDAPClientConnection2 conn = canAccept(clientContext);
                        connectionList.add(conn);
                        clientContext.onDisconnect(new DisconnectListener() {
                            @Override
                            public void exceptionOccurred(LDAPClientContext context, Throwable error) {
                                connectionList.remove(conn);
                            }
                            @Override
                            public void connectionDisconnected(LDAPClientContext context, ResultCode resultCode,
                                    String diagnosticMessage) {
                                connectionList.remove(conn);
                            }
                            @Override
                            public void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest) {
                                connectionList.remove(conn);
                            }
                        });
                        return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
                            @Override
                            public Single<Stream<Response>> handle(LDAPClientContext context, LdapRawMessage request)
@@ -798,7 +818,7 @@
        if (useSSL()) {
            try {
                clientContext.enableTLS(createSSLEngine());
                clientContext.enableTLS(createSSLEngine(), false);
            } catch (DirectoryException e) {
                throw LdapException.newLdapException(e.getResultCode(), e);
            }
opendj-server-legacy/src/main/java/org/opends/server/authorization/dseecompat/AciContainer.java
@@ -18,7 +18,7 @@
import static org.opends.server.authorization.dseecompat.Aci.*;
import static org.opends.server.authorization.dseecompat.AciHandler.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.ServerConstants.OID_GET_EFFECTIVE_RIGHTS;
import java.net.InetAddress;
import java.security.cert.Certificate;
@@ -29,12 +29,12 @@
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.schema.AttributeType;
import org.forgerock.opendj.reactive.LDAPClientConnection2;
import org.opends.server.api.ClientConnection;
import org.opends.server.api.Group;
import org.opends.server.controls.GetEffectiveRightsRequestControl;
import org.opends.server.core.AddOperation;
import org.opends.server.core.SearchOperation;
import org.opends.server.protocols.ldap.LDAPClientConnection;
import org.opends.server.types.AuthenticationInfo;
import org.opends.server.types.AuthenticationType;
import org.opends.server.types.DirectoryException;
@@ -628,8 +628,8 @@
             */
            if (authInfo.hasAuthenticationType(AuthenticationType.SASL)
                && authInfo.hasSASLMechanism(saslMech)
                && clientConnection instanceof LDAPClientConnection) {
                LDAPClientConnection lc = (LDAPClientConnection) clientConnection;
                && clientConnection instanceof LDAPClientConnection2) {
                LDAPClientConnection2 lc = (LDAPClientConnection2) clientConnection;
                Certificate[] certChain = lc.getClientCertificateChain();
                if (certChain.length != 0) {
                  matched = EnumEvalResult.TRUE;
opendj-server-legacy/src/main/java/org/opends/server/config/AdministrationConnector.java
@@ -35,6 +35,7 @@
import org.forgerock.opendj.config.server.ConfigurationChangeListener;
import org.forgerock.opendj.ldap.AddressMask;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.reactive.LDAPConnectionHandler2;
import org.forgerock.opendj.server.config.meta.LDAPConnectionHandlerCfgDefn.SSLClientAuthPolicy;
import org.forgerock.opendj.server.config.server.AdministrationConnectorCfg;
import org.forgerock.opendj.server.config.server.ConnectionHandlerCfg;
@@ -47,7 +48,6 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ServerContext;
import org.opends.server.core.SynchronousStrategy;
import org.opends.server.protocols.ldap.LDAPConnectionHandler;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.FilePermission;
import org.opends.server.types.InitializationException;
@@ -56,8 +56,8 @@
import org.opends.server.util.SetupUtils;
/**
 * This class is a wrapper on top of LDAPConnectionHandler to manage
 * the administration connector, which is an LDAPConnectionHandler
 * This class is a wrapper on top of LDAPConnectionHandler2 to manage
 * the administration connector, which is an LDAPConnectionHandler2
 * with specific (limited) configuration properties.
 */
public final class AdministrationConnector implements
@@ -73,7 +73,7 @@
  private static final String FRIENDLY_NAME = "Administration Connector";
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private LDAPConnectionHandler adminConnectionHandler;
  private LDAPConnectionHandler2 adminConnectionHandler;
  private AdministrationConnectorCfg config;
  /** Predefined values for Administration Connector configuration. */
@@ -131,7 +131,7 @@
    this.config = configuration;
    // Administration Connector uses the LDAP connection handler implementation
    adminConnectionHandler = new LDAPConnectionHandler(
    adminConnectionHandler = new LDAPConnectionHandler2(
        new SynchronousStrategy(), FRIENDLY_NAME);
    adminConnectionHandler.initializeConnectionHandler(serverContext, new LDAPConnectionCfgAdapter(config));
    adminConnectionHandler.setAdminConnectionHandler();
@@ -157,7 +157,7 @@
   *
   * @return The connection handler linked to this administration connector.
   */
  public LDAPConnectionHandler getConnectionHandler()
  public LDAPConnectionHandler2 getConnectionHandler()
  {
    return adminConnectionHandler;
  }
opendj-server-legacy/src/main/java/org/opends/server/core/ConnectionHandlerConfigManager.java
@@ -16,9 +16,9 @@
 */
package org.opends.server.core;
import static org.opends.messages.ConfigMessages.*;
import static org.opends.messages.ConfigMessages.ERR_CONFIG_CONNHANDLER_CANNOT_INITIALIZE;
import static org.opends.messages.CoreMessages.*;
import static org.opends.server.util.StaticUtils.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.util.List;
import java.util.Map;
@@ -26,20 +26,20 @@
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.config.ClassPropertyDefinition;
import org.forgerock.opendj.config.server.ConfigChangeResult;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.config.server.ConfigurationAddListener;
import org.forgerock.opendj.config.server.ConfigurationChangeListener;
import org.forgerock.opendj.config.server.ConfigurationDeleteListener;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.reactive.LDAPConnectionHandler2;
import org.forgerock.opendj.server.config.meta.ConnectionHandlerCfgDefn;
import org.forgerock.opendj.server.config.server.AdministrationConnectorCfg;
import org.forgerock.opendj.server.config.server.ConnectionHandlerCfg;
import org.forgerock.opendj.server.config.server.RootCfg;
import org.opends.server.api.ConnectionHandler;
import org.opends.server.config.AdministrationConnector;
import org.opends.server.protocols.ldap.LDAPConnectionHandler;
import org.forgerock.opendj.config.server.ConfigChangeResult;
import org.forgerock.opendj.ldap.DN;
import org.opends.server.types.InitializationException;
/**
@@ -263,7 +263,7 @@
    // Put this connection handler in the hash so that we will be
    // able to find it if it is altered.
    LDAPConnectionHandler connectionHandler = ac.getConnectionHandler();
    LDAPConnectionHandler2 connectionHandler = ac.getConnectionHandler();
    connectionHandlers.put(administrationConnectorCfg.dn(), connectionHandler);
    // Register the connection handler with the Directory Server.
opendj-server-legacy/src/main/java/org/opends/server/extensions/ExternalSASLMechanismHandler.java
@@ -27,6 +27,7 @@
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.schema.AttributeType;
import org.forgerock.opendj.reactive.LDAPClientConnection2;
import org.forgerock.opendj.server.config.server.ExternalSASLMechanismHandlerCfg;
import org.forgerock.opendj.server.config.server.SASLMechanismHandlerCfg;
import org.opends.server.api.CertificateMapper;
@@ -34,7 +35,6 @@
import org.opends.server.api.SASLMechanismHandler;
import org.opends.server.core.BindOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.protocols.ldap.LDAPClientConnection;
import org.opends.server.types.Attribute;
import org.opends.server.types.AuthenticationInfo;
import org.forgerock.opendj.ldap.DN;
@@ -152,13 +152,13 @@
      return;
    }
    if(!(clientConnection instanceof LDAPClientConnection)) {
    if(!(clientConnection instanceof LDAPClientConnection2)) {
        bindOperation.setResultCode(ResultCode.INVALID_CREDENTIALS);
        LocalizableMessage message = ERR_SASLEXTERNAL_NOT_LDAP_CLIENT_INSTANCE.get();
        bindOperation.setAuthFailureReason(message);
        return;
    }
    LDAPClientConnection lc = (LDAPClientConnection) clientConnection;
    LDAPClientConnection2 lc = (LDAPClientConnection2) clientConnection;
    Certificate[] clientCertChain = lc.getClientCertificateChain();
    if (clientCertChain == null || clientCertChain.length == 0) {
      bindOperation.setResultCode(ResultCode.INVALID_CREDENTIALS);
opendj-server-legacy/src/main/java/org/opends/server/extensions/SASLContext.java
@@ -44,6 +44,7 @@
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.reactive.LDAPClientConnection2;
import org.ietf.jgss.GSSException;
import org.opends.server.api.AuthenticationPolicyState;
import org.opends.server.api.ClientConnection;
@@ -370,10 +371,8 @@
      // use in later processing.
      if (isConfidentialIntegrity())
      {
        final SASLByteChannel saslByteChannel = SASLByteChannel
            .getSASLByteChannel(clientConn, mechanism, this);
        final LDAPClientConnection ldapConn = (LDAPClientConnection) clientConn;
        ldapConn.setSASLPendingProvider(saslByteChannel);
        final LDAPClientConnection2 ldapConn = (LDAPClientConnection2) clientConn;
        ldapConn.enableSASL(saslServer);
      }
      else
      {
opendj-server-legacy/src/main/java/org/opends/server/protocols/ldap/LDAPStatistics.java
@@ -407,14 +407,14 @@
   * Updates the appropriate set of counters based on the provided
   * message that has been written to the client.
   *
   * @param message
   *          The message that was written to the client.
   * @param messageType
   *          The message type that was written to the client.
   * @param messageId
   *          The message id that was written to the client
   */
  public void updateMessageWritten(LDAPMessage message)
  {
  public void updateMessageWritten(byte messageType, int messageId) {
      messagesWritten.getAndIncrement();
      switch (message.getProtocolOp().getType())
      switch (messageType)
      {
      case OP_TYPE_ADD_RESPONSE:
        addResponses.getAndIncrement();
@@ -437,7 +437,7 @@
        // We don't want to include unsolicited notifications as
        // "completed" operations.
        if (message.getMessageID() > 0)
        if (messageId > 0)
        {
          operationsCompleted.getAndIncrement();
        }
@@ -464,6 +464,18 @@
  }
  /**
   * Updates the appropriate set of counters based on the provided
   * message that has been written to the client.
   *
   * @param message
   *          The message that was written to the client.
   */
  public void updateMessageWritten(LDAPMessage message)
  {
      updateMessageWritten(message.getProtocolOp().getType(), message.getMessageID());
  }
  /**
   * Updates the appropriate set of counters to indicate that an
   * operation was abandoned without sending a response to the client.
   */
opendj-server-legacy/src/test/java/org/opends/server/core/OperationTestCase.java
@@ -12,15 +12,15 @@
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2006-2010 Sun Microsystems, Inc.
 * Portions Copyright 2013-2015 ForgeRock AS.
 * Portions Copyright 2013-2016 ForgeRock AS.
 */
package org.opends.server.core;
import static org.testng.Assert.*;
import org.forgerock.opendj.reactive.LDAPConnectionHandler2;
import org.opends.server.TestCaseUtils;
import org.opends.server.api.ConnectionHandler;
import org.opends.server.protocols.ldap.LDAPConnectionHandler;
import org.opends.server.protocols.ldap.LDAPControl;
import org.opends.server.protocols.ldap.LDAPStatistics;
import org.opends.server.types.Control;
@@ -57,9 +57,9 @@
  {
    for (ConnectionHandler ch : DirectoryServer.getConnectionHandlers())
    {
      if (ch instanceof LDAPConnectionHandler)
      if (ch instanceof LDAPConnectionHandler2)
      {
        LDAPConnectionHandler lch = (LDAPConnectionHandler) ch;
          LDAPConnectionHandler2 lch = (LDAPConnectionHandler2) ch;
        if (lch.useSSL())
        {
          ldapsStatistics = lch.getStatTracker();
opendj-server-legacy/src/test/java/org/opends/server/protocols/ldap/LdapTestCase.java
@@ -17,13 +17,13 @@
package org.opends.server.protocols.ldap;
import static org.mockito.Mockito.mock;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.config.ConfigConstants.ATTR_LISTEN_PORT;
import java.util.Iterator;
import java.util.List;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.reactive.LDAPConnectionHandler2;
import org.forgerock.opendj.server.config.meta.LDAPConnectionHandlerCfgDefn;
import org.forgerock.opendj.server.config.server.LDAPConnectionHandlerCfg;
import org.opends.server.DirectoryServerTestCase;
@@ -97,14 +97,14 @@
   * @return Returns the new LDAP connection handler.
   * @throws Exception if the handler cannot be initialized.
   */
  static LDAPConnectionHandler getLDAPHandlerInstance(Entry handlerEntry)
  static LDAPConnectionHandler2 getLDAPHandlerInstance(Entry handlerEntry)
      throws Exception
  {
    long serverLdapPort = TestCaseUtils.findFreePort();
    Attribute a = Attributes.create(ATTR_LISTEN_PORT, String.valueOf(serverLdapPort));
    handlerEntry.addAttribute(a, null);
    LDAPConnectionHandlerCfg config = getConfiguration(handlerEntry);
    LDAPConnectionHandler handler = new LDAPConnectionHandler();
    LDAPConnectionHandler2 handler = new LDAPConnectionHandler2();
    handler.initializeConnectionHandler(mock(ServerContext.class), config);
    return handler;
  }
opendj-server-legacy/src/test/java/org/opends/server/protocols/ldap/TestLDAPConnectionHandler.java
@@ -32,6 +32,7 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.types.Attribute;
import org.forgerock.opendj.ldap.schema.AttributeType;
import org.forgerock.opendj.reactive.LDAPConnectionHandler2;
import org.opends.server.types.Attributes;
import org.forgerock.opendj.ldap.DN;
import org.opends.server.types.Entry;
@@ -73,12 +74,12 @@
        "objectClass: ds-cfg-connection-handler",
        "objectClass: ds-cfg-ldap-connection-handler",
        "cn: LDAP Connection Handler",
        "ds-cfg-java-class: org.opends.server.protocols.ldap.LDAPConnectionHandler",
        "ds-cfg-java-class: org.forgerock.opendj.reactive.LDAPConnectionHandler2",
        "ds-cfg-enabled: true",
        "ds-cfg-listen-address: 0.0.0.0",
        "ds-cfg-accept-backlog: 128",
        "ds-cfg-allow-ldap-v2: false",
        "ds-cfg-keep-stats: false",
        "ds-cfg-keep-stats: true",
        "ds-cfg-use-tcp-keep-alive: true",
        "ds-cfg-use-tcp-no-delay: true",
        "ds-cfg-allow-tcp-reuse-address: true",
@@ -91,10 +92,9 @@
        "ds-cfg-ssl-cert-nickname: server-cert",
        "ds-cfg-key-manager-provider: cn=JKS,cn=Key Manager Providers,cn=config",
        "ds-cfg-trust-manager-provider: cn=JKS,cn=Trust Manager Providers,cn=config");
    LDAPConnectionHandler LDAPConnHandler=getLDAPHandlerInstance(LDAPHandlerEntry);
    LDAPConnectionHandler2 LDAPConnHandler=getLDAPHandlerInstance(LDAPHandlerEntry);
    LDAPConnHandler.allowLDAPv2();
    LDAPConnHandler.allowStartTLS();
    LDAPConnHandler.keepStats();
    LDAPConnHandler.toString(new StringBuilder());
    LDAPConnHandler.toString();
    LDAPStatistics tracker=LDAPConnHandler.getStatTracker();
@@ -118,7 +118,7 @@
    Attribute startTls1=Attributes.create(ATTR_ALLOW_STARTTLS, String.valueOf(false));
    LDAPHandlerEntry.addAttribute(useSSL1,null);
    LDAPHandlerEntry.addAttribute(startTls1,null);
    LDAPConnectionHandler LDAPSConnHandler = getLDAPHandlerInstance(LDAPHandlerEntry);
    LDAPConnectionHandler2 LDAPSConnHandler = getLDAPHandlerInstance(LDAPHandlerEntry);
    LDAPSConnHandler.finalizeConnectionHandler(reasonMsg);
    LDAPConnHandler.processServerShutdown(reasonMsg);
  }
@@ -209,7 +209,7 @@
        "objectClass: ds-cfg-connection-handler",
        "objectClass: ds-cfg-ldap-connection-handler",
        "cn: LDAP Connection Handler",
        "ds-cfg-java-class: org.opends.server.protocols.ldap.LDAPConnectionHandler",
        "ds-cfg-java-class: org.forgerock.opendj.reactive.LDAPConnectionHandler2",
        "ds-cfg-enabled: true",
        "ds-cfg-listen-address: 0.0.0.0",
        "ds-cfg-accept-backlog: 128",
@@ -227,7 +227,7 @@
        "ds-cfg-ssl-cert-nickname: server-cert",
        "ds-cfg-key-manager-provider: cn=JKS,cn=Key Manager Providers,cn=config",
        "ds-cfg-trust-manager-provider: cn=JKS,cn=Trust Manager Providers,cn=config");
    LDAPConnectionHandler LDAPConnHandler=getLDAPHandlerInstance(GoodHandlerEntry);
    LDAPConnectionHandler2 LDAPConnHandler=getLDAPHandlerInstance(GoodHandlerEntry);
    //Make attrTypes to remove
    AttributeType at0=DirectoryServer.getSchema().getAttributeType(ATTR_LISTEN_PORT);
//    AttributeType at1=DirectoryServer.getAttributeType(ATTR_LISTEN_ADDRESS, true);