From 9f2bba679ab597f1e50078a29d145100e3baed3c Mon Sep 17 00:00:00 2001
From: Ludovic Poitou <ludovic.poitou@forgerock.com>
Date: Tue, 19 Oct 2010 16:36:12 +0000
Subject: [PATCH] Make ConnectionPool implementation fully async and fix some race conditions in the unit tests.

---
 sdk/examples/org/opends/sdk/examples/server/proxy/Main.java |  432 ++++++++++++++++++++++++++++++++++++-----------------
 1 files changed, 290 insertions(+), 142 deletions(-)

diff --git a/sdk/examples/org/opends/sdk/examples/server/proxy/Main.java b/sdk/examples/org/opends/sdk/examples/server/proxy/Main.java
index c95fddf..204b68e 100644
--- a/sdk/examples/org/opends/sdk/examples/server/proxy/Main.java
+++ b/sdk/examples/org/opends/sdk/examples/server/proxy/Main.java
@@ -36,11 +36,9 @@
 import java.util.List;
 
 import org.opends.sdk.*;
+import org.opends.sdk.controls.ProxiedAuthV2RequestControl;
 import org.opends.sdk.requests.*;
-import org.opends.sdk.responses.BindResult;
-import org.opends.sdk.responses.CompareResult;
-import org.opends.sdk.responses.ExtendedResult;
-import org.opends.sdk.responses.Result;
+import org.opends.sdk.responses.*;
 
 
 
@@ -73,37 +71,127 @@
         ServerConnection<Integer>
     {
 
-      private volatile AsynchronousConnection connection = null;
-
-      private volatile boolean isUnbindRequired = false;
-
-
-
-      private AsynchronousConnection getConnection()
-          throws ErrorResultException
+      private abstract class AbstractRequestCompletionHandler
+          <R extends Result, H extends ResultHandler<? super R>>
+          implements ResultHandler<R>
       {
-        if (connection == null)
+        final H resultHandler;
+        final AsynchronousConnection connection;
+
+
+
+        AbstractRequestCompletionHandler(
+            final AsynchronousConnection connection, final H resultHandler)
         {
-          synchronized (this)
-          {
-            if (connection == null)
-            {
-              try
-              {
-                connection = factory.getAsynchronousConnection(null).get();
-              }
-              catch (InterruptedException e)
-              {
-                throw new RuntimeException(e);
-              }
-            }
-          }
+          this.connection = connection;
+          this.resultHandler = resultHandler;
         }
-        return connection;
+
+
+
+        @Override
+        public final void handleErrorResult(final ErrorResultException error)
+        {
+          connection.close();
+          resultHandler.handleErrorResult(error);
+        }
+
+
+
+        @Override
+        public final void handleResult(final R result)
+        {
+          connection.close();
+          resultHandler.handleResult(result);
+        }
+
       }
 
 
 
+      private abstract class ConnectionCompletionHandler<R extends Result>
+          implements ResultHandler<AsynchronousConnection>
+      {
+        private final ResultHandler<? super R> resultHandler;
+
+
+
+        ConnectionCompletionHandler(final ResultHandler<? super R> resultHandler)
+        {
+          this.resultHandler = resultHandler;
+        }
+
+
+
+        @Override
+        public final void handleErrorResult(final ErrorResultException error)
+        {
+          resultHandler.handleErrorResult(error);
+        }
+
+
+
+        @Override
+        public abstract void handleResult(AsynchronousConnection connection);
+
+      }
+
+
+
+      private final class RequestCompletionHandler<R extends Result> extends
+          AbstractRequestCompletionHandler<R, ResultHandler<? super R>>
+      {
+        RequestCompletionHandler(final AsynchronousConnection connection,
+            final ResultHandler<? super R> resultHandler)
+        {
+          super(connection, resultHandler);
+        }
+      }
+
+
+
+      private final class SearchRequestCompletionHandler extends
+          AbstractRequestCompletionHandler<Result, SearchResultHandler>
+          implements SearchResultHandler
+      {
+
+        SearchRequestCompletionHandler(final AsynchronousConnection connection,
+            final SearchResultHandler resultHandler)
+        {
+          super(connection, resultHandler);
+        }
+
+
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public final boolean handleEntry(final SearchResultEntry entry)
+        {
+          return resultHandler.handleEntry(entry);
+        }
+
+
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public final boolean handleReference(
+            final SearchResultReference reference)
+        {
+          return resultHandler.handleReference(reference);
+        }
+
+      }
+
+
+
+      private volatile ProxiedAuthV2RequestControl proxiedAuthControl = null;
+
+
+
       private ServerConnectionImpl(final LDAPClientContext clientContext)
       {
         // Nothing to do.
@@ -133,15 +221,22 @@
           final IntermediateResponseHandler intermediateResponseHandler)
           throws UnsupportedOperationException
       {
-        try
+        addProxiedAuthControl(request);
+        final ConnectionCompletionHandler<Result> outerHandler =
+          new ConnectionCompletionHandler<Result>(resultHandler)
         {
-          getConnection().add(request, resultHandler,
-              intermediateResponseHandler);
-        }
-        catch (ErrorResultException e)
-        {
-          resultHandler.handleErrorResult(e);
-        }
+
+          @Override
+          public void handleResult(final AsynchronousConnection connection)
+          {
+            final RequestCompletionHandler<Result> innerHandler =
+              new RequestCompletionHandler<Result>(connection, resultHandler);
+            connection.add(request, innerHandler, intermediateResponseHandler);
+          }
+
+        };
+
+        factory.getAsynchronousConnection(outerHandler);
       }
 
 
