| | |
| | | */ |
| | | final class ConnectionPool extends AbstractConnectionFactory |
| | | { |
| | | private final ConnectionFactory connectionFactory; |
| | | |
| | | private volatile int numConnections; |
| | | |
| | | private final int poolSize; |
| | | |
| | | // FIXME: should use a better collection than this - CLQ? |
| | | private final Queue<AsynchronousConnection> pool; |
| | | |
| | | private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures; |
| | | // Future used for waiting for pooled connections to become available. |
| | | private static final class FuturePooledConnection extends |
| | | AbstractFutureResult<AsynchronousConnection> |
| | | { |
| | | private FuturePooledConnection( |
| | | final ResultHandler<? super AsynchronousConnection> handler) |
| | | { |
| | | super(handler); |
| | | } |
| | | |
| | | |
| | | |
| | | private final class PooledConnectionWapper implements |
| | | AsynchronousConnection, ConnectionEventListener |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public int getRequestID() |
| | | { |
| | | return -1; |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | private final class PooledConnectionWapper implements AsynchronousConnection, |
| | | ConnectionEventListener |
| | | { |
| | | private final AsynchronousConnection connection; |
| | | |
| | |
| | | |
| | | |
| | | |
| | | private PooledConnectionWapper(AsynchronousConnection connection) |
| | | private PooledConnectionWapper(final AsynchronousConnection connection) |
| | | { |
| | | this.connection = connection; |
| | | this.connection.addConnectionEventListener(this); |
| | |
| | | |
| | | |
| | | |
| | | public void abandon(AbandonRequest request) |
| | | public FutureResult<Void> abandon(final AbandonRequest request) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | connection.abandon(request); |
| | | return connection.abandon(request); |
| | | } |
| | | |
| | | |
| | | |
| | | public FutureResult<Result> add(AddRequest request, |
| | | ResultHandler<Result> handler) |
| | | public FutureResult<Result> add(final AddRequest request, |
| | | final ResultHandler<Result> handler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | |
| | | |
| | | |
| | | |
| | | public FutureResult<BindResult> bind(BindRequest request, |
| | | ResultHandler<? super BindResult> handler) |
| | | public FutureResult<Result> add(final AddRequest request, |
| | | final ResultHandler<Result> resultHandler, |
| | | final IntermediateResponseHandler intermediateResponseHandler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection |
| | | .add(request, resultHandler, intermediateResponseHandler); |
| | | } |
| | | |
| | | |
| | | |
| | | public void addConnectionEventListener( |
| | | final ConnectionEventListener listener) throws IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | public FutureResult<BindResult> bind(final BindRequest request, |
| | | final ResultHandler<? super BindResult> handler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | |
| | | |
| | | |
| | | |
| | | public FutureResult<BindResult> bind(final BindRequest request, |
| | | final ResultHandler<? super BindResult> resultHandler, |
| | | final IntermediateResponseHandler intermediateResponseHandler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.bind(request, resultHandler, |
| | | intermediateResponseHandler); |
| | | } |
| | | |
| | | |
| | | |
| | | public void close() |
| | | { |
| | | synchronized (pool) |
| | |
| | | { |
| | | StaticUtils.DEBUG_LOG.warning(String.format( |
| | | "Dead connection released and closed. " |
| | | + "numConnections: %d, poolSize: %d, " |
| | | + "pendingFutures: %d", numConnections, pool.size(), |
| | | pendingFutures.size())); |
| | | + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | } |
| | | |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) |
| | | { |
| | | StaticUtils.DEBUG_LOG.warning(String.format( |
| | | "Reconnect attempt starting. " |
| | | + "numConnections: %d, poolSize: %d, " |
| | | + "pendingFutures: %d", numConnections, pool.size(), |
| | | pendingFutures.size())); |
| | | + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | } |
| | | connectionFactory |
| | | .getAsynchronousConnection(new ReconnectHandler()); |
| | | connectionFactory.getAsynchronousConnection(new ReconnectHandler()); |
| | | } |
| | | |
| | | |
| | | |
| | | public void close(UnbindRequest request, String reason) |
| | | public void close(final UnbindRequest request, final String reason) |
| | | throws NullPointerException |
| | | { |
| | | close(); |
| | |
| | | |
| | | |
| | | |
| | | public FutureResult<CompareResult> compare(CompareRequest request, |
| | | ResultHandler<? super CompareResult> handler) |
| | | public FutureResult<CompareResult> compare(final CompareRequest request, |
| | | final ResultHandler<? super CompareResult> handler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | |
| | | |
| | | |
| | | |
| | | public FutureResult<Result> delete(DeleteRequest request, |
| | | ResultHandler<Result> handler) |
| | | public FutureResult<CompareResult> compare(final CompareRequest request, |
| | | final ResultHandler<? super CompareResult> resultHandler, |
| | | final IntermediateResponseHandler intermediateResponseHandler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.compare(request, resultHandler, |
| | | intermediateResponseHandler); |
| | | } |
| | | |
| | | |
| | | |
| | | public void connectionClosed() |
| | | { |
| | | // Ignore - we intercept close via the close method. |
| | | } |
| | | |
| | | |
| | | |
| | | public void connectionErrorOccurred(final boolean isDisconnectNotification, |
| | | final ErrorResultException error) |
| | | { |
| | | // Remove this connection from the pool if its in there. If not, |
| | | // just ignore and wait for the user to close and we can deal with it |
| | | // there. |
| | | if (pool.remove(this)) |
| | | { |
| | | numConnections--; |
| | | connection.removeConnectionEventListener(this); |
| | | |
| | | // FIXME: should still close the connection, but we need to be |
| | | // careful that users of the pooled connection get a sensible |
| | | // error if they continue to use it (i.e. not an NPE or ISE). |
| | | |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) |
| | | { |
| | | StaticUtils.DEBUG_LOG.warning(String.format( |
| | | "Connection error occured and removed from pool: " |
| | | + error.getMessage() |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | public void connectionReceivedUnsolicitedNotification( |
| | | final ExtendedResult notification) |
| | | { |
| | | // Ignore |
| | | } |
| | | |
| | | |
| | | |
| | | public FutureResult<Result> delete(final DeleteRequest request, |
| | | final ResultHandler<Result> handler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | |
| | | |
| | | |
| | | |
| | | public <R extends Result> FutureResult<R> extendedRequest( |
| | | ExtendedRequest<R> request, ResultHandler<? super R> handler) |
| | | public FutureResult<Result> delete(final DeleteRequest request, |
| | | final ResultHandler<Result> resultHandler, |
| | | final IntermediateResponseHandler intermediateResponseHandler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.delete(request, resultHandler, |
| | | intermediateResponseHandler); |
| | | } |
| | | |
| | | |
| | | |
| | | public <R extends ExtendedResult> FutureResult<R> extendedRequest( |
| | | final ExtendedRequest<R> request, final ResultHandler<? super R> handler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | |
| | | |
| | | |
| | | |
| | | public FutureResult<Result> modify(ModifyRequest request, |
| | | ResultHandler<Result> handler) |
| | | public <R extends ExtendedResult> FutureResult<R> extendedRequest( |
| | | final ExtendedRequest<R> request, |
| | | final ResultHandler<? super R> resultHandler, |
| | | final IntermediateResponseHandler intermediateResponseHandler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.modify(request, handler); |
| | | } |
| | | |
| | | |
| | | |
| | | public FutureResult<Result> modifyDN(ModifyDNRequest request, |
| | | ResultHandler<Result> handler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.modifyDN(request, handler); |
| | | } |
| | | |
| | | |
| | | |
| | | public FutureResult<Result> search(SearchRequest request, |
| | | ResultHandler<Result> resultHandler, |
| | | SearchResultHandler searchResulthandler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.search(request, resultHandler, |
| | | searchResulthandler); |
| | | return connection.extendedRequest(request, resultHandler, |
| | | intermediateResponseHandler); |
| | | } |
| | | |
| | | |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public FutureResult<SearchResultEntry> readEntry(DN name, |
| | | Collection<String> attributeDescriptions, |
| | | ResultHandler<? super SearchResultEntry> resultHandler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | public Connection getSynchronousConnection() |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.readEntry(name, attributeDescriptions, |
| | | resultHandler); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public FutureResult<SearchResultEntry> searchSingleEntry( |
| | | SearchRequest request, |
| | | ResultHandler<? super SearchResultEntry> resultHandler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.searchSingleEntry(request, resultHandler); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public FutureResult<RootDSE> readRootDSE( |
| | | ResultHandler<RootDSE> handler) |
| | | throws UnsupportedOperationException, IllegalStateException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.readRootDSE(handler); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public FutureResult<Schema> readSchemaForEntry(DN name, |
| | | ResultHandler<Schema> handler) |
| | | throws UnsupportedOperationException, IllegalStateException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.readSchemaForEntry(name, handler); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public FutureResult<Schema> readSchema(DN name, |
| | | ResultHandler<Schema> handler) |
| | | throws UnsupportedOperationException, IllegalStateException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.readSchema(name, handler); |
| | | } |
| | | |
| | | |
| | | |
| | | public void addConnectionEventListener( |
| | | ConnectionEventListener listener) throws IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | public void removeConnectionEventListener( |
| | | ConnectionEventListener listener) throws NullPointerException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return new SynchronousConnection(this); |
| | | } |
| | | |
| | | |
| | |
| | | |
| | | |
| | | |
| | | public void connectionReceivedUnsolicitedNotification( |
| | | GenericExtendedResult notification) |
| | | public FutureResult<Result> modify(final ModifyRequest request, |
| | | final ResultHandler<Result> handler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | // Ignore |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.modify(request, handler); |
| | | } |
| | | |
| | | |
| | | |
| | | public void connectionErrorOccurred( |
| | | boolean isDisconnectNotification, ErrorResultException error) |
| | | public FutureResult<Result> modify(final ModifyRequest request, |
| | | final ResultHandler<Result> resultHandler, |
| | | final IntermediateResponseHandler intermediateResponseHandler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | // Remove this connection from the pool if its in there. If not, |
| | | // just |
| | | // ignore and wait for the user to close and we can deal with it |
| | | // there. |
| | | if (pool.remove(this)) |
| | | if (isClosed()) |
| | | { |
| | | numConnections--; |
| | | connection.removeConnectionEventListener(this); |
| | | |
| | | // FIXME: should still close the connection, but we need to be |
| | | // careful that users of the pooled connection get a sensible |
| | | // error if they continue to use it (i.e. not an NPE or ISE). |
| | | |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .warning(String |
| | | .format( |
| | | "Connection error occured and removed from pool: " |
| | | + error.getMessage() |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | } |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.modify(request, resultHandler, |
| | | intermediateResponseHandler); |
| | | } |
| | | |
| | | |
| | | |
| | | public FutureResult<Result> modifyDN(final ModifyDNRequest request, |
| | | final ResultHandler<Result> handler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.modifyDN(request, handler); |
| | | } |
| | | |
| | | |
| | | |
| | | public FutureResult<Result> modifyDN(final ModifyDNRequest request, |
| | | final ResultHandler<Result> resultHandler, |
| | | final IntermediateResponseHandler intermediateResponseHandler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.modifyDN(request, resultHandler, |
| | | intermediateResponseHandler); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public FutureResult<SearchResultEntry> readEntry(final DN name, |
| | | final Collection<String> attributeDescriptions, |
| | | final ResultHandler<? super SearchResultEntry> resultHandler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.readEntry(name, attributeDescriptions, resultHandler); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public FutureResult<RootDSE> readRootDSE( |
| | | final ResultHandler<RootDSE> handler) |
| | | throws UnsupportedOperationException, IllegalStateException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.readRootDSE(handler); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public FutureResult<Schema> readSchema(final DN name, |
| | | final ResultHandler<Schema> handler) |
| | | throws UnsupportedOperationException, IllegalStateException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.readSchema(name, handler); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public FutureResult<Schema> readSchemaForEntry(final DN name, |
| | | final ResultHandler<Schema> handler) |
| | | throws UnsupportedOperationException, IllegalStateException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.readSchemaForEntry(name, handler); |
| | | } |
| | | |
| | | |
| | | |
| | | public void removeConnectionEventListener( |
| | | final ConnectionEventListener listener) throws NullPointerException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | public FutureResult<Result> search(final SearchRequest request, |
| | | final ResultHandler<Result> resultHandler, |
| | | final SearchResultHandler searchResulthandler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.search(request, resultHandler, searchResulthandler); |
| | | } |
| | | |
| | | |
| | | |
| | | public FutureResult<Result> search(final SearchRequest request, |
| | | final ResultHandler<Result> resultHandler, |
| | | final SearchResultHandler searchResulthandler, |
| | | final IntermediateResponseHandler intermediateResponseHandler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.search(request, resultHandler, searchResulthandler, |
| | | intermediateResponseHandler); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public FutureResult<SearchResultEntry> searchSingleEntry( |
| | | final SearchRequest request, |
| | | final ResultHandler<? super SearchResultEntry> resultHandler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | return connection.searchSingleEntry(request, resultHandler); |
| | | } |
| | | } |
| | | |
| | |
| | | private class ReconnectHandler implements |
| | | ResultHandler<AsynchronousConnection> |
| | | { |
| | | public void handleErrorResult(ErrorResultException error) |
| | | public void handleErrorResult(final ErrorResultException error) |
| | | { |
| | | // The reconnect failed. Fail the connect attempt. |
| | | numConnections--; |
| | |
| | | } |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .warning(String |
| | | .format( |
| | | "Reconnect failed. Failed all pending futures: " |
| | | + error.getMessage() |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | StaticUtils.DEBUG_LOG.warning(String.format( |
| | | "Reconnect failed. Failed all pending futures: " |
| | | + error.getMessage() |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | public void handleResult(AsynchronousConnection connection) |
| | | public void handleResult(final AsynchronousConnection connection) |
| | | { |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Reconnect succeded. " |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | StaticUtils.DEBUG_LOG.finest(String.format("Reconnect succeeded. " |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | } |
| | | synchronized (pool) |
| | | { |
| | |
| | | |
| | | |
| | | |
| | | // Future used for waiting for pooled connections to become available. |
| | | private static final class FuturePooledConnection extends |
| | | AbstractFutureResult<AsynchronousConnection> |
| | | { |
| | | private FuturePooledConnection( |
| | | ResultHandler<? super AsynchronousConnection> handler) |
| | | { |
| | | super(handler); |
| | | } |
| | | private final ConnectionFactory connectionFactory; |
| | | |
| | | private volatile int numConnections; |
| | | |
| | | private final int poolSize; |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public int getRequestID() |
| | | { |
| | | return -1; |
| | | } |
| | | // FIXME: should use a better collection than this - CLQ? |
| | | private final Queue<AsynchronousConnection> pool; |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | private void releaseConnection(AsynchronousConnection connection) |
| | | { |
| | | // See if there waiters pending. |
| | | for (;;) |
| | | { |
| | | PooledConnectionWapper pooledConnection = new PooledConnectionWapper( |
| | | connection); |
| | | FuturePooledConnection future = pendingFutures.poll(); |
| | | |
| | | if (future == null) |
| | | { |
| | | // No waiters - so drop out and add connection to pool. |
| | | break; |
| | | } |
| | | |
| | | future.handleResult(pooledConnection); |
| | | |
| | | if (!future.isCancelled()) |
| | | { |
| | | // The future was not cancelled and the connection was |
| | | // accepted. |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Connection released and directly " |
| | | + "given to waiter. numConnections: %d, poolSize: %d, " |
| | | + "pendingFutures: %d", numConnections, pool |
| | | .size(), pendingFutures.size())); |
| | | } |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // No waiters. Put back in pool. |
| | | pool.offer(connection); |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG.finest(String.format( |
| | | "Connection released to pool. numConnections: %d, " |
| | | + "poolSize: %d, pendingFutures: %d", numConnections, |
| | | pool.size(), pendingFutures.size())); |
| | | } |
| | | } |
| | | private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures; |
| | | |
| | | |
| | | |
| | |
| | | * connections created using the provided connection factory. |
| | | * |
| | | * @param connectionFactory |
| | | * The connection factory to use for creating new |
| | | * connections. |
| | | * The connection factory to use for creating new connections. |
| | | * @param poolSize |
| | | * The maximum size of the connection pool. |
| | | */ |
| | | ConnectionPool(ConnectionFactory connectionFactory, int poolSize) |
| | | ConnectionPool(final ConnectionFactory connectionFactory, final int poolSize) |
| | | { |
| | | this.connectionFactory = connectionFactory; |
| | | this.poolSize = poolSize; |
| | |
| | | |
| | | |
| | | |
| | | @Override |
| | | public synchronized FutureResult<AsynchronousConnection> getAsynchronousConnection( |
| | | ResultHandler<AsynchronousConnection> handler) |
| | | final ResultHandler<AsynchronousConnection> handler) |
| | | { |
| | | // This entire method is synchronized to ensure new connects are |
| | | // done |
| | | // synchronously to avoid the "pending connect" case. |
| | | // done synchronously to avoid the "pending connect" case. |
| | | AsynchronousConnection conn; |
| | | synchronized (pool) |
| | | { |
| | |
| | | { |
| | | // We reached max # of conns so wait for a connection to |
| | | // become available. |
| | | FuturePooledConnection future = new FuturePooledConnection( |
| | | final FuturePooledConnection future = new FuturePooledConnection( |
| | | handler); |
| | | pendingFutures.add(future); |
| | | |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "No connections available. Wait-listed" |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | StaticUtils.DEBUG_LOG.finest(String.format( |
| | | "No connections available. Wait-listed" |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | } |
| | | |
| | | return future; |
| | |
| | | numConnections++; |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "New connection established and aquired. " |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | StaticUtils.DEBUG_LOG.finest(String.format( |
| | | "New connection established and aquired. " |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | } |
| | | } |
| | | catch (ErrorResultException e) |
| | | catch (final ErrorResultException e) |
| | | { |
| | | if (handler != null) |
| | | { |
| | |
| | | } |
| | | return new CompletedFutureResult<AsynchronousConnection>(e); |
| | | } |
| | | catch (InterruptedException e) |
| | | catch (final InterruptedException e) |
| | | { |
| | | ErrorResultException error = new ErrorResultException(Responses |
| | | final ErrorResultException error = new ErrorResultException(Responses |
| | | .newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)); |
| | | if (handler != null) |
| | | { |
| | |
| | | { |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Connection aquired from pool. " |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | StaticUtils.DEBUG_LOG.finest(String.format( |
| | | "Connection aquired from pool. " |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | } |
| | | } |
| | | |
| | | PooledConnectionWapper pooledConnection = new PooledConnectionWapper( |
| | | final PooledConnectionWapper pooledConnection = new PooledConnectionWapper( |
| | | conn); |
| | | if (handler != null) |
| | | { |
| | | handler.handleResult(pooledConnection); |
| | | } |
| | | return new CompletedFutureResult<AsynchronousConnection>( |
| | | pooledConnection); |
| | | return new CompletedFutureResult<AsynchronousConnection>(pooledConnection); |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | private void releaseConnection(final AsynchronousConnection connection) |
| | | { |
| | | // See if there waiters pending. |
| | | for (;;) |
| | | { |
| | | final PooledConnectionWapper pooledConnection = new PooledConnectionWapper( |
| | | connection); |
| | | final FuturePooledConnection future = pendingFutures.poll(); |
| | | |
| | | if (future == null) |
| | | { |
| | | // No waiters - so drop out and add connection to pool. |
| | | break; |
| | | } |
| | | |
| | | future.handleResult(pooledConnection); |
| | | |
| | | if (!future.isCancelled()) |
| | | { |
| | | // The future was not cancelled and the connection was |
| | | // accepted. |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG.finest(String.format( |
| | | "Connection released and directly " |
| | | + "given to waiter. numConnections: %d, poolSize: %d, " |
| | | + "pendingFutures: %d", numConnections, pool.size(), |
| | | pendingFutures.size())); |
| | | } |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // No waiters. Put back in pool. |
| | | pool.offer(connection); |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG.finest(String.format( |
| | | "Connection released to pool. numConnections: %d, " |
| | | + "poolSize: %d, pendingFutures: %d", numConnections, |
| | | pool.size(), pendingFutures.size())); |
| | | } |
| | | } |
| | | } |