mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

matthew_swift
11.23.2010 4ea2bc73aa3a298a61532530eb177704fa4a569f
opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -30,10 +30,8 @@
import java.util.Collection;
import java.util.Stack;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import org.opends.sdk.requests.*;
@@ -42,7 +40,6 @@
import com.sun.opends.sdk.util.AbstractFutureResult;
import com.sun.opends.sdk.util.CompletedFutureResult;
import com.sun.opends.sdk.util.FutureResultTransformer;
import com.sun.opends.sdk.util.StaticUtils;
@@ -50,10 +47,9 @@
/**
 * A simple connection pool implementation.
 */
final class ConnectionPool extends
    AbstractConnectionFactory<AsynchronousConnection>
final class ConnectionPool extends AbstractConnectionFactory
{
  private final ConnectionFactory<?> connectionFactory;
  private final ConnectionFactory connectionFactory;
  private volatile int numConnections;
@@ -70,9 +66,11 @@
      AsynchronousConnection, ConnectionEventListener
  {
    private final AsynchronousConnection connection;
    private volatile boolean isClosed;
    private PooledConnectionWapper(AsynchronousConnection connection)
    {
      this.connection = connection;
@@ -126,7 +124,7 @@
    {
      synchronized (pool)
      {
        if(isClosed)
        if (isClosed)
        {
          return;
        }
@@ -145,31 +143,27 @@
      connection.close();
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
      {
        StaticUtils.DEBUG_LOG
            .warning(String
                .format(
                "Dead connection released and closed. "
                    + "numConnections: %d, poolSize: %d, " +
                    "pendingFutures: %d",
                numConnections, pool.size(), pendingFutures
                    .size()));
        StaticUtils.DEBUG_LOG.warning(String.format(
            "Dead connection released and closed. "
                + "numConnections: %d, poolSize: %d, "
                + "pendingFutures: %d", numConnections, pool.size(),
            pendingFutures.size()));
      }
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
      {
        StaticUtils.DEBUG_LOG
            .warning(String
                .format(
                "Reconnect attempt starting. "
                    + "numConnections: %d, poolSize: %d, " +
                    "pendingFutures: %d",
                numConnections, pool.size(), pendingFutures
                    .size()));
        StaticUtils.DEBUG_LOG.warning(String.format(
            "Reconnect attempt starting. "
                + "numConnections: %d, poolSize: %d, "
                + "pendingFutures: %d", numConnections, pool.size(),
            pendingFutures.size()));
      }
      connectionFactory.getAsynchronousConnection(new ReconnectHandler());
      connectionFactory
          .getAsynchronousConnection(new ReconnectHandler());
    }
    public void close(UnbindRequest request, String reason)
        throws NullPointerException
    {
@@ -380,11 +374,15 @@
      return isClosed;
    }
    public boolean isValid()
    {
      return !isClosed && connection.isValid();
    }
    public void connectionReceivedUnsolicitedNotification(
        GenericExtendedResult notification)
    {
@@ -396,9 +394,11 @@
    public void connectionErrorOccurred(
        boolean isDisconnectNotification, ErrorResultException error)
    {
      // Remove this connection from the pool if its in there. If not, just
      // ignore and wait for the user to close and we can deal with it there.
      if(pool.remove(this))
      // Remove this connection from the pool if its in there. If not,
      // just
      // ignore and wait for the user to close and we can deal with it
      // there.
      if (pool.remove(this))
      {
        numConnections--;
        connection.removeConnectionEventListener(this);
@@ -412,27 +412,31 @@
          StaticUtils.DEBUG_LOG
              .warning(String
                  .format(
                  "Connection error occured and removed from pool: "
                  + error.getMessage()
                  + " numConnections: %d, poolSize: %d, pendingFutures: %d",
                  numConnections, pool.size(), pendingFutures
                      .size()));
                      "Connection error occured and removed from pool: "
                          + error.getMessage()
                          + " numConnections: %d, poolSize: %d, pendingFutures: %d",
                      numConnections, pool.size(), pendingFutures
                          .size()));
        }
      }
    }
  }
  private class ReconnectHandler
      implements ResultHandler<AsynchronousConnection>
  private class ReconnectHandler implements
      ResultHandler<AsynchronousConnection>
  {
    public void handleErrorResult(ErrorResultException error) {
    public void handleErrorResult(ErrorResultException error)
    {
      // The reconnect failed. Fail the connect attempt.
      numConnections --;
      // The reconnect failed. The underlying connection factory probably went
      numConnections--;
      // The reconnect failed. The underlying connection factory
      // probably went
      // down. Just fail all pending futures
      synchronized (pool)
      {
        while(!pendingFutures.isEmpty())
        while (!pendingFutures.isEmpty())
        {
          pendingFutures.poll().handleErrorResult(error);
        }
@@ -442,25 +446,26 @@
        StaticUtils.DEBUG_LOG
            .warning(String
                .format(
                "Reconnect failed. Failed all pending futures: "
                    + error.getMessage()
                    + " numConnections: %d, poolSize: %d, pendingFutures: %d",
                numConnections, pool.size(), pendingFutures
                    .size()));
                    "Reconnect failed. Failed all pending futures: "
                        + error.getMessage()
                        + " numConnections: %d, poolSize: %d, pendingFutures: %d",
                    numConnections, pool.size(), pendingFutures.size()));
      }
    }
    public void handleResult(AsynchronousConnection connection) {
    public void handleResult(AsynchronousConnection connection)
    {
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      {
        StaticUtils.DEBUG_LOG
            .finest(String
                .format(
                "Reconnect succeded. "
                    + " numConnections: %d, poolSize: %d, pendingFutures: %d",
                numConnections, pool.size(), pendingFutures
                    .size()));
                    "Reconnect succeded. "
                        + " numConnections: %d, poolSize: %d, pendingFutures: %d",
                    numConnections, pool.size(), pendingFutures.size()));
      }
      synchronized (pool)
      {
@@ -469,6 +474,8 @@
    }
  }
  // Future used for waiting for pooled connections to become available.
  private static final class FuturePooledConnection extends
      AbstractFutureResult<AsynchronousConnection>
