From 1fbc9df5d0c44ae72c76dacc3c945d4f0d641ee6 Mon Sep 17 00:00:00 2001
From: boli <boli@localhost>
Date: Tue, 22 Dec 2009 05:55:54 +0000
Subject: [PATCH] Initial implementation of fail over connection factory

---
 sdk/src/org/opends/sdk/ConnectionPool.java |  308 +++++++++++++++++++++++++-------------------------
 1 files changed, 155 insertions(+), 153 deletions(-)

diff --git a/sdk/src/org/opends/sdk/ConnectionPool.java b/sdk/src/org/opends/sdk/ConnectionPool.java
index dc22fae..def5ef1 100644
--- a/sdk/src/org/opends/sdk/ConnectionPool.java
+++ b/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -31,7 +31,9 @@
 
 import java.util.Collection;
 import java.util.Stack;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.logging.Level;
 
 import org.opends.sdk.requests.*;
@@ -58,40 +60,17 @@
   private final int poolSize;
 
   // FIXME: should use a better collection than this - CLQ?
-  private final Stack<AsynchronousConnection> pool;
+  private final Queue<AsynchronousConnection> pool;
 
   private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
 
-  private final Object lock = new Object();
-
-
-
-  private final class FutureNewConnection
-      extends
-      FutureResultTransformer<AsynchronousConnection, AsynchronousConnection>
-  {
-    private FutureNewConnection(
-        ResultHandler<? super AsynchronousConnection> handler)
-    {
-      super(handler);
-    }
-
-
-
-    protected AsynchronousConnection transformResult(
-        AsynchronousConnection result) throws ErrorResultException
-    {
-      return new PooledConnectionWapper(result);
-    }
-  }
-
 
 
   private final class PooledConnectionWapper implements
       AsynchronousConnection, ConnectionEventListener
   {
-    private AsynchronousConnection connection;
-
+    private final AsynchronousConnection connection;
+    private volatile boolean isClosed;
 
 
     private PooledConnectionWapper(AsynchronousConnection connection)
@@ -106,7 +85,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -120,7 +99,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -134,7 +113,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -145,29 +124,26 @@
 
     public void close()
     {
-      synchronized (lock)
+      synchronized (pool)
       {
-        try
+        if(isClosed)
         {
-          // Don't put closed connections back in the pool.
-          if (connection.isClosed())
-          {
-            if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-            {
-              StaticUtils.DEBUG_LOG
-                  .finest(String
-                      .format(
-                          "Dead connection released to pool. "
-                              + "numConnections: %d, poolSize: %d, pendingFutures: %d",
-                          numConnections, pool.size(), pendingFutures
-                              .size()));
-            }
-            return;
-          }
+          return;
+        }
+        isClosed = true;
 
+        // Don't put closed connections back in the pool.
+        if (!connection.isValid())
+        {
+          numConnections--;
+        }
+        else
+        {
           // See if there waiters pending.
           for (;;)
           {
+            PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
+                connection);
             FuturePooledConnection future = pendingFutures.poll();
 
             if (future == null)
@@ -176,8 +152,6 @@
               break;
             }
 
-            PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
-                connection);
             future.handleResult(pooledConnection);
 
             if (!future.isCancelled())
@@ -189,34 +163,45 @@
                 StaticUtils.DEBUG_LOG
                     .finest(String
                         .format(
-                            "Connection released to pool and directly "
-                                + "given to waiter. numConnections: %d, poolSize: %d, "
-                                + "pendingFutures: %d", numConnections,
-                            pool.size(), pendingFutures.size()));
+                        "Connection released to pool and directly "
+                        + "given to waiter. numConnections: %d, poolSize: %d, "
+                        + "pendingFutures: %d", numConnections,
+                        pool.size(), pendingFutures.size()));
               }
               return;
             }
           }
 
           // No waiters. Put back in pool.
