From 0c70e50ddb4b64d3a82e79bac8afe7c422917812 Mon Sep 17 00:00:00 2001
From: Ludovic Poitou <ludovic.poitou@forgerock.com>
Date: Tue, 19 Oct 2010 16:36:12 +0000
Subject: [PATCH] Make ConnectionPool implementation fully async and fix some race conditions in the unit tests.

---
 opendj-sdk/sdk/examples/org/opends/sdk/examples/server/proxy/Main.java                   |  432 ++++++++++-----
 opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/ConnectionFactoryTestCase.java |   28 +
 opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/LDAPListenerTestCase.java      |  508 ++++++++++--------
 opendj-sdk/sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java                    |    2 
 opendj-sdk/sdk/src/org/opends/sdk/RoundRobinLoadBalancingAlgorithm.java                  |    2 
 opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/OpenDSTestCase.java            |    5 
 opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java                    |   46 +
 opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java                                    |  589 ++++++++++++---------
 8 files changed, 964 insertions(+), 648 deletions(-)

diff --git a/opendj-sdk/sdk/examples/org/opends/sdk/examples/server/proxy/Main.java b/opendj-sdk/sdk/examples/org/opends/sdk/examples/server/proxy/Main.java
index c95fddf..204b68e 100644
--- a/opendj-sdk/sdk/examples/org/opends/sdk/examples/server/proxy/Main.java
+++ b/opendj-sdk/sdk/examples/org/opends/sdk/examples/server/proxy/Main.java
@@ -36,11 +36,9 @@
 import java.util.List;
 
 import org.opends.sdk.*;
+import org.opends.sdk.controls.ProxiedAuthV2RequestControl;
 import org.opends.sdk.requests.*;
-import org.opends.sdk.responses.BindResult;
-import org.opends.sdk.responses.CompareResult;
-import org.opends.sdk.responses.ExtendedResult;
-import org.opends.sdk.responses.Result;
+import org.opends.sdk.responses.*;
 
 
 
@@ -73,37 +71,127 @@
         ServerConnection<Integer>
     {
 
-      private volatile AsynchronousConnection connection = null;
-
-      private volatile boolean isUnbindRequired = false;
-
-
-
-      private AsynchronousConnection getConnection()
-          throws ErrorResultException
+      private abstract class AbstractRequestCompletionHandler
+          <R extends Result, H extends ResultHandler<? super R>>
+          implements ResultHandler<R>
       {
-        if (connection == null)
+        final H resultHandler;
+        final AsynchronousConnection connection;
+
+
+
+        AbstractRequestCompletionHandler(
+            final AsynchronousConnection connection, final H resultHandler)
         {
-          synchronized (this)
-          {
-            if (connection == null)
-            {
-              try
-              {
-                connection = factory.getAsynchronousConnection(null).get();
-              }
-              catch (InterruptedException e)
-              {
-                throw new RuntimeException(e);
-              }
-            }
-          }
+          this.connection = connection;
+          this.resultHandler = resultHandler;
         }
-        return connection;
+
+
+
+        @Override
+        public final void handleErrorResult(final ErrorResultException error)
+        {
+          connection.close();
+          resultHandler.handleErrorResult(error);
+        }
+
+
+
+        @Override
+        public final void handleResult(final R result)
+        {
+          connection.close();
+          resultHandler.handleResult(result);
+        }
+
       }
 
 
 
+      private abstract class ConnectionCompletionHandler<R extends Result>
+          implements ResultHandler<AsynchronousConnection>
+      {
+        private final ResultHandler<? super R> resultHandler;
+
+
+
+        ConnectionCompletionHandler(final ResultHandler<? super R> resultHandler)
+        {
+          this.resultHandler = resultHandler;
+        }
+
+
+
+        @Override
+        public final void handleErrorResult(final ErrorResultException error)
+        {
+          resultHandler.handleErrorResult(error);
+        }
+
+
+
+        @Override
+        public abstract void handleResult(AsynchronousConnection connection);
+
+      }
+
+
+
+      private final class RequestCompletionHandler<R extends Result> extends
+          AbstractRequestCompletionHandler<R, ResultHandler<? super R>>
+      {
+        RequestCompletionHandler(final AsynchronousConnection connection,
+            final ResultHandler<? super R> resultHandler)
+        {
+          super(connection, resultHandler);
+        }
+      }
+
+
+
+      private final class SearchRequestCompletionHandler extends
+          AbstractRequestCompletionHandler<Result, SearchResultHandler>
+          implements SearchResultHandler
+      {
+
+        SearchRequestCompletionHandler(final AsynchronousConnection connection,
+            final SearchResultHandler resultHandler)
+        {
+          super(connection, resultHandler);
+        }
+
+
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public final boolean handleEntry(final SearchResultEntry entry)
+        {
+          return resultHandler.handleEntry(entry);
+        }
+
+
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public final boolean handleReference(
+            final SearchResultReference reference)
+        {
+          return resultHandler.handleReference(reference);
+        }
+
+      }
+
+
+
+      private volatile ProxiedAuthV2RequestControl proxiedAuthControl = null;
+
+
+
       private ServerConnectionImpl(final LDAPClientContext clientContext)
       {
         // Nothing to do.
@@ -133,15 +221,22 @@
           final IntermediateResponseHandler intermediateResponseHandler)
           throws UnsupportedOperationException
       {
-        try
+        addProxiedAuthControl(request);
+        final ConnectionCompletionHandler<Result> outerHandler =
+          new ConnectionCompletionHandler<Result>(resultHandler)
         {
-          getConnection().add(request, resultHandler,
-              intermediateResponseHandler);
-        }
-        catch (ErrorResultException e)
-        {
-          resultHandler.handleErrorResult(e);
-        }
+
+          @Override
+          public void handleResult(final AsynchronousConnection connection)
+          {
+            final RequestCompletionHandler<Result> innerHandler =
+              new RequestCompletionHandler<Result>(connection, resultHandler);
+            connection.add(request, innerHandler, intermediateResponseHandler);
+          }
+
+        };
+
+        factory.getAsynchronousConnection(outerHandler);
       }
 
 
@@ -156,6 +251,7 @@
           final IntermediateResponseHandler intermediateResponseHandler)
           throws UnsupportedOperationException
       {
+
         if (request.getAuthenticationType() != ((byte) 0x80))
         {
           // TODO: SASL authentication not implemented.
@@ -166,20 +262,45 @@
         }
         else
         {
-          // Note that this connection has received a bind request: the
-          // connection should be reverted back to anonymous when the client
-          // unbinds.
-          isUnbindRequired = true;
+          // Authenticate using a separate bind connection pool, because we
+          // don't want to change the state of the pooled connection.
+          final ConnectionCompletionHandler<BindResult> outerHandler =
+            new ConnectionCompletionHandler<BindResult>(resultHandler)
+          {
 
-          try
-          {
-            getConnection().bind(request, resultHandler,
-                intermediateResponseHandler);
-          }
-          catch (ErrorResultException e)
-          {
-            resultHandler.handleErrorResult(e);
-          }
+            @Override
+            public void handleResult(final AsynchronousConnection connection)
+            {
+              final ResultHandler<BindResult> innerHandler = new ResultHandler<BindResult>()
+              {
+
+                @Override
+                public final void handleErrorResult(
+                    final ErrorResultException error)
+                {
+                  connection.close();
+                  resultHandler.handleErrorResult(error);
+                }
+
+
+
+                @Override
+                public final void handleResult(final BindResult result)
+                {
+                  connection.close();
+                  proxiedAuthControl = ProxiedAuthV2RequestControl
+                      .newControl("dn:" + request.getName());
+                  resultHandler.handleResult(result);
+                }
+              };
+              connection.bind(request, innerHandler,
+                  intermediateResponseHandler);
+            }
+
+          };
+
+          proxiedAuthControl = null;
+          bindFactory.getAsynchronousConnection(outerHandler);
         }
       }
 
