mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Gaetan Boismal
15.50.2014 45141fb11ef698b11c6fb3becca82ca10e11505a
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnection.java
@@ -26,9 +26,6 @@
 */
package org.forgerock.opendj.grizzly;
import static com.forgerock.opendj.grizzly.GrizzlyMessages.*;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.List;
@@ -52,7 +49,6 @@
import org.forgerock.opendj.ldap.IntermediateResponseHandler;
import org.forgerock.opendj.ldap.LDAPOptions;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.ResultHandler;
import org.forgerock.opendj.ldap.SSLContextBuilder;
import org.forgerock.opendj.ldap.SearchResultHandler;
import org.forgerock.opendj.ldap.TimeoutEventListener;
@@ -81,15 +77,17 @@
import org.forgerock.opendj.ldap.spi.LDAPExtendedFutureResultImpl;
import org.forgerock.opendj.ldap.spi.LDAPFutureResultImpl;
import org.forgerock.opendj.ldap.spi.LDAPSearchFutureResultImpl;
import org.forgerock.util.Reject;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.filterchain.Filter;
import org.glassfish.grizzly.filterchain.FilterChain;
import org.glassfish.grizzly.ssl.SSLEngineConfigurator;
import org.glassfish.grizzly.ssl.SSLFilter;
import com.forgerock.opendj.util.CompletedFutureResult;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import static org.forgerock.opendj.ldap.FutureResultWrapper.*;
import org.forgerock.util.Reject;
import static com.forgerock.opendj.grizzly.GrizzlyMessages.*;
/**
 * LDAP connection implementation.
@@ -120,12 +118,12 @@
    private final ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>> pendingRequests =
            new ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>>();
    private final Object stateLock = new Object();
    // Guarded by stateLock
    /** Guarded by stateLock. */
    private Result connectionInvalidReason;
    private boolean failedDueToDisconnect = false;
    private boolean isClosed = false;
    private boolean isFailed = false;
    private List<ConnectionEventListener> listeners = null;
    private boolean failedDueToDisconnect;
    private boolean isClosed;
    private boolean isFailed;
    private List<ConnectionEventListener> listeners;
    /**
     * Create a LDAP Connection with provided Grizzly connection and LDAP
@@ -162,19 +160,18 @@
                checkBindOrStartTLSInProgress();
            }
        } catch (final ErrorResultException e) {
            return new CompletedFutureResult<Void>(e);
            return newFailedFutureResult(e);
        }
        // Remove the future associated with the request to be abandoned.
        final AbstractLDAPFutureResultImpl<?> pendingRequest =
                pendingRequests.remove(request.getRequestID());
        final AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests.remove(request.getRequestID());
        if (pendingRequest == null) {
            /*
             * There has never been a request with the specified message ID or
             * the response has already been received and handled. We can ignore
             * this abandon request.
             */
            return new CompletedFutureResult<Void>((Void) null);
            return newSuccessfulFutureResult((Void) null);
        }
        /*
@@ -197,9 +194,9 @@
            final int messageID = nextMsgID.getAndIncrement();
            writer.writeAbandonRequest(messageID, request);
            connection.write(writer.getASN1Writer().getBuffer(), null);
            return new CompletedFutureResult<Void>((Void) null, messageID);
            return newSuccessfulFutureResult((Void) null, messageID);
        } catch (final IOException e) {
            return new CompletedFutureResult<Void>(adaptRequestIOException(e));
            return newFailedFutureResult(adaptRequestIOException(e));
        } finally {
            GrizzlyUtils.recycleWriter(writer);
        }
@@ -207,12 +204,10 @@
    @Override
    public FutureResult<Result> addAsync(final AddRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final ResultHandler<? super Result> resultHandler) {
            final IntermediateResponseHandler intermediateResponseHandler) {
        final int messageID = nextMsgID.getAndIncrement();
        final LDAPFutureResultImpl future =
                new LDAPFutureResultImpl(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
                new LDAPFutureResultImpl(messageID, request, intermediateResponseHandler, this);
        try {
            synchronized (stateLock) {
                checkConnectionIsValid();
@@ -264,42 +259,35 @@
    @Override
    public FutureResult<BindResult> bindAsync(final BindRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final ResultHandler<? super BindResult> resultHandler) {
            final IntermediateResponseHandler intermediateResponseHandler) {
        final int messageID = nextMsgID.getAndIncrement();
        final BindClient context;
        try {
            context =
                    request.createBindClient(Connections.getHostString(factory.getSocketAddress()));
            context = request.createBindClient(Connections.getHostString(factory.getSocketAddress()));
        } catch (final Exception e) {
            // FIXME: I18N need to have a better error message.
            // FIXME: Is this the best result code?
            final Result errorResult =
                    Responses.newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR).setDiagnosticMessage(
                            "An error occurred while creating a bind context").setCause(e);
            final Result errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR)
                    .setDiagnosticMessage("An error occurred while creating a bind context").setCause(e);
            final ErrorResultException error = ErrorResultException.newErrorResult(errorResult);
            if (resultHandler != null) {
                resultHandler.handleErrorResult(error);
            }
            return new CompletedFutureResult<BindResult>(error, messageID);
            return newFailedFutureResult(error, messageID);
        }
        final LDAPBindFutureResultImpl future =
                new LDAPBindFutureResultImpl(messageID, context, resultHandler,
                        intermediateResponseHandler, this);
                new LDAPBindFutureResultImpl(messageID, context, intermediateResponseHandler, this);
        try {
            synchronized (stateLock) {
                checkConnectionIsValid();
                if (!pendingRequests.isEmpty()) {
                    future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
                            .setDiagnosticMessage(
                                    "There are other operations pending on this connection"));
                    future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR).setDiagnosticMessage(
                            "There are other operations pending on this connection"));
                    return future;
                }
                if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
                    future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
                            .setDiagnosticMessage("Bind or Start TLS operation in progress"));
                    future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR).setDiagnosticMessage(
                            "Bind or Start TLS operation in progress"));
                    return future;
                }
                pendingRequests.put(messageID, future);
@@ -338,12 +326,10 @@
    @Override
    public FutureResult<CompareResult> compareAsync(final CompareRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final ResultHandler<? super CompareResult> resultHandler) {
            final IntermediateResponseHandler intermediateResponseHandler) {
        final int messageID = nextMsgID.getAndIncrement();
        final LDAPCompareFutureResultImpl future =
                new LDAPCompareFutureResultImpl(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
                new LDAPCompareFutureResultImpl(messageID, request, intermediateResponseHandler, this);
        try {
            synchronized (stateLock) {
                checkConnectionIsValid();
@@ -370,12 +356,10 @@
    @Override
    public FutureResult<Result> deleteAsync(final DeleteRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final ResultHandler<? super Result> resultHandler) {
            final IntermediateResponseHandler intermediateResponseHandler) {
        final int messageID = nextMsgID.getAndIncrement();
        final LDAPFutureResultImpl future =
                new LDAPFutureResultImpl(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
                new LDAPFutureResultImpl(messageID, request, intermediateResponseHandler, this);
        try {
            synchronized (stateLock) {
                checkConnectionIsValid();
@@ -401,32 +385,26 @@
    }
    @Override
    public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync(
            final ExtendedRequest<R> request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final ResultHandler<? super R> resultHandler) {
    public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync(final ExtendedRequest<R> request,
            final IntermediateResponseHandler intermediateResponseHandler) {
        final int messageID = nextMsgID.getAndIncrement();
        final LDAPExtendedFutureResultImpl<R> future =
                new LDAPExtendedFutureResultImpl<R>(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
                new LDAPExtendedFutureResultImpl<R>(messageID, request, intermediateResponseHandler, this);
        try {
            synchronized (stateLock) {
                checkConnectionIsValid();
                if (request.getOID().equals(StartTLSExtendedRequest.OID)) {
                if (StartTLSExtendedRequest.OID.equals(request.getOID())) {
                    if (!pendingRequests.isEmpty()) {
                        future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                                ResultCode.OPERATIONS_ERROR, "",
                                "There are pending operations on this connection"));
                                ResultCode.OPERATIONS_ERROR, "", "There are pending operations on this connection"));
                        return future;
                    } else if (isTLSEnabled()) {
                        future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                                ResultCode.OPERATIONS_ERROR, "",
                                "This connection is already TLS enabled"));
                                ResultCode.OPERATIONS_ERROR, "", "This connection is already TLS enabled"));
                        return future;
                    } else if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
                        future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                                ResultCode.OPERATIONS_ERROR, "",
                                "Bind or Start TLS operation in progress"));
                                ResultCode.OPERATIONS_ERROR, "", "Bind or Start TLS operation in progress"));
                        return future;
                    }
                } else {
@@ -469,12 +447,10 @@
    @Override
    public FutureResult<Result> modifyAsync(final ModifyRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final ResultHandler<? super Result> resultHandler) {
            final IntermediateResponseHandler intermediateResponseHandler) {
        final int messageID = nextMsgID.getAndIncrement();
        final LDAPFutureResultImpl future =
                new LDAPFutureResultImpl(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
                new LDAPFutureResultImpl(messageID, request, intermediateResponseHandler, this);
        try {
            synchronized (stateLock) {
                checkConnectionIsValid();
@@ -501,12 +477,10 @@
    @Override
    public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final ResultHandler<? super Result> resultHandler) {
            final IntermediateResponseHandler intermediateResponseHandler) {
        final int messageID = nextMsgID.getAndIncrement();
        final LDAPFutureResultImpl future =
                new LDAPFutureResultImpl(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
                new LDAPFutureResultImpl(messageID, request, intermediateResponseHandler, this);
        try {
            synchronized (stateLock) {
                checkConnectionIsValid();
@@ -541,14 +515,13 @@
        }
    }
    /** {@inheritDoc} */
    @Override
    public FutureResult<Result> searchAsync(final SearchRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final SearchResultHandler resultHandler) {
        final IntermediateResponseHandler intermediateResponseHandler, final SearchResultHandler entryHandler) {
        final int messageID = nextMsgID.getAndIncrement();
        final LDAPSearchFutureResultImpl future =
                new LDAPSearchFutureResultImpl(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
            new LDAPSearchFutureResultImpl(messageID, request, entryHandler, intermediateResponseHandler, this);
        try {
            synchronized (stateLock) {
                checkConnectionIsValid();
@@ -614,23 +587,18 @@
                 */
                logger.debug(LocalizableMessage.raw("Failing bind or StartTLS request due to timeout %s"
                        + "(connection will be invalidated): ", future));
                final Result result =
                        Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
                                LDAP_CONNECTION_BIND_OR_START_TLS_REQUEST_TIMEOUT.get(timeout)
                                        .toString());
                final Result result = Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
                        LDAP_CONNECTION_BIND_OR_START_TLS_REQUEST_TIMEOUT.get(timeout).toString());
                future.adaptErrorResult(result);
                // Fail the connection.
                final Result errorResult =
                        Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
                                LDAP_CONNECTION_BIND_OR_START_TLS_CONNECTION_TIMEOUT.get(timeout)
                                        .toString());
                final Result errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
                        LDAP_CONNECTION_BIND_OR_START_TLS_CONNECTION_TIMEOUT.get(timeout).toString());
                connectionErrorOccurred(errorResult);
            } else {
                logger.debug(LocalizableMessage.raw("Failing request due to timeout: %s", future));
                final Result result =
                        Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
                                LDAP_CONNECTION_REQUEST_TIMEOUT.get(timeout).toString());
                final Result result = Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
                        LDAP_CONNECTION_REQUEST_TIMEOUT.get(timeout).toString());
                future.adaptErrorResult(result);
                /*
@@ -640,9 +608,9 @@
                 * request while holding the state lock, since a blocking write
                 * could hang the application.
                 */
