From f2160f4bd1c8ac67e5a86a6710d431e8932877f9 Mon Sep 17 00:00:00 2001
From: matthew_swift <matthew_swift@localhost>
Date: Fri, 28 May 2010 11:47:51 +0000
Subject: [PATCH] Synchronize SDK on java.net with internal repository.

---
 sdk/src/org/opends/sdk/ConnectionPool.java |  737 +++++++++++++++++++++++++++++++++-----------------------
 1 files changed, 429 insertions(+), 308 deletions(-)

diff --git a/sdk/src/org/opends/sdk/ConnectionPool.java b/sdk/src/org/opends/sdk/ConnectionPool.java
index d0b8780..de641ce 100644
--- a/sdk/src/org/opends/sdk/ConnectionPool.java
+++ b/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -49,21 +49,32 @@
  */
 final class ConnectionPool extends AbstractConnectionFactory
 {
-  private final ConnectionFactory connectionFactory;
-
-  private volatile int numConnections;
-
-  private final int poolSize;
-
-  // FIXME: should use a better collection than this - CLQ?
-  private final Queue<AsynchronousConnection> pool;
-
-  private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
+  // Future used for waiting for pooled connections to become available.
+  private static final class FuturePooledConnection extends
+      AbstractFutureResult<AsynchronousConnection>
+  {
+    private FuturePooledConnection(
+        final ResultHandler<? super AsynchronousConnection> handler)
+    {
+      super(handler);
+    }
 
 
 
-  private final class PooledConnectionWapper implements
-      AsynchronousConnection, ConnectionEventListener
+    /**
+     * {@inheritDoc}
+     */
+    public int getRequestID()
+    {
+      return -1;
+    }
+
+  }
+
+
+
+  private final class PooledConnectionWapper implements AsynchronousConnection,
+      ConnectionEventListener
   {
     private final AsynchronousConnection connection;
 
@@ -71,7 +82,7 @@
 
 
 
-    private PooledConnectionWapper(AsynchronousConnection connection)
+    private PooledConnectionWapper(final AsynchronousConnection connection)
     {
       this.connection = connection;
       this.connection.addConnectionEventListener(this);
@@ -79,7 +90,7 @@
 
 
 
-    public void abandon(AbandonRequest request)
+    public FutureResult<Void> abandon(final AbandonRequest request)
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
@@ -87,13 +98,13 @@
       {
         throw new IllegalStateException();
       }
-      connection.abandon(request);
+      return connection.abandon(request);
     }
 
 
 
-    public FutureResult<Result> add(AddRequest request,
-        ResultHandler<Result> handler)
+    public FutureResult<Result> add(final AddRequest request,
+        final ResultHandler<Result> handler)
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
@@ -106,8 +117,36 @@
 
 
 
-    public FutureResult<BindResult> bind(BindRequest request,
-        ResultHandler<? super BindResult> handler)
+    public FutureResult<Result> add(final AddRequest request,
+        final ResultHandler<Result> resultHandler,
+        final IntermediateResponseHandler intermediateResponseHandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection
+          .add(request, resultHandler, intermediateResponseHandler);
+    }
+
+
+
+    public void addConnectionEventListener(
+        final ConnectionEventListener listener) throws IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+    }
+
+
+
+    public FutureResult<BindResult> bind(final BindRequest request,
+        final ResultHandler<? super BindResult> handler)
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
@@ -120,6 +159,22 @@
 
 
 
+    public FutureResult<BindResult> bind(final BindRequest request,
+        final ResultHandler<? super BindResult> resultHandler,
+        final IntermediateResponseHandler intermediateResponseHandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.bind(request, resultHandler,
+          intermediateResponseHandler);
+    }
+
+
+
     public void close()
     {
       synchronized (pool)
@@ -145,26 +200,23 @@
       {
         StaticUtils.DEBUG_LOG.warning(String.format(
             "Dead connection released and closed. "
-                + "numConnections: %d, poolSize: %d, "
-                + "pendingFutures: %d", numConnections, pool.size(),
-            pendingFutures.size()));
+                + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d",
+            numConnections, pool.size(), pendingFutures.size()));
       }
 
       if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
       {
         StaticUtils.DEBUG_LOG.warning(String.format(
             "Reconnect attempt starting. "
-                + "numConnections: %d, poolSize: %d, "
-                + "pendingFutures: %d", numConnections, pool.size(),
-            pendingFutures.size()));
+                + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d",
+            numConnections, pool.size(), pendingFutures.size()));
       }
