From 398a2eaf655e77b39fe71b644d95802c30749e0e Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 01 Oct 2012 21:35:14 +0000
Subject: [PATCH] Fix OPENDJ-593: FixedConnectionPool connection event listeners are never notified

---
 opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java |  148 +++++++++++++++++++++++++++++++++++--------------
 1 files changed, 105 insertions(+), 43 deletions(-)

diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
index 1cde3f2..296789f 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
@@ -122,17 +122,17 @@
      * the client application closes this connection. More specifically, pooled
      * connections are not actually stored in the internal queue.
      */
-    private final class PooledConnection implements Connection {
-        // Connection event listeners registered against this pooled connection
-        // should have the same life time as the pooled connection.
-        private final List<ConnectionEventListener> listeners =
-                new CopyOnWriteArrayList<ConnectionEventListener>();
-
+    private final class PooledConnection implements Connection, ConnectionEventListener {
         private final Connection connection;
+        private ErrorResultException error = null;
 
         private final AtomicBoolean isClosed = new AtomicBoolean(false);
+        private boolean isDisconnectNotification = false;
+        private List<ConnectionEventListener> listeners = null;
+        // Guarded by stateLock.
+        private final Object stateLock = new Object();
 
-        PooledConnection(final Connection connection) {
+        private PooledConnection(final Connection connection) {
             this.connection = connection;
         }
 
@@ -163,14 +163,34 @@
             return checkState().addAsync(request, intermediateResponseHandler, resultHandler);
         }
 
-        /**
-         * {@inheritDoc}
-         */
         @Override
         public void addConnectionEventListener(final ConnectionEventListener listener) {
             Validator.ensureNotNull(listener);
-            checkState();
-            listeners.add(listener);
+            final boolean notifyClose;
+            final boolean notifyErrorOccurred;
+            synchronized (stateLock) {
+                notifyClose = isClosed.get();
+                notifyErrorOccurred = error != null;
+                if (!notifyClose) {
+                    if (listeners == null) {
+                        // Create and register first listener. If an error has
+                        // already occurred on the underlying connection, then
+                        // the listener may be immediately invoked so ensure
+                        // that it is already in the list.
+                        listeners = new CopyOnWriteArrayList<ConnectionEventListener>();
+                        listeners.add(listener);
+                        connection.addConnectionEventListener(this);
+                    } else {
+                        listeners.add(listener);
+                    }
+                }
+            }
+            if (notifyErrorOccurred) {
+                listener.handleConnectionError(isDisconnectNotification, error);
+            }
+            if (notifyClose) {
+                listener.handleConnectionClosed();
+            }
         }
 
         @Override
@@ -191,14 +211,21 @@
             return checkState().bindAsync(request, intermediateResponseHandler, resultHandler);
         }
 
-        /**
-         * {@inheritDoc}
-         */
         @Override
         public void close() {
-            if (!isClosed.compareAndSet(false, true)) {
-                // Already closed.
-                return;
+            final List<ConnectionEventListener> tmpListeners;
+            synchronized (stateLock) {
+                if (!isClosed.compareAndSet(false, true)) {
+                    // Already closed.
+                    return;
+                }
+                tmpListeners = listeners;
+            }
+
+            // Remove underlying listener if needed and do this before
+            // subsequent connection events may occur.
+            if (tmpListeners != null) {
+                connection.removeConnectionEventListener(this);
             }
 
             // Don't put invalid connections back in the pool.
@@ -222,11 +249,15 @@
                             - currentPoolSize.availablePermits(), poolSize));
                 }
             }
+
+            // Invoke listeners.
+            if (tmpListeners != null) {
+                for (final ConnectionEventListener listener : tmpListeners) {
+                    listener.handleConnectionClosed();
+                }
+            }
         }
 
-        /**
-         * {@inheritDoc}
-         */
         @Override
         public void close(final UnbindRequest request, final String reason) {
             close();
@@ -294,17 +325,51 @@
                     resultHandler);
         }
 
-        /**
-         * {@inheritDoc}
-         */
+        @Override
+        public void handleConnectionClosed() {
+            /*
+             * The underlying connection was closed by the client. This can only
+             * occur when the pool is being shut down and the underlying
+             * connection is not in use.
+             */
+            throw new IllegalStateException(
+                    "Pooled connection received unexpected close notification");
+        }
+
+        @Override
+        public void handleConnectionError(final boolean isDisconnectNotification,
+                final ErrorResultException error) {
+            final List<ConnectionEventListener> tmpListeners;
+            synchronized (stateLock) {
+                tmpListeners = listeners;
+                this.isDisconnectNotification = isDisconnectNotification;
+                this.error = error;
+            }
+            if (tmpListeners != null) {
+                for (final ConnectionEventListener listener : tmpListeners) {
+                    listener.handleConnectionError(isDisconnectNotification, error);
+                }
+            }
+        }
+
+        @Override
+        public void handleUnsolicitedNotification(final ExtendedResult notification) {
+            final List<ConnectionEventListener> tmpListeners;
+            synchronized (stateLock) {
+                tmpListeners = listeners;
+            }
+            if (tmpListeners != null) {
+                for (final ConnectionEventListener listener : tmpListeners) {
+                    listener.handleUnsolicitedNotification(notification);
+                }
+            }
+        }
+
         @Override
         public boolean isClosed() {
             return isClosed.get();
         }
 
-        /**
-         * {@inheritDoc}
-         */
         @Override
         public boolean isValid() {
             return connection.isValid() && !isClosed();
@@ -363,14 +428,14 @@
             return checkState().readEntryAsync(name, attributeDescriptions, handler);
         }
 
-        /**
-         * {@inheritDoc}
-         */
         @Override
         public void removeConnectionEventListener(final ConnectionEventListener listener) {
             Validator.ensureNotNull(listener);
-            checkState();
-            listeners.remove(listener);
+            synchronized (stateLock) {
+                if (listeners != null) {
+                    listeners.remove(listener);
+                }
+            }
         }
 
         @Override
@@ -430,9 +495,6 @@
             return checkState().searchSingleEntryAsync(request, handler);
         }
 
-        /**
-         * {@inheritDoc}
-         */
         @Override
         public String toString() {
             final StringBuilder builder = new StringBuilder();
@@ -495,19 +557,19 @@
         }
     }
 
-    // Guarded by queue.
-    private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
+    private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
+
+    private final Semaphore currentPoolSize;
+
+    private final ConnectionFactory factory;
 
     // Guarded by queue.
     private boolean isClosed = false;
 
-    private final ConnectionFactory factory;
-
     private final int poolSize;
 
-    private final Semaphore currentPoolSize;
-
-    private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
+    // Guarded by queue.
+    private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
 
     /**
      * Creates a new connection pool which will maintain {@code poolSize}
@@ -564,7 +626,7 @@
     public Connection getConnection() throws ErrorResultException {
         try {
             return getConnectionAsync(null).get();
-        } catch (InterruptedException e) {
+        } catch (final InterruptedException e) {
             throw newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, e);
         }
     }

--
Gitblit v1.10.0