From 32034d853f3a284424ccfa87b6de210f1ca814e1 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Tue, 29 Nov 2011 00:31:21 +0000
Subject: [PATCH] Fix OPENDJ-43 (Synchronous Connection decorator implementations should not use AsynchronousConnections) and OPENDJ-328 (Make it easier to implement connection decorators).
---
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java | 610 +++++++++++++++++++++++++++++++------------------------
1 files changed, 343 insertions(+), 267 deletions(-)
diff --git a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
index f20244f..6b2be72 100644
--- a/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
+++ b/opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/FixedConnectionPool.java
@@ -35,15 +35,21 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
+import org.forgerock.i18n.LocalizedIllegalArgumentException;
import org.forgerock.opendj.ldap.requests.*;
import org.forgerock.opendj.ldap.responses.*;
+import org.forgerock.opendj.ldif.ConnectionEntryReader;
-import com.forgerock.opendj.util.*;
+import com.forgerock.opendj.util.AsynchronousFutureResult;
+import com.forgerock.opendj.util.CompletedFutureResult;
+import com.forgerock.opendj.util.StaticUtils;
+import com.forgerock.opendj.util.Validator;
@@ -51,8 +57,7 @@
* A simple connection pool implementation which maintains a fixed number of
* connections.
*/
-final class FixedConnectionPool extends AbstractConnectionFactory implements
- ConnectionPool
+final class FixedConnectionPool implements ConnectionPool
{
/**
@@ -60,7 +65,7 @@
* the pool completes.
*/
private final class ConnectionResultHandler implements
- ResultHandler<AsynchronousConnection>
+ ResultHandler<Connection>
{
/**
* {@inheritDoc}
@@ -102,7 +107,7 @@
* {@inheritDoc}
*/
@Override
- public void handleResult(final AsynchronousConnection connection)
+ public void handleResult(final Connection connection)
{
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
@@ -124,77 +129,76 @@
* the client application closes this connection. More specifically, pooled
* connections are not actually stored in the internal queue.
*/
- private final class PooledConnection implements AsynchronousConnection
+ private final class PooledConnection implements Connection
{
// Connection event listeners registed against this pooled connection should
// have the same life time as the pooled connection.
private final List<ConnectionEventListener> listeners =
- new CopyOnWriteArrayList<ConnectionEventListener>();
+ new CopyOnWriteArrayList<ConnectionEventListener>();
- private final AsynchronousConnection connection;
+ private final Connection connection;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
- PooledConnection(final AsynchronousConnection connection)
+ PooledConnection(final Connection connection)
{
this.connection = connection;
}
- /**
- * {@inheritDoc}
- */
@Override
- public FutureResult<Void> abandon(final AbandonRequest request)
+ public FutureResult<Void> abandonAsync(final AbandonRequest request)
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.abandon(request);
+ return checkState().abandonAsync(request);
}
- /**
- * {@inheritDoc}
- */
@Override
- public FutureResult<Result> add(final AddRequest request,
- final ResultHandler<? super Result> handler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
+ public Result add(final AddRequest request) throws ErrorResultException,
+ InterruptedException, UnsupportedOperationException,
+ IllegalStateException, NullPointerException
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.add(request, handler);
+ return checkState().add(request);
}
- /**
- * {@inheritDoc}
- */
@Override
- public FutureResult<Result> add(final AddRequest request,
- final ResultHandler<? super Result> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
+ public Result add(final Entry entry) throws ErrorResultException,
+ InterruptedException, UnsupportedOperationException,
+ IllegalStateException, NullPointerException
+ {
+ return checkState().add(entry);
+ }
+
+
+
+ @Override
+ public Result add(final String... ldifLines) throws ErrorResultException,
+ InterruptedException, UnsupportedOperationException,
+ LocalizedIllegalArgumentException, IllegalStateException,
+ NullPointerException
+ {
+ return checkState().add(ldifLines);
+ }
+
+
+
+ @Override
+ public FutureResult<Result> addAsync(final AddRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super Result> resultHandler)
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection
- .add(request, resultHandler, intermediateResponseHandler);
+ return checkState().addAsync(request, intermediateResponseHandler,
+ resultHandler);
}
@@ -208,49 +212,43 @@
NullPointerException
{
Validator.ensureNotNull(listener);
- if (isClosed())
- {
- throw new IllegalStateException();
- }
+ checkState();
listeners.add(listener);
}
- /**
- * {@inheritDoc}
- */
@Override
- public FutureResult<BindResult> bind(final BindRequest request,
- final ResultHandler<? super BindResult> handler)
- throws UnsupportedOperationException, IllegalStateException,
+ public BindResult bind(final BindRequest request)
+ throws ErrorResultException, InterruptedException,
+ UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.bind(request, handler);
+ return checkState().bind(request);
}
- /**
- * {@inheritDoc}
- */
@Override
- public FutureResult<BindResult> bind(final BindRequest request,
- final ResultHandler<? super BindResult> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
+ public BindResult bind(final String name, final char[] password)
+ throws ErrorResultException, InterruptedException,
+ LocalizedIllegalArgumentException, UnsupportedOperationException,
+ IllegalStateException, NullPointerException
+ {
+ return checkState().bind(name, password);
+ }
+
+
+
+ @Override
+ public FutureResult<BindResult> bindAsync(final BindRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super BindResult> resultHandler)
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.bind(request, resultHandler,
- intermediateResponseHandler);
+ return checkState().bindAsync(request, intermediateResponseHandler,
+ resultHandler);
}
@@ -282,7 +280,7 @@
connection.close();
// Try to get a new connection to replace it.
- factory.getAsynchronousConnection(connectionResultHandler);
+ factory.getConnectionAsync(connectionResultHandler);
if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
{
@@ -308,128 +306,122 @@
- /**
- * {@inheritDoc}
- */
@Override
- public FutureResult<CompareResult> compare(final CompareRequest request,
- final ResultHandler<? super CompareResult> handler)
- throws UnsupportedOperationException, IllegalStateException,
+ public CompareResult compare(final CompareRequest request)
+ throws ErrorResultException, InterruptedException,
+ UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.compare(request, handler);
+ return checkState().compare(request);
}
- /**
- * {@inheritDoc}
- */
@Override
- public FutureResult<CompareResult> compare(final CompareRequest request,
- final ResultHandler<? super CompareResult> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
+ public CompareResult compare(final String name,
+ final String attributeDescription, final String assertionValue)
+ throws ErrorResultException, InterruptedException,
+ LocalizedIllegalArgumentException, UnsupportedOperationException,
+ IllegalStateException, NullPointerException
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.compare(request, resultHandler,
- intermediateResponseHandler);
+ return checkState().compare(name, attributeDescription, assertionValue);
}
- /**
- * {@inheritDoc}
- */
@Override
- public FutureResult<Result> delete(final DeleteRequest request,
- final ResultHandler<? super Result> handler)
+ public FutureResult<CompareResult> compareAsync(
+ final CompareRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super CompareResult> resultHandler)
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.delete(request, handler);
+ return checkState().compareAsync(request, intermediateResponseHandler,
+ resultHandler);
}
- /**
- * {@inheritDoc}
- */
@Override
- public FutureResult<Result> delete(final DeleteRequest request,
- final ResultHandler<? super Result> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
+ public Result delete(final DeleteRequest request)
+ throws ErrorResultException, InterruptedException,
+ UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.delete(request, resultHandler,
- intermediateResponseHandler);
+ return checkState().delete(request);
}
- /**
- * {@inheritDoc}
- */
@Override
- public <R extends ExtendedResult> FutureResult<R> extendedRequest(
- final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
- throws UnsupportedOperationException, IllegalStateException,
+ public Result delete(final String name) throws ErrorResultException,
+ InterruptedException, LocalizedIllegalArgumentException,
+ UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.extendedRequest(request, handler);
+ return checkState().delete(name);
}
- /**
- * {@inheritDoc}
- */
@Override
- public <R extends ExtendedResult> FutureResult<R> extendedRequest(
+ public FutureResult<Result> deleteAsync(final DeleteRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super Result> resultHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ return checkState().deleteAsync(request, intermediateResponseHandler,
+ resultHandler);
+ }
+
+
+
+ @Override
+ public <R extends ExtendedResult> R extendedRequest(
+ final ExtendedRequest<R> request) throws ErrorResultException,
+ InterruptedException, UnsupportedOperationException,
+ IllegalStateException, NullPointerException
+ {
+ return checkState().extendedRequest(request);
+ }
+
+
+
+ @Override
+ public <R extends ExtendedResult> R extendedRequest(
final ExtendedRequest<R> request,
- final ResultHandler<? super R> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
+ final IntermediateResponseHandler handler) throws ErrorResultException,
+ InterruptedException, UnsupportedOperationException,
+ IllegalStateException, NullPointerException
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.extendedRequest(request, resultHandler,
- intermediateResponseHandler);
+ return checkState().extendedRequest(request, handler);
}
- /**
- * {@inheritDoc}
- */
@Override
- public Connection getSynchronousConnection()
+ public GenericExtendedResult extendedRequest(final String requestName,
+ final ByteString requestValue) throws ErrorResultException,
+ InterruptedException, UnsupportedOperationException,
+ IllegalStateException, NullPointerException
{
- return new SynchronousConnection(this);
+ return checkState().extendedRequest(requestName, requestValue);
+ }
+
+
+
+ @Override
+ public <R extends ExtendedResult> FutureResult<R> extendedRequestAsync(
+ final ExtendedRequest<R> request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super R> resultHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ return checkState().extendedRequestAsync(request,
+ intermediateResponseHandler, resultHandler);
}
@@ -456,97 +448,107 @@
- /**
- * {@inheritDoc}
- */
@Override
- public FutureResult<Result> modify(final ModifyRequest request,
- final ResultHandler<? super Result> handler)
- throws UnsupportedOperationException, IllegalStateException,
+ public Result modify(final ModifyRequest request)
+ throws ErrorResultException, InterruptedException,
+ UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.modify(request, handler);
+ return checkState().modify(request);
}
- /**
- * {@inheritDoc}
- */
@Override
- public FutureResult<Result> modify(final ModifyRequest request,
- final ResultHandler<? super Result> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
+ public Result modify(final String... ldifLines)
+ throws ErrorResultException, InterruptedException,
+ UnsupportedOperationException, LocalizedIllegalArgumentException,
+ IllegalStateException, NullPointerException
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.modify(request, resultHandler,
- intermediateResponseHandler);
+ return checkState().modify(ldifLines);
}
- /**
- * {@inheritDoc}
- */
@Override
- public FutureResult<Result> modifyDN(final ModifyDNRequest request,
- final ResultHandler<? super Result> handler)
+ public FutureResult<Result> modifyAsync(final ModifyRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super Result> resultHandler)
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.modifyDN(request, handler);
+ return checkState().modifyAsync(request, intermediateResponseHandler,
+ resultHandler);
}
- /**
- * {@inheritDoc}
- */
@Override
- public FutureResult<Result> modifyDN(final ModifyDNRequest request,
- final ResultHandler<? super Result> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
+ public Result modifyDN(final ModifyDNRequest request)
+ throws ErrorResultException, InterruptedException,
+ UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.modifyDN(request, resultHandler,
- intermediateResponseHandler);
+ return checkState().modifyDN(request);
}
- /**
- * {@inheritDoc}
- */
@Override
- public FutureResult<SearchResultEntry> readEntry(final DN name,
+ public Result modifyDN(final String name, final String newRDN)
+ throws ErrorResultException, LocalizedIllegalArgumentException,
+ InterruptedException, UnsupportedOperationException,
+ IllegalStateException, NullPointerException
+ {
+ return checkState().modifyDN(name, newRDN);
+ }
+
+
+
+ @Override
+ public FutureResult<Result> modifyDNAsync(final ModifyDNRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final ResultHandler<? super Result> resultHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ return checkState().modifyDNAsync(request, intermediateResponseHandler,
+ resultHandler);
+ }
+
+
+
+ @Override
+ public SearchResultEntry readEntry(final DN name,
+ final String... attributeDescriptions) throws ErrorResultException,
+ InterruptedException, UnsupportedOperationException,
+ IllegalStateException, NullPointerException
+ {
+ return checkState().readEntry(name, attributeDescriptions);
+ }
+
+
+
+ @Override
+ public SearchResultEntry readEntry(final String name,
+ final String... attributeDescriptions) throws ErrorResultException,
+ InterruptedException, LocalizedIllegalArgumentException,
+ UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ return checkState().readEntry(name, attributeDescriptions);
+ }
+
+
+
+ @Override
+ public FutureResult<SearchResultEntry> readEntryAsync(final DN name,
final Collection<String> attributeDescriptions,
- final ResultHandler<? super SearchResultEntry> resultHandler)
+ final ResultHandler<? super SearchResultEntry> handler)
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.readEntry(name, attributeDescriptions, resultHandler);
+ return checkState().readEntryAsync(name, attributeDescriptions, handler);
}
@@ -559,68 +561,118 @@
final ConnectionEventListener listener) throws NullPointerException
{
Validator.ensureNotNull(listener);
- if (isClosed())
- {
- throw new IllegalStateException();
- }
+ checkState();
listeners.remove(listener);
}
- /**
- * {@inheritDoc}
- */
@Override
- public FutureResult<Result> search(final SearchRequest request,
- final SearchResultHandler handler)
+ public ConnectionEntryReader search(final SearchRequest request,
+ final BlockingQueue<Response> entries)
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.search(request, handler);
+ return checkState().search(request, entries);
}
- /**
- * {@inheritDoc}
- */
@Override
- public FutureResult<Result> search(final SearchRequest request,
- final SearchResultHandler resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
+ public Result search(final SearchRequest request,
+ final Collection<? super SearchResultEntry> entries)
+ throws ErrorResultException, InterruptedException,
+ UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.search(request, resultHandler,
- intermediateResponseHandler);
+ return checkState().search(request, entries);
}
- /**
- * {@inheritDoc}
- */
@Override
- public FutureResult<SearchResultEntry> searchSingleEntry(
+ public Result search(final SearchRequest request,
+ final Collection<? super SearchResultEntry> entries,
+ final Collection<? super SearchResultReference> references)
+ throws ErrorResultException, InterruptedException,
+ UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ return checkState().search(request, entries, references);
+ }
+
+
+
+ @Override
+ public Result search(final SearchRequest request,
+ final SearchResultHandler handler) throws ErrorResultException,
+ InterruptedException, UnsupportedOperationException,
+ IllegalStateException, NullPointerException
+ {
+ return checkState().search(request, handler);
+ }
+
+
+
+ @Override
+ public ConnectionEntryReader search(final String baseObject,
+ final SearchScope scope, final String filter,
+ final String... attributeDescriptions)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ return checkState().search(baseObject, scope, filter,
+ attributeDescriptions);
+ }
+
+
+
+ @Override
+ public FutureResult<Result> searchAsync(final SearchRequest request,
+ final IntermediateResponseHandler intermediateResponseHandler,
+ final SearchResultHandler resultHandler)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ return checkState().searchAsync(request, intermediateResponseHandler,
+ resultHandler);
+ }
+
+
+
+ @Override
+ public SearchResultEntry searchSingleEntry(final SearchRequest request)
+ throws ErrorResultException, InterruptedException,
+ UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ return checkState().searchSingleEntry(request);
+ }
+
+
+
+ @Override
+ public SearchResultEntry searchSingleEntry(final String baseObject,
+ final SearchScope scope, final String filter,
+ final String... attributeDescriptions) throws ErrorResultException,
+ InterruptedException, LocalizedIllegalArgumentException,
+ UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ return checkState().searchSingleEntry(baseObject, scope, filter,
+ attributeDescriptions);
+ }
+
+
+
+ @Override
+ public FutureResult<SearchResultEntry> searchSingleEntryAsync(
final SearchRequest request,
- final ResultHandler<? super SearchResultEntry> resultHandler)
+ final ResultHandler<? super SearchResultEntry> handler)
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (isClosed())
- {
- throw new IllegalStateException();
- }
- return connection.searchSingleEntry(request, resultHandler);
+ return checkState().searchSingleEntryAsync(request, handler);
}
@@ -637,14 +689,27 @@
builder.append(')');
return builder.toString();
}
+
+
+
+ // Checks that this pooled connection has not been closed.
+ private Connection checkState()
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException();
+ }
+ return connection;
+ }
+
}
/**
* A queue element is either a pending connection request future awaiting an
- * {@code AsynchronousConnection} or it is an unused
- * {@code AsynchronousConnection} awaiting a connection request.
+ * {@code Connection} or it is an unused {@code Connection} awaiting a
+ * connection request.
*/
private static final class QueueElement
{
@@ -652,16 +717,16 @@
- QueueElement(final AsynchronousConnection connection)
+ QueueElement(final Connection connection)
{
this.value = connection;
}
- QueueElement(final ResultHandler<? super AsynchronousConnection> handler)
+ QueueElement(final ResultHandler<? super Connection> handler)
{
- this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler);
+ this.value = new AsynchronousFutureResult<Connection>(handler);
}
@@ -674,11 +739,11 @@
- AsynchronousConnection getWaitingConnection()
+ Connection getWaitingConnection()
{
- if (value instanceof AsynchronousConnection)
+ if (value instanceof Connection)
{
- return (AsynchronousConnection) value;
+ return (Connection) value;
}
else
{
@@ -689,11 +754,11 @@
@SuppressWarnings("unchecked")
- AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture()
+ AsynchronousFutureResult<Connection> getWaitingFuture()
{
if (value instanceof AsynchronousFutureResult)
{
- return (AsynchronousFutureResult<AsynchronousConnection>) value;
+ return (AsynchronousFutureResult<Connection>) value;
}
else
{
@@ -723,8 +788,7 @@
private final Semaphore currentPoolSize;
- private final ResultHandler<AsynchronousConnection> connectionResultHandler =
- new ConnectionResultHandler();
+ private final ResultHandler<Connection> connectionResultHandler = new ConnectionResultHandler();
@@ -752,7 +816,7 @@
@Override
public void close()
{
- final LinkedList<AsynchronousConnection> idleConnections;
+ final LinkedList<Connection> idleConnections;
synchronized (queue)
{
if (isClosed)
@@ -763,7 +827,7 @@
// Remove any connections which are waiting in the queue as these can be
// closed immediately.
- idleConnections = new LinkedList<AsynchronousConnection>();
+ idleConnections = new LinkedList<Connection>();
while (!queue.isEmpty() && !queue.getFirst().isWaitingFuture())
{
final QueueElement holder = queue.removeFirst();
@@ -779,7 +843,7 @@
}
// Close the idle connections.
- for (final AsynchronousConnection connection : idleConnections)
+ for (final Connection connection : idleConnections)
{
closeConnection(connection);
}
@@ -791,8 +855,20 @@
* {@inheritDoc}
*/
@Override
- public FutureResult<AsynchronousConnection> getAsynchronousConnection(
- final ResultHandler<? super AsynchronousConnection> handler)
+ public Connection getConnection() throws ErrorResultException,
+ InterruptedException
+ {
+ return getConnectionAsync(null).get();
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public FutureResult<Connection> getConnectionAsync(
+ final ResultHandler<? super Connection> handler)
{
QueueElement holder;
synchronized (queue)
@@ -816,22 +892,21 @@
if (!holder.isWaitingFuture())
{
// There was a completed connection attempt.
- final AsynchronousConnection connection = holder.getWaitingConnection();
+ final Connection connection = holder.getWaitingConnection();
final PooledConnection pooledConnection = new PooledConnection(connection);
if (handler != null)
{
handler.handleResult(pooledConnection);
}
- return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
+ return new CompletedFutureResult<Connection>(pooledConnection);
}
else
{
// Grow the pool if needed.
- final FutureResult<AsynchronousConnection> future = holder
- .getWaitingFuture();
+ final FutureResult<Connection> future = holder.getWaitingFuture();
if (!future.isDone() && currentPoolSize.tryAcquire())
{
- factory.getAsynchronousConnection(connectionResultHandler);
+ factory.getConnectionAsync(connectionResultHandler);
}
return future;
}
@@ -869,7 +944,7 @@
- private void closeConnection(final AsynchronousConnection connection)
+ private void closeConnection(final Connection connection)
{
// The connection will be closed, so decrease the pool size.
currentPoolSize.release();
@@ -886,7 +961,7 @@
- private void publishConnection(final AsynchronousConnection connection)
+ private void publishConnection(final Connection connection)
{
final QueueElement holder;
boolean connectionPoolIsClosing = false;
@@ -941,4 +1016,5 @@
holder.getWaitingFuture().handleResult(pooledConnection);
}
}
+
}
--
Gitblit v1.10.0