| | |
| | | * |
| | | * |
| | | * Copyright 2009-2010 Sun Microsystems, Inc. |
| | | * Portions copyright 2011-2013 ForgeRock AS |
| | | * Portions copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | |
| | | package org.forgerock.opendj.ldap; |
| | |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.logging.Level; |
| | | |
| | | import org.forgerock.opendj.ldap.requests.AbandonRequest; |
| | |
| | | @Override |
| | | public void handleErrorResult(final ErrorResultException error) { |
| | | // Connection attempt failed, so decrease the pool size. |
| | | pendingConnectionAttempts.decrementAndGet(); |
| | | availableConnections.release(); |
| | | |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | |
| | | error.getMessage(), currentPoolSize(), maxPoolSize)); |
| | | } |
| | | |
| | | QueueElement holder; |
| | | /* |
| | | * There may be many pending futures waiting for a connection |
| | | * attempt to succeed. In some situations the number of pending |
| | | * futures may exceed the pool size and the number of outstanding |
| | | * connection attempts. If only one pending future is resolved per |
| | | * failed connection attempt then some pending futures will be left |
| | | * unresolved. Therefore, a failed connection attempt must fail all |
| | | * pending futures, even if some of the subsequent connection |
| | | * attempts succeed, which is unlikely (if one fails, then they are |
| | | * all likely to fail). |
| | | */ |
| | | final List<QueueElement> waitingFutures = |
| | | new LinkedList<CachedConnectionPool.QueueElement>(); |
| | | synchronized (queue) { |
| | | if (hasWaitingFutures()) { |
| | | holder = queue.removeFirst(); |
| | | } else { |
| | | // No waiting futures. |
| | | return; |
| | | while (hasWaitingFutures()) { |
| | | waitingFutures.add(queue.removeFirst()); |
| | | } |
| | | } |
| | | |
| | | // There was waiting future, so close it. |
| | | holder.getWaitingFuture().handleErrorResult(error); |
| | | for (QueueElement waitingFuture : waitingFutures) { |
| | | waitingFuture.getWaitingFuture().handleErrorResult(error); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | |
| | | "Connection attempt succeeded: availableConnections=%d, maxPoolSize=%d", |
| | | currentPoolSize(), maxPoolSize)); |
| | | } |
| | | pendingConnectionAttempts.decrementAndGet(); |
| | | publishConnection(connection); |
| | | } |
| | | } |
| | |
| | | * availableConnections. |
| | | */ |
| | | connection.close(); |
| | | pendingConnectionAttempts.incrementAndGet(); |
| | | factory.getConnectionAsync(connectionResultHandler); |
| | | |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | |
| | | private final Semaphore availableConnections; |
| | | private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler(); |
| | | private final int corePoolSize; |
| | | |
| | | private final ConnectionFactory factory; |
| | | private boolean isClosed = false; |
| | | private final ScheduledFuture<?> idleTimeoutFuture; |
| | |
| | | private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>(); |
| | | private final ReferenceCountedObject<ScheduledExecutorService>.Reference scheduler; |
| | | |
| | | /** |
| | | * The number of new connections which are in the process of being |
| | | * established. |
| | | */ |
| | | private final AtomicInteger pendingConnectionAttempts = new AtomicInteger(); |
| | | |
| | | CachedConnectionPool(final ConnectionFactory factory, final int corePoolSize, |
| | | final int maximumPoolSize, final long idleTimeout, final TimeUnit unit, |
| | | final ScheduledExecutorService scheduler) { |
| | |
| | | } |
| | | } |
| | | |
| | | if (!holder.isWaitingFuture()) { |
| | | // There was a completed connection attempt. |
| | | final Connection connection = holder.getWaitingConnection(); |
| | | if (connection.isValid()) { |
| | | final PooledConnection pooledConnection = |
| | | newPooledConnection(connection, getStackTraceIfDebugEnabled()); |
| | | if (handler != null) { |
| | | handler.handleResult(pooledConnection); |
| | | } |
| | | return new CompletedFutureResult<Connection>(pooledConnection); |
| | | } else { |
| | | // Close the stale connection and try again. |
| | | connection.close(); |
| | | availableConnections.release(); |
| | | |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format( |
| | | "Connection no longer valid: availableConnections=%d, poolSize=%d", |
| | | currentPoolSize(), maxPoolSize)); |
| | | } |
| | | } |
| | | } else { |
| | | if (holder.isWaitingFuture()) { |
| | | // Grow the pool if needed. |
| | | final FutureResult<Connection> future = holder.getWaitingFuture(); |
| | | if (!future.isDone() && availableConnections.tryAcquire()) { |
| | | pendingConnectionAttempts.incrementAndGet(); |
| | | factory.getConnectionAsync(connectionResultHandler); |
| | | } |
| | | return future; |
| | | } |
| | | |
| | | // There was a completed connection attempt. |
| | | final Connection connection = holder.getWaitingConnection(); |
| | | if (connection.isValid()) { |
| | | final PooledConnection pooledConnection = |
| | | newPooledConnection(connection, getStackTraceIfDebugEnabled()); |
| | | if (handler != null) { |
| | | handler.handleResult(pooledConnection); |
| | | } |
| | | return new CompletedFutureResult<Connection>(pooledConnection); |
| | | } else { |
| | | // Close the stale connection and try again. |
| | | connection.close(); |
| | | availableConnections.release(); |
| | | |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format( |
| | | "Connection no longer valid: availableConnections=%d, poolSize=%d", |
| | | currentPoolSize(), maxPoolSize)); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public String toString() { |
| | | final StringBuilder builder = new StringBuilder(); |
| | | builder.append("CachedConnectionPool("); |
| | | builder.append(String.valueOf(factory)); |
| | | builder.append(','); |
| | | builder.append(maxPoolSize); |
| | | builder.append(')'); |
| | | return builder.toString(); |
| | | final int size = currentPoolSize(); |
| | | final int pending = pendingConnectionAttempts.get(); |
| | | int in = 0; |
| | | int blocked = 0; |
| | | synchronized (queue) { |
| | | for (QueueElement qe : queue) { |
| | | if (qe.isWaitingFuture()) { |
| | | blocked++; |
| | | } else { |
| | | in++; |
| | | } |
| | | } |
| | | } |
| | | final int out = size - in - pending; |
| | | return String.format("CachedConnectionPool(size=%d[in:%d + out:%d + " |
| | | + "pending:%d], maxSize=%d, blocked=%d, factory=%s)", size, in, out, pending, |
| | | maxPoolSize, blocked, String.valueOf(factory)); |
| | | } |
| | | |
| | | /** |