From a2df03777d64a1cc2face1011a70effb0b98f133 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Sat, 22 Feb 2014 22:07:30 +0000
Subject: [PATCH] Fix OPENDJ-1348: Various connection pool implementations do not recover if the target server is powered off and restarted

---
 opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java |  109 ++++++++++++++++++++++++++++++++++--------------------
 1 files changed, 69 insertions(+), 40 deletions(-)

diff --git a/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java b/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
index 86d41c1..7dc1c39 100644
--- a/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
+++ b/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2009-2010 Sun Microsystems, Inc.
- *      Portions copyright 2011-2013 ForgeRock AS
+ *      Portions copyright 2011-2014 ForgeRock AS
  */
 
 package org.forgerock.opendj.ldap;
@@ -44,6 +44,7 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 
 import org.forgerock.opendj.ldap.requests.AbandonRequest;
@@ -88,6 +89,7 @@
         @Override
         public void handleErrorResult(final ErrorResultException error) {
             // Connection attempt failed, so decrease the pool size.
+            pendingConnectionAttempts.decrementAndGet();
             availableConnections.release();
 
             if (DEBUG_LOG.isLoggable(Level.FINE)) {
@@ -96,18 +98,27 @@
                         error.getMessage(), currentPoolSize(), maxPoolSize));
             }
 
-            QueueElement holder;
+            /*
+             * There may be many pending futures waiting for a connection
+             * attempt to succeed. In some situations the number of pending
+             * futures may exceed the pool size and the number of outstanding
+             * connection attempts. If only one pending future is resolved per
+             * failed connection attempt then some pending futures will be left
+             * unresolved. Therefore, a failed connection attempt must fail all
+             * pending futures, even if some of the subsequent connection
+             * attempts succeed, which is unlikely (if one fails, then they are
+             * all likely to fail).
+             */
+            final List<QueueElement> waitingFutures =
+                    new LinkedList<CachedConnectionPool.QueueElement>();
             synchronized (queue) {
-                if (hasWaitingFutures()) {
-                    holder = queue.removeFirst();
-                } else {
-                    // No waiting futures.
-                    return;
+                while (hasWaitingFutures()) {
+                    waitingFutures.add(queue.removeFirst());
                 }
             }
-
-            // There was waiting future, so close it.
-            holder.getWaitingFuture().handleErrorResult(error);
+            for (QueueElement waitingFuture : waitingFutures) {
+                waitingFuture.getWaitingFuture().handleErrorResult(error);
+            }
         }
 
         @Override
@@ -117,6 +128,7 @@
                         "Connection attempt succeeded:  availableConnections=%d, maxPoolSize=%d",
                         currentPoolSize(), maxPoolSize));
             }
+            pendingConnectionAttempts.decrementAndGet();
             publishConnection(connection);
         }
     }
@@ -260,6 +272,7 @@
                  * availableConnections.
                  */
                 connection.close();
+                pendingConnectionAttempts.incrementAndGet();
                 factory.getConnectionAsync(connectionResultHandler);
 
                 if (DEBUG_LOG.isLoggable(Level.FINE)) {
@@ -668,7 +681,6 @@
     private final Semaphore availableConnections;
     private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
     private final int corePoolSize;
-
     private final ConnectionFactory factory;
     private boolean isClosed = false;
     private final ScheduledFuture<?> idleTimeoutFuture;
@@ -677,6 +689,12 @@
     private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
     private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
 
+    /**
+     * The number of new connections which are in the process of being
+     * established.
+     */
+    private final AtomicInteger pendingConnectionAttempts = new AtomicInteger();
+
     CachedConnectionPool(final ConnectionFactory factory, final int corePoolSize,
             final int maximumPoolSize, final long idleTimeout, final TimeUnit unit,
             final ScheduledExecutorService scheduler) {
@@ -776,47 +794,58 @@
                 }
             }
 
-            if (!holder.isWaitingFuture()) {
-                // There was a completed connection attempt.
-                final Connection connection = holder.getWaitingConnection();
-                if (connection.isValid()) {
-                    final PooledConnection pooledConnection =
-                            newPooledConnection(connection, getStackTraceIfDebugEnabled());
-                    if (handler != null) {
-                        handler.handleResult(pooledConnection);
-                    }
-                    return new CompletedFutureResult<Connection>(pooledConnection);
-                } else {
-                    // Close the stale connection and try again.
-                    connection.close();
-                    availableConnections.release();
-
-                    if (DEBUG_LOG.isLoggable(Level.FINE)) {
-                        DEBUG_LOG.fine(String.format(
-                                "Connection no longer valid: availableConnections=%d, poolSize=%d",
-                                currentPoolSize(), maxPoolSize));
-                    }
-                }
-            } else {
+            if (holder.isWaitingFuture()) {
                 // Grow the pool if needed.
                 final FutureResult<Connection> future = holder.getWaitingFuture();
                 if (!future.isDone() && availableConnections.tryAcquire()) {
+                    pendingConnectionAttempts.incrementAndGet();
                     factory.getConnectionAsync(connectionResultHandler);
                 }
                 return future;
             }
+
+            // There was a completed connection attempt.
+            final Connection connection = holder.getWaitingConnection();
+            if (connection.isValid()) {
+                final PooledConnection pooledConnection =
+                        newPooledConnection(connection, getStackTraceIfDebugEnabled());
+                if (handler != null) {
+                    handler.handleResult(pooledConnection);
+                }
+                return new CompletedFutureResult<Connection>(pooledConnection);
+            } else {
+                // Close the stale connection and try again.
+                connection.close();
+                availableConnections.release();
+
+                if (DEBUG_LOG.isLoggable(Level.FINE)) {
+                    DEBUG_LOG.fine(String.format(
+                            "Connection no longer valid: availableConnections=%d, poolSize=%d",
+                            currentPoolSize(), maxPoolSize));
+                }
+            }
         }
     }
 
     @Override
     public String toString() {
-        final StringBuilder builder = new StringBuilder();
-        builder.append("CachedConnectionPool(");
-        builder.append(String.valueOf(factory));
-        builder.append(',');
-        builder.append(maxPoolSize);
-        builder.append(')');
-        return builder.toString();
+        final int size = currentPoolSize();
+        final int pending = pendingConnectionAttempts.get();
+        int in = 0;
+        int blocked = 0;
+        synchronized (queue) {
+            for (QueueElement qe : queue) {
+                if (qe.isWaitingFuture()) {
+                    blocked++;
+                } else {
+                    in++;
+                }
+            }
+        }
+        final int out = size - in - pending;
+        return String.format("CachedConnectionPool(size=%d[in:%d + out:%d + "
+                + "pending:%d], maxSize=%d, blocked=%d, factory=%s)", size, in, out, pending,
+                maxPoolSize, blocked, String.valueOf(factory));
     }
 
     /**

--
Gitblit v1.10.0