mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Gaetan Boismal
29.04.2014 600f6cb4356f2355a004604b9353505f0c2f7f49
opendj-grizzly/src/main/java/org/forgerock/opendj/grizzly/GrizzlyLDAPConnection.java
@@ -44,10 +44,10 @@
import org.forgerock.opendj.ldap.AbstractAsynchronousConnection;
import org.forgerock.opendj.ldap.ConnectionEventListener;
import org.forgerock.opendj.ldap.Connections;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.FutureResult;
import org.forgerock.opendj.ldap.IntermediateResponseHandler;
import org.forgerock.opendj.ldap.LDAPOptions;
import org.forgerock.opendj.ldap.LdapException;
import org.forgerock.opendj.ldap.LdapPromise;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.SSLContextBuilder;
import org.forgerock.opendj.ldap.SearchResultHandler;
@@ -71,12 +71,10 @@
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.Responses;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.spi.AbstractLDAPFutureResultImpl;
import org.forgerock.opendj.ldap.spi.LDAPBindFutureResultImpl;
import org.forgerock.opendj.ldap.spi.LDAPCompareFutureResultImpl;
import org.forgerock.opendj.ldap.spi.LDAPExtendedFutureResultImpl;
import org.forgerock.opendj.ldap.spi.LDAPFutureResultImpl;
import org.forgerock.opendj.ldap.spi.LDAPSearchFutureResultImpl;
import org.forgerock.opendj.ldap.spi.BindResultLdapPromiseImpl;
import org.forgerock.opendj.ldap.spi.ExtendedResultLdapPromiseImpl;
import org.forgerock.opendj.ldap.spi.ResultLdapPromiseImpl;
import org.forgerock.opendj.ldap.spi.SearchResultLdapPromiseImpl;
import org.forgerock.util.Reject;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.filterchain.Filter;
@@ -84,8 +82,8 @@
import org.glassfish.grizzly.ssl.SSLEngineConfigurator;
import org.glassfish.grizzly.ssl.SSLFilter;
import static org.forgerock.opendj.ldap.LdapException.newErrorResult;
import static org.forgerock.opendj.ldap.FutureResultWrapper.*;
import static org.forgerock.opendj.ldap.LdapException.*;
import static org.forgerock.opendj.ldap.spi.LdapPromises.*;
import static com.forgerock.opendj.grizzly.GrizzlyMessages.*;
@@ -115,8 +113,8 @@
    private final org.glassfish.grizzly.Connection<?> connection;
    private final AtomicInteger nextMsgID = new AtomicInteger(1);
    private final GrizzlyLDAPConnectionFactory factory;
    private final ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>> pendingRequests =
            new ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>>();
    private final ConcurrentHashMap<Integer, ResultLdapPromiseImpl<?, ?>> pendingRequests =
            new ConcurrentHashMap<Integer, ResultLdapPromiseImpl<?, ?>>();
    private final Object stateLock = new Object();
    /** Guarded by stateLock. */
    private Result connectionInvalidReason;
