| | |
| | | import java.util.Collection; |
| | | import java.util.Stack; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.concurrent.CountDownLatch; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.TimeoutException; |
| | | import java.util.logging.Level; |
| | | |
| | | import org.opends.sdk.requests.*; |
| | | import org.opends.sdk.responses.*; |
| | | import org.opends.sdk.schema.Schema; |
| | | |
| | | import com.sun.opends.sdk.util.AbstractFutureResult; |
| | | import com.sun.opends.sdk.util.CompletedFutureResult; |
| | | import com.sun.opends.sdk.util.FutureResultTransformer; |
| | | import com.sun.opends.sdk.util.StaticUtils; |
| | | |
| | | |
| | |
| | | /** |
| | | * A simple connection pool implementation. |
| | | */ |
| | | public class ConnectionPool extends |
| | | public final class ConnectionPool extends |
| | | AbstractConnectionFactory<AsynchronousConnection> |
| | | { |
| | | private final ConnectionFactory<?> connectionFactory; |
| | |
| | | // FIXME: should use a better collection than this - CLQ? |
| | | private final Stack<AsynchronousConnection> pool; |
| | | |
| | | private final ConcurrentLinkedQueue<PendingResultFuture> pendingFutures; |
| | | private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures; |
| | | |
| | | private final Object lock = new Object(); |
| | | |
| | | |
| | | |
| | | private final class FutureNewConnection |
| | | extends |
| | | FutureResultTransformer<AsynchronousConnection, AsynchronousConnection> |
| | | { |
| | | private FutureNewConnection( |
| | | ResultHandler<? super AsynchronousConnection> handler) |
| | | { |
| | | super(handler); |
| | | } |
| | | |
| | | |
| | | |
| | | protected AsynchronousConnection transformResult( |
| | | AsynchronousConnection result) throws ErrorResultException |
| | | { |
| | | return new PooledConnectionWapper(result); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | private final class PooledConnectionWapper implements |
| | | AsynchronousConnection, ConnectionEventListener |
| | | { |
| | |
| | | return; |
| | | } |
| | | |
| | | // See if there waiters pending |
| | | PendingResultFuture future = pendingFutures.poll(); |
| | | if (future != null) |
| | | // See if there waiters pending. |
| | | for (;;) |
| | | { |
| | | FuturePooledConnection future = pendingFutures.poll(); |
| | | |
| | | if (future == null) |
| | | { |
| | | // No waiters - so drop out and add connection to pool. |
| | | break; |
| | | } |
| | | |
| | | PooledConnectionWapper pooledConnection = new PooledConnectionWapper( |
| | | connection); |
| | | future.connection(pooledConnection); |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | future.handleResult(pooledConnection); |
| | | |
| | | if (!future.isCancelled()) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Connection released to pool and directly " |
| | | + "given to waiter. numConnections: %d, poolSize: %d, " |
| | | + "pendingFutures: %d", numConnections, |
| | | pool.size(), pendingFutures.size())); |
| | | // The future was not cancelled and the connection was |
| | | // accepted. |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Connection released to pool and directly " |
| | | + "given to waiter. numConnections: %d, poolSize: %d, " |
| | | + "pendingFutures: %d", numConnections, |
| | | pool.size(), pendingFutures.size())); |
| | | } |
| | | return; |
| | | } |
| | | return; |
| | | } |
| | | |
| | | // No waiters. Put back in pool. |
| | |
| | | |
| | | |
| | | |
| | | private static final class CompletedResultFuture implements |
| | | FutureResult<AsynchronousConnection> |
| | | // Future used for waiting for pooled connections to become available. |
| | | private static final class FuturePooledConnection extends |
| | | AbstractFutureResult<AsynchronousConnection> |
| | | { |
| | | private final PooledConnectionWapper connection; |
| | | |
| | | |
| | | |
| | | private CompletedResultFuture(PooledConnectionWapper connection) |
| | | { |
| | | this.connection = connection; |
| | | } |
| | | |
| | | |
| | | |
| | | public boolean cancel(boolean mayInterruptIfRunning) |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | |
| | | |
| | | public AsynchronousConnection get() throws InterruptedException, |
| | | ErrorResultException |
| | | { |
| | | return connection; |
| | | } |
| | | |
| | | |
| | | |
| | | public AsynchronousConnection get(long timeout, TimeUnit unit) |
| | | throws InterruptedException, TimeoutException, |
| | | ErrorResultException |
| | | { |
| | | return connection; |
| | | } |
| | | |
| | | |
| | | |
| | | public boolean isCancelled() |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | |
| | | |
| | | public boolean isDone() |
| | | { |
| | | return true; |
| | | } |
| | | |
| | | |
| | | |
| | | public int getRequestID() |
| | | { |
| | | return -1; |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | private final class PendingResultFuture implements |
| | | FutureResult<AsynchronousConnection> |
| | | { |
| | | private volatile boolean isCancelled; |
| | | |
| | | private volatile PooledConnectionWapper connection; |
| | | |
| | | private volatile ErrorResultException err; |
| | | |
| | | private final ResultHandler<? super AsynchronousConnection> handler; |
| | | |
| | | private final CountDownLatch latch = new CountDownLatch(1); |
| | | |
| | | |
| | | |
| | | private PendingResultFuture( |
| | | private FuturePooledConnection( |
| | | ResultHandler<? super AsynchronousConnection> handler) |
| | | { |
| | | this.handler = handler; |
| | | super(handler); |
| | | } |
| | | |
| | | |
| | | |
| | | public synchronized boolean cancel(boolean mayInterruptIfRunning) |
| | | { |
| | | return pendingFutures.remove(this) && (isCancelled = true); |
| | | } |
| | | |
| | | |
| | | |
| | | public AsynchronousConnection get() throws InterruptedException, |
| | | ErrorResultException |
| | | { |
| | | latch.await(); |
| | | if (err != null) |
| | | { |
| | | throw err; |
| | | } |
| | | return connection; |
| | | } |
| | | |
| | | |
| | | |
| | | public AsynchronousConnection get(long timeout, TimeUnit unit) |
| | | throws InterruptedException, TimeoutException, |
| | | ErrorResultException |
| | | { |
| | | latch.await(timeout, unit); |
| | | if (err != null) |
| | | { |
| | | throw err; |
| | | } |
| | | return connection; |
| | | } |
| | | |
| | | |
| | | |
| | | public synchronized boolean isCancelled() |
| | | { |
| | | return isCancelled; |
| | | } |
| | | |
| | | |
| | | |
| | | public boolean isDone() |
| | | { |
| | | return latch.getCount() == 0; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public int getRequestID() |
| | | { |
| | | return -1; |
| | | } |
| | | |
| | | |
| | | |
| | | private void connection(PooledConnectionWapper connection) |
| | | { |
| | | this.connection = connection; |
| | | if (handler != null) |
| | | { |
| | | handler.handleResult(connection); |
| | | } |
| | | latch.countDown(); |
| | | } |
| | | |
| | | |
| | | |
| | | private void error(ErrorResultException e) |
| | | { |
| | | this.err = e; |
| | | if (handler != null) |
| | | { |
| | | handler.handleErrorResult(e); |
| | | } |
| | | latch.countDown(); |
| | | } |
| | | } |
| | | |
| | | |
| | |
| | | /** |
| | | * Creates a new connection pool which will maintain {@code poolSize} |
| | | * connections created using the provided connection factory. |
| | | * |
| | | * |
| | | * @param connectionFactory |
| | | * The connection factory to use for creating new |
| | | * connections. |
| | |
| | | this.connectionFactory = connectionFactory; |
| | | this.poolSize = poolSize; |
| | | this.pool = new Stack<AsynchronousConnection>(); |
| | | this.pendingFutures = new ConcurrentLinkedQueue<PendingResultFuture>(); |
| | | } |
| | | |
| | | |
| | | |
| | | private final class WrapResultHandler implements |
| | | ResultHandler<AsynchronousConnection> |
| | | { |
| | | private final PendingResultFuture future; |
| | | |
| | | |
| | | |
| | | private WrapResultHandler(PendingResultFuture future) |
| | | { |
| | | this.future = future; |
| | | } |
| | | |
| | | |
| | | |
| | | public void handleResult(AsynchronousConnection connection) |
| | | { |
| | | PooledConnectionWapper pooledConnection = new PooledConnectionWapper( |
| | | connection); |
| | | future.connection(pooledConnection); |
| | | } |
| | | |
| | | |
| | | |
| | | public void handleErrorResult(ErrorResultException error) |
| | | { |
| | | future.error(error); |
| | | } |
| | | this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>(); |
| | | } |
| | | |
| | | |
| | |
| | | { |
| | | handler.handleResult(pooledConnection); |
| | | } |
| | | return new CompletedResultFuture(pooledConnection); |
| | | return new CompletedFutureResult<AsynchronousConnection>( |
| | | pooledConnection); |
| | | } |
| | | |
| | | PendingResultFuture pendingFuture = new PendingResultFuture( |
| | | handler); |
| | | // Pool was empty. Maybe a new connection if pool size is not |
| | | // reached |
| | | if (numConnections < poolSize) |
| | | { |
| | | // We can create a new connection. |
| | | numConnections++; |
| | | WrapResultHandler wrapHandler = new WrapResultHandler( |
| | | pendingFuture); |
| | | connectionFactory.getAsynchronousConnection(wrapHandler); |
| | | |
| | | FutureNewConnection future = new FutureNewConnection(handler); |
| | | future.setFutureResult(connectionFactory |
| | | .getAsynchronousConnection(future)); |
| | | |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | } |
| | | |
| | | return future; |
| | | } |
| | | else |
| | | { |
| | | // Have to wait |
| | | pendingFutures.add(pendingFuture); |
| | | // Pool is full so wait for a connection to become available. |
| | | FuturePooledConnection future = new FuturePooledConnection( |
| | | handler); |
| | | pendingFutures.add(future); |
| | | |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | } |
| | | } |
| | | |
| | | return pendingFuture; |
| | | return future; |
| | | } |
| | | } |
| | | } |
| | | } |