@@ -195,15 +316,23 @@
           final IntermediateResponseHandler intermediateResponseHandler)
           throws UnsupportedOperationException
       {
-        try
+        addProxiedAuthControl(request);
+        final ConnectionCompletionHandler<CompareResult> outerHandler =
+          new ConnectionCompletionHandler<CompareResult>(resultHandler)
         {
-          getConnection().compare(request, resultHandler,
-              intermediateResponseHandler);
-        }
-        catch (ErrorResultException e)
-        {
-          resultHandler.handleErrorResult(e);
-        }
+
+          @Override
+          public void handleResult(final AsynchronousConnection connection)
+          {
+            final RequestCompletionHandler<CompareResult> innerHandler =
+              new RequestCompletionHandler<CompareResult>(connection, resultHandler);
+            connection.compare(request, innerHandler,
+                intermediateResponseHandler);
+          }
+
+        };
+
+        factory.getAsynchronousConnection(outerHandler);
       }
 
 
@@ -215,8 +344,7 @@
       public void handleConnectionClosed(final Integer requestContext,
           final UnbindRequest request)
       {
-        // Client connection closed: release the proxy connection.
-        close();
+        // Client connection closed.
       }
 
 
@@ -228,8 +356,7 @@
       public void handleConnectionDisconnected(final ResultCode resultCode,
           final String message)
       {
-        // Client disconnected by server: release the proxy connection.
-        close();
+        // Client disconnected by server.
       }
 
 
@@ -240,8 +367,7 @@
       @Override
       public void handleConnectionError(final Throwable error)
       {
-        // Client connection failed: release the proxy connection.
-        close();
+        // Client connection failed.
       }
 
 
@@ -256,15 +382,23 @@
           final IntermediateResponseHandler intermediateResponseHandler)
           throws UnsupportedOperationException
       {
-        try
+        addProxiedAuthControl(request);
+        final ConnectionCompletionHandler<Result> outerHandler =
+          new ConnectionCompletionHandler<Result>(resultHandler)
         {
-          getConnection().delete(request, resultHandler,
-              intermediateResponseHandler);
-        }
-        catch (ErrorResultException e)
-        {
-          resultHandler.handleErrorResult(e);
-        }
+
+          @Override
+          public void handleResult(final AsynchronousConnection connection)
+          {
+            final RequestCompletionHandler<Result> innerHandler =
+              new RequestCompletionHandler<Result>(connection, resultHandler);
+            connection.delete(request, innerHandler,
+                intermediateResponseHandler);
+          }
+
+        };
+
+        factory.getAsynchronousConnection(outerHandler);
       }
 
 
@@ -296,15 +430,24 @@
         else
         {
           // Forward all other extended operations.
-          try
+          addProxiedAuthControl(request);
+
+          final ConnectionCompletionHandler<R> outerHandler =
+            new ConnectionCompletionHandler<R>(resultHandler)
           {
-            getConnection().extendedRequest(request, resultHandler,
-                intermediateResponseHandler);
-          }
-          catch (ErrorResultException e)
-          {
-            resultHandler.handleErrorResult(e);
-          }
+
+            @Override
+            public void handleResult(final AsynchronousConnection connection)
+            {
+              final RequestCompletionHandler<R> innerHandler =
+                new RequestCompletionHandler<R>(connection, resultHandler);
+              connection.extendedRequest(request, innerHandler,
+                  intermediateResponseHandler);
+            }
+
+          };
+
+          factory.getAsynchronousConnection(outerHandler);
         }
       }
 
@@ -320,15 +463,23 @@
           final IntermediateResponseHandler intermediateResponseHandler)
           throws UnsupportedOperationException
       {
-        try
+        addProxiedAuthControl(request);
+        final ConnectionCompletionHandler<Result> outerHandler =
+          new ConnectionCompletionHandler<Result>(resultHandler)
         {
-          getConnection().modify(request, resultHandler,
-              intermediateResponseHandler);
-        }
-        catch (ErrorResultException e)
-        {
-          resultHandler.handleErrorResult(e);
-        }
+
+          @Override
+          public void handleResult(final AsynchronousConnection connection)
+          {
+            final RequestCompletionHandler<Result> innerHandler =
+              new RequestCompletionHandler<Result>(connection, resultHandler);
+            connection.modify(request, innerHandler,
+                intermediateResponseHandler);
+          }
+
+        };
+
+        factory.getAsynchronousConnection(outerHandler);
       }
 
 
@@ -343,15 +494,23 @@
           final IntermediateResponseHandler intermediateResponseHandler)
           throws UnsupportedOperationException
       {
-        try
+        addProxiedAuthControl(request);
+        final ConnectionCompletionHandler<Result> outerHandler =
+          new ConnectionCompletionHandler<Result>(resultHandler)
         {
-          getConnection().modifyDN(request, resultHandler,
-              intermediateResponseHandler);
-        }
-        catch (ErrorResultException e)
-        {
-          resultHandler.handleErrorResult(e);
-        }
+
+          @Override
+          public void handleResult(final AsynchronousConnection connection)
+          {
+            final RequestCompletionHandler<Result> innerHandler =
+              new RequestCompletionHandler<Result>(connection, resultHandler);
+            connection.modifyDN(request, innerHandler,
+                intermediateResponseHandler);
+          }
+
+        };
+
+        factory.getAsynchronousConnection(outerHandler);
       }
 
 
@@ -365,54 +524,33 @@
           final IntermediateResponseHandler intermediateResponseHandler)
           throws UnsupportedOperationException
       {
-        try
+        addProxiedAuthControl(request);
+        final ConnectionCompletionHandler<Result> outerHandler =
+          new ConnectionCompletionHandler<Result>(resultHandler)
         {
-          getConnection().search(request, resultHandler,
-              intermediateResponseHandler);
-        }
-        catch (ErrorResultException e)
-        {
-          resultHandler.handleErrorResult(e);
-        }
+
+          @Override
+          public void handleResult(final AsynchronousConnection connection)
+          {
+            final SearchRequestCompletionHandler innerHandler =
+              new SearchRequestCompletionHandler(connection, resultHandler);
+            connection.search(request, innerHandler,
+                intermediateResponseHandler);
+          }
+
+        };
+
+        factory.getAsynchronousConnection(outerHandler);
       }
 
 
 
-      private void close()
+      private void addProxiedAuthControl(final Request request)
       {
-        if (isUnbindRequired)
+        final ProxiedAuthV2RequestControl control = proxiedAuthControl;
+        if (control != null)
         {
-          synchronized (this)
-          {
-            if (connection != null)
-            {
-              connection.bind(Requests.newSimpleBindRequest(),
-                  new ResultHandler<Result>()
-                  {
-
-                    public void handleErrorResult(ErrorResultException error)
-                    {
-                      // The rebind failed - this is bad because if the
-                      // connection is pooled it will remain authenticated as
-                      // the wrong user.
-                      handleResult(error.getResult());
-                    }
-
-
-
-                    public void handleResult(Result result)
-                    {
-                      synchronized (ServerConnectionImpl.this)
-                      {
-                        if (connection != null)
-                        {
-                          connection.close();
-                        }
-                      }
-                    }
-                  });
-            }
-          }
+          request.addControl(control);
         }
       }
 
@@ -421,12 +559,15 @@
 
 
     private final ConnectionFactory factory;
+    private final ConnectionFactory bindFactory;
 
 
 
-    private Proxy(final ConnectionFactory factory)
+    private Proxy(final ConnectionFactory factory,
+        final ConnectionFactory bindFactory)
     {
       this.factory = factory;
+      this.bindFactory = bindFactory;
     }
 
 
