mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

matthew_swift
16.13.2009 abc1a19fd4dee9729fd0aed721575a396d249bd4
sdk/src/org/opends/sdk/ConnectionPool.java
@@ -32,15 +32,15 @@
import java.util.Collection;
import java.util.Stack;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import org.opends.sdk.requests.*;
import org.opends.sdk.responses.*;
import org.opends.sdk.schema.Schema;
import com.sun.opends.sdk.util.AbstractFutureResult;
import com.sun.opends.sdk.util.CompletedFutureResult;
import com.sun.opends.sdk.util.FutureResultTransformer;
import com.sun.opends.sdk.util.StaticUtils;
@@ -48,7 +48,7 @@
/**
 * A simple connection pool implementation.
 */
public class ConnectionPool extends
public final class ConnectionPool extends
    AbstractConnectionFactory<AsynchronousConnection>
{
  private final ConnectionFactory<?> connectionFactory;
@@ -60,12 +60,33 @@
  // FIXME: should use a better collection than this - CLQ?
  private final Stack<AsynchronousConnection> pool;
  private final ConcurrentLinkedQueue<PendingResultFuture> pendingFutures;
  private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
  private final Object lock = new Object();
  private final class FutureNewConnection
      extends
      FutureResultTransformer<AsynchronousConnection, AsynchronousConnection>
  {
    private FutureNewConnection(
        ResultHandler<? super AsynchronousConnection> handler)
    {
      super(handler);
    }
    protected AsynchronousConnection transformResult(
        AsynchronousConnection result) throws ErrorResultException
    {
      return new PooledConnectionWapper(result);
    }
  }
  private final class PooledConnectionWapper implements
      AsynchronousConnection, ConnectionEventListener
  {
@@ -144,24 +165,37 @@
            return;
          }
          // See if there waiters pending
          PendingResultFuture future = pendingFutures.poll();
          if (future != null)
          // See if there waiters pending.
          for (;;)
          {
            FuturePooledConnection future = pendingFutures.poll();
            if (future == null)
            {
              // No waiters - so drop out and add connection to pool.
              break;
            }
            PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
                connection);
            future.connection(pooledConnection);
            if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
            future.handleResult(pooledConnection);
            if (!future.isCancelled())
            {
              StaticUtils.DEBUG_LOG
                  .finest(String
                      .format(
                          "Connection released to pool and directly "
                              + "given to waiter. numConnections: %d, poolSize: %d, "
                              + "pendingFutures: %d", numConnections,
                          pool.size(), pendingFutures.size()));
              // The future was not cancelled and the connection was
              // accepted.
              if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
              {
                StaticUtils.DEBUG_LOG
                    .finest(String
                        .format(
                            "Connection released to pool and directly "
                                + "given to waiter. numConnections: %d, poolSize: %d, "
                                + "pendingFutures: %d", numConnections,
                            pool.size(), pendingFutures.size()));
              }
              return;
            }
            return;
          }
          // No waiters. Put back in pool.
