From ec2e88cf45f3d212f3634a509c7e4d87b2081508 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 13 Sep 2013 15:09:02 +0000
Subject: [PATCH] Backport fix for OPENDJ-1091: Implement a cached connection pool
---
opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java | 232 +++++++++++++++++++------
opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java | 4
opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java | 9
opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java | 145 +++++++++++++++
opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java | 8
opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java | 102 +++++++++++
6 files changed, 427 insertions(+), 73 deletions(-)
diff --git a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
index ac95d2a..87f212b 100644
--- a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
+++ b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/Proxy.java
@@ -451,14 +451,14 @@
final String remoteAddress = args[i];
final int remotePort = Integer.parseInt(args[i + 1]);
- factories.add(Connections.newFixedConnectionPool(Connections
+ factories.add(Connections.newCachedConnectionPool(Connections
.newAuthenticatedConnectionFactory(Connections
.newHeartBeatConnectionFactory(new LDAPConnectionFactory(remoteAddress,
remotePort)), Requests.newSimpleBindRequest(proxyDN,
- proxyPassword.toCharArray())), Integer.MAX_VALUE));
- bindFactories.add(Connections.newFixedConnectionPool(Connections
+ proxyPassword.toCharArray()))));
+ bindFactories.add(Connections.newCachedConnectionPool(Connections
.newHeartBeatConnectionFactory(new LDAPConnectionFactory(remoteAddress,
- remotePort)), Integer.MAX_VALUE));
+ remotePort))));
}
final RoundRobinLoadBalancingAlgorithm algorithm =
new RoundRobinLoadBalancingAlgorithm(factories);
diff --git a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java
index 454aa0f..7852a3b2 100644
--- a/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java
+++ b/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/RewriterProxy.java
@@ -665,15 +665,14 @@
// Create connection factories.
final ConnectionFactory factory =
- Connections.newFixedConnectionPool(
+ Connections.newCachedConnectionPool(
Connections.newAuthenticatedConnectionFactory(
new LDAPConnectionFactory(remoteAddress, remotePort),
Requests.newSimpleBindRequest(
- proxyDN, proxyPassword.toCharArray())),
- Integer.MAX_VALUE);
+ proxyDN, proxyPassword.toCharArray())));
final ConnectionFactory bindFactory =
- Connections.newFixedConnectionPool(new LDAPConnectionFactory(
- remoteAddress, remotePort), Integer.MAX_VALUE);
+ Connections.newCachedConnectionPool(new LDAPConnectionFactory(
+ remoteAddress, remotePort));
// Create a server connection adapter.
final ProxyBackend backend = new ProxyBackend(factory, bindFactory);
diff --git a/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java b/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
similarity index 76%
rename from opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
rename to opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
index 5225af8..48511a7 100644
--- a/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
+++ b/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
@@ -27,16 +27,20 @@
package org.forgerock.opendj.ldap;
-import static com.forgerock.opendj.util.StaticUtils.*;
-
-import static org.forgerock.opendj.ldap.CoreMessages.*;
-import static org.forgerock.opendj.ldap.ErrorResultException.*;
+import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
+import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER;
+import static org.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_POOL_CLOSING;
+import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
@@ -62,13 +66,15 @@
import com.forgerock.opendj.util.AsynchronousFutureResult;
import com.forgerock.opendj.util.CompletedFutureResult;
+import com.forgerock.opendj.util.ReferenceCountedObject;
import com.forgerock.opendj.util.Validator;
/**
- * A simple connection pool implementation which maintains a fixed number of
- * connections.
+ * A connection pool implementation which maintains a cache of pooled
+ * connections with a configurable core pool size, maximum size, and expiration
+ * policy.
*/
-final class FixedConnectionPool implements ConnectionPool {
+final class CachedConnectionPool implements ConnectionPool {
/**
* This result handler is invoked when an attempt to add a new connection to
@@ -79,13 +85,12 @@
@Override
public void handleErrorResult(final ErrorResultException error) {
// Connection attempt failed, so decrease the pool size.
- currentPoolSize.release();
+ availableConnections.release();
if (DEBUG_LOG.isLoggable(Level.FINE)) {
DEBUG_LOG.fine(String.format(
- "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", error
- .getMessage(), poolSize - currentPoolSize.availablePermits(),
- poolSize));
+ "Connection attempt failed: %s, availableConnections=%d, maxPoolSize=%d",
+ error.getMessage(), currentPoolSize(), maxPoolSize));
}
QueueElement holder;
@@ -106,8 +111,8 @@
public void handleResult(final Connection connection) {
if (DEBUG_LOG.isLoggable(Level.FINE)) {
DEBUG_LOG.fine(String.format(
- "Connection attempt succeeded: currentPoolSize=%d, poolSize=%d", poolSize
- - currentPoolSize.availablePermits(), poolSize));
+ "Connection attempt succeeded: availableConnections=%d, maxPoolSize=%d",
+ currentPoolSize(), maxPoolSize));
}
publishConnection(connection);
}
@@ -191,7 +196,7 @@
}
@Override
- public Result applyChange(ChangeRecord request) throws ErrorResultException {
+ public Result applyChange(final ChangeRecord request) throws ErrorResultException {
return checkState().applyChange(request);
}
@@ -249,15 +254,15 @@
* server, but the server may still be available. In order to
* avoid leaving pending futures hanging indefinitely, we should
* try to reconnect immediately. No need to release/acquire
- * currentPoolSize.
+ * availableConnections.
*/
connection.close();
factory.getConnectionAsync(connectionResultHandler);
if (DEBUG_LOG.isLoggable(Level.FINE)) {
DEBUG_LOG.fine(String.format(
- "Connection no longer valid: currentPoolSize=%d, poolSize=%d", poolSize
- - currentPoolSize.availablePermits(), poolSize));
+ "Connection no longer valid: availableConnections=%d, maxPoolSize=%d",
+ currentPoolSize(), maxPoolSize));
}
}
@@ -527,7 +532,54 @@
}
return connection;
}
+ }
+ /**
+ * Scheduled task responsible for purging non-core pooled connections which
+ * have been idle for longer than the idle timeout limit.
+ */
+ private final class PurgeIdleConnectionsTask implements Runnable {
+ @Override
+ public void run() {
+ final List<Connection> idleConnections;
+ synchronized (queue) {
+ if (isClosed) {
+ return;
+ }
+
+ /*
+ * Obtain a list of expired connections but don't close them yet
+ * since we don't want to hold the lock too long.
+ */
+ idleConnections = new LinkedList<Connection>();
+ final long timeoutMillis = currentTimeMillis() - idleTimeoutMillis;
+ int nonCoreConnectionCount = currentPoolSize() - corePoolSize;
+ for (QueueElement holder = queue.peek(); nonCoreConnectionCount > 0
+ && isTimedOutQueuedConnection(holder, timeoutMillis); holder = queue.peek()) {
+ idleConnections.add(holder.getWaitingConnection());
+ queue.poll();
+ availableConnections.release();
+ nonCoreConnectionCount--;
+ }
+ }
+
+ // Close the idle connections.
+ if (!idleConnections.isEmpty()) {
+ if (DEBUG_LOG.isLoggable(Level.FINE)) {
+ DEBUG_LOG.fine(String.format("Closing %d idle pooled connections: "
+ + "availableConnections=%d, maxPoolSize=%d", idleConnections.size(),
+ currentPoolSize(), maxPoolSize));
+ }
+ for (final Connection connection : idleConnections) {
+ connection.close();
+ }
+ }
+ }
+
+ private boolean isTimedOutQueuedConnection(final QueueElement holder,
+ final long timeoutMillis) {
+ return holder != null && !holder.isWaitingFuture() && holder.hasTimedOut(timeoutMillis);
+ }
}
/**
@@ -536,16 +588,19 @@
* connection request.
*/
private static final class QueueElement {
+ private final long timestampMillis;
private final Object value;
- QueueElement(final Connection connection) {
+ QueueElement(final Connection connection, final long timestampMillis) {
this.value = connection;
+ this.timestampMillis = timestampMillis;
}
- QueueElement(final ResultHandler<? super Connection> handler) {
+ QueueElement(final ResultHandler<? super Connection> handler, final long timestampMillis) {
this.value =
new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(
handler);
+ this.timestampMillis = timestampMillis;
}
@Override
@@ -566,22 +621,61 @@
return (AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>) value;
}
+ boolean hasTimedOut(final long timeLimitMillis) {
+ return timestampMillis < timeLimitMillis;
+ }
+
boolean isWaitingFuture() {
return value instanceof AsynchronousFutureResult;
}
}
+ /**
+ * This is intended for unit testing only in order to inject fake time
+ * stamps. Use System.currentTimeMillis() when null.
+ */
+ Callable<Long> testTimeSource = null;
+
+ private final Semaphore availableConnections;
private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
- private final Semaphore currentPoolSize;
+ private final int corePoolSize;
+
private final ConnectionFactory factory;
private boolean isClosed = false;
- private final int poolSize;
+ private final ScheduledFuture<?> idleTimeoutFuture;
+ private final long idleTimeoutMillis;
+ private final int maxPoolSize;
private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
+ private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
- FixedConnectionPool(final ConnectionFactory factory, final int poolSize) {
+ CachedConnectionPool(final ConnectionFactory factory, final int corePoolSize,
+ final int maximumPoolSize, final long idleTimeout, final TimeUnit unit,
+ final ScheduledExecutorService scheduler) {
+ Validator.ensureNotNull(factory);
+ Validator.ensureTrue(corePoolSize >= 0, "corePoolSize < 0");
+ Validator.ensureTrue(maximumPoolSize > 0, "maxPoolSize <= 0");
+ Validator.ensureTrue(corePoolSize <= maximumPoolSize, "corePoolSize > maxPoolSize");
+ Validator.ensureTrue(idleTimeout >= 0, "idleTimeout < 0");
+ Validator.ensureTrue(idleTimeout == 0 || unit != null, "time unit is null");
+
this.factory = factory;
- this.poolSize = poolSize;
- this.currentPoolSize = new Semaphore(poolSize);
+ this.corePoolSize = corePoolSize;
+ this.maxPoolSize = maximumPoolSize;
+ this.availableConnections = new Semaphore(maximumPoolSize);
+
+ if (corePoolSize < maximumPoolSize && idleTimeout > 0) {
+ // Dynamic pool.
+ this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
+ this.idleTimeoutMillis = unit.toMillis(idleTimeout);
+ this.idleTimeoutFuture =
+ this.scheduler.get().scheduleWithFixedDelay(new PurgeIdleConnectionsTask(),
+ idleTimeout, idleTimeout, unit);
+ } else {
+ // Fixed pool.
+ this.scheduler = null;
+ this.idleTimeoutMillis = 0;
+ this.idleTimeoutFuture = null;
+ }
}
@Override
@@ -601,18 +695,24 @@
while (hasWaitingConnections()) {
final QueueElement holder = queue.removeFirst();
idleConnections.add(holder.getWaitingConnection());
+ availableConnections.release();
}
}
if (DEBUG_LOG.isLoggable(Level.FINE)) {
DEBUG_LOG.fine(String.format(
- "Connection pool is closing: currentPoolSize=%d, poolSize=%d", poolSize
- - currentPoolSize.availablePermits(), poolSize));
+ "Connection pool is closing: availableConnections=%d, maxPoolSize=%d",
+ currentPoolSize(), maxPoolSize));
}
- // Close the idle connections.
+ if (idleTimeoutFuture != null) {
+ idleTimeoutFuture.cancel(false);
+ scheduler.release();
+ }
+
+ // Close all idle connections.
for (final Connection connection : idleConnections) {
- closeConnection(connection);
+ connection.close();
}
// Close the underlying factory.
@@ -636,13 +736,11 @@
final QueueElement holder;
synchronized (queue) {
if (isClosed) {
- throw new IllegalStateException("FixedConnectionPool is already closed");
- }
-
- if (hasWaitingConnections()) {
+ throw new IllegalStateException("CachedConnectionPool is already closed");
+ } else if (hasWaitingConnections()) {
holder = queue.removeFirst();
} else {
- holder = new QueueElement(handler);
+ holder = new QueueElement(handler, currentTimeMillis());
queue.add(holder);
}
}
@@ -659,18 +757,18 @@
} else {
// Close the stale connection and try again.
connection.close();
- currentPoolSize.release();
+ availableConnections.release();
if (DEBUG_LOG.isLoggable(Level.FINE)) {
DEBUG_LOG.fine(String.format(
- "Connection no longer valid: currentPoolSize=%d, poolSize=%d",
- poolSize - currentPoolSize.availablePermits(), poolSize));
+ "Connection no longer valid: availableConnections=%d, poolSize=%d",
+ currentPoolSize(), maxPoolSize));
}
}
} else {
// Grow the pool if needed.
final FutureResult<Connection> future = holder.getWaitingFuture();
- if (!future.isDone() && currentPoolSize.tryAcquire()) {
+ if (!future.isDone() && availableConnections.tryAcquire()) {
factory.getConnectionAsync(connectionResultHandler);
}
return future;
@@ -681,10 +779,10 @@
@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
- builder.append("FixedConnectionPool(");
+ builder.append("CachedConnectionPool(");
builder.append(String.valueOf(factory));
builder.append(',');
- builder.append(poolSize);
+ builder.append(maxPoolSize);
builder.append(')');
return builder.toString();
}
@@ -699,15 +797,25 @@
close();
}
- private void closeConnection(final Connection connection) {
- // The connection will be closed, so decrease the pool size.
- currentPoolSize.release();
- connection.close();
+ // Package private for unit testing.
+ int currentPoolSize() {
+ return maxPoolSize - availableConnections.availablePermits();
+ }
- if (DEBUG_LOG.isLoggable(Level.FINE)) {
- DEBUG_LOG.fine(String.format("Closing connection because connection pool is closing: "
- + "currentPoolSize=%d, poolSize=%d", poolSize
- - currentPoolSize.availablePermits(), poolSize));
+ /*
+ * This method delegates to System.currentTimeMillis() except in unit tests
+ * where we use injected times.
+ */
+ private long currentTimeMillis() {
+ if (testTimeSource == null) {
+ return System.currentTimeMillis();
+ } else {
+ try {
+ return testTimeSource.call();
+ } catch (final Exception e) {
+ // Should not happen.
+ throw new RuntimeException(e);
+ }
}
}
@@ -727,21 +835,28 @@
if (hasWaitingFutures()) {
connectionPoolIsClosing = isClosed;
holder = queue.removeFirst();
+ } else if (isClosed) {
+ connectionPoolIsClosing = true;
+ holder = null;
} else {
- if (isClosed) {
- connectionPoolIsClosing = true;
- holder = null;
- } else {
- holder = new QueueElement(connection);
- queue.add(holder);
- return;
- }
+ holder = new QueueElement(connection, currentTimeMillis());
+ queue.add(holder);
+ return;
}
}
// There was waiting future, so complete it.
if (connectionPoolIsClosing) {
- closeConnection(connection);
+ // The connection will be closed, so decrease the pool size.
+ availableConnections.release();
+ connection.close();
+
+ if (DEBUG_LOG.isLoggable(Level.FINE)) {
+ DEBUG_LOG.fine(String.format(
+ "Closing connection because connection pool is closing: "
+ + "availableConnections=%d, maxPoolSize=%d", currentPoolSize(),
+ maxPoolSize));
+ }
if (holder != null) {
final ErrorResultException e =
@@ -751,9 +866,8 @@
if (DEBUG_LOG.isLoggable(Level.FINE)) {
DEBUG_LOG.fine(String.format(
- "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", e
- .getMessage(), poolSize - currentPoolSize.availablePermits(),
- poolSize));
+ "Connection attempt failed: %s, availableConnections=%d, poolSize=%d",
+ e.getMessage(), currentPoolSize(), maxPoolSize));
}
}
} else {
diff --git a/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java b/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
index 42f31b2..985c887 100644
--- a/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
+++ b/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -73,9 +73,150 @@
}
/**
+ * Creates a new connection pool which creates new connections as needed
+ * using the provided connection factory, but will reuse previously
+ * allocated connections when they are available.
+ * <p>
+ * Connections which have not been used for sixty seconds are closed and
+ * removed from the pool. Thus, a pool that remains idle for long enough
+ * will not contain any cached connections.
+ * <p>
+ * Connections obtained from the connection pool are guaranteed to be valid
+ * immediately before being returned to the calling application. More
+ * specifically, connections which have remained idle in the connection pool
+ * for a long time and which have been remotely closed due to a time out
+ * will never be returned. However, once a pooled connection has been
+ * obtained it is the responsibility of the calling application to handle
+ * subsequent connection failures, these being signaled via a
+ * {@link ConnectionException}.
+ *
+ * @param factory
+ * The connection factory to use for creating new connections.
+ * @return The new connection pool.
+ * @throws NullPointerException
+ * If {@code factory} was {@code null}.
+ */
+ public static ConnectionPool newCachedConnectionPool(final ConnectionFactory factory) {
+ return new CachedConnectionPool(factory, 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, null);
+ }
+
+ /**
+ * Creates a new connection pool which creates new connections as needed
+ * using the provided connection factory, but will reuse previously
+ * allocated connections when they are available.
+ * <p>
+ * Attempts to use more than {@code maximumPoolSize} connections at once
+ * will block until a connection is released back to the pool. In other
+ * words, this pool will prevent applications from using more than
+ * {@code maximumPoolSize} connections at the same time.
+ * <p>
+ * Connections which have not been used for the provided {@code idleTimeout}
+ * period are closed and removed from the pool, until there are only
+ * {@code corePoolSize} connections remaining.
+ * <p>
+ * Connections obtained from the connection pool are guaranteed to be valid
+ * immediately before being returned to the calling application. More
+ * specifically, connections which have remained idle in the connection pool
+ * for a long time and which have been remotely closed due to a time out
+ * will never be returned. However, once a pooled connection has been
+ * obtained it is the responsibility of the calling application to handle
+ * subsequent connection failures, these being signaled via a
+ * {@link ConnectionException}.
+ *
+ * @param factory
+ * The connection factory to use for creating new connections.
+ * @param corePoolSize
+ * The minimum number of connections to keep in the pool, even if
+ * they are idle.
+ * @param maximumPoolSize
+ * The maximum number of connections to allow in the pool.
+ * @param idleTimeout
+ * The time out period, after which unused non-core connections
+ * will be closed.
+ * @param unit
+ * The time unit for the {@code keepAliveTime} argument.
+ * @return The new connection pool.
+ * @throws IllegalArgumentException
+ * If {@code corePoolSize}, {@code maximumPoolSize} are less
+ * than or equal to zero, or if {@code idleTimeout} is negative,
+ * or if {@code corePoolSize} is greater than
+ * {@code maximumPoolSize}, or if {@code idleTimeout} is
+ * non-zero and {@code unit} is {@code null}.
+ * @throws NullPointerException
+ * If {@code factory} was {@code null}.
+ */
+ public static ConnectionPool newCachedConnectionPool(final ConnectionFactory factory,
+ final int corePoolSize, final int maximumPoolSize, final long idleTimeout,
+ final TimeUnit unit) {
+ return new CachedConnectionPool(factory, corePoolSize, maximumPoolSize, idleTimeout, unit,
+ null);
+ }
+
+ /**
+ * Creates a new connection pool which creates new connections as needed
+ * using the provided connection factory, but will reuse previously
+ * allocated connections when they are available.
+ * <p>
+ * Attempts to use more than {@code maximumPoolSize} connections at once
+ * will block until a connection is released back to the pool. In other
+ * words, this pool will prevent applications from using more than
+ * {@code maximumPoolSize} connections at the same time.
+ * <p>
+ * Connections which have not been used for the provided {@code idleTimeout}
+ * period are closed and removed from the pool, until there are only
+ * {@code corePoolSize} connections remaining.
+ * <p>
+ * Connections obtained from the connection pool are guaranteed to be valid
+ * immediately before being returned to the calling application. More
+ * specifically, connections which have remained idle in the connection pool
+ * for a long time and which have been remotely closed due to a time out
+ * will never be returned. However, once a pooled connection has been
+ * obtained it is the responsibility of the calling application to handle
+ * subsequent connection failures, these being signaled via a
+ * {@link ConnectionException}.
+ *
+ * @param factory
+ * The connection factory to use for creating new connections.
+ * @param corePoolSize
+ * The minimum number of connections to keep in the pool, even if
+ * they are idle.
+ * @param maximumPoolSize
+ * The maximum number of connections to allow in the pool.
+ * @param idleTimeout
+ * The time out period, after which unused non-core connections
+ * will be closed.
+ * @param unit
+ * The time unit for the {@code keepAliveTime} argument.
+ * @param scheduler
+ * The scheduler which should be used for periodically checking
+ * for idle connections, or {@code null} if the default scheduler
+ * should be used.
+ * @return The new connection pool.
+ * @throws IllegalArgumentException
+ * If {@code corePoolSize}, {@code maximumPoolSize} are less
+ * than or equal to zero, or if {@code idleTimeout} is negative,
+ * or if {@code corePoolSize} is greater than
+ * {@code maximumPoolSize}, or if {@code idleTimeout} is
+ * non-zero and {@code unit} is {@code null}.
+ * @throws NullPointerException
+ * If {@code factory} was {@code null}.
+ */
+ public static ConnectionPool newCachedConnectionPool(final ConnectionFactory factory,
+ final int corePoolSize, final int maximumPoolSize, final long idleTimeout,
+ final TimeUnit unit, final ScheduledExecutorService scheduler) {
+ return new CachedConnectionPool(factory, corePoolSize, maximumPoolSize, idleTimeout, unit,
+ scheduler);
+ }
+
+ /**
* Creates a new connection pool which will maintain {@code poolSize}
* connections created using the provided connection factory.
* <p>
+ * Attempts to use more than {@code poolSize} connections at once will block
+ * until a connection is released back to the pool. In other words, this
+ * pool will prevent applications from using more than {@code poolSize}
+ * connections at the same time.
+ * <p>
* Connections obtained from the connection pool are guaranteed to be valid
* immediately before being returned to the calling application. More
* specifically, connections which have remained idle in the connection pool
@@ -97,9 +238,7 @@
*/
public static ConnectionPool newFixedConnectionPool(final ConnectionFactory factory,
final int poolSize) {
- Validator.ensureNotNull(factory);
- Validator.ensureTrue(poolSize >= 0, "negative pool size");
- return new FixedConnectionPool(factory, poolSize);
+ return new CachedConnectionPool(factory, poolSize, poolSize, 0L, null, null);
}
/**
diff --git a/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java b/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
index 226d8e7..83348e3 100644
--- a/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
+++ b/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionPoolTestCase.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2012 ForgeRock AS.
+ * Portions copyright 2011-2013 ForgeRock AS.
*/
package org.forgerock.opendj.ldap;
@@ -44,6 +44,8 @@
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
import org.forgerock.opendj.ldap.requests.BindRequest;
import org.forgerock.opendj.ldap.requests.Requests;
@@ -414,4 +416,102 @@
pool.close();
}
+ /**
+ * Verifies that a pool with connection keep alive correctly purges idle
+ * connections after the keepalive period has expired.
+ *
+ * @throws Exception
+ * If an unexpected error occurred.
+ */
+ @Test
+ public void testConnectionKeepAliveExpiration() throws Exception {
+ final Connection pooledConnection1 = mock(Connection.class, "pooledConnection1");
+ final Connection pooledConnection2 = mock(Connection.class, "pooledConnection2");
+ final Connection pooledConnection3 = mock(Connection.class, "pooledConnection3");
+ final Connection pooledConnection4 = mock(Connection.class, "pooledConnection4");
+ final Connection pooledConnection5 = mock(Connection.class, "pooledConnection5");
+ final Connection pooledConnection6 = mock(Connection.class, "pooledConnection6");
+
+ when(pooledConnection1.isValid()).thenReturn(true);
+ when(pooledConnection2.isValid()).thenReturn(true);
+ when(pooledConnection3.isValid()).thenReturn(true);
+ when(pooledConnection4.isValid()).thenReturn(true);
+ when(pooledConnection5.isValid()).thenReturn(true);
+ when(pooledConnection6.isValid()).thenReturn(true);
+
+ final ConnectionFactory factory =
+ mockConnectionFactory(pooledConnection1, pooledConnection2, pooledConnection3,
+ pooledConnection4, pooledConnection5, pooledConnection6);
+ final MockScheduler scheduler = new MockScheduler();
+ final CachedConnectionPool pool =
+ new CachedConnectionPool(factory, 2, 4, 100, TimeUnit.MILLISECONDS, scheduler);
+ assertThat(scheduler.isScheduled()).isTrue();
+
+ // First populate the pool with idle connections at time 0.
+ @SuppressWarnings("unchecked")
+ final Callable<Long> timeSource = mock(Callable.class);
+ when(timeSource.call()).thenReturn(0L);
+ pool.testTimeSource = timeSource;
+
+ assertThat(pool.currentPoolSize()).isEqualTo(0);
+ Connection c1 = pool.getConnection();
+ Connection c2 = pool.getConnection();
+ Connection c3 = pool.getConnection();
+ Connection c4 = pool.getConnection();
+ assertThat(pool.currentPoolSize()).isEqualTo(4);
+ c1.close();
+ c2.close();
+ c3.close();
+ c4.close();
+ assertThat(pool.currentPoolSize()).isEqualTo(4);
+
+ // First purge at time 50 is no-op because no connections have expired.
+ when(timeSource.call()).thenReturn(50L);
+ scheduler.getCommand().run();
+ assertThat(pool.currentPoolSize()).isEqualTo(4);
+
+ // Second purge at time 150 should remove 2 non-core connections.
+ when(timeSource.call()).thenReturn(150L);
+ scheduler.getCommand().run();
+ assertThat(pool.currentPoolSize()).isEqualTo(2);
+
+ verify(pooledConnection1, times(1)).close();
+ verify(pooledConnection2, times(1)).close();
+ verify(pooledConnection3, times(0)).close();
+ verify(pooledConnection4, times(0)).close();
+
+ // Regrow the pool at time 200.
+ when(timeSource.call()).thenReturn(200L);
+ Connection c5 = pool.getConnection(); // pooledConnection3
+ Connection c6 = pool.getConnection(); // pooledConnection4
+ Connection c7 = pool.getConnection(); // pooledConnection5
+ Connection c8 = pool.getConnection(); // pooledConnection6
+ assertThat(pool.currentPoolSize()).isEqualTo(4);
+ c5.close();
+ c6.close();
+ c7.close();
+ c8.close();
+ assertThat(pool.currentPoolSize()).isEqualTo(4);
+
+ // Third purge at time 250 should not remove any connections.
+ when(timeSource.call()).thenReturn(250L);
+ scheduler.getCommand().run();
+ assertThat(pool.currentPoolSize()).isEqualTo(4);
+
+ // Fourth purge at time 350 should remove 2 non-core connections.
+ when(timeSource.call()).thenReturn(350L);
+ scheduler.getCommand().run();
+ assertThat(pool.currentPoolSize()).isEqualTo(2);
+
+ verify(pooledConnection3, times(1)).close();
+ verify(pooledConnection4, times(1)).close();
+ verify(pooledConnection5, times(0)).close();
+ verify(pooledConnection6, times(0)).close();
+
+ pool.close();
+ verify(pooledConnection5, times(1)).close();
+ verify(pooledConnection6, times(1)).close();
+ assertThat(scheduler.isScheduled()).isFalse();
+ }
+
}
diff --git a/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java b/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java
index b9cbbe1..7f84a81 100644
--- a/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java
+++ b/opendj-rest2ldap/src/main/java/org/forgerock/opendj/rest2ldap/Rest2LDAP.java
@@ -1106,7 +1106,9 @@
factory =
Connections.newHeartBeatConnectionFactory(factory,
heartBeatIntervalSeconds, TimeUnit.SECONDS);
- factory = Connections.newFixedConnectionPool(factory, connectionPoolSize);
+ factory =
+ Connections.newCachedConnectionPool(factory, 0, connectionPoolSize, 60L,
+ TimeUnit.SECONDS);
}
servers.add(factory);
}
--
Gitblit v1.10.0