From 398a2eaf655e77b39fe71b644d95802c30749e0e Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 01 Oct 2012 21:35:14 +0000
Subject: [PATCH] Fix OPENDJ-593: FixedConnectionPool connection event listeners are never notified
---
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java | 148 +++++++++++++++++++++++++++++++++++--------------
1 files changed, 105 insertions(+), 43 deletions(-)
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
index 1cde3f2..296789f 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
@@ -122,17 +122,17 @@
* the client application closes this connection. More specifically, pooled
* connections are not actually stored in the internal queue.
*/
- private final class PooledConnection implements Connection {
- // Connection event listeners registered against this pooled connection
- // should have the same life time as the pooled connection.
- private final List<ConnectionEventListener> listeners =
- new CopyOnWriteArrayList<ConnectionEventListener>();
-
+ private final class PooledConnection implements Connection, ConnectionEventListener {
private final Connection connection;
+ private ErrorResultException error = null;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
+ private boolean isDisconnectNotification = false;
+ private List<ConnectionEventListener> listeners = null;
+ // Guarded by stateLock.
+ private final Object stateLock = new Object();
- PooledConnection(final Connection connection) {
+ private PooledConnection(final Connection connection) {
this.connection = connection;
}
@@ -163,14 +163,34 @@
return checkState().addAsync(request, intermediateResponseHandler, resultHandler);
}
- /**
- * {@inheritDoc}
- */
@Override
public void addConnectionEventListener(final ConnectionEventListener listener) {
Validator.ensureNotNull(listener);
- checkState();
- listeners.add(listener);
+ final boolean notifyClose;
+ final boolean notifyErrorOccurred;
+ synchronized (stateLock) {
+ notifyClose = isClosed.get();
+ notifyErrorOccurred = error != null;
+ if (!notifyClose) {
+ if (listeners == null) {
+ // Create and register first listener. If an error has
+ // already occurred on the underlying connection, then
+ // the listener may be immediately invoked so ensure
+ // that it is already in the list.
+ listeners = new CopyOnWriteArrayList<ConnectionEventListener>();
+ listeners.add(listener);
+ connection.addConnectionEventListener(this);
+ } else {
+ listeners.add(listener);
+ }
+ }
+ }
+ if (notifyErrorOccurred) {
+ listener.handleConnectionError(isDisconnectNotification, error);
+ }
+ if (notifyClose) {
+ listener.handleConnectionClosed();
+ }
}
@Override
@@ -191,14 +211,21 @@
return checkState().bindAsync(request, intermediateResponseHandler, resultHandler);
}
- /**
- * {@inheritDoc}
- */
@Override
public void close() {
- if (!isClosed.compareAndSet(false, true)) {
- // Already closed.
- return;
+ final List<ConnectionEventListener> tmpListeners;
+ synchronized (stateLock) {
+ if (!isClosed.compareAndSet(false, true)) {
+ // Already closed.
+ return;
+ }
+ tmpListeners = listeners;
+ }
+
+ // Remove underlying listener if needed and do this before
+ // subsequent connection events may occur.
+ if (tmpListeners != null) {
+ connection.removeConnectionEventListener(this);
}
// Don't put invalid connections back in the pool.
@@ -222,11 +249,15 @@
- currentPoolSize.availablePermits(), poolSize));
}
}
+
+ // Invoke listeners.
+ if (tmpListeners != null) {
+ for (final ConnectionEventListener listener : tmpListeners) {
+ listener.handleConnectionClosed();
+ }
+ }
}
- /**
- * {@inheritDoc}
- */
@Override
public void close(final UnbindRequest request, final String reason) {
close();
@@ -294,17 +325,51 @@
resultHandler);
}
- /**
- * {@inheritDoc}
- */
+ @Override
+ public void handleConnectionClosed() {
+ /*
+ * The underlying connection was closed by the client. This can only
+ * occur when the pool is being shut down and the underlying
+ * connection is not in use.
+ */
+ throw new IllegalStateException(
+ "Pooled connection received unexpected close notification");
+ }
+
+ @Override
+ public void handleConnectionError(final boolean isDisconnectNotification,
+ final ErrorResultException error) {
+ final List<ConnectionEventListener> tmpListeners;
+ synchronized (stateLock) {
+ tmpListeners = listeners;
+ this.isDisconnectNotification = isDisconnectNotification;
+ this.error = error;
+ }
+ if (tmpListeners != null) {
+ for (final ConnectionEventListener listener : tmpListeners) {
+ listener.handleConnectionError(isDisconnectNotification, error);
+ }
+ }
+ }
+
+ @Override
+ public void handleUnsolicitedNotification(final ExtendedResult notification) {
+ final List<ConnectionEventListener> tmpListeners;
+ synchronized (stateLock) {
+ tmpListeners = listeners;
+ }
+ if (tmpListeners != null) {
+ for (final ConnectionEventListener listener : tmpListeners) {
+ listener.handleUnsolicitedNotification(notification);
+ }
+ }
+ }
+
@Override
public boolean isClosed() {
return isClosed.get();
}
- /**
- * {@inheritDoc}
- */
@Override
public boolean isValid() {
return connection.isValid() && !isClosed();
@@ -363,14 +428,14 @@
return checkState().readEntryAsync(name, attributeDescriptions, handler);
}
- /**
- * {@inheritDoc}
- */
@Override
public void removeConnectionEventListener(final ConnectionEventListener listener) {
Validator.ensureNotNull(listener);
- checkState();
- listeners.remove(listener);
+ synchronized (stateLock) {
+ if (listeners != null) {
+ listeners.remove(listener);
+ }
+ }
}
@Override
@@ -430,9 +495,6 @@
return checkState().searchSingleEntryAsync(request, handler);
}
- /**
- * {@inheritDoc}
- */
@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
@@ -495,19 +557,19 @@
}
}
- // Guarded by queue.
- private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
+ private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
+
+ private final Semaphore currentPoolSize;
+
+ private final ConnectionFactory factory;
// Guarded by queue.
private boolean isClosed = false;
- private final ConnectionFactory factory;
-
private final int poolSize;
- private final Semaphore currentPoolSize;
-
- private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
+ // Guarded by queue.
+ private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
/**
* Creates a new connection pool which will maintain {@code poolSize}
@@ -564,7 +626,7 @@
public Connection getConnection() throws ErrorResultException {
try {
return getConnectionAsync(null).get();
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
throw newErrorResult(ResultCode.CLIENT_SIDE_USER_CANCELLED, e);
}
}
--
Gitblit v1.10.0