From 9d6f5c9a5b7771e595892e6935cf1cc42012c4c6 Mon Sep 17 00:00:00 2001
From: boli <boli@localhost>
Date: Tue, 22 Dec 2009 21:37:36 +0000
Subject: [PATCH] Fixed some bugs with the connection pool. Moved heart beat back to heart beat connection factory from isValid method.
---
sdk/src/org/opends/sdk/ConnectionPool.java | 192 ++++++++++++++++++++++++++++++++---------------
1 files changed, 129 insertions(+), 63 deletions(-)
diff --git a/sdk/src/org/opends/sdk/ConnectionPool.java b/sdk/src/org/opends/sdk/ConnectionPool.java
index def5ef1..6a5a60d 100644
--- a/sdk/src/org/opends/sdk/ConnectionPool.java
+++ b/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -132,58 +132,10 @@
}
isClosed = true;
- // Don't put closed connections back in the pool.
- if (!connection.isValid())
+ // Don't put invalid connections back in the pool.
+ if (connection.isValid())
{
- numConnections--;
- }
- else
- {
- // See if there waiters pending.
- for (;;)
- {
- PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
- connection);
- FuturePooledConnection future = pendingFutures.poll();
-
- if (future == null)
- {
- // No waiters - so drop out and add connection to pool.
- break;
- }
-
- future.handleResult(pooledConnection);
-
- if (!future.isCancelled())
- {
- // The future was not cancelled and the connection was
- // accepted.
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG
- .finest(String
- .format(
- "Connection released to pool and directly "
- + "given to waiter. numConnections: %d, poolSize: %d, "
- + "pendingFutures: %d", numConnections,
- pool.size(), pendingFutures.size()));
- }
- return;
- }
- }
-
- // No waiters. Put back in pool.
- pool.offer(connection);
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG
- .finest(String
- .format(
- "Connection released to pool and directly "
- + "given to waiter. numConnections: %d, poolSize: %d, "
- + "pendingFutures: %d", numConnections,
- pool.size(), pendingFutures.size()));
- }
+ releaseConnection(connection);
return;
}
}
@@ -191,19 +143,31 @@
// Connection is no longer valid. Close outside of lock
connection.removeConnectionEventListener(this);
connection.close();
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
{
StaticUtils.DEBUG_LOG
- .finest(String
+ .warning(String
.format(
- "Dead connection released to pool. "
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+ "Dead connection released and closed. "
+ + "numConnections: %d, poolSize: %d, " +
+ "pendingFutures: %d",
numConnections, pool.size(), pendingFutures
.size()));
}
- return;
- }
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
+ {
+ StaticUtils.DEBUG_LOG
+ .warning(String
+ .format(
+ "Reconnect attempt starting. "
+ + "numConnections: %d, poolSize: %d, " +
+ "pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
+ }
+ connectionFactory.getAsynchronousConnection(new ReconnectHandler());
+ }
public void close(UnbindRequest request, String reason)
@@ -443,12 +407,12 @@
// careful that users of the pooled connection get a sensible
// error if they continue to use it (i.e. not an NPE or ISE).
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
{
StaticUtils.DEBUG_LOG
- .finest(String
+ .warning(String
.format(
- "Connection error occured: "
+ "Connection error occured and removed from pool: "
+ error.getMessage()
+ " numConnections: %d, poolSize: %d, pendingFutures: %d",
numConnections, pool.size(), pendingFutures
@@ -458,7 +422,52 @@
}
}
+ private class ReconnectHandler
+ implements ResultHandler<AsynchronousConnection>
+ {
+ public void handleErrorResult(ErrorResultException error) {
+ // The reconnect failed. Fail the connect attempt.
+ numConnections --;
+ // The reconnect failed. The underlying connection factory probably went
+ // down. Just fail all pending futures
+ synchronized (pool)
+ {
+ while(!pendingFutures.isEmpty())
+ {
+ pendingFutures.poll().handleErrorResult(error);
+ }
+ }
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
+ {
+ StaticUtils.DEBUG_LOG
+ .warning(String
+ .format(
+ "Reconnect failed. Failed all pending futures: "
+ + error.getMessage()
+ + " numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
+ }
+ }
+
+ public void handleResult(AsynchronousConnection connection) {
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "Reconnect succeded. "
+ + " numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
+ }
+ synchronized (pool)
+ {
+ releaseConnection(connection);
+ }
+ }
+ }
// Future used for waiting for pooled connections to become available.
private static final class FuturePooledConnection extends
@@ -482,6 +491,54 @@
}
+ private void releaseConnection(AsynchronousConnection connection)
+ {
+ // See if there waiters pending.
+ for (;;)
+ {
+ PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
+ connection);
+ FuturePooledConnection future = pendingFutures.poll();
+
+ if (future == null)
+ {
+ // No waiters - so drop out and add connection to pool.
+ break;
+ }
+
+ future.handleResult(pooledConnection);
+
+ if (!future.isCancelled())
+ {
+ // The future was not cancelled and the connection was
+ // accepted.
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "Connection released and directly "
+ + "given to waiter. numConnections: %d, poolSize: %d, "
+ + "pendingFutures: %d", numConnections,
+ pool.size(), pendingFutures.size()));
+ }
+ return;
+ }
+ }
+
+ // No waiters. Put back in pool.
+ pool.offer(connection);
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "Connection released to pool. numConnections: %d, " +
+ "poolSize: %d, pendingFutures: %d", numConnections,
+ pool.size(), pendingFutures.size()));
+ }
+ }
+
/**
@@ -546,7 +603,7 @@
try
{
// We can create a new connection.
- conn = connectionFactory.getAsynchronousConnection(handler).get();
+ conn = connectionFactory.getAsynchronousConnection(null).get();
numConnections++;
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
@@ -561,13 +618,22 @@
}
catch (ErrorResultException e)
{
+ if (handler != null)
+ {
+ handler.handleErrorResult(e);
+ }
return new CompletedFutureResult<AsynchronousConnection>(e);
}
catch (InterruptedException e)
{
- return new CompletedFutureResult<AsynchronousConnection>(
+ ErrorResultException error =
new ErrorResultException(Responses.newResult(
- ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)));
+ ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e));
+ if (handler != null)
+ {
+ handler.handleErrorResult(error);
+ }
+ return new CompletedFutureResult<AsynchronousConnection>(error);
}
}
else
--
Gitblit v1.10.0