From d5e6d06039eb8ca4f2fcec967a32daef3970ffb5 Mon Sep 17 00:00:00 2001
From: boli <boli@localhost>
Date: Tue, 22 Dec 2009 21:37:36 +0000
Subject: [PATCH] Fixed some bugs with the connection pool. Moved heart beat back to heart beat connection factory from isValid method.
---
opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java | 42 +-----
opendj-sdk/sdk/src/org/opends/sdk/AuthenticatedConnectionFactory.java | 9 +
opendj-sdk/sdk/src/org/opends/sdk/AsynchronousConnection.java | 6
opendj-sdk/sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java | 31 ----
opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java | 26 +++
opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java | 192 ++++++++++++++++++---------
opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java | 2
opendj-sdk/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java | 85 +++++++++--
8 files changed, 237 insertions(+), 156 deletions(-)
diff --git a/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java b/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
index 9e9f8f1..4f6c80e 100644
--- a/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
+++ b/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
@@ -37,7 +37,9 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLContext;
@@ -59,7 +61,7 @@
import com.sun.grizzly.ssl.*;
import com.sun.grizzly.streams.StreamWriter;
import com.sun.opends.sdk.util.Validator;
-import com.sun.opends.sdk.util.StaticUtils;
+
/**
@@ -664,8 +666,6 @@
private final Object writeLock = new Object();
- private final SearchRequest pingRequest;
-
/**
* Creates a new LDAP connection.
*
@@ -676,22 +676,18 @@
* @param schema
* The schema which will be used to decode responses from the
* server.
- * @param pingRequest
- * The search request to use to verify the validity of
- * this connection.
* @param connFactory
* The associated connection factory.
*/
LDAPConnection(com.sun.grizzly.Connection<?> connection,
InetSocketAddress serverAddress, Schema schema,
- SearchRequest pingRequest, LDAPConnectionFactoryImpl connFactory)
+ LDAPConnectionFactoryImpl connFactory)
{
this.connection = connection;
this.serverAddress = serverAddress;
this.schema = schema;
this.connFactory = connFactory;
this.streamWriter = getFilterChainStreamWriter();
- this.pingRequest = pingRequest;
}
@@ -1345,17 +1341,13 @@
private void close(UnbindRequest unbindRequest,
- boolean isDisconnectNotification, Result reason)
+ boolean isDisconnectNotification, Result reason)
{
boolean notifyClose = false;
boolean notifyErrorOccurred = false;
synchronized (writeLock)
{
- if(reason == null && unbindRequest == null)
- {
- System.out.println("Something is wrong!");
- }
if (isClosed)
{
// Already closed.
@@ -1503,27 +1495,7 @@
*/
public boolean isValid()
{
- if(!isClosed && connectionInvalidReason == null)
- {
- try
- {
- search(pingRequest, null, null).get(5, TimeUnit.SECONDS);
- return true;
- }
- catch (ErrorResultException e)
- {
- StaticUtils.DEBUG_LOG.warning("Validity check returned error: " +
- e.getResult());
- }
- catch (TimeoutException e)
- {
- StaticUtils.DEBUG_LOG.warning("Validity check timed out");
- }
- catch (InterruptedException e)
- {
- }
- }
- return false;
+ return connectionInvalidReason == null && !isClosed;
}
//
//
diff --git a/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java b/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java
index f815431..d56a8fb 100644
--- a/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java
+++ b/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java
@@ -460,7 +460,7 @@
.getDefaultFilterChainFactory().getFilterChainPattern());
LDAPConnection ldapConnection = new LDAPConnection(connection,
- socketAddress, options.getSchema(), options.getPingRequest(), this);
+ socketAddress, options.getSchema(), this);
ldapConnectionAttr.set(connection, ldapConnection);
return ldapConnection;
}
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java b/opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java
index d5676dd..c903cc2 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java
@@ -1,9 +1,11 @@
package org.opends.sdk;
import com.sun.opends.sdk.util.Validator;
+import com.sun.opends.sdk.util.StaticUtils;
import java.util.List;
import java.util.ArrayList;
+import java.util.logging.Level;
/**
* Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: 3:49:17
@@ -70,6 +72,15 @@
{
resultHandler.handleErrorResult(error);
}
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
+ {
+ StaticUtils.DEBUG_LOG
+ .warning(String
+ .format(
+ "Connection factory " + factory +
+ " is no longer operational: "
+ + error.getMessage()));
+ }
}
public void handleResult(AsynchronousConnection result)
@@ -79,6 +90,14 @@
{
resultHandler.handleResult(result);
}
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
+ {
+ StaticUtils.DEBUG_LOG
+ .warning(String
+ .format(
+ "Connection factory " + factory +
+ " is now operational"));
+ }
}
};
return factory.getAsynchronousConnection(handler);
@@ -100,8 +119,13 @@
for(MonitoredConnectionFactory f : factoryList)
{
if(!f.isOperational && (f.pendingConnectFuture == null ||
- f.pendingConnectFuture.isDone()))
+ f.pendingConnectFuture.isDone()))
{
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINEST))
+ {
+ StaticUtils.DEBUG_LOG
+ .finest(String.format("Attempting connect on factory " + f));
+ }
f.pendingConnectFuture = f.factory.getAsynchronousConnection(f);
}
}
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/AsynchronousConnection.java b/opendj-sdk/sdk/src/org/opends/sdk/AsynchronousConnection.java
index ecbc74d..9d37c3e 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/AsynchronousConnection.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/AsynchronousConnection.java
@@ -379,12 +379,6 @@
/**
* Returns true if the connection has not been closed and is still valid.
- * The implementation shall submit a search on the connection or use some
- * other mechanism that positively verifies the connection is still valid
- * when this method is called.
- *
- * The query submitted by the driver to validate the connection shall be
- * executed in the authentication context.
*
* @return {@code true} if the connection is valid, {@code false} otherwise.
*/
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/AuthenticatedConnectionFactory.java b/opendj-sdk/sdk/src/org/opends/sdk/AuthenticatedConnectionFactory.java
index 329715a..4fd5102 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/AuthenticatedConnectionFactory.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/AuthenticatedConnectionFactory.java
@@ -253,6 +253,15 @@
/**
* {@inheritDoc}
*/
+ public boolean isValid() {
+ return connection.isValid();
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
public boolean isClosed()
{
return connection.isClosed();
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java b/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java
index def5ef1..6a5a60d 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -132,58 +132,10 @@
}
isClosed = true;
- // Don't put closed connections back in the pool.
- if (!connection.isValid())
+ // Don't put invalid connections back in the pool.
+ if (connection.isValid())
{
- numConnections--;
- }
- else
- {
- // See if there waiters pending.
- for (;;)
- {
- PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
- connection);
- FuturePooledConnection future = pendingFutures.poll();
-
- if (future == null)
- {
- // No waiters - so drop out and add connection to pool.
- break;
- }
-
- future.handleResult(pooledConnection);
-
- if (!future.isCancelled())
- {
- // The future was not cancelled and the connection was
- // accepted.
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG
- .finest(String
- .format(
- "Connection released to pool and directly "
- + "given to waiter. numConnections: %d, poolSize: %d, "
- + "pendingFutures: %d", numConnections,
- pool.size(), pendingFutures.size()));
- }
- return;
- }
- }
-
- // No waiters. Put back in pool.
- pool.offer(connection);
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG
- .finest(String
- .format(
- "Connection released to pool and directly "
- + "given to waiter. numConnections: %d, poolSize: %d, "
- + "pendingFutures: %d", numConnections,
- pool.size(), pendingFutures.size()));
- }
+ releaseConnection(connection);
return;
}
}
@@ -191,19 +143,31 @@
// Connection is no longer valid. Close outside of lock
connection.removeConnectionEventListener(this);
connection.close();
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
{
StaticUtils.DEBUG_LOG
- .finest(String
+ .warning(String
.format(
- "Dead connection released to pool. "
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+ "Dead connection released and closed. "
+ + "numConnections: %d, poolSize: %d, " +
+ "pendingFutures: %d",
numConnections, pool.size(), pendingFutures
.size()));
}
- return;
- }
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
+ {
+ StaticUtils.DEBUG_LOG
+ .warning(String
+ .format(
+ "Reconnect attempt starting. "
+ + "numConnections: %d, poolSize: %d, " +
+ "pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
+ }
+ connectionFactory.getAsynchronousConnection(new ReconnectHandler());
+ }
public void close(UnbindRequest request, String reason)
@@ -443,12 +407,12 @@
// careful that users of the pooled connection get a sensible
// error if they continue to use it (i.e. not an NPE or ISE).
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
{
StaticUtils.DEBUG_LOG
- .finest(String
+ .warning(String
.format(
- "Connection error occured: "
+ "Connection error occured and removed from pool: "
+ error.getMessage()
+ " numConnections: %d, poolSize: %d, pendingFutures: %d",
numConnections, pool.size(), pendingFutures
@@ -458,7 +422,52 @@
}
}
+ private class ReconnectHandler
+ implements ResultHandler<AsynchronousConnection>
+ {
+ public void handleErrorResult(ErrorResultException error) {
+ // The reconnect failed. Fail the connect attempt.
+ numConnections --;
+ // The reconnect failed. The underlying connection factory probably went
+ // down. Just fail all pending futures
+ synchronized (pool)
+ {
+ while(!pendingFutures.isEmpty())
+ {
+ pendingFutures.poll().handleErrorResult(error);
+ }
+ }
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
+ {
+ StaticUtils.DEBUG_LOG
+ .warning(String
+ .format(
+ "Reconnect failed. Failed all pending futures: "
+ + error.getMessage()
+ + " numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
+ }
+ }
+
+ public void handleResult(AsynchronousConnection connection) {
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "Reconnect succeded. "
+ + " numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
+ }
+ synchronized (pool)
+ {
+ releaseConnection(connection);
+ }
+ }
+ }
// Future used for waiting for pooled connections to become available.
private static final class FuturePooledConnection extends
@@ -482,6 +491,54 @@
}
+ private void releaseConnection(AsynchronousConnection connection)
+ {
+ // See if there waiters pending.
+ for (;;)
+ {
+ PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
+ connection);
+ FuturePooledConnection future = pendingFutures.poll();
+
+ if (future == null)
+ {
+ // No waiters - so drop out and add connection to pool.
+ break;
+ }
+
+ future.handleResult(pooledConnection);
+
+ if (!future.isCancelled())
+ {
+ // The future was not cancelled and the connection was
+ // accepted.
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "Connection released and directly "
+ + "given to waiter. numConnections: %d, poolSize: %d, "
+ + "pendingFutures: %d", numConnections,
+ pool.size(), pendingFutures.size()));
+ }
+ return;
+ }
+ }
+
+ // No waiters. Put back in pool.
+ pool.offer(connection);
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "Connection released to pool. numConnections: %d, " +
+ "poolSize: %d, pendingFutures: %d", numConnections,
+ pool.size(), pendingFutures.size()));
+ }
+ }
+
/**
@@ -546,7 +603,7 @@
try
{
// We can create a new connection.
- conn = connectionFactory.getAsynchronousConnection(handler).get();
+ conn = connectionFactory.getAsynchronousConnection(null).get();
numConnections++;
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
@@ -561,13 +618,22 @@
}
catch (ErrorResultException e)
{
+ if (handler != null)
+ {
+ handler.handleErrorResult(e);
+ }
return new CompletedFutureResult<AsynchronousConnection>(e);
}
catch (InterruptedException e)
{
- return new CompletedFutureResult<AsynchronousConnection>(
+ ErrorResultException error =
new ErrorResultException(Responses.newResult(
- ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)));
+ ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e));
+ if (handler != null)
+ {
+ handler.handleErrorResult(error);
+ }
+ return new CompletedFutureResult<AsynchronousConnection>(error);
}
}
else
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java b/opendj-sdk/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
index 3d426cd..9e95afe 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
@@ -32,13 +32,14 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.opends.sdk.requests.*;
import org.opends.sdk.responses.*;
import org.opends.sdk.schema.Schema;
import com.sun.opends.sdk.util.FutureResultTransformer;
-import com.sun.opends.sdk.util.Validator;
@@ -46,10 +47,14 @@
* An heart beat connection factory can be used to create connections
* that sends a periodic search request to a Directory Server.
*/
-public class HeartBeatConnectionFactory extends
+final class HeartBeatConnectionFactory extends
AbstractConnectionFactory<AsynchronousConnection>
{
- private final int interval;
+ private final SearchRequest heartBeat;
+
+ private final long timeout;
+
+ private final TimeUnit unit;
private final List<AsynchronousConnectionImpl> activeConnections;
@@ -59,7 +64,30 @@
// FIXME: use a single global scheduler?
- // FIXME: change timeout parameters to long+TimeUnit.
+ /**
+ * Creates a new heart-beat connection factory which will create
+ * connections using the provided connection factory and periodically
+ * ping any created connections in order to detect that they are still
+ * alive.
+ *
+ * @param connectionFactory
+ * The connection factory to use for creating connections.
+ * @param timeout
+ * The time to wait between keepalive pings.
+ * @param unit
+ * The time unit of the timeout argument.
+ */
+ HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory,
+ long timeout, TimeUnit unit)
+ {
+ this(connectionFactory, timeout, unit, DEFAULT_SEARCH);
+ }
+
+
+
+ private static final SearchRequest DEFAULT_SEARCH = Requests
+ .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)",
+ "1.1");
@@ -71,15 +99,19 @@
*
* @param connectionFactory
* The connection factory to use for creating connections.
- * @param interval
- * The period between keepalive pings.
+ * @param timeout
+ * The time to wait between keepalive pings.
+ * @param unit
+ * The time unit of the timeout argument.
+ * @param heartBeat
+ * The search request to use when pinging connections.
*/
- public HeartBeatConnectionFactory(
- ConnectionFactory<?> connectionFactory,
- int interval)
+ HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory,
+ long timeout, TimeUnit unit, SearchRequest heartBeat)
{
- Validator.ensureNotNull(connectionFactory);
- this.interval = interval;
+ this.heartBeat = heartBeat;
+ this.timeout = timeout;
+ this.unit = unit;
this.activeConnections = new LinkedList<AsynchronousConnectionImpl>();
this.parentFactory = connectionFactory;
@@ -93,9 +125,11 @@
* operations.
*/
private final class AsynchronousConnectionImpl implements
- AsynchronousConnection, ConnectionEventListener
+ AsynchronousConnection, ConnectionEventListener, ResultHandler<Result>
{
private final AsynchronousConnection connection;
+ private long lastSuccessfulPing;
+ private FutureResult<Result> lastPingFuture;
private AsynchronousConnectionImpl(AsynchronousConnection connection)
@@ -316,9 +350,9 @@
*/
public boolean isValid()
{
- // Avoid extra pings... Let the next ping find out if this connection
- // is still valid.
- return connection.isValid();
+ return connection.isValid() && (lastSuccessfulPing <= 0 ||
+ System.currentTimeMillis() - lastSuccessfulPing <
+ unit.toMillis(timeout) * 2);
}
public void connectionReceivedUnsolicitedNotification(
@@ -338,6 +372,15 @@
activeConnections.remove(this);
}
}
+
+ public void handleErrorResult(ErrorResultException error) {
+ connection.close(Requests.newUnbindRequest(),
+ "Heartbeat retured error: " + error);
+ }
+
+ public void handleResult(Result result) {
+ lastSuccessfulPing = System.currentTimeMillis();
+ }
}
@@ -354,22 +397,26 @@
public void run()
{
+ long startTime;
while(true)
{
+ startTime = System.currentTimeMillis();
synchronized (activeConnections)
{
for (AsynchronousConnectionImpl connection : activeConnections)
{
- if(!connection.isValid())
+ if(connection.lastPingFuture == null ||
+ connection.lastPingFuture.isDone())
{
- connection.close(Requests.newUnbindRequest(),
- "Connection no longer valid");
+ connection.lastPingFuture =
+ connection.search(heartBeat, connection, null);
}
}
}
try
{
- sleep(interval);
+ sleep(unit.toMillis(timeout) -
+ (System.currentTimeMillis() - startTime));
}
catch (InterruptedException e)
{
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java b/opendj-sdk/sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java
index 323a27d..1267b96 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java
@@ -32,9 +32,6 @@
import javax.net.ssl.SSLContext;
import org.opends.sdk.schema.Schema;
-import org.opends.sdk.requests.SearchRequest;
-import org.opends.sdk.requests.Requests;
-import org.opends.sdk.SearchScope;
import com.sun.opends.sdk.util.Validator;
@@ -45,18 +42,12 @@
*/
public final class LDAPConnectionOptions
{
- private static final SearchRequest DEFAULT_PING = Requests
- .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)",
- "1.1");
-
private Schema schema = Schema.getDefaultSchema();
private SSLContext sslContext = null;
private boolean useStartTLS = false;
- private SearchRequest pingRequest = DEFAULT_PING;
-
/**
@@ -206,29 +197,7 @@
return this;
}
- /**
- * Retrieves the search request used to verify the validity of a
- * LDAP connection. By default, the root DSE is retrieved without any
- * attributes.
- *
- * @return The search request.
- */
- public SearchRequest getPingRequest()
- {
- return pingRequest;
- }
- /**
- * Sets the search request used to verify the validity of a
- * LDAP connection.
- *
- * @param pingRequest The search request that can be used to verify the
- * validity of a LDAP connection.
- */
- public void setPingRequest(SearchRequest pingRequest)
- {
- this.pingRequest = pingRequest;
- }
// Assigns the provided options to this set of options.
LDAPConnectionOptions assign(LDAPConnectionOptions options)
--
Gitblit v1.10.0