From 9f2bba679ab597f1e50078a29d145100e3baed3c Mon Sep 17 00:00:00 2001
From: Ludovic Poitou <ludovic.poitou@forgerock.com>
Date: Tue, 19 Oct 2010 16:36:12 +0000
Subject: [PATCH] Make ConnectionPool implementation fully async and fix some race conditions in the unit tests.
---
sdk/src/org/opends/sdk/ConnectionPool.java | 589 +++++++++++++++++++++++++++++++++-------------------------
1 files changed, 332 insertions(+), 257 deletions(-)
diff --git a/sdk/src/org/opends/sdk/ConnectionPool.java b/sdk/src/org/opends/sdk/ConnectionPool.java
index a7b5f75..e9497da 100644
--- a/sdk/src/org/opends/sdk/ConnectionPool.java
+++ b/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -30,8 +30,11 @@
import java.util.Collection;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import org.opends.sdk.requests.*;
@@ -41,6 +44,7 @@
import com.sun.opends.sdk.util.AsynchronousFutureResult;
import com.sun.opends.sdk.util.CompletedFutureResult;
import com.sun.opends.sdk.util.StaticUtils;
+import com.sun.opends.sdk.util.Validator;
@@ -49,36 +53,101 @@
*/
final class ConnectionPool extends AbstractConnectionFactory
{
- // Future used for waiting for pooled connections to become available.
- private static final class FuturePooledConnection extends
- AsynchronousFutureResult<AsynchronousConnection>
+
+ /**
+ * This result handler is invoked when an attempt to add a new connection to
+ * the pool completes.
+ */
+ private final class ConnectionResultHandler implements
+ ResultHandler<AsynchronousConnection>
{
- private FuturePooledConnection(
- final ResultHandler<? super AsynchronousConnection> handler)
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void handleErrorResult(final ErrorResultException error)
{
- super(handler);
+ // Connection attempt failed, so decrease the pool size.
+ currentPoolSize.release();
+
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG.fine(String.format(
+ "Connection attempt failed: " + error.getMessage()
+ + " currentPoolSize=%d, poolSize=%d",
+ poolSize - currentPoolSize.availablePermits(), poolSize));
+ }
+
+ QueueElement holder;
+ synchronized (queue)
+ {
+ if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
+ {
+ // No waiting futures.
+ return;
+ }
+ else
+ {
+ holder = queue.removeFirst();
+ }
+ }
+
+ // There was waiting future, so close it.
+ holder.getWaitingFuture().handleErrorResult(error);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void handleResult(final AsynchronousConnection connection)
+ {
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG.fine(String.format(
+ "Connection attempt succeeded: "
+ + " currentPoolSize=%d, poolSize=%d",
+ poolSize - currentPoolSize.availablePermits(), poolSize));
+ }
+
+ publishConnection(connection);
}
}
- private final class PooledConnectionWapper implements AsynchronousConnection,
- ConnectionEventListener
+ /**
+ * A pooled connection is passed to the client. It wraps an underlying
+ * "pooled" connection obtained from the underlying factory and lasts until
+ * the client application closes this connection. More specifically, pooled
+ * connections are not actually stored in the internal queue.
+ */
+ private final class PooledConnection implements AsynchronousConnection
{
+ // Connection event listeners registed against this pooled connection should
+ // have the same life time as the pooled connection.
+ private final List<ConnectionEventListener> listeners =
+ new CopyOnWriteArrayList<ConnectionEventListener>();
+
private final AsynchronousConnection connection;
- private volatile boolean isClosed;
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
- private PooledConnectionWapper(final AsynchronousConnection connection)
+ PooledConnection(final AsynchronousConnection connection)
{
this.connection = connection;
- this.connection.addConnectionEventListener(this);
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Void> abandon(final AbandonRequest request)
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
@@ -92,6 +161,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> add(final AddRequest request,
final ResultHandler<? super Result> handler)
throws UnsupportedOperationException, IllegalStateException,
@@ -106,6 +179,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> add(final AddRequest request,
final ResultHandler<? super Result> resultHandler,
final IntermediateResponseHandler intermediateResponseHandler)
@@ -122,18 +199,28 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void addConnectionEventListener(
final ConnectionEventListener listener) throws IllegalStateException,
NullPointerException
{
+ Validator.ensureNotNull(listener);
if (isClosed())
{
throw new IllegalStateException();
}
+ listeners.add(listener);
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<BindResult> bind(final BindRequest request,
final ResultHandler<? super BindResult> handler)
throws UnsupportedOperationException, IllegalStateException,
@@ -148,6 +235,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<BindResult> bind(final BindRequest request,
final ResultHandler<? super BindResult> resultHandler,
final IntermediateResponseHandler intermediateResponseHandler)
@@ -164,47 +255,51 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void close()
{
- synchronized (pool)
+ if (!isClosed.compareAndSet(false, true))
{
- if (isClosed)
- {
- return;
- }
- isClosed = true;
+ // Already closed.
+ return;
+ }
- // Don't put invalid connections back in the pool.
- if (connection.isValid())
+ // Don't put invalid connections back in the pool.
+ if (connection.isValid())
+ {
+ publishConnection(connection);
+ }
+ else
+ {
+ // The connection may have been disconnected by the remote server, but
+ // the server may still be available. In order to avoid leaving pending
+ // futures hanging indefinitely, we should try to reconnect immediately.
+
+ // Close the dead connection.
+ connection.close();
+
+ // Try to get a new connection to replace it.
+ factory.getAsynchronousConnection(connectionResultHandler);
+
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
{
- releaseConnection(connection);
- return;
+ StaticUtils.DEBUG_LOG.warning(String.format(
+ "Connection no longer valid. "
+ + "currentPoolSize=%d, poolSize=%d",
+ poolSize - currentPoolSize.availablePermits(), poolSize));
}
}
-
- // Connection is no longer valid. Close outside of lock
- connection.removeConnectionEventListener(this);
- connection.close();
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
- {
- StaticUtils.DEBUG_LOG.warning(String.format(
- "Dead connection released and closed. "
- + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
- }
-
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
- {
- StaticUtils.DEBUG_LOG.warning(String.format(
- "Reconnect attempt starting. "
- + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
- }
- connectionFactory.getAsynchronousConnection(new ReconnectHandler());
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void close(final UnbindRequest request, final String reason)
throws NullPointerException
{
@@ -213,6 +308,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<CompareResult> compare(final CompareRequest request,
final ResultHandler<? super CompareResult> handler)
throws UnsupportedOperationException, IllegalStateException,
@@ -227,6 +326,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<CompareResult> compare(final CompareRequest request,
final ResultHandler<? super CompareResult> resultHandler,
final IntermediateResponseHandler intermediateResponseHandler)
@@ -243,48 +346,10 @@
- public void handleConnectionClosed()
- {
- // Ignore - we intercept close via the close method.
- }
-
-
-
- public void handleConnectionError(final boolean isDisconnectNotification,
- final ErrorResultException error)
- {
- // Remove this connection from the pool if its in there. If not,
- // just ignore and wait for the user to close and we can deal with it
- // there.
- if (pool.remove(this))
- {
- numConnections--;
- connection.removeConnectionEventListener(this);
-
- // FIXME: should still close the connection, but we need to be
- // careful that users of the pooled connection get a sensible
- // error if they continue to use it (i.e. not an NPE or ISE).
-
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
- {
- StaticUtils.DEBUG_LOG.warning(String.format(
- "Connection error occured and removed from pool: "
- + error.getMessage()
- + " numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
- }
- }
- }
-
-
-
- public void handleUnsolicitedNotification(final ExtendedResult notification)
- {
- // Ignore
- }
-
-
-
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> delete(final DeleteRequest request,
final ResultHandler<? super Result> handler)
throws UnsupportedOperationException, IllegalStateException,
@@ -299,6 +364,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> delete(final DeleteRequest request,
final ResultHandler<? super Result> resultHandler,
final IntermediateResponseHandler intermediateResponseHandler)
@@ -315,6 +384,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public <R extends ExtendedResult> FutureResult<R> extendedRequest(
final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
throws UnsupportedOperationException, IllegalStateException,
@@ -329,6 +402,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public <R extends ExtendedResult> FutureResult<R> extendedRequest(
final ExtendedRequest<R> request,
final ResultHandler<? super R> resultHandler,
@@ -349,6 +426,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public Connection getSynchronousConnection()
{
return new SynchronousConnection(this);
@@ -359,20 +437,29 @@
/**
* {@inheritDoc}
*/
+ @Override
public boolean isClosed()
{
- return isClosed;
+ return isClosed.get();
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public boolean isValid()
{
- return !isClosed && connection.isValid();
+ return connection.isValid() && !isClosed();
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> modify(final ModifyRequest request,
final ResultHandler<? super Result> handler)
throws UnsupportedOperationException, IllegalStateException,
@@ -387,6 +474,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> modify(final ModifyRequest request,
final ResultHandler<? super Result> resultHandler,
final IntermediateResponseHandler intermediateResponseHandler)
@@ -403,6 +494,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> modifyDN(final ModifyDNRequest request,
final ResultHandler<? super Result> handler)
throws UnsupportedOperationException, IllegalStateException,
@@ -417,6 +512,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> modifyDN(final ModifyDNRequest request,
final ResultHandler<? super Result> resultHandler,
final IntermediateResponseHandler intermediateResponseHandler)
@@ -436,6 +535,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<SearchResultEntry> readEntry(final DN name,
final Collection<String> attributeDescriptions,
final ResultHandler<? super SearchResultEntry> resultHandler)
@@ -454,6 +554,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<RootDSE> readRootDSE(
final ResultHandler<? super RootDSE> handler)
throws UnsupportedOperationException, IllegalStateException
@@ -470,6 +571,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<Schema> readSchema(final DN name,
final ResultHandler<? super Schema> handler)
throws UnsupportedOperationException, IllegalStateException
@@ -486,6 +588,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<Schema> readSchemaForEntry(final DN name,
final ResultHandler<? super Schema> handler)
throws UnsupportedOperationException, IllegalStateException
@@ -499,17 +602,27 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void removeConnectionEventListener(
final ConnectionEventListener listener) throws NullPointerException
{
+ Validator.ensureNotNull(listener);
if (isClosed())
{
throw new IllegalStateException();
}
+ listeners.remove(listener);
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> search(final SearchRequest request,
final SearchResultHandler handler)
throws UnsupportedOperationException, IllegalStateException,
@@ -524,6 +637,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> search(final SearchRequest request,
final SearchResultHandler resultHandler,
final IntermediateResponseHandler intermediateResponseHandler)
@@ -543,6 +660,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<SearchResultEntry> searchSingleEntry(
final SearchRequest request,
final ResultHandler<? super SearchResultEntry> resultHandler)
@@ -561,9 +679,10 @@
/**
* {@inheritDoc}
*/
+ @Override
public String toString()
{
- StringBuilder builder = new StringBuilder();
+ final StringBuilder builder = new StringBuilder();
builder.append("PooledConnection(");
builder.append(connection);
builder.append(')');
@@ -573,63 +692,86 @@
- private class ReconnectHandler implements
- ResultHandler<AsynchronousConnection>
+ /**
+ * A queue element is either a pending connection request future awaiting an
+ * {@code AsynchronousConnection} or it is an unused
+ * {@code AsynchronousConnection} awaiting a connection request.
+ */
+ private static final class QueueElement
{
- public void handleErrorResult(final ErrorResultException error)
- {
- // The reconnect failed. Fail the connect attempt.
- numConnections--;
- // The reconnect failed. The underlying connection factory
- // probably went
- // down. Just fail all pending futures
- synchronized (pool)
- {
- while (!pendingFutures.isEmpty())
- {
- pendingFutures.poll().handleErrorResult(error);
- }
- }
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
- {
- StaticUtils.DEBUG_LOG.warning(String.format(
- "Reconnect failed. Failed all pending futures: "
- + error.getMessage()
- + " numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
- }
+ private final Object value;
+
+
+ QueueElement(final AsynchronousConnection connection)
+ {
+ this.value = connection;
}
- public void handleResult(final AsynchronousConnection connection)
+ QueueElement(final ResultHandler<? super AsynchronousConnection> handler)
{
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler);
+ }
+
+
+
+ AsynchronousConnection getWaitingConnection()
+ {
+ if (value instanceof AsynchronousConnection)
{
- StaticUtils.DEBUG_LOG.finest(String.format("Reconnect succeeded. "
- + " numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
+ return (AsynchronousConnection) value;
}
- synchronized (pool)
+ else
{
- releaseConnection(connection);
+ throw new IllegalStateException();
}
}
+
+
+
+ @SuppressWarnings("unchecked")
+ AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture()
+ {
+ if (value instanceof AsynchronousFutureResult)
+ {
+ return (AsynchronousFutureResult<AsynchronousConnection>) value;
+ }
+ else
+ {
+ throw new IllegalStateException();
+ }
+ }
+
+
+
+ boolean isWaitingFuture()
+ {
+ return value instanceof AsynchronousFutureResult;
+ }
+
+
+
+ public String toString()
+ {
+ return String.valueOf(value);
+ }
}
- private final ConnectionFactory connectionFactory;
+ // Guarded by queue.
+ private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
- private volatile int numConnections;
+ private final ConnectionFactory factory;
private final int poolSize;
- // FIXME: should use a better collection than this - CLQ?
- private final Queue<AsynchronousConnection> pool;
+ private final Semaphore currentPoolSize;
- private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
+ private final ResultHandler<AsynchronousConnection> connectionResultHandler =
+ new ConnectionResultHandler();
@@ -637,109 +779,16 @@
* Creates a new connection pool which will maintain {@code poolSize}
* connections created using the provided connection factory.
*
- * @param connectionFactory
+ * @param factory
* The connection factory to use for creating new connections.
* @param poolSize
* The maximum size of the connection pool.
*/
- ConnectionPool(final ConnectionFactory connectionFactory, final int poolSize)
+ ConnectionPool(final ConnectionFactory factory, final int poolSize)
{
- this.connectionFactory = connectionFactory;
+ this.factory = factory;
this.poolSize = poolSize;
- this.pool = new ConcurrentLinkedQueue<AsynchronousConnection>();
- this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>();
- }
-
-
-
- @Override
- public synchronized FutureResult<AsynchronousConnection> getAsynchronousConnection(
- final ResultHandler<? super AsynchronousConnection> handler)
- {
- // This entire method is synchronized to ensure new connects are
- // done synchronously to avoid the "pending connect" case.
- AsynchronousConnection conn;
- synchronized (pool)
- {
- // Check to see if we have a connection in the pool
- conn = pool.poll();
- if (conn == null)
- {
- // Pool was empty. Maybe a new connection if pool size is not
- // reached
- if (numConnections >= poolSize)
- {
- // We reached max # of conns so wait for a connection to
- // become available.
- final FuturePooledConnection future = new FuturePooledConnection(
- handler);
- pendingFutures.add(future);
-
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.finest(String.format(
- "No connections available. Wait-listed"
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
- }
-
- return future;
- }
- }
- }
-
- if (conn == null)
- {
- try
- {
- // We can create a new connection.
- conn = connectionFactory.getAsynchronousConnection(null).get();
- numConnections++;
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.finest(String.format(
- "New connection established and aquired. "
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
- }
- }
- catch (final ErrorResultException e)
- {
- if (handler != null)
- {
- handler.handleErrorResult(e);
- }
- return new CompletedFutureResult<AsynchronousConnection>(e);
- }
- catch (final InterruptedException e)
- {
- final ErrorResultException error = new ErrorResultException(Responses
- .newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e));
- if (handler != null)
- {
- handler.handleErrorResult(error);
- }
- return new CompletedFutureResult<AsynchronousConnection>(error);
- }
- }
- else
- {
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.finest(String.format(
- "Connection aquired from pool. "
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
- }
- }
-
- final PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
- conn);
- if (handler != null)
- {
- handler.handleResult(pooledConnection);
- }
- return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
+ this.currentPoolSize = new Semaphore(poolSize);
}
@@ -747,11 +796,59 @@
/**
* {@inheritDoc}
*/
+ @Override
+ public FutureResult<AsynchronousConnection> getAsynchronousConnection(
+ final ResultHandler<? super AsynchronousConnection> handler)
+ {
+ QueueElement holder;
+ synchronized (queue)
+ {
+ if (queue.isEmpty() || queue.getFirst().isWaitingFuture())
+ {
+ holder = new QueueElement(handler);
+ queue.add(holder);
+ }
+ else
+ {
+ holder = queue.removeFirst();
+ }
+ }
+
+ if (!holder.isWaitingFuture())
+ {
+ // There was a completed connection attempt.
+ final AsynchronousConnection connection = holder.getWaitingConnection();
+ final PooledConnection pooledConnection = new PooledConnection(connection);
+ if (handler != null)
+ {
+ handler.handleResult(pooledConnection);
+ }
+ return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
+ }
+ else
+ {
+ // Grow the pool if needed.
+ final FutureResult<AsynchronousConnection> future = holder
+ .getWaitingFuture();
+ if (!future.isDone() && currentPoolSize.tryAcquire())
+ {
+ factory.getAsynchronousConnection(connectionResultHandler);
+ }
+ return future;
+ }
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public String toString()
{
final StringBuilder builder = new StringBuilder();
builder.append("ConnectionPool(");
- builder.append(String.valueOf(connectionFactory));
+ builder.append(String.valueOf(factory));
builder.append(',');
builder.append(poolSize);
builder.append(')');
@@ -760,47 +857,25 @@
- private void releaseConnection(final AsynchronousConnection connection)
+ private void publishConnection(final AsynchronousConnection connection)
{
- // See if there waiters pending.
- for (;;)
+ QueueElement holder;
+ synchronized (queue)
{
- final PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
- connection);
- final FuturePooledConnection future = pendingFutures.poll();
-
- if (future == null)
+ if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
{
- // No waiters - so drop out and add connection to pool.
- break;
- }
-
- future.handleResult(pooledConnection);
-
- if (!future.isCancelled())
- {
- // The future was not cancelled and the connection was
- // accepted.
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.finest(String.format(
- "Connection released and directly "
- + "given to waiter. numConnections: %d, poolSize: %d, "
- + "pendingFutures: %d", numConnections, pool.size(),
- pendingFutures.size()));
- }
+ holder = new QueueElement(connection);
+ queue.add(holder);
return;
}
+ else
+ {
+ holder = queue.removeFirst();
+ }
}
- // No waiters. Put back in pool.
- pool.offer(connection);
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.finest(String.format(
- "Connection released to pool. numConnections: %d, "
- + "poolSize: %d, pendingFutures: %d", numConnections,
- pool.size(), pendingFutures.size()));
- }
+ // There was waiting future, so close it.
+ final PooledConnection pooledConnection = new PooledConnection(connection);
+ holder.getWaitingFuture().handleResult(pooledConnection);
}
}
--
Gitblit v1.10.0