mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
12.26.2013 09f227d9000f4cd30d19191f514bbbd55dc4d40a
Additional fixes for OPENDJ-1247: client-side timeouts do not cancel bind or StartTLS requests properly

* fix unit tests
* revert the functionality which sent abandon requests for expired requests, as it introduces a potential, albeit very unlikely, race condition
* fix a bug which meant that the timeout checker could fail to enforce timeouts for the first LDAP connection
* minor improvements: only register connections which have a timeout greater than zero; remove the duplicate requestID field from the LDAP future implementation.
5 files modified
489 ■■■■ changed files
opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/AbstractLDAPFutureResultImpl.java 16 ●●●● patch | view | raw | blame | history
opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java 76 ●●●● patch | view | raw | blame | history
opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java 3 ●●●●● patch | view | raw | blame | history
opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/TimeoutChecker.java 45 ●●●●● patch | view | raw | blame | history
opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPConnectionFactoryTestCase.java 349 ●●●● patch | view | raw | blame | history
opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/AbstractLDAPFutureResultImpl.java
@@ -48,7 +48,6 @@
        extends AsynchronousFutureResult<S, ResultHandler<? super S>>
        implements IntermediateResponseHandler {
    private final Connection connection;
    private final int requestID;
    private IntermediateResponseHandler intermediateResponseHandler;
    private volatile long timestamp;
@@ -56,21 +55,12 @@
        final ResultHandler<? super S> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler,
        final Connection connection) {
        super(resultHandler);
        this.requestID = requestID;
        super(resultHandler, requestID);
        this.connection = connection;
        this.intermediateResponseHandler = intermediateResponseHandler;
        this.timestamp = System.currentTimeMillis();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final int getRequestID() {
        return requestID;
    }
    /** {@inheritDoc} */
    @Override
    public final boolean handleIntermediateResponse(final IntermediateResponse response) {
@@ -99,7 +89,7 @@
         * future. There is no risk of an infinite loop because the state of
         * this future has already been changed.
         */
        connection.abandonAsync(Requests.newAbandonRequest(requestID));
        connection.abandonAsync(Requests.newAbandonRequest(getRequestID()));
        return null;
    }
@@ -124,7 +114,7 @@
    @Override
    protected void toString(final StringBuilder sb) {
        sb.append(" requestID = ");
        sb.append(requestID);
        sb.append(getRequestID());
        sb.append(" timestamp = ");
        sb.append(timestamp);
        super.toString(sb);
opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -30,7 +30,6 @@
import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
import static org.forgerock.opendj.ldap.CoreMessages.*;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import static org.forgerock.opendj.ldap.requests.Requests.newAbandonRequest;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -575,65 +574,62 @@
    long cancelExpiredRequests(final long currentTime) {
        final long timeout = factory.getLDAPOptions().getTimeout(TimeUnit.MILLISECONDS);
        if (timeout <= 0) {
            return 0;
        }
        long delay = timeout;
        if (timeout > 0) {
            for (final int requestID : pendingRequests.keySet()) {
                final AbstractLDAPFutureResultImpl<?> future = pendingRequests.get(requestID);
                if (future != null && future.checkForTimeout()) {
        for (final AbstractLDAPFutureResultImpl<?> future : pendingRequests.values()) {
            if (future == null || !future.checkForTimeout()) {
                continue;
            }
                    final long diff = (future.getTimestamp() + timeout) - currentTime;
                    if (diff <= 0 && pendingRequests.remove(requestID) != null) {
                        if (future.isBindOrStartTLS()) {
            if (diff > 0) {
                // Will expire in diff milliseconds.
                delay = Math.min(delay, diff);
            } else if (pendingRequests.remove(future.getRequestID()) == null) {
                // Result arrived at the same time.
                continue;
            } else if (future.isBindOrStartTLS()) {
                            /*
                             * No other operations can be performed while a bind
                             * or StartTLS request is active, so we cannot time
                             * out the request. We therefore have a choice:
                             * either ignore timeouts for these operations, or
                             * enforce them but doing so requires invalidating
                             * the connection. We'll do the latter, since
                             * ignoring timeouts could cause the application to
                             * hang.
                 * No other operations can be performed while a bind or StartTLS
                 * request is active, so we cannot time out the request. We
                 * therefore have a choice: either ignore timeouts for these
                 * operations, or enforce them but doing so requires
                 * invalidating the connection. We'll do the latter, since
                 * ignoring timeouts could cause the application to hang.
                             */
                            DEBUG_LOG.fine("Failing bind or StartTLS request due to timeout "
                                    + "(connection will be invalidated): " + future);
                            final Result result =
                                    Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT)
                                            .setDiagnosticMessage(
                                                    LDAP_CONNECTION_BIND_OR_START_TLS_REQUEST_TIMEOUT
                                                            .get(timeout).toString());
                        Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
                                LDAP_CONNECTION_BIND_OR_START_TLS_REQUEST_TIMEOUT.get(timeout)
                                        .toString());
                            future.adaptErrorResult(result);
                            // Fail the connection.
                            final Result errorResult =
                                    Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT)
                                            .setDiagnosticMessage(
                                                    LDAP_CONNECTION_BIND_OR_START_TLS_CONNECTION_TIMEOUT
                                                            .get(timeout).toString());
                        Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
                                LDAP_CONNECTION_BIND_OR_START_TLS_CONNECTION_TIMEOUT.get(timeout)
                                        .toString());
                            connectionErrorOccurred(errorResult);
                        } else {
                            DEBUG_LOG.fine("Failing request due to timeout: " + future);
                            final Result result =
                                    Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT)
                                            .setDiagnosticMessage(
                                                    LDAP_CONNECTION_REQUEST_TIMEOUT.get(timeout)
                                                            .toString());
                        Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
                                LDAP_CONNECTION_REQUEST_TIMEOUT.get(timeout).toString());
                            future.adaptErrorResult(result);
                            /*
                             * FIXME: there's a potential race condition here if
                             * a bind or startTLS is initiated just after we
                             * check the boolean. It seems potentially even more
                             * dangerous to send the abandon request while
                             * holding the state lock, since a blocking write
                 * FIXME: there's a potential race condition here if a bind or
                 * startTLS is initiated just after we check the boolean. It
                 * seems potentially even more dangerous to send the abandon
                 * request while holding the state lock, since a blocking write
                             * could hang the application.
                             */
                            if (!bindOrStartTLSInProgress.get()) {
                                sendAbandonRequest(newAbandonRequest(requestID));
                            }
                        }
                    } else {
                        delay = Math.min(delay, diff);
                    }
                }
//                if (!bindOrStartTLSInProgress.get()) {
//                    sendAbandonRequest(newAbandonRequest(future.getRequestID()));
//                }
            }
        }
        return delay;
opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
@@ -34,6 +34,7 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -167,7 +168,9 @@
            connection.configureBlocking(true);
            final LDAPConnection ldapConnection =
                    new LDAPConnection(connection, LDAPConnectionFactoryImpl.this);
            if (options.getTimeout(TimeUnit.MILLISECONDS) > 0) {
            timeoutChecker.get().addConnection(ldapConnection);
            }
            clientFilter.registerConnection(connection, ldapConnection);
            return ldapConnection;
        }
opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/TimeoutChecker.java
@@ -59,7 +59,7 @@
    /**
     * Condition variable used for coordinating the timeout thread.
     */
    private final Object available = new Object();
    private final Object stateLock = new Object();
    /**
     * The connection set must be safe from CMEs because expiring requests can
@@ -73,6 +73,12 @@
     */
    private volatile boolean shutdownRequested = false;
    /**
     * Used for signalling that new connections have been added while performing
     * timeout processing.
     */
    private volatile boolean pendingNewConnections = false;
    private TimeoutChecker() {
        final Thread checkerThread = new Thread("OpenDJ LDAP SDK Connection Timeout Checker") {
            @Override
@@ -81,7 +87,13 @@
                while (!shutdownRequested) {
                    final long currentTime = System.currentTimeMillis();
                    long delay = 0;
                    /*
                     * New connections may be added during iteration and may be
                     * missed resulting in the timeout checker waiting longer
                     * than it should, or even forever (e.g. if the new
                     * connection is the first).
                     */
                    pendingNewConnections = false;
                    for (final LDAPConnection connection : connections) {
                        if (DEBUG_LOG.isLoggable(Level.FINER)) {
                            DEBUG_LOG.finer("Checking connection " + connection + " delay = "
@@ -100,11 +112,18 @@
                    }
                    try {
                        synchronized (available) {
                            if (delay <= 0) {
                                available.wait();
                        synchronized (stateLock) {
                            if (shutdownRequested || pendingNewConnections) {
                                // Loop immediately.
                                pendingNewConnections = false;
                            } else if (delay <= 0) {
                                /*
                                 * If there is at least one connection then the
                                 * delay should be > 0.
                                 */
                                stateLock.wait();
                            } else {
                                available.wait(delay);
                                stateLock.wait(delay);
                            }
                        }
                    } catch (final InterruptedException e) {
@@ -120,7 +139,10 @@
    void addConnection(final LDAPConnection connection) {
        connections.add(connection);
        signal();
        synchronized (stateLock) {
            pendingNewConnections = true;
            stateLock.notifyAll();
        }
    }
    void removeConnection(final LDAPConnection connection) {
@@ -129,14 +151,9 @@
    }
    private void shutdown() {
        synchronized (stateLock) {
        shutdownRequested = true;
        signal();
    }
    // Wakes the timeout checker if it is sleeping.
    private void signal() {
        synchronized (available) {
            available.notifyAll();
            stateLock.notifyAll();
        }
    }
}
opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPConnectionFactoryTestCase.java
@@ -25,16 +25,23 @@
 */
package org.forgerock.opendj.ldap;
import static com.forgerock.opendj.util.StaticUtils.closeSilently;
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.Fail.fail;
import static org.forgerock.opendj.ldap.TestCaseUtils.findFreeSocketAddress;
import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest;
import static org.mockito.Mockito.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.forgerock.opendj.ldap.requests.AbandonRequest;
@@ -43,10 +50,10 @@
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.BindResult;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.stubbing.Stubber;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
/**
@@ -54,9 +61,40 @@
 */
@SuppressWarnings({ "javadoc", "unchecked" })
public class LDAPConnectionFactoryTestCase extends SdkTestCase {
    // Manual testing has gone up to 10000 iterations.
    private static final int ITERATIONS = 100;
    // Test timeout for tests which need to wait for network events.
    private static final long TEST_TIMEOUT = 30L;
    /*
     * It is usually quite a bad code smell to share state between unit tests.
     * However, in this case we want to re-use the same factories and listeners
     * in order to avoid shutting down and restarting the transport for each
     * iteration.
     */
    private final AtomicReference<LDAPClientContext> context =
            new AtomicReference<LDAPClientContext>();
    private volatile ServerConnection<Integer> serverConnection;
    private final LDAPListener server = createServer();
    private final ConnectionFactory factory = new LDAPConnectionFactory(server.getSocketAddress(),
            new LDAPOptions().setTimeout(1, TimeUnit.MILLISECONDS));
    private final ConnectionFactory pool = Connections.newFixedConnectionPool(factory, 10);
    private final Semaphore connectLatch = new Semaphore(0);
    private final Semaphore abandonLatch = new Semaphore(0);
    private final Semaphore bindLatch = new Semaphore(0);
    private final Semaphore searchLatch = new Semaphore(0);
    private final Semaphore closeLatch = new Semaphore(0);
    /**
     * Releases the fixtures shared by the tests in this class: the connection
     * pool first, then the factory it was built on, and finally the server
     * listener which both of them connect to.
     */
    @AfterClass
    public void tearDown() {
        pool.close();
        factory.close();
        server.close();
    }
    /**
     * Unit test for OPENDJ-1247: a locally timed out bind request will leave a
     * connection in an invalid state since a bind (or startTLS) is in progress
@@ -66,78 +104,45 @@
     */
    @Test
    public void testClientSideTimeoutForBindRequest() throws Exception {
        final AtomicReference<LDAPClientContext> context = new AtomicReference<LDAPClientContext>();
        final Semaphore latch = new Semaphore(0);
        resetState();
        registerBindEvent();
        registerCloseEvent();
        // The server connection should receive a bind, but no abandon request.
        final ServerConnection<Integer> serverConnection = mock(ServerConnection.class);
        release(latch).when(serverConnection).handleBind(any(Integer.class), anyInt(),
                any(BindRequest.class), any(IntermediateResponseHandler.class),
                any(ResultHandler.class));
        release(latch).when(serverConnection).handleConnectionClosed(any(Integer.class),
                any(UnbindRequest.class));
        final LDAPListener server = createServer(latch, context, serverConnection);
        final ConnectionFactory factory =
                new LDAPConnectionFactory(server.getSocketAddress(), new LDAPOptions().setTimeout(
                        1, TimeUnit.MILLISECONDS));
        Connection connection = null;
        for (int i = 0; i < ITERATIONS; i++) {
            final Connection connection = factory.getConnection();
        try {
            // Connect to the server.
            connection = factory.getConnection();
            // Wait for the server to accept the connection.
            assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
            /*
             * A bind request timeout should cause the connection to fail, so
             * ensure that event listeners are fired.
             */
            final ConnectionEventListener listener = mock(ConnectionEventListener.class);
                waitForConnect();
                final MockConnectionEventListener listener = new MockConnectionEventListener();
            connection.addConnectionEventListener(listener);
            final ResultHandler<BindResult> handler = mock(ResultHandler.class);
            final FutureResult<BindResult> future =
                    connection.bindAsync(newSimpleBindRequest(), null, handler);
            // Wait for the server to receive the bind request.
            assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
                waitForBind();
            // Wait for the request to timeout.
            try {
                future.get(TEST_TIMEOUT, TimeUnit.SECONDS);
                fail("The bind request succeeded unexpectedly");
            } catch (TimeoutResultException e) {
                assertThat(e.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_TIMEOUT);
                    verifyResultCodeIsClientSideTimeout(e);
                verify(handler).handleErrorResult(same(e));
                // The connection should no longer be valid.
                ArgumentCaptor<ErrorResultException> capturedError =
                        ArgumentCaptor.forClass(ErrorResultException.class);
                verify(listener).handleConnectionError(eq(false), capturedError.capture());
                assertThat(capturedError.getValue().getResult().getResultCode()).isEqualTo(
                        ResultCode.CLIENT_SIDE_TIMEOUT);
                assertThat(connection.isValid()).isFalse();
                connection.close();
                // Wait for the server to receive the close request.
                assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
                /*
                 * Check that the only interactions were the bind and the close
                 * and specifically there was no abandon request.
                     * The connection should no longer be valid, the event
                     * listener should have been notified, but no abandon should
                     * have been sent.
                 */
                verify(serverConnection).handleBind(any(Integer.class), eq(3),
                        any(BindRequest.class), any(IntermediateResponseHandler.class),
                        any(ResultHandler.class));
                verify(serverConnection).handleConnectionClosed(any(Integer.class),
                        any(UnbindRequest.class));
                verifyNoMoreInteractions(serverConnection);
                    listener.awaitError(TEST_TIMEOUT, TimeUnit.SECONDS);
                    assertThat(connection.isValid()).isFalse();
                    verifyResultCodeIsClientSideTimeout(listener.getError());
                    connection.close();
                    waitForClose();
                    verifyNoAbandonSent();
            }
        } finally {
            closeSilently(connection);
            factory.close();
            server.close();
                connection.close();
            }
        }
    }
@@ -148,82 +153,48 @@
     */
    @Test
    public void testClientSideTimeoutForBindRequestInConnectionPool() throws Exception {
        final AtomicReference<LDAPClientContext> context = new AtomicReference<LDAPClientContext>();
        final Semaphore latch = new Semaphore(0);
        resetState();
        registerBindEvent();
        registerCloseEvent();
        // The server connection should receive a bind, but no abandon request.
        final ServerConnection<Integer> serverConnection = mock(ServerConnection.class);
        release(latch).when(serverConnection).handleBind(any(Integer.class), anyInt(),
                any(BindRequest.class), any(IntermediateResponseHandler.class),
                any(ResultHandler.class));
        release(latch).when(serverConnection).handleConnectionClosed(any(Integer.class),
                any(UnbindRequest.class));
        final LDAPListener server = createServer(latch, context, serverConnection);
        final ConnectionFactory factory =
                Connections.newFixedConnectionPool(
                        new LDAPConnectionFactory(server.getSocketAddress(), new LDAPOptions()
                                .setTimeout(1, TimeUnit.MILLISECONDS)), 10);
        Connection connection = null;
        for (int i = 0; i < ITERATIONS; i++) {
            final Connection connection = pool.getConnection();
        try {
            // Get pooled connection to the server.
            connection = factory.getConnection();
            // Wait for the server to accept the connection.
            assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
            /*
             * Sanity check: close the connection and reopen. There should be no
             * interactions with the server due to the pool.
             */
            connection.close();
            connection = factory.getConnection();
            verifyNoMoreInteractions(serverConnection);
                waitForConnect();
                final MockConnectionEventListener listener = new MockConnectionEventListener();
                connection.addConnectionEventListener(listener);
            // Now bind with timeout.
            final ResultHandler<BindResult> handler = mock(ResultHandler.class);
            final FutureResult<BindResult> future =
                    connection.bindAsync(newSimpleBindRequest(), null, handler);
            // Wait for the server to receive the bind request.
            assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
                waitForBind();
            // Wait for the request to timeout.
            try {
                future.get(TEST_TIMEOUT, TimeUnit.SECONDS);
                    future.get(5, TimeUnit.SECONDS);
                fail("The bind request succeeded unexpectedly");
                } catch (TimeoutException e) {
                    fail("The bind request future get timed out");
            } catch (TimeoutResultException e) {
                assertThat(e.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_TIMEOUT);
                    verifyResultCodeIsClientSideTimeout(e);
                verify(handler).handleErrorResult(same(e));
                // The connection should no longer be valid.
                assertThat(connection.isValid()).isFalse();
                connection.close();
                // Wait for the server to receive the close request.
                assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
                /*
                 * Check that the only interactions were the bind and the close
                 * and specifically there was no abandon request.
                     * The connection should no longer be valid, the event
                     * listener should have been notified, but no abandon should
                     * have been sent.
                 */
                verify(serverConnection).handleBind(any(Integer.class), eq(3),
                        any(BindRequest.class), any(IntermediateResponseHandler.class),
                        any(ResultHandler.class));
                verify(serverConnection).handleConnectionClosed(any(Integer.class),
                        any(UnbindRequest.class));
                verifyNoMoreInteractions(serverConnection);
                // Now get another connection. This time we should reconnect to the server.
                connection = factory.getConnection();
                // Wait for the server to accept the connection.
                assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
                    listener.awaitError(TEST_TIMEOUT, TimeUnit.SECONDS);
                    assertThat(connection.isValid()).isFalse();
                    verifyResultCodeIsClientSideTimeout(listener.getError());
                    connection.close();
                    waitForClose();
                    verifyNoAbandonSent();
            }
        } finally {
            closeSilently(connection);
            factory.close();
            server.close();
                connection.close();
            }
        }
    }
@@ -235,62 +206,43 @@
     */
    @Test
    public void testClientSideTimeoutForSearchRequest() throws Exception {
        final AtomicReference<LDAPClientContext> context = new AtomicReference<LDAPClientContext>();
        final Semaphore latch = new Semaphore(0);
        resetState();
        registerSearchEvent();
        registerAbandonEvent();
        // The server connection should receive a search and then an abandon.
        final ServerConnection<Integer> serverConnection = mock(ServerConnection.class);
        release(latch).when(serverConnection).handleSearch(any(Integer.class),
                any(SearchRequest.class), any(IntermediateResponseHandler.class),
                any(SearchResultHandler.class));
        release(latch).when(serverConnection).handleAbandon(any(Integer.class),
                any(AbandonRequest.class));
        final LDAPListener server = createServer(latch, context, serverConnection);
        final ConnectionFactory factory =
                new LDAPConnectionFactory(server.getSocketAddress(), new LDAPOptions().setTimeout(
                        1, TimeUnit.MILLISECONDS));
        Connection connection = null;
        for (int i = 0; i < ITERATIONS; i++) {
            final Connection connection = factory.getConnection();
        try {
            // Connect to the server.
            connection = factory.getConnection();
            // Wait for the server to accept the connection.
            assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
            /*
             * A search request timeout should not cause the connection to fail,
             * so ensure that event listeners are not fired.
             */
                waitForConnect();
            final ConnectionEventListener listener = mock(ConnectionEventListener.class);
            connection.addConnectionEventListener(listener);
            final ResultHandler<SearchResultEntry> handler = mock(ResultHandler.class);
            final FutureResult<SearchResultEntry> future =
                    connection.readEntryAsync(DN.valueOf("cn=test"), null, handler);
            // Wait for the server to receive the search request.
            assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
                waitForSearch();
            // Wait for the request to timeout.
            try {
                future.get(TEST_TIMEOUT, TimeUnit.SECONDS);
                fail("The search request succeeded unexpectedly");
            } catch (TimeoutResultException e) {
                assertThat(e.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_TIMEOUT);
                    verifyResultCodeIsClientSideTimeout(e);
                verify(handler).handleErrorResult(same(e));
                // The connection should still be valid.
                verifyZeroInteractions(listener);
                assertThat(connection.isValid()).isTrue();
                    verifyZeroInteractions(listener);
                // Wait for the server to receive the abandon request.
                assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
                    /*
                     * FIXME: The search should have been abandoned (see comment
                     * in LDAPConnection for explanation).
                     */
                    // waitForAbandon();
            }
        } finally {
            closeSilently(connection);
            factory.close();
            server.close();
                connection.close();
            }
        }
    }
@@ -300,18 +252,12 @@
     */
    @Test
    public void testResourceManagement() throws Exception {
        final AtomicReference<LDAPClientContext> context = new AtomicReference<LDAPClientContext>();
        final Semaphore latch = new Semaphore(0);
        final LDAPListener server = createServer(latch, context, mock(ServerConnection.class));
        final ConnectionFactory factory = new LDAPConnectionFactory(server.getSocketAddress());
        try {
            for (int i = 0; i < 100; i++) {
                // Connect to the server.
        resetState();
        for (int i = 0; i < ITERATIONS; i++) {
                final Connection connection = factory.getConnection();
                try {
                    // Wait for the server to accept the connection.
                    assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
                waitForConnect();
                    final MockConnectionEventListener listener = new MockConnectionEventListener();
                    connection.addConnectionEventListener(listener);
@@ -324,34 +270,99 @@
                    connection.close();
                }
            }
        } finally {
            factory.close();
            server.close();
        }
    }
    private LDAPListener createServer(final Semaphore latch,
            final AtomicReference<LDAPClientContext> context,
            final ServerConnection<Integer> serverConnection) throws IOException {
    private LDAPListener createServer() {
        try {
        return new LDAPListener(findFreeSocketAddress(),
                new ServerConnectionFactory<LDAPClientContext, Integer>() {
                    @Override
                    public ServerConnection<Integer> handleAccept(
                            final LDAPClientContext clientContext) throws ErrorResultException {
                        context.set(clientContext);
                        latch.release();
                            connectLatch.release();
                        return serverConnection;
                    }
                });
        } catch (IOException e) {
            fail("Unable to create LDAP listener", e);
            return null;
        }
    }
    private Stubber release(final Semaphore latch) {
    private Stubber notifyEvent(final Semaphore latch) {
        return doAnswer(new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocation) throws Throwable {
            public Void answer(InvocationOnMock invocation) {
                latch.release();
                return null;
            }
        });
    }
    private void registerAbandonEvent() {
        notifyEvent(abandonLatch).when(serverConnection).handleAbandon(any(Integer.class),
                any(AbandonRequest.class));
    }
    private void registerBindEvent() {
        notifyEvent(bindLatch).when(serverConnection).handleBind(any(Integer.class), anyInt(),
                any(BindRequest.class), any(IntermediateResponseHandler.class),
                any(ResultHandler.class));
    }
    private void registerCloseEvent() {
        notifyEvent(closeLatch).when(serverConnection).handleConnectionClosed(any(Integer.class),
                any(UnbindRequest.class));
    }
    private void registerSearchEvent() {
        notifyEvent(searchLatch).when(serverConnection).handleSearch(any(Integer.class),
                any(SearchRequest.class), any(IntermediateResponseHandler.class),
                any(SearchResultHandler.class));
    }
    private void resetState() {
        connectLatch.drainPermits();
        abandonLatch.drainPermits();
        bindLatch.drainPermits();
        searchLatch.drainPermits();
        closeLatch.drainPermits();
        context.set(null);
        serverConnection = mock(ServerConnection.class);
    }
    private void verifyNoAbandonSent() {
        verify(serverConnection, never()).handleAbandon(any(Integer.class),
                any(AbandonRequest.class));
    }
    private void verifyResultCodeIsClientSideTimeout(ErrorResultException error) {
        assertThat(error.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_TIMEOUT);
    }
    /**
     * Blocks until an abandon event has been signalled on
     * {@code abandonLatch}, failing the test if none arrives in time.
     * Marked unused because the only call site in this class is currently
     * commented out.
     *
     * @throws InterruptedException
     *             If the waiting thread is interrupted.
     */
    @SuppressWarnings("unused")
    private void waitForAbandon() throws InterruptedException {
        waitForEvent(abandonLatch);
    }
    /**
     * Blocks until a bind event has been signalled on {@code bindLatch},
     * failing the test if none arrives in time.
     *
     * @throws InterruptedException
     *             If the waiting thread is interrupted.
     */
    private void waitForBind() throws InterruptedException {
        waitForEvent(bindLatch);
    }
    /**
     * Blocks until a connection close event has been signalled on
     * {@code closeLatch}, failing the test if none arrives in time.
     *
     * @throws InterruptedException
     *             If the waiting thread is interrupted.
     */
    private void waitForClose() throws InterruptedException {
        waitForEvent(closeLatch);
    }
    /**
     * Blocks until the test server has signalled an accepted connection on
     * {@code connectLatch}, failing the test if none arrives in time.
     *
     * @throws InterruptedException
     *             If the waiting thread is interrupted.
     */
    private void waitForConnect() throws InterruptedException {
        waitForEvent(connectLatch);
    }
    private void waitForEvent(final Semaphore latch) throws InterruptedException {
        assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
    }
    /**
     * Blocks until a search event has been signalled on {@code searchLatch},
     * failing the test if none arrives in time.
     *
     * @throws InterruptedException
     *             If the waiting thread is interrupted.
     */
    private void waitForSearch() throws InterruptedException {
        waitForEvent(searchLatch);
    }
}