From 6e60c3e9b573d818a8e2f20b9b8d17bf92245799 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 12 Aug 2011 22:13:21 +0000
Subject: [PATCH] Fix OPENDJ-177: Make connection pools Closeable

---
 opendj-sdk/opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties           |    2 
 opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java |  139 +++
 opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java       |  944 ++++++++++++++++++++++++++++
 opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java            |  834 +-----------------------
 opendj-sdk/opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/proxy/Main.java   |    4 
 opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java               |    5 
 opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java      |   18 
 7 files changed, 1,149 insertions(+), 797 deletions(-)

diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/proxy/Main.java b/opendj-sdk/opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/proxy/Main.java
index 2d35f6f..ae2d8f7 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/proxy/Main.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/proxy/Main.java
@@ -559,10 +559,10 @@
       final String remoteAddress = args[i];
       final int remotePort = Integer.parseInt(args[i + 1]);
 
-      factories.add(Connections.newConnectionPool(
+      factories.add(Connections.newFixedConnectionPool(
           new LDAPConnectionFactory(remoteAddress, remotePort),
           Integer.MAX_VALUE));
-      bindFactories.add(Connections.newConnectionPool(
+      bindFactories.add(Connections.newFixedConnectionPool(
           new LDAPConnectionFactory(remoteAddress, remotePort),
           Integer.MAX_VALUE));
     }
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java
index 9984939c..ac686e4 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java
@@ -22,806 +22,86 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2009-2010 Sun Microsystems, Inc.
- *      Portions copyright 2011 ForgeRock AS
+ *      Copyright 2011 ForgeRock AS
  */
 
 package org.forgerock.opendj.ldap;
 
 
 
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
-
-import org.forgerock.opendj.ldap.requests.*;
-import org.forgerock.opendj.ldap.responses.*;
-
-import com.forgerock.opendj.util.*;
+import java.io.Closeable;
 
 
 
 /**
- * A simple connection pool implementation.
+ * A connection factory which maintains and re-uses a pool of connections.
+ * Connections obtained from a connection pool are returned to the connection
+ * pool when closed, although connection pool implementations may choose to
+ * physically close the connection if needed (e.g. in order to reduce the size
+ * of the pool).
+ * <p>
+ * When connection pools are no longer needed they must be explicitly closed in
+ * order to close any remaining pooled connections.
+ * <p>
+ * Since pooled connections are re-used, applications must use operations such
+ * as binds and StartTLS with extreme caution.
  */
-final class ConnectionPool extends AbstractConnectionFactory
+public interface ConnectionPool extends ConnectionFactory, Closeable
 {
-
   /**
-   * This result handler is invoked when an attempt to add a new connection to
-   * the pool completes.
+   * Releases any resources associated with this connection pool. Pooled
+   * connections will be permanently closed and this connection pool will no
+   * longer be available for use.
+   * <p>
+   * Attempts to use this connection pool after it has been closed will result
+   * in an {@code IllegalStateException}.
+   * <p>
+   * Calling {@code close} on a connection pool which is already closed has no
+   * effect.
    */
-  private final class ConnectionResultHandler implements
-      ResultHandler<AsynchronousConnection>
-  {
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void handleErrorResult(final ErrorResultException error)
-    {
-      // Connection attempt failed, so decrease the pool size.
-      currentPoolSize.release();
-
-      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-      {
-        StaticUtils.DEBUG_LOG.fine(String.format(
-            "Connection attempt failed: " + error.getMessage()
-                + " currentPoolSize=%d, poolSize=%d",
-            poolSize - currentPoolSize.availablePermits(), poolSize));
-      }
-
-      QueueElement holder;
-      synchronized (queue)
-      {
-        if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
-        {
-          // No waiting futures.
-          return;
-        }
-        else
-        {
-          holder = queue.removeFirst();
-        }
-      }
-
-      // There was waiting future, so close it.
-      holder.getWaitingFuture().handleErrorResult(error);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void handleResult(final AsynchronousConnection connection)
-    {
-      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-      {
-        StaticUtils.DEBUG_LOG.fine(String.format(
-            "Connection attempt succeeded: "
-                + " currentPoolSize=%d, poolSize=%d",
-                poolSize - currentPoolSize.availablePermits(), poolSize));
-      }
-
-      publishConnection(connection);
-    }
-  }
+  void close();
 
 
 
   /**
-   * A pooled connection is passed to the client. It wraps an underlying
-   * "pooled" connection obtained from the underlying factory and lasts until
-   * the client application closes this connection. More specifically, pooled
-   * connections are not actually stored in the internal queue.
-   */
-  private final class PooledConnection implements AsynchronousConnection
-  {
-    // Connection event listeners registed against this pooled connection should
-    // have the same life time as the pooled connection.
-    private final List<ConnectionEventListener> listeners =
-      new CopyOnWriteArrayList<ConnectionEventListener>();
-
-    private final AsynchronousConnection connection;
-
-    private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
-
-
-    PooledConnection(final AsynchronousConnection connection)
-    {
-      this.connection = connection;
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Void> abandon(final AbandonRequest request)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.abandon(request);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> add(final AddRequest request,
-        final ResultHandler<? super Result> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.add(request, handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> add(final AddRequest request,
-        final ResultHandler<? super Result> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection
-          .add(request, resultHandler, intermediateResponseHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void addConnectionEventListener(
-        final ConnectionEventListener listener) throws IllegalStateException,
-        NullPointerException
-    {
-      Validator.ensureNotNull(listener);
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      listeners.add(listener);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<BindResult> bind(final BindRequest request,
-        final ResultHandler<? super BindResult> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.bind(request, handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<BindResult> bind(final BindRequest request,
-        final ResultHandler<? super BindResult> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.bind(request, resultHandler,
-          intermediateResponseHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void close()
-    {
-      if (!isClosed.compareAndSet(false, true))
-      {
-        // Already closed.
-        return;
-      }
-
-      // Don't put invalid connections back in the pool.
-      if (connection.isValid())
-      {
-        publishConnection(connection);
-      }
-      else
-      {
-        // The connection may have been disconnected by the remote server, but
-        // the server may still be available. In order to avoid leaving pending
-        // futures hanging indefinitely, we should try to reconnect immediately.
-
-        // Close the dead connection.
-        connection.close();
-
-        // Try to get a new connection to replace it.
-        factory.getAsynchronousConnection(connectionResultHandler);
-
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
-        {
-          StaticUtils.DEBUG_LOG.warning(String.format(
-              "Connection no longer valid. "
-                  + "currentPoolSize=%d, poolSize=%d",
-                  poolSize - currentPoolSize.availablePermits(), poolSize));
-        }
-      }
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void close(final UnbindRequest request, final String reason)
-        throws NullPointerException
-    {
-      close();
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<CompareResult> compare(final CompareRequest request,
-        final ResultHandler<? super CompareResult> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.compare(request, handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<CompareResult> compare(final CompareRequest request,
-        final ResultHandler<? super CompareResult> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.compare(request, resultHandler,
-          intermediateResponseHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> delete(final DeleteRequest request,
-        final ResultHandler<? super Result> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.delete(request, handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> delete(final DeleteRequest request,
-        final ResultHandler<? super Result> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.delete(request, resultHandler,
-          intermediateResponseHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
-        final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.extendedRequest(request, handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
-        final ExtendedRequest<R> request,
-        final ResultHandler<? super R> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.extendedRequest(request, resultHandler,
-          intermediateResponseHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Connection getSynchronousConnection()
-    {
-      return new SynchronousConnection(this);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean isClosed()
-    {
-      return isClosed.get();
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean isValid()
-    {
-      return connection.isValid() && !isClosed();
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> modify(final ModifyRequest request,
-        final ResultHandler<? super Result> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.modify(request, handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> modify(final ModifyRequest request,
-        final ResultHandler<? super Result> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.modify(request, resultHandler,
-          intermediateResponseHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
-        final ResultHandler<? super Result> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.modifyDN(request, handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
-        final ResultHandler<? super Result> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.modifyDN(request, resultHandler,
-          intermediateResponseHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<SearchResultEntry> readEntry(final DN name,
-        final Collection<String> attributeDescriptions,
-        final ResultHandler<? super SearchResultEntry> resultHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.readEntry(name, attributeDescriptions, resultHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void removeConnectionEventListener(
-        final ConnectionEventListener listener) throws NullPointerException
-    {
-      Validator.ensureNotNull(listener);
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      listeners.remove(listener);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> search(final SearchRequest request,
-        final SearchResultHandler handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.search(request, handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<Result> search(final SearchRequest request,
-        final SearchResultHandler resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.search(request, resultHandler,
-          intermediateResponseHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public FutureResult<SearchResultEntry> searchSingleEntry(
-        final SearchRequest request,
-        final ResultHandler<? super SearchResultEntry> resultHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.searchSingleEntry(request, resultHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public String toString()
-    {
-      final StringBuilder builder = new StringBuilder();
-      builder.append("PooledConnection(");
-      builder.append(connection);
-      builder.append(')');
-      return builder.toString();
-    }
-  }
-
-
-
-  /**
-   * A queue element is either a pending connection request future awaiting an
-   * {@code AsynchronousConnection} or it is an unused
-   * {@code AsynchronousConnection} awaiting a connection request.
-   */
-  private static final class QueueElement
-  {
-    private final Object value;
-
-
-
-    QueueElement(final AsynchronousConnection connection)
-    {
-      this.value = connection;
-    }
-
-
-
-    QueueElement(final ResultHandler<? super AsynchronousConnection> handler)
-    {
-      this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler);
-    }
-
-
-
-    AsynchronousConnection getWaitingConnection()
-    {
-      if (value instanceof AsynchronousConnection)
-      {
-        return (AsynchronousConnection) value;
-      }
-      else
-      {
-        throw new IllegalStateException();
-      }
-    }
-
-
-
-    @SuppressWarnings("unchecked")
-    AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture()
-    {
-      if (value instanceof AsynchronousFutureResult)
-      {
-        return (AsynchronousFutureResult<AsynchronousConnection>) value;
-      }
-      else
-      {
-        throw new IllegalStateException();
-      }
-    }
-
-
-
-    boolean isWaitingFuture()
-    {
-      return value instanceof AsynchronousFutureResult;
-    }
-
-
-
-    public String toString()
-    {
-      return String.valueOf(value);
-    }
-  }
-
-
-
-  // Guarded by queue.
-  private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
-
-  private final ConnectionFactory factory;
-
-  private final int poolSize;
-
-  private final Semaphore currentPoolSize;
-
-  private final ResultHandler<AsynchronousConnection> connectionResultHandler =
-    new ConnectionResultHandler();
-
-
-
-  /**
-   * Creates a new connection pool which will maintain {@code poolSize}
-   * connections created using the provided connection factory.
+   * Obtains an asynchronous connection from this connection pool, potentially
+   * opening a new connection if needed.
+   * <p>
+   * The returned {@code FutureResult} can be used to retrieve the pooled
+   * asynchronous connection. Alternatively, if a {@code ResultHandler} is
+   * provided, the handler will be notified when the pooled connection is
+   * available and ready for use.
+   * <p>
+   * Closing the pooled connection will, depending on the connection pool
+   * implementation, return the connection to this pool without closing it.
    *
-   * @param factory
-   *          The connection factory to use for creating new connections.
-   * @param poolSize
-   *          The maximum size of the connection pool.
+   * @param handler
+   *          The completion handler, or {@code null} if no handler is to be
+   *          used.
+   * @return A future which can be used to retrieve the pooled asynchronous
+   *         connection.
+   * @throws IllegalStateException
+   *           If this connection pool has already been closed.
    */
-  ConnectionPool(final ConnectionFactory factory, final int poolSize)
-  {
-    this.factory = factory;
-    this.poolSize = poolSize;
-    this.currentPoolSize = new Semaphore(poolSize);
-  }
+  FutureResult<AsynchronousConnection> getAsynchronousConnection(
+      ResultHandler<? super AsynchronousConnection> handler);
 
 
 
   /**
-   * {@inheritDoc}
+   * Obtains a connection from this connection pool, potentially opening a new
+   * connection if needed.
+   * <p>
+   * Closing the pooled connection will, depending on the connection pool
+   * implementation, return the connection to this pool without closing it.
+   *
+   * @return A pooled connection.
+   * @throws ErrorResultException
+   *           If the connection request failed for some reason.
+   * @throws InterruptedException
+   *           If the current thread was interrupted while waiting.
+   * @throws IllegalStateException
+   *           If this connection pool has already been closed.
    */
-  @Override
-  public FutureResult<AsynchronousConnection> getAsynchronousConnection(
-      final ResultHandler<? super AsynchronousConnection> handler)
-  {
-    QueueElement holder;
-    synchronized (queue)
-    {
-      if (queue.isEmpty() || queue.getFirst().isWaitingFuture())
-      {
-        holder = new QueueElement(handler);
-        queue.add(holder);
-      }
-      else
-      {
-        holder = queue.removeFirst();
-      }
-    }
-
-    if (!holder.isWaitingFuture())
-    {
-      // There was a completed connection attempt.
-      final AsynchronousConnection connection = holder.getWaitingConnection();
-      final PooledConnection pooledConnection = new PooledConnection(connection);
-      if (handler != null)
-      {
-        handler.handleResult(pooledConnection);
-      }
-      return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
-    }
-    else
-    {
-      // Grow the pool if needed.
-      final FutureResult<AsynchronousConnection> future = holder
-          .getWaitingFuture();
-      if (!future.isDone() && currentPoolSize.tryAcquire())
-      {
-        factory.getAsynchronousConnection(connectionResultHandler);
-      }
-      return future;
-    }
-  }
-
-
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public String toString()
-  {
-    final StringBuilder builder = new StringBuilder();
-    builder.append("ConnectionPool(");
-    builder.append(String.valueOf(factory));
-    builder.append(',');
-    builder.append(poolSize);
-    builder.append(')');
-    return builder.toString();
-  }
-
-
-
-  private void publishConnection(final AsynchronousConnection connection)
-  {
-    QueueElement holder;
-    synchronized (queue)
-    {
-      if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
-      {
-        holder = new QueueElement(connection);
-        queue.add(holder);
-        return;
-      }
-      else
-      {
-        holder = queue.removeFirst();
-      }
-    }
-
-    // There was waiting future, so close it.
-    final PooledConnection pooledConnection = new PooledConnection(connection);
-    holder.getWaitingFuture().handleResult(pooledConnection);
-  }
+  Connection getConnection() throws ErrorResultException, InterruptedException;
 }
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
index 7b1c24c..0429693 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2009-2010 Sun Microsystems, Inc.
+ *      Portions copyright 2011 ForgeRock AS
  */
 
 package org.forgerock.opendj.ldap;
@@ -92,13 +93,13 @@
    * @throws NullPointerException
    *           If {@code factory} was {@code null}.
    */
-  public static ConnectionFactory newConnectionPool(
+  public static ConnectionPool newFixedConnectionPool(
       final ConnectionFactory factory, final int poolSize)
       throws IllegalArgumentException, NullPointerException
   {
     Validator.ensureNotNull(factory);
     Validator.ensureTrue(poolSize >= 0, "negative pool size");
-    return new ConnectionPool(factory, poolSize);
+    return new FixedConnectionPool(factory, poolSize);
   }
 
 
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
new file mode 100644
index 0000000..f20244f
--- /dev/null
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
@@ -0,0 +1,944 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opendj3/legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opendj3/legal-notices/CDDLv1_0.txt.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2009-2010 Sun Microsystems, Inc.
+ *      Portions copyright 2011 ForgeRock AS
+ */
+
+package org.forgerock.opendj.ldap;
+
+
+
+import static org.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_POOL_CLOSING;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+
+import org.forgerock.opendj.ldap.requests.*;
+import org.forgerock.opendj.ldap.responses.*;
+
+import com.forgerock.opendj.util.*;
+
+
+
+/**
+ * A simple connection pool implementation which maintains a fixed number of
+ * connections.
+ */
+final class FixedConnectionPool extends AbstractConnectionFactory implements
+    ConnectionPool
+{
+
+  /**
+   * This result handler is invoked when an attempt to add a new connection to
+   * the pool completes.
+   */
+  private final class ConnectionResultHandler implements
+      ResultHandler<AsynchronousConnection>
+  {
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void handleErrorResult(final ErrorResultException error)
+    {
+      // Connection attempt failed, so decrease the pool size.
+      currentPoolSize.release();
+
+      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+      {
+        StaticUtils.DEBUG_LOG.fine(String.format("Connection attempt failed: "
+            + error.getMessage() + " currentPoolSize=%d, poolSize=%d", poolSize
+            - currentPoolSize.availablePermits(), poolSize));
+      }
+
+      QueueElement holder;
+      synchronized (queue)
+      {
+        if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
+        {
+          // No waiting futures.
+          return;
+        }
+        else
+        {
+          holder = queue.removeFirst();
+        }
+      }
+
+      // There was waiting future, so close it.
+      holder.getWaitingFuture().handleErrorResult(error);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void handleResult(final AsynchronousConnection connection)
+    {
+      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+      {
+        StaticUtils.DEBUG_LOG.fine(String.format(
+            "Connection attempt succeeded: "
+                + " currentPoolSize=%d, poolSize=%d", poolSize
+                - currentPoolSize.availablePermits(), poolSize));
+      }
+
+      publishConnection(connection);
+    }
+  }
+
+
+
+  /**
+   * A pooled connection is passed to the client. It wraps an underlying
+   * "pooled" connection obtained from the underlying factory and lasts until
+   * the client application closes this connection. More specifically, pooled
+   * connections are not actually stored in the internal queue.
+   */
+  private final class PooledConnection implements AsynchronousConnection
+  {
+    // Connection event listeners registed against this pooled connection should
+    // have the same life time as the pooled connection.
+    private final List<ConnectionEventListener> listeners =
+      new CopyOnWriteArrayList<ConnectionEventListener>();
+
+    private final AsynchronousConnection connection;
+
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+
+
+    PooledConnection(final AsynchronousConnection connection)
+    {
+      this.connection = connection;
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public FutureResult<Void> abandon(final AbandonRequest request)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.abandon(request);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public FutureResult<Result> add(final AddRequest request,
+        final ResultHandler<? super Result> handler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.add(request, handler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public FutureResult<Result> add(final AddRequest request,
+        final ResultHandler<? super Result> resultHandler,
+        final IntermediateResponseHandler intermediateResponseHandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection
+          .add(request, resultHandler, intermediateResponseHandler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void addConnectionEventListener(
+        final ConnectionEventListener listener) throws IllegalStateException,
+        NullPointerException
+    {
+      Validator.ensureNotNull(listener);
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      listeners.add(listener);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public FutureResult<BindResult> bind(final BindRequest request,
+        final ResultHandler<? super BindResult> handler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.bind(request, handler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public FutureResult<BindResult> bind(final BindRequest request,
+        final ResultHandler<? super BindResult> resultHandler,
+        final IntermediateResponseHandler intermediateResponseHandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.bind(request, resultHandler,
+          intermediateResponseHandler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void close()
+    {
+      if (!isClosed.compareAndSet(false, true))
+      {
+        // Already closed.
+        return;
+      }
+
+      // Don't put invalid connections back in the pool.
+      if (connection.isValid())
+      {
+        publishConnection(connection);
+      }
+      else
+      {
+        // The connection may have been disconnected by the remote server, but
+        // the server may still be available. In order to avoid leaving pending
+        // futures hanging indefinitely, we should try to reconnect immediately.
+
+        // Close the dead connection.
+        connection.close();
+
+        // Try to get a new connection to replace it.
+        factory.getAsynchronousConnection(connectionResultHandler);
+
+        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
+        {
+          StaticUtils.DEBUG_LOG.warning(String.format(
+              "Connection no longer valid. "
+                  + "currentPoolSize=%d, poolSize=%d", poolSize
+                  - currentPoolSize.availablePermits(), poolSize));
+        }
+      }
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void close(final UnbindRequest request, final String reason)
+        throws NullPointerException
+    {
+      close();
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public FutureResult<CompareResult> compare(final CompareRequest request,
+        final ResultHandler<? super CompareResult> handler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.compare(request, handler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public FutureResult<CompareResult> compare(final CompareRequest request,
+        final ResultHandler<? super CompareResult> resultHandler,
+        final IntermediateResponseHandler intermediateResponseHandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.compare(request, resultHandler,
+          intermediateResponseHandler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public FutureResult<Result> delete(final DeleteRequest request,
+        final ResultHandler<? super Result> handler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.delete(request, handler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public FutureResult<Result> delete(final DeleteRequest request,
+        final ResultHandler<? super Result> resultHandler,
+        final IntermediateResponseHandler intermediateResponseHandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.delete(request, resultHandler,
+          intermediateResponseHandler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
+        final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.extendedRequest(request, handler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
+        final ExtendedRequest<R> request,
+        final ResultHandler<? super R> resultHandler,
+        final IntermediateResponseHandler intermediateResponseHandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.extendedRequest(request, resultHandler,
+          intermediateResponseHandler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Connection getSynchronousConnection()
+    {
+      return new SynchronousConnection(this);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isClosed()
+    {
+      return isClosed.get();
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isValid()
+    {
+      return connection.isValid() && !isClosed();
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public FutureResult<Result> modify(final ModifyRequest request,
+        final ResultHandler<? super Result> handler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.modify(request, handler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public FutureResult<Result> modify(final ModifyRequest request,
+        final ResultHandler<? super Result> resultHandler,
+        final IntermediateResponseHandler intermediateResponseHandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.modify(request, resultHandler,
+          intermediateResponseHandler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
+        final ResultHandler<? super Result> handler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.modifyDN(request, handler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
+        final ResultHandler<? super Result> resultHandler,
+        final IntermediateResponseHandler intermediateResponseHandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.modifyDN(request, resultHandler,
+          intermediateResponseHandler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public FutureResult<SearchResultEntry> readEntry(final DN name,
+        final Collection<String> attributeDescriptions,
+        final ResultHandler<? super SearchResultEntry> resultHandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.readEntry(name, attributeDescriptions, resultHandler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void removeConnectionEventListener(
+        final ConnectionEventListener listener) throws NullPointerException
+    {
+      Validator.ensureNotNull(listener);
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      listeners.remove(listener);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public FutureResult<Result> search(final SearchRequest request,
+        final SearchResultHandler handler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.search(request, handler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public FutureResult<Result> search(final SearchRequest request,
+        final SearchResultHandler resultHandler,
+        final IntermediateResponseHandler intermediateResponseHandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.search(request, resultHandler,
+          intermediateResponseHandler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public FutureResult<SearchResultEntry> searchSingleEntry(
+        final SearchRequest request,
+        final ResultHandler<? super SearchResultEntry> resultHandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.searchSingleEntry(request, resultHandler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String toString()
+    {
+      final StringBuilder builder = new StringBuilder();
+      builder.append("PooledConnection(");
+      builder.append(connection);
+      builder.append(')');
+      return builder.toString();
+    }
+  }
+
+
+
+  /**
+   * A queue element is either a pending connection request future awaiting an
+   * {@code AsynchronousConnection} or it is an unused
+   * {@code AsynchronousConnection} awaiting a connection request.
+   */
+  private static final class QueueElement
+  {
+    private final Object value;
+
+
+
+    QueueElement(final AsynchronousConnection connection)
+    {
+      this.value = connection;
+    }
+
+
+
+    QueueElement(final ResultHandler<? super AsynchronousConnection> handler)
+    {
+      this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler);
+    }
+
+
+
+    @Override
+    public String toString()
+    {
+      return String.valueOf(value);
+    }
+
+
+
+    AsynchronousConnection getWaitingConnection()
+    {
+      if (value instanceof AsynchronousConnection)
+      {
+        return (AsynchronousConnection) value;
+      }
+      else
+      {
+        throw new IllegalStateException();
+      }
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture()
+    {
+      if (value instanceof AsynchronousFutureResult)
+      {
+        return (AsynchronousFutureResult<AsynchronousConnection>) value;
+      }
+      else
+      {
+        throw new IllegalStateException();
+      }
+    }
+
+
+
+    boolean isWaitingFuture()
+    {
+      return value instanceof AsynchronousFutureResult;
+    }
+  }
+
+
+
+  // Guarded by queue.
+  private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
+
+  // Guarded by queue.
+  private boolean isClosed = false;
+
+  private final ConnectionFactory factory;
+
+  private final int poolSize;
+
+  private final Semaphore currentPoolSize;
+
+  private final ResultHandler<AsynchronousConnection> connectionResultHandler =
+    new ConnectionResultHandler();
+
+
+
+  /**
+   * Creates a new connection pool which will maintain {@code poolSize}
+   * connections created using the provided connection factory.
+   *
+   * @param factory
+   *          The connection factory to use for creating new connections.
+   * @param poolSize
+   *          The maximum size of the connection pool.
+   */
+  FixedConnectionPool(final ConnectionFactory factory, final int poolSize)
+  {
+    this.factory = factory;
+    this.poolSize = poolSize;
+    this.currentPoolSize = new Semaphore(poolSize);
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void close()
+  {
+    final LinkedList<AsynchronousConnection> idleConnections;
+    synchronized (queue)
+    {
+      if (isClosed)
+      {
+        return;
+      }
+      isClosed = true;
+
+      // Remove any connections which are waiting in the queue as these can be
+      // closed immediately.
+      idleConnections = new LinkedList<AsynchronousConnection>();
+      while (!queue.isEmpty() && !queue.getFirst().isWaitingFuture())
+      {
+        final QueueElement holder = queue.removeFirst();
+        idleConnections.add(holder.getWaitingConnection());
+      }
+    }
+
+    if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+    {
+      StaticUtils.DEBUG_LOG.fine(String.format(
+          "Connection pool is closing: currentPoolSize=%d, poolSize=%d",
+          poolSize - currentPoolSize.availablePermits(), poolSize));
+    }
+
+    // Close the idle connections.
+    for (final AsynchronousConnection connection : idleConnections)
+    {
+      closeConnection(connection);
+    }
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public FutureResult<AsynchronousConnection> getAsynchronousConnection(
+      final ResultHandler<? super AsynchronousConnection> handler)
+  {
+    QueueElement holder;
+    synchronized (queue)
+    {
+      if (isClosed)
+      {
+        throw new IllegalStateException("FixedConnectionPool is already closed");
+      }
+
+      if (queue.isEmpty() || queue.getFirst().isWaitingFuture())
+      {
+        holder = new QueueElement(handler);
+        queue.add(holder);
+      }
+      else
+      {
+        holder = queue.removeFirst();
+      }
+    }
+
+    if (!holder.isWaitingFuture())
+    {
+      // There was a completed connection attempt.
+      final AsynchronousConnection connection = holder.getWaitingConnection();
+      final PooledConnection pooledConnection = new PooledConnection(connection);
+      if (handler != null)
+      {
+        handler.handleResult(pooledConnection);
+      }
+      return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
+    }
+    else
+    {
+      // Grow the pool if needed.
+      final FutureResult<AsynchronousConnection> future = holder
+          .getWaitingFuture();
+      if (!future.isDone() && currentPoolSize.tryAcquire())
+      {
+        factory.getAsynchronousConnection(connectionResultHandler);
+      }
+      return future;
+    }
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString()
+  {
+    final StringBuilder builder = new StringBuilder();
+    builder.append("FixedConnectionPool(");
+    builder.append(String.valueOf(factory));
+    builder.append(',');
+    builder.append(poolSize);
+    builder.append(')');
+    return builder.toString();
+  }
+
+
+
+  /**
+   * Provide a finalizer because connection pools are expensive resources to
+   * accidentally leave around. Also, since they won't be created all that
+   * frequently, there's little risk of overloading the finalizer.
+   */
+  @Override
+  protected void finalize() throws Throwable
+  {
+    close();
+  }
+
+
+
+  private void closeConnection(final AsynchronousConnection connection)
+  {
+    // The connection will be closed, so decrease the pool size.
+    currentPoolSize.release();
+    connection.close();
+
+    if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+    {
+      StaticUtils.DEBUG_LOG.fine(String.format(
+          "Closing connection because connection pool is closing: "
+              + " currentPoolSize=%d, poolSize=%d",
+          poolSize - currentPoolSize.availablePermits(), poolSize));
+    }
+  }
+
+
+
+  private void publishConnection(final AsynchronousConnection connection)
+  {
+    final QueueElement holder;
+    boolean connectionPoolIsClosing = false;
+
+    synchronized (queue)
+    {
+      if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
+      {
+        if (isClosed)
+        {
+          connectionPoolIsClosing = true;
+          holder = null;
+        }
+        else
+        {
+          holder = new QueueElement(connection);
+          queue.add(holder);
+          return;
+        }
+      }
+      else
+      {
+        connectionPoolIsClosing = isClosed;
+        holder = queue.removeFirst();
+      }
+    }
+
+    // There was waiting future, so complete it.
+    if (connectionPoolIsClosing)
+    {
+      closeConnection(connection);
+
+      if (holder != null)
+      {
+        final ErrorResultException e = ErrorResultException.newErrorResult(
+            ResultCode.CLIENT_SIDE_USER_CANCELLED, ERR_CONNECTION_POOL_CLOSING
+                .get(toString()).toString());
+        holder.getWaitingFuture().handleErrorResult(e);
+
+        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+        {
+          StaticUtils.DEBUG_LOG.fine(String.format(
+              "Connection attempt failed: " + e.getMessage()
+                  + " currentPoolSize=%d, poolSize=%d", poolSize
+                  - currentPoolSize.availablePermits(), poolSize));
+        }
+      }
+    }
+    else
+    {
+      final PooledConnection pooledConnection = new PooledConnection(connection);
+      holder.getWaitingFuture().handleResult(pooledConnection);
+    }
+  }
+}
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties
index 0c3d7fd..7eb1587 100755
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties
@@ -1385,3 +1385,5 @@
 WARN_ATTR_SYNTAX_DSR_INVALID_SUPERIOR_RULE=The definition for \
  the DIT structure rule "%s" declared a superior rule "%s" which has been \
  removed from the schema because it is invalid
+ERR_CONNECTION_POOL_CLOSING=No connection could be obtained from connection \
+ pool "%s" because it is closing
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
index 0ba30b8..3ab0141 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
@@ -33,11 +33,14 @@
 
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
 
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 
 import javax.net.ssl.SSLContext;
@@ -48,11 +51,14 @@
 import org.forgerock.opendj.ldap.responses.SearchResultEntry;
 import org.forgerock.opendj.ldap.schema.Schema;
 import org.forgerock.opendj.ldap.schema.SchemaBuilder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import com.forgerock.opendj.util.CompletedFutureResult;
 import com.forgerock.opendj.util.StaticUtils;
 
 
@@ -205,7 +211,7 @@
         "online");
 
     // Connection pools.
-    factories[7][0] = Connections.newConnectionPool(onlineServer, 10);
+    factories[7][0] = Connections.newFixedConnectionPool(onlineServer, 10);
 
     // Round robin.
     factories[8][0] = Connections
@@ -220,9 +226,10 @@
         .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays.asList(
             offlineServer1, offlineServer2, onlineServer)));
     factories[13][0] = Connections
-        .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays.asList(
-            Connections.newConnectionPool(offlineServer1, 10),
-            Connections.newConnectionPool(onlineServer, 10))));
+        .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays
+            .<ConnectionFactory> asList(
+                Connections.newFixedConnectionPool(offlineServer1, 10),
+                Connections.newFixedConnectionPool(onlineServer, 10))));
 
     // Fail-over.
     factories[14][0] = Connections
@@ -237,11 +244,12 @@
         .newLoadBalancer(new FailoverLoadBalancingAlgorithm(Arrays.asList(
             offlineServer1, offlineServer2, onlineServer)));
     factories[19][0] = Connections
-        .newLoadBalancer(new FailoverLoadBalancingAlgorithm(Arrays.asList(
-            Connections.newConnectionPool(offlineServer1, 10),
-            Connections.newConnectionPool(onlineServer, 10))));
+        .newLoadBalancer(new FailoverLoadBalancingAlgorithm(Arrays
+            .<ConnectionFactory> asList(
+                Connections.newFixedConnectionPool(offlineServer1, 10),
+                Connections.newFixedConnectionPool(onlineServer, 10))));
 
-    factories[20][0] = Connections.newConnectionPool(onlineServer, 10);
+    factories[20][0] = Connections.newFixedConnectionPool(onlineServer, 10);
 
     return factories;
   }
@@ -367,4 +375,119 @@
       connection.close();
     }
   }
+
+
+
+  /**
+   * Tests connection pool closure.
+   *
+   * @throws Exception If an unexpected exception occurred.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testConnectionPoolClose() throws Exception
+  {
+    // We'll use a pool of 4 connections.
+    final int SIZE = 4;
+
+    // Count number of real connections which are open.
+    final AtomicInteger realConnectionCount = new AtomicInteger();
+    final boolean[] realConnectionIsClosed = new boolean[SIZE];
+    Arrays.fill(realConnectionIsClosed, true);
+
+    // Mock underlying connection factory which always succeeds.
+    final ConnectionFactory mockFactory = mock(ConnectionFactory.class);
+    when(mockFactory.getAsynchronousConnection(any(ResultHandler.class)))
+        .thenAnswer(new Answer<FutureResult<AsynchronousConnection>>()
+        {
+
+          public FutureResult<AsynchronousConnection> answer(
+              InvocationOnMock invocation) throws Throwable
+          {
+            // Update state.
+            final int connectionID = realConnectionCount.getAndIncrement();
+            realConnectionIsClosed[connectionID] = false;
+
+            // Mock connection decrements counter on close.
+            AsynchronousConnection mockConnection = mock(AsynchronousConnection.class);
+            doAnswer(new Answer<Void>()
+            {
+              public Void answer(InvocationOnMock invocation) throws Throwable
+              {
+                realConnectionCount.decrementAndGet();
+                realConnectionIsClosed[connectionID] = true;
+                return null;
+              }
+            }).when(mockConnection).close();
+            when(mockConnection.isValid()).thenReturn(true);
+            when(mockConnection.toString()).thenReturn("Mock connection " + connectionID);
+
+            // Excecute handler and return future.
+            ResultHandler<? super AsynchronousConnection> handler =
+              (ResultHandler<? super AsynchronousConnection>) invocation.getArguments()[0];
+            if (handler != null)
+            {
+              handler.handleResult(mockConnection);
+            }
+            return new CompletedFutureResult<AsynchronousConnection>(
+                mockConnection);
+          }
+        });
+
+    ConnectionPool pool = Connections.newFixedConnectionPool(mockFactory, SIZE);
+    Connection[] pooledConnections = new Connection[SIZE];
+    for (int i = 0; i < SIZE; i++)
+    {
+      pooledConnections[i] = pool.getConnection();
+    }
+
+    // Pool is fully utilized.
+    assertThat(realConnectionCount.get()).isEqualTo(SIZE);
+    assertThat(pooledConnections[0].isClosed()).isFalse();
+    assertThat(pooledConnections[1].isClosed()).isFalse();
+    assertThat(pooledConnections[2].isClosed()).isFalse();
+    assertThat(pooledConnections[3].isClosed()).isFalse();
+    assertThat(realConnectionIsClosed[0]).isFalse();
+    assertThat(realConnectionIsClosed[1]).isFalse();
+    assertThat(realConnectionIsClosed[2]).isFalse();
+    assertThat(realConnectionIsClosed[3]).isFalse();
+
+    // Release two connections.
+    pooledConnections[0].close();
+    pooledConnections[1].close();
+    assertThat(realConnectionCount.get()).isEqualTo(4);
+    assertThat(pooledConnections[0].isClosed()).isTrue();
+    assertThat(pooledConnections[1].isClosed()).isTrue();
+    assertThat(pooledConnections[2].isClosed()).isFalse();
+    assertThat(pooledConnections[3].isClosed()).isFalse();
+    assertThat(realConnectionIsClosed[0]).isFalse();
+    assertThat(realConnectionIsClosed[1]).isFalse();
+    assertThat(realConnectionIsClosed[2]).isFalse();
+    assertThat(realConnectionIsClosed[3]).isFalse();
+
+    // Close the pool closing the two connections immediately.
+    pool.close();
+    assertThat(realConnectionCount.get()).isEqualTo(2);
+    assertThat(pooledConnections[0].isClosed()).isTrue();
+    assertThat(pooledConnections[1].isClosed()).isTrue();
+    assertThat(pooledConnections[2].isClosed()).isFalse();
+    assertThat(pooledConnections[3].isClosed()).isFalse();
+    assertThat(realConnectionIsClosed[0]).isTrue();
+    assertThat(realConnectionIsClosed[1]).isTrue();
+    assertThat(realConnectionIsClosed[2]).isFalse();
+    assertThat(realConnectionIsClosed[3]).isFalse();
+
+    // Release two remaining connections and check that they get closed.
+    pooledConnections[2].close();
+    pooledConnections[3].close();
+    assertThat(realConnectionCount.get()).isEqualTo(0);
+    assertThat(pooledConnections[0].isClosed()).isTrue();
+    assertThat(pooledConnections[1].isClosed()).isTrue();
+    assertThat(pooledConnections[2].isClosed()).isTrue();
+    assertThat(pooledConnections[3].isClosed()).isTrue();
+    assertThat(realConnectionIsClosed[0]).isTrue();
+    assertThat(realConnectionIsClosed[1]).isTrue();
+    assertThat(realConnectionIsClosed[2]).isTrue();
+    assertThat(realConnectionIsClosed[3]).isTrue();
+  }
 }
diff --git a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java
index a98ce81..fe89608 100644
--- a/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java
+++ b/opendj-sdk/opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java
@@ -604,10 +604,11 @@
 
       // Round robin.
       final ConnectionFactory loadBalancer = Connections
-          .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays.asList(
-              Connections.newConnectionPool(offlineServer1, 10),
-              Connections.newConnectionPool(offlineServer2, 10),
-              Connections.newConnectionPool(onlineServer, 10))));
+          .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays
+              .<ConnectionFactory> asList(
+                  Connections.newFixedConnectionPool(offlineServer1, 10),
+                  Connections.newFixedConnectionPool(offlineServer2, 10),
+                  Connections.newFixedConnectionPool(onlineServer, 10))));
 
       final MockServerConnection proxyServerConnection = new MockServerConnection();
       final MockServerConnectionFactory proxyServerConnectionFactory = new MockServerConnectionFactory(
@@ -699,10 +700,11 @@
 
       // Round robin.
       final ConnectionFactory loadBalancer = Connections
-          .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays.asList(
-              Connections.newConnectionPool(offlineServer1, 10),
-              Connections.newConnectionPool(offlineServer2, 10),
-              Connections.newConnectionPool(onlineServer, 10))));
+          .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays
+              .<ConnectionFactory> asList(
+                  Connections.newFixedConnectionPool(offlineServer1, 10),
+                  Connections.newFixedConnectionPool(offlineServer2, 10),
+                  Connections.newFixedConnectionPool(onlineServer, 10))));
 
       final MockServerConnection proxyServerConnection = new MockServerConnection()
       {

--
Gitblit v1.10.0