mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Ludovic Poitou
19.36.2010 9f2bba679ab597f1e50078a29d145100e3baed3c
Make ConnectionPool implementation fully async and fix some race conditions in the unit tests.
8 files modified
1612 ■■■■■ changed files
sdk/examples/org/opends/sdk/examples/server/proxy/Main.java 432 ●●●●● patch | view | raw | blame | history
sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java 46 ●●●●● patch | view | raw | blame | history
sdk/src/org/opends/sdk/ConnectionPool.java 589 ●●●●● patch | view | raw | blame | history
sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java 2 ●●● patch | view | raw | blame | history
sdk/src/org/opends/sdk/RoundRobinLoadBalancingAlgorithm.java 2 ●●● patch | view | raw | blame | history
sdk/tests/unit-tests-testng/src/org/opends/sdk/ConnectionFactoryTestCase.java 28 ●●●●● patch | view | raw | blame | history
sdk/tests/unit-tests-testng/src/org/opends/sdk/LDAPListenerTestCase.java 508 ●●●●● patch | view | raw | blame | history
sdk/tests/unit-tests-testng/src/org/opends/sdk/OpenDSTestCase.java 5 ●●●●● patch | view | raw | blame | history
sdk/examples/org/opends/sdk/examples/server/proxy/Main.java
@@ -36,11 +36,9 @@
import java.util.List;
import org.opends.sdk.*;
import org.opends.sdk.controls.ProxiedAuthV2RequestControl;
import org.opends.sdk.requests.*;
import org.opends.sdk.responses.BindResult;
import org.opends.sdk.responses.CompareResult;
import org.opends.sdk.responses.ExtendedResult;
import org.opends.sdk.responses.Result;
import org.opends.sdk.responses.*;
@@ -73,37 +71,127 @@
        ServerConnection<Integer>
    {
      private volatile AsynchronousConnection connection = null;
      private volatile boolean isUnbindRequired = false;
      private AsynchronousConnection getConnection()
          throws ErrorResultException
      private abstract class AbstractRequestCompletionHandler
          <R extends Result, H extends ResultHandler<? super R>>
          implements ResultHandler<R>
      {
        if (connection == null)
        final H resultHandler;
        final AsynchronousConnection connection;
        AbstractRequestCompletionHandler(
            final AsynchronousConnection connection, final H resultHandler)
        {
          synchronized (this)
          {
            if (connection == null)
            {
              try
              {
                connection = factory.getAsynchronousConnection(null).get();
              }
              catch (InterruptedException e)
              {
                throw new RuntimeException(e);
              }
            }
          }
          this.connection = connection;
          this.resultHandler = resultHandler;
        }
        return connection;
        @Override
        public final void handleErrorResult(final ErrorResultException error)
        {
          connection.close();
          resultHandler.handleErrorResult(error);
        }
        @Override
        public final void handleResult(final R result)
        {
          connection.close();
          resultHandler.handleResult(result);
        }
      }
      private abstract class ConnectionCompletionHandler<R extends Result>
          implements ResultHandler<AsynchronousConnection>
      {
        private final ResultHandler<? super R> resultHandler;
        ConnectionCompletionHandler(final ResultHandler<? super R> resultHandler)
        {
          this.resultHandler = resultHandler;
        }
        @Override
        public final void handleErrorResult(final ErrorResultException error)
        {
          resultHandler.handleErrorResult(error);
        }
        @Override
        public abstract void handleResult(AsynchronousConnection connection);
      }
      private final class RequestCompletionHandler<R extends Result> extends
          AbstractRequestCompletionHandler<R, ResultHandler<? super R>>
      {
        RequestCompletionHandler(final AsynchronousConnection connection,
            final ResultHandler<? super R> resultHandler)
        {
          super(connection, resultHandler);
        }
      }
      private final class SearchRequestCompletionHandler extends
          AbstractRequestCompletionHandler<Result, SearchResultHandler>
          implements SearchResultHandler
      {
        SearchRequestCompletionHandler(final AsynchronousConnection connection,
            final SearchResultHandler resultHandler)
        {
          super(connection, resultHandler);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public final boolean handleEntry(final SearchResultEntry entry)
        {
          return resultHandler.handleEntry(entry);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public final boolean handleReference(
            final SearchResultReference reference)
        {
          return resultHandler.handleReference(reference);
        }
      }
      private volatile ProxiedAuthV2RequestControl proxiedAuthControl = null;
      private ServerConnectionImpl(final LDAPClientContext clientContext)
      {
        // Nothing to do.
@@ -133,15 +221,22 @@
          final IntermediateResponseHandler intermediateResponseHandler)
          throws UnsupportedOperationException
      {
        try
        addProxiedAuthControl(request);
        final ConnectionCompletionHandler<Result> outerHandler =
          new ConnectionCompletionHandler<Result>(resultHandler)
        {
          getConnection().add(request, resultHandler,
              intermediateResponseHandler);
        }
        catch (ErrorResultException e)
        {
          resultHandler.handleErrorResult(e);
        }
          @Override
          public void handleResult(final AsynchronousConnection connection)
          {
            final RequestCompletionHandler<Result> innerHandler =
              new RequestCompletionHandler<Result>(connection, resultHandler);
            connection.add(request, innerHandler, intermediateResponseHandler);
          }
        };
        factory.getAsynchronousConnection(outerHandler);
      }
@@ -156,6 +251,7 @@
          final IntermediateResponseHandler intermediateResponseHandler)
          throws UnsupportedOperationException
      {
        if (request.getAuthenticationType() != ((byte) 0x80))
        {
          // TODO: SASL authentication not implemented.
@@ -166,20 +262,45 @@
        }
        else
        {
          // Note that this connection has received a bind request: the
          // connection should be reverted back to anonymous when the client
          // unbinds.
          isUnbindRequired = true;
          // Authenticate using a separate bind connection pool, because we
          // don't want to change the state of the pooled connection.
          final ConnectionCompletionHandler<BindResult> outerHandler =
            new ConnectionCompletionHandler<BindResult>(resultHandler)
          {
          try
          {
            getConnection().bind(request, resultHandler,
                intermediateResponseHandler);
          }
          catch (ErrorResultException e)
          {
            resultHandler.handleErrorResult(e);
          }
            @Override
            public void handleResult(final AsynchronousConnection connection)
            {
              final ResultHandler<BindResult> innerHandler = new ResultHandler<BindResult>()
              {
                @Override
                public final void handleErrorResult(
                    final ErrorResultException error)
                {
                  connection.close();
                  resultHandler.handleErrorResult(error);
                }
                @Override
                public final void handleResult(final BindResult result)
                {
                  connection.close();
                  proxiedAuthControl = ProxiedAuthV2RequestControl
                      .newControl("dn:" + request.getName());
                  resultHandler.handleResult(result);
                }
              };
              connection.bind(request, innerHandler,
                  intermediateResponseHandler);
            }
          };
          proxiedAuthControl = null;
          bindFactory.getAsynchronousConnection(outerHandler);
        }
      }
@@ -195,15 +316,23 @@
          final IntermediateResponseHandler intermediateResponseHandler)
          throws UnsupportedOperationException
      {
        try
        addProxiedAuthControl(request);
        final ConnectionCompletionHandler<CompareResult> outerHandler =
          new ConnectionCompletionHandler<CompareResult>(resultHandler)
        {
          getConnection().compare(request, resultHandler,
              intermediateResponseHandler);
        }
        catch (ErrorResultException e)
        {
          resultHandler.handleErrorResult(e);
        }
          @Override
          public void handleResult(final AsynchronousConnection connection)
          {
            final RequestCompletionHandler<CompareResult> innerHandler =
              new RequestCompletionHandler<CompareResult>(connection, resultHandler);
            connection.compare(request, innerHandler,
                intermediateResponseHandler);
          }
        };
        factory.getAsynchronousConnection(outerHandler);
      }
