From 1fbc9df5d0c44ae72c76dacc3c945d4f0d641ee6 Mon Sep 17 00:00:00 2001
From: boli <boli@localhost>
Date: Tue, 22 Dec 2009 05:55:54 +0000
Subject: [PATCH] Initial implementation of fail over connection factory
---
sdk/src/org/opends/sdk/ConnectionPool.java | 308 +++++++++++++++++++++++++-------------------------
1 files changed, 155 insertions(+), 153 deletions(-)
diff --git a/sdk/src/org/opends/sdk/ConnectionPool.java b/sdk/src/org/opends/sdk/ConnectionPool.java
index dc22fae..def5ef1 100644
--- a/sdk/src/org/opends/sdk/ConnectionPool.java
+++ b/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -31,7 +31,9 @@
import java.util.Collection;
import java.util.Stack;
+import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import org.opends.sdk.requests.*;
@@ -58,40 +60,17 @@
private final int poolSize;
// FIXME: should use a better collection than this - CLQ?
- private final Stack<AsynchronousConnection> pool;
+ private final Queue<AsynchronousConnection> pool;
private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
- private final Object lock = new Object();
-
-
-
- private final class FutureNewConnection
- extends
- FutureResultTransformer<AsynchronousConnection, AsynchronousConnection>
- {
- private FutureNewConnection(
- ResultHandler<? super AsynchronousConnection> handler)
- {
- super(handler);
- }
-
-
-
- protected AsynchronousConnection transformResult(
- AsynchronousConnection result) throws ErrorResultException
- {
- return new PooledConnectionWapper(result);
- }
- }
-
private final class PooledConnectionWapper implements
AsynchronousConnection, ConnectionEventListener
{
- private AsynchronousConnection connection;
-
+ private final AsynchronousConnection connection;
+ private volatile boolean isClosed;
private PooledConnectionWapper(AsynchronousConnection connection)
@@ -106,7 +85,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -120,7 +99,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -134,7 +113,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -145,29 +124,26 @@
public void close()
{
- synchronized (lock)
+ synchronized (pool)
{
- try
+ if(isClosed)
{
- // Don't put closed connections back in the pool.
- if (connection.isClosed())
- {
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG
- .finest(String
- .format(
- "Dead connection released to pool. "
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures
- .size()));
- }
- return;
- }
+ return;
+ }
+ isClosed = true;
+ // Don't put closed connections back in the pool.
+ if (!connection.isValid())
+ {
+ numConnections--;
+ }
+ else
+ {
// See if there waiters pending.
for (;;)
{
+ PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
+ connection);
FuturePooledConnection future = pendingFutures.poll();
if (future == null)
@@ -176,8 +152,6 @@
break;
}
- PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
- connection);
future.handleResult(pooledConnection);
if (!future.isCancelled())
@@ -189,34 +163,45 @@
StaticUtils.DEBUG_LOG
.finest(String
.format(
- "Connection released to pool and directly "
- + "given to waiter. numConnections: %d, poolSize: %d, "
- + "pendingFutures: %d", numConnections,
- pool.size(), pendingFutures.size()));
+ "Connection released to pool and directly "
+ + "given to waiter. numConnections: %d, poolSize: %d, "
+ + "pendingFutures: %d", numConnections,
+ pool.size(), pendingFutures.size()));
}
return;
}
}
// No waiters. Put back in pool.
- pool.push(connection);
+ pool.offer(connection);
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
StaticUtils.DEBUG_LOG
.finest(String
.format(
- "Connection released to pool. "
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures
- .size()));
+ "Connection released to pool and directly "
+ + "given to waiter. numConnections: %d, poolSize: %d, "
+ + "pendingFutures: %d", numConnections,
+ pool.size(), pendingFutures.size()));
}
- }
- finally
- {
- // Null out the underlying connection to prevent further use.
- connection = null;
+ return;
}
}
+
+ // Connection is no longer valid. Close outside of lock
+ connection.removeConnectionEventListener(this);
+ connection.close();
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "Dead connection released to pool. "
+ + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
+ }
+ return;
}
@@ -234,7 +219,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -248,7 +233,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -262,7 +247,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -276,7 +261,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -290,7 +275,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -305,7 +290,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -324,7 +309,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -343,7 +328,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -359,7 +344,7 @@
ResultHandler<RootDSE> handler)
throws UnsupportedOperationException, IllegalStateException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -375,7 +360,7 @@
ResultHandler<Schema> handler)
throws UnsupportedOperationException, IllegalStateException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -391,7 +376,7 @@
ResultHandler<Schema> handler)
throws UnsupportedOperationException, IllegalStateException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -404,7 +389,7 @@
ConnectionEventListener listener) throws IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -415,7 +400,7 @@
public void removeConnectionEventListener(
ConnectionEventListener listener) throws NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -428,10 +413,13 @@
*/
public boolean isClosed()
{
- return connection == null;
+ return isClosed;
}
-
+ public boolean isValid()
+ {
+ return !isClosed && connection.isValid();
+ }
public void connectionReceivedUnsolicitedNotification(
GenericExtendedResult notification)
@@ -444,10 +432,10 @@
public void connectionErrorOccurred(
boolean isDisconnectNotification, ErrorResultException error)
{
- synchronized (lock)
+ // Remove this connection from the pool if its in there. If not, just
+ // ignore and wait for the user to close and we can deal with it there.
+ if(pool.remove(this))
{
- // Remove this connection from the pool if its in there
- pool.remove(this);
numConnections--;
connection.removeConnectionEventListener(this);
@@ -460,11 +448,11 @@
StaticUtils.DEBUG_LOG
.finest(String
.format(
- "Connection error occured: "
- + error.getMessage()
- + " numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures
- .size()));
+ "Connection error occured: "
+ + error.getMessage()
+ + " numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
}
}
}
@@ -510,86 +498,100 @@
{
this.connectionFactory = connectionFactory;
this.poolSize = poolSize;
- this.pool = new Stack<AsynchronousConnection>();
+ this.pool = new ConcurrentLinkedQueue<AsynchronousConnection>();
this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>();
}
- public FutureResult<AsynchronousConnection> getAsynchronousConnection(
- ResultHandler<? super AsynchronousConnection> handler)
+ public synchronized FutureResult<AsynchronousConnection>
+ getAsynchronousConnection(ResultHandler<? super AsynchronousConnection> handler)
{
- synchronized (lock)
+ // This entire method is synchronized to ensure new connects are done
+ // synchronously to avoid the "pending connect" case.
+ AsynchronousConnection conn;
+ synchronized(pool)
{
// Check to see if we have a connection in the pool
-
- if (!pool.isEmpty())
+ conn = pool.poll();
+ if (conn == null)
{
- AsynchronousConnection conn = pool.pop();
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ // Pool was empty. Maybe a new connection if pool size is not
+ // reached
+ if (numConnections >= poolSize)
{
- StaticUtils.DEBUG_LOG
- .finest(String
- .format(
- "Connection aquired from pool. "
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures
- .size()));
+ // We reached max # of conns so wait for a connection to become available.
+ FuturePooledConnection future = new FuturePooledConnection(
+ handler);
+ pendingFutures.add(future);
+
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "No connections available. Wait-listed"
+ + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
+ }
+
+ return future;
}
- PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
- conn);
- if (handler != null)
- {
- handler.handleResult(pooledConnection);
- }
- return new CompletedFutureResult<AsynchronousConnection>(
- pooledConnection);
- }
-
- // Pool was empty. Maybe a new connection if pool size is not
- // reached
- if (numConnections < poolSize)
- {
- // We can create a new connection.
- numConnections++;
-
- FutureNewConnection future = new FutureNewConnection(handler);
- future.setFutureResult(connectionFactory
- .getAsynchronousConnection(future));
-
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG
- .finest(String
- .format(
- "New connection established and aquired. "
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures
- .size()));
- }
-
- return future;
- }
- else
- {
- // Pool is full so wait for a connection to become available.
- FuturePooledConnection future = new FuturePooledConnection(
- handler);
- pendingFutures.add(future);
-
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG
- .finest(String
- .format(
- "No connections available. Wait-listed"
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures
- .size()));
- }
-
- return future;
}
}
+
+ if(conn == null)
+ {
+ try
+ {
+ // We can create a new connection.
+ conn = connectionFactory.getAsynchronousConnection(handler).get();
+ numConnections++;
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "New connection established and aquired. "
+ + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
+ }
+ }
+ catch (ErrorResultException e)
+ {
+ return new CompletedFutureResult<AsynchronousConnection>(e);
+ }
+ catch (InterruptedException e)
+ {
+ return new CompletedFutureResult<AsynchronousConnection>(
+ new ErrorResultException(Responses.newResult(
+ ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)));
+ }
+ }
+ else
+ {
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "Connection aquired from pool. "
+ + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
+ }
+ }
+
+ PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
+ conn);
+ if (handler != null)
+ {
+ handler.handleResult(pooledConnection);
+ }
+ return new CompletedFutureResult<AsynchronousConnection>(
+ pooledConnection);
+
}
}
--
Gitblit v1.10.0