From 2bc8d15a28fafab97cefafede06d6b7e738ae0fe Mon Sep 17 00:00:00 2001
From: matthew_swift <matthew_swift@localhost>
Date: Fri, 11 Dec 2009 18:45:45 +0000
Subject: [PATCH] Various incremental improvements.
---
sdk/src/org/opends/sdk/ConnectionPool.java | 551 ++++++++++++++++++++++++++++++++++++++++--------------
1 files changed, 409 insertions(+), 142 deletions(-)
diff --git a/sdk/src/org/opends/sdk/ConnectionPool.java b/sdk/src/org/opends/sdk/ConnectionPool.java
index 5d4adba..e8c9504 100644
--- a/sdk/src/org/opends/sdk/ConnectionPool.java
+++ b/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -27,6 +27,9 @@
package org.opends.sdk;
+
+
+import java.util.Collection;
import java.util.Stack;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
@@ -35,95 +38,128 @@
import java.util.logging.Level;
import org.opends.sdk.requests.*;
-import org.opends.sdk.responses.BindResult;
-import org.opends.sdk.responses.CompareResult;
-import org.opends.sdk.responses.GenericExtendedResult;
-import org.opends.sdk.responses.Result;
+import org.opends.sdk.responses.*;
+import org.opends.sdk.schema.Schema;
import com.sun.opends.sdk.util.StaticUtils;
+
+
/**
* A simple connection pool implementation.
*/
-public class ConnectionPool
- extends AbstractConnectionFactory<AsynchronousConnection> {
+public class ConnectionPool extends
+ AbstractConnectionFactory<AsynchronousConnection>
+{
private final ConnectionFactory<?> connectionFactory;
+
private volatile int numConnections;
+
private final int poolSize;
// FIXME: should use a better collection than this - CLQ?
private final Stack<AsynchronousConnection> pool;
private final ConcurrentLinkedQueue<PendingConnectionFuture<?>> pendingFutures;
+
private final Object lock = new Object();
- private final class PooledConnectionWapper
- implements AsynchronousConnection, ConnectionEventListener {
+
+
+ private final class PooledConnectionWapper implements
+ AsynchronousConnection, ConnectionEventListener
+ {
private AsynchronousConnection connection;
- private PooledConnectionWapper(AsynchronousConnection connection) {
+
+
+ private PooledConnectionWapper(AsynchronousConnection connection)
+ {
this.connection = connection;
this.connection.addConnectionEventListener(this);
}
+
+
public void abandon(AbandonRequest request)
throws UnsupportedOperationException, IllegalStateException,
- NullPointerException {
- if (connection == null) {
+ NullPointerException
+ {
+ if (connection == null)
+ {
throw new IllegalStateException();
}
connection.abandon(request);
}
- public <P> ResultFuture<Result> add(
- AddRequest request,
+
+
+ public <P> ResultFuture<Result> add(AddRequest request,
ResultHandler<Result, P> handler, P p)
throws UnsupportedOperationException, IllegalStateException,
- NullPointerException {
- if (connection == null) {
+ NullPointerException
+ {
+ if (connection == null)
+ {
throw new IllegalStateException();
}
return connection.add(request, handler, p);
}
- public <P> ResultFuture<BindResult> bind(
- BindRequest request, ResultHandler<? super BindResult, P> handler, P p)
+
+
+ public <P> ResultFuture<BindResult> bind(BindRequest request,
+ ResultHandler<? super BindResult, P> handler, P p)
throws UnsupportedOperationException, IllegalStateException,
- NullPointerException {
- if (connection == null) {
+ NullPointerException
+ {
+ if (connection == null)
+ {
throw new IllegalStateException();
}
return connection.bind(request, handler, p);
}
- public void close() {
- synchronized (lock) {
- try {
+
+
+ public void close()
+ {
+ synchronized (lock)
+ {
+ try
+ {
// Don't put closed connections back in the pool.
- if (connection.isClosed()) {
+ if (connection.isClosed())
+ {
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
- StaticUtils.DEBUG_LOG.finest(String
- .format("Dead connection released to pool. " +
- "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "Dead connection released to pool. "
+ + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
}
return;
}
// See if there waiters pending
PendingConnectionFuture<?> future = pendingFutures.poll();
- if (future != null) {
- PooledConnectionWapper pooledConnection =
- new PooledConnectionWapper(connection);
+ if (future != null)
+ {
+ PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
+ connection);
future.connection(pooledConnection);
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
- StaticUtils.DEBUG_LOG.finest(String
- .format("Connection released to pool and directly " +
- "given to waiter. numConnections: %d, poolSize: %d, " +
- "pendingFutures: %d", numConnections, pool.size(),
- pendingFutures.size()));
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "Connection released to pool and directly "
+ + "given to waiter. numConnections: %d, poolSize: %d, "
+ + "pendingFutures: %d", numConnections,
+ pool.size(), pendingFutures.size()));
}
return;
}
@@ -132,99 +168,229 @@
pool.push(connection);
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
- StaticUtils.DEBUG_LOG.finest(String
- .format("Connection released to pool. " +
- "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "Connection released to pool. "
+ + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
}
}
- finally {
+ finally
+ {
// Null out the underlying connection to prevent further use.
connection = null;
}
}
}
+
+
public void close(UnbindRequest request, String reason)
- throws NullPointerException {
+ throws NullPointerException
+ {
close();
}
+
+
public <P> ResultFuture<CompareResult> compare(
- CompareRequest request, ResultHandler<? super CompareResult, P> handler,
- P p) throws UnsupportedOperationException, IllegalStateException,
- NullPointerException {
- if (connection == null) {
+ CompareRequest request,
+ ResultHandler<? super CompareResult, P> handler, P p)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (connection == null)
+ {
throw new IllegalStateException();
}
return connection.compare(request, handler, p);
}
- public <P> ResultFuture<Result> delete(
- DeleteRequest request, ResultHandler<Result, P> handler, P p)
+
+
+ public <P> ResultFuture<Result> delete(DeleteRequest request,
+ ResultHandler<Result, P> handler, P p)
throws UnsupportedOperationException, IllegalStateException,
- NullPointerException {
- if (connection == null) {
+ NullPointerException
+ {
+ if (connection == null)
+ {
throw new IllegalStateException();
}
return connection.delete(request, handler, p);
}
+
+
public <R extends Result, P> ResultFuture<R> extendedRequest(
- ExtendedRequest<R> request, ResultHandler<? super R, P> handler, P p)
+ ExtendedRequest<R> request,
+ ResultHandler<? super R, P> handler, P p)
throws UnsupportedOperationException, IllegalStateException,
- NullPointerException {
- if (connection == null) {
+ NullPointerException
+ {
+ if (connection == null)
+ {
throw new IllegalStateException();
}
return connection.extendedRequest(request, handler, p);
}
- public <P> ResultFuture<Result> modify(
- ModifyRequest request, ResultHandler<Result, P> handler, P p)
+
+
+ public <P> ResultFuture<Result> modify(ModifyRequest request,
+ ResultHandler<Result, P> handler, P p)
throws UnsupportedOperationException, IllegalStateException,
- NullPointerException {
- if (connection == null) {
+ NullPointerException
+ {
+ if (connection == null)
+ {
throw new IllegalStateException();
}
return connection.modify(request, handler, p);
}
- public <P> ResultFuture<Result> modifyDN(
- ModifyDNRequest request, ResultHandler<Result, P> handler, P p)
+
+
+ public <P> ResultFuture<Result> modifyDN(ModifyDNRequest request,
+ ResultHandler<Result, P> handler, P p)
throws UnsupportedOperationException, IllegalStateException,
- NullPointerException {
- if (connection == null) {
+ NullPointerException
+ {
+ if (connection == null)
+ {
throw new IllegalStateException();
}
return connection.modifyDN(request, handler, p);
}
- public <P> ResultFuture<Result> search(
- SearchRequest request, ResultHandler<Result, P> resultHandler,
+
+
+ public <P> ResultFuture<Result> search(SearchRequest request,
+ ResultHandler<Result, P> resultHandler,
SearchResultHandler<P> searchResulthandler, P p)
throws UnsupportedOperationException, IllegalStateException,
- NullPointerException {
- if (connection == null) {
+ NullPointerException
+ {
+ if (connection == null)
+ {
throw new IllegalStateException();
}
- return connection.search(request, resultHandler, searchResulthandler, p);
+ return connection.search(request, resultHandler,
+ searchResulthandler, p);
}
- public void addConnectionEventListener(ConnectionEventListener listener)
- throws IllegalStateException, NullPointerException {
- if (connection == null) {
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public <P> ResultFuture<SearchResultEntry> readEntry(DN name,
+ Collection<String> attributeDescriptions,
+ ResultHandler<? super SearchResultEntry, P> resultHandler, P p)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (connection == null)
+ {
+ throw new IllegalStateException();
+ }
+ return connection.readEntry(name, attributeDescriptions,
+ resultHandler, p);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public <P> ResultFuture<SearchResultEntry> searchSingleEntry(
+ SearchRequest request,
+ ResultHandler<? super SearchResultEntry, P> resultHandler, P p)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ if (connection == null)
+ {
+ throw new IllegalStateException();
+ }
+ return connection.searchSingleEntry(request, resultHandler, p);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public <P> ResultFuture<RootDSE> readRootDSE(
+ ResultHandler<RootDSE, P> handler, P p)
+ throws UnsupportedOperationException, IllegalStateException
+ {
+ if (connection == null)
+ {
+ throw new IllegalStateException();
+ }
+ return connection.readRootDSE(handler, p);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public <P> ResultFuture<Schema> readSchemaForEntry(DN name,
+ ResultHandler<Schema, P> handler, P p)
+ throws UnsupportedOperationException, IllegalStateException
+ {
+ if (connection == null)
+ {
+ throw new IllegalStateException();
+ }
+ return connection.readSchemaForEntry(name, handler, p);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public <P> ResultFuture<Schema> readSchema(DN name,
+ ResultHandler<Schema, P> handler, P p)
+ throws UnsupportedOperationException, IllegalStateException
+ {
+ if (connection == null)
+ {
+ throw new IllegalStateException();
+ }
+ return connection.readSchema(name, handler, p);
+ }
+
+
+
+ public void addConnectionEventListener(
+ ConnectionEventListener listener) throws IllegalStateException,
+ NullPointerException
+ {
+ if (connection == null)
+ {
throw new IllegalStateException();
}
}
- public void removeConnectionEventListener(ConnectionEventListener listener)
- throws NullPointerException {
- if (connection == null) {
+
+
+ public void removeConnectionEventListener(
+ ConnectionEventListener listener) throws NullPointerException
+ {
+ if (connection == null)
+ {
throw new IllegalStateException();
}
}
+
+
/**
* {@inheritDoc}
*/
@@ -233,14 +399,21 @@
return connection == null;
}
+
+
public void connectionReceivedUnsolicitedNotification(
- GenericExtendedResult notification) {
+ GenericExtendedResult notification)
+ {
// Ignore
}
+
+
public void connectionErrorOccurred(
- boolean isDisconnectNotification, ErrorResultException error) {
- synchronized (lock) {
+ boolean isDisconnectNotification, ErrorResultException error)
+ {
+ synchronized (lock)
+ {
// Remove this connection from the pool if its in there
pool.remove(this);
numConnections--;
@@ -252,104 +425,166 @@
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
- StaticUtils.DEBUG_LOG.finest(String
- .format("Connection error occured: " + error.getMessage() +
- " numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "Connection error occured: "
+ + error.getMessage()
+ + " numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
}
}
}
}
- private static final class CompletedConnectionFuture
- implements ConnectionFuture<AsynchronousConnection> {
+
+
+ private static final class CompletedConnectionFuture implements
+ ConnectionFuture<AsynchronousConnection>
+ {
private final PooledConnectionWapper connection;
- private CompletedConnectionFuture(PooledConnectionWapper connection) {
+
+
+ private CompletedConnectionFuture(PooledConnectionWapper connection)
+ {
this.connection = connection;
}
- public boolean cancel(boolean mayInterruptIfRunning) {
+
+
+ public boolean cancel(boolean mayInterruptIfRunning)
+ {
return false;
}
- public AsynchronousConnection get()
- throws InterruptedException, ErrorResultException {
+
+
+ public AsynchronousConnection get() throws InterruptedException,
+ ErrorResultException
+ {
return connection;
}
+
+
public AsynchronousConnection get(long timeout, TimeUnit unit)
- throws InterruptedException, TimeoutException, ErrorResultException {
+ throws InterruptedException, TimeoutException,
+ ErrorResultException
+ {
return connection;
}
- public boolean isCancelled() {
+
+
+ public boolean isCancelled()
+ {
return false;
}
- public boolean isDone() {
+
+
+ public boolean isDone()
+ {
return true;
}
}
- private final class PendingConnectionFuture<P>
- implements ConnectionFuture<AsynchronousConnection> {
+
+
+ private final class PendingConnectionFuture<P> implements
+ ConnectionFuture<AsynchronousConnection>
+ {
private volatile boolean isCancelled;
+
private volatile PooledConnectionWapper connection;
+
private volatile ErrorResultException err;
- private final ConnectionResultHandler<? super AsynchronousConnection, P>
- handler;
+
+ private final ConnectionResultHandler<? super AsynchronousConnection, P> handler;
+
private final P p;
+
private final CountDownLatch latch = new CountDownLatch(1);
+
+
private PendingConnectionFuture(
P p,
- ConnectionResultHandler<? super AsynchronousConnection, P> handler) {
+ ConnectionResultHandler<? super AsynchronousConnection, P> handler)
+ {
this.handler = handler;
this.p = p;
}
- public synchronized boolean cancel(boolean mayInterruptIfRunning) {
+
+
+ public synchronized boolean cancel(boolean mayInterruptIfRunning)
+ {
return pendingFutures.remove(this) && (isCancelled = true);
}
- public AsynchronousConnection get()
- throws InterruptedException, ErrorResultException {
+
+
+ public AsynchronousConnection get() throws InterruptedException,
+ ErrorResultException
+ {
latch.await();
- if (err != null) {
+ if (err != null)
+ {
throw err;
}
return connection;
}
+
+
public AsynchronousConnection get(long timeout, TimeUnit unit)
- throws InterruptedException, TimeoutException, ErrorResultException {
+ throws InterruptedException, TimeoutException,
+ ErrorResultException
+ {
latch.await(timeout, unit);
- if (err != null) {
+ if (err != null)
+ {
throw err;
}
return connection;
}
- public synchronized boolean isCancelled() {
+
+
+ public synchronized boolean isCancelled()
+ {
return isCancelled;
}
- public boolean isDone() {
+
+
+ public boolean isDone()
+ {
return latch.getCount() == 0;
}
- private void connection(PooledConnectionWapper connection) {
+
+
+ private void connection(PooledConnectionWapper connection)
+ {
this.connection = connection;
- if (handler != null) {
+ if (handler != null)
+ {
handler.handleConnection(p, connection);
}
latch.countDown();
}
- private void error(ErrorResultException e) {
+
+
+ private void error(ErrorResultException e)
+ {
this.err = e;
- if (handler != null) {
+ if (handler != null)
+ {
handler.handleConnectionError(p, e);
}
latch.countDown();
@@ -368,83 +603,115 @@
* @param poolSize
* The maximum size of the connection pool.
*/
- public ConnectionPool(ConnectionFactory<?> connectionFactory, int poolSize) {
+ public ConnectionPool(ConnectionFactory<?> connectionFactory,
+ int poolSize)
+ {
this.connectionFactory = connectionFactory;
this.poolSize = poolSize;
this.pool = new Stack<AsynchronousConnection>();
this.pendingFutures = new ConcurrentLinkedQueue<PendingConnectionFuture<?>>();
}
- private final class WrapConnectionResultHandler
- implements ConnectionResultHandler<AsynchronousConnection, Void> {
+
+
+ private final class WrapConnectionResultHandler implements
+ ConnectionResultHandler<AsynchronousConnection, Void>
+ {
private final PendingConnectionFuture<?> future;
- private WrapConnectionResultHandler(PendingConnectionFuture<?> future) {
+
+
+ private WrapConnectionResultHandler(
+ PendingConnectionFuture<?> future)
+ {
this.future = future;
}
- public void handleConnection(
- java.lang.Void p,
- AsynchronousConnection connection) {
- PooledConnectionWapper pooledConnection =
- new PooledConnectionWapper(connection);
+
+
+ public void handleConnection(java.lang.Void p,
+ AsynchronousConnection connection)
+ {
+ PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
+ connection);
future.connection(pooledConnection);
}
- public void handleConnectionError(
- java.lang.Void p,
- ErrorResultException error) {
+
+
+ public void handleConnectionError(java.lang.Void p,
+ ErrorResultException error)
+ {
future.error(error);
}
}
+
+
public <P> ConnectionFuture<AsynchronousConnection> getAsynchronousConnection(
- ConnectionResultHandler<? super AsynchronousConnection, P> handler, P p) {
- synchronized (lock) {
+ ConnectionResultHandler<? super AsynchronousConnection, P> handler,
+ P p)
+ {
+ synchronized (lock)
+ {
// Check to see if we have a connection in the pool
-
- if (!pool.isEmpty()) {
+ if (!pool.isEmpty())
+ {
AsynchronousConnection conn = pool.pop();
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
- StaticUtils.DEBUG_LOG.finest(String
- .format("Connection aquired from pool. " +
- "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "Connection aquired from pool. "
+ + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
}
- PooledConnectionWapper pooledConnection =
- new PooledConnectionWapper(conn);
- if (handler != null) {
+ PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
+ conn);
+ if (handler != null)
+ {
handler.handleConnection(p, pooledConnection);
}
return new CompletedConnectionFuture(pooledConnection);
}
- PendingConnectionFuture<P> pendingFuture =
- new PendingConnectionFuture<P>(p, handler);
- // Pool was empty. Maybe a new connection if pool size is not reached
- if (numConnections < poolSize) {
+ PendingConnectionFuture<P> pendingFuture = new PendingConnectionFuture<P>(
+ p, handler);
+ // Pool was empty. Maybe a new connection if pool size is not
+ // reached
+ if (numConnections < poolSize)
+ {
numConnections++;
- WrapConnectionResultHandler wrapHandler =
- new WrapConnectionResultHandler(pendingFuture);
+ WrapConnectionResultHandler wrapHandler = new WrapConnectionResultHandler(
+ pendingFuture);
connectionFactory.getAsynchronousConnection(wrapHandler, null);
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
- StaticUtils.DEBUG_LOG.finest(String
- .format("New connection established and aquired. " +
- "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "New connection established and aquired. "
+ + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
}
- } else {
+ }
+ else
+ {
// Have to wait
pendingFutures.add(pendingFuture);
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
- StaticUtils.DEBUG_LOG.finest(String
- .format("No connections available. Wait-listed" +
- "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "No connections available. Wait-listed"
+ + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
}
}
--
Gitblit v1.10.0