mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
12.46.2012 7983705a0ce4de2fd6e7a4061fe4afa0f9e8c66a
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -31,9 +31,9 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -83,22 +83,22 @@
/**
 * LDAP connection implementation.
 * <p>
 * TODO: handle illegal state exceptions.
 */
final class LDAPConnection extends AbstractAsynchronousConnection implements Connection {
    private final org.glassfish.grizzly.Connection<?> connection;
    private Result connectionInvalidReason;
    private boolean isClosed = false;
    private final List<ConnectionEventListener> listeners =
            new CopyOnWriteArrayList<ConnectionEventListener>();
    private final AtomicInteger nextMsgID = new AtomicInteger(1);
    private final AtomicBoolean bindOrStartTLSInProgress = new AtomicBoolean(false);
    private final org.glassfish.grizzly.Connection<?> connection;
    private final LDAPWriter ldapWriter = new LDAPWriter();
    private final AtomicInteger nextMsgID = new AtomicInteger(1);
    private final LDAPOptions options;
    private final ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>> pendingRequests =
            new ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>>();
    private final Object stateLock = new Object();
    private final LDAPWriter ldapWriter = new LDAPWriter();
    private final LDAPOptions options;
    // Guarded by stateLock
    private Result connectionInvalidReason;
    private boolean failedDueToDisconnect = false;
    private boolean isClosed = false;
    private boolean isFailed = false;
    private List<ConnectionEventListener> listeners = null;
    /**
     * Creates a new LDAP connection.
@@ -116,60 +116,47 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Void> abandonAsync(final AbandonRequest request) {
        final AbstractLDAPFutureResultImpl<?> pendingRequest;
        final int messageID = nextMsgID.getAndIncrement();
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                return new CompletedFutureResult<Void>(newErrorResult(connectionInvalidReason),
                        messageID);
            }
            if (bindOrStartTLSInProgress.get()) {
                final Result errorResult =
                        Responses.newResult(ResultCode.OPERATIONS_ERROR).setDiagnosticMessage(
                                "Bind or Start TLS operation in progress");
                return new CompletedFutureResult<Void>(newErrorResult(errorResult), messageID);
            }
            // First remove the future associated with the request to be
            // abandoned.
            pendingRequest = pendingRequests.remove(request.getRequestID());
        }
        if (pendingRequest == null) {
            // There has never been a request with the specified message ID or
            // the response has already been received and handled. We can ignore
            // this abandon request.
            // Message ID will be -1 since no request was sent.
            return new CompletedFutureResult<Void>((Void) null);
        }
        pendingRequest.cancel(false);
        try {
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                ldapWriter.abandonRequest(asn1Writer, messageID, request);
                connection.write(asn1Writer.getBuffer(), null);
                return new CompletedFutureResult<Void>((Void) null, messageID);
            } finally {
                asn1Writer.recycle();
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                // Remove the future associated with the request to be abandoned.
                pendingRequest = pendingRequests.remove(request.getRequestID());
            }
        } catch (final IOException e) {
            // FIXME: what other sort of IOExceptions can be thrown?
            // FIXME: Is this the best result code?
            final Result errorResult =
                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
            connectionErrorOccurred(errorResult);
            return new CompletedFutureResult<Void>(newErrorResult(errorResult), messageID);
            if (pendingRequest == null) {
                // There has never been a request with the specified message ID or
                // the response has already been received and handled. We can ignore
                // this abandon request.
                // Message ID will be -1 since no request was sent.
                return new CompletedFutureResult<Void>((Void) null);
            }
            pendingRequest.cancel(false);
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
                    ldapWriter.abandonRequest(asn1Writer, messageID, request);
                    connection.write(asn1Writer.getBuffer(), null);
                    return new CompletedFutureResult<Void>((Void) null, messageID);
                } finally {
                    asn1Writer.recycle();
                }
            } catch (final IOException e) {
                throw adaptRequestIOException(e);
            }
        } catch (final ErrorResultException e) {
            return new CompletedFutureResult<Void>(e, messageID);
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> addAsync(final AddRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final ResultHandler<? super Result> resultHandler) {
@@ -177,59 +164,67 @@
        final LDAPFutureResultImpl future =
                new LDAPFutureResultImpl(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                future.adaptErrorResult(connectionInvalidReason);
                return future;
            }
            if (bindOrStartTLSInProgress.get()) {
                future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
                return future;
            }
            pendingRequests.put(messageID, future);
        }
        try {
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                ldapWriter.addRequest(asn1Writer, messageID, request);
                connection.write(asn1Writer.getBuffer(), null);
            } finally {
                asn1Writer.recycle();
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                pendingRequests.put(messageID, future);
            }
        } catch (final IOException e) {
            pendingRequests.remove(messageID);
            // FIXME: what other sort of IOExceptions can be thrown?
            // FIXME: Is this the best result code?
            final Result errorResult =
                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
            connectionErrorOccurred(errorResult);
            future.adaptErrorResult(errorResult);
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
                    ldapWriter.addRequest(asn1Writer, messageID, request);
                    connection.write(asn1Writer.getBuffer(), null);
                } finally {
                    asn1Writer.recycle();
                }
            } catch (final IOException e) {
                pendingRequests.remove(messageID);
                throw adaptRequestIOException(e);
            }
        } catch (final ErrorResultException e) {
            future.adaptErrorResult(e.getResult());
        }
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void addConnectionEventListener(final ConnectionEventListener listener) {
        Validator.ensureNotNull(listener);
        listeners.add(listener);
        final boolean notifyClose;
        final boolean notifyErrorOccurred;
        synchronized (stateLock) {
            notifyClose = isClosed;
            notifyErrorOccurred = isFailed;
            if (!isClosed) {
                if (listeners == null) {
                    listeners = new LinkedList<ConnectionEventListener>();
                }
                listeners.add(listener);
            }
        }
        if (notifyErrorOccurred) {
            // Use the reason provided in the disconnect notification.
            listener.handleConnectionError(failedDueToDisconnect,
                    newErrorResult(connectionInvalidReason));
        }
        if (notifyClose) {
            listener.handleConnectionClosed();
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<BindResult> bindAsync(final BindRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final ResultHandler<? super BindResult> resultHandler) {
        final int messageID = nextMsgID.getAndIncrement();
        BindClient context;
        final BindClient context;
        try {
            context =
                    request.createBindClient(
@@ -253,49 +248,41 @@
                new LDAPBindFutureResultImpl(messageID, context, resultHandler,
                        intermediateResponseHandler, this);
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                future.adaptErrorResult(connectionInvalidReason);
                return future;
            }
            if (!pendingRequests.isEmpty()) {
                future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
                        .setDiagnosticMessage(
                                "There are other operations pending on this connection"));
                return future;
            }
            if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
                future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
                return future;
            }
            pendingRequests.put(messageID, future);
        }
        try {
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                // Use the bind client to get the initial request instead of
                // using the bind request passed to this method.
                final GenericBindRequest initialRequest = context.nextBindRequest();
                ldapWriter.bindRequest(asn1Writer, messageID, 3, initialRequest);
                connection.write(asn1Writer.getBuffer(), null);
            } finally {
                asn1Writer.recycle();
            synchronized (stateLock) {
                checkConnectionIsValid();
                if (!pendingRequests.isEmpty()) {
                    future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
                            .setDiagnosticMessage(
                                    "There are other operations pending on this connection"));
                    return future;
                }
                if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
                    future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
                            .setDiagnosticMessage("Bind or Start TLS operation in progress"));
                    return future;
                }
                pendingRequests.put(messageID, future);
            }
        } catch (final IOException e) {
            pendingRequests.remove(messageID);
            bindOrStartTLSInProgress.set(false);
            // FIXME: what other sort of IOExceptions can be thrown?
            // FIXME: Is this the best result code?
            final Result errorResult =
                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
            connectionErrorOccurred(errorResult);
            future.adaptErrorResult(errorResult);
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
                    // Use the bind client to get the initial request instead of
                    // using the bind request passed to this method.
                    final GenericBindRequest initialRequest = context.nextBindRequest();
                    ldapWriter.bindRequest(asn1Writer, messageID, 3, initialRequest);
                    connection.write(asn1Writer.getBuffer(), null);
                } finally {
                    asn1Writer.recycle();
                }
            } catch (final IOException e) {
                pendingRequests.remove(messageID);
                bindOrStartTLSInProgress.set(false);
                throw adaptRequestIOException(e);
            }
        } catch (final ErrorResultException e) {
            future.adaptErrorResult(e.getResult());
        }
        return future;
@@ -304,6 +291,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public void close(final UnbindRequest request, final String reason) {
        // FIXME: I18N need to internationalize this message.
        Validator.ensureNotNull(request);
@@ -316,6 +304,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<CompareResult> compareAsync(final CompareRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final ResultHandler<? super CompareResult> resultHandler) {
@@ -323,45 +312,34 @@
        final LDAPCompareFutureResultImpl future =
                new LDAPCompareFutureResultImpl(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                future.adaptErrorResult(connectionInvalidReason);
                return future;
            }
            if (bindOrStartTLSInProgress.get()) {
                future.setResultOrError(Responses.newCompareResult(ResultCode.OPERATIONS_ERROR)
                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
                return future;
            }
            pendingRequests.put(messageID, future);
        }
        try {
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                ldapWriter.compareRequest(asn1Writer, messageID, request);
                connection.write(asn1Writer.getBuffer(), null);
            } finally {
                asn1Writer.recycle();
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                pendingRequests.put(messageID, future);
            }
        } catch (final IOException e) {
            pendingRequests.remove(messageID);
            // FIXME: what other sort of IOExceptions can be thrown?
            // FIXME: Is this the best result code?
            final Result errorResult =
                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
            connectionErrorOccurred(errorResult);
            future.adaptErrorResult(errorResult);
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
                    ldapWriter.compareRequest(asn1Writer, messageID, request);
                    connection.write(asn1Writer.getBuffer(), null);
                } finally {
                    asn1Writer.recycle();
                }
            } catch (final IOException e) {
                pendingRequests.remove(messageID);
                throw adaptRequestIOException(e);
            }
        } catch (final ErrorResultException e) {
            future.adaptErrorResult(e.getResult());
        }
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> deleteAsync(final DeleteRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final ResultHandler<? super Result> resultHandler) {
@@ -369,45 +347,34 @@
        final LDAPFutureResultImpl future =
                new LDAPFutureResultImpl(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                future.adaptErrorResult(connectionInvalidReason);
                return future;
            }
            if (bindOrStartTLSInProgress.get()) {
                future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
                return future;
            }
            pendingRequests.put(messageID, future);
        }
        try {
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                ldapWriter.deleteRequest(asn1Writer, messageID, request);
                connection.write(asn1Writer.getBuffer(), null);
            } finally {
                asn1Writer.recycle();
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                pendingRequests.put(messageID, future);
            }
        } catch (final IOException e) {
            pendingRequests.remove(messageID);
            // FIXME: what other sort of IOExceptions can be thrown?
            // FIXME: Is this the best result code?
            final Result errorResult =
                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
            connectionErrorOccurred(errorResult);
            future.adaptErrorResult(errorResult);
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
                    ldapWriter.deleteRequest(asn1Writer, messageID, request);
                    connection.write(asn1Writer.getBuffer(), null);
                } finally {
                    asn1Writer.recycle();
                }
            } catch (final IOException e) {
                pendingRequests.remove(messageID);
                throw adaptRequestIOException(e);
            }
        } catch (final ErrorResultException e) {
            future.adaptErrorResult(e.getResult());
        }
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync(
            final ExtendedRequest<R> request,
            final IntermediateResponseHandler intermediateResponseHandler,
@@ -416,68 +383,54 @@
        final LDAPExtendedFutureResultImpl<R> future =
                new LDAPExtendedFutureResultImpl<R>(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                future.adaptErrorResult(connectionInvalidReason);
                return future;
            }
            if (request.getOID().equals(StartTLSExtendedRequest.OID)) {
                if (!pendingRequests.isEmpty()) {
                    future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                            ResultCode.OPERATIONS_ERROR, "",
                            "There are pending operations on this connection"));
                    return future;
                }
                if (isTLSEnabled()) {
                    future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                            ResultCode.OPERATIONS_ERROR, "",
                            "This connection is already TLS enabled"));
                    return future;
                }
                if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
                    future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                            ResultCode.OPERATIONS_ERROR, "",
                            "Bind or Start TLS operation in progress"));
                    return future;
                }
            } else {
                if (bindOrStartTLSInProgress.get()) {
                    future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                            ResultCode.OPERATIONS_ERROR, "",
                            "Bind or Start TLS operation in progress"));
                    return future;
                }
            }
            pendingRequests.put(messageID, future);
        }
        try {
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                ldapWriter.extendedRequest(asn1Writer, messageID, request);
                connection.write(asn1Writer.getBuffer(), null);
            } finally {
                asn1Writer.recycle();
            synchronized (stateLock) {
                checkConnectionIsValid();
                if (request.getOID().equals(StartTLSExtendedRequest.OID)) {
                    if (!pendingRequests.isEmpty()) {
                        future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                                ResultCode.OPERATIONS_ERROR, "",
                                "There are pending operations on this connection"));
                        return future;
                    } else if (isTLSEnabled()) {
                        future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                                ResultCode.OPERATIONS_ERROR, "",
                                "This connection is already TLS enabled"));
                        return future;
                    } else if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
                        future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                                ResultCode.OPERATIONS_ERROR, "",
                                "Bind or Start TLS operation in progress"));
                        return future;
                    }
                } else {
                    checkBindOrStartTLSInProgress();
                }
                pendingRequests.put(messageID, future);
            }
        } catch (final IOException e) {
            pendingRequests.remove(messageID);
            bindOrStartTLSInProgress.set(false);
            // FIXME: what other sort of IOExceptions can be thrown?
            // FIXME: Is this the best result code?
            final Result errorResult =
                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
            connectionErrorOccurred(errorResult);
            future.adaptErrorResult(errorResult);
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
                    ldapWriter.extendedRequest(asn1Writer, messageID, request);
                    connection.write(asn1Writer.getBuffer(), null);
                } finally {
                    asn1Writer.recycle();
                }
            } catch (final IOException e) {
                pendingRequests.remove(messageID);
                bindOrStartTLSInProgress.set(false);
                throw adaptRequestIOException(e);
            }
        } catch (final ErrorResultException e) {
            future.adaptErrorResult(e.getResult());
        }
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isClosed() {
        synchronized (stateLock) {
            return isClosed;
@@ -487,15 +440,17 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isValid() {
        synchronized (stateLock) {
            return connectionInvalidReason == null && !isClosed;
            return isValid0();
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modifyAsync(final ModifyRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final ResultHandler<? super Result> resultHandler) {
@@ -503,45 +458,34 @@
        final LDAPFutureResultImpl future =
                new LDAPFutureResultImpl(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                future.adaptErrorResult(connectionInvalidReason);
                return future;
            }
            if (bindOrStartTLSInProgress.get()) {
                future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
                return future;
            }
            pendingRequests.put(messageID, future);
        }
        try {
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                ldapWriter.modifyRequest(asn1Writer, messageID, request);
                connection.write(asn1Writer.getBuffer(), null);
            } finally {
                asn1Writer.recycle();
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                pendingRequests.put(messageID, future);
            }
        } catch (final IOException e) {
            pendingRequests.remove(messageID);
            // FIXME: what other sort of IOExceptions can be thrown?
            // FIXME: Is this the best result code?
            final Result errorResult =
                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
            connectionErrorOccurred(errorResult);
            future.adaptErrorResult(errorResult);
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
                    ldapWriter.modifyRequest(asn1Writer, messageID, request);
                    connection.write(asn1Writer.getBuffer(), null);
                } finally {
                    asn1Writer.recycle();
                }
            } catch (final IOException e) {
                pendingRequests.remove(messageID);
                throw adaptRequestIOException(e);
            }
        } catch (final ErrorResultException e) {
            future.adaptErrorResult(e.getResult());
        }
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final ResultHandler<? super Result> resultHandler) {
@@ -549,53 +493,47 @@
        final LDAPFutureResultImpl future =
                new LDAPFutureResultImpl(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                future.adaptErrorResult(connectionInvalidReason);
                return future;
            }
            if (bindOrStartTLSInProgress.get()) {
                future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
                return future;
            }
            pendingRequests.put(messageID, future);
        }
        try {
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                ldapWriter.modifyDNRequest(asn1Writer, messageID, request);
                connection.write(asn1Writer.getBuffer(), null);
            } finally {
                asn1Writer.recycle();
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                pendingRequests.put(messageID, future);
            }
        } catch (final IOException e) {
            pendingRequests.remove(messageID);
            // FIXME: what other sort of IOExceptions can be thrown?
            // FIXME: Is this the best result code?
            final Result errorResult =
                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
            connectionErrorOccurred(errorResult);
            future.adaptErrorResult(errorResult);
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
                    ldapWriter.modifyDNRequest(asn1Writer, messageID, request);
                    connection.write(asn1Writer.getBuffer(), null);
                } finally {
                    asn1Writer.recycle();
                }
            } catch (final IOException e) {
                pendingRequests.remove(messageID);
                throw adaptRequestIOException(e);
            }
        } catch (final ErrorResultException e) {
            future.adaptErrorResult(e.getResult());
        }
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void removeConnectionEventListener(final ConnectionEventListener listener) {
        Validator.ensureNotNull(listener);
        listeners.remove(listener);
        synchronized (stateLock) {
            if (listeners != null) {
                listeners.remove(listener);
            }
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> searchAsync(final SearchRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final SearchResultHandler resultHandler) {
@@ -603,47 +541,36 @@
        final LDAPSearchFutureResultImpl future =
                new LDAPSearchFutureResultImpl(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                future.adaptErrorResult(connectionInvalidReason);
                return future;
            }
            if (bindOrStartTLSInProgress.get()) {
                future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
                return future;
            }
            pendingRequests.put(messageID, future);
        }
        try {
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                ldapWriter.searchRequest(asn1Writer, messageID, request);
                connection.write(asn1Writer.getBuffer(), null);
            } finally {
                asn1Writer.recycle();
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                pendingRequests.put(messageID, future);
            }
        } catch (final IOException e) {
            pendingRequests.remove(messageID);
            // FIXME: what other sort of IOExceptions can be thrown?
            // FIXME: Is this the best result code?
            final Result errorResult =
                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
            connectionErrorOccurred(errorResult);
            future.adaptErrorResult(errorResult);
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
                    ldapWriter.searchRequest(asn1Writer, messageID, request);
                    connection.write(asn1Writer.getBuffer(), null);
                } finally {
                    asn1Writer.recycle();
                }
            } catch (final IOException e) {
                pendingRequests.remove(messageID);
                throw adaptRequestIOException(e);
            }
        } catch (final ErrorResultException e) {
            future.adaptErrorResult(e.getResult());
        }
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        final StringBuilder builder = new StringBuilder();
        builder.append("LDAPConnection(");
        builder.append(connection.getLocalAddress());
        builder.append(',');
@@ -652,23 +579,11 @@
        return builder.toString();
    }
    int continuePendingBindRequest(final LDAPBindFutureResultImpl future)
            throws ErrorResultException {
        final int newMsgID = nextMsgID.getAndIncrement();
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                throw newErrorResult(connectionInvalidReason);
            }
            pendingRequests.put(newMsgID, future);
        }
        return newMsgID;
    }
    long cancelExpiredRequests(final long currentTime) {
        final long timeout = options.getTimeout(TimeUnit.MILLISECONDS);
        long delay = timeout;
        if (timeout > 0) {
            for (int requestID : pendingRequests.keySet()) {
            for (final int requestID : pendingRequests.keySet()) {
                final AbstractLDAPFutureResultImpl<?> future = pendingRequests.get(requestID);
                if (future != null) {
                    final long diff = (future.getTimestamp() + timeout) - currentTime;
@@ -687,52 +602,53 @@
        return delay;
    }
    /**
     * Closes this connection, invoking event listeners as needed.
     *
     * @param unbindRequest
     *            The client provided unbind request if this is a client
     *            initiated close, or {@code null} if the connection has failed.
     * @param isDisconnectNotification
     *            {@code true} if this is a connection failure signalled by a
     *            server disconnect notification.
     * @param reason
     *            The result indicating why the connection was closed.
     */
    void close(final UnbindRequest unbindRequest, final boolean isDisconnectNotification,
            final Result reason) {
        boolean notifyClose = false;
        boolean notifyErrorOccurred = false;
        final boolean notifyClose;
        final boolean notifyErrorOccurred;
        final List<ConnectionEventListener> tmpListeners;
        synchronized (stateLock) {
            if (isClosed) {
                // Already closed.
                // Already closed locally.
                return;
            }
            if (connectionInvalidReason != null) {
                // Already closed.
                isClosed = true;
                return;
            }
            if (unbindRequest != null) {
                // User closed.
                isClosed = true;
            } else if (unbindRequest != null) {
                // Local close.
                notifyClose = true;
                notifyErrorOccurred = false;
                isClosed = true;
                tmpListeners = listeners;
                listeners = null; // Prevent future invocations.
                if (connectionInvalidReason == null) {
                    connectionInvalidReason = reason;
                }
            } else if (isFailed) {
                // Already failed.
                return;
            } else {
                // Connection has failed and this is the first indication.
                notifyClose = false;
                notifyErrorOccurred = true;
            }
            // Mark the connection as invalid.
            if (!isDisconnectNotification) {
                // Connection termination was detected locally, so use the
                // provided
                // reason for all subsequent requests.
                isFailed = true;
                failedDueToDisconnect = isDisconnectNotification;
                connectionInvalidReason = reason;
            } else {
                // Connection termination was triggered remotely. We don't want
                // to blindly pass on the result code to requests since it could
                // be confused for a genuine response. For example, if the
                // disconnect contained the invalidCredentials result code then
                // this could be misinterpreted as a genuine authentication
                // failure for subsequent bind requests.
                connectionInvalidReason =
                        Responses.newResult(ResultCode.CLIENT_SIDE_SERVER_DOWN)
                                .setDiagnosticMessage("Connection closed by server");
                tmpListeners = listeners; // Keep list for client close.
            }
        }
        // First abort all outstanding requests.
        for (int requestID : pendingRequests.keySet()) {
        for (final int requestID : pendingRequests.keySet()) {
            final AbstractLDAPFutureResultImpl<?> future = pendingRequests.remove(requestID);
            if (future != null) {
                future.adaptErrorResult(connectionInvalidReason);
@@ -741,7 +657,7 @@
        // Now try cleanly closing the connection if possible.
        // Only send unbind if specified.
        if (unbindRequest != null && !isDisconnectNotification) {
        if (unbindRequest != null) {
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
@@ -758,18 +674,29 @@
        connection.closeSilently();
        // Notify listeners.
        if (notifyClose) {
            for (final ConnectionEventListener listener : listeners) {
                listener.handleConnectionClosed();
        if (tmpListeners != null) {
            if (notifyErrorOccurred) {
                for (final ConnectionEventListener listener : tmpListeners) {
                    // Use the reason provided in the disconnect notification.
                    listener.handleConnectionError(isDisconnectNotification, newErrorResult(reason));
                }
            }
            if (notifyClose) {
                for (final ConnectionEventListener listener : tmpListeners) {
                    listener.handleConnectionClosed();
                }
            }
        }
    }
        if (notifyErrorOccurred) {
            for (final ConnectionEventListener listener : listeners) {
                // Use the reason provided in the disconnect notification.
                listener.handleConnectionError(isDisconnectNotification, newErrorResult(reason));
            }
    int continuePendingBindRequest(final LDAPBindFutureResultImpl future)
            throws ErrorResultException {
        final int newMsgID = nextMsgID.getAndIncrement();
        synchronized (stateLock) {
            checkConnectionIsValid();
            pendingRequests.put(newMsgID, future);
        }
        return newMsgID;
    }
    LDAPOptions getLDAPOptions() {
@@ -781,13 +708,14 @@
    }
    void handleUnsolicitedNotification(final ExtendedResult result) {
        if (isClosed()) {
            // Don't notify after connection is closed.
            return;
        final List<ConnectionEventListener> tmpListeners;
        synchronized (stateLock) {
            tmpListeners = listeners;
        }
        for (final ConnectionEventListener listener : listeners) {
            listener.handleUnsolicitedNotification(result);
        if (tmpListeners != null) {
            for (final ConnectionEventListener listener : tmpListeners) {
                listener.handleUnsolicitedNotification(result);
            }
        }
    }
@@ -801,7 +729,7 @@
    void installFilter(final Filter filter) {
        synchronized (stateLock) {
            // Determine the index where the filter should be added.
            FilterChain oldFilterChain = (FilterChain) connection.getProcessor();
            final FilterChain oldFilterChain = (FilterChain) connection.getProcessor();
            int filterIndex = oldFilterChain.size() - 1;
            if (filter instanceof SSLFilter) {
                // Beneath any ConnectionSecurityLayerFilters if present,
@@ -815,7 +743,7 @@
            }
            // Create the new filter chain.
            FilterChain newFilterChain =
            final FilterChain newFilterChain =
                    FilterChainBuilder.stateless().addAll(oldFilterChain).add(filterIndex, filter)
                            .build();
            connection.setProcessor(newFilterChain);
@@ -831,7 +759,7 @@
    boolean isTLSEnabled() {
        synchronized (stateLock) {
            final FilterChain currentFilterChain = (FilterChain) connection.getProcessor();
            for (Filter filter : currentFilterChain) {
            for (final Filter filter : currentFilterChain) {
                if (filter instanceof SSLFilter) {
                    return true;
                }
@@ -856,19 +784,56 @@
                throw new IllegalStateException("TLS already enabled");
            }
            SSLEngineConfigurator sslEngineConfigurator =
            final SSLEngineConfigurator sslEngineConfigurator =
                    new SSLEngineConfigurator(sslContext, true, false, false);
            sslEngineConfigurator.setEnabledProtocols(protocols.isEmpty() ? null : protocols
                    .toArray(new String[protocols.size()]));
            sslEngineConfigurator.setEnabledCipherSuites(cipherSuites.isEmpty() ? null
                    : cipherSuites.toArray(new String[cipherSuites.size()]));
            SSLFilter sslFilter = new SSLFilter(null, sslEngineConfigurator);
            final SSLFilter sslFilter = new SSLFilter(null, sslEngineConfigurator);
            installFilter(sslFilter);
            sslFilter.handshake(connection, completionHandler);
        }
    }
    private ErrorResultException adaptRequestIOException(final IOException e) {
        // FIXME: what other sort of IOExceptions can be thrown?
        // FIXME: Is this the best result code?
        final Result errorResult =
                Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
        connectionErrorOccurred(errorResult);
        return newErrorResult(errorResult);
    }
    private void checkBindOrStartTLSInProgress() throws ErrorResultException {
        if (bindOrStartTLSInProgress.get()) {
            throw newErrorResult(ResultCode.OPERATIONS_ERROR,
                    "Bind or Start TLS operation in progress");
        }
    }
    private void checkConnectionIsValid() throws ErrorResultException {
        if (!isValid0()) {
            if (failedDueToDisconnect) {
                // Connection termination was triggered remotely. We don't want
                // to blindly pass on the result code to requests since it could
                // be confused for a genuine response. For example, if the
                // disconnect contained the invalidCredentials result code then
                // this could be misinterpreted as a genuine authentication
                // failure for subsequent bind requests.
                throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN,
                        "Connection closed by server");
            } else {
                throw newErrorResult(connectionInvalidReason);
            }
        }
    }
    private void connectionErrorOccurred(final Result reason) {
        close(null, false, reason);
    }
    private boolean isValid0() {
        return !isFailed && !isClosed;
    }
}