| | |
| | | * the pool completes. |
| | | */ |
| | | private final class ConnectionResultHandler implements ResultHandler<Connection> { |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | |
| | | @Override |
| | | public void handleErrorResult(final ErrorResultException error) { |
| | | // Connection attempt failed, so decrease the pool size. |
| | | currentPoolSize.release(); |
| | | |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format("Connection attempt failed: " + error.getMessage() |
| | | + " currentPoolSize=%d, poolSize=%d", poolSize |
| | | - currentPoolSize.availablePermits(), poolSize)); |
| | | DEBUG_LOG.fine(String.format( |
| | | "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", error |
| | | .getMessage(), poolSize - currentPoolSize.availablePermits(), |
| | | poolSize)); |
| | | } |
| | | |
| | | QueueElement holder; |
| | |
| | | holder.getWaitingFuture().handleErrorResult(error); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void handleResult(final Connection connection) { |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | // Log the success exactly once. currentPoolSize is reported as the |
| | | // number of semaphore permits currently taken out of poolSize. |
| | | DEBUG_LOG.fine(String.format( |
| | | "Connection attempt succeeded: currentPoolSize=%d, poolSize=%d", poolSize |
| | | - currentPoolSize.availablePermits(), poolSize)); |
| | | } |
| | | |
| | | publishConnection(connection); |
| | | } |
| | | } |
| | |
| | | notifyErrorOccurred = error != null; |
| | | if (!notifyClose) { |
| | | if (listeners == null) { |
| | | // Create and register first listener. If an error has |
| | | // already occurred on the underlying connection, then |
| | | // the listener may be immediately invoked so ensure |
| | | // that it is already in the list. |
| | | /* |
| | | * Create and register first listener. If an error has |
| | | * already occurred on the underlying connection, then |
| | | * the listener may be immediately invoked so ensure |
| | | * that it is already in the list. |
| | | */ |
| | | listeners = new CopyOnWriteArrayList<ConnectionEventListener>(); |
| | | listeners.add(listener); |
| | | connection.addConnectionEventListener(this); |
| | |
| | | tmpListeners = listeners; |
| | | } |
| | | |
| | | // Remove underlying listener if needed and do this before |
| | | // subsequent connection events may occur. |
| | | /* |
| | | * Remove underlying listener if needed and do this before |
| | | * subsequent connection events may occur. |
| | | */ |
| | | if (tmpListeners != null) { |
| | | connection.removeConnectionEventListener(this); |
| | | } |
| | |
| | | if (connection.isValid()) { |
| | | publishConnection(connection); |
| | | } else { |
| | | // The connection may have been disconnected by the remote |
| | | // server, but the server may still be available. In order to |
| | | // avoid leaving pending futures hanging indefinitely, we should |
| | | // try to reconnect immediately. No need to release/acquire |
| | | // currentPoolSize. |
| | | /* |
| | | * The connection may have been disconnected by the remote |
| | | * server, but the server may still be available. In order to |
| | | * avoid leaving pending futures hanging indefinitely, we should |
| | | * try to reconnect immediately. No need to release/acquire |
| | | * currentPoolSize. |
| | | */ |
| | | connection.close(); |
| | | factory.getConnectionAsync(connectionResultHandler); |
| | | |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format("Connection no longer valid. " |
| | | + "currentPoolSize=%d, poolSize=%d", poolSize |
| | | - currentPoolSize.availablePermits(), poolSize)); |
| | | DEBUG_LOG.fine(String.format( |
| | | "Connection no longer valid: currentPoolSize=%d, poolSize=%d", poolSize |
| | | - currentPoolSize.availablePermits(), poolSize)); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | QueueElement(final ResultHandler<? super Connection> handler) { |
| | | // Wrap the supplied handler in a new future and store it. The field |
| | | // is assigned exactly once so no throw-away future is created. |
| | | this.value = |
| | | new AsynchronousFutureResult<Connection, ResultHandler<? super Connection>>( |
| | | handler); |
| | | } |
| | | |
| | | @Override |
| | |
| | | private final int poolSize; |
| | | private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>(); |
| | | |
| | | /** |
| | | * Constructs a connection pool backed by the given connection factory. |
| | | * The pool's size is tracked by a semaphore which starts out with |
| | | * {@code poolSize} permits. |
| | | * |
| | | * @param factory |
| | | * The connection factory used to create new connections. |
| | | * @param poolSize |
| | | * The maximum number of connections in the pool. |
| | | */ |
| | | FixedConnectionPool(final ConnectionFactory factory, final int poolSize) { |
| | | // Size bookkeeping first: one semaphore permit per pool slot. |
| | | this.poolSize = poolSize; |
| | | this.currentPoolSize = new Semaphore(poolSize); |
| | | // Source of new connections. |
| | | this.factory = factory; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void close() { |
| | | final LinkedList<Connection> idleConnections; |
| | |
| | | } |
| | | isClosed = true; |
| | | |
| | | // Remove any connections which are waiting in the queue as these |
| | | // can be closed immediately. |
| | | /* |
| | | * Remove any connections which are waiting in the queue as these |
| | | * can be closed immediately. |
| | | */ |
| | | idleConnections = new LinkedList<Connection>(); |
| | | while (hasWaitingConnections()) { |
| | | final QueueElement holder = queue.removeFirst(); |
| | |
| | | for (final Connection connection : idleConnections) { |
| | | closeConnection(connection); |
| | | } |
| | | |
| | | // Close the underlying factory. |
| | | factory.close(); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public Connection getConnection() throws ErrorResultException { |
| | | try { |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<Connection> getConnectionAsync( |
| | | final ResultHandler<? super Connection> handler) { |
| | |
| | | currentPoolSize.release(); |
| | | |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format("Connection no longer valid. " |
| | | + "currentPoolSize=%d, poolSize=%d", poolSize |
| | | - currentPoolSize.availablePermits(), poolSize)); |
| | | DEBUG_LOG.fine(String.format( |
| | | "Connection no longer valid: currentPoolSize=%d, poolSize=%d", |
| | | poolSize - currentPoolSize.availablePermits(), poolSize)); |
| | | } |
| | | } |
| | | } else { |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public String toString() { |
| | | final StringBuilder builder = new StringBuilder(); |
| | |
| | | |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format("Closing connection because connection pool is closing: " |
| | | + " currentPoolSize=%d, poolSize=%d", poolSize |
| | | + "currentPoolSize=%d, poolSize=%d", poolSize |
| | | - currentPoolSize.availablePermits(), poolSize)); |
| | | } |
| | | } |
| | |
| | | holder.getWaitingFuture().handleErrorResult(e); |
| | | |
| | | if (DEBUG_LOG.isLoggable(Level.FINE)) { |
| | | DEBUG_LOG.fine(String.format("Connection attempt failed: " + e.getMessage() |
| | | + " currentPoolSize=%d, poolSize=%d", poolSize |
| | | - currentPoolSize.availablePermits(), poolSize)); |
| | | DEBUG_LOG.fine(String.format( |
| | | "Connection attempt failed: %s, currentPoolSize=%d, poolSize=%d", e |
| | | .getMessage(), poolSize - currentPoolSize.availablePermits(), |
| | | poolSize)); |
| | | } |
| | | } |
| | | } else { |
| | | final PooledConnection pooledConnection = new PooledConnection(connection); |
| | | holder.getWaitingFuture().handleResult(pooledConnection); |
| | | holder.getWaitingFuture().handleResult(new PooledConnection(connection)); |
| | | } |
| | | } |
| | | |