mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
12.46.2012 7983705a0ce4de2fd6e7a4061fe4afa0f9e8c66a
First part of fix for OPENDJ-590: ConnectionPool may return already closed/disconnected connections

* fix potential race conditions in LDAP client connection and server listener connection event notification.
* add more comprehensive unit tests for connection event listeners and Connection.isValid().
1 files added
5 files modified
1715 ■■■■ changed files
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPClientFilter.java 49 ●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java 763 ●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java 350 ●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java 242 ●●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java 199 ●●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockConnectionEventListener.java 112 ●●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPClientFilter.java
@@ -226,42 +226,43 @@
                            LDAP_CONNECTION_ATTR.get(ctx.getConnection());
                    if (ldapConnection != null) {
                        if (messageID == 0) {
                            // Unsolicited notification received.
                            if ((result.getOID() != null)
                                    && result.getOID().equals(OID_NOTICE_OF_DISCONNECTION)) {
                                // Treat this as a connection error.
                                final Result errorResult =
                                        Responses
                                                .newResult(result.getResultCode())
                                                .setDiagnosticMessage(result.getDiagnosticMessage());
                                ldapConnection.close(null, true, errorResult);
                                return;
                            } else {
                                // Unsolicited notification received.
                                ldapConnection.handleUnsolicitedNotification(result);
                            }
                        }
                        } else {
                            final AbstractLDAPFutureResultImpl<?> pendingRequest =
                                    ldapConnection.removePendingRequest(messageID);
                        final AbstractLDAPFutureResultImpl<?> pendingRequest =
                                ldapConnection.removePendingRequest(messageID);
                        if (pendingRequest != null) {
                            if (pendingRequest instanceof LDAPExtendedFutureResultImpl<?>) {
                                final LDAPExtendedFutureResultImpl<?> extendedFuture =
                                        ((LDAPExtendedFutureResultImpl<?>) pendingRequest);
                                try {
                                    handleExtendedResult0(ldapConnection, extendedFuture, result);
                                } catch (final DecodeException de) {
                                    // FIXME: should the connection be closed as
                                    // well?
                                    final Result errorResult =
                                            Responses.newResult(
                                                    ResultCode.CLIENT_SIDE_DECODING_ERROR)
                                                    .setDiagnosticMessage(de.getLocalizedMessage())
                                                    .setCause(de);
                                    extendedFuture.adaptErrorResult(errorResult);
                            if (pendingRequest != null) {
                                if (pendingRequest instanceof LDAPExtendedFutureResultImpl<?>) {
                                    final LDAPExtendedFutureResultImpl<?> extendedFuture =
                                            ((LDAPExtendedFutureResultImpl<?>) pendingRequest);
                                    try {
                                        handleExtendedResult0(ldapConnection, extendedFuture,
                                                result);
                                    } catch (final DecodeException de) {
                                        // FIXME: should the connection be closed as
                                        // well?
                                        final Result errorResult =
                                                Responses.newResult(
                                                        ResultCode.CLIENT_SIDE_DECODING_ERROR)
                                                        .setDiagnosticMessage(
                                                                de.getLocalizedMessage()).setCause(
                                                                de);
                                        extendedFuture.adaptErrorResult(errorResult);
                                    }
                                } else {
                                    throw new UnexpectedResponseException(messageID, result);
                                }
                            } else {
                                throw new UnexpectedResponseException(messageID, result);
                            }
                        }
                    }
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -31,9 +31,9 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -83,22 +83,22 @@
/**
 * LDAP connection implementation.
 * <p>
 * TODO: handle illegal state exceptions.
 */
final class LDAPConnection extends AbstractAsynchronousConnection implements Connection {
    private final org.glassfish.grizzly.Connection<?> connection;
    private Result connectionInvalidReason;
    private boolean isClosed = false;
    private final List<ConnectionEventListener> listeners =
            new CopyOnWriteArrayList<ConnectionEventListener>();
    private final AtomicInteger nextMsgID = new AtomicInteger(1);
    private final AtomicBoolean bindOrStartTLSInProgress = new AtomicBoolean(false);
    private final org.glassfish.grizzly.Connection<?> connection;
    private final LDAPWriter ldapWriter = new LDAPWriter();
    private final AtomicInteger nextMsgID = new AtomicInteger(1);
    private final LDAPOptions options;
    private final ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>> pendingRequests =
            new ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>>();
    private final Object stateLock = new Object();
    private final LDAPWriter ldapWriter = new LDAPWriter();
    private final LDAPOptions options;
    // Guarded by stateLock
    private Result connectionInvalidReason;
    private boolean failedDueToDisconnect = false;
    private boolean isClosed = false;
    private boolean isFailed = false;
    private List<ConnectionEventListener> listeners = null;
    /**
     * Creates a new LDAP connection.
@@ -116,60 +116,47 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Void> abandonAsync(final AbandonRequest request) {
        final AbstractLDAPFutureResultImpl<?> pendingRequest;
        final int messageID = nextMsgID.getAndIncrement();
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                return new CompletedFutureResult<Void>(newErrorResult(connectionInvalidReason),
                        messageID);
            }
            if (bindOrStartTLSInProgress.get()) {
                final Result errorResult =
                        Responses.newResult(ResultCode.OPERATIONS_ERROR).setDiagnosticMessage(
                                "Bind or Start TLS operation in progress");
                return new CompletedFutureResult<Void>(newErrorResult(errorResult), messageID);
            }
            // First remove the future associated with the request to be
            // abandoned.
            pendingRequest = pendingRequests.remove(request.getRequestID());
        }
        if (pendingRequest == null) {
            // There has never been a request with the specified message ID or
            // the response has already been received and handled. We can ignore
            // this abandon request.
            // Message ID will be -1 since no request was sent.
            return new CompletedFutureResult<Void>((Void) null);
        }
        pendingRequest.cancel(false);
        try {
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                ldapWriter.abandonRequest(asn1Writer, messageID, request);
                connection.write(asn1Writer.getBuffer(), null);
                return new CompletedFutureResult<Void>((Void) null, messageID);
            } finally {
                asn1Writer.recycle();
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                // Remove the future associated with the request to be abandoned.
                pendingRequest = pendingRequests.remove(request.getRequestID());
            }
        } catch (final IOException e) {
            // FIXME: what other sort of IOExceptions can be thrown?
            // FIXME: Is this the best result code?
            final Result errorResult =
                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
            connectionErrorOccurred(errorResult);
            return new CompletedFutureResult<Void>(newErrorResult(errorResult), messageID);
            if (pendingRequest == null) {
                // There has never been a request with the specified message ID or
                // the response has already been received and handled. We can ignore
                // this abandon request.
                // Message ID will be -1 since no request was sent.
                return new CompletedFutureResult<Void>((Void) null);
            }
            pendingRequest.cancel(false);
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
                    ldapWriter.abandonRequest(asn1Writer, messageID, request);
                    connection.write(asn1Writer.getBuffer(), null);
                    return new CompletedFutureResult<Void>((Void) null, messageID);
                } finally {
                    asn1Writer.recycle();
                }
            } catch (final IOException e) {
                throw adaptRequestIOException(e);
            }
        } catch (final ErrorResultException e) {
            return new CompletedFutureResult<Void>(e, messageID);
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> addAsync(final AddRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final ResultHandler<? super Result> resultHandler) {
@@ -177,59 +164,67 @@
        final LDAPFutureResultImpl future =
                new LDAPFutureResultImpl(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                future.adaptErrorResult(connectionInvalidReason);
                return future;
            }
            if (bindOrStartTLSInProgress.get()) {
                future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
                return future;
            }
            pendingRequests.put(messageID, future);
        }
        try {
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                ldapWriter.addRequest(asn1Writer, messageID, request);
                connection.write(asn1Writer.getBuffer(), null);
            } finally {
                asn1Writer.recycle();
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                pendingRequests.put(messageID, future);
            }
        } catch (final IOException e) {
            pendingRequests.remove(messageID);
            // FIXME: what other sort of IOExceptions can be thrown?
            // FIXME: Is this the best result code?
            final Result errorResult =
                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
            connectionErrorOccurred(errorResult);
            future.adaptErrorResult(errorResult);
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
                    ldapWriter.addRequest(asn1Writer, messageID, request);
                    connection.write(asn1Writer.getBuffer(), null);
                } finally {
                    asn1Writer.recycle();
                }
            } catch (final IOException e) {
                pendingRequests.remove(messageID);
                throw adaptRequestIOException(e);
            }
        } catch (final ErrorResultException e) {
            future.adaptErrorResult(e.getResult());
        }
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void addConnectionEventListener(final ConnectionEventListener listener) {
        Validator.ensureNotNull(listener);
        listeners.add(listener);
        final boolean notifyClose;
        final boolean notifyErrorOccurred;
        synchronized (stateLock) {
            notifyClose = isClosed;
            notifyErrorOccurred = isFailed;
            if (!isClosed) {
                if (listeners == null) {
                    listeners = new LinkedList<ConnectionEventListener>();
                }
                listeners.add(listener);
            }
        }
        if (notifyErrorOccurred) {
            // Use the reason provided in the disconnect notification.
            listener.handleConnectionError(failedDueToDisconnect,
                    newErrorResult(connectionInvalidReason));
        }
        if (notifyClose) {
            listener.handleConnectionClosed();
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<BindResult> bindAsync(final BindRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final ResultHandler<? super BindResult> resultHandler) {
        final int messageID = nextMsgID.getAndIncrement();
        BindClient context;
        final BindClient context;
        try {
            context =
                    request.createBindClient(
@@ -253,49 +248,41 @@
                new LDAPBindFutureResultImpl(messageID, context, resultHandler,
                        intermediateResponseHandler, this);
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                future.adaptErrorResult(connectionInvalidReason);
                return future;
            }
            if (!pendingRequests.isEmpty()) {
                future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
                        .setDiagnosticMessage(
                                "There are other operations pending on this connection"));
                return future;
            }
            if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
                future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
                return future;
            }
            pendingRequests.put(messageID, future);
        }
        try {
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                // Use the bind client to get the initial request instead of
                // using the bind request passed to this method.
                final GenericBindRequest initialRequest = context.nextBindRequest();
                ldapWriter.bindRequest(asn1Writer, messageID, 3, initialRequest);
                connection.write(asn1Writer.getBuffer(), null);
            } finally {
                asn1Writer.recycle();
            synchronized (stateLock) {
                checkConnectionIsValid();
                if (!pendingRequests.isEmpty()) {
                    future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
                            .setDiagnosticMessage(
                                    "There are other operations pending on this connection"));
                    return future;
                }
                if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
                    future.setResultOrError(Responses.newBindResult(ResultCode.OPERATIONS_ERROR)
                            .setDiagnosticMessage("Bind or Start TLS operation in progress"));
                    return future;
                }
                pendingRequests.put(messageID, future);
            }
        } catch (final IOException e) {
            pendingRequests.remove(messageID);
            bindOrStartTLSInProgress.set(false);
            // FIXME: what other sort of IOExceptions can be thrown?
            // FIXME: Is this the best result code?
            final Result errorResult =
                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
            connectionErrorOccurred(errorResult);
            future.adaptErrorResult(errorResult);
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
                    // Use the bind client to get the initial request instead of
                    // using the bind request passed to this method.
                    final GenericBindRequest initialRequest = context.nextBindRequest();
                    ldapWriter.bindRequest(asn1Writer, messageID, 3, initialRequest);
                    connection.write(asn1Writer.getBuffer(), null);
                } finally {
                    asn1Writer.recycle();
                }
            } catch (final IOException e) {
                pendingRequests.remove(messageID);
                bindOrStartTLSInProgress.set(false);
                throw adaptRequestIOException(e);
            }
        } catch (final ErrorResultException e) {
            future.adaptErrorResult(e.getResult());
        }
        return future;
@@ -304,6 +291,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public void close(final UnbindRequest request, final String reason) {
        // FIXME: I18N need to internationalize this message.
        Validator.ensureNotNull(request);
@@ -316,6 +304,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<CompareResult> compareAsync(final CompareRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final ResultHandler<? super CompareResult> resultHandler) {
@@ -323,45 +312,34 @@
        final LDAPCompareFutureResultImpl future =
                new LDAPCompareFutureResultImpl(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                future.adaptErrorResult(connectionInvalidReason);
                return future;
            }
            if (bindOrStartTLSInProgress.get()) {
                future.setResultOrError(Responses.newCompareResult(ResultCode.OPERATIONS_ERROR)
                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
                return future;
            }
            pendingRequests.put(messageID, future);
        }
        try {
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                ldapWriter.compareRequest(asn1Writer, messageID, request);
                connection.write(asn1Writer.getBuffer(), null);
            } finally {
                asn1Writer.recycle();
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                pendingRequests.put(messageID, future);
            }
        } catch (final IOException e) {
            pendingRequests.remove(messageID);
            // FIXME: what other sort of IOExceptions can be thrown?
            // FIXME: Is this the best result code?
            final Result errorResult =
                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
            connectionErrorOccurred(errorResult);
            future.adaptErrorResult(errorResult);
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
                    ldapWriter.compareRequest(asn1Writer, messageID, request);
                    connection.write(asn1Writer.getBuffer(), null);
                } finally {
                    asn1Writer.recycle();
                }
            } catch (final IOException e) {
                pendingRequests.remove(messageID);
                throw adaptRequestIOException(e);
            }
        } catch (final ErrorResultException e) {
            future.adaptErrorResult(e.getResult());
        }
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> deleteAsync(final DeleteRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final ResultHandler<? super Result> resultHandler) {
@@ -369,45 +347,34 @@
        final LDAPFutureResultImpl future =
                new LDAPFutureResultImpl(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                future.adaptErrorResult(connectionInvalidReason);
                return future;
            }
            if (bindOrStartTLSInProgress.get()) {
                future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
                return future;
            }
            pendingRequests.put(messageID, future);
        }
        try {
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                ldapWriter.deleteRequest(asn1Writer, messageID, request);
                connection.write(asn1Writer.getBuffer(), null);
            } finally {
                asn1Writer.recycle();
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                pendingRequests.put(messageID, future);
            }
        } catch (final IOException e) {
            pendingRequests.remove(messageID);
            // FIXME: what other sort of IOExceptions can be thrown?
            // FIXME: Is this the best result code?
            final Result errorResult =
                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
            connectionErrorOccurred(errorResult);
            future.adaptErrorResult(errorResult);
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
                    ldapWriter.deleteRequest(asn1Writer, messageID, request);
                    connection.write(asn1Writer.getBuffer(), null);
                } finally {
                    asn1Writer.recycle();
                }
            } catch (final IOException e) {
                pendingRequests.remove(messageID);
                throw adaptRequestIOException(e);
            }
        } catch (final ErrorResultException e) {
            future.adaptErrorResult(e.getResult());
        }
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync(
            final ExtendedRequest<R> request,
            final IntermediateResponseHandler intermediateResponseHandler,
@@ -416,68 +383,54 @@
        final LDAPExtendedFutureResultImpl<R> future =
                new LDAPExtendedFutureResultImpl<R>(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                future.adaptErrorResult(connectionInvalidReason);
                return future;
            }
            if (request.getOID().equals(StartTLSExtendedRequest.OID)) {
                if (!pendingRequests.isEmpty()) {
                    future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                            ResultCode.OPERATIONS_ERROR, "",
                            "There are pending operations on this connection"));
                    return future;
                }
                if (isTLSEnabled()) {
                    future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                            ResultCode.OPERATIONS_ERROR, "",
                            "This connection is already TLS enabled"));
                    return future;
                }
                if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
                    future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                            ResultCode.OPERATIONS_ERROR, "",
                            "Bind or Start TLS operation in progress"));
                    return future;
                }
            } else {
                if (bindOrStartTLSInProgress.get()) {
                    future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                            ResultCode.OPERATIONS_ERROR, "",
                            "Bind or Start TLS operation in progress"));
                    return future;
                }
            }
            pendingRequests.put(messageID, future);
        }
        try {
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                ldapWriter.extendedRequest(asn1Writer, messageID, request);
                connection.write(asn1Writer.getBuffer(), null);
            } finally {
                asn1Writer.recycle();
            synchronized (stateLock) {
                checkConnectionIsValid();
                if (request.getOID().equals(StartTLSExtendedRequest.OID)) {
                    if (!pendingRequests.isEmpty()) {
                        future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                                ResultCode.OPERATIONS_ERROR, "",
                                "There are pending operations on this connection"));
                        return future;
                    } else if (isTLSEnabled()) {
                        future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                                ResultCode.OPERATIONS_ERROR, "",
                                "This connection is already TLS enabled"));
                        return future;
                    } else if (!bindOrStartTLSInProgress.compareAndSet(false, true)) {
                        future.setResultOrError(request.getResultDecoder().newExtendedErrorResult(
                                ResultCode.OPERATIONS_ERROR, "",
                                "Bind or Start TLS operation in progress"));
                        return future;
                    }
                } else {
                    checkBindOrStartTLSInProgress();
                }
                pendingRequests.put(messageID, future);
            }
        } catch (final IOException e) {
            pendingRequests.remove(messageID);
            bindOrStartTLSInProgress.set(false);
            // FIXME: what other sort of IOExceptions can be thrown?
            // FIXME: Is this the best result code?
            final Result errorResult =
                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
            connectionErrorOccurred(errorResult);
            future.adaptErrorResult(errorResult);
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
                    ldapWriter.extendedRequest(asn1Writer, messageID, request);
                    connection.write(asn1Writer.getBuffer(), null);
                } finally {
                    asn1Writer.recycle();
                }
            } catch (final IOException e) {
                pendingRequests.remove(messageID);
                bindOrStartTLSInProgress.set(false);
                throw adaptRequestIOException(e);
            }
        } catch (final ErrorResultException e) {
            future.adaptErrorResult(e.getResult());
        }
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isClosed() {
        synchronized (stateLock) {
            return isClosed;
@@ -487,15 +440,17 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isValid() {
        synchronized (stateLock) {
            return connectionInvalidReason == null && !isClosed;
            return isValid0();
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modifyAsync(final ModifyRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final ResultHandler<? super Result> resultHandler) {
@@ -503,45 +458,34 @@
        final LDAPFutureResultImpl future =
                new LDAPFutureResultImpl(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                future.adaptErrorResult(connectionInvalidReason);
                return future;
            }
            if (bindOrStartTLSInProgress.get()) {
                future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
                return future;
            }
            pendingRequests.put(messageID, future);
        }
        try {
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                ldapWriter.modifyRequest(asn1Writer, messageID, request);
                connection.write(asn1Writer.getBuffer(), null);
            } finally {
                asn1Writer.recycle();
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                pendingRequests.put(messageID, future);
            }
        } catch (final IOException e) {
            pendingRequests.remove(messageID);
            // FIXME: what other sort of IOExceptions can be thrown?
            // FIXME: Is this the best result code?
            final Result errorResult =
                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
            connectionErrorOccurred(errorResult);
            future.adaptErrorResult(errorResult);
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
                    ldapWriter.modifyRequest(asn1Writer, messageID, request);
                    connection.write(asn1Writer.getBuffer(), null);
                } finally {
                    asn1Writer.recycle();
                }
            } catch (final IOException e) {
                pendingRequests.remove(messageID);
                throw adaptRequestIOException(e);
            }
        } catch (final ErrorResultException e) {
            future.adaptErrorResult(e.getResult());
        }
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final ResultHandler<? super Result> resultHandler) {
@@ -549,53 +493,47 @@
        final LDAPFutureResultImpl future =
                new LDAPFutureResultImpl(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                future.adaptErrorResult(connectionInvalidReason);
                return future;
            }
            if (bindOrStartTLSInProgress.get()) {
                future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
                return future;
            }
            pendingRequests.put(messageID, future);
        }
        try {
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                ldapWriter.modifyDNRequest(asn1Writer, messageID, request);
                connection.write(asn1Writer.getBuffer(), null);
            } finally {
                asn1Writer.recycle();
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                pendingRequests.put(messageID, future);
            }
        } catch (final IOException e) {
            pendingRequests.remove(messageID);
            // FIXME: what other sort of IOExceptions can be thrown?
            // FIXME: Is this the best result code?
            final Result errorResult =
                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
            connectionErrorOccurred(errorResult);
            future.adaptErrorResult(errorResult);
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
                    ldapWriter.modifyDNRequest(asn1Writer, messageID, request);
                    connection.write(asn1Writer.getBuffer(), null);
                } finally {
                    asn1Writer.recycle();
                }
            } catch (final IOException e) {
                pendingRequests.remove(messageID);
                throw adaptRequestIOException(e);
            }
        } catch (final ErrorResultException e) {
            future.adaptErrorResult(e.getResult());
        }
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void removeConnectionEventListener(final ConnectionEventListener listener) {
        Validator.ensureNotNull(listener);
        listeners.remove(listener);
        synchronized (stateLock) {
            if (listeners != null) {
                listeners.remove(listener);
            }
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> searchAsync(final SearchRequest request,
            final IntermediateResponseHandler intermediateResponseHandler,
            final SearchResultHandler resultHandler) {
@@ -603,47 +541,36 @@
        final LDAPSearchFutureResultImpl future =
                new LDAPSearchFutureResultImpl(messageID, request, resultHandler,
                        intermediateResponseHandler, this);
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                future.adaptErrorResult(connectionInvalidReason);
                return future;
            }
            if (bindOrStartTLSInProgress.get()) {
                future.setResultOrError(Responses.newResult(ResultCode.OPERATIONS_ERROR)
                        .setDiagnosticMessage("Bind or Start TLS operation in progress"));
                return future;
            }
            pendingRequests.put(messageID, future);
        }
        try {
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                ldapWriter.searchRequest(asn1Writer, messageID, request);
                connection.write(asn1Writer.getBuffer(), null);
            } finally {
                asn1Writer.recycle();
            synchronized (stateLock) {
                checkConnectionIsValid();
                checkBindOrStartTLSInProgress();
                pendingRequests.put(messageID, future);
            }
        } catch (final IOException e) {
            pendingRequests.remove(messageID);
            // FIXME: what other sort of IOExceptions can be thrown?
            // FIXME: Is this the best result code?
            final Result errorResult =
                    Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
            connectionErrorOccurred(errorResult);
            future.adaptErrorResult(errorResult);
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
                    ldapWriter.searchRequest(asn1Writer, messageID, request);
                    connection.write(asn1Writer.getBuffer(), null);
                } finally {
                    asn1Writer.recycle();
                }
            } catch (final IOException e) {
                pendingRequests.remove(messageID);
                throw adaptRequestIOException(e);
            }
        } catch (final ErrorResultException e) {
            future.adaptErrorResult(e.getResult());
        }
        return future;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        final StringBuilder builder = new StringBuilder();
        builder.append("LDAPConnection(");
        builder.append(connection.getLocalAddress());
        builder.append(',');
@@ -652,23 +579,11 @@
        return builder.toString();
    }
    int continuePendingBindRequest(final LDAPBindFutureResultImpl future)
            throws ErrorResultException {
        final int newMsgID = nextMsgID.getAndIncrement();
        synchronized (stateLock) {
            if (connectionInvalidReason != null) {
                throw newErrorResult(connectionInvalidReason);
            }
            pendingRequests.put(newMsgID, future);
        }
        return newMsgID;
    }
    long cancelExpiredRequests(final long currentTime) {
        final long timeout = options.getTimeout(TimeUnit.MILLISECONDS);
        long delay = timeout;
        if (timeout > 0) {
            for (int requestID : pendingRequests.keySet()) {
            for (final int requestID : pendingRequests.keySet()) {
                final AbstractLDAPFutureResultImpl<?> future = pendingRequests.get(requestID);
                if (future != null) {
                    final long diff = (future.getTimestamp() + timeout) - currentTime;
@@ -687,52 +602,53 @@
        return delay;
    }
    /**
     * Closes this connection, invoking event listeners as needed.
     *
     * @param unbindRequest
     *            The client provided unbind request if this is a client
     *            initiated close, or {@code null} if the connection has failed.
     * @param isDisconnectNotification
     *            {@code true} if this is a connection failure signalled by a
     *            server disconnect notification.
     * @param reason
     *            The result indicating why the connection was closed.
     */
    void close(final UnbindRequest unbindRequest, final boolean isDisconnectNotification,
            final Result reason) {
        boolean notifyClose = false;
        boolean notifyErrorOccurred = false;
        final boolean notifyClose;
        final boolean notifyErrorOccurred;
        final List<ConnectionEventListener> tmpListeners;
        synchronized (stateLock) {
            if (isClosed) {
                // Already closed.
                // Already closed locally.
                return;
            }
            if (connectionInvalidReason != null) {
                // Already closed.
                isClosed = true;
                return;
            }
            if (unbindRequest != null) {
                // User closed.
                isClosed = true;
            } else if (unbindRequest != null) {
                // Local close.
                notifyClose = true;
                notifyErrorOccurred = false;
                isClosed = true;
                tmpListeners = listeners;
                listeners = null; // Prevent future invocations.
                if (connectionInvalidReason == null) {
                    connectionInvalidReason = reason;
                }
            } else if (isFailed) {
                // Already failed.
                return;
            } else {
                // Connection has failed and this is the first indication.
                notifyClose = false;
                notifyErrorOccurred = true;
            }
            // Mark the connection as invalid.
            if (!isDisconnectNotification) {
                // Connection termination was detected locally, so use the
                // provided
                // reason for all subsequent requests.
                isFailed = true;
                failedDueToDisconnect = isDisconnectNotification;
                connectionInvalidReason = reason;
            } else {
                // Connection termination was triggered remotely. We don't want
                // to blindly pass on the result code to requests since it could
                // be confused for a genuine response. For example, if the
                // disconnect contained the invalidCredentials result code then
                // this could be misinterpreted as a genuine authentication
                // failure for subsequent bind requests.
                connectionInvalidReason =
                        Responses.newResult(ResultCode.CLIENT_SIDE_SERVER_DOWN)
                                .setDiagnosticMessage("Connection closed by server");
                tmpListeners = listeners; // Keep list for client close.
            }
        }
        // First abort all outstanding requests.
        for (int requestID : pendingRequests.keySet()) {
        for (final int requestID : pendingRequests.keySet()) {
            final AbstractLDAPFutureResultImpl<?> future = pendingRequests.remove(requestID);
            if (future != null) {
                future.adaptErrorResult(connectionInvalidReason);
@@ -741,7 +657,7 @@
        // Now try cleanly closing the connection if possible.
        // Only send unbind if specified.
        if (unbindRequest != null && !isDisconnectNotification) {
        if (unbindRequest != null) {
            try {
                final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
                try {
@@ -758,18 +674,29 @@
        connection.closeSilently();
        // Notify listeners.
        if (notifyClose) {
            for (final ConnectionEventListener listener : listeners) {
                listener.handleConnectionClosed();
        if (tmpListeners != null) {
            if (notifyErrorOccurred) {
                for (final ConnectionEventListener listener : tmpListeners) {
                    // Use the reason provided in the disconnect notification.
                    listener.handleConnectionError(isDisconnectNotification, newErrorResult(reason));
                }
            }
            if (notifyClose) {
                for (final ConnectionEventListener listener : tmpListeners) {
                    listener.handleConnectionClosed();
                }
            }
        }
    }
        if (notifyErrorOccurred) {
            for (final ConnectionEventListener listener : listeners) {
                // Use the reason provided in the disconnect notification.
                listener.handleConnectionError(isDisconnectNotification, newErrorResult(reason));
            }
    int continuePendingBindRequest(final LDAPBindFutureResultImpl future)
            throws ErrorResultException {
        final int newMsgID = nextMsgID.getAndIncrement();
        synchronized (stateLock) {
            checkConnectionIsValid();
            pendingRequests.put(newMsgID, future);
        }
        return newMsgID;
    }
    LDAPOptions getLDAPOptions() {
@@ -781,13 +708,14 @@
    }
    void handleUnsolicitedNotification(final ExtendedResult result) {
        if (isClosed()) {
            // Don't notify after connection is closed.
            return;
        final List<ConnectionEventListener> tmpListeners;
        synchronized (stateLock) {
            tmpListeners = listeners;
        }
        for (final ConnectionEventListener listener : listeners) {
            listener.handleUnsolicitedNotification(result);
        if (tmpListeners != null) {
            for (final ConnectionEventListener listener : tmpListeners) {
                listener.handleUnsolicitedNotification(result);
            }
        }
    }
@@ -801,7 +729,7 @@
    void installFilter(final Filter filter) {
        synchronized (stateLock) {
            // Determine the index where the filter should be added.
            FilterChain oldFilterChain = (FilterChain) connection.getProcessor();
            final FilterChain oldFilterChain = (FilterChain) connection.getProcessor();
            int filterIndex = oldFilterChain.size() - 1;
            if (filter instanceof SSLFilter) {
                // Beneath any ConnectionSecurityLayerFilters if present,
@@ -815,7 +743,7 @@
            }
            // Create the new filter chain.
            FilterChain newFilterChain =
            final FilterChain newFilterChain =
                    FilterChainBuilder.stateless().addAll(oldFilterChain).add(filterIndex, filter)
                            .build();
            connection.setProcessor(newFilterChain);
@@ -831,7 +759,7 @@
    boolean isTLSEnabled() {
        synchronized (stateLock) {
            final FilterChain currentFilterChain = (FilterChain) connection.getProcessor();
            for (Filter filter : currentFilterChain) {
            for (final Filter filter : currentFilterChain) {
                if (filter instanceof SSLFilter) {
                    return true;
                }
@@ -856,19 +784,56 @@
                throw new IllegalStateException("TLS already enabled");
            }
            SSLEngineConfigurator sslEngineConfigurator =
            final SSLEngineConfigurator sslEngineConfigurator =
                    new SSLEngineConfigurator(sslContext, true, false, false);
            sslEngineConfigurator.setEnabledProtocols(protocols.isEmpty() ? null : protocols
                    .toArray(new String[protocols.size()]));
            sslEngineConfigurator.setEnabledCipherSuites(cipherSuites.isEmpty() ? null
                    : cipherSuites.toArray(new String[cipherSuites.size()]));
            SSLFilter sslFilter = new SSLFilter(null, sslEngineConfigurator);
            final SSLFilter sslFilter = new SSLFilter(null, sslEngineConfigurator);
            installFilter(sslFilter);
            sslFilter.handshake(connection, completionHandler);
        }
    }
    private ErrorResultException adaptRequestIOException(final IOException e) {
        // FIXME: what other sort of IOExceptions can be thrown?
        // FIXME: Is this the best result code?
        final Result errorResult =
                Responses.newResult(ResultCode.CLIENT_SIDE_ENCODING_ERROR).setCause(e);
        connectionErrorOccurred(errorResult);
        return newErrorResult(errorResult);
    }
    private void checkBindOrStartTLSInProgress() throws ErrorResultException {
        if (bindOrStartTLSInProgress.get()) {
            throw newErrorResult(ResultCode.OPERATIONS_ERROR,
                    "Bind or Start TLS operation in progress");
        }
    }
    private void checkConnectionIsValid() throws ErrorResultException {
        if (!isValid0()) {
            if (failedDueToDisconnect) {
                // Connection termination was triggered remotely. We don't want
                // to blindly pass on the result code to requests since it could
                // be confused for a genuine response. For example, if the
                // disconnect contained the invalidCredentials result code then
                // this could be misinterpreted as a genuine authentication
                // failure for subsequent bind requests.
                throw newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN,
                        "Connection closed by server");
            } else {
                throw newErrorResult(connectionInvalidReason);
            }
        }
    }
    private void connectionErrorOccurred(final Result reason) {
        close(null, false, reason);
    }
    private boolean isValid0() {
        return !isFailed && !isClosed;
    }
}
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPServerFilter.java
@@ -33,6 +33,7 @@
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
@@ -89,12 +90,12 @@
final class LDAPServerFilter extends BaseFilter {
    private abstract class AbstractHandler<R extends Result> implements
            IntermediateResponseHandler, ResultHandler<R> {
        protected final ClientContextImpl context;
        protected final int messageID;
        protected final Connection<?> connection;
        protected AbstractHandler(final int messageID, final Connection<?> connection) {
        protected AbstractHandler(final ClientContextImpl context, final int messageID) {
            this.messageID = messageID;
            this.connection = connection;
            this.context = context;
        }
        @Override
@@ -102,9 +103,9 @@
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                LDAP_WRITER.intermediateResponse(asn1Writer, messageID, response);
                connection.write(asn1Writer.getBuffer(), null);
                context.write(asn1Writer);
            } catch (final IOException ioe) {
                notifyConnectionException(connection, ioe);
                context.handleError(ioe);
                return false;
            } finally {
                asn1Writer.recycle();
@@ -114,8 +115,8 @@
    }
    private final class AddHandler extends AbstractHandler<Result> {
        private AddHandler(final int messageID, final Connection<?> connection) {
            super(messageID, connection);
        private AddHandler(final ClientContextImpl context, final int messageID) {
            super(context, messageID);
        }
        @Override
@@ -128,9 +129,9 @@
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                LDAP_WRITER.addResult(asn1Writer, messageID, result);
                connection.write(asn1Writer.getBuffer(), null);
                context.write(asn1Writer);
            } catch (final IOException ioe) {
                notifyConnectionException(connection, ioe);
                context.handleError(ioe);
            } finally {
                asn1Writer.recycle();
            }
@@ -138,8 +139,8 @@
    }
    private final class BindHandler extends AbstractHandler<BindResult> {
        private BindHandler(final int messageID, final Connection<?> connection) {
            super(messageID, connection);
        private BindHandler(final ClientContextImpl context, final int messageID) {
            super(context, messageID);
        }
        @Override
@@ -164,9 +165,9 @@
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                LDAP_WRITER.bindResult(asn1Writer, messageID, result);
                connection.write(asn1Writer.getBuffer(), null);
                context.write(asn1Writer);
            } catch (final IOException ioe) {
                notifyConnectionException(connection, ioe);
                context.handleError(ioe);
            } finally {
                asn1Writer.recycle();
            }
@@ -175,9 +176,7 @@
    private final class ClientContextImpl implements LDAPClientContext {
        private final Connection<?> connection;
        private volatile boolean isClosed = false;
        private final AtomicBoolean isClosed = new AtomicBoolean();
        private ServerConnection<Integer> serverConnection = null;
        private ClientContextImpl(final Connection<?> connection) {
@@ -186,18 +185,44 @@
        @Override
        public void disconnect() {
            LDAPServerFilter.notifyConnectionDisconnected(connection, null, null);
            disconnect0(null, null);
        }
        @Override
        public void disconnect(final ResultCode resultCode, final String message) {
            Validator.ensureNotNull(resultCode);
            final GenericExtendedResult notification =
                    Responses.newGenericExtendedResult(resultCode).setOID(
                            OID_NOTICE_OF_DISCONNECTION).setDiagnosticMessage(message);
            sendUnsolicitedNotification(notification);
            LDAPServerFilter.notifyConnectionDisconnected(connection, resultCode, message);
            disconnect0(resultCode, message);
        }
        @Override
        public void enableConnectionSecurityLayer(final ConnectionSecurityLayer layer) {
            synchronized (this) {
                installFilter(new ConnectionSecurityLayerFilter(layer, connection.getTransport()
                        .getMemoryManager()));
            }
        }
        @Override
        public void enableTLS(final SSLContext sslContext, final String[] protocols,
                final String[] suites, final boolean wantClientAuth, final boolean needClientAuth) {
            Validator.ensureNotNull(sslContext);
            synchronized (this) {
                if (isTLSEnabled()) {
                    throw new IllegalStateException("TLS already enabled");
                }
                final SSLEngineConfigurator sslEngineConfigurator =
                        new SSLEngineConfigurator(sslContext, false, false, false);
                sslEngineConfigurator.setEnabledCipherSuites(suites);
                sslEngineConfigurator.setEnabledProtocols(protocols);
                sslEngineConfigurator.setWantClientAuth(wantClientAuth);
                sslEngineConfigurator.setNeedClientAuth(needClientAuth);
                installFilter(new SSLFilter(sslEngineConfigurator, null));
            }
        }
        @Override
@@ -232,7 +257,7 @@
         */
        @Override
        public boolean isClosed() {
            return isClosed;
            return isClosed.get();
        }
        @Override
@@ -242,44 +267,18 @@
                LDAP_WRITER.extendedResult(asn1Writer, 0, notification);
                connection.write(asn1Writer.getBuffer(), null);
            } catch (final IOException ioe) {
                LDAPServerFilter.notifyConnectionException(connection, ioe);
                handleError(ioe);
            } finally {
                asn1Writer.recycle();
            }
        }
        @Override
        public void enableConnectionSecurityLayer(final ConnectionSecurityLayer layer) {
            synchronized (this) {
                installFilter(new ConnectionSecurityLayerFilter(layer, connection.getTransport()
                        .getMemoryManager()));
            }
        }
        @Override
        public void enableTLS(final SSLContext sslContext, final String[] protocols,
                final String[] suites, final boolean wantClientAuth, final boolean needClientAuth) {
            Validator.ensureNotNull(sslContext);
            synchronized (this) {
                if (isTLSEnabled()) {
                    throw new IllegalStateException("TLS already enabled");
                }
                SSLEngineConfigurator sslEngineConfigurator =
                        new SSLEngineConfigurator(sslContext, false, false, false);
                sslEngineConfigurator.setEnabledCipherSuites(suites);
                sslEngineConfigurator.setEnabledProtocols(protocols);
                sslEngineConfigurator.setWantClientAuth(wantClientAuth);
                sslEngineConfigurator.setNeedClientAuth(needClientAuth);
                installFilter(new SSLFilter(sslEngineConfigurator, null));
            }
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public String toString() {
            StringBuilder builder = new StringBuilder();
            final StringBuilder builder = new StringBuilder();
            builder.append("LDAPClientContext(");
            builder.append(getLocalAddress());
            builder.append(',');
@@ -288,16 +287,67 @@
            return builder.toString();
        }
        private void close() {
            isClosed = true;
        public void write(final ASN1BufferWriter asn1Writer) {
            connection.write(asn1Writer.getBuffer(), null);
        }
        private void disconnect0(final ResultCode resultCode, final String message) {
            // Close this connection context.
            if (isClosed.compareAndSet(false, true)) {
                try {
                    // Notify the server connection: it may be null if disconnect is
                    // invoked during accept.
                    if (serverConnection != null) {
                        serverConnection.handleConnectionDisconnected(resultCode, message);
                    }
                } finally {
                    // Close the connection.
                    connection.closeSilently();
                }
            }
        }
        private ServerConnection<Integer> getServerConnection() {
            return serverConnection;
        }
        private void setServerConnection(final ServerConnection<Integer> serverConnection) {
            this.serverConnection = serverConnection;
        private void handleClose(final int messageID, final UnbindRequest unbindRequest) {
            // Close this connection context.
            if (isClosed.compareAndSet(false, true)) {
                try {
                    // Notify the server connection: it may be null if disconnect is
                    // invoked during accept.
                    if (serverConnection != null) {
                        serverConnection.handleConnectionClosed(messageID, unbindRequest);
                    }
                } finally {
                    // If this close was a result of an unbind request then the
                    // connection won't actually be closed yet. To avoid TIME_WAIT TCP
                    // state, let the client disconnect.
                    if (unbindRequest != null) {
                        return;
                    }
                    // Close the connection.
                    connection.closeSilently();
                }
            }
        }
        private void handleError(final Throwable error) {
            // Close this connection context.
            if (isClosed.compareAndSet(false, true)) {
                try {
                    // Notify the server connection: it may be null if disconnect is
                    // invoked during accept.
                    if (serverConnection != null) {
                        serverConnection.handleConnectionError(error);
                    }
                } finally {
                    // Close the connection.
                    connection.closeSilently();
                }
            }
        }
        /**
@@ -313,8 +363,7 @@
            int filterIndex = oldFilterChain.size() - 1;
            if (filter instanceof SSLFilter) {
                // Beneath any ConnectionSecurityLayerFilters if present,
                // otherwise
                // beneath the LDAP filter.
                // otherwise beneath the LDAP filter.
                for (int i = oldFilterChain.size() - 2; i >= 0; i--) {
                    if (!(oldFilterChain.get(i) instanceof ConnectionSecurityLayerFilter)) {
                        filterIndex = i + 1;
@@ -339,7 +388,7 @@
        private boolean isTLSEnabled() {
            synchronized (this) {
                final FilterChain currentFilterChain = (FilterChain) connection.getProcessor();
                for (Filter filter : currentFilterChain) {
                for (final Filter filter : currentFilterChain) {
                    if (filter instanceof SSLFilter) {
                        return true;
                    }
@@ -348,11 +397,14 @@
            }
        }
        private void setServerConnection(final ServerConnection<Integer> serverConnection) {
            this.serverConnection = serverConnection;
        }
    }
    private final class CompareHandler extends AbstractHandler<CompareResult> {
        private CompareHandler(final int messageID, final Connection<?> connection) {
            super(messageID, connection);
        private CompareHandler(final ClientContextImpl context, final int messageID) {
            super(context, messageID);
        }
        @Override
@@ -377,9 +429,9 @@
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                LDAP_WRITER.compareResult(asn1Writer, messageID, result);
                connection.write(asn1Writer.getBuffer(), null);
                context.write(asn1Writer);
            } catch (final IOException ioe) {
                notifyConnectionException(connection, ioe);
                context.handleError(ioe);
            } finally {
                asn1Writer.recycle();
            }
@@ -387,8 +439,8 @@
    }
    private final class DeleteHandler extends AbstractHandler<Result> {
        private DeleteHandler(final int messageID, final Connection<?> connection) {
            super(messageID, connection);
        private DeleteHandler(final ClientContextImpl context, final int messageID) {
            super(context, messageID);
        }
        @Override
@@ -401,9 +453,9 @@
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                LDAP_WRITER.deleteResult(asn1Writer, messageID, result);
                connection.write(asn1Writer.getBuffer(), null);
                context.write(asn1Writer);
            } catch (final IOException ioe) {
                notifyConnectionException(connection, ioe);
                context.handleError(ioe);
            } finally {
                asn1Writer.recycle();
            }
@@ -411,8 +463,8 @@
    }
    private final class ExtendedHandler<R extends ExtendedResult> extends AbstractHandler<R> {
        private ExtendedHandler(final int messageID, final Connection<?> connection) {
            super(messageID, connection);
        private ExtendedHandler(final ClientContextImpl context, final int messageID) {
            super(context, messageID);
        }
        @Override
@@ -438,9 +490,9 @@
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                LDAP_WRITER.extendedResult(asn1Writer, messageID, result);
                connection.write(asn1Writer.getBuffer(), null);
                context.write(asn1Writer);
            } catch (final IOException ioe) {
                notifyConnectionException(connection, ioe);
                context.handleError(ioe);
            } finally {
                asn1Writer.recycle();
            }
@@ -448,8 +500,8 @@
    }
    private final class ModifyDNHandler extends AbstractHandler<Result> {
        private ModifyDNHandler(final int messageID, final Connection<?> connection) {
            super(messageID, connection);
        private ModifyDNHandler(final ClientContextImpl context, final int messageID) {
            super(context, messageID);
        }
        @Override
@@ -462,9 +514,9 @@
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                LDAP_WRITER.modifyDNResult(asn1Writer, messageID, result);
                connection.write(asn1Writer.getBuffer(), null);
                context.write(asn1Writer);
            } catch (final IOException ioe) {
                notifyConnectionException(connection, ioe);
                context.handleError(ioe);
            } finally {
                asn1Writer.recycle();
            }
@@ -472,8 +524,8 @@
    }
    private final class ModifyHandler extends AbstractHandler<Result> {
        private ModifyHandler(final int messageID, final Connection<?> connection) {
            super(messageID, connection);
        private ModifyHandler(final ClientContextImpl context, final int messageID) {
            super(context, messageID);
        }
        @Override
@@ -486,9 +538,9 @@
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                LDAP_WRITER.modifyResult(asn1Writer, messageID, result);
                connection.write(asn1Writer.getBuffer(), null);
                context.write(asn1Writer);
            } catch (final IOException ioe) {
                notifyConnectionException(connection, ioe);
                context.handleError(ioe);
            } finally {
                asn1Writer.recycle();
            }
@@ -497,8 +549,8 @@
    private final class SearchHandler extends AbstractHandler<Result> implements
            SearchResultHandler {
        private SearchHandler(final int messageID, final Connection<?> connection) {
            super(messageID, connection);
        private SearchHandler(final ClientContextImpl context, final int messageID) {
            super(context, messageID);
        }
        @Override
@@ -506,9 +558,9 @@
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                LDAP_WRITER.searchResultEntry(asn1Writer, messageID, entry);
                connection.write(asn1Writer.getBuffer(), null);
                context.write(asn1Writer);
            } catch (final IOException ioe) {
                notifyConnectionException(connection, ioe);
                context.handleError(ioe);
                return false;
            } finally {
                asn1Writer.recycle();
@@ -526,9 +578,9 @@
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                LDAP_WRITER.searchResultReference(asn1Writer, messageID, reference);
                connection.write(asn1Writer.getBuffer(), null);
                context.write(asn1Writer);
            } catch (final IOException ioe) {
                notifyConnectionException(connection, ioe);
                context.handleError(ioe);
                return false;
            } finally {
                asn1Writer.recycle();
@@ -541,9 +593,9 @@
            final ASN1BufferWriter asn1Writer = ASN1BufferWriter.getWriter();
            try {
                LDAP_WRITER.searchResult(asn1Writer, messageID, result);
                connection.write(asn1Writer.getBuffer(), null);
                context.write(asn1Writer);
            } catch (final IOException ioe) {
                notifyConnectionException(connection, ioe);
                context.handleError(ioe);
            } finally {
                asn1Writer.recycle();
            }
@@ -554,6 +606,14 @@
    // following RFCs: 5289, 4346, 3268,4132 and 4162.
    private static final Map<String, Integer> CIPHER_KEY_SIZES;
    private static final Attribute<ASN1BufferReader> LDAP_ASN1_READER_ATTR =
            Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("LDAPASN1Reader");
    private static final Attribute<ClientContextImpl> LDAP_CONNECTION_ATTR =
            Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("LDAPServerConnection");
    private static final LDAPWriter LDAP_WRITER = new LDAPWriter();
    static {
        CIPHER_KEY_SIZES = new LinkedHashMap<String, Integer>();
        CIPHER_KEY_SIZES.put("_WITH_AES_256_CBC_", 256);
@@ -572,78 +632,10 @@
        CIPHER_KEY_SIZES.put("_WITH_NULL_", 0);
    }
    private static final LDAPWriter LDAP_WRITER = new LDAPWriter();
    private final LDAPReader ldapReader;
    private static final Attribute<ClientContextImpl> LDAP_CONNECTION_ATTR =
            Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("LDAPServerConnection");
    private static final Attribute<ASN1BufferReader> LDAP_ASN1_READER_ATTR =
            Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("LDAPASN1Reader");
    private static void notifyConnectionClosed(final Connection<?> connection, final int messageID,
            final UnbindRequest unbindRequest) {
        final ClientContextImpl clientContext = LDAP_CONNECTION_ATTR.remove(connection);
        if (clientContext != null) {
            // Close the connection context.
            clientContext.close();
            // Notify the server connection: it may be null if disconnect is
            // invoked during accept.
            final ServerConnection<Integer> serverConnection = clientContext.getServerConnection();
            if (serverConnection != null) {
                serverConnection.handleConnectionClosed(messageID, unbindRequest);
            }
            // If this close was a result of an unbind request then the
            // connection won't actually be closed yet. To avoid TIME_WAIT TCP
            // state, let the client disconnect.
            if (unbindRequest != null) {
                return;
            }
            // Close the connection.
            connection.closeSilently();
        }
    }
    private static void notifyConnectionDisconnected(final Connection<?> connection,
            final ResultCode resultCode, final String message) {
        final ClientContextImpl clientContext = LDAP_CONNECTION_ATTR.remove(connection);
        if (clientContext != null) {
            // Close the connection context.
            clientContext.close();
            // Notify the server connection: it may be null if disconnect is
            // invoked during accept.
            final ServerConnection<Integer> serverConnection = clientContext.getServerConnection();
            if (serverConnection != null) {
                serverConnection.handleConnectionDisconnected(resultCode, message);
            }
            // Close the connection.
            connection.closeSilently();
        }
    }
    private static void notifyConnectionException(final Connection<?> connection,
            final Throwable error) {
        final ClientContextImpl clientContext = LDAP_CONNECTION_ATTR.remove(connection);
        if (clientContext != null) {
            // Close the connection context.
            clientContext.close();
            // Notify the server connection: it may be null if disconnect is
            // invoked during accept.
            final ServerConnection<Integer> serverConnection = clientContext.getServerConnection();
            if (serverConnection != null) {
                serverConnection.handleConnectionError(error);
            }
            // Close the connection.
            connection.closeSilently();
        }
    }
    private final LDAPListenerImpl listener;
    private final int maxASN1ElementSize;
    private final AbstractLDAPMessageHandler<FilterChainContext> serverRequestHandler =
            new AbstractLDAPMessageHandler<FilterChainContext>() {
                @Override
@@ -664,7 +656,7 @@
                            LDAP_CONNECTION_ATTR.get(ctx.getConnection());
                    if (clientContext != null) {
                        final ServerConnection<Integer> conn = clientContext.getServerConnection();
                        final AddHandler handler = new AddHandler(messageID, ctx.getConnection());
                        final AddHandler handler = new AddHandler(clientContext, messageID);
                        conn.handleAdd(messageID, request, handler, handler);
                    }
                }
@@ -677,7 +669,7 @@
                            LDAP_CONNECTION_ATTR.get(ctx.getConnection());
                    if (clientContext != null) {
                        final ServerConnection<Integer> conn = clientContext.getServerConnection();
                        final BindHandler handler = new BindHandler(messageID, ctx.getConnection());
                        final BindHandler handler = new BindHandler(clientContext, messageID);
                        conn.handleBind(messageID, version, bindContext, handler, handler);
                    }
                }
@@ -689,8 +681,7 @@
                            LDAP_CONNECTION_ATTR.get(ctx.getConnection());
                    if (clientContext != null) {
                        final ServerConnection<Integer> conn = clientContext.getServerConnection();
                        final CompareHandler handler =
                                new CompareHandler(messageID, ctx.getConnection());
                        final CompareHandler handler = new CompareHandler(clientContext, messageID);
                        conn.handleCompare(messageID, request, handler, handler);
                    }
                }
@@ -702,8 +693,7 @@
                            LDAP_CONNECTION_ATTR.get(ctx.getConnection());
                    if (clientContext != null) {
                        final ServerConnection<Integer> conn = clientContext.getServerConnection();
                        final DeleteHandler handler =
                                new DeleteHandler(messageID, ctx.getConnection());
                        final DeleteHandler handler = new DeleteHandler(clientContext, messageID);
                        conn.handleDelete(messageID, request, handler, handler);
                    }
                }
@@ -717,7 +707,7 @@
                    if (clientContext != null) {
                        final ServerConnection<Integer> conn = clientContext.getServerConnection();
                        final ExtendedHandler<R> handler =
                                new ExtendedHandler<R>(messageID, ctx.getConnection());
                                new ExtendedHandler<R>(clientContext, messageID);
                        conn.handleExtendedRequest(messageID, request, handler, handler);
                    }
                }
@@ -730,7 +720,7 @@
                    if (clientContext != null) {
                        final ServerConnection<Integer> conn = clientContext.getServerConnection();
                        final ModifyDNHandler handler =
                                new ModifyDNHandler(messageID, ctx.getConnection());
                                new ModifyDNHandler(clientContext, messageID);
                        conn.handleModifyDN(messageID, request, handler, handler);
                    }
                }
@@ -742,8 +732,7 @@
                            LDAP_CONNECTION_ATTR.get(ctx.getConnection());
                    if (clientContext != null) {
                        final ServerConnection<Integer> conn = clientContext.getServerConnection();
                        final ModifyHandler handler =
                                new ModifyHandler(messageID, ctx.getConnection());
                        final ModifyHandler handler = new ModifyHandler(clientContext, messageID);
                        conn.handleModify(messageID, request, handler, handler);
                    }
                }
@@ -755,8 +744,7 @@
                            LDAP_CONNECTION_ATTR.get(ctx.getConnection());
                    if (clientContext != null) {
                        final ServerConnection<Integer> conn = clientContext.getServerConnection();
                        final SearchHandler handler =
                                new SearchHandler(messageID, ctx.getConnection());
                        final SearchHandler handler = new SearchHandler(clientContext, messageID);
                        conn.handleSearch(messageID, request, handler, handler);
                    }
                }
@@ -764,23 +752,23 @@
                @Override
                public void unbindRequest(final FilterChainContext ctx, final int messageID,
                        final UnbindRequest request) {
                    notifyConnectionClosed(ctx.getConnection(), messageID, request);
                    // Remove the client context causing any subsequent LDAP
                    // traffic to be ignored.
                    final ClientContextImpl clientContext =
                            LDAP_CONNECTION_ATTR.remove(ctx.getConnection());
                    if (clientContext != null) {
                        clientContext.handleClose(messageID, request);
                    }
                }
                @Override
                public void unrecognizedMessage(final FilterChainContext ctx, final int messageID,
                        final byte messageTag, final ByteString messageBytes) {
                    notifyConnectionException(ctx.getConnection(), new UnsupportedMessageException(
                            messageID, messageTag, messageBytes));
                    exceptionOccurred(ctx, new UnsupportedMessageException(messageID, messageTag,
                            messageBytes));
                }
            };
    private final int maxASN1ElementSize;
    private final LDAPReader ldapReader;
    private final LDAPListenerImpl listener;
    LDAPServerFilter(final LDAPListenerImpl listener, final LDAPReader ldapReader,
            final int maxASN1ElementSize) {
        this.listener = listener;
@@ -790,7 +778,10 @@
    @Override
    public void exceptionOccurred(final FilterChainContext ctx, final Throwable error) {
        notifyConnectionException(ctx.getConnection(), error);
        final ClientContextImpl clientContext = LDAP_CONNECTION_ATTR.remove(ctx.getConnection());
        if (clientContext != null) {
            clientContext.handleError(error);
        }
    }
    @Override
@@ -812,7 +803,10 @@
    @Override
    public NextAction handleClose(final FilterChainContext ctx) throws IOException {
        notifyConnectionClosed(ctx.getConnection(), -1, null);
        final ClientContextImpl clientContext = LDAP_CONNECTION_ATTR.remove(ctx.getConnection());
        if (clientContext != null) {
            clientContext.handleClose(-1, null);
        }
        return ctx.getStopAction();
    }
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
@@ -22,13 +22,15 @@
 *
 *
 *      Copyright 2010 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS
 *      Portions copyright 2011-2012 ForgeRock AS
 */
package org.forgerock.opendj.ldap;
import static org.fest.assertions.Assertions.assertThat;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -37,16 +39,22 @@
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import javax.net.ssl.SSLContext;
import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.opendj.ldap.requests.DigestMD5SASLBindRequest;
import org.forgerock.opendj.ldap.requests.Requests;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.responses.BindResult;
import org.forgerock.opendj.ldap.responses.Responses;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.schema.Schema;
import org.forgerock.opendj.ldap.schema.SchemaBuilder;
@@ -65,6 +73,10 @@
 */
@SuppressWarnings("javadoc")
public class ConnectionFactoryTestCase extends SdkTestCase {
    // Test timeout in ms for tests which need to wait for network events.
    private static final long TEST_TIMEOUT = 30L;
    private static final long TEST_TIMEOUT_MS = TEST_TIMEOUT * 1000L;
    class MyResultHandler implements ResultHandler<Connection> {
        // latch.
        private final CountDownLatch latch;
@@ -114,8 +126,8 @@
        StaticUtils.DEBUG_LOG.setLevel(Level.INFO);
    }
    @DataProvider(name = "connectionFactories")
    public Object[][] getConnectionFactories() throws Exception {
    @DataProvider
    Object[][] connectionFactories() throws Exception {
        Object[][] factories = new Object[21][1];
        // HeartBeatConnectionFactory
@@ -429,4 +441,228 @@
        assertThat(realConnectionIsClosed[2]).isTrue();
        assertThat(realConnectionIsClosed[3]).isTrue();
    }
    private static final class CloseNotify {
        private boolean closeOnAccept;
        private boolean doBindFirst;
        private boolean useEventListener;
        private boolean sendDisconnectNotification;
        private CloseNotify(boolean closeOnAccept, boolean doBindFirst, boolean useEventListener,
                boolean sendDisconnectNotification) {
            this.closeOnAccept = closeOnAccept;
            this.doBindFirst = doBindFirst;
            this.useEventListener = useEventListener;
            this.sendDisconnectNotification = sendDisconnectNotification;
        }
        @Override
        public String toString() {
            StringBuilder builder = new StringBuilder();
            builder.append("[");
            if (closeOnAccept) {
                builder.append(" closeOnAccept");
            }
            if (doBindFirst) {
                builder.append(" doBindFirst");
            }
            if (useEventListener) {
                builder.append(" useEventListener");
            }
            if (sendDisconnectNotification) {
                builder.append(" sendDisconnectNotification");
            }
            builder.append(" ]");
            return builder.toString();
        }
    }
    @DataProvider
    Object[][] closeNotifyConfig() {
        // @formatter:off
        return new Object[][] {
            // closeOnAccept, doBindFirst, useEventListener, sendDisconnectNotification
            // Close on accept.
            { new CloseNotify(true,  false, false, false) },
            { new CloseNotify(true,  false, true,  false) },
            // Use disconnect.
            { new CloseNotify(false, false, false, false) },
            { new CloseNotify(false, false, false, true) },
            { new CloseNotify(false, false, true,  false) },
            { new CloseNotify(false, false, true,  true) },
            { new CloseNotify(false, true,  false, false) },
            { new CloseNotify(false, true,  false, true) },
            { new CloseNotify(false, true,  true,  false) },
            { new CloseNotify(false, true,  true,  true) },
        };
        // @formatter:on
    }
    @SuppressWarnings("unchecked")
    @Test(dataProvider = "closeNotifyConfig")
    public void testCloseNotify(final CloseNotify config) throws Exception {
        final CountDownLatch connectLatch = new CountDownLatch(1);
        final AtomicReference<LDAPClientContext> contextHolder =
                new AtomicReference<LDAPClientContext>();
        final ServerConnectionFactory<LDAPClientContext, Integer> mockServer =
                mock(ServerConnectionFactory.class);
        when(mockServer.handleAccept(any(LDAPClientContext.class))).thenAnswer(
                new Answer<ServerConnection<Integer>>() {
                    public ServerConnection<Integer> answer(InvocationOnMock invocation)
                            throws Throwable {
                        // Allow the context to be accessed from outside the mock.
                        contextHolder.set((LDAPClientContext) invocation.getArguments()[0]);
                        connectLatch.countDown(); /* is this needed? */
                        if (config.closeOnAccept) {
                            throw newErrorResult(ResultCode.UNAVAILABLE);
                        } else {
                            // Return a mock connection which always succeeds for binds.
                            ServerConnection<Integer> mockConnection = mock(ServerConnection.class);
                            doAnswer(new Answer<Void>() {
                                @Override
                                public Void answer(InvocationOnMock invocation) throws Throwable {
                                    ResultHandler<? super BindResult> resultHandler =
                                            (ResultHandler<? super BindResult>) invocation
                                                    .getArguments()[4];
                                    resultHandler.handleResult(Responses
                                            .newBindResult(ResultCode.SUCCESS));
                                    return null;
                                }
                            }).when(mockConnection).handleBind(anyInt(), anyInt(),
                                    any(BindRequest.class), any(IntermediateResponseHandler.class),
                                    any(ResultHandler.class));
                            return mockConnection;
                        }
                    }
                });
        final int port = TestCaseUtils.findFreePort();
        LDAPListener listener = new LDAPListener(port, mockServer);
        try {
            LDAPConnectionFactory clientFactory = new LDAPConnectionFactory("localhost", port);
            final Connection client = clientFactory.getConnection();
            connectLatch.await(TEST_TIMEOUT, TimeUnit.SECONDS);
            MockConnectionEventListener mockListener = null;
            try {
                if (config.useEventListener) {
                    mockListener = new MockConnectionEventListener();
                    client.addConnectionEventListener(mockListener);
                }
                if (config.doBindFirst) {
                    client.bind("cn=test", "password".toCharArray());
                }
                if (!config.closeOnAccept) {
                    // Disconnect using client context.
                    LDAPClientContext context = contextHolder.get();
                    assertThat(context).isNotNull();
                    assertThat(context.isClosed()).isFalse();
                    if (config.sendDisconnectNotification) {
                        context.disconnect(ResultCode.BUSY, "busy");
                    } else {
                        context.disconnect();
                    }
                    assertThat(context.isClosed()).isTrue();
                }
                // Block until remote close is signalled.
                if (mockListener != null) {
                    // Block using listener.
                    mockListener.awaitError(TEST_TIMEOUT, TimeUnit.SECONDS);
                    assertThat(mockListener.getInvocationCount()).isEqualTo(1);
                    assertThat(mockListener.isDisconnectNotification()).isEqualTo(
                            config.sendDisconnectNotification);
                    assertThat(mockListener.getError()).isNotNull();
                } else {
                    // Block by spinning on isValid.
                    waitForCondition(new Callable<Boolean>() {
                        @Override
                        public Boolean call() throws Exception {
                            return !client.isValid();
                        }
                    });
                }
                assertThat(client.isValid()).isFalse();
                assertThat(client.isClosed()).isFalse();
            } finally {
                client.close();
            }
            // Check state after remote close and local close.
            assertThat(client.isValid()).isFalse();
            assertThat(client.isClosed()).isTrue();
            if (mockListener != null) {
                mockListener.awaitClose(TEST_TIMEOUT, TimeUnit.SECONDS);
                assertThat(mockListener.getInvocationCount()).isEqualTo(2);
            }
        } finally {
            listener.close();
        }
    }
    @SuppressWarnings("unchecked")
    @Test
    public void testUnsolicitedNotifications() throws Exception {
        final CountDownLatch connectLatch = new CountDownLatch(1);
        final AtomicReference<LDAPClientContext> contextHolder =
                new AtomicReference<LDAPClientContext>();
        final ServerConnectionFactory<LDAPClientContext, Integer> mockServer =
                mock(ServerConnectionFactory.class);
        when(mockServer.handleAccept(any(LDAPClientContext.class))).thenAnswer(
                new Answer<ServerConnection<Integer>>() {
                    public ServerConnection<Integer> answer(InvocationOnMock invocation)
                            throws Throwable {
                        // Allow the context to be accessed from outside the mock.
                        contextHolder.set((LDAPClientContext) invocation.getArguments()[0]);
                        connectLatch.countDown(); /* is this needed? */
                        return mock(ServerConnection.class);
                    }
                });
        final int port = TestCaseUtils.findFreePort();
        LDAPListener listener = new LDAPListener(port, mockServer);
        try {
            LDAPConnectionFactory clientFactory = new LDAPConnectionFactory("localhost", port);
            final Connection client = clientFactory.getConnection();
            connectLatch.await(TEST_TIMEOUT, TimeUnit.SECONDS);
            try {
                MockConnectionEventListener mockListener = new MockConnectionEventListener();
                client.addConnectionEventListener(mockListener);
                // Send notification.
                LDAPClientContext context = contextHolder.get();
                assertThat(context).isNotNull();
                context.sendUnsolicitedNotification(Responses.newGenericExtendedResult(
                        ResultCode.OTHER).setOID("1.2.3.4"));
                assertThat(context.isClosed()).isFalse();
                // Block using listener.
                mockListener.awaitNotification(TEST_TIMEOUT, TimeUnit.SECONDS);
                assertThat(mockListener.getInvocationCount()).isEqualTo(1);
                assertThat(mockListener.getNotification()).isNotNull();
                assertThat(mockListener.getNotification().getResultCode()).isEqualTo(
                        ResultCode.OTHER);
                assertThat(mockListener.getNotification().getOID()).isEqualTo("1.2.3.4");
                assertThat(client.isValid()).isTrue();
                assertThat(client.isClosed()).isFalse();
            } finally {
                client.close();
            }
        } finally {
            listener.close();
        }
    }
    private void waitForCondition(Callable<Boolean> condition) throws Exception {
        long timeout = System.currentTimeMillis() + TEST_TIMEOUT_MS;
        while (!condition.call()) {
            Thread.yield();
            if (System.currentTimeMillis() > timeout) {
                throw new TimeoutException("Test timed out after " + TEST_TIMEOUT + " seconds");
            }
        }
    }
}
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java
@@ -40,7 +40,6 @@
import org.forgerock.opendj.ldap.requests.ExtendedRequest;
import org.forgerock.opendj.ldap.requests.ModifyDNRequest;
import org.forgerock.opendj.ldap.requests.ModifyRequest;
import org.forgerock.opendj.ldap.requests.Requests;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.BindResult;
@@ -60,30 +59,6 @@
 */
public class LDAPListenerTestCase extends SdkTestCase {
    private static class MockConnectionEventListener implements ConnectionEventListener {
        final CountDownLatch closeLatch = new CountDownLatch(1);
        String errorMessage = null;
        @Override
        public void handleConnectionClosed() {
            errorMessage = "Unexpected call to handleConnectionClosed";
            closeLatch.countDown();
        }
        @Override
        public void handleConnectionError(final boolean isDisconnectNotification,
                final ErrorResultException error) {
            errorMessage = "Unexpected call to handleConnectionError";
            closeLatch.countDown();
        }
        @Override
        public void handleUnsolicitedNotification(final ExtendedResult notification) {
            errorMessage = "Unexpected call to handleUnsolicitedNotification";
            closeLatch.countDown();
        }
    }
    private static class MockServerConnection implements ServerConnection<Integer> {
        volatile LDAPClientContext context = null;
        final CountDownLatch isConnected = new CountDownLatch(1);
@@ -256,180 +231,6 @@
    }
    /**
     * Tests connection event listener.
     *
     * @throws Exception
     *             If an unexpected error occurred.
     */
    @Test
    public void testConnectionEventListenerClose() throws Exception {
        final MockServerConnection onlineServerConnection = new MockServerConnection();
        final MockServerConnectionFactory onlineServerConnectionFactory =
                new MockServerConnectionFactory(onlineServerConnection);
        final LDAPListener onlineServerListener =
                new LDAPListener("localhost", TestCaseUtils.findFreePort(),
                        onlineServerConnectionFactory);
        final Connection connection;
        try {
            // Connect and bind.
            connection =
                    new LDAPConnectionFactory(onlineServerListener.getSocketAddress())
                            .getConnection();
            final MockConnectionEventListener listener = new MockConnectionEventListener() {
                @Override
                public void handleConnectionClosed() {
                    closeLatch.countDown();
                }
            };
            connection.addConnectionEventListener(listener);
            Assert.assertEquals(listener.closeLatch.getCount(), 1);
            connection.close();
            listener.closeLatch.await();
            Assert.assertNull(listener.errorMessage);
        } finally {
            onlineServerListener.close();
        }
    }
    /**
     * Tests connection event listener.
     *
     * @throws Exception
     *             If an unexpected error occurred.
     */
    @Test(enabled = false)
    public void testConnectionEventListenerDisconnect() throws Exception {
        final MockServerConnection onlineServerConnection = new MockServerConnection();
        final MockServerConnectionFactory onlineServerConnectionFactory =
                new MockServerConnectionFactory(onlineServerConnection);
        final LDAPListener onlineServerListener =
                new LDAPListener("localhost", TestCaseUtils.findFreePort(),
                        onlineServerConnectionFactory);
        final Connection connection;
        try {
            // Connect and bind.
            connection =
                    new LDAPConnectionFactory(onlineServerListener.getSocketAddress())
                            .getConnection();
            final MockConnectionEventListener listener = new MockConnectionEventListener() {
                @Override
                public void handleConnectionError(final boolean isDisconnectNotification,
                        final ErrorResultException error) {
                    if (isDisconnectNotification) {
                        errorMessage = "Unexpected disconnect notification";
                    }
                    closeLatch.countDown();
                }
            };
            connection.addConnectionEventListener(listener);
            Assert.assertEquals(listener.closeLatch.getCount(), 1);
            Assert.assertTrue(onlineServerConnection.isConnected.await(10, TimeUnit.SECONDS));
            onlineServerConnection.context.disconnect();
            listener.closeLatch.await();
            Assert.assertNull(listener.errorMessage);
            connection.close();
        } finally {
            onlineServerListener.close();
        }
    }
    /**
     * Tests connection event listener.
     *
     * @throws Exception
     *             If an unexpected error occurred.
     */
    @Test
    public void testConnectionEventListenerDisconnectNotification() throws Exception {
        final MockServerConnection onlineServerConnection = new MockServerConnection();
        final MockServerConnectionFactory onlineServerConnectionFactory =
                new MockServerConnectionFactory(onlineServerConnection);
        final LDAPListener onlineServerListener =
                new LDAPListener("localhost", TestCaseUtils.findFreePort(),
                        onlineServerConnectionFactory);
        final Connection connection;
        try {
            // Connect and bind.
            connection =
                    new LDAPConnectionFactory(onlineServerListener.getSocketAddress())
                            .getConnection();
            final MockConnectionEventListener listener = new MockConnectionEventListener() {
                @Override
                public void handleConnectionError(final boolean isDisconnectNotification,
                        final ErrorResultException error) {
                    if (!isDisconnectNotification
                            || !error.getResult().getResultCode().equals(ResultCode.BUSY)
                            || !error.getResult().getDiagnosticMessage().equals("test")) {
                        errorMessage = "Missing disconnect notification: " + error;
                    }
                    closeLatch.countDown();
                }
            };
            connection.addConnectionEventListener(listener);
            Assert.assertEquals(listener.closeLatch.getCount(), 1);
            Assert.assertTrue(onlineServerConnection.isConnected.await(10, TimeUnit.SECONDS));
            onlineServerConnection.context.disconnect(ResultCode.BUSY, "test");
            listener.closeLatch.await();
            Assert.assertNull(listener.errorMessage);
            connection.close();
        } finally {
            onlineServerListener.close();
        }
    }
    /**
     * Tests connection event listener.
     *
     * @throws Exception
     *             If an unexpected error occurred.
     */
    @Test
    public void testConnectionEventListenerUnbind() throws Exception {
        final MockServerConnection onlineServerConnection = new MockServerConnection();
        final MockServerConnectionFactory onlineServerConnectionFactory =
                new MockServerConnectionFactory(onlineServerConnection);
        final LDAPListener onlineServerListener =
                new LDAPListener("localhost", TestCaseUtils.findFreePort(),
                        onlineServerConnectionFactory);
        final Connection connection;
        try {
            // Connect and bind.
            connection =
                    new LDAPConnectionFactory(onlineServerListener.getSocketAddress())
                            .getConnection();
            final MockConnectionEventListener listener = new MockConnectionEventListener() {
                @Override
                public void handleConnectionClosed() {
                    closeLatch.countDown();
                }
            };
            connection.addConnectionEventListener(listener);
            Assert.assertEquals(listener.closeLatch.getCount(), 1);
            connection.close(Requests.newUnbindRequest(), "called from unit test");
            listener.closeLatch.await();
            Assert.assertNull(listener.errorMessage);
        } finally {
            onlineServerListener.close();
        }
    }
    /**
     * Tests basic LDAP listener functionality.
     *
     * @throws Exception
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/MockConnectionEventListener.java
New file
@@ -0,0 +1,112 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2012 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
/**
 * A connection event listener which records events and signals when it has been
 * notified.
 */
final class MockConnectionEventListener implements ConnectionEventListener {
    private final CountDownLatch closedLatch = new CountDownLatch(1);
    private final CountDownLatch errorLatch = new CountDownLatch(1);
    private final CountDownLatch notificationLatch = new CountDownLatch(1);
    private Boolean isDisconnectNotification = null;
    private ErrorResultException error = null;
    private ExtendedResult notification = null;
    private final AtomicInteger invocationCount = new AtomicInteger();
    /**
     * {@inheritDoc}
     */
    @Override
    public void handleConnectionClosed() {
        invocationCount.incrementAndGet();
        closedLatch.countDown();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void handleConnectionError(boolean isDisconnectNotification, ErrorResultException error) {
        this.isDisconnectNotification = isDisconnectNotification;
        this.error = error;
        invocationCount.incrementAndGet();
        errorLatch.countDown();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void handleUnsolicitedNotification(ExtendedResult notification) {
        this.notification = notification;
        invocationCount.incrementAndGet();
        notificationLatch.countDown();
    }
    void awaitClose(long timeout, TimeUnit unit) {
        await(closedLatch, timeout, unit);
    }
    void awaitError(long timeout, TimeUnit unit) {
        await(errorLatch, timeout, unit);
    }
    void awaitNotification(long timeout, TimeUnit unit) {
        await(notificationLatch, timeout, unit);
    }
    Boolean isDisconnectNotification() {
        return isDisconnectNotification;
    }
    ErrorResultException getError() {
        return error;
    }
    ExtendedResult getNotification() {
        return notification;
    }
    private void await(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    int getInvocationCount() {
        return invocationCount.get();
    }
}