From abc1a19fd4dee9729fd0aed721575a396d249bd4 Mon Sep 17 00:00:00 2001
From: matthew_swift <matthew_swift@localhost>
Date: Wed, 16 Dec 2009 22:13:34 +0000
Subject: [PATCH] Migrate remaining future impls over to new future APIs.
---
sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java | 94 ++--
sdk/src/com/sun/opends/sdk/ldap/LDAPFutureResultImpl.java | 6
sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java | 130 -----
sdk/src/com/sun/opends/sdk/util/RecursiveFutureResult.java | 24
sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java | 327 ++++----------
sdk/src/com/sun/opends/sdk/ldap/LDAPCompareFutureResultImpl.java | 6
sdk/src/com/sun/opends/sdk/util/FutureResultTransformer.java | 4
sdk/src/com/sun/opends/sdk/ldap/AbstractLDAPFutureResultImpl.java | 9
sdk/src/com/sun/opends/sdk/util/CompletedFutureResult.java | 179 ++++++++
sdk/src/org/opends/sdk/ConnectionFactory.java | 6
sdk/src/com/sun/opends/sdk/ldap/LDAPExtendedFutureResultImpl.java | 6
sdk/src/org/opends/sdk/ConnectionPool.java | 287 +++---------
sdk/src/com/sun/opends/sdk/ldap/LDAPSearchFutureResultImpl.java | 6
sdk/src/com/sun/opends/sdk/ldap/LDAPBindFutureResultImpl.java | 6
sdk/src/org/opends/sdk/AuthenticatedConnectionFactory.java | 216 ++-------
15 files changed, 517 insertions(+), 789 deletions(-)
diff --git a/sdk/src/com/sun/opends/sdk/ldap/AbstractResultFutureImpl.java b/sdk/src/com/sun/opends/sdk/ldap/AbstractLDAPFutureResultImpl.java
similarity index 90%
rename from sdk/src/com/sun/opends/sdk/ldap/AbstractResultFutureImpl.java
rename to sdk/src/com/sun/opends/sdk/ldap/AbstractLDAPFutureResultImpl.java
index 0cbcf31..8560e57 100644
--- a/sdk/src/com/sun/opends/sdk/ldap/AbstractResultFutureImpl.java
+++ b/sdk/src/com/sun/opends/sdk/ldap/AbstractLDAPFutureResultImpl.java
@@ -41,12 +41,12 @@
/**
- * Abstract result future implementation.
+ * Abstract future result implementation.
*
* @param <S>
* The type of result returned by this future.
*/
-abstract class AbstractResultFutureImpl<S extends Result> extends
+abstract class AbstractLDAPFutureResultImpl<S extends Result> extends
AbstractFutureResult<S> implements FutureResult<S>
{
private final LDAPConnection connection;
@@ -65,7 +65,7 @@
* @param connection
* The client connection.
*/
- AbstractResultFutureImpl(int messageID,
+ AbstractLDAPFutureResultImpl(int messageID,
ResultHandler<? super S> handler, LDAPConnection connection)
{
super(handler);
@@ -78,7 +78,8 @@
/**
* {@inheritDoc}
*/
- protected final ErrorResultException handleCancelRequest()
+ protected final ErrorResultException handleCancelRequest(
+ boolean mayInterruptIfRunning)
{
connection.abandon(Requests.newAbandonRequest(messageID));
return null;
diff --git a/sdk/src/com/sun/opends/sdk/ldap/BindResultFutureImpl.java b/sdk/src/com/sun/opends/sdk/ldap/LDAPBindFutureResultImpl.java
similarity index 92%
rename from sdk/src/com/sun/opends/sdk/ldap/BindResultFutureImpl.java
rename to sdk/src/com/sun/opends/sdk/ldap/LDAPBindFutureResultImpl.java
index 37fb0d3..028d85b 100644
--- a/sdk/src/com/sun/opends/sdk/ldap/BindResultFutureImpl.java
+++ b/sdk/src/com/sun/opends/sdk/ldap/LDAPBindFutureResultImpl.java
@@ -42,8 +42,8 @@
/**
* Bind result future implementation.
*/
-final class BindResultFutureImpl extends
- AbstractResultFutureImpl<BindResult> implements
+final class LDAPBindFutureResultImpl extends
+ AbstractLDAPFutureResultImpl<BindResult> implements
FutureResult<BindResult>
{
private final BindRequest request;
@@ -52,7 +52,7 @@
- BindResultFutureImpl(int messageID, BindRequest request,
+ LDAPBindFutureResultImpl(int messageID, BindRequest request,
ResultHandler<? super BindResult> handler,
LDAPConnection connection)
{
diff --git a/sdk/src/com/sun/opends/sdk/ldap/CompareResultFutureImpl.java b/sdk/src/com/sun/opends/sdk/ldap/LDAPCompareFutureResultImpl.java
similarity index 91%
rename from sdk/src/com/sun/opends/sdk/ldap/CompareResultFutureImpl.java
rename to sdk/src/com/sun/opends/sdk/ldap/LDAPCompareFutureResultImpl.java
index 114f775..6e28919 100644
--- a/sdk/src/com/sun/opends/sdk/ldap/CompareResultFutureImpl.java
+++ b/sdk/src/com/sun/opends/sdk/ldap/LDAPCompareFutureResultImpl.java
@@ -41,15 +41,15 @@
/**
* Compare result future implementation.
*/
-final class CompareResultFutureImpl extends
- AbstractResultFutureImpl<CompareResult> implements
+final class LDAPCompareFutureResultImpl extends
+ AbstractLDAPFutureResultImpl<CompareResult> implements
FutureResult<CompareResult>
{
private final CompareRequest request;
- CompareResultFutureImpl(int messageID, CompareRequest request,
+ LDAPCompareFutureResultImpl(int messageID, CompareRequest request,
ResultHandler<? super CompareResult> handler,
LDAPConnection connection)
{
diff --git a/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java b/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
index d86198a..5d91e23 100644
--- a/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
+++ b/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
@@ -83,13 +83,13 @@
@Override
public void handleAddResult(int messageID, Result result)
{
- AbstractResultFutureImpl<?> pendingRequest = pendingRequests
+ AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests
.remove(messageID);
if (pendingRequest != null)
{
- if (pendingRequest instanceof ResultFutureImpl)
+ if (pendingRequest instanceof LDAPFutureResultImpl)
{
- ResultFutureImpl future = (ResultFutureImpl) pendingRequest;
+ LDAPFutureResultImpl future = (LDAPFutureResultImpl) pendingRequest;
if (future.getRequest() instanceof AddRequest)
{
future.setResultOrError(result);
@@ -108,13 +108,13 @@
@Override
public void handleBindResult(int messageID, BindResult result)
{
- AbstractResultFutureImpl<?> pendingRequest = pendingRequests
+ AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests
.remove(messageID);
if (pendingRequest != null)
{
- if (pendingRequest instanceof BindResultFutureImpl)
+ if (pendingRequest instanceof LDAPBindFutureResultImpl)
{
- BindResultFutureImpl future = ((BindResultFutureImpl) pendingRequest);
+ LDAPBindFutureResultImpl future = ((LDAPBindFutureResultImpl) pendingRequest);
BindRequest request = future.getRequest();
if (request instanceof SASLBindRequest<?>)
@@ -215,13 +215,13 @@
@Override
public void handleCompareResult(int messageID, CompareResult result)
{
- AbstractResultFutureImpl<?> pendingRequest = pendingRequests
+ AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests
.remove(messageID);
if (pendingRequest != null)
{
- if (pendingRequest instanceof CompareResultFutureImpl)
+ if (pendingRequest instanceof LDAPCompareFutureResultImpl)
{
- CompareResultFutureImpl future = (CompareResultFutureImpl) pendingRequest;
+ LDAPCompareFutureResultImpl future = (LDAPCompareFutureResultImpl) pendingRequest;
future.setResultOrError(result);
}
else
@@ -239,13 +239,13 @@
@Override
public void handleDeleteResult(int messageID, Result result)
{
- AbstractResultFutureImpl<?> pendingRequest = pendingRequests
+ AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests
.remove(messageID);
if (pendingRequest != null)
{
- if (pendingRequest instanceof ResultFutureImpl)
+ if (pendingRequest instanceof LDAPFutureResultImpl)
{
- ResultFutureImpl future = (ResultFutureImpl) pendingRequest;
+ LDAPFutureResultImpl future = (LDAPFutureResultImpl) pendingRequest;
if (future.getRequest() instanceof DeleteRequest)
{
future.setResultOrError(result);
@@ -322,12 +322,12 @@
}
}
- AbstractResultFutureImpl<?> pendingRequest = pendingRequests
+ AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests
.remove(messageID);
- if (pendingRequest instanceof ExtendedResultFutureImpl<?>)
+ if (pendingRequest instanceof LDAPExtendedFutureResultImpl<?>)
{
- ExtendedResultFutureImpl<?> extendedFuture = ((ExtendedResultFutureImpl<?>) pendingRequest);
+ LDAPExtendedFutureResultImpl<?> extendedFuture = ((LDAPExtendedFutureResultImpl<?>) pendingRequest);
try
{
handleExtendedResult0(extendedFuture, result);
@@ -357,7 +357,7 @@
public void handleIntermediateResponse(int messageID,
GenericIntermediateResponse response)
{
- AbstractResultFutureImpl<?> pendingRequest = pendingRequests
+ AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests
.remove(messageID);
if (pendingRequest != null)
{
@@ -365,10 +365,10 @@
// FIXME: intermediate responses can occur for all operations.
- // if (pendingRequest instanceof ExtendedResultFutureImpl)
+ // if (pendingRequest instanceof LDAPExtendedFutureResultImpl)
// {
- // ExtendedResultFutureImpl extendedFuture =
- // ((ExtendedResultFutureImpl) pendingRequest);
+ // LDAPExtendedFutureResultImpl extendedFuture =
+ // ((LDAPExtendedFutureResultImpl) pendingRequest);
// ExtendedRequest request = extendedFuture.getRequest();
//
// try
@@ -400,13 +400,13 @@
@Override
public void handleModifyDNResult(int messageID, Result result)
{
- AbstractResultFutureImpl<?> pendingRequest = pendingRequests
+ AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests
.remove(messageID);
if (pendingRequest != null)
{
- if (pendingRequest instanceof ResultFutureImpl)
+ if (pendingRequest instanceof LDAPFutureResultImpl)
{
- ResultFutureImpl future = (ResultFutureImpl) pendingRequest;
+ LDAPFutureResultImpl future = (LDAPFutureResultImpl) pendingRequest;
if (future.getRequest() instanceof ModifyDNRequest)
{
future.setResultOrError(result);
@@ -425,13 +425,13 @@
@Override
public void handleModifyResult(int messageID, Result result)
{
- AbstractResultFutureImpl<?> pendingRequest = pendingRequests
+ AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests
.remove(messageID);
if (pendingRequest != null)
{
- if (pendingRequest instanceof ResultFutureImpl)
+ if (pendingRequest instanceof LDAPFutureResultImpl)
{
- ResultFutureImpl future = (ResultFutureImpl) pendingRequest;
+ LDAPFutureResultImpl future = (LDAPFutureResultImpl) pendingRequest;
if (future.getRequest() instanceof ModifyRequest)
{
future.setResultOrError(result);
@@ -450,13 +450,13 @@
@Override
public void handleSearchResult(int messageID, Result result)
{
- AbstractResultFutureImpl<?> pendingRequest = pendingRequests
+ AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests
.remove(messageID);
if (pendingRequest != null)
{
- if (pendingRequest instanceof SearchResultFutureImpl)
+ if (pendingRequest instanceof LDAPSearchFutureResultImpl)
{
- ((SearchResultFutureImpl) pendingRequest)
+ ((LDAPSearchFutureResultImpl) pendingRequest)
.setResultOrError(result);
}
else
@@ -475,13 +475,13 @@
public void handleSearchResultEntry(int messageID,
SearchResultEntry entry)
{
- AbstractResultFutureImpl<?> pendingRequest = pendingRequests
+ AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests
.get(messageID);
if (pendingRequest != null)
{
- if (pendingRequest instanceof SearchResultFutureImpl)
+ if (pendingRequest instanceof LDAPSearchFutureResultImpl)
{
- ((SearchResultFutureImpl) pendingRequest)
+ ((LDAPSearchFutureResultImpl) pendingRequest)
.handleSearchResultEntry(entry);
}
else
@@ -500,13 +500,13 @@
public void handleSearchResultReference(int messageID,
SearchResultReference reference)
{
- AbstractResultFutureImpl<?> pendingRequest = pendingRequests
+ AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests
.get(messageID);
if (pendingRequest != null)
{
- if (pendingRequest instanceof SearchResultFutureImpl)
+ if (pendingRequest instanceof LDAPSearchFutureResultImpl)
{
- ((SearchResultFutureImpl) pendingRequest)
+ ((LDAPSearchFutureResultImpl) pendingRequest)
.handleSearchResultReference(reference);
}
else
@@ -658,7 +658,7 @@
private volatile int pendingBindOrStartTLS = -1;
- private final ConcurrentHashMap<Integer, AbstractResultFutureImpl<?>> pendingRequests = new ConcurrentHashMap<Integer, AbstractResultFutureImpl<?>>();
+ private final ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>> pendingRequests = new ConcurrentHashMap<Integer, AbstractLDAPFutureResultImpl<?>>();
private final InetSocketAddress serverAddress;
@@ -697,7 +697,7 @@
*/
public void abandon(AbandonRequest request)
{
- AbstractResultFutureImpl<?> pendingRequest = pendingRequests
+ AbstractLDAPFutureResultImpl<?> pendingRequest = pendingRequests
.remove(request.getMessageID());
if (pendingRequest != null)
{
@@ -751,7 +751,7 @@
ResultHandler<Result> handler)
{
int messageID = nextMsgID.getAndIncrement();
- ResultFutureImpl future = new ResultFutureImpl(messageID, request,
+ LDAPFutureResultImpl future = new LDAPFutureResultImpl(messageID, request,
handler, this);
ASN1StreamWriter asn1Writer = connFactory
.getASN1Writer(streamWriter);
@@ -830,7 +830,7 @@
ResultHandler<? super BindResult> handler)
{
int messageID = nextMsgID.getAndIncrement();
- BindResultFutureImpl future = new BindResultFutureImpl(messageID,
+ LDAPBindFutureResultImpl future = new LDAPBindFutureResultImpl(messageID,
request, handler, this);
ASN1StreamWriter asn1Writer = connFactory
.getASN1Writer(streamWriter);
@@ -949,7 +949,7 @@
ResultHandler<? super CompareResult> handler)
{
int messageID = nextMsgID.getAndIncrement();
- CompareResultFutureImpl future = new CompareResultFutureImpl(
+ LDAPCompareFutureResultImpl future = new LDAPCompareFutureResultImpl(
messageID, request, handler, this);
ASN1StreamWriter asn1Writer = connFactory
.getASN1Writer(streamWriter);
@@ -1007,7 +1007,7 @@
ResultHandler<Result> handler)
{
int messageID = nextMsgID.getAndIncrement();
- ResultFutureImpl future = new ResultFutureImpl(messageID, request,
+ LDAPFutureResultImpl future = new LDAPFutureResultImpl(messageID, request,
handler, this);
ASN1StreamWriter asn1Writer = connFactory
.getASN1Writer(streamWriter);
@@ -1065,7 +1065,7 @@
ExtendedRequest<R> request, ResultHandler<? super R> handler)
{
int messageID = nextMsgID.getAndIncrement();
- ExtendedResultFutureImpl<R> future = new ExtendedResultFutureImpl<R>(
+ LDAPExtendedFutureResultImpl<R> future = new LDAPExtendedFutureResultImpl<R>(
messageID, request, handler, this);
ASN1StreamWriter asn1Writer = connFactory
.getASN1Writer(streamWriter);
@@ -1142,7 +1142,7 @@
ResultHandler<Result> handler)
{
int messageID = nextMsgID.getAndIncrement();
- ResultFutureImpl future = new ResultFutureImpl(messageID, request,
+ LDAPFutureResultImpl future = new LDAPFutureResultImpl(messageID, request,
handler, this);
ASN1StreamWriter asn1Writer = connFactory
.getASN1Writer(streamWriter);
@@ -1200,7 +1200,7 @@
ResultHandler<Result> handler)
{
int messageID = nextMsgID.getAndIncrement();
- ResultFutureImpl future = new ResultFutureImpl(messageID, request,
+ LDAPFutureResultImpl future = new LDAPFutureResultImpl(messageID, request,
handler, this);
ASN1StreamWriter asn1Writer = connFactory
.getASN1Writer(streamWriter);
@@ -1275,7 +1275,7 @@
SearchResultHandler searchResulthandler)
{
int messageID = nextMsgID.getAndIncrement();
- SearchResultFutureImpl future = new SearchResultFutureImpl(
+ LDAPSearchFutureResultImpl future = new LDAPSearchFutureResultImpl(
messageID, request, resultHandler, searchResulthandler, this);
ASN1StreamWriter asn1Writer = connFactory
.getASN1Writer(streamWriter);
@@ -1393,7 +1393,7 @@
}
// First abort all outstanding requests.
- for (AbstractResultFutureImpl<?> future : pendingRequests
+ for (AbstractLDAPFutureResultImpl<?> future : pendingRequests
.values())
{
if (pendingBindOrStartTLS <= 0)
@@ -1561,7 +1561,7 @@
// Needed in order to expose type information.
private <R extends Result> void handleExtendedResult0(
- ExtendedResultFutureImpl<R> future, GenericExtendedResult result)
+ LDAPExtendedFutureResultImpl<R> future, GenericExtendedResult result)
throws DecodeException
{
R decodedResponse = future.decodeResponse(result.getResultCode(),
@@ -1592,7 +1592,7 @@
private void handleIncorrectResponse(
- AbstractResultFutureImpl<?> pendingRequest)
+ AbstractLDAPFutureResultImpl<?> pendingRequest)
{
// FIXME: I18N need to have a better error message.
Result errorResult = Responses.newResult(
diff --git a/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java b/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java
index 9392984..d56a8fb 100644
--- a/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java
+++ b/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java
@@ -33,17 +33,20 @@
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import javax.net.ssl.SSLContext;
import org.opends.sdk.*;
-import org.opends.sdk.ldap.LDAPConnectionOptions;
import org.opends.sdk.controls.*;
import org.opends.sdk.extensions.StartTLSRequest;
+import org.opends.sdk.ldap.LDAPConnectionOptions;
import org.opends.sdk.responses.Responses;
import org.opends.sdk.responses.Result;
+import com.sun.grizzly.CompletionHandler;
+import com.sun.grizzly.Connection;
import com.sun.grizzly.TransportFactory;
import com.sun.grizzly.attributes.Attribute;
import com.sun.grizzly.filterchain.PatternFilterChainFactory;
@@ -53,6 +56,9 @@
import com.sun.grizzly.ssl.SSLFilter;
import com.sun.grizzly.ssl.SSLHandshaker;
import com.sun.grizzly.streams.StreamWriter;
+import com.sun.opends.sdk.util.CompletedFutureResult;
+import com.sun.opends.sdk.util.FutureResultTransformer;
+import com.sun.opends.sdk.util.RecursiveFutureResult;
import com.sun.opends.sdk.util.Validator;
@@ -69,7 +75,7 @@
@Override
protected LDAPMessageHandler getMessageHandler(
- com.sun.grizzly.Connection<?> connection)
+ Connection<?> connection)
{
return ldapConnectionAttr.get(connection).getLDAPMessageHandler();
}
@@ -77,8 +83,7 @@
@Override
- protected void removeMessageHandler(
- com.sun.grizzly.Connection<?> connection)
+ protected void removeMessageHandler(Connection<?> connection)
{
ldapConnectionAttr.remove(connection);
}
@@ -87,161 +92,96 @@
- private static class FailedImpl implements
- FutureResult<AsynchronousConnection>
+ private final class FutureResultImpl implements
+ CompletionHandler<Connection>
{
- private volatile ErrorResultException exception;
+ private final FutureResultTransformer<Result, AsynchronousConnection> futureStartTLSResult;
+
+ private final RecursiveFutureResult<LDAPConnection, Result> futureConnectionResult;
+
+ private LDAPConnection connection;
- private FailedImpl(ErrorResultException exception)
- {
- this.exception = exception;
- }
-
-
-
- public boolean cancel(boolean mayInterruptIfRunning)
- {
- return false;
- }
-
-
-
- public AsynchronousConnection get() throws InterruptedException,
- ErrorResultException
- {
- throw exception;
- }
-
-
-
- public AsynchronousConnection get(long timeout, TimeUnit unit)
- throws InterruptedException, TimeoutException,
- ErrorResultException
- {
- throw exception;
- }
-
-
-
- public boolean isCancelled()
- {
- return false;
- }
-
-
-
- public boolean isDone()
- {
- return false;
- }
-
-
-
- public int getRequestID()
- {
- return -1;
- }
- }
-
-
-
- private class ResultFutureImpl implements
- FutureResult<AsynchronousConnection>,
- com.sun.grizzly.CompletionHandler<com.sun.grizzly.Connection>,
- ResultHandler<Result>
- {
- private volatile AsynchronousConnection connection;
-
- private volatile ErrorResultException exception;
-
- private volatile Future<com.sun.grizzly.Connection> connectFuture;
-
- private volatile FutureResult<?> sslFuture;
-
- private final CountDownLatch latch = new CountDownLatch(1);
-
- private final ResultHandler<? super AsynchronousConnection> handler;
-
- private boolean cancelled;
-
-
-
- private ResultFutureImpl(
+ private FutureResultImpl(
ResultHandler<? super AsynchronousConnection> handler)
{
- this.handler = handler;
- }
-
-
-
- public boolean cancel(boolean mayInterruptIfRunning)
- {
- cancelled = connectFuture.cancel(mayInterruptIfRunning)
- || sslFuture != null
- && sslFuture.cancel(mayInterruptIfRunning);
- if (cancelled)
+ this.futureStartTLSResult = new FutureResultTransformer<Result, AsynchronousConnection>(
+ handler)
{
- latch.countDown();
- }
- return cancelled;
- }
+
+ protected ErrorResultException transformErrorResult(
+ ErrorResultException errorResult)
+ {
+ // Ensure that the connection is closed.
+ try
+ {
+ connection.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore.
+ }
+ return errorResult;
+ }
- public AsynchronousConnection get() throws InterruptedException,
- ErrorResultException
- {
- latch.await();
- if (cancelled)
+ protected LDAPConnection transformResult(Result result)
+ throws ErrorResultException
+ {
+ return connection;
+ }
+
+ };
+
+ this.futureConnectionResult = new RecursiveFutureResult<LDAPConnection, Result>(
+ futureStartTLSResult)
{
- throw new CancellationException();
- }
- if (exception != null)
- {
- throw exception;
- }
- return connection;
- }
+ protected FutureResult<? extends Result> chainResult(
+ LDAPConnection innerResult,
+ ResultHandler<? super Result> handler)
+ throws ErrorResultException
+ {
+ connection = innerResult;
+ if (options.getSSLContext() != null && options.useStartTLS())
+ {
+ StartTLSRequest startTLS = new StartTLSRequest(options
+ .getSSLContext());
+ return connection.extendedRequest(startTLS, handler);
+ }
- public AsynchronousConnection get(long timeout, TimeUnit unit)
- throws InterruptedException, TimeoutException,
- ErrorResultException
- {
- latch.await(timeout, unit);
- if (cancelled)
- {
- throw new CancellationException();
- }
- if (exception != null)
- {
- throw exception;
- }
- return connection;
- }
+ if (options.getSSLContext() != null)
+ {
+ try
+ {
+ connection.installFilter(sslFilter);
+ connection.performSSLHandshake(sslHandshaker,
+ sslEngineConfigurator);
+ }
+ catch (ErrorResultException errorResult)
+ {
+ try
+ {
+ connection.close();
+ connection = null;
+ }
+ catch (Exception ignored)
+ {
+ }
+ throw errorResult;
+ }
+ }
+ handler.handleResult(null);
+ return new CompletedFutureResult<Result>((Result) null);
+ }
+ };
- public boolean isCancelled()
- {
- return cancelled;
- }
-
-
-
- public boolean isDone()
- {
- return latch.getCount() == 0;
- }
-
-
-
- public int getRequestID()
- {
- return -1;
+ futureStartTLSResult.setFutureResult(futureConnectionResult);
}
@@ -249,7 +189,7 @@
/**
* {@inheritDoc}
*/
- public void cancelled(com.sun.grizzly.Connection connection)
+ public void cancelled(Connection connection)
{
// Ignore this.
}
@@ -259,54 +199,9 @@
/**
* {@inheritDoc}
*/
- public void completed(com.sun.grizzly.Connection connection,
- com.sun.grizzly.Connection result)
+ public void completed(Connection connection, Connection result)
{
- LDAPConnection ldapConn = adaptConnection(connection);
- this.connection = adaptConnection(connection);
-
- if (options.getSSLContext() != null && options.useStartTLS())
- {
- StartTLSRequest startTLS = new StartTLSRequest(options
- .getSSLContext());
- sslFuture = this.connection.extendedRequest(startTLS, this);
- }
- else if (options.getSSLContext() != null)
- {
- try
- {
- ldapConn.installFilter(sslFilter);
- ldapConn.performSSLHandshake(sslHandshaker,
- sslEngineConfigurator);
- latch.countDown();
- if (handler != null)
- {
- handler.handleResult(this.connection);
- }
- }
- catch (CancellationException ce)
- {
- // Handshake cancelled.
- latch.countDown();
- }
- catch (ErrorResultException throwable)
- {
- exception = throwable;
- latch.countDown();
- if (handler != null)
- {
- handler.handleErrorResult(exception);
- }
- }
- }
- else
- {
- latch.countDown();
- if (handler != null)
- {
- handler.handleResult(this.connection);
- }
- }
+ futureConnectionResult.handleResult(adaptConnection(connection));
}
@@ -314,15 +209,10 @@
/**
* {@inheritDoc}
*/
- public void failed(com.sun.grizzly.Connection connection,
- Throwable throwable)
+ public void failed(Connection connection, Throwable throwable)
{
- exception = adaptConnectionException(throwable);
- latch.countDown();
- if (handler != null)
- {
- handler.handleErrorResult(exception);
- }
+ futureConnectionResult
+ .handleErrorResult(adaptConnectionException(throwable));
}
@@ -330,36 +220,11 @@
/**
* {@inheritDoc}
*/
- public void updated(com.sun.grizzly.Connection connection,
- com.sun.grizzly.Connection result)
+ public void updated(Connection connection, Connection result)
{
// Ignore this.
}
-
-
- // This is called when the StartTLS request is successful
- public void handleResult(Result result)
- {
- latch.countDown();
- if (handler != null)
- {
- handler.handleResult(connection);
- }
- }
-
-
-
- // This is called when the StartTLS request is not successful
- public void handleErrorResult(ErrorResultException error)
- {
- exception = error;
- latch.countDown();
- if (handler != null)
- {
- handler.handleErrorResult(exception);
- }
- }
}
@@ -510,17 +375,18 @@
public FutureResult<AsynchronousConnection> getAsynchronousConnection(
ResultHandler<? super AsynchronousConnection> handler)
{
- ResultFutureImpl future = new ResultFutureImpl(handler);
+ FutureResultImpl future = new FutureResultImpl(handler);
try
{
- future.connectFuture = transport.connect(socketAddress, future);
- return future;
+ future.futureConnectionResult.setFutureResult(transport.connect(
+ socketAddress, future));
+ return future.futureStartTLSResult;
}
catch (IOException e)
{
ErrorResultException result = adaptConnectionException(e);
- return new FailedImpl(result);
+ return new CompletedFutureResult<AsynchronousConnection>(result);
}
}
@@ -583,8 +449,7 @@
- private LDAPConnection adaptConnection(
- com.sun.grizzly.Connection<?> connection)
+ private LDAPConnection adaptConnection(Connection<?> connection)
{
// Test shows that its much faster with non block writes but risk
// running out of memory if the server is slow.
diff --git a/sdk/src/com/sun/opends/sdk/ldap/ExtendedResultFutureImpl.java b/sdk/src/com/sun/opends/sdk/ldap/LDAPExtendedFutureResultImpl.java
similarity index 90%
rename from sdk/src/com/sun/opends/sdk/ldap/ExtendedResultFutureImpl.java
rename to sdk/src/com/sun/opends/sdk/ldap/LDAPExtendedFutureResultImpl.java
index 55d0ead..db0454b 100644
--- a/sdk/src/com/sun/opends/sdk/ldap/ExtendedResultFutureImpl.java
+++ b/sdk/src/com/sun/opends/sdk/ldap/LDAPExtendedFutureResultImpl.java
@@ -38,14 +38,14 @@
/**
* Extended result future implementation.
*/
-final class ExtendedResultFutureImpl<R extends Result> extends
- AbstractResultFutureImpl<R> implements FutureResult<R>
+final class LDAPExtendedFutureResultImpl<R extends Result> extends
+ AbstractLDAPFutureResultImpl<R> implements FutureResult<R>
{
private final ExtendedRequest<R> request;
- ExtendedResultFutureImpl(int messageID, ExtendedRequest<R> request,
+ LDAPExtendedFutureResultImpl(int messageID, ExtendedRequest<R> request,
ResultHandler<? super R> handler, LDAPConnection connection)
{
super(messageID, handler, connection);
diff --git a/sdk/src/com/sun/opends/sdk/ldap/ResultFutureImpl.java b/sdk/src/com/sun/opends/sdk/ldap/LDAPFutureResultImpl.java
similarity index 90%
rename from sdk/src/com/sun/opends/sdk/ldap/ResultFutureImpl.java
rename to sdk/src/com/sun/opends/sdk/ldap/LDAPFutureResultImpl.java
index fd62983..cfc6287 100644
--- a/sdk/src/com/sun/opends/sdk/ldap/ResultFutureImpl.java
+++ b/sdk/src/com/sun/opends/sdk/ldap/LDAPFutureResultImpl.java
@@ -41,14 +41,14 @@
/**
* Result future implementation.
*/
-public final class ResultFutureImpl extends
- AbstractResultFutureImpl<Result> implements FutureResult<Result>
+public final class LDAPFutureResultImpl extends
+ AbstractLDAPFutureResultImpl<Result> implements FutureResult<Result>
{
private final Request request;
- ResultFutureImpl(int messageID, Request request,
+ LDAPFutureResultImpl(int messageID, Request request,
ResultHandler<Result> handler, LDAPConnection connection)
{
super(messageID, handler, connection);
diff --git a/sdk/src/com/sun/opends/sdk/ldap/SearchResultFutureImpl.java b/sdk/src/com/sun/opends/sdk/ldap/LDAPSearchFutureResultImpl.java
similarity index 93%
rename from sdk/src/com/sun/opends/sdk/ldap/SearchResultFutureImpl.java
rename to sdk/src/com/sun/opends/sdk/ldap/LDAPSearchFutureResultImpl.java
index 3ea5df1..9a2f116 100644
--- a/sdk/src/com/sun/opends/sdk/ldap/SearchResultFutureImpl.java
+++ b/sdk/src/com/sun/opends/sdk/ldap/LDAPSearchFutureResultImpl.java
@@ -44,8 +44,8 @@
/**
* Search result future implementation.
*/
-final class SearchResultFutureImpl extends
- AbstractResultFutureImpl<Result> implements FutureResult<Result>
+final class LDAPSearchFutureResultImpl extends
+ AbstractLDAPFutureResultImpl<Result> implements FutureResult<Result>
{
private final SearchResultHandler searchResultHandler;
@@ -54,7 +54,7 @@
- SearchResultFutureImpl(int messageID, SearchRequest request,
+ LDAPSearchFutureResultImpl(int messageID, SearchRequest request,
ResultHandler<Result> resultHandler,
SearchResultHandler searchResultHandler, LDAPConnection connection)
{
diff --git a/sdk/src/com/sun/opends/sdk/util/CompletedFutureResult.java b/sdk/src/com/sun/opends/sdk/util/CompletedFutureResult.java
new file mode 100644
index 0000000..83d254d
--- /dev/null
+++ b/sdk/src/com/sun/opends/sdk/util/CompletedFutureResult.java
@@ -0,0 +1,179 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2009 Sun Microsystems, Inc.
+ */
+
+package com.sun.opends.sdk.util;
+
+
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.opends.sdk.ErrorResultException;
+import org.opends.sdk.FutureResult;
+
+
+
+/**
+ * An implementation of {@code FutureResult} which can be used in cases
+ * where the result is known in advance, for example, if the result is
+ * obtained synchronously.
+ *
+ * @param <S>
+ * The type of result returned by this future.
+ */
+public final class CompletedFutureResult<S> implements FutureResult<S>
+{
+ private final S result;
+
+ private final ErrorResultException errorResult;
+
+ private final int requestID;
+
+
+
+ /**
+ * Creates a new completed future which will return the provided
+ * result and request ID of {@code -1}.
+ *
+ * @param result
+ * The result, which may be {@code null}.
+ */
+ public CompletedFutureResult(S result)
+ {
+ this(result, -1);
+ }
+
+
+
+ /**
+ * Creates a new completed future which will throw the provided error
+ * result and request ID of {@code -1}.
+ *
+ * @param errorResult
+ * The error result.
+ * @throws NullPointerException
+ * If {@code errorResult} was {@code null}.
+ */
+ public CompletedFutureResult(ErrorResultException errorResult)
+ throws NullPointerException
+ {
+ this(errorResult, -1);
+ }
+
+
+
+ /**
+ * Creates a new completed future which will return the provided
+ * result and request ID.
+ *
+ * @param result
+ * The result, which may be {@code null}.
+ * @param requestID
+ * The request ID.
+ */
+ public CompletedFutureResult(S result, int requestID)
+ {
+ this.result = result;
+ this.errorResult = null;
+ this.requestID = requestID;
+ }
+
+
+
+ /**
+ * Creates a new completed future which will throw the provided error
+ * result and request ID.
+ *
+ * @param errorResult
+ * The error result.
+ * @param requestID
+ * The request ID.
+ * @throws NullPointerException
+ * If {@code errorResult} was {@code null}.
+ */
+ public CompletedFutureResult(ErrorResultException errorResult,
+ int requestID) throws NullPointerException
+ {
+ Validator.ensureNotNull(errorResult);
+ this.result = null;
+ this.errorResult = errorResult;
+ this.requestID = requestID;
+ }
+
+
+
+ public boolean cancel(boolean mayInterruptIfRunning)
+ {
+ return false;
+ }
+
+
+
+ public S get() throws ErrorResultException, InterruptedException
+ {
+ if (errorResult == null)
+ {
+ // May be null.
+ return result;
+ }
+ else
+ {
+ throw errorResult;
+ }
+ }
+
+
+
+ public S get(long timeout, TimeUnit unit)
+ throws ErrorResultException, TimeoutException,
+ InterruptedException
+ {
+ return get();
+ }
+
+
+
+ public int getRequestID()
+ {
+ return requestID;
+ }
+
+
+
+ public boolean isCancelled()
+ {
+ return false;
+ }
+
+
+
+ public boolean isDone()
+ {
+ return true;
+ }
+
+}
diff --git a/sdk/src/com/sun/opends/sdk/util/FutureResultTransformer.java b/sdk/src/com/sun/opends/sdk/util/FutureResultTransformer.java
index 04eb558..cb5491d 100644
--- a/sdk/src/com/sun/opends/sdk/util/FutureResultTransformer.java
+++ b/sdk/src/com/sun/opends/sdk/util/FutureResultTransformer.java
@@ -55,7 +55,7 @@
private final ResultHandler<? super N> handler;
- private volatile FutureResult<M> future = null;
+ private volatile FutureResult<? extends M> future = null;
// These do not need to be volatile since the future acts as a memory
// barrier.
@@ -194,7 +194,7 @@
* @param future
* The inner future.
*/
- public final void setFutureResult(FutureResult<M> future)
+ public final void setFutureResult(FutureResult<? extends M> future)
{
this.future = future;
}
diff --git a/sdk/src/com/sun/opends/sdk/util/RecursiveFutureResult.java b/sdk/src/com/sun/opends/sdk/util/RecursiveFutureResult.java
index de2e98d..1d96342 100644
--- a/sdk/src/com/sun/opends/sdk/util/RecursiveFutureResult.java
+++ b/sdk/src/com/sun/opends/sdk/util/RecursiveFutureResult.java
@@ -29,6 +29,7 @@
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -64,7 +65,15 @@
public int getRequestID()
{
- return innerFuture.getRequestID();
+ if (innerFuture instanceof FutureResult<?>)
+ {
+ FutureResult<?> tmp = (FutureResult<?>) innerFuture;
+ return tmp.getRequestID();
+ }
+ else
+ {
+ return -1;
+ }
}
@@ -89,11 +98,11 @@
private final FutureResultImpl impl;
- private volatile FutureResult<M> innerFuture = null;
+ private volatile Future<?> innerFuture = null;
// This does not need to be volatile since the inner future acts as a
// memory barrier.
- private FutureResult<N> outerFuture = null;
+ private FutureResult<? extends N> outerFuture = null;
@@ -215,7 +224,7 @@
* @param future
* The inner future.
*/
- public final void setFutureResult(FutureResult<M> future)
+ public final void setFutureResult(Future<?> future)
{
this.innerFuture = future;
}
@@ -239,7 +248,7 @@
* If the outer request could not be invoked and processing
* should terminate.
*/
- protected FutureResult<N> chainErrorResult(
+ protected FutureResult<? extends N> chainErrorResult(
ErrorResultException innerError, ResultHandler<? super N> handler)
throws ErrorResultException
{
@@ -261,7 +270,8 @@
* If the outer request could not be invoked and processing
* should terminate.
*/
- protected abstract FutureResult<N> chainResult(M innerResult,
- ResultHandler<? super N> handler) throws ErrorResultException;
+ protected abstract FutureResult<? extends N> chainResult(
+ M innerResult, ResultHandler<? super N> handler)
+ throws ErrorResultException;
}
diff --git a/sdk/src/org/opends/sdk/AuthenticatedConnectionFactory.java b/sdk/src/org/opends/sdk/AuthenticatedConnectionFactory.java
index 67f43d5..c20ae3c 100644
--- a/sdk/src/org/opends/sdk/AuthenticatedConnectionFactory.java
+++ b/sdk/src/org/opends/sdk/AuthenticatedConnectionFactory.java
@@ -30,10 +30,6 @@
import java.util.Collection;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.opends.sdk.requests.*;
import org.opends.sdk.responses.BindResult;
@@ -42,6 +38,8 @@
import org.opends.sdk.responses.SearchResultEntry;
import org.opends.sdk.schema.Schema;
+import com.sun.opends.sdk.util.FutureResultTransformer;
+import com.sun.opends.sdk.util.RecursiveFutureResult;
import com.sun.opends.sdk.util.Validator;
@@ -108,13 +106,10 @@
public FutureResult<AuthenticatedAsynchronousConnection> getAsynchronousConnection(
ResultHandler<? super AuthenticatedAsynchronousConnection> handler)
{
- // TODO: bug here? if allowRebind= false then bind will never
- // happen
- ResultFutureImpl future = new ResultFutureImpl(
- allowRebinds ? request : null, handler);
- future.connectFuture = parentFactory
- .getAsynchronousConnection(future.connectionHandler);
- return future;
+ FutureResultImpl future = new FutureResultImpl(request, handler);
+ future.futureConnectionResult.setFutureResult(parentFactory
+ .getAsynchronousConnection(future.futureConnectionResult));
+ return future.futureBindResult;
}
@@ -560,169 +555,68 @@
- private static final class ResultFutureImpl implements
- FutureResult<AuthenticatedAsynchronousConnection>
+ private static final class FutureResultImpl
{
- private final class ConnectionResultHandler implements
- ResultHandler<AsynchronousConnection>
- {
- public void handleResult(AsynchronousConnection conn)
- {
- connection = conn;
- bindFuture = connection.bind(request, bindHandler);
- }
+ private final FutureResultTransformer<BindResult, AuthenticatedAsynchronousConnection> futureBindResult;
+
+ private final RecursiveFutureResult<AsynchronousConnection, BindResult> futureConnectionResult;
+
+ private final BindRequest bindRequest;
+
+ private AsynchronousConnection connection;
- public void handleErrorResult(ErrorResultException error)
- {
- exception = error;
- latch.countDown();
- }
- }
-
-
-
- private final class BindRequestResultHandler implements
- ResultHandler<BindResult>
- {
- public void handleResult(BindResult result)
- {
- // FIXME: should make the result unmodifiable.
- authenticatedConnection = new AuthenticatedAsynchronousConnection(
- connection, request, result);
- latch.countDown();
- if (handler != null)
- {
- handler.handleResult(authenticatedConnection);
- }
- }
-
-
-
- public void handleErrorResult(ErrorResultException error)
- {
- // Ensure that the connection is closed.
- try
- {
- connection.close();
- }
- catch (Exception e)
- {
- // Ignore.
- }
-
- exception = error;
- latch.countDown();
- if (handler != null)
- {
- handler.handleErrorResult(exception);
- }
- }
- }
-
-
-
- private final ResultHandler<BindResult> bindHandler = new BindRequestResultHandler();
-
- private final ResultHandler<AsynchronousConnection> connectionHandler = new ConnectionResultHandler();
-
- private volatile AuthenticatedAsynchronousConnection authenticatedConnection;
-
- private volatile AsynchronousConnection connection;
-
- private volatile ErrorResultException exception;
-
- private volatile FutureResult<?> connectFuture;
-
- private volatile FutureResult<BindResult> bindFuture;
-
- private final CountDownLatch latch = new CountDownLatch(1);
-
- private final ResultHandler<? super AuthenticatedAsynchronousConnection> handler;
-
- private boolean cancelled;
-
- private final BindRequest request;
-
-
-
- private ResultFutureImpl(
+ private FutureResultImpl(
BindRequest request,
ResultHandler<? super AuthenticatedAsynchronousConnection> handler)
{
- this.request = request;
- this.handler = handler;
- }
-
-
-
- public boolean cancel(boolean mayInterruptIfRunning)
- {
- cancelled = connectFuture.cancel(mayInterruptIfRunning)
- || bindFuture != null
- && bindFuture.cancel(mayInterruptIfRunning);
- if (cancelled)
+ this.bindRequest = request;
+ this.futureBindResult = new FutureResultTransformer<BindResult, AuthenticatedAsynchronousConnection>(
+ handler)
{
- latch.countDown();
- }
- return cancelled;
- }
+
+ protected ErrorResultException transformErrorResult(
+ ErrorResultException errorResult)
+ {
+ // Ensure that the connection is closed.
+ try
+ {
+ connection.close();
+ connection = null;
+ }
+ catch (Exception e)
+ {
+ // Ignore.
+ }
+ return errorResult;
+ }
- public AuthenticatedAsynchronousConnection get()
- throws InterruptedException, ErrorResultException
- {
- latch.await();
- if (cancelled)
+ protected AuthenticatedAsynchronousConnection transformResult(
+ BindResult result) throws ErrorResultException
+ {
+ // FIXME: should make the result unmodifiable.
+ return new AuthenticatedAsynchronousConnection(connection,
+ bindRequest, result);
+ }
+
+ };
+ this.futureConnectionResult = new RecursiveFutureResult<AsynchronousConnection, BindResult>(
+ futureBindResult)
{
- throw new CancellationException();
- }
- if (exception != null)
- {
- throw exception;
- }
- return authenticatedConnection;
- }
-
-
- public AuthenticatedAsynchronousConnection get(long timeout,
- TimeUnit unit) throws InterruptedException, TimeoutException,
- ErrorResultException
- {
- latch.await(timeout, unit);
- if (cancelled)
- {
- throw new CancellationException();
- }
- if (exception != null)
- {
- throw exception;
- }
- return authenticatedConnection;
- }
-
-
-
- public boolean isCancelled()
- {
- return cancelled;
- }
-
-
-
- public boolean isDone()
- {
- return latch.getCount() == 0;
- }
-
-
-
- public int getRequestID()
- {
- return -1;
+ protected FutureResult<? extends BindResult> chainResult(
+ AsynchronousConnection innerResult,
+ ResultHandler<? super BindResult> handler)
+ throws ErrorResultException
+ {
+ connection = innerResult;
+ return connection.bind(bindRequest, handler);
+ }
+ };
+ futureBindResult.setFutureResult(futureConnectionResult);
}
}
diff --git a/sdk/src/org/opends/sdk/ConnectionFactory.java b/sdk/src/org/opends/sdk/ConnectionFactory.java
index a9c55cb..67aab1a 100644
--- a/sdk/src/org/opends/sdk/ConnectionFactory.java
+++ b/sdk/src/org/opends/sdk/ConnectionFactory.java
@@ -73,9 +73,9 @@
* Initiates an asynchronous connection request to the Directory
* Server associated with this connection factory. The returned
* {@code FutureResult} can be used to retrieve the completed
- * asynchronous connection. Alternatively, if a {@code
- * ResultHandler} is provided, the handler will be notified
- * when the connection is available and ready for use.
+ * asynchronous connection. Alternatively, if a {@code ResultHandler}
+ * is provided, the handler will be notified when the connection is
+ * available and ready for use.
*
* @param handler
* The completion handler, or {@code null} if no handler is
diff --git a/sdk/src/org/opends/sdk/ConnectionPool.java b/sdk/src/org/opends/sdk/ConnectionPool.java
index 73aad16..e002423 100644
--- a/sdk/src/org/opends/sdk/ConnectionPool.java
+++ b/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -32,15 +32,15 @@
import java.util.Collection;
import java.util.Stack;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import org.opends.sdk.requests.*;
import org.opends.sdk.responses.*;
import org.opends.sdk.schema.Schema;
+import com.sun.opends.sdk.util.AbstractFutureResult;
+import com.sun.opends.sdk.util.CompletedFutureResult;
+import com.sun.opends.sdk.util.FutureResultTransformer;
import com.sun.opends.sdk.util.StaticUtils;
@@ -48,7 +48,7 @@
/**
* A simple connection pool implementation.
*/
-public class ConnectionPool extends
+public final class ConnectionPool extends
AbstractConnectionFactory<AsynchronousConnection>
{
private final ConnectionFactory<?> connectionFactory;
@@ -60,12 +60,33 @@
// FIXME: should use a better collection than this - CLQ?
private final Stack<AsynchronousConnection> pool;
- private final ConcurrentLinkedQueue<PendingResultFuture> pendingFutures;
+ private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
private final Object lock = new Object();
+ private final class FutureNewConnection
+ extends
+ FutureResultTransformer<AsynchronousConnection, AsynchronousConnection>
+ {
+ private FutureNewConnection(
+ ResultHandler<? super AsynchronousConnection> handler)
+ {
+ super(handler);
+ }
+
+
+
+ protected AsynchronousConnection transformResult(
+ AsynchronousConnection result) throws ErrorResultException
+ {
+ return new PooledConnectionWapper(result);
+ }
+ }
+
+
+
private final class PooledConnectionWapper implements
AsynchronousConnection, ConnectionEventListener
{
@@ -144,24 +165,37 @@
return;
}
- // See if there waiters pending
- PendingResultFuture future = pendingFutures.poll();
- if (future != null)
+ // See if there waiters pending.
+ for (;;)
{
+ FuturePooledConnection future = pendingFutures.poll();
+
+ if (future == null)
+ {
+ // No waiters - so drop out and add connection to pool.
+ break;
+ }
+
PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
connection);
- future.connection(pooledConnection);
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ future.handleResult(pooledConnection);
+
+ if (!future.isCancelled())
{
- StaticUtils.DEBUG_LOG
- .finest(String
- .format(
- "Connection released to pool and directly "
- + "given to waiter. numConnections: %d, poolSize: %d, "
- + "pendingFutures: %d", numConnections,
- pool.size(), pendingFutures.size()));
+ // The future was not cancelled and the connection was
+ // accepted.
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "Connection released to pool and directly "
+ + "given to waiter. numConnections: %d, poolSize: %d, "
+ + "pendingFutures: %d", numConnections,
+ pool.size(), pendingFutures.size()));
+ }
+ return;
}
- return;
}
// No waiters. Put back in pool.
@@ -438,165 +472,26 @@
- private static final class CompletedResultFuture implements
- FutureResult<AsynchronousConnection>
+ // Future used for waiting for pooled connections to become available.
+ private static final class FuturePooledConnection extends
+ AbstractFutureResult<AsynchronousConnection>
{
- private final PooledConnectionWapper connection;
-
-
-
- private CompletedResultFuture(PooledConnectionWapper connection)
- {
- this.connection = connection;
- }
-
-
-
- public boolean cancel(boolean mayInterruptIfRunning)
- {
- return false;
- }
-
-
-
- public AsynchronousConnection get() throws InterruptedException,
- ErrorResultException
- {
- return connection;
- }
-
-
-
- public AsynchronousConnection get(long timeout, TimeUnit unit)
- throws InterruptedException, TimeoutException,
- ErrorResultException
- {
- return connection;
- }
-
-
-
- public boolean isCancelled()
- {
- return false;
- }
-
-
-
- public boolean isDone()
- {
- return true;
- }
-
-
-
- public int getRequestID()
- {
- return -1;
- }
- }
-
-
-
- private final class PendingResultFuture implements
- FutureResult<AsynchronousConnection>
- {
- private volatile boolean isCancelled;
-
- private volatile PooledConnectionWapper connection;
-
- private volatile ErrorResultException err;
-
- private final ResultHandler<? super AsynchronousConnection> handler;
-
- private final CountDownLatch latch = new CountDownLatch(1);
-
-
-
- private PendingResultFuture(
+ private FuturePooledConnection(
ResultHandler<? super AsynchronousConnection> handler)
{
- this.handler = handler;
+ super(handler);
}
- public synchronized boolean cancel(boolean mayInterruptIfRunning)
- {
- return pendingFutures.remove(this) && (isCancelled = true);
- }
-
-
-
- public AsynchronousConnection get() throws InterruptedException,
- ErrorResultException
- {
- latch.await();
- if (err != null)
- {
- throw err;
- }
- return connection;
- }
-
-
-
- public AsynchronousConnection get(long timeout, TimeUnit unit)
- throws InterruptedException, TimeoutException,
- ErrorResultException
- {
- latch.await(timeout, unit);
- if (err != null)
- {
- throw err;
- }
- return connection;
- }
-
-
-
- public synchronized boolean isCancelled()
- {
- return isCancelled;
- }
-
-
-
- public boolean isDone()
- {
- return latch.getCount() == 0;
- }
-
-
-
+ /**
+ * {@inheritDoc}
+ */
public int getRequestID()
{
return -1;
}
-
-
- private void connection(PooledConnectionWapper connection)
- {
- this.connection = connection;
- if (handler != null)
- {
- handler.handleResult(connection);
- }
- latch.countDown();
- }
-
-
-
- private void error(ErrorResultException e)
- {
- this.err = e;
- if (handler != null)
- {
- handler.handleErrorResult(e);
- }
- latch.countDown();
- }
}
@@ -604,7 +499,7 @@
/**
* Creates a new connection pool which will maintain {@code poolSize}
* connections created using the provided connection factory.
- *
+ *
* @param connectionFactory
* The connection factory to use for creating new
* connections.
@@ -617,38 +512,7 @@
this.connectionFactory = connectionFactory;
this.poolSize = poolSize;
this.pool = new Stack<AsynchronousConnection>();
- this.pendingFutures = new ConcurrentLinkedQueue<PendingResultFuture>();
- }
-
-
-
- private final class WrapResultHandler implements
- ResultHandler<AsynchronousConnection>
- {
- private final PendingResultFuture future;
-
-
-
- private WrapResultHandler(PendingResultFuture future)
- {
- this.future = future;
- }
-
-
-
- public void handleResult(AsynchronousConnection connection)
- {
- PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
- connection);
- future.connection(pooledConnection);
- }
-
-
-
- public void handleErrorResult(ErrorResultException error)
- {
- future.error(error);
- }
+ this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>();
}
@@ -679,19 +543,21 @@
{
handler.handleResult(pooledConnection);
}
- return new CompletedResultFuture(pooledConnection);
+ return new CompletedFutureResult<AsynchronousConnection>(
+ pooledConnection);
}
- PendingResultFuture pendingFuture = new PendingResultFuture(
- handler);
// Pool was empty. Maybe a new connection if pool size is not
// reached
if (numConnections < poolSize)
{
+ // We can create a new connection.
numConnections++;
- WrapResultHandler wrapHandler = new WrapResultHandler(
- pendingFuture);
- connectionFactory.getAsynchronousConnection(wrapHandler);
+
+ FutureNewConnection future = new FutureNewConnection(handler);
+ future.setFutureResult(connectionFactory
+ .getAsynchronousConnection(future));
+
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
StaticUtils.DEBUG_LOG
@@ -702,11 +568,16 @@
numConnections, pool.size(), pendingFutures
.size()));
}
+
+ return future;
}
else
{
- // Have to wait
- pendingFutures.add(pendingFuture);
+ // Pool is full so wait for a connection to become available.
+ FuturePooledConnection future = new FuturePooledConnection(
+ handler);
+ pendingFutures.add(future);
+
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
StaticUtils.DEBUG_LOG
@@ -717,9 +588,9 @@
numConnections, pool.size(), pendingFutures
.size()));
}
- }
- return pendingFuture;
+ return future;
+ }
}
}
}
diff --git a/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java b/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
index 619f880..d917444 100644
--- a/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
+++ b/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
@@ -32,15 +32,12 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.opends.sdk.requests.*;
import org.opends.sdk.responses.*;
import org.opends.sdk.schema.Schema;
+import com.sun.opends.sdk.util.FutureResultTransformer;
import com.sun.opends.sdk.util.Validator;
@@ -437,126 +434,37 @@
- private final class ResultFutureImpl implements
- FutureResult<AsynchronousConnection>,
+ private final class FutureResultImpl
+ extends
+ FutureResultTransformer<AsynchronousConnection, AsynchronousConnection>
+ implements FutureResult<AsynchronousConnection>,
ResultHandler<AsynchronousConnection>
{
- private volatile AsynchronousConnectionImpl heartBeatConnection;
- private volatile ErrorResultException exception;
-
- private volatile FutureResult<?> connectFuture;
-
- private final CountDownLatch latch = new CountDownLatch(1);
-
- private final ResultHandler<? super AsynchronousConnectionImpl> handler;
-
- private boolean cancelled;
-
-
-
- private ResultFutureImpl(
- ResultHandler<? super AsynchronousConnectionImpl> handler)
+ private FutureResultImpl(
+ ResultHandler<? super AsynchronousConnection> handler)
{
- this.handler = handler;
+ super(handler);
}
- public boolean cancel(boolean mayInterruptIfRunning)
+ /**
+ * {@inheritDoc}
+ */
+ protected AsynchronousConnection transformResult(
+ AsynchronousConnection connection) throws ErrorResultException
{
- cancelled = connectFuture.cancel(mayInterruptIfRunning);
- if (cancelled)
- {
- latch.countDown();
- }
- return cancelled;
- }
-
-
-
- public AsynchronousConnectionImpl get()
- throws InterruptedException, ErrorResultException
- {
- latch.await();
- if (cancelled)
- {
- throw new CancellationException();
- }
- if (exception != null)
- {
- throw exception;
- }
- return heartBeatConnection;
- }
-
-
-
- public AsynchronousConnectionImpl get(long timeout, TimeUnit unit)
- throws InterruptedException, TimeoutException,
- ErrorResultException
- {
- latch.await(timeout, unit);
- if (cancelled)
- {
- throw new CancellationException();
- }
- if (exception != null)
- {
- throw exception;
- }
- return heartBeatConnection;
- }
-
-
-
- public boolean isCancelled()
- {
- return cancelled;
- }
-
-
-
- public boolean isDone()
- {
- return latch.getCount() == 0;
- }
-
-
-
- public int getRequestID()
- {
- return -1;
- }
-
-
-
- public void handleResult(AsynchronousConnection connection)
- {
- heartBeatConnection = new AsynchronousConnectionImpl(connection);
+ AsynchronousConnectionImpl heartBeatConnection = new AsynchronousConnectionImpl(
+ connection);
synchronized (activeConnections)
{
connection.addConnectionEventListener(heartBeatConnection);
activeConnections.add(heartBeatConnection);
}
- if (handler != null)
- {
- handler.handleResult(heartBeatConnection);
- }
- latch.countDown();
+ return heartBeatConnection;
}
-
-
- public void handleErrorResult(ErrorResultException error)
- {
- exception = error;
- if (handler != null)
- {
- handler.handleErrorResult(error);
- }
- latch.countDown();
- }
}
@@ -564,9 +472,9 @@
public FutureResult<AsynchronousConnection> getAsynchronousConnection(
ResultHandler<? super AsynchronousConnection> handler)
{
- ResultFutureImpl future = new ResultFutureImpl(handler);
- future.connectFuture = parentFactory
- .getAsynchronousConnection(future);
+ FutureResultImpl future = new FutureResultImpl(handler);
+ future.setFutureResult(parentFactory
+ .getAsynchronousConnection(future));
return future;
}
}
--
Gitblit v1.10.0