| | |
| | | |
| | | |
| | | import java.util.Collection; |
| | | import java.util.Stack; |
| | | import java.util.Queue; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | import java.util.logging.Level; |
| | | |
| | | import org.opends.sdk.requests.*; |
| | |
| | | |
| | | import com.sun.opends.sdk.util.AbstractFutureResult; |
| | | import com.sun.opends.sdk.util.CompletedFutureResult; |
| | | import com.sun.opends.sdk.util.FutureResultTransformer; |
| | | import com.sun.opends.sdk.util.StaticUtils; |
| | | |
| | | |
| | |
| | | /** |
| | | * A simple connection pool implementation. |
| | | */ |
| | | final class ConnectionPool extends |
| | | AbstractConnectionFactory<AsynchronousConnection> |
| | | final class ConnectionPool extends AbstractConnectionFactory |
| | | { |
| | | private final ConnectionFactory<?> connectionFactory; |
| | | private final ConnectionFactory connectionFactory; |
| | | |
| | | private volatile int numConnections; |
| | | |
| | |
| | | AsynchronousConnection, ConnectionEventListener |
| | | { |
| | | private final AsynchronousConnection connection; |
| | | |
| | | private volatile boolean isClosed; |
| | | |
| | | |
| | | |
| | | private PooledConnectionWapper(AsynchronousConnection connection) |
| | | { |
| | | this.connection = connection; |
| | |
| | | { |
| | | synchronized (pool) |
| | | { |
| | | if(isClosed) |
| | | if (isClosed) |
| | | { |
| | | return; |
| | | } |
| | |
| | | connection.close(); |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .warning(String |
| | | .format( |
| | | "Dead connection released and closed. " |
| | | + "numConnections: %d, poolSize: %d, " + |
| | | "pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | StaticUtils.DEBUG_LOG.warning(String.format( |
| | | "Dead connection released and closed. " |
| | | + "numConnections: %d, poolSize: %d, " |
| | | + "pendingFutures: %d", numConnections, pool.size(), |
| | | pendingFutures.size())); |
| | | } |
| | | |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .warning(String |
| | | .format( |
| | | "Reconnect attempt starting. " |
| | | + "numConnections: %d, poolSize: %d, " + |
| | | "pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | StaticUtils.DEBUG_LOG.warning(String.format( |
| | | "Reconnect attempt starting. " |
| | | + "numConnections: %d, poolSize: %d, " |
| | | + "pendingFutures: %d", numConnections, pool.size(), |
| | | pendingFutures.size())); |
| | | } |
| | | connectionFactory.getAsynchronousConnection(new ReconnectHandler()); |
| | | connectionFactory |
| | | .getAsynchronousConnection(new ReconnectHandler()); |
| | | } |
| | | |
| | | |
| | | |
| | | public void close(UnbindRequest request, String reason) |
| | | throws NullPointerException |
| | | { |
| | |
| | | return isClosed; |
| | | } |
| | | |
| | | |
| | | |
| | | public boolean isValid() |
| | | { |
| | | return !isClosed && connection.isValid(); |
| | | } |
| | | |
| | | |
| | | |
| | | public void connectionReceivedUnsolicitedNotification( |
| | | GenericExtendedResult notification) |
| | | { |
| | |
| | | public void connectionErrorOccurred( |
| | | boolean isDisconnectNotification, ErrorResultException error) |
| | | { |
| | | // Remove this connection from the pool if its in there. If not, just |
| | | // ignore and wait for the user to close and we can deal with it there. |
| | | if(pool.remove(this)) |
| | | // Remove this connection from the pool if its in there. If not, |
| | | // just |
| | | // ignore and wait for the user to close and we can deal with it |
| | | // there. |
| | | if (pool.remove(this)) |
| | | { |
| | | numConnections--; |
| | | connection.removeConnectionEventListener(this); |
| | |
| | | StaticUtils.DEBUG_LOG |
| | | .warning(String |
| | | .format( |
| | | "Connection error occured and removed from pool: " |
| | | + error.getMessage() |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | "Connection error occured and removed from pool: " |
| | | + error.getMessage() |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | private class ReconnectHandler |
| | | implements ResultHandler<AsynchronousConnection> |
| | | |
| | | |
| | | private class ReconnectHandler implements |
| | | ResultHandler<AsynchronousConnection> |
| | | { |
| | | public void handleErrorResult(ErrorResultException error) { |
| | | public void handleErrorResult(ErrorResultException error) |
| | | { |
| | | // The reconnect failed. Fail the connect attempt. |
| | | numConnections --; |
| | | // The reconnect failed. The underlying connection factory probably went |
| | | numConnections--; |
| | | // The reconnect failed. The underlying connection factory |
| | | // probably went |
| | | // down. Just fail all pending futures |
| | | synchronized (pool) |
| | | { |
| | | while(!pendingFutures.isEmpty()) |
| | | while (!pendingFutures.isEmpty()) |
| | | { |
| | | pendingFutures.poll().handleErrorResult(error); |
| | | } |
| | |
| | | StaticUtils.DEBUG_LOG |
| | | .warning(String |
| | | .format( |
| | | "Reconnect failed. Failed all pending futures: " |
| | | + error.getMessage() |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | "Reconnect failed. Failed all pending futures: " |
| | | + error.getMessage() |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | } |
| | | |
| | | } |
| | | |
| | | public void handleResult(AsynchronousConnection connection) { |
| | | |
| | | |
| | | public void handleResult(AsynchronousConnection connection) |
| | | { |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Reconnect succeded. " |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | "Reconnect succeded. " |
| | | + " numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | } |
| | | synchronized (pool) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | // Future used for waiting for pooled connections to become available. |
| | | private static final class FuturePooledConnection extends |
| | | AbstractFutureResult<AsynchronousConnection> |
| | |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | private void releaseConnection(AsynchronousConnection connection) |
| | | { |
| | | // See if there waiters pending. |
| | |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Connection released and directly " |
| | | + "given to waiter. numConnections: %d, poolSize: %d, " |
| | | + "pendingFutures: %d", numConnections, |
| | | pool.size(), pendingFutures.size())); |
| | | "Connection released and directly " |
| | | + "given to waiter. numConnections: %d, poolSize: %d, " |
| | | + "pendingFutures: %d", numConnections, pool |
| | | .size(), pendingFutures.size())); |
| | | } |
| | | return; |
| | | } |
| | |
| | | pool.offer(connection); |
| | | if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) |
| | | { |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Connection released to pool. numConnections: %d, " + |
| | | "poolSize: %d, pendingFutures: %d", numConnections, |
| | | pool.size(), pendingFutures.size())); |
| | | StaticUtils.DEBUG_LOG.finest(String.format( |
| | | "Connection released to pool. numConnections: %d, " |
| | | + "poolSize: %d, pendingFutures: %d", numConnections, |
| | | pool.size(), pendingFutures.size())); |
| | | } |
| | | } |
| | | |
| | |
| | | * @param poolSize |
| | | * The maximum size of the connection pool. |
| | | */ |
| | | ConnectionPool(ConnectionFactory<?> connectionFactory, int poolSize) |
| | | ConnectionPool(ConnectionFactory connectionFactory, int poolSize) |
| | | { |
| | | this.connectionFactory = connectionFactory; |
| | | this.poolSize = poolSize; |
| | |
| | | |
| | | |
| | | |
| | | public synchronized FutureResult<AsynchronousConnection> |
| | | getAsynchronousConnection(ResultHandler<? super AsynchronousConnection> handler) |
| | | public synchronized FutureResult<AsynchronousConnection> getAsynchronousConnection( |
| | | ResultHandler<AsynchronousConnection> handler) |
| | | { |
| | | // This entire method is synchronized to ensure new connects are done |
| | | // This entire method is synchronized to ensure new connects are |
| | | // done |
| | | // synchronously to avoid the "pending connect" case. |
| | | AsynchronousConnection conn; |
| | | synchronized(pool) |
| | | synchronized (pool) |
| | | { |
| | | // Check to see if we have a connection in the pool |
| | | conn = pool.poll(); |
| | |
| | | // reached |
| | | if (numConnections >= poolSize) |
| | | { |
| | | // We reached max # of conns so wait for a connection to become available. |
| | | // We reached max # of conns so wait for a connection to |
| | | // become available. |
| | | FuturePooledConnection future = new FuturePooledConnection( |
| | | handler); |
| | | pendingFutures.add(future); |
| | |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "No connections available. Wait-listed" |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | "No connections available. Wait-listed" |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | } |
| | | |
| | | return future; |
| | |
| | | } |
| | | } |
| | | |
| | | if(conn == null) |
| | | if (conn == null) |
| | | { |
| | | try |
| | | { |
| | |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "New connection established and aquired. " |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | "New connection established and aquired. " |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | } |
| | | } |
| | | catch (ErrorResultException e) |
| | |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | ErrorResultException error = |
| | | new ErrorResultException(Responses.newResult( |
| | | ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)); |
| | | ErrorResultException error = new ErrorResultException(Responses |
| | | .newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)); |
| | | if (handler != null) |
| | | { |
| | | handler.handleErrorResult(error); |
| | |
| | | StaticUtils.DEBUG_LOG |
| | | .finest(String |
| | | .format( |
| | | "Connection aquired from pool. " |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures |
| | | .size())); |
| | | "Connection aquired from pool. " |
| | | + "numConnections: %d, poolSize: %d, pendingFutures: %d", |
| | | numConnections, pool.size(), pendingFutures.size())); |
| | | } |
| | | } |
| | | |