@@ -215,8 +344,7 @@
      public void handleConnectionClosed(final Integer requestContext,
          final UnbindRequest request)
      {
        // Client connection closed: release the proxy connection.
        close();
        // Client connection closed.
      }
@@ -228,8 +356,7 @@
      public void handleConnectionDisconnected(final ResultCode resultCode,
          final String message)
      {
        // Client disconnected by server: release the proxy connection.
        close();
        // Client disconnected by server.
      }
@@ -240,8 +367,7 @@
      @Override
      public void handleConnectionError(final Throwable error)
      {
        // Client connection failed: release the proxy connection.
        close();
        // Client connection failed.
      }
@@ -256,15 +382,23 @@
          final IntermediateResponseHandler intermediateResponseHandler)
          throws UnsupportedOperationException
      {
        try
        addProxiedAuthControl(request);
        final ConnectionCompletionHandler<Result> outerHandler =
          new ConnectionCompletionHandler<Result>(resultHandler)
        {
          getConnection().delete(request, resultHandler,
              intermediateResponseHandler);
        }
        catch (ErrorResultException e)
        {
          resultHandler.handleErrorResult(e);
        }
          @Override
          public void handleResult(final AsynchronousConnection connection)
          {
            final RequestCompletionHandler<Result> innerHandler =
              new RequestCompletionHandler<Result>(connection, resultHandler);
            connection.delete(request, innerHandler,
                intermediateResponseHandler);
          }
        };
        factory.getAsynchronousConnection(outerHandler);
      }
@@ -296,15 +430,24 @@
        else
        {
          // Forward all other extended operations.
          try
          addProxiedAuthControl(request);
          final ConnectionCompletionHandler<R> outerHandler =
            new ConnectionCompletionHandler<R>(resultHandler)
          {
            getConnection().extendedRequest(request, resultHandler,
                intermediateResponseHandler);
          }
          catch (ErrorResultException e)
          {
            resultHandler.handleErrorResult(e);
          }
            @Override
            public void handleResult(final AsynchronousConnection connection)
            {
              final RequestCompletionHandler<R> innerHandler =
                new RequestCompletionHandler<R>(connection, resultHandler);
              connection.extendedRequest(request, innerHandler,
                  intermediateResponseHandler);
            }
          };
          factory.getAsynchronousConnection(outerHandler);
        }
      }
@@ -320,15 +463,23 @@
          final IntermediateResponseHandler intermediateResponseHandler)
          throws UnsupportedOperationException
      {
        try
        addProxiedAuthControl(request);
        final ConnectionCompletionHandler<Result> outerHandler =
          new ConnectionCompletionHandler<Result>(resultHandler)
        {
          getConnection().modify(request, resultHandler,
              intermediateResponseHandler);
        }
        catch (ErrorResultException e)
        {
          resultHandler.handleErrorResult(e);
        }
          @Override
          public void handleResult(final AsynchronousConnection connection)
          {
            final RequestCompletionHandler<Result> innerHandler =
              new RequestCompletionHandler<Result>(connection, resultHandler);
            connection.modify(request, innerHandler,
                intermediateResponseHandler);
          }
        };
        factory.getAsynchronousConnection(outerHandler);
      }
@@ -343,15 +494,23 @@
          final IntermediateResponseHandler intermediateResponseHandler)
          throws UnsupportedOperationException
      {
        try
        addProxiedAuthControl(request);
        final ConnectionCompletionHandler<Result> outerHandler =
          new ConnectionCompletionHandler<Result>(resultHandler)
        {
          getConnection().modifyDN(request, resultHandler,
              intermediateResponseHandler);
        }
        catch (ErrorResultException e)
        {
          resultHandler.handleErrorResult(e);
        }
          @Override
          public void handleResult(final AsynchronousConnection connection)
          {
            final RequestCompletionHandler<Result> innerHandler =
              new RequestCompletionHandler<Result>(connection, resultHandler);
            connection.modifyDN(request, innerHandler,
                intermediateResponseHandler);
          }
        };
        factory.getAsynchronousConnection(outerHandler);
      }
@@ -365,54 +524,33 @@
          final IntermediateResponseHandler intermediateResponseHandler)
          throws UnsupportedOperationException
      {
        try
        addProxiedAuthControl(request);
        final ConnectionCompletionHandler<Result> outerHandler =
          new ConnectionCompletionHandler<Result>(resultHandler)
        {
          getConnection().search(request, resultHandler,
              intermediateResponseHandler);
        }
        catch (ErrorResultException e)
        {
          resultHandler.handleErrorResult(e);
        }
          @Override
          public void handleResult(final AsynchronousConnection connection)
          {
            final SearchRequestCompletionHandler innerHandler =
              new SearchRequestCompletionHandler(connection, resultHandler);
            connection.search(request, innerHandler,
                intermediateResponseHandler);
          }
        };
        factory.getAsynchronousConnection(outerHandler);
      }
      private void close()
      private void addProxiedAuthControl(final Request request)
      {
        if (isUnbindRequired)
        final ProxiedAuthV2RequestControl control = proxiedAuthControl;
        if (control != null)
        {
          synchronized (this)
          {
            if (connection != null)
            {
              connection.bind(Requests.newSimpleBindRequest(),
                  new ResultHandler<Result>()
                  {
                    public void handleErrorResult(ErrorResultException error)
                    {
                      // The rebind failed - this is bad because if the
                      // connection is pooled it will remain authenticated as
                      // the wrong user.
                      handleResult(error.getResult());
                    }
                    public void handleResult(Result result)
                    {
                      synchronized (ServerConnectionImpl.this)
                      {
                        if (connection != null)
                        {
                          connection.close();
                        }
                      }
                    }
                  });
            }
          }
          request.addControl(control);
        }
      }
