mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

matthew_swift
11.45.2009 2bc8d15a28fafab97cefafede06d6b7e738ae0fe
sdk/src/org/opends/sdk/ConnectionPool.java
@@ -27,6 +27,9 @@
package org.opends.sdk;
import java.util.Collection;
import java.util.Stack;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
@@ -35,95 +38,128 @@
import java.util.logging.Level;
import org.opends.sdk.requests.*;
import org.opends.sdk.responses.BindResult;
import org.opends.sdk.responses.CompareResult;
import org.opends.sdk.responses.GenericExtendedResult;
import org.opends.sdk.responses.Result;
import org.opends.sdk.responses.*;
import org.opends.sdk.schema.Schema;
import com.sun.opends.sdk.util.StaticUtils;
/**
 * A simple connection pool implementation.
 */
public class ConnectionPool
    extends AbstractConnectionFactory<AsynchronousConnection> {
public class ConnectionPool extends
    AbstractConnectionFactory<AsynchronousConnection>
{
  private final ConnectionFactory<?> connectionFactory;
  private volatile int numConnections;
  private final int poolSize;
  // FIXME: should use a better collection than this - CLQ?
  private final Stack<AsynchronousConnection> pool;
  private final ConcurrentLinkedQueue<PendingConnectionFuture<?>> pendingFutures;
  private final Object lock = new Object();
  private final class PooledConnectionWapper
      implements AsynchronousConnection, ConnectionEventListener {
  private final class PooledConnectionWapper implements
      AsynchronousConnection, ConnectionEventListener
  {
    private AsynchronousConnection connection;
    private PooledConnectionWapper(AsynchronousConnection connection) {
    private PooledConnectionWapper(AsynchronousConnection connection)
    {
      this.connection = connection;
      this.connection.addConnectionEventListener(this);
    }
    public void abandon(AbandonRequest request)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException {
      if (connection == null) {
        NullPointerException
    {
      if (connection == null)
      {
        throw new IllegalStateException();
      }
      connection.abandon(request);
    }
    public <P> ResultFuture<Result> add(
        AddRequest request,
    public <P> ResultFuture<Result> add(AddRequest request,
        ResultHandler<Result, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException {
      if (connection == null) {
        NullPointerException
    {
      if (connection == null)
      {
        throw new IllegalStateException();
      }
      return connection.add(request, handler, p);
    }
    public <P> ResultFuture<BindResult> bind(
        BindRequest request, ResultHandler<? super BindResult, P> handler, P p)
    public <P> ResultFuture<BindResult> bind(BindRequest request,
        ResultHandler<? super BindResult, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException {
      if (connection == null) {
        NullPointerException
    {
      if (connection == null)
      {
        throw new IllegalStateException();
      }
      return connection.bind(request, handler, p);
    }
    public void close() {
      synchronized (lock) {
        try {
    public void close()
    {
      synchronized (lock)
      {
        try
        {
          // Don't put closed connections back in the pool.
          if (connection.isClosed()) {
          if (connection.isClosed())
          {
            if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
            {
              StaticUtils.DEBUG_LOG.finest(String
                  .format("Dead connection released to pool. " +
                  "numConnections: %d, poolSize: %d, pendingFutures: %d",
                  numConnections, pool.size(), pendingFutures.size()));
              StaticUtils.DEBUG_LOG
                  .finest(String
                      .format(
                          "Dead connection released to pool. "
                              + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                          numConnections, pool.size(), pendingFutures
                              .size()));
            }
            return;
          }
          // See if there waiters pending
          PendingConnectionFuture<?> future = pendingFutures.poll();
          if (future != null) {
            PooledConnectionWapper pooledConnection =
                new PooledConnectionWapper(connection);
          if (future != null)
          {
            PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
                connection);
            future.connection(pooledConnection);
            if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
            {
              StaticUtils.DEBUG_LOG.finest(String
                  .format("Connection released to pool and directly " +
                  "given to waiter. numConnections: %d, poolSize: %d, " +
                  "pendingFutures: %d", numConnections, pool.size(),
                  pendingFutures.size()));
              StaticUtils.DEBUG_LOG
                  .finest(String
                      .format(
                          "Connection released to pool and directly "
                              + "given to waiter. numConnections: %d, poolSize: %d, "
                              + "pendingFutures: %d", numConnections,
                          pool.size(), pendingFutures.size()));
            }
            return;
          }
@@ -132,99 +168,229 @@
          pool.push(connection);
          if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
          {
            StaticUtils.DEBUG_LOG.finest(String
                .format("Connection released to pool. " +
                "numConnections: %d, poolSize: %d, pendingFutures: %d",
                numConnections, pool.size(), pendingFutures.size()));
            StaticUtils.DEBUG_LOG
                .finest(String
                    .format(
                        "Connection released to pool. "
                            + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                        numConnections, pool.size(), pendingFutures
                            .size()));
          }
        }
        finally {
        finally
        {
          // Null out the underlying connection to prevent further use.
          connection = null;
        }
      }
    }
    public void close(UnbindRequest request, String reason)
        throws NullPointerException {
        throws NullPointerException
    {
      close();
    }
    public <P> ResultFuture<CompareResult> compare(
        CompareRequest request, ResultHandler<? super CompareResult, P> handler,
        P p) throws UnsupportedOperationException, IllegalStateException,
        NullPointerException {
      if (connection == null) {
        CompareRequest request,
        ResultHandler<? super CompareResult, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      {
        throw new IllegalStateException();
      }
      return connection.compare(request, handler, p);
    }
    public <P> ResultFuture<Result> delete(
        DeleteRequest request, ResultHandler<Result, P> handler, P p)
    public <P> ResultFuture<Result> delete(DeleteRequest request,
        ResultHandler<Result, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException {
      if (connection == null) {
        NullPointerException
    {
      if (connection == null)
      {
        throw new IllegalStateException();
      }
      return connection.delete(request, handler, p);
    }
    public <R extends Result, P> ResultFuture<R> extendedRequest(
        ExtendedRequest<R> request, ResultHandler<? super R, P> handler, P p)
        ExtendedRequest<R> request,
        ResultHandler<? super R, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException {
      if (connection == null) {
        NullPointerException
    {
      if (connection == null)
      {
        throw new IllegalStateException();
      }
      return connection.extendedRequest(request, handler, p);
    }
    public <P> ResultFuture<Result> modify(
        ModifyRequest request, ResultHandler<Result, P> handler, P p)
    public <P> ResultFuture<Result> modify(ModifyRequest request,
        ResultHandler<Result, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException {
      if (connection == null) {
        NullPointerException
    {
      if (connection == null)
      {
        throw new IllegalStateException();
      }
      return connection.modify(request, handler, p);
    }
    public <P> ResultFuture<Result> modifyDN(
        ModifyDNRequest request, ResultHandler<Result, P> handler, P p)
    public <P> ResultFuture<Result> modifyDN(ModifyDNRequest request,
        ResultHandler<Result, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException {
      if (connection == null) {
        NullPointerException
    {
      if (connection == null)
      {
        throw new IllegalStateException();
      }
      return connection.modifyDN(request, handler, p);
    }
    public <P> ResultFuture<Result> search(
        SearchRequest request, ResultHandler<Result, P> resultHandler,
    public <P> ResultFuture<Result> search(SearchRequest request,
        ResultHandler<Result, P> resultHandler,
        SearchResultHandler<P> searchResulthandler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException {
      if (connection == null) {
        NullPointerException
    {
      if (connection == null)
      {
        throw new IllegalStateException();
      }
      return connection.search(request, resultHandler, searchResulthandler, p);
      return connection.search(request, resultHandler,
          searchResulthandler, p);
    }
    public void addConnectionEventListener(ConnectionEventListener listener)
        throws IllegalStateException, NullPointerException {
      if (connection == null) {
    /**
     * {@inheritDoc}
     */
    public <P> ResultFuture<SearchResultEntry> readEntry(DN name,
        Collection<String> attributeDescriptions,
        ResultHandler<? super SearchResultEntry, P> resultHandler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      {
        throw new IllegalStateException();
      }
      return connection.readEntry(name, attributeDescriptions,
          resultHandler, p);
    }
    /**
     * {@inheritDoc}
     */
    public <P> ResultFuture<SearchResultEntry> searchSingleEntry(
        SearchRequest request,
        ResultHandler<? super SearchResultEntry, P> resultHandler, P p)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      {
        throw new IllegalStateException();
      }
      return connection.searchSingleEntry(request, resultHandler, p);
    }
    /**
     * {@inheritDoc}
     */
    public <P> ResultFuture<RootDSE> readRootDSE(
        ResultHandler<RootDSE, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException
    {
      if (connection == null)
      {
        throw new IllegalStateException();
      }
      return connection.readRootDSE(handler, p);
    }
    /**
     * {@inheritDoc}
     */
    public <P> ResultFuture<Schema> readSchemaForEntry(DN name,
        ResultHandler<Schema, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException
    {
      if (connection == null)
      {
        throw new IllegalStateException();
      }
      return connection.readSchemaForEntry(name, handler, p);
    }
    /**
     * {@inheritDoc}
     */
    public <P> ResultFuture<Schema> readSchema(DN name,
        ResultHandler<Schema, P> handler, P p)
        throws UnsupportedOperationException, IllegalStateException
    {
      if (connection == null)
      {
        throw new IllegalStateException();
      }
      return connection.readSchema(name, handler, p);
    }
    public void addConnectionEventListener(
        ConnectionEventListener listener) throws IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      {
        throw new IllegalStateException();
      }
    }
    public void removeConnectionEventListener(ConnectionEventListener listener)
        throws NullPointerException {
      if (connection == null) {
    public void removeConnectionEventListener(
        ConnectionEventListener listener) throws NullPointerException
    {
      if (connection == null)
      {
        throw new IllegalStateException();
      }
    }
    /**
     * {@inheritDoc}
     */
@@ -233,14 +399,21 @@
      return connection == null;
    }
    public void connectionReceivedUnsolicitedNotification(
        GenericExtendedResult notification) {
        GenericExtendedResult notification)
    {
      // Ignore
    }
    public void connectionErrorOccurred(
        boolean isDisconnectNotification, ErrorResultException error) {
      synchronized (lock) {
        boolean isDisconnectNotification, ErrorResultException error)
    {
      synchronized (lock)
      {
        // Remove this connection from the pool if its in there
        pool.remove(this);
        numConnections--;
@@ -252,104 +425,166 @@
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG.finest(String
              .format("Connection error occured: " + error.getMessage() +
              " numConnections: %d, poolSize: %d, pendingFutures: %d",
              numConnections, pool.size(), pendingFutures.size()));
          StaticUtils.DEBUG_LOG
              .finest(String
                  .format(
                      "Connection error occured: "
                          + error.getMessage()
                          + " numConnections: %d, poolSize: %d, pendingFutures: %d",
                      numConnections, pool.size(), pendingFutures
                          .size()));
        }
      }
    }
  }
  private static final class CompletedConnectionFuture
      implements ConnectionFuture<AsynchronousConnection> {
  private static final class CompletedConnectionFuture implements
      ConnectionFuture<AsynchronousConnection>
  {
    private final PooledConnectionWapper connection;
    private CompletedConnectionFuture(PooledConnectionWapper connection) {
    private CompletedConnectionFuture(PooledConnectionWapper connection)
    {
      this.connection = connection;
    }
    public boolean cancel(boolean mayInterruptIfRunning) {
    public boolean cancel(boolean mayInterruptIfRunning)
    {
      return false;
    }
    public AsynchronousConnection get()
        throws InterruptedException, ErrorResultException {
    public AsynchronousConnection get() throws InterruptedException,
        ErrorResultException
    {
      return connection;
    }
    public AsynchronousConnection get(long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException, ErrorResultException {
        throws InterruptedException, TimeoutException,
        ErrorResultException
    {
      return connection;
    }
    public boolean isCancelled() {
    public boolean isCancelled()
    {
      return false;
    }
    public boolean isDone() {
    public boolean isDone()
    {
      return true;
    }
  }
  private final class PendingConnectionFuture<P>
      implements ConnectionFuture<AsynchronousConnection> {
  private final class PendingConnectionFuture<P> implements
      ConnectionFuture<AsynchronousConnection>
  {
    private volatile boolean isCancelled;
    private volatile PooledConnectionWapper connection;
    private volatile ErrorResultException err;
    private final ConnectionResultHandler<? super AsynchronousConnection, P>
        handler;
    private final ConnectionResultHandler<? super AsynchronousConnection, P> handler;
    private final P p;
    private final CountDownLatch latch = new CountDownLatch(1);
    private PendingConnectionFuture(
        P p,
        ConnectionResultHandler<? super AsynchronousConnection, P> handler) {
        ConnectionResultHandler<? super AsynchronousConnection, P> handler)
    {
      this.handler = handler;
      this.p = p;
    }
    public synchronized boolean cancel(boolean mayInterruptIfRunning) {
    public synchronized boolean cancel(boolean mayInterruptIfRunning)
    {
      return pendingFutures.remove(this) && (isCancelled = true);
    }
    public AsynchronousConnection get()
        throws InterruptedException, ErrorResultException {
    public AsynchronousConnection get() throws InterruptedException,
        ErrorResultException
    {
      latch.await();
      if (err != null) {
      if (err != null)
      {
        throw err;
      }
      return connection;
    }
    public AsynchronousConnection get(long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException, ErrorResultException {
        throws InterruptedException, TimeoutException,
        ErrorResultException
    {
      latch.await(timeout, unit);
      if (err != null) {
      if (err != null)
      {
        throw err;
      }
      return connection;
    }
    public synchronized boolean isCancelled() {
    public synchronized boolean isCancelled()
    {
      return isCancelled;
    }
    public boolean isDone() {
    public boolean isDone()
    {
      return latch.getCount() == 0;
    }
    private void connection(PooledConnectionWapper connection) {
    private void connection(PooledConnectionWapper connection)
    {
      this.connection = connection;
      if (handler != null) {
      if (handler != null)
      {
        handler.handleConnection(p, connection);
      }
      latch.countDown();
    }
    private void error(ErrorResultException e) {
    private void error(ErrorResultException e)
    {
      this.err = e;
      if (handler != null) {
      if (handler != null)
      {
        handler.handleConnectionError(p, e);
      }
      latch.countDown();
@@ -368,83 +603,115 @@
   * @param poolSize
   *          The maximum size of the connection pool.
   */
  public ConnectionPool(ConnectionFactory<?> connectionFactory, int poolSize) {
  public ConnectionPool(ConnectionFactory<?> connectionFactory,
      int poolSize)
  {
    this.connectionFactory = connectionFactory;
    this.poolSize = poolSize;
    this.pool = new Stack<AsynchronousConnection>();
    this.pendingFutures = new ConcurrentLinkedQueue<PendingConnectionFuture<?>>();
  }
  private final class WrapConnectionResultHandler
      implements ConnectionResultHandler<AsynchronousConnection, Void> {
  private final class WrapConnectionResultHandler implements
      ConnectionResultHandler<AsynchronousConnection, Void>
  {
    private final PendingConnectionFuture<?> future;
    private WrapConnectionResultHandler(PendingConnectionFuture<?> future) {
    private WrapConnectionResultHandler(
        PendingConnectionFuture<?> future)
    {
      this.future = future;
    }
    public void handleConnection(
        java.lang.Void p,
        AsynchronousConnection connection) {
      PooledConnectionWapper pooledConnection =
          new PooledConnectionWapper(connection);
    public void handleConnection(java.lang.Void p,
        AsynchronousConnection connection)
    {
      PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
          connection);
      future.connection(pooledConnection);
    }
    public void handleConnectionError(
        java.lang.Void p,
        ErrorResultException error) {
    public void handleConnectionError(java.lang.Void p,
        ErrorResultException error)
    {
      future.error(error);
    }
  }
  public <P> ConnectionFuture<AsynchronousConnection> getAsynchronousConnection(
      ConnectionResultHandler<? super AsynchronousConnection, P> handler, P p) {
    synchronized (lock) {
      ConnectionResultHandler<? super AsynchronousConnection, P> handler,
      P p)
  {
    synchronized (lock)
    {
      // Check to see if we have a connection in the pool
      if (!pool.isEmpty()) {
      if (!pool.isEmpty())
      {
        AsynchronousConnection conn = pool.pop();
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG.finest(String
              .format("Connection aquired from pool. " +
              "numConnections: %d, poolSize: %d, pendingFutures: %d",
              numConnections, pool.size(), pendingFutures.size()));
          StaticUtils.DEBUG_LOG
              .finest(String
                  .format(
                      "Connection aquired from pool. "
                          + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                      numConnections, pool.size(), pendingFutures
                          .size()));
        }
        PooledConnectionWapper pooledConnection =
            new PooledConnectionWapper(conn);
        if (handler != null) {
        PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
            conn);
        if (handler != null)
        {
          handler.handleConnection(p, pooledConnection);
        }
        return new CompletedConnectionFuture(pooledConnection);
      }
      PendingConnectionFuture<P> pendingFuture =
          new PendingConnectionFuture<P>(p, handler);
      // Pool was empty. Maybe a new connection if pool size is not reached
      if (numConnections < poolSize) {
      PendingConnectionFuture<P> pendingFuture = new PendingConnectionFuture<P>(
          p, handler);
      // Pool was empty. Maybe a new connection if pool size is not
      // reached
      if (numConnections < poolSize)
      {
        numConnections++;
        WrapConnectionResultHandler wrapHandler =
            new WrapConnectionResultHandler(pendingFuture);
        WrapConnectionResultHandler wrapHandler = new WrapConnectionResultHandler(
            pendingFuture);
        connectionFactory.getAsynchronousConnection(wrapHandler, null);
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG.finest(String
              .format("New connection established and aquired. " +
              "numConnections: %d, poolSize: %d, pendingFutures: %d",
              numConnections, pool.size(), pendingFutures.size()));
          StaticUtils.DEBUG_LOG
              .finest(String
                  .format(
                      "New connection established and aquired. "
                          + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                      numConnections, pool.size(), pendingFutures
                          .size()));
        }
      } else {
      }
      else
      {
        // Have to wait
        pendingFutures.add(pendingFuture);
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG.finest(String
              .format("No connections available. Wait-listed" +
              "numConnections: %d, poolSize: %d, pendingFutures: %d",
              numConnections, pool.size(), pendingFutures.size()));
          StaticUtils.DEBUG_LOG
              .finest(String
                  .format(
                      "No connections available. Wait-listed"
                          + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                      numConnections, pool.size(), pendingFutures
                          .size()));
        }
      }