From 398a2eaf655e77b39fe71b644d95802c30749e0e Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 01 Oct 2012 21:35:14 +0000
Subject: [PATCH] Fix OPENDJ-593: FixedConnectionPool connection event listeners are never notified
---
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java | 4
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java | 148 +++++++++++++++++++++--------
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java | 90 +++++++++--------
3 files changed, 153 insertions(+), 89 deletions(-)
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
index 23102b2..475170f 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -31,9 +31,9 @@
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -201,7 +201,7 @@
notifyErrorOccurred = isFailed;
if (!isClosed) {
if (listeners == null) {
- listeners = new LinkedList<ConnectionEventListener>();
+ listeners = new CopyOnWriteArrayList<ConnectionEventListener>();
}
listeners.add(listener);
}
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
index 1cde3f2..296789f 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
@@ -122,17 +122,17 @@
* the client application closes this connection. More specifically, pooled
* connections are not actually stored in the internal queue.
*/
- private final class PooledConnection implements Connection {
- // Connection event listeners registered against this pooled connection
- // should have the same life time as the pooled connection.
- private final List<ConnectionEventListener> listeners =
- new CopyOnWriteArrayList<ConnectionEventListener>();
-
+ private final class PooledConnection implements Connection, ConnectionEventListener {
private final Connection connection;
+ private ErrorResultException error = null;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
+ private boolean isDisconnectNotification = false;
+ private List<ConnectionEventListener> listeners = null;
+ // Guarded by stateLock.
+ private final Object stateLock = new Object();
- PooledConnection(final Connection connection) {
+ private PooledConnection(final Connection connection) {
this.connection = connection;
}
@@ -163,14 +163,34 @@
return checkState().addAsync(request, intermediateResponseHandler, resultHandler);
}
- /**
- * {@inheritDoc}
- */
@Override
public void addConnectionEventListener(final ConnectionEventListener listener) {
Validator.ensureNotNull(listener);
- checkState();
- listeners.add(listener);
+ final boolean notifyClose;
+ final boolean notifyErrorOccurred;
+ synchronized (stateLock) {
+ notifyClose = isClosed.get();
+ notifyErrorOccurred = error != null;
+ if (!notifyClose) {
+ if (listeners == null) {
+ // Create and register first listener. If an error has
+ // already occurred on the underlying connection, then
+ // the listener may be immediately invoked so ensure
+ // that it is already in the list.
+ listeners = new CopyOnWriteArrayList<ConnectionEventListener>();
+ listeners.add(listener);
+ connection.addConnectionEventListener(this);
+ } else {
+ listeners.add(listener);
+ }
+ }
+ }
+ if (notifyErrorOccurred) {
+ listener.handleConnectionError(isDisconnectNotification, error);
+ }
+ if (notifyClose) {
+ listener.handleConnectionClosed();
+ }
}
@Override
@@ -191,14 +211,21 @@
return checkState().bindAsync(request, intermediateResponseHandler, resultHandler);
}
- /**
- * {@inheritDoc}
- */
@Override
public void close() {
- if (!isClosed.compareAndSet(false, true)) {
- // Already closed.
- return;
+ final List<ConnectionEventListener> tmpListeners;
+ synchronized (stateLock) {
+ if (!isClosed.compareAndSet(false, true)) {
+ // Already closed.
+ return;
+ }
+ tmpListeners = listeners;
+ }
+
+ // Remove underlying listener if needed and do this before
+ // subsequent connection events may occur.
+ if (tmpListeners != null) {
+ connection.removeConnectionEventListener(this);
}
// Don't put invalid connections back in the pool.
@@ -222,11 +249,15 @@
- currentPoolSize.availablePermits(), poolSize));
}
}
+
+ // Invoke listeners.
+ if (tmpListeners != null) {
+ for (final ConnectionEventListener listener : tmpListeners) {
+ listener.handleConnectionClosed();
+ }
+ }
}
- /**
- * {@inheritDoc}
- */
@Override
public void close(final UnbindRequest request, final String reason) {
close();
@@ -294,17 +325,51 @@
resultHandler);
}
- /**
- * {@inheritDoc}
- */
+ @Override
+ public void handleConnectionClosed() {
+ /*
+ * The underlying connection was closed by the client. This can only
+ * occur when the pool is being shut down and the underlying
+ * connection is not in use.
+ */
+ throw new IllegalStateException(
+ "Pooled connection received unexpected close notification");
+ }
+
+ @Override
+ public void handleConnectionError(final boolean isDisconnectNotification,
+ final ErrorResultException error) {
+ final List<ConnectionEventListener> tmpListeners;
+ synchronized (stateLock) {
+ tmpListeners = listeners;
+ this.isDisconnectNotification = isDisconnectNotification;
+ this.error = error;
+ }
+ if (tmpListeners != null) {
+ for (final ConnectionEventListener listener : tmpListeners) {
+ listener.handleConnectionError(isDisconnectNotification, error);
+ }
+ }
+ }
+
+ @Override
+ public void handleUnsolicitedNotification(final ExtendedResult notification) {
+ final List<ConnectionEventListener> tmpListeners;
+ synchronized (stateLock) {
+ tmpListeners = listeners;
+ }
+ if (tmpListeners != null) {
+ for (final ConnectionEventListener listener : tmpListeners) {
+ listener.handleUnsolicitedNotification(notification);
+ }
+ }
+ }
+
@Override
public boolean isClosed() {
return isClosed.get();
}
- /**
- * {@inheritDoc}
- */
@Override
public boolean isValid() {
return connection.isValid() && !isClosed();
@@ -363,14 +428,14 @@
return checkState().readEntryAsync(name, attributeDescriptions, handler);
}
- /**
- * {@inheritDoc}
- */
@Override
public void removeConnectionEventListener(final ConnectionEventListener listener) {
Validator.ensureNotNull(listener);
- checkState();
- listeners.remove(listener);
+ synchronized (stateLock) {
+ if (listeners != null) {
+ listeners.remove(listener);
+ }
+ }
}
@Override
@@ -430,9 +495,6 @@
return checkState().searchSingleEntryAsync(request, handler);
}
- /**
- * {@inheritDoc}
- */
@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
@@ -495,19 +557,19 @@
}
}
- // Guarded by queue.
- private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
+ private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
+
+ private final Semaphore currentPoolSize;
+
+ private final ConnectionFactory factory;
// Guarded by queue.
private boolean isClosed = false;
- private final ConnectionFactory factory;
-
private final int poolSize;
- private final Semaphore currentPoolSize;
-
- private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
+ // Guarded by queue.
+ private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
/**
* Creates a new connection pool which will maintain {@code poolSize}
@@ -564,7 +626,7 @@
public Connection getConnection() throws ErrorResultException {
try {
return getConnectionAsync(null).get();
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
throw newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, e);
}
}
diff --git a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
index 53793e0..71ea964 100644
--- a/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
+++ b/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
@@ -28,13 +28,12 @@
package org.forgerock.opendj.ldap;
import static org.fest.assertions.Assertions.assertThat;
-import static org.fest.assertions.Fail.fail;
import static org.forgerock.opendj.ldap.Connections.newFixedConnectionPool;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
-import static org.forgerock.opendj.ldap.responses.Responses.newBindResult;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -69,9 +68,11 @@
* @throws Exception
* If an unexpected error occurred.
*/
- @Test(enabled = false)
+ @Test
public void testConnectionEventListenerClose() throws Exception {
- final ConnectionFactory factory = mockConnectionFactory(mock(Connection.class));
+ final Connection pooledConnection = mock(Connection.class);
+ when(pooledConnection.isValid()).thenReturn(true);
+ final ConnectionFactory factory = mockConnectionFactory(pooledConnection);
final ConnectionPool pool = newFixedConnectionPool(factory, 1);
final Connection connection = pool.getConnection();
final ConnectionEventListener listener = mock(ConnectionEventListener.class);
@@ -95,27 +96,23 @@
* @throws Exception
* If an unexpected error occurred.
*/
- @Test(enabled = false)
+ @Test
public void testConnectionEventListenerError() throws Exception {
- final Connection badConnection = mock(Connection.class);
- when(badConnection.bind(any(BindRequest.class))).thenThrow(
- newErrorResult(newBindResult(ResultCode.CLIENT_SIDE_SERVER_DOWN)));
- final ConnectionFactory factory = mockConnectionFactory(badConnection);
- final ConnectionPool pool = newFixedConnectionPool(factory, 2);
+ final List<ConnectionEventListener> listeners = new LinkedList<ConnectionEventListener>();
+ final Connection mockConnection = mockConnection(listeners);
+ final ConnectionFactory factory = mockConnectionFactory(mockConnection);
+ final ConnectionPool pool = newFixedConnectionPool(factory, 1);
final Connection connection = pool.getConnection();
final ConnectionEventListener listener = mock(ConnectionEventListener.class);
connection.addConnectionEventListener(listener);
- try {
- connection.bind(Requests.newSimpleBindRequest("cn=test", "password".toCharArray()));
- fail("Expected connection error");
- } catch (final ErrorResultException e) {
- assertThat(e.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_SERVER_DOWN);
- } finally {
- connection.close();
- }
- verify(listener).handleConnectionError(eq(false), any(ConnectionException.class));
- verify(listener).handleConnectionClosed();
+ assertThat(listeners).hasSize(1);
+ listeners.get(0).handleConnectionError(false,
+ newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN));
+ verify(listener, times(0)).handleConnectionClosed();
+ verify(listener).handleConnectionError(eq(false), isA(ConnectionException.class));
verify(listener, times(0)).handleUnsolicitedNotification(any(ExtendedResult.class));
+ connection.close();
+ assertThat(listeners).hasSize(0);
}
/**
@@ -126,32 +123,10 @@
* @throws Exception
* If an unexpected error occurred.
*/
- @Test(enabled = false)
+ @Test
public void testConnectionEventListenerUnsolicitedNotification() throws Exception {
final List<ConnectionEventListener> listeners = new LinkedList<ConnectionEventListener>();
- final Connection mockConnection = mock(Connection.class);
-
- // Handle listener registration / deregistration in mock connection.
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(final InvocationOnMock invocation) throws Throwable {
- final ConnectionEventListener listener =
- (ConnectionEventListener) invocation.getArguments()[0];
- listeners.add(listener);
- return null;
- }
- }).when(mockConnection).addConnectionEventListener(any(ConnectionEventListener.class));
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(final InvocationOnMock invocation) throws Throwable {
- final ConnectionEventListener listener =
- (ConnectionEventListener) invocation.getArguments()[0];
- listeners.remove(listener);
- return null;
- }
- }).when(mockConnection).removeConnectionEventListener(any(ConnectionEventListener.class));
-
+ final Connection mockConnection = mockConnection(listeners);
final ConnectionFactory factory = mockConnectionFactory(mockConnection);
final ConnectionPool pool = newFixedConnectionPool(factory, 1);
final Connection connection = pool.getConnection();
@@ -391,6 +366,33 @@
pool.close();
}
+ private Connection mockConnection(final List<ConnectionEventListener> listeners) {
+ final Connection mockConnection = mock(Connection.class);
+
+ // Handle listener registration / deregistration in mock connection.
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(final InvocationOnMock invocation) throws Throwable {
+ final ConnectionEventListener listener =
+ (ConnectionEventListener) invocation.getArguments()[0];
+ listeners.add(listener);
+ return null;
+ }
+ }).when(mockConnection).addConnectionEventListener(any(ConnectionEventListener.class));
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(final InvocationOnMock invocation) throws Throwable {
+ final ConnectionEventListener listener =
+ (ConnectionEventListener) invocation.getArguments()[0];
+ listeners.remove(listener);
+ return null;
+ }
+ }).when(mockConnection).removeConnectionEventListener(any(ConnectionEventListener.class));
+
+ return mockConnection;
+ }
+
@SuppressWarnings("unchecked")
private ConnectionFactory mockConnectionFactory(final Connection first,
final Connection... remaining) throws ErrorResultException {
--
Gitblit v1.10.0