mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
13.13.2011 abc8483bdf6febb3c10ea2bd20083ab389da9e3d
Fix OPENDJ-177: Make connection pools Closeable

Renamed ConnectionPool -> FixedConnectionPool
Added new interface ConnectionPool which extends Closeable
Renamed Connections.newConnectionPool() to newFixedConnectionPool()
Implemented FixedConnectionPool.close() and added unit tests.
1 file added
6 files modified
1946 ■■■■■ changed files
opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/proxy/Main.java 4 ●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java 834 ●●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java 5 ●●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java 944 ●●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties 2 ●●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java 139 ●●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java 18 ●●●●● patch | view | raw | blame | history
opendj3/opendj-ldap-sdk-examples/src/main/java/org/forgerock/opendj/examples/proxy/Main.java
@@ -559,10 +559,10 @@
      final String remoteAddress = args[i];
      final int remotePort = Integer.parseInt(args[i + 1]);
      factories.add(Connections.newConnectionPool(
      factories.add(Connections.newFixedConnectionPool(
          new LDAPConnectionFactory(remoteAddress, remotePort),
          Integer.MAX_VALUE));
      bindFactories.add(Connections.newConnectionPool(
      bindFactories.add(Connections.newFixedConnectionPool(
          new LDAPConnectionFactory(remoteAddress, remotePort),
          Integer.MAX_VALUE));
    }
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/ConnectionPool.java
@@ -22,806 +22,86 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS
 *      Copyright 2011 ForgeRock AS
 */
package org.forgerock.opendj.ldap;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import org.forgerock.opendj.ldap.requests.*;
import org.forgerock.opendj.ldap.responses.*;
import com.forgerock.opendj.util.*;
import java.io.Closeable;
/**
 * A simple connection pool implementation.
 * A connection factory which maintains and re-uses a pool of connections.
 * Connections obtained from a connection pool are returned to the connection
 * pool when closed, although connection pool implementations may choose to
 * physically close the connection if needed (e.g. in order to reduce the size
 * of the pool).
 * <p>
 * When connection pools are no longer needed they must be explicitly closed in
 * order to close any remaining pooled connections.
 * <p>
 * Since pooled connections are re-used, applications must use operations such
 * as binds and StartTLS with extreme caution.
 */
final class ConnectionPool extends AbstractConnectionFactory
public interface ConnectionPool extends ConnectionFactory, Closeable
{
  /**
   * This result handler is invoked when an attempt to add a new connection to
   * the pool completes.
   * Releases any resources associated with this connection pool. Pooled
   * connections will be permanently closed and this connection pool will no
   * longer be available for use.
   * <p>
   * Attempts to use this connection pool after it has been closed will result
   * in an {@code IllegalStateException}.
   * <p>
   * Calling {@code close} on a connection pool which is already closed has no
   * effect.
   */
  private final class ConnectionResultHandler implements
      ResultHandler<AsynchronousConnection>
  {
    /**
     * {@inheritDoc}
     */
    @Override
    public void handleErrorResult(final ErrorResultException error)
    {
      // Connection attempt failed, so decrease the pool size.
      currentPoolSize.release();
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      {
        StaticUtils.DEBUG_LOG.fine(String.format(
            "Connection attempt failed: " + error.getMessage()
                + " currentPoolSize=%d, poolSize=%d",
            poolSize - currentPoolSize.availablePermits(), poolSize));
      }
      QueueElement holder;
      synchronized (queue)
      {
        if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
        {
          // No waiting futures.
          return;
        }
        else
        {
          holder = queue.removeFirst();
        }
      }
      // There was waiting future, so close it.
      holder.getWaitingFuture().handleErrorResult(error);
    }
  void close();
    /**
     * {@inheritDoc}
     */
    @Override
    public void handleResult(final AsynchronousConnection connection)
    {
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      {
        StaticUtils.DEBUG_LOG.fine(String.format(
            "Connection attempt succeeded: "
                + " currentPoolSize=%d, poolSize=%d",
                poolSize - currentPoolSize.availablePermits(), poolSize));
      }
      publishConnection(connection);
    }
  }
  /**
   * A pooled connection is passed to the client. It wraps an underlying
   * "pooled" connection obtained from the underlying factory and lasts until
   * the client application closes this connection. More specifically, pooled
   * connections are not actually stored in the internal queue.
   */
  private final class PooledConnection implements AsynchronousConnection
  {
    // Connection event listeners registed against this pooled connection should
    // have the same life time as the pooled connection.
    private final List<ConnectionEventListener> listeners =
      new CopyOnWriteArrayList<ConnectionEventListener>();
    private final AsynchronousConnection connection;
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    PooledConnection(final AsynchronousConnection connection)
    {
      this.connection = connection;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Void> abandon(final AbandonRequest request)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.abandon(request);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> add(final AddRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.add(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> add(final AddRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection
          .add(request, resultHandler, intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void addConnectionEventListener(
        final ConnectionEventListener listener) throws IllegalStateException,
        NullPointerException
    {
      Validator.ensureNotNull(listener);
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      listeners.add(listener);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<BindResult> bind(final BindRequest request,
        final ResultHandler<? super BindResult> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.bind(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<BindResult> bind(final BindRequest request,
        final ResultHandler<? super BindResult> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.bind(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void close()
    {
      if (!isClosed.compareAndSet(false, true))
      {
        // Already closed.
        return;
      }
      // Don't put invalid connections back in the pool.
      if (connection.isValid())
      {
        publishConnection(connection);
      }
      else
      {
        // The connection may have been disconnected by the remote server, but
        // the server may still be available. In order to avoid leaving pending
        // futures hanging indefinitely, we should try to reconnect immediately.
        // Close the dead connection.
        connection.close();
        // Try to get a new connection to replace it.
        factory.getAsynchronousConnection(connectionResultHandler);
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
        {
          StaticUtils.DEBUG_LOG.warning(String.format(
              "Connection no longer valid. "
                  + "currentPoolSize=%d, poolSize=%d",
                  poolSize - currentPoolSize.availablePermits(), poolSize));
        }
      }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void close(final UnbindRequest request, final String reason)
        throws NullPointerException
    {
      close();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<CompareResult> compare(final CompareRequest request,
        final ResultHandler<? super CompareResult> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.compare(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<CompareResult> compare(final CompareRequest request,
        final ResultHandler<? super CompareResult> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.compare(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> delete(final DeleteRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.delete(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> delete(final DeleteRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.delete(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
        final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.extendedRequest(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
        final ExtendedRequest<R> request,
        final ResultHandler<? super R> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.extendedRequest(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public Connection getSynchronousConnection()
    {
      return new SynchronousConnection(this);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isClosed()
    {
      return isClosed.get();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isValid()
    {
      return connection.isValid() && !isClosed();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modify(final ModifyRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.modify(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modify(final ModifyRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.modify(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.modifyDN(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.modifyDN(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<SearchResultEntry> readEntry(final DN name,
        final Collection<String> attributeDescriptions,
        final ResultHandler<? super SearchResultEntry> resultHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.readEntry(name, attributeDescriptions, resultHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void removeConnectionEventListener(
        final ConnectionEventListener listener) throws NullPointerException
    {
      Validator.ensureNotNull(listener);
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      listeners.remove(listener);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> search(final SearchRequest request,
        final SearchResultHandler handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.search(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> search(final SearchRequest request,
        final SearchResultHandler resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.search(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<SearchResultEntry> searchSingleEntry(
        final SearchRequest request,
        final ResultHandler<? super SearchResultEntry> resultHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.searchSingleEntry(request, resultHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString()
    {
      final StringBuilder builder = new StringBuilder();
      builder.append("PooledConnection(");
      builder.append(connection);
      builder.append(')');
      return builder.toString();
    }
  }
  /**
   * A queue element is either a pending connection request future awaiting an
   * {@code AsynchronousConnection} or it is an unused
   * {@code AsynchronousConnection} awaiting a connection request.
   */
  private static final class QueueElement
  {
    private final Object value;
    QueueElement(final AsynchronousConnection connection)
    {
      this.value = connection;
    }
    QueueElement(final ResultHandler<? super AsynchronousConnection> handler)
    {
      this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler);
    }
    AsynchronousConnection getWaitingConnection()
    {
      if (value instanceof AsynchronousConnection)
      {
        return (AsynchronousConnection) value;
      }
      else
      {
        throw new IllegalStateException();
      }
    }
    @SuppressWarnings("unchecked")
    AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture()
    {
      if (value instanceof AsynchronousFutureResult)
      {
        return (AsynchronousFutureResult<AsynchronousConnection>) value;
      }
      else
      {
        throw new IllegalStateException();
      }
    }
    boolean isWaitingFuture()
    {
      return value instanceof AsynchronousFutureResult;
    }
    public String toString()
    {
      return String.valueOf(value);
    }
  }
  // Guarded by queue.
  private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
  private final ConnectionFactory factory;
  private final int poolSize;
  private final Semaphore currentPoolSize;
  private final ResultHandler<AsynchronousConnection> connectionResultHandler =
    new ConnectionResultHandler();
  /**
   * Creates a new connection pool which will maintain {@code poolSize}
   * connections created using the provided connection factory.
   * Obtains an asynchronous connection from this connection pool, potentially
   * opening a new connection if needed.
   * <p>
   * The returned {@code FutureResult} can be used to retrieve the pooled
   * asynchronous connection. Alternatively, if a {@code ResultHandler} is
   * provided, the handler will be notified when the pooled connection is
   * available and ready for use.
   * <p>
   * Closing the pooled connection will, depending on the connection pool
   * implementation, return the connection to this pool without closing it.
   *
   * @param factory
   *          The connection factory to use for creating new connections.
   * @param poolSize
   *          The maximum size of the connection pool.
   * @param handler
   *          The completion handler, or {@code null} if no handler is to be
   *          used.
   * @return A future which can be used to retrieve the pooled asynchronous
   *         connection.
   * @throws IllegalStateException
   *           If this connection pool has already been closed.
   */
  ConnectionPool(final ConnectionFactory factory, final int poolSize)
  {
    // Factory used to open the underlying connections.
    this.factory = factory;
    this.poolSize = poolSize;
    // Starts with poolSize permits: a permit is acquired before each new
    // connection is requested and released again if the attempt fails.
    this.currentPoolSize = new Semaphore(poolSize);
  }
  FutureResult<AsynchronousConnection> getAsynchronousConnection(
      ResultHandler<? super AsynchronousConnection> handler);
  /**
   * {@inheritDoc}
   * Obtains a connection from this connection pool, potentially opening a new
   * connection if needed.
   * <p>
   * Closing the pooled connection will, depending on the connection pool
   * implementation, return the connection to this pool without closing it.
   *
   * @return A pooled connection.
   * @throws ErrorResultException
   *           If the connection request failed for some reason.
   * @throws InterruptedException
   *           If the current thread was interrupted while waiting.
   * @throws IllegalStateException
   *           If this connection pool has already been closed.
   */
  @Override
  public FutureResult<AsynchronousConnection> getAsynchronousConnection(
      final ResultHandler<? super AsynchronousConnection> handler)
  {
    QueueElement holder;
    synchronized (queue)
    {
      if (queue.isEmpty() || queue.getFirst().isWaitingFuture())
      {
        holder = new QueueElement(handler);
        queue.add(holder);
      }
      else
      {
        holder = queue.removeFirst();
      }
    }
    if (!holder.isWaitingFuture())
    {
      // There was a completed connection attempt.
      final AsynchronousConnection connection = holder.getWaitingConnection();
      final PooledConnection pooledConnection = new PooledConnection(connection);
      if (handler != null)
      {
        handler.handleResult(pooledConnection);
      }
      return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
    }
    else
    {
      // Grow the pool if needed.
      final FutureResult<AsynchronousConnection> future = holder
          .getWaitingFuture();
      if (!future.isDone() && currentPoolSize.tryAcquire())
      {
        factory.getAsynchronousConnection(connectionResultHandler);
      }
      return future;
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    final StringBuilder builder = new StringBuilder();
    builder.append("ConnectionPool(");
    builder.append(String.valueOf(factory));
    builder.append(',');
    builder.append(poolSize);
    builder.append(')');
    return builder.toString();
  }
  private void publishConnection(final AsynchronousConnection connection)
  {
    QueueElement holder;
    synchronized (queue)
    {
      if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
      {
        holder = new QueueElement(connection);
        queue.add(holder);
        return;
      }
      else
      {
        holder = queue.removeFirst();
      }
    }
    // There was waiting future, so close it.
    final PooledConnection pooledConnection = new PooledConnection(connection);
    holder.getWaitingFuture().handleResult(pooledConnection);
  }
  Connection getConnection() throws ErrorResultException, InterruptedException;
}
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/Connections.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS
 */
package org.forgerock.opendj.ldap;
@@ -92,13 +93,13 @@
   * @throws NullPointerException
   *           If {@code factory} was {@code null}.
   */
  public static ConnectionFactory newConnectionPool(
  public static ConnectionPool newFixedConnectionPool(
      final ConnectionFactory factory, final int poolSize)
      throws IllegalArgumentException, NullPointerException
  {
    Validator.ensureNotNull(factory);
    Validator.ensureTrue(poolSize >= 0, "negative pool size");
    return new ConnectionPool(factory, poolSize);
    return new FixedConnectionPool(factory, poolSize);
  }
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
New file
@@ -0,0 +1,944 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opendj3/legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opendj3/legal-notices/CDDLv1_0.txt.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS
 */
package org.forgerock.opendj.ldap;
import static org.forgerock.opendj.ldap.CoreMessages.ERR_CONNECTION_POOL_CLOSING;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import org.forgerock.opendj.ldap.requests.*;
import org.forgerock.opendj.ldap.responses.*;
import com.forgerock.opendj.util.*;
/**
 * A simple connection pool implementation which maintains a fixed number of
 * connections.
 */
final class FixedConnectionPool extends AbstractConnectionFactory implements
    ConnectionPool
{
  /**
   * This result handler is invoked when an attempt to add a new connection to
   * the pool completes.
   */
  private final class ConnectionResultHandler implements
      ResultHandler<AsynchronousConnection>
  {
    /**
     * {@inheritDoc}
     */
    @Override
    public void handleErrorResult(final ErrorResultException error)
    {
      // Connection attempt failed, so decrease the pool size.
      currentPoolSize.release();
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      {
        StaticUtils.DEBUG_LOG.fine(String.format("Connection attempt failed: "
            + error.getMessage() + " currentPoolSize=%d, poolSize=%d", poolSize
            - currentPoolSize.availablePermits(), poolSize));
      }
      QueueElement holder;
      synchronized (queue)
      {
        if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
        {
          // No waiting futures.
          return;
        }
        else
        {
          holder = queue.removeFirst();
        }
      }
      // There was waiting future, so close it.
      holder.getWaitingFuture().handleErrorResult(error);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void handleResult(final AsynchronousConnection connection)
    {
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      {
        StaticUtils.DEBUG_LOG.fine(String.format(
            "Connection attempt succeeded: "
                + " currentPoolSize=%d, poolSize=%d", poolSize
                - currentPoolSize.availablePermits(), poolSize));
      }
      publishConnection(connection);
    }
  }
  /**
   * A pooled connection is passed to the client. It wraps an underlying
   * "pooled" connection obtained from the underlying factory and lasts until
   * the client application closes this connection. More specifically, pooled
   * connections are not actually stored in the internal queue.
   */
  private final class PooledConnection implements AsynchronousConnection
  {
    // Connection event listeners registered against this pooled connection
    // should have the same life time as the pooled connection.
    private final List<ConnectionEventListener> listeners =
      new CopyOnWriteArrayList<ConnectionEventListener>();
    private final AsynchronousConnection connection;
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    /**
     * Creates a pooled connection wrapping the provided connection.
     *
     * @param connection
     *          The underlying connection to which operations are delegated.
     */
    PooledConnection(final AsynchronousConnection connection)
    {
      this.connection = connection;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Void> abandon(final AbandonRequest request)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.abandon(request);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> add(final AddRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.add(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> add(final AddRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection
          .add(request, resultHandler, intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void addConnectionEventListener(
        final ConnectionEventListener listener) throws IllegalStateException,
        NullPointerException
    {
      Validator.ensureNotNull(listener);
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      listeners.add(listener);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<BindResult> bind(final BindRequest request,
        final ResultHandler<? super BindResult> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.bind(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<BindResult> bind(final BindRequest request,
        final ResultHandler<? super BindResult> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.bind(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void close()
    {
      if (!isClosed.compareAndSet(false, true))
      {
        // Already closed.
        return;
      }
      // Don't put invalid connections back in the pool.
      if (connection.isValid())
      {
        publishConnection(connection);
      }
      else
      {
        // The connection may have been disconnected by the remote server, but
        // the server may still be available. In order to avoid leaving pending
        // futures hanging indefinitely, we should try to reconnect immediately.
        // Close the dead connection.
        connection.close();
        // Try to get a new connection to replace it.
        factory.getAsynchronousConnection(connectionResultHandler);
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
        {
          StaticUtils.DEBUG_LOG.warning(String.format(
              "Connection no longer valid. "
                  + "currentPoolSize=%d, poolSize=%d", poolSize
                  - currentPoolSize.availablePermits(), poolSize));
        }
      }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void close(final UnbindRequest request, final String reason)
        throws NullPointerException
    {
      close();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<CompareResult> compare(final CompareRequest request,
        final ResultHandler<? super CompareResult> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.compare(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<CompareResult> compare(final CompareRequest request,
        final ResultHandler<? super CompareResult> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.compare(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> delete(final DeleteRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.delete(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> delete(final DeleteRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.delete(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
        final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.extendedRequest(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
        final ExtendedRequest<R> request,
        final ResultHandler<? super R> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.extendedRequest(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public Connection getSynchronousConnection()
    {
      return new SynchronousConnection(this);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isClosed()
    {
      return isClosed.get();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isValid()
    {
      return connection.isValid() && !isClosed();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modify(final ModifyRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.modify(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modify(final ModifyRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.modify(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.modifyDN(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.modifyDN(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<SearchResultEntry> readEntry(final DN name,
        final Collection<String> attributeDescriptions,
        final ResultHandler<? super SearchResultEntry> resultHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.readEntry(name, attributeDescriptions, resultHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void removeConnectionEventListener(
        final ConnectionEventListener listener) throws NullPointerException
    {
      Validator.ensureNotNull(listener);
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      listeners.remove(listener);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> search(final SearchRequest request,
        final SearchResultHandler handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.search(request, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> search(final SearchRequest request,
        final SearchResultHandler resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.search(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<SearchResultEntry> searchSingleEntry(
        final SearchRequest request,
        final ResultHandler<? super SearchResultEntry> resultHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      return connection.searchSingleEntry(request, resultHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString()
    {
      final StringBuilder builder = new StringBuilder();
      builder.append("PooledConnection(");
      builder.append(connection);
      builder.append(')');
      return builder.toString();
    }
  }
  /**
   * A queue element is either a pending connection request future awaiting an
   * {@code AsynchronousConnection} or it is an unused
   * {@code AsynchronousConnection} awaiting a connection request.
   */
  private static final class QueueElement
  {
    private final Object value;
    QueueElement(final AsynchronousConnection connection)
    {
      this.value = connection;
    }
    QueueElement(final ResultHandler<? super AsynchronousConnection> handler)
    {
      this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler);
    }
    @Override
    public String toString()
    {
      return String.valueOf(value);
    }
    AsynchronousConnection getWaitingConnection()
    {
      if (value instanceof AsynchronousConnection)
      {
        return (AsynchronousConnection) value;
      }
      else
      {
        throw new IllegalStateException();
      }
    }
    @SuppressWarnings("unchecked")
    AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture()
    {
      if (value instanceof AsynchronousFutureResult)
      {
        return (AsynchronousFutureResult<AsynchronousConnection>) value;
      }
      else
      {
        throw new IllegalStateException();
      }
    }
    boolean isWaitingFuture()
    {
      return value instanceof AsynchronousFutureResult;
    }
  }
  // Queue whose elements are either unused connections awaiting a request or
  // pending request futures awaiting a connection (see QueueElement).
  // Guarded by queue.
  private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
  // Set to true by close(); once set, new connection requests are rejected
  // and published connections are closed instead of being queued.
  // Guarded by queue.
  private boolean isClosed = false;
  // Factory used to create the underlying connections.
  private final ConnectionFactory factory;
  // Maximum number of connections, as passed to the constructor.
  private final int poolSize;
  // Starts with poolSize permits. A permit is acquired before the pool asks
  // the factory for a new connection and released when a connection is
  // closed, so (poolSize - availablePermits()) is the current pool size.
  private final Semaphore currentPoolSize;
  // Handler passed to the factory whenever the pool requests a connection.
  private final ResultHandler<AsynchronousConnection> connectionResultHandler =
    new ConnectionResultHandler();
  /**
   * Creates a new connection pool which will maintain {@code poolSize}
   * connections created using the provided connection factory.
   *
   * @param factory
   *          The connection factory to use for creating new connections.
   * @param poolSize
   *          The maximum size of the connection pool.
   */
  FixedConnectionPool(final ConnectionFactory factory, final int poolSize)
  {
    this.factory = factory;
    this.poolSize = poolSize;
    this.currentPoolSize = new Semaphore(poolSize);
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public void close()
  {
    final LinkedList<AsynchronousConnection> idleConnections;
    synchronized (queue)
    {
      if (isClosed)
      {
        return;
      }
      isClosed = true;
      // Remove any connections which are waiting in the queue as these can be
      // closed immediately.
      idleConnections = new LinkedList<AsynchronousConnection>();
      while (!queue.isEmpty() && !queue.getFirst().isWaitingFuture())
      {
        final QueueElement holder = queue.removeFirst();
        idleConnections.add(holder.getWaitingConnection());
      }
    }
    if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
    {
      StaticUtils.DEBUG_LOG.fine(String.format(
          "Connection pool is closing: currentPoolSize=%d, poolSize=%d",
          poolSize - currentPoolSize.availablePermits(), poolSize));
    }
    // Close the idle connections.
    for (final AsynchronousConnection connection : idleConnections)
    {
      closeConnection(connection);
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public FutureResult<AsynchronousConnection> getAsynchronousConnection(
      final ResultHandler<? super AsynchronousConnection> handler)
  {
    QueueElement holder;
    synchronized (queue)
    {
      if (isClosed)
      {
        throw new IllegalStateException("FixedConnectionPool is already closed");
      }
      if (queue.isEmpty() || queue.getFirst().isWaitingFuture())
      {
        holder = new QueueElement(handler);
        queue.add(holder);
      }
      else
      {
        holder = queue.removeFirst();
      }
    }
    if (!holder.isWaitingFuture())
    {
      // There was a completed connection attempt.
      final AsynchronousConnection connection = holder.getWaitingConnection();
      final PooledConnection pooledConnection = new PooledConnection(connection);
      if (handler != null)
      {
        handler.handleResult(pooledConnection);
      }
      return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
    }
    else
    {
      // Grow the pool if needed.
      final FutureResult<AsynchronousConnection> future = holder
          .getWaitingFuture();
      if (!future.isDone() && currentPoolSize.tryAcquire())
      {
        factory.getAsynchronousConnection(connectionResultHandler);
      }
      return future;
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    final StringBuilder builder = new StringBuilder();
    builder.append("FixedConnectionPool(");
    builder.append(String.valueOf(factory));
    builder.append(',');
    builder.append(poolSize);
    builder.append(')');
    return builder.toString();
  }
  /**
   * Provide a finalizer because connection pools are expensive resources to
   * accidentally leave around. Also, since they won't be created all that
   * frequently, there's little risk of overloading the finalizer.
   */
  @Override
  protected void finalize() throws Throwable
  {
    close();
  }
  private void closeConnection(final AsynchronousConnection connection)
  {
    // The connection will be closed, so decrease the pool size.
    currentPoolSize.release();
    connection.close();
    if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
    {
      StaticUtils.DEBUG_LOG.fine(String.format(
          "Closing connection because connection pool is closing: "
              + " currentPoolSize=%d, poolSize=%d",
          poolSize - currentPoolSize.availablePermits(), poolSize));
    }
  }
  private void publishConnection(final AsynchronousConnection connection)
  {
    final QueueElement holder;
    boolean connectionPoolIsClosing = false;
    synchronized (queue)
    {
      if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
      {
        if (isClosed)
        {
          connectionPoolIsClosing = true;
          holder = null;
        }
        else
        {
          holder = new QueueElement(connection);
          queue.add(holder);
          return;
        }
      }
      else
      {
        connectionPoolIsClosing = isClosed;
        holder = queue.removeFirst();
      }
    }
    // There was waiting future, so complete it.
    if (connectionPoolIsClosing)
    {
      closeConnection(connection);
      if (holder != null)
      {
        final ErrorResultException e = ErrorResultException.newErrorResult(
            ResultCode.CLIENT_SIDE_USER_CANCELLED, ERR_CONNECTION_POOL_CLOSING
                .get(toString()).toString());
        holder.getWaitingFuture().handleErrorResult(e);
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG.fine(String.format(
              "Connection attempt failed: " + e.getMessage()
                  + " currentPoolSize=%d, poolSize=%d", poolSize
                  - currentPoolSize.availablePermits(), poolSize));
        }
      }
    }
    else
    {
      final PooledConnection pooledConnection = new PooledConnection(connection);
      holder.getWaitingFuture().handleResult(pooledConnection);
    }
  }
}
opendj3/opendj-ldap-sdk/src/main/resources/org/forgerock/opendj/ldap/core.properties
@@ -1385,3 +1385,5 @@
WARN_ATTR_SYNTAX_DSR_INVALID_SUPERIOR_RULE=The definition for \
 the DIT structure rule "%s" declared a superior rule "%s" which has been \
 removed from the schema because it is invalid
ERR_CONNECTION_POOL_CLOSING=No connection could be obtained from connection \
 pool "%s" because it is closing
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/ConnectionFactoryTestCase.java
@@ -33,11 +33,14 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import javax.net.ssl.SSLContext;
@@ -48,11 +51,14 @@
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.schema.Schema;
import org.forgerock.opendj.ldap.schema.SchemaBuilder;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import com.forgerock.opendj.util.CompletedFutureResult;
import com.forgerock.opendj.util.StaticUtils;
@@ -205,7 +211,7 @@
        "online");
    // Connection pools.
    factories[7][0] = Connections.newConnectionPool(onlineServer, 10);
    factories[7][0] = Connections.newFixedConnectionPool(onlineServer, 10);
    // Round robin.
    factories[8][0] = Connections
@@ -220,9 +226,10 @@
        .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays.asList(
            offlineServer1, offlineServer2, onlineServer)));
    factories[13][0] = Connections
        .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays.asList(
            Connections.newConnectionPool(offlineServer1, 10),
            Connections.newConnectionPool(onlineServer, 10))));
        .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays
            .<ConnectionFactory> asList(
                Connections.newFixedConnectionPool(offlineServer1, 10),
                Connections.newFixedConnectionPool(onlineServer, 10))));
    // Fail-over.
    factories[14][0] = Connections
@@ -237,11 +244,12 @@
        .newLoadBalancer(new FailoverLoadBalancingAlgorithm(Arrays.asList(
            offlineServer1, offlineServer2, onlineServer)));
    factories[19][0] = Connections
        .newLoadBalancer(new FailoverLoadBalancingAlgorithm(Arrays.asList(
            Connections.newConnectionPool(offlineServer1, 10),
            Connections.newConnectionPool(onlineServer, 10))));
        .newLoadBalancer(new FailoverLoadBalancingAlgorithm(Arrays
            .<ConnectionFactory> asList(
                Connections.newFixedConnectionPool(offlineServer1, 10),
                Connections.newFixedConnectionPool(onlineServer, 10))));
    factories[20][0] = Connections.newConnectionPool(onlineServer, 10);
    factories[20][0] = Connections.newFixedConnectionPool(onlineServer, 10);
    return factories;
  }
@@ -367,4 +375,119 @@
      connection.close();
    }
  }
  /**
   * Tests connection pool closure.
   *
   * @throws Exception If an unexpected exception occurred.
   */
  @SuppressWarnings("unchecked")
  @Test
  public void testConnectionPoolClose() throws Exception
  {
    // We'll use a pool of 4 connections.
    final int SIZE = 4;
    // Count number of real connections which are open.
    final AtomicInteger realConnectionCount = new AtomicInteger();
    final boolean[] realConnectionIsClosed = new boolean[SIZE];
    Arrays.fill(realConnectionIsClosed, true);
    // Mock underlying connection factory which always succeeds.
    final ConnectionFactory mockFactory = mock(ConnectionFactory.class);
    when(mockFactory.getAsynchronousConnection(any(ResultHandler.class)))
        .thenAnswer(new Answer<FutureResult<AsynchronousConnection>>()
        {
          public FutureResult<AsynchronousConnection> answer(
              InvocationOnMock invocation) throws Throwable
          {
            // Update state.
            final int connectionID = realConnectionCount.getAndIncrement();
            realConnectionIsClosed[connectionID] = false;
            // Mock connection decrements counter on close.
            AsynchronousConnection mockConnection = mock(AsynchronousConnection.class);
            doAnswer(new Answer<Void>()
            {
              public Void answer(InvocationOnMock invocation) throws Throwable
              {
                realConnectionCount.decrementAndGet();
                realConnectionIsClosed[connectionID] = true;
                return null;
              }
            }).when(mockConnection).close();
            when(mockConnection.isValid()).thenReturn(true);
            when(mockConnection.toString()).thenReturn("Mock connection " + connectionID);
            // Excecute handler and return future.
            ResultHandler<? super AsynchronousConnection> handler =
              (ResultHandler<? super AsynchronousConnection>) invocation.getArguments()[0];
            if (handler != null)
            {
              handler.handleResult(mockConnection);
            }
            return new CompletedFutureResult<AsynchronousConnection>(
                mockConnection);
          }
        });
    ConnectionPool pool = Connections.newFixedConnectionPool(mockFactory, SIZE);
    Connection[] pooledConnections = new Connection[SIZE];
    for (int i = 0; i < SIZE; i++)
    {
      pooledConnections[i] = pool.getConnection();
    }
    // Pool is fully utilized.
    assertThat(realConnectionCount.get()).isEqualTo(SIZE);
    assertThat(pooledConnections[0].isClosed()).isFalse();
    assertThat(pooledConnections[1].isClosed()).isFalse();
    assertThat(pooledConnections[2].isClosed()).isFalse();
    assertThat(pooledConnections[3].isClosed()).isFalse();
    assertThat(realConnectionIsClosed[0]).isFalse();
    assertThat(realConnectionIsClosed[1]).isFalse();
    assertThat(realConnectionIsClosed[2]).isFalse();
    assertThat(realConnectionIsClosed[3]).isFalse();
    // Release two connections.
    pooledConnections[0].close();
    pooledConnections[1].close();
    assertThat(realConnectionCount.get()).isEqualTo(4);
    assertThat(pooledConnections[0].isClosed()).isTrue();
    assertThat(pooledConnections[1].isClosed()).isTrue();
    assertThat(pooledConnections[2].isClosed()).isFalse();
    assertThat(pooledConnections[3].isClosed()).isFalse();
    assertThat(realConnectionIsClosed[0]).isFalse();
    assertThat(realConnectionIsClosed[1]).isFalse();
    assertThat(realConnectionIsClosed[2]).isFalse();
    assertThat(realConnectionIsClosed[3]).isFalse();
    // Close the pool closing the two connections immediately.
    pool.close();
    assertThat(realConnectionCount.get()).isEqualTo(2);
    assertThat(pooledConnections[0].isClosed()).isTrue();
    assertThat(pooledConnections[1].isClosed()).isTrue();
    assertThat(pooledConnections[2].isClosed()).isFalse();
    assertThat(pooledConnections[3].isClosed()).isFalse();
    assertThat(realConnectionIsClosed[0]).isTrue();
    assertThat(realConnectionIsClosed[1]).isTrue();
    assertThat(realConnectionIsClosed[2]).isFalse();
    assertThat(realConnectionIsClosed[3]).isFalse();
    // Release two remaining connections and check that they get closed.
    pooledConnections[2].close();
    pooledConnections[3].close();
    assertThat(realConnectionCount.get()).isEqualTo(0);
    assertThat(pooledConnections[0].isClosed()).isTrue();
    assertThat(pooledConnections[1].isClosed()).isTrue();
    assertThat(pooledConnections[2].isClosed()).isTrue();
    assertThat(pooledConnections[3].isClosed()).isTrue();
    assertThat(realConnectionIsClosed[0]).isTrue();
    assertThat(realConnectionIsClosed[1]).isTrue();
    assertThat(realConnectionIsClosed[2]).isTrue();
    assertThat(realConnectionIsClosed[3]).isTrue();
  }
}
opendj3/opendj-ldap-sdk/src/test/java/org/forgerock/opendj/ldap/LDAPListenerTestCase.java
@@ -604,10 +604,11 @@
      // Round robin.
      final ConnectionFactory loadBalancer = Connections
          .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays.asList(
              Connections.newConnectionPool(offlineServer1, 10),
              Connections.newConnectionPool(offlineServer2, 10),
              Connections.newConnectionPool(onlineServer, 10))));
          .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays
              .<ConnectionFactory> asList(
                  Connections.newFixedConnectionPool(offlineServer1, 10),
                  Connections.newFixedConnectionPool(offlineServer2, 10),
                  Connections.newFixedConnectionPool(onlineServer, 10))));
      final MockServerConnection proxyServerConnection = new MockServerConnection();
      final MockServerConnectionFactory proxyServerConnectionFactory = new MockServerConnectionFactory(
@@ -699,10 +700,11 @@
      // Round robin.
      final ConnectionFactory loadBalancer = Connections
          .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays.asList(
              Connections.newConnectionPool(offlineServer1, 10),
              Connections.newConnectionPool(offlineServer2, 10),
              Connections.newConnectionPool(onlineServer, 10))));
          .newLoadBalancer(new RoundRobinLoadBalancingAlgorithm(Arrays
              .<ConnectionFactory> asList(
                  Connections.newFixedConnectionPool(offlineServer1, 10),
                  Connections.newFixedConnectionPool(offlineServer2, 10),
                  Connections.newFixedConnectionPool(onlineServer, 10))));
      final MockServerConnection proxyServerConnection = new MockServerConnection()
      {