From abc8483bdf6febb3c10ea2bd20083ab389da9e3d Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 12 Aug 2011 22:13:21 +0000
Subject: [PATCH] Fix OPENDJ-177: Make connection pools Closeable

---
 opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java |  834 ++++-------------------------------------------------------
 1 files changed, 57 insertions(+), 777 deletions(-)

diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java
index 9984939c..ac686e4 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java
@@ -22,806 +22,86 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2009-2010 Sun Microsystems, Inc.
- *      Portions copyright 2011 ForgeRock AS
+ *      Copyright 2011 ForgeRock AS
  */
 
 package org.forgerock.opendj.ldap;
 
 
 
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
-
-import org.forgerock.opendj.ldap.requests.*;
-import org.forgerock.opendj.ldap.responses.*;
-
-import com.forgerock.opendj.util.*;
+import java.io.Closeable;
 
 
 
 /**
- * A simple connection pool implementation.
+ * A connection factory which maintains and re-uses a pool of connections.
+ * Connections obtained from a connection pool are returned to the connection
+ * pool when closed, although connection pool implementations may choose to
+ * physically close the connection if needed (e.g. in order to reduce the size
+ * of the pool).
+ * <p>
+ * When connection pools are no longer needed they must be explicitly closed in
+ * order to close any remaining pooled connections.
+ * <p>
+ * Since pooled connections are re-used, applications must use operations such
+ * as binds and StartTLS with extreme caution.
  */
