From 7253f3aa5bd089168730b78d13db641ec4d1c6a2 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 12 Dec 2013 01:26:35 +0000
Subject: [PATCH] Additional fixes OPENDJ-1247: Client side timeouts do not cancel bind or startTLS requests properly

---
 opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java     |    5 
 opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/TimeoutChecker.java                |   47 ++-
 opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/AbstractLDAPFutureResultImpl.java  |   16 -
 opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java                |  110 ++++-----
 opendj-sdk/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPConnectionFactoryTestCase.java |  471 +++++++++++++++++++++--------------------
 5 files changed, 333 insertions(+), 316 deletions(-)

diff --git a/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/AbstractLDAPFutureResultImpl.java b/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/AbstractLDAPFutureResultImpl.java
index a7b896c..94f602e 100644
--- a/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/AbstractLDAPFutureResultImpl.java
+++ b/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/AbstractLDAPFutureResultImpl.java
@@ -48,7 +48,6 @@
         extends AsynchronousFutureResult<S, ResultHandler<? super S>>
         implements IntermediateResponseHandler {
     private final Connection connection;
-    private final int requestID;
     private IntermediateResponseHandler intermediateResponseHandler;
     private volatile long timestamp;
 
@@ -56,21 +55,12 @@
         final ResultHandler<? super S> resultHandler,
         final IntermediateResponseHandler intermediateResponseHandler,
         final Connection connection) {
-        super(resultHandler);
-        this.requestID = requestID;
+        super(resultHandler, requestID);
         this.connection = connection;
         this.intermediateResponseHandler = intermediateResponseHandler;
         this.timestamp = System.currentTimeMillis();
     }
 
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public final int getRequestID() {
-        return requestID;
-    }
-
     /** {@inheritDoc} */
     @Override
     public final boolean handleIntermediateResponse(final IntermediateResponse response) {
@@ -99,7 +89,7 @@
          * future. There is no risk of an infinite loop because the state of
          * this future has already been changed.
          */
-        connection.abandonAsync(Requests.newAbandonRequest(requestID));
+        connection.abandonAsync(Requests.newAbandonRequest(getRequestID()));
         return null;
     }
 
