| | |
| | | |
| | | |
| | | import java.util.Collection; |
| | | import java.util.Queue; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.logging.Level; |
| | | |
| | | import org.opends.sdk.requests.*; |
| | |
| | | import com.sun.opends.sdk.util.AsynchronousFutureResult; |
| | | import com.sun.opends.sdk.util.CompletedFutureResult; |
| | | import com.sun.opends.sdk.util.StaticUtils; |
| | | import com.sun.opends.sdk.util.Validator; |
| | | |
| | | |
| | | |
| | |
| | | */ |
| | | final class ConnectionPool extends AbstractConnectionFactory |
| | | { |
| | | // Future used for waiting for pooled connections to become available. |
| | | private static final class FuturePooledConnection extends |
| | | AsynchronousFutureResult<AsynchronousConnection> |
| | | |
| | | /** |
| | | * This result handler is invoked when an attempt to add a new connection to |
| | | * the pool completes. |
| | | */ |
| | | private final class ConnectionResultHandler implements |
| | | ResultHandler<AsynchronousConnection> |
| | | { |
| | | private FuturePooledConnection( |
| | | final ResultHandler<? super AsynchronousConnection> handler) |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void handleErrorResult(final ErrorResultException error) |
| | | { |
| | | super(handler); |
| | | // Connection attempt failed, so decrease the pool size. |
| | | currentPoolSize.release(); |
| | | |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG.fine(String.format( |
| | | "Connection attempt failed: " + error.getMessage() |
| | | + " currentPoolSize=%d, poolSize=%d", |
| | | poolSize - currentPoolSize.availablePermits(), poolSize)); |
| | | } |
| | | |
| | | QueueElement holder; |
| | | synchronized (queue) |
| | | { |
| | | if (queue.isEmpty() || !queue.getFirst().isWaitingFuture()) |
| | | { |
| | | // No waiting futures. |
| | | return; |
| | | } |
| | | else |
| | | { |
| | | holder = queue.removeFirst(); |
| | | } |
| | | } |
| | | |
| | | // There was a waiting future, so fail it with the connection error. |
| | | holder.getWaitingFuture().handleErrorResult(error); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * Called when a connection attempt made on behalf of the pool succeeds. |
| | | * Logs the current and maximum pool size at {@code FINE} level, when that |
| | | * level is enabled, and then passes the connection to |
| | | * {@code publishConnection}. |
| | | * |
| | | * @param connection |
| | | * The newly established connection. |
| | | */ |
| | | @Override |
| | | public void handleResult(final AsynchronousConnection connection) |
| | | { |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG.fine(String.format( |
| | | "Connection attempt succeeded: " |
| | | + " currentPoolSize=%d, poolSize=%d", |
| | | poolSize - currentPoolSize.availablePermits(), poolSize)); |
| | | } |
| | | |
| | | publishConnection(connection); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | private final class PooledConnectionWapper implements AsynchronousConnection, |
| | | ConnectionEventListener |
| | | /** |
| | | * A pooled connection is passed to the client. It wraps an underlying |
| | | * "pooled" connection obtained from the underlying factory and lasts until |
| | | * the client application closes this connection. More specifically, pooled |
| | | * connections are not actually stored in the internal queue. |
| | | */ |
| | | private final class PooledConnection implements AsynchronousConnection |
| | | { |
| | | // Connection event listeners registered against this pooled connection |
| | | // should have the same lifetime as the pooled connection. |
| | | private final List<ConnectionEventListener> listeners = |
| | | new CopyOnWriteArrayList<ConnectionEventListener>(); |
| | | |
| | | private final AsynchronousConnection connection; |
| | | |
| | | private volatile boolean isClosed; |
| | | private final AtomicBoolean isClosed = new AtomicBoolean(false); |
| | | |
| | | |
| | | |
| | | private PooledConnectionWapper(final AsynchronousConnection connection) |
| | | PooledConnection(final AsynchronousConnection connection) |
| | | { |
| | | this.connection = connection; |
| | | this.connection.addConnectionEventListener(this); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<Void> abandon(final AbandonRequest request) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<Result> add(final AddRequest request, |
| | | final ResultHandler<? super Result> handler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<Result> add(final AddRequest request, |
| | | final ResultHandler<? super Result> resultHandler, |
| | | final IntermediateResponseHandler intermediateResponseHandler) |
| | |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * The listener is recorded in this pooled connection's own listener list. |
| | | * |
| | | * @param listener |
| | | * The listener to register; checked with |
| | | * {@code Validator.ensureNotNull}. |
| | | * @throws IllegalStateException |
| | | * If {@code isClosed()} reports that this pooled connection has |
| | | * already been closed. |
| | | */ |
| | | @Override |
| | | public void addConnectionEventListener( |
| | | final ConnectionEventListener listener) throws IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | Validator.ensureNotNull(listener); |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | listeners.add(listener); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<BindResult> bind(final BindRequest request, |
| | | final ResultHandler<? super BindResult> handler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<BindResult> bind(final BindRequest request, |
| | | final ResultHandler<? super BindResult> resultHandler, |
| | | final IntermediateResponseHandler intermediateResponseHandler) |
| | |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | synchronized (pool) |
| | | if (!isClosed.compareAndSet(false, true)) |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return; |
| | | } |
| | | isClosed = true; |
| | | // Already closed. |
| | | return; |
| | | } |
| | | |
| | | // Don't put invalid connections back in the pool. |
| | | if (connection.isValid()) |
| | | // Don't put invalid connections back in the pool. |
| | | if (connection.isValid()) |
| | | { |
| | | publishConnection(connection); |
| | | } |
| | | else |
| | | { |
| | | // The connection may have been disconnected by the remote server, but |
| | | // the server may still be available. In order to avoid leaving pending |
| | | // futures hanging indefinitely, we should try to reconnect immediately. |
| | | |
| | | // Close the dead connection. |
| | | connection.close(); |
| | | |
| | | // Try to get a new connection to replace it. |
| | | factory.getAsynchronousConnection(connectionResultHandler); |
| | | |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) |
| | | { |
| | | releaseConnection(connection); |
| | | return; |
| | | StaticUtils.DEBUG_LOG.warning(String.format( |
| | | "Connection no longer valid. " |
| | | + "currentPoolSize=%d, poolSize=%d", |
| | | poolSize - currentPoolSize.availablePermits(), poolSize)); |
| | | } |
| | | } |
| | | |
| | | // Connection is no longer valid. Close outside of lock |
| | | connection.removeConnectionEventListener(this); |
| | | connection.close(); |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) |
| | | { |
| | | StaticUtils.DEBUG_LOG.warning(String.format( |
| | | "Dead connection released and closed. " |
| | | + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | } |
| | | |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) |
| | | { |
| | | StaticUtils.DEBUG_LOG.warning(String.format( |
| | | "Reconnect attempt starting. " |
| | | + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | } |
| | | connectionFactory.getAsynchronousConnection(new ReconnectHandler()); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void close(final UnbindRequest request, final String reason) |
| | | throws NullPointerException |
| | | { |
| | |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<CompareResult> compare(final CompareRequest request, |
| | | final ResultHandler<? super CompareResult> handler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<CompareResult> compare(final CompareRequest request, |
| | | final ResultHandler<? super CompareResult> resultHandler, |
| | | final IntermediateResponseHandler intermediateResponseHandler) |
| | |
| | | |
| | | |
| | | |
| | | public void handleConnectionClosed() |
| | | { |
| | | // Nothing to do: closing is intercepted by this wrapper's close() method. |
| | | } |
| | | |
| | | |
| | | |
| | | public void handleConnectionError(final boolean isDisconnectNotification, |
| | | final ErrorResultException error) |
| | | { |
| | | // If this wrapper is still in the pool, remove it, decrement the |
| | | // connection count and stop listening to the underlying connection. |
| | | // Otherwise do nothing here; it is dealt with when the user closes it. |
| | | if (pool.remove(this)) |
| | | { |
| | | numConnections--; |
| | | connection.removeConnectionEventListener(this); |
| | | |
| | | // FIXME: the connection should still be closed here, but users of the |
| | | // pooled connection must then get a sensible error if they keep |
| | | // using it (i.e. not an NPE or ISE). |
| | | |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) |
| | | { |
| | | StaticUtils.DEBUG_LOG.warning(String.format( |
| | | "Connection error occured and removed from pool: " |
| | | + error.getMessage() |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | public void handleUnsolicitedNotification(final ExtendedResult notification) |
| | | { |
| | | // Ignored: this wrapper takes no action on unsolicited notifications. |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<Result> delete(final DeleteRequest request, |
| | | final ResultHandler<? super Result> handler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<Result> delete(final DeleteRequest request, |
| | | final ResultHandler<? super Result> resultHandler, |
| | | final IntermediateResponseHandler intermediateResponseHandler) |
| | |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public <R extends ExtendedResult> FutureResult<R> extendedRequest( |
| | | final ExtendedRequest<R> request, final ResultHandler<? super R> handler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public <R extends ExtendedResult> FutureResult<R> extendedRequest( |
| | | final ExtendedRequest<R> request, |
| | | final ResultHandler<? super R> resultHandler, |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public Connection getSynchronousConnection() |
| | | { |
| | | return new SynchronousConnection(this); |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public boolean isClosed() |
| | | { |
| | | return isClosed; |
| | | return isClosed.get(); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public boolean isValid() |
| | | { |
| | | return !isClosed && connection.isValid(); |
| | | return connection.isValid() && !isClosed(); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<Result> modify(final ModifyRequest request, |
| | | final ResultHandler<? super Result> handler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<Result> modify(final ModifyRequest request, |
| | | final ResultHandler<? super Result> resultHandler, |
| | | final IntermediateResponseHandler intermediateResponseHandler) |
| | |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<Result> modifyDN(final ModifyDNRequest request, |
| | | final ResultHandler<? super Result> handler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<Result> modifyDN(final ModifyDNRequest request, |
| | | final ResultHandler<? super Result> resultHandler, |
| | | final IntermediateResponseHandler intermediateResponseHandler) |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<SearchResultEntry> readEntry(final DN name, |
| | | final Collection<String> attributeDescriptions, |
| | | final ResultHandler<? super SearchResultEntry> resultHandler) |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<RootDSE> readRootDSE( |
| | | final ResultHandler<? super RootDSE> handler) |
| | | throws UnsupportedOperationException, IllegalStateException |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<Schema> readSchema(final DN name, |
| | | final ResultHandler<? super Schema> handler) |
| | | throws UnsupportedOperationException, IllegalStateException |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<Schema> readSchemaForEntry(final DN name, |
| | | final ResultHandler<? super Schema> handler) |
| | | throws UnsupportedOperationException, IllegalStateException |
| | |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * Removes the listener from this pooled connection's own listener list. |
| | | * Although only {@code NullPointerException} is declared, an unchecked |
| | | * {@code IllegalStateException} is thrown when {@code isClosed()} reports |
| | | * that this pooled connection has already been closed. |
| | | * |
| | | * @param listener |
| | | * The listener to remove; checked with |
| | | * {@code Validator.ensureNotNull}. |
| | | */ |
| | | @Override |
| | | public void removeConnectionEventListener( |
| | | final ConnectionEventListener listener) throws NullPointerException |
| | | { |
| | | Validator.ensureNotNull(listener); |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | listeners.remove(listener); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<Result> search(final SearchRequest request, |
| | | final SearchResultHandler handler) |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<Result> search(final SearchRequest request, |
| | | final SearchResultHandler resultHandler, |
| | | final IntermediateResponseHandler intermediateResponseHandler) |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public FutureResult<SearchResultEntry> searchSingleEntry( |
| | | final SearchRequest request, |
| | | final ResultHandler<? super SearchResultEntry> resultHandler) |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | StringBuilder builder = new StringBuilder(); |
| | | final StringBuilder builder = new StringBuilder(); |
| | | builder.append("PooledConnection("); |
| | | builder.append(connection); |
| | | builder.append(')'); |
| | |
| | | |
| | | |
| | | |
| | | private class ReconnectHandler implements |
| | | ResultHandler<AsynchronousConnection> |
| | | /** |
| | | * A queue element is either a pending connection request future awaiting an |
| | | * {@code AsynchronousConnection} or it is an unused |
| | | * {@code AsynchronousConnection} awaiting a connection request. |
| | | */ |
| | | private static final class QueueElement |
| | | { |
| | | public void handleErrorResult(final ErrorResultException error) |
| | | { |
| | | // The reconnect failed. Fail the connect attempt. |
| | | numConnections--; |
| | | // The reconnect failed. The underlying connection factory |
| | | // probably went |
| | | // down. Just fail all pending futures |
| | | synchronized (pool) |
| | | { |
| | | while (!pendingFutures.isEmpty()) |
| | | { |
| | | pendingFutures.poll().handleErrorResult(error); |
| | | } |
| | | } |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) |
| | | { |
| | | StaticUtils.DEBUG_LOG.warning(String.format( |
| | | "Reconnect failed. Failed all pending futures: " |
| | | + error.getMessage() |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | } |
| | | private final Object value; |
| | | |
| | | |
| | | |
| | | QueueElement(final AsynchronousConnection connection) |
| | | { |
| | | this.value = connection; |
| | | } |
| | | |
| | | |
| | | |
| | | public void handleResult(final AsynchronousConnection connection) |
| | | QueueElement(final ResultHandler<? super AsynchronousConnection> handler) |
| | | { |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler); |
| | | } |
| | | |
| | | |
| | | |
| | | AsynchronousConnection getWaitingConnection() |
| | | { |
| | | if (value instanceof AsynchronousConnection) |
| | | { |
| | | StaticUtils.DEBUG_LOG.finest(String.format("Reconnect succeeded. " |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | return (AsynchronousConnection) value; |
| | | } |
| | | synchronized (pool) |
| | | else |
| | | { |
| | | releaseConnection(connection); |
| | | throw new IllegalStateException(); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | @SuppressWarnings("unchecked") |
| | | AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture() |
| | | { |
| | | if (value instanceof AsynchronousFutureResult) |
| | | { |
| | | return (AsynchronousFutureResult<AsynchronousConnection>) value; |
| | | } |
| | | else |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | boolean isWaitingFuture() |
| | | { |
| | | return value instanceof AsynchronousFutureResult; |
| | | } |
| | | |
| | | |
| | | |
| | | public String toString() |
| | | { |
| | | return String.valueOf(value); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | private final ConnectionFactory connectionFactory; |
| | | // Guarded by queue. |
| | | private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>(); |
| | | |
| | | private volatile int numConnections; |
| | | private final ConnectionFactory factory; |
| | | |
| | | private final int poolSize; |
| | | |
| | | // FIXME: should use a better collection than this - CLQ? |
| | | private final Queue<AsynchronousConnection> pool; |
| | | private final Semaphore currentPoolSize; |
| | | |
| | | private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures; |
| | | private final ResultHandler<AsynchronousConnection> connectionResultHandler = |
| | | new ConnectionResultHandler(); |
| | | |
| | | |
| | | |
| | |
| | | * Creates a new connection pool which will maintain {@code poolSize} |
| | | * connections created using the provided connection factory. |
| | | * |
| | | * @param connectionFactory |
| | | * @param factory |
| | | * The connection factory to use for creating new connections. |
| | | * @param poolSize |
| | | * The maximum size of the connection pool. |
| | | */ |
| | | ConnectionPool(final ConnectionFactory connectionFactory, final int poolSize) |
| | | ConnectionPool(final ConnectionFactory factory, final int poolSize) |
| | | { |
| | | this.connectionFactory = connectionFactory; |
| | | this.factory = factory; |
| | | this.poolSize = poolSize; |
| | | this.pool = new ConcurrentLinkedQueue<AsynchronousConnection>(); |
| | | this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>(); |
| | | } |
| | | |
| | | |
| | | |
| | | @Override |
| | | public synchronized FutureResult<AsynchronousConnection> getAsynchronousConnection( |
| | | final ResultHandler<? super AsynchronousConnection> handler) |
| | | { |
| | | // This entire method is synchronized to ensure new connects are |
| | | // done synchronously to avoid the "pending connect" case. |
| | | AsynchronousConnection conn; |
| | | synchronized (pool) |
| | | { |
| | | // Check to see if we have a connection in the pool |
| | | conn = pool.poll(); |
| | | if (conn == null) |
| | | { |
| | | // Pool was empty. Maybe a new connection if pool size is not |
| | | // reached |
| | | if (numConnections >= poolSize) |
| | | { |
| | | // We reached max # of conns so wait for a connection to |
| | | // become available. |
| | | final FuturePooledConnection future = new FuturePooledConnection( |
| | | handler); |
| | | pendingFutures.add(future); |
| | | |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG.finest(String.format( |
| | | "No connections available. Wait-listed" |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | } |
| | | |
| | | return future; |
| | | } |
| | | } |
| | | } |
| | | |
| | | if (conn == null) |
| | | { |
| | | try |
| | | { |
| | | // We can create a new connection. |
| | | conn = connectionFactory.getAsynchronousConnection(null).get(); |
| | | numConnections++; |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG.finest(String.format( |
| | | "New connection established and aquired. " |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | } |
| | | } |
| | | catch (final ErrorResultException e) |
| | | { |
| | | if (handler != null) |
| | | { |
| | | handler.handleErrorResult(e); |
| | | } |
| | | return new CompletedFutureResult<AsynchronousConnection>(e); |
| | | } |
| | | catch (final InterruptedException e) |
| | | { |
| | | final ErrorResultException error = new ErrorResultException(Responses |
| | | .newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)); |
| | | if (handler != null) |
| | | { |
| | | handler.handleErrorResult(error); |
| | | } |
| | | return new CompletedFutureResult<AsynchronousConnection>(error); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG.finest(String.format( |
| | | "Connection aquired from pool. " |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | } |
| | | } |
| | | |
| | | final PooledConnectionWapper pooledConnection = new PooledConnectionWapper( |
| | | conn); |
| | | if (handler != null) |
| | | { |
| | | handler.handleResult(pooledConnection); |
| | | } |
| | | return new CompletedFutureResult<AsynchronousConnection>(pooledConnection); |
| | | this.currentPoolSize = new Semaphore(poolSize); |
| | | } |
| | | |
| | | |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * If the head of the internal queue holds an unused connection, it is |
| | | * removed, wrapped in a {@code PooledConnection} and returned as an already |
| | | * completed future, after notifying {@code handler} if it is not |
| | | * {@code null}. Otherwise the request is appended to the queue as a |
| | | * waiting future and, if that future is not yet done and a permit can be |
| | | * acquired from {@code currentPoolSize}, a new connection is requested |
| | | * from the underlying factory. |
| | | * |
| | | * @param handler |
| | | * Result handler for the pooled connection; checked for |
| | | * {@code null} before being invoked directly. |
| | | * @return A future for the pooled connection. |
| | | */ |
| | | @Override |
| | | public FutureResult<AsynchronousConnection> getAsynchronousConnection( |
| | | final ResultHandler<? super AsynchronousConnection> handler) |
| | | { |
| | | QueueElement holder; |
| | | synchronized (queue) |
| | | { |
| | | if (queue.isEmpty() || queue.getFirst().isWaitingFuture()) |
| | | { |
| | | holder = new QueueElement(handler); |
| | | queue.add(holder); |
| | | } |
| | | else |
| | | { |
| | | holder = queue.removeFirst(); |
| | | } |
| | | } |
| | | |
| | | if (!holder.isWaitingFuture()) |
| | | { |
| | | // The queue held an unused connection: hand it out immediately. |
| | | final AsynchronousConnection connection = holder.getWaitingConnection(); |
| | | final PooledConnection pooledConnection = new PooledConnection(connection); |
| | | if (handler != null) |
| | | { |
| | | handler.handleResult(pooledConnection); |
| | | } |
| | | return new CompletedFutureResult<AsynchronousConnection>(pooledConnection); |
| | | } |
| | | else |
| | | { |
| | | // The request was queued: grow the pool if a permit is still available. |
| | | final FutureResult<AsynchronousConnection> future = holder |
| | | .getWaitingFuture(); |
| | | if (!future.isDone() && currentPoolSize.tryAcquire()) |
| | | { |
| | | factory.getAsynchronousConnection(connectionResultHandler); |
| | | } |
| | | return future; |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | final StringBuilder builder = new StringBuilder(); |
| | | builder.append("ConnectionPool("); |
| | | builder.append(String.valueOf(connectionFactory)); |
| | | builder.append(String.valueOf(factory)); |
| | | builder.append(','); |
| | | builder.append(poolSize); |
| | | builder.append(')'); |
| | |
| | | |
| | | |
| | | |
| | | private void releaseConnection(final AsynchronousConnection connection) |
| | | private void publishConnection(final AsynchronousConnection connection) |
| | | { |
| | | // See if there are waiters pending. |
| | | for (;;) |
| | | QueueElement holder; |
| | | synchronized (queue) |
| | | { |
| | | final PooledConnectionWapper pooledConnection = new PooledConnectionWapper( |
| | | connection); |
| | | final FuturePooledConnection future = pendingFutures.poll(); |
| | | |
| | | if (future == null) |
| | | if (queue.isEmpty() || !queue.getFirst().isWaitingFuture()) |
| | | { |
| | | // No waiters - so drop out and add connection to pool. |
| | | break; |
| | | } |
| | | |
| | | future.handleResult(pooledConnection); |
| | | |
| | | if (!future.isCancelled()) |
| | | { |
| | | // The future was not cancelled and the connection was |
| | | // accepted. |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG.finest(String.format( |
| | | "Connection released and directly " |
| | | + "given to waiter. numConnections: %d, poolSize: %d, " |
| | | + "pendingFutures: %d", numConnections, pool.size(), |
| | | pendingFutures.size())); |
| | | } |
| | | holder = new QueueElement(connection); |
| | | queue.add(holder); |
| | | return; |
| | | } |
| | | else |
| | | { |
| | | holder = queue.removeFirst(); |
| | | } |
| | | } |
| | | |
| | | // No waiters. Put back in pool. |
| | | pool.offer(connection); |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG.finest(String.format( |
| | | "Connection released to pool. numConnections: %d, " |
| | | + "poolSize: %d, pendingFutures: %d", numConnections, |
| | | pool.size(), pendingFutures.size())); |
| | | } |
| | | // There was a waiting future, so complete it with the pooled connection. |
| | | final PooledConnection pooledConnection = new PooledConnection(connection); |
| | | holder.getWaitingFuture().handleResult(pooledConnection); |
| | | } |
| | | } |