From 77c14ffd8232293dc8fb1a7446ddf2e69ca4b7ff Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Sat, 07 Sep 2013 00:08:17 +0000
Subject: [PATCH] Fix OPENDJ-1091: Implement a cached connection pool

---
 opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java |  232 +++++++++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 173 insertions(+), 59 deletions(-)

diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
similarity index 76%
rename from opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
rename to opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
index 5225af8..48511a7 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
@@ -27,16 +27,20 @@
 
 package org.forgerock.opendj.ldap;
 
-import static com.forgerock.opendj.util.StaticUtils.*;
-
-import static org.forgerock.opendj.ldap.CoreMessages.*;
-import static org.forgerock.opendj.ldap.ErrorResultException.*;
+import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
+import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER;
+import static org.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_POOL_CLOSING;
+import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
 
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
 
@@ -62,13 +66,15 @@
 
 import com.forgerock.opendj.util.AsynchronousFutureResult;
 import com.forgerock.opendj.util.CompletedFutureResult;
+import com.forgerock.opendj.util.ReferenceCountedObject;
 import com.forgerock.opendj.util.Validator;
 
 /**
- * A simple connection pool implementation which maintains a fixed number of
- * connections.
+ * A connection pool implementation which maintains a cache of pooled
+ * connections with a configurable core pool size, maximum size, and expiration
+ * policy.
  */
-final class FixedConnectionPool implements ConnectionPool {
+final class CachedConnectionPool implements ConnectionPool {
 
     /**
      * This result handler is invoked when an attempt to add a new connection to
@@ -79,13 +85,12 @@
         @Override
         public void handleErrorResult(final ErrorResultException error) {
             // Connection attempt failed, so decrease the pool size.
-            currentPoolSize.release();
+            availableConnections.release();
 
             if (DEBUG_LOG.isLoggable(Level.FINE)) {
                 DEBUG_LOG.fine(String.format(
-                        "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", error
-                                .getMessage(), poolSize - currentPoolSize.availablePermits(),
-                        poolSize));
+                        "Connection attempt failed: %s, availableConnections=%d, maxPoolSize=%d",
+                        error.getMessage(), currentPoolSize(), maxPoolSize));
             }
 
             QueueElement holder;
@@ -106,8 +111,8 @@
         public void handleResult(final Connection connection) {
             if (DEBUG_LOG.isLoggable(Level.FINE)) {
                 DEBUG_LOG.fine(String.format(
-                        "Connection attempt succeeded:  currentPoolSize=%d, poolSize=%d", poolSize
-                                - currentPoolSize.availablePermits(), poolSize));
+                        "Connection attempt succeeded:  availableConnections=%d, maxPoolSize=%d",
+                        currentPoolSize(), maxPoolSize));
             }
             publishConnection(connection);
         }
@@ -191,7 +196,7 @@
         }
 
         @Override
-        public Result applyChange(ChangeRecord request) throws ErrorResultException {
+        public Result applyChange(final ChangeRecord request) throws ErrorResultException {
             return checkState().applyChange(request);
         }
 
@@ -249,15 +254,15 @@
                  * server, but the server may still be available. In order to
                  * avoid leaving pending futures hanging indefinitely, we should
                  * try to reconnect immediately. No need to release/acquire
-                 * currentPoolSize.
+                 * availableConnections.
                  */
                 connection.close();
                 factory.getConnectionAsync(connectionResultHandler);
 
                 if (DEBUG_LOG.isLoggable(Level.FINE)) {
                     DEBUG_LOG.fine(String.format(
-                            "Connection no longer valid: currentPoolSize=%d, poolSize=%d", poolSize
-                                    - currentPoolSize.availablePermits(), poolSize));
+                            "Connection no longer valid: availableConnections=%d, maxPoolSize=%d",
+                            currentPoolSize(), maxPoolSize));
                 }
             }
 
@@ -527,7 +532,54 @@
             }
             return connection;
         }
+    }
 
