From 9d6f5c9a5b7771e595892e6935cf1cc42012c4c6 Mon Sep 17 00:00:00 2001
From: boli <boli@localhost>
Date: Tue, 22 Dec 2009 21:37:36 +0000
Subject: [PATCH] Fixed some bugs with the connection pool. Moved heart beat back to heart beat connection factory from isValid method.
---
sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java | 85 +++++++++++++++++++++++++++++++++---------
1 files changed, 66 insertions(+), 19 deletions(-)
diff --git a/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java b/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
index 3d426cd..9e95afe 100644
--- a/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
+++ b/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
@@ -32,13 +32,14 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.opends.sdk.requests.*;
import org.opends.sdk.responses.*;
import org.opends.sdk.schema.Schema;
import com.sun.opends.sdk.util.FutureResultTransformer;
-import com.sun.opends.sdk.util.Validator;
@@ -46,10 +47,14 @@
* An heart beat connection factory can be used to create connections
* that sends a periodic search request to a Directory Server.
*/
-public class HeartBeatConnectionFactory extends
+final class HeartBeatConnectionFactory extends
AbstractConnectionFactory<AsynchronousConnection>
{
- private final int interval;
+ private final SearchRequest heartBeat;
+
+ private final long timeout;
+
+ private final TimeUnit unit;
private final List<AsynchronousConnectionImpl> activeConnections;
@@ -59,7 +64,30 @@
// FIXME: use a single global scheduler?
- // FIXME: change timeout parameters to long+TimeUnit.
+ /**
+ * Creates a new heart-beat connection factory which will create
+ * connections using the provided connection factory and periodically
+ * ping any created connections in order to detect that they are still
+ * alive.
+ *
+ * @param connectionFactory
+ * The connection factory to use for creating connections.
+ * @param timeout
+ * The time to wait between keepalive pings.
+ * @param unit
+ * The time unit of the timeout argument.
+ */
+ HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory,
+ long timeout, TimeUnit unit)
+ {
+ this(connectionFactory, timeout, unit, DEFAULT_SEARCH);
+ }
+
+
+
+ private static final SearchRequest DEFAULT_SEARCH = Requests
+ .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)",
+ "1.1");
@@ -71,15 +99,19 @@
*
* @param connectionFactory
* The connection factory to use for creating connections.
- * @param interval
- * The period between keepalive pings.
+ * @param timeout
+ * The time to wait between keepalive pings.
+ * @param unit
+ * The time unit of the timeout argument.
+ * @param heartBeat
+ * The search request to use when pinging connections.
*/
- public HeartBeatConnectionFactory(
- ConnectionFactory<?> connectionFactory,
- int interval)
+ HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory,
+ long timeout, TimeUnit unit, SearchRequest heartBeat)
{
- Validator.ensureNotNull(connectionFactory);
- this.interval = interval;
+ this.heartBeat = heartBeat;
+ this.timeout = timeout;
+ this.unit = unit;
this.activeConnections = new LinkedList<AsynchronousConnectionImpl>();
this.parentFactory = connectionFactory;
@@ -93,9 +125,11 @@
* operations.
*/
private final class AsynchronousConnectionImpl implements
- AsynchronousConnection, ConnectionEventListener
+ AsynchronousConnection, ConnectionEventListener, ResultHandler<Result>
{
private final AsynchronousConnection connection;
+ private long lastSuccessfulPing;
+ private FutureResult<Result> lastPingFuture;
private AsynchronousConnectionImpl(AsynchronousConnection connection)
@@ -316,9 +350,9 @@
*/
public boolean isValid()
{
- // Avoid extra pings... Let the next ping find out if this connection
- // is still valid.
- return connection.isValid();
+ return connection.isValid() && (lastSuccessfulPing <= 0 ||
+ System.currentTimeMillis() - lastSuccessfulPing <
+ unit.toMillis(timeout) * 2);
}
public void connectionReceivedUnsolicitedNotification(
@@ -338,6 +372,15 @@
activeConnections.remove(this);
}
}
+
+ public void handleErrorResult(ErrorResultException error) {
+ connection.close(Requests.newUnbindRequest(),
+ "Heartbeat retured error: " + error);
+ }
+
+ public void handleResult(Result result) {
+ lastSuccessfulPing = System.currentTimeMillis();
+ }
}
@@ -354,22 +397,26 @@
public void run()
{
+ long startTime;
while(true)
{
+ startTime = System.currentTimeMillis();
synchronized (activeConnections)
{
for (AsynchronousConnectionImpl connection : activeConnections)
{
- if(!connection.isValid())
+ if(connection.lastPingFuture == null ||
+ connection.lastPingFuture.isDone())
{
- connection.close(Requests.newUnbindRequest(),
- "Connection no longer valid");
+ connection.lastPingFuture =
+ connection.search(heartBeat, connection, null);
}
}
}
try
{
- sleep(interval);
+ sleep(unit.toMillis(timeout) -
+ (System.currentTimeMillis() - startTime));
}
catch (InterruptedException e)
{
--
Gitblit v1.10.0