mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
13.13.2011 abc8483bdf6febb3c10ea2bd20083ab389da9e3d
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java
@@ -22,806 +22,86 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS
 *      Copyright 2011 ForgeRock AS
 */
package org.forgerock.opendj.ldap;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import org.forgerock.opendj.ldap.requests.*;
import org.forgerock.opendj.ldap.responses.*;
import com.forgerock.opendj.util.*;
import java.io.Closeable;
/**
 * A simple connection pool implementation.
 * A connection factory which maintains and re-uses a pool of connections.
 * Connections obtained from a connection pool are returned to the connection
 * pool when closed, although connection pool implementations may choose to
 * physically close the connection if needed (e.g. in order to reduce the size
 * of the pool).
 * <p>
 * When connection pools are no longer needed they must be explicitly closed in
 * order to close any remaining pooled connections.
 * <p>
 * Since pooled connections are re-used, applications must use operations such
 * as binds and StartTLS with extreme caution.
 */
final class ConnectionPool extends AbstractConnectionFactory
public interface ConnectionPool extends ConnectionFactory, Closeable
{
  /**
   * This result handler is invoked when an attempt to add a new connection to
   * the pool completes.
   * Releases any resources associated with this connection pool. Pooled
   * connections will be permanently closed and this connection pool will no
   * longer be available for use.
   * <p>
   * Attempts to use this connection pool after it has been closed will result
   * in an {@code IllegalStateException}.
   * <p>
   * Calling {@code close} on a connection pool which is already closed has no
   * effect.
   */
  private final class ConnectionResultHandler implements
      ResultHandler<AsynchronousConnection>
  {
    /**
     * {@inheritDoc}
     */
    @Override
    public void handleErrorResult(final ErrorResultException error)
    {
      // Connection attempt failed, so decrease the pool size.
      currentPoolSize.release();
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      {
        StaticUtils.DEBUG_LOG.fine(String.format(
            "Connection attempt failed: " + error.getMessage()
                + " currentPoolSize=%d, poolSize=%d",
            poolSize - currentPoolSize.availablePermits(), poolSize));
      }
      QueueElement holder;
      synchronized (queue)
      {
        if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
        {
          // No waiting futures.
          return;
        }
        else
        {
          holder = queue.removeFirst();
        }
      }
      // There was waiting future, so close it.
      holder.getWaitingFuture().handleErrorResult(error);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void handleResult(final AsynchronousConnection connection)
    {
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      {
        StaticUtils.DEBUG_LOG.fine(String.format(
            "Connection attempt succeeded: "
                + " currentPoolSize=%d, poolSize=%d",
                poolSize - currentPoolSize.availablePermits(), poolSize));
      }
      publishConnection(connection);
    }
  }
  // Declaration only: no body is given here, so the behavior is supplied by
  // whichever class implements the ConnectionPool interface.
  void close();
  /**
   * A pooled connection is passed to the client. It wraps an underlying
   * "pooled" connection obtained from the underlying factory and lasts until
   * the client application closes this connection. More specifically, pooled
   * connections are not actually stored in the internal queue.
   */
  private final class PooledConnection implements AsynchronousConnection
  {
    // Connection event listeners registed against this pooled connection should
    // have the same life time as the pooled connection.
    private final List<ConnectionEventListener> listeners =
      new CopyOnWriteArrayList<ConnectionEventListener>();
    private final AsynchronousConnection connection;
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    PooledConnection(final AsynchronousConnection connection)
    {
      this.connection = connection;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Void> abandon(final AbandonRequest request)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.abandon(request);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> add(final AddRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.add(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> add(final AddRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection
          .add(request, resultHandler, intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void addConnectionEventListener(
        final ConnectionEventListener listener) throws IllegalStateException,
        NullPointerException
    {
      Validator.ensureNotNull(listener);
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      listeners.add(listener);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<BindResult> bind(final BindRequest request,
        final ResultHandler<? super BindResult> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.bind(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<BindResult> bind(final BindRequest request,
        final ResultHandler<? super BindResult> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.bind(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void close()
    {
      if (!isClosed.compareAndSet(false, true))
      {
        // Already closed.
        return;
      }
      // Don't put invalid connections back in the pool.
      if (connection.isValid())
      {
        publishConnection(connection);
      }
      else
      {
        // The connection may have been disconnected by the remote server, but
        // the server may still be available. In order to avoid leaving pending
        // futures hanging indefinitely, we should try to reconnect immediately.
        // Close the dead connection.
        connection.close();
        // Try to get a new connection to replace it.
        factory.getAsynchronousConnection(connectionResultHandler);
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
        {
          StaticUtils.DEBUG_LOG.warning(String.format(
              "Connection no longer valid. "
                  + "currentPoolSize=%d, poolSize=%d",
                  poolSize - currentPoolSize.availablePermits(), poolSize));
        }
      }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void close(final UnbindRequest request, final String reason)
        throws NullPointerException
    {
      close();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<CompareResult> compare(final CompareRequest request,
        final ResultHandler<? super CompareResult> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.compare(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<CompareResult> compare(final CompareRequest request,
        final ResultHandler<? super CompareResult> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.compare(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> delete(final DeleteRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.delete(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> delete(final DeleteRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.delete(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
        final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.extendedRequest(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
        final ExtendedRequest<R> request,
        final ResultHandler<? super R> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.extendedRequest(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public Connection getSynchronousConnection()
    {
      return new SynchronousConnection(this);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isClosed()
    {
      return isClosed.get();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isValid()
    {
      return connection.isValid() && !isClosed();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modify(final ModifyRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.modify(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modify(final ModifyRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.modify(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.modifyDN(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.modifyDN(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<SearchResultEntry> readEntry(final DN name,
        final Collection<String> attributeDescriptions,
        final ResultHandler<? super SearchResultEntry> resultHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.readEntry(name, attributeDescriptions, resultHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void removeConnectionEventListener(
        final ConnectionEventListener listener) throws NullPointerException
    {
      Validator.ensureNotNull(listener);
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      listeners.remove(listener);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> search(final SearchRequest request,
        final SearchResultHandler handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.search(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> search(final SearchRequest request,
        final SearchResultHandler resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.search(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<SearchResultEntry> searchSingleEntry(
        final SearchRequest request,
        final ResultHandler<? super SearchResultEntry> resultHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.searchSingleEntry(request, resultHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString()
    {
      final StringBuilder builder = new StringBuilder();
      builder.append("PooledConnection(");
      builder.append(connection);
      builder.append(')');
      return builder.toString();
    }
  }
  /**
   * A queue element is either a pending connection request future awaiting an
   * {@code AsynchronousConnection} or it is an unused
   * {@code AsynchronousConnection} awaiting a connection request.
   */
  private static final class QueueElement
  {
    private final Object value;
    QueueElement(final AsynchronousConnection connection)
    {
      this.value = connection;
    }
    QueueElement(final ResultHandler<? super AsynchronousConnection> handler)
    {
      this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler);
    }
    AsynchronousConnection getWaitingConnection()
    {
      if (value instanceof AsynchronousConnection)
      {
        return (AsynchronousConnection) value;
      }
      else
      {
        throw new IllegalStateException();
      }
    }
    @SuppressWarnings("unchecked")
    AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture()
    {
      if (value instanceof AsynchronousFutureResult)
      {
        return (AsynchronousFutureResult<AsynchronousConnection>) value;
      }
      else
      {
        throw new IllegalStateException();
      }
    }
    boolean isWaitingFuture()
    {
      return value instanceof AsynchronousFutureResult;
    }
    public String toString()
    {
      return String.valueOf(value);
    }
  }
  // Guarded by queue.
  // Holds QueueElement instances, each of which is either an unused connection
  // or a pending connection request; accessed inside synchronized (queue).
  private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
  // The underlying factory asked for a new connection whenever the pool grows
  // or replaces an invalid connection.
  private final ConnectionFactory factory;
  // The value passed to the constructor; also the initial permit count of
  // currentPoolSize.
  private final int poolSize;
  // A permit is acquired before asking the factory for a new connection and
  // released when a connection attempt fails, so
  // (poolSize - availablePermits()) is what the log messages report as the
  // current pool size.
  private final Semaphore currentPoolSize;
  // Single handler instance shared by every connection attempt made against
  // the underlying factory.
  private final ResultHandler<AsynchronousConnection> connectionResultHandler =
    new ConnectionResultHandler();
  /**
   * Creates a new connection pool which will maintain {@code poolSize}
   * connections created using the provided connection factory.
   * Obtains an asynchronous connection from this connection pool, potentially
   * opening a new connection if needed.
   * <p>
   * The returned {@code FutureResult} can be used to retrieve the pooled
   * asynchronous connection. Alternatively, if a {@code ResultHandler} is
   * provided, the handler will be notified when the pooled connection is
   * available and ready for use.
   * <p>
   * Closing the pooled connection will, depending on the connection pool
   * implementation, return the connection to this pool without closing it.
   *
   * @param factory
   *          The connection factory to use for creating new connections.
   * @param poolSize
   *          The maximum size of the connection pool.
   * @param handler
   *          The completion handler, or {@code null} if no handler is to be
   *          used.
   * @return A future which can be used to retrieve the pooled asynchronous
   *         connection.
   * @throws IllegalStateException
   *           If this connection pool has already been closed.
   */
  ConnectionPool(final ConnectionFactory factory, final int poolSize)
  {
    this.factory = factory;
    this.poolSize = poolSize;
    this.currentPoolSize = new Semaphore(poolSize);
  }
  // Declaration only: no body is given here, so the behavior is supplied by
  // whichever class implements the ConnectionPool interface.
  FutureResult<AsynchronousConnection> getAsynchronousConnection(
      ResultHandler<? super AsynchronousConnection> handler);
  /**
   * {@inheritDoc}
   * Obtains a connection from this connection pool, potentially opening a new
   * connection if needed.
   * <p>
   * Closing the pooled connection will, depending on the connection pool
   * implementation, return the connection to this pool without closing it.
   *
   * @return A pooled connection.
   * @throws ErrorResultException
   *           If the connection request failed for some reason.
   * @throws InterruptedException
   *           If the current thread was interrupted while waiting.
   * @throws IllegalStateException
   *           If this connection pool has already been closed.
   */
  @Override
  public FutureResult<AsynchronousConnection> getAsynchronousConnection(
      final ResultHandler<? super AsynchronousConnection> handler)
  {
    QueueElement holder;
    synchronized (queue)
    {
      if (queue.isEmpty() || queue.getFirst().isWaitingFuture())
      {
        holder = new QueueElement(handler);
        queue.add(holder);
      }
      else
      {
        holder = queue.removeFirst();
      }
    }
    if (!holder.isWaitingFuture())
    {
      // There was a completed connection attempt.
      final AsynchronousConnection connection = holder.getWaitingConnection();
      final PooledConnection pooledConnection = new PooledConnection(connection);
      if (handler != null)
      {
        handler.handleResult(pooledConnection);
      }
      return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
    }
    else
    {
      // Grow the pool if needed.
      final FutureResult<AsynchronousConnection> future = holder
          .getWaitingFuture();
      if (!future.isDone() && currentPoolSize.tryAcquire())
      {
        factory.getAsynchronousConnection(connectionResultHandler);
      }
      return future;
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    final StringBuilder builder = new StringBuilder();
    builder.append("ConnectionPool(");
    builder.append(String.valueOf(factory));
    builder.append(',');
    builder.append(poolSize);
    builder.append(')');
    return builder.toString();
  }
  private void publishConnection(final AsynchronousConnection connection)
  {
    QueueElement holder;
    synchronized (queue)
    {
      if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
      {
        holder = new QueueElement(connection);
        queue.add(holder);
        return;
      }
      else
      {
        holder = queue.removeFirst();
      }
    }
    // There was waiting future, so close it.
    final PooledConnection pooledConnection = new PooledConnection(connection);
    holder.getWaitingFuture().handleResult(pooledConnection);
  }
  // Declaration only: no body is given here, so the behavior is supplied by
  // whichever class implements the ConnectionPool interface.
  Connection getConnection() throws ErrorResultException, InterruptedException;
}