From ec2e88cf45f3d212f3634a509c7e4d87b2081508 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 13 Sep 2013 15:09:02 +0000
Subject: [PATCH] Backport fix for OPENDJ-1091: Implement a cached connection pool

---
 opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java       |  232 +++++++++++++++++++------
 opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java            |    4 
 opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java |    9 
 opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java                |  145 +++++++++++++++
 opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java         |    8 
 opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java     |  102 +++++++++++
 6 files changed, 427 insertions(+), 73 deletions(-)

diff --git a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
index ac95d2a..87f212b 100644
--- a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
+++ b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
@@ -451,14 +451,14 @@
             final String remoteAddress = args[i];
             final int remotePort = Integer.parseInt(args[i + 1]);
 
-            factories.add(Connections.newFixedConnectionPool(Connections
+            factories.add(Connections.newCachedConnectionPool(Connections
                     .newAuthenticatedConnectionFactory(Connections
                             .newHeartBeatConnectionFactory(new LDAPConnectionFactory(remoteAddress,
                                     remotePort)), Requests.newSimpleBindRequest(proxyDN,
-                            proxyPassword.toCharArray())), Integer.MAX_VALUE));
-            bindFactories.add(Connections.newFixedConnectionPool(Connections
+                            proxyPassword.toCharArray()))));
+            bindFactories.add(Connections.newCachedConnectionPool(Connections
                     .newHeartBeatConnectionFactory(new LDAPConnectionFactory(remoteAddress,
-                            remotePort)), Integer.MAX_VALUE));
+                            remotePort))));
         }
         final RoundRobinLoadBalancingAlgorithm algorithm =
                 new RoundRobinLoadBalancingAlgorithm(factories);
diff --git a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java
index 454aa0f..7852a3b2 100644
--- a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java
+++ b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java
@@ -665,15 +665,14 @@
 
         // Create connection factories.
         final ConnectionFactory factory =
-                Connections.newFixedConnectionPool(
+                Connections.newCachedConnectionPool(
                         Connections.newAuthenticatedConnectionFactory(
                                 new LDAPConnectionFactory(remoteAddress, remotePort),
                                 Requests.newSimpleBindRequest(
-                                        proxyDN, proxyPassword.toCharArray())),
-                        Integer.MAX_VALUE);
+                                        proxyDN, proxyPassword.toCharArray())));
         final ConnectionFactory bindFactory =
-                Connections.newFixedConnectionPool(new LDAPConnectionFactory(
-                        remoteAddress, remotePort), Integer.MAX_VALUE);
+                Connections.newCachedConnectionPool(new LDAPConnectionFactory(
+                        remoteAddress, remotePort));
 
         // Create a server connection adapter.
         final ProxyBackend backend = new ProxyBackend(factory, bindFactory);
diff --git a/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java b/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
similarity index 76%
rename from opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
rename to opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
index 5225af8..48511a7 100644
--- a/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
+++ b/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
@@ -27,16 +27,20 @@
 
 package org.forgerock.opendj.ldap;
 
-import static com.forgerock.opendj.util.StaticUtils.*;
-
-import static org.forgerock.opendj.ldap.CoreMessages.*;
-import static org.forgerock.opendj.ldap.ErrorResultException.*;
+import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
+import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER;
+import static org.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_POOL_CLOSING;
+import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
 
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
 
@@ -62,13 +66,15 @@
 
 import com.forgerock.opendj.util.AsynchronousFutureResult;
 import com.forgerock.opendj.util.CompletedFutureResult;
+import com.forgerock.opendj.util.ReferenceCountedObject;
 import com.forgerock.opendj.util.Validator;
 
 /**
- * A simple connection pool implementation which maintains a fixed number of
- * connections.
+ * A connection pool implementation which maintains a cache of pooled
+ * connections with a configurable core pool size, maximum size, and expiration
+ * policy.
  */
