mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
05.58.2012 48ac4129c6b93fff96360e71f722c0447153dc06
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
@@ -45,7 +45,6 @@
import org.forgerock.opendj.ldap.requests.Requests;
import org.forgerock.opendj.ldap.requests.StartTLSExtendedRequest;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.Result;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.EmptyCompletionHandler;
import org.glassfish.grizzly.filterchain.FilterChain;
@@ -53,144 +52,142 @@
import org.glassfish.grizzly.filterchain.TransportFilter;
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
import com.forgerock.opendj.util.CompletedFutureResult;
import com.forgerock.opendj.util.FutureResultTransformer;
import com.forgerock.opendj.util.RecursiveFutureResult;
import com.forgerock.opendj.util.AsynchronousFutureResult;
/**
 * LDAP connection factory implementation.
 */
public final class LDAPConnectionFactoryImpl implements ConnectionFactory {
    /**
     * Adapts a Grizzly connection completion handler to an LDAP connection
     * asynchronous future result.
     */
    @SuppressWarnings("rawtypes")
    private final class ConnectionCompletionHandler implements
    private final class CompletionHandlerAdapter implements
            CompletionHandler<org.glassfish.grizzly.Connection> {
        private final FutureResultTransformer<Result, Connection> startTLSFutureResult;
        private final RecursiveFutureResult<LDAPConnection, ExtendedResult> connectionFutureResult;
        private LDAPConnection connection;
        private ConnectionCompletionHandler(final ResultHandler<? super Connection> handler) {
            this.startTLSFutureResult = new FutureResultTransformer<Result, Connection>(handler) {
        private final AsynchronousFutureResult<? super Connection> future;
                @Override
                protected ErrorResultException transformErrorResult(
                        final ErrorResultException errorResult) {
                    // Ensure that the connection is closed.
                    try {
                        if (connection != null) {
                            connection.close();
                        }
                    } catch (final Exception e) {
                        // Ignore.
                    }
                    return errorResult;
                }
                @Override
                protected LDAPConnection transformResult(final Result result)
                        throws ErrorResultException {
                    return connection;
                }
            };
            this.connectionFutureResult =
                    new RecursiveFutureResult<LDAPConnection, ExtendedResult>(startTLSFutureResult) {
                        @Override
                        protected FutureResult<? extends ExtendedResult> chainResult(
                                final LDAPConnection innerResult,
                                final ResultHandler<? super ExtendedResult> handler)
                                throws ErrorResultException {
                            connection = innerResult;
                            if (options.getSSLContext() != null && options.useStartTLS()) {
                                // Chain StartTLS extended request.
                                final StartTLSExtendedRequest startTLS =
                                        Requests.newStartTLSExtendedRequest(options.getSSLContext());
                                startTLS.addEnabledCipherSuite(options
                                        .getEnabledCipherSuites()
                                        .toArray(
                                                new String[options.getEnabledCipherSuites().size()]));
                                startTLS.addEnabledProtocol(options.getEnabledProtocols().toArray(
                                        new String[options.getEnabledProtocols().size()]));
                                return connection.extendedRequestAsync(startTLS, null, handler);
                            } else if (options.getSSLContext() != null) {
                                // Install SSL/TLS layer.
                                try {
                                    connection.startTLS(options.getSSLContext(), options
                                            .getEnabledProtocols(), options
                                            .getEnabledCipherSuites(),
                                            new EmptyCompletionHandler<SSLEngine>() {
                                                @Override
                                                public void completed(final SSLEngine result) {
                                                    handler.handleResult(null);
                                                }
                                                @Override
                                                public void failed(final Throwable throwable) {
                                                    handler.handleErrorResult(newErrorResult(
                                                            ResultCode.CLIENT_SIDE_CONNECT_ERROR,
                                                            throwable.getMessage(), throwable));
                                                }
                                            });
                                    return null;
                                } catch (final IOException ioe) {
                                    throw newErrorResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR, ioe
                                            .getMessage(), ioe);
                                }
                            } else {
                                // Plain connection.
                                handler.handleResult(null);
                                return new CompletedFutureResult<ExtendedResult>(
                                        (ExtendedResult) null);
                            }
                        }
                    };
            startTLSFutureResult.setFutureResult(connectionFutureResult);
        private CompletionHandlerAdapter(final AsynchronousFutureResult<? super Connection> future) {
            this.future = future;
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void cancelled() {
            // Ignore this.
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void completed(final org.glassfish.grizzly.Connection connection) {
            connectionFutureResult.handleResult(adaptConnection(connection));
        public void completed(final org.glassfish.grizzly.Connection result) {
            // Adapt the connection.
            final LDAPConnection connection = adaptConnection(result);
            // Plain connection.
            if (options.getSSLContext() == null) {
                onSuccess(connection);
                return;
            }
            // Start TLS or install SSL layer asynchronously.
            // Give up immediately if the future has been cancelled.
            if (future.isCancelled()) {
                connection.close();
                return;
            }
            if (options.useStartTLS()) {
                // Chain StartTLS extended request.
                final StartTLSExtendedRequest startTLS =
                        Requests.newStartTLSExtendedRequest(options.getSSLContext());
                startTLS.addEnabledCipherSuite(options.getEnabledCipherSuites().toArray(
                        new String[options.getEnabledCipherSuites().size()]));
                startTLS.addEnabledProtocol(options.getEnabledProtocols().toArray(
                        new String[options.getEnabledProtocols().size()]));
                final ResultHandler<ExtendedResult> handler = new ResultHandler<ExtendedResult>() {
                    public void handleErrorResult(final ErrorResultException error) {
                        onFailure(connection, error);
                    }
                    public void handleResult(final ExtendedResult result) {
                        onSuccess(connection);
                    }
                };
                connection.extendedRequestAsync(startTLS, null, handler);
            } else {
                // Install SSL/TLS layer.
                try {
                    connection.startTLS(options.getSSLContext(), options.getEnabledProtocols(),
                            options.getEnabledCipherSuites(),
                            new EmptyCompletionHandler<SSLEngine>() {
                                @Override
                                public void completed(final SSLEngine result) {
                                    onSuccess(connection);
                                }
                                @Override
                                public void failed(final Throwable throwable) {
                                    onFailure(connection, throwable);
                                }
                            });
                } catch (final IOException e) {
                    onFailure(connection, e);
                }
            }
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void failed(final Throwable throwable) {
            connectionFutureResult.handleErrorResult(adaptConnectionException(throwable));
            // Adapt and forward.
            future.handleErrorResult(adaptConnectionException(throwable));
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void updated(final org.glassfish.grizzly.Connection connection) {
        public void updated(final org.glassfish.grizzly.Connection result) {
            // Ignore this.
        }
    }
        private LDAPConnection adaptConnection(final org.glassfish.grizzly.Connection<?> connection) {
            // Test shows that its much faster with non block writes but risk
            // running out of memory if the server is slow.
            connection.configureBlocking(true);
            connection.setProcessor(defaultFilterChain);
            final LDAPConnection ldapConnection = new LDAPConnection(connection, options);
            clientFilter.registerConnection(connection, ldapConnection);
            return ldapConnection;
        }
        private ErrorResultException adaptConnectionException(Throwable t) {
            if (t instanceof ExecutionException) {
                t = t.getCause();
            }
            if (t instanceof ErrorResultException) {
                return (ErrorResultException) t;
            } else {
                return newErrorResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR, t.getMessage(), t);
            }
        }
        private void onFailure(final LDAPConnection connection, final Throwable t) {
            // Abort connection attempt due to error.
            connection.close();
            future.handleErrorResult(adaptConnectionException(t));
        }
        private void onSuccess(final LDAPConnection connection) {
            future.handleResult(connection);
            // Close the connection if the future was cancelled.
            if (future.isCancelled()) {
                connection.close();
            }
        }
    };
    private final LDAPClientFilter clientFilter;
    private final FilterChain defaultFilterChain;
    private final LDAPOptions options;
    private final SocketAddress socketAddress;
    private final TCPNIOTransport transport;
    private final FilterChain defaultFilterChain;
    private final LDAPClientFilter clientFilter;
    private final LDAPOptions options;
    /**
     * Creates a new LDAP connection factory implementation which can be used to
@@ -223,7 +220,7 @@
    public Connection getConnection() throws ErrorResultException {
        try {
            return getConnectionAsync(null).get();
        } catch (InterruptedException e) {
        } catch (final InterruptedException e) {
            throw newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, e);
        }
    }
@@ -234,14 +231,11 @@
    @Override
    public FutureResult<Connection> getConnectionAsync(
            final ResultHandler<? super Connection> handler) {
        final ConnectionCompletionHandler ch = new ConnectionCompletionHandler(handler);
        try {
            ch.connectionFutureResult.setFutureResult(transport.connect(socketAddress, ch));
            return ch.startTLSFutureResult;
        } catch (final IOException e) {
            final ErrorResultException result = adaptConnectionException(e);
            return new CompletedFutureResult<Connection>(result);
        }
        final AsynchronousFutureResult<Connection> future =
                new AsynchronousFutureResult<Connection>(handler);
        final CompletionHandlerAdapter cha = new CompletionHandlerAdapter(future);
        transport.connect(socketAddress, cha);
        return future;
    }
    /**
@@ -264,23 +258,4 @@
        builder.append(')');
        return builder.toString();
    }
    /**
     * Wraps a Grizzly connection in an LDAP connection.
     * <p>
     * The Grizzly connection is switched to blocking mode, given the default
     * filter chain as its processor, wrapped in a new {@code LDAPConnection}
     * and finally registered with the client filter.
     *
     * @param connection
     *            The Grizzly connection to be adapted.
     * @return The LDAP connection wrapping {@code connection}.
     */
    private LDAPConnection adaptConnection(final org.glassfish.grizzly.Connection<?> connection) {
        // Tests show that non-blocking writes are much faster, but risk
        // running out of memory if the server is slow.
        connection.configureBlocking(true);
        connection.setProcessor(defaultFilterChain);
        final LDAPConnection ldapConnection = new LDAPConnection(connection, options);
        // Associate the Grizzly connection with its LDAP wrapper in the client filter.
        clientFilter.registerConnection(connection, ldapConnection);
        return ldapConnection;
    }
    private ErrorResultException adaptConnectionException(Throwable t) {
        if (t instanceof ExecutionException) {
            t = t.getCause();
        }
        return newErrorResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR, t.getMessage(), t);
    }
}