From 9d6f5c9a5b7771e595892e6935cf1cc42012c4c6 Mon Sep 17 00:00:00 2001
From: boli <boli@localhost>
Date: Tue, 22 Dec 2009 21:37:36 +0000
Subject: [PATCH] Fixed some bugs with the connection pool.  Moved heart beat back to heart beat connection factory from isValid method.

---
 sdk/src/org/opends/sdk/ConnectionPool.java |  192 ++++++++++++++++++++++++++++++++---------------
 1 files changed, 129 insertions(+), 63 deletions(-)

diff --git a/sdk/src/org/opends/sdk/ConnectionPool.java b/sdk/src/org/opends/sdk/ConnectionPool.java
index def5ef1..6a5a60d 100644
--- a/sdk/src/org/opends/sdk/ConnectionPool.java
+++ b/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -132,58 +132,10 @@
         }
         isClosed = true;
 
-        // Don't put closed connections back in the pool.
-        if (!connection.isValid())
+        // Don't put invalid connections back in the pool.
+        if (connection.isValid())
         {
-          numConnections--;
-        }
-        else
-        {
-          // See if there waiters pending.
-          for (;;)
-          {
-            PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
-                connection);
-            FuturePooledConnection future = pendingFutures.poll();
-
-            if (future == null)
-            {
-              // No waiters - so drop out and add connection to pool.
-              break;
-            }
-
-            future.handleResult(pooledConnection);
-
-            if (!future.isCancelled())
-            {
-              // The future was not cancelled and the connection was
-              // accepted.
-              if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-              {
-                StaticUtils.DEBUG_LOG
-                    .finest(String
-                        .format(
-                        "Connection released to pool and directly "
-                        + "given to waiter. numConnections: %d, poolSize: %d, "
-                        + "pendingFutures: %d", numConnections,
-                        pool.size(), pendingFutures.size()));
-              }
-              return;
-            }
-          }
-
-          // No waiters. Put back in pool.
-          pool.offer(connection);
-          if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-          {
-            StaticUtils.DEBUG_LOG
-                .finest(String
-                    .format(
-                    "Connection released to pool and directly "
-                    + "given to waiter. numConnections: %d, poolSize: %d, "
-                    + "pendingFutures: %d", numConnections,
-                    pool.size(), pendingFutures.size()));
-          }
+          releaseConnection(connection);
           return;
         }
       }
@@ -191,19 +143,31 @@
       // Connection is no longer valid. Close outside of lock
       connection.removeConnectionEventListener(this);
       connection.close();
-      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
       {
         StaticUtils.DEBUG_LOG
-            .finest(String
+            .warning(String
                 .format(
-                "Dead connection released to pool. "
-                + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+                "Dead connection released and closed. "
+                    + "numConnections: %d, poolSize: %d, " +
+                    "pendingFutures: %d",
                 numConnections, pool.size(), pendingFutures
                     .size()));
       }
-      return;
-    }
 
+      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
+      {
+        StaticUtils.DEBUG_LOG
+            .warning(String
+                .format(
+                "Reconnect attempt starting. "
+                    + "numConnections: %d, poolSize: %d, " +
+                    "pendingFutures: %d",
+                numConnections, pool.size(), pendingFutures
+                    .size()));
+      }
+      connectionFactory.getAsynchronousConnection(new ReconnectHandler());
+    }
 
 
     public void close(UnbindRequest request, String reason)
@@ -443,12 +407,12 @@
         // careful that users of the pooled connection get a sensible
         // error if they continue to use it (i.e. not an NPE or ISE).
 
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
         {
           StaticUtils.DEBUG_LOG
-              .finest(String
+              .warning(String
                   .format(
-                  "Connection error occured: "
+                  "Connection error occured and removed from pool: "
                   + error.getMessage()
                   + " numConnections: %d, poolSize: %d, pendingFutures: %d",
                   numConnections, pool.size(), pendingFutures
@@ -458,7 +422,52 @@
     }
   }
 
+  private class ReconnectHandler
+      implements ResultHandler<AsynchronousConnection>
+  {
+    public void handleErrorResult(ErrorResultException error) {
+      // The reconnect failed. Fail the connect attempt.
+      numConnections --;
+      // The reconnect failed. The underlying connection factory probably went
+      // down. Just fail all pending futures
+      synchronized (pool)
+      {
+        while(!pendingFutures.isEmpty())
+        {
+          pendingFutures.poll().handleErrorResult(error);
+        }
+      }
+      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
+      {
+        StaticUtils.DEBUG_LOG
+            .warning(String
+                .format(
+                "Reconnect failed. Failed all pending futures: "
+                    + error.getMessage()
+                    + " numConnections: %d, poolSize: %d, pendingFutures: %d",
+                numConnections, pool.size(), pendingFutures
+                    .size()));
+      }
 
+    }
+
+    public void handleResult(AsynchronousConnection connection) {
+      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+      {
+        StaticUtils.DEBUG_LOG
+            .finest(String
+                .format(
+                "Reconnect succeded. "
+                    + " numConnections: %d, poolSize: %d, pendingFutures: %d",
+                numConnections, pool.size(), pendingFutures
+                    .size()));
+      }
+      synchronized (pool)
+      {
+        releaseConnection(connection);
+      }
+    }
+  }
 
   // Future used for waiting for pooled connections to become available.
   private static final class FuturePooledConnection extends
@@ -482,6 +491,54 @@
 
   }
 
+  private void releaseConnection(AsynchronousConnection connection)
+  {
+    // See if there waiters pending.
+    for (;;)
+    {
+      PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
+          connection);
+      FuturePooledConnection future = pendingFutures.poll();
+
+      if (future == null)
+      {
+        // No waiters - so drop out and add connection to pool.
+        break;
+      }
+
+      future.handleResult(pooledConnection);
+
+      if (!future.isCancelled())
+      {
+        // The future was not cancelled and the connection was
+        // accepted.
+        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+        {
+          StaticUtils.DEBUG_LOG
+              .finest(String
+                  .format(
+                  "Connection released and directly "
+                      + "given to waiter. numConnections: %d, poolSize: %d, "
+                      + "pendingFutures: %d", numConnections,
+                  pool.size(), pendingFutures.size()));
+        }
+        return;
+      }
+    }
+
+    // No waiters. Put back in pool.
+    pool.offer(connection);
+    if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+    {
+      StaticUtils.DEBUG_LOG
+          .finest(String
+              .format(
+              "Connection released to pool. numConnections: %d, " +
+                  "poolSize: %d, pendingFutures: %d", numConnections,
+              pool.size(), pendingFutures.size()));
+    }
+  }
+
 
 
   /**
@@ -546,7 +603,7 @@
       try
       {
         // We can create a new connection.
-        conn = connectionFactory.getAsynchronousConnection(handler).get();
+        conn = connectionFactory.getAsynchronousConnection(null).get();
         numConnections++;
         if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
         {
@@ -561,13 +618,22 @@
       }
       catch (ErrorResultException e)
       {
+        if (handler != null)
+        {
+          handler.handleErrorResult(e);
+        }
         return new CompletedFutureResult<AsynchronousConnection>(e);
       }
       catch (InterruptedException e)
       {
-        return new CompletedFutureResult<AsynchronousConnection>(
+        ErrorResultException error =
             new ErrorResultException(Responses.newResult(
-                ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)));
+                ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e));
+        if (handler != null)
+        {
+          handler.handleErrorResult(error);
+        }
+        return new CompletedFutureResult<AsynchronousConnection>(error);
       }
     }
     else

--
Gitblit v1.10.0