@@ -156,6 +251,7 @@
           final IntermediateResponseHandler intermediateResponseHandler)
           throws UnsupportedOperationException
       {
+
         if (request.getAuthenticationType() != ((byte) 0x80))
         {
           // TODO: SASL authentication not implemented.
@@ -166,20 +262,45 @@
         }
         else
         {
-          // Note that this connection has received a bind request: the
-          // connection should be reverted back to anonymous when the client
-          // unbinds.
-          isUnbindRequired = true;
+          // Authenticate using a separate bind connection pool, because we
+          // don't want to change the state of the pooled connection.
+          final ConnectionCompletionHandler<BindResult> outerHandler =
+            new ConnectionCompletionHandler<BindResult>(resultHandler)
+          {
 
-          try
-          {
-            getConnection().bind(request, resultHandler,
-                intermediateResponseHandler);
-          }
-          catch (ErrorResultException e)
-          {
-            resultHandler.handleErrorResult(e);
-          }
+            @Override
+            public void handleResult(final AsynchronousConnection connection)
+            {
+              final ResultHandler<BindResult> innerHandler = new ResultHandler<BindResult>()
+              {
+
+                @Override
+                public final void handleErrorResult(
+                    final ErrorResultException error)
+                {
+                  connection.close();
+                  resultHandler.handleErrorResult(error);
+                }
+
+
+
+                @Override
+                public final void handleResult(final BindResult result)
+                {
+                  connection.close();
+                  proxiedAuthControl = ProxiedAuthV2RequestControl
+                      .newControl("dn:" + request.getName());
+                  resultHandler.handleResult(result);
+                }
+              };
+              connection.bind(request, innerHandler,
+                  intermediateResponseHandler);
+            }
+
+          };
+
+          proxiedAuthControl = null;
+          bindFactory.getAsynchronousConnection(outerHandler);
         }
       }
 
@@ -195,15 +316,23 @@
           final IntermediateResponseHandler intermediateResponseHandler)
           throws UnsupportedOperationException
       {
-        try
+        addProxiedAuthControl(request);
+        final ConnectionCompletionHandler<CompareResult> outerHandler =
+          new ConnectionCompletionHandler<CompareResult>(resultHandler)
         {
-          getConnection().compare(request, resultHandler,
-              intermediateResponseHandler);
-        }
-        catch (ErrorResultException e)
-        {
-          resultHandler.handleErrorResult(e);
-        }
+
+          @Override
+          public void handleResult(final AsynchronousConnection connection)
+          {
+            final RequestCompletionHandler<CompareResult> innerHandler =
+              new RequestCompletionHandler<CompareResult>(connection, resultHandler);
+            connection.compare(request, innerHandler,
+                intermediateResponseHandler);
+          }
+
+        };
+
+        factory.getAsynchronousConnection(outerHandler);
       }
 
 
@@ -215,8 +344,7 @@
       public void handleConnectionClosed(final Integer requestContext,
           final UnbindRequest request)
       {
-        // Client connection closed: release the proxy connection.
-        close();
+        // Client connection closed.
       }
 
 
