From 9d6f5c9a5b7771e595892e6935cf1cc42012c4c6 Mon Sep 17 00:00:00 2001
From: boli <boli@localhost>
Date: Tue, 22 Dec 2009 21:37:36 +0000
Subject: [PATCH] Fixed some bugs with the connection pool.  Moved heart beat back to heart beat connection factory from isValid method.

---
 sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java |   85 +++++++++++++++++++++++++++++++++---------
 1 files changed, 66 insertions(+), 19 deletions(-)

diff --git a/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java b/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
index 3d426cd..9e95afe 100644
--- a/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
+++ b/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
@@ -32,13 +32,14 @@
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.opends.sdk.requests.*;
 import org.opends.sdk.responses.*;
 import org.opends.sdk.schema.Schema;
 
 import com.sun.opends.sdk.util.FutureResultTransformer;
-import com.sun.opends.sdk.util.Validator;
 
 
 
@@ -46,10 +47,14 @@
  * An heart beat connection factory can be used to create connections
  * that sends a periodic search request to a Directory Server.
  */
-public class HeartBeatConnectionFactory extends
+final class HeartBeatConnectionFactory extends
     AbstractConnectionFactory<AsynchronousConnection>
 {
-  private final int interval;
+  private final SearchRequest heartBeat;
+
+  private final long timeout;
+
+  private final TimeUnit unit;
 
   private final List<AsynchronousConnectionImpl> activeConnections;
 
@@ -59,7 +64,30 @@
 
   // FIXME: use a single global scheduler?
 
-  // FIXME: change timeout parameters to long+TimeUnit.
+  /**
+   * Creates a new heart-beat connection factory which will create
+   * connections using the provided connection factory and periodically
+   * ping any created connections in order to detect that they are still
+   * alive.
+   *
+   * @param connectionFactory
+   *          The connection factory to use for creating connections.
+   * @param timeout
+   *          The time to wait between keepalive pings.
+   * @param unit
+   *          The time unit of the timeout argument.
+   */
+  HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory,
+      long timeout, TimeUnit unit)
+  {
+    this(connectionFactory, timeout, unit, DEFAULT_SEARCH);
+  }
+
+
+
+  private static final SearchRequest DEFAULT_SEARCH = Requests
+      .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)",
+          "1.1");
 
 
 
@@ -71,15 +99,19 @@
    *
    * @param connectionFactory
    *          The connection factory to use for creating connections.
-   * @param interval
-   *          The period between keepalive pings.
+   * @param timeout
+   *          The time to wait between keepalive pings.
+   * @param unit
+   *          The time unit of the timeout argument.
+   * @param heartBeat
+   *          The search request to use when pinging connections.
    */
-  public HeartBeatConnectionFactory(
-      ConnectionFactory<?> connectionFactory,
-      int interval)
+  HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory,
+      long timeout, TimeUnit unit, SearchRequest heartBeat)
   {
-    Validator.ensureNotNull(connectionFactory);
-    this.interval = interval;
+    this.heartBeat = heartBeat;
+    this.timeout = timeout;
+    this.unit = unit;
     this.activeConnections = new LinkedList<AsynchronousConnectionImpl>();
     this.parentFactory = connectionFactory;
 
@@ -93,9 +125,11 @@
    * operations.
    */
   private final class AsynchronousConnectionImpl implements
-      AsynchronousConnection, ConnectionEventListener
+      AsynchronousConnection, ConnectionEventListener, ResultHandler<Result>
   {
     private final AsynchronousConnection connection;
+    private long lastSuccessfulPing;
+    private FutureResult<Result> lastPingFuture;
 
 
     private AsynchronousConnectionImpl(AsynchronousConnection connection)
@@ -316,9 +350,9 @@
      */
     public boolean isValid()
     {
-      // Avoid extra pings... Let the next ping find out if this connection
-      // is still valid.
-      return connection.isValid();
+      return connection.isValid() && (lastSuccessfulPing <= 0 ||
+          System.currentTimeMillis() - lastSuccessfulPing <
+              unit.toMillis(timeout) * 2);
     }
 
     public void connectionReceivedUnsolicitedNotification(
@@ -338,6 +372,15 @@
         activeConnections.remove(this);
       }
     }
+
+    public void handleErrorResult(ErrorResultException error) {
+      connection.close(Requests.newUnbindRequest(),
+          "Heartbeat retured error: " + error);
+    }
+
+    public void handleResult(Result result) {
+        lastSuccessfulPing = System.currentTimeMillis();
+    }
   }
 
 
@@ -354,22 +397,26 @@
 
     public void run()
     {
+      long startTime;
       while(true)
       {
+        startTime = System.currentTimeMillis();
         synchronized (activeConnections)
         {
           for (AsynchronousConnectionImpl connection : activeConnections)
           {
-            if(!connection.isValid())
+            if(connection.lastPingFuture == null ||
+                connection.lastPingFuture.isDone())
             {
-              connection.close(Requests.newUnbindRequest(),
-                               "Connection no longer valid");
+              connection.lastPingFuture =
+                  connection.search(heartBeat, connection, null);
             }
           }
         }
         try
         {
-          sleep(interval);
+          sleep(unit.toMillis(timeout) -
+              (System.currentTimeMillis() - startTime));
         }
         catch (InterruptedException e)
         {

--
Gitblit v1.10.0