| | |
| | | } |
| | | isClosed = true; |
| | | |
| | | // Don't put closed connections back in the pool. |
| | | if (!connection.isValid()) |
| | | // Don't put invalid connections back in the pool. |
| | | if (connection.isValid()) |
| | | { |
| | | numConnections--; |
| | | } |
| | | else |
| | | { |
| | | // See if there waiters pending. |
| | | for (;;) |
| | | { |
| | | PooledConnectionWapper pooledConnection = new PooledConnectionWapper( |
| | | connection); |
| | | FuturePooledConnection future = pendingFutures.poll(); |
| | | |
| | | if (future == null) |
| | | { |
| | | // No waiters - so drop out and add connection to pool. |
| | | break; |
| | | } |
| | | |
| | | future.handleResult(pooledConnection); |
| | | |
| | | if (!future.isCancelled()) |
| | | { |
| | | // The future was not cancelled and the connection was |
| | | // accepted. |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Connection released to pool and directly " |
| | | + "given to waiter. numConnections: %d, poolSize: %d, " |
| | | + "pendingFutures: %d", numConnections, |
| | | pool.size(), pendingFutures.size())); |
| | | } |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // No waiters. Put back in pool. |
| | | pool.offer(connection); |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Connection released to pool and directly " |
| | | + "given to waiter. numConnections: %d, poolSize: %d, " |
| | | + "pendingFutures: %d", numConnections, |
| | | pool.size(), pendingFutures.size())); |
| | | } |
| | | releaseConnection(connection); |
| | | return; |
| | | } |
| | | } |
| | |
| | | // Connection is no longer valid. Close outside of lock |
| | | connection.removeConnectionEventListener(this); |
| | | connection.close(); |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .warning(String |
| | | .format( |
| | | "Dead connection released to pool. " |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | "Dead connection released and closed. " |
| | | + "numConnections: %d, poolSize: %d, " + |
| | | "pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | } |
| | | return; |
| | | } |
| | | |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .warning(String |
| | | .format( |
| | | "Reconnect attempt starting. " |
| | | + "numConnections: %d, poolSize: %d, " + |
| | | "pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | } |
| | | connectionFactory.getAsynchronousConnection(new ReconnectHandler()); |
| | | } |
| | | |
| | | |
| | | public void close(UnbindRequest request, String reason) |
| | |
| | | // careful that users of the pooled connection get a sensible |
| | | // error if they continue to use it (i.e. not an NPE or ISE). |
| | | |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .warning(String |
| | | .format( |
| | | "Connection error occured: " |
| | | "Connection error occured and removed from pool: " |
| | | + error.getMessage() |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | |
| | | } |
| | | } |
| | | |
| | | private class ReconnectHandler |
| | | implements ResultHandler<AsynchronousConnection> |
| | | { |
| | | public void handleErrorResult(ErrorResultException error) { |
| | | // The reconnect failed. Fail the connect attempt. |
| | | numConnections --; |
| | | // The reconnect failed. The underlying connection factory probably went |
| | | // down. Just fail all pending futures |
| | | synchronized (pool) |
| | | { |
| | | while(!pendingFutures.isEmpty()) |
| | | { |
| | | pendingFutures.poll().handleErrorResult(error); |
| | | } |
| | | } |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .warning(String |
| | | .format( |
| | | "Reconnect failed. Failed all pending futures: " |
| | | + error.getMessage() |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | } |
| | | |
| | | } |
| | | |
| | | public void handleResult(AsynchronousConnection connection) { |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Reconnect succeded. " |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | } |
| | | synchronized (pool) |
| | | { |
| | | releaseConnection(connection); |
| | | } |
| | | } |
| | | } |
| | | |
| | | // Future used for waiting for pooled connections to become available. |
| | | private static final class FuturePooledConnection extends |
| | |
| | | |
| | | } |
| | | |
| | | private void releaseConnection(AsynchronousConnection connection) |
| | | { |
| | | // See if there waiters pending. |
| | | for (;;) |
| | | { |
| | | PooledConnectionWapper pooledConnection = new PooledConnectionWapper( |
| | | connection); |
| | | FuturePooledConnection future = pendingFutures.poll(); |
| | | |
| | | if (future == null) |
| | | { |
| | | // No waiters - so drop out and add connection to pool. |
| | | break; |
| | | } |
| | | |
| | | future.handleResult(pooledConnection); |
| | | |
| | | if (!future.isCancelled()) |
| | | { |
| | | // The future was not cancelled and the connection was |
| | | // accepted. |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Connection released and directly " |
| | | + "given to waiter. numConnections: %d, poolSize: %d, " |
| | | + "pendingFutures: %d", numConnections, |
| | | pool.size(), pendingFutures.size())); |
| | | } |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // No waiters. Put back in pool. |
| | | pool.offer(connection); |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Connection released to pool. numConnections: %d, " + |
| | | "poolSize: %d, pendingFutures: %d", numConnections, |
| | | pool.size(), pendingFutures.size())); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | |
| | | try |
| | | { |
| | | // We can create a new connection. |
| | | conn = connectionFactory.getAsynchronousConnection(handler).get(); |
| | | conn = connectionFactory.getAsynchronousConnection(null).get(); |
| | | numConnections++; |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | |
| | | } |
| | | catch (ErrorResultException e) |
| | | { |
| | | if (handler != null) |
| | | { |
| | | handler.handleErrorResult(e); |
| | | } |
| | | return new CompletedFutureResult<AsynchronousConnection>(e); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | return new CompletedFutureResult<AsynchronousConnection>( |
| | | ErrorResultException error = |
| | | new ErrorResultException(Responses.newResult( |
| | | ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e))); |
| | | ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)); |
| | | if (handler != null) |
| | | { |
| | | handler.handleErrorResult(error); |
| | | } |
| | | return new CompletedFutureResult<AsynchronousConnection>(error); |
| | | } |
| | | } |
| | | else |