@@ -421,12 +559,15 @@
    private final ConnectionFactory factory;
    private final ConnectionFactory bindFactory;
    private Proxy(final ConnectionFactory factory)
    private Proxy(final ConnectionFactory factory,
        final ConnectionFactory bindFactory)
    {
      this.factory = factory;
      this.bindFactory = bindFactory;
    }
@@ -467,18 +608,25 @@
    // Create load balancer.
    final List<ConnectionFactory> factories = new LinkedList<ConnectionFactory>();
    final List<ConnectionFactory> bindFactories = new LinkedList<ConnectionFactory>();
    for (int i = 2; i < args.length; i += 2)
    {
      final String remoteAddress = args[i];
      final int remotePort = Integer.parseInt(args[i + 1]);
      // factories.add(Connections.newConnectionPool(new LDAPConnectionFactory(
      //    remoteAddress, remotePort), Integer.MAX_VALUE));
      factories.add(new LDAPConnectionFactory(remoteAddress, remotePort));
      factories.add(Connections.newConnectionPool(new LDAPConnectionFactory(
          remoteAddress, remotePort), Integer.MAX_VALUE));
      bindFactories.add(Connections.newConnectionPool(
          new LDAPConnectionFactory(remoteAddress, remotePort),
          Integer.MAX_VALUE));
    }
    final RoundRobinLoadBalancingAlgorithm algorithm = new RoundRobinLoadBalancingAlgorithm(
        factories);
    final RoundRobinLoadBalancingAlgorithm bindAlgorithm = new RoundRobinLoadBalancingAlgorithm(
        bindFactories);
    final ConnectionFactory factory = Connections.newLoadBalancer(algorithm);
    final ConnectionFactory bindFactory = Connections
        .newLoadBalancer(bindAlgorithm);
    // Create listener.
    final LDAPListenerOptions options = new LDAPListenerOptions()
@@ -486,8 +634,8 @@
    LDAPListener listener = null;
    try
    {
      listener = new LDAPListener(localAddress, localPort, new Proxy(factory),
          options);
      listener = new LDAPListener(localAddress, localPort, new Proxy(factory,
          bindFactory), options);
      System.out.println("Press any key to stop the server...");
      System.in.read();
    }
sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java
@@ -173,10 +173,10 @@
      if (!isOperational.get()
          && (pendingConnectFuture == null || pendingConnectFuture.isDone()))
      {
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINEST))
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG.finest(String
              .format("Attempting connect on factory " + this));
          StaticUtils.DEBUG_LOG.fine(String
              .format("Attempting reconnect to offline factory " + this));
        }
        pendingConnectFuture = factory.getAsynchronousConnection(this);
      }
@@ -189,23 +189,29 @@
      if (isOperational.getAndSet(false))
      {
        // Transition from online to offline.
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
        {
          StaticUtils.DEBUG_LOG.warning(String.format("Connection factory "
              + factory + " is no longer operational: " + error.getMessage()));
        }
        synchronized (stateLock)
        {
          offlineFactoriesCount++;
          if (offlineFactoriesCount == 1)
          {
            // Enable monitoring.
            if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
            {
              StaticUtils.DEBUG_LOG.fine(String
                  .format("Starting monitoring thread"));
            }
            monitoringFuture = scheduler.scheduleWithFixedDelay(
                new MonitorRunnable(), 0, monitoringInterval,
                monitoringIntervalTimeUnit);
          }
        }
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG.fine(String.format("Connection factory "
              + factory + " is no longer operational: " + error.getMessage()));
        }
      }
    }
@@ -216,21 +222,27 @@
      if (!isOperational.getAndSet(true))
      {
        // Transition from offline to online.
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.INFO))
        {
          StaticUtils.DEBUG_LOG.info(String.format("Connection factory "
              + factory + " is now operational"));
        }
        synchronized (stateLock)
        {
          offlineFactoriesCount--;
          if (offlineFactoriesCount == 0)
          {
            if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
            {
              StaticUtils.DEBUG_LOG.fine(String
                  .format("Stopping monitoring thread"));
            }
            monitoringFuture.cancel(false);
            monitoringFuture = null;
          }
        }
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG.fine(String.format("Connection factory "
              + factory + " is now operational"));
        }
      }
    }
  }
@@ -278,14 +290,14 @@
  /**
   * Creates a new abstract load balancing algorithm which will monitor offline
   * connection factories every 10 seconds using the default scheduler.
   * connection factories every second using the default scheduler.
   *
   * @param factories
   *          The connection factories.
   */
  AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories)
  {
    this(factories, 10, TimeUnit.SECONDS, StaticUtils.getDefaultScheduler());
    this(factories, 1, TimeUnit.SECONDS, StaticUtils.getDefaultScheduler());
  }
sdk/src/org/opends/sdk/ConnectionPool.java
@@ -30,8 +30,11 @@
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import org.opends.sdk.requests.*;
@@ -41,6 +44,7 @@
import com.sun.opends.sdk.util.AsynchronousFutureResult;
import com.sun.opends.sdk.util.CompletedFutureResult;
import com.sun.opends.sdk.util.StaticUtils;
import com.sun.opends.sdk.util.Validator;
@@ -49,36 +53,101 @@
 */
