mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
13.09.2013 ec2e88cf45f3d212f3634a509c7e4d87b2081508
Backport fix for OPENDJ-1091: Implement a cached connection pool
1 files renamed
5 files modified
500 ■■■■ changed files
opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java 8 ●●●● patch | view | raw | blame | history
opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java 9 ●●●●● patch | view | raw | blame | history
opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java 232 ●●●● patch | view | raw | blame | history
opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java 145 ●●●●● patch | view | raw | blame | history
opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java 102 ●●●●● patch | view | raw | blame | history
opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java 4 ●●● patch | view | raw | blame | history
opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
@@ -451,14 +451,14 @@
            final String remoteAddress = args[i];
            final int remotePort = Integer.parseInt(args[i + 1]);
            factories.add(Connections.newFixedConnectionPool(Connections
            factories.add(Connections.newCachedConnectionPool(Connections
                    .newAuthenticatedConnectionFactory(Connections
                            .newHeartBeatConnectionFactory(new LDAPConnectionFactory(remoteAddress,
                                    remotePort)), Requests.newSimpleBindRequest(proxyDN,
                            proxyPassword.toCharArray())), Integer.MAX_VALUE));
            bindFactories.add(Connections.newFixedConnectionPool(Connections
                            proxyPassword.toCharArray()))));
            bindFactories.add(Connections.newCachedConnectionPool(Connections
                    .newHeartBeatConnectionFactory(new LDAPConnectionFactory(remoteAddress,
                            remotePort)), Integer.MAX_VALUE));
                            remotePort))));
        }
        final RoundRobinLoadBalancingAlgorithm algorithm =
                new RoundRobinLoadBalancingAlgorithm(factories);
opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java
@@ -665,15 +665,14 @@
        // Create connection factories.
        final ConnectionFactory factory =
                Connections.newFixedConnectionPool(
                Connections.newCachedConnectionPool(
                        Connections.newAuthenticatedConnectionFactory(
                                new LDAPConnectionFactory(remoteAddress, remotePort),
                                Requests.newSimpleBindRequest(
                                        proxyDN, proxyPassword.toCharArray())),
                        Integer.MAX_VALUE);
                                        proxyDN, proxyPassword.toCharArray())));
        final ConnectionFactory bindFactory =
                Connections.newFixedConnectionPool(new LDAPConnectionFactory(
                        remoteAddress, remotePort), Integer.MAX_VALUE);
                Connections.newCachedConnectionPool(new LDAPConnectionFactory(
                        remoteAddress, remotePort));
        // Create a server connection adapter.
        final ProxyBackend backend = new ProxyBackend(factory, bindFactory);
opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
File was renamed from opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
@@ -27,16 +27,20 @@
package org.forgerock.opendj.ldap;
import static com.forgerock.opendj.util.StaticUtils.*;
import static org.forgerock.opendj.ldap.CoreMessages.*;
import static org.forgerock.opendj.ldap.ErrorResultException.*;
import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER;
import static org.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_POOL_CLOSING;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
@@ -62,13 +66,15 @@
import com.forgerock.opendj.util.AsynchronousFutureResult;
import com.forgerock.opendj.util.CompletedFutureResult;
import com.forgerock.opendj.util.ReferenceCountedObject;
import com.forgerock.opendj.util.Validator;
/**
 * A simple connection pool implementation which maintains a fixed number of
 * connections.
 * A connection pool implementation which maintains a cache of pooled
 * connections with a configurable core pool size, maximum size, and expiration
 * policy.
 */
