mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
22.07.2014 a2df03777d64a1cc2face1011a70effb0b98f133
opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/CachedConnectionPool.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS
 *      Portions copyright 2011-2014 ForgeRock AS
 */
package org.forgerock.opendj.ldap;
@@ -44,6 +44,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import org.forgerock.opendj.ldap.requests.AbandonRequest;
@@ -88,6 +89,7 @@
        @Override
        public void handleErrorResult(final ErrorResultException error) {
            // Connection attempt failed, so decrease the pool size.
            pendingConnectionAttempts.decrementAndGet();
            availableConnections.release();
            if (DEBUG_LOG.isLoggable(Level.FINE)) {
@@ -96,18 +98,27 @@
                        error.getMessage(), currentPoolSize(), maxPoolSize));
            }
            QueueElement holder;
            /*
             * There may be many pending futures waiting for a connection
             * attempt to succeed. In some situations the number of pending
             * futures may exceed the pool size and the number of outstanding
             * connection attempts. If only one pending future is resolved per
             * failed connection attempt then some pending futures will be left
             * unresolved. Therefore, a failed connection attempt must fail all
             * pending futures, even if some of the subsequent connection
             * attempts succeed, which is unlikely (if one fails, then they are
             * all likely to fail).
             */
            final List<QueueElement> waitingFutures =
                    new LinkedList<CachedConnectionPool.QueueElement>();
            synchronized (queue) {
                if (hasWaitingFutures()) {
                    holder = queue.removeFirst();
                } else {
                    // No waiting futures.
                    return;
                while (hasWaitingFutures()) {
                    waitingFutures.add(queue.removeFirst());
                }
            }
            // There was waiting future, so close it.
            holder.getWaitingFuture().handleErrorResult(error);
            for (QueueElement waitingFuture : waitingFutures) {
                waitingFuture.getWaitingFuture().handleErrorResult(error);
            }
        }
        @Override
@@ -117,6 +128,7 @@
                        "Connection attempt succeeded:  availableConnections=%d, maxPoolSize=%d",
                        currentPoolSize(), maxPoolSize));
            }
            pendingConnectionAttempts.decrementAndGet();
            publishConnection(connection);
        }
    }
@@ -260,6 +272,7 @@
                 * availableConnections.
                 */
                connection.close();
                pendingConnectionAttempts.incrementAndGet();
                factory.getConnectionAsync(connectionResultHandler);
                if (DEBUG_LOG.isLoggable(Level.FINE)) {
@@ -668,7 +681,6 @@
    private final Semaphore availableConnections;
    private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
    private final int corePoolSize;
    private final ConnectionFactory factory;
    private boolean isClosed = false;
    private final ScheduledFuture<?> idleTimeoutFuture;
@@ -677,6 +689,12 @@
    private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
    private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler;
    /**
     * The number of new connections which are in the process of being
     * established.
     */
    private final AtomicInteger pendingConnectionAttempts = new AtomicInteger();
    CachedConnectionPool(final ConnectionFactory factory, final int corePoolSize,
            final int maximumPoolSize, final long idleTimeout, final TimeUnit unit,
            final ScheduledExecutorService scheduler) {
@@ -776,47 +794,58 @@
                }
            }
            if (!holder.isWaitingFuture()) {
                // There was a completed connection attempt.
                final Connection connection = holder.getWaitingConnection();
                if (connection.isValid()) {
                    final PooledConnection pooledConnection =
                            newPooledConnection(connection, getStackTraceIfDebugEnabled());
                    if (handler != null) {
                        handler.handleResult(pooledConnection);
                    }
                    return new CompletedFutureResult<Connection>(pooledConnection);
                } else {
                    // Close the stale connection and try again.
                    connection.close();
                    availableConnections.release();
                    if (DEBUG_LOG.isLoggable(Level.FINE)) {
                        DEBUG_LOG.fine(String.format(
                                "Connection no longer valid: availableConnections=%d, poolSize=%d",
                                currentPoolSize(), maxPoolSize));
                    }
                }
            } else {
            if (holder.isWaitingFuture()) {
                // Grow the pool if needed.
                final FutureResult<Connection> future = holder.getWaitingFuture();
                if (!future.isDone() && availableConnections.tryAcquire()) {
                    pendingConnectionAttempts.incrementAndGet();
                    factory.getConnectionAsync(connectionResultHandler);
                }
                return future;
            }
            // There was a completed connection attempt.
            final Connection connection = holder.getWaitingConnection();
            if (connection.isValid()) {
                final PooledConnection pooledConnection =
                        newPooledConnection(connection, getStackTraceIfDebugEnabled());
                if (handler != null) {
                    handler.handleResult(pooledConnection);
                }
                return new CompletedFutureResult<Connection>(pooledConnection);
            } else {
                // Close the stale connection and try again.
                connection.close();
                availableConnections.release();
                if (DEBUG_LOG.isLoggable(Level.FINE)) {
                    DEBUG_LOG.fine(String.format(
                            "Connection no longer valid: availableConnections=%d, poolSize=%d",
                            currentPoolSize(), maxPoolSize));
                }
            }
        }
    }
    @Override
    public String toString() {
        final StringBuilder builder = new StringBuilder();
        builder.append("CachedConnectionPool(");
        builder.append(String.valueOf(factory));
        builder.append(',');
        builder.append(maxPoolSize);
        builder.append(')');
        return builder.toString();
        final int size = currentPoolSize();
        final int pending = pendingConnectionAttempts.get();
        int in = 0;
        int blocked = 0;
        synchronized (queue) {
            for (QueueElement qe : queue) {
                if (qe.isWaitingFuture()) {
                    blocked++;
                } else {
                    in++;
                }
            }
        }
        final int out = size - in - pending;
        return String.format("CachedConnectionPool(size=%d[in:%d + out:%d + "
                + "pending:%d], maxSize=%d, blocked=%d, factory=%s)", size, in, out, pending,
                maxPoolSize, blocked, String.valueOf(factory));
    }
    /**