From abc1a19fd4dee9729fd0aed721575a396d249bd4 Mon Sep 17 00:00:00 2001
From: matthew_swift <matthew_swift@localhost>
Date: Wed, 16 Dec 2009 22:13:34 +0000
Subject: [PATCH] Migrate remaining future impls over to new future APIs.

---
 sdk/src/org/opends/sdk/ConnectionPool.java |  287 +++++++++++++++-----------------------------------------
 1 files changed, 79 insertions(+), 208 deletions(-)

diff --git a/sdk/src/org/opends/sdk/ConnectionPool.java b/sdk/src/org/opends/sdk/ConnectionPool.java
index 73aad16..e002423 100644
--- a/sdk/src/org/opends/sdk/ConnectionPool.java
+++ b/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -32,15 +32,15 @@
 import java.util.Collection;
 import java.util.Stack;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.logging.Level;
 
 import org.opends.sdk.requests.*;
 import org.opends.sdk.responses.*;
 import org.opends.sdk.schema.Schema;
 
+import com.sun.opends.sdk.util.AbstractFutureResult;
+import com.sun.opends.sdk.util.CompletedFutureResult;
+import com.sun.opends.sdk.util.FutureResultTransformer;
 import com.sun.opends.sdk.util.StaticUtils;
 
 
@@ -48,7 +48,7 @@
 /**
  * A simple connection pool implementation.
  */
-public class ConnectionPool extends
+public final class ConnectionPool extends
     AbstractConnectionFactory<AsynchronousConnection>
 {
   private final ConnectionFactory<?> connectionFactory;
@@ -60,12 +60,33 @@
   // FIXME: should use a better collection than this - CLQ?
   private final Stack<AsynchronousConnection> pool;
 
-  private final ConcurrentLinkedQueue<PendingResultFuture> pendingFutures;
+  private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
 
   private final Object lock = new Object();
 
 
 
+  private final class FutureNewConnection
+      extends
+      FutureResultTransformer<AsynchronousConnection, AsynchronousConnection>
+  {
+    private FutureNewConnection(
+        ResultHandler<? super AsynchronousConnection> handler)
+    {
+      super(handler);
+    }
+
+
+
+    protected AsynchronousConnection transformResult(
+        AsynchronousConnection result) throws ErrorResultException
+    {
+      return new PooledConnectionWapper(result);
+    }
+  }
+
+
+
   private final class PooledConnectionWapper implements
       AsynchronousConnection, ConnectionEventListener
   {
@@ -144,24 +165,37 @@
             return;
           }
 
-          // See if there waiters pending
-          PendingResultFuture future = pendingFutures.poll();
-          if (future != null)
+          // See if there waiters pending.
+          for (;;)
           {
+            FuturePooledConnection future = pendingFutures.poll();
+
+            if (future == null)
+            {
+              // No waiters - so drop out and add connection to pool.
+              break;
+            }
+
             PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
                 connection);
-            future.connection(pooledConnection);
-            if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+            future.handleResult(pooledConnection);
+
+            if (!future.isCancelled())
             {
-              StaticUtils.DEBUG_LOG
-                  .finest(String
-                      .format(
-                          "Connection released to pool and directly "
-                              + "given to waiter. numConnections: %d, poolSize: %d, "
-                              + "pendingFutures: %d", numConnections,
-                          pool.size(), pendingFutures.size()));
+              // The future was not cancelled and the connection was
+              // accepted.
+              if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+              {
+                StaticUtils.DEBUG_LOG
+                    .finest(String
+                        .format(
+                            "Connection released to pool and directly "
+                                + "given to waiter. numConnections: %d, poolSize: %d, "
+                                + "pendingFutures: %d", numConnections,
+                            pool.size(), pendingFutures.size()));
+              }
+              return;
             }
-            return;
           }
 
           // No waiters. Put back in pool.
@@ -438,165 +472,26 @@
 
 
 