@@ -467,18 +608,25 @@
 
     // Create load balancer.
     final List<ConnectionFactory> factories = new LinkedList<ConnectionFactory>();
+    final List<ConnectionFactory> bindFactories = new LinkedList<ConnectionFactory>();
     for (int i = 2; i < args.length; i += 2)
     {
       final String remoteAddress = args[i];
       final int remotePort = Integer.parseInt(args[i + 1]);
 
-      // factories.add(Connections.newConnectionPool(new LDAPConnectionFactory(
-      //    remoteAddress, remotePort), Integer.MAX_VALUE));
-      factories.add(new LDAPConnectionFactory(remoteAddress, remotePort));
+      factories.add(Connections.newConnectionPool(new LDAPConnectionFactory(
+          remoteAddress, remotePort), Integer.MAX_VALUE));
+      bindFactories.add(Connections.newConnectionPool(
+          new LDAPConnectionFactory(remoteAddress, remotePort),
+          Integer.MAX_VALUE));
     }
     final RoundRobinLoadBalancingAlgorithm algorithm = new RoundRobinLoadBalancingAlgorithm(
         factories);
+    final RoundRobinLoadBalancingAlgorithm bindAlgorithm = new RoundRobinLoadBalancingAlgorithm(
+        bindFactories);
     final ConnectionFactory factory = Connections.newLoadBalancer(algorithm);
+    final ConnectionFactory bindFactory = Connections
+        .newLoadBalancer(bindAlgorithm);
 
     // Create listener.
     final LDAPListenerOptions options = new LDAPListenerOptions()
@@ -486,8 +634,8 @@
     LDAPListener listener = null;
     try
     {
-      listener = new LDAPListener(localAddress, localPort, new Proxy(factory),
-          options);
+      listener = new LDAPListener(localAddress, localPort, new Proxy(factory,
+          bindFactory), options);
       System.out.println("Press any key to stop the server...");
       System.in.read();
     }
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java b/opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java
index 42c7f8c..47f6d88 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java
@@ -173,10 +173,10 @@
       if (!isOperational.get()
           && (pendingConnectFuture == null || pendingConnectFuture.isDone()))
       {
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINEST))
+        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
         {
-          StaticUtils.DEBUG_LOG.finest(String
-              .format("Attempting connect on factory " + this));
+          StaticUtils.DEBUG_LOG.fine(String
+              .format("Attempting reconnect to offline factory " + this));
         }
         pendingConnectFuture = factory.getAsynchronousConnection(this);
       }
@@ -189,23 +189,29 @@
       if (isOperational.getAndSet(false))
       {
         // Transition from online to offline.
+        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
+        {
+          StaticUtils.DEBUG_LOG.warning(String.format("Connection factory "
+              + factory + " is no longer operational: " + error.getMessage()));
+        }
+
         synchronized (stateLock)
         {
           offlineFactoriesCount++;
           if (offlineFactoriesCount == 1)
           {
             // Enable monitoring.
+            if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+            {
+              StaticUtils.DEBUG_LOG.fine(String
+                  .format("Starting monitoring thread"));
+            }
+
             monitoringFuture = scheduler.scheduleWithFixedDelay(
                 new MonitorRunnable(), 0, monitoringInterval,
                 monitoringIntervalTimeUnit);
           }
         }
-
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-        {
-          StaticUtils.DEBUG_LOG.fine(String.format("Connection factory "
-              + factory + " is no longer operational: " + error.getMessage()));
-        }
       }
     }
 
@@ -216,21 +222,27 @@
       if (!isOperational.getAndSet(true))
       {
         // Transition from offline to online.
+        if (StaticUtils.DEBUG_LOG.isLoggable(Level.INFO))
+        {
+          StaticUtils.DEBUG_LOG.info(String.format("Connection factory "
+              + factory + " is now operational"));
+        }
+
         synchronized (stateLock)
         {
           offlineFactoriesCount--;
           if (offlineFactoriesCount == 0)
           {
+            if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+            {
+              StaticUtils.DEBUG_LOG.fine(String
+                  .format("Stopping monitoring thread"));
+            }
+
             monitoringFuture.cancel(false);
             monitoringFuture = null;
           }
         }
-
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-        {
-          StaticUtils.DEBUG_LOG.fine(String.format("Connection factory "
-              + factory + " is now operational"));
-        }
       }
     }
   }
@@ -278,14 +290,14 @@
 
   /**
    * Creates a new abstract load balancing algorithm which will monitor offline
-   * connection factories every 10 seconds using the default scheduler.
+   * connection factories every second using the default scheduler.
    *
    * @param factories
    *          The connection factories.
    */
   AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories)
   {
-    this(factories, 10, TimeUnit.SECONDS, StaticUtils.getDefaultScheduler());
+    this(factories, 1, TimeUnit.SECONDS, StaticUtils.getDefaultScheduler());
   }
 
 
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java b/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java
index a7b5f75..e9497da 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -30,8 +30,11 @@
 
 
 import java.util.Collection;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
 
 import org.opends.sdk.requests.*;
@@ -41,6 +44,7 @@
 import com.sun.opends.sdk.util.AsynchronousFutureResult;
 import com.sun.opends.sdk.util.CompletedFutureResult;
 import com.sun.opends.sdk.util.StaticUtils;
+import com.sun.opends.sdk.util.Validator;
 
 
 