+    /**
+     * Scheduled task responsible for purging non-core pooled connections which
+     * have been idle for longer than the idle timeout limit.
+     */
+    private final class PurgeIdleConnectionsTask implements Runnable {
+        @Override
+        public void run() {
+            final List<Connection> idleConnections;
+            synchronized (queue) {
+                if (isClosed) {
+                    return;
+                }
+
+                /*
+                 * Obtain a list of expired connections but don't close them yet
+                 * since we don't want to hold the lock too long.
+                 */
+                idleConnections = new LinkedList<Connection>();
+                final long timeoutMillis = currentTimeMillis() - idleTimeoutMillis;
+                int nonCoreConnectionCount = currentPoolSize() - corePoolSize;
+                for (QueueElement holder = queue.peek(); nonCoreConnectionCount > 0
+                        && isTimedOutQueuedConnection(holder, timeoutMillis); holder = queue.peek()) {
+                    idleConnections.add(holder.getWaitingConnection());
+                    queue.poll();
+                    availableConnections.release();
+                    nonCoreConnectionCount--;
+                }
+            }
+
+            // Close the idle connections.
+            if (!idleConnections.isEmpty()) {
+                if (DEBUG_LOG.isLoggable(Level.FINE)) {
+                    DEBUG_LOG.fine(String.format("Closing %d idle pooled connections: "
+                            + "availableConnections=%d, maxPoolSize=%d", idleConnections.size(),
+                            currentPoolSize(), maxPoolSize));
+                }
+                for (final Connection connection : idleConnections) {
+                    connection.close();
+                }
+            }
+        }
+
+        private boolean isTimedOutQueuedConnection(final QueueElement holder,
+                final long timeoutMillis) {
+            return holder != null && !holder.isWaitingFuture() && holder.hasTimedOut(timeoutMillis);
+        }
     }
 
     /**
@@ -536,16 +588,19 @@
      * connection request.
      */
     private static final class QueueElement {
+        private final long timestampMillis;
         private final Object value;
 
-        QueueElement(final Connection connection) {
+        QueueElement(final Connection connection, final long timestampMillis) {
             this.value = connection;
+            this.timestampMillis = timestampMillis;
         }
 
-        QueueElement(final ResultHandler<? super Connection> handler) {
+        QueueElement(final ResultHandler<? super Connection> handler, final long timestampMillis) {
             this.value =
                     new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(
                             handler);
+            this.timestampMillis = timestampMillis;
         }
 
         @Override
@@ -566,22 +621,61 @@
             return (AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>) value;
         }
 
+        boolean hasTimedOut(final long timeLimitMillis) {
+            return timestampMillis < timeLimitMillis;
+        }
+
         boolean isWaitingFuture() {
             return value instanceof AsynchronousFutureResult;
         }
     }
 
+    /**
+     * This is intended for unit testing only in order to inject fake time
+     * stamps. Use System.currentTimeMillis() when null.
+     */
+    Callable<Long> testTimeSource = null;
+
+    private final Semaphore availableConnections;
     private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
-    private final Semaphore currentPoolSize;
+    private final int corePoolSize;
+
     private final ConnectionFactory factory;
     private boolean isClosed = false;
-    private final int poolSize;
+    private final ScheduledFuture<?> idleTimeoutFuture;
+    private final long idleTimeoutMillis;
+    private final int maxPoolSize;
     private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
+    private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
 
-    FixedConnectionPool(final ConnectionFactory factory, final int poolSize) {
+    CachedConnectionPool(final ConnectionFactory factory, final int corePoolSize,
+            final int maximumPoolSize, final long idleTimeout, final TimeUnit unit,
+            final ScheduledExecutorService scheduler) {
+        Validator.ensureNotNull(factory);
+        Validator.ensureTrue(corePoolSize >= 0, "corePoolSize < 0");
+        Validator.ensureTrue(maximumPoolSize > 0, "maxPoolSize <= 0");
+        Validator.ensureTrue(corePoolSize <= maximumPoolSize, "corePoolSize > maxPoolSize");
+        Validator.ensureTrue(idleTimeout >= 0, "idleTimeout < 0");
+        Validator.ensureTrue(idleTimeout == 0 || unit != null, "time unit is null");
+
         this.factory = factory;
-        this.poolSize = poolSize;
-        this.currentPoolSize = new Semaphore(poolSize);
+        this.corePoolSize = corePoolSize;
+        this.maxPoolSize = maximumPoolSize;
+        this.availableConnections = new Semaphore(maximumPoolSize);
+
+        if (corePoolSize < maximumPoolSize && idleTimeout > 0) {
+            // Dynamic pool.
+            this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
+            this.idleTimeoutMillis = unit.toMillis(idleTimeout);
+            this.idleTimeoutFuture =
+                    this.scheduler.get().scheduleWithFixedDelay(new PurgeIdleConnectionsTask(),
+                            idleTimeout, idleTimeout, unit);
+        } else {
+            // Fixed pool.
+            this.scheduler = null;
+            this.idleTimeoutMillis = 0;
+            this.idleTimeoutFuture = null;
+        }
     }
 
     @Override
@@ -601,18 +695,24 @@
             while (hasWaitingConnections()) {
                 final QueueElement holder = queue.removeFirst();
                 idleConnections.add(holder.getWaitingConnection());
+                availableConnections.release();
             }
         }
 
         if (DEBUG_LOG.isLoggable(Level.FINE)) {
             DEBUG_LOG.fine(String.format(
-                    "Connection pool is closing: currentPoolSize=%d, poolSize=%d", poolSize
-                            - currentPoolSize.availablePermits(), poolSize));
+                    "Connection pool is closing: availableConnections=%d, maxPoolSize=%d",
+                    currentPoolSize(), maxPoolSize));
         }
 