-final class FixedConnectionPool implements ConnectionPool {
+final class CachedConnectionPool implements ConnectionPool {
 
     /**
      * This result handler is invoked when an attempt to add a new connection to
@@ -79,13 +85,12 @@
         @Override
         public void handleErrorResult(final ErrorResultException error) {
             // Connection attempt failed, so decrease the pool size.
-            currentPoolSize.release();
+            availableConnections.release();
 
             if (DEBUG_LOG.isLoggable(Level.FINE)) {
                 DEBUG_LOG.fine(String.format(
-                        "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", error
-                                .getMessage(), poolSize - currentPoolSize.availablePermits(),
-                        poolSize));
+                        "Connection attempt failed: %s, availableConnections=%d, maxPoolSize=%d",
+                        error.getMessage(), currentPoolSize(), maxPoolSize));
             }
 
             QueueElement holder;
@@ -106,8 +111,8 @@
         public void handleResult(final Connection connection) {
             if (DEBUG_LOG.isLoggable(Level.FINE)) {
                 DEBUG_LOG.fine(String.format(
-                        "Connection attempt succeeded:  currentPoolSize=%d, poolSize=%d", poolSize
-                                - currentPoolSize.availablePermits(), poolSize));
+                        "Connection attempt succeeded:  availableConnections=%d, maxPoolSize=%d",
+                        currentPoolSize(), maxPoolSize));
             }
             publishConnection(connection);
         }
@@ -191,7 +196,7 @@
         }
 
         @Override
-        public Result applyChange(ChangeRecord request) throws ErrorResultException {
+        public Result applyChange(final ChangeRecord request) throws ErrorResultException {
             return checkState().applyChange(request);
         }
 
@@ -249,15 +254,15 @@
                  * server, but the server may still be available. In order to
                  * avoid leaving pending futures hanging indefinitely, we should
                  * try to reconnect immediately. No need to release/acquire
-                 * currentPoolSize.
+                 * availableConnections.
                  */
                 connection.close();
                 factory.getConnectionAsync(connectionResultHandler);
 
                 if (DEBUG_LOG.isLoggable(Level.FINE)) {
                     DEBUG_LOG.fine(String.format(
-                            "Connection no longer valid: currentPoolSize=%d, poolSize=%d", poolSize
-                                    - currentPoolSize.availablePermits(), poolSize));
+                            "Connection no longer valid: availableConnections=%d, maxPoolSize=%d",
+                            currentPoolSize(), maxPoolSize));
                 }
             }
 
@@ -527,7 +532,54 @@
             }
             return connection;
         }
+    }
 
