From b33a21892a2d6c90394b9d1e9d537dc82c679b60 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 24 Feb 2014 14:02:53 +0000
Subject: [PATCH] Fix OPENDJ-1348: Various connection pool implementations do not recover if the target server is powered off and restarted
---
opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java | 50 ++++++++-
opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java | 70 ++++++++++++-
opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java | 26 +++-
opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java | 3
opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java | 106 +++++++++++++-------
5 files changed, 191 insertions(+), 64 deletions(-)
diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
index 3eb841e..c65ad4f 100644
--- a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
+++ b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
@@ -135,10 +135,9 @@
*/
@Override
public void handleResult(final Connection connection) {
- notifyOnline();
-
// The connection is not going to be used, so close it immediately.
connection.close();
+ notifyOnline();
}
@Override
diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
index c09ac6f..769b8cf 100644
--- a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
+++ b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
@@ -42,6 +42,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -87,23 +88,34 @@
@Override
public void handleErrorResult(final ErrorResultException error) {
// Connection attempt failed, so decrease the pool size.
+ pendingConnectionAttempts.decrementAndGet();
availableConnections.release();
- logger.debug(LocalizableMessage.raw("Connection attempt failed: availableConnections=%d, maxPoolSize=%d",
+ logger.debug(LocalizableMessage.raw(
+ "Connection attempt failed: availableConnections=%d, maxPoolSize=%d",
currentPoolSize(), maxPoolSize, error));
- QueueElement holder;
+ /*
+ * There may be many pending futures waiting for a connection
+ * attempt to succeed. In some situations the number of pending
+ * futures may exceed the pool size and the number of outstanding
+ * connection attempts. If only one pending future is resolved per
+ * failed connection attempt then some pending futures will be left
+ * unresolved. Therefore, a failed connection attempt must fail all
+ * pending futures, even if some of the subsequent connection
+ * attempts succeed, which is unlikely (if one fails, then they are
+ * all likely to fail).
+ */
+ final List<QueueElement> waitingFutures =
+ new LinkedList<CachedConnectionPool.QueueElement>();
synchronized (queue) {
- if (hasWaitingFutures()) {
- holder = queue.removeFirst();
- } else {
- // No waiting futures.
- return;
+ while (hasWaitingFutures()) {
+ waitingFutures.add(queue.removeFirst());
}
}
-
- // There was waiting future, so close it.
- holder.getWaitingFuture().handleErrorResult(error);
+ for (QueueElement waitingFuture : waitingFutures) {
+ waitingFuture.getWaitingFuture().handleErrorResult(error);
+ }
}
@Override
@@ -111,6 +123,7 @@
logger.debug(LocalizableMessage.raw(
"Connection attempt succeeded: availableConnections=%d, maxPoolSize=%d",
currentPoolSize(), maxPoolSize));
+ pendingConnectionAttempts.decrementAndGet();
publishConnection(connection);
}
}
@@ -254,6 +267,7 @@
* availableConnections.
*/
connection.close();
+ pendingConnectionAttempts.incrementAndGet();
factory.getConnectionAsync(connectionResultHandler);
logger.debug(LocalizableMessage.raw(
@@ -660,7 +674,6 @@
private final Semaphore availableConnections;
private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
private final int corePoolSize;
-
private final ConnectionFactory factory;
private boolean isClosed = false;
private final ScheduledFuture<?> idleTimeoutFuture;
@@ -669,6 +682,12 @@
private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
+ /**
+ * The number of new connections which are in the process of being
+ * established.
+ */
+ private final AtomicInteger pendingConnectionAttempts = new AtomicInteger();
+
CachedConnectionPool(final ConnectionFactory factory, final int corePoolSize,
final int maximumPoolSize, final long idleTimeout, final TimeUnit unit,
final ScheduledExecutorService scheduler) {
@@ -766,45 +785,56 @@
}
}
- if (!holder.isWaitingFuture()) {
- // There was a completed connection attempt.
- final Connection connection = holder.getWaitingConnection();
- if (connection.isValid()) {
- final PooledConnection pooledConnection =
- newPooledConnection(connection, getStackTraceIfDebugEnabled());
- if (handler != null) {
- handler.handleResult(pooledConnection);
- }
- return new CompletedFutureResult<Connection>(pooledConnection);
- } else {
- // Close the stale connection and try again.
- connection.close();
- availableConnections.release();
-
- logger.debug(LocalizableMessage.raw(
- "Connection no longer valid: availableConnections=%d, maxPoolSize=%d",
- currentPoolSize(), maxPoolSize));
- }
- } else {
+ if (holder.isWaitingFuture()) {
// Grow the pool if needed.
final FutureResult<Connection> future = holder.getWaitingFuture();
if (!future.isDone() && availableConnections.tryAcquire()) {
+ pendingConnectionAttempts.incrementAndGet();
factory.getConnectionAsync(connectionResultHandler);
}
return future;
}
+
+ // There was a completed connection attempt.
+ final Connection connection = holder.getWaitingConnection();
+ if (connection.isValid()) {
+ final PooledConnection pooledConnection =
+ newPooledConnection(connection, getStackTraceIfDebugEnabled());
+ if (handler != null) {
+ handler.handleResult(pooledConnection);
+ }
+ return new CompletedFutureResult<Connection>(pooledConnection);
+ } else {
+ // Close the stale connection and try again.
+ connection.close();
+ availableConnections.release();
+
+ logger.debug(LocalizableMessage.raw(
+ "Connection no longer valid: availableConnections=%d, poolSize=%d",
+ currentPoolSize(), maxPoolSize));
+ }
}
}
@Override
public String toString() {
- final StringBuilder builder = new StringBuilder();
- builder.append("CachedConnectionPool(");
- builder.append(String.valueOf(factory));
- builder.append(',');
- builder.append(maxPoolSize);
- builder.append(')');
- return builder.toString();
+ final int size = currentPoolSize();
+ final int pending = pendingConnectionAttempts.get();
+ int in = 0;
+ int blocked = 0;
+ synchronized (queue) {
+ for (QueueElement qe : queue) {
+ if (qe.isWaitingFuture()) {
+ blocked++;
+ } else {
+ in++;
+ }
+ }
+ }
+ final int out = size - in - pending;
+ return String.format("CachedConnectionPool(size=%d[in:%d + out:%d + "
+ + "pending:%d], maxSize=%d, blocked=%d, factory=%s)", size, in, out, pending,
+ maxPoolSize, blocked, String.valueOf(factory));
}
/**
diff --git a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
index 29e7cfa..1b13da3 100644
--- a/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
+++ b/opendj-sdk/opendj-core/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -792,14 +792,14 @@
}
private void checkForHeartBeat() {
- if (sync.isHeldExclusively()) {
+ if (sync.isHeld()) {
/*
- * A heart beat is still in progress, but it should have
- * completed by now. Let's avoid aggressively terminating the
- * connection, because the heart beat may simply have been
- * delayed by a sudden surge of activity. Therefore, only flag
- * the connection as failed if no activity has been seen on the
- * connection since the heart beat was sent.
+ * A heart beat or bind/startTLS is still in progress, but it
+ * should have completed by now. Let's avoid aggressively
+ * terminating the connection, because the heart beat may simply
+ * have been delayed by a sudden surge of activity. Therefore,
+ * only flag the connection as failed if no activity has been
+ * seen on the connection since the heart beat was sent.
*/
final long currentTimeMillis = timeSource.currentTimeMillis();
if (lastResponseTimestamp < (currentTimeMillis - timeoutMS)) {
@@ -907,7 +907,6 @@
if (sync.tryLockExclusively()) {
try {
connection.searchAsync(heartBeatRequest, null, heartBeatHandler);
- return true;
} catch (final IllegalStateException e) {
/*
* This may happen when we attempt to send the heart beat
@@ -922,7 +921,12 @@
releaseHeartBeatLock();
}
}
- return false;
+ /*
+ * Indicate that the heartbeat should be checked even if a
+ * bind/startTLS is in progress, since these operations will
+ * effectively act as the heartbeat.
+ */
+ return true;
}
private <R> R timestamp(final R response) {
@@ -997,6 +1001,10 @@
return getState() == LOCKED_EXCLUSIVELY;
}
+ boolean isHeld() {
+ return getState() != 0;
+ }
+
@Override
protected boolean tryAcquire(final int ignored) {
if (compareAndSetState(UNLOCKED, LOCKED_EXCLUSIVELY)) {
diff --git a/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java b/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
index bd19355..6efb8ab 100644
--- a/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
+++ b/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2013 ForgeRock AS.
+ * Portions copyright 2011-2014 ForgeRock AS.
*/
package org.forgerock.opendj.ldap;
@@ -30,18 +30,16 @@
import static org.fest.assertions.Assertions.assertThat;
import static org.forgerock.opendj.ldap.Connections.newFixedConnectionPool;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
-import static org.forgerock.opendj.ldap.TestCaseUtils.*;
+import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnection;
+import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnectionFactory;
+import static org.forgerock.opendj.ldap.TestCaseUtils.mockTimeSource;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -50,11 +48,13 @@
import org.forgerock.opendj.ldap.requests.Requests;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.Responses;
+import org.mockito.ArgumentCaptor;
import org.testng.annotations.Test;
/**
* Tests the connection pool implementation..
*/
+@SuppressWarnings("javadoc")
public class ConnectionPoolTestCase extends SdkTestCase {
/**
@@ -511,4 +511,38 @@
assertThat(scheduler.isScheduled()).isFalse();
}
+ /**
+ * Test that all outstanding pending connection futures are completed when a
+ * connection request fails.
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test(description = "OPENDJ-1348", timeOut = 10000)
+ public void testNewConnectionFailureFlushesAllPendingFutures() throws Exception {
+ final ConnectionFactory factory = mock(ConnectionFactory.class);
+ final int poolSize = 2;
+ final ConnectionPool pool = Connections.newFixedConnectionPool(factory, poolSize);
+
+ List<FutureResult<Connection>> futures = new ArrayList<FutureResult<Connection>>();
+ for (int i = 0; i < poolSize + 1; i++) {
+ futures.add(pool.getConnectionAsync(null));
+ }
+
+ final ArgumentCaptor<ResultHandler> arg = ArgumentCaptor.forClass(ResultHandler.class);
+ verify(factory, times(poolSize)).getConnectionAsync(arg.capture());
+ final ErrorResultException connectError =
+ ErrorResultException.newErrorResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR);
+ for (ResultHandler<Connection> handler : arg.getAllValues()) {
+ handler.handleErrorResult(connectError);
+ }
+
+ for (FutureResult<Connection> future : futures) {
+ try {
+ // Before the fix for OPENDJ-1348 the third future.get() would hang.
+ future.get();
+ } catch (ErrorResultException e) {
+ assertThat(e).isSameAs(connectError);
+ }
+ }
+ }
+
}
diff --git a/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java b/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
index 01a7f34..1f64f20 100644
--- a/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
+++ b/opendj-sdk/opendj-core/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
@@ -283,20 +283,76 @@
}
@SuppressWarnings("unchecked")
- @Test
- public void testHeartBeatWhileBindInProgress() throws Exception {
+ @Test(description = "OPENDJ-1348")
+ public void testBindPreventsHeartBeatTimeout() throws Exception {
mockConnectionWithInitialHeartbeatResult(ResultCode.SUCCESS);
-
hbc = hbcf.getConnection();
/*
* Send a bind request, trapping the bind call-back so that we can send
* the response once we have attempted a heartbeat.
*/
- when(
- connection.bindAsync(any(BindRequest.class),
- any(IntermediateResponseHandler.class), any(ResultHandler.class)))
- .thenReturn(null);
+ when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L);
+ hbc.bindAsync(newSimpleBindRequest(), null, null);
+ @SuppressWarnings("rawtypes")
+ final ArgumentCaptor<ResultHandler> arg = ArgumentCaptor.forClass(ResultHandler.class);
+ verify(connection, times(1)).bindAsync(any(BindRequest.class),
+ any(IntermediateResponseHandler.class), arg.capture());
+
+ // Verify no further heartbeat is sent while the bind is in progress (count stays at 1).
+ when(hbcf.timeSource.currentTimeMillis()).thenReturn(11001L);
+ scheduler.runAllTasks(); // Invokes HBCF.ConnectionImpl.sendHeartBeat()
+ verify(connection, times(1)).searchAsync(same(HEARTBEAT),
+ any(IntermediateResponseHandler.class), any(SearchResultHandler.class));
+
+ // Send fake bind response, releasing the heartbeat.
+ when(hbcf.timeSource.currentTimeMillis()).thenReturn(11099L);
+ arg.getValue().handleResult(newResult(ResultCode.SUCCESS));
+
+ // Check that bind response acts as heartbeat.
+ assertThat(hbc.isValid()).isTrue();
+ when(hbcf.timeSource.currentTimeMillis()).thenReturn(11100L);
+ scheduler.runAllTasks(); // Invokes HBCF.ConnectionImpl.checkForHeartBeat()
+ assertThat(hbc.isValid()).isTrue();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(description = "OPENDJ-1348")
+ public void testBindTriggersHeartBeatTimeoutWhenTooSlow() throws Exception {
+ mockConnectionWithInitialHeartbeatResult(ResultCode.SUCCESS);
+ hbc = hbcf.getConnection();
+
+ // Send another bind request which will time out.
+ when(hbcf.timeSource.currentTimeMillis()).thenReturn(20000L);
+ hbc.bindAsync(newSimpleBindRequest(), null, null);
+ @SuppressWarnings("rawtypes")
+ final ArgumentCaptor<ResultHandler> arg = ArgumentCaptor.forClass(ResultHandler.class);
+ verify(connection, times(1)).bindAsync(any(BindRequest.class),
+ any(IntermediateResponseHandler.class), arg.capture());
+
+ // Verify no further heartbeat is sent while the bind is in progress (count stays at 1).
+ when(hbcf.timeSource.currentTimeMillis()).thenReturn(20001L);
+ scheduler.runAllTasks(); // Invokes HBCF.ConnectionImpl.sendHeartBeat()
+ verify(connection, times(1)).searchAsync(same(HEARTBEAT),
+ any(IntermediateResponseHandler.class), any(SearchResultHandler.class));
+
+ // Check that lack of bind response acts as heartbeat timeout.
+ assertThat(hbc.isValid()).isTrue();
+ when(hbcf.timeSource.currentTimeMillis()).thenReturn(20100L);
+ scheduler.runAllTasks(); // Invokes HBCF.ConnectionImpl.checkForHeartBeat()
+ assertThat(hbc.isValid()).isFalse();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testHeartBeatWhileBindInProgress() throws Exception {
+ mockConnectionWithInitialHeartbeatResult(ResultCode.SUCCESS);
+ hbc = hbcf.getConnection();
+
+ /*
+ * Send a bind request, trapping the bind call-back so that we can send
+ * the response once we have attempted a heartbeat.
+ */
hbc.bindAsync(newSimpleBindRequest(), null, null);
// Capture the bind result handler.
--
Gitblit v1.10.0