| | |
| | | |
| | | import java.util.Collection; |
| | | import java.util.Stack; |
| | | import java.util.Queue; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | import java.util.logging.Level; |
| | | |
| | | import org.opends.sdk.requests.*; |
| | |
| | | private final int poolSize; |
| | | |
| | | // FIXME: should use a better collection than this - CLQ? |
| | | private final Stack<AsynchronousConnection> pool; |
| | | private final Queue<AsynchronousConnection> pool; |
| | | |
| | | private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures; |
| | | |
| | | private final Object lock = new Object(); |
| | | |
| | | |
| | | |
| | | /** |
| | | * Future result transformer whose {@code transformResult} wraps the |
| | | * connection it is given in a {@code PooledConnectionWapper}, so the |
| | | * transformed result is the pooled wrapper rather than the raw connection. |
| | | */ |
| | | private final class FutureNewConnection |
| | | extends |
| | | FutureResultTransformer<AsynchronousConnection, AsynchronousConnection> |
| | | { |
| | | /** |
| | | * Creates the transformer. |
| | | * |
| | | * @param handler |
| | | * result handler, passed unchanged to the superclass constructor. |
| | | */ |
| | | private FutureNewConnection( |
| | | ResultHandler<? super AsynchronousConnection> handler) |
| | | { |
| | | super(handler); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Wraps the supplied connection in a new {@code PooledConnectionWapper}. |
| | | * |
| | | * @param result |
| | | * the connection to wrap. |
| | | * @return a new {@code PooledConnectionWapper} constructed around |
| | | * {@code result}. |
| | | * @throws ErrorResultException |
| | | * declared by the signature; this method body contains no |
| | | * explicit throw. |
| | | */ |
| | | protected AsynchronousConnection transformResult( |
| | | AsynchronousConnection result) throws ErrorResultException |
| | | { |
| | | return new PooledConnectionWapper(result); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | private final class PooledConnectionWapper implements |
| | | AsynchronousConnection, ConnectionEventListener |
| | | { |
| | | private AsynchronousConnection connection; |
| | | |
| | | private final AsynchronousConnection connection; |
| | | private volatile boolean isClosed; |
| | | |
| | | |
| | | private PooledConnectionWapper(AsynchronousConnection connection) |
| | |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (connection == null) |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (connection == null) |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (connection == null) |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | |
| | | |
| | | public void close() |
| | | { |
| | | synchronized (lock) |
| | | synchronized (pool) |
| | | { |
| | | try |
| | | if(isClosed) |
| | | { |
| | | // Don't put closed connections back in the pool. |
| | | if (connection.isClosed()) |
| | | { |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Dead connection released to pool. " |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | } |
| | | return; |
| | | } |
| | | return; |
| | | } |
| | | isClosed = true; |
| | | |
| | | // Don't put closed connections back in the pool. |
| | | if (!connection.isValid()) |
| | | { |
| | | numConnections--; |
| | | } |
| | | else |
| | | { |
| | | // See if there are waiters pending. |
| | | for (;;) |
| | | { |
| | | PooledConnectionWapper pooledConnection = new PooledConnectionWapper( |
| | | connection); |
| | | FuturePooledConnection future = pendingFutures.poll(); |
| | | |
| | | if (future == null) |
| | |
| | | break; |
| | | } |
| | | |
| | | PooledConnectionWapper pooledConnection = new PooledConnectionWapper( |
| | | connection); |
| | | future.handleResult(pooledConnection); |
| | | |
| | | if (!future.isCancelled()) |
| | |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Connection released to pool and directly " |
| | | + "given to waiter. numConnections: %d, poolSize: %d, " |
| | | + "pendingFutures: %d", numConnections, |
| | | pool.size(), pendingFutures.size())); |
| | | "Connection released to pool and directly " |
| | | + "given to waiter. numConnections: %d, poolSize: %d, " |
| | | + "pendingFutures: %d", numConnections, |
| | | pool.size(), pendingFutures.size())); |
| | | } |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // No waiters. Put back in pool. |
| | | pool.push(connection); |
| | | pool.offer(connection); |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Connection released to pool. " |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | "Connection released to pool and directly " |
| | | + "given to waiter. numConnections: %d, poolSize: %d, " |
| | | + "pendingFutures: %d", numConnections, |
| | | pool.size(), pendingFutures.size())); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | // Null out the underlying connection to prevent further use. |
| | | connection = null; |
| | | return; |
| | | } |
| | | } |
| | | |
| | | // Connection is no longer valid. Close outside of lock |
| | | connection.removeConnectionEventListener(this); |
| | | connection.close(); |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Dead connection released to pool. " |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | } |
| | | return; |
| | | } |
| | | |
| | | |
| | |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (connection == null) |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (connection == null) |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (connection == null) |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (connection == null) |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (connection == null) |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (connection == null) |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (connection == null) |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | |
| | | throws UnsupportedOperationException, IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (connection == null) |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | |
| | | ResultHandler<RootDSE> handler) |
| | | throws UnsupportedOperationException, IllegalStateException |
| | | { |
| | | if (connection == null) |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | |
| | | ResultHandler<Schema> handler) |
| | | throws UnsupportedOperationException, IllegalStateException |
| | | { |
| | | if (connection == null) |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | |
| | | ResultHandler<Schema> handler) |
| | | throws UnsupportedOperationException, IllegalStateException |
| | | { |
| | | if (connection == null) |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | |
| | | ConnectionEventListener listener) throws IllegalStateException, |
| | | NullPointerException |
| | | { |
| | | if (connection == null) |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | |
| | | public void removeConnectionEventListener( |
| | | ConnectionEventListener listener) throws NullPointerException |
| | | { |
| | | if (connection == null) |
| | | if (isClosed()) |
| | | { |
| | | throw new IllegalStateException(); |
| | | } |
| | |
| | | */ |
| | | public boolean isClosed() |
| | | { |
| | | // NOTE(review): two consecutive return statements appear here. The first |
| | | // tests the connection reference against null; the second returns the |
| | | // isClosed flag and can never be reached. This looks like two revisions |
| | | // of the method merged together - confirm which one is intended. |
| | | return connection == null; |
| | | return isClosed; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Reports whether this wrapper is still usable. |
| | | * |
| | | * @return {@code true} only when the {@code isClosed} flag is not set and |
| | | * the wrapped connection's own {@code isValid()} returns |
| | | * {@code true}. |
| | | */ |
| | | public boolean isValid() |
| | | { |
| | | // Short-circuit: the wrapped connection is not consulted once the flag is set. |
| | | return !isClosed && connection.isValid(); |
| | | } |
| | | |
| | | public void connectionReceivedUnsolicitedNotification( |
| | | GenericExtendedResult notification) |
| | |
| | | public void connectionErrorOccurred( |
| | | boolean isDisconnectNotification, ErrorResultException error) |
| | | { |
| | | synchronized (lock) |
| | | // Remove this connection from the pool if it's in there. If not, just |
| | | // ignore it and wait for the user to close it, so we can deal with it there. |
| | | if(pool.remove(this)) |
| | | { |
| | | // Remove this connection from the pool if its in there |
| | | pool.remove(this); |
| | | numConnections--; |
| | | connection.removeConnectionEventListener(this); |
| | | |
| | |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Connection error occured: " |
| | | + error.getMessage() |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | "Connection error occured: " |
| | | + error.getMessage() |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | } |
| | | } |
| | | } |
| | |
| | | { |
| | | this.connectionFactory = connectionFactory; |
| | | this.poolSize = poolSize; |
| | | this.pool = new Stack<AsynchronousConnection>(); |
| | | this.pool = new ConcurrentLinkedQueue<AsynchronousConnection>(); |
| | | this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>(); |
| | | } |
| | | |
| | | |
| | | |
| | | public FutureResult<AsynchronousConnection> getAsynchronousConnection( |
| | | ResultHandler<? super AsynchronousConnection> handler) |
| | | public synchronized FutureResult<AsynchronousConnection> |
| | | getAsynchronousConnection(ResultHandler<? super AsynchronousConnection> handler) |
| | | { |
| | | synchronized (lock) |
| | | // This entire method is synchronized to ensure new connects are done |
| | | // synchronously to avoid the "pending connect" case. |
| | | AsynchronousConnection conn; |
| | | synchronized(pool) |
| | | { |
| | | // Check to see if we have a connection in the pool |
| | | |
| | | if (!pool.isEmpty()) |
| | | conn = pool.poll(); |
| | | if (conn == null) |
| | | { |
| | | AsynchronousConnection conn = pool.pop(); |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | // Pool was empty. Maybe a new connection if pool size is not |
| | | // reached |
| | | if (numConnections >= poolSize) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Connection aquired from pool. " |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | // We reached max # of conns so wait for a connection to become available. |
| | | FuturePooledConnection future = new FuturePooledConnection( |
| | | handler); |
| | | pendingFutures.add(future); |
| | | |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "No connections available. Wait-listed" |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | } |
| | | |
| | | return future; |
| | | } |
| | | PooledConnectionWapper pooledConnection = new PooledConnectionWapper( |
| | | conn); |
| | | if (handler != null) |
| | | { |
| | | handler.handleResult(pooledConnection); |
| | | } |
| | | return new CompletedFutureResult<AsynchronousConnection>( |
| | | pooledConnection); |
| | | } |
| | | |
| | | // Pool was empty. Maybe a new connection if pool size is not |
| | | // reached |
| | | if (numConnections < poolSize) |
| | | { |
| | | // We can create a new connection. |
| | | numConnections++; |
| | | |
| | | FutureNewConnection future = new FutureNewConnection(handler); |
| | | future.setFutureResult(connectionFactory |
| | | .getAsynchronousConnection(future)); |
| | | |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "New connection established and aquired. " |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | } |
| | | |
| | | return future; |
| | | } |
| | | else |
| | | { |
| | | // Pool is full so wait for a connection to become available. |
| | | FuturePooledConnection future = new FuturePooledConnection( |
| | | handler); |
| | | pendingFutures.add(future); |
| | | |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "No connections available. Wait-listed" |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | } |
| | | |
| | | return future; |
| | | } |
| | | } |
| | | |
| | | if(conn == null) |
| | | { |
| | | try |
| | | { |
| | | // We can create a new connection. |
| | | conn = connectionFactory.getAsynchronousConnection(handler).get(); |
| | | numConnections++; |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "New connection established and aquired. " |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | } |
| | | } |
| | | catch (ErrorResultException e) |
| | | { |
| | | return new CompletedFutureResult<AsynchronousConnection>(e); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | return new CompletedFutureResult<AsynchronousConnection>( |
| | | new ErrorResultException(Responses.newResult( |
| | | ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e))); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Connection aquired from pool. " |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | } |
| | | } |
| | | |
| | | PooledConnectionWapper pooledConnection = new PooledConnectionWapper( |
| | | conn); |
| | | if (handler != null) |
| | | { |
| | | handler.handleResult(pooledConnection); |
| | | } |
| | | return new CompletedFutureResult<AsynchronousConnection>( |
| | | pooledConnection); |
| | | |
| | | } |
| | | } |