mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Ludovic Poitou
19.36.2010 9f2bba679ab597f1e50078a29d145100e3baed3c
sdk/src/org/opends/sdk/ConnectionPool.java
@@ -30,8 +30,11 @@
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import org.opends.sdk.requests.*;
@@ -41,6 +44,7 @@
import com.sun.opends.sdk.util.AsynchronousFutureResult;
import com.sun.opends.sdk.util.CompletedFutureResult;
import com.sun.opends.sdk.util.StaticUtils;
import com.sun.opends.sdk.util.Validator;
@@ -49,36 +53,101 @@
 */
final class ConnectionPool extends AbstractConnectionFactory
{
  // Future used for waiting for pooled connections to become available.
  private static final class FuturePooledConnection extends
      AsynchronousFutureResult<AsynchronousConnection>
  /**
   * This result handler is invoked when an attempt to add a new connection to
   * the pool completes.
   */
  private final class ConnectionResultHandler implements
      ResultHandler<AsynchronousConnection>
  {
    private FuturePooledConnection(
        final ResultHandler<? super AsynchronousConnection> handler)
    /**
     * {@inheritDoc}
     */
    @Override
    public void handleErrorResult(final ErrorResultException error)
    {
      super(handler);
      // Connection attempt failed, so decrease the pool size.
      currentPoolSize.release();
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      {
        StaticUtils.DEBUG_LOG.fine(String.format(
            "Connection attempt failed: " + error.getMessage()
                + " currentPoolSize=%d, poolSize=%d",
            poolSize - currentPoolSize.availablePermits(), poolSize));
      }
      QueueElement holder;
      synchronized (queue)
      {
        if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
        {
          // No waiting futures.
          return;
        }
        else
        {
          holder = queue.removeFirst();
        }
      }
      // There was waiting future, so close it.
      holder.getWaitingFuture().handleErrorResult(error);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void handleResult(final AsynchronousConnection connection)
    {
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      {
        StaticUtils.DEBUG_LOG.fine(String.format(
            "Connection attempt succeeded: "
                + " currentPoolSize=%d, poolSize=%d",
                poolSize - currentPoolSize.availablePermits(), poolSize));
      }
      publishConnection(connection);
    }
  }
  private final class PooledConnectionWapper implements AsynchronousConnection,
      ConnectionEventListener
  /**
   * A pooled connection is passed to the client. It wraps an underlying
   * "pooled" connection obtained from the underlying factory and lasts until
   * the client application closes this connection. More specifically, pooled
   * connections are not actually stored in the internal queue.
   */
  private final class PooledConnection implements AsynchronousConnection
  {
    // Connection event listeners registered against this pooled connection
    // should have the same lifetime as the pooled connection.
    private final List<ConnectionEventListener> listeners =
      new CopyOnWriteArrayList<ConnectionEventListener>();
    private final AsynchronousConnection connection;
    private volatile boolean isClosed;
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private PooledConnectionWapper(final AsynchronousConnection connection)
    PooledConnection(final AsynchronousConnection connection)
    {
      this.connection = connection;
      this.connection.addConnectionEventListener(this);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Void> abandon(final AbandonRequest request)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
@@ -92,6 +161,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> add(final AddRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
@@ -106,6 +179,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> add(final AddRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
@@ -122,18 +199,28 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public void addConnectionEventListener(
        final ConnectionEventListener listener) throws IllegalStateException,
        NullPointerException
    {
      Validator.ensureNotNull(listener);
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      listeners.add(listener);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<BindResult> bind(final BindRequest request,
        final ResultHandler<? super BindResult> handler)
        throws UnsupportedOperationException, IllegalStateException,
@@ -148,6 +235,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<BindResult> bind(final BindRequest request,
        final ResultHandler<? super BindResult> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
@@ -164,47 +255,51 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public void close()
    {
      synchronized (pool)
      if (!isClosed.compareAndSet(false, true))
      {
        if (isClosed)
        {
          return;
        }
        isClosed = true;
        // Already closed.
        return;
      }
        // Don't put invalid connections back in the pool.
        if (connection.isValid())
      // Don't put invalid connections back in the pool.
      if (connection.isValid())
      {
        publishConnection(connection);
      }
      else
      {
        // The connection may have been disconnected by the remote server, but
        // the server may still be available. In order to avoid leaving pending
        // futures hanging indefinitely, we should try to reconnect immediately.
        // Close the dead connection.
        connection.close();
        // Try to get a new connection to replace it.
        factory.getAsynchronousConnection(connectionResultHandler);
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
        {
          releaseConnection(connection);
          return;
          StaticUtils.DEBUG_LOG.warning(String.format(
              "Connection no longer valid. "
                  + "currentPoolSize=%d, poolSize=%d",
                  poolSize - currentPoolSize.availablePermits(), poolSize));
        }
      }
      // Connection is no longer valid. Close outside of lock
      connection.removeConnectionEventListener(this);
      connection.close();
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
      {
        StaticUtils.DEBUG_LOG.warning(String.format(
            "Dead connection released and closed. "
                + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d",
            numConnections, pool.size(), pendingFutures.size()));
      }
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
      {
        StaticUtils.DEBUG_LOG.warning(String.format(
            "Reconnect attempt starting. "
                + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d",
            numConnections, pool.size(), pendingFutures.size()));
      }
      connectionFactory.getAsynchronousConnection(new ReconnectHandler());
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void close(final UnbindRequest request, final String reason)
        throws NullPointerException
    {
@@ -213,6 +308,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<CompareResult> compare(final CompareRequest request,
        final ResultHandler<? super CompareResult> handler)
        throws UnsupportedOperationException, IllegalStateException,
@@ -227,6 +326,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<CompareResult> compare(final CompareRequest request,
        final ResultHandler<? super CompareResult> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
@@ -243,48 +346,10 @@
    public void handleConnectionClosed()
    {
      // Ignore - we intercept close via the close method.
    }
    public void handleConnectionError(final boolean isDisconnectNotification,
        final ErrorResultException error)
    {
      // Remove this connection from the pool if its in there. If not,
      // just ignore and wait for the user to close and we can deal with it
      // there.
      if (pool.remove(this))
      {
        numConnections--;
        connection.removeConnectionEventListener(this);
        // FIXME: should still close the connection, but we need to be
        // careful that users of the pooled connection get a sensible
        // error if they continue to use it (i.e. not an NPE or ISE).
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
        {
          StaticUtils.DEBUG_LOG.warning(String.format(
              "Connection error occured and removed from pool: "
                  + error.getMessage()
                  + " numConnections: %d, poolSize: %d, pendingFutures: %d",
              numConnections, pool.size(), pendingFutures.size()));
        }
      }
    }
    public void handleUnsolicitedNotification(final ExtendedResult notification)
    {
      // Ignore
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> delete(final DeleteRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
@@ -299,6 +364,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> delete(final DeleteRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
@@ -315,6 +384,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
        final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
        throws UnsupportedOperationException, IllegalStateException,
@@ -329,6 +402,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
        final ExtendedRequest<R> request,
        final ResultHandler<? super R> resultHandler,
@@ -349,6 +426,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public Connection getSynchronousConnection()
    {
      return new SynchronousConnection(this);
@@ -359,20 +437,29 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isClosed()
    {
      return isClosed;
      return isClosed.get();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isValid()
    {
      return !isClosed && connection.isValid();
      return connection.isValid() && !isClosed();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modify(final ModifyRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
@@ -387,6 +474,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modify(final ModifyRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
@@ -403,6 +494,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
@@ -417,6 +512,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
@@ -436,6 +535,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<SearchResultEntry> readEntry(final DN name,
        final Collection<String> attributeDescriptions,
        final ResultHandler<? super SearchResultEntry> resultHandler)
@@ -454,6 +554,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<RootDSE> readRootDSE(
        final ResultHandler<? super RootDSE> handler)
        throws UnsupportedOperationException, IllegalStateException
@@ -470,6 +571,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Schema> readSchema(final DN name,
        final ResultHandler<? super Schema> handler)
        throws UnsupportedOperationException, IllegalStateException
@@ -486,6 +588,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Schema> readSchemaForEntry(final DN name,
        final ResultHandler<? super Schema> handler)
        throws UnsupportedOperationException, IllegalStateException
@@ -499,17 +602,27 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public void removeConnectionEventListener(
        final ConnectionEventListener listener) throws NullPointerException
    {
      Validator.ensureNotNull(listener);
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      listeners.remove(listener);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> search(final SearchRequest request,
        final SearchResultHandler handler)
        throws UnsupportedOperationException, IllegalStateException,
@@ -524,6 +637,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> search(final SearchRequest request,
        final SearchResultHandler resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
@@ -543,6 +660,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<SearchResultEntry> searchSingleEntry(
        final SearchRequest request,
        final ResultHandler<? super SearchResultEntry> resultHandler)
@@ -561,9 +679,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString()
    {
      StringBuilder builder = new StringBuilder();
      final StringBuilder builder = new StringBuilder();
      builder.append("PooledConnection(");
      builder.append(connection);
      builder.append(')');
@@ -573,63 +692,86 @@
  private class ReconnectHandler implements
      ResultHandler<AsynchronousConnection>
  /**
   * A queue element is either a pending connection request future awaiting an
   * {@code AsynchronousConnection} or it is an unused
   * {@code AsynchronousConnection} awaiting a connection request.
   */
  private static final class QueueElement
  {
    public void handleErrorResult(final ErrorResultException error)
    {
      // The reconnect failed. Fail the connect attempt.
      numConnections--;
      // The reconnect failed. The underlying connection factory
      // probably went
      // down. Just fail all pending futures
      synchronized (pool)
      {
        while (!pendingFutures.isEmpty())
        {
          pendingFutures.poll().handleErrorResult(error);
        }
      }
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
      {
        StaticUtils.DEBUG_LOG.warning(String.format(
            "Reconnect failed. Failed all pending futures: "
                + error.getMessage()
                + " numConnections: %d, poolSize: %d, pendingFutures: %d",
            numConnections, pool.size(), pendingFutures.size()));
      }
    private final Object value;
    QueueElement(final AsynchronousConnection connection)
    {
      this.value = connection;
    }
    public void handleResult(final AsynchronousConnection connection)
    QueueElement(final ResultHandler<? super AsynchronousConnection> handler)
    {
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler);
    }
    AsynchronousConnection getWaitingConnection()
    {
      if (value instanceof AsynchronousConnection)
      {
        StaticUtils.DEBUG_LOG.finest(String.format("Reconnect succeeded. "
            + " numConnections: %d, poolSize: %d, pendingFutures: %d",
            numConnections, pool.size(), pendingFutures.size()));
        return (AsynchronousConnection) value;
      }
      synchronized (pool)
      else
      {
        releaseConnection(connection);
        throw new IllegalStateException();
      }
    }
    @SuppressWarnings("unchecked")
    AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture()
    {
      if (value instanceof AsynchronousFutureResult)
      {
        return (AsynchronousFutureResult<AsynchronousConnection>) value;
      }
      else
      {
        throw new IllegalStateException();
      }
    }
    boolean isWaitingFuture()
    {
      return value instanceof AsynchronousFutureResult;
    }
    public String toString()
    {
      return String.valueOf(value);
    }
  }
  private final ConnectionFactory connectionFactory;
  // Guarded by queue.
  private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
  private volatile int numConnections;
  private final ConnectionFactory factory;
  private final int poolSize;
  // FIXME: should use a better collection than this - CLQ?
  private final Queue<AsynchronousConnection> pool;
  private final Semaphore currentPoolSize;
  private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
  private final ResultHandler<AsynchronousConnection> connectionResultHandler =
    new ConnectionResultHandler();
@@ -637,109 +779,16 @@
   * Creates a new connection pool which will maintain {@code poolSize}
   * connections created using the provided connection factory.
   *
   * @param connectionFactory
   * @param factory
   *          The connection factory to use for creating new connections.
   * @param poolSize
   *          The maximum size of the connection pool.
   */
  ConnectionPool(final ConnectionFactory connectionFactory, final int poolSize)
  ConnectionPool(final ConnectionFactory factory, final int poolSize)
  {
    this.connectionFactory = connectionFactory;
    this.factory = factory;
    this.poolSize = poolSize;
    this.pool = new ConcurrentLinkedQueue<AsynchronousConnection>();
    this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>();
  }
  @Override
  public synchronized FutureResult<AsynchronousConnection> getAsynchronousConnection(
      final ResultHandler<? super AsynchronousConnection> handler)
  {
    // This entire method is synchronized to ensure new connects are
    // done synchronously to avoid the "pending connect" case.
    AsynchronousConnection conn;
    synchronized (pool)
    {
      // Check to see if we have a connection in the pool
      conn = pool.poll();
      if (conn == null)
      {
        // Pool was empty. Maybe a new connection if pool size is not
        // reached
        if (numConnections >= poolSize)
        {
          // We reached max # of conns so wait for a connection to
          // become available.
          final FuturePooledConnection future = new FuturePooledConnection(
              handler);
          pendingFutures.add(future);
          if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
          {
            StaticUtils.DEBUG_LOG.finest(String.format(
                "No connections available. Wait-listed"
                    + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                numConnections, pool.size(), pendingFutures.size()));
          }
          return future;
        }
      }
    }
    if (conn == null)
    {
      try
      {
        // We can create a new connection.
        conn = connectionFactory.getAsynchronousConnection(null).get();
        numConnections++;
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG.finest(String.format(
              "New connection established and aquired. "
                  + "numConnections: %d, poolSize: %d, pendingFutures: %d",
              numConnections, pool.size(), pendingFutures.size()));
        }
      }
      catch (final ErrorResultException e)
      {
        if (handler != null)
        {
          handler.handleErrorResult(e);
        }
        return new CompletedFutureResult<AsynchronousConnection>(e);
      }
      catch (final InterruptedException e)
      {
        final ErrorResultException error = new ErrorResultException(Responses
            .newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e));
        if (handler != null)
        {
          handler.handleErrorResult(error);
        }
        return new CompletedFutureResult<AsynchronousConnection>(error);
      }
    }
    else
    {
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      {
        StaticUtils.DEBUG_LOG.finest(String.format(
            "Connection aquired from pool. "
                + "numConnections: %d, poolSize: %d, pendingFutures: %d",
            numConnections, pool.size(), pendingFutures.size()));
      }
    }
    final PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
        conn);
    if (handler != null)
    {
      handler.handleResult(pooledConnection);
    }
    return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
    this.currentPoolSize = new Semaphore(poolSize);
  }
@@ -747,11 +796,59 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public FutureResult<AsynchronousConnection> getAsynchronousConnection(
      final ResultHandler<? super AsynchronousConnection> handler)
  {
    QueueElement holder;
    synchronized (queue)
    {
      if (queue.isEmpty() || queue.getFirst().isWaitingFuture())
      {
        holder = new QueueElement(handler);
        queue.add(holder);
      }
      else
      {
        holder = queue.removeFirst();
      }
    }
    if (!holder.isWaitingFuture())
    {
      // There was a completed connection attempt.
      final AsynchronousConnection connection = holder.getWaitingConnection();
      final PooledConnection pooledConnection = new PooledConnection(connection);
      if (handler != null)
      {
        handler.handleResult(pooledConnection);
      }
      return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
    }
    else
    {
      // Grow the pool if needed.
      final FutureResult<AsynchronousConnection> future = holder
          .getWaitingFuture();
      if (!future.isDone() && currentPoolSize.tryAcquire())
      {
        factory.getAsynchronousConnection(connectionResultHandler);
      }
      return future;
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    final StringBuilder builder = new StringBuilder();
    builder.append("ConnectionPool(");
    builder.append(String.valueOf(connectionFactory));
    builder.append(String.valueOf(factory));
    builder.append(',');
    builder.append(poolSize);
    builder.append(')');
@@ -760,47 +857,25 @@
  private void releaseConnection(final AsynchronousConnection connection)
  private void publishConnection(final AsynchronousConnection connection)
  {
    // See if there waiters pending.
    for (;;)
    QueueElement holder;
    synchronized (queue)
    {
      final PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
          connection);
      final FuturePooledConnection future = pendingFutures.poll();
      if (future == null)
      if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
      {
        // No waiters - so drop out and add connection to pool.
        break;
      }
      future.handleResult(pooledConnection);
      if (!future.isCancelled())
      {
        // The future was not cancelled and the connection was
        // accepted.
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG.finest(String.format(
              "Connection released and directly "
                  + "given to waiter. numConnections: %d, poolSize: %d, "
                  + "pendingFutures: %d", numConnections, pool.size(),
              pendingFutures.size()));
        }
        holder = new QueueElement(connection);
        queue.add(holder);
        return;
      }
      else
      {
        holder = queue.removeFirst();
      }
    }
    // No waiters. Put back in pool.
    pool.offer(connection);
    if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
    {
      StaticUtils.DEBUG_LOG.finest(String.format(
          "Connection released to pool. numConnections: %d, "
              + "poolSize: %d, pendingFutures: %d", numConnections,
          pool.size(), pendingFutures.size()));
    }
    // There was waiting future, so close it.
    final PooledConnection pooledConnection = new PooledConnection(connection);
    holder.getWaitingFuture().handleResult(pooledConnection);
  }
}