+    /**
+     * Scheduled task responsible for purging non-core pooled connections which
+     * have been idle for longer than the idle timeout limit.
+     */
+    private final class PurgeIdleConnectionsTask implements Runnable {
+        @Override
+        public void run() {
+            final List<Connection> idleConnections;
+            synchronized (queue) {
+                if (isClosed) {
+                    return;
+                }
+
+                /*
+                 * Obtain a list of expired connections but don't close them yet
+                 * since we don't want to hold the lock too long.
+                 */
+                idleConnections = new LinkedList<Connection>();
+                final long timeoutMillis = currentTimeMillis() - idleTimeoutMillis;
+                int nonCoreConnectionCount = currentPoolSize() - corePoolSize;
+                for (QueueElement holder = queue.peek(); nonCoreConnectionCount > 0
+                        && isTimedOutQueuedConnection(holder, timeoutMillis); holder = queue.peek()) {
+                    idleConnections.add(holder.getWaitingConnection());
+                    queue.poll();
+                    availableConnections.release();
+                    nonCoreConnectionCount--;
+                }
+            }
+
+            // Close the idle connections.
+            if (!idleConnections.isEmpty()) {
+                if (DEBUG_LOG.isLoggable(Level.FINE)) {
+                    DEBUG_LOG.fine(String.format("Closing %d idle pooled connections: "
+                            + "availableConnections=%d, maxPoolSize=%d", idleConnections.size(),
+                            currentPoolSize(), maxPoolSize));
+                }
+                for (final Connection connection : idleConnections) {
+                    connection.close();
+                }
+            }
+        }
+
+        private boolean isTimedOutQueuedConnection(final QueueElement holder,
+                final long timeoutMillis) {
+            return holder != null && !holder.isWaitingFuture() && holder.hasTimedOut(timeoutMillis);
+        }
     }
 
     /**
@@ -536,16 +588,19 @@
      * connection request.
      */
     private static final class QueueElement {
+        private final long timestampMillis;
         private final Object value;
 
-        QueueElement(final Connection connection) {
+        QueueElement(final Connection connection, final long timestampMillis) {
             this.value = connection;
+            this.timestampMillis = timestampMillis;
         }
 
-        QueueElement(final ResultHandler<? super Connection> handler) {
+        QueueElement(final ResultHandler<? super Connection> handler, final long timestampMillis) {
             this.value =
                     new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(
                             handler);
+            this.timestampMillis = timestampMillis;
         }
 
         @Override
@@ -566,22 +621,61 @@
             return (AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>) value;
         }
 
+        boolean hasTimedOut(final long timeLimitMillis) {
+            return timestampMillis < timeLimitMillis;
+        }
+
         boolean isWaitingFuture() {
             return value instanceof AsynchronousFutureResult;
         }
     }
 
+    /**
+     * This is intended for unit testing only in order to inject fake time
+     * stamps. Use System.currentTimeMillis() when null.
+     */
+    Callable<Long> testTimeSource = null;
+
+    private final Semaphore availableConnections;
     private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
-    private final Semaphore currentPoolSize;
+    private final int corePoolSize;
+
     private final ConnectionFactory factory;
     private boolean isClosed = false;
-    private final int poolSize;
+    private final ScheduledFuture<?> idleTimeoutFuture;
+    private final long idleTimeoutMillis;
+    private final int maxPoolSize;
     private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
+    private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
 
-    FixedConnectionPool(final ConnectionFactory factory, final int poolSize) {
+    CachedConnectionPool(final ConnectionFactory factory, final int corePoolSize,
+            final int maximumPoolSize, final long idleTimeout, final TimeUnit unit,
+            final ScheduledExecutorService scheduler) {
+        Validator.ensureNotNull(factory);
+        Validator.ensureTrue(corePoolSize >= 0, "corePoolSize < 0");
+        Validator.ensureTrue(maximumPoolSize > 0, "maxPoolSize <= 0");
+        Validator.ensureTrue(corePoolSize <= maximumPoolSize, "corePoolSize > maxPoolSize");
+        Validator.ensureTrue(idleTimeout >= 0, "idleTimeout < 0");
+        Validator.ensureTrue(idleTimeout == 0 || unit != null, "time unit is null");
+
         this.factory = factory;
-        this.poolSize = poolSize;
-        this.currentPoolSize = new Semaphore(poolSize);
+        this.corePoolSize = corePoolSize;
+        this.maxPoolSize = maximumPoolSize;
+        this.availableConnections = new Semaphore(maximumPoolSize);
+
+        if (corePoolSize < maximumPoolSize && idleTimeout > 0) {
+            // Dynamic pool.
+            this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
+            this.idleTimeoutMillis = unit.toMillis(idleTimeout);
+            this.idleTimeoutFuture =
+                    this.scheduler.get().scheduleWithFixedDelay(new PurgeIdleConnectionsTask(),
+                            idleTimeout, idleTimeout, unit);
+        } else {
+            // Fixed pool.
+            this.scheduler = null;
+            this.idleTimeoutMillis = 0;
+            this.idleTimeoutFuture = null;
+        }
     }
 
     @Override
@@ -601,18 +695,24 @@
             while (hasWaitingConnections()) {
                 final QueueElement holder = queue.removeFirst();
                 idleConnections.add(holder.getWaitingConnection());
+                availableConnections.release();
             }
         }
 
         if (DEBUG_LOG.isLoggable(Level.FINE)) {
             DEBUG_LOG.fine(String.format(
-                    "Connection pool is closing: currentPoolSize=%d, poolSize=%d", poolSize
-                            - currentPoolSize.availablePermits(), poolSize));
+                    "Connection pool is closing: availableConnections=%d, maxPoolSize=%d",
+                    currentPoolSize(), maxPoolSize));
         }
 
-        // Close the idle connections.
+        if (idleTimeoutFuture != null) {
+            idleTimeoutFuture.cancel(false);
+            scheduler.release();
+        }
+
+        // Close all idle connections.
         for (final Connection connection : idleConnections) {
-            closeConnection(connection);
+            connection.close();
         }
 
         // Close the underlying factory.
@@ -636,13 +736,11 @@
             final QueueElement holder;
             synchronized (queue) {
                 if (isClosed) {
-                    throw new IllegalStateException("FixedConnectionPool is already closed");
-                }
-
-                if (hasWaitingConnections()) {
+                    throw new IllegalStateException("CachedConnectionPool is already closed");
+                } else if (hasWaitingConnections()) {
                     holder = queue.removeFirst();
                 } else {
-                    holder = new QueueElement(handler);
+                    holder = new QueueElement(handler, currentTimeMillis());
                     queue.add(holder);
                 }
             }
