From abc1a19fd4dee9729fd0aed721575a396d249bd4 Mon Sep 17 00:00:00 2001
From: matthew_swift <matthew_swift@localhost>
Date: Wed, 16 Dec 2009 22:13:34 +0000
Subject: [PATCH] Migrate remaining future impls over to new future APIs.
---
sdk/src/org/opends/sdk/ConnectionPool.java | 287 +++++++++++++++-----------------------------------------
1 files changed, 79 insertions(+), 208 deletions(-)
diff --git a/sdk/src/org/opends/sdk/ConnectionPool.java b/sdk/src/org/opends/sdk/ConnectionPool.java
index 73aad16..e002423 100644
--- a/sdk/src/org/opends/sdk/ConnectionPool.java
+++ b/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -32,15 +32,15 @@
import java.util.Collection;
import java.util.Stack;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import org.opends.sdk.requests.*;
import org.opends.sdk.responses.*;
import org.opends.sdk.schema.Schema;
+import com.sun.opends.sdk.util.AbstractFutureResult;
+import com.sun.opends.sdk.util.CompletedFutureResult;
+import com.sun.opends.sdk.util.FutureResultTransformer;
import com.sun.opends.sdk.util.StaticUtils;
@@ -48,7 +48,7 @@
/**
* A simple connection pool implementation.
*/
-public class ConnectionPool extends
+public final class ConnectionPool extends
AbstractConnectionFactory<AsynchronousConnection>
{
private final ConnectionFactory<?> connectionFactory;
@@ -60,12 +60,33 @@
// FIXME: should use a better collection than this - CLQ?
private final Stack<AsynchronousConnection> pool;
- private final ConcurrentLinkedQueue<PendingResultFuture> pendingFutures;
+ private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
private final Object lock = new Object();
+ private final class FutureNewConnection
+ extends
+ FutureResultTransformer<AsynchronousConnection, AsynchronousConnection>
+ {
+ private FutureNewConnection(
+ ResultHandler<? super AsynchronousConnection> handler)
+ {
+ super(handler);
+ }
+
+
+
+ protected AsynchronousConnection transformResult(
+ AsynchronousConnection result) throws ErrorResultException
+ {
+ return new PooledConnectionWapper(result);
+ }
+ }
+
+
+
private final class PooledConnectionWapper implements
AsynchronousConnection, ConnectionEventListener
{
@@ -144,24 +165,37 @@
return;
}
- // See if there waiters pending
- PendingResultFuture future = pendingFutures.poll();
- if (future != null)
+ // See if there waiters pending.
+ for (;;)
{
+ FuturePooledConnection future = pendingFutures.poll();
+
+ if (future == null)
+ {
+ // No waiters - so drop out and add connection to pool.
+ break;
+ }
+
PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
connection);
- future.connection(pooledConnection);
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ future.handleResult(pooledConnection);
+
+ if (!future.isCancelled())
{
- StaticUtils.DEBUG_LOG
- .finest(String
- .format(
- "Connection released to pool and directly "
- + "given to waiter. numConnections: %d, poolSize: %d, "
- + "pendingFutures: %d", numConnections,
- pool.size(), pendingFutures.size()));
+ // The future was not cancelled and the connection was
+ // accepted.
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "Connection released to pool and directly "
+ + "given to waiter. numConnections: %d, poolSize: %d, "
+ + "pendingFutures: %d", numConnections,
+ pool.size(), pendingFutures.size()));
+ }
+ return;
}
- return;
}
// No waiters. Put back in pool.
@@ -438,165 +472,26 @@
- private static final class CompletedResultFuture implements
- FutureResult<AsynchronousConnection>
+ // Future used for waiting for pooled connections to become available.
+ private static final class FuturePooledConnection extends
+ AbstractFutureResult<AsynchronousConnection>
{
- private final PooledConnectionWapper connection;
-
-
-
- private CompletedResultFuture(PooledConnectionWapper connection)
- {
- this.connection = connection;
- }
-
-
-
- public boolean cancel(boolean mayInterruptIfRunning)
- {
- return false;
- }
-
-
-
- public AsynchronousConnection get() throws InterruptedException,
- ErrorResultException
- {
- return connection;
- }
-
-
-
- public AsynchronousConnection get(long timeout, TimeUnit unit)
- throws InterruptedException, TimeoutException,
- ErrorResultException
- {
- return connection;
- }
-
-
-
- public boolean isCancelled()
- {
- return false;
- }
-
-
-
- public boolean isDone()
- {
- return true;
- }
-
-
-
- public int getRequestID()
- {
- return -1;
- }
- }
-
-
-
- private final class PendingResultFuture implements
- FutureResult<AsynchronousConnection>
- {
- private volatile boolean isCancelled;
-
- private volatile PooledConnectionWapper connection;
-
- private volatile ErrorResultException err;
-
- private final ResultHandler<? super AsynchronousConnection> handler;
-
- private final CountDownLatch latch = new CountDownLatch(1);
-
-
-
- private PendingResultFuture(
+ private FuturePooledConnection(
ResultHandler<? super AsynchronousConnection> handler)
{
- this.handler = handler;
+ super(handler);
}
- public synchronized boolean cancel(boolean mayInterruptIfRunning)
- {
- return pendingFutures.remove(this) && (isCancelled = true);
- }
-
-
-
- public AsynchronousConnection get() throws InterruptedException,
- ErrorResultException
- {
- latch.await();
- if (err != null)
- {
- throw err;
- }
- return connection;
- }
-
-
-
- public AsynchronousConnection get(long timeout, TimeUnit unit)
- throws InterruptedException, TimeoutException,
- ErrorResultException
- {
- latch.await(timeout, unit);
- if (err != null)
- {
- throw err;
- }
- return connection;
- }
-
-
-
- public synchronized boolean isCancelled()
- {
- return isCancelled;
- }
-
-
-
- public boolean isDone()
- {
- return latch.getCount() == 0;
- }
-
-
-
+ /**
+ * {@inheritDoc}
+ */
public int getRequestID()
{
return -1;
}
-
-
- private void connection(PooledConnectionWapper connection)
- {
- this.connection = connection;
- if (handler != null)
- {
- handler.handleResult(connection);
- }
- latch.countDown();
- }
-
-
-
- private void error(ErrorResultException e)
- {
- this.err = e;
- if (handler != null)
- {
- handler.handleErrorResult(e);
- }
- latch.countDown();
- }
}
@@ -604,7 +499,7 @@
/**
* Creates a new connection pool which will maintain {@code poolSize}
* connections created using the provided connection factory.
- *
+ *
* @param connectionFactory
* The connection factory to use for creating new
* connections.
@@ -617,38 +512,7 @@
this.connectionFactory = connectionFactory;
this.poolSize = poolSize;
this.pool = new Stack<AsynchronousConnection>();
- this.pendingFutures = new ConcurrentLinkedQueue<PendingResultFuture>();
- }
-
-
-
- private final class WrapResultHandler implements
- ResultHandler<AsynchronousConnection>
- {
- private final PendingResultFuture future;
-
-
-
- private WrapResultHandler(PendingResultFuture future)
- {
- this.future = future;
- }
-
-
-
- public void handleResult(AsynchronousConnection connection)
- {
- PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
- connection);
- future.connection(pooledConnection);
- }
-
-
-
- public void handleErrorResult(ErrorResultException error)
- {
- future.error(error);
- }
+ this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>();
}
@@ -679,19 +543,21 @@
{
handler.handleResult(pooledConnection);
}
- return new CompletedResultFuture(pooledConnection);
+ return new CompletedFutureResult<AsynchronousConnection>(
+ pooledConnection);
}
- PendingResultFuture pendingFuture = new PendingResultFuture(
- handler);
// Pool was empty. Maybe a new connection if pool size is not
// reached
if (numConnections < poolSize)
{
+ // We can create a new connection.
numConnections++;
- WrapResultHandler wrapHandler = new WrapResultHandler(
- pendingFuture);
- connectionFactory.getAsynchronousConnection(wrapHandler);
+
+ FutureNewConnection future = new FutureNewConnection(handler);
+ future.setFutureResult(connectionFactory
+ .getAsynchronousConnection(future));
+
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
StaticUtils.DEBUG_LOG
@@ -702,11 +568,16 @@
numConnections, pool.size(), pendingFutures
.size()));
}
+
+ return future;
}
else
{
- // Have to wait
- pendingFutures.add(pendingFuture);
+ // Pool is full so wait for a connection to become available.
+ FuturePooledConnection future = new FuturePooledConnection(
+ handler);
+ pendingFutures.add(future);
+
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
StaticUtils.DEBUG_LOG
@@ -717,9 +588,9 @@
numConnections, pool.size(), pendingFutures
.size()));
}
- }
- return pendingFuture;
+ return future;
+ }
}
}
}
--
Gitblit v1.10.0