//              if (!bindOrStartTLSInProgress.get()) {
//                  sendAbandonRequest(newAbandonRequest(future.getRequestID()));
//              }
                // if (!bindOrStartTLSInProgress.get()) {
                // sendAbandonRequest(newAbandonRequest(future.getRequestID()));
                // }
            }
        }
        return delay;
@@ -813,22 +781,20 @@
        bindOrStartTLSInProgress.set(state);
    }
    void startTLS(final SSLContext sslContext, final List<String> protocols,
            final List<String> cipherSuites, final CompletionHandler<SSLEngine> completionHandler)
            throws IOException {
    void startTLS(final SSLContext sslContext, final List<String> protocols, final List<String> cipherSuites,
            final CompletionHandler<SSLEngine> completionHandler) throws IOException {
        synchronized (stateLock) {
            if (isTLSEnabled()) {
                throw new IllegalStateException("TLS already enabled");
            }
            final SSLEngineConfigurator sslEngineConfigurator =
                    new SSLEngineConfigurator(sslContext, true, false, false);
            final SSLEngineConfigurator sslEngineConfigurator = new SSLEngineConfigurator(sslContext, true, false,
                    false);
            sslEngineConfigurator.setEnabledProtocols(protocols.isEmpty() ? null : protocols
                    .toArray(new String[protocols.size()]));
            sslEngineConfigurator.setEnabledCipherSuites(cipherSuites.isEmpty() ? null
                    : cipherSuites.toArray(new String[cipherSuites.size()]));
            final SSLFilter sslFilter =
                    new SSLFilter(DUMMY_SSL_ENGINE_CONFIGURATOR, sslEngineConfigurator);
            sslEngineConfigurator.setEnabledCipherSuites(cipherSuites.isEmpty() ? null : cipherSuites
                    .toArray(new String[cipherSuites.size()]));
            final SSLFilter sslFilter = new SSLFilter(DUMMY_SSL_ENGINE_CONFIGURATOR, sslEngineConfigurator);
            installFilter(sslFilter);
            sslFilter.handshake(connection, completionHandler);
        }
@@ -837,16 +803,14 @@
    private ErrorResultException adaptRequestIOException(final IOException e) {
        // FIXME: what other sort of IOExceptions can be thrown?
        // FIXME: Is this the best result code?
        final Result errorResult =
                Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
        final Result errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
        connectionErrorOccurred(errorResult);
        return newErrorResult(errorResult);
    }
    private void checkBindOrStartTLSInProgress() throws ErrorResultException {
        if (bindOrStartTLSInProgress.get()) {
            throw newErrorResult(ResultCode.OPERATIONS_ERROR,
                    "Bind or Start TLS operation in progress");
            throw newErrorResult(ResultCode.OPERATIONS_ERROR, "Bind or Start TLS operation in progress");
        }
    }
@@ -861,8 +825,7 @@
                 * this could be misinterpreted as a genuine authentication
                 * failure for subsequent bind requests.
                 */
                throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN,
                        "Connection closed by server");
                throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN, "Connection closed by server");
            } else {
                throw newErrorResult(connectionInvalidReason);
            }