@@ -141,12 +139,12 @@
    }
    @Override
    public FutureResult<Void> abandonAsync(final AbandonRequest request) {
    public LdapPromise<Void> abandonAsync(final AbandonRequest request) {
        /*
         * Need to be careful here since both abandonAsync and Future.cancel can
         * Need to be careful here since both abandonAsync and Promise.cancel can
         * be called separately by the client application. Therefore
         * future.cancel() should abandon the request, and abandonAsync should
         * cancel the future. In addition, bind or StartTLS requests cannot be
         * promise.cancel() should abandon the request, and abandonAsync should
         * cancel the promise. In addition, bind or StartTLS requests cannot be
         * abandoned.
         */
        try {
@@ -160,22 +158,22 @@
                checkBindOrStartTLSInProgress();
            }
        } catch (final LdapException e) {
            return newFailedFutureResult(e);
            return newFailedLdapPromise(e);
        }
        // Remove the future associated with the request to be abandoned.
        final AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests.remove(request.getRequestID());
        // Remove the promise associated with the request to be abandoned.
        final ResultLdapPromiseImpl<?, ?> pendingRequest = pendingRequests.remove(request.getRequestID());
        if (pendingRequest == null) {
            /*
             * There has never been a request with the specified message ID or
             * the response has already been received and handled. We can ignore
             * this abandon request.
             */
            return newSuccessfulFutureResult((Void) null);
            return newSuccessfulLdapPromise((Void) null);
        }
        /*
         * This will cancel the future, but will also recursively invoke this
         * This will cancel the promise, but will also recursively invoke this
         * method. Since the pending request has been removed, there is no risk
         * of an infinite loop.
         */
@@ -188,31 +186,31 @@
        return sendAbandonRequest(request);
    }
    private FutureResult<Void> sendAbandonRequest(final AbandonRequest request) {
    private LdapPromise<Void> sendAbandonRequest(final AbandonRequest request) {
        final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter();
        try {
            final int messageID = nextMsgID.getAndIncrement();
            writer.writeAbandonRequest(messageID, request);
            connection.write(writer.getASN1Writer().getBuffer(), null);
            return newSuccessfulFutureResult((Void) null, messageID);
            return newSuccessfulLdapPromise((Void) null, messageID);
        } catch (final IOException e) {
            return newFailedFutureResult(adaptRequestIOException(e));
            return newFailedLdapPromise(adaptRequestIOException(e));
        } finally {
            GrizzlyUtils.recycleWriter(writer);
        }
    }
    @Override
    public FutureResult<Result> addAsync(final AddRequest request,
    public LdapPromise<Result> addAsync(final AddRequest request,
            final IntermediateResponseHandler intermediateResponseHandler) {
        final int messageID = nextMsgID.getAndIncrement();
        final LDAPFutureResultImpl future =
                new LDAPFutureResultImpl(messageID, request, intermediateResponseHandler, this);
        final ResultLdapPromiseImpl<AddRequest, Result> promise =
                newResultLdapPromise(messageID, request, intermediateResponseHandler, this);
        try {
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                pendingRequests.put(messageID, future);
                pendingRequests.put(messageID, promise);
            }
            try {
                final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter();
@@ -227,9 +225,9 @@
                throw adaptRequestIOException(e);
            }
        } catch (final LdapException e) {
            future.adaptErrorResult(e.getResult());
            promise.adaptErrorResult(e.getResult());
        }
        return future;
        return promise;
    }
    @Override
@@ -250,7 +248,7 @@
        if (notifyErrorOccurred) {
            // Use the reason provided in the disconnect notification.
            listener.handleConnectionError(failedDueToDisconnect,
                    newErrorResult(connectionInvalidReason));
                    newLdapException(connectionInvalidReason));
        }
        if (notifyClose) {
            listener.handleConnectionClosed();
@@ -258,39 +256,33 @@
    }
    @Override
    public FutureResult<BindResult> bindAsync(final BindRequest request,
    public LdapPromise<BindResult> bindAsync(final BindRequest request,
            final IntermediateResponseHandler intermediateResponseHandler) {
        final int messageID = nextMsgID.getAndIncrement();
        final BindClient context;
        try {
            context = request.createBindClient(Connections.getHostString(factory.getSocketAddress()));
        } catch (final Exception e) {
            // FIXME: I18N need to have a better error message.
            // FIXME: Is this the best result code?
            final Result errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR)
                    .setDiagnosticMessage("An error occurred while creating a bind context").setCause(e);
            final LdapException error = LdapException.newErrorResult(errorResult);
            return newFailedFutureResult(error, messageID);
        } catch (final LdapException e) {
            return newFailedLdapPromise(e, messageID);
        }
        final LDAPBindFutureResultImpl future =
                new LDAPBindFutureResultImpl(messageID, context, intermediateResponseHandler, this);
        final BindResultLdapPromiseImpl promise =
                newBindLdapPromise(messageID, request, context, intermediateResponseHandler, this);
        try {
            synchronized (stateLock) {
                checkConnectionIsValid();
                if (!pendingRequests.isEmpty()) {
                    future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR).setDiagnosticMessage(
                    promise.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR).setDiagnosticMessage(
                            "There are other operations pending on this connection"));
                    return future;
                    return promise;
                }
                if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
                    future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR).setDiagnosticMessage(
                    promise.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR).setDiagnosticMessage(
                            "Bind or Start TLS operation in progress"));
                    return future;
                    return promise;
                }
                pendingRequests.put(messageID, future);
                pendingRequests.put(messageID, promise);
            }
            try {
@@ -310,10 +302,10 @@
                throw adaptRequestIOException(e);
            }
        } catch (final LdapException e) {
            future.adaptErrorResult(e.getResult());
            promise.adaptErrorResult(e.getResult());
        }
        return future;
        return promise;
    }
    @Override