@@ -49,36 +53,101 @@
  */
 final class ConnectionPool extends AbstractConnectionFactory
 {
-  // Future used for waiting for pooled connections to become available.
-  private static final class FuturePooledConnection extends
-      AsynchronousFutureResult<AsynchronousConnection>
+
+  /**
+   * This result handler is invoked when an attempt to add a new connection to
+   * the pool completes.
+   */
+  private final class ConnectionResultHandler implements
+      ResultHandler<AsynchronousConnection>
   {
-    private FuturePooledConnection(
-        final ResultHandler<? super AsynchronousConnection> handler)
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void handleErrorResult(final ErrorResultException error)
     {
-      super(handler);
+      // Connection attempt failed, so decrease the pool size.
+      currentPoolSize.release();
+
+      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+      {
+        StaticUtils.DEBUG_LOG.fine(String.format(
+            "Connection attempt failed: " + error.getMessage()
+                + " currentPoolSize=%d, poolSize=%d",
+            poolSize - currentPoolSize.availablePermits(), poolSize));
+      }
+
+      QueueElement holder;
+      synchronized (queue)
+      {
+        if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
+        {
+          // No waiting futures.
+          return;
+        }
+        else
+        {
+          holder = queue.removeFirst();
+        }
+      }
+
+      // There was waiting future, so close it.
+      holder.getWaitingFuture().handleErrorResult(error);
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void handleResult(final AsynchronousConnection connection)
+    {
+      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+      {
+        StaticUtils.DEBUG_LOG.fine(String.format(
+            "Connection attempt succeeded: "
+                + " currentPoolSize=%d, poolSize=%d",
+                poolSize - currentPoolSize.availablePermits(), poolSize));
+      }
+
+      publishConnection(connection);
     }
   }
 
 
 
-  private final class PooledConnectionWapper implements AsynchronousConnection,
-      ConnectionEventListener
+  /**
+   * A pooled connection is passed to the client. It wraps an underlying
+   * "pooled" connection obtained from the underlying factory and lasts until
+   * the client application closes this connection. More specifically, pooled
+   * connections are not actually stored in the internal queue.
+   */
+  private final class PooledConnection implements AsynchronousConnection
   {
+    // Connection event listeners registed against this pooled connection should
+    // have the same life time as the pooled connection.
+    private final List<ConnectionEventListener> listeners =
+      new CopyOnWriteArrayList<ConnectionEventListener>();
+
     private final AsynchronousConnection connection;
 
-    private volatile boolean isClosed;
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
 
 
-    private PooledConnectionWapper(final AsynchronousConnection connection)
+    PooledConnection(final AsynchronousConnection connection)
     {
       this.connection = connection;
-      this.connection.addConnectionEventListener(this);
     }
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Void> abandon(final AbandonRequest request)
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
@@ -92,6 +161,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> add(final AddRequest request,
         final ResultHandler<? super Result> handler)
         throws UnsupportedOperationException, IllegalStateException,
@@ -106,6 +179,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> add(final AddRequest request,
         final ResultHandler<? super Result> resultHandler,
         final IntermediateResponseHandler intermediateResponseHandler)
@@ -122,18 +199,28 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public void addConnectionEventListener(
         final ConnectionEventListener listener) throws IllegalStateException,
         NullPointerException
     {
+      Validator.ensureNotNull(listener);
       if (isClosed())
       {
         throw new IllegalStateException();
       }
+      listeners.add(listener);
     }
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<BindResult> bind(final BindRequest request,
         final ResultHandler<? super BindResult> handler)
         throws UnsupportedOperationException, IllegalStateException,
@@ -148,6 +235,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<BindResult> bind(final BindRequest request,
         final ResultHandler<? super BindResult> resultHandler,
         final IntermediateResponseHandler intermediateResponseHandler)
@@ -164,47 +255,51 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public void close()
     {
-      synchronized (pool)
+      if (!isClosed.compareAndSet(false, true))
       {
-        if (isClosed)
-        {
-          return;
-        }
-        isClosed = true;
+        // Already closed.
+        return;
+      }
 
-        // Don't put invalid connections back in the pool.
-        if (connection.isValid())
+      // Don't put invalid connections back in the pool.
+      if (connection.isValid())
+      {
+        publishConnection(connection);
+      }
+      else
+      {
+        // The connection may have been disconnected by the remote server, but
+        // the server may still be available. In order to avoid leaving pending
+        // futures hanging indefinitely, we should try to reconnect immediately.
+
+        // Close the dead connection.
+        connection.close();
+
+        // Try to get a new connection to replace it.
+        factory.getAsynchronousConnection(connectionResultHandler);
+
+        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
         {
-          releaseConnection(connection);
-          return;
+          StaticUtils.DEBUG_LOG.warning(String.format(
+              "Connection no longer valid. "
+                  + "currentPoolSize=%d, poolSize=%d",
+                  poolSize - currentPoolSize.availablePermits(), poolSize));
         }
       }
-
-      // Connection is no longer valid. Close outside of lock
-      connection.removeConnectionEventListener(this);
-      connection.close();
-      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
-      {
-        StaticUtils.DEBUG_LOG.warning(String.format(
-            "Dead connection released and closed. "
-                + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d",
-            numConnections, pool.size(), pendingFutures.size()));
-      }
-
-      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
-      {
-        StaticUtils.DEBUG_LOG.warning(String.format(
-            "Reconnect attempt starting. "
-                + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d",
-            numConnections, pool.size(), pendingFutures.size()));
-      }
-      connectionFactory.getAsynchronousConnection(new ReconnectHandler());
     }
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public void close(final UnbindRequest request, final String reason)
         throws NullPointerException
     {
@@ -213,6 +308,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<CompareResult> compare(final CompareRequest request,
         final ResultHandler<? super CompareResult> handler)
         throws UnsupportedOperationException, IllegalStateException,
@@ -227,6 +326,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<CompareResult> compare(final CompareRequest request,
         final ResultHandler<? super CompareResult> resultHandler,
         final IntermediateResponseHandler intermediateResponseHandler)
@@ -243,48 +346,10 @@
 
 
 
-    public void handleConnectionClosed()
-    {
-      // Ignore - we intercept close via the close method.
-    }
-
-
-
-    public void handleConnectionError(final boolean isDisconnectNotification,
-        final ErrorResultException error)
-    {
-      // Remove this connection from the pool if its in there. If not,
-      // just ignore and wait for the user to close and we can deal with it
-      // there.
-      if (pool.remove(this))
-      {
-        numConnections--;
-        connection.removeConnectionEventListener(this);
-
-        // FIXME: should still close the connection, but we need to be
-        // careful that users of the pooled connection get a sensible
-        // error if they continue to use it (i.e. not an NPE or ISE).
-
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
-        {
-          StaticUtils.DEBUG_LOG.warning(String.format(
-              "Connection error occured and removed from pool: "
-                  + error.getMessage()
-                  + " numConnections: %d, poolSize: %d, pendingFutures: %d",
-              numConnections, pool.size(), pendingFutures.size()));
-        }
-      }
-    }
-
-
-
-    public void handleUnsolicitedNotification(final ExtendedResult notification)
-    {
-      // Ignore
-    }
-
-
-
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> delete(final DeleteRequest request,
         final ResultHandler<? super Result> handler)
         throws UnsupportedOperationException, IllegalStateException,
@@ -299,6 +364,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> delete(final DeleteRequest request,
         final ResultHandler<? super Result> resultHandler,
         final IntermediateResponseHandler intermediateResponseHandler)
@@ -315,6 +384,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public <R extends ExtendedResult> FutureResult<R> extendedRequest(
         final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
         throws UnsupportedOperationException, IllegalStateException,
@@ -329,6 +402,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public <R extends ExtendedResult> FutureResult<R> extendedRequest(
         final ExtendedRequest<R> request,
         final ResultHandler<? super R> resultHandler,
@@ -349,6 +426,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public Connection getSynchronousConnection()
     {
       return new SynchronousConnection(this);
@@ -359,20 +437,29 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public boolean isClosed()
     {
-      return isClosed;
+      return isClosed.get();
     }
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public boolean isValid()
     {
-      return !isClosed && connection.isValid();
+      return connection.isValid() && !isClosed();
     }
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> modify(final ModifyRequest request,
         final ResultHandler<? super Result> handler)
         throws UnsupportedOperationException, IllegalStateException,
@@ -387,6 +474,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> modify(final ModifyRequest request,
         final ResultHandler<? super Result> resultHandler,
         final IntermediateResponseHandler intermediateResponseHandler)
@@ -403,6 +494,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> modifyDN(final ModifyDNRequest request,
         final ResultHandler<? super Result> handler)
         throws UnsupportedOperationException, IllegalStateException,
@@ -417,6 +512,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> modifyDN(final ModifyDNRequest request,
         final ResultHandler<? super Result> resultHandler,
         final IntermediateResponseHandler intermediateResponseHandler)
@@ -436,6 +535,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public FutureResult<SearchResultEntry> readEntry(final DN name,
         final Collection<String> attributeDescriptions,
         final ResultHandler<? super SearchResultEntry> resultHandler)
@@ -454,6 +554,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public FutureResult<RootDSE> readRootDSE(
         final ResultHandler<? super RootDSE> handler)
         throws UnsupportedOperationException, IllegalStateException
@@ -470,6 +571,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public FutureResult<Schema> readSchema(final DN name,
         final ResultHandler<? super Schema> handler)
         throws UnsupportedOperationException, IllegalStateException
@@ -486,6 +588,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public FutureResult<Schema> readSchemaForEntry(final DN name,
         final ResultHandler<? super Schema> handler)
         throws UnsupportedOperationException, IllegalStateException
@@ -499,17 +602,27 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public void removeConnectionEventListener(
         final ConnectionEventListener listener) throws NullPointerException
     {
+      Validator.ensureNotNull(listener);
       if (isClosed())
       {
         throw new IllegalStateException();
       }
+      listeners.remove(listener);
     }
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> search(final SearchRequest request,
         final SearchResultHandler handler)
         throws UnsupportedOperationException, IllegalStateException,
@@ -524,6 +637,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public FutureResult<Result> search(final SearchRequest request,
         final SearchResultHandler resultHandler,
         final IntermediateResponseHandler intermediateResponseHandler)
@@ -543,6 +660,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public FutureResult<SearchResultEntry> searchSingleEntry(
         final SearchRequest request,
         final ResultHandler<? super SearchResultEntry> resultHandler)
@@ -561,9 +679,10 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public String toString()
     {
-      StringBuilder builder = new StringBuilder();
+      final StringBuilder builder = new StringBuilder();
       builder.append("PooledConnection(");
       builder.append(connection);
       builder.append(')');
@@ -573,63 +692,86 @@
 
 
 
-  private class ReconnectHandler implements
-      ResultHandler<AsynchronousConnection>
+  /**
+   * A queue element is either a pending connection request future awaiting an
+   * {@code AsynchronousConnection} or it is an unused
+   * {@code AsynchronousConnection} awaiting a connection request.
+   */
+  private static final class QueueElement
   {
-    public void handleErrorResult(final ErrorResultException error)
-    {
-      // The reconnect failed. Fail the connect attempt.
-      numConnections--;
-      // The reconnect failed. The underlying connection factory
-      // probably went
-      // down. Just fail all pending futures
-      synchronized (pool)
-      {
-        while (!pendingFutures.isEmpty())
-        {
-          pendingFutures.poll().handleErrorResult(error);
-        }
-      }
-      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
-      {
-        StaticUtils.DEBUG_LOG.warning(String.format(
-            "Reconnect failed. Failed all pending futures: "
-                + error.getMessage()
-                + " numConnections: %d, poolSize: %d, pendingFutures: %d",
-            numConnections, pool.size(), pendingFutures.size()));
-      }
+    private final Object value;
 
+
+
+    QueueElement(final AsynchronousConnection connection)
+    {
+      this.value = connection;
     }
 
 
 
-    public void handleResult(final AsynchronousConnection connection)
+    QueueElement(final ResultHandler<? super AsynchronousConnection> handler)
     {
-      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+      this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler);
+    }
+
+
+
+    AsynchronousConnection getWaitingConnection()
+    {
+      if (value instanceof AsynchronousConnection)
       {
-        StaticUtils.DEBUG_LOG.finest(String.format("Reconnect succeeded. "
-            + " numConnections: %d, poolSize: %d, pendingFutures: %d",
-            numConnections, pool.size(), pendingFutures.size()));
+        return (AsynchronousConnection) value;
       }
-      synchronized (pool)
+      else
       {
-        releaseConnection(connection);
+        throw new IllegalStateException();
       }
     }
+
+
+
+    @SuppressWarnings("unchecked")
+    AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture()
+    {
+      if (value instanceof AsynchronousFutureResult)
+      {
+        return (AsynchronousFutureResult<AsynchronousConnection>) value;
+      }
+      else
+      {
+        throw new IllegalStateException();
+      }
+    }
+
+
+
+    boolean isWaitingFuture()
+    {
+      return value instanceof AsynchronousFutureResult;
+    }
+
+
+
+    public String toString()
+    {
+      return String.valueOf(value);
+    }
   }
 
 
 
-  private final ConnectionFactory connectionFactory;
+  // Guarded by queue.
+  private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
 
-  private volatile int numConnections;
+  private final ConnectionFactory factory;
 
   private final int poolSize;
 
-  // FIXME: should use a better collection than this - CLQ?
-  private final Queue<AsynchronousConnection> pool;
+  private final Semaphore currentPoolSize;
 
-  private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
+  private final ResultHandler<AsynchronousConnection> connectionResultHandler =
+    new ConnectionResultHandler();
 
 
 
@@ -637,109 +779,16 @@
    * Creates a new connection pool which will maintain {@code poolSize}
    * connections created using the provided connection factory.
    *
-   * @param connectionFactory
+   * @param factory
    *          The connection factory to use for creating new connections.
    * @param poolSize
    *          The maximum size of the connection pool.
    */
-  ConnectionPool(final ConnectionFactory connectionFactory, final int poolSize)
+  ConnectionPool(final ConnectionFactory factory, final int poolSize)
   {
-    this.connectionFactory = connectionFactory;
+    this.factory = factory;
     this.poolSize = poolSize;
-    this.pool = new ConcurrentLinkedQueue<AsynchronousConnection>();
-    this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>();
-  }
-
-
-
-  @Override
-  public synchronized FutureResult<AsynchronousConnection> getAsynchronousConnection(
-      final ResultHandler<? super AsynchronousConnection> handler)
-  {
-    // This entire method is synchronized to ensure new connects are
-    // done synchronously to avoid the "pending connect" case.
-    AsynchronousConnection conn;
-    synchronized (pool)
-    {
-      // Check to see if we have a connection in the pool
-      conn = pool.poll();
-      if (conn == null)
-      {
-        // Pool was empty. Maybe a new connection if pool size is not
-        // reached
-        if (numConnections >= poolSize)
-        {
-          // We reached max # of conns so wait for a connection to
-          // become available.
-          final FuturePooledConnection future = new FuturePooledConnection(
-              handler);
-          pendingFutures.add(future);
-
-          if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-          {
-            StaticUtils.DEBUG_LOG.finest(String.format(
-                "No connections available. Wait-listed"
-                    + "numConnections: %d, poolSize: %d, pendingFutures: %d",
-                numConnections, pool.size(), pendingFutures.size()));
-          }
-
-          return future;
-        }
-      }
-    }
-
-    if (conn == null)
-    {
-      try
-      {
-        // We can create a new connection.
-        conn = connectionFactory.getAsynchronousConnection(null).get();
-        numConnections++;
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-        {
-          StaticUtils.DEBUG_LOG.finest(String.format(
-              "New connection established and aquired. "
-                  + "numConnections: %d, poolSize: %d, pendingFutures: %d",
-              numConnections, pool.size(), pendingFutures.size()));
-        }
-      }
-      catch (final ErrorResultException e)
-      {
-        if (handler != null)
-        {
-          handler.handleErrorResult(e);
-        }
-        return new CompletedFutureResult<AsynchronousConnection>(e);
-      }
-      catch (final InterruptedException e)
-      {
-        final ErrorResultException error = new ErrorResultException(Responses
-            .newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e));
-        if (handler != null)
-        {
-          handler.handleErrorResult(error);
-        }
-        return new CompletedFutureResult<AsynchronousConnection>(error);
-      }
-    }
-    else
-    {
-      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-      {
-        StaticUtils.DEBUG_LOG.finest(String.format(
-            "Connection aquired from pool. "
-                + "numConnections: %d, poolSize: %d, pendingFutures: %d",
-            numConnections, pool.size(), pendingFutures.size()));
-      }
-    }
-
-    final PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
-        conn);
-    if (handler != null)
-    {
-      handler.handleResult(pooledConnection);
-    }
-    return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
+    this.currentPoolSize = new Semaphore(poolSize);
   }
 
 
@@ -747,11 +796,59 @@
   /**
    * {@inheritDoc}
    */
+  @Override
+  public FutureResult<AsynchronousConnection> getAsynchronousConnection(
+      final ResultHandler<? super AsynchronousConnection> handler)
+  {
+    QueueElement holder;
+    synchronized (queue)
+    {
+      if (queue.isEmpty() || queue.getFirst().isWaitingFuture())
+      {
+        holder = new QueueElement(handler);
+        queue.add(holder);
+      }
+      else
+      {
+        holder = queue.removeFirst();
+      }
+    }
+
+    if (!holder.isWaitingFuture())
+    {
+      // There was a completed connection attempt.
+      final AsynchronousConnection connection = holder.getWaitingConnection();
+      final PooledConnection pooledConnection = new PooledConnection(connection);
+      if (handler != null)
+      {
+        handler.handleResult(pooledConnection);
+      }
+      return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
+    }
+    else
+    {
+      // Grow the pool if needed.
+      final FutureResult<AsynchronousConnection> future = holder
+          .getWaitingFuture();
+      if (!future.isDone() && currentPoolSize.tryAcquire())
+      {
+        factory.getAsynchronousConnection(connectionResultHandler);
+      }
+      return future;
+    }
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
   public String toString()
   {
     final StringBuilder builder = new StringBuilder();
     builder.append("ConnectionPool(");
-    builder.append(String.valueOf(connectionFactory));
+    builder.append(String.valueOf(factory));
     builder.append(',');
     builder.append(poolSize);
     builder.append(')');
@@ -760,47 +857,25 @@
 
 
 
-  private void releaseConnection(final AsynchronousConnection connection)
+  private void publishConnection(final AsynchronousConnection connection)
   {
-    // See if there waiters pending.
-    for (;;)
+    QueueElement holder;
+    synchronized (queue)
     {
-      final PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
-          connection);
-      final FuturePooledConnection future = pendingFutures.poll();
-
-      if (future == null)
+      if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
       {
-        // No waiters - so drop out and add connection to pool.
-        break;
-      }
-
-      future.handleResult(pooledConnection);
-
-      if (!future.isCancelled())
-      {
-        // The future was not cancelled and the connection was
-        // accepted.
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-        {
-          StaticUtils.DEBUG_LOG.finest(String.format(
-              "Connection released and directly "
-                  + "given to waiter. numConnections: %d, poolSize: %d, "
-                  + "pendingFutures: %d", numConnections, pool.size(),
-              pendingFutures.size()));
-        }
+        holder = new QueueElement(connection);
+        queue.add(holder);
         return;
       }
+      else
+      {
+        holder = queue.removeFirst();
+      }
     }
 
-    // No waiters. Put back in pool.
-    pool.offer(connection);
-    if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-    {
-      StaticUtils.DEBUG_LOG.finest(String.format(
-          "Connection released to pool. numConnections: %d, "
-              + "poolSize: %d, pendingFutures: %d", numConnections,
-          pool.size(), pendingFutures.size()));
-    }
+    // There was waiting future, so close it.
+    final PooledConnection pooledConnection = new PooledConnection(connection);
+    holder.getWaitingFuture().handleResult(pooledConnection);
   }
 }
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java b/opendj-sdk/sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java
index 0fed20d..11f21d4 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java
@@ -70,7 +70,7 @@
 
   /**
    * Creates a new fail-over load balancing algorithm which will monitor offline
-   * connection factories every 10 seconds using the default scheduler.
+   * connection factories every 1 second using the default scheduler.
    *
    * @param factories
    *          The ordered collection of connection factories.
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/RoundRobinLoadBalancingAlgorithm.java b/opendj-sdk/sdk/src/org/opends/sdk/RoundRobinLoadBalancingAlgorithm.java
index 2968565..24c8559 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/RoundRobinLoadBalancingAlgorithm.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/RoundRobinLoadBalancingAlgorithm.java
@@ -69,7 +69,7 @@
 
   /**
    * Creates a new round robin load balancing algorithm which will monitor
-   * offline connection factories every 10 seconds using the default scheduler.
+   * offline connection factories every 1 second using the default scheduler.
    *
    * @param factories
    *          The ordered collection of connection factories.
diff --git a/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/ConnectionFactoryTestCase.java b/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/ConnectionFactoryTestCase.java
index e6195c8..e89e534 100644
--- a/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/ConnectionFactoryTestCase.java
+++ b/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/ConnectionFactoryTestCase.java
@@ -36,14 +36,18 @@
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
 
 import org.opends.sdk.requests.DigestMD5SASLBindRequest;
 import org.opends.sdk.requests.Requests;
 import org.opends.sdk.requests.SearchRequest;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 import org.testng.annotations.DataProvider;
 
+import com.sun.opends.sdk.util.StaticUtils;
+
 import javax.net.ssl.SSLContext;
 
 
@@ -101,8 +105,30 @@
 
 
 
+  /**
+   * Disables logging before the tests.
+   */
+  @BeforeClass()
+  public void disableLogging()
+  {
+    StaticUtils.DEBUG_LOG.setLevel(Level.SEVERE);
+  }
+
+
+
+  /**
+   * Re-enable logging after the tests.
+   */
+  @AfterClass()
+  public void enableLogging()
+  {
+    StaticUtils.DEBUG_LOG.setLevel(Level.INFO);
+  }
+
+
+
   @DataProvider(name = "connectionFactories")
-  public Object[][] getConnectyionFactories() throws Exception
+  public Object[][] getConnectionFactories() throws Exception
   {
     Object[][] factories = new Object[21][1];
 
diff --git a/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/LDAPListenerTestCase.java b/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/LDAPListenerTestCase.java
index 2b58e3b..1cefb8b 100644
--- a/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/LDAPListenerTestCase.java
+++ b/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/LDAPListenerTestCase.java
@@ -32,13 +32,17 @@
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
 
 import org.opends.sdk.requests.*;
 import org.opends.sdk.responses.*;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.sun.opends.sdk.util.StaticUtils;
+
 
 
 /**
@@ -55,16 +59,18 @@
 
 
 
-    public void handleUnsolicitedNotification(ExtendedResult notification)
+    @Override
+    public void handleConnectionClosed()
     {
-      errorMessage = "Unexpected call to handleUnsolicitedNotification";
+      errorMessage = "Unexpected call to handleConnectionClosed";
       closeLatch.countDown();
     }
 
 
 
-    public void handleConnectionError(boolean isDisconnectNotification,
-        ErrorResultException error)
+    @Override
+    public void handleConnectionError(final boolean isDisconnectNotification,
+        final ErrorResultException error)
     {
       errorMessage = "Unexpected call to handleConnectionError";
       closeLatch.countDown();
@@ -72,9 +78,10 @@
 
 
 
-    public void handleConnectionClosed()
+    @Override
+    public void handleUnsolicitedNotification(final ExtendedResult notification)
     {
-      errorMessage = "Unexpected call to handleConnectionClosed";
+      errorMessage = "Unexpected call to handleUnsolicitedNotification";
       closeLatch.countDown();
     }
   }
@@ -85,7 +92,7 @@
       ServerConnection<Integer>
   {
     volatile LDAPClientContext context = null;
-    volatile boolean isConnected = false;
+    final CountDownLatch isConnected = new CountDownLatch(1);
     final CountDownLatch isClosed = new CountDownLatch(1);
 
 
@@ -294,7 +301,7 @@
         final LDAPClientContext clientContext) throws ErrorResultException
     {
       serverConnection.context = clientContext;
-      serverConnection.isConnected = true;
+      serverConnection.isConnected.countDown();
       return serverConnection;
     }
   }
@@ -302,6 +309,233 @@
 
 
   /**
+   * Disables logging before the tests.
+   */
+  @BeforeClass()
+  public void disableLogging()
+  {
+    StaticUtils.DEBUG_LOG.setLevel(Level.SEVERE);
+  }
+
+
+
+  /**
+   * Re-enable logging after the tests.
+   */
+  @AfterClass()
+  public void enableLogging()
+  {
+    StaticUtils.DEBUG_LOG.setLevel(Level.INFO);
+  }
+
+
+
+  /**
+   * Tests connection event listener.
+   *
+   * @throws Exception
+   *           If an unexpected error occurred.
+   */
+  @Test
+  public void testConnectionEventListenerClose() throws Exception
+  {
+    final MockServerConnection onlineServerConnection = new MockServerConnection();
+    final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
+        onlineServerConnection);
+    final LDAPListener onlineServerListener = new LDAPListener("localhost",
+        TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
+
+    final Connection connection;
+    try
+    {
+      // Connect and bind.
+      connection = new LDAPConnectionFactory(
+          onlineServerListener.getSocketAddress()).getConnection();
+
+      final MockConnectionEventListener listener = new MockConnectionEventListener()
+      {
+
+        @Override
+        public void handleConnectionClosed()
+        {
+          closeLatch.countDown();
+        }
+      };
+
+      connection.addConnectionEventListener(listener);
+      Assert.assertEquals(listener.closeLatch.getCount(), 1);
+      connection.close();
+      listener.closeLatch.await();
+      Assert.assertNull(listener.errorMessage);
+    }
+    finally
+    {
+      onlineServerListener.close();
+    }
+  }
+
+
+
+  /**
+   * Tests connection event listener.
+   *
+   * @throws Exception
+   *           If an unexpected error occurred.
+   */
+  @Test
+  public void testConnectionEventListenerDisconnect() throws Exception
+  {
+    final MockServerConnection onlineServerConnection = new MockServerConnection();
+    final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
+        onlineServerConnection);
+    final LDAPListener onlineServerListener = new LDAPListener("localhost",
+        TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
+
+    final Connection connection;
+    try
+    {
+      // Connect and bind.
+      connection = new LDAPConnectionFactory(
+          onlineServerListener.getSocketAddress()).getConnection();
+
+      final MockConnectionEventListener listener = new MockConnectionEventListener()
+      {
+
+        @Override
+        public void handleConnectionError(
+            final boolean isDisconnectNotification,
+            final ErrorResultException error)
+        {
+          if (isDisconnectNotification)
+          {
+            errorMessage = "Unexpected disconnect notification";
+          }
+          closeLatch.countDown();
+        }
+      };
+
+      connection.addConnectionEventListener(listener);
+      Assert.assertEquals(listener.closeLatch.getCount(), 1);
+      Assert.assertTrue(onlineServerConnection.isConnected
+          .await(10, TimeUnit.SECONDS));
+      onlineServerConnection.context.disconnect();
+      listener.closeLatch.await();
+      Assert.assertNull(listener.errorMessage);
+      connection.close();
+    }
+    finally
+    {
+      onlineServerListener.close();
+    }
+  }
+
+
+
+  /**
+   * Tests connection event listener.
+   *
+   * @throws Exception
+   *           If an unexpected error occurred.
+   */
+  @Test
+  public void testConnectionEventListenerDisconnectNotification()
+      throws Exception
+  {
+    final MockServerConnection onlineServerConnection = new MockServerConnection();
+    final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
+        onlineServerConnection);
+    final LDAPListener onlineServerListener = new LDAPListener("localhost",
+        TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
+
+    final Connection connection;
+    try
+    {
+      // Connect and bind.
+      connection = new LDAPConnectionFactory(
+          onlineServerListener.getSocketAddress()).getConnection();
+
+      final MockConnectionEventListener listener = new MockConnectionEventListener()
+      {
+
+        @Override
+        public void handleConnectionError(
+            final boolean isDisconnectNotification,
+            final ErrorResultException error)
+        {
+          if (!isDisconnectNotification
+              || !error.getResult().getResultCode().equals(ResultCode.BUSY)
+              || !error.getResult().getDiagnosticMessage().equals("test"))
+          {
+            errorMessage = "Missing disconnect notification: " + error;
+          }
+          closeLatch.countDown();
+        }
+      };
+
+      connection.addConnectionEventListener(listener);
+      Assert.assertEquals(listener.closeLatch.getCount(), 1);
+      Assert.assertTrue(onlineServerConnection.isConnected
+          .await(10, TimeUnit.SECONDS));
+      onlineServerConnection.context.disconnect(ResultCode.BUSY, "test");
+      listener.closeLatch.await();
+      Assert.assertNull(listener.errorMessage);
+      connection.close();
+    }
+    finally
+    {
+      onlineServerListener.close();
+    }
+  }
+
+
+
+  /**
+   * Tests connection event listener.
+   *
+   * @throws Exception
+   *           If an unexpected error occurred.
+   */
+  @Test
+  public void testConnectionEventListenerUnbind() throws Exception
+  {
+    final MockServerConnection onlineServerConnection = new MockServerConnection();
+    final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
+        onlineServerConnection);
+    final LDAPListener onlineServerListener = new LDAPListener("localhost",
+        TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
+
+    final Connection connection;
+    try
+    {
+      // Connect and bind.
+      connection = new LDAPConnectionFactory(
+          onlineServerListener.getSocketAddress()).getConnection();
+
+      final MockConnectionEventListener listener = new MockConnectionEventListener()
+      {
+
+        @Override
+        public void handleConnectionClosed()
+        {
+          closeLatch.countDown();
+        }
+      };
+
+      connection.addConnectionEventListener(listener);
+      Assert.assertEquals(listener.closeLatch.getCount(), 1);
+      connection.close(Requests.newUnbindRequest(), "called from unit test");
+      listener.closeLatch.await();
+      Assert.assertNull(listener.errorMessage);
+    }
+    finally
+    {
+      onlineServerListener.close();
+    }
+  }
+
+
+
+  /**
    * Tests basic LDAP listener functionality.
    *
    * @throws Exception
@@ -318,10 +552,15 @@
     try
     {
       // Connect and close.
-      new LDAPConnectionFactory(listener.getSocketAddress()).getConnection()
-          .close();
+      final Connection connection = new LDAPConnectionFactory(
+          listener.getSocketAddress()).getConnection();
 
-      Assert.assertTrue(serverConnection.isConnected);
+      Assert.assertTrue(serverConnection.isConnected
+          .await(10, TimeUnit.SECONDS));
+      Assert.assertEquals(serverConnection.isClosed.getCount(), 1);
+
+      connection.close();
+
       Assert.assertTrue(serverConnection.isClosed.await(10, TimeUnit.SECONDS));
     }
     finally
@@ -401,14 +640,18 @@
       try
       {
         // Connect and close.
-        new LDAPConnectionFactory(proxyListener.getSocketAddress())
-            .getConnection().close();
+        final Connection connection = new LDAPConnectionFactory(
+            proxyListener.getSocketAddress()).getConnection();
+
+        Assert.assertTrue(proxyServerConnection.isConnected.await(10,
+            TimeUnit.SECONDS));
+        Assert.assertTrue(onlineServerConnection.isConnected.await(10,
+            TimeUnit.SECONDS));
 
         // Wait for connect/close to complete.
-        proxyServerConnection.isClosed.await();
+        connection.close();
 
-        Assert.assertTrue(proxyServerConnection.isConnected);
-        Assert.assertTrue(onlineServerConnection.isConnected);
+        proxyServerConnection.isClosed.await();
       }
       finally
       {
@@ -506,6 +749,11 @@
         try
         {
           connection.bind("cn=test", "password");
+
+          Assert.assertTrue(proxyServerConnection.isConnected.await(10,
+              TimeUnit.SECONDS));
+          Assert.assertTrue(onlineServerConnection.isConnected.await(10,
+              TimeUnit.SECONDS));
         }
         finally
         {
@@ -514,9 +762,6 @@
 
         // Wait for connect/close to complete.
         proxyServerConnection.isClosed.await();
-
-        Assert.assertTrue(proxyServerConnection.isConnected);
-        Assert.assertTrue(onlineServerConnection.isConnected);
       }
       finally
       {
@@ -602,14 +847,18 @@
       try
       {
         // Connect and close.
-        new LDAPConnectionFactory(proxyListener.getSocketAddress())
-            .getConnection().close();
+        final Connection connection = new LDAPConnectionFactory(
+            proxyListener.getSocketAddress()).getConnection();
+
+        Assert.assertTrue(proxyServerConnection.isConnected.await(10,
+            TimeUnit.SECONDS));
+        Assert.assertTrue(onlineServerConnection.isConnected.await(10,
+            TimeUnit.SECONDS));
+
+        connection.close();
 
         // Wait for connect/close to complete.
         proxyServerConnection.isClosed.await();
-
-        Assert.assertTrue(proxyServerConnection.isConnected);
-        Assert.assertTrue(onlineServerConnection.isConnected);
       }
       finally
       {
@@ -713,6 +962,11 @@
         try
         {
           connection.bind("cn=test", "password");
+
+          Assert.assertTrue(proxyServerConnection.isConnected.await(10,
+              TimeUnit.SECONDS));
+          Assert.assertTrue(onlineServerConnection.isConnected.await(10,
+              TimeUnit.SECONDS));
         }
         finally
         {
@@ -721,9 +975,6 @@
 
         // Wait for connect/close to complete.
         proxyServerConnection.isClosed.await();
-
-        Assert.assertTrue(proxyServerConnection.isConnected);
-        Assert.assertTrue(onlineServerConnection.isConnected);
       }
       finally
       {
@@ -763,7 +1014,7 @@
       {
         connection.bind("cn=test", "password");
       }
-      catch (ErrorResultException e)
+      catch (final ErrorResultException e)
       {
         connection.close();
         throw e;
@@ -785,7 +1036,7 @@
       Assert
           .fail("Connection attempt to closed listener succeeded unexpectedly");
     }
-    catch (ConnectionException e)
+    catch (final ConnectionException e)
     {
       // Expected.
     }
@@ -795,7 +1046,7 @@
       connection.bind("cn=test", "password");
       Assert.fail("Bind attempt on closed connection succeeded unexpectedly");
     }
-    catch (ErrorResultException e)
+    catch (final ErrorResultException e)
     {
       // Expected.
       Assert.assertFalse(connection.isValid());
@@ -808,199 +1059,4 @@
       Assert.assertTrue(connection.isClosed());
     }
   }
-
-
-
-  /**
-   * Tests connection event listener.
-   *
-   * @throws Exception
-   *           If an unexpected error occurred.
-   */
-  @Test
-  public void testConnectionEventListenerClose() throws Exception
-  {
-    final MockServerConnection onlineServerConnection = new MockServerConnection();
-    final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
-        onlineServerConnection);
-    final LDAPListener onlineServerListener = new LDAPListener("localhost",
-        TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
-
-    final Connection connection;
-    try
-    {
-      // Connect and bind.
-      connection = new LDAPConnectionFactory(
-          onlineServerListener.getSocketAddress()).getConnection();
-
-      MockConnectionEventListener listener = new MockConnectionEventListener()
-      {
-
-        public void handleConnectionClosed()
-        {
-          closeLatch.countDown();
-        }
-      };
-
-      connection.addConnectionEventListener(listener);
-      Assert.assertEquals(listener.closeLatch.getCount(), 1);
-      connection.close();
-      listener.closeLatch.await();
-      Assert.assertNull(listener.errorMessage);
-    }
-    finally
-    {
-      onlineServerListener.close();
-    }
-  }
-
-
-
-  /**
-   * Tests connection event listener.
-   *
-   * @throws Exception
-   *           If an unexpected error occurred.
-   */
-  @Test
-  public void testConnectionEventListenerUnbind() throws Exception
-  {
-    final MockServerConnection onlineServerConnection = new MockServerConnection();
-    final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
-        onlineServerConnection);
-    final LDAPListener onlineServerListener = new LDAPListener("localhost",
-        TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
-
-    final Connection connection;
-    try
-    {
-      // Connect and bind.
-      connection = new LDAPConnectionFactory(
-          onlineServerListener.getSocketAddress()).getConnection();
-
-      MockConnectionEventListener listener = new MockConnectionEventListener()
-      {
-
-        public void handleConnectionClosed()
-        {
-          closeLatch.countDown();
-        }
-      };
-
-      connection.addConnectionEventListener(listener);
-      Assert.assertEquals(listener.closeLatch.getCount(), 1);
-      connection.close(Requests.newUnbindRequest(), "called from unit test");
-      listener.closeLatch.await();
-      Assert.assertNull(listener.errorMessage);
-    }
-    finally
-    {
-      onlineServerListener.close();
-    }
-  }
-
-
-
-  /**
-   * Tests connection event listener.
-   *
-   * @throws Exception
-   *           If an unexpected error occurred.
-   */
-  @Test
-  public void testConnectionEventListenerDisconnect() throws Exception
-  {
-    final MockServerConnection onlineServerConnection = new MockServerConnection();
-    final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
-        onlineServerConnection);
-    final LDAPListener onlineServerListener = new LDAPListener("localhost",
-        TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
-
-    final Connection connection;
-    try
-    {
-      // Connect and bind.
-      connection = new LDAPConnectionFactory(
-          onlineServerListener.getSocketAddress()).getConnection();
-
-      MockConnectionEventListener listener = new MockConnectionEventListener()
-      {
-
-        public void handleConnectionError(boolean isDisconnectNotification,
-            ErrorResultException error)
-        {
-          if (isDisconnectNotification)
-          {
-            errorMessage = "Unexpected disconnect notification";
-          }
-          closeLatch.countDown();
-        }
-      };
-
-      connection.addConnectionEventListener(listener);
-      Assert.assertEquals(listener.closeLatch.getCount(), 1);
-      onlineServerConnection.context.disconnect();
-      listener.closeLatch.await();
-      Assert.assertNull(listener.errorMessage);
-      connection.close();
-    }
-    finally
-    {
-      onlineServerListener.close();
-    }
-  }
-
-
-
-  /**
-   * Tests connection event listener.
-   *
-   * @throws Exception
-   *           If an unexpected error occurred.
-   */
-  @Test
-  public void testConnectionEventListenerDisconnectNotification()
-      throws Exception
-  {
-    final MockServerConnection onlineServerConnection = new MockServerConnection();
-    final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
-        onlineServerConnection);
-    final LDAPListener onlineServerListener = new LDAPListener("localhost",
-        TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
-
-    final Connection connection;
-    try
-    {
-      // Connect and bind.
-      connection = new LDAPConnectionFactory(
-          onlineServerListener.getSocketAddress()).getConnection();
-
-      MockConnectionEventListener listener = new MockConnectionEventListener()
-      {
-
-        public void handleConnectionError(boolean isDisconnectNotification,
-            ErrorResultException error)
-        {
-          if (!isDisconnectNotification
-              || !error.getResult().getResultCode().equals(ResultCode.BUSY)
-              || !error.getResult().getDiagnosticMessage().equals("test"))
-          {
-            errorMessage = "Missing disconnect notification: " + error;
-          }
-          closeLatch.countDown();
-        }
-      };
-
-      connection.addConnectionEventListener(listener);
-      Assert.assertEquals(listener.closeLatch.getCount(), 1);
-      onlineServerConnection.context.disconnect(ResultCode.BUSY, "test");
-      listener.closeLatch.await();
-      Assert.assertNull(listener.errorMessage);
-      connection.close();
-    }
-    finally
-    {
-      onlineServerListener.close();
-    }
-  }
 }
diff --git a/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/OpenDSTestCase.java b/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/OpenDSTestCase.java
index 99a903f..9847b93 100644
--- a/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/OpenDSTestCase.java
+++ b/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/OpenDSTestCase.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2009 Sun Microsystems, Inc.
+ *      Copyright 2009-2010 Sun Microsystems, Inc.
  */
 
 package org.opends.sdk;
@@ -55,8 +55,7 @@
   //
   // This could be a problem if a subclass references a @DataProvider in
   // a super-class that provides static parameters, i.e. the parameters
-  // are
-  // not regenerated for each invocation of the DataProvider.
+  // are not regenerated for each invocation of the DataProvider.
   //
 
   /**

--
Gitblit v1.10.0