mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

boli
22.55.2009 1fbc9df5d0c44ae72c76dacc3c945d4f0d641ee6
sdk/src/org/opends/sdk/ConnectionPool.java
@@ -31,7 +31,9 @@
import java.util.Collection;
import java.util.Stack;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import org.opends.sdk.requests.*;
@@ -58,40 +60,17 @@
  private final int poolSize;
  // FIXME: should use a better collection than this - CLQ?
  private final Stack<AsynchronousConnection> pool;
  private final Queue<AsynchronousConnection> pool;
  private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
  private final Object lock = new Object();
  private final class FutureNewConnection
      extends
      FutureResultTransformer<AsynchronousConnection, AsynchronousConnection>
  {
    private FutureNewConnection(
        ResultHandler<? super AsynchronousConnection> handler)
    {
      super(handler);
    }
    protected AsynchronousConnection transformResult(
        AsynchronousConnection result) throws ErrorResultException
    {
      return new PooledConnectionWapper(result);
    }
  }
  private final class PooledConnectionWapper implements
      AsynchronousConnection, ConnectionEventListener
  {
    private AsynchronousConnection connection;
    private final AsynchronousConnection connection;
    private volatile boolean isClosed;
    private PooledConnectionWapper(AsynchronousConnection connection)
@@ -106,7 +85,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -120,7 +99,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -134,7 +113,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -145,29 +124,26 @@
    public void close()
    {
      synchronized (lock)
      synchronized (pool)
      {
        try
        if(isClosed)
        {
          // Don't put closed connections back in the pool.
          if (connection.isClosed())
          {
            if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
            {
              StaticUtils.DEBUG_LOG
                  .finest(String
                      .format(
                          "Dead connection released to pool. "
                              + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                          numConnections, pool.size(), pendingFutures
                              .size()));
            }
            return;
          }
          return;
        }
        isClosed = true;
        // Don't put closed connections back in the pool.
        if (!connection.isValid())
        {
          numConnections--;
        }
        else
        {
          // See if there are waiters pending.
          for (;;)
          {
            PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
                connection);
            FuturePooledConnection future = pendingFutures.poll();
            if (future == null)
@@ -176,8 +152,6 @@
              break;
            }
            PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
                connection);
            future.handleResult(pooledConnection);
            if (!future.isCancelled())
@@ -189,34 +163,45 @@
                StaticUtils.DEBUG_LOG
                    .finest(String
                        .format(
                            "Connection released to pool and directly "
                                + "given to waiter. numConnections: %d, poolSize: %d, "
                                + "pendingFutures: %d", numConnections,
                            pool.size(), pendingFutures.size()));
                        "Connection released to pool and directly "
                        + "given to waiter. numConnections: %d, poolSize: %d, "
                        + "pendingFutures: %d", numConnections,
                        pool.size(), pendingFutures.size()));
              }
              return;
            }
          }
          // No waiters. Put back in pool.
          pool.push(connection);
          pool.offer(connection);
          if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
          {
            StaticUtils.DEBUG_LOG
                .finest(String
                    .format(
                        "Connection released to pool. "
                            + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                        numConnections, pool.size(), pendingFutures
                            .size()));
                    "Connection released to pool and directly "
                    + "given to waiter. numConnections: %d, poolSize: %d, "
                    + "pendingFutures: %d", numConnections,
                    pool.size(), pendingFutures.size()));
          }
        }
        finally
        {
          // Null out the underlying connection to prevent further use.
          connection = null;
          return;
        }
      }
      // Connection is no longer valid. Close outside of lock
      connection.removeConnectionEventListener(this);
      connection.close();
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      {
        StaticUtils.DEBUG_LOG
            .finest(String
                .format(
                "Dead connection released to pool. "
                + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                numConnections, pool.size(), pendingFutures
                    .size()));
      }
      return;
    }
@@ -234,7 +219,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -248,7 +233,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -262,7 +247,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -276,7 +261,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -290,7 +275,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -305,7 +290,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -324,7 +309,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -343,7 +328,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -359,7 +344,7 @@
        ResultHandler<RootDSE> handler)
        throws UnsupportedOperationException, IllegalStateException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -375,7 +360,7 @@
        ResultHandler<Schema> handler)
        throws UnsupportedOperationException, IllegalStateException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -391,7 +376,7 @@
        ResultHandler<Schema> handler)
        throws UnsupportedOperationException, IllegalStateException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -404,7 +389,7 @@
        ConnectionEventListener listener) throws IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -415,7 +400,7 @@
    public void removeConnectionEventListener(
        ConnectionEventListener listener) throws NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -428,10 +413,13 @@
     */
    public boolean isClosed()
    {
      return connection == null;
      return isClosed;
    }
    public boolean isValid()
    {
      return !isClosed && connection.isValid();
    }
    public void connectionReceivedUnsolicitedNotification(
        GenericExtendedResult notification)