@@ -491,6 +498,8 @@
  }
  private void releaseConnection(AsynchronousConnection connection)
  {
    // See if there waiters pending.
@@ -517,10 +526,10 @@
          StaticUtils.DEBUG_LOG
              .finest(String
                  .format(
                  "Connection released and directly "
                      + "given to waiter. numConnections: %d, poolSize: %d, "
                      + "pendingFutures: %d", numConnections,
                  pool.size(), pendingFutures.size()));
                      "Connection released and directly "
                          + "given to waiter. numConnections: %d, poolSize: %d, "
                          + "pendingFutures: %d", numConnections, pool
                          .size(), pendingFutures.size()));
        }
        return;
      }
@@ -530,12 +539,10 @@
    pool.offer(connection);
    if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
    {
      StaticUtils.DEBUG_LOG
          .finest(String
              .format(
              "Connection released to pool. numConnections: %d, " +
                  "poolSize: %d, pendingFutures: %d", numConnections,
              pool.size(), pendingFutures.size()));
      StaticUtils.DEBUG_LOG.finest(String.format(
          "Connection released to pool. numConnections: %d, "
              + "poolSize: %d, pendingFutures: %d", numConnections,
          pool.size(), pendingFutures.size()));
    }
  }
@@ -551,7 +558,7 @@
   * @param poolSize
   *          The maximum size of the connection pool.
   */
  ConnectionPool(ConnectionFactory<?> connectionFactory, int poolSize)
  ConnectionPool(ConnectionFactory connectionFactory, int poolSize)
  {
    this.connectionFactory = connectionFactory;
    this.poolSize = poolSize;
@@ -561,13 +568,14 @@
  public synchronized FutureResult<AsynchronousConnection>
  getAsynchronousConnection(ResultHandler<? super AsynchronousConnection> handler)
  public synchronized FutureResult<AsynchronousConnection> getAsynchronousConnection(
      ResultHandler<AsynchronousConnection> handler)
  {
    // This entire method is synchronized to ensure new connects are done
    // This entire method is synchronized to ensure new connects are
    // done
    // synchronously to avoid the "pending connect" case.
    AsynchronousConnection conn;
    synchronized(pool)
    synchronized (pool)
    {
      // Check to see if we have a connection in the pool
      conn = pool.poll();
@@ -577,7 +585,8 @@
        // reached
        if (numConnections >= poolSize)
        {
          // We reached max # of conns so wait for a connection to become available.
          // We reached max # of conns so wait for a connection to
          // become available.
          FuturePooledConnection future = new FuturePooledConnection(
              handler);
          pendingFutures.add(future);
@@ -587,10 +596,10 @@
            StaticUtils.DEBUG_LOG
                .finest(String
                    .format(
                    "No connections available. Wait-listed"
                    + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                    numConnections, pool.size(), pendingFutures
                        .size()));
                        "No connections available. Wait-listed"
                            + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                        numConnections, pool.size(), pendingFutures
                            .size()));
          }
          return future;
@@ -598,7 +607,7 @@
      }
    }
    if(conn == null)
    if (conn == null)
    {
      try
      {
@@ -610,10 +619,10 @@
          StaticUtils.DEBUG_LOG
              .finest(String
                  .format(
                  "New connection established and aquired. "
                  + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                  numConnections, pool.size(), pendingFutures
                      .size()));
                      "New connection established and aquired. "
                          + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                      numConnections, pool.size(), pendingFutures
                          .size()));
        }
      }
      catch (ErrorResultException e)
@@ -626,9 +635,8 @@
      }
      catch (InterruptedException e)
      {
        ErrorResultException error =
            new ErrorResultException(Responses.newResult(
                ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e));
        ErrorResultException error = new ErrorResultException(Responses
            .newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e));
        if (handler != null)
        {
          handler.handleErrorResult(error);
@@ -643,10 +651,9 @@
        StaticUtils.DEBUG_LOG
            .finest(String
                .format(
                "Connection aquired from pool. "
                + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                numConnections, pool.size(), pendingFutures
                    .size()));
                    "Connection aquired from pool. "
                        + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                    numConnections, pool.size(), pendingFutures.size()));
      }
    }