From 9f2bba679ab597f1e50078a29d145100e3baed3c Mon Sep 17 00:00:00 2001
From: Ludovic Poitou <ludovic.poitou@forgerock.com>
Date: Tue, 19 Oct 2010 16:36:12 +0000
Subject: [PATCH] Make ConnectionPool implementation fully async and fix some race conditions in the unit tests.

---
 sdk/src/org/opends/sdk/ConnectionPool.java |  589 +++++++++++++++++++++++++++++++++-------------------------
 1 files changed, 332 insertions(+), 257 deletions(-)

diff --git a/sdk/src/org/opends/sdk/ConnectionPool.java b/sdk/src/org/opends/sdk/ConnectionPool.java
index a7b5f75..e9497da 100644
--- a/sdk/src/org/opends/sdk/ConnectionPool.java
+++ b/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -30,8 +30,11 @@
 
 
 import java.util.Collection;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
 
 import org.opends.sdk.requests.*;
@@ -41,6 +44,7 @@
 import com.sun.opends.sdk.util.AsynchronousFutureResult;
 import com.sun.opends.sdk.util.CompletedFutureResult;
 import com.sun.opends.sdk.util.StaticUtils;
+import com.sun.opends.sdk.util.Validator;
 
 
 
@@ -49,36 +53,101 @@
  */
 final class ConnectionPool extends AbstractConnectionFactory
 {
-  // Future used for waiting for pooled connections to become available.
-  private static final class FuturePooledConnection extends
-      AsynchronousFutureResult<AsynchronousConnection>
+
+  /**
+   * This result handler is invoked when an attempt to add a new connection to
+   * the pool completes.
+   */
+  private final class ConnectionResultHandler implements
+      ResultHandler<AsynchronousConnection>
   {
-    private FuturePooledConnection(
-        final ResultHandler<? super AsynchronousConnection> handler)
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void handleErrorResult(final ErrorResultException error)
     {
-      super(handler);
+      // Connection attempt failed, so decrease the pool size.
+      currentPoolSize.release();
+
+      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+      {
+        StaticUtils.DEBUG_LOG.fine(String.format(
+            "Connection attempt failed: " + error.getMessage()
+                + " currentPoolSize=%d, poolSize=%d",
+            poolSize - currentPoolSize.availablePermits(), poolSize));
+      }
+
+      QueueElement holder;
+      synchronized (queue)
+      {
+        if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
+        {
+          // No waiting futures.
+          return;
+        }
+        else
+        {
+          holder = queue.removeFirst();
+        }
+      }
+
+      // There was waiting future, so close it.
+      holder.getWaitingFuture().handleErrorResult(error);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void handleResult(final AsynchronousConnection connection)
+    {
+      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+      {
+        StaticUtils.DEBUG_LOG.fine(String.format(
+            "Connection attempt succeeded: "
+                + " currentPoolSize=%d, poolSize=%d",
+                poolSize - currentPoolSize.availablePermits(), poolSize));
+      }
+
+      publishConnection(connection);
     }
   }
 
 
 
-  private final class PooledConnectionWapper implements AsynchronousConnection,
-      ConnectionEventListener
+  /**
+   * A pooled connection is passed to the client. It wraps an underlying
+   * "pooled" connection obtained from the underlying factory and lasts until
+   * the client application closes this connection. More specifically, pooled
+   * connections are not actually stored in the internal queue.
+   */
+  private final class PooledConnection implements AsynchronousConnection
   {
+    // Connection event listeners registed against this pooled connection should
+    // have the same life time as the pooled connection.
+    private final List<ConnectionEventListener> listeners =
+      new CopyOnWriteArrayList<ConnectionEventListener>();
+
     private final AsynchronousConnection connection;
 
-    private volatile boolean isClosed;
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
 
 
-    private PooledConnectionWapper(final AsynchronousConnection connection)
+    PooledConnection(final AsynchronousConnection connection)
     {
       this.connection = connection;
-      this.connection.addConnectionEventListener(this);
     }
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Void> abandon(final AbandonRequest request)
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
@@ -92,6 +161,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> add(final AddRequest request,
         final ResultHandler<? super Result> handler)
         throws UnsupportedOperationException, IllegalStateException,
@@ -106,6 +179,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> add(final AddRequest request,
         final ResultHandler<? super Result> resultHandler,
         final IntermediateResponseHandler intermediateResponseHandler)
@@ -122,18 +199,28 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public void addConnectionEventListener(
         final ConnectionEventListener listener) throws IllegalStateException,
         NullPointerException
     {
+      Validator.ensureNotNull(listener);
       if (isClosed())
       {
         throw new IllegalStateException();
       }
+      listeners.add(listener);
     }
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<BindResult> bind(final BindRequest request,
         final ResultHandler<? super BindResult> handler)
         throws UnsupportedOperationException, IllegalStateException,
@@ -148,6 +235,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<BindResult> bind(final BindRequest request,
         final ResultHandler<? super BindResult> resultHandler,
         final IntermediateResponseHandler intermediateResponseHandler)
@@ -164,47 +255,51 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public void close()
     {
-      synchronized (pool)
+      if (!isClosed.compareAndSet(false, true))
       {
-        if (isClosed)
-        {
-          return;
-        }
-        isClosed = true;
+        // Already closed.
+        return;
+      }
 
-        // Don't put invalid connections back in the pool.
-        if (connection.isValid())
+      // Don't put invalid connections back in the pool.
+      if (connection.isValid())
+      {
+        publishConnection(connection);
+      }
+      else
+      {
+        // The connection may have been disconnected by the remote server, but
+        // the server may still be available. In order to avoid leaving pending
+        // futures hanging indefinitely, we should try to reconnect immediately.
+
+        // Close the dead connection.
+        connection.close();
+
+        // Try to get a new connection to replace it.
+        factory.getAsynchronousConnection(connectionResultHandler);
+
+        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
         {
-          releaseConnection(connection);
-          return;
+          StaticUtils.DEBUG_LOG.warning(String.format(
+              "Connection no longer valid. "
+                  + "currentPoolSize=%d, poolSize=%d",
+                  poolSize - currentPoolSize.availablePermits(), poolSize));
         }
       }
-
-      // Connection is no longer valid. Close outside of lock
-      connection.removeConnectionEventListener(this);
-      connection.close();
-      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
-      {
-        StaticUtils.DEBUG_LOG.warning(String.format(
-            "Dead connection released and closed. "
-                + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d",
-            numConnections, pool.size(), pendingFutures.size()));
-      }
-
-      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
-      {
-        StaticUtils.DEBUG_LOG.warning(String.format(
-            "Reconnect attempt starting. "
-                + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d",
-            numConnections, pool.size(), pendingFutures.size()));
-      }
-      connectionFactory.getAsynchronousConnection(new ReconnectHandler());
     }
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public void close(final UnbindRequest request, final String reason)
         throws NullPointerException
     {
@@ -213,6 +308,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<CompareResult> compare(final CompareRequest request,
         final ResultHandler<? super CompareResult> handler)
         throws UnsupportedOperationException, IllegalStateException,
@@ -227,6 +326,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<CompareResult> compare(final CompareRequest request,
         final ResultHandler<? super CompareResult> resultHandler,
         final IntermediateResponseHandler intermediateResponseHandler)
@@ -243,48 +346,10 @@
 
 
 
-    public void handleConnectionClosed()
-    {
-      // Ignore - we intercept close via the close method.
-    }
-
-
-
-    public void handleConnectionError(final boolean isDisconnectNotification,
-        final ErrorResultException error)
-    {
-      // Remove this connection from the pool if its in there. If not,
-      // just ignore and wait for the user to close and we can deal with it
-      // there.
-      if (pool.remove(this))
-      {
-        numConnections--;
-        connection.removeConnectionEventListener(this);
-
-        // FIXME: should still close the connection, but we need to be
-        // careful that users of the pooled connection get a sensible
-        // error if they continue to use it (i.e. not an NPE or ISE).
-
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
-        {
-          StaticUtils.DEBUG_LOG.warning(String.format(
-              "Connection error occured and removed from pool: "
-                  + error.getMessage()
-                  + " numConnections: %d, poolSize: %d, pendingFutures: %d",
-              numConnections, pool.size(), pendingFutures.size()));
-        }
-      }
-    }
-
-
-
-    public void handleUnsolicitedNotification(final ExtendedResult notification)
-    {
-      // Ignore
-    }
-
-
-
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> delete(final DeleteRequest request,
         final ResultHandler<? super Result> handler)
         throws UnsupportedOperationException, IllegalStateException,
@@ -299,6 +364,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> delete(final DeleteRequest request,
         final ResultHandler<? super Result> resultHandler,
         final IntermediateResponseHandler intermediateResponseHandler)
@@ -315,6 +384,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public <R extends ExtendedResult> FutureResult<R> extendedRequest(
         final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
         throws UnsupportedOperationException, IllegalStateException,
@@ -329,6 +402,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public <R extends ExtendedResult> FutureResult<R> extendedRequest(
         final ExtendedRequest<R> request,
         final ResultHandler<? super R> resultHandler,
@@ -349,6 +426,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public Connection getSynchronousConnection()
     {
       return new SynchronousConnection(this);
@@ -359,20 +437,29 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public boolean isClosed()
     {
-      return isClosed;
+      return isClosed.get();
     }
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public boolean isValid()
     {
-      return !isClosed && connection.isValid();
+      return connection.isValid() && !isClosed();
     }
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> modify(final ModifyRequest request,
         final ResultHandler<? super Result> handler)
         throws UnsupportedOperationException, IllegalStateException,
@@ -387,6 +474,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> modify(final ModifyRequest request,
         final ResultHandler<? super Result> resultHandler,
         final IntermediateResponseHandler intermediateResponseHandler)
@@ -403,6 +494,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> modifyDN(final ModifyDNRequest request,
         final ResultHandler<? super Result> handler)
         throws UnsupportedOperationException, IllegalStateException,
@@ -417,6 +512,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> modifyDN(final ModifyDNRequest request,
         final ResultHandler<? super Result> resultHandler,
         final IntermediateResponseHandler intermediateResponseHandler)
@@ -436,6 +535,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public FutureResult<SearchResultEntry> readEntry(final DN name,
         final Collection<String> attributeDescriptions,
         final ResultHandler<? super SearchResultEntry> resultHandler)
@@ -454,6 +554,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public FutureResult<RootDSE> readRootDSE(
         final ResultHandler<? super RootDSE> handler)
         throws UnsupportedOperationException, IllegalStateException
@@ -470,6 +571,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public FutureResult<Schema> readSchema(final DN name,
         final ResultHandler<? super Schema> handler)
         throws UnsupportedOperationException, IllegalStateException
@@ -486,6 +588,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public FutureResult<Schema> readSchemaForEntry(final DN name,
         final ResultHandler<? super Schema> handler)
         throws UnsupportedOperationException, IllegalStateException
@@ -499,17 +602,27 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public void removeConnectionEventListener(
         final ConnectionEventListener listener) throws NullPointerException
     {
+      Validator.ensureNotNull(listener);
       if (isClosed())
       {
         throw new IllegalStateException();
       }
+      listeners.remove(listener);
     }
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> search(final SearchRequest request,
         final SearchResultHandler handler)
         throws UnsupportedOperationException, IllegalStateException,
@@ -524,6 +637,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> search(final SearchRequest request,
         final SearchResultHandler resultHandler,
         final IntermediateResponseHandler intermediateResponseHandler)
@@ -543,6 +660,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public FutureResult<SearchResultEntry> searchSingleEntry(
         final SearchRequest request,
         final ResultHandler<? super SearchResultEntry> resultHandler)
@@ -561,9 +679,10 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public String toString()
     {
-      StringBuilder builder = new StringBuilder();
+      final StringBuilder builder = new StringBuilder();
       builder.append("PooledConnection(");
       builder.append(connection);
       builder.append(')');
@@ -573,63 +692,86 @@
 
 
 
-  private class ReconnectHandler implements
-      ResultHandler<AsynchronousConnection>
+  /**
+   * A queue element is either a pending connection request future awaiting an
+   * {@code AsynchronousConnection} or it is an unused
+   * {@code AsynchronousConnection} awaiting a connection request.
+   */
+  private static final class QueueElement
   {
-    public void handleErrorResult(final ErrorResultException error)
-    {
-      // The reconnect failed. Fail the connect attempt.
-      numConnections--;
-      // The reconnect failed. The underlying connection factory
-      // probably went
-      // down. Just fail all pending futures
-      synchronized (pool)
-      {
-        while (!pendingFutures.isEmpty())
-        {
-          pendingFutures.poll().handleErrorResult(error);
-        }
-      }
-      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
-      {
-        StaticUtils.DEBUG_LOG.warning(String.format(
-            "Reconnect failed. Failed all pending futures: "
-                + error.getMessage()
-                + " numConnections: %d, poolSize: %d, pendingFutures: %d",
-            numConnections, pool.size(), pendingFutures.size()));
-      }
+    private final Object value;
 
+
+
+    QueueElement(final AsynchronousConnection connection)
+    {
+      this.value = connection;
     }
 
 
 
-    public void handleResult(final AsynchronousConnection connection)
+    QueueElement(final ResultHandler<? super AsynchronousConnection> handler)
     {
-      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+      this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler);
+    }
+
+
+
+    AsynchronousConnection getWaitingConnection()
+    {
+      if (value instanceof AsynchronousConnection)
       {
-        StaticUtils.DEBUG_LOG.finest(String.format("Reconnect succeeded. "
-            + " numConnections: %d, poolSize: %d, pendingFutures: %d",
-            numConnections, pool.size(), pendingFutures.size()));
+        return (AsynchronousConnection) value;
       }
-      synchronized (pool)
+      else
       {
-        releaseConnection(connection);
+        throw new IllegalStateException();
       }
     }
+
+
+
+    @SuppressWarnings("unchecked")
+    AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture()
+    {
+      if (value instanceof AsynchronousFutureResult)
+      {
+        return (AsynchronousFutureResult<AsynchronousConnection>) value;
+      }
+      else
+      {
+        throw new IllegalStateException();
+      }
+    }
+
+
+
+    boolean isWaitingFuture()
+    {
+      return value instanceof AsynchronousFutureResult;
+    }
+
+
+
+    public String toString()
+    {
+      return String.valueOf(value);
+    }
   }
 
 
 
-  private final ConnectionFactory connectionFactory;
+  // Guarded by queue.
+  private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
 
-  private volatile int numConnections;
+  private final ConnectionFactory factory;
 
   private final int poolSize;
 
-  // FIXME: should use a better collection than this - CLQ?
-  private final Queue<AsynchronousConnection> pool;
+  private final Semaphore currentPoolSize;
 
-  private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
+  private final ResultHandler<AsynchronousConnection> connectionResultHandler =
+    new ConnectionResultHandler();
 
 
 
@@ -637,109 +779,16 @@
    * Creates a new connection pool which will maintain {@code poolSize}
    * connections created using the provided connection factory.
    *
-   * @param connectionFactory
+   * @param factory
    *          The connection factory to use for creating new connections.
    * @param poolSize
    *          The maximum size of the connection pool.
    */
-  ConnectionPool(final ConnectionFactory connectionFactory, final int poolSize)
+  ConnectionPool(final ConnectionFactory factory, final int poolSize)
   {
-    this.connectionFactory = connectionFactory;
+    this.factory = factory;
     this.poolSize = poolSize;
-    this.pool = new ConcurrentLinkedQueue<AsynchronousConnection>();
-    this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>();
-  }
-
-
-
-  @Override
-  public synchronized FutureResult<AsynchronousConnection> getAsynchronousConnection(
-      final ResultHandler<? super AsynchronousConnection> handler)
-  {
-    // This entire method is synchronized to ensure new connects are
-    // done synchronously to avoid the "pending connect" case.
-    AsynchronousConnection conn;
-    synchronized (pool)
-    {
-      // Check to see if we have a connection in the pool
-      conn = pool.poll();
-      if (conn == null)
-      {
-        // Pool was empty. Maybe a new connection if pool size is not
-        // reached
-        if (numConnections >= poolSize)
-        {
-          // We reached max # of conns so wait for a connection to
-          // become available.
-          final FuturePooledConnection future = new FuturePooledConnection(
-              handler);
-          pendingFutures.add(future);
-
-          if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-          {
-            StaticUtils.DEBUG_LOG.finest(String.format(
-                "No connections available. Wait-listed"
-                    + "numConnections: %d, poolSize: %d, pendingFutures: %d",
-                numConnections, pool.size(), pendingFutures.size()));
-          }
-
-          return future;
-        }
-      }
-    }
-
-    if (conn == null)
-    {
-      try
-      {
-        // We can create a new connection.
-        conn = connectionFactory.getAsynchronousConnection(null).get();
-        numConnections++;
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-        {
-          StaticUtils.DEBUG_LOG.finest(String.format(
-              "New connection established and aquired. "
-                  + "numConnections: %d, poolSize: %d, pendingFutures: %d",
-              numConnections, pool.size(), pendingFutures.size()));
-        }
-      }
-      catch (final ErrorResultException e)
-      {
-        if (handler != null)
-        {
-          handler.handleErrorResult(e);
-        }
-        return new CompletedFutureResult<AsynchronousConnection>(e);
-      }
-      catch (final InterruptedException e)
-      {
-        final ErrorResultException error = new ErrorResultException(Responses
-            .newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e));
-        if (handler != null)
-        {
-          handler.handleErrorResult(error);
-        }
-        return new CompletedFutureResult<AsynchronousConnection>(error);
-      }
-    }
-    else
-    {
-      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-      {
-        StaticUtils.DEBUG_LOG.finest(String.format(
-            "Connection aquired from pool. "
-                + "numConnections: %d, poolSize: %d, pendingFutures: %d",
-            numConnections, pool.size(), pendingFutures.size()));
-      }
-    }
-
-    final PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
-        conn);
-    if (handler != null)
-    {
-      handler.handleResult(pooledConnection);
-    }
-    return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
+    this.currentPoolSize = new Semaphore(poolSize);
   }
 
 
@@ -747,11 +796,59 @@
   /**
    * {@inheritDoc}
    */
+  @Override
+  public FutureResult<AsynchronousConnection> getAsynchronousConnection(
+      final ResultHandler<? super AsynchronousConnection> handler)
+  {
+    QueueElement holder;
+    synchronized (queue)
+    {
+      if (queue.isEmpty() || queue.getFirst().isWaitingFuture())
+      {
+        holder = new QueueElement(handler);
+        queue.add(holder);
+      }
+      else
+      {
+        holder = queue.removeFirst();
+      }
+    }
+
+    if (!holder.isWaitingFuture())
+    {
+      // There was a completed connection attempt.
+      final AsynchronousConnection connection = holder.getWaitingConnection();
+      final PooledConnection pooledConnection = new PooledConnection(connection);
+      if (handler != null)
+      {
+        handler.handleResult(pooledConnection);
+      }
+      return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
+    }
+    else
+    {
+      // Grow the pool if needed.
+      final FutureResult<AsynchronousConnection> future = holder
+          .getWaitingFuture();
+      if (!future.isDone() && currentPoolSize.tryAcquire())
+      {
+        factory.getAsynchronousConnection(connectionResultHandler);
+      }
+      return future;
+    }
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
   public String toString()
   {
     final StringBuilder builder = new StringBuilder();
     builder.append("ConnectionPool(");
-    builder.append(String.valueOf(connectionFactory));
+    builder.append(String.valueOf(factory));
     builder.append(',');
     builder.append(poolSize);
     builder.append(')');
@@ -760,47 +857,25 @@
 
 
 
-  private void releaseConnection(final AsynchronousConnection connection)
+  private void publishConnection(final AsynchronousConnection connection)
   {
-    // See if there waiters pending.
-    for (;;)
+    QueueElement holder;
+    synchronized (queue)
     {
-      final PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
-          connection);
-      final FuturePooledConnection future = pendingFutures.poll();
-
-      if (future == null)
+      if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
       {
-        // No waiters - so drop out and add connection to pool.
-        break;
-      }
-
-      future.handleResult(pooledConnection);
-
-      if (!future.isCancelled())
-      {
-        // The future was not cancelled and the connection was
-        // accepted.
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-        {
-          StaticUtils.DEBUG_LOG.finest(String.format(
-              "Connection released and directly "
-                  + "given to waiter. numConnections: %d, poolSize: %d, "
-                  + "pendingFutures: %d", numConnections, pool.size(),
-              pendingFutures.size()));
-        }
+        holder = new QueueElement(connection);
+        queue.add(holder);
         return;
       }
+      else
+      {
+        holder = queue.removeFirst();
+      }
     }
 
-    // No waiters. Put back in pool.
-    pool.offer(connection);
-    if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-    {
-      StaticUtils.DEBUG_LOG.finest(String.format(
-          "Connection released to pool. numConnections: %d, "
-              + "poolSize: %d, pendingFutures: %d", numConnections,
-          pool.size(), pendingFutures.size()));
-    }
+    // There was waiting future, so close it.
+    final PooledConnection pooledConnection = new PooledConnection(connection);
+    holder.getWaitingFuture().handleResult(pooledConnection);
   }
 }

--
Gitblit v1.10.0