-      connectionFactory
-          .getAsynchronousConnection(new ReconnectHandler());
+      connectionFactory.getAsynchronousConnection(new ReconnectHandler());
     }
 
 
 
-    public void close(UnbindRequest request, String reason)
+    public void close(final UnbindRequest request, final String reason)
         throws NullPointerException
     {
       close();
@@ -172,8 +224,8 @@
 
 
 
-    public FutureResult<CompareResult> compare(CompareRequest request,
-        ResultHandler<? super CompareResult> handler)
+    public FutureResult<CompareResult> compare(final CompareRequest request,
+        final ResultHandler<? super CompareResult> handler)
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
@@ -186,8 +238,67 @@
 
 
 
-    public FutureResult<Result> delete(DeleteRequest request,
-        ResultHandler<Result> handler)
+    public FutureResult<CompareResult> compare(final CompareRequest request,
+        final ResultHandler<? super CompareResult> resultHandler,
+        final IntermediateResponseHandler intermediateResponseHandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.compare(request, resultHandler,
+          intermediateResponseHandler);
+    }
+
+
+
+    public void connectionClosed()
+    {
+      // Ignore - we intercept close via the close method.
+    }
+
+
+
+    public void connectionErrorOccurred(final boolean isDisconnectNotification,
+        final ErrorResultException error)
+    {
+      // Remove this connection from the pool if its in there. If not,
+      // just ignore and wait for the user to close and we can deal with it
+      // there.
+      if (pool.remove(this))
+      {
+        numConnections--;
+        connection.removeConnectionEventListener(this);
+
+        // FIXME: should still close the connection, but we need to be
+        // careful that users of the pooled connection get a sensible
+        // error if they continue to use it (i.e. not an NPE or ISE).
+
+        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
+        {
+          StaticUtils.DEBUG_LOG.warning(String.format(
+              "Connection error occured and removed from pool: "
+                  + error.getMessage()
+                  + " numConnections: %d, poolSize: %d, pendingFutures: %d",
+              numConnections, pool.size(), pendingFutures.size()));
+        }
+      }
+    }
+
+
+
+    public void connectionReceivedUnsolicitedNotification(
+        final ExtendedResult notification)
+    {
+      // Ignore
+    }
+
+
+
+    public FutureResult<Result> delete(final DeleteRequest request,
+        final ResultHandler<Result> handler)
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
@@ -200,8 +311,24 @@
 
 
 
-    public <R extends Result> FutureResult<R> extendedRequest(
-        ExtendedRequest<R> request, ResultHandler<? super R> handler)
+    public FutureResult<Result> delete(final DeleteRequest request,
+        final ResultHandler<Result> resultHandler,
+        final IntermediateResponseHandler intermediateResponseHandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.delete(request, resultHandler,
+          intermediateResponseHandler);
+    }
+
+
+
+    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
+        final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
@@ -214,8 +341,10 @@
 
 
 
-    public FutureResult<Result> modify(ModifyRequest request,
-        ResultHandler<Result> handler)
+    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
+        final ExtendedRequest<R> request,
+        final ResultHandler<? super R> resultHandler,
+        final IntermediateResponseHandler intermediateResponseHandler)
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
@@ -223,37 +352,8 @@
       {
         throw new IllegalStateException();
       }
-      return connection.modify(request, handler);
-    }
-
-
-
-    public FutureResult<Result> modifyDN(ModifyDNRequest request,
-        ResultHandler<Result> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.modifyDN(request, handler);
-    }
-
-
-
-    public FutureResult<Result> search(SearchRequest request,
-        ResultHandler<Result> resultHandler,
-        SearchResultHandler searchResulthandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.search(request, resultHandler,
-          searchResulthandler);
+      return connection.extendedRequest(request, resultHandler,
+          intermediateResponseHandler);
     }
 
 
@@ -261,107 +361,9 @@
     /**
      * {@inheritDoc}
      */
