mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
25.01.2016 3384638f67ebb7895dd095433d1380dde0b40f3d
OPENDJ-3179: Migrate LDAP Connection Handler to SDK Grizzly transport

Fixes regarding comments on the PR made so far.
26 files modified
287 ■■■■■ changed files
opendj-core/pom.xml 2 ●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/ByteSequenceReader.java 4 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java 15 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java 4 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPListener.java 11 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LDAPListenerImpl.java 4 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LdapMessages.java 29 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/TransportProvider.java 4 ●●●● patch | view | raw | blame | history
opendj-core/src/test/java/org/forgerock/opendj/ldap/LDAPServer.java 7 ●●●●● patch | view | raw | blame | history
opendj-core/src/test/java/org/forgerock/opendj/ldap/TestCaseUtils.java 14 ●●●● patch | view | raw | blame | history
opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicLDAPListener.java 8 ●●●● patch | view | raw | blame | history
opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicTransportProvider.java 4 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java 13 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferReader.java 9 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java 22 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/DefaultTCPNIOTransport.java 37 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java 14 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java 7 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java 3 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/SaslFilter.java 4 ●●● patch | view | raw | blame | history
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java 8 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/DefaultTCPNIOTransportTestCase.java 10 ●●●●● patch | view | raw | blame | history
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactoryTestCase.java 4 ●●●● patch | view | raw | blame | history
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java 41 ●●●● patch | view | raw | blame | history
opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/Utils.java 5 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java 4 ●●●● patch | view | raw | blame | history
opendj-core/pom.xml
@@ -58,7 +58,7 @@
        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.0.0-RC3</version>
            <version>2.0.0-RC5</version>
        </dependency>
        <dependency>
opendj-core/src/main/java/org/forgerock/opendj/ldap/ByteSequenceReader.java
@@ -28,7 +28,7 @@
 * {@code ByteSequenceReader} must be created using the associated
 * {@code ByteSequence}'s {@code asReader()} method.
 */
