opendj-core/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
@@ -135,10 +135,9 @@ */ @Override public void handleResult(final Connection connection) { notifyOnline(); // The connection is not going to be used, so close it immediately. connection.close(); notifyOnline(); } @Override opendj-core/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
@@ -42,6 +42,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.forgerock.i18n.LocalizableMessage; import org.forgerock.i18n.slf4j.LocalizedLogger; @@ -87,23 +88,34 @@ @Override public void handleErrorResult(final ErrorResultException error) { // Connection attempt failed, so decrease the pool size. pendingConnectionAttempts.decrementAndGet(); availableConnections.release(); logger.debug(LocalizableMessage.raw("Connection attempt failed: availableConnections=%d, maxPoolSize=%d", logger.debug(LocalizableMessage.raw( "Connection attempt failed: availableConnections=%d, maxPoolSize=%d", currentPoolSize(), maxPoolSize, error)); QueueElement holder; /* * There may be many pending futures waiting for a connection * attempt to succeed. In some situations the number of pending * futures may exceed the pool size and the number of outstanding * connection attempts. If only one pending future is resolved per * failed connection attempt then some pending futures will be left * unresolved. Therefore, a failed connection attempt must fail all * pending futures, even if some of the subsequent connection * attempts succeed, which is unlikely (if one fails, then they are * all likely to fail). */ final List<QueueElement> waitingFutures = new LinkedList<CachedConnectionPool.QueueElement>(); synchronized (queue) { if (hasWaitingFutures()) { holder = queue.removeFirst(); } else { // No waiting futures. return; while (hasWaitingFutures()) { waitingFutures.add(queue.removeFirst()); } } // There was waiting future, so close it. 
holder.getWaitingFuture().handleErrorResult(error); for (QueueElement waitingFuture : waitingFutures) { waitingFuture.getWaitingFuture().handleErrorResult(error); } } @Override @@ -111,6 +123,7 @@ logger.debug(LocalizableMessage.raw( "Connection attempt succeeded: availableConnections=%d, maxPoolSize=%d", currentPoolSize(), maxPoolSize)); pendingConnectionAttempts.decrementAndGet(); publishConnection(connection); } } @@ -254,6 +267,7 @@ * availableConnections. */ connection.close(); pendingConnectionAttempts.incrementAndGet(); factory.getConnectionAsync(connectionResultHandler); logger.debug(LocalizableMessage.raw( @@ -660,7 +674,6 @@ private final Semaphore availableConnections; private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler(); private final int corePoolSize; private final ConnectionFactory factory; private boolean isClosed = false; private final ScheduledFuture<?> idleTimeoutFuture; @@ -669,6 +682,12 @@ private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>(); private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler; /** * The number of new connections which are in the process of being * established. */ private final AtomicInteger pendingConnectionAttempts = new AtomicInteger(); CachedConnectionPool(final ConnectionFactory factory, final int corePoolSize, final int maximumPoolSize, final long idleTimeout, final TimeUnit unit, final ScheduledExecutorService scheduler) { @@ -766,45 +785,56 @@ } } if (!holder.isWaitingFuture()) { // There was a completed connection attempt. final Connection connection = holder.getWaitingConnection(); if (connection.isValid()) { final PooledConnection pooledConnection = newPooledConnection(connection, getStackTraceIfDebugEnabled()); if (handler != null) { handler.handleResult(pooledConnection); } return new CompletedFutureResult<Connection>(pooledConnection); } else { // Close the stale connection and try again. 
connection.close(); availableConnections.release(); logger.debug(LocalizableMessage.raw( "Connection no longer valid: availableConnections=%d, maxPoolSize=%d", currentPoolSize(), maxPoolSize)); } } else { if (holder.isWaitingFuture()) { // Grow the pool if needed. final FutureResult<Connection> future = holder.getWaitingFuture(); if (!future.isDone() && availableConnections.tryAcquire()) { pendingConnectionAttempts.incrementAndGet(); factory.getConnectionAsync(connectionResultHandler); } return future; } // There was a completed connection attempt. final Connection connection = holder.getWaitingConnection(); if (connection.isValid()) { final PooledConnection pooledConnection = newPooledConnection(connection, getStackTraceIfDebugEnabled()); if (handler != null) { handler.handleResult(pooledConnection); } return new CompletedFutureResult<Connection>(pooledConnection); } else { // Close the stale connection and try again. connection.close(); availableConnections.release(); logger.debug(LocalizableMessage.raw( "Connection no longer valid: availableConnections=%d, poolSize=%d", currentPoolSize(), maxPoolSize)); } } } @Override public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("CachedConnectionPool("); builder.append(String.valueOf(factory)); builder.append(','); builder.append(maxPoolSize); builder.append(')'); return builder.toString(); final int size = currentPoolSize(); final int pending = pendingConnectionAttempts.get(); int in = 0; int blocked = 0; synchronized (queue) { for (QueueElement qe : queue) { if (qe.isWaitingFuture()) { blocked++; } else { in++; } } } final int out = size - in - pending; return String.format("CachedConnectionPool(size=%d[in:%d + out:%d + " + "pending:%d], maxSize=%d, blocked=%d, factory=%s)", size, in, out, pending, maxPoolSize, blocked, String.valueOf(factory)); } /** opendj-core/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -792,14 +792,14 @@ } private void checkForHeartBeat() { if (sync.isHeldExclusively()) { if (sync.isHeld()) { /* * A heart beat is still in progress, but it should have * completed by now. Let's avoid aggressively terminating the * connection, because the heart beat may simply have been * delayed by a sudden surge of activity. Therefore, only flag * the connection as failed if no activity has been seen on the * connection since the heart beat was sent. * A heart beat or bind/startTLS is still in progress, but it * should have completed by now. Let's avoid aggressively * terminating the connection, because the heart beat may simply * have been delayed by a sudden surge of activity. Therefore, * only flag the connection as failed if no activity has been * seen on the connection since the heart beat was sent. */ final long currentTimeMillis = timeSource.currentTimeMillis(); if (lastResponseTimestamp < (currentTimeMillis - timeoutMS)) { @@ -907,7 +907,6 @@ if (sync.tryLockExclusively()) { try { connection.searchAsync(heartBeatRequest, null, heartBeatHandler); return true; } catch (final IllegalStateException e) { /* * This may happen when we attempt to send the heart beat @@ -922,7 +921,12 @@ releaseHeartBeatLock(); } } return false; /* * Indicate that the heartbeat should be checked even if a * bind/startTLS is in progress, since these operations will * effectively act as the heartbeat. */ return true; } private <R> R timestamp(final R response) { @@ -997,6 +1001,10 @@ return getState() == LOCKED_EXCLUSIVELY; } boolean isHeld() { return getState() != 0; } @Override protected boolean tryAcquire(final int ignored) { if (compareAndSetState(UNLOCKED, LOCKED_EXCLUSIVELY)) { opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
@@ -22,7 +22,7 @@ * * * Copyright 2010 Sun Microsystems, Inc. * Portions copyright 2011-2013 ForgeRock AS. * Portions copyright 2011-2014 ForgeRock AS. */ package org.forgerock.opendj.ldap; @@ -30,18 +30,16 @@ import static org.fest.assertions.Assertions.assertThat; import static org.forgerock.opendj.ldap.Connections.newFixedConnectionPool; import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult; import static org.forgerock.opendj.ldap.TestCaseUtils.*; import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnection; import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnectionFactory; import static org.forgerock.opendj.ldap.TestCaseUtils.mockTimeSource; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import static org.mockito.Mockito.*; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -50,11 +48,13 @@ import org.forgerock.opendj.ldap.requests.Requests; import org.forgerock.opendj.ldap.responses.ExtendedResult; import org.forgerock.opendj.ldap.responses.Responses; import org.mockito.ArgumentCaptor; import org.testng.annotations.Test; /** * Tests the connection pool implementation. */ @SuppressWarnings("javadoc") public class ConnectionPoolTestCase extends SdkTestCase { /** @@ -511,4 +511,38 @@ assertThat(scheduler.isScheduled()).isFalse(); } /** * Test that all outstanding pending connection futures are completed when a * connection request fails. 
*/ @SuppressWarnings({ "rawtypes", "unchecked" }) @Test(description = "OPENDJ-1348", timeOut = 10000) public void testNewConnectionFailureFlushesAllPendingFutures() throws Exception { final ConnectionFactory factory = mock(ConnectionFactory.class); final int poolSize = 2; final ConnectionPool pool = Connections.newFixedConnectionPool(factory, poolSize); List<FutureResult<Connection>> futures = new ArrayList<FutureResult<Connection>>(); for (int i = 0; i < poolSize + 1; i++) { futures.add(pool.getConnectionAsync(null)); } final ArgumentCaptor<ResultHandler> arg = ArgumentCaptor.forClass(ResultHandler.class); verify(factory, times(poolSize)).getConnectionAsync(arg.capture()); final ErrorResultException connectError = ErrorResultException.newErrorResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR); for (ResultHandler<Connection> handler : arg.getAllValues()) { handler.handleErrorResult(connectError); } for (FutureResult<Connection> future : futures) { try { // Before the fix for OPENDJ-1348 the third future.get() would hang. future.get(); } catch (ErrorResultException e) { assertThat(e).isSameAs(connectError); } } } } opendj-core/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
@@ -283,20 +283,76 @@ } @SuppressWarnings("unchecked") @Test public void testHeartBeatWhileBindInProgress() throws Exception { @Test(description = "OPENDJ-1348") public void testBindPreventsHeartBeatTimeout() throws Exception { mockConnectionWithInitialHeartbeatResult(ResultCode.SUCCESS); hbc = hbcf.getConnection(); /* * Send a bind request, trapping the bind call-back so that we can send * the response once we have attempted a heartbeat. */ when( connection.bindAsync(any(BindRequest.class), any(IntermediateResponseHandler.class), any(ResultHandler.class))) .thenReturn(null); when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L); hbc.bindAsync(newSimpleBindRequest(), null, null); @SuppressWarnings("rawtypes") final ArgumentCaptor<ResultHandler> arg = ArgumentCaptor.forClass(ResultHandler.class); verify(connection, times(1)).bindAsync(any(BindRequest.class), any(IntermediateResponseHandler.class), arg.capture()); // Verify no heartbeat is sent because there is a bind in progress. when(hbcf.timeSource.currentTimeMillis()).thenReturn(11001L); scheduler.runAllTasks(); // Invokes HBCF.ConnectionImpl.sendHeartBeat() verify(connection, times(1)).searchAsync(same(HEARTBEAT), any(IntermediateResponseHandler.class), any(SearchResultHandler.class)); // Send fake bind response, releasing the heartbeat. when(hbcf.timeSource.currentTimeMillis()).thenReturn(11099L); arg.getValue().handleResult(newResult(ResultCode.SUCCESS)); // Check that bind response acts as heartbeat. assertThat(hbc.isValid()).isTrue(); when(hbcf.timeSource.currentTimeMillis()).thenReturn(11100L); scheduler.runAllTasks(); // Invokes HBCF.ConnectionImpl.checkForHeartBeat() assertThat(hbc.isValid()).isTrue(); } @SuppressWarnings("unchecked") @Test(description = "OPENDJ-1348") public void testBindTriggersHeartBeatTimeoutWhenTooSlow() throws Exception { mockConnectionWithInitialHeartbeatResult(ResultCode.SUCCESS); hbc = hbcf.getConnection(); // Send another bind request which will timeout. 
when(hbcf.timeSource.currentTimeMillis()).thenReturn(20000L); hbc.bindAsync(newSimpleBindRequest(), null, null); @SuppressWarnings("rawtypes") final ArgumentCaptor<ResultHandler> arg = ArgumentCaptor.forClass(ResultHandler.class); verify(connection, times(1)).bindAsync(any(BindRequest.class), any(IntermediateResponseHandler.class), arg.capture()); // Verify no heartbeat is sent because there is a bind in progress. when(hbcf.timeSource.currentTimeMillis()).thenReturn(20001L); scheduler.runAllTasks(); // Invokes HBCF.ConnectionImpl.sendHeartBeat() verify(connection, times(1)).searchAsync(same(HEARTBEAT), any(IntermediateResponseHandler.class), any(SearchResultHandler.class)); // Check that lack of bind response acts as heartbeat timeout. assertThat(hbc.isValid()).isTrue(); when(hbcf.timeSource.currentTimeMillis()).thenReturn(20100L); scheduler.runAllTasks(); // Invokes HBCF.ConnectionImpl.checkForHeartBeat() assertThat(hbc.isValid()).isFalse(); } @SuppressWarnings("unchecked") @Test public void testHeartBeatWhileBindInProgress() throws Exception { mockConnectionWithInitialHeartbeatResult(ResultCode.SUCCESS); hbc = hbcf.getConnection(); /* * Send a bind request, trapping the bind call-back so that we can send * the response once we have attempted a heartbeat. */ hbc.bindAsync(newSimpleBindRequest(), null, null); // Capture the bind result handler.