-    public FutureResult<SearchResultEntry> readEntry(DN name,
-        Collection<String> attributeDescriptions,
-        ResultHandler<? super SearchResultEntry> resultHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
+    public Connection getSynchronousConnection()
     {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.readEntry(name, attributeDescriptions,
-          resultHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    public FutureResult<SearchResultEntry> searchSingleEntry(
-        SearchRequest request,
-        ResultHandler<? super SearchResultEntry> resultHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.searchSingleEntry(request, resultHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    public FutureResult<RootDSE> readRootDSE(
-        ResultHandler<RootDSE> handler)
-        throws UnsupportedOperationException, IllegalStateException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.readRootDSE(handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    public FutureResult<Schema> readSchemaForEntry(DN name,
-        ResultHandler<Schema> handler)
-        throws UnsupportedOperationException, IllegalStateException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.readSchemaForEntry(name, handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    public FutureResult<Schema> readSchema(DN name,
-        ResultHandler<Schema> handler)
-        throws UnsupportedOperationException, IllegalStateException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-      return connection.readSchema(name, handler);
-    }
-
-
-
-    public void addConnectionEventListener(
-        ConnectionEventListener listener) throws IllegalStateException,
-        NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
-    }
-
-
-
-    public void removeConnectionEventListener(
-        ConnectionEventListener listener) throws NullPointerException
-    {
-      if (isClosed())
-      {
-        throw new IllegalStateException();
-      }
+      return new SynchronousConnection(this);
     }
 
 
@@ -383,42 +385,189 @@
 
 
 
-    public void connectionReceivedUnsolicitedNotification(
-        GenericExtendedResult notification)
+    public FutureResult<Result> modify(final ModifyRequest request,
+        final ResultHandler<Result> handler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
     {
-      // Ignore
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.modify(request, handler);
     }
 
 
 
-    public void connectionErrorOccurred(
-        boolean isDisconnectNotification, ErrorResultException error)
+    public FutureResult<Result> modify(final ModifyRequest request,
+        final ResultHandler<Result> resultHandler,
+        final IntermediateResponseHandler intermediateResponseHandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
     {
-      // Remove this connection from the pool if its in there. If not,
-      // just
-      // ignore and wait for the user to close and we can deal with it
-      // there.
-      if (pool.remove(this))
+      if (isClosed())
       {
-        numConnections--;
-        connection.removeConnectionEventListener(this);
-
-        // FIXME: should still close the connection, but we need to be
-        // careful that users of the pooled connection get a sensible
-        // error if they continue to use it (i.e. not an NPE or ISE).
-
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
-        {
-          StaticUtils.DEBUG_LOG
-              .warning(String
-                  .format(
-                      "Connection error occured and removed from pool: "
-                          + error.getMessage()
-                          + " numConnections: %d, poolSize: %d, pendingFutures: %d",
-                      numConnections, pool.size(), pendingFutures
-                          .size()));
-        }
+        throw new IllegalStateException();
       }
+      return connection.modify(request, resultHandler,
+          intermediateResponseHandler);
+    }
+
+
+
+    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
+        final ResultHandler<Result> handler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.modifyDN(request, handler);
+    }
+
+
+
+    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
+        final ResultHandler<Result> resultHandler,
+        final IntermediateResponseHandler intermediateResponseHandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.modifyDN(request, resultHandler,
+          intermediateResponseHandler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public FutureResult<SearchResultEntry> readEntry(final DN name,
+        final Collection<String> attributeDescriptions,
+        final ResultHandler<? super SearchResultEntry> resultHandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.readEntry(name, attributeDescriptions, resultHandler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public FutureResult<RootDSE> readRootDSE(
+        final ResultHandler<RootDSE> handler)
+        throws UnsupportedOperationException, IllegalStateException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.readRootDSE(handler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public FutureResult<Schema> readSchema(final DN name,
+        final ResultHandler<Schema> handler)
+        throws UnsupportedOperationException, IllegalStateException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.readSchema(name, handler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public FutureResult<Schema> readSchemaForEntry(final DN name,
+        final ResultHandler<Schema> handler)
+        throws UnsupportedOperationException, IllegalStateException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.readSchemaForEntry(name, handler);
+    }
+
+
+
+    public void removeConnectionEventListener(
+        final ConnectionEventListener listener) throws NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+    }
+
+
+
+    public FutureResult<Result> search(final SearchRequest request,
+        final ResultHandler<Result> resultHandler,
+        final SearchResultHandler searchResulthandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.search(request, resultHandler, searchResulthandler);
+    }
+
+
+
+    public FutureResult<Result> search(final SearchRequest request,
+        final ResultHandler<Result> resultHandler,
+        final SearchResultHandler searchResulthandler,
+        final IntermediateResponseHandler intermediateResponseHandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.search(request, resultHandler, searchResulthandler,
+          intermediateResponseHandler);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public FutureResult<SearchResultEntry> searchSingleEntry(
+        final SearchRequest request,
+        final ResultHandler<? super SearchResultEntry> resultHandler)
+        throws UnsupportedOperationException, IllegalStateException,
+        NullPointerException
+    {
+      if (isClosed())
+      {
+        throw new IllegalStateException();
+      }
+      return connection.searchSingleEntry(request, resultHandler);
     }
   }
 
@@ -427,7 +576,7 @@
   private class ReconnectHandler implements
       ResultHandler<AsynchronousConnection>
   {
-    public void handleErrorResult(ErrorResultException error)
+    public void handleErrorResult(final ErrorResultException error)
     {
       // The reconnect failed. Fail the connect attempt.
       numConnections--;
@@ -443,29 +592,24 @@
       }
       if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
       {
-        StaticUtils.DEBUG_LOG
-            .warning(String
-                .format(
-                    "Reconnect failed. Failed all pending futures: "
-                        + error.getMessage()
-                        + " numConnections: %d, poolSize: %d, pendingFutures: %d",
-                    numConnections, pool.size(), pendingFutures.size()));
+        StaticUtils.DEBUG_LOG.warning(String.format(
+            "Reconnect failed. Failed all pending futures: "
+                + error.getMessage()
+                + " numConnections: %d, poolSize: %d, pendingFutures: %d",
+            numConnections, pool.size(), pendingFutures.size()));
       }
 
     }
 
 
 
-    public void handleResult(AsynchronousConnection connection)
+    public void handleResult(final AsynchronousConnection connection)
     {
       if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
       {
-        StaticUtils.DEBUG_LOG
-            .finest(String
-                .format(
-                    "Reconnect succeded. "
-                        + " numConnections: %d, poolSize: %d, pendingFutures: %d",
-                    numConnections, pool.size(), pendingFutures.size()));
+        StaticUtils.DEBUG_LOG.finest(String.format("Reconnect succeeded. "
+            + " numConnections: %d, poolSize: %d, pendingFutures: %d",
+            numConnections, pool.size(), pendingFutures.size()));
       }
       synchronized (pool)
       {
@@ -476,75 +620,16 @@
 
 
 
-  // Future used for waiting for pooled connections to become available.
-  private static final class FuturePooledConnection extends
-      AbstractFutureResult<AsynchronousConnection>
-  {
-    private FuturePooledConnection(
-        ResultHandler<? super AsynchronousConnection> handler)
-    {
-      super(handler);
-    }
+  private final ConnectionFactory connectionFactory;
 
+  private volatile int numConnections;
 
+  private final int poolSize;
 
-    /**
-     * {@inheritDoc}
-     */
-    public int getRequestID()
-    {
-      return -1;
-    }
+  // FIXME: should use a better collection than this - CLQ?
+  private final Queue<AsynchronousConnection> pool;
 
-  }
-
-
-
-  private void releaseConnection(AsynchronousConnection connection)
-  {
-    // See if there waiters pending.
-    for (;;)
-    {
-      PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
-          connection);
-      FuturePooledConnection future = pendingFutures.poll();
-
-      if (future == null)
-      {
-        // No waiters - so drop out and add connection to pool.
-        break;
-      }
-
-      future.handleResult(pooledConnection);
-
-      if (!future.isCancelled())
-      {
-        // The future was not cancelled and the connection was
-        // accepted.
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-        {
-          StaticUtils.DEBUG_LOG
-              .finest(String
-                  .format(
-                      "Connection released and directly "
-                          + "given to waiter. numConnections: %d, poolSize: %d, "
-                          + "pendingFutures: %d", numConnections, pool
-                          .size(), pendingFutures.size()));
-        }
-        return;
-      }
-    }
-
-    // No waiters. Put back in pool.
-    pool.offer(connection);
-    if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-    {
-      StaticUtils.DEBUG_LOG.finest(String.format(
-          "Connection released to pool. numConnections: %d, "
-              + "poolSize: %d, pendingFutures: %d", numConnections,
-          pool.size(), pendingFutures.size()));
-    }
-  }
+  private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
 
 
 
@@ -553,12 +638,11 @@
    * connections created using the provided connection factory.
    *
    * @param connectionFactory
-   *          The connection factory to use for creating new
-   *          connections.
+   *          The connection factory to use for creating new connections.
    * @param poolSize
    *          The maximum size of the connection pool.
    */
-  ConnectionPool(ConnectionFactory connectionFactory, int poolSize)
+  ConnectionPool(final ConnectionFactory connectionFactory, final int poolSize)
   {
     this.connectionFactory = connectionFactory;
     this.poolSize = poolSize;
@@ -568,12 +652,12 @@
 
 
 
+  @Override
   public synchronized FutureResult<AsynchronousConnection> getAsynchronousConnection(
-      ResultHandler<AsynchronousConnection> handler)
+      final ResultHandler<AsynchronousConnection> handler)
   {
     // This entire method is synchronized to ensure new connects are
-    // done
-    // synchronously to avoid the "pending connect" case.
+    // done synchronously to avoid the "pending connect" case.
     AsynchronousConnection conn;
     synchronized (pool)
     {
@@ -587,19 +671,16 @@
         {
           // We reached max # of conns so wait for a connection to
           // become available.
-          FuturePooledConnection future = new FuturePooledConnection(
+          final FuturePooledConnection future = new FuturePooledConnection(
               handler);
           pendingFutures.add(future);
 
           if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
           {
-            StaticUtils.DEBUG_LOG
-                .finest(String
-                    .format(
-                        "No connections available. Wait-listed"
-                            + "numConnections: %d, poolSize: %d, pendingFutures: %d",
-                        numConnections, pool.size(), pendingFutures
-                            .size()));
+            StaticUtils.DEBUG_LOG.finest(String.format(
+                "No connections available. Wait-listed"
+                    + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+                numConnections, pool.size(), pendingFutures.size()));
           }
 
           return future;
@@ -616,16 +697,13 @@
         numConnections++;
         if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
         {
-          StaticUtils.DEBUG_LOG
-              .finest(String
-                  .format(
-                      "New connection established and aquired. "
-                          + "numConnections: %d, poolSize: %d, pendingFutures: %d",
-                      numConnections, pool.size(), pendingFutures
-                          .size()));
+          StaticUtils.DEBUG_LOG.finest(String.format(
+              "New connection established and aquired. "
+                  + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+              numConnections, pool.size(), pendingFutures.size()));
         }
       }
-      catch (ErrorResultException e)
+      catch (final ErrorResultException e)
       {
         if (handler != null)
         {
@@ -633,9 +711,9 @@
         }
         return new CompletedFutureResult<AsynchronousConnection>(e);
       }
-      catch (InterruptedException e)
+      catch (final InterruptedException e)
       {
-        ErrorResultException error = new ErrorResultException(Responses
+        final ErrorResultException error = new ErrorResultException(Responses
             .newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e));
         if (handler != null)
         {
@@ -648,23 +726,66 @@
     {
       if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
       {
-        StaticUtils.DEBUG_LOG
-            .finest(String
-                .format(
-                    "Connection aquired from pool. "
-                        + "numConnections: %d, poolSize: %d, pendingFutures: %d",
-                    numConnections, pool.size(), pendingFutures.size()));
+        StaticUtils.DEBUG_LOG.finest(String.format(
+            "Connection aquired from pool. "
+                + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+            numConnections, pool.size(), pendingFutures.size()));
       }
     }
 
-    PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
+    final PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
         conn);
     if (handler != null)
     {
       handler.handleResult(pooledConnection);
     }
-    return new CompletedFutureResult<AsynchronousConnection>(
-        pooledConnection);
+    return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
 
   }
+
+
+
+  private void releaseConnection(final AsynchronousConnection connection)
+  {
+    // See if there waiters pending.
+    for (;;)
+    {
+      final PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
+          connection);
+      final FuturePooledConnection future = pendingFutures.poll();
+
+      if (future == null)
+      {
+        // No waiters - so drop out and add connection to pool.
+        break;
+      }
+
+      future.handleResult(pooledConnection);
+
+      if (!future.isCancelled())
+      {
+        // The future was not cancelled and the connection was
+        // accepted.
+        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+        {
+          StaticUtils.DEBUG_LOG.finest(String.format(
+              "Connection released and directly "
+                  + "given to waiter. numConnections: %d, poolSize: %d, "
+                  + "pendingFutures: %d", numConnections, pool.size(),
+              pendingFutures.size()));
+        }
+        return;
+      }
+    }
+
+    // No waiters. Put back in pool.
+    pool.offer(connection);
+    if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+    {
+      StaticUtils.DEBUG_LOG.finest(String.format(
+          "Connection released to pool. numConnections: %d, "
+              + "poolSize: %d, pendingFutures: %d", numConnections,
+          pool.size(), pendingFutures.size()));
+    }
+  }
 }

--
Gitblit v1.10.0