From 9f2bba679ab597f1e50078a29d145100e3baed3c Mon Sep 17 00:00:00 2001
From: Ludovic Poitou <ludovic.poitou@forgerock.com>
Date: Tue, 19 Oct 2010 16:36:12 +0000
Subject: [PATCH] Make ConnectionPool implementation fully async and fix some race conditions in the unit tests.
---
sdk/examples/org/opends/sdk/examples/server/proxy/Main.java | 432 ++++++++++++++++++++++++++++++++++++-----------------
1 files changed, 290 insertions(+), 142 deletions(-)
diff --git a/sdk/examples/org/opends/sdk/examples/server/proxy/Main.java b/sdk/examples/org/opends/sdk/examples/server/proxy/Main.java
index c95fddf..204b68e 100644
--- a/sdk/examples/org/opends/sdk/examples/server/proxy/Main.java
+++ b/sdk/examples/org/opends/sdk/examples/server/proxy/Main.java
@@ -36,11 +36,9 @@
import java.util.List;
import org.opends.sdk.*;
+import org.opends.sdk.controls.ProxiedAuthV2RequestControl;
import org.opends.sdk.requests.*;
-import org.opends.sdk.responses.BindResult;
-import org.opends.sdk.responses.CompareResult;
-import org.opends.sdk.responses.ExtendedResult;
-import org.opends.sdk.responses.Result;
+import org.opends.sdk.responses.*;
@@ -73,37 +71,127 @@
ServerConnection<Integer>
{
- private volatile AsynchronousConnection connection = null;
-
- private volatile boolean isUnbindRequired = false;
-
-
-
- private AsynchronousConnection getConnection()
- throws ErrorResultException
+ private abstract class AbstractRequestCompletionHandler
+ <R extends Result, H extends ResultHandler<? super R>>
+ implements ResultHandler<R>
{
- if (connection == null)
+ final H resultHandler;
+ final AsynchronousConnection connection;
+
+
+
+ AbstractRequestCompletionHandler(
+ final AsynchronousConnection connection, final H resultHandler)
{
- synchronized (this)
- {
- if (connection == null)
- {
- try
- {
- connection = factory.getAsynchronousConnection(null).get();
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
+ this.connection = connection;
+ this.resultHandler = resultHandler;
}
- return connection;
+
+
+
+ @Override
+ public final void handleErrorResult(final ErrorResultException error)
+ {
+ connection.close();
+ resultHandler.handleErrorResult(error);
+ }
+
+
+
+ @Override
+ public final void handleResult(final R result)
+ {
+ connection.close();
+ resultHandler.handleResult(result);
+ }
+
}
+ private abstract class ConnectionCompletionHandler<R extends Result>
+ implements ResultHandler<AsynchronousConnection>
+ {
+ private final ResultHandler<? super R> resultHandler;
+
+
+
+ ConnectionCompletionHandler(final ResultHandler<? super R> resultHandler)
+ {
+ this.resultHandler = resultHandler;
+ }
+
+
+
+ @Override
+ public final void handleErrorResult(final ErrorResultException error)
+ {
+ resultHandler.handleErrorResult(error);
+ }
+
+
+
+ @Override
+ public abstract void handleResult(AsynchronousConnection connection);
+
+ }
+
+
+
+ private final class RequestCompletionHandler<R extends Result> extends
+ AbstractRequestCompletionHandler<R, ResultHandler<? super R>>
+ {
+ RequestCompletionHandler(final AsynchronousConnection connection,
+ final ResultHandler<? super R> resultHandler)
+ {
+ super(connection, resultHandler);
+ }
+ }
+
+
+
+ private final class SearchRequestCompletionHandler extends
+ AbstractRequestCompletionHandler<Result, SearchResultHandler>
+ implements SearchResultHandler
+ {
+
+ SearchRequestCompletionHandler(final AsynchronousConnection connection,
+ final SearchResultHandler resultHandler)
+ {
+ super(connection, resultHandler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final boolean handleEntry(final SearchResultEntry entry)
+ {
+ return resultHandler.handleEntry(entry);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final boolean handleReference(
+ final SearchResultReference reference)
+ {
+ return resultHandler.handleReference(reference);
+ }
+
+ }
+
+
+
+ private volatile ProxiedAuthV2RequestControl proxiedAuthControl = null;
+
+
+
private ServerConnectionImpl(final LDAPClientContext clientContext)
{
// Nothing to do.
@@ -133,15 +221,22 @@
final IntermediateResponseHandler intermediateResponseHandler)
throws UnsupportedOperationException
{
- try
+ addProxiedAuthControl(request);
+ final ConnectionCompletionHandler<Result> outerHandler =
+ new ConnectionCompletionHandler<Result>(resultHandler)
{
- getConnection().add(request, resultHandler,
- intermediateResponseHandler);
- }
- catch (ErrorResultException e)
- {
- resultHandler.handleErrorResult(e);
- }
+
+ @Override
+ public void handleResult(final AsynchronousConnection connection)
+ {
+ final RequestCompletionHandler<Result> innerHandler =
+ new RequestCompletionHandler<Result>(connection, resultHandler);
+ connection.add(request, innerHandler, intermediateResponseHandler);
+ }
+
+ };
+
+ factory.getAsynchronousConnection(outerHandler);
}
@@ -156,6 +251,7 @@
final IntermediateResponseHandler intermediateResponseHandler)
throws UnsupportedOperationException
{
+
if (request.getAuthenticationType() != ((byte) 0x80))
{
// TODO: SASL authentication not implemented.
@@ -166,20 +262,45 @@
}
else
{
- // Note that this connection has received a bind request: the
- // connection should be reverted back to anonymous when the client
- // unbinds.
- isUnbindRequired = true;
+ // Authenticate using a separate bind connection pool, because we
+ // don't want to change the state of the pooled connection.
+ final ConnectionCompletionHandler<BindResult> outerHandler =
+ new ConnectionCompletionHandler<BindResult>(resultHandler)
+ {
- try
- {
- getConnection().bind(request, resultHandler,
- intermediateResponseHandler);
- }
- catch (ErrorResultException e)
- {
- resultHandler.handleErrorResult(e);
- }
+ @Override
+ public void handleResult(final AsynchronousConnection connection)
+ {
+ final ResultHandler<BindResult> innerHandler = new ResultHandler<BindResult>()
+ {
+
+ @Override
+ public final void handleErrorResult(
+ final ErrorResultException error)
+ {
+ connection.close();
+ resultHandler.handleErrorResult(error);
+ }
+
+
+
+ @Override
+ public final void handleResult(final BindResult result)
+ {
+ connection.close();
+ proxiedAuthControl = ProxiedAuthV2RequestControl
+ .newControl("dn:" + request.getName());
+ resultHandler.handleResult(result);
+ }
+ };
+ connection.bind(request, innerHandler,
+ intermediateResponseHandler);
+ }
+
+ };
+
+ proxiedAuthControl = null;
+ bindFactory.getAsynchronousConnection(outerHandler);
}
}
@@ -195,15 +316,23 @@
final IntermediateResponseHandler intermediateResponseHandler)
throws UnsupportedOperationException
{
- try
+ addProxiedAuthControl(request);
+ final ConnectionCompletionHandler<CompareResult> outerHandler =
+ new ConnectionCompletionHandler<CompareResult>(resultHandler)
{
- getConnection().compare(request, resultHandler,
- intermediateResponseHandler);
- }
- catch (ErrorResultException e)
- {
- resultHandler.handleErrorResult(e);
- }
+
+ @Override
+ public void handleResult(final AsynchronousConnection connection)
+ {
+ final RequestCompletionHandler<CompareResult> innerHandler =
+ new RequestCompletionHandler<CompareResult>(connection, resultHandler);
+ connection.compare(request, innerHandler,
+ intermediateResponseHandler);
+ }
+
+ };
+
+ factory.getAsynchronousConnection(outerHandler);
}
@@ -215,8 +344,7 @@
public void handleConnectionClosed(final Integer requestContext,
final UnbindRequest request)
{
- // Client connection closed: release the proxy connection.
- close();
+ // Client connection closed.
}
@@ -228,8 +356,7 @@
public void handleConnectionDisconnected(final ResultCode resultCode,
final String message)
{
- // Client disconnected by server: release the proxy connection.
- close();
+ // Client disconnected by server.
}
@@ -240,8 +367,7 @@
@Override
public void handleConnectionError(final Throwable error)
{
- // Client connection failed: release the proxy connection.
- close();
+ // Client connection failed.
}
@@ -256,15 +382,23 @@
final IntermediateResponseHandler intermediateResponseHandler)
throws UnsupportedOperationException
{
- try
+ addProxiedAuthControl(request);
+ final ConnectionCompletionHandler<Result> outerHandler =
+ new ConnectionCompletionHandler<Result>(resultHandler)
{
- getConnection().delete(request, resultHandler,
- intermediateResponseHandler);
- }
- catch (ErrorResultException e)
- {
- resultHandler.handleErrorResult(e);
- }
+
+ @Override
+ public void handleResult(final AsynchronousConnection connection)
+ {
+ final RequestCompletionHandler<Result> innerHandler =
+ new RequestCompletionHandler<Result>(connection, resultHandler);
+ connection.delete(request, innerHandler,
+ intermediateResponseHandler);
+ }
+
+ };
+
+ factory.getAsynchronousConnection(outerHandler);
}
@@ -296,15 +430,24 @@
else
{
// Forward all other extended operations.
- try
+ addProxiedAuthControl(request);
+
+ final ConnectionCompletionHandler<R> outerHandler =
+ new ConnectionCompletionHandler<R>(resultHandler)
{
- getConnection().extendedRequest(request, resultHandler,
- intermediateResponseHandler);
- }
- catch (ErrorResultException e)
- {
- resultHandler.handleErrorResult(e);
- }
+
+ @Override
+ public void handleResult(final AsynchronousConnection connection)
+ {
+ final RequestCompletionHandler<R> innerHandler =
+ new RequestCompletionHandler<R>(connection, resultHandler);
+ connection.extendedRequest(request, innerHandler,
+ intermediateResponseHandler);
+ }
+
+ };
+
+ factory.getAsynchronousConnection(outerHandler);
}
}
@@ -320,15 +463,23 @@
final IntermediateResponseHandler intermediateResponseHandler)
throws UnsupportedOperationException
{
- try
+ addProxiedAuthControl(request);
+ final ConnectionCompletionHandler<Result> outerHandler =
+ new ConnectionCompletionHandler<Result>(resultHandler)
{
- getConnection().modify(request, resultHandler,
- intermediateResponseHandler);
- }
- catch (ErrorResultException e)
- {
- resultHandler.handleErrorResult(e);
- }
+
+ @Override
+ public void handleResult(final AsynchronousConnection connection)
+ {
+ final RequestCompletionHandler<Result> innerHandler =
+ new RequestCompletionHandler<Result>(connection, resultHandler);
+ connection.modify(request, innerHandler,
+ intermediateResponseHandler);
+ }
+
+ };
+
+ factory.getAsynchronousConnection(outerHandler);
}
@@ -343,15 +494,23 @@
final IntermediateResponseHandler intermediateResponseHandler)
throws UnsupportedOperationException
{
- try
+ addProxiedAuthControl(request);
+ final ConnectionCompletionHandler<Result> outerHandler =
+ new ConnectionCompletionHandler<Result>(resultHandler)
{
- getConnection().modifyDN(request, resultHandler,
- intermediateResponseHandler);
- }
- catch (ErrorResultException e)
- {
- resultHandler.handleErrorResult(e);
- }
+
+ @Override
+ public void handleResult(final AsynchronousConnection connection)
+ {
+ final RequestCompletionHandler<Result> innerHandler =
+ new RequestCompletionHandler<Result>(connection, resultHandler);
+ connection.modifyDN(request, innerHandler,
+ intermediateResponseHandler);
+ }
+
+ };
+
+ factory.getAsynchronousConnection(outerHandler);
}
@@ -365,54 +524,33 @@
final IntermediateResponseHandler intermediateResponseHandler)
throws UnsupportedOperationException
{
- try
+ addProxiedAuthControl(request);
+ final ConnectionCompletionHandler<Result> outerHandler =
+ new ConnectionCompletionHandler<Result>(resultHandler)
{
- getConnection().search(request, resultHandler,
- intermediateResponseHandler);
- }
- catch (ErrorResultException e)
- {
- resultHandler.handleErrorResult(e);
- }
+
+ @Override
+ public void handleResult(final AsynchronousConnection connection)
+ {
+ final SearchRequestCompletionHandler innerHandler =
+ new SearchRequestCompletionHandler(connection, resultHandler);
+ connection.search(request, innerHandler,
+ intermediateResponseHandler);
+ }
+
+ };
+
+ factory.getAsynchronousConnection(outerHandler);
}
- private void close()
+ private void addProxiedAuthControl(final Request request)
{
- if (isUnbindRequired)
+ final ProxiedAuthV2RequestControl control = proxiedAuthControl;
+ if (control != null)
{
- synchronized (this)
- {
- if (connection != null)
- {
- connection.bind(Requests.newSimpleBindRequest(),
- new ResultHandler<Result>()
- {
-
- public void handleErrorResult(ErrorResultException error)
- {
- // The rebind failed - this is bad because if the
- // connection is pooled it will remain authenticated as
- // the wrong user.
- handleResult(error.getResult());
- }
-
-
-
- public void handleResult(Result result)
- {
- synchronized (ServerConnectionImpl.this)
- {
- if (connection != null)
- {
- connection.close();
- }
- }
- }
- });
- }
- }
+ request.addControl(control);
}
}
@@ -421,12 +559,15 @@
private final ConnectionFactory factory;
+ private final ConnectionFactory bindFactory;
- private Proxy(final ConnectionFactory factory)
+ private Proxy(final ConnectionFactory factory,
+ final ConnectionFactory bindFactory)
{
this.factory = factory;
+ this.bindFactory = bindFactory;
}
@@ -467,18 +608,25 @@
// Create load balancer.
final List<ConnectionFactory> factories = new LinkedList<ConnectionFactory>();
+ final List<ConnectionFactory> bindFactories = new LinkedList<ConnectionFactory>();
for (int i = 2; i < args.length; i += 2)
{
final String remoteAddress = args[i];
final int remotePort = Integer.parseInt(args[i + 1]);
- // factories.add(Connections.newConnectionPool(new LDAPConnectionFactory(
- // remoteAddress, remotePort), Integer.MAX_VALUE));
- factories.add(new LDAPConnectionFactory(remoteAddress, remotePort));
+ factories.add(Connections.newConnectionPool(new LDAPConnectionFactory(
+ remoteAddress, remotePort), Integer.MAX_VALUE));
+ bindFactories.add(Connections.newConnectionPool(
+ new LDAPConnectionFactory(remoteAddress, remotePort),
+ Integer.MAX_VALUE));
}
final RoundRobinLoadBalancingAlgorithm algorithm = new RoundRobinLoadBalancingAlgorithm(
factories);
+ final RoundRobinLoadBalancingAlgorithm bindAlgorithm = new RoundRobinLoadBalancingAlgorithm(
+ bindFactories);
final ConnectionFactory factory = Connections.newLoadBalancer(algorithm);
+ final ConnectionFactory bindFactory = Connections
+ .newLoadBalancer(bindAlgorithm);
// Create listener.
final LDAPListenerOptions options = new LDAPListenerOptions()
@@ -486,8 +634,8 @@
LDAPListener listener = null;
try
{
- listener = new LDAPListener(localAddress, localPort, new Proxy(factory),
- options);
+ listener = new LDAPListener(localAddress, localPort, new Proxy(factory,
+ bindFactory), options);
System.out.println("Press any key to stop the server...");
System.in.read();
}
--
Gitblit v1.10.0