From 7253f3aa5bd089168730b78d13db641ec4d1c6a2 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 12 Dec 2013 01:26:35 +0000
Subject: [PATCH] Additional fixes OPENDJ-1247: Client side timeouts do not cancel bind or startTLS requests properly
---
opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java | 5
opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/TimeoutChecker.java | 47 ++-
opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/AbstractLDAPFutureResultImpl.java | 16 -
opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java | 110 ++++-----
opendj-sdk/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPConnectionFactoryTestCase.java | 471 +++++++++++++++++++++--------------------
5 files changed, 333 insertions(+), 316 deletions(-)
diff --git a/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/AbstractLDAPFutureResultImpl.java b/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/AbstractLDAPFutureResultImpl.java
index a7b896c..94f602e 100644
--- a/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/AbstractLDAPFutureResultImpl.java
+++ b/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/AbstractLDAPFutureResultImpl.java
@@ -48,7 +48,6 @@
extends AsynchronousFutureResult<S, ResultHandler<? super S>>
implements IntermediateResponseHandler {
private final Connection connection;
- private final int requestID;
private IntermediateResponseHandler intermediateResponseHandler;
private volatile long timestamp;
@@ -56,21 +55,12 @@
final ResultHandler<? super S> resultHandler,
final IntermediateResponseHandler intermediateResponseHandler,
final Connection connection) {
- super(resultHandler);
- this.requestID = requestID;
+ super(resultHandler, requestID);
this.connection = connection;
this.intermediateResponseHandler = intermediateResponseHandler;
this.timestamp = System.currentTimeMillis();
}
- /**
- * {@inheritDoc}
- */
- @Override
- public final int getRequestID() {
- return requestID;
- }
-
/** {@inheritDoc} */
@Override
public final boolean handleIntermediateResponse(final IntermediateResponse response) {
@@ -99,7 +89,7 @@
* future. There is no risk of an infinite loop because the state of
* this future has already been changed.
*/
- connection.abandonAsync(Requests.newAbandonRequest(requestID));
+ connection.abandonAsync(Requests.newAbandonRequest(getRequestID()));
return null;
}
@@ -124,7 +114,7 @@
@Override
protected void toString(final StringBuilder sb) {
sb.append(" requestID = ");
- sb.append(requestID);
+ sb.append(getRequestID());
sb.append(" timestamp = ");
sb.append(timestamp);
super.toString(sb);
diff --git a/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java b/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
index 8829372..03bcc4d 100644
--- a/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
+++ b/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -30,7 +30,6 @@
import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
import static org.forgerock.opendj.ldap.CoreMessages.*;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
-import static org.forgerock.opendj.ldap.requests.Requests.newAbandonRequest;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -575,65 +574,62 @@
long cancelExpiredRequests(final long currentTime) {
final long timeout = factory.getLDAPOptions().getTimeout(TimeUnit.MILLISECONDS);
+ if (timeout <= 0) {
+ return 0;
+ }
+
long delay = timeout;
- if (timeout > 0) {
- for (final int requestID : pendingRequests.keySet()) {
- final AbstractLDAPFutureResultImpl<?> future = pendingRequests.get(requestID);
- if (future != null && future.checkForTimeout()) {
- final long diff = (future.getTimestamp() + timeout) - currentTime;
- if (diff <= 0 && pendingRequests.remove(requestID) != null) {
- if (future.isBindOrStartTLS()) {
- /*
- * No other operations can be performed while a bind
- * or StartTLS request is active, so we cannot time
- * out the request. We therefore have a choice:
- * either ignore timeouts for these operations, or
- * enforce them but doing so requires invalidating
- * the connection. We'll do the latter, since
- * ignoring timeouts could cause the application to
- * hang.
- */
- DEBUG_LOG.fine("Failing bind or StartTLS request due to timeout "
- + "(connection will be invalidated): " + future);
- final Result result =
- Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT)
- .setDiagnosticMessage(
- LDAP_CONNECTION_BIND_OR_START_TLS_REQUEST_TIMEOUT
- .get(timeout).toString());
- future.adaptErrorResult(result);
+ for (final AbstractLDAPFutureResultImpl<?> future : pendingRequests.values()) {
+ if (future == null || !future.checkForTimeout()) {
+ continue;
+ }
+ final long diff = (future.getTimestamp() + timeout) - currentTime;
+ if (diff > 0) {
+ // Will expire in diff milliseconds.
+ delay = Math.min(delay, diff);
+ } else if (pendingRequests.remove(future.getRequestID()) == null) {
+ // Result arrived at the same time.
+ continue;
+ } else if (future.isBindOrStartTLS()) {
+ /*
+ * No other operations can be performed while a bind or StartTLS
+ * request is active, so we cannot time out the request. We
+ * therefore have a choice: either ignore timeouts for these
+ * operations, or enforce them but doing so requires
+ * invalidating the connection. We'll do the latter, since
+ * ignoring timeouts could cause the application to hang.
+ */
+ DEBUG_LOG.fine("Failing bind or StartTLS request due to timeout "
+ + "(connection will be invalidated): " + future);
+ final Result result =
+ Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
+ LDAP_CONNECTION_BIND_OR_START_TLS_REQUEST_TIMEOUT.get(timeout)
+ .toString());
+ future.adaptErrorResult(result);
- // Fail the connection.
- final Result errorResult =
- Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT)
- .setDiagnosticMessage(
- LDAP_CONNECTION_BIND_OR_START_TLS_CONNECTION_TIMEOUT
- .get(timeout).toString());
- connectionErrorOccurred(errorResult);
- } else {
- DEBUG_LOG.fine("Failing request due to timeout: " + future);
- final Result result =
- Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT)
- .setDiagnosticMessage(
- LDAP_CONNECTION_REQUEST_TIMEOUT.get(timeout)
- .toString());
- future.adaptErrorResult(result);
+ // Fail the connection.
+ final Result errorResult =
+ Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
+ LDAP_CONNECTION_BIND_OR_START_TLS_CONNECTION_TIMEOUT.get(timeout)
+ .toString());
+ connectionErrorOccurred(errorResult);
+ } else {
+ DEBUG_LOG.fine("Failing request due to timeout: " + future);
+ final Result result =
+ Responses.newResult(ResultCode.CLIENT_SIDE_TIMEOUT).setDiagnosticMessage(
+ LDAP_CONNECTION_REQUEST_TIMEOUT.get(timeout).toString());
+ future.adaptErrorResult(result);
- /*
- * FIXME: there's a potential race condition here if
- * a bind or startTLS is initiated just after we
- * check the boolean. It seems potentially even more
- * dangerous to send the abandon request while
- * holding the state lock, since a blocking write
- * could hang the application.
- */
- if (!bindOrStartTLSInProgress.get()) {
- sendAbandonRequest(newAbandonRequest(requestID));
- }
- }
- } else {
- delay = Math.min(delay, diff);
- }
- }
+ /*
+ * FIXME: there's a potential race condition here if a bind or
+ * startTLS is initiated just after we check the boolean. It
+ * seems potentially even more dangerous to send the abandon
+ * request while holding the state lock, since a blocking write
+ * could hang the application.
+ */
+// if (!bindOrStartTLSInProgress.get()) {
+// sendAbandonRequest(newAbandonRequest(future.getRequestID()));
+// }
}
}
return delay;
diff --git a/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java b/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
index 91b4c93..4298c45 100644
--- a/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
+++ b/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnectionFactoryImpl.java
@@ -34,6 +34,7 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -167,7 +168,9 @@
connection.configureBlocking(true);
final LDAPConnection ldapConnection =
new LDAPConnection(connection, LDAPConnectionFactoryImpl.this);
- timeoutChecker.get().addConnection(ldapConnection);
+ if (options.getTimeout(TimeUnit.MILLISECONDS) > 0) {
+ timeoutChecker.get().addConnection(ldapConnection);
+ }
clientFilter.registerConnection(connection, ldapConnection);
return ldapConnection;
}
diff --git a/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/TimeoutChecker.java b/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/TimeoutChecker.java
index 61fb58c..b169be8 100644
--- a/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/TimeoutChecker.java
+++ b/opendj-sdk/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/TimeoutChecker.java
@@ -59,7 +59,7 @@
/**
* Condition variable used for coordinating the timeout thread.
*/
- private final Object available = new Object();
+ private final Object stateLock = new Object();
/**
* The connection set must be safe from CMEs because expiring requests can
@@ -73,6 +73,12 @@
*/
private volatile boolean shutdownRequested = false;
+ /**
+ * Used for signalling that new connections have been added while performing
+ * timeout processing.
+ */
+ private volatile boolean pendingNewConnections = false;
+
private TimeoutChecker() {
final Thread checkerThread = new Thread("OpenDJ LDAP SDK Connection Timeout Checker") {
@Override
@@ -81,7 +87,13 @@
while (!shutdownRequested) {
final long currentTime = System.currentTimeMillis();
long delay = 0;
-
+ /*
+ * New connections may be added during iteration and may be
+ * missed resulting in the timeout checker waiting longer
+ * than it should, or even forever (e.g. if the new
+ * connection is the first).
+ */
+ pendingNewConnections = false;
for (final LDAPConnection connection : connections) {
if (DEBUG_LOG.isLoggable(Level.FINER)) {
DEBUG_LOG.finer("Checking connection " + connection + " delay = "
@@ -100,11 +112,18 @@
}
try {
- synchronized (available) {
- if (delay <= 0) {
- available.wait();
+ synchronized (stateLock) {
+ if (shutdownRequested || pendingNewConnections) {
+ // Loop immediately.
+ pendingNewConnections = false;
+ } else if (delay <= 0) {
+ /*
+ * If there is at least one connection then the
+ * delay should be > 0.
+ */
+ stateLock.wait();
} else {
- available.wait(delay);
+ stateLock.wait(delay);
}
}
} catch (final InterruptedException e) {
@@ -120,7 +139,10 @@
void addConnection(final LDAPConnection connection) {
connections.add(connection);
- signal();
+ synchronized (stateLock) {
+ pendingNewConnections = true;
+ stateLock.notifyAll();
+ }
}
void removeConnection(final LDAPConnection connection) {
@@ -129,14 +151,9 @@
}
private void shutdown() {
- shutdownRequested = true;
- signal();
- }
-
- // Wakes the timeout checker if it is sleeping.
- private void signal() {
- synchronized (available) {
- available.notifyAll();
+ synchronized (stateLock) {
+ shutdownRequested = true;
+ stateLock.notifyAll();
}
}
}
diff --git a/opendj-sdk/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPConnectionFactoryTestCase.java b/opendj-sdk/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPConnectionFactoryTestCase.java
index ed70fff..7778e46 100644
--- a/opendj-sdk/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPConnectionFactoryTestCase.java
+++ b/opendj-sdk/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPConnectionFactoryTestCase.java
@@ -25,16 +25,23 @@
*/
package org.forgerock.opendj.ldap;
-import static com.forgerock.opendj.util.StaticUtils.closeSilently;
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.Fail.fail;
import static org.forgerock.opendj.ldap.TestCaseUtils.findFreeSocketAddress;
import static org.forgerock.opendj.ldap.requests.Requests.newSimpleBindRequest;
-import static org.mockito.Mockito.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.forgerock.opendj.ldap.requests.AbandonRequest;
@@ -43,10 +50,10 @@
import org.forgerock.opendj.ldap.requests.UnbindRequest;
import org.forgerock.opendj.ldap.responses.BindResult;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
-import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.stubbing.Stubber;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
/**
@@ -54,9 +61,40 @@
*/
@SuppressWarnings({ "javadoc", "unchecked" })
public class LDAPConnectionFactoryTestCase extends SdkTestCase {
+ // Manual testing has gone up to 10000 iterations.
+ private static final int ITERATIONS = 100;
+
// Test timeout for tests which need to wait for network events.
private static final long TEST_TIMEOUT = 30L;
+ /*
+ * It is usually quite a bad code smell to share state between unit tests.
+ * However, in this case we want to re-use the same factories and listeners
+ * in order to avoid shutting down and restarting the transport for each
+ * iteration.
+ */
+
+ private final AtomicReference<LDAPClientContext> context =
+ new AtomicReference<LDAPClientContext>();
+ private volatile ServerConnection<Integer> serverConnection;
+ private final LDAPListener server = createServer();
+ private final ConnectionFactory factory = new LDAPConnectionFactory(server.getSocketAddress(),
+ new LDAPOptions().setTimeout(1, TimeUnit.MILLISECONDS));
+ private final ConnectionFactory pool = Connections.newFixedConnectionPool(factory, 10);
+
+ private final Semaphore connectLatch = new Semaphore(0);
+ private final Semaphore abandonLatch = new Semaphore(0);
+ private final Semaphore bindLatch = new Semaphore(0);
+ private final Semaphore searchLatch = new Semaphore(0);
+ private final Semaphore closeLatch = new Semaphore(0);
+
+ @AfterClass
+ public void tearDown() {
+ pool.close();
+ factory.close();
+ server.close();
+ }
+
/**
* Unit test for OPENDJ-1247: a locally timed out bind request will leave a
* connection in an invalid state since a bind (or startTLS) is in progress
@@ -66,78 +104,45 @@
*/
@Test
public void testClientSideTimeoutForBindRequest() throws Exception {
- final AtomicReference<LDAPClientContext> context = new AtomicReference<LDAPClientContext>();
- final Semaphore latch = new Semaphore(0);
+ resetState();
+ registerBindEvent();
+ registerCloseEvent();
- // The server connection should receive a bind, but no abandon request.
- final ServerConnection<Integer> serverConnection = mock(ServerConnection.class);
- release(latch).when(serverConnection).handleBind(any(Integer.class), anyInt(),
- any(BindRequest.class), any(IntermediateResponseHandler.class),
- any(ResultHandler.class));
- release(latch).when(serverConnection).handleConnectionClosed(any(Integer.class),
- any(UnbindRequest.class));
-
- final LDAPListener server = createServer(latch, context, serverConnection);
- final ConnectionFactory factory =
- new LDAPConnectionFactory(server.getSocketAddress(), new LDAPOptions().setTimeout(
- 1, TimeUnit.MILLISECONDS));
- Connection connection = null;
- try {
- // Connect to the server.
- connection = factory.getConnection();
-
- // Wait for the server to accept the connection.
- assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
-
- /*
- * A bind request timeout should cause the connection to fail, so
- * ensure that event listeners are fired.
- */
- final ConnectionEventListener listener = mock(ConnectionEventListener.class);
- connection.addConnectionEventListener(listener);
-
- final ResultHandler<BindResult> handler = mock(ResultHandler.class);
- final FutureResult<BindResult> future =
- connection.bindAsync(newSimpleBindRequest(), null, handler);
-
- // Wait for the server to receive the bind request.
- assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
-
- // Wait for the request to timeout.
+ for (int i = 0; i < ITERATIONS; i++) {
+ final Connection connection = factory.getConnection();
try {
- future.get(TEST_TIMEOUT, TimeUnit.SECONDS);
- fail("The bind request succeeded unexpectedly");
- } catch (TimeoutResultException e) {
- assertThat(e.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_TIMEOUT);
- verify(handler).handleErrorResult(same(e));
+ waitForConnect();
+ final MockConnectionEventListener listener = new MockConnectionEventListener();
+ connection.addConnectionEventListener(listener);
- // The connection should no longer be valid.
- ArgumentCaptor<ErrorResultException> capturedError =
- ArgumentCaptor.forClass(ErrorResultException.class);
- verify(listener).handleConnectionError(eq(false), capturedError.capture());
- assertThat(capturedError.getValue().getResult().getResultCode()).isEqualTo(
- ResultCode.CLIENT_SIDE_TIMEOUT);
- assertThat(connection.isValid()).isFalse();
+ final ResultHandler<BindResult> handler = mock(ResultHandler.class);
+ final FutureResult<BindResult> future =
+ connection.bindAsync(newSimpleBindRequest(), null, handler);
+ waitForBind();
+
+ // Wait for the request to timeout.
+ try {
+ future.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ fail("The bind request succeeded unexpectedly");
+ } catch (TimeoutResultException e) {
+ verifyResultCodeIsClientSideTimeout(e);
+ verify(handler).handleErrorResult(same(e));
+
+ /*
+ * The connection should no longer be valid, the event
+ * listener should have been notified, but no abandon should
+ * have been sent.
+ */
+ listener.awaitError(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertThat(connection.isValid()).isFalse();
+ verifyResultCodeIsClientSideTimeout(listener.getError());
+ connection.close();
+ waitForClose();
+ verifyNoAbandonSent();
+ }
+ } finally {
connection.close();
-
- // Wait for the server to receive the close request.
- assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
-
- /*
- * Check that the only interactions were the bind and the close
- * and specifically there was no abandon request.
- */
- verify(serverConnection).handleBind(any(Integer.class), eq(3),
- any(BindRequest.class), any(IntermediateResponseHandler.class),
- any(ResultHandler.class));
- verify(serverConnection).handleConnectionClosed(any(Integer.class),
- any(UnbindRequest.class));
- verifyNoMoreInteractions(serverConnection);
}
- } finally {
- closeSilently(connection);
- factory.close();
- server.close();
}
}
@@ -148,82 +153,48 @@
*/
@Test
public void testClientSideTimeoutForBindRequestInConnectionPool() throws Exception {
- final AtomicReference<LDAPClientContext> context = new AtomicReference<LDAPClientContext>();
- final Semaphore latch = new Semaphore(0);
+ resetState();
+ registerBindEvent();
+ registerCloseEvent();
- // The server connection should receive a bind, but no abandon request.
- final ServerConnection<Integer> serverConnection = mock(ServerConnection.class);
- release(latch).when(serverConnection).handleBind(any(Integer.class), anyInt(),
- any(BindRequest.class), any(IntermediateResponseHandler.class),
- any(ResultHandler.class));
- release(latch).when(serverConnection).handleConnectionClosed(any(Integer.class),
- any(UnbindRequest.class));
-
- final LDAPListener server = createServer(latch, context, serverConnection);
- final ConnectionFactory factory =
- Connections.newFixedConnectionPool(
- new LDAPConnectionFactory(server.getSocketAddress(), new LDAPOptions()
- .setTimeout(1, TimeUnit.MILLISECONDS)), 10);
- Connection connection = null;
- try {
- // Get pooled connection to the server.
- connection = factory.getConnection();
-
- // Wait for the server to accept the connection.
- assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
-
- /*
- * Sanity check: close the connection and reopen. There should be no
- * interactions with the server due to the pool.
- */
- connection.close();
- connection = factory.getConnection();
- verifyNoMoreInteractions(serverConnection);
-
- // Now bind with timeout.
- final ResultHandler<BindResult> handler = mock(ResultHandler.class);
- final FutureResult<BindResult> future =
- connection.bindAsync(newSimpleBindRequest(), null, handler);
-
- // Wait for the server to receive the bind request.
- assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
-
- // Wait for the request to timeout.
+ for (int i = 0; i < ITERATIONS; i++) {
+ final Connection connection = pool.getConnection();
try {
- future.get(TEST_TIMEOUT, TimeUnit.SECONDS);
- fail("The bind request succeeded unexpectedly");
- } catch (TimeoutResultException e) {
- assertThat(e.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_TIMEOUT);
- verify(handler).handleErrorResult(same(e));
+ waitForConnect();
+ final MockConnectionEventListener listener = new MockConnectionEventListener();
+ connection.addConnectionEventListener(listener);
- // The connection should no longer be valid.
- assertThat(connection.isValid()).isFalse();
+ // Now bind with timeout.
+ final ResultHandler<BindResult> handler = mock(ResultHandler.class);
+ final FutureResult<BindResult> future =
+ connection.bindAsync(newSimpleBindRequest(), null, handler);
+ waitForBind();
+
+ // Wait for the request to timeout.
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ fail("The bind request succeeded unexpectedly");
+ } catch (TimeoutException e) {
+ fail("The bind request future get timed out");
+ } catch (TimeoutResultException e) {
+ verifyResultCodeIsClientSideTimeout(e);
+ verify(handler).handleErrorResult(same(e));
+
+ /*
+ * The connection should no longer be valid, the event
+ * listener should have been notified, but no abandon should
+ * have been sent.
+ */
+ listener.awaitError(TEST_TIMEOUT, TimeUnit.SECONDS);
+ assertThat(connection.isValid()).isFalse();
+ verifyResultCodeIsClientSideTimeout(listener.getError());
+ connection.close();
+ waitForClose();
+ verifyNoAbandonSent();
+ }
+ } finally {
connection.close();
-
- // Wait for the server to receive the close request.
- assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
-
- /*
- * Check that the only interactions were the bind and the close
- * and specifically there was no abandon request.
- */
- verify(serverConnection).handleBind(any(Integer.class), eq(3),
- any(BindRequest.class), any(IntermediateResponseHandler.class),
- any(ResultHandler.class));
- verify(serverConnection).handleConnectionClosed(any(Integer.class),
- any(UnbindRequest.class));
- verifyNoMoreInteractions(serverConnection);
-
- // Now get another connection. This time we should reconnect to the server.
- connection = factory.getConnection();
-
- // Wait for the server to accept the connection.
- assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
}
- } finally {
- closeSilently(connection);
- factory.close();
- server.close();
}
}
@@ -235,62 +206,43 @@
*/
@Test
public void testClientSideTimeoutForSearchRequest() throws Exception {
- final AtomicReference<LDAPClientContext> context = new AtomicReference<LDAPClientContext>();
- final Semaphore latch = new Semaphore(0);
+ resetState();
+ registerSearchEvent();
+ registerAbandonEvent();
- // The server connection should receive a search and then an abandon.
- final ServerConnection<Integer> serverConnection = mock(ServerConnection.class);
- release(latch).when(serverConnection).handleSearch(any(Integer.class),
- any(SearchRequest.class), any(IntermediateResponseHandler.class),
- any(SearchResultHandler.class));
- release(latch).when(serverConnection).handleAbandon(any(Integer.class),
- any(AbandonRequest.class));
-
- final LDAPListener server = createServer(latch, context, serverConnection);
- final ConnectionFactory factory =
- new LDAPConnectionFactory(server.getSocketAddress(), new LDAPOptions().setTimeout(
- 1, TimeUnit.MILLISECONDS));
- Connection connection = null;
- try {
- // Connect to the server.
- connection = factory.getConnection();
-
- // Wait for the server to accept the connection.
- assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
-
- /*
- * A search request timeout should not cause the connection to fail,
- * so ensure that event listeners are not fired.
- */
- final ConnectionEventListener listener = mock(ConnectionEventListener.class);
- connection.addConnectionEventListener(listener);
-
- final ResultHandler<SearchResultEntry> handler = mock(ResultHandler.class);
- final FutureResult<SearchResultEntry> future =
- connection.readEntryAsync(DN.valueOf("cn=test"), null, handler);
-
- // Wait for the server to receive the search request.
- assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
-
- // Wait for the request to timeout.
+ for (int i = 0; i < ITERATIONS; i++) {
+ final Connection connection = factory.getConnection();
try {
- future.get(TEST_TIMEOUT, TimeUnit.SECONDS);
- fail("The search request succeeded unexpectedly");
- } catch (TimeoutResultException e) {
- assertThat(e.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_TIMEOUT);
- verify(handler).handleErrorResult(same(e));
+ waitForConnect();
+ final ConnectionEventListener listener = mock(ConnectionEventListener.class);
+ connection.addConnectionEventListener(listener);
- // The connection should still be valid.
- verifyZeroInteractions(listener);
- assertThat(connection.isValid()).isTrue();
+ final ResultHandler<SearchResultEntry> handler = mock(ResultHandler.class);
+ final FutureResult<SearchResultEntry> future =
+ connection.readEntryAsync(DN.valueOf("cn=test"), null, handler);
+ waitForSearch();
- // Wait for the server to receive the abandon request.
- assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
+ // Wait for the request to timeout.
+ try {
+ future.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+ fail("The search request succeeded unexpectedly");
+ } catch (TimeoutResultException e) {
+ verifyResultCodeIsClientSideTimeout(e);
+ verify(handler).handleErrorResult(same(e));
+
+ // The connection should still be valid.
+ assertThat(connection.isValid()).isTrue();
+ verifyZeroInteractions(listener);
+
+ /*
+ * FIXME: The search should have been abandoned (see comment
+ * in LDAPConnection for explanation).
+ */
+ // waitForAbandon();
+ }
+ } finally {
+ connection.close();
}
- } finally {
- closeSilently(connection);
- factory.close();
- server.close();
}
}
@@ -300,58 +252,117 @@
*/
@Test
public void testResourceManagement() throws Exception {
- final AtomicReference<LDAPClientContext> context = new AtomicReference<LDAPClientContext>();
- final Semaphore latch = new Semaphore(0);
- final LDAPListener server = createServer(latch, context, mock(ServerConnection.class));
- final ConnectionFactory factory = new LDAPConnectionFactory(server.getSocketAddress());
- try {
- for (int i = 0; i < 100; i++) {
- // Connect to the server.
- final Connection connection = factory.getConnection();
- try {
- // Wait for the server to accept the connection.
- assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
+ resetState();
- final MockConnectionEventListener listener = new MockConnectionEventListener();
- connection.addConnectionEventListener(listener);
+ for (int i = 0; i < ITERATIONS; i++) {
+ final Connection connection = factory.getConnection();
+ try {
+ waitForConnect();
+ final MockConnectionEventListener listener = new MockConnectionEventListener();
+ connection.addConnectionEventListener(listener);
- // Perform remote disconnect which will trigger a client side connection error.
- context.get().disconnect();
+ // Perform remote disconnect which will trigger a client side connection error.
+ context.get().disconnect();
- // Wait for the error notification to reach the client.
- listener.awaitError(TEST_TIMEOUT, TimeUnit.SECONDS);
- } finally {
- connection.close();
- }
+ // Wait for the error notification to reach the client.
+ listener.awaitError(TEST_TIMEOUT, TimeUnit.SECONDS);
+ } finally {
+ connection.close();
}
- } finally {
- factory.close();
- server.close();
}
}
- private LDAPListener createServer(final Semaphore latch,
- final AtomicReference<LDAPClientContext> context,
- final ServerConnection<Integer> serverConnection) throws IOException {
- return new LDAPListener(findFreeSocketAddress(),
- new ServerConnectionFactory<LDAPClientContext, Integer>() {
- @Override
- public ServerConnection<Integer> handleAccept(
- final LDAPClientContext clientContext) throws ErrorResultException {
- context.set(clientContext);
- latch.release();
- return serverConnection;
- }
- });
+ private LDAPListener createServer() {
+ try {
+ return new LDAPListener(findFreeSocketAddress(),
+ new ServerConnectionFactory<LDAPClientContext, Integer>() {
+ @Override
+ public ServerConnection<Integer> handleAccept(
+ final LDAPClientContext clientContext) throws ErrorResultException {
+ context.set(clientContext);
+ connectLatch.release();
+ return serverConnection;
+ }
+ });
+ } catch (IOException e) {
+ fail("Unable to create LDAP listener", e);
+ return null;
+ }
}
- private Stubber release(final Semaphore latch) {
+ private Stubber notifyEvent(final Semaphore latch) {
return doAnswer(new Answer<Void>() {
@Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
+ public Void answer(InvocationOnMock invocation) {
latch.release();
return null;
}
});
}
+
+ private void registerAbandonEvent() {
+ notifyEvent(abandonLatch).when(serverConnection).handleAbandon(any(Integer.class),
+ any(AbandonRequest.class));
+ }
+
+ private void registerBindEvent() {
+ notifyEvent(bindLatch).when(serverConnection).handleBind(any(Integer.class), anyInt(),
+ any(BindRequest.class), any(IntermediateResponseHandler.class),
+ any(ResultHandler.class));
+ }
+
+ private void registerCloseEvent() {
+ notifyEvent(closeLatch).when(serverConnection).handleConnectionClosed(any(Integer.class),
+ any(UnbindRequest.class));
+ }
+
+ private void registerSearchEvent() {
+ notifyEvent(searchLatch).when(serverConnection).handleSearch(any(Integer.class),
+ any(SearchRequest.class), any(IntermediateResponseHandler.class),
+ any(SearchResultHandler.class));
+ }
+
+ private void resetState() {
+ connectLatch.drainPermits();
+ abandonLatch.drainPermits();
+ bindLatch.drainPermits();
+ searchLatch.drainPermits();
+ closeLatch.drainPermits();
+ context.set(null);
+ serverConnection = mock(ServerConnection.class);
+ }
+
+ private void verifyNoAbandonSent() {
+ verify(serverConnection, never()).handleAbandon(any(Integer.class),
+ any(AbandonRequest.class));
+ }
+
+ private void verifyResultCodeIsClientSideTimeout(ErrorResultException error) {
+ assertThat(error.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_TIMEOUT);
+ }
+
+ @SuppressWarnings("unused")
+ private void waitForAbandon() throws InterruptedException {
+ waitForEvent(abandonLatch);
+ }
+
+ private void waitForBind() throws InterruptedException {
+ waitForEvent(bindLatch);
+ }
+
+ private void waitForClose() throws InterruptedException {
+ waitForEvent(closeLatch);
+ }
+
+ private void waitForConnect() throws InterruptedException {
+ waitForEvent(connectLatch);
+ }
+
+ private void waitForEvent(final Semaphore latch) throws InterruptedException {
+ assertThat(latch.tryAcquire(TEST_TIMEOUT, TimeUnit.SECONDS)).isTrue();
+ }
+
+ private void waitForSearch() throws InterruptedException {
+ waitForEvent(searchLatch);
+ }
}
--
Gitblit v1.10.0