From a2df03777d64a1cc2face1011a70effb0b98f133 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Sat, 22 Feb 2014 22:07:30 +0000
Subject: [PATCH] Fix OPENDJ-1348: Various connection pool implementations do not recover if the target server is powered off and restarted
---
opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java | 109 ++++++++++++++++++++++++++++++++++--------------------
1 files changed, 69 insertions(+), 40 deletions(-)
diff --git a/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java b/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
index 86d41c1..7dc1c39 100644
--- a/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
+++ b/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2013 ForgeRock AS
+ * Portions copyright 2011-2014 ForgeRock AS
*/
package org.forgerock.opendj.ldap;
@@ -44,6 +44,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import org.forgerock.opendj.ldap.requests.AbandonRequest;
@@ -88,6 +89,7 @@
@Override
public void handleErrorResult(final ErrorResultException error) {
// Connection attempt failed, so decrease the pool size.
+ pendingConnectionAttempts.decrementAndGet();
availableConnections.release();
if (DEBUG_LOG.isLoggable(Level.FINE)) {
@@ -96,18 +98,27 @@
error.getMessage(), currentPoolSize(), maxPoolSize));
}
- QueueElement holder;
+ /*
+ * There may be many pending futures waiting for a connection
+ * attempt to succeed. In some situations the number of pending
+ * futures may exceed the pool size and the number of outstanding
+ * connection attempts. If only one pending future is resolved per
+ * failed connection attempt then some pending futures will be left
+ * unresolved. Therefore, a failed connection attempt must fail all
+ * pending futures, even if some of the subsequent connection
+ * attempts succeed, which is unlikely (if one fails, then they are
+ * all likely to fail).
+ */
+ final List<QueueElement> waitingFutures =
+ new LinkedList<CachedConnectionPool.QueueElement>();
synchronized (queue) {
- if (hasWaitingFutures()) {
- holder = queue.removeFirst();
- } else {
- // No waiting futures.
- return;
+ while (hasWaitingFutures()) {
+ waitingFutures.add(queue.removeFirst());
}
}
-
- // There was waiting future, so close it.
- holder.getWaitingFuture().handleErrorResult(error);
+ for (QueueElement waitingFuture : waitingFutures) {
+ waitingFuture.getWaitingFuture().handleErrorResult(error);
+ }
}
@Override
@@ -117,6 +128,7 @@
"Connection attempt succeeded: availableConnections=%d, maxPoolSize=%d",
currentPoolSize(), maxPoolSize));
}
+ pendingConnectionAttempts.decrementAndGet();
publishConnection(connection);
}
}
@@ -260,6 +272,7 @@
* availableConnections.
*/
connection.close();
+ pendingConnectionAttempts.incrementAndGet();
factory.getConnectionAsync(connectionResultHandler);
if (DEBUG_LOG.isLoggable(Level.FINE)) {
@@ -668,7 +681,6 @@
private final Semaphore availableConnections;
private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
private final int corePoolSize;
-
private final ConnectionFactory factory;
private boolean isClosed = false;
private final ScheduledFuture<?> idleTimeoutFuture;
@@ -677,6 +689,12 @@
private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
+ /**
+ * The number of new connections which are in the process of being
+ * established.
+ */
+ private final AtomicInteger pendingConnectionAttempts = new AtomicInteger();
+
CachedConnectionPool(final ConnectionFactory factory, final int corePoolSize,
final int maximumPoolSize, final long idleTimeout, final TimeUnit unit,
final ScheduledExecutorService scheduler) {
@@ -776,47 +794,58 @@
}
}
- if (!holder.isWaitingFuture()) {
- // There was a completed connection attempt.
- final Connection connection = holder.getWaitingConnection();
- if (connection.isValid()) {
- final PooledConnection pooledConnection =
- newPooledConnection(connection, getStackTraceIfDebugEnabled());
- if (handler != null) {
- handler.handleResult(pooledConnection);
- }
- return new CompletedFutureResult<Connection>(pooledConnection);
- } else {
- // Close the stale connection and try again.
- connection.close();
- availableConnections.release();
-
- if (DEBUG_LOG.isLoggable(Level.FINE)) {
- DEBUG_LOG.fine(String.format(
- "Connection no longer valid: availableConnections=%d, poolSize=%d",
- currentPoolSize(), maxPoolSize));
- }
- }
- } else {
+ if (holder.isWaitingFuture()) {
// Grow the pool if needed.
final FutureResult<Connection> future = holder.getWaitingFuture();
if (!future.isDone() && availableConnections.tryAcquire()) {
+ pendingConnectionAttempts.incrementAndGet();
factory.getConnectionAsync(connectionResultHandler);
}
return future;
}
+
+ // There was a completed connection attempt.
+ final Connection connection = holder.getWaitingConnection();
+ if (connection.isValid()) {
+ final PooledConnection pooledConnection =
+ newPooledConnection(connection, getStackTraceIfDebugEnabled());
+ if (handler != null) {
+ handler.handleResult(pooledConnection);
+ }
+ return new CompletedFutureResult<Connection>(pooledConnection);
+ } else {
+ // Close the stale connection and try again.
+ connection.close();
+ availableConnections.release();
+
+ if (DEBUG_LOG.isLoggable(Level.FINE)) {
+ DEBUG_LOG.fine(String.format(
+ "Connection no longer valid: availableConnections=%d, poolSize=%d",
+ currentPoolSize(), maxPoolSize));
+ }
+ }
}
}
@Override
public String toString() {
- final StringBuilder builder = new StringBuilder();
- builder.append("CachedConnectionPool(");
- builder.append(String.valueOf(factory));
- builder.append(',');
- builder.append(maxPoolSize);
- builder.append(')');
- return builder.toString();
+ final int size = currentPoolSize();
+ final int pending = pendingConnectionAttempts.get();
+ int in = 0;
+ int blocked = 0;
+ synchronized (queue) {
+ for (QueueElement qe : queue) {
+ if (qe.isWaitingFuture()) {
+ blocked++;
+ } else {
+ in++;
+ }
+ }
+ }
+ final int out = size - in - pending;
+ return String.format("CachedConnectionPool(size=%d[in:%d + out:%d + "
+ + "pending:%d], maxSize=%d, blocked=%d, factory=%s)", size, in, out, pending,
+ maxPoolSize, blocked, String.valueOf(factory));
}
/**
--
Gitblit v1.10.0