mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
01.35.2012 398a2eaf655e77b39fe71b644d95802c30749e0e
Fix OPENDJ-593: FixedConnectionPool connection event listeners are never notified

+ re-enable and fix unit tests
+ use thread-safe list in LDAP connection implementation since updates may still occur during iteration.
3 files modified
230 ■■■■■ changed files
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java 4 ●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java 136 ●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java 90 ●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk/src/main/java/com/forgerock/opendj/ldap/LDAPConnection.java
@@ -31,9 +31,9 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -201,7 +201,7 @@
            notifyErrorOccurred = isFailed;
            if (!isClosed) {
                if (listeners == null) {
                    listeners = new LinkedList<ConnectionEventListener>();
                    listeners = new CopyOnWriteArrayList<ConnectionEventListener>();
                }
                listeners.add(listener);
            }
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
@@ -122,17 +122,17 @@
     * the client application closes this connection. More specifically, pooled
     * connections are not actually stored in the internal queue.
     */
    private final class PooledConnection implements Connection {
        // Connection event listeners registered against this pooled connection
        // should have the same life time as the pooled connection.
        private final List<ConnectionEventListener> listeners =
                new CopyOnWriteArrayList<ConnectionEventListener>();
    private final class PooledConnection implements Connection, ConnectionEventListener {
        private final Connection connection;
        private ErrorResultException error = null;
        private final AtomicBoolean isClosed = new AtomicBoolean(false);
        private boolean isDisconnectNotification = false;
        private List<ConnectionEventListener> listeners = null;
        // Guarded by stateLock.
        private final Object stateLock = new Object();
        PooledConnection(final Connection connection) {
        private PooledConnection(final Connection connection) {
            this.connection = connection;
        }
@@ -163,14 +163,34 @@
            return checkState().addAsync(request, intermediateResponseHandler, resultHandler);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void addConnectionEventListener(final ConnectionEventListener listener) {
            Validator.ensureNotNull(listener);
            checkState();
            final boolean notifyClose;
            final boolean notifyErrorOccurred;
            synchronized (stateLock) {
                notifyClose = isClosed.get();
                notifyErrorOccurred = error != null;
                if (!notifyClose) {
                    if (listeners == null) {
                        // Create and register first listener. If an error has
                        // already occurred on the underlying connection, then
                        // the listener may be immediately invoked so ensure
                        // that it is already in the list.
                        listeners = new CopyOnWriteArrayList<ConnectionEventListener>();
            listeners.add(listener);
                        connection.addConnectionEventListener(this);
                    } else {
                        listeners.add(listener);
                    }
                }
            }
            if (notifyErrorOccurred) {
                listener.handleConnectionError(isDisconnectNotification, error);
            }
            if (notifyClose) {
                listener.handleConnectionClosed();
            }
        }
        @Override
@@ -191,15 +211,22 @@
            return checkState().bindAsync(request, intermediateResponseHandler, resultHandler);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void close() {
            final List<ConnectionEventListener> tmpListeners;
            synchronized (stateLock) {
            if (!isClosed.compareAndSet(false, true)) {
                // Already closed.
                return;
            }
                tmpListeners = listeners;
            }
            // Remove underlying listener if needed and do this before
            // subsequent connection events may occur.
            if (tmpListeners != null) {
                connection.removeConnectionEventListener(this);
            }
            // Don't put invalid connections back in the pool.
            if (connection.isValid()) {
@@ -222,11 +249,15 @@
                            - currentPoolSize.availablePermits(), poolSize));
                }
            }
            // Invoke listeners.
            if (tmpListeners != null) {
                for (final ConnectionEventListener listener : tmpListeners) {
                    listener.handleConnectionClosed();
                }
            }
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void close(final UnbindRequest request, final String reason) {
            close();
@@ -294,17 +325,51 @@
                    resultHandler);
        }
        /**
         * {@inheritDoc}
        @Override
        public void handleConnectionClosed() {
            /*
             * The underlying connection was closed by the client. This can only
             * occur when the pool is being shut down and the underlying
             * connection is not in use.
         */
            throw new IllegalStateException(
                    "Pooled connection received unexpected close notification");
        }
        @Override
        public void handleConnectionError(final boolean isDisconnectNotification,
                final ErrorResultException error) {
            final List<ConnectionEventListener> tmpListeners;
            synchronized (stateLock) {
                tmpListeners = listeners;
                this.isDisconnectNotification = isDisconnectNotification;
                this.error = error;
            }
            if (tmpListeners != null) {
                for (final ConnectionEventListener listener : tmpListeners) {
                    listener.handleConnectionError(isDisconnectNotification, error);
                }
            }
        }
        @Override
        public void handleUnsolicitedNotification(final ExtendedResult notification) {
            final List<ConnectionEventListener> tmpListeners;
            synchronized (stateLock) {
                tmpListeners = listeners;
            }
            if (tmpListeners != null) {
                for (final ConnectionEventListener listener : tmpListeners) {
                    listener.handleUnsolicitedNotification(notification);
                }
            }
        }
        @Override
        public boolean isClosed() {
            return isClosed.get();
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public boolean isValid() {
            return connection.isValid() && !isClosed();
@@ -363,15 +428,15 @@
            return checkState().readEntryAsync(name, attributeDescriptions, handler);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void removeConnectionEventListener(final ConnectionEventListener listener) {
            Validator.ensureNotNull(listener);
            checkState();
            synchronized (stateLock) {
                if (listeners != null) {
            listeners.remove(listener);
        }
            }
        }
        @Override
        public ConnectionEntryReader search(final SearchRequest request) {
@@ -430,9 +495,6 @@
            return checkState().searchSingleEntryAsync(request, handler);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public String toString() {
            final StringBuilder builder = new StringBuilder();
@@ -495,19 +557,19 @@
        }
    }
    // Guarded by queue.
    private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
    private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
    private final Semaphore currentPoolSize;
    private final ConnectionFactory factory;
    // Guarded by queue.
    private boolean isClosed = false;
    private final ConnectionFactory factory;
    private final int poolSize;
    private final Semaphore currentPoolSize;
    private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
    // Guarded by queue.
    private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
    /**
     * Creates a new connection pool which will maintain {@code poolSize}
@@ -564,7 +626,7 @@
    public Connection getConnection() throws ErrorResultException {
        try {
            return getConnectionAsync(null).get();
        } catch (InterruptedException e) {
        } catch (final InterruptedException e) {
            throw newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, e);
        }
    }
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
@@ -28,13 +28,12 @@
package org.forgerock.opendj.ldap;
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.Fail.fail;
import static org.forgerock.opendj.ldap.Connections.newFixedConnectionPool;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import static org.forgerock.opendj.ldap.responses.Responses.newBindResult;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -69,9 +68,11 @@
     * @throws Exception
     *             If an unexpected error occurred.
     */
    @Test(enabled = false)
    @Test
    public void testConnectionEventListenerClose() throws Exception {
        final ConnectionFactory factory = mockConnectionFactory(mock(Connection.class));
        final Connection pooledConnection = mock(Connection.class);
        when(pooledConnection.isValid()).thenReturn(true);
        final ConnectionFactory factory = mockConnectionFactory(pooledConnection);
        final ConnectionPool pool = newFixedConnectionPool(factory, 1);
        final Connection connection = pool.getConnection();
        final ConnectionEventListener listener = mock(ConnectionEventListener.class);
@@ -95,27 +96,23 @@
     * @throws Exception
     *             If an unexpected error occurred.
     */
    @Test(enabled = false)
    @Test
    public void testConnectionEventListenerError() throws Exception {
        final Connection badConnection = mock(Connection.class);
        when(badConnection.bind(any(BindRequest.class))).thenThrow(
                newErrorResult(newBindResult(ResultCode.CLIENT_SIDE_SERVER_DOWN)));
        final ConnectionFactory factory = mockConnectionFactory(badConnection);
        final ConnectionPool pool = newFixedConnectionPool(factory, 2);
        final List<ConnectionEventListener> listeners = new LinkedList<ConnectionEventListener>();
        final Connection mockConnection = mockConnection(listeners);
        final ConnectionFactory factory = mockConnectionFactory(mockConnection);
        final ConnectionPool pool = newFixedConnectionPool(factory, 1);
        final Connection connection = pool.getConnection();
        final ConnectionEventListener listener = mock(ConnectionEventListener.class);
        connection.addConnectionEventListener(listener);
        try {
            connection.bind(Requests.newSimpleBindRequest("cn=test", "password".toCharArray()));
            fail("Expected connection error");
        } catch (final ErrorResultException e) {
            assertThat(e.getResult().getResultCode()).isEqualTo(ResultCode.CLIENT_SIDE_SERVER_DOWN);
        } finally {
            connection.close();
        }
        verify(listener).handleConnectionError(eq(false), any(ConnectionException.class));
        verify(listener).handleConnectionClosed();
        assertThat(listeners).hasSize(1);
        listeners.get(0).handleConnectionError(false,
                newErrorResult(ResultCode.CLIENT_SIDE_SERVER_DOWN));
        verify(listener, times(0)).handleConnectionClosed();
        verify(listener).handleConnectionError(eq(false), isA(ConnectionException.class));
        verify(listener, times(0)).handleUnsolicitedNotification(any(ExtendedResult.class));
        connection.close();
        assertThat(listeners).hasSize(0);
    }
    /**
@@ -126,32 +123,10 @@
     * @throws Exception
     *             If an unexpected error occurred.
     */
    @Test(enabled = false)
    @Test
    public void testConnectionEventListenerUnsolicitedNotification() throws Exception {
        final List<ConnectionEventListener> listeners = new LinkedList<ConnectionEventListener>();
        final Connection mockConnection = mock(Connection.class);
        // Handle listener registration / deregistration in mock connection.
        doAnswer(new Answer<Void>() {
            @Override
            public Void answer(final InvocationOnMock invocation) throws Throwable {
                final ConnectionEventListener listener =
                        (ConnectionEventListener) invocation.getArguments()[0];
                listeners.add(listener);
                return null;
            }
        }).when(mockConnection).addConnectionEventListener(any(ConnectionEventListener.class));
        doAnswer(new Answer<Void>() {
            @Override
            public Void answer(final InvocationOnMock invocation) throws Throwable {
                final ConnectionEventListener listener =
                        (ConnectionEventListener) invocation.getArguments()[0];
                listeners.remove(listener);
                return null;
            }
        }).when(mockConnection).removeConnectionEventListener(any(ConnectionEventListener.class));
        final Connection mockConnection = mockConnection(listeners);
        final ConnectionFactory factory = mockConnectionFactory(mockConnection);
        final ConnectionPool pool = newFixedConnectionPool(factory, 1);
        final Connection connection = pool.getConnection();
@@ -391,6 +366,33 @@
        pool.close();
    }
    private Connection mockConnection(final List<ConnectionEventListener> listeners) {
        final Connection mockConnection = mock(Connection.class);
        // Handle listener registration / deregistration in mock connection.
        doAnswer(new Answer<Void>() {
            @Override
            public Void answer(final InvocationOnMock invocation) throws Throwable {
                final ConnectionEventListener listener =
                        (ConnectionEventListener) invocation.getArguments()[0];
                listeners.add(listener);
                return null;
            }
        }).when(mockConnection).addConnectionEventListener(any(ConnectionEventListener.class));
        doAnswer(new Answer<Void>() {
            @Override
            public Void answer(final InvocationOnMock invocation) throws Throwable {
                final ConnectionEventListener listener =
                        (ConnectionEventListener) invocation.getArguments()[0];
                listeners.remove(listener);
                return null;
            }
        }).when(mockConnection).removeConnectionEventListener(any(ConnectionEventListener.class));
        return mockConnection;
    }
    @SuppressWarnings("unchecked")
    private ConnectionFactory mockConnectionFactory(final Connection first,
            final Connection... remaining) throws ErrorResultException {