-  private static final class CompletedResultFuture implements
-      FutureResult<AsynchronousConnection>
+  // Future used for waiting for pooled connections to become available.
+  private static final class FuturePooledConnection extends
+      AbstractFutureResult<AsynchronousConnection>
   {
-    private final PooledConnectionWapper connection;
-
-
-
-    private CompletedResultFuture(PooledConnectionWapper connection)
-    {
-      this.connection = connection;
-    }
-
-
-
-    public boolean cancel(boolean mayInterruptIfRunning)
-    {
-      return false;
-    }
-
-
-
-    public AsynchronousConnection get() throws InterruptedException,
-        ErrorResultException
-    {
-      return connection;
-    }
-
-
-
-    public AsynchronousConnection get(long timeout, TimeUnit unit)
-        throws InterruptedException, TimeoutException,
-        ErrorResultException
-    {
-      return connection;
-    }
-
-
-
-    public boolean isCancelled()
-    {
-      return false;
-    }
-
-
-
-    public boolean isDone()
-    {
-      return true;
-    }
-
-
-
-    public int getRequestID()
-    {
-      return -1;
-    }
-  }
-
-
-
-  private final class PendingResultFuture implements
-      FutureResult<AsynchronousConnection>
-  {
-    private volatile boolean isCancelled;
-
-    private volatile PooledConnectionWapper connection;
-
-    private volatile ErrorResultException err;
-
-    private final ResultHandler<? super AsynchronousConnection> handler;
-
-    private final CountDownLatch latch = new CountDownLatch(1);
-
-
-
-    private PendingResultFuture(
+    private FuturePooledConnection(
         ResultHandler<? super AsynchronousConnection> handler)
     {
-      this.handler = handler;
+      super(handler);
     }
 
 
 
-    public synchronized boolean cancel(boolean mayInterruptIfRunning)
-    {
-      return pendingFutures.remove(this) && (isCancelled = true);
-    }
-
-
-
-    public AsynchronousConnection get() throws InterruptedException,
-        ErrorResultException
-    {
-      latch.await();
-      if (err != null)
-      {
-        throw err;
-      }
-      return connection;
-    }
-
-
-
-    public AsynchronousConnection get(long timeout, TimeUnit unit)
-        throws InterruptedException, TimeoutException,
-        ErrorResultException
-    {
-      latch.await(timeout, unit);
-      if (err != null)
-      {
-        throw err;
-      }
-      return connection;
-    }
-
-
-
-    public synchronized boolean isCancelled()
-    {
-      return isCancelled;
-    }
-
-
-
-    public boolean isDone()
-    {
-      return latch.getCount() == 0;
-    }
-
-
-
+    /**
+     * {@inheritDoc}
+     */
     public int getRequestID()
     {
       return -1;
     }
 
-
-
-    private void connection(PooledConnectionWapper connection)
-    {
-      this.connection = connection;
-      if (handler != null)
-      {
-        handler.handleResult(connection);
-      }
-      latch.countDown();
-    }
-
-
-
-    private void error(ErrorResultException e)
-    {
-      this.err = e;
-      if (handler != null)
-      {
-        handler.handleErrorResult(e);
-      }
-      latch.countDown();
-    }
   }
 
 
@@ -604,7 +499,7 @@
   /**
    * Creates a new connection pool which will maintain {@code poolSize}
    * connections created using the provided connection factory.
-   *
+   * 
    * @param connectionFactory
    *          The connection factory to use for creating new
    *          connections.
@@ -617,38 +512,7 @@
     this.connectionFactory = connectionFactory;
     this.poolSize = poolSize;
     this.pool = new Stack<AsynchronousConnection>();
-    this.pendingFutures = new ConcurrentLinkedQueue<PendingResultFuture>();
-  }
-
-
-
-  private final class WrapResultHandler implements
-      ResultHandler<AsynchronousConnection>
-  {
-    private final PendingResultFuture future;
-
-
-
-    private WrapResultHandler(PendingResultFuture future)
-    {
-      this.future = future;
-    }
-
-
-
-    public void handleResult(AsynchronousConnection connection)
-    {
-      PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
-          connection);
-      future.connection(pooledConnection);
-    }
-
-
-
-    public void handleErrorResult(ErrorResultException error)
-    {
-      future.error(error);
-    }
+    this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>();
   }
 
 
@@ -679,19 +543,21 @@
         {
           handler.handleResult(pooledConnection);
         }
-        return new CompletedResultFuture(pooledConnection);
+        return new CompletedFutureResult<AsynchronousConnection>(
+            pooledConnection);
       }
 
-      PendingResultFuture pendingFuture = new PendingResultFuture(
-          handler);
       // Pool was empty. Maybe a new connection if pool size is not
       // reached
       if (numConnections < poolSize)
       {
+        // We can create a new connection.
         numConnections++;
-        WrapResultHandler wrapHandler = new WrapResultHandler(
-            pendingFuture);
-        connectionFactory.getAsynchronousConnection(wrapHandler);
+
+        FutureNewConnection future = new FutureNewConnection(handler);
+        future.setFutureResult(connectionFactory
+            .getAsynchronousConnection(future));
+
         if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
         {
           StaticUtils.DEBUG_LOG
@@ -702,11 +568,16 @@
                       numConnections, pool.size(), pendingFutures
                           .size()));
         }
+
+        return future;
       }
       else
       {
-        // Have to wait
-        pendingFutures.add(pendingFuture);
+        // Pool is full so wait for a connection to become available.
+        FuturePooledConnection future = new FuturePooledConnection(
+            handler);
+        pendingFutures.add(future);
+
         if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
         {
           StaticUtils.DEBUG_LOG
@@ -717,9 +588,9 @@
                       numConnections, pool.size(), pendingFutures
                           .size()));
         }
-      }
 
-      return pendingFuture;
+        return future;
+      }
     }
   }
 }

--
Gitblit v1.10.0