mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
07.08.2013 77c14ffd8232293dc8fb1a7446ddf2e69ca4b7ff
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
File was renamed from opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
@@ -27,16 +27,20 @@
package org.forgerock.opendj.ldap;
import static com.forgerock.opendj.util.StaticUtils.*;
import static org.forgerock.opendj.ldap.CoreMessages.*;
import static org.forgerock.opendj.ldap.ErrorResultException.*;
import static com.forgerock.opendj.util.StaticUtils.DEBUG_LOG;
import static com.forgerock.opendj.util.StaticUtils.DEFAULT_SCHEDULER;
import static org.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_POOL_CLOSING;
import static org.forgerock.opendj.ldap.ErrorResultException.newErrorResult;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
@@ -62,13 +66,15 @@
import com.forgerock.opendj.util.AsynchronousFutureResult;
import com.forgerock.opendj.util.CompletedFutureResult;
import com.forgerock.opendj.util.ReferenceCountedObject;
import com.forgerock.opendj.util.Validator;
/**
 * A simple connection pool implementation which maintains a fixed number of
 * connections.
 * A connection pool implementation which maintains a cache of pooled
 * connections with a configurable core pool size, maximum size, and expiration
 * policy.
 */
final class FixedConnectionPool implements ConnectionPool {
final class CachedConnectionPool implements ConnectionPool {
    /**
     * This result handler is invoked when an attempt to add a new connection to
@@ -79,13 +85,12 @@
        @Override
        public void handleErrorResult(final ErrorResultException error) {
            // Connection attempt failed, so decrease the pool size.
            currentPoolSize.release();
            availableConnections.release();
            if (DEBUG_LOG.isLoggable(Level.FINE)) {
                DEBUG_LOG.fine(String.format(
                        "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", error
                                .getMessage(), poolSize - currentPoolSize.availablePermits(),
                        poolSize));
                        "Connection attempt failed: %s, availableConnections=%d, maxPoolSize=%d",
                        error.getMessage(), currentPoolSize(), maxPoolSize));
            }
            QueueElement holder;
@@ -106,8 +111,8 @@
        public void handleResult(final Connection connection) {
            // Trace the successful connection attempt together with the
            // current pool usage; formatting is skipped unless FINE is enabled.
            if (DEBUG_LOG.isLoggable(Level.FINE)) {
                DEBUG_LOG.fine(String.format(
                        "Connection attempt succeeded:  currentPoolSize=%d, poolSize=%d", poolSize
                                - currentPoolSize.availablePermits(), poolSize));
                        "Connection attempt succeeded:  availableConnections=%d, maxPoolSize=%d",
                        currentPoolSize(), maxPoolSize));
            }
            // Hand the newly established connection over to the pool.
            publishConnection(connection);
        }
@@ -191,7 +196,7 @@
        }
        /**
         * Applies the provided change record by forwarding it to the object
         * returned by {@code checkState()}.
         * NOTE(review): {@code checkState()} is not visible here; presumably
         * it rejects use after close - confirm.
         */
        @Override
        public Result applyChange(ChangeRecord request) throws ErrorResultException {
        public Result applyChange(final ChangeRecord request) throws ErrorResultException {
            return checkState().applyChange(request);
        }
@@ -249,15 +254,15 @@
                 * server, but the server may still be available. In order to
                 * avoid leaving pending futures hanging indefinitely, we should
                 * try to reconnect immediately. No need to release/acquire
                 * currentPoolSize.
                 * availableConnections.
                 */
                connection.close();
                factory.getConnectionAsync(connectionResultHandler);
                if (DEBUG_LOG.isLoggable(Level.FINE)) {
                    DEBUG_LOG.fine(String.format(
                            "Connection no longer valid: currentPoolSize=%d, poolSize=%d", poolSize
                                    - currentPoolSize.availablePermits(), poolSize));
                            "Connection no longer valid: availableConnections=%d, maxPoolSize=%d",
                            currentPoolSize(), maxPoolSize));
                }
            }
@@ -527,7 +532,54 @@
            }
            return connection;
        }
    }
    /**
     * Scheduled task responsible for purging non-core pooled connections which
     * have been idle for longer than the idle timeout limit.
     */
    private final class PurgeIdleConnectionsTask implements Runnable {
        @Override
        public void run() {
            final List<Connection> idleConnections;
            synchronized (queue) {
                if (isClosed) {
                    return;
                }
                /*
                 * Obtain a list of expired connections but don't close them yet
                 * since we don't want to hold the lock too long.
                 */
                idleConnections = new LinkedList<Connection>();
                final long timeoutMillis = currentTimeMillis() - idleTimeoutMillis;
                int nonCoreConnectionCount = currentPoolSize() - corePoolSize;
                for (QueueElement holder = queue.peek(); nonCoreConnectionCount > 0
                        && isTimedOutQueuedConnection(holder, timeoutMillis); holder = queue.peek()) {
                    idleConnections.add(holder.getWaitingConnection());
                    queue.poll();
                    availableConnections.release();
                    nonCoreConnectionCount--;
                }
            }
            // Close the idle connections.
            if (!idleConnections.isEmpty()) {
                if (DEBUG_LOG.isLoggable(Level.FINE)) {
                    DEBUG_LOG.fine(String.format("Closing %d idle pooled connections: "
                            + "availableConnections=%d, maxPoolSize=%d", idleConnections.size(),
                            currentPoolSize(), maxPoolSize));
                }
                for (final Connection connection : idleConnections) {
                    connection.close();
                }
            }
        }
        private boolean isTimedOutQueuedConnection(final QueueElement holder,
                final long timeoutMillis) {
            return holder != null && !holder.isWaitingFuture() && holder.hasTimedOut(timeoutMillis);
        }
    }
    /**
@@ -536,16 +588,19 @@
     * connection request.
     */
    private static final class QueueElement {
        private final long timestampMillis;
        private final Object value;
        /**
         * Creates a queue element which holds a connection, recording the
         * supplied time stamp alongside it.
         */
        QueueElement(final Connection connection) {
        QueueElement(final Connection connection, final long timestampMillis) {
            this.value = connection;
            this.timestampMillis = timestampMillis;
        }
        /**
         * Creates a queue element which holds a new future wrapping the
         * supplied result handler, recording the supplied time stamp
         * alongside it.
         */
        QueueElement(final ResultHandler<? super Connection> handler) {
        QueueElement(final ResultHandler<? super Connection> handler, final long timestampMillis) {
            this.value =
                    new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>(
                            handler);
            this.timestampMillis = timestampMillis;
        }
        @Override
@@ -566,22 +621,61 @@
            return (AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>) value;
        }
        boolean hasTimedOut(final long timeLimitMillis) {
            return timestampMillis < timeLimitMillis;
        }
        boolean isWaitingFuture() {
            return value instanceof AsynchronousFutureResult;
        }
    }
    /**
     * This is intended for unit testing only in order to inject fake time
     * stamps. Use System.currentTimeMillis() when null.
     */
    Callable<Long> testTimeSource = null;
    private final Semaphore availableConnections;
    private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
    private final Semaphore currentPoolSize;
    private final int corePoolSize;
    private final ConnectionFactory factory;
    private boolean isClosed = false;
    private final int poolSize;
    private final ScheduledFuture<?> idleTimeoutFuture;
    private final long idleTimeoutMillis;
    private final int maxPoolSize;
    private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
    private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
    /**
     * Creates a connection pool on top of the provided factory.
     *
     * @param factory
     *            The factory used to obtain connections; must not be
     *            {@code null}.
     * @param corePoolSize
     *            The core pool size; must be {@code >= 0} and no greater than
     *            {@code maximumPoolSize}.
     * @param maximumPoolSize
     *            The maximum pool size; must be {@code > 0}.
     * @param idleTimeout
     *            The idle timeout; must be {@code >= 0}. The purge task is
     *            only scheduled when this is {@code > 0} and
     *            {@code corePoolSize < maximumPoolSize}.
     * @param unit
     *            The unit of {@code idleTimeout}; may only be {@code null}
     *            when {@code idleTimeout} is 0.
     * @param scheduler
     *            The scheduler handed to
     *            {@code DEFAULT_SCHEDULER.acquireIfNull}; presumably the
     *            default scheduler is used when this is {@code null} -
     *            confirm.
     */
    FixedConnectionPool(final ConnectionFactory factory, final int poolSize) {
    CachedConnectionPool(final ConnectionFactory factory, final int corePoolSize,
            final int maximumPoolSize, final long idleTimeout, final TimeUnit unit,
            final ScheduledExecutorService scheduler) {
        // Reject invalid arguments up front.
        Validator.ensureNotNull(factory);
        Validator.ensureTrue(corePoolSize >= 0, "corePoolSize < 0");
        Validator.ensureTrue(maximumPoolSize > 0, "maxPoolSize <= 0");
        Validator.ensureTrue(corePoolSize <= maximumPoolSize, "corePoolSize > maxPoolSize");
        Validator.ensureTrue(idleTimeout >= 0, "idleTimeout < 0");
        Validator.ensureTrue(idleTimeout == 0 || unit != null, "time unit is null");
        this.factory = factory;
        this.poolSize = poolSize;
        this.currentPoolSize = new Semaphore(poolSize);
        this.corePoolSize = corePoolSize;
        this.maxPoolSize = maximumPoolSize;
        // One permit per connection the pool may still create: the current
        // pool size is computed as maxPoolSize minus the available permits.
        this.availableConnections = new Semaphore(maximumPoolSize);
        if (corePoolSize < maximumPoolSize && idleTimeout > 0) {
            // Dynamic pool.
            this.scheduler = DEFAULT_SCHEDULER.acquireIfNull(scheduler);
            this.idleTimeoutMillis = unit.toMillis(idleTimeout);
            // Run the purge task repeatedly, using the idle timeout as both
            // the initial delay and the delay between runs.
            this.idleTimeoutFuture =
                    this.scheduler.get().scheduleWithFixedDelay(new PurgeIdleConnectionsTask(),
                            idleTimeout, idleTimeout, unit);
        } else {
            // Fixed pool.
            this.scheduler = null;
            this.idleTimeoutMillis = 0;
            this.idleTimeoutFuture = null;
        }
    }
    @Override
@@ -601,18 +695,24 @@
            while (hasWaitingConnections()) {
                final QueueElement holder = queue.removeFirst();
                idleConnections.add(holder.getWaitingConnection());
                availableConnections.release();
            }
        }
        if (DEBUG_LOG.isLoggable(Level.FINE)) {
            DEBUG_LOG.fine(String.format(
                    "Connection pool is closing: currentPoolSize=%d, poolSize=%d", poolSize
                            - currentPoolSize.availablePermits(), poolSize));
                    "Connection pool is closing: availableConnections=%d, maxPoolSize=%d",
                    currentPoolSize(), maxPoolSize));
        }
        // Close the idle connections.
        if (idleTimeoutFuture != null) {
            idleTimeoutFuture.cancel(false);
            scheduler.release();
        }
        // Close all idle connections.
        for (final Connection connection : idleConnections) {
            closeConnection(connection);
            connection.close();
        }
        // Close the underlying factory.
@@ -636,13 +736,11 @@
            final QueueElement holder;
            synchronized (queue) {
                if (isClosed) {
                    throw new IllegalStateException("FixedConnectionPool is already closed");
                }
                if (hasWaitingConnections()) {
                    throw new IllegalStateException("CachedConnectionPool is already closed");
                } else if (hasWaitingConnections()) {
                    holder = queue.removeFirst();
                } else {
                    holder = new QueueElement(handler);
                    holder = new QueueElement(handler, currentTimeMillis());
                    queue.add(holder);
                }
            }
@@ -659,18 +757,18 @@
                } else {
                    // Close the stale connection and try again.
                    connection.close();
                    currentPoolSize.release();
                    availableConnections.release();
                    if (DEBUG_LOG.isLoggable(Level.FINE)) {
                        DEBUG_LOG.fine(String.format(
                                "Connection no longer valid: currentPoolSize=%d, poolSize=%d",
                                poolSize - currentPoolSize.availablePermits(), poolSize));
                                "Connection no longer valid: availableConnections=%d, poolSize=%d",
                                currentPoolSize(), maxPoolSize));
                    }
                }
            } else {
                // Grow the pool if needed.
                final FutureResult<Connection> future = holder.getWaitingFuture();
                if (!future.isDone() && currentPoolSize.tryAcquire()) {
                if (!future.isDone() && availableConnections.tryAcquire()) {
                    factory.getConnectionAsync(connectionResultHandler);
                }
                return future;
@@ -681,10 +779,10 @@
    /**
     * Returns a short description of this pool built from the pool's class
     * name, the string form of the underlying factory and the pool size
     * limit, the latter two being comma separated and enclosed in
     * parentheses.
     */
    @Override
    public String toString() {
        final StringBuilder builder = new StringBuilder();
        builder.append("FixedConnectionPool(");
        builder.append("CachedConnectionPool(");
        builder.append(String.valueOf(factory));
        builder.append(',');
        builder.append(poolSize);
        builder.append(maxPoolSize);
        builder.append(')');
        return builder.toString();
    }
@@ -699,15 +797,25 @@
        close();
    }
    private void closeConnection(final Connection connection) {
        // The connection will be closed, so decrease the pool size.
        currentPoolSize.release();
        connection.close();
    // Package private for unit testing.
    int currentPoolSize() {
        return maxPoolSize - availableConnections.availablePermits();
    }
        if (DEBUG_LOG.isLoggable(Level.FINE)) {
            DEBUG_LOG.fine(String.format("Closing connection because connection pool is closing: "
                    + "currentPoolSize=%d, poolSize=%d", poolSize
                    - currentPoolSize.availablePermits(), poolSize));
    /*
     * This method delegates to System.currentTimeMillis() except in unit tests
     * where we use injected times.
     */
    private long currentTimeMillis() {
        if (testTimeSource == null) {
            return System.currentTimeMillis();
        } else {
            try {
                return testTimeSource.call();
            } catch (final Exception e) {
                // Should not happen.
                throw new RuntimeException(e);
            }
        }
    }
@@ -727,21 +835,28 @@
            if (hasWaitingFutures()) {
                connectionPoolIsClosing = isClosed;
                holder = queue.removeFirst();
            } else if (isClosed) {
                connectionPoolIsClosing = true;
                holder = null;
            } else {
                if (isClosed) {
                    connectionPoolIsClosing = true;
                    holder = null;
                } else {
                    holder = new QueueElement(connection);
                    queue.add(holder);
                    return;
                }
                holder = new QueueElement(connection, currentTimeMillis());
                queue.add(holder);
                return;
            }
        }
        // There was waiting future, so complete it.
        if (connectionPoolIsClosing) {
            closeConnection(connection);
            // The connection will be closed, so decrease the pool size.
            availableConnections.release();
            connection.close();
            if (DEBUG_LOG.isLoggable(Level.FINE)) {
                DEBUG_LOG.fine(String.format(
                        "Closing connection because connection pool is closing: "
                                + "availableConnections=%d, maxPoolSize=%d", currentPoolSize(),
                        maxPoolSize));
            }
            if (holder != null) {
                final ErrorResultException e =
@@ -751,9 +866,8 @@
                if (DEBUG_LOG.isLoggable(Level.FINE)) {
                    DEBUG_LOG.fine(String.format(
                            "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", e
                                    .getMessage(), poolSize - currentPoolSize.availablePermits(),
                            poolSize));
                            "Connection attempt failed: %s, availableConnections=%d, poolSize=%d",
                            e.getMessage(), currentPoolSize(), maxPoolSize));
                }
            }
        } else {