From b33a21892a2d6c90394b9d1e9d537dc82c679b60 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 24 Feb 2014 14:02:53 +0000
Subject: [PATCH] Fix OPENDJ-1348: Various connection pool implementations do not recover if the target server is powered off and restarted

---
 opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java             |   50 ++++++++-
 opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java |   70 ++++++++++++-
 opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java         |   26 +++-
 opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java     |    3 
 opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java               |  106 +++++++++++++-------
 5 files changed, 191 insertions(+), 64 deletions(-)

diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
index 3eb841e..c65ad4f 100644
--- a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
+++ b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
@@ -135,10 +135,9 @@
          */
         @Override
         public void handleResult(final Connection connection) {
-            notifyOnline();
-
             // The connection is not going to be used, so close it immediately.
             connection.close();
+            notifyOnline();
         }
 
         @Override
diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
index c09ac6f..769b8cf 100644
--- a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
+++ b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
@@ -42,6 +42,7 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.forgerock.i18n.LocalizableMessage;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -87,23 +88,34 @@
         @Override
         public void handleErrorResult(final ErrorResultException error) {
             // Connection attempt failed, so decrease the pool size.
+            pendingConnectionAttempts.decrementAndGet();
             availableConnections.release();
 
-            logger.debug(LocalizableMessage.raw("Connection attempt failed: availableConnections=%d, maxPoolSize=%d",
+            logger.debug(LocalizableMessage.raw(
+                    "Connection attempt failed: availableConnections=%d, maxPoolSize=%d",
                     currentPoolSize(), maxPoolSize, error));
 
-            QueueElement holder;
+            /*
+             * There may be many pending futures waiting for a connection
+             * attempt to succeed. In some situations the number of pending
+             * futures may exceed the pool size and the number of outstanding
+             * connection attempts. If only one pending future is resolved per
+             * failed connection attempt then some pending futures will be left
+             * unresolved. Therefore, a failed connection attempt must fail all
+             * pending futures, even if some of the subsequent connection
+             * attempts succeed, which is unlikely (if one fails, then they are
+             * all likely to fail).
+             */
+            final List<QueueElement> waitingFutures =
+                    new LinkedList<CachedConnectionPool.QueueElement>();
             synchronized (queue) {
-                if (hasWaitingFutures()) {
-                    holder = queue.removeFirst();
-                } else {
-                    // No waiting futures.
-                    return;
+                while (hasWaitingFutures()) {
+                    waitingFutures.add(queue.removeFirst());
                 }
             }
-
-            // There was waiting future, so close it.
-            holder.getWaitingFuture().handleErrorResult(error);
+            for (QueueElement waitingFuture : waitingFutures) {
+                waitingFuture.getWaitingFuture().handleErrorResult(error);
+            }
         }
 
         @Override
@@ -111,6 +123,7 @@
             logger.debug(LocalizableMessage.raw(
                     "Connection attempt succeeded:  availableConnections=%d, maxPoolSize=%d",
                     currentPoolSize(), maxPoolSize));
+            pendingConnectionAttempts.decrementAndGet();
             publishConnection(connection);
         }
     }
@@ -254,6 +267,7 @@
                  * availableConnections.
                  */
                 connection.close();
+                pendingConnectionAttempts.incrementAndGet();
                 factory.getConnectionAsync(connectionResultHandler);
 
                 logger.debug(LocalizableMessage.raw(
@@ -660,7 +674,6 @@
     private final Semaphore availableConnections;
     private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
     private final int corePoolSize;
-
     private final ConnectionFactory factory;
     private boolean isClosed = false;
     private final ScheduledFuture<?> idleTimeoutFuture;
@@ -669,6 +682,12 @@
     private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
     private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
 
+    /**
+     * The number of new connections which are in the process of being
+     * established.
+     */
+    private final AtomicInteger pendingConnectionAttempts = new AtomicInteger();
+
     CachedConnectionPool(final ConnectionFactory factory, final int corePoolSize,
             final int maximumPoolSize, final long idleTimeout, final TimeUnit unit,
             final ScheduledExecutorService scheduler) {
@@ -766,45 +785,56 @@
                 }
             }
 
-            if (!holder.isWaitingFuture()) {
-                // There was a completed connection attempt.
-                final Connection connection = holder.getWaitingConnection();
-                if (connection.isValid()) {
-                    final PooledConnection pooledConnection =
-                            newPooledConnection(connection, getStackTraceIfDebugEnabled());
-                    if (handler != null) {
-                        handler.handleResult(pooledConnection);
-                    }
-                    return new CompletedFutureResult<Connection>(pooledConnection);
-                } else {
-                    // Close the stale connection and try again.
-                    connection.close();
-                    availableConnections.release();
-
-                    logger.debug(LocalizableMessage.raw(
-                            "Connection no longer valid: availableConnections=%d, maxPoolSize=%d",
-                            currentPoolSize(), maxPoolSize));
-                }
-            } else {
+            if (holder.isWaitingFuture()) {
                 // Grow the pool if needed.
                 final FutureResult<Connection> future = holder.getWaitingFuture();
                 if (!future.isDone() && availableConnections.tryAcquire()) {
+                    pendingConnectionAttempts.incrementAndGet();
                     factory.getConnectionAsync(connectionResultHandler);
                 }
                 return future;
             }
+
+            // There was a completed connection attempt.
+            final Connection connection = holder.getWaitingConnection();
+            if (connection.isValid()) {
+                final PooledConnection pooledConnection =
+                        newPooledConnection(connection, getStackTraceIfDebugEnabled());
+                if (handler != null) {
+                    handler.handleResult(pooledConnection);
+                }
+                return new CompletedFutureResult<Connection>(pooledConnection);
+            } else {
+                // Close the stale connection and try again.
+                connection.close();
+                availableConnections.release();
+
+                logger.debug(LocalizableMessage.raw(
+                        "Connection no longer valid: availableConnections=%d, poolSize=%d",
+                        currentPoolSize(), maxPoolSize));
+            }
         }
     }
 
     @Override
     public String toString() {
-        final StringBuilder builder = new StringBuilder();
-        builder.append("CachedConnectionPool(");
-        builder.append(String.valueOf(factory));
-        builder.append(',');
-        builder.append(maxPoolSize);
-        builder.append(')');
-        return builder.toString();
+        final int size = currentPoolSize();
+        final int pending = pendingConnectionAttempts.get();
+        int in = 0;
+        int blocked = 0;
+        synchronized (queue) {
+            for (QueueElement qe : queue) {
+                if (qe.isWaitingFuture()) {
+                    blocked++;
+                } else {
+                    in++;
+                }
+            }
+        }
+        final int out = size - in - pending;
+        return String.format("CachedConnectionPool(size=%d[in:%d + out:%d + "
+                + "pending:%d], maxSize=%d, blocked=%d, factory=%s)", size, in, out, pending,
+                maxPoolSize, blocked, String.valueOf(factory));
     }
 
     /**
diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
index 29e7cfa..1b13da3 100644
--- a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
+++ b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -792,14 +792,14 @@
         }
 
         private void checkForHeartBeat() {
-            if (sync.isHeldExclusively()) {
+            if (sync.isHeld()) {
                 /*
-                 * A heart beat is still in progress, but it should have
-                 * completed by now. Let's avoid aggressively terminating the
-                 * connection, because the heart beat may simply have been
-                 * delayed by a sudden surge of activity. Therefore, only flag
-                 * the connection as failed if no activity has been seen on the
-                 * connection since the heart beat was sent.
+                 * A heart beat or bind/startTLS is still in progress, but it
+                 * should have completed by now. Let's avoid aggressively
+                 * terminating the connection, because the heart beat may simply
+                 * have been delayed by a sudden surge of activity. Therefore,
+                 * only flag the connection as failed if no activity has been
+                 * seen on the connection since the heart beat was sent.
                  */
                 final long currentTimeMillis = timeSource.currentTimeMillis();
                 if (lastResponseTimestamp < (currentTimeMillis - timeoutMS)) {
@@ -907,7 +907,6 @@
             if (sync.tryLockExclusively()) {
                 try {
                     connection.searchAsync(heartBeatRequest, null, heartBeatHandler);
-                    return true;
                 } catch (final IllegalStateException e) {
                     /*
                      * This may happen when we attempt to send the heart beat
@@ -922,7 +921,12 @@
                     releaseHeartBeatLock();
                 }
             }
-            return false;
+            /*
+             * Indicate that a the heartbeat should be checked even if a
+             * bind/startTLS is in progress, since these operations will
+             * effectively act as the heartbeat.
+             */
+            return true;
         }
 
         private <R> R timestamp(final R response) {
@@ -997,6 +1001,10 @@
             return getState() == LOCKED_EXCLUSIVELY;
         }
 
+        boolean isHeld() {
+            return getState() != 0;
+        }
+
         @Override
         protected boolean tryAcquire(final int ignored) {
             if (compareAndSetState(UNLOCKED, LOCKED_EXCLUSIVELY)) {
diff --git a/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java b/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
index bd19355..6efb8ab 100644
--- a/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
+++ b/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2010 Sun Microsystems, Inc.
- *      Portions copyright 2011-2013 ForgeRock AS.
+ *      Portions copyright 2011-2014 ForgeRock AS.
  */
 
 package org.forgerock.opendj.ldap;
@@ -30,18 +30,16 @@
 import static org.fest.assertions.Assertions.assertThat;
 import static org.forgerock.opendj.ldap.Connections.newFixedConnectionPool;
 import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
-import static org.forgerock.opendj.ldap.TestCaseUtils.*;
+import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnection;
+import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnectionFactory;
+import static org.forgerock.opendj.ldap.TestCaseUtils.mockTimeSource;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -50,11 +48,13 @@
 import org.forgerock.opendj.ldap.requests.Requests;
 import org.forgerock.opendj.ldap.responses.ExtendedResult;
 import org.forgerock.opendj.ldap.responses.Responses;
+import org.mockito.ArgumentCaptor;
 import org.testng.annotations.Test;
 
 /**
  * Tests the connection pool implementation..
  */
+@SuppressWarnings("javadoc")
 public class ConnectionPoolTestCase extends SdkTestCase {
 
     /**
@@ -511,4 +511,38 @@
         assertThat(scheduler.isScheduled()).isFalse();
     }
 
+    /**
+     * Test that all outstanding pending connection futures are completed when a
+     * connection request fails.
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Test(description = "OPENDJ-1348", timeOut = 10000)
+    public void testNewConnectionFailureFlushesAllPendingFutures() throws Exception {
+        final ConnectionFactory factory = mock(ConnectionFactory.class);
+        final int poolSize = 2;
+        final ConnectionPool pool = Connections.newFixedConnectionPool(factory, poolSize);
+
+        List<FutureResult<Connection>> futures = new ArrayList<FutureResult<Connection>>();
+        for (int i = 0; i < poolSize + 1; i++) {
+            futures.add(pool.getConnectionAsync(null));
+        }
+
+        final ArgumentCaptor<ResultHandler> arg = ArgumentCaptor.forClass(ResultHandler.class);
+        verify(factory, times(poolSize)).getConnectionAsync(arg.capture());
+        final ErrorResultException connectError =
+                ErrorResultException.newErrorResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR);
+        for (ResultHandler<Connection> handler : arg.getAllValues()) {
+            handler.handleErrorResult(connectError);
+        }
+
+        for (FutureResult<Connection> future : futures) {
+            try {
+                // Before the fix for OPENDJ-1348 the third future.get() would hang.
+                future.get();
+            } catch (ErrorResultException e) {
+                assertThat(e).isSameAs(connectError);
+            }
+        }
+    }
+
 }
diff --git a/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java b/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
index 01a7f34..1f64f20 100644
--- a/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
+++ b/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
@@ -283,20 +283,76 @@
     }
 
     @SuppressWarnings("unchecked")
-    @Test
-    public void testHeartBeatWhileBindInProgress() throws Exception {
+    @Test(description = "OPENDJ-1348")
+    public void testBindPreventsHeartBeatTimeout() throws Exception {
         mockConnectionWithInitialHeartbeatResult(ResultCode.SUCCESS);
-
         hbc = hbcf.getConnection();
 
         /*
          * Send a bind request, trapping the bind call-back so that we can send
          * the response once we have attempted a heartbeat.
          */
-        when(
-                connection.bindAsync(any(BindRequest.class),
-                        any(IntermediateResponseHandler.class), any(ResultHandler.class)))
-                .thenReturn(null);
+        when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L);
+        hbc.bindAsync(newSimpleBindRequest(), null, null);
+        @SuppressWarnings("rawtypes")
+        final ArgumentCaptor<ResultHandler> arg = ArgumentCaptor.forClass(ResultHandler.class);
+        verify(connection, times(1)).bindAsync(any(BindRequest.class),
+                any(IntermediateResponseHandler.class), arg.capture());
+
+        // Verify no heartbeat is sent because there is a bind in progress.
+        when(hbcf.timeSource.currentTimeMillis()).thenReturn(11001L);
+        scheduler.runAllTasks(); // Invokes HBCF.ConnectionImpl.sendHeartBeat()
+        verify(connection, times(1)).searchAsync(same(HEARTBEAT),
+                any(IntermediateResponseHandler.class), any(SearchResultHandler.class));
+
+        // Send fake bind response, releasing the heartbeat.
+        when(hbcf.timeSource.currentTimeMillis()).thenReturn(11099L);
+        arg.getValue().handleResult(newResult(ResultCode.SUCCESS));
+
+        // Check that bind response acts as heartbeat.
+        assertThat(hbc.isValid()).isTrue();
+        when(hbcf.timeSource.currentTimeMillis()).thenReturn(11100L);
+        scheduler.runAllTasks(); // Invokes HBCF.ConnectionImpl.checkForHeartBeat()
+        assertThat(hbc.isValid()).isTrue();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(description = "OPENDJ-1348")
+    public void testBindTriggersHeartBeatTimeoutWhenTooSlow() throws Exception {
+        mockConnectionWithInitialHeartbeatResult(ResultCode.SUCCESS);
+        hbc = hbcf.getConnection();
+
+        // Send another bind request which will timeout.
+        when(hbcf.timeSource.currentTimeMillis()).thenReturn(20000L);
+        hbc.bindAsync(newSimpleBindRequest(), null, null);
+        @SuppressWarnings("rawtypes")
+        final ArgumentCaptor<ResultHandler> arg = ArgumentCaptor.forClass(ResultHandler.class);
+        verify(connection, times(1)).bindAsync(any(BindRequest.class),
+                any(IntermediateResponseHandler.class), arg.capture());
+
+        // Verify no heartbeat is sent because there is a bind in progress.
+        when(hbcf.timeSource.currentTimeMillis()).thenReturn(20001L);
+        scheduler.runAllTasks(); // Invokes HBCF.ConnectionImpl.sendHeartBeat()
+        verify(connection, times(1)).searchAsync(same(HEARTBEAT),
+                any(IntermediateResponseHandler.class), any(SearchResultHandler.class));
+
+        // Check that lack of bind response acts as heartbeat timeout.
+        assertThat(hbc.isValid()).isTrue();
+        when(hbcf.timeSource.currentTimeMillis()).thenReturn(20100L);
+        scheduler.runAllTasks(); // Invokes HBCF.ConnectionImpl.checkForHeartBeat()
+        assertThat(hbc.isValid()).isFalse();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testHeartBeatWhileBindInProgress() throws Exception {
+        mockConnectionWithInitialHeartbeatResult(ResultCode.SUCCESS);
+        hbc = hbcf.getConnection();
+
+        /*
+         * Send a bind request, trapping the bind call-back so that we can send
+         * the response once we have attempted a heartbeat.
+         */
         hbc.bindAsync(newSimpleBindRequest(), null, null);
 
         // Capture the bind result handler.

--
Gitblit v1.10.0