@@ -228,8 +356,7 @@
       public void handleConnectionDisconnected(final ResultCode resultCode,
           final String message)
       {
-        // Client disconnected by server: release the proxy connection.
-        close();
+        // Client disconnected by server.
       }
 
 
@@ -240,8 +367,7 @@
       @Override
       public void handleConnectionError(final Throwable error)
       {
-        // Client connection failed: release the proxy connection.
-        close();
+        // Client connection failed.
       }
 
 
@@ -256,15 +382,23 @@
           final IntermediateResponseHandler intermediateResponseHandler)
           throws UnsupportedOperationException
       {
-        try
+        addProxiedAuthControl(request);
+        final ConnectionCompletionHandler<Result> outerHandler =
+          new ConnectionCompletionHandler<Result>(resultHandler)
         {
-          getConnection().delete(request, resultHandler,
-              intermediateResponseHandler);
-        }
-        catch (ErrorResultException e)
-        {
-          resultHandler.handleErrorResult(e);
-        }
+
+          @Override
+          public void handleResult(final AsynchronousConnection connection)
+          {
+            final RequestCompletionHandler<Result> innerHandler =
+              new RequestCompletionHandler<Result>(connection, resultHandler);
+            connection.delete(request, innerHandler,
+                intermediateResponseHandler);
+          }
+
+        };
+
+        factory.getAsynchronousConnection(outerHandler);
       }
 
 
@@ -296,15 +430,24 @@
         else
         {
           // Forward all other extended operations.
-          try
+          addProxiedAuthControl(request);
+
+          final ConnectionCompletionHandler<R> outerHandler =
+            new ConnectionCompletionHandler<R>(resultHandler)
           {
-            getConnection().extendedRequest(request, resultHandler,
-                intermediateResponseHandler);
-          }
-          catch (ErrorResultException e)
-          {
-            resultHandler.handleErrorResult(e);
-          }
+
+            @Override
+            public void handleResult(final AsynchronousConnection connection)
+            {
+              final RequestCompletionHandler<R> innerHandler =
+                new RequestCompletionHandler<R>(connection, resultHandler);
+              connection.extendedRequest(request, innerHandler,
+                  intermediateResponseHandler);
+            }
+
+          };
+
+          factory.getAsynchronousConnection(outerHandler);
         }
       }
 
@@ -320,15 +463,23 @@
           final IntermediateResponseHandler intermediateResponseHandler)
           throws UnsupportedOperationException
       {
-        try
+        addProxiedAuthControl(request);
+        final ConnectionCompletionHandler<Result> outerHandler =
+          new ConnectionCompletionHandler<Result>(resultHandler)
         {
-          getConnection().modify(request, resultHandler,
-              intermediateResponseHandler);
-        }
-        catch (ErrorResultException e)
-        {
-          resultHandler.handleErrorResult(e);
-        }
+
+          @Override
+          public void handleResult(final AsynchronousConnection connection)
+          {
+            final RequestCompletionHandler<Result> innerHandler =
+              new RequestCompletionHandler<Result>(connection, resultHandler);
+            connection.modify(request, innerHandler,
+                intermediateResponseHandler);
+          }
+
+        };
+
+        factory.getAsynchronousConnection(outerHandler);
       }
 
 
@@ -343,15 +494,23 @@
           final IntermediateResponseHandler intermediateResponseHandler)
           throws UnsupportedOperationException
       {
-        try
+        addProxiedAuthControl(request);
+        final ConnectionCompletionHandler<Result> outerHandler =
+          new ConnectionCompletionHandler<Result>(resultHandler)
         {
-          getConnection().modifyDN(request, resultHandler,
-              intermediateResponseHandler);
-        }
-        catch (ErrorResultException e)
-        {
-          resultHandler.handleErrorResult(e);
-        }
+
+          @Override
+          public void handleResult(final AsynchronousConnection connection)
+          {
+            final RequestCompletionHandler<Result> innerHandler =
+              new RequestCompletionHandler<Result>(connection, resultHandler);
+            connection.modifyDN(request, innerHandler,
+                intermediateResponseHandler);
+          }
+
+        };
+
+        factory.getAsynchronousConnection(outerHandler);
       }
 
 
