From 0c70e50ddb4b64d3a82e79bac8afe7c422917812 Mon Sep 17 00:00:00 2001
From: Ludovic Poitou <ludovic.poitou@forgerock.com>
Date: Tue, 19 Oct 2010 16:36:12 +0000
Subject: [PATCH] Make ConnectionPool implementation fully async and fix some race conditions in the unit tests.
---
opendj-sdk/sdk/examples/org/opends/sdk/examples/server/proxy/Main.java | 432 ++++++++++-----
opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/ConnectionFactoryTestCase.java | 28 +
opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/LDAPListenerTestCase.java | 508 ++++++++++--------
opendj-sdk/sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java | 2
opendj-sdk/sdk/src/org/opends/sdk/RoundRobinLoadBalancingAlgorithm.java | 2
opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/OpenDSTestCase.java | 5
opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java | 46 +
opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java | 589 ++++++++++++---------
8 files changed, 964 insertions(+), 648 deletions(-)
diff --git a/opendj-sdk/sdk/examples/org/opends/sdk/examples/server/proxy/Main.java b/opendj-sdk/sdk/examples/org/opends/sdk/examples/server/proxy/Main.java
index c95fddf..204b68e 100644
--- a/opendj-sdk/sdk/examples/org/opends/sdk/examples/server/proxy/Main.java
+++ b/opendj-sdk/sdk/examples/org/opends/sdk/examples/server/proxy/Main.java
@@ -36,11 +36,9 @@
import java.util.List;
import org.opends.sdk.*;
+import org.opends.sdk.controls.ProxiedAuthV2RequestControl;
import org.opends.sdk.requests.*;
-import org.opends.sdk.responses.BindResult;
-import org.opends.sdk.responses.CompareResult;
-import org.opends.sdk.responses.ExtendedResult;
-import org.opends.sdk.responses.Result;
+import org.opends.sdk.responses.*;
@@ -73,37 +71,127 @@
ServerConnection<Integer>
{
- private volatile AsynchronousConnection connection = null;
-
- private volatile boolean isUnbindRequired = false;
-
-
-
- private AsynchronousConnection getConnection()
- throws ErrorResultException
+ private abstract class AbstractRequestCompletionHandler
+ <R extends Result, H extends ResultHandler<? super R>>
+ implements ResultHandler<R>
{
- if (connection == null)
+ final H resultHandler;
+ final AsynchronousConnection connection;
+
+
+
+ AbstractRequestCompletionHandler(
+ final AsynchronousConnection connection, final H resultHandler)
{
- synchronized (this)
- {
- if (connection == null)
- {
- try
- {
- connection = factory.getAsynchronousConnection(null).get();
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
+ this.connection = connection;
+ this.resultHandler = resultHandler;
}
- return connection;
+
+
+
+ @Override
+ public final void handleErrorResult(final ErrorResultException error)
+ {
+ connection.close();
+ resultHandler.handleErrorResult(error);
+ }
+
+
+
+ @Override
+ public final void handleResult(final R result)
+ {
+ connection.close();
+ resultHandler.handleResult(result);
+ }
+
}
+ private abstract class ConnectionCompletionHandler<R extends Result>
+ implements ResultHandler<AsynchronousConnection>
+ {
+ private final ResultHandler<? super R> resultHandler;
+
+
+
+ ConnectionCompletionHandler(final ResultHandler<? super R> resultHandler)
+ {
+ this.resultHandler = resultHandler;
+ }
+
+
+
+ @Override
+ public final void handleErrorResult(final ErrorResultException error)
+ {
+ resultHandler.handleErrorResult(error);
+ }
+
+
+
+ @Override
+ public abstract void handleResult(AsynchronousConnection connection);
+
+ }
+
+
+
+ private final class RequestCompletionHandler<R extends Result> extends
+ AbstractRequestCompletionHandler<R, ResultHandler<? super R>>
+ {
+ RequestCompletionHandler(final AsynchronousConnection connection,
+ final ResultHandler<? super R> resultHandler)
+ {
+ super(connection, resultHandler);
+ }
+ }
+
+
+
+ private final class SearchRequestCompletionHandler extends
+ AbstractRequestCompletionHandler<Result, SearchResultHandler>
+ implements SearchResultHandler
+ {
+
+ SearchRequestCompletionHandler(final AsynchronousConnection connection,
+ final SearchResultHandler resultHandler)
+ {
+ super(connection, resultHandler);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final boolean handleEntry(final SearchResultEntry entry)
+ {
+ return resultHandler.handleEntry(entry);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public final boolean handleReference(
+ final SearchResultReference reference)
+ {
+ return resultHandler.handleReference(reference);
+ }
+
+ }
+
+
+
+ private volatile ProxiedAuthV2RequestControl proxiedAuthControl = null;
+
+
+
private ServerConnectionImpl(final LDAPClientContext clientContext)
{
// Nothing to do.
@@ -133,15 +221,22 @@
final IntermediateResponseHandler intermediateResponseHandler)
throws UnsupportedOperationException
{
- try
+ addProxiedAuthControl(request);
+ final ConnectionCompletionHandler<Result> outerHandler =
+ new ConnectionCompletionHandler<Result>(resultHandler)
{
- getConnection().add(request, resultHandler,
- intermediateResponseHandler);
- }
- catch (ErrorResultException e)
- {
- resultHandler.handleErrorResult(e);
- }
+
+ @Override
+ public void handleResult(final AsynchronousConnection connection)
+ {
+ final RequestCompletionHandler<Result> innerHandler =
+ new RequestCompletionHandler<Result>(connection, resultHandler);
+ connection.add(request, innerHandler, intermediateResponseHandler);
+ }
+
+ };
+
+ factory.getAsynchronousConnection(outerHandler);
}
@@ -156,6 +251,7 @@
final IntermediateResponseHandler intermediateResponseHandler)
throws UnsupportedOperationException
{
+
if (request.getAuthenticationType() != ((byte) 0x80))
{
// TODO: SASL authentication not implemented.
@@ -166,20 +262,45 @@
}
else
{
- // Note that this connection has received a bind request: the
- // connection should be reverted back to anonymous when the client
- // unbinds.
- isUnbindRequired = true;
+ // Authenticate using a separate bind connection pool, because we
+ // don't want to change the state of the pooled connection.
+ final ConnectionCompletionHandler<BindResult> outerHandler =
+ new ConnectionCompletionHandler<BindResult>(resultHandler)
+ {
- try
- {
- getConnection().bind(request, resultHandler,
- intermediateResponseHandler);
- }
- catch (ErrorResultException e)
- {
- resultHandler.handleErrorResult(e);
- }
+ @Override
+ public void handleResult(final AsynchronousConnection connection)
+ {
+ final ResultHandler<BindResult> innerHandler = new ResultHandler<BindResult>()
+ {
+
+ @Override
+ public final void handleErrorResult(
+ final ErrorResultException error)
+ {
+ connection.close();
+ resultHandler.handleErrorResult(error);
+ }
+
+
+
+ @Override
+ public final void handleResult(final BindResult result)
+ {
+ connection.close();
+ proxiedAuthControl = ProxiedAuthV2RequestControl
+ .newControl("dn:" + request.getName());
+ resultHandler.handleResult(result);
+ }
+ };
+ connection.bind(request, innerHandler,
+ intermediateResponseHandler);
+ }
+
+ };
+
+ proxiedAuthControl = null;
+ bindFactory.getAsynchronousConnection(outerHandler);
}
}
@@ -195,15 +316,23 @@
final IntermediateResponseHandler intermediateResponseHandler)
throws UnsupportedOperationException
{
- try
+ addProxiedAuthControl(request);
+ final ConnectionCompletionHandler<CompareResult> outerHandler =
+ new ConnectionCompletionHandler<CompareResult>(resultHandler)
{
- getConnection().compare(request, resultHandler,
- intermediateResponseHandler);
- }
- catch (ErrorResultException e)
- {
- resultHandler.handleErrorResult(e);
- }
+
+ @Override
+ public void handleResult(final AsynchronousConnection connection)
+ {
+ final RequestCompletionHandler<CompareResult> innerHandler =
+ new RequestCompletionHandler<CompareResult>(connection, resultHandler);
+ connection.compare(request, innerHandler,
+ intermediateResponseHandler);
+ }
+
+ };
+
+ factory.getAsynchronousConnection(outerHandler);
}
@@ -215,8 +344,7 @@
public void handleConnectionClosed(final Integer requestContext,
final UnbindRequest request)
{
- // Client connection closed: release the proxy connection.
- close();
+ // Client connection closed.
}
@@ -228,8 +356,7 @@
public void handleConnectionDisconnected(final ResultCode resultCode,
final String message)
{
- // Client disconnected by server: release the proxy connection.
- close();
+ // Client disconnected by server.
}
@@ -240,8 +367,7 @@
@Override
public void handleConnectionError(final Throwable error)
{
- // Client connection failed: release the proxy connection.
- close();
+ // Client connection failed.
}
@@ -256,15 +382,23 @@
final IntermediateResponseHandler intermediateResponseHandler)
throws UnsupportedOperationException
{
- try
+ addProxiedAuthControl(request);
+ final ConnectionCompletionHandler<Result> outerHandler =
+ new ConnectionCompletionHandler<Result>(resultHandler)
{
- getConnection().delete(request, resultHandler,
- intermediateResponseHandler);
- }
- catch (ErrorResultException e)
- {
- resultHandler.handleErrorResult(e);
- }
+
+ @Override
+ public void handleResult(final AsynchronousConnection connection)
+ {
+ final RequestCompletionHandler<Result> innerHandler =
+ new RequestCompletionHandler<Result>(connection, resultHandler);
+ connection.delete(request, innerHandler,
+ intermediateResponseHandler);
+ }
+
+ };
+
+ factory.getAsynchronousConnection(outerHandler);
}
@@ -296,15 +430,24 @@
else
{
// Forward all other extended operations.
- try
+ addProxiedAuthControl(request);
+
+ final ConnectionCompletionHandler<R> outerHandler =
+ new ConnectionCompletionHandler<R>(resultHandler)
{
- getConnection().extendedRequest(request, resultHandler,
- intermediateResponseHandler);
- }
- catch (ErrorResultException e)
- {
- resultHandler.handleErrorResult(e);
- }
+
+ @Override
+ public void handleResult(final AsynchronousConnection connection)
+ {
+ final RequestCompletionHandler<R> innerHandler =
+ new RequestCompletionHandler<R>(connection, resultHandler);
+ connection.extendedRequest(request, innerHandler,
+ intermediateResponseHandler);
+ }
+
+ };
+
+ factory.getAsynchronousConnection(outerHandler);
}
}
@@ -320,15 +463,23 @@
final IntermediateResponseHandler intermediateResponseHandler)
throws UnsupportedOperationException
{
- try
+ addProxiedAuthControl(request);
+ final ConnectionCompletionHandler<Result> outerHandler =
+ new ConnectionCompletionHandler<Result>(resultHandler)
{
- getConnection().modify(request, resultHandler,
- intermediateResponseHandler);
- }
- catch (ErrorResultException e)
- {
- resultHandler.handleErrorResult(e);
- }
+
+ @Override
+ public void handleResult(final AsynchronousConnection connection)
+ {
+ final RequestCompletionHandler<Result> innerHandler =
+ new RequestCompletionHandler<Result>(connection, resultHandler);
+ connection.modify(request, innerHandler,
+ intermediateResponseHandler);
+ }
+
+ };
+
+ factory.getAsynchronousConnection(outerHandler);
}
@@ -343,15 +494,23 @@
final IntermediateResponseHandler intermediateResponseHandler)
throws UnsupportedOperationException
{
- try
+ addProxiedAuthControl(request);
+ final ConnectionCompletionHandler<Result> outerHandler =
+ new ConnectionCompletionHandler<Result>(resultHandler)
{
- getConnection().modifyDN(request, resultHandler,
- intermediateResponseHandler);
- }
- catch (ErrorResultException e)
- {
- resultHandler.handleErrorResult(e);
- }
+
+ @Override
+ public void handleResult(final AsynchronousConnection connection)
+ {
+ final RequestCompletionHandler<Result> innerHandler =
+ new RequestCompletionHandler<Result>(connection, resultHandler);
+ connection.modifyDN(request, innerHandler,
+ intermediateResponseHandler);
+ }
+
+ };
+
+ factory.getAsynchronousConnection(outerHandler);
}
@@ -365,54 +524,33 @@
final IntermediateResponseHandler intermediateResponseHandler)
throws UnsupportedOperationException
{
- try
+ addProxiedAuthControl(request);
+ final ConnectionCompletionHandler<Result> outerHandler =
+ new ConnectionCompletionHandler<Result>(resultHandler)
{
- getConnection().search(request, resultHandler,
- intermediateResponseHandler);
- }
- catch (ErrorResultException e)
- {
- resultHandler.handleErrorResult(e);
- }
+
+ @Override
+ public void handleResult(final AsynchronousConnection connection)
+ {
+ final SearchRequestCompletionHandler innerHandler =
+ new SearchRequestCompletionHandler(connection, resultHandler);
+ connection.search(request, innerHandler,
+ intermediateResponseHandler);
+ }
+
+ };
+
+ factory.getAsynchronousConnection(outerHandler);
}
- private void close()
+ private void addProxiedAuthControl(final Request request)
{
- if (isUnbindRequired)
+ final ProxiedAuthV2RequestControl control = proxiedAuthControl;
+ if (control != null)
{
- synchronized (this)
- {
- if (connection != null)
- {
- connection.bind(Requests.newSimpleBindRequest(),
- new ResultHandler<Result>()
- {
-
- public void handleErrorResult(ErrorResultException error)
- {
- // The rebind failed - this is bad because if the
- // connection is pooled it will remain authenticated as
- // the wrong user.
- handleResult(error.getResult());
- }
-
-
-
- public void handleResult(Result result)
- {
- synchronized (ServerConnectionImpl.this)
- {
- if (connection != null)
- {
- connection.close();
- }
- }
- }
- });
- }
- }
+ request.addControl(control);
}
}
@@ -421,12 +559,15 @@
private final ConnectionFactory factory;
+ private final ConnectionFactory bindFactory;
- private Proxy(final ConnectionFactory factory)
+ private Proxy(final ConnectionFactory factory,
+ final ConnectionFactory bindFactory)
{
this.factory = factory;
+ this.bindFactory = bindFactory;
}
@@ -467,18 +608,25 @@
// Create load balancer.
final List<ConnectionFactory> factories = new LinkedList<ConnectionFactory>();
+ final List<ConnectionFactory> bindFactories = new LinkedList<ConnectionFactory>();
for (int i = 2; i < args.length; i += 2)
{
final String remoteAddress = args[i];
final int remotePort = Integer.parseInt(args[i + 1]);
- // factories.add(Connections.newConnectionPool(new LDAPConnectionFactory(
- // remoteAddress, remotePort), Integer.MAX_VALUE));
- factories.add(new LDAPConnectionFactory(remoteAddress, remotePort));
+ factories.add(Connections.newConnectionPool(new LDAPConnectionFactory(
+ remoteAddress, remotePort), Integer.MAX_VALUE));
+ bindFactories.add(Connections.newConnectionPool(
+ new LDAPConnectionFactory(remoteAddress, remotePort),
+ Integer.MAX_VALUE));
}
final RoundRobinLoadBalancingAlgorithm algorithm = new RoundRobinLoadBalancingAlgorithm(
factories);
+ final RoundRobinLoadBalancingAlgorithm bindAlgorithm = new RoundRobinLoadBalancingAlgorithm(
+ bindFactories);
final ConnectionFactory factory = Connections.newLoadBalancer(algorithm);
+ final ConnectionFactory bindFactory = Connections
+ .newLoadBalancer(bindAlgorithm);
// Create listener.
final LDAPListenerOptions options = new LDAPListenerOptions()
@@ -486,8 +634,8 @@
LDAPListener listener = null;
try
{
- listener = new LDAPListener(localAddress, localPort, new Proxy(factory),
- options);
+ listener = new LDAPListener(localAddress, localPort, new Proxy(factory,
+ bindFactory), options);
System.out.println("Press any key to stop the server...");
System.in.read();
}
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java b/opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java
index 42c7f8c..47f6d88 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java
@@ -173,10 +173,10 @@
if (!isOperational.get()
&& (pendingConnectFuture == null || pendingConnectFuture.isDone()))
{
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINEST))
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
- StaticUtils.DEBUG_LOG.finest(String
- .format("Attempting connect on factory " + this));
+ StaticUtils.DEBUG_LOG.fine(String
+ .format("Attempting reconnect to offline factory " + this));
}
pendingConnectFuture = factory.getAsynchronousConnection(this);
}
@@ -189,23 +189,29 @@
if (isOperational.getAndSet(false))
{
// Transition from online to offline.
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
+ {
+ StaticUtils.DEBUG_LOG.warning(String.format("Connection factory "
+ + factory + " is no longer operational: " + error.getMessage()));
+ }
+
synchronized (stateLock)
{
offlineFactoriesCount++;
if (offlineFactoriesCount == 1)
{
// Enable monitoring.
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG.fine(String
+ .format("Starting monitoring thread"));
+ }
+
monitoringFuture = scheduler.scheduleWithFixedDelay(
new MonitorRunnable(), 0, monitoringInterval,
monitoringIntervalTimeUnit);
}
}
-
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.fine(String.format("Connection factory "
- + factory + " is no longer operational: " + error.getMessage()));
- }
}
}
@@ -216,21 +222,27 @@
if (!isOperational.getAndSet(true))
{
// Transition from offline to online.
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.INFO))
+ {
+ StaticUtils.DEBUG_LOG.info(String.format("Connection factory "
+ + factory + " is now operational"));
+ }
+
synchronized (stateLock)
{
offlineFactoriesCount--;
if (offlineFactoriesCount == 0)
{
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG.fine(String
+ .format("Stopping monitoring thread"));
+ }
+
monitoringFuture.cancel(false);
monitoringFuture = null;
}
}
-
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.fine(String.format("Connection factory "
- + factory + " is now operational"));
- }
}
}
}
@@ -278,14 +290,14 @@
/**
* Creates a new abstract load balancing algorithm which will monitor offline
- * connection factories every 10 seconds using the default scheduler.
+ * connection factories every second using the default scheduler.
*
* @param factories
* The connection factories.
*/
AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories)
{
- this(factories, 10, TimeUnit.SECONDS, StaticUtils.getDefaultScheduler());
+ this(factories, 1, TimeUnit.SECONDS, StaticUtils.getDefaultScheduler());
}
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java b/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java
index a7b5f75..e9497da 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -30,8 +30,11 @@
import java.util.Collection;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import org.opends.sdk.requests.*;
@@ -41,6 +44,7 @@
import com.sun.opends.sdk.util.AsynchronousFutureResult;
import com.sun.opends.sdk.util.CompletedFutureResult;
import com.sun.opends.sdk.util.StaticUtils;
+import com.sun.opends.sdk.util.Validator;
@@ -49,36 +53,101 @@
*/
final class ConnectionPool extends AbstractConnectionFactory
{
- // Future used for waiting for pooled connections to become available.
- private static final class FuturePooledConnection extends
- AsynchronousFutureResult<AsynchronousConnection>
+
+ /**
+ * This result handler is invoked when an attempt to add a new connection to
+ * the pool completes.
+ */
+ private final class ConnectionResultHandler implements
+ ResultHandler<AsynchronousConnection>
{
- private FuturePooledConnection(
- final ResultHandler<? super AsynchronousConnection> handler)
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void handleErrorResult(final ErrorResultException error)
{
- super(handler);
+ // Connection attempt failed, so decrease the pool size.
+ currentPoolSize.release();
+
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG.fine(String.format(
+ "Connection attempt failed: " + error.getMessage()
+ + " currentPoolSize=%d, poolSize=%d",
+ poolSize - currentPoolSize.availablePermits(), poolSize));
+ }
+
+ QueueElement holder;
+ synchronized (queue)
+ {
+ if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
+ {
+ // No waiting futures.
+ return;
+ }
+ else
+ {
+ holder = queue.removeFirst();
+ }
+ }
+
+ // There was waiting future, so close it.
+ holder.getWaitingFuture().handleErrorResult(error);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void handleResult(final AsynchronousConnection connection)
+ {
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG.fine(String.format(
+ "Connection attempt succeeded: "
+ + " currentPoolSize=%d, poolSize=%d",
+ poolSize - currentPoolSize.availablePermits(), poolSize));
+ }
+
+ publishConnection(connection);
}
}
- private final class PooledConnectionWapper implements AsynchronousConnection,
- ConnectionEventListener
+ /**
+ * A pooled connection is passed to the client. It wraps an underlying
+ * "pooled" connection obtained from the underlying factory and lasts until
+ * the client application closes this connection. More specifically, pooled
+ * connections are not actually stored in the internal queue.
+ */
+ private final class PooledConnection implements AsynchronousConnection
{
+ // Connection event listeners registed against this pooled connection should
+ // have the same life time as the pooled connection.
+ private final List<ConnectionEventListener> listeners =
+ new CopyOnWriteArrayList<ConnectionEventListener>();
+
private final AsynchronousConnection connection;
- private volatile boolean isClosed;
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
- private PooledConnectionWapper(final AsynchronousConnection connection)
+ PooledConnection(final AsynchronousConnection connection)
{
this.connection = connection;
- this.connection.addConnectionEventListener(this);
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Void> abandon(final AbandonRequest request)
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
@@ -92,6 +161,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> add(final AddRequest request,
final ResultHandler<? super Result> handler)
throws UnsupportedOperationException, IllegalStateException,
@@ -106,6 +179,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> add(final AddRequest request,
final ResultHandler<? super Result> resultHandler,
final IntermediateResponseHandler intermediateResponseHandler)
@@ -122,18 +199,28 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void addConnectionEventListener(
final ConnectionEventListener listener) throws IllegalStateException,
NullPointerException
{
+ Validator.ensureNotNull(listener);
if (isClosed())
{
throw new IllegalStateException();
}
+ listeners.add(listener);
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<BindResult> bind(final BindRequest request,
final ResultHandler<? super BindResult> handler)
throws UnsupportedOperationException, IllegalStateException,
@@ -148,6 +235,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<BindResult> bind(final BindRequest request,
final ResultHandler<? super BindResult> resultHandler,
final IntermediateResponseHandler intermediateResponseHandler)
@@ -164,47 +255,51 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void close()
{
- synchronized (pool)
+ if (!isClosed.compareAndSet(false, true))
{
- if (isClosed)
- {
- return;
- }
- isClosed = true;
+ // Already closed.
+ return;
+ }
- // Don't put invalid connections back in the pool.
- if (connection.isValid())
+ // Don't put invalid connections back in the pool.
+ if (connection.isValid())
+ {
+ publishConnection(connection);
+ }
+ else
+ {
+ // The connection may have been disconnected by the remote server, but
+ // the server may still be available. In order to avoid leaving pending
+ // futures hanging indefinitely, we should try to reconnect immediately.
+
+ // Close the dead connection.
+ connection.close();
+
+ // Try to get a new connection to replace it.
+ factory.getAsynchronousConnection(connectionResultHandler);
+
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
{
- releaseConnection(connection);
- return;
+ StaticUtils.DEBUG_LOG.warning(String.format(
+ "Connection no longer valid. "
+ + "currentPoolSize=%d, poolSize=%d",
+ poolSize - currentPoolSize.availablePermits(), poolSize));
}
}
-
- // Connection is no longer valid. Close outside of lock
- connection.removeConnectionEventListener(this);
- connection.close();
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
- {
- StaticUtils.DEBUG_LOG.warning(String.format(
- "Dead connection released and closed. "
- + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
- }
-
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
- {
- StaticUtils.DEBUG_LOG.warning(String.format(
- "Reconnect attempt starting. "
- + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
- }
- connectionFactory.getAsynchronousConnection(new ReconnectHandler());
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void close(final UnbindRequest request, final String reason)
throws NullPointerException
{
@@ -213,6 +308,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<CompareResult> compare(final CompareRequest request,
final ResultHandler<? super CompareResult> handler)
throws UnsupportedOperationException, IllegalStateException,
@@ -227,6 +326,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<CompareResult> compare(final CompareRequest request,
final ResultHandler<? super CompareResult> resultHandler,
final IntermediateResponseHandler intermediateResponseHandler)
@@ -243,48 +346,10 @@
- public void handleConnectionClosed()
- {
- // Ignore - we intercept close via the close method.
- }
-
-
-
- public void handleConnectionError(final boolean isDisconnectNotification,
- final ErrorResultException error)
- {
- // Remove this connection from the pool if its in there. If not,
- // just ignore and wait for the user to close and we can deal with it
- // there.
- if (pool.remove(this))
- {
- numConnections--;
- connection.removeConnectionEventListener(this);
-
- // FIXME: should still close the connection, but we need to be
- // careful that users of the pooled connection get a sensible
- // error if they continue to use it (i.e. not an NPE or ISE).
-
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
- {
- StaticUtils.DEBUG_LOG.warning(String.format(
- "Connection error occured and removed from pool: "
- + error.getMessage()
- + " numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
- }
- }
- }
-
-
-
- public void handleUnsolicitedNotification(final ExtendedResult notification)
- {
- // Ignore
- }
-
-
-
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> delete(final DeleteRequest request,
final ResultHandler<? super Result> handler)
throws UnsupportedOperationException, IllegalStateException,
@@ -299,6 +364,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> delete(final DeleteRequest request,
final ResultHandler<? super Result> resultHandler,
final IntermediateResponseHandler intermediateResponseHandler)
@@ -315,6 +384,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public <R extends ExtendedResult> FutureResult<R> extendedRequest(
final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
throws UnsupportedOperationException, IllegalStateException,
@@ -329,6 +402,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public <R extends ExtendedResult> FutureResult<R> extendedRequest(
final ExtendedRequest<R> request,
final ResultHandler<? super R> resultHandler,
@@ -349,6 +426,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public Connection getSynchronousConnection()
{
return new SynchronousConnection(this);
@@ -359,20 +437,29 @@
/**
* {@inheritDoc}
*/
+ @Override
public boolean isClosed()
{
- return isClosed;
+ return isClosed.get();
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public boolean isValid()
{
- return !isClosed && connection.isValid();
+ return connection.isValid() && !isClosed();
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> modify(final ModifyRequest request,
final ResultHandler<? super Result> handler)
throws UnsupportedOperationException, IllegalStateException,
@@ -387,6 +474,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> modify(final ModifyRequest request,
final ResultHandler<? super Result> resultHandler,
final IntermediateResponseHandler intermediateResponseHandler)
@@ -403,6 +494,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> modifyDN(final ModifyDNRequest request,
final ResultHandler<? super Result> handler)
throws UnsupportedOperationException, IllegalStateException,
@@ -417,6 +512,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> modifyDN(final ModifyDNRequest request,
final ResultHandler<? super Result> resultHandler,
final IntermediateResponseHandler intermediateResponseHandler)
@@ -436,6 +535,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<SearchResultEntry> readEntry(final DN name,
final Collection<String> attributeDescriptions,
final ResultHandler<? super SearchResultEntry> resultHandler)
@@ -454,6 +554,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<RootDSE> readRootDSE(
final ResultHandler<? super RootDSE> handler)
throws UnsupportedOperationException, IllegalStateException
@@ -470,6 +571,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<Schema> readSchema(final DN name,
final ResultHandler<? super Schema> handler)
throws UnsupportedOperationException, IllegalStateException
@@ -486,6 +588,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<Schema> readSchemaForEntry(final DN name,
final ResultHandler<? super Schema> handler)
throws UnsupportedOperationException, IllegalStateException
@@ -499,17 +602,27 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void removeConnectionEventListener(
final ConnectionEventListener listener) throws NullPointerException
{
+ Validator.ensureNotNull(listener);
if (isClosed())
{
throw new IllegalStateException();
}
+ listeners.remove(listener);
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> search(final SearchRequest request,
final SearchResultHandler handler)
throws UnsupportedOperationException, IllegalStateException,
@@ -524,6 +637,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public FutureResult<Result> search(final SearchRequest request,
final SearchResultHandler resultHandler,
final IntermediateResponseHandler intermediateResponseHandler)
@@ -543,6 +660,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public FutureResult<SearchResultEntry> searchSingleEntry(
final SearchRequest request,
final ResultHandler<? super SearchResultEntry> resultHandler)
@@ -561,9 +679,10 @@
/**
* {@inheritDoc}
*/
+ @Override
public String toString()
{
- StringBuilder builder = new StringBuilder();
+ final StringBuilder builder = new StringBuilder();
builder.append("PooledConnection(");
builder.append(connection);
builder.append(')');
@@ -573,63 +692,86 @@
- private class ReconnectHandler implements
- ResultHandler<AsynchronousConnection>
+ /**
+ * A queue element is either a pending connection request future awaiting an
+ * {@code AsynchronousConnection} or it is an unused
+ * {@code AsynchronousConnection} awaiting a connection request.
+ */
+ private static final class QueueElement
{
- public void handleErrorResult(final ErrorResultException error)
- {
- // The reconnect failed. Fail the connect attempt.
- numConnections--;
- // The reconnect failed. The underlying connection factory
- // probably went
- // down. Just fail all pending futures
- synchronized (pool)
- {
- while (!pendingFutures.isEmpty())
- {
- pendingFutures.poll().handleErrorResult(error);
- }
- }
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
- {
- StaticUtils.DEBUG_LOG.warning(String.format(
- "Reconnect failed. Failed all pending futures: "
- + error.getMessage()
- + " numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
- }
+ private final Object value;
+
+
+ QueueElement(final AsynchronousConnection connection)
+ {
+ this.value = connection;
}
- public void handleResult(final AsynchronousConnection connection)
+ QueueElement(final ResultHandler<? super AsynchronousConnection> handler)
{
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler);
+ }
+
+
+
+ AsynchronousConnection getWaitingConnection()
+ {
+ if (value instanceof AsynchronousConnection)
{
- StaticUtils.DEBUG_LOG.finest(String.format("Reconnect succeeded. "
- + " numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
+ return (AsynchronousConnection) value;
}
- synchronized (pool)
+ else
{
- releaseConnection(connection);
+ throw new IllegalStateException();
}
}
+
+
+
+ @SuppressWarnings("unchecked")
+ AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture()
+ {
+ if (value instanceof AsynchronousFutureResult)
+ {
+ return (AsynchronousFutureResult<AsynchronousConnection>) value;
+ }
+ else
+ {
+ throw new IllegalStateException();
+ }
+ }
+
+
+
+ boolean isWaitingFuture()
+ {
+ return value instanceof AsynchronousFutureResult;
+ }
+
+
+
+ public String toString()
+ {
+ return String.valueOf(value);
+ }
}
- private final ConnectionFactory connectionFactory;
+ // Guarded by queue.
+ private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>();
- private volatile int numConnections;
+ private final ConnectionFactory factory;
private final int poolSize;
- // FIXME: should use a better collection than this - CLQ?
- private final Queue<AsynchronousConnection> pool;
+ private final Semaphore currentPoolSize;
- private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
+ private final ResultHandler<AsynchronousConnection> connectionResultHandler =
+ new ConnectionResultHandler();
@@ -637,109 +779,16 @@
* Creates a new connection pool which will maintain {@code poolSize}
* connections created using the provided connection factory.
*
- * @param connectionFactory
+ * @param factory
* The connection factory to use for creating new connections.
* @param poolSize
* The maximum size of the connection pool.
*/
- ConnectionPool(final ConnectionFactory connectionFactory, final int poolSize)
+ ConnectionPool(final ConnectionFactory factory, final int poolSize)
{
- this.connectionFactory = connectionFactory;
+ this.factory = factory;
this.poolSize = poolSize;
- this.pool = new ConcurrentLinkedQueue<AsynchronousConnection>();
- this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>();
- }
-
-
-
- @Override
- public synchronized FutureResult<AsynchronousConnection> getAsynchronousConnection(
- final ResultHandler<? super AsynchronousConnection> handler)
- {
- // This entire method is synchronized to ensure new connects are
- // done synchronously to avoid the "pending connect" case.
- AsynchronousConnection conn;
- synchronized (pool)
- {
- // Check to see if we have a connection in the pool
- conn = pool.poll();
- if (conn == null)
- {
- // Pool was empty. Maybe a new connection if pool size is not
- // reached
- if (numConnections >= poolSize)
- {
- // We reached max # of conns so wait for a connection to
- // become available.
- final FuturePooledConnection future = new FuturePooledConnection(
- handler);
- pendingFutures.add(future);
-
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.finest(String.format(
- "No connections available. Wait-listed"
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
- }
-
- return future;
- }
- }
- }
-
- if (conn == null)
- {
- try
- {
- // We can create a new connection.
- conn = connectionFactory.getAsynchronousConnection(null).get();
- numConnections++;
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.finest(String.format(
- "New connection established and aquired. "
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
- }
- }
- catch (final ErrorResultException e)
- {
- if (handler != null)
- {
- handler.handleErrorResult(e);
- }
- return new CompletedFutureResult<AsynchronousConnection>(e);
- }
- catch (final InterruptedException e)
- {
- final ErrorResultException error = new ErrorResultException(Responses
- .newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e));
- if (handler != null)
- {
- handler.handleErrorResult(error);
- }
- return new CompletedFutureResult<AsynchronousConnection>(error);
- }
- }
- else
- {
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.finest(String.format(
- "Connection aquired from pool. "
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures.size()));
- }
- }
-
- final PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
- conn);
- if (handler != null)
- {
- handler.handleResult(pooledConnection);
- }
- return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
+ this.currentPoolSize = new Semaphore(poolSize);
}
@@ -747,11 +796,59 @@
/**
* {@inheritDoc}
*/
+ @Override
+ public FutureResult<AsynchronousConnection> getAsynchronousConnection(
+ final ResultHandler<? super AsynchronousConnection> handler)
+ {
+ QueueElement holder;
+ synchronized (queue)
+ {
+ if (queue.isEmpty() || queue.getFirst().isWaitingFuture())
+ {
+ holder = new QueueElement(handler);
+ queue.add(holder);
+ }
+ else
+ {
+ holder = queue.removeFirst();
+ }
+ }
+
+ if (!holder.isWaitingFuture())
+ {
+ // There was a completed connection attempt.
+ final AsynchronousConnection connection = holder.getWaitingConnection();
+ final PooledConnection pooledConnection = new PooledConnection(connection);
+ if (handler != null)
+ {
+ handler.handleResult(pooledConnection);
+ }
+ return new CompletedFutureResult<AsynchronousConnection>(pooledConnection);
+ }
+ else
+ {
+ // Grow the pool if needed.
+ final FutureResult<AsynchronousConnection> future = holder
+ .getWaitingFuture();
+ if (!future.isDone() && currentPoolSize.tryAcquire())
+ {
+ factory.getAsynchronousConnection(connectionResultHandler);
+ }
+ return future;
+ }
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public String toString()
{
final StringBuilder builder = new StringBuilder();
builder.append("ConnectionPool(");
- builder.append(String.valueOf(connectionFactory));
+ builder.append(String.valueOf(factory));
builder.append(',');
builder.append(poolSize);
builder.append(')');
@@ -760,47 +857,25 @@
- private void releaseConnection(final AsynchronousConnection connection)
+ private void publishConnection(final AsynchronousConnection connection)
{
- // See if there waiters pending.
- for (;;)
+ QueueElement holder;
+ synchronized (queue)
{
- final PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
- connection);
- final FuturePooledConnection future = pendingFutures.poll();
-
- if (future == null)
+ if (queue.isEmpty() || !queue.getFirst().isWaitingFuture())
{
- // No waiters - so drop out and add connection to pool.
- break;
- }
-
- future.handleResult(pooledConnection);
-
- if (!future.isCancelled())
- {
- // The future was not cancelled and the connection was
- // accepted.
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.finest(String.format(
- "Connection released and directly "
- + "given to waiter. numConnections: %d, poolSize: %d, "
- + "pendingFutures: %d", numConnections, pool.size(),
- pendingFutures.size()));
- }
+ holder = new QueueElement(connection);
+ queue.add(holder);
return;
}
+ else
+ {
+ holder = queue.removeFirst();
+ }
}
- // No waiters. Put back in pool.
- pool.offer(connection);
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG.finest(String.format(
- "Connection released to pool. numConnections: %d, "
- + "poolSize: %d, pendingFutures: %d", numConnections,
- pool.size(), pendingFutures.size()));
- }
+ // There was waiting future, so close it.
+ final PooledConnection pooledConnection = new PooledConnection(connection);
+ holder.getWaitingFuture().handleResult(pooledConnection);
}
}
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java b/opendj-sdk/sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java
index 0fed20d..11f21d4 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java
@@ -70,7 +70,7 @@
/**
* Creates a new fail-over load balancing algorithm which will monitor offline
- * connection factories every 10 seconds using the default scheduler.
+ * connection factories every 1 second using the default scheduler.
*
* @param factories
* The ordered collection of connection factories.
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/RoundRobinLoadBalancingAlgorithm.java b/opendj-sdk/sdk/src/org/opends/sdk/RoundRobinLoadBalancingAlgorithm.java
index 2968565..24c8559 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/RoundRobinLoadBalancingAlgorithm.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/RoundRobinLoadBalancingAlgorithm.java
@@ -69,7 +69,7 @@
/**
* Creates a new round robin load balancing algorithm which will monitor
- * offline connection factories every 10 seconds using the default scheduler.
+ * offline connection factories every 1 second using the default scheduler.
*
* @param factories
* The ordered collection of connection factories.
diff --git a/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/ConnectionFactoryTestCase.java b/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/ConnectionFactoryTestCase.java
index e6195c8..e89e534 100644
--- a/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/ConnectionFactoryTestCase.java
+++ b/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/ConnectionFactoryTestCase.java
@@ -36,14 +36,18 @@
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
import org.opends.sdk.requests.DigestMD5SASLBindRequest;
import org.opends.sdk.requests.Requests;
import org.opends.sdk.requests.SearchRequest;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.testng.annotations.DataProvider;
+import com.sun.opends.sdk.util.StaticUtils;
+
import javax.net.ssl.SSLContext;
@@ -101,8 +105,30 @@
+ /**
+ * Disables logging before the tests.
+ */
+ @BeforeClass()
+ public void disableLogging()
+ {
+ StaticUtils.DEBUG_LOG.setLevel(Level.SEVERE);
+ }
+
+
+
+ /**
+ * Re-enable logging after the tests.
+ */
+ @AfterClass()
+ public void enableLogging()
+ {
+ StaticUtils.DEBUG_LOG.setLevel(Level.INFO);
+ }
+
+
+
@DataProvider(name = "connectionFactories")
- public Object[][] getConnectyionFactories() throws Exception
+ public Object[][] getConnectionFactories() throws Exception
{
Object[][] factories = new Object[21][1];
diff --git a/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/LDAPListenerTestCase.java b/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/LDAPListenerTestCase.java
index 2b58e3b..1cefb8b 100644
--- a/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/LDAPListenerTestCase.java
+++ b/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/LDAPListenerTestCase.java
@@ -32,13 +32,17 @@
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
import org.opends.sdk.requests.*;
import org.opends.sdk.responses.*;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.sun.opends.sdk.util.StaticUtils;
+
/**
@@ -55,16 +59,18 @@
- public void handleUnsolicitedNotification(ExtendedResult notification)
+ @Override
+ public void handleConnectionClosed()
{
- errorMessage = "Unexpected call to handleUnsolicitedNotification";
+ errorMessage = "Unexpected call to handleConnectionClosed";
closeLatch.countDown();
}
- public void handleConnectionError(boolean isDisconnectNotification,
- ErrorResultException error)
+ @Override
+ public void handleConnectionError(final boolean isDisconnectNotification,
+ final ErrorResultException error)
{
errorMessage = "Unexpected call to handleConnectionError";
closeLatch.countDown();
@@ -72,9 +78,10 @@
- public void handleConnectionClosed()
+ @Override
+ public void handleUnsolicitedNotification(final ExtendedResult notification)
{
- errorMessage = "Unexpected call to handleConnectionClosed";
+ errorMessage = "Unexpected call to handleUnsolicitedNotification";
closeLatch.countDown();
}
}
@@ -85,7 +92,7 @@
ServerConnection<Integer>
{
volatile LDAPClientContext context = null;
- volatile boolean isConnected = false;
+ final CountDownLatch isConnected = new CountDownLatch(1);
final CountDownLatch isClosed = new CountDownLatch(1);
@@ -294,7 +301,7 @@
final LDAPClientContext clientContext) throws ErrorResultException
{
serverConnection.context = clientContext;
- serverConnection.isConnected = true;
+ serverConnection.isConnected.countDown();
return serverConnection;
}
}
@@ -302,6 +309,233 @@
/**
+ * Disables logging before the tests.
+ */
+ @BeforeClass()
+ public void disableLogging()
+ {
+ StaticUtils.DEBUG_LOG.setLevel(Level.SEVERE);
+ }
+
+
+
+ /**
+ * Re-enable logging after the tests.
+ */
+ @AfterClass()
+ public void enableLogging()
+ {
+ StaticUtils.DEBUG_LOG.setLevel(Level.INFO);
+ }
+
+
+
+ /**
+ * Tests connection event listener.
+ *
+ * @throws Exception
+ * If an unexpected error occurred.
+ */
+ @Test
+ public void testConnectionEventListenerClose() throws Exception
+ {
+ final MockServerConnection onlineServerConnection = new MockServerConnection();
+ final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
+ onlineServerConnection);
+ final LDAPListener onlineServerListener = new LDAPListener("localhost",
+ TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
+
+ final Connection connection;
+ try
+ {
+ // Connect and bind.
+ connection = new LDAPConnectionFactory(
+ onlineServerListener.getSocketAddress()).getConnection();
+
+ final MockConnectionEventListener listener = new MockConnectionEventListener()
+ {
+
+ @Override
+ public void handleConnectionClosed()
+ {
+ closeLatch.countDown();
+ }
+ };
+
+ connection.addConnectionEventListener(listener);
+ Assert.assertEquals(listener.closeLatch.getCount(), 1);
+ connection.close();
+ listener.closeLatch.await();
+ Assert.assertNull(listener.errorMessage);
+ }
+ finally
+ {
+ onlineServerListener.close();
+ }
+ }
+
+
+
+ /**
+ * Tests connection event listener.
+ *
+ * @throws Exception
+ * If an unexpected error occurred.
+ */
+ @Test
+ public void testConnectionEventListenerDisconnect() throws Exception
+ {
+ final MockServerConnection onlineServerConnection = new MockServerConnection();
+ final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
+ onlineServerConnection);
+ final LDAPListener onlineServerListener = new LDAPListener("localhost",
+ TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
+
+ final Connection connection;
+ try
+ {
+ // Connect and bind.
+ connection = new LDAPConnectionFactory(
+ onlineServerListener.getSocketAddress()).getConnection();
+
+ final MockConnectionEventListener listener = new MockConnectionEventListener()
+ {
+
+ @Override
+ public void handleConnectionError(
+ final boolean isDisconnectNotification,
+ final ErrorResultException error)
+ {
+ if (isDisconnectNotification)
+ {
+ errorMessage = "Unexpected disconnect notification";
+ }
+ closeLatch.countDown();
+ }
+ };
+
+ connection.addConnectionEventListener(listener);
+ Assert.assertEquals(listener.closeLatch.getCount(), 1);
+ Assert.assertTrue(onlineServerConnection.isConnected
+ .await(10, TimeUnit.SECONDS));
+ onlineServerConnection.context.disconnect();
+ listener.closeLatch.await();
+ Assert.assertNull(listener.errorMessage);
+ connection.close();
+ }
+ finally
+ {
+ onlineServerListener.close();
+ }
+ }
+
+
+
+ /**
+ * Tests connection event listener.
+ *
+ * @throws Exception
+ * If an unexpected error occurred.
+ */
+ @Test
+ public void testConnectionEventListenerDisconnectNotification()
+ throws Exception
+ {
+ final MockServerConnection onlineServerConnection = new MockServerConnection();
+ final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
+ onlineServerConnection);
+ final LDAPListener onlineServerListener = new LDAPListener("localhost",
+ TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
+
+ final Connection connection;
+ try
+ {
+ // Connect and bind.
+ connection = new LDAPConnectionFactory(
+ onlineServerListener.getSocketAddress()).getConnection();
+
+ final MockConnectionEventListener listener = new MockConnectionEventListener()
+ {
+
+ @Override
+ public void handleConnectionError(
+ final boolean isDisconnectNotification,
+ final ErrorResultException error)
+ {
+ if (!isDisconnectNotification
+ || !error.getResult().getResultCode().equals(ResultCode.BUSY)
+ || !error.getResult().getDiagnosticMessage().equals("test"))
+ {
+ errorMessage = "Missing disconnect notification: " + error;
+ }
+ closeLatch.countDown();
+ }
+ };
+
+ connection.addConnectionEventListener(listener);
+ Assert.assertEquals(listener.closeLatch.getCount(), 1);
+ Assert.assertTrue(onlineServerConnection.isConnected
+ .await(10, TimeUnit.SECONDS));
+ onlineServerConnection.context.disconnect(ResultCode.BUSY, "test");
+ listener.closeLatch.await();
+ Assert.assertNull(listener.errorMessage);
+ connection.close();
+ }
+ finally
+ {
+ onlineServerListener.close();
+ }
+ }
+
+
+
+ /**
+ * Tests connection event listener.
+ *
+ * @throws Exception
+ * If an unexpected error occurred.
+ */
+ @Test
+ public void testConnectionEventListenerUnbind() throws Exception
+ {
+ final MockServerConnection onlineServerConnection = new MockServerConnection();
+ final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
+ onlineServerConnection);
+ final LDAPListener onlineServerListener = new LDAPListener("localhost",
+ TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
+
+ final Connection connection;
+ try
+ {
+ // Connect and bind.
+ connection = new LDAPConnectionFactory(
+ onlineServerListener.getSocketAddress()).getConnection();
+
+ final MockConnectionEventListener listener = new MockConnectionEventListener()
+ {
+
+ @Override
+ public void handleConnectionClosed()
+ {
+ closeLatch.countDown();
+ }
+ };
+
+ connection.addConnectionEventListener(listener);
+ Assert.assertEquals(listener.closeLatch.getCount(), 1);
+ connection.close(Requests.newUnbindRequest(), "called from unit test");
+ listener.closeLatch.await();
+ Assert.assertNull(listener.errorMessage);
+ }
+ finally
+ {
+ onlineServerListener.close();
+ }
+ }
+
+
+
+ /**
* Tests basic LDAP listener functionality.
*
* @throws Exception
@@ -318,10 +552,15 @@
try
{
// Connect and close.
- new LDAPConnectionFactory(listener.getSocketAddress()).getConnection()
- .close();
+ final Connection connection = new LDAPConnectionFactory(
+ listener.getSocketAddress()).getConnection();
- Assert.assertTrue(serverConnection.isConnected);
+ Assert.assertTrue(serverConnection.isConnected
+ .await(10, TimeUnit.SECONDS));
+ Assert.assertEquals(serverConnection.isClosed.getCount(), 1);
+
+ connection.close();
+
Assert.assertTrue(serverConnection.isClosed.await(10, TimeUnit.SECONDS));
}
finally
@@ -401,14 +640,18 @@
try
{
// Connect and close.
- new LDAPConnectionFactory(proxyListener.getSocketAddress())
- .getConnection().close();
+ final Connection connection = new LDAPConnectionFactory(
+ proxyListener.getSocketAddress()).getConnection();
+
+ Assert.assertTrue(proxyServerConnection.isConnected.await(10,
+ TimeUnit.SECONDS));
+ Assert.assertTrue(onlineServerConnection.isConnected.await(10,
+ TimeUnit.SECONDS));
// Wait for connect/close to complete.
- proxyServerConnection.isClosed.await();
+ connection.close();
- Assert.assertTrue(proxyServerConnection.isConnected);
- Assert.assertTrue(onlineServerConnection.isConnected);
+ proxyServerConnection.isClosed.await();
}
finally
{
@@ -506,6 +749,11 @@
try
{
connection.bind("cn=test", "password");
+
+ Assert.assertTrue(proxyServerConnection.isConnected.await(10,
+ TimeUnit.SECONDS));
+ Assert.assertTrue(onlineServerConnection.isConnected.await(10,
+ TimeUnit.SECONDS));
}
finally
{
@@ -514,9 +762,6 @@
// Wait for connect/close to complete.
proxyServerConnection.isClosed.await();
-
- Assert.assertTrue(proxyServerConnection.isConnected);
- Assert.assertTrue(onlineServerConnection.isConnected);
}
finally
{
@@ -602,14 +847,18 @@
try
{
// Connect and close.
- new LDAPConnectionFactory(proxyListener.getSocketAddress())
- .getConnection().close();
+ final Connection connection = new LDAPConnectionFactory(
+ proxyListener.getSocketAddress()).getConnection();
+
+ Assert.assertTrue(proxyServerConnection.isConnected.await(10,
+ TimeUnit.SECONDS));
+ Assert.assertTrue(onlineServerConnection.isConnected.await(10,
+ TimeUnit.SECONDS));
+
+ connection.close();
// Wait for connect/close to complete.
proxyServerConnection.isClosed.await();
-
- Assert.assertTrue(proxyServerConnection.isConnected);
- Assert.assertTrue(onlineServerConnection.isConnected);
}
finally
{
@@ -713,6 +962,11 @@
try
{
connection.bind("cn=test", "password");
+
+ Assert.assertTrue(proxyServerConnection.isConnected.await(10,
+ TimeUnit.SECONDS));
+ Assert.assertTrue(onlineServerConnection.isConnected.await(10,
+ TimeUnit.SECONDS));
}
finally
{
@@ -721,9 +975,6 @@
// Wait for connect/close to complete.
proxyServerConnection.isClosed.await();
-
- Assert.assertTrue(proxyServerConnection.isConnected);
- Assert.assertTrue(onlineServerConnection.isConnected);
}
finally
{
@@ -763,7 +1014,7 @@
{
connection.bind("cn=test", "password");
}
- catch (ErrorResultException e)
+ catch (final ErrorResultException e)
{
connection.close();
throw e;
@@ -785,7 +1036,7 @@
Assert
.fail("Connection attempt to closed listener succeeded unexpectedly");
}
- catch (ConnectionException e)
+ catch (final ConnectionException e)
{
// Expected.
}
@@ -795,7 +1046,7 @@
connection.bind("cn=test", "password");
Assert.fail("Bind attempt on closed connection succeeded unexpectedly");
}
- catch (ErrorResultException e)
+ catch (final ErrorResultException e)
{
// Expected.
Assert.assertFalse(connection.isValid());
@@ -808,199 +1059,4 @@
Assert.assertTrue(connection.isClosed());
}
}
-
-
-
- /**
- * Tests connection event listener.
- *
- * @throws Exception
- * If an unexpected error occurred.
- */
- @Test
- public void testConnectionEventListenerClose() throws Exception
- {
- final MockServerConnection onlineServerConnection = new MockServerConnection();
- final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
- onlineServerConnection);
- final LDAPListener onlineServerListener = new LDAPListener("localhost",
- TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
-
- final Connection connection;
- try
- {
- // Connect and bind.
- connection = new LDAPConnectionFactory(
- onlineServerListener.getSocketAddress()).getConnection();
-
- MockConnectionEventListener listener = new MockConnectionEventListener()
- {
-
- public void handleConnectionClosed()
- {
- closeLatch.countDown();
- }
- };
-
- connection.addConnectionEventListener(listener);
- Assert.assertEquals(listener.closeLatch.getCount(), 1);
- connection.close();
- listener.closeLatch.await();
- Assert.assertNull(listener.errorMessage);
- }
- finally
- {
- onlineServerListener.close();
- }
- }
-
-
-
- /**
- * Tests connection event listener.
- *
- * @throws Exception
- * If an unexpected error occurred.
- */
- @Test
- public void testConnectionEventListenerUnbind() throws Exception
- {
- final MockServerConnection onlineServerConnection = new MockServerConnection();
- final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
- onlineServerConnection);
- final LDAPListener onlineServerListener = new LDAPListener("localhost",
- TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
-
- final Connection connection;
- try
- {
- // Connect and bind.
- connection = new LDAPConnectionFactory(
- onlineServerListener.getSocketAddress()).getConnection();
-
- MockConnectionEventListener listener = new MockConnectionEventListener()
- {
-
- public void handleConnectionClosed()
- {
- closeLatch.countDown();
- }
- };
-
- connection.addConnectionEventListener(listener);
- Assert.assertEquals(listener.closeLatch.getCount(), 1);
- connection.close(Requests.newUnbindRequest(), "called from unit test");
- listener.closeLatch.await();
- Assert.assertNull(listener.errorMessage);
- }
- finally
- {
- onlineServerListener.close();
- }
- }
-
-
-
- /**
- * Tests connection event listener.
- *
- * @throws Exception
- * If an unexpected error occurred.
- */
- @Test
- public void testConnectionEventListenerDisconnect() throws Exception
- {
- final MockServerConnection onlineServerConnection = new MockServerConnection();
- final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
- onlineServerConnection);
- final LDAPListener onlineServerListener = new LDAPListener("localhost",
- TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
-
- final Connection connection;
- try
- {
- // Connect and bind.
- connection = new LDAPConnectionFactory(
- onlineServerListener.getSocketAddress()).getConnection();
-
- MockConnectionEventListener listener = new MockConnectionEventListener()
- {
-
- public void handleConnectionError(boolean isDisconnectNotification,
- ErrorResultException error)
- {
- if (isDisconnectNotification)
- {
- errorMessage = "Unexpected disconnect notification";
- }
- closeLatch.countDown();
- }
- };
-
- connection.addConnectionEventListener(listener);
- Assert.assertEquals(listener.closeLatch.getCount(), 1);
- onlineServerConnection.context.disconnect();
- listener.closeLatch.await();
- Assert.assertNull(listener.errorMessage);
- connection.close();
- }
- finally
- {
- onlineServerListener.close();
- }
- }
-
-
-
- /**
- * Tests connection event listener.
- *
- * @throws Exception
- * If an unexpected error occurred.
- */
- @Test
- public void testConnectionEventListenerDisconnectNotification()
- throws Exception
- {
- final MockServerConnection onlineServerConnection = new MockServerConnection();
- final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory(
- onlineServerConnection);
- final LDAPListener onlineServerListener = new LDAPListener("localhost",
- TestCaseUtils.findFreePort(), onlineServerConnectionFactory);
-
- final Connection connection;
- try
- {
- // Connect and bind.
- connection = new LDAPConnectionFactory(
- onlineServerListener.getSocketAddress()).getConnection();
-
- MockConnectionEventListener listener = new MockConnectionEventListener()
- {
-
- public void handleConnectionError(boolean isDisconnectNotification,
- ErrorResultException error)
- {
- if (!isDisconnectNotification
- || !error.getResult().getResultCode().equals(ResultCode.BUSY)
- || !error.getResult().getDiagnosticMessage().equals("test"))
- {
- errorMessage = "Missing disconnect notification: " + error;
- }
- closeLatch.countDown();
- }
- };
-
- connection.addConnectionEventListener(listener);
- Assert.assertEquals(listener.closeLatch.getCount(), 1);
- onlineServerConnection.context.disconnect(ResultCode.BUSY, "test");
- listener.closeLatch.await();
- Assert.assertNull(listener.errorMessage);
- connection.close();
- }
- finally
- {
- onlineServerListener.close();
- }
- }
}
diff --git a/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/OpenDSTestCase.java b/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/OpenDSTestCase.java
index 99a903f..9847b93 100644
--- a/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/OpenDSTestCase.java
+++ b/opendj-sdk/sdk/tests/unit-tests-testng/src/org/opends/sdk/OpenDSTestCase.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2009 Sun Microsystems, Inc.
+ * Copyright 2009-2010 Sun Microsystems, Inc.
*/
package org.opends.sdk;
@@ -55,8 +55,7 @@
//
// This could be a problem if a subclass references a @DataProvider in
// a super-class that provides static parameters, i.e. the parameters
- // are
- // not regenerated for each invocation of the DataProvider.
+ // are not regenerated for each invocation of the DataProvider.
//
/**
--
Gitblit v1.10.0