@@ -325,16 +317,16 @@
    }
    @Override
    public FutureResult<CompareResult> compareAsync(final CompareRequest request,
    public LdapPromise<CompareResult> compareAsync(final CompareRequest request,
            final IntermediateResponseHandler intermediateResponseHandler) {
        final int messageID = nextMsgID.getAndIncrement();
        final LDAPCompareFutureResultImpl future =
                new LDAPCompareFutureResultImpl(messageID, request, intermediateResponseHandler, this);
        final ResultLdapPromiseImpl<CompareRequest, CompareResult> promise =
                newCompareLdapPromise(messageID, request, intermediateResponseHandler, this);
        try {
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                pendingRequests.put(messageID, future);
                pendingRequests.put(messageID, promise);
            }
            try {
                final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter();
@@ -349,22 +341,22 @@
                throw adaptRequestIOException(e);
            }
        } catch (final LdapException e) {
            future.adaptErrorResult(e.getResult());
            promise.adaptErrorResult(e.getResult());
        }
        return future;
        return promise;
    }
    @Override
    public FutureResult<Result> deleteAsync(final DeleteRequest request,
    public LdapPromise<Result> deleteAsync(final DeleteRequest request,
            final IntermediateResponseHandler intermediateResponseHandler) {
        final int messageID = nextMsgID.getAndIncrement();
        final LDAPFutureResultImpl future =
                new LDAPFutureResultImpl(messageID, request, intermediateResponseHandler, this);
        final ResultLdapPromiseImpl<DeleteRequest, Result> promise =
                newResultLdapPromise(messageID, request, intermediateResponseHandler, this);
        try {
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                pendingRequests.put(messageID, future);
                pendingRequests.put(messageID, promise);
            }
            try {
                final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter();
@@ -379,38 +371,38 @@
                throw adaptRequestIOException(e);
            }
        } catch (final LdapException e) {
            future.adaptErrorResult(e.getResult());
            promise.adaptErrorResult(e.getResult());
        }
        return future;
        return promise;
    }
    @Override
    public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync(final ExtendedRequest<R> request,
    public <R extends ExtendedResult> LdapPromise<R> extendedRequestAsync(final ExtendedRequest<R> request,
            final IntermediateResponseHandler intermediateResponseHandler) {
        final int messageID = nextMsgID.getAndIncrement();
        final LDAPExtendedFutureResultImpl<R> future =
                new LDAPExtendedFutureResultImpl<R>(messageID, request, intermediateResponseHandler, this);
        final ExtendedResultLdapPromiseImpl<R> promise =
                newExtendedLdapPromise(messageID, request, intermediateResponseHandler, this);
        try {
            synchronized (stateLock) {
                checkConnectionIsValid();
                if (StartTLSExtendedRequest.OID.equals(request.getOID())) {
                    if (!pendingRequests.isEmpty()) {
                        future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                        promise.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                                ResultCode.OPERATIONS_ERROR, "", "There are pending operations on this connection"));
                        return future;
                        return promise;
                    } else if (isTLSEnabled()) {
                        future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                        promise.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                                ResultCode.OPERATIONS_ERROR, "", "This connection is already TLS enabled"));
                        return future;
                        return promise;
                    } else if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
                        future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                        promise.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                                ResultCode.OPERATIONS_ERROR, "", "Bind or Start TLS operation in progress"));
                        return future;
                        return promise;
                    }
                } else {
                    checkBindOrStartTLSInProgress();
                }
                pendingRequests.put(messageID, future);
                pendingRequests.put(messageID, promise);
            }
            try {
                final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter();
@@ -426,9 +418,9 @@
                throw adaptRequestIOException(e);
            }
        } catch (final LdapException e) {
            future.adaptErrorResult(e.getResult());
            promise.adaptErrorResult(e.getResult());
        }
        return future;
        return promise;
    }
    @Override