-        // Close the idle connections.
+        if (idleTimeoutFuture != null) {
+            idleTimeoutFuture.cancel(false);
+            scheduler.release();
+        }
+
+        // Close all idle connections.
         for (final Connection connection : idleConnections) {
-            closeConnection(connection);
+            connection.close();
         }
 
         // Close the underlying factory.
@@ -636,13 +736,11 @@
             final QueueElement holder;
             synchronized (queue) {
                 if (isClosed) {
-                    throw new IllegalStateException("FixedConnectionPool is already closed");
-                }
-
-                if (hasWaitingConnections()) {
+                    throw new IllegalStateException("CachedConnectionPool is already closed");
+                } else if (hasWaitingConnections()) {
                     holder = queue.removeFirst();
                 } else {
-                    holder = new QueueElement(handler);
+                    holder = new QueueElement(handler, currentTimeMillis());
                     queue.add(holder);
                 }
             }
@@ -659,18 +757,18 @@
                 } else {
                     // Close the stale connection and try again.
                     connection.close();
-                    currentPoolSize.release();
+                    availableConnections.release();
 
                     if (DEBUG_LOG.isLoggable(Level.FINE)) {
                         DEBUG_LOG.fine(String.format(
-                                "Connection no longer valid: currentPoolSize=%d, poolSize=%d",
-                                poolSize - currentPoolSize.availablePermits(), poolSize));
+                                "Connection no longer valid: availableConnections=%d, poolSize=%d",
+                                currentPoolSize(), maxPoolSize));
                     }
                 }
             } else {
                 // Grow the pool if needed.
                 final FutureResult<Connection> future = holder.getWaitingFuture();
-                if (!future.isDone() && currentPoolSize.tryAcquire()) {
+                if (!future.isDone() && availableConnections.tryAcquire()) {
                     factory.getConnectionAsync(connectionResultHandler);
                 }
                 return future;
@@ -681,10 +779,10 @@
     @Override
     public String toString() {
         final StringBuilder builder = new StringBuilder();
-        builder.append("FixedConnectionPool(");
+        builder.append("CachedConnectionPool(");
         builder.append(String.valueOf(factory));
         builder.append(',');
-        builder.append(poolSize);
+        builder.append(maxPoolSize);
         builder.append(')');
         return builder.toString();
     }
@@ -699,15 +797,25 @@
         close();
     }
 
-    private void closeConnection(final Connection connection) {
-        // The connection will be closed, so decrease the pool size.
-        currentPoolSize.release();
-        connection.close();
+    // Package private for unit testing.
+    int currentPoolSize() {
+        return maxPoolSize - availableConnections.availablePermits();
+    }
 
-        if (DEBUG_LOG.isLoggable(Level.FINE)) {
-            DEBUG_LOG.fine(String.format("Closing connection because connection pool is closing: "
-                    + "currentPoolSize=%d, poolSize=%d", poolSize
-                    - currentPoolSize.availablePermits(), poolSize));
+    /*
+     * This method delegates to System.currentTimeMillis() except in unit tests
+     * where we use injected times.
+     */
+    private long currentTimeMillis() {
+        if (testTimeSource == null) {
+            return System.currentTimeMillis();
+        } else {
+            try {
+                return testTimeSource.call();
+            } catch (final Exception e) {
+                // Should not happen.
+                throw new RuntimeException(e);
+            }
         }
     }
 
@@ -727,21 +835,28 @@
             if (hasWaitingFutures()) {
                 connectionPoolIsClosing = isClosed;
                 holder = queue.removeFirst();
+            } else if (isClosed) {
+                connectionPoolIsClosing = true;
+                holder = null;
             } else {
-                if (isClosed) {
-                    connectionPoolIsClosing = true;
-                    holder = null;
-                } else {
-                    holder = new QueueElement(connection);
-                    queue.add(holder);
-                    return;
-                }
+                holder = new QueueElement(connection, currentTimeMillis());
+                queue.add(holder);
+                return;
             }
         }
 
         // There was waiting future, so complete it.
         if (connectionPoolIsClosing) {
-            closeConnection(connection);
+            // The connection will be closed, so decrease the pool size.
+            availableConnections.release();
+            connection.close();
+
+            if (DEBUG_LOG.isLoggable(Level.FINE)) {
+                DEBUG_LOG.fine(String.format(
+                        "Closing connection because connection pool is closing: "
+                                + "availableConnections=%d, maxPoolSize=%d", currentPoolSize(),
+                        maxPoolSize));
+            }
 
             if (holder != null) {
                 final ErrorResultException e =
@@ -751,9 +866,8 @@
 
                 if (DEBUG_LOG.isLoggable(Level.FINE)) {
                     DEBUG_LOG.fine(String.format(
-                            "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", e
-                                    .getMessage(), poolSize - currentPoolSize.availablePermits(),
-                            poolSize));
+                            "Connection attempt failed: %s, availableConnections=%d, poolSize=%d",
+                            e.getMessage(), currentPoolSize(), maxPoolSize));
                 }
             }
         } else {

--
Gitblit v1.10.0