@@ -659,18 +757,18 @@
                 } else {
                     // Close the stale connection and try again.
                     connection.close();
-                    currentPoolSize.release();
+                    availableConnections.release();
 
                     if (DEBUG_LOG.isLoggable(Level.FINE)) {
                         DEBUG_LOG.fine(String.format(
-                                "Connection no longer valid: currentPoolSize=%d, poolSize=%d",
-                                poolSize - currentPoolSize.availablePermits(), poolSize));
+                                "Connection no longer valid: availableConnections=%d, poolSize=%d",
+                                currentPoolSize(), maxPoolSize));
                     }
                 }
             } else {
                 // Grow the pool if needed.
                 final FutureResult<Connection> future = holder.getWaitingFuture();
-                if (!future.isDone() && currentPoolSize.tryAcquire()) {
+                if (!future.isDone() && availableConnections.tryAcquire()) {
                     factory.getConnectionAsync(connectionResultHandler);
                 }
                 return future;
@@ -681,10 +779,10 @@
     @Override
     public String toString() {
         final StringBuilder builder = new StringBuilder();
-        builder.append("FixedConnectionPool(");
+        builder.append("CachedConnectionPool(");
         builder.append(String.valueOf(factory));
         builder.append(',');
-        builder.append(poolSize);
+        builder.append(maxPoolSize);
         builder.append(')');
         return builder.toString();
     }
@@ -699,15 +797,25 @@
         close();
     }
 
-    private void closeConnection(final Connection connection) {
-        // The connection will be closed, so decrease the pool size.
-        currentPoolSize.release();
-        connection.close();
+    // Package private for unit testing.
+    int currentPoolSize() {
+        return maxPoolSize - availableConnections.availablePermits();
+    }
 
-        if (DEBUG_LOG.isLoggable(Level.FINE)) {
-            DEBUG_LOG.fine(String.format("Closing connection because connection pool is closing: "
-                    + "currentPoolSize=%d, poolSize=%d", poolSize
-                    - currentPoolSize.availablePermits(), poolSize));
+    /*
+     * This method delegates to System.currentTimeMillis() except in unit tests
+     * where we use injected times.
+     */
+    private long currentTimeMillis() {
+        if (testTimeSource == null) {
+            return System.currentTimeMillis();
+        } else {
+            try {
+                return testTimeSource.call();
+            } catch (final Exception e) {
+                // Should not happen.
+                throw new RuntimeException(e);
+            }
         }
     }
 
@@ -727,21 +835,28 @@
             if (hasWaitingFutures()) {
                 connectionPoolIsClosing = isClosed;
                 holder = queue.removeFirst();
+            } else if (isClosed) {
+                connectionPoolIsClosing = true;
+                holder = null;
             } else {
-                if (isClosed) {
-                    connectionPoolIsClosing = true;
-                    holder = null;
-                } else {
-                    holder = new QueueElement(connection);
-                    queue.add(holder);
-                    return;
-                }
+                holder = new QueueElement(connection, currentTimeMillis());
+                queue.add(holder);
+                return;
             }
         }
 
         // There was waiting future, so complete it.
         if (connectionPoolIsClosing) {
-            closeConnection(connection);
+            // The connection will be closed, so decrease the pool size.
+            availableConnections.release();
+            connection.close();
+
+            if (DEBUG_LOG.isLoggable(Level.FINE)) {
+                DEBUG_LOG.fine(String.format(
+                        "Closing connection because connection pool is closing: "
+                                + "availableConnections=%d, maxPoolSize=%d", currentPoolSize(),
+                        maxPoolSize));
+            }
 
             if (holder != null) {
                 final ErrorResultException e =
@@ -751,9 +866,8 @@
 
                 if (DEBUG_LOG.isLoggable(Level.FINE)) {
                     DEBUG_LOG.fine(String.format(
-                            "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", e
-                                    .getMessage(), poolSize - currentPoolSize.availablePermits(),
-                            poolSize));
+                            "Connection attempt failed: %s, availableConnections=%d, poolSize=%d",
+                            e.getMessage(), currentPoolSize(), maxPoolSize));
                 }
             }
         } else {
diff --git a/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java b/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
index 42f31b2..985c887 100644
--- a/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
+++ b/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -73,9 +73,150 @@
     }
 
     /**
+     * Creates a new connection pool which creates new connections as needed
+     * using the provided connection factory, but will reuse previously
+     * allocated connections when they are available.
+     * <p>
+     * Connections which have not been used for sixty seconds are closed and
+     * removed from the pool. Thus, a pool that remains idle for long enough
+     * will not contain any cached connections.
+     * <p>
+     * Connections obtained from the connection pool are guaranteed to be valid
+     * immediately before being returned to the calling application. More
+     * specifically, connections which have remained idle in the connection pool
+     * for a long time and which have been remotely closed due to a time out
+     * will never be returned. However, once a pooled connection has been
+     * obtained it is the responsibility of the calling application to handle
+     * subsequent connection failures, these being signaled via a
+     * {@link ConnectionException}.
+     *
+     * @param factory
+     *            The connection factory to use for creating new connections.
+     * @return The new connection pool.
+     * @throws NullPointerException
+     *             If {@code factory} was {@code null}.
+     */
+    public static ConnectionPool newCachedConnectionPool(final ConnectionFactory factory) {
+        return new CachedConnectionPool(factory, 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, null);
+    }
+
+    /**
+     * Creates a new connection pool which creates new connections as needed
+     * using the provided connection factory, but will reuse previously
+     * allocated connections when they are available.
+     * <p>
+     * Attempts to use more than {@code maximumPoolSize} connections at once
+     * will block until a connection is released back to the pool. In other
+     * words, this pool will prevent applications from using more than
+     * {@code maximumPoolSize} connections at the same time.
+     * <p>
+     * Connections which have not been used for the provided {@code idleTimeout}
+     * period are closed and removed from the pool, until there are only
+     * {@code corePoolSize} connections remaining.
+     * <p>
+     * Connections obtained from the connection pool are guaranteed to be valid
+     * immediately before being returned to the calling application. More
+     * specifically, connections which have remained idle in the connection pool
+     * for a long time and which have been remotely closed due to a time out
+     * will never be returned. However, once a pooled connection has been
+     * obtained it is the responsibility of the calling application to handle
+     * subsequent connection failures, these being signaled via a
+     * {@link ConnectionException}.
+     *
+     * @param factory
+     *            The connection factory to use for creating new connections.
+     * @param corePoolSize
+     *            The minimum number of connections to keep in the pool, even if
+     *            they are idle.
+     * @param maximumPoolSize
+     *            The maximum number of connections to allow in the pool.
+     * @param idleTimeout
+     *            The time out period, after which unused non-core connections
+     *            will be closed.
+     * @param unit
+     *            The time unit for the {@code keepAliveTime} argument.
+     * @return The new connection pool.
+     * @throws IllegalArgumentException
+     *             If {@code corePoolSize}, {@code maximumPoolSize} are less
+     *             than or equal to zero, or if {@code idleTimeout} is negative,
+     *             or if {@code corePoolSize} is greater than
+     *             {@code maximumPoolSize}, or if {@code idleTimeout} is
+     *             non-zero and {@code unit} is {@code null}.
+     * @throws NullPointerException
+     *             If {@code factory} was {@code null}.
+     */
+    public static ConnectionPool newCachedConnectionPool(final ConnectionFactory factory,
+            final int corePoolSize, final int maximumPoolSize, final long idleTimeout,
+            final TimeUnit unit) {
+        return new CachedConnectionPool(factory, corePoolSize, maximumPoolSize, idleTimeout, unit,
+                null);
+    }
+
+    /**
+     * Creates a new connection pool which creates new connections as needed
+     * using the provided connection factory, but will reuse previously
+     * allocated connections when they are available.
+     * <p>
+     * Attempts to use more than {@code maximumPoolSize} connections at once
+     * will block until a connection is released back to the pool. In other
+     * words, this pool will prevent applications from using more than
+     * {@code maximumPoolSize} connections at the same time.
+     * <p>
+     * Connections which have not been used for the provided {@code idleTimeout}
+     * period are closed and removed from the pool, until there are only
+     * {@code corePoolSize} connections remaining.
+     * <p>
+     * Connections obtained from the connection pool are guaranteed to be valid
+     * immediately before being returned to the calling application. More
+     * specifically, connections which have remained idle in the connection pool
+     * for a long time and which have been remotely closed due to a time out
+     * will never be returned. However, once a pooled connection has been
+     * obtained it is the responsibility of the calling application to handle
+     * subsequent connection failures, these being signaled via a
+     * {@link ConnectionException}.
+     *
+     * @param factory
+     *            The connection factory to use for creating new connections.
+     * @param corePoolSize
+     *            The minimum number of connections to keep in the pool, even if
+     *            they are idle.
+     * @param maximumPoolSize
+     *            The maximum number of connections to allow in the pool.
+     * @param idleTimeout
+     *            The time out period, after which unused non-core connections
+     *            will be closed.
+     * @param unit
+     *            The time unit for the {@code keepAliveTime} argument.
+     * @param scheduler
+     *            The scheduler which should be used for periodically checking
+     *            for idle connections, or {@code null} if the default scheduler
+     *            should be used.
+     * @return The new connection pool.
+     * @throws IllegalArgumentException
+     *             If {@code corePoolSize}, {@code maximumPoolSize} are less
+     *             than or equal to zero, or if {@code idleTimeout} is negative,
+     *             or if {@code corePoolSize} is greater than
+     *             {@code maximumPoolSize}, or if {@code idleTimeout} is
+     *             non-zero and {@code unit} is {@code null}.
+     * @throws NullPointerException
+     *             If {@code factory} was {@code null}.
+     */
+    public static ConnectionPool newCachedConnectionPool(final ConnectionFactory factory,
+            final int corePoolSize, final int maximumPoolSize, final long idleTimeout,
+            final TimeUnit unit, final ScheduledExecutorService scheduler) {
+        return new CachedConnectionPool(factory, corePoolSize, maximumPoolSize, idleTimeout, unit,
+                scheduler);
+    }
+
+    /**
      * Creates a new connection pool which will maintain {@code poolSize}
      * connections created using the provided connection factory.
      * <p>
+     * Attempts to use more than {@code poolSize} connections at once will block
+     * until a connection is released back to the pool. In other words, this
+     * pool will prevent applications from using more than {@code poolSize}
+     * connections at the same time.
+     * <p>
      * Connections obtained from the connection pool are guaranteed to be valid
      * immediately before being returned to the calling application. More
      * specifically, connections which have remained idle in the connection pool
@@ -97,9 +238,7 @@
      */
     public static ConnectionPool newFixedConnectionPool(final ConnectionFactory factory,
             final int poolSize) {
-        Validator.ensureNotNull(factory);
-        Validator.ensureTrue(poolSize >= 0, "negative pool size");
-        return new FixedConnectionPool(factory, poolSize);
+        return new CachedConnectionPool(factory, poolSize, poolSize, 0L, null, null);
     }
 
     /**
diff --git a/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java b/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
index 226d8e7..83348e3 100644
--- a/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
+++ b/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2010 Sun Microsystems, Inc.
- *      Portions copyright 2011-2012 ForgeRock AS.
+ *      Portions copyright 2011-2013 ForgeRock AS.
  */
 
 package org.forgerock.opendj.ldap;