public class ByteSequenceReader {
public final class ByteSequenceReader {
    /** The current position in the byte sequence. */
    private int pos;
@@ -55,7 +55,7 @@
     * @param sequence
     *            The byte sequence to be read.
     */
    public ByteSequenceReader(final ByteSequence sequence) {
    ByteSequenceReader(final ByteSequence sequence) {
        this.sequence = sequence;
    }
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPClientContext.java
@@ -37,17 +37,17 @@
    /** Listens for disconnection event. */
    public interface DisconnectListener {
        /**
         * Invoked when the connection has been disconnected because of an error (i.e: message too big).
         * Invoked when the connection has been disconnected because of an error (e.g: message too big).
         *
         * @param context
         *            The {@link LDAPClientContext} which has failed
         * @param error
         *            The error
         */
        void exceptionOccurred(final LDAPClientContext context, final Throwable error);
        void exceptionOccurred(LDAPClientContext context, Throwable error);
        /**
         * Invoked when the client closes the connection, possibly using an unbind request.
         * Invoked when the client closed the connection, possibly using an unbind request.
         *
         * @param context
         *            The {@link LDAPClientContext} which has been disconnected
@@ -55,7 +55,7 @@
         *            The unbind request, which may be {@code null} if one was not sent before the connection was
         *            closed.
         */
        void connectionClosed(final LDAPClientContext context, final UnbindRequest unbindRequest);
        void connectionClosed(LDAPClientContext context, UnbindRequest unbindRequest);
        /**
         * Invoked when the connection has been disconnected by the server.
@@ -68,8 +68,7 @@
         * @param diagnosticMessage
         *            The diagnostic message, which may be empty or {@code null} indicating that none was provided.
         */
        void connectionDisconnected(final LDAPClientContext context, final ResultCode resultCode,
                final String diagnosticMessage);
        void connectionDisconnected(LDAPClientContext context, ResultCode resultCode, String diagnosticMessage);
    }
    /**
@@ -77,7 +76,7 @@
     *
     * @param listener The {@link DisconnectListener} to register.
     */
    void onDisconnect(final DisconnectListener listener);
    void onDisconnect(DisconnectListener listener);
    /**
     * Disconnects the client without sending a disconnect notification. Invoking this method causes
@@ -97,7 +96,7 @@
     * @param diagnosticMessage
     *            The diagnostic message to include with the disconnect notification
     */
    void disconnect(final ResultCode resultCode, final String diagnosticMessage);
    void disconnect(ResultCode resultCode, String diagnosticMessage);
    /**
     * Returns the {@code InetSocketAddress} associated with the local system.
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPConnectionFactory.java
@@ -17,7 +17,7 @@
package org.forgerock.opendj.ldap;
import static com.forgerock.opendj.ldap.CoreMessages.HBCF_CONNECTION_CLOSED_BY_CLIENT;
import static com.forgerock.opendj.ldap.CoreMessages.HBCF_HEARTBEAT_FAILED;
import static com.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_UNEXPECTED;
import static com.forgerock.opendj.ldap.CoreMessages.HBCF_HEARTBEAT_TIMEOUT;
import static com.forgerock.opendj.ldap.CoreMessages.LDAP_CONNECTION_CONNECT_TIMEOUT;
import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER;
@@ -502,7 +502,7 @@
                        connectException = newHeartBeatTimeoutError();
                    } else {
                        connectException = newLdapException(ResultCode.CLIENT_SIDE_SERVER_DOWN,
                                                            HBCF_HEARTBEAT_FAILED.get(),
                                                            ERR_CONNECTION_UNEXPECTED.get(e),
                                                            e);
                    }
                    if (promise.tryHandleException(connectException)) {
opendj-core/src/main/java/org/forgerock/opendj/ldap/LDAPListener.java
@@ -19,8 +19,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashSet;
import java.util.Collections;
import java.util.Set;
import org.forgerock.opendj.ldap.spi.LDAPListenerImpl;
@@ -194,14 +193,12 @@
     *             If {@code address}, {@code factory}, or {@code options} was
     *             {@code null}.
     */
    public LDAPListener(final SocketAddress address,
    public LDAPListener(final InetSocketAddress address,
            final ServerConnectionFactory<LDAPClientContext, Integer> factory,
            final Options options) throws IOException {
        Reject.ifNull(address, factory, options);
        this.provider = getTransportProvider(options);
        final Set<SocketAddress> addresses = new HashSet<>();
        addresses.add(address);
        this.impl = provider.getLDAPListener(addresses, factory, options);
        this.impl = provider.getLDAPListener(Collections.singleton(address), factory, options);
    }
    /**
@@ -263,7 +260,7 @@
     *
     * @return The address that this LDAP listener is listening on.
     */
    public Set<? extends SocketAddress> getSocketAddresses() {
    public Set<InetSocketAddress> getSocketAddresses() {
        return impl.getSocketAddresses();
    }
opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LDAPListenerImpl.java
@@ -16,7 +16,7 @@
package org.forgerock.opendj.ldap.spi;
import java.io.Closeable;
import java.net.SocketAddress;
import java.net.InetSocketAddress;
import java.util.Set;
/**
@@ -36,7 +36,7 @@
     *
     * @return The addresses that this LDAP listener is listening on.
     */
    Set<? extends SocketAddress> getSocketAddresses();
    Set<InetSocketAddress> getSocketAddresses();
    /**
     * Closes this stream and releases any system resources associated
opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/LdapMessages.java
@@ -18,7 +18,6 @@
import org.forgerock.opendj.io.ASN1Reader;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.responses.Response;
import org.forgerock.opendj.ldap.schema.Schema;
/**
 * Contains static methods to create LDAP messages.
@@ -40,15 +39,13 @@
     *            Protocol version to use (only for Bind requests)
     * @param rawDn
     *            Unparsed name contained in the request (or null if DN is not applicable)
     * @param schema
     *            Schema to use to parse the DN
     * @param reader
     *            An {@link ASN1Reader} containing the full encoded ldap message packet.
     * @return A new {@link LdapRawMessage}
     */
    public static LdapRawMessage newRawMessage(final byte messageType, final int messageId, final int protocolVersion,
            final String rawDn, final Schema schema, final ASN1Reader reader) {
        return new LdapRawMessage(messageType, messageId, protocolVersion, rawDn, schema, reader);
            final String rawDn, final ASN1Reader reader) {
        return new LdapRawMessage(messageType, messageId, protocolVersion, rawDn, reader);
    }
    /**
@@ -73,16 +70,13 @@
     */
    public static final class LdapRawMessage extends LdapMessageEnvelope<ASN1Reader> {
        private final String rawDn;
        private final Schema schema;
        private final int version;
        private DN dn;
        private LdapRawMessage(final byte messageType, final int messageId, final int version, final String rawDn,
                final Schema schema, final ASN1Reader content) {
                final ASN1Reader content) {
            super(messageType, messageId, content);
            this.version = version;
            this.rawDn = rawDn;
            this.schema = schema;
        }
        /**
@@ -102,23 +96,6 @@
        public String getRawDn() {
            return rawDn;
        }
        /**
         * Get the decoded form of the {@link DN} contained in the message (or null if the message doesn't contains a
         * DN).
         *
         * @return The decoded {@link DN} contained in the request, or null if the message doesn't contains a DN.
         */
        public DN getDn() {
            if (rawDn == null) {
                return null;
            }
            if (dn != null) {
                return dn;
            }
            dn = DN.valueOf(rawDn.toString(), schema);
            return dn;
        }
    }
    /**
opendj-core/src/main/java/org/forgerock/opendj/ldap/spi/TransportProvider.java
@@ -16,7 +16,7 @@
package org.forgerock.opendj.ldap.spi;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.InetSocketAddress;
import java.util.Set;
import org.forgerock.opendj.ldap.LDAPClientContext;
@@ -64,7 +64,7 @@
     *             If an error occurred while trying to listen on the provided
     *             address.
     */
    LDAPListenerImpl getLDAPListener(Set<? extends SocketAddress> addresses,
    LDAPListenerImpl getLDAPListener(Set<InetSocketAddress> addresses,
            ServerConnectionFactory<LDAPClientContext, Integer> factory, Options options)
            throws IOException;
}
opendj-core/src/test/java/org/forgerock/opendj/ldap/LDAPServer.java
@@ -19,7 +19,7 @@
import static org.forgerock.opendj.ldap.LDAPListener.CONNECT_MAX_BACKLOG;
import static org.forgerock.opendj.ldap.LdapException.newLdapException;
import static org.forgerock.opendj.ldap.TestCaseUtils.findFreeSocketAddress;
import static org.forgerock.opendj.ldap.TestCaseUtils.loopbackWithDynamicPort;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -226,8 +226,7 @@
                        props.put(Sasl.QOP, "auth-conf,auth-int,auth");
                        saslServer =
                                Sasl.createSaslServer(saslMech, "ldap",
                                       ((InetSocketAddress) listener.getSocketAddresses().iterator().next())
                                       .getHostName(),
                                       listener.getSocketAddresses().iterator().next().getHostName(),
                                       props,
                                        new CallbackHandler() {
                                            @Override
@@ -518,7 +517,7 @@
            return;
        }
        sslContext = new SSLContextBuilder().getSSLContext();
        listener = new LDAPListener(findFreeSocketAddress(), getInstance(),
        listener = new LDAPListener(loopbackWithDynamicPort(), getInstance(),
                        Options.defaultOptions().set(CONNECT_MAX_BACKLOG, 4096));
        isRunning = true;
    }
opendj-core/src/test/java/org/forgerock/opendj/ldap/TestCaseUtils.java
@@ -12,13 +12,14 @@
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2009-2010 Sun Microsystems, Inc.
 * Portions Copyright 2012-2015 ForgeRock AS.
 * Portions Copyright 2012-2016 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.List;
@@ -84,6 +85,15 @@
    }
    /**
     * Create a new {@link InetSocketAddress} configured with loopback address and dynamic port 0
     *
     * @return A new loopback {@link InetSocketAddress}
     */
    public static InetSocketAddress loopbackWithDynamicPort() {
        return new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
    }
    /**
     * Finds a free server socket port on the local host.
     *
     * @return The free port.
@@ -91,7 +101,7 @@
    public static InetSocketAddress findFreeSocketAddress() {
        try (ServerSocket serverLdapSocket = new ServerSocket()) {
            serverLdapSocket.setReuseAddress(true);
            serverLdapSocket.bind(new InetSocketAddress("127.0.0.1", 0));
            serverLdapSocket.bind(loopbackWithDynamicPort());
            return (InetSocketAddress) serverLdapSocket.getLocalSocketAddress();
        } catch (IOException e) {
            throw new RuntimeException(e);
opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicLDAPListener.java
@@ -17,7 +17,7 @@
package org.forgerock.opendj.ldap.spi;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.InetSocketAddress;
import java.util.Set;
import org.forgerock.opendj.ldap.LDAPClientContext;
@@ -29,7 +29,7 @@
 */
public final class BasicLDAPListener implements LDAPListenerImpl {
    private final ServerConnectionFactory<LDAPClientContext, Integer> connectionFactory;
    private final Set<? extends SocketAddress> socketAddresses;
    private final Set<InetSocketAddress> socketAddresses;
    /**
     * Creates a new LDAP listener implementation which does nothing.
@@ -44,7 +44,7 @@
     * @throws IOException
     *             is never thrown with this do-nothing implementation
     */
    public BasicLDAPListener(final Set<? extends SocketAddress> addresses,
    public BasicLDAPListener(final Set<InetSocketAddress> addresses,
            final ServerConnectionFactory<LDAPClientContext, Integer> factory,
            final Options options) throws IOException {
        this.connectionFactory = factory;
@@ -57,7 +57,7 @@
    }
    @Override
    public Set<? extends SocketAddress> getSocketAddresses() {
    public Set<InetSocketAddress> getSocketAddresses() {
        return socketAddresses;
    }
opendj-core/src/test/java/org/forgerock/opendj/ldap/spi/BasicTransportProvider.java
@@ -16,7 +16,7 @@
package org.forgerock.opendj.ldap.spi;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.InetSocketAddress;
import java.util.Set;
import org.forgerock.opendj.ldap.LDAPClientContext;
@@ -44,7 +44,7 @@
    }
    @Override
    public LDAPListenerImpl getLDAPListener(Set<? extends SocketAddress> addresses,
    public LDAPListenerImpl getLDAPListener(Set<InetSocketAddress> addresses,
            ServerConnectionFactory<LDAPClientContext, Integer> factory, Options options) throws IOException {
        return new BasicLDAPListener(addresses, factory, options);
    }
opendj-grizzly/src/main/java/com/forgerock/opendj/grizzly/GrizzlyTransportProvider.java
@@ -18,7 +18,7 @@
import static com.forgerock.reactive.RxJavaStreams.*;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.InetSocketAddress;
import java.util.Set;
import org.forgerock.opendj.grizzly.GrizzlyLDAPConnectionFactory;
@@ -29,6 +29,7 @@
import org.forgerock.opendj.io.LDAPReader;
import org.forgerock.opendj.ldap.CommonLDAPOptions;
import org.forgerock.opendj.ldap.DecodeException;
import org.forgerock.opendj.ldap.DecodeOptions;
import org.forgerock.opendj.ldap.LDAPClientContext;
import org.forgerock.opendj.ldap.LDAPClientContext.DisconnectListener;
import org.forgerock.opendj.ldap.LdapException;
@@ -58,9 +59,9 @@
import com.forgerock.reactive.Single;
import com.forgerock.reactive.Stream;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableEmitter.BackpressureMode;
import io.reactivex.FlowableOnSubscribe;
/**
@@ -74,7 +75,7 @@
    }
    @Override
    public LDAPListenerImpl getLDAPListener(final Set<? extends SocketAddress> addresses,
    public LDAPListenerImpl getLDAPListener(final Set<InetSocketAddress> addresses,
            final ServerConnectionFactory<LDAPClientContext, Integer> factory, final Options options)
            throws IOException {
        return new GrizzlyLDAPListener(addresses, options,
@@ -117,12 +118,12 @@
            }
        });
        final DecodeOptions decodeOptions = options.get(CommonLDAPOptions.LDAP_DECODE_OPTIONS);
        return new ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>() {
            @Override
            public Single<Stream<Response>> handle(final LDAPClientContext context,
                    final LdapRawMessage rawRequest) throws Exception {
                final LDAPReader<ASN1Reader> reader = LDAP.getReader(rawRequest.getContent(),
                        options.get(CommonLDAPOptions.LDAP_DECODE_OPTIONS));
                final LDAPReader<ASN1Reader> reader = LDAP.getReader(rawRequest.getContent(), decodeOptions);
                return singleFrom(streamFromPublisher(Flowable.create(new FlowableOnSubscribe<Response>() {
                    @Override
                    public void subscribe(final FlowableEmitter<Response> emitter) throws Exception {
@@ -189,7 +190,7 @@
                        });
                        emitter.onComplete();
                    }
                }, BackpressureMode.ERROR)));
                }, BackpressureStrategy.ERROR)));
            }
        };
    }
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferReader.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -137,14 +138,14 @@
    private SequenceLimiter readLimiter;
    /**
     * Creates a new ASN1 reader whose source is the provided input stream and
     * Creates a new ASN1 reader whose source is the provided buffer and
     * having a user defined maximum BER element size.
     *
     * @param maxElementSize
     *            The maximum BER element size, or <code>0</code> to indicate
     *            that there is no limit.
     * @param memoryManager
     *            The memory manager to use for buffering.
     * @param buffer
     *            The buffer where the content will be read from.
     */
    ASN1BufferReader(final int maxElementSize, final Buffer buffer) {
        this.readLimiter = new RootSequenceLimiter();
@@ -392,7 +393,7 @@
        String str;
        try {
            str = buffer.toStringContent(Charset.forName("UTF-8"), buffer.position(), buffer.position() + peekLength);
            str = buffer.toStringContent(StandardCharsets.UTF_8, buffer.position(), buffer.position() + peekLength);
        } catch (final Exception e) {
            // TODO: I18N
            logger.warn(LocalizableMessage.raw("Unable to decode ASN.1 OCTETSTRING bytes as UTF-8 string: %s", e));
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/ASN1BufferWriter.java
@@ -139,15 +139,6 @@
    /** Default maximum size for cached protocol/entry encoding buffers. */
    private static final int DEFAULT_MAX_INTERNAL_BUFFER_SIZE = 32 * 1024;
    /** Reset the writer. */
    void reset() {
        if (outBuffer.capacity() > DEFAULT_MAX_INTERNAL_BUFFER_SIZE) {
            outBuffer = memoryManager.allocate(BUFFER_INIT_SIZE);
        } else {
            outBuffer.clear();
        }
    }
    private final MemoryManager<Buffer> memoryManager;
    private SequenceBuffer sequenceBuffer;
    private Buffer outBuffer;
@@ -160,6 +151,15 @@
        this.outBuffer = memoryManager.allocate(BUFFER_INIT_SIZE);
    }
    /**
     * Reset the writer.
     * <p>
     * The output buffer is cleared in place unless its capacity has grown beyond
     * {@code DEFAULT_MAX_INTERNAL_BUFFER_SIZE}, in which case it is replaced by a newly allocated buffer of
     * {@code BUFFER_INIT_SIZE} obtained from the memory manager.
     */
    void reset() {
        if (outBuffer.capacity() > DEFAULT_MAX_INTERNAL_BUFFER_SIZE) {
            // NOTE(review): presumably this avoids keeping an oversized buffer around between uses - confirm.
            outBuffer = memoryManager.allocate(BUFFER_INIT_SIZE);
        } else {
            // Small enough to keep: reuse the same buffer.
            outBuffer.clear();
        }
    }
    void ensureAdditionalCapacity(final int size) {
        final int newCount = outBuffer.position() + size;
        if (newCount > outBuffer.capacity()) {
@@ -386,10 +386,6 @@
        sequenceBuffer.writeByte(type);
        writeLength(sequenceBuffer, value.length());
        sequenceBuffer.writeByteSequence(value);
//        // TODO: Is there a more efficient way to do this?
//        for (int i = 0; i < value.length(); i++) {
//            sequenceBuffer.writeByte(value.byteAt(i));
//        }
        if (logger.isTraceEnabled()) {
            logger.trace("WRITE ASN.1 OCTETSTRING(type=0x%x, length=%d)", type, value.length());
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/DefaultTCPNIOTransport.java
@@ -23,7 +23,6 @@
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import org.glassfish.grizzly.nio.transport.TCPNIOTransportBuilder;
import org.glassfish.grizzly.strategies.SameThreadIOStrategy;
import org.glassfish.grizzly.strategies.WorkerThreadIOStrategy;
import org.glassfish.grizzly.threadpool.ThreadPoolConfig;
import com.forgerock.opendj.util.ReferenceCountedObject;
@@ -56,24 +55,7 @@
    protected TCPNIOTransport newInstance() {
        final TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance();
        /*
         * Determine which threading strategy to use, and total number of
         * threads.
         */
        final String useWorkerThreadsStr =
                System.getProperty("org.forgerock.opendj.transport.useWorkerThreads");
        final boolean useWorkerThreadStrategy;
        if (useWorkerThreadsStr != null) {
            useWorkerThreadStrategy = Boolean.parseBoolean(useWorkerThreadsStr);
        } else {
            useWorkerThreadStrategy = false;
        }
        if (useWorkerThreadStrategy) {
            builder.setIOStrategy(WorkerThreadIOStrategy.getInstance());
        } else {
            builder.setIOStrategy(SameThreadIOStrategy.getInstance());
        }
        // Calculate thread counts.
        final int cpus = Runtime.getRuntime().availableProcessors();
@@ -85,30 +67,13 @@
        if (selectorsStr != null) {
            selectorThreadCount = Integer.parseInt(selectorsStr);
        } else {
            selectorThreadCount =
                    useWorkerThreadStrategy ? Math.max(2, cpus / 4) : Math.max(5, (cpus / 2) - 1);
            selectorThreadCount = Math.max(5, (cpus / 2) - 1);
        }
        builder.setSelectorThreadPoolConfig(ThreadPoolConfig.defaultConfig().setCorePoolSize(
                selectorThreadCount).setMaxPoolSize(selectorThreadCount).setPoolName(
                "OpenDJ LDAP SDK Grizzly selector thread"));
        // Calculate the number of worker threads.
        if (builder.getWorkerThreadPoolConfig() != null) {
            final String workersStr = System.getProperty("org.forgerock.opendj.transport.workers");
            final int workerThreadCount;
            if (workersStr != null) {
                workerThreadCount = Integer.parseInt(workersStr);
            } else {
                workerThreadCount = useWorkerThreadStrategy ? Math.max(5, (cpus * 2)) : 0;
            }
            builder.setWorkerThreadPoolConfig(ThreadPoolConfig.defaultConfig().setCorePoolSize(
                    workerThreadCount).setMaxPoolSize(workerThreadCount).setPoolName(
                    "OpenDJ LDAP SDK Grizzly worker thread"));
        }
        // Parse IO related options.
        final String lingerStr = System.getProperty("org.forgerock.opendj.transport.linger");
        if (lingerStr != null) {
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListener.java
@@ -21,7 +21,7 @@
import static org.forgerock.opendj.ldap.LDAPListener.*;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -56,7 +56,7 @@
    private final ReferenceCountedObject<TCPNIOTransport>.Reference transport;
    private final Collection<TCPNIOServerConnection> serverConnections;
    private final AtomicBoolean isClosed = new AtomicBoolean();
    private final Set<SocketAddress> socketAddresses;
    private final Set<InetSocketAddress> socketAddresses;
    private final Options options;
    /**
@@ -72,7 +72,7 @@
     * @throws IOException
     *             If an error occurred while trying to listen on the provided address.
     */
    public GrizzlyLDAPListener(final Set<? extends SocketAddress> addresses, final Options options,
    public GrizzlyLDAPListener(final Set<InetSocketAddress> addresses, final Options options,
            final Function<LDAPClientContext,
                           ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
                           LdapException> requestHandlerFactory) throws IOException {
@@ -95,7 +95,7 @@
     * @throws IOException
     *             If an error occurred while trying to listen on the provided address.
     */
    public GrizzlyLDAPListener(final Set<? extends SocketAddress> addresses,
    public GrizzlyLDAPListener(final Set<InetSocketAddress> addresses,
            final Function<LDAPClientContext,
                           ReactiveHandler<LDAPClientContext, LdapRawMessage, Stream<Response>>,
                           LdapException> requestHandlerFactory,
@@ -116,10 +116,10 @@
                .processor(ldapChain).build();
        this.serverConnections = new ArrayList<>(addresses.size());
        this.socketAddresses = new HashSet<>(addresses.size());
        for (final SocketAddress address : addresses) {
        for (final InetSocketAddress address : addresses) {
            final TCPNIOServerConnection bound = bindingHandler.bind(address, options.get(CONNECT_MAX_BACKLOG));
            serverConnections.add(bound);
            socketAddresses.add(bound.getLocalAddress());
            socketAddresses.add((InetSocketAddress) bound.getLocalAddress());
        }
    }
@@ -142,7 +142,7 @@
    }
    @Override
    public Set<? extends SocketAddress> getSocketAddresses() {
    public Set<InetSocketAddress> getSocketAddresses() {
        return socketAddresses;
    }
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LDAPServerFilter.java
@@ -71,7 +71,6 @@
import com.forgerock.reactive.Action;
import com.forgerock.reactive.Completable;
import com.forgerock.reactive.Consumer;
import com.forgerock.reactive.ReactiveHandler;
import com.forgerock.reactive.Single;
import com.forgerock.reactive.Stream;
@@ -195,10 +194,12 @@
                            });
                }
            }, maxConcurrentRequests)
            .onErrorDo(new Consumer<Throwable>() {
            .onErrorResumeWith(new Function<Throwable, Publisher<Object>, Exception>() {
                @Override
                public void accept(final Throwable error) throws Exception {
                public Publisher<Object> apply(Throwable error) throws Exception {
                    clientContext.notifyErrorAndCloseSilently(error);
                    // Swallow the error to prevent the subscribe() below from reporting it on the console.
                    return streamFrom(DUMMY);
                }
            })
            .onCompleteDo(new Action() {
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/LdapCodec.java
@@ -119,8 +119,7 @@
                rawDn = null;
                protocolVersion = -1;
            }
            return LdapMessages.newRawMessage(messageType, messageId, protocolVersion, rawDn,
                    rawDn != null ? decodeOptions.getSchemaResolver().resolveSchema(rawDn) : null, reader);
            return LdapMessages.newRawMessage(messageType, messageId, protocolVersion, rawDn, reader);
        } finally {
            reader.reset();
        }
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/SaslFilter.java
@@ -30,10 +30,12 @@
final class SaslFilter extends BaseFilter {
    private static final int INT_SIZE = 4;
    @Override
    public NextAction handleRead(final FilterChainContext ctx) throws IOException {
        final Buffer message = ctx.getMessage();
        if (message.remaining() < 4) {
        if (message.remaining() < INT_SIZE) {
            return ctx.getStopAction(message);
        }
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/ConnectionFactoryTestCase.java
@@ -544,7 +544,7 @@
                    }
                });
        LDAPListener listener = new LDAPListener(findFreeSocketAddress(), mockServer);
        LDAPListener listener = new LDAPListener(new InetSocketAddress("127.0.0.1", 0), mockServer);
        try {
            LDAPConnectionFactory clientFactory = new LDAPConnectionFactory(
                    ((InetSocketAddress) listener.getSocketAddresses().iterator().next()).getHostName(),
@@ -627,12 +627,12 @@
                    }
                });
        LDAPListener listener = new LDAPListener(findFreeSocketAddress(), mockServer);
        LDAPListener listener = new LDAPListener(new InetSocketAddress("127.0.0.1", 0), mockServer);
        try {
            LDAPConnectionFactory clientFactory =
                    new LDAPConnectionFactory(
                            ((InetSocketAddress) listener.getSocketAddresses().iterator().next()).getHostName(),
                            ((InetSocketAddress) listener.getSocketAddresses().iterator().next()).getPort());
                            listener.getSocketAddresses().iterator().next().getHostName(),
                            listener.getSocketAddresses().iterator().next().getPort());
            final Connection client = clientFactory.getConnection();
            connectLatch.await(TEST_TIMEOUT, TimeUnit.SECONDS);
            try {
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/DefaultTCPNIOTransportTestCase.java
@@ -12,13 +12,13 @@
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2010 Sun Microsystems, Inc.
 * Portions copyright 2012-2013 ForgeRock AS.
 * Portions copyright 2012-2016 ForgeRock AS.
 */
package org.forgerock.opendj.grizzly;
import static org.forgerock.opendj.grizzly.DefaultTCPNIOTransport.DEFAULT_TRANSPORT;
import static org.forgerock.opendj.ldap.TestCaseUtils.findFreeSocketAddress;
import static org.forgerock.opendj.ldap.TestCaseUtils.loopbackWithDynamicPort;
import static org.testng.Assert.assertTrue;
import java.net.Socket;
@@ -46,10 +46,8 @@
    @Test(enabled = false)
    public void testGetInstance() throws Exception {
        // Create a transport.
        final ReferenceCountedObject<TCPNIOTransport>.Reference transport =
                DEFAULT_TRANSPORT.acquire();
        SocketAddress socketAddress = findFreeSocketAddress();
        transport.get().bind(socketAddress);
        final ReferenceCountedObject<TCPNIOTransport>.Reference transport = DEFAULT_TRANSPORT.acquire();
        SocketAddress socketAddress = transport.get().bind(loopbackWithDynamicPort()).getLocalAddress();
        // Establish a socket connection to see if the transport factory works.
        final Socket socket = new Socket();
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnectionFactoryTestCase.java
@@ -19,7 +19,7 @@
import static org.fest.assertions.Fail.fail;
import static org.forgerock.opendj.ldap.CommonLDAPOptions.*;
import static org.forgerock.opendj.ldap.LDAPConnectionFactory.*;
import static org.forgerock.opendj.ldap.TestCaseUtils.findFreeSocketAddress;
import static org.forgerock.opendj.ldap.TestCaseUtils.*;
import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest;
import static org.forgerock.util.time.Duration.duration;
import static org.mockito.Matchers.*;
@@ -349,7 +349,7 @@
    private LDAPListener createServer() {
        try {
            return new LDAPListener(findFreeSocketAddress(),
            return new LDAPListener(loopbackWithDynamicPort(),
                    new ServerConnectionFactory<LDAPClientContext, Integer>() {
                        @Override
                        public ServerConnection<Integer> handleAccept(
opendj-grizzly/src/test/java/org/forgerock/opendj/grizzly/GrizzlyLDAPListenerTestCase.java
@@ -201,7 +201,7 @@
    public void testCreateLDAPListener() throws Exception {
        // test no exception is thrown, which means transport provider is
        // correctly loaded
        LDAPListener listener = new LDAPListener(findFreeSocketAddress(), mock(ServerConnectionFactory.class));
        LDAPListener listener = new LDAPListener(loopbackWithDynamicPort(), mock(ServerConnectionFactory.class));
        listener.close();
    }
@@ -212,8 +212,8 @@
        // test no exception is thrown, which means transport provider is correctly loaded
        Options options = defaultOptions().set(TRANSPORT_PROVIDER_CLASS_LOADER,
                                                       Thread.currentThread().getContextClassLoader());
        LDAPListener listener = new LDAPListener(findFreeSocketAddress(),
                mock(ServerConnectionFactory.class), options);
        LDAPListener listener =
                new LDAPListener(loopbackWithDynamicPort(), mock(ServerConnectionFactory.class), options);
        listener.close();
    }
@@ -223,7 +223,8 @@
        expectedExceptionsMessageRegExp = "^The requested provider 'unknown' .*")
    public void testCreateLDAPListenerFailureProviderNotFound() throws Exception {
        Options options = defaultOptions().set(TRANSPORT_PROVIDER, "unknown");
        LDAPListener listener = new LDAPListener(findFreeSocketAddress(), mock(ServerConnectionFactory.class), options);
        LDAPListener listener
            = new LDAPListener(loopbackWithDynamicPort(), mock(ServerConnectionFactory.class), options);
        listener.close();
    }
@@ -238,7 +239,7 @@
        final MockServerConnection serverConnection = new MockServerConnection();
        final MockServerConnectionFactory serverConnectionFactory =
                new MockServerConnectionFactory(serverConnection);
        final LDAPListener listener = new LDAPListener(new InetSocketAddress(0), serverConnectionFactory);
        final LDAPListener listener = new LDAPListener(loopbackWithDynamicPort(), serverConnectionFactory);
        final InetSocketAddress addr = (InetSocketAddress) listener.getSocketAddresses().iterator().next();
        try {
            // Connect and close.
@@ -267,9 +268,9 @@
        final MockServerConnection onlineServerConnection = new MockServerConnection();
        final MockServerConnectionFactory onlineServerConnectionFactory =
                new MockServerConnectionFactory(onlineServerConnection);
        final InetSocketAddress onlineAddr = findFreeSocketAddress();
        final LDAPListener onlineServerListener =
                new LDAPListener(onlineAddr, onlineServerConnectionFactory);
                new LDAPListener(loopbackWithDynamicPort(), onlineServerConnectionFactory);
        final InetSocketAddress onlineAddr = onlineServerListener.getSocketAddresses().iterator().next();
        try {
            // Connection pool and load balancing tests.
@@ -311,9 +312,9 @@
                    };
            final InetSocketAddress proxyAddr = findFreeSocketAddress();
            final LDAPListener proxyListener =
                    new LDAPListener(proxyAddr, proxyServerConnectionFactory);
                    new LDAPListener(loopbackWithDynamicPort(), proxyServerConnectionFactory);
            final InetSocketAddress proxyAddr = proxyListener.getSocketAddresses().iterator().next();
            try {
                // Connect and close.
                final Connection connection =
@@ -350,8 +351,7 @@
                new MockServerConnectionFactory(onlineServerConnection);
        final LDAPListener onlineServerListener =
                new LDAPListener(new InetSocketAddress(0), onlineServerConnectionFactory);
        final InetSocketAddress onlineServerAddr =
                (InetSocketAddress) onlineServerListener.getSocketAddresses().iterator().next();
        final InetSocketAddress onlineServerAddr = onlineServerListener.getSocketAddresses().iterator().next();
        try {
            // Connection pool and load balancing tests.
@@ -444,9 +444,9 @@
        final MockServerConnection onlineServerConnection = new MockServerConnection();
        final MockServerConnectionFactory onlineServerConnectionFactory =
                new MockServerConnectionFactory(onlineServerConnection);
        final InetSocketAddress onlineServerAddr = new InetSocketAddress(0);
        final LDAPListener onlineServerListener =
                new LDAPListener(onlineServerAddr, onlineServerConnectionFactory);
                new LDAPListener(loopbackWithDynamicPort(), onlineServerConnectionFactory);
        final InetSocketAddress onlineServerAddr = onlineServerListener.getSocketAddresses().iterator().next();
        try {
            final MockServerConnection proxyServerConnection = new MockServerConnection();
@@ -493,8 +493,9 @@
                        }
                    };
            final LDAPListener proxyListener =
                    new LDAPListener(findFreeSocketAddress(), proxyServerConnectionFactory);
                    new LDAPListener(loopbackWithDynamicPort(), proxyServerConnectionFactory);
            try {
                // Connect and close.
                final Connection connection =
@@ -528,9 +529,9 @@
        final MockServerConnection onlineServerConnection = new MockServerConnection();
        final MockServerConnectionFactory onlineServerConnectionFactory =
                new MockServerConnectionFactory(onlineServerConnection);
        final InetSocketAddress onlineServerAddr = findFreeSocketAddress();
        final LDAPListener onlineServerListener =
                new LDAPListener(onlineServerAddr, onlineServerConnectionFactory);
                new LDAPListener(loopbackWithDynamicPort(), onlineServerConnectionFactory);
        final InetSocketAddress onlineServerAddr = onlineServerListener.getSocketAddresses().iterator().next();
        try {
            final MockServerConnection proxyServerConnection = new MockServerConnection() {
@@ -610,7 +611,7 @@
     * @throws Exception
     *             If an unexpected error occurred.
     */
    @Test
    @Test(expectedExceptions = LdapException.class)
    public void testMaxRequestSize() throws Exception {
        final MockServerConnection serverConnection = new MockServerConnection();
        final MockServerConnectionFactory factory =
@@ -654,6 +655,7 @@
                        .isTrue();
                assertThat(serverConnection.isClosed.getCount()).isEqualTo(1);
                assertThat(serverConnection.context.get().isClosed()).isTrue();
                throw e;
            }
        } finally {
            if (connection != null) {
@@ -674,8 +676,9 @@
        final MockServerConnection serverConnection = new MockServerConnection();
        final MockServerConnectionFactory factory =
                new MockServerConnectionFactory(serverConnection);
        final InetSocketAddress listenerAddr = findFreeSocketAddress();
        final LDAPListener listener = new LDAPListener(listenerAddr, factory);
        final LDAPListener listener =
                new LDAPListener(loopbackWithDynamicPort(), factory);
        final InetSocketAddress listenerAddr = listener.getSocketAddresses().iterator().next();
        final Connection connection;
        try {
opendj-ldap-toolkit/src/main/java/com/forgerock/opendj/ldap/tools/Utils.java
@@ -215,11 +215,6 @@
     * Sets default system property settings for the xxxrate performance tools.
     */
    static void setDefaultPerfToolProperties() {
        // Use SameThreadStrategy by default.
        if (System.getProperty("org.forgerock.opendj.transport.useWorkerThreads") == null) {
            System.setProperty("org.forgerock.opendj.transport.useWorkerThreads", "false");
        }
        /* Configure connections to be terminated immediately after closing (this
         prevents port exhaustion in xxxrate tools when
         connecting/disconnecting).*/
opendj-server-legacy/src/main/java/org/forgerock/opendj/reactive/LDAPClientConnection2.java
@@ -99,9 +99,9 @@
import com.forgerock.reactive.Stream;
import io.reactivex.BackpressureOverflowStrategy;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableEmitter.BackpressureMode;
import io.reactivex.FlowableOnSubscribe;
/**
@@ -954,7 +954,7 @@
            public void subscribe(FlowableEmitter<Response> emitter) throws Exception {
                processLDAPMessage(queueingStrategy, LDAPReader.readMessage(message.getContent()), emitter);
            }
        }, BackpressureMode.NONE).onBackpressureBuffer(64, null, BackpressureOverflowStrategy.ERROR)));
        }, BackpressureStrategy.NONE).onBackpressureBuffer(64, null, BackpressureOverflowStrategy.ERROR)));
    }
    private boolean processLDAPMessage(final QueueingStrategy queueingStrategy, final LDAPMessage message,