| File was renamed from opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java |
| | |
| | | |
| | | package org.forgerock.opendj.ldap; |
| | | |
| | | import static com.forgerock.opendj.util.StaticUtils.*; |
| | | |
| | | import static org.forgerock.opendj.ldap.CoreMessages.*; |
| | | import static org.forgerock.opendj.ldap.ErrorResultException.*; |
| | | import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG; |
| | | import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER; |
| | | import static org.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_POOL_CLOSING; |
| | | import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult; |
| | | |
| | | import java.util.Collection; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.concurrent.Callable; |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | | import java.util.concurrent.ScheduledExecutorService; |
| | | import java.util.concurrent.ScheduledFuture; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.logging.Level; |
| | | |
| | |
| | | |
| | | import com.forgerock.opendj.util.AsynchronousFutureResult; |
| | | import com.forgerock.opendj.util.CompletedFutureResult; |
| | | import com.forgerock.opendj.util.ReferenceCountedObject; |
| | | import com.forgerock.opendj.util.Validator; |
| | | |
| | | /** |
| | | * A simple connection pool implementation which maintains a fixed number of |
| | | * connections. |
| | | * A connection pool implementation which maintains a cache of pooled |
| | | * connections with a configurable core pool size, maximum size, and expiration |
| | | * policy. |
| | | */ |
| | | final class FixedConnectionPool implements ConnectionPool { |
| | | final class CachedConnectionPool implements ConnectionPool { |
| | | |
| | | /** |
| | | * This result handler is invoked when an attempt to add a new connection to |
| | |
| | | @Override |
| | | public void handleErrorResult(final ErrorResultException error) { |
| | | // Connection attempt failed, so decrease the pool size. |
| | | currentPoolSize.release(); |
| | | availableConnections.release(); |
| | | |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format( |
| | | "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", error |
| | | .getMessage(), poolSize - currentPoolSize.availablePermits(), |
| | | poolSize)); |
| | | "Connection attempt failed: %s, availableConnections=%d, maxPoolSize=%d", |
| | | error.getMessage(), currentPoolSize(), maxPoolSize)); |
| | | } |
| | | |
| | | QueueElement holder; |
| | |
| | | public void handleResult(final Connection connection) { |
| | | // A connection attempt succeeded: trace it at FINE level, then hand the |
| | | // new connection over to publishConnection(). |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | // NOTE(review): the value logged under "availableConnections" is |
| | | // currentPoolSize(), i.e. maxPoolSize minus the free permits - confirm |
| | | // that the label is intended. |
| | | DEBUG_LOG.fine(String.format( |
| | | "Connection attempt succeeded: currentPoolSize=%d, poolSize=%d", poolSize |
| | | - currentPoolSize.availablePermits(), poolSize)); |
| | | "Connection attempt succeeded: availableConnections=%d, maxPoolSize=%d", |
| | | currentPoolSize(), maxPoolSize)); |
| | | } |
| | | publishConnection(connection); |
| | | } |
| | |
| | | } |
| | | |
| | | @Override |
| | | public Result applyChange(ChangeRecord request) throws ErrorResultException { |
| | | public Result applyChange(final ChangeRecord request) throws ErrorResultException { |
| | | // Forwards the request to whatever checkState() returns; checkState() is |
| | | // defined outside this view, so its failure behaviour is not documented here. |
| | | return checkState().applyChange(request); |
| | | } |
| | | |
| | |
| | | * server, but the server may still be available. In order to |
| | | * avoid leaving pending futures hanging indefinitely, we should |
| | | * try to reconnect immediately. No need to release/acquire |
| | | * currentPoolSize. |
| | | * availableConnections. |
| | | */ |
| | | connection.close(); |
| | | factory.getConnectionAsync(connectionResultHandler); |
| | | |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format( |
| | | "Connection no longer valid: currentPoolSize=%d, poolSize=%d", poolSize |
| | | - currentPoolSize.availablePermits(), poolSize)); |
| | | "Connection no longer valid: availableConnections=%d, maxPoolSize=%d", |
| | | currentPoolSize(), maxPoolSize)); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | return connection; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Periodic housekeeping task which evicts pooled connections that have been |
| | | * sitting unused in the queue for longer than the idle timeout. Eviction |
| | | * stops as soon as the pool has shrunk back to its core size. |
| | | */ |
| | | private final class PurgeIdleConnectionsTask implements Runnable { |
| | | @Override |
| | | public void run() { |
| | | final List<Connection> expired = new LinkedList<Connection>(); |
| | | synchronized (queue) { |
| | | if (isClosed) { |
| | | return; |
| | | } |
| | | |
| | | /* |
| | | * Only collect the expired connections while the lock is held. They |
| | | * are closed afterwards so that the lock is held as briefly as |
| | | * possible. |
| | | */ |
| | | final long cutoffMillis = currentTimeMillis() - idleTimeoutMillis; |
| | | int surplus = currentPoolSize() - corePoolSize; |
| | | while (surplus > 0) { |
| | | final QueueElement head = queue.peek(); |
| | | if (!isTimedOutQueuedConnection(head, cutoffMillis)) { |
| | | break; |
| | | } |
| | | expired.add(head.getWaitingConnection()); |
| | | queue.poll(); |
| | | // The connection is leaving the pool, so give its permit back. |
| | | availableConnections.release(); |
| | | surplus--; |
| | | } |
| | | } |
| | | |
| | | if (expired.isEmpty()) { |
| | | return; |
| | | } |
| | | |
| | | // Close the evicted connections now that the lock has been released. |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | final String message = String.format("Closing %d idle pooled connections: " |
| | | + "availableConnections=%d, maxPoolSize=%d", expired.size(), |
| | | currentPoolSize(), maxPoolSize); |
| | | DEBUG_LOG.fine(message); |
| | | } |
| | | for (final Connection stale : expired) { |
| | | stale.close(); |
| | | } |
| | | } |
| | | |
| | | /* |
| | | * True only for a non-null queue element which holds a connection (not a |
| | | * waiting future) and whose time stamp is older than the given limit. |
| | | */ |
| | | private boolean isTimedOutQueuedConnection(final QueueElement holder, |
| | | final long timeoutMillis) { |
| | | if (holder == null || holder.isWaitingFuture()) { |
| | | return false; |
| | | } |
| | | return holder.hasTimedOut(timeoutMillis); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | * connection request. |
| | | */ |
| | | private static final class QueueElement { |
| | | private final long timestampMillis; |
| | | private final Object value; |
| | | |
| | | // Wraps a pooled connection together with the time stamp supplied by the |
| | | // caller; the time stamp is what hasTimedOut() later compares against. |
| | | QueueElement(final Connection connection) { |
| | | QueueElement(final Connection connection, final long timestampMillis) { |
| | | this.value = connection; |
| | | this.timestampMillis = timestampMillis; |
| | | } |
| | | |
| | | // Wraps a pending connection request: the handler is placed inside a new |
| | | // AsynchronousFutureResult, which is what isWaitingFuture() detects. |
| | | QueueElement(final ResultHandler<? super Connection> handler) { |
| | | QueueElement(final ResultHandler<? super Connection> handler, final long timestampMillis) { |
| | | this.value = |
| | | new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>( |
| | | handler); |
| | | this.timestampMillis = timestampMillis; |
| | | } |
| | | |
| | | @Override |
| | |
| | | return (AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>) value; |
| | | } |
| | | |
| | | boolean hasTimedOut(final long timeLimitMillis) { |
| | | return timestampMillis < timeLimitMillis; |
| | | } |
| | | |
| | | boolean isWaitingFuture() { |
| | | return value instanceof AsynchronousFutureResult; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This is intended for unit testing only in order to inject fake time |
| | | * stamps. Use System.currentTimeMillis() when null. |
| | | */ |
| | | Callable<Long> testTimeSource = null; |
| | | |
| | | // Created with maximumPoolSize permits by the constructor; currentPoolSize() |
| | | // reports maxPoolSize minus the permits still available. |
| | | private final Semaphore availableConnections; |
| | | private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler(); |
| | | private final Semaphore currentPoolSize; |
| | | // Idle connections are only purged while the pool is larger than this. |
| | | private final int corePoolSize; |
| | | |
| | | private final ConnectionFactory factory; |
| | | // Checked by the purge task while holding the queue lock. |
| | | private boolean isClosed = false; |
| | | private final int poolSize; |
| | | // Handle on the scheduled purge task; null when no task was scheduled. |
| | | private final ScheduledFuture<?> idleTimeoutFuture; |
| | | // Idle timeout converted to milliseconds; 0 when no purge task was scheduled. |
| | | private final long idleTimeoutMillis; |
| | | private final int maxPoolSize; |
| | | // Holds both idle connections and waiting connection requests; also used as |
| | | // the lock object guarding pool state. |
| | | private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>(); |
| | | // Scheduler reference acquired for the purge task; null when no task was scheduled. |
| | | private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler; |
| | | |
| | | /** |
| | | * Creates a connection pool which obtains its connections from the provided |
| | | * factory. |
| | | * <p> |
| | | * The arguments are validated first: the factory must not be null, |
| | | * corePoolSize must be >= 0, maximumPoolSize must be > 0, corePoolSize must |
| | | * not exceed maximumPoolSize, idleTimeout must be >= 0, and the time unit |
| | | * may only be null when idleTimeout is 0. |
| | | * <p> |
| | | * When corePoolSize < maximumPoolSize and idleTimeout > 0 a |
| | | * PurgeIdleConnectionsTask is scheduled with a fixed delay of idleTimeout; |
| | | * otherwise no task is scheduled and the scheduler argument is not used. |
| | | * |
| | | * @param factory the factory used to create new connections. |
| | | * @param corePoolSize the pool size below which idle connections are not purged. |
| | | * @param maximumPoolSize the number of permits given to the availableConnections semaphore. |
| | | * @param idleTimeout the delay between purge runs and the idle time limit. |
| | | * @param unit the time unit of idleTimeout. |
| | | * @param scheduler the scheduler for the purge task; passed through |
| | | * DEFAULT_SCHEDULER.acquireIfNull(), which presumably substitutes |
| | | * a default when null - TODO confirm. |
| | | */ |
| | | FixedConnectionPool(final ConnectionFactory factory, final int poolSize) { |
| | | CachedConnectionPool(final ConnectionFactory factory, final int corePoolSize, |
| | | final int maximumPoolSize, final long idleTimeout, final TimeUnit unit, |
| | | final ScheduledExecutorService scheduler) { |
| | | Validator.ensureNotNull(factory); |
| | | Validator.ensureTrue(corePoolSize >= 0, "corePoolSize < 0"); |
| | | Validator.ensureTrue(maximumPoolSize > 0, "maxPoolSize <= 0"); |
| | | Validator.ensureTrue(corePoolSize <= maximumPoolSize, "corePoolSize > maxPoolSize"); |
| | | Validator.ensureTrue(idleTimeout >= 0, "idleTimeout < 0"); |
| | | Validator.ensureTrue(idleTimeout == 0 || unit != null, "time unit is null"); |
| | | |
| | | this.factory = factory; |
| | | this.poolSize = poolSize; |
| | | this.currentPoolSize = new Semaphore(poolSize); |
| | | this.corePoolSize = corePoolSize; |
| | | this.maxPoolSize = maximumPoolSize; |
| | | this.availableConnections = new Semaphore(maximumPoolSize); |
| | | |
| | | if (corePoolSize < maximumPoolSize && idleTimeout > 0) { |
| | | // Dynamic pool: schedule the periodic purge of idle non-core connections. |
| | | this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler); |
| | | this.idleTimeoutMillis = unit.toMillis(idleTimeout); |
| | | this.idleTimeoutFuture = |
| | | this.scheduler.get().scheduleWithFixedDelay(new PurgeIdleConnectionsTask(), |
| | | idleTimeout, idleTimeout, unit); |
| | | } else { |
| | | // Fixed pool: nothing is ever purged, so no scheduler or task is needed. |
| | | this.scheduler = null; |
| | | this.idleTimeoutMillis = 0; |
| | | this.idleTimeoutFuture = null; |
| | | } |
| | | } |
| | | |
| | | @Override |
| | |
| | | while (hasWaitingConnections()) { |
| | | final QueueElement holder = queue.removeFirst(); |
| | | idleConnections.add(holder.getWaitingConnection()); |
| | | availableConnections.release(); |
| | | } |
| | | } |
| | | |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format( |
| | | "Connection pool is closing: currentPoolSize=%d, poolSize=%d", poolSize |
| | | - currentPoolSize.availablePermits(), poolSize)); |
| | | "Connection pool is closing: availableConnections=%d, maxPoolSize=%d", |
| | | currentPoolSize(), maxPoolSize)); |
| | | } |
| | | |
| | | // Stop the idle connection purge task and release the scheduler. |
| | | if (idleTimeoutFuture != null) { |
| | | idleTimeoutFuture.cancel(false); |
| | | scheduler.release(); |
| | | } |
| | | |
| | | // Close all idle connections. |
| | | for (final Connection connection : idleConnections) { |
| | | closeConnection(connection); |
| | | connection.close(); |
| | | } |
| | | |
| | | // Close the underlying factory. |
| | |
| | | final QueueElement holder; |
| | | synchronized (queue) { |
| | | if (isClosed) { |
| | | throw new IllegalStateException("FixedConnectionPool is already closed"); |
| | | } |
| | | |
| | | if (hasWaitingConnections()) { |
| | | throw new IllegalStateException("CachedConnectionPool is already closed"); |
| | | } else if (hasWaitingConnections()) { |
| | | holder = queue.removeFirst(); |
| | | } else { |
| | | holder = new QueueElement(handler); |
| | | holder = new QueueElement(handler, currentTimeMillis()); |
| | | queue.add(holder); |
| | | } |
| | | } |
| | |
| | | } else { |
| | | // Close the stale connection and try again. |
| | | connection.close(); |
| | | currentPoolSize.release(); |
| | | availableConnections.release(); |
| | | |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format( |
| | | "Connection no longer valid: currentPoolSize=%d, poolSize=%d", |
| | | poolSize - currentPoolSize.availablePermits(), poolSize)); |
| | | "Connection no longer valid: availableConnections=%d, poolSize=%d", |
| | | currentPoolSize(), maxPoolSize)); |
| | | } |
| | | } |
| | | } else { |
| | | // Grow the pool if needed. |
| | | final FutureResult<Connection> future = holder.getWaitingFuture(); |
| | | if (!future.isDone() && currentPoolSize.tryAcquire()) { |
| | | if (!future.isDone() && availableConnections.tryAcquire()) { |
| | | factory.getConnectionAsync(connectionResultHandler); |
| | | } |
| | | return future; |
| | |
| | | @Override |
| | | public String toString() { |
| | | // Renders the pool as its class name followed by the factory and the |
| | | // pool size limit, separated by a comma and wrapped in parentheses. |
| | | final StringBuilder builder = new StringBuilder(); |
| | | builder.append("FixedConnectionPool("); |
| | | builder.append("CachedConnectionPool("); |
| | | builder.append(String.valueOf(factory)); |
| | | builder.append(','); |
| | | builder.append(poolSize); |
| | | builder.append(maxPoolSize); |
| | | builder.append(')'); |
| | | return builder.toString(); |
| | | } |
| | |
| | | close(); |
| | | } |
| | | |
| | | private void closeConnection(final Connection connection) { |
| | | // The connection will be closed, so decrease the pool size. |
| | | currentPoolSize.release(); |
| | | connection.close(); |
| | | // Package private for unit testing. |
| | | int currentPoolSize() { |
| | | return maxPoolSize - availableConnections.availablePermits(); |
| | | } |
| | | |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format("Closing connection because connection pool is closing: " |
| | | + "currentPoolSize=%d, poolSize=%d", poolSize |
| | | - currentPoolSize.availablePermits(), poolSize)); |
| | | /* |
| | | * Returns the current time in milliseconds: the injected test time source |
| | | * is consulted when one has been set, otherwise System.currentTimeMillis() |
| | | * is used. |
| | | */ |
| | | private long currentTimeMillis() { |
| | | if (testTimeSource != null) { |
| | | try { |
| | | return testTimeSource.call(); |
| | | } catch (final Exception e) { |
| | | // Should not happen. |
| | | throw new RuntimeException(e); |
| | | } |
| | | } |
| | | return System.currentTimeMillis(); |
| | | } |
| | | |
| | |
| | | if (hasWaitingFutures()) { |
| | | connectionPoolIsClosing = isClosed; |
| | | holder = queue.removeFirst(); |
| | | } else if (isClosed) { |
| | | connectionPoolIsClosing = true; |
| | | holder = null; |
| | | } else { |
| | | if (isClosed) { |
| | | connectionPoolIsClosing = true; |
| | | holder = null; |
| | | } else { |
| | | holder = new QueueElement(connection); |
| | | queue.add(holder); |
| | | return; |
| | | } |
| | | holder = new QueueElement(connection, currentTimeMillis()); |
| | | queue.add(holder); |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // There was waiting future, so complete it. |
| | | if (connectionPoolIsClosing) { |
| | | closeConnection(connection); |
| | | // The connection will be closed, so decrease the pool size. |
| | | availableConnections.release(); |
| | | connection.close(); |
| | | |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format( |
| | | "Closing connection because connection pool is closing: " |
| | | + "availableConnections=%d, maxPoolSize=%d", currentPoolSize(), |
| | | maxPoolSize)); |
| | | } |
| | | |
| | | if (holder != null) { |
| | | final ErrorResultException e = |
| | |
| | | |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format( |
| | | "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", e |
| | | .getMessage(), poolSize - currentPoolSize.availablePermits(), |
| | | poolSize)); |
| | | "Connection attempt failed: %s, availableConnections=%d, poolSize=%d", |
| | | e.getMessage(), currentPoolSize(), maxPoolSize)); |
| | | } |
| | | } |
| | | } else { |