mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
24.02.2014 c60627d3626afbba7388460c218de4df401904f5
Fix OPENDJ-1348: Various connection pool implementations do not recover if the target server is powered off and restarted

* HeartBeatConnectionFactory: check for heartbeat timeouts when there is a bind/startTLS in progress. Use the pending bind/startTLS as a proxy for the heartbeat request
* CachedConnectionPool: flush all pending connection futures if an attempt to establish a new pooled connection fails. Improve statistics in toString() representation to make problems easier to diagnose during debugging
* AbstractLoadBalancingAlgorithm: minor change - notify event listeners after closing the monitoring connection so that statistics are more accurate.

Reviewed by Jean-Noel.
5 files modified
235 ■■■■ changed files
opendj-core/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java 3 ●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java 86 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java 26 ●●●●● patch | view | raw | blame | history
opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java 50 ●●●● patch | view | raw | blame | history
opendj-core/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java 70 ●●●●● patch | view | raw | blame | history
opendj-core/src/main/java/org/forgerock/opendj/ldap/AbstractLoadBalancingAlgorithm.java
@@ -135,10 +135,9 @@
         */
        @Override
        public void handleResult(final Connection connection) {
            notifyOnline();
            // The connection is not going to be used, so close it immediately.
            connection.close();
            notifyOnline();
        }
        @Override
opendj-core/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
@@ -42,6 +42,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -87,23 +88,34 @@
        @Override
        public void handleErrorResult(final ErrorResultException error) {
            // Connection attempt failed, so decrease the pool size.
            pendingConnectionAttempts.decrementAndGet();
            availableConnections.release();
            logger.debug(LocalizableMessage.raw("Connection attempt failed: availableConnections=%d, maxPoolSize=%d",
            logger.debug(LocalizableMessage.raw(
                    "Connection attempt failed: availableConnections=%d, maxPoolSize=%d",
                    currentPoolSize(), maxPoolSize, error));
            QueueElement holder;
            /*
             * There may be many pending futures waiting for a connection
             * attempt to succeed. In some situations the number of pending
             * futures may exceed the pool size and the number of outstanding
             * connection attempts. If only one pending future is resolved per
             * failed connection attempt then some pending futures will be left
             * unresolved. Therefore, a failed connection attempt must fail all
             * pending futures, even if some of the subsequent connection
             * attempts succeed, which is unlikely (if one fails, then they are
             * all likely to fail).
             */
            final List<QueueElement> waitingFutures =
                    new LinkedList<CachedConnectionPool.QueueElement>();
            synchronized (queue) {
                if (hasWaitingFutures()) {
                    holder = queue.removeFirst();
                } else {
                    // No waiting futures.
                    return;
                while (hasWaitingFutures()) {
                    waitingFutures.add(queue.removeFirst());
                }
            }
            // There was waiting future, so close it.
            holder.getWaitingFuture().handleErrorResult(error);
            for (QueueElement waitingFuture : waitingFutures) {
                waitingFuture.getWaitingFuture().handleErrorResult(error);
            }
        }
        @Override
@@ -111,6 +123,7 @@
            logger.debug(LocalizableMessage.raw(
                    "Connection attempt succeeded:  availableConnections=%d, maxPoolSize=%d",
                    currentPoolSize(), maxPoolSize));
            pendingConnectionAttempts.decrementAndGet();
            publishConnection(connection);
        }
    }
@@ -254,6 +267,7 @@
                 * availableConnections.
                 */
                connection.close();
                pendingConnectionAttempts.incrementAndGet();
                factory.getConnectionAsync(connectionResultHandler);
                logger.debug(LocalizableMessage.raw(
@@ -660,7 +674,6 @@
    private final Semaphore availableConnections;
    private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
    private final int corePoolSize;
    private final ConnectionFactory factory;
    private boolean isClosed = false;
    private final ScheduledFuture<?> idleTimeoutFuture;
@@ -669,6 +682,12 @@
    private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
    private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
    /**
     * The number of new connections which are in the process of being
     * established.
     */
    private final AtomicInteger pendingConnectionAttempts = new AtomicInteger();
    CachedConnectionPool(final ConnectionFactory factory, final int corePoolSize,
            final int maximumPoolSize, final long idleTimeout, final TimeUnit unit,
            final ScheduledExecutorService scheduler) {
@@ -766,7 +785,16 @@
                }
            }
            if (!holder.isWaitingFuture()) {
            if (holder.isWaitingFuture()) {
                // Grow the pool if needed.
                final FutureResult<Connection> future = holder.getWaitingFuture();
                if (!future.isDone() && availableConnections.tryAcquire()) {
                    pendingConnectionAttempts.incrementAndGet();
                    factory.getConnectionAsync(connectionResultHandler);
                }
                return future;
            }
                // There was a completed connection attempt.
                final Connection connection = holder.getWaitingConnection();
                if (connection.isValid()) {
@@ -782,29 +810,31 @@
                    availableConnections.release();
                    logger.debug(LocalizableMessage.raw(
                            "Connection no longer valid: availableConnections=%d, maxPoolSize=%d",
                        "Connection no longer valid: availableConnections=%d, poolSize=%d",
                            currentPoolSize(), maxPoolSize));
                }
            } else {
                // Grow the pool if needed.
                final FutureResult<Connection> future = holder.getWaitingFuture();
                if (!future.isDone() && availableConnections.tryAcquire()) {
                    factory.getConnectionAsync(connectionResultHandler);
                }
                return future;
            }
        }
    }
    @Override
    public String toString() {
        final StringBuilder builder = new StringBuilder();
        builder.append("CachedConnectionPool(");
        builder.append(String.valueOf(factory));
        builder.append(',');
        builder.append(maxPoolSize);
        builder.append(')');
        return builder.toString();
        final int size = currentPoolSize();
        final int pending = pendingConnectionAttempts.get();
        int in = 0;
        int blocked = 0;
        synchronized (queue) {
            for (QueueElement qe : queue) {
                if (qe.isWaitingFuture()) {
                    blocked++;
                } else {
                    in++;
                }
            }
        }
        final int out = size - in - pending;
        return String.format("CachedConnectionPool(size=%d[in:%d + out:%d + "
                + "pending:%d], maxSize=%d, blocked=%d, factory=%s)", size, in, out, pending,
                maxPoolSize, blocked, String.valueOf(factory));
    }
    /**
opendj-core/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -792,14 +792,14 @@
        }
        private void checkForHeartBeat() {
            if (sync.isHeldExclusively()) {
            if (sync.isHeld()) {
                /*
                 * A heart beat is still in progress, but it should have
                 * completed by now. Let's avoid aggressively terminating the
                 * connection, because the heart beat may simply have been
                 * delayed by a sudden surge of activity. Therefore, only flag
                 * the connection as failed if no activity has been seen on the
                 * connection since the heart beat was sent.
                 * A heart beat or bind/startTLS is still in progress, but it
                 * should have completed by now. Let's avoid aggressively
                 * terminating the connection, because the heart beat may simply
                 * have been delayed by a sudden surge of activity. Therefore,
                 * only flag the connection as failed if no activity has been
                 * seen on the connection since the heart beat was sent.
                 */
                final long currentTimeMillis = timeSource.currentTimeMillis();
                if (lastResponseTimestamp < (currentTimeMillis - timeoutMS)) {
@@ -907,7 +907,6 @@
            if (sync.tryLockExclusively()) {
                try {
                    connection.searchAsync(heartBeatRequest, null, heartBeatHandler);
                    return true;
                } catch (final IllegalStateException e) {
                    /*
                     * This may happen when we attempt to send the heart beat
@@ -922,7 +921,12 @@
                    releaseHeartBeatLock();
                }
            }
            return false;
            /*
             * Indicate that the heartbeat should be checked even if a
             * bind/startTLS is in progress, since these operations will
             * effectively act as the heartbeat.
             */
            return true;
        }
        private <R> R timestamp(final R response) {
@@ -997,6 +1001,10 @@
            return getState() == LOCKED_EXCLUSIVELY;
        }
        boolean isHeld() {
            return getState() != 0;
        }
        @Override
        protected boolean tryAcquire(final int ignored) {
            if (compareAndSetState(UNLOCKED, LOCKED_EXCLUSIVELY)) {
opendj-core/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS.
 *      Portions copyright 2011-2014 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
@@ -30,18 +30,16 @@
import static org.fest.assertions.Assertions.assertThat;
import static org.forgerock.opendj.ldap.Connections.newFixedConnectionPool;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import static org.forgerock.opendj.ldap.TestCaseUtils.*;
import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnection;
import static org.forgerock.opendj.ldap.TestCaseUtils.mockConnectionFactory;
import static org.forgerock.opendj.ldap.TestCaseUtils.mockTimeSource;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -50,11 +48,13 @@
import org.forgerock.opendj.ldap.requests.Requests;
import org.forgerock.opendj.ldap.responses.ExtendedResult;
import org.forgerock.opendj.ldap.responses.Responses;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.Test;
/**
 * Tests the connection pool implementation.
 */
@SuppressWarnings("javadoc")
public class ConnectionPoolTestCase extends SdkTestCase {
    /**
@@ -511,4 +511,38 @@
        assertThat(scheduler.isScheduled()).isFalse();
    }
    /**
     * Test that all outstanding pending connection futures are completed when a
     * connection request fails.
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    @Test(description = "OPENDJ-1348", timeOut = 10000)
    public void testNewConnectionFailureFlushesAllPendingFutures() throws Exception {
        final ConnectionFactory factory = mock(ConnectionFactory.class);
        final int poolSize = 2;
        final ConnectionPool pool = Connections.newFixedConnectionPool(factory, poolSize);
        List<FutureResult<Connection>> futures = new ArrayList<FutureResult<Connection>>();
        for (int i = 0; i < poolSize + 1; i++) {
            futures.add(pool.getConnectionAsync(null));
        }
        final ArgumentCaptor<ResultHandler> arg = ArgumentCaptor.forClass(ResultHandler.class);
        verify(factory, times(poolSize)).getConnectionAsync(arg.capture());
        final ErrorResultException connectError =
                ErrorResultException.newErrorResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR);
        for (ResultHandler<Connection> handler : arg.getAllValues()) {
            handler.handleErrorResult(connectError);
        }
        for (FutureResult<Connection> future : futures) {
            try {
                // Before the fix for OPENDJ-1348 the third future.get() would hang.
                future.get();
            } catch (ErrorResultException e) {
                assertThat(e).isSameAs(connectError);
            }
        }
    }
}
opendj-core/src/test/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactoryTestCase.java
@@ -283,20 +283,76 @@
    }
    @SuppressWarnings("unchecked")
    @Test
    public void testHeartBeatWhileBindInProgress() throws Exception {
    @Test(description = "OPENDJ-1348")
    public void testBindPreventsHeartBeatTimeout() throws Exception {
        /*
         * Purpose: a bind response which arrives before the scheduler tasks
         * run again must keep the connection valid, i.e. the bind response is
         * accepted in place of a heartbeat response.
         */
        mockConnectionWithInitialHeartbeatResult(ResultCode.SUCCESS);
        hbc = hbcf.getConnection();
        /*
         * Send a bind request, trapping the bind call-back so that we can send
         * the response once we have attempted a heartbeat.
         */
        when(
                connection.bindAsync(any(BindRequest.class),
                        any(IntermediateResponseHandler.class), any(ResultHandler.class)))
                .thenReturn(null);
        when(hbcf.timeSource.currentTimeMillis()).thenReturn(11000L);
        hbc.bindAsync(newSimpleBindRequest(), null, null);
        // Capture the result handler which was passed to the underlying
        // connection so that the bind response can be delivered by hand later.
        @SuppressWarnings("rawtypes")
        final ArgumentCaptor<ResultHandler> arg = ArgumentCaptor.forClass(ResultHandler.class);
        verify(connection, times(1)).bindAsync(any(BindRequest.class),
                any(IntermediateResponseHandler.class), arg.capture());
        // Verify no heartbeat is sent because there is a bind in progress.
        // The single searchAsync permitted below is presumably the initial
        // heartbeat sent when the connection was obtained - TODO confirm.
        when(hbcf.timeSource.currentTimeMillis()).thenReturn(11001L);
        scheduler.runAllTasks(); // Invokes HBCF.ConnectionImpl.sendHeartBeat()
        verify(connection, times(1)).searchAsync(same(HEARTBEAT),
                any(IntermediateResponseHandler.class), any(SearchResultHandler.class));
        // Send fake bind response, releasing the heartbeat.
        when(hbcf.timeSource.currentTimeMillis()).thenReturn(11099L);
        arg.getValue().handleResult(newResult(ResultCode.SUCCESS));
        // Check that bind response acts as heartbeat: the connection must be
        // valid both before and after the scheduler tasks run at the later time.
        assertThat(hbc.isValid()).isTrue();
        when(hbcf.timeSource.currentTimeMillis()).thenReturn(11100L);
        scheduler.runAllTasks(); // Invokes HBCF.ConnectionImpl.checkForHeartBeat()
        assertThat(hbc.isValid()).isTrue();
    }
    /**
     * Checks that a bind which never receives a response is treated like a
     * missed heartbeat: the connection is still reported valid straight after
     * the bind has been sent, but is reported invalid once the scheduler tasks
     * have run at the later mocked time.
     */
    @SuppressWarnings("unchecked")
    @Test(description = "OPENDJ-1348")
    public void testBindTriggersHeartBeatTimeoutWhenTooSlow() throws Exception {
        mockConnectionWithInitialHeartbeatResult(ResultCode.SUCCESS);
        hbc = hbcf.getConnection();
        // Send a bind request which will time out: its result handler is
        // captured below but never invoked, so no bind response ever arrives.
        when(hbcf.timeSource.currentTimeMillis()).thenReturn(20000L);
        hbc.bindAsync(newSimpleBindRequest(), null, null);
        @SuppressWarnings("rawtypes")
        final ArgumentCaptor<ResultHandler> arg = ArgumentCaptor.forClass(ResultHandler.class);
        verify(connection, times(1)).bindAsync(any(BindRequest.class),
                any(IntermediateResponseHandler.class), arg.capture());
        // Verify no heartbeat is sent because there is a bind in progress.
        // The single searchAsync permitted below is presumably the initial
        // heartbeat sent when the connection was obtained - TODO confirm.
        when(hbcf.timeSource.currentTimeMillis()).thenReturn(20001L);
        scheduler.runAllTasks(); // Invokes HBCF.ConnectionImpl.sendHeartBeat()
        verify(connection, times(1)).searchAsync(same(HEARTBEAT),
                any(IntermediateResponseHandler.class), any(SearchResultHandler.class));
        // Check that lack of bind response acts as heartbeat timeout: the
        // connection is valid now, and invalid after the tasks run at 20100.
        assertThat(hbc.isValid()).isTrue();
        when(hbcf.timeSource.currentTimeMillis()).thenReturn(20100L);
        scheduler.runAllTasks(); // Invokes HBCF.ConnectionImpl.checkForHeartBeat()
        assertThat(hbc.isValid()).isFalse();
    }
    @SuppressWarnings("unchecked")
    @Test
    public void testHeartBeatWhileBindInProgress() throws Exception {
        mockConnectionWithInitialHeartbeatResult(ResultCode.SUCCESS);
        hbc = hbcf.getConnection();
        /*
         * Send a bind request, trapping the bind call-back so that we can send
         * the response once we have attempted a heartbeat.
         */
        hbc.bindAsync(newSimpleBindRequest(), null, null);
        // Capture the bind result handler.