-          pool.push(connection);
+          pool.offer(connection);
           if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
           {
             StaticUtils.DEBUG_LOG
                 .finest(String
                     .format(
-                        "Connection released to pool. "
-                            + "numConnections: %d, poolSize: %d, pendingFutures: %d",
-                        numConnections, pool.size(), pendingFutures
-                            .size()));
+                    "Connection released to pool and directly "
+                    + "given to waiter. numConnections: %d, poolSize: %d, "
+                    + "pendingFutures: %d", numConnections,
+                    pool.size(), pendingFutures.size()));
           }
-        }
-        finally
-        {
-          // Null out the underlying connection to prevent further use.
-          connection = null;
+          return;
         }
       }
+
+      // Connection is no longer valid. Close outside of lock
+      connection.removeConnectionEventListener(this);
+      connection.close();
+      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+      {
+        StaticUtils.DEBUG_LOG
+            .finest(String
+                .format(
+                "Dead connection released to pool. "
+                + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+                numConnections, pool.size(), pendingFutures
+                    .size()));
+      }
+      return;
     }
 
 
@@ -234,7 +219,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -248,7 +233,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -262,7 +247,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -276,7 +261,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -290,7 +275,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -305,7 +290,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -324,7 +309,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -343,7 +328,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -359,7 +344,7 @@
         ResultHandler<RootDSE> handler)
         throws UnsupportedOperationException, IllegalStateException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -375,7 +360,7 @@
         ResultHandler<Schema> handler)
         throws UnsupportedOperationException, IllegalStateException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -391,7 +376,7 @@
         ResultHandler<Schema> handler)
         throws UnsupportedOperationException, IllegalStateException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -404,7 +389,7 @@
         ConnectionEventListener listener) throws IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -415,7 +400,7 @@
     public void removeConnectionEventListener(
         ConnectionEventListener listener) throws NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -428,10 +413,13 @@
      */
     public boolean isClosed()
     {
-      return connection == null;
+      return isClosed;
     }
 
-
+    public boolean isValid()
+    {
+      return !isClosed && connection.isValid();
+    }
 
     public void connectionReceivedUnsolicitedNotification(
         GenericExtendedResult notification)
@@ -444,10 +432,10 @@
     public void connectionErrorOccurred(
         boolean isDisconnectNotification, ErrorResultException error)
     {
-      synchronized (lock)
+      // Remove this connection from the pool if its in there. If not, just
+      // ignore and wait for the user to close and we can deal with it there.
+      if(pool.remove(this))
       {
-        // Remove this connection from the pool if its in there
-        pool.remove(this);
         numConnections--;
         connection.removeConnectionEventListener(this);
 
@@ -460,11 +448,11 @@
           StaticUtils.DEBUG_LOG
               .finest(String
                   .format(
-                      "Connection error occured: "
-                          + error.getMessage()
-                          + " numConnections: %d, poolSize: %d, pendingFutures: %d",
-                      numConnections, pool.size(), pendingFutures
-                          .size()));
+                  "Connection error occured: "
+                  + error.getMessage()
+                  + " numConnections: %d, poolSize: %d, pendingFutures: %d",
+                  numConnections, pool.size(), pendingFutures
+                      .size()));
         }
       }
     }
@@ -510,86 +498,100 @@
   {
     this.connectionFactory = connectionFactory;
     this.poolSize = poolSize;
-    this.pool = new Stack<AsynchronousConnection>();
+    this.pool = new ConcurrentLinkedQueue<AsynchronousConnection>();
     this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>();
   }
 
 
 