@@ -438,165 +472,26 @@
  private static final class CompletedResultFuture implements
      FutureResult<AsynchronousConnection>
  // Future used for waiting for pooled connections to become available.
  private static final class FuturePooledConnection extends
      AbstractFutureResult<AsynchronousConnection>
  {
    private final PooledConnectionWapper connection;
    private CompletedResultFuture(PooledConnectionWapper connection)
    {
      this.connection = connection;
    }
    public boolean cancel(boolean mayInterruptIfRunning)
    {
      return false;
    }
    public AsynchronousConnection get() throws InterruptedException,
        ErrorResultException
    {
      return connection;
    }
    public AsynchronousConnection get(long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException,
        ErrorResultException
    {
      return connection;
    }
    public boolean isCancelled()
    {
      return false;
    }
    public boolean isDone()
    {
      return true;
    }
    public int getRequestID()
    {
      return -1;
    }
  }
  private final class PendingResultFuture implements
      FutureResult<AsynchronousConnection>
  {
    private volatile boolean isCancelled;
    private volatile PooledConnectionWapper connection;
    private volatile ErrorResultException err;
    private final ResultHandler<? super AsynchronousConnection> handler;
    private final CountDownLatch latch = new CountDownLatch(1);
    private PendingResultFuture(
    private FuturePooledConnection(
        ResultHandler<? super AsynchronousConnection> handler)
    {
      this.handler = handler;
      super(handler);
    }
    public synchronized boolean cancel(boolean mayInterruptIfRunning)
    {
      return pendingFutures.remove(this) && (isCancelled = true);
    }
    public AsynchronousConnection get() throws InterruptedException,
        ErrorResultException
    {
      latch.await();
      if (err != null)
      {
        throw err;
      }
      return connection;
    }
    public AsynchronousConnection get(long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException,
        ErrorResultException
    {
      latch.await(timeout, unit);
      if (err != null)
      {
        throw err;
      }
      return connection;
    }
    public synchronized boolean isCancelled()
    {
      return isCancelled;
    }
    public boolean isDone()
    {
      return latch.getCount() == 0;
    }
    /**
     * {@inheritDoc}
     */
    public int getRequestID()
    {
      return -1;
    }
    private void connection(PooledConnectionWapper connection)
    {
      this.connection = connection;
      if (handler != null)
      {
        handler.handleResult(connection);
      }
      latch.countDown();
    }
    private void error(ErrorResultException e)
    {
      this.err = e;
      if (handler != null)
      {
        handler.handleErrorResult(e);
      }
      latch.countDown();
    }
  }
@@ -604,7 +499,7 @@
  /**
   * Creates a new connection pool which will maintain {@code poolSize}
   * connections created using the provided connection factory.
   *
   *
   * @param connectionFactory
   *          The connection factory to use for creating new
   *          connections.
@@ -617,38 +512,7 @@
    this.connectionFactory = connectionFactory;
    this.poolSize = poolSize;
    this.pool = new Stack<AsynchronousConnection>();
    this.pendingFutures = new ConcurrentLinkedQueue<PendingResultFuture>();
  }
  private final class WrapResultHandler implements
      ResultHandler<AsynchronousConnection>
  {
    private final PendingResultFuture future;
    private WrapResultHandler(PendingResultFuture future)
    {
      this.future = future;
    }
    public void handleResult(AsynchronousConnection connection)
    {
      PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
          connection);
      future.connection(pooledConnection);
    }
    public void handleErrorResult(ErrorResultException error)
    {
      future.error(error);
    }
    this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>();
  }
@@ -679,19 +543,21 @@
        {
          handler.handleResult(pooledConnection);
        }
        return new CompletedResultFuture(pooledConnection);
        return new CompletedFutureResult<AsynchronousConnection>(
            pooledConnection);
      }
      PendingResultFuture pendingFuture = new PendingResultFuture(
          handler);
      // Pool was empty. Maybe a new connection if pool size is not
      // reached
      if (numConnections < poolSize)
      {
        // We can create a new connection.
        numConnections++;
        WrapResultHandler wrapHandler = new WrapResultHandler(
            pendingFuture);
        connectionFactory.getAsynchronousConnection(wrapHandler);
        FutureNewConnection future = new FutureNewConnection(handler);
        future.setFutureResult(connectionFactory
            .getAsynchronousConnection(future));
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG
@@ -702,11 +568,16 @@
                      numConnections, pool.size(), pendingFutures
                          .size()));
        }
        return future;
      }
      else
      {
        // Have to wait
        pendingFutures.add(pendingFuture);
        // Pool is full so wait for a connection to become available.
        FuturePooledConnection future = new FuturePooledConnection(
            handler);
        pendingFutures.add(future);
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG
@@ -717,9 +588,9 @@
                      numConnections, pool.size(), pendingFutures
                          .size()));
        }
      }
      return pendingFuture;
        return future;
      }
    }
  }
}