@@ -444,10 +432,10 @@
    public void connectionErrorOccurred(
        boolean isDisconnectNotification, ErrorResultException error)
    {
      synchronized (lock)
      // Remove this connection from the pool if it's in there. If not, just
      // ignore it and wait for the user to close it; we deal with it there.
      if(pool.remove(this))
      {
        // Remove this connection from the pool if its in there
        pool.remove(this);
        numConnections--;
        connection.removeConnectionEventListener(this);
@@ -460,11 +448,11 @@
          StaticUtils.DEBUG_LOG
              .finest(String
                  .format(
                      "Connection error occured: "
                          + error.getMessage()
                          + " numConnections: %d, poolSize: %d, pendingFutures: %d",
                      numConnections, pool.size(), pendingFutures
                          .size()));
                  "Connection error occured: "
                  + error.getMessage()
                  + " numConnections: %d, poolSize: %d, pendingFutures: %d",
                  numConnections, pool.size(), pendingFutures
                      .size()));
        }
      }
    }
@@ -510,86 +498,100 @@
  {
    this.connectionFactory = connectionFactory;
    this.poolSize = poolSize;
    this.pool = new Stack<AsynchronousConnection>();
    this.pool = new ConcurrentLinkedQueue<AsynchronousConnection>();
    this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>();
  }
  public FutureResult<AsynchronousConnection> getAsynchronousConnection(
      ResultHandler<? super AsynchronousConnection> handler)
  public synchronized FutureResult<AsynchronousConnection>
  getAsynchronousConnection(ResultHandler<? super AsynchronousConnection> handler)
  {
    synchronized (lock)
    // This entire method is synchronized to ensure new connects are done
    // synchronously to avoid the "pending connect" case.
    AsynchronousConnection conn;
    synchronized(pool)
    {
      // Check to see if we have a connection in the pool
      if (!pool.isEmpty())
      conn = pool.poll();
      if (conn == null)
      {
        AsynchronousConnection conn = pool.pop();
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        // Pool was empty. Maybe a new connection if pool size is not
        // reached
        if (numConnections >= poolSize)
        {
          StaticUtils.DEBUG_LOG
              .finest(String
                  .format(
                      "Connection aquired from pool. "
                          + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                      numConnections, pool.size(), pendingFutures
                          .size()));
          // We reached max # of conns so wait for a connection to become available.
          FuturePooledConnection future = new FuturePooledConnection(
              handler);
          pendingFutures.add(future);
          if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
          {
            StaticUtils.DEBUG_LOG
                .finest(String
                    .format(
                    "No connections available. Wait-listed"
                    + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                    numConnections, pool.size(), pendingFutures
                        .size()));
          }
          return future;
        }
        PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
            conn);
        if (handler != null)
        {
          handler.handleResult(pooledConnection);
        }
        return new CompletedFutureResult<AsynchronousConnection>(
            pooledConnection);
      }
      // Pool was empty. Maybe a new connection if pool size is not
      // reached
      if (numConnections < poolSize)
      {
        // We can create a new connection.
        numConnections++;
        FutureNewConnection future = new FutureNewConnection(handler);
        future.setFutureResult(connectionFactory
            .getAsynchronousConnection(future));
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG
              .finest(String
                  .format(
                      "New connection established and aquired. "
                          + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                      numConnections, pool.size(), pendingFutures
                          .size()));
        }
        return future;
      }
      else
      {
        // Pool is full so wait for a connection to become available.
        FuturePooledConnection future = new FuturePooledConnection(
            handler);
        pendingFutures.add(future);
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG
              .finest(String
                  .format(
                      "No connections available. Wait-listed"
                          + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                      numConnections, pool.size(), pendingFutures
                          .size()));
        }
        return future;
      }
    }
    if(conn == null)
    {
      try
      {
        // We can create a new connection.
        conn = connectionFactory.getAsynchronousConnection(handler).get();
        numConnections++;
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG
              .finest(String
                  .format(
                  "New connection established and aquired. "
                  + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                  numConnections, pool.size(), pendingFutures
                      .size()));
        }
      }
      catch (ErrorResultException e)
      {
        return new CompletedFutureResult<AsynchronousConnection>(e);
      }
      catch (InterruptedException e)
      {
        return new CompletedFutureResult<AsynchronousConnection>(
            new ErrorResultException(Responses.newResult(
                ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)));
      }
    }
    else
    {
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      {
        StaticUtils.DEBUG_LOG
            .finest(String
                .format(
                "Connection aquired from pool. "
                + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                numConnections, pool.size(), pendingFutures
                    .size()));
      }
    }
    PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
        conn);
    if (handler != null)
    {
      handler.handleResult(pooledConnection);
    }
    return new CompletedFutureResult<AsynchronousConnection>(
        pooledConnection);
  }
}