@@ -124,7 +114,7 @@
     @Override
     protected void toString(final StringBuilder sb) {
         sb.append(" requestID = ");
-        sb.append(requestID);
+        sb.append(getRequestID());
         sb.append(" timestamp = ");
         sb.append(timestamp);
         super.toString(sb);
diff --git a/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java b/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
index 8829372..03bcc4d 100644
--- a/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
+++ b/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -30,7 +30,6 @@
 import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
 import static org.forgerock.opendj.ldap.CoreMessages.*;
 import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
-import static org.forgerock.opendj.ldap.requests.Requests.newAbandonRequest;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -575,65 +574,62 @@
 
     long cancelExpiredRequests(final long currentTime) {
         final long timeout = factory.getLDAPOptions().getTimeout(TimeUnit.MILLISECONDS);
+        if (timeout <= 0) {
+            return 0;
+        }
+
         long delay = timeout;
-        if (timeout > 0) {
-            for (final int requestID : pendingRequests.keySet()) {
-                final AbstractLDAPFutureResultImpl<?> future = pendingRequests.get(requestID);
-                if (future != null && future.checkForTimeout()) {
-                    final long diff = (future.getTimestamp() + timeout) - currentTime;
-                    if (diff <= 0 && pendingRequests.remove(requestID) != null) {
-                        if (future.isBindOrStartTLS()) {
-                            /*
-                             * No other operations can be performed while a bind
-                             * or StartTLS request is active, so we cannot time
-                             * out the request. We therefore have a choice:
-                             * either ignore timeouts for these operations, or
-                             * enforce them but doing so requires invalidating
-                             * the connection. We'll do the latter, since
-                             * ignoring timeouts could cause the application to
-                             * hang.
-                             */
-                            DEBUG_LOG.fine("Failing bind or StartTLS request due to timeout "
-                                    + "(connection will be invalidated): " + future);
-                            final Result result =
-                                    Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT)
-                                            .setDiagnosticMessage(
-                                                    LDAP_CONNECTION_BIND_OR_START_TLS_REQUEST_TIMEOUT
-                                                            .get(timeout).toString());
-                            future.adaptErrorResult(result);
+        for (final AbstractLDAPFutureResultImpl<?> future : pendingRequests.values()) {
+            if (future == null || !future.checkForTimeout()) {
+                continue;
+            }
+            final long diff = (future.getTimestamp() + timeout) - currentTime;
+            if (diff > 0) {
+                // Will expire in diff milliseconds.
+                delay = Math.min(delay, diff);
+            } else if (pendingRequests.remove(future.getRequestID()) == null) {
+                // Result arrived at the same time.
+                continue;
+            } else if (future.isBindOrStartTLS()) {
+                /*
+                 * No other operations can be performed while a bind or StartTLS
+                 * request is active, so we cannot time out the request. We
+                 * therefore have a choice: either ignore timeouts for these
+                 * operations, or enforce them but doing so requires
+                 * invalidating the connection. We'll do the latter, since
+                 * ignoring timeouts could cause the application to hang.
+                 */
+                DEBUG_LOG.fine("Failing bind or StartTLS request due to timeout "
+                        + "(connection will be invalidated): " + future);
+                final Result result =
+                        Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
+                                LDAP_CONNECTION_BIND_OR_START_TLS_REQUEST_TIMEOUT.get(timeout)
+                                        .toString());
+                future.adaptErrorResult(result);
 
-                            // Fail the connection.
-                            final Result errorResult =
-                                    Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT)
-                                            .setDiagnosticMessage(
-                                                    LDAP_CONNECTION_BIND_OR_START_TLS_CONNECTION_TIMEOUT
-                                                            .get(timeout).toString());
-                            connectionErrorOccurred(errorResult);
-                        } else {
-                            DEBUG_LOG.fine("Failing request due to timeout: " + future);
-                            final Result result =
-                                    Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT)
-                                            .setDiagnosticMessage(
-                                                    LDAP_CONNECTION_REQUEST_TIMEOUT.get(timeout)
-                                                            .toString());
-                            future.adaptErrorResult(result);
+                // Fail the connection.
+                final Result errorResult =
+                        Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
+                                LDAP_CONNECTION_BIND_OR_START_TLS_CONNECTION_TIMEOUT.get(timeout)
+                                        .toString());
+                connectionErrorOccurred(errorResult);
+            } else {
+                DEBUG_LOG.fine("Failing request due to timeout: " + future);
+                final Result result =
+                        Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
+                                LDAP_CONNECTION_REQUEST_TIMEOUT.get(timeout).toString());
+                future.adaptErrorResult(result);
 
-                            /*
-                             * FIXME: there's a potential race condition here if
-                             * a bind or startTLS is initiated just after we
-                             * check the boolean. It seems potentially even more
-                             * dangerous to send the abandon request while
-                             * holding the state lock, since a blocking write
-                             * could hang the application.
-                             */
-                            if (!bindOrStartTLSInProgress.get()) {
-                                sendAbandonRequest(newAbandonRequest(requestID));
-                            }
-                        }
-                    } else {
-                        delay = Math.min(delay, diff);
-                    }
-                }
+                /*
+                 * FIXME: there's a potential race condition here if a bind or
+                 * startTLS is initiated just after we check the boolean. It
+                 * seems potentially even more dangerous to send the abandon
+                 * request while holding the state lock, since a blocking write
+                 * could hang the application.
+                 */
+//                if (!bindOrStartTLSInProgress.get()) {
+//                    sendAbandonRequest(newAbandonRequest(future.getRequestID()));
+//                }
             }
         }
         return delay;
diff --git a/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java b/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
index 91b4c93..4298c45 100644
--- a/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
+++ b/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
@@ -34,6 +34,7 @@
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -167,7 +168,9 @@
             connection.configureBlocking(true);
             final LDAPConnection ldapConnection =
                     new LDAPConnection(connection, LDAPConnectionFactoryImpl.this);