-final class ConnectionPool extends AbstractConnectionFactory
+public interface ConnectionPool extends ConnectionFactory, Closeable
 {
-
   /**
-   * This result handler is invoked when an attempt to add a new connection to
-   * the pool completes.
+   * Releases any resources associated with this connection pool. Pooled
+   * connections will be permanently closed and this connection pool will no
+   * longer be available for use.
+   * <p>
+   * Attempts to use this connection pool after it has been closed will result
+   * in an {@code IllegalStateException}.
+   * <p>
+   * Calling {@code close} on a connection pool which is already closed has no
+   * effect.
    */
-  private final class ConnectionResultHandler implements
-      ResultHandler<AsynchronousConnection>
-  {
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void handleErrorResult(final ErrorResultException error)
-    {
-      // Connection attempt failed, so decrease the pool size.
-      currentPoolSize.release();
-
-      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-      {
-        StaticUtils.DEBUG_LOG.fine(String.format(
-            "Connection attempt failed: " + error.getMessage()
-                + " currentPoolSize=%d, poolSize=%d",
-            poolSize - currentPoolSize.availablePermits(), poolSize));
-      }
-
-      QueueElement holder;
-      synchronized (queue)
-      {
-        if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
-        {
-          // No waiting futures.
-          return;
-        }
-        else
-        {
-          holder = queue.removeFirst();
-        }
-      }
-
-      // There was waiting future, so close it.
-      holder.getWaitingFuture().handleErrorResult(error);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void handleResult(final AsynchronousConnection connection)
-    {
-      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-      {
-        StaticUtils.DEBUG_LOG.fine(String.format(
-            "Connection attempt succeeded: "
-                + " currentPoolSize=%d, poolSize=%d",
-                poolSize - currentPoolSize.availablePermits(), poolSize));
-      }
-
-      publishConnection(connection);
-    }
-  }
+  void close();
 
 
 
   /**
-   * A pooled connection is passed to the client. It wraps an underlying
-   * "pooled" connection obtained from the underlying factory and lasts until
-   * the client application closes this connection. More specifically, pooled
-   * connections are not actually stored in the internal queue.
-   */
-  private final class PooledConnection implements AsynchronousConnection
-  {
-    // Connection event listeners registed against this pooled connection should
-    // have the same life time as the pooled connection.
-    private final List<ConnectionEventListener> listeners =
-      new CopyOnWriteArrayList<ConnectionEventListener>();
-
-    private final AsynchronousConnection connection;
-
-    private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
-
-
-    PooledConnection(final AsynchronousConnection connection)
-    {
-      this.connection = connection;
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Void> abandon(final AbandonRequest request)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.abandon(request);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> add(final AddRequest request,
-        final ResultHandler<? super Result> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.add(request, handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> add(final AddRequest request,
-        final ResultHandler<? super Result> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection
-          .add(request, resultHandler, intermediateResponseHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void addConnectionEventListener(
-        final ConnectionEventListener listener) throws IllegalStateException,
-        NullPointerException
-    {
-      Validator.ensureNotNull(listener);
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      listeners.add(listener);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<BindResult> bind(final BindRequest request,
-        final ResultHandler<? super BindResult> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.bind(request, handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<BindResult> bind(final BindRequest request,
-        final ResultHandler<? super BindResult> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.bind(request, resultHandler,
-          intermediateResponseHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void close()
-    {
-      if (!isClosed.compareAndSet(false, true))
-      {
-        // Already closed.
-        return;
-      }
-
-      // Don't put invalid connections back in the pool.
-      if (connection.isValid())
-      {
-        publishConnection(connection);
-      }
-      else
-      {
-        // The connection may have been disconnected by the remote server, but
-        // the server may still be available. In order to avoid leaving pending
-        // futures hanging indefinitely, we should try to reconnect immediately.
-
-        // Close the dead connection.
-        connection.close();
-
-        // Try to get a new connection to replace it.
-        factory.getAsynchronousConnection(connectionResultHandler);
-
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
-        {
-          StaticUtils.DEBUG_LOG.warning(String.format(
-              "Connection no longer valid. "
-                  + "currentPoolSize=%d, poolSize=%d",
-                  poolSize - currentPoolSize.availablePermits(), poolSize));
-        }
-      }
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void close(final UnbindRequest request, final String reason)
-        throws NullPointerException
-    {
-      close();
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<CompareResult> compare(final CompareRequest request,
-        final ResultHandler<? super CompareResult> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.compare(request, handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<CompareResult> compare(final CompareRequest request,
-        final ResultHandler<? super CompareResult> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.compare(request, resultHandler,
-          intermediateResponseHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> delete(final DeleteRequest request,
-        final ResultHandler<? super Result> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.delete(request, handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> delete(final DeleteRequest request,
-        final ResultHandler<? super Result> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.delete(request, resultHandler,
-          intermediateResponseHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
-        final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.extendedRequest(request, handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
-        final ExtendedRequest<R> request,
-        final ResultHandler<? super R> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.extendedRequest(request, resultHandler,
-          intermediateResponseHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Connection getSynchronousConnection()
-    {
-      return new SynchronousConnection(this);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean isClosed()
-    {
-      return isClosed.get();
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean isValid()
-    {
-      return connection.isValid() && !isClosed();
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> modify(final ModifyRequest request,
-        final ResultHandler<? super Result> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.modify(request, handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> modify(final ModifyRequest request,
-        final ResultHandler<? super Result> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.modify(request, resultHandler,
-          intermediateResponseHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
-        final ResultHandler<? super Result> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.modifyDN(request, handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
-        final ResultHandler<? super Result> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.modifyDN(request, resultHandler,
-          intermediateResponseHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<SearchResultEntry> readEntry(final DN name,
-        final Collection<String> attributeDescriptions,
-        final ResultHandler<? super SearchResultEntry> resultHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.readEntry(name, attributeDescriptions, resultHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void removeConnectionEventListener(
-        final ConnectionEventListener listener) throws NullPointerException
-    {
-      Validator.ensureNotNull(listener);
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      listeners.remove(listener);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> search(final SearchRequest request,
-        final SearchResultHandler handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.search(request, handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> search(final SearchRequest request,
-        final SearchResultHandler resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.search(request, resultHandler,
-          intermediateResponseHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<SearchResultEntry> searchSingleEntry(
-        final SearchRequest request,
-        final ResultHandler<? super SearchResultEntry> resultHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.searchSingleEntry(request, resultHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public String toString()
-    {
-      final StringBuilder builder = new StringBuilder();
-      builder.append("PooledConnection(");
-      builder.append(connection);
-      builder.append(')');
-      return builder.toString();
-    }
-  }
-
-
-
-  /**
-   * A queue element is either a pending connection request future awaiting an
-   * {@code AsynchronousConnection} or it is an unused
-   * {@code AsynchronousConnection} awaiting a connection request.
-   */
-  private static final class QueueElement
-  {
-    private final Object value;
-
-
-
-    QueueElement(final AsynchronousConnection connection)
-    {
-      this.value = connection;
-    }
-
-
-
-    QueueElement(final ResultHandler<? super AsynchronousConnection> handler)
-    {
-      this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler);
-    }
-
-
-
-    AsynchronousConnection getWaitingConnection()
-    {
-      if (value instanceof AsynchronousConnection)
-      {
-        return (AsynchronousConnection) value;
-      }
-      else
-      {
-        throw new IllegalStateException();
-      }
-    }
-
-
-
-    @SuppressWarnings("unchecked")
-    AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture()
-    {
-      if (value instanceof AsynchronousFutureResult)
-      {
-        return (AsynchronousFutureResult<AsynchronousConnection>) value;
-      }
-      else
-      {
-        throw new IllegalStateException();
-      }
-    }
-
-
-
-    boolean isWaitingFuture()
-    {
-      return value instanceof AsynchronousFutureResult;
-    }
-
-
-
-    public String toString()
-    {
-      return String.valueOf(value);
-    }
-  }
-
-
-
-  // Guarded by queue.
-  private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
-
-  private final ConnectionFactory factory;
-
-  private final int poolSize;
-
-  private final Semaphore currentPoolSize;
-
-  private final ResultHandler<AsynchronousConnection> connectionResultHandler =
-    new ConnectionResultHandler();
-
-
-
-  /**
-   * Creates a new connection pool which will maintain {@code poolSize}
-   * connections created using the provided connection factory.
+   * Obtains an asynchronous connection from this connection pool, potentially
+   * opening a new connection if needed.
+   * <p>
+   * The returned {@code FutureResult} can be used to retrieve the pooled
+   * asynchronous connection. Alternatively, if a {@code ResultHandler} is
+   * provided, the handler will be notified when the pooled connection is
+   * available and ready for use.
+   * <p>
+   * Closing the pooled connection will, depending on the connection pool
+   * implementation, return the connection to this pool without closing it.
    *
-   * @param factory
-   *          The connection factory to use for creating new connections.
-   * @param poolSize
-   *          The maximum size of the connection pool.
+   * @param handler
+   *          The completion handler, or {@code null} if no handler is to be
+   *          used.
+   * @return A future which can be used to retrieve the pooled asynchronous
+   *         connection.
+   * @throws IllegalStateException
+   *           If this connection pool has already been closed.
    */
-  ConnectionPool(final ConnectionFactory factory, final int poolSize)
-  {
-    this.factory = factory;
-    this.poolSize = poolSize;
-    this.currentPoolSize = new Semaphore(poolSize);
-  }
+  FutureResult<AsynchronousConnection> getAsynchronousConnection(
+      ResultHandler<? super AsynchronousConnection> handler);
 
 
 
   /**
-   * {@inheritDoc}
+   * Obtains a connection from this connection pool, potentially opening a new
+   * connection if needed.
+   * <p>
+   * Closing the pooled connection will, depending on the connection pool
+   * implementation, return the connection to this pool without closing it.
+   *
+   * @return A pooled connection.
+   * @throws ErrorResultException
+   *           If the connection request failed for some reason.
+   * @throws InterruptedException
+   *           If the current thread was interrupted while waiting.
+   * @throws IllegalStateException
+   *           If this connection pool has already been closed.
    */
-  @Override
-  public FutureResult<AsynchronousConnection> getAsynchronousConnection(
-      final ResultHandler<? super AsynchronousConnection> handler)
-  {
-    QueueElement holder;
-    synchronized (queue)
-    {
-      if (queue.isEmpty() || queue.getFirst().isWaitingFuture())
-      {
-        holder = new QueueElement(handler);
-        queue.add(holder);
-      }
-      else
-      {
-        holder = queue.removeFirst();
-      }
-    }
-
-    if (!holder.isWaitingFuture())
-    {
-      // There was a completed connection attempt.
-      final AsynchronousConnection connection = holder.getWaitingConnection();
-      final PooledConnection pooledConnection = new PooledConnection(connection);
-      if (handler != null)
-      {
-        handler.handleResult(pooledConnection);
-      }
-      return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
-    }
-    else
-    {
-      // Grow the pool if needed.
-      final FutureResult<AsynchronousConnection> future = holder
-          .getWaitingFuture();
-      if (!future.isDone() && currentPoolSize.tryAcquire())
-      {
-        factory.getAsynchronousConnection(connectionResultHandler);
-      }
-      return future;
-    }
-  }
-
-
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public String toString()
-  {
-    final StringBuilder builder = new StringBuilder();
-    builder.append("ConnectionPool(");
-    builder.append(String.valueOf(factory));
-    builder.append(',');
-    builder.append(poolSize);
-    builder.append(')');
-    return builder.toString();
-  }
-
-
-
-  private void publishConnection(final AsynchronousConnection connection)
-  {
-    QueueElement holder;
-    synchronized (queue)
-    {
-      if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
-      {
-        holder = new QueueElement(connection);
-        queue.add(holder);
-        return;
-      }
-      else
-      {
-        holder = queue.removeFirst();
-      }
-    }
-
-    // There was waiting future, so close it.
-    final PooledConnection pooledConnection = new PooledConnection(connection);
-    holder.getWaitingFuture().handleResult(pooledConnection);
-  }
+  Connection getConnection() throws ErrorResultException, InterruptedException;
 }

--
Gitblit v1.10.0