@@ -365,54 +524,33 @@
           final IntermediateResponseHandler intermediateResponseHandler)
           throws UnsupportedOperationException
       {
-        try
+        addProxiedAuthControl(request);
+        final ConnectionCompletionHandler<Result> outerHandler =
+          new ConnectionCompletionHandler<Result>(resultHandler)
         {
-          getConnection().search(request, resultHandler,
-              intermediateResponseHandler);
-        }
-        catch (ErrorResultException e)
-        {
-          resultHandler.handleErrorResult(e);
-        }
+
+          @Override
+          public void handleResult(final AsynchronousConnection connection)
+          {
+            final SearchRequestCompletionHandler innerHandler =
+              new SearchRequestCompletionHandler(connection, resultHandler);
+            connection.search(request, innerHandler,
+                intermediateResponseHandler);
+          }
+
+        };
+
+        factory.getAsynchronousConnection(outerHandler);
       }
 
 
 
-      private void close()
+      private void addProxiedAuthControl(final Request request)
       {
-        if (isUnbindRequired)
+        final ProxiedAuthV2RequestControl control = proxiedAuthControl;
+        if (control != null)
         {
-          synchronized (this)
-          {
-            if (connection != null)
-            {
-              connection.bind(Requests.newSimpleBindRequest(),
-                  new ResultHandler<Result>()
-                  {
-
-                    public void handleErrorResult(ErrorResultException error)
-                    {
-                      // The rebind failed - this is bad because if the
-                      // connection is pooled it will remain authenticated as
-                      // the wrong user.
-                      handleResult(error.getResult());
-                    }
-
-
-
-                    public void handleResult(Result result)
-                    {
-                      synchronized (ServerConnectionImpl.this)
-                      {
-                        if (connection != null)
-                        {
-                          connection.close();
-                        }
-                      }
-                    }
-                  });
-            }
-          }
+          request.addControl(control);
         }
       }
 
@@ -421,12 +559,15 @@
 
 
     private final ConnectionFactory factory;
+    private final ConnectionFactory bindFactory;
 
 
 
-    private Proxy(final ConnectionFactory factory)
+    private Proxy(final ConnectionFactory factory,
+        final ConnectionFactory bindFactory)
     {
       this.factory = factory;
+      this.bindFactory = bindFactory;
     }
 
 
@@ -467,18 +608,25 @@
 
     // Create load balancer.
     final List<ConnectionFactory> factories = new LinkedList<ConnectionFactory>();
+    final List<ConnectionFactory> bindFactories = new LinkedList<ConnectionFactory>();
     for (int i = 2; i < args.length; i += 2)
     {
       final String remoteAddress = args[i];
       final int remotePort = Integer.parseInt(args[i + 1]);
 
-      // factories.add(Connections.newConnectionPool(new LDAPConnectionFactory(
-      //    remoteAddress, remotePort), Integer.MAX_VALUE));
-      factories.add(new LDAPConnectionFactory(remoteAddress, remotePort));
+      factories.add(Connections.newConnectionPool(new LDAPConnectionFactory(
+          remoteAddress, remotePort), Integer.MAX_VALUE));
+      bindFactories.add(Connections.newConnectionPool(
+          new LDAPConnectionFactory(remoteAddress, remotePort),
+          Integer.MAX_VALUE));
     }
     final RoundRobinLoadBalancingAlgorithm algorithm = new RoundRobinLoadBalancingAlgorithm(
         factories);
+    final RoundRobinLoadBalancingAlgorithm bindAlgorithm = new RoundRobinLoadBalancingAlgorithm(
+        bindFactories);
     final ConnectionFactory factory = Connections.newLoadBalancer(algorithm);
+    final ConnectionFactory bindFactory = Connections
+        .newLoadBalancer(bindAlgorithm);
 
     // Create listener.
     final LDAPListenerOptions options = new LDAPListenerOptions()
@@ -486,8 +634,8 @@
     LDAPListener listener = null;
     try
     {
-      listener = new LDAPListener(localAddress, localPort, new Proxy(factory),
-          options);
+      listener = new LDAPListener(localAddress, localPort, new Proxy(factory,
+          bindFactory), options);
       System.out.println("Press any key to stop the server...");
       System.in.read();
     }

--
Gitblit v1.10.0