-            timeoutChecker.get().addConnection(ldapConnection);
+            if (options.getTimeout(TimeUnit.MILLISECONDS) > 0) {
+                timeoutChecker.get().addConnection(ldapConnection);
+            }
             clientFilter.registerConnection(connection, ldapConnection);
             return ldapConnection;
         }
diff --git a/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/TimeoutChecker.java b/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/TimeoutChecker.java
index 61fb58c..b169be8 100644
--- a/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/TimeoutChecker.java
+++ b/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/TimeoutChecker.java
@@ -59,7 +59,7 @@
     /**
      * Condition variable used for coordinating the timeout thread.
      */
-    private final Object available = new Object();
+    private final Object stateLock = new Object();
 
     /**
      * The connection set must be safe from CMEs because expiring requests can
@@ -73,6 +73,12 @@
      */
     private volatile boolean shutdownRequested = false;
 
+    /**
+     * Used for signalling that new connections have been added while performing
+     * timeout processing.
+     */
+    private volatile boolean pendingNewConnections = false;
+
     private TimeoutChecker() {
         final Thread checkerThread = new Thread("OpenDJ LDAP SDK Connection Timeout Checker") {
             @Override
@@ -81,7 +87,13 @@
                 while (!shutdownRequested) {
                     final long currentTime = System.currentTimeMillis();
                     long delay = 0;
-
+                    /*
+                     * New connections may be added during iteration and may be
+                     * missed resulting in the timeout checker waiting longer
+                     * than it should, or even forever (e.g. if the new
+                     * connection is the first).
+                     */
+                    pendingNewConnections = false;
                     for (final LDAPConnection connection : connections) {
                         if (DEBUG_LOG.isLoggable(Level.FINER)) {
                             DEBUG_LOG.finer("Checking connection " + connection + " delay = "
@@ -100,11 +112,18 @@
                     }
 
                     try {
-                        synchronized (available) {
-                            if (delay <= 0) {
-                                available.wait();
+                        synchronized (stateLock) {
+                            if (shutdownRequested || pendingNewConnections) {
+                                // Loop immediately.
+                                pendingNewConnections = false;
+                            } else if (delay <= 0) {
+                                /*
+                                 * If there is at least one connection then the
+                                 * delay should be > 0.
+                                 */
+                                stateLock.wait();
                             } else {
-                                available.wait(delay);
+                                stateLock.wait(delay);
                             }
                         }
                     } catch (final InterruptedException e) {
@@ -120,7 +139,10 @@
 
     void addConnection(final LDAPConnection connection) {
         connections.add(connection);
-        signal();
+        synchronized (stateLock) {
+            pendingNewConnections = true;
+            stateLock.notifyAll();
+        }
     }
 
     void removeConnection(final LDAPConnection connection) {
@@ -129,14 +151,9 @@
     }
 
     private void shutdown() {
-        shutdownRequested = true;
-        signal();
-    }
-
-    // Wakes the timeout checker if it is sleeping.
-    private void signal() {
-        synchronized (available) {
-            available.notifyAll();
+        synchronized (stateLock) {
+            shutdownRequested = true;
+            stateLock.notifyAll();
         }
     }
 }
diff --git a/opendj-sdk/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPConnectionFactoryTestCase.java b/opendj-sdk/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPConnectionFactoryTestCase.java
index ed70fff..7778e46 100644
--- a/opendj-sdk/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPConnectionFactoryTestCase.java
+++ b/opendj-sdk/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPConnectionFactoryTestCase.java
@@ -25,16 +25,23 @@
  */
 package org.forgerock.opendj.ldap;
 
-import static com.forgerock.opendj.util.StaticUtils.closeSilently;
 import static org.fest.assertions.Assertions.assertThat;
 import static org.fest.assertions.Fail.fail;
 import static org.forgerock.opendj.ldap.TestCaseUtils.findFreeSocketAddress;
 import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest;
-import static org.mockito.Mockito.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
 
 import java.io.IOException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.forgerock.opendj.ldap.requests.AbandonRequest;
@@ -43,10 +50,10 @@
 import org.forgerock.opendj.ldap.requests.UnbindRequest;
 import org.forgerock.opendj.ldap.responses.BindResult;
 import org.forgerock.opendj.ldap.responses.SearchResultEntry;
-import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.mockito.stubbing.Stubber;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 
 /**
@@ -54,9 +61,40 @@
  */
 @SuppressWarnings({ "javadoc", "unchecked" })
 public class LDAPConnectionFactoryTestCase extends SdkTestCase {
+    // Manual testing has gone up to 10000 iterations.
+    private static final int ITERATIONS = 100;
+
     // Test timeout for tests which need to wait for network events.
     private static final long TEST_TIMEOUT = 30L;
 
+    /*
+     * It is usually quite a bad code smell to share state between unit tests.
+     * However, in this case we want to re-use the same factories and listeners
+     * in order to avoid shutting down and restarting the transport for each
+     * iteration.
+     */
+
+    private final AtomicReference<LDAPClientContext> context =
+            new AtomicReference<LDAPClientContext>();
+    private volatile ServerConnection<Integer> serverConnection;
+    private final LDAPListener server = createServer();
+    private final ConnectionFactory factory = new LDAPConnectionFactory(server.getSocketAddress(),
+            new LDAPOptions().setTimeout(1, TimeUnit.MILLISECONDS));
+    private final ConnectionFactory pool = Connections.newFixedConnectionPool(factory, 10);
+
+    private final Semaphore connectLatch = new Semaphore(0);
+    private final Semaphore abandonLatch = new Semaphore(0);
+    private final Semaphore bindLatch = new Semaphore(0);
+    private final Semaphore searchLatch = new Semaphore(0);
+    private final Semaphore closeLatch = new Semaphore(0);
+
+    @AfterClass
+    public void tearDown() {
+        pool.close();
+        factory.close();
+        server.close();
+    }
+
     /**
      * Unit test for OPENDJ-1247: a locally timed out bind request will leave a
      * connection in an invalid state since a bind (or startTLS) is in progress
@@ -66,78 +104,45 @@
      */
     @Test
     public void testClientSideTimeoutForBindRequest() throws Exception {
-        final AtomicReference<LDAPClientContext> context = new AtomicReference<LDAPClientContext>();
-        final Semaphore latch = new Semaphore(0);
+        resetState();
+        registerBindEvent();
+        registerCloseEvent();
 
-        // The server connection should receive a bind, but no abandon request.
-        final ServerConnection<Integer> serverConnection = mock(ServerConnection.class);
-        release(latch).when(serverConnection).handleBind(any(Integer.class), anyInt(),
-                any(BindRequest.class), any(IntermediateResponseHandler.class),
-                any(ResultHandler.class));
-        release(latch).when(serverConnection).handleConnectionClosed(any(Integer.class),
-                any(UnbindRequest.class));
-
-        final LDAPListener server = createServer(latch, context, serverConnection);
-        final ConnectionFactory factory =
-                new LDAPConnectionFactory(server.getSocketAddress(), new LDAPOptions().setTimeout(
-                        1, TimeUnit.MILLISECONDS));
-        Connection connection = null;
-        try {
-            // Connect to the server.
-            connection = factory.getConnection();
-
-            // Wait for the server to accept the connection.
-            assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
-
-            /*
-             * A bind request timeout should cause the connection to fail, so
-             * ensure that event listeners are fired.
-             */
-            final ConnectionEventListener listener = mock(ConnectionEventListener.class);
-            connection.addConnectionEventListener(listener);
-
-            final ResultHandler<BindResult> handler = mock(ResultHandler.class);
-            final FutureResult<BindResult> future =
-                    connection.bindAsync(newSimpleBindRequest(), null, handler);
-
-            // Wait for the server to receive the bind request.
-            assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
-
-            // Wait for the request to timeout.
+        for (int i = 0; i < ITERATIONS; i++) {
+            final Connection connection = factory.getConnection();
             try {
-                future.get(TEST_TIMEOUT, TimeUnit.SECONDS);
-                fail("The bind request succeeded unexpectedly");
-            } catch (TimeoutResultException e) {
-                assertThat(e.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_TIMEOUT);
-                verify(handler).handleErrorResult(same(e));
+                waitForConnect();
+                final MockConnectionEventListener listener = new MockConnectionEventListener();
+                connection.addConnectionEventListener(listener);
 
-                // The connection should no longer be valid.
-                ArgumentCaptor<ErrorResultException> capturedError =
-                        ArgumentCaptor.forClass(ErrorResultException.class);
-                verify(listener).handleConnectionError(eq(false), capturedError.capture());
-                assertThat(capturedError.getValue().getResult().getResultCode()).isEqualTo(
-                        ResultCode.CLIENT_SIDE_TIMEOUT);
-                assertThat(connection.isValid()).isFalse();
+                final ResultHandler<BindResult> handler = mock(ResultHandler.class);
+                final FutureResult<BindResult> future =
+                        connection.bindAsync(newSimpleBindRequest(), null, handler);
+                waitForBind();
+
+                // Wait for the request to timeout.
+                try {
+                    future.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+                    fail("The bind request succeeded unexpectedly");
+                } catch (TimeoutResultException e) {
+                    verifyResultCodeIsClientSideTimeout(e);
+                    verify(handler).handleErrorResult(same(e));
+
+                    /*
+                     * The connection should no longer be valid, the event
+                     * listener should have been notified, but no abandon should
+                     * have been sent.
+                     */
+                    listener.awaitError(TEST_TIMEOUT, TimeUnit.SECONDS);
+                    assertThat(connection.isValid()).isFalse();
+                    verifyResultCodeIsClientSideTimeout(listener.getError());
+                    connection.close();
+                    waitForClose();
+                    verifyNoAbandonSent();
+                }
+            } finally {
                 connection.close();
-
-                // Wait for the server to receive the close request.
-                assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
-
-                /*
-                 * Check that the only interactions were the bind and the close
-                 * and specifically there was no abandon request.
-                 */
-                verify(serverConnection).handleBind(any(Integer.class), eq(3),
-                        any(BindRequest.class), any(IntermediateResponseHandler.class),
-                        any(ResultHandler.class));
-                verify(serverConnection).handleConnectionClosed(any(Integer.class),
-                        any(UnbindRequest.class));
-                verifyNoMoreInteractions(serverConnection);
             }
-        } finally {
-            closeSilently(connection);
-            factory.close();
-            server.close();
         }
     }
 
@@ -148,82 +153,48 @@
      */
     @Test
     public void testClientSideTimeoutForBindRequestInConnectionPool() throws Exception {
-        final AtomicReference<LDAPClientContext> context = new AtomicReference<LDAPClientContext>();
-        final Semaphore latch = new Semaphore(0);
+        resetState();
+        registerBindEvent();
+        registerCloseEvent();
 
-        // The server connection should receive a bind, but no abandon request.
-        final ServerConnection<Integer> serverConnection = mock(ServerConnection.class);
-        release(latch).when(serverConnection).handleBind(any(Integer.class), anyInt(),
-                any(BindRequest.class), any(IntermediateResponseHandler.class),
-                any(ResultHandler.class));
-        release(latch).when(serverConnection).handleConnectionClosed(any(Integer.class),
-                any(UnbindRequest.class));
-
-        final LDAPListener server = createServer(latch, context, serverConnection);
-        final ConnectionFactory factory =
-                Connections.newFixedConnectionPool(
-                        new LDAPConnectionFactory(server.getSocketAddress(), new LDAPOptions()
-                                .setTimeout(1, TimeUnit.MILLISECONDS)), 10);
-        Connection connection = null;
-        try {
-            // Get pooled connection to the server.
-            connection = factory.getConnection();
-
-            // Wait for the server to accept the connection.
-            assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
-
-            /*
-             * Sanity check: close the connection and reopen. There should be no
-             * interactions with the server due to the pool.
-             */
-            connection.close();
-            connection = factory.getConnection();
-            verifyNoMoreInteractions(serverConnection);
-
-            // Now bind with timeout.
-            final ResultHandler<BindResult> handler = mock(ResultHandler.class);
-            final FutureResult<BindResult> future =
-                    connection.bindAsync(newSimpleBindRequest(), null, handler);
-
-            // Wait for the server to receive the bind request.
-            assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
-
-            // Wait for the request to timeout.
+        for (int i = 0; i < ITERATIONS; i++) {
+            final Connection connection = pool.getConnection();
             try {
-                future.get(TEST_TIMEOUT, TimeUnit.SECONDS);
-                fail("The bind request succeeded unexpectedly");
-            } catch (TimeoutResultException e) {
-                assertThat(e.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_TIMEOUT);
-                verify(handler).handleErrorResult(same(e));
+                waitForConnect();
+                final MockConnectionEventListener listener = new MockConnectionEventListener();
+                connection.addConnectionEventListener(listener);
 
-                // The connection should no longer be valid.
-                assertThat(connection.isValid()).isFalse();
+                // Now bind with timeout.
+                final ResultHandler<BindResult> handler = mock(ResultHandler.class);
+                final FutureResult<BindResult> future =
+                        connection.bindAsync(newSimpleBindRequest(), null, handler);
+                waitForBind();
+
+                // Wait for the request to timeout.
+                try {
+                    future.get(5, TimeUnit.SECONDS);
+                    fail("The bind request succeeded unexpectedly");
+                } catch (TimeoutException e) {
+                    fail("The bind request future get timed out");
+                } catch (TimeoutResultException e) {
+                    verifyResultCodeIsClientSideTimeout(e);
+                    verify(handler).handleErrorResult(same(e));
+
+                    /*
+                     * The connection should no longer be valid, the event
+                     * listener should have been notified, but no abandon should
+                     * have been sent.
+                     */
+                    listener.awaitError(TEST_TIMEOUT, TimeUnit.SECONDS);
+                    assertThat(connection.isValid()).isFalse();
+                    verifyResultCodeIsClientSideTimeout(listener.getError());
+                    connection.close();
+                    waitForClose();
+                    verifyNoAbandonSent();
+                }
+            } finally {
                 connection.close();
-
-                // Wait for the server to receive the close request.
-                assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
-
-                /*
-                 * Check that the only interactions were the bind and the close
-                 * and specifically there was no abandon request.
-                 */
-                verify(serverConnection).handleBind(any(Integer.class), eq(3),
-                        any(BindRequest.class), any(IntermediateResponseHandler.class),
-                        any(ResultHandler.class));
-                verify(serverConnection).handleConnectionClosed(any(Integer.class),
-                        any(UnbindRequest.class));
-                verifyNoMoreInteractions(serverConnection);
-
-                // Now get another connection. This time we should reconnect to the server.
-                connection = factory.getConnection();
-
-                // Wait for the server to accept the connection.
-                assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
             }
-        } finally {
-            closeSilently(connection);
-            factory.close();
-            server.close();
         }
     }
 
@@ -235,62 +206,43 @@
      */
     @Test
     public void testClientSideTimeoutForSearchRequest() throws Exception {
-        final AtomicReference<LDAPClientContext> context = new AtomicReference<LDAPClientContext>();
-        final Semaphore latch = new Semaphore(0);
+        resetState();
+        registerSearchEvent();
+        registerAbandonEvent();
 
-        // The server connection should receive a search and then an abandon.
-        final ServerConnection<Integer> serverConnection = mock(ServerConnection.class);
-        release(latch).when(serverConnection).handleSearch(any(Integer.class),
-                any(SearchRequest.class), any(IntermediateResponseHandler.class),
-                any(SearchResultHandler.class));
-        release(latch).when(serverConnection).handleAbandon(any(Integer.class),
-                any(AbandonRequest.class));
-
-        final LDAPListener server = createServer(latch, context, serverConnection);
-        final ConnectionFactory factory =
-                new LDAPConnectionFactory(server.getSocketAddress(), new LDAPOptions().setTimeout(
-                        1, TimeUnit.MILLISECONDS));
-        Connection connection = null;
-        try {
-            // Connect to the server.
-            connection = factory.getConnection();
-
-            // Wait for the server to accept the connection.
-            assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
-
-            /*
-             * A search request timeout should not cause the connection to fail,
-             * so ensure that event listeners are not fired.
-             */
-            final ConnectionEventListener listener = mock(ConnectionEventListener.class);
-            connection.addConnectionEventListener(listener);
-
-            final ResultHandler<SearchResultEntry> handler = mock(ResultHandler.class);
-            final FutureResult<SearchResultEntry> future =
-                    connection.readEntryAsync(DN.valueOf("cn=test"), null, handler);
-
-            // Wait for the server to receive the search request.
-            assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
-
-            // Wait for the request to timeout.
+        for (int i = 0; i < ITERATIONS; i++) {
+            final Connection connection = factory.getConnection();
             try {
-                future.get(TEST_TIMEOUT, TimeUnit.SECONDS);
-                fail("The search request succeeded unexpectedly");
-            } catch (TimeoutResultException e) {
-                assertThat(e.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_TIMEOUT);
-                verify(handler).handleErrorResult(same(e));
+                waitForConnect();
+                final ConnectionEventListener listener = mock(ConnectionEventListener.class);
+                connection.addConnectionEventListener(listener);
 
-                // The connection should still be valid.
-                verifyZeroInteractions(listener);
-                assertThat(connection.isValid()).isTrue();
+                final ResultHandler<SearchResultEntry> handler = mock(ResultHandler.class);
+                final FutureResult<SearchResultEntry> future =
+                        connection.readEntryAsync(DN.valueOf("cn=test"), null, handler);
+                waitForSearch();
 
-                // Wait for the server to receive the abandon request.
-                assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
+                // Wait for the request to timeout.
+                try {
+                    future.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+                    fail("The search request succeeded unexpectedly");
+                } catch (TimeoutResultException e) {
+                    verifyResultCodeIsClientSideTimeout(e);
+                    verify(handler).handleErrorResult(same(e));
+
+                    // The connection should still be valid.
+                    assertThat(connection.isValid()).isTrue();
+                    verifyZeroInteractions(listener);
+
+                    /*
+                     * FIXME: The search should have been abandoned (see comment
+                     * in LDAPConnection for explanation).
+                     */
+                    // waitForAbandon();
+                }
+            } finally {
+                connection.close();
             }
-        } finally {
-            closeSilently(connection);
-            factory.close();
-            server.close();
         }
     }
 
@@ -300,58 +252,117 @@
      */
     @Test
     public void testResourceManagement() throws Exception {
-        final AtomicReference<LDAPClientContext> context = new AtomicReference<LDAPClientContext>();
-        final Semaphore latch = new Semaphore(0);
-        final LDAPListener server = createServer(latch, context, mock(ServerConnection.class));
-        final ConnectionFactory factory = new LDAPConnectionFactory(server.getSocketAddress());
-        try {
-            for (int i = 0; i < 100; i++) {
-                // Connect to the server.
-                final Connection connection = factory.getConnection();
-                try {
-                    // Wait for the server to accept the connection.
-                    assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
+        resetState();
 
-                    final MockConnectionEventListener listener = new MockConnectionEventListener();
-                    connection.addConnectionEventListener(listener);
+        for (int i = 0; i < ITERATIONS; i++) {
+            final Connection connection = factory.getConnection();
+            try {
+                waitForConnect();
+                final MockConnectionEventListener listener = new MockConnectionEventListener();
+                connection.addConnectionEventListener(listener);
 
-                    // Perform remote disconnect which will trigger a client side connection error.
-                    context.get().disconnect();
+                // Perform remote disconnect which will trigger a client side connection error.
+                context.get().disconnect();
 
-                    // Wait for the error notification to reach the client.
-                    listener.awaitError(TEST_TIMEOUT, TimeUnit.SECONDS);
-                } finally {
-                    connection.close();
-                }
+                // Wait for the error notification to reach the client.
+                listener.awaitError(TEST_TIMEOUT, TimeUnit.SECONDS);
+            } finally {
+                connection.close();
             }
-        } finally {
-            factory.close();
-            server.close();
         }
     }
 
-    private LDAPListener createServer(final Semaphore latch,
-            final AtomicReference<LDAPClientContext> context,
-            final ServerConnection<Integer> serverConnection) throws IOException {
-        return new LDAPListener(findFreeSocketAddress(),
-                new ServerConnectionFactory<LDAPClientContext, Integer>() {
-                    @Override
-                    public ServerConnection<Integer> handleAccept(
-                            final LDAPClientContext clientContext) throws ErrorResultException {
-                        context.set(clientContext);
-                        latch.release();
-                        return serverConnection;
-                    }
-                });
+    private LDAPListener createServer() {
+        try {
+            return new LDAPListener(findFreeSocketAddress(),
+                    new ServerConnectionFactory<LDAPClientContext, Integer>() {
+                        @Override
+                        public ServerConnection<Integer> handleAccept(
+                                final LDAPClientContext clientContext) throws ErrorResultException {
+                            context.set(clientContext);
+                            connectLatch.release();
+                            return serverConnection;
+                        }
+                    });
+        } catch (IOException e) {
+            fail("Unable to create LDAP listener", e);
+            return null;
+        }
     }
 
-    private Stubber release(final Semaphore latch) {
+    private Stubber notifyEvent(final Semaphore latch) {
         return doAnswer(new Answer<Void>() {
             @Override
-            public Void answer(InvocationOnMock invocation) throws Throwable {
+            public Void answer(InvocationOnMock invocation) {
                 latch.release();
                 return null;
             }
         });
     }
+
+    private void registerAbandonEvent() {
+        notifyEvent(abandonLatch).when(serverConnection).handleAbandon(any(Integer.class),
+                any(AbandonRequest.class));
+    }
+
+    private void registerBindEvent() {
+        notifyEvent(bindLatch).when(serverConnection).handleBind(any(Integer.class), anyInt(),
+                any(BindRequest.class), any(IntermediateResponseHandler.class),
+                any(ResultHandler.class));
+    }
+
+    private void registerCloseEvent() {
+        notifyEvent(closeLatch).when(serverConnection).handleConnectionClosed(any(Integer.class),
+                any(UnbindRequest.class));
+    }
+
+    private void registerSearchEvent() {
+        notifyEvent(searchLatch).when(serverConnection).handleSearch(any(Integer.class),
+                any(SearchRequest.class), any(IntermediateResponseHandler.class),
+                any(SearchResultHandler.class));
+    }
+
+    private void resetState() {
+        connectLatch.drainPermits();
+        abandonLatch.drainPermits();
+        bindLatch.drainPermits();
+        searchLatch.drainPermits();
+        closeLatch.drainPermits();
+        context.set(null);
+        serverConnection = mock(ServerConnection.class);
+    }
+
+    private void verifyNoAbandonSent() {
+        verify(serverConnection, never()).handleAbandon(any(Integer.class),
+                any(AbandonRequest.class));
+    }
+
+    private void verifyResultCodeIsClientSideTimeout(ErrorResultException error) {
+        assertThat(error.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_TIMEOUT);
+    }
+
+    @SuppressWarnings("unused")
+    private void waitForAbandon() throws InterruptedException {
+        waitForEvent(abandonLatch);
+    }
+
+    private void waitForBind() throws InterruptedException {
+        waitForEvent(bindLatch);
+    }
+
+    private void waitForClose() throws InterruptedException {
+        waitForEvent(closeLatch);
+    }
+
+    private void waitForConnect() throws InterruptedException {
+        waitForEvent(connectLatch);
+    }
+
+    private void waitForEvent(final Semaphore latch) throws InterruptedException {
+        assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
+    }
+
+    private void waitForSearch() throws InterruptedException {
+        waitForEvent(searchLatch);
+    }
 }

--
Gitblit v1.10.0