-  public FutureResult<AsynchronousConnection> getAsynchronousConnection(
-      ResultHandler<? super AsynchronousConnection> handler)
+  public synchronized FutureResult<AsynchronousConnection>
+  getAsynchronousConnection(ResultHandler<? super AsynchronousConnection> handler)
   {
-    synchronized (lock)
+    // This entire method is synchronized to ensure new connects are done
+    // synchronously to avoid the "pending connect" case.
+    AsynchronousConnection conn;
+    synchronized(pool)
     {
       // Check to see if we have a connection in the pool
-
-      if (!pool.isEmpty())
+      conn = pool.poll();
+      if (conn == null)
       {
-        AsynchronousConnection conn = pool.pop();
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+        // Pool was empty. Maybe a new connection if pool size is not
+        // reached
+        if (numConnections >= poolSize)
         {
-          StaticUtils.DEBUG_LOG
-              .finest(String
-                  .format(
-                      "Connection aquired from pool. "
-                          + "numConnections: %d, poolSize: %d, pendingFutures: %d",
-                      numConnections, pool.size(), pendingFutures
-                          .size()));
+          // We reached max # of conns so wait for a connection to become available.
+          FuturePooledConnection future = new FuturePooledConnection(
+              handler);
+          pendingFutures.add(future);
+
+          if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+          {
+            StaticUtils.DEBUG_LOG
+                .finest(String
+                    .format(
+                    "No connections available. Wait-listed"
+                    + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+                    numConnections, pool.size(), pendingFutures
+                        .size()));
+          }
+
+          return future;
         }
-        PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
-            conn);
-        if (handler != null)
-        {
-          handler.handleResult(pooledConnection);
-        }
-        return new CompletedFutureResult<AsynchronousConnection>(
-            pooledConnection);
-      }
-
-      // Pool was empty. Maybe a new connection if pool size is not
-      // reached
-      if (numConnections < poolSize)
-      {
-        // We can create a new connection.
-        numConnections++;
-
-        FutureNewConnection future = new FutureNewConnection(handler);
-        future.setFutureResult(connectionFactory
-            .getAsynchronousConnection(future));
-
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-        {
-          StaticUtils.DEBUG_LOG
-              .finest(String
-                  .format(
-                      "New connection established and aquired. "
-                          + "numConnections: %d, poolSize: %d, pendingFutures: %d",
-                      numConnections, pool.size(), pendingFutures
-                          .size()));
-        }
-
-        return future;
-      }
-      else
-      {
-        // Pool is full so wait for a connection to become available.
-        FuturePooledConnection future = new FuturePooledConnection(
-            handler);
-        pendingFutures.add(future);
-
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-        {
-          StaticUtils.DEBUG_LOG
-              .finest(String
-                  .format(
-                      "No connections available. Wait-listed"
-                          + "numConnections: %d, poolSize: %d, pendingFutures: %d",
-                      numConnections, pool.size(), pendingFutures
-                          .size()));
-        }
-
-        return future;
       }
     }
+
+    if(conn == null)
+    {
+      try
+      {
+        // We can create a new connection.
+        conn = connectionFactory.getAsynchronousConnection(handler).get();
+        numConnections++;
+        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+        {
+          StaticUtils.DEBUG_LOG
+              .finest(String
+                  .format(
+                  "New connection established and aquired. "
+                  + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+                  numConnections, pool.size(), pendingFutures
+                      .size()));
+        }
+      }
+      catch (ErrorResultException e)
+      {
+        return new CompletedFutureResult<AsynchronousConnection>(e);
+      }
+      catch (InterruptedException e)
+      {
+        return new CompletedFutureResult<AsynchronousConnection>(
+            new ErrorResultException(Responses.newResult(
+                ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)));
+      }
+    }
+    else
+    {
+      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+      {
+        StaticUtils.DEBUG_LOG
+            .finest(String
+                .format(
+                "Connection aquired from pool. "
+                + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+                numConnections, pool.size(), pendingFutures
+                    .size()));
+      }
+    }
+
+    PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
+        conn);
+    if (handler != null)
+    {
+      handler.handleResult(pooledConnection);
+    }
+    return new CompletedFutureResult<AsynchronousConnection>(
+        pooledConnection);
+
   }
 }

--
Gitblit v1.10.0