opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -31,9 +31,9 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -201,7 +201,7 @@ notifyErrorOccurred = isFailed; if (!isClosed) { if (listeners == null) { listeners = new LinkedList<ConnectionEventListener>(); listeners = new CopyOnWriteArrayList<ConnectionEventListener>(); } listeners.add(listener); } opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
@@ -122,17 +122,17 @@ * the client application closes this connection. More specifically, pooled * connections are not actually stored in the internal queue. */ private final class PooledConnection implements Connection { // Connection event listeners registered against this pooled connection // should have the same life time as the pooled connection. private final List<ConnectionEventListener> listeners = new CopyOnWriteArrayList<ConnectionEventListener>(); private final class PooledConnection implements Connection, ConnectionEventListener { private final Connection connection; private ErrorResultException error = null; private final AtomicBoolean isClosed = new AtomicBoolean(false); private boolean isDisconnectNotification = false; private List<ConnectionEventListener> listeners = null; // Guarded by stateLock. private final Object stateLock = new Object(); PooledConnection(final Connection connection) { private PooledConnection(final Connection connection) { this.connection = connection; } @@ -163,14 +163,34 @@ return checkState().addAsync(request, intermediateResponseHandler, resultHandler); } /** * {@inheritDoc} */ @Override public void addConnectionEventListener(final ConnectionEventListener listener) { Validator.ensureNotNull(listener); checkState(); final boolean notifyClose; final boolean notifyErrorOccurred; synchronized (stateLock) { notifyClose = isClosed.get(); notifyErrorOccurred = error != null; if (!notifyClose) { if (listeners == null) { // Create and register first listener. If an error has // already occurred on the underlying connection, then // the listener may be immediately invoked so ensure // that it is already in the list. listeners = new CopyOnWriteArrayList<ConnectionEventListener>(); listeners.add(listener); connection.addConnectionEventListener(this); } else { listeners.add(listener); } } } if (notifyErrorOccurred) { listener.handleConnectionError(isDisconnectNotification, error); } if (notifyClose) { listener.handleConnectionClosed(); } } @Override @@ -191,15 +211,22 @@ return checkState().bindAsync(request, intermediateResponseHandler, resultHandler); } /** * {@inheritDoc} */ @Override public void close() { final List<ConnectionEventListener> tmpListeners; synchronized (stateLock) { if (!isClosed.compareAndSet(false, true)) { // Already closed. return; } tmpListeners = listeners; } // Remove underlying listener if needed and do this before // subsequent connection events may occur. if (tmpListeners != null) { connection.removeConnectionEventListener(this); } // Don't put invalid connections back in the pool. if (connection.isValid()) { @@ -222,11 +249,15 @@ - currentPoolSize.availablePermits(), poolSize)); } } // Invoke listeners. if (tmpListeners != null) { for (final ConnectionEventListener listener : tmpListeners) { listener.handleConnectionClosed(); } } } /** * {@inheritDoc} */ @Override public void close(final UnbindRequest request, final String reason) { close(); @@ -294,17 +325,51 @@ resultHandler); } /** * {@inheritDoc} @Override public void handleConnectionClosed() { /* * The underlying connection was closed by the client. This can only * occur when the pool is being shut down and the underlying * connection is not in use. */ throw new IllegalStateException( "Pooled connection received unexpected close notification"); } @Override public void handleConnectionError(final boolean isDisconnectNotification, final ErrorResultException error) { final List<ConnectionEventListener> tmpListeners; synchronized (stateLock) { tmpListeners = listeners; this.isDisconnectNotification = isDisconnectNotification; this.error = error; } if (tmpListeners != null) { for (final ConnectionEventListener listener : tmpListeners) { listener.handleConnectionError(isDisconnectNotification, error); } } } @Override public void handleUnsolicitedNotification(final ExtendedResult notification) { final List<ConnectionEventListener> tmpListeners; synchronized (stateLock) { tmpListeners = listeners; } if (tmpListeners != null) { for (final ConnectionEventListener listener : tmpListeners) { listener.handleUnsolicitedNotification(notification); } } } @Override public boolean isClosed() { return isClosed.get(); } /** * {@inheritDoc} */ @Override public boolean isValid() { return connection.isValid() && !isClosed(); @@ -363,15 +428,15 @@ return checkState().readEntryAsync(name, attributeDescriptions, handler); } /** * {@inheritDoc} */ @Override public void removeConnectionEventListener(final ConnectionEventListener listener) { Validator.ensureNotNull(listener); checkState(); synchronized (stateLock) { if (listeners != null) { listeners.remove(listener); } } } @Override public ConnectionEntryReader search(final SearchRequest request) { @@ -430,9 +495,6 @@ return checkState().searchSingleEntryAsync(request, handler); } /** * {@inheritDoc} */ @Override public String toString() { final StringBuilder builder = new StringBuilder(); @@ -495,19 +557,19 @@ } } // Guarded by queue. private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>(); private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler(); private final Semaphore currentPoolSize; private final ConnectionFactory factory; // Guarded by queue. private boolean isClosed = false; private final ConnectionFactory factory; private final int poolSize; private final Semaphore currentPoolSize; private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler(); // Guarded by queue. private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>(); /** * Creates a new connection pool which will maintain {@code poolSize} @@ -564,7 +626,7 @@ public Connection getConnection() throws ErrorResultException { try { return getConnectionAsync(null).get(); } catch (InterruptedException e) { } catch (final InterruptedException e) { throw newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, e); } } opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
@@ -28,13 +28,12 @@ package org.forgerock.opendj.ldap; import static org.fest.assertions.Assertions.assertThat; import static org.fest.assertions.Fail.fail; import static org.forgerock.opendj.ldap.Connections.newFixedConnectionPool; import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult; import static org.forgerock.opendj.ldap.responses.Responses.newBindResult; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -69,9 +68,11 @@ * @throws Exception * If an unexpected error occurred. */ @Test(enabled = false) @Test public void testConnectionEventListenerClose() throws Exception { final ConnectionFactory factory = mockConnectionFactory(mock(Connection.class)); final Connection pooledConnection = mock(Connection.class); when(pooledConnection.isValid()).thenReturn(true); final ConnectionFactory factory = mockConnectionFactory(pooledConnection); final ConnectionPool pool = newFixedConnectionPool(factory, 1); final Connection connection = pool.getConnection(); final ConnectionEventListener listener = mock(ConnectionEventListener.class); @@ -95,27 +96,23 @@ * @throws Exception * If an unexpected error occurred. */ @Test(enabled = false) @Test public void testConnectionEventListenerError() throws Exception { final Connection badConnection = mock(Connection.class); when(badConnection.bind(any(BindRequest.class))).thenThrow( newErrorResult(newBindResult(ResultCode.CLIENT_SIDE_SERVER_DOWN))); final ConnectionFactory factory = mockConnectionFactory(badConnection); final ConnectionPool pool = newFixedConnectionPool(factory, 2); final List<ConnectionEventListener> listeners = new LinkedList<ConnectionEventListener>(); final Connection mockConnection = mockConnection(listeners); final ConnectionFactory factory = mockConnectionFactory(mockConnection); final ConnectionPool pool = newFixedConnectionPool(factory, 1); final Connection connection = pool.getConnection(); final ConnectionEventListener listener = mock(ConnectionEventListener.class); connection.addConnectionEventListener(listener); try { connection.bind(Requests.newSimpleBindRequest("cn=test", "password".toCharArray())); fail("Expected connection error"); } catch (final ErrorResultException e) { assertThat(e.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_SERVER_DOWN); } finally { connection.close(); } verify(listener).handleConnectionError(eq(false), any(ConnectionException.class)); verify(listener).handleConnectionClosed(); assertThat(listeners).hasSize(1); listeners.get(0).handleConnectionError(false, newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN)); verify(listener, times(0)).handleConnectionClosed(); verify(listener).handleConnectionError(eq(false), isA(ConnectionException.class)); verify(listener, times(0)).handleUnsolicitedNotification(any(ExtendedResult.class)); connection.close(); assertThat(listeners).hasSize(0); } /** @@ -126,32 +123,10 @@ * @throws Exception * If an unexpected error occurred. */ @Test(enabled = false) @Test public void testConnectionEventListenerUnsolicitedNotification() throws Exception { final List<ConnectionEventListener> listeners = new LinkedList<ConnectionEventListener>(); final Connection mockConnection = mock(Connection.class); // Handle listener registration / deregistration in mock connection. doAnswer(new Answer<Void>() { @Override public Void answer(final InvocationOnMock invocation) throws Throwable { final ConnectionEventListener listener = (ConnectionEventListener) invocation.getArguments()[0]; listeners.add(listener); return null; } }).when(mockConnection).addConnectionEventListener(any(ConnectionEventListener.class)); doAnswer(new Answer<Void>() { @Override public Void answer(final InvocationOnMock invocation) throws Throwable { final ConnectionEventListener listener = (ConnectionEventListener) invocation.getArguments()[0]; listeners.remove(listener); return null; } }).when(mockConnection).removeConnectionEventListener(any(ConnectionEventListener.class)); final Connection mockConnection = mockConnection(listeners); final ConnectionFactory factory = mockConnectionFactory(mockConnection); final ConnectionPool pool = newFixedConnectionPool(factory, 1); final Connection connection = pool.getConnection(); @@ -391,6 +366,33 @@ pool.close(); } private Connection mockConnection(final List<ConnectionEventListener> listeners) { final Connection mockConnection = mock(Connection.class); // Handle listener registration / deregistration in mock connection. doAnswer(new Answer<Void>() { @Override public Void answer(final InvocationOnMock invocation) throws Throwable { final ConnectionEventListener listener = (ConnectionEventListener) invocation.getArguments()[0]; listeners.add(listener); return null; } }).when(mockConnection).addConnectionEventListener(any(ConnectionEventListener.class)); doAnswer(new Answer<Void>() { @Override public Void answer(final InvocationOnMock invocation) throws Throwable { final ConnectionEventListener listener = (ConnectionEventListener) invocation.getArguments()[0]; listeners.remove(listener); return null; } }).when(mockConnection).removeConnectionEventListener(any(ConnectionEventListener.class)); return mockConnection; } @SuppressWarnings("unchecked") private ConnectionFactory mockConnectionFactory(final Connection first, final Connection... remaining) throws ErrorResultException {