From 77c14ffd8232293dc8fb1a7446ddf2e69ca4b7ff Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Sat, 07 Sep 2013 00:08:17 +0000
Subject: [PATCH] Fix OPENDJ-1091: Implement a cached connection pool
---
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java | 232 +++++++++++++++++++++++++++++++++++++++++++--------------
1 files changed, 173 insertions(+), 59 deletions(-)
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
similarity index 76%
rename from opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
rename to opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
index 5225af8..48511a7 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
@@ -27,16 +27,20 @@
package org.forgerock.opendj.ldap;
-import static com.forgerock.opendj.util.StaticUtils.*;
-
-import static org.forgerock.opendj.ldap.CoreMessages.*;
-import static org.forgerock.opendj.ldap.ErrorResultException.*;
+import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
+import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER;
+import static org.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_POOL_CLOSING;
+import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
@@ -62,13 +66,15 @@
import com.forgerock.opendj.util.AsynchronousFutureResult;
import com.forgerock.opendj.util.CompletedFutureResult;
+import com.forgerock.opendj.util.ReferenceCountedObject;
import com.forgerock.opendj.util.Validator;
/**
- * A simple connection pool implementation which maintains a fixed number of
- * connections.
+ * A connection pool implementation which maintains a cache of pooled
+ * connections with a configurable core pool size, maximum size, and expiration
+ * policy.
*/
-final class FixedConnectionPool implements ConnectionPool {
+final class CachedConnectionPool implements ConnectionPool {
/**
* This result handler is invoked when an attempt to add a new connection to
@@ -79,13 +85,12 @@
@Override
public void handleErrorResult(final ErrorResultException error) {
// Connection attempt failed, so decrease the pool size.
- currentPoolSize.release();
+ availableConnections.release();
if (DEBUG_LOG.isLoggable(Level.FINE)) {
DEBUG_LOG.fine(String.format(
- "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", error
- .getMessage(), poolSize - currentPoolSize.availablePermits(),
- poolSize));
+ "Connection attempt failed: %s, availableConnections=%d, maxPoolSize=%d",
+ error.getMessage(), currentPoolSize(), maxPoolSize));
}
QueueElement holder;
@@ -106,8 +111,8 @@
public void handleResult(final Connection connection) {
if (DEBUG_LOG.isLoggable(Level.FINE)) {
DEBUG_LOG.fine(String.format(
- "Connection attempt succeeded: currentPoolSize=%d, poolSize=%d", poolSize
- - currentPoolSize.availablePermits(), poolSize));
+ "Connection attempt succeeded: availableConnections=%d, maxPoolSize=%d",
+ currentPoolSize(), maxPoolSize));
}
publishConnection(connection);
}
@@ -191,7 +196,7 @@
}
@Override
- public Result applyChange(ChangeRecord request) throws ErrorResultException {
+ public Result applyChange(final ChangeRecord request) throws ErrorResultException {
return checkState().applyChange(request);
}
@@ -249,15 +254,15 @@
* server, but the server may still be available. In order to
* avoid leaving pending futures hanging indefinitely, we should
* try to reconnect immediately. No need to release/acquire
- * currentPoolSize.
+ * availableConnections.
*/
connection.close();
factory.getConnectionAsync(connectionResultHandler);
if (DEBUG_LOG.isLoggable(Level.FINE)) {
DEBUG_LOG.fine(String.format(
- "Connection no longer valid: currentPoolSize=%d, poolSize=%d", poolSize
- - currentPoolSize.availablePermits(), poolSize));
+ "Connection no longer valid: availableConnections=%d, maxPoolSize=%d",
+ currentPoolSize(), maxPoolSize));
}
}
@@ -527,7 +532,54 @@
}
return connection;
}
+ }
+ /**
+ * Scheduled task responsible for purging non-core pooled connections which
+ * have been idle for longer than the idle timeout limit.
+ */
+ private final class PurgeIdleConnectionsTask implements Runnable {
+ @Override
+ public void run() {
+ final List<Connection> idleConnections;
+ synchronized (queue) {
+ if (isClosed) {
+ return;
+ }
+
+ /*
+ * Obtain a list of expired connections but don't close them yet
+ * since we don't want to hold the lock too long.
+ */
+ idleConnections = new LinkedList<Connection>();
+ final long timeoutMillis = currentTimeMillis() - idleTimeoutMillis;
+ int nonCoreConnectionCount = currentPoolSize() - corePoolSize;
+ for (QueueElement holder = queue.peek(); nonCoreConnectionCount > 0
+ && isTimedOutQueuedConnection(holder, timeoutMillis); holder = queue.peek()) {
+ idleConnections.add(holder.getWaitingConnection());
+ queue.poll();
+ availableConnections.release();
+ nonCoreConnectionCount--;
+ }
+ }
+
+ // Close the idle connections.
+ if (!idleConnections.isEmpty()) {
+ if (DEBUG_LOG.isLoggable(Level.FINE)) {
+ DEBUG_LOG.fine(String.format("Closing %d idle pooled connections: "
+ + "availableConnections=%d, maxPoolSize=%d", idleConnections.size(),
+ currentPoolSize(), maxPoolSize));
+ }
+ for (final Connection connection : idleConnections) {
+ connection.close();
+ }
+ }
+ }
+
+ private boolean isTimedOutQueuedConnection(final QueueElement holder,
+ final long timeoutMillis) {
+ return holder != null && !holder.isWaitingFuture() && holder.hasTimedOut(timeoutMillis);
+ }
}
/**
@@ -536,16 +588,19 @@
* connection request.
*/
private static final class QueueElement {
+ private final long timestampMillis;
private final Object value;
- QueueElement(final Connection connection) {
+ QueueElement(final Connection connection, final long timestampMillis) {
this.value = connection;
+ this.timestampMillis = timestampMillis;
}
- QueueElement(final ResultHandler<? super Connection> handler) {
+ QueueElement(final ResultHandler<? super Connection> handler, final long timestampMillis) {
this.value =
new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(
handler);
+ this.timestampMillis = timestampMillis;
}
@Override
@@ -566,22 +621,61 @@
return (AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>) value;
}
+ boolean hasTimedOut(final long timeLimitMillis) {
+ return timestampMillis < timeLimitMillis;
+ }
+
boolean isWaitingFuture() {
return value instanceof AsynchronousFutureResult;
}
}
+ /**
+ * This is intended for unit testing only in order to inject fake time
+ * stamps. Use System.currentTimeMillis() when null.
+ */
+ Callable<Long> testTimeSource = null;
+
+ private final Semaphore availableConnections;
private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
- private final Semaphore currentPoolSize;
+ private final int corePoolSize;
+
private final ConnectionFactory factory;
private boolean isClosed = false;
- private final int poolSize;
+ private final ScheduledFuture<?> idleTimeoutFuture;
+ private final long idleTimeoutMillis;
+ private final int maxPoolSize;
private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
+ private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
- FixedConnectionPool(final ConnectionFactory factory, final int poolSize) {
+ CachedConnectionPool(final ConnectionFactory factory, final int corePoolSize,
+ final int maximumPoolSize, final long idleTimeout, final TimeUnit unit,
+ final ScheduledExecutorService scheduler) {
+ Validator.ensureNotNull(factory);
+ Validator.ensureTrue(corePoolSize >= 0, "corePoolSize < 0");
+ Validator.ensureTrue(maximumPoolSize > 0, "maxPoolSize <= 0");
+ Validator.ensureTrue(corePoolSize <= maximumPoolSize, "corePoolSize > maxPoolSize");
+ Validator.ensureTrue(idleTimeout >= 0, "idleTimeout < 0");
+ Validator.ensureTrue(idleTimeout == 0 || unit != null, "time unit is null");
+
this.factory = factory;
- this.poolSize = poolSize;
- this.currentPoolSize = new Semaphore(poolSize);
+ this.corePoolSize = corePoolSize;
+ this.maxPoolSize = maximumPoolSize;
+ this.availableConnections = new Semaphore(maximumPoolSize);
+
+ if (corePoolSize < maximumPoolSize && idleTimeout > 0) {
+ // Dynamic pool.
+ this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
+ this.idleTimeoutMillis = unit.toMillis(idleTimeout);
+ this.idleTimeoutFuture =
+ this.scheduler.get().scheduleWithFixedDelay(new PurgeIdleConnectionsTask(),
+ idleTimeout, idleTimeout, unit);
+ } else {
+ // Fixed pool.
+ this.scheduler = null;
+ this.idleTimeoutMillis = 0;
+ this.idleTimeoutFuture = null;
+ }
}
@Override
@@ -601,18 +695,24 @@
while (hasWaitingConnections()) {
final QueueElement holder = queue.removeFirst();
idleConnections.add(holder.getWaitingConnection());
+ availableConnections.release();
}
}
if (DEBUG_LOG.isLoggable(Level.FINE)) {
DEBUG_LOG.fine(String.format(
- "Connection pool is closing: currentPoolSize=%d, poolSize=%d", poolSize
- - currentPoolSize.availablePermits(), poolSize));
+ "Connection pool is closing: availableConnections=%d, maxPoolSize=%d",
+ currentPoolSize(), maxPoolSize));
}
- // Close the idle connections.
+ if (idleTimeoutFuture != null) {
+ idleTimeoutFuture.cancel(false);
+ scheduler.release();
+ }
+
+ // Close all idle connections.
for (final Connection connection : idleConnections) {
- closeConnection(connection);
+ connection.close();
}
// Close the underlying factory.
@@ -636,13 +736,11 @@
final QueueElement holder;
synchronized (queue) {
if (isClosed) {
- throw new IllegalStateException("FixedConnectionPool is already closed");
- }
-
- if (hasWaitingConnections()) {
+ throw new IllegalStateException("CachedConnectionPool is already closed");
+ } else if (hasWaitingConnections()) {
holder = queue.removeFirst();
} else {
- holder = new QueueElement(handler);
+ holder = new QueueElement(handler, currentTimeMillis());
queue.add(holder);
}
}
@@ -659,18 +757,18 @@
} else {
// Close the stale connection and try again.
connection.close();
- currentPoolSize.release();
+ availableConnections.release();
if (DEBUG_LOG.isLoggable(Level.FINE)) {
DEBUG_LOG.fine(String.format(
- "Connection no longer valid: currentPoolSize=%d, poolSize=%d",
- poolSize - currentPoolSize.availablePermits(), poolSize));
+ "Connection no longer valid: availableConnections=%d, poolSize=%d",
+ currentPoolSize(), maxPoolSize));
}
}
} else {
// Grow the pool if needed.
final FutureResult<Connection> future = holder.getWaitingFuture();
- if (!future.isDone() && currentPoolSize.tryAcquire()) {
+ if (!future.isDone() && availableConnections.tryAcquire()) {
factory.getConnectionAsync(connectionResultHandler);
}
return future;
@@ -681,10 +779,10 @@
@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
- builder.append("FixedConnectionPool(");
+ builder.append("CachedConnectionPool(");
builder.append(String.valueOf(factory));
builder.append(',');
- builder.append(poolSize);
+ builder.append(maxPoolSize);
builder.append(')');
return builder.toString();
}
@@ -699,15 +797,25 @@
close();
}
- private void closeConnection(final Connection connection) {
- // The connection will be closed, so decrease the pool size.
- currentPoolSize.release();
- connection.close();
+ // Package private for unit testing.
+ int currentPoolSize() {
+ return maxPoolSize - availableConnections.availablePermits();
+ }
- if (DEBUG_LOG.isLoggable(Level.FINE)) {
- DEBUG_LOG.fine(String.format("Closing connection because connection pool is closing: "
- + "currentPoolSize=%d, poolSize=%d", poolSize
- - currentPoolSize.availablePermits(), poolSize));
+ /*
+ * This method delegates to System.currentTimeMillis() except in unit tests
+ * where we use injected times.
+ */
+ private long currentTimeMillis() {
+ if (testTimeSource == null) {
+ return System.currentTimeMillis();
+ } else {
+ try {
+ return testTimeSource.call();
+ } catch (final Exception e) {
+ // Should not happen.
+ throw new RuntimeException(e);
+ }
}
}
@@ -727,21 +835,28 @@
if (hasWaitingFutures()) {
connectionPoolIsClosing = isClosed;
holder = queue.removeFirst();
+ } else if (isClosed) {
+ connectionPoolIsClosing = true;
+ holder = null;
} else {
- if (isClosed) {
- connectionPoolIsClosing = true;
- holder = null;
- } else {
- holder = new QueueElement(connection);
- queue.add(holder);
- return;
- }
+ holder = new QueueElement(connection, currentTimeMillis());
+ queue.add(holder);
+ return;
}
}
// There was waiting future, so complete it.
if (connectionPoolIsClosing) {
- closeConnection(connection);
+ // The connection will be closed, so decrease the pool size.
+ availableConnections.release();
+ connection.close();
+
+ if (DEBUG_LOG.isLoggable(Level.FINE)) {
+ DEBUG_LOG.fine(String.format(
+ "Closing connection because connection pool is closing: "
+ + "availableConnections=%d, maxPoolSize=%d", currentPoolSize(),
+ maxPoolSize));
+ }
if (holder != null) {
final ErrorResultException e =
@@ -751,9 +866,8 @@
if (DEBUG_LOG.isLoggable(Level.FINE)) {
DEBUG_LOG.fine(String.format(
- "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", e
- .getMessage(), poolSize - currentPoolSize.availablePermits(),
- poolSize));
+ "Connection attempt failed: %s, availableConnections=%d, poolSize=%d",
+ e.getMessage(), currentPoolSize(), maxPoolSize));
}
}
} else {
--
Gitblit v1.10.0