@@ -446,16 +438,16 @@
    }
    @Override
    public FutureResult<Result> modifyAsync(final ModifyRequest request,
    public LdapPromise<Result> modifyAsync(final ModifyRequest request,
            final IntermediateResponseHandler intermediateResponseHandler) {
        final int messageID = nextMsgID.getAndIncrement();
        final LDAPFutureResultImpl future =
                new LDAPFutureResultImpl(messageID, request, intermediateResponseHandler, this);
        final ResultLdapPromiseImpl<ModifyRequest, Result> promise =
                newResultLdapPromise(messageID, request, intermediateResponseHandler, this);
        try {
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                pendingRequests.put(messageID, future);
                pendingRequests.put(messageID, promise);
            }
            try {
                final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter();
@@ -470,22 +462,22 @@
                throw adaptRequestIOException(e);
            }
        } catch (final LdapException e) {
            future.adaptErrorResult(e.getResult());
            promise.adaptErrorResult(e.getResult());
        }
        return future;
        return promise;
    }
    @Override
    public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
    public LdapPromise<Result> modifyDNAsync(final ModifyDNRequest request,
            final IntermediateResponseHandler intermediateResponseHandler) {
        final int messageID = nextMsgID.getAndIncrement();
        final LDAPFutureResultImpl future =
                new LDAPFutureResultImpl(messageID, request, intermediateResponseHandler, this);
        final ResultLdapPromiseImpl<ModifyDNRequest, Result> promise =
                newResultLdapPromise(messageID, request, intermediateResponseHandler, this);
        try {
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                pendingRequests.put(messageID, future);
                pendingRequests.put(messageID, promise);
            }
            try {
                final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter();
@@ -500,9 +492,9 @@
                throw adaptRequestIOException(e);
            }
        } catch (final LdapException e) {
            future.adaptErrorResult(e.getResult());
            promise.adaptErrorResult(e.getResult());
        }
        return future;
        return promise;
    }
    @Override
@@ -517,16 +509,16 @@
    /** {@inheritDoc} */
    @Override
    public FutureResult<Result> searchAsync(final SearchRequest request,
    public LdapPromise<Result> searchAsync(final SearchRequest request,
        final IntermediateResponseHandler intermediateResponseHandler, final SearchResultHandler entryHandler) {
        final int messageID = nextMsgID.getAndIncrement();
        final LDAPSearchFutureResultImpl future =
            new LDAPSearchFutureResultImpl(messageID, request, entryHandler, intermediateResponseHandler, this);
        final SearchResultLdapPromiseImpl promise =
                newSearchLdapPromise(messageID, request, entryHandler, intermediateResponseHandler, this);
        try {
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                pendingRequests.put(messageID, future);
                pendingRequests.put(messageID, promise);
            }
            try {
                final LDAPWriter<ASN1BufferWriter> writer = GrizzlyUtils.getWriter();
@@ -541,9 +533,9 @@
                throw adaptRequestIOException(e);
            }
        } catch (final LdapException e) {
            future.adaptErrorResult(e.getResult());
            promise.adaptErrorResult(e.getResult());
        }
        return future;
        return promise;
    }
    @Override
@@ -565,18 +557,18 @@
        }
        long delay = timeout;
        for (final AbstractLDAPFutureResultImpl<?> future : pendingRequests.values()) {
            if (future == null || !future.checkForTimeout()) {
        for (final ResultLdapPromiseImpl<?, ?> promise : pendingRequests.values()) {
            if (promise == null || !promise.checkForTimeout()) {
                continue;
            }
            final long diff = (future.getTimestamp() + timeout) - currentTime;
            final long diff = (promise.getTimestamp() + timeout) - currentTime;
            if (diff > 0) {
                // Will expire in diff milliseconds.
                delay = Math.min(delay, diff);
            } else if (pendingRequests.remove(future.getRequestID()) == null) {
            } else if (pendingRequests.remove(promise.getRequestID()) == null) {
                // Result arrived at the same time.
                continue;
            } else if (future.isBindOrStartTLS()) {
            } else if (promise.isBindOrStartTLS()) {
                /*
                 * No other operations can be performed while a bind or StartTLS
                 * request is active, so we cannot time out the request. We
@@ -586,20 +578,20 @@
                 * ignoring timeouts could cause the application to hang.
                 */
                logger.debug(LocalizableMessage.raw("Failing bind or StartTLS request due to timeout %s"
                        + "(connection will be invalidated): ", future));
                        + "(connection will be invalidated): ", promise));
                final Result result = Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
                        LDAP_CONNECTION_BIND_OR_START_TLS_REQUEST_TIMEOUT.get(timeout).toString());
                future.adaptErrorResult(result);
                promise.adaptErrorResult(result);
                // Fail the connection.
                final Result errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
                        LDAP_CONNECTION_BIND_OR_START_TLS_CONNECTION_TIMEOUT.get(timeout).toString());
                connectionErrorOccurred(errorResult);
            } else {
                logger.debug(LocalizableMessage.raw("Failing request due to timeout: %s", future));
                logger.debug(LocalizableMessage.raw("Failing request due to timeout: %s", promise));
                final Result result = Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
                        LDAP_CONNECTION_REQUEST_TIMEOUT.get(timeout).toString());
                future.adaptErrorResult(result);
                promise.adaptErrorResult(result);
                /*
                 * FIXME: there's a potential race condition here if a bind or
@@ -609,7 +601,7 @@
                 * could hang the application.
                 */
                // if (!bindOrStartTLSInProgress.get()) {
                // sendAbandonRequest(newAbandonRequest(future.getRequestID()));
                // sendAbandonRequest(newAbandonRequest(promise.getRequestID()));
                // }
            }
        }