@@ -44,6 +44,8 @@
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
 import org.forgerock.opendj.ldap.requests.BindRequest;
 import org.forgerock.opendj.ldap.requests.Requests;
@@ -414,4 +416,102 @@
         pool.close();
     }
 
+    /**
+     * Verifies that a pool with connection keep alive correctly purges idle
+     * connections after the keepalive period has expired.
+     *
+     * @throws Exception
+     *             If an unexpected error occurred.
+     */
+    @Test
+    public void testConnectionKeepAliveExpiration() throws Exception {
+        final Connection pooledConnection1 = mock(Connection.class, "pooledConnection1");
+        final Connection pooledConnection2 = mock(Connection.class, "pooledConnection2");
+        final Connection pooledConnection3 = mock(Connection.class, "pooledConnection3");
+        final Connection pooledConnection4 = mock(Connection.class, "pooledConnection4");
+        final Connection pooledConnection5 = mock(Connection.class, "pooledConnection5");
+        final Connection pooledConnection6 = mock(Connection.class, "pooledConnection6");
+
+        when(pooledConnection1.isValid()).thenReturn(true);
+        when(pooledConnection2.isValid()).thenReturn(true);
+        when(pooledConnection3.isValid()).thenReturn(true);
+        when(pooledConnection4.isValid()).thenReturn(true);
+        when(pooledConnection5.isValid()).thenReturn(true);
+        when(pooledConnection6.isValid()).thenReturn(true);
+
+        final ConnectionFactory factory =
+                mockConnectionFactory(pooledConnection1, pooledConnection2, pooledConnection3,
+                        pooledConnection4, pooledConnection5, pooledConnection6);
+        final MockScheduler scheduler = new MockScheduler();
+        final CachedConnectionPool pool =
+                new CachedConnectionPool(factory, 2, 4, 100, TimeUnit.MILLISECONDS, scheduler);
+        assertThat(scheduler.isScheduled()).isTrue();
+
+        // First populate the pool with idle connections at time 0.
+        @SuppressWarnings("unchecked")
+        final Callable<Long> timeSource = mock(Callable.class);
+        when(timeSource.call()).thenReturn(0L);
+        pool.testTimeSource = timeSource;
+
+        assertThat(pool.currentPoolSize()).isEqualTo(0);
+        Connection c1 = pool.getConnection();
+        Connection c2 = pool.getConnection();
+        Connection c3 = pool.getConnection();
+        Connection c4 = pool.getConnection();
+        assertThat(pool.currentPoolSize()).isEqualTo(4);
+        c1.close();
+        c2.close();
+        c3.close();
+        c4.close();
+        assertThat(pool.currentPoolSize()).isEqualTo(4);
+
+        // First purge at time 50 is no-op because no connections have expired.
+        when(timeSource.call()).thenReturn(50L);
+        scheduler.getCommand().run();
+        assertThat(pool.currentPoolSize()).isEqualTo(4);
+
+        // Second purge at time 150 should remove 2 non-core connections.
+        when(timeSource.call()).thenReturn(150L);
+        scheduler.getCommand().run();
+        assertThat(pool.currentPoolSize()).isEqualTo(2);
+
+        verify(pooledConnection1, times(1)).close();
+        verify(pooledConnection2, times(1)).close();
+        verify(pooledConnection3, times(0)).close();
+        verify(pooledConnection4, times(0)).close();
+
+        // Regrow the pool at time 200.
+        when(timeSource.call()).thenReturn(200L);
+        Connection c5 = pool.getConnection(); // pooledConnection3
+        Connection c6 = pool.getConnection(); // pooledConnection4
+        Connection c7 = pool.getConnection(); // pooledConnection5
+        Connection c8 = pool.getConnection(); // pooledConnection6
+        assertThat(pool.currentPoolSize()).isEqualTo(4);
+        c5.close();
+        c6.close();
+        c7.close();
+        c8.close();
+        assertThat(pool.currentPoolSize()).isEqualTo(4);
+
+        // Third purge at time 250 should not remove any connections.
+        when(timeSource.call()).thenReturn(250L);
+        scheduler.getCommand().run();
+        assertThat(pool.currentPoolSize()).isEqualTo(4);
+
+        // Fourth purge at time 350 should remove 2 non-core connections.
+        when(timeSource.call()).thenReturn(350L);
+        scheduler.getCommand().run();
+        assertThat(pool.currentPoolSize()).isEqualTo(2);
+
+        verify(pooledConnection3, times(1)).close();
+        verify(pooledConnection4, times(1)).close();
+        verify(pooledConnection5, times(0)).close();
+        verify(pooledConnection6, times(0)).close();
+
+        pool.close();
+        verify(pooledConnection5, times(1)).close();
+        verify(pooledConnection6, times(1)).close();
+        assertThat(scheduler.isScheduled()).isFalse();
+    }
+
 }
diff --git a/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java b/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java
index b9cbbe1..7f84a81 100644
--- a/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java
+++ b/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java
@@ -1106,7 +1106,9 @@
                 factory =
                         Connections.newHeartBeatConnectionFactory(factory,
                                 heartBeatIntervalSeconds, TimeUnit.SECONDS);
-                factory = Connections.newFixedConnectionPool(factory, connectionPoolSize);
+                factory =
+                        Connections.newCachedConnectionPool(factory, 0, connectionPoolSize, 60L,
+                                TimeUnit.SECONDS);
             }
             servers.add(factory);
         }

--
Gitblit v1.10.0