final class ConnectionPool extends AbstractConnectionFactory
{
  // Future used for waiting for pooled connections to become available.
  private static final class FuturePooledConnection extends
      AsynchronousFutureResult<AsynchronousConnection>
  /**
   * This result handler is invoked when an attempt to add a new connection to
   * the pool completes.
   */
  private final class ConnectionResultHandler implements
      ResultHandler<AsynchronousConnection>
  {
    private FuturePooledConnection(
        final ResultHandler<? super AsynchronousConnection> handler)
    /**
     * {@inheritDoc}
     */
    @Override
    public void handleErrorResult(final ErrorResultException error)
    {
      super(handler);
      // Connection attempt failed, so decrease the pool size.
      currentPoolSize.release();
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      {
        StaticUtils.DEBUG_LOG.fine(String.format(
            "Connection attempt failed: " + error.getMessage()
                + " currentPoolSize=%d, poolSize=%d",
            poolSize - currentPoolSize.availablePermits(), poolSize));
      }
      QueueElement holder;
      synchronized (queue)
      {
        if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
        {
          // No waiting futures.
          return;
        }
        else
        {
          holder = queue.removeFirst();
        }
      }
      // There was waiting future, so close it.
      holder.getWaitingFuture().handleErrorResult(error);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void handleResult(final AsynchronousConnection connection)
    {
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      {
        StaticUtils.DEBUG_LOG.fine(String.format(
            "Connection attempt succeeded: "
                + " currentPoolSize=%d, poolSize=%d",
                poolSize - currentPoolSize.availablePermits(), poolSize));
      }
      publishConnection(connection);
    }
  }
  private final class PooledConnectionWapper implements AsynchronousConnection,
      ConnectionEventListener
  /**
   * A pooled connection is passed to the client. It wraps an underlying
   * "pooled" connection obtained from the underlying factory and lasts until
   * the client application closes this connection. More specifically, pooled
   * connections are not actually stored in the internal queue.
   */
  private final class PooledConnection implements AsynchronousConnection
  {
    // Connection event listeners registed against this pooled connection should
    // have the same life time as the pooled connection.
    private final List<ConnectionEventListener> listeners =
      new CopyOnWriteArrayList<ConnectionEventListener>();
    private final AsynchronousConnection connection;
    private volatile boolean isClosed;
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private PooledConnectionWapper(final AsynchronousConnection connection)
    PooledConnection(final AsynchronousConnection connection)
    {
      this.connection = connection;
      this.connection.addConnectionEventListener(this);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Void> abandon(final AbandonRequest request)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
@@ -92,6 +161,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> add(final AddRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
@@ -106,6 +179,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> add(final AddRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
@@ -122,18 +199,28 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public void addConnectionEventListener(
        final ConnectionEventListener listener) throws IllegalStateException,
        NullPointerException
    {
      Validator.ensureNotNull(listener);
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      listeners.add(listener);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<BindResult> bind(final BindRequest request,
        final ResultHandler<? super BindResult> handler)
        throws UnsupportedOperationException, IllegalStateException,
@@ -148,6 +235,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<BindResult> bind(final BindRequest request,
        final ResultHandler<? super BindResult> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
@@ -164,47 +255,51 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public void close()
    {
      synchronized (pool)
      if (!isClosed.compareAndSet(false, true))
      {
        if (isClosed)
        {
          return;
        }
        isClosed = true;
        // Already closed.
        return;
      }
        // Don't put invalid connections back in the pool.
        if (connection.isValid())
      // Don't put invalid connections back in the pool.
      if (connection.isValid())
      {
        publishConnection(connection);
      }
      else
      {
        // The connection may have been disconnected by the remote server, but
        // the server may still be available. In order to avoid leaving pending
        // futures hanging indefinitely, we should try to reconnect immediately.
        // Close the dead connection.
        connection.close();
        // Try to get a new connection to replace it.
        factory.getAsynchronousConnection(connectionResultHandler);
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
        {
          releaseConnection(connection);
          return;
          StaticUtils.DEBUG_LOG.warning(String.format(
              "Connection no longer valid. "
                  + "currentPoolSize=%d, poolSize=%d",
                  poolSize - currentPoolSize.availablePermits(), poolSize));
        }
      }
      // Connection is no longer valid. Close outside of lock
      connection.removeConnectionEventListener(this);
      connection.close();
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
      {
        StaticUtils.DEBUG_LOG.warning(String.format(
            "Dead connection released and closed. "
                + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d",
            numConnections, pool.size(), pendingFutures.size()));
      }
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
      {
        StaticUtils.DEBUG_LOG.warning(String.format(
            "Reconnect attempt starting. "
                + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d",
            numConnections, pool.size(), pendingFutures.size()));
      }
      connectionFactory.getAsynchronousConnection(new ReconnectHandler());
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void close(final UnbindRequest request, final String reason)
        throws NullPointerException
    {
@@ -213,6 +308,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<CompareResult> compare(final CompareRequest request,
        final ResultHandler<? super CompareResult> handler)
        throws UnsupportedOperationException, IllegalStateException,
@@ -227,6 +326,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<CompareResult> compare(final CompareRequest request,
        final ResultHandler<? super CompareResult> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
@@ -243,48 +346,10 @@
    public void handleConnectionClosed()
    {
      // Ignore - we intercept close via the close method.
    }
    public void handleConnectionError(final boolean isDisconnectNotification,
        final ErrorResultException error)
    {
      // Remove this connection from the pool if its in there. If not,
      // just ignore and wait for the user to close and we can deal with it
      // there.
      if (pool.remove(this))
      {
        numConnections--;
        connection.removeConnectionEventListener(this);
        // FIXME: should still close the connection, but we need to be
        // careful that users of the pooled connection get a sensible
        // error if they continue to use it (i.e. not an NPE or ISE).
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
        {
          StaticUtils.DEBUG_LOG.warning(String.format(
              "Connection error occured and removed from pool: "
                  + error.getMessage()
                  + " numConnections: %d, poolSize: %d, pendingFutures: %d",
              numConnections, pool.size(), pendingFutures.size()));
        }
      }
    }
    public void handleUnsolicitedNotification(final ExtendedResult notification)
    {
      // Ignore
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> delete(final DeleteRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
@@ -299,6 +364,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> delete(final DeleteRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
@@ -315,6 +384,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
        final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
        throws UnsupportedOperationException, IllegalStateException,
@@ -329,6 +402,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
        final ExtendedRequest<R> request,
        final ResultHandler<? super R> resultHandler,
@@ -349,6 +426,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public Connection getSynchronousConnection()
    {
      return new SynchronousConnection(this);
@@ -359,20 +437,29 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isClosed()
    {
      return isClosed;
      return isClosed.get();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isValid()
    {
      return !isClosed && connection.isValid();
      return connection.isValid() && !isClosed();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modify(final ModifyRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
@@ -387,6 +474,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modify(final ModifyRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
@@ -403,6 +494,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
@@ -417,6 +512,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
@@ -436,6 +535,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<SearchResultEntry> readEntry(final DN name,
        final Collection<String> attributeDescriptions,
        final ResultHandler<? super SearchResultEntry> resultHandler)
@@ -454,6 +554,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<RootDSE> readRootDSE(
        final ResultHandler<? super RootDSE> handler)
        throws UnsupportedOperationException, IllegalStateException
@@ -470,6 +571,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Schema> readSchema(final DN name,
        final ResultHandler<? super Schema> handler)
        throws UnsupportedOperationException, IllegalStateException
@@ -486,6 +588,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Schema> readSchemaForEntry(final DN name,
        final ResultHandler<? super Schema> handler)
        throws UnsupportedOperationException, IllegalStateException
@@ -499,17 +602,27 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public void removeConnectionEventListener(
        final ConnectionEventListener listener) throws NullPointerException
    {
      Validator.ensureNotNull(listener);
      if (isClosed())
      {
        throw new IllegalStateException();
      }
      listeners.remove(listener);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> search(final SearchRequest request,
        final SearchResultHandler handler)
        throws UnsupportedOperationException, IllegalStateException,
@@ -524,6 +637,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<Result> search(final SearchRequest request,
        final SearchResultHandler resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
@@ -543,6 +660,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public FutureResult<SearchResultEntry> searchSingleEntry(
        final SearchRequest request,
        final ResultHandler<? super SearchResultEntry> resultHandler)
@@ -561,9 +679,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString()
    {
      StringBuilder builder = new StringBuilder();
      final StringBuilder builder = new StringBuilder();
      builder.append("PooledConnection(");
      builder.append(connection);
      builder.append(')');
@@ -573,63 +692,86 @@
  private class ReconnectHandler implements
      ResultHandler<AsynchronousConnection>
  /**
   * A queue element is either a pending connection request future awaiting an
   * {@code AsynchronousConnection} or it is an unused
   * {@code AsynchronousConnection} awaiting a connection request.
   */
  private static final class QueueElement
  {
    public void handleErrorResult(final ErrorResultException error)
    {
      // The reconnect failed. Fail the connect attempt.
      numConnections--;
      // The reconnect failed. The underlying connection factory
      // probably went
      // down. Just fail all pending futures
      synchronized (pool)
      {
        while (!pendingFutures.isEmpty())
        {
          pendingFutures.poll().handleErrorResult(error);
        }
      }
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
      {
        StaticUtils.DEBUG_LOG.warning(String.format(
            "Reconnect failed. Failed all pending futures: "
                + error.getMessage()
                + " numConnections: %d, poolSize: %d, pendingFutures: %d",
            numConnections, pool.size(), pendingFutures.size()));
      }
    private final Object value;
    QueueElement(final AsynchronousConnection connection)
    {
      this.value = connection;
    }
    public void handleResult(final AsynchronousConnection connection)
    QueueElement(final ResultHandler<? super AsynchronousConnection> handler)
    {
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler);
    }
    AsynchronousConnection getWaitingConnection()
    {
      if (value instanceof AsynchronousConnection)
      {
        StaticUtils.DEBUG_LOG.finest(String.format("Reconnect succeeded. "
            + " numConnections: %d, poolSize: %d, pendingFutures: %d",
            numConnections, pool.size(), pendingFutures.size()));
        return (AsynchronousConnection) value;
      }
      synchronized (pool)
      else
      {
        releaseConnection(connection);
        throw new IllegalStateException();
      }
    }
    @SuppressWarnings("unchecked")
    AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture()
    {
      if (value instanceof AsynchronousFutureResult)
      {
        return (AsynchronousFutureResult<AsynchronousConnection>) value;
      }
      else
      {
        throw new IllegalStateException();
      }
    }
    boolean isWaitingFuture()
    {
      return value instanceof AsynchronousFutureResult;
    }
    public String toString()
    {
      return String.valueOf(value);
    }
  }
  private final ConnectionFactory connectionFactory;
  // Guarded by queue.
  private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
  private volatile int numConnections;
  private final ConnectionFactory factory;
  private final int poolSize;
  // FIXME: should use a better collection than this - CLQ?
  private final Queue<AsynchronousConnection> pool;
  private final Semaphore currentPoolSize;
  private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
  private final ResultHandler<AsynchronousConnection> connectionResultHandler =
    new ConnectionResultHandler();
@@ -637,109 +779,16 @@
   * Creates a new connection pool which will maintain {@code poolSize}
   * connections created using the provided connection factory.
   *
   * @param connectionFactory
   * @param factory
   *          The connection factory to use for creating new connections.
   * @param poolSize
   *          The maximum size of the connection pool.
   */
  ConnectionPool(final ConnectionFactory connectionFactory, final int poolSize)
  ConnectionPool(final ConnectionFactory factory, final int poolSize)
  {
    this.connectionFactory = connectionFactory;
    this.factory = factory;
    this.poolSize = poolSize;
    this.pool = new ConcurrentLinkedQueue<AsynchronousConnection>();
    this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>();
  }
  @Override
  public synchronized FutureResult<AsynchronousConnection> getAsynchronousConnection(
      final ResultHandler<? super AsynchronousConnection> handler)
  {
    // This entire method is synchronized to ensure new connects are
    // done synchronously to avoid the "pending connect" case.
    AsynchronousConnection conn;
    synchronized (pool)
    {
      // Check to see if we have a connection in the pool
      conn = pool.poll();
      if (conn == null)
      {
        // Pool was empty. Maybe a new connection if pool size is not
        // reached
        if (numConnections >= poolSize)
        {
          // We reached max # of conns so wait for a connection to
          // become available.
          final FuturePooledConnection future = new FuturePooledConnection(
              handler);
          pendingFutures.add(future);
          if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
          {
            StaticUtils.DEBUG_LOG.finest(String.format(
                "No connections available. Wait-listed"
                    + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                numConnections, pool.size(), pendingFutures.size()));
          }
          return future;
        }
      }
    }
    if (conn == null)
    {
      try
      {
        // We can create a new connection.
        conn = connectionFactory.getAsynchronousConnection(null).get();
        numConnections++;
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG.finest(String.format(
              "New connection established and aquired. "
                  + "numConnections: %d, poolSize: %d, pendingFutures: %d",
              numConnections, pool.size(), pendingFutures.size()));
        }
      }
      catch (final ErrorResultException e)
      {
        if (handler != null)
        {
          handler.handleErrorResult(e);
        }
        return new CompletedFutureResult<AsynchronousConnection>(e);
      }
      catch (final InterruptedException e)
      {
        final ErrorResultException error = new ErrorResultException(Responses
            .newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e));
        if (handler != null)
        {
          handler.handleErrorResult(error);
        }
        return new CompletedFutureResult<AsynchronousConnection>(error);
      }
    }
    else
    {
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      {
        StaticUtils.DEBUG_LOG.finest(String.format(
            "Connection aquired from pool. "
                + "numConnections: %d, poolSize: %d, pendingFutures: %d",
            numConnections, pool.size(), pendingFutures.size()));
      }
    }
    final PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
        conn);
    if (handler != null)
    {
      handler.handleResult(pooledConnection);
    }
    return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
    this.currentPoolSize = new Semaphore(poolSize);
  }
@@ -747,11 +796,59 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public FutureResult<AsynchronousConnection> getAsynchronousConnection(
      final ResultHandler<? super AsynchronousConnection> handler)
  {
    QueueElement holder;
    synchronized (queue)
    {
      if (queue.isEmpty() || queue.getFirst().isWaitingFuture())
      {
        holder = new QueueElement(handler);
        queue.add(holder);
      }
      else
      {
        holder = queue.removeFirst();
      }
    }
    if (!holder.isWaitingFuture())
    {
      // There was a completed connection attempt.
      final AsynchronousConnection connection = holder.getWaitingConnection();
      final PooledConnection pooledConnection = new PooledConnection(connection);
      if (handler != null)
      {
        handler.handleResult(pooledConnection);
      }
      return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
    }
    else
    {
      // Grow the pool if needed.
      final FutureResult<AsynchronousConnection> future = holder
          .getWaitingFuture();
      if (!future.isDone() && currentPoolSize.tryAcquire())
      {
        factory.getAsynchronousConnection(connectionResultHandler);
      }
      return future;
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    final StringBuilder builder = new StringBuilder();
    builder.append("ConnectionPool(");
    builder.append(String.valueOf(connectionFactory));
    builder.append(String.valueOf(factory));
    builder.append(',');
    builder.append(poolSize);
    builder.append(')');
@@ -760,47 +857,25 @@
  private void releaseConnection(final AsynchronousConnection connection)
  private void publishConnection(final AsynchronousConnection connection)
  {
    // See if there waiters pending.
    for (;;)
    QueueElement holder;
    synchronized (queue)
    {
      final PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
          connection);
      final FuturePooledConnection future = pendingFutures.poll();
      if (future == null)
      if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
      {
        // No waiters - so drop out and add connection to pool.
        break;
      }
      future.handleResult(pooledConnection);
      if (!future.isCancelled())
      {
        // The future was not cancelled and the connection was
        // accepted.
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG.finest(String.format(
              "Connection released and directly "
                  + "given to waiter. numConnections: %d, poolSize: %d, "
                  + "pendingFutures: %d", numConnections, pool.size(),
              pendingFutures.size()));
        }
        holder = new QueueElement(connection);
        queue.add(holder);
        return;
      }
      else
      {
        holder = queue.removeFirst();
      }
    }
    // No waiters. Put back in pool.
    pool.offer(connection);
    if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
    {
      StaticUtils.DEBUG_LOG.finest(String.format(
          "Connection released to pool. numConnections: %d, "
              + "poolSize: %d, pendingFutures: %d", numConnections,
          pool.size(), pendingFutures.size()));
    }
    // There was waiting future, so close it.
    final PooledConnection pooledConnection = new PooledConnection(connection);
    holder.getWaitingFuture().handleResult(pooledConnection);
  }
}
sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java
@@ -70,7 +70,7 @@
  /**
   * Creates a new fail-over load balancing algorithm which will monitor offline
   * connection factories every 10 seconds using the default scheduler.
   * connection factories every 1 second using the default scheduler.
   *
   * @param factories
   *          The ordered collection of connection factories.
sdk/src/org/opends/sdk/RoundRobinLoadBalancingAlgorithm.java
@@ -69,7 +69,7 @@
  /**
   * Creates a new round robin load balancing algorithm which will monitor
   * offline connection factories every 10 seconds using the default scheduler.
   * offline connection factories every 1 second using the default scheduler.
   *
   * @param factories
   *          The ordered collection of connection factories.
sdk/tests/unit-tests-testng/src/org/opends/sdk/ConnectionFactoryTestCase.java
@@ -36,14 +36,18 @@
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import org.opends.sdk.requests.DigestMD5SASLBindRequest;
import org.opends.sdk.requests.Requests;
import org.opends.sdk.requests.SearchRequest;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.testng.annotations.DataProvider;
import com.sun.opends.sdk.util.StaticUtils;
import javax.net.ssl.SSLContext;
@@ -101,8 +105,30 @@
  /**
   * Disables logging before the tests.
   */
  @BeforeClass()
  public void disableLogging()
  {
    StaticUtils.DEBUG_LOG.setLevel(Level.SEVERE);
  }
  /**
   * Re-enable logging after the tests.
   */
  @AfterClass()
  public void enableLogging()
  {
    StaticUtils.DEBUG_LOG.setLevel(Level.INFO);
  }
  @DataProvider(name = "connectionFactories")
  public Object[][] getConnectyionFactories() throws Exception
  public Object[][] getConnectionFactories() throws Exception
  {
    Object[][] factories = new Object[21][1];
sdk/tests/unit-tests-testng/src/org/opends/sdk/LDAPListenerTestCase.java
@@ -32,13 +32,17 @@
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import org.opends.sdk.requests.*;
import org.opends.sdk.responses.*;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.sun.opends.sdk.util.StaticUtils;
/**
@@ -55,16 +59,18 @@
    public void handleUnsolicitedNotification(ExtendedResult notification)
    @Override
    public void handleConnectionClosed()
    {
      errorMessage = "Unexpected call to handleUnsolicitedNotification";
      errorMessage = "Unexpected call to handleConnectionClosed";
      closeLatch.countDown();
    }
    public void handleConnectionError(boolean isDisconnectNotification,
        ErrorResultException error)
    @Override
    public void handleConnectionError(final boolean isDisconnectNotification,
        final ErrorResultException error)
    {
      errorMessage = "Unexpected call to handleConnectionError";
      closeLatch.countDown();
@@ -72,9 +78,10 @@
    public void handleConnectionClosed()
    @Override
    public void handleUnsolicitedNotification(final ExtendedResult notification)
    {
      errorMessage = "Unexpected call to handleConnectionClosed";
      errorMessage = "Unexpected call to handleUnsolicitedNotification";
      closeLatch.countDown();
    }
  }
@@ -85,7 +92,7 @@
      ServerConnection<Integer>
  {
    volatile LDAPClientContext context = null;
    volatile boolean isConnected = false;
    final CountDownLatch isConnected = new CountDownLatch(1);
    final CountDownLatch isClosed = new CountDownLatch(1);
@@ -294,7 +301,7 @@
        final LDAPClientContext clientContext) throws ErrorResultException
    {
      serverConnection.context = clientContext;
      serverConnection.isConnected = true;
      serverConnection.isConnected.countDown();
      return serverConnection;
    }
  }
@@ -302,6 +309,233 @@
  /**
   * Disables logging before the tests.
   */
  @BeforeClass()
  public void disableLogging()
  {
    StaticUtils.DEBUG_LOG.setLevel(Level.SEVERE);
  }
  /**
   * Re-enable logging after the tests.
   */
  @AfterClass()
  public void enableLogging()
  {
    StaticUtils.DEBUG_LOG.setLevel(Level.INFO);
  }
  /**
   * Tests connection event listener.
   *
   * @throws Exception
   *           If an unexpected error occurred.
   */
  @Test
  public void testConnectionEventListenerClose() throws Exception
  {
    final MockServerConnection onlineServerConnection = new MockServerConnection();
    final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
        onlineServerConnection);
    final LDAPListener onlineServerListener = new LDAPListener("localhost",
        TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
    final Connection connection;
    try
    {
      // Connect and bind.
      connection = new LDAPConnectionFactory(
          onlineServerListener.getSocketAddress()).getConnection();
      final MockConnectionEventListener listener = new MockConnectionEventListener()
      {
        @Override
        public void handleConnectionClosed()
        {
          closeLatch.countDown();
        }
      };
      connection.addConnectionEventListener(listener);
      Assert.assertEquals(listener.closeLatch.getCount(), 1);
      connection.close();
      listener.closeLatch.await();
      Assert.assertNull(listener.errorMessage);
    }
    finally
    {
      onlineServerListener.close();
    }
  }
  /**
   * Tests connection event listener.
   *
   * @throws Exception
   *           If an unexpected error occurred.
   */
  @Test
  public void testConnectionEventListenerDisconnect() throws Exception
  {
    final MockServerConnection onlineServerConnection = new MockServerConnection();
    final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
        onlineServerConnection);
    final LDAPListener onlineServerListener = new LDAPListener("localhost",
        TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
    final Connection connection;
    try
    {
      // Connect and bind.
      connection = new LDAPConnectionFactory(
          onlineServerListener.getSocketAddress()).getConnection();
      final MockConnectionEventListener listener = new MockConnectionEventListener()
      {
        @Override
        public void handleConnectionError(
            final boolean isDisconnectNotification,
            final ErrorResultException error)
        {
          if (isDisconnectNotification)
          {
            errorMessage = "Unexpected disconnect notification";
          }
          closeLatch.countDown();
        }
      };
      connection.addConnectionEventListener(listener);
      Assert.assertEquals(listener.closeLatch.getCount(), 1);
      Assert.assertTrue(onlineServerConnection.isConnected
          .await(10, TimeUnit.SECONDS));
      onlineServerConnection.context.disconnect();
      listener.closeLatch.await();
      Assert.assertNull(listener.errorMessage);
      connection.close();
    }
    finally
    {
      onlineServerListener.close();
    }
  }
  /**
   * Tests connection event listener.
   *
   * @throws Exception
   *           If an unexpected error occurred.
   */
  @Test
  public void testConnectionEventListenerDisconnectNotification()
      throws Exception
  {
    final MockServerConnection onlineServerConnection = new MockServerConnection();
    final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
        onlineServerConnection);
    final LDAPListener onlineServerListener = new LDAPListener("localhost",
        TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
    final Connection connection;
    try
    {
      // Connect and bind.
      connection = new LDAPConnectionFactory(
          onlineServerListener.getSocketAddress()).getConnection();
      final MockConnectionEventListener listener = new MockConnectionEventListener()
      {
        @Override
        public void handleConnectionError(
            final boolean isDisconnectNotification,
            final ErrorResultException error)
        {
          if (!isDisconnectNotification
              || !error.getResult().getResultCode().equals(ResultCode.BUSY)
              || !error.getResult().getDiagnosticMessage().equals("test"))
          {
            errorMessage = "Missing disconnect notification: " + error;
          }
          closeLatch.countDown();
        }
      };
      connection.addConnectionEventListener(listener);
      Assert.assertEquals(listener.closeLatch.getCount(), 1);
      Assert.assertTrue(onlineServerConnection.isConnected
          .await(10, TimeUnit.SECONDS));
      onlineServerConnection.context.disconnect(ResultCode.BUSY, "test");
      listener.closeLatch.await();
      Assert.assertNull(listener.errorMessage);
      connection.close();
    }
    finally
    {
      onlineServerListener.close();
    }
  }
  /**
   * Tests connection event listener.
   *
   * @throws Exception
   *           If an unexpected error occurred.
   */
  @Test
  public void testConnectionEventListenerUnbind() throws Exception
  {
    final MockServerConnection onlineServerConnection = new MockServerConnection();
    final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
        onlineServerConnection);
    final LDAPListener onlineServerListener = new LDAPListener("localhost",
        TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
    final Connection connection;
    try
    {
      // Connect and bind.
      connection = new LDAPConnectionFactory(
          onlineServerListener.getSocketAddress()).getConnection();
      final MockConnectionEventListener listener = new MockConnectionEventListener()
      {
        @Override
        public void handleConnectionClosed()
        {
          closeLatch.countDown();
        }
      };
      connection.addConnectionEventListener(listener);
      Assert.assertEquals(listener.closeLatch.getCount(), 1);
      connection.close(Requests.newUnbindRequest(), "called from unit test");
      listener.closeLatch.await();
      Assert.assertNull(listener.errorMessage);
    }
    finally
    {
      onlineServerListener.close();
    }
  }
  /**
   * Tests basic LDAP listener functionality.
   *
   * @throws Exception
@@ -318,10 +552,15 @@
    try
    {
      // Connect and close.
      new LDAPConnectionFactory(listener.getSocketAddress()).getConnection()
          .close();
      final Connection connection = new LDAPConnectionFactory(
          listener.getSocketAddress()).getConnection();
      Assert.assertTrue(serverConnection.isConnected);
      Assert.assertTrue(serverConnection.isConnected
          .await(10, TimeUnit.SECONDS));
      Assert.assertEquals(serverConnection.isClosed.getCount(), 1);
      connection.close();
      Assert.assertTrue(serverConnection.isClosed.await(10, TimeUnit.SECONDS));
    }
    finally
@@ -401,14 +640,18 @@
      try
      {
        // Connect and close.
        new LDAPConnectionFactory(proxyListener.getSocketAddress())
            .getConnection().close();
        final Connection connection = new LDAPConnectionFactory(
            proxyListener.getSocketAddress()).getConnection();
        Assert.assertTrue(proxyServerConnection.isConnected.await(10,
            TimeUnit.SECONDS));
        Assert.assertTrue(onlineServerConnection.isConnected.await(10,
            TimeUnit.SECONDS));
        // Wait for connect/close to complete.
        proxyServerConnection.isClosed.await();
        connection.close();
        Assert.assertTrue(proxyServerConnection.isConnected);
        Assert.assertTrue(onlineServerConnection.isConnected);
        proxyServerConnection.isClosed.await();
      }
      finally
      {
@@ -506,6 +749,11 @@
        try
        {
          connection.bind("cn=test", "password");
          Assert.assertTrue(proxyServerConnection.isConnected.await(10,
              TimeUnit.SECONDS));
          Assert.assertTrue(onlineServerConnection.isConnected.await(10,
              TimeUnit.SECONDS));
        }
        finally
        {
@@ -514,9 +762,6 @@
        // Wait for connect/close to complete.
        proxyServerConnection.isClosed.await();
        Assert.assertTrue(proxyServerConnection.isConnected);
        Assert.assertTrue(onlineServerConnection.isConnected);
      }
      finally
      {
@@ -602,14 +847,18 @@
      try
      {
        // Connect and close.
        new LDAPConnectionFactory(proxyListener.getSocketAddress())
            .getConnection().close();
        final Connection connection = new LDAPConnectionFactory(
            proxyListener.getSocketAddress()).getConnection();
        Assert.assertTrue(proxyServerConnection.isConnected.await(10,
            TimeUnit.SECONDS));
        Assert.assertTrue(onlineServerConnection.isConnected.await(10,
            TimeUnit.SECONDS));
        connection.close();
        // Wait for connect/close to complete.
        proxyServerConnection.isClosed.await();
        Assert.assertTrue(proxyServerConnection.isConnected);
        Assert.assertTrue(onlineServerConnection.isConnected);
      }
      finally
      {
@@ -713,6 +962,11 @@
        try
        {
          connection.bind("cn=test", "password");
          Assert.assertTrue(proxyServerConnection.isConnected.await(10,
              TimeUnit.SECONDS));
          Assert.assertTrue(onlineServerConnection.isConnected.await(10,
              TimeUnit.SECONDS));
        }
        finally
        {
@@ -721,9 +975,6 @@
        // Wait for connect/close to complete.
        proxyServerConnection.isClosed.await();
        Assert.assertTrue(proxyServerConnection.isConnected);
        Assert.assertTrue(onlineServerConnection.isConnected);
      }
      finally
      {
@@ -763,7 +1014,7 @@
      {
        connection.bind("cn=test", "password");
      }
      catch (ErrorResultException e)
      catch (final ErrorResultException e)
      {
        connection.close();
        throw e;
@@ -785,7 +1036,7 @@
      Assert
          .fail("Connection attempt to closed listener succeeded unexpectedly");
    }
    catch (ConnectionException e)
    catch (final ConnectionException e)
    {
      // Expected.
    }
@@ -795,7 +1046,7 @@
      connection.bind("cn=test", "password");
      Assert.fail("Bind attempt on closed connection succeeded unexpectedly");
    }
    catch (ErrorResultException e)
    catch (final ErrorResultException e)
    {
      // Expected.
      Assert.assertFalse(connection.isValid());
@@ -808,199 +1059,4 @@
      Assert.assertTrue(connection.isClosed());
    }
  }
  /**
   * Tests connection event listener.
   *
   * @throws Exception
   *           If an unexpected error occurred.
   */
  @Test
  public void testConnectionEventListenerClose() throws Exception
  {
    final MockServerConnection onlineServerConnection = new MockServerConnection();
    final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
        onlineServerConnection);
    final LDAPListener onlineServerListener = new LDAPListener("localhost",
        TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
    final Connection connection;
    try
    {
      // Connect and bind.
      connection = new LDAPConnectionFactory(
          onlineServerListener.getSocketAddress()).getConnection();
      MockConnectionEventListener listener = new MockConnectionEventListener()
      {
        public void handleConnectionClosed()
        {
          closeLatch.countDown();
        }
      };
      connection.addConnectionEventListener(listener);
      Assert.assertEquals(listener.closeLatch.getCount(), 1);
      connection.close();
      listener.closeLatch.await();
      Assert.assertNull(listener.errorMessage);
    }
    finally
    {
      onlineServerListener.close();
    }
  }
  /**
   * Tests connection event listener.
   *
   * @throws Exception
   *           If an unexpected error occurred.
   */
  @Test
  public void testConnectionEventListenerUnbind() throws Exception
  {
    final MockServerConnection onlineServerConnection = new MockServerConnection();
    final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
        onlineServerConnection);
    final LDAPListener onlineServerListener = new LDAPListener("localhost",
        TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
    final Connection connection;
    try
    {
      // Connect and bind.
      connection = new LDAPConnectionFactory(
          onlineServerListener.getSocketAddress()).getConnection();
      MockConnectionEventListener listener = new MockConnectionEventListener()
      {
        public void handleConnectionClosed()
        {
          closeLatch.countDown();
        }
      };
      connection.addConnectionEventListener(listener);
      Assert.assertEquals(listener.closeLatch.getCount(), 1);
      connection.close(Requests.newUnbindRequest(), "called from unit test");
      listener.closeLatch.await();
      Assert.assertNull(listener.errorMessage);
    }
    finally
    {
      onlineServerListener.close();
    }
  }
  /**
   * Tests connection event listener.
   *
   * @throws Exception
   *           If an unexpected error occurred.
   */
  @Test
  public void testConnectionEventListenerDisconnect() throws Exception
  {
    final MockServerConnection onlineServerConnection = new MockServerConnection();
    final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
        onlineServerConnection);
    final LDAPListener onlineServerListener = new LDAPListener("localhost",
        TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
    final Connection connection;
    try
    {
      // Connect and bind.
      connection = new LDAPConnectionFactory(
          onlineServerListener.getSocketAddress()).getConnection();
      MockConnectionEventListener listener = new MockConnectionEventListener()
      {
        public void handleConnectionError(boolean isDisconnectNotification,
            ErrorResultException error)
        {
          if (isDisconnectNotification)
          {
            errorMessage = "Unexpected disconnect notification";
          }
          closeLatch.countDown();
        }
      };
      connection.addConnectionEventListener(listener);
      Assert.assertEquals(listener.closeLatch.getCount(), 1);
      onlineServerConnection.context.disconnect();
      listener.closeLatch.await();
      Assert.assertNull(listener.errorMessage);
      connection.close();
    }
    finally
    {
      onlineServerListener.close();
    }
  }
  /**
   * Tests connection event listener.
   *
   * @throws Exception
   *           If an unexpected error occurred.
   */
  @Test
  public void testConnectionEventListenerDisconnectNotification()
      throws Exception
  {
    final MockServerConnection onlineServerConnection = new MockServerConnection();
    final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
        onlineServerConnection);
    final LDAPListener onlineServerListener = new LDAPListener("localhost",
        TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
    final Connection connection;
    try
    {
      // Connect and bind.
      connection = new LDAPConnectionFactory(
          onlineServerListener.getSocketAddress()).getConnection();
      MockConnectionEventListener listener = new MockConnectionEventListener()
      {
        public void handleConnectionError(boolean isDisconnectNotification,
            ErrorResultException error)
        {
          if (!isDisconnectNotification
              || !error.getResult().getResultCode().equals(ResultCode.BUSY)
              || !error.getResult().getDiagnosticMessage().equals("test"))
          {
            errorMessage = "Missing disconnect notification: " + error;
          }
          closeLatch.countDown();
        }
      };
      connection.addConnectionEventListener(listener);
      Assert.assertEquals(listener.closeLatch.getCount(), 1);
      onlineServerConnection.context.disconnect(ResultCode.BUSY, "test");
      listener.closeLatch.await();
      Assert.assertNull(listener.errorMessage);
      connection.close();
    }
    finally
    {
      onlineServerListener.close();
    }
  }
}
sdk/tests/unit-tests-testng/src/org/opends/sdk/OpenDSTestCase.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 */
package org.opends.sdk;
@@ -55,8 +55,7 @@
  //
  // This could be a problem if a subclass references a @DataProvider in
  // a super-class that provides static parameters, i.e. the parameters
  // are
  // not regenerated for each invocation of the DataProvider.
  // are not regenerated for each invocation of the DataProvider.
  //
  /**