@@ -668,9 +660,9 @@
        // First abort all outstanding requests.
        for (final int requestID : pendingRequests.keySet()) {
            final AbstractLDAPFutureResultImpl<?> future = pendingRequests.remove(requestID);
            if (future != null) {
                future.adaptErrorResult(connectionInvalidReason);
            final ResultLdapPromiseImpl<?, ?> promise = pendingRequests.remove(requestID);
            if (promise != null) {
                promise.adaptErrorResult(connectionInvalidReason);
            }
        }
@@ -701,7 +693,7 @@
            if (notifyErrorOccurred) {
                for (final ConnectionEventListener listener : tmpListeners) {
                    // Use the reason provided in the disconnect notification.
                    listener.handleConnectionError(isDisconnectNotification, newErrorResult(reason));
                    listener.handleConnectionError(isDisconnectNotification, newLdapException(reason));
                }
            }
            if (notifyClose) {
@@ -712,11 +704,11 @@
        }
    }
    int continuePendingBindRequest(final LDAPBindFutureResultImpl future) throws LdapException {
    int continuePendingBindRequest(final BindResultLdapPromiseImpl promise) throws LdapException {
        final int newMsgID = nextMsgID.getAndIncrement();
        synchronized (stateLock) {
            checkConnectionIsValid();
            pendingRequests.put(newMsgID, future);
            pendingRequests.put(newMsgID, promise);
        }
        return newMsgID;
    }
@@ -725,7 +717,7 @@
        return factory.getLDAPOptions();
    }
    AbstractLDAPFutureResultImpl<?> getPendingRequest(final Integer messageID) {
    ResultLdapPromiseImpl<?, ?> getPendingRequest(final Integer messageID) {
        return pendingRequests.get(messageID);
    }
@@ -772,7 +764,7 @@
        }
    }
    AbstractLDAPFutureResultImpl<?> removePendingRequest(final Integer messageID) {
    ResultLdapPromiseImpl<?, ?> removePendingRequest(final Integer messageID) {
        return pendingRequests.remove(messageID);
    }
@@ -804,12 +796,12 @@
        // FIXME: Is this the best result code?
        final Result errorResult = Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
        connectionErrorOccurred(errorResult);
        return newErrorResult(errorResult);
        return newLdapException(errorResult);
    }
    private void checkBindOrStartTLSInProgress() throws LdapException {
        if (bindOrStartTLSInProgress.get()) {
            throw newErrorResult(ResultCode.OPERATIONS_ERROR, "Bind or Start TLS operation in progress");
            throw newLdapException(ResultCode.OPERATIONS_ERROR, "Bind or Start TLS operation in progress");
        }
    }
@@ -824,9 +816,9 @@
                 * this could be misinterpreted as a genuine authentication
                 * failure for subsequent bind requests.
                 */
                throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN, "Connection closed by server");
                throw newLdapException(ResultCode.CLIENT_SIDE_SERVER_DOWN, "Connection closed by server");
            } else {
                throw newErrorResult(connectionInvalidReason);
                throw newLdapException(connectionInvalidReason);
            }
        }
    }