final class FixedConnectionPool implements ConnectionPool {
final class CachedConnectionPool implements ConnectionPool {
    /**
     * This result handler is invoked when an attempt to add a new connection to
@@ -79,13 +85,12 @@
        @Override
        public void handleErrorResult(final ErrorResultException error) {
            // Connection attempt failed, so decrease the pool size.
            currentPoolSize.release();
            availableConnections.release();
            if (DEBUG_LOG.isLoggable(Level.FINE)) {
                DEBUG_LOG.fine(String.format(
                        "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", error
                                .getMessage(), poolSize - currentPoolSize.availablePermits(),
                        poolSize));
                        "Connection attempt failed: %s, availableConnections=%d, maxPoolSize=%d",
                        error.getMessage(), currentPoolSize(), maxPoolSize));
            }
            QueueElement holder;
@@ -106,8 +111,8 @@
        public void handleResult(final Connection connection) {
            if (DEBUG_LOG.isLoggable(Level.FINE)) {
                DEBUG_LOG.fine(String.format(
                        "Connection attempt succeeded:  currentPoolSize=%d, poolSize=%d", poolSize
                                - currentPoolSize.availablePermits(), poolSize));
                        "Connection attempt succeeded:  availableConnections=%d, maxPoolSize=%d",
                        currentPoolSize(), maxPoolSize));
            }
            publishConnection(connection);
        }
@@ -191,7 +196,7 @@
        }
        @Override
        public Result applyChange(ChangeRecord request) throws ErrorResultException {
        public Result applyChange(final ChangeRecord request) throws ErrorResultException {
            return checkState().applyChange(request);
        }
@@ -249,15 +254,15 @@
                 * server, but the server may still be available. In order to
                 * avoid leaving pending futures hanging indefinitely, we should
                 * try to reconnect immediately. No need to release/acquire
                 * currentPoolSize.
                 * availableConnections.
                 */
                connection.close();
                factory.getConnectionAsync(connectionResultHandler);
                if (DEBUG_LOG.isLoggable(Level.FINE)) {
                    DEBUG_LOG.fine(String.format(
                            "Connection no longer valid: currentPoolSize=%d, poolSize=%d", poolSize
                                    - currentPoolSize.availablePermits(), poolSize));
                            "Connection no longer valid: availableConnections=%d, maxPoolSize=%d",
                            currentPoolSize(), maxPoolSize));
                }
            }
@@ -527,7 +532,54 @@
            }
            return connection;
        }
    }
    /**
     * Scheduled task responsible for purging non-core pooled connections which
     * have been idle for longer than the idle timeout limit.
     */
    private final class PurgeIdleConnectionsTask implements Runnable {
        @Override
        public void run() {
            final List<Connection> idleConnections;
            synchronized (queue) {
                if (isClosed) {
                    return;
                }
                /*
                 * Obtain a list of expired connections but don't close them yet
                 * since we don't want to hold the lock too long.
                 */
                idleConnections = new LinkedList<Connection>();
                final long timeoutMillis = currentTimeMillis() - idleTimeoutMillis;
                int nonCoreConnectionCount = currentPoolSize() - corePoolSize;
                for (QueueElement holder = queue.peek(); nonCoreConnectionCount > 0
                        && isTimedOutQueuedConnection(holder, timeoutMillis); holder = queue.peek()) {
                    idleConnections.add(holder.getWaitingConnection());
                    queue.poll();
                    availableConnections.release();
                    nonCoreConnectionCount--;
                }
            }
            // Close the idle connections.
            if (!idleConnections.isEmpty()) {
                if (DEBUG_LOG.isLoggable(Level.FINE)) {
                    DEBUG_LOG.fine(String.format("Closing %d idle pooled connections: "
                            + "availableConnections=%d, maxPoolSize=%d", idleConnections.size(),
                            currentPoolSize(), maxPoolSize));
                }
                for (final Connection connection : idleConnections) {
                    connection.close();
                }
            }
        }
        private boolean isTimedOutQueuedConnection(final QueueElement holder,
                final long timeoutMillis) {
            return holder != null && !holder.isWaitingFuture() && holder.hasTimedOut(timeoutMillis);
        }
    }
    /**
@@ -536,16 +588,19 @@
     * connection request.
     */
    private static final class QueueElement {
        private final long timestampMillis;
        private final Object value;
        QueueElement(final Connection connection) {
        QueueElement(final Connection connection, final long timestampMillis) {
            this.value = connection;
            this.timestampMillis = timestampMillis;
        }
        QueueElement(final ResultHandler<? super Connection> handler) {
        QueueElement(final ResultHandler<? super Connection> handler, final long timestampMillis) {
            this.value =
                    new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(
                            handler);
            this.timestampMillis = timestampMillis;
        }
        @Override
@@ -566,22 +621,61 @@
            return (AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>) value;
        }
        boolean hasTimedOut(final long timeLimitMillis) {
            return timestampMillis < timeLimitMillis;
        }
        boolean isWaitingFuture() {
            return value instanceof AsynchronousFutureResult;
        }
    }
    /**
     * This is intended for unit testing only in order to inject fake time
     * stamps. Use System.currentTimeMillis() when null.
     */
    Callable<Long> testTimeSource = null;
    private final Semaphore availableConnections;
    private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
    private final Semaphore currentPoolSize;
    private final int corePoolSize;
    private final ConnectionFactory factory;
    private boolean isClosed = false;
    private final int poolSize;
    private final ScheduledFuture<?> idleTimeoutFuture;
    private final long idleTimeoutMillis;
    private final int maxPoolSize;
    private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
    private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
    FixedConnectionPool(final ConnectionFactory factory, final int poolSize) {
    CachedConnectionPool(final ConnectionFactory factory, final int corePoolSize,
            final int maximumPoolSize, final long idleTimeout, final TimeUnit unit,
            final ScheduledExecutorService scheduler) {
        Validator.ensureNotNull(factory);
        Validator.ensureTrue(corePoolSize >= 0, "corePoolSize < 0");
        Validator.ensureTrue(maximumPoolSize > 0, "maxPoolSize <= 0");
        Validator.ensureTrue(corePoolSize <= maximumPoolSize, "corePoolSize > maxPoolSize");
        Validator.ensureTrue(idleTimeout >= 0, "idleTimeout < 0");
        Validator.ensureTrue(idleTimeout == 0 || unit != null, "time unit is null");
        this.factory = factory;
        this.poolSize = poolSize;
        this.currentPoolSize = new Semaphore(poolSize);
        this.corePoolSize = corePoolSize;
        this.maxPoolSize = maximumPoolSize;
        this.availableConnections = new Semaphore(maximumPoolSize);
        if (corePoolSize < maximumPoolSize && idleTimeout > 0) {
            // Dynamic pool.
            this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
            this.idleTimeoutMillis = unit.toMillis(idleTimeout);
            this.idleTimeoutFuture =
                    this.scheduler.get().scheduleWithFixedDelay(new PurgeIdleConnectionsTask(),
                            idleTimeout, idleTimeout, unit);
        } else {
            // Fixed pool.
            this.scheduler = null;
            this.idleTimeoutMillis = 0;
            this.idleTimeoutFuture = null;
        }
    }
    @Override
@@ -601,18 +695,24 @@
            while (hasWaitingConnections()) {
                final QueueElement holder = queue.removeFirst();
                idleConnections.add(holder.getWaitingConnection());
                availableConnections.release();
            }
        }
        if (DEBUG_LOG.isLoggable(Level.FINE)) {
            DEBUG_LOG.fine(String.format(
                    "Connection pool is closing: currentPoolSize=%d, poolSize=%d", poolSize
                            - currentPoolSize.availablePermits(), poolSize));
                    "Connection pool is closing: availableConnections=%d, maxPoolSize=%d",
                    currentPoolSize(), maxPoolSize));
        }
        // Close the idle connections.
        if (idleTimeoutFuture != null) {
            idleTimeoutFuture.cancel(false);
            scheduler.release();
        }
        // Close all idle connections.
        for (final Connection connection : idleConnections) {
            closeConnection(connection);
            connection.close();
        }
        // Close the underlying factory.
@@ -636,13 +736,11 @@
            final QueueElement holder;
            synchronized (queue) {
                if (isClosed) {
                    throw new IllegalStateException("FixedConnectionPool is already closed");
                }
                if (hasWaitingConnections()) {
                    throw new IllegalStateException("CachedConnectionPool is already closed");
                } else if (hasWaitingConnections()) {
                    holder = queue.removeFirst();
                } else {
                    holder = new QueueElement(handler);
                    holder = new QueueElement(handler, currentTimeMillis());
                    queue.add(holder);
                }
            }
@@ -659,18 +757,18 @@
                } else {
                    // Close the stale connection and try again.
                    connection.close();
                    currentPoolSize.release();
                    availableConnections.release();
                    if (DEBUG_LOG.isLoggable(Level.FINE)) {
                        DEBUG_LOG.fine(String.format(
                                "Connection no longer valid: currentPoolSize=%d, poolSize=%d",
                                poolSize - currentPoolSize.availablePermits(), poolSize));
                                "Connection no longer valid: availableConnections=%d, poolSize=%d",
                                currentPoolSize(), maxPoolSize));
                    }
                }
            } else {
                // Grow the pool if needed.
                final FutureResult<Connection> future = holder.getWaitingFuture();
                if (!future.isDone() && currentPoolSize.tryAcquire()) {
                if (!future.isDone() && availableConnections.tryAcquire()) {
                    factory.getConnectionAsync(connectionResultHandler);
                }
                return future;
@@ -681,10 +779,10 @@
    @Override
    public String toString() {
        final StringBuilder builder = new StringBuilder();
        builder.append("FixedConnectionPool(");
        builder.append("CachedConnectionPool(");
        builder.append(String.valueOf(factory));
        builder.append(',');
        builder.append(poolSize);
        builder.append(maxPoolSize);
        builder.append(')');
        return builder.toString();
    }
@@ -699,15 +797,25 @@
        close();
    }
    private void closeConnection(final Connection connection) {
        // The connection will be closed, so decrease the pool size.
        currentPoolSize.release();
        connection.close();
    // Package private for unit testing.
    int currentPoolSize() {
        return maxPoolSize - availableConnections.availablePermits();
    }
        if (DEBUG_LOG.isLoggable(Level.FINE)) {
            DEBUG_LOG.fine(String.format("Closing connection because connection pool is closing: "
                    + "currentPoolSize=%d, poolSize=%d", poolSize
                    - currentPoolSize.availablePermits(), poolSize));
    /*
     * This method delegates to System.currentTimeMillis() except in unit tests
     * where we use injected times.
     */
    private long currentTimeMillis() {
        if (testTimeSource == null) {
            return System.currentTimeMillis();
        } else {
            try {
                return testTimeSource.call();
            } catch (final Exception e) {
                // Should not happen.
                throw new RuntimeException(e);
            }
        }
    }
@@ -727,21 +835,28 @@
            if (hasWaitingFutures()) {
                connectionPoolIsClosing = isClosed;
                holder = queue.removeFirst();
            } else if (isClosed) {
                connectionPoolIsClosing = true;
                holder = null;
            } else {
                if (isClosed) {
                    connectionPoolIsClosing = true;
                    holder = null;
                } else {
                    holder = new QueueElement(connection);
                    queue.add(holder);
                    return;
                }
                holder = new QueueElement(connection, currentTimeMillis());
                queue.add(holder);
                return;
            }
        }
        // There was waiting future, so complete it.
        if (connectionPoolIsClosing) {
            closeConnection(connection);
            // The connection will be closed, so decrease the pool size.
            availableConnections.release();
            connection.close();
            if (DEBUG_LOG.isLoggable(Level.FINE)) {
                DEBUG_LOG.fine(String.format(
                        "Closing connection because connection pool is closing: "
                                + "availableConnections=%d, maxPoolSize=%d", currentPoolSize(),
                        maxPoolSize));
            }
            if (holder != null) {
                final ErrorResultException e =
@@ -751,9 +866,8 @@
                if (DEBUG_LOG.isLoggable(Level.FINE)) {
                    DEBUG_LOG.fine(String.format(
                            "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", e
                                    .getMessage(), poolSize - currentPoolSize.availablePermits(),
                            poolSize));
                            "Connection attempt failed: %s, availableConnections=%d, poolSize=%d",
                            e.getMessage(), currentPoolSize(), maxPoolSize));
                }
            }
        } else {
opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -73,9 +73,150 @@
    }
    /**
     * Creates a new connection pool which creates new connections as needed
     * using the provided connection factory, but will reuse previously
     * allocated connections when they are available.
     * <p>
     * The returned pool has a core pool size of zero and a maximum pool size
     * of {@link Integer#MAX_VALUE}.
     * <p>
     * Connections which have not been used for sixty seconds are closed and
     * removed from the pool. Thus, a pool that remains idle for long enough
     * will not contain any cached connections.
     * <p>
     * Connections obtained from the connection pool are guaranteed to be valid
     * immediately before being returned to the calling application. More
     * specifically, connections which have remained idle in the connection pool
     * for a long time and which have been remotely closed due to a time out
     * will never be returned. However, once a pooled connection has been
     * obtained it is the responsibility of the calling application to handle
     * subsequent connection failures, these being signaled via a
     * {@link ConnectionException}.
     *
     * @param factory
     *            The connection factory to use for creating new connections.
     * @return The new connection pool.
     * @throws NullPointerException
     *             If {@code factory} was {@code null}.
     */
    public static ConnectionPool newCachedConnectionPool(final ConnectionFactory factory) {
        // No core connections, no practical upper bound, 60 second idle
        // timeout, and a null scheduler so that the default scheduler is used.
        return new CachedConnectionPool(factory, 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, null);
    }
    /**
     * Creates a new connection pool which creates new connections as needed
     * using the provided connection factory, but will reuse previously
     * allocated connections when they are available.
     * <p>
     * Attempts to use more than {@code maximumPoolSize} connections at once
     * will block until a connection is released back to the pool. In other
     * words, this pool will prevent applications from using more than
     * {@code maximumPoolSize} connections at the same time.
     * <p>
     * Connections which have not been used for the provided {@code idleTimeout}
     * period are closed and removed from the pool, until there are only
     * {@code corePoolSize} connections remaining. Idle connections are only
     * purged when {@code corePoolSize} is less than {@code maximumPoolSize}
     * and {@code idleTimeout} is greater than zero; otherwise the pool behaves
     * as a fixed size pool.
     * <p>
     * Connections obtained from the connection pool are guaranteed to be valid
     * immediately before being returned to the calling application. More
     * specifically, connections which have remained idle in the connection pool
     * for a long time and which have been remotely closed due to a time out
     * will never be returned. However, once a pooled connection has been
     * obtained it is the responsibility of the calling application to handle
     * subsequent connection failures, these being signaled via a
     * {@link ConnectionException}.
     *
     * @param factory
     *            The connection factory to use for creating new connections.
     * @param corePoolSize
     *            The minimum number of connections to keep in the pool, even if
     *            they are idle. May be zero.
     * @param maximumPoolSize
     *            The maximum number of connections to allow in the pool.
     * @param idleTimeout
     *            The time out period, after which unused non-core connections
     *            will be closed.
     * @param unit
     *            The time unit for the {@code idleTimeout} argument.
     * @return The new connection pool.
     * @throws IllegalArgumentException
     *             If {@code corePoolSize} is negative, or if
     *             {@code maximumPoolSize} is less than or equal to zero, or if
     *             {@code idleTimeout} is negative, or if {@code corePoolSize}
     *             is greater than {@code maximumPoolSize}, or if
     *             {@code idleTimeout} is non-zero and {@code unit} is
     *             {@code null}.
     * @throws NullPointerException
     *             If {@code factory} was {@code null}.
     */
    public static ConnectionPool newCachedConnectionPool(final ConnectionFactory factory,
            final int corePoolSize, final int maximumPoolSize, final long idleTimeout,
            final TimeUnit unit) {
        // A null scheduler selects the default scheduler for the idle purge task.
        return new CachedConnectionPool(factory, corePoolSize, maximumPoolSize, idleTimeout, unit,
                null);
    }
    /**
     * Creates a new connection pool which creates new connections as needed
     * using the provided connection factory, but will reuse previously
     * allocated connections when they are available.
     * <p>
     * Attempts to use more than {@code maximumPoolSize} connections at once
     * will block until a connection is released back to the pool. In other
     * words, this pool will prevent applications from using more than
     * {@code maximumPoolSize} connections at the same time.
     * <p>
     * Connections which have not been used for the provided {@code idleTimeout}
     * period are closed and removed from the pool, until there are only
     * {@code corePoolSize} connections remaining. Idle connections are only
     * purged when {@code corePoolSize} is less than {@code maximumPoolSize}
     * and {@code idleTimeout} is greater than zero; otherwise the pool behaves
     * as a fixed size pool and the scheduler is not used.
     * <p>
     * Connections obtained from the connection pool are guaranteed to be valid
     * immediately before being returned to the calling application. More
     * specifically, connections which have remained idle in the connection pool
     * for a long time and which have been remotely closed due to a time out
     * will never be returned. However, once a pooled connection has been
     * obtained it is the responsibility of the calling application to handle
     * subsequent connection failures, these being signaled via a
     * {@link ConnectionException}.
     *
     * @param factory
     *            The connection factory to use for creating new connections.
     * @param corePoolSize
     *            The minimum number of connections to keep in the pool, even if
     *            they are idle. May be zero.
     * @param maximumPoolSize
     *            The maximum number of connections to allow in the pool.
     * @param idleTimeout
     *            The time out period, after which unused non-core connections
     *            will be closed.
     * @param unit
     *            The time unit for the {@code idleTimeout} argument.
     * @param scheduler
     *            The scheduler which should be used for periodically checking
     *            for idle connections, or {@code null} if the default scheduler
     *            should be used.
     * @return The new connection pool.
     * @throws IllegalArgumentException
     *             If {@code corePoolSize} is negative, or if
     *             {@code maximumPoolSize} is less than or equal to zero, or if
     *             {@code idleTimeout} is negative, or if {@code corePoolSize}
     *             is greater than {@code maximumPoolSize}, or if
     *             {@code idleTimeout} is non-zero and {@code unit} is
     *             {@code null}.
     * @throws NullPointerException
     *             If {@code factory} was {@code null}.
     */
    public static ConnectionPool newCachedConnectionPool(final ConnectionFactory factory,
            final int corePoolSize, final int maximumPoolSize, final long idleTimeout,
            final TimeUnit unit, final ScheduledExecutorService scheduler) {
        return new CachedConnectionPool(factory, corePoolSize, maximumPoolSize, idleTimeout, unit,
                scheduler);
    }
    /**
     * Creates a new connection pool which will maintain {@code poolSize}
     * connections created using the provided connection factory.
     * <p>
     * Attempts to use more than {@code poolSize} connections at once will block
     * until a connection is released back to the pool. In other words, this
     * pool will prevent applications from using more than {@code poolSize}
     * connections at the same time.
     * <p>
     * Connections obtained from the connection pool are guaranteed to be valid
     * immediately before being returned to the calling application. More
     * specifically, connections which have remained idle in the connection pool
@@ -97,9 +238,7 @@
     */
    public static ConnectionPool newFixedConnectionPool(final ConnectionFactory factory,
            final int poolSize) {
        Validator.ensureNotNull(factory);
        Validator.ensureTrue(poolSize >= 0, "negative pool size");
        return new FixedConnectionPool(factory, poolSize);
        return new CachedConnectionPool(factory, poolSize, poolSize, 0L, null, null);
    }
    /**
opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2012 ForgeRock AS.
 *      Portions copyright 2011-2013 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
@@ -44,6 +44,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.opendj.ldap.requests.Requests;
@@ -414,4 +416,102 @@
        pool.close();
    }
    /**
     * Verifies that a pool configured with an idle timeout purges idle
     * non-core connections once the timeout has expired, while retaining the
     * core connections, and that closing the pool closes the remaining
     * connections and cancels the scheduled purge task.
     *
     * @throws Exception
     *             If an unexpected error occurred.
     */
    @Test
    public void testConnectionKeepAliveExpiration() throws Exception {
        final Connection pooledConnection1 = mock(Connection.class, "pooledConnection1");
        final Connection pooledConnection2 = mock(Connection.class, "pooledConnection2");
        final Connection pooledConnection3 = mock(Connection.class, "pooledConnection3");
        final Connection pooledConnection4 = mock(Connection.class, "pooledConnection4");
        final Connection pooledConnection5 = mock(Connection.class, "pooledConnection5");
        final Connection pooledConnection6 = mock(Connection.class, "pooledConnection6");
        when(pooledConnection1.isValid()).thenReturn(true);
        when(pooledConnection2.isValid()).thenReturn(true);
        when(pooledConnection3.isValid()).thenReturn(true);
        when(pooledConnection4.isValid()).thenReturn(true);
        when(pooledConnection5.isValid()).thenReturn(true);
        when(pooledConnection6.isValid()).thenReturn(true);
        final ConnectionFactory factory =
                mockConnectionFactory(pooledConnection1, pooledConnection2, pooledConnection3,
                        pooledConnection4, pooledConnection5, pooledConnection6);
        // Core size 2, maximum size 4, idle timeout 100ms. The mock scheduler
        // lets the test run the purge task by hand instead of waiting for it.
        final MockScheduler scheduler = new MockScheduler();
        final CachedConnectionPool pool =
                new CachedConnectionPool(factory, 2, 4, 100, TimeUnit.MILLISECONDS, scheduler);
        assertThat(scheduler.isScheduled()).isTrue();
        // First populate the pool with idle connections at time 0.
        @SuppressWarnings("unchecked")
        final Callable<Long> timeSource = mock(Callable.class);
        when(timeSource.call()).thenReturn(0L);
        pool.testTimeSource = timeSource;
        assertThat(pool.currentPoolSize()).isEqualTo(0);
        Connection c1 = pool.getConnection();
        Connection c2 = pool.getConnection();
        Connection c3 = pool.getConnection();
        Connection c4 = pool.getConnection();
        assertThat(pool.currentPoolSize()).isEqualTo(4);
        // Releasing the connections returns them to the pool; the pool size
        // is expected to stay at 4.
        c1.close();
        c2.close();
        c3.close();
        c4.close();
        assertThat(pool.currentPoolSize()).isEqualTo(4);
        // First purge at time 50 is no-op because no connections have expired.
        when(timeSource.call()).thenReturn(50L);
        scheduler.getCommand().run();
        assertThat(pool.currentPoolSize()).isEqualTo(4);
        // Second purge at time 150 should remove 2 non-core connections.
        // The two oldest queued connections are the ones expected to be closed.
        when(timeSource.call()).thenReturn(150L);
        scheduler.getCommand().run();
        assertThat(pool.currentPoolSize()).isEqualTo(2);
        verify(pooledConnection1, times(1)).close();
        verify(pooledConnection2, times(1)).close();
        verify(pooledConnection3, times(0)).close();
        verify(pooledConnection4, times(0)).close();
        // Regrow the pool at time 200.
        when(timeSource.call()).thenReturn(200L);
        Connection c5 = pool.getConnection(); // pooledConnection3
        Connection c6 = pool.getConnection(); // pooledConnection4
        Connection c7 = pool.getConnection(); // pooledConnection5
        Connection c8 = pool.getConnection(); // pooledConnection6
        assertThat(pool.currentPoolSize()).isEqualTo(4);
        // All four connections are released at time 200, which restarts their
        // idle period from that time.
        c5.close();
        c6.close();
        c7.close();
        c8.close();
        assertThat(pool.currentPoolSize()).isEqualTo(4);
        // Third purge at time 250 should not remove any connections.
        when(timeSource.call()).thenReturn(250L);
        scheduler.getCommand().run();
        assertThat(pool.currentPoolSize()).isEqualTo(4);
        // Fourth purge at time 350 should remove 2 non-core connections.
        when(timeSource.call()).thenReturn(350L);
        scheduler.getCommand().run();
        assertThat(pool.currentPoolSize()).isEqualTo(2);
        verify(pooledConnection3, times(1)).close();
        verify(pooledConnection4, times(1)).close();
        verify(pooledConnection5, times(0)).close();
        verify(pooledConnection6, times(0)).close();
        // Closing the pool must close the two remaining core connections and
        // cancel the scheduled purge task.
        pool.close();
        verify(pooledConnection5, times(1)).close();
        verify(pooledConnection6, times(1)).close();
        assertThat(scheduler.isScheduled()).isFalse();
    }
}
opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java
@@ -1106,7 +1106,9 @@
                factory =
                        Connections.newHeartBeatConnectionFactory(factory,
                                heartBeatIntervalSeconds, TimeUnit.SECONDS);
                factory = Connections.newFixedConnectionPool(factory, connectionPoolSize);
                factory =
                        Connections.newCachedConnectionPool(factory, 0, connectionPoolSize, 60L,
                                TimeUnit.SECONDS);
            }
            servers.add(factory);
        }