From f2160f4bd1c8ac67e5a86a6710d431e8932877f9 Mon Sep 17 00:00:00 2001
From: matthew_swift <matthew_swift@localhost>
Date: Fri, 28 May 2010 11:47:51 +0000
Subject: [PATCH] Synchronize SDK on java.net with internal repository.
---
sdk/src/org/opends/sdk/ConnectionPool.java | 737 +++++++++++++++++++++++++++++++++-----------------------
1 files changed, 429 insertions(+), 308 deletions(-)
diff --git a/sdk/src/org/opends/sdk/ConnectionPool.java b/sdk/src/org/opends/sdk/ConnectionPool.java
index d0b8780..de641ce 100644
--- a/sdk/src/org/opends/sdk/ConnectionPool.java
+++ b/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -49,21 +49,32 @@
*/
final class ConnectionPool extends AbstractConnectionFactory
{
- private final ConnectionFactory connectionFactory;
-
- private volatile int numConnections;
-
- private final int poolSize;
-
- // FIXME: should use a better collection than this - CLQ?
- private final Queue<AsynchronousConnection> pool;
-
- private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
+ // Future used for waiting for pooled connections to become available.
+ private static final class FuturePooledConnection extends
+ AbstractFutureResult<AsynchronousConnection>
+ {
+ private FuturePooledConnection(
+ final ResultHandler<? super AsynchronousConnection> handler)
+ {
+ super(handler);
+ }
- private final class PooledConnectionWapper implements
- AsynchronousConnection, ConnectionEventListener
+ /**
+ * {@inheritDoc}
+ */
+ public int getRequestID()
+ {
+ return -1;
+ }
+
+ }
+
+
+
+ private final class PooledConnectionWapper implements AsynchronousConnection,
+ ConnectionEventListener
{
private final AsynchronousConnection connection;
@@ -71,7 +82,7 @@
- private PooledConnectionWapper(AsynchronousConnection connection)
+ private PooledConnectionWapper(final AsynchronousConnection connection)
{
this.connection = connection;
this.connection.addConnectionEventListener(this);
@@ -79,7 +90,7 @@
- public void abandon(AbandonRequest request)
+ public FutureResult<Void> abandon(final AbandonRequest request)
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
@@ -87,13 +98,13 @@
{
throw new IllegalStateException();
}
- connection.abandon(request);
+ return connection.abandon(request);
}
- public FutureResult<Result> add(AddRequest request,
- ResultHandler<Result> handler)
+ public FutureResult<Result> add(final AddRequest request,
+ final ResultHandler<Result> handler)
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
@@ -106,8 +117,36 @@
- public FutureResult<BindResult> bind(BindRequest request,
- ResultHandler<? super BindResult> handler)
+ public FutureResult<Result> add(final AddRequest request,
+ final ResultHandler<Result> resultHandler,
+ final IntermediateResponseHandler intermediateResponseHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection
+ .add(request, resultHandler, intermediateResponseHandler);
+ }
+
+
+
+ public void addConnectionEventListener(
+ final ConnectionEventListener listener) throws IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ }
+
+
+
+ public FutureResult<BindResult> bind(final BindRequest request,
+ final ResultHandler<? super BindResult> handler)
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
@@ -120,6 +159,22 @@
+ public FutureResult<BindResult> bind(final BindRequest request,
+ final ResultHandler<? super BindResult> resultHandler,
+ final IntermediateResponseHandler intermediateResponseHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.bind(request, resultHandler,
+ intermediateResponseHandler);
+ }
+
+
+
public void close()
{
synchronized (pool)
@@ -145,26 +200,23 @@
{
StaticUtils.DEBUG_LOG.warning(String.format(
"Dead connection released and closed. "
- + "numConnections: %d, poolSize: %d, "
- + "pendingFutures: %d", numConnections, pool.size(),
- pendingFutures.size()));
+ + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures.size()));
}
if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
{
StaticUtils.DEBUG_LOG.warning(String.format(
"Reconnect attempt starting. "
- + "numConnections: %d, poolSize: %d, "
- + "pendingFutures: %d", numConnections, pool.size(),
- pendingFutures.size()));
+ + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures.size()));
}
- connectionFactory
- .getAsynchronousConnection(new ReconnectHandler());
+ connectionFactory.getAsynchronousConnection(new ReconnectHandler());
}
- public void close(UnbindRequest request, String reason)
+ public void close(final UnbindRequest request, final String reason)
throws NullPointerException
{
close();
@@ -172,8 +224,8 @@
- public FutureResult<CompareResult> compare(CompareRequest request,
- ResultHandler<? super CompareResult> handler)
+ public FutureResult<CompareResult> compare(final CompareRequest request,
+ final ResultHandler<? super CompareResult> handler)
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
@@ -186,8 +238,67 @@
- public FutureResult<Result> delete(DeleteRequest request,
- ResultHandler<Result> handler)
+ public FutureResult<CompareResult> compare(final CompareRequest request,
+ final ResultHandler<? super CompareResult> resultHandler,
+ final IntermediateResponseHandler intermediateResponseHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.compare(request, resultHandler,
+ intermediateResponseHandler);
+ }
+
+
+
+ public void connectionClosed()
+ {
+ // Ignore - we intercept close via the close method.
+ }
+
+
+
+ public void connectionErrorOccurred(final boolean isDisconnectNotification,
+ final ErrorResultException error)
+ {
+ // Remove this connection from the pool if its in there. If not,
+ // just ignore and wait for the user to close and we can deal with it
+ // there.
+ if (pool.remove(this))
+ {
+ numConnections--;
+ connection.removeConnectionEventListener(this);
+
+ // FIXME: should still close the connection, but we need to be
+ // careful that users of the pooled connection get a sensible
+ // error if they continue to use it (i.e. not an NPE or ISE).
+
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
+ {
+ StaticUtils.DEBUG_LOG.warning(String.format(
+ "Connection error occured and removed from pool: "
+ + error.getMessage()
+ + " numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures.size()));
+ }
+ }
+ }
+
+
+
+ public void connectionReceivedUnsolicitedNotification(
+ final ExtendedResult notification)
+ {
+ // Ignore
+ }
+
+
+
+ public FutureResult<Result> delete(final DeleteRequest request,
+ final ResultHandler<Result> handler)
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
@@ -200,8 +311,24 @@
- public <R extends Result> FutureResult<R> extendedRequest(
- ExtendedRequest<R> request, ResultHandler<? super R> handler)
+ public FutureResult<Result> delete(final DeleteRequest request,
+ final ResultHandler<Result> resultHandler,
+ final IntermediateResponseHandler intermediateResponseHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.delete(request, resultHandler,
+ intermediateResponseHandler);
+ }
+
+
+
+ public <R extends ExtendedResult> FutureResult<R> extendedRequest(
+ final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
@@ -214,8 +341,10 @@
- public FutureResult<Result> modify(ModifyRequest request,
- ResultHandler<Result> handler)
+ public <R extends ExtendedResult> FutureResult<R> extendedRequest(
+ final ExtendedRequest<R> request,
+ final ResultHandler<? super R> resultHandler,
+ final IntermediateResponseHandler intermediateResponseHandler)
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
@@ -223,37 +352,8 @@
{
throw new IllegalStateException();
}
- return connection.modify(request, handler);
- }
-
-
-
- public FutureResult<Result> modifyDN(ModifyDNRequest request,
- ResultHandler<Result> handler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.modifyDN(request, handler);
- }
-
-
-
- public FutureResult<Result> search(SearchRequest request,
- ResultHandler<Result> resultHandler,
- SearchResultHandler searchResulthandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.search(request, resultHandler,
- searchResulthandler);
+ return connection.extendedRequest(request, resultHandler,
+ intermediateResponseHandler);
}
@@ -261,107 +361,9 @@
/**
* {@inheritDoc}
*/
- public FutureResult<SearchResultEntry> readEntry(DN name,
- Collection<String> attributeDescriptions,
- ResultHandler<? super SearchResultEntry> resultHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
+ public Connection getSynchronousConnection()
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.readEntry(name, attributeDescriptions,
- resultHandler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- public FutureResult<SearchResultEntry> searchSingleEntry(
- SearchRequest request,
- ResultHandler<? super SearchResultEntry> resultHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.searchSingleEntry(request, resultHandler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- public FutureResult<RootDSE> readRootDSE(
- ResultHandler<RootDSE> handler)
- throws UnsupportedOperationException, IllegalStateException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.readRootDSE(handler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- public FutureResult<Schema> readSchemaForEntry(DN name,
- ResultHandler<Schema> handler)
- throws UnsupportedOperationException, IllegalStateException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.readSchemaForEntry(name, handler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- public FutureResult<Schema> readSchema(DN name,
- ResultHandler<Schema> handler)
- throws UnsupportedOperationException, IllegalStateException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.readSchema(name, handler);
- }
-
-
-
- public void addConnectionEventListener(
- ConnectionEventListener listener) throws IllegalStateException,
- NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- }
-
-
-
- public void removeConnectionEventListener(
- ConnectionEventListener listener) throws NullPointerException
- {
- if (isClosed())
- {
- throw new IllegalStateException();
- }
+ return new SynchronousConnection(this);
}
@@ -383,42 +385,189 @@
- public void connectionReceivedUnsolicitedNotification(
- GenericExtendedResult notification)
+ public FutureResult<Result> modify(final ModifyRequest request,
+ final ResultHandler<Result> handler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
{
- // Ignore
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.modify(request, handler);
}
- public void connectionErrorOccurred(
- boolean isDisconnectNotification, ErrorResultException error)
+ public FutureResult<Result> modify(final ModifyRequest request,
+ final ResultHandler<Result> resultHandler,
+ final IntermediateResponseHandler intermediateResponseHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
{
- // Remove this connection from the pool if its in there. If not,
- // just
- // ignore and wait for the user to close and we can deal with it
- // there.
- if (pool.remove(this))
+ if (isClosed())
{
- numConnections--;
- connection.removeConnectionEventListener(this);
-
- // FIXME: should still close the connection, but we need to be
- // careful that users of the pooled connection get a sensible
- // error if they continue to use it (i.e. not an NPE or ISE).
-
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
- {
- StaticUtils.DEBUG_LOG
- .warning(String
- .format(
- "Connection error occured and removed from pool: "
- + error.getMessage()
- + " numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures
- .size()));
- }
+ throw new IllegalStateException();
}
+ return connection.modify(request, resultHandler,
+ intermediateResponseHandler);
+ }
+
+
+
+ public FutureResult<Result> modifyDN(final ModifyDNRequest request,
+ final ResultHandler<Result> handler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.modifyDN(request, handler);
+ }
+
+
+
+ public FutureResult<Result> modifyDN(final ModifyDNRequest request,
+ final ResultHandler<Result> resultHandler,
+ final IntermediateResponseHandler intermediateResponseHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.modifyDN(request, resultHandler,
+ intermediateResponseHandler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public FutureResult<SearchResultEntry> readEntry(final DN name,
+ final Collection<String> attributeDescriptions,
+ final ResultHandler<? super SearchResultEntry> resultHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.readEntry(name, attributeDescriptions, resultHandler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public FutureResult<RootDSE> readRootDSE(
+ final ResultHandler<RootDSE> handler)
+ throws UnsupportedOperationException, IllegalStateException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.readRootDSE(handler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public FutureResult<Schema> readSchema(final DN name,
+ final ResultHandler<Schema> handler)
+ throws UnsupportedOperationException, IllegalStateException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.readSchema(name, handler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public FutureResult<Schema> readSchemaForEntry(final DN name,
+ final ResultHandler<Schema> handler)
+ throws UnsupportedOperationException, IllegalStateException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.readSchemaForEntry(name, handler);
+ }
+
+
+
+ public void removeConnectionEventListener(
+ final ConnectionEventListener listener) throws NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ }
+
+
+
+ public FutureResult<Result> search(final SearchRequest request,
+ final ResultHandler<Result> resultHandler,
+ final SearchResultHandler searchResulthandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.search(request, resultHandler, searchResulthandler);
+ }
+
+
+
+ public FutureResult<Result> search(final SearchRequest request,
+ final ResultHandler<Result> resultHandler,
+ final SearchResultHandler searchResulthandler,
+ final IntermediateResponseHandler intermediateResponseHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.search(request, resultHandler, searchResulthandler,
+ intermediateResponseHandler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public FutureResult<SearchResultEntry> searchSingleEntry(
+ final SearchRequest request,
+ final ResultHandler<? super SearchResultEntry> resultHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection.searchSingleEntry(request, resultHandler);
}
}
@@ -427,7 +576,7 @@
private class ReconnectHandler implements
ResultHandler<AsynchronousConnection>
{
- public void handleErrorResult(ErrorResultException error)
+ public void handleErrorResult(final ErrorResultException error)
{
// The reconnect failed. Fail the connect attempt.
numConnections--;
@@ -443,29 +592,24 @@
}
if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
{
- StaticUtils.DEBUG_LOG
- .warning(String
- .format(
- "Reconnect failed. Failed all pending futures: "
- + error.getMessage()
- + " numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
+ StaticUtils.DEBUG_LOG.warning(String.format(
+ "Reconnect failed. Failed all pending futures: "
+ + error.getMessage()
+ + " numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures.size()));
}
}
- public void handleResult(AsynchronousConnection connection)
+ public void handleResult(final AsynchronousConnection connection)
{
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
- StaticUtils.DEBUG_LOG
- .finest(String
- .format(
- "Reconnect succeded. "
- + " numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
+ StaticUtils.DEBUG_LOG.finest(String.format("Reconnect succeeded. "
+ + " numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures.size()));
}
synchronized (pool)
{
@@ -476,75 +620,16 @@
- // Future used for waiting for pooled connections to become available.
- private static final class FuturePooledConnection extends
- AbstractFutureResult<AsynchronousConnection>
- {
- private FuturePooledConnection(
- ResultHandler<? super AsynchronousConnection> handler)
- {
- super(handler);
- }
+ private final ConnectionFactory connectionFactory;
+ private volatile int numConnections;
+ private final int poolSize;
- /**
- * {@inheritDoc}
- */
- public int getRequestID()
- {
- return -1;
- }
+ // FIXME: should use a better collection than this - CLQ?
+ private final Queue<AsynchronousConnection> pool;
- }
-
-
-
- private void releaseConnection(AsynchronousConnection connection)
- {
- // See if there waiters pending.
- for (;;)
- {
- PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
- connection);
- FuturePooledConnection future = pendingFutures.poll();
-
- if (future == null)
- {
- // No waiters - so drop out and add connection to pool.
- break;
- }
-
- future.handleResult(pooledConnection);
-
- if (!future.isCancelled())
- {
- // The future was not cancelled and the connection was
- // accepted.
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG
- .finest(String
- .format(
- "Connection released and directly "
- + "given to waiter. numConnections: %d, poolSize: %d, "
- + "pendingFutures: %d", numConnections, pool
- .size(), pendingFutures.size()));
- }
- return;
- }
- }
-
- // No waiters. Put back in pool.
- pool.offer(connection);
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.finest(String.format(
- "Connection released to pool. numConnections: %d, "
- + "poolSize: %d, pendingFutures: %d", numConnections,
- pool.size(), pendingFutures.size()));
- }
- }
+ private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
@@ -553,12 +638,11 @@
* connections created using the provided connection factory.
*
* @param connectionFactory
- * The connection factory to use for creating new
- * connections.
+ * The connection factory to use for creating new connections.
* @param poolSize
* The maximum size of the connection pool.
*/
- ConnectionPool(ConnectionFactory connectionFactory, int poolSize)
+ ConnectionPool(final ConnectionFactory connectionFactory, final int poolSize)
{
this.connectionFactory = connectionFactory;
this.poolSize = poolSize;
@@ -568,12 +652,12 @@
+ @Override
public synchronized FutureResult<AsynchronousConnection> getAsynchronousConnection(
- ResultHandler<AsynchronousConnection> handler)
+ final ResultHandler<AsynchronousConnection> handler)
{
// This entire method is synchronized to ensure new connects are
- // done
- // synchronously to avoid the "pending connect" case.
+ // done synchronously to avoid the "pending connect" case.
AsynchronousConnection conn;
synchronized (pool)
{
@@ -587,19 +671,16 @@
{
// We reached max # of conns so wait for a connection to
// become available.
- FuturePooledConnection future = new FuturePooledConnection(
+ final FuturePooledConnection future = new FuturePooledConnection(
handler);
pendingFutures.add(future);
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
- StaticUtils.DEBUG_LOG
- .finest(String
- .format(
- "No connections available. Wait-listed"
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures
- .size()));
+ StaticUtils.DEBUG_LOG.finest(String.format(
+ "No connections available. Wait-listed"
+ + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures.size()));
}
return future;
@@ -616,16 +697,13 @@
numConnections++;
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
- StaticUtils.DEBUG_LOG
- .finest(String
- .format(
- "New connection established and aquired. "
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures
- .size()));
+ StaticUtils.DEBUG_LOG.finest(String.format(
+ "New connection established and aquired. "
+ + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures.size()));
}
}
- catch (ErrorResultException e)
+ catch (final ErrorResultException e)
{
if (handler != null)
{
@@ -633,9 +711,9 @@
}
return new CompletedFutureResult<AsynchronousConnection>(e);
}
- catch (InterruptedException e)
+ catch (final InterruptedException e)
{
- ErrorResultException error = new ErrorResultException(Responses
+ final ErrorResultException error = new ErrorResultException(Responses
.newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e));
if (handler != null)
{
@@ -648,23 +726,66 @@
{
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
- StaticUtils.DEBUG_LOG
- .finest(String
- .format(
- "Connection aquired from pool. "
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
+ StaticUtils.DEBUG_LOG.finest(String.format(
+ "Connection aquired from pool. "
+ + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures.size()));
}
}
- PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
+ final PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
conn);
if (handler != null)
{
handler.handleResult(pooledConnection);
}
- return new CompletedFutureResult<AsynchronousConnection>(
- pooledConnection);
+ return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
}
+
+
+
+ private void releaseConnection(final AsynchronousConnection connection)
+ {
+ // See if there waiters pending.
+ for (;;)
+ {
+ final PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
+ connection);
+ final FuturePooledConnection future = pendingFutures.poll();
+
+ if (future == null)
+ {
+ // No waiters - so drop out and add connection to pool.
+ break;
+ }
+
+ future.handleResult(pooledConnection);
+
+ if (!future.isCancelled())
+ {
+ // The future was not cancelled and the connection was
+ // accepted.
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG.finest(String.format(
+ "Connection released and directly "
+ + "given to waiter. numConnections: %d, poolSize: %d, "
+ + "pendingFutures: %d", numConnections, pool.size(),
+ pendingFutures.size()));
+ }
+ return;
+ }
+ }
+
+ // No waiters. Put back in pool.
+ pool.offer(connection);
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG.finest(String.format(
+ "Connection released to pool. numConnections: %d, "
+ + "poolSize: %d, pendingFutures: %d", numConnections,
+ pool.size(), pendingFutures.size()));
+ }
+ }
}
--
Gitblit v1.10.0