sdk/examples/org/opends/sdk/examples/server/proxy/Main.java
@@ -36,11 +36,9 @@ import java.util.List; import org.opends.sdk.*; import org.opends.sdk.controls.ProxiedAuthV2RequestControl; import org.opends.sdk.requests.*; import org.opends.sdk.responses.BindResult; import org.opends.sdk.responses.CompareResult; import org.opends.sdk.responses.ExtendedResult; import org.opends.sdk.responses.Result; import org.opends.sdk.responses.*; @@ -73,35 +71,125 @@ ServerConnection<Integer> { private volatile AsynchronousConnection connection = null; private volatile boolean isUnbindRequired = false; private abstract class AbstractRequestCompletionHandler <R extends Result, H extends ResultHandler<? super R>> implements ResultHandler<R> { final H resultHandler; final AsynchronousConnection connection; private AsynchronousConnection getConnection() throws ErrorResultException AbstractRequestCompletionHandler( final AsynchronousConnection connection, final H resultHandler) { if (connection == null) { synchronized (this) { if (connection == null) { try { connection = factory.getAsynchronousConnection(null).get(); this.connection = connection; this.resultHandler = resultHandler; } catch (InterruptedException e) @Override public final void handleErrorResult(final ErrorResultException error) { throw new RuntimeException(e); connection.close(); resultHandler.handleErrorResult(error); } @Override public final void handleResult(final R result) { connection.close(); resultHandler.handleResult(result); } } private abstract class ConnectionCompletionHandler<R extends Result> implements ResultHandler<AsynchronousConnection> { private final ResultHandler<? super R> resultHandler; ConnectionCompletionHandler(final ResultHandler<? super R> resultHandler) { this.resultHandler = resultHandler; } @Override public final void handleErrorResult(final ErrorResultException error) { resultHandler.handleErrorResult(error); } @Override public abstract void handleResult(AsynchronousConnection connection); } private final class RequestCompletionHandler<R extends Result> extends AbstractRequestCompletionHandler<R, ResultHandler<? super R>> { RequestCompletionHandler(final AsynchronousConnection connection, final ResultHandler<? super R> resultHandler) { super(connection, resultHandler); } } private final class SearchRequestCompletionHandler extends AbstractRequestCompletionHandler<Result, SearchResultHandler> implements SearchResultHandler { SearchRequestCompletionHandler(final AsynchronousConnection connection, final SearchResultHandler resultHandler) { super(connection, resultHandler); } /** * {@inheritDoc} */ @Override public final boolean handleEntry(final SearchResultEntry entry) { return resultHandler.handleEntry(entry); } return connection; /** * {@inheritDoc} */ @Override public final boolean handleReference( final SearchResultReference reference) { return resultHandler.handleReference(reference); } } private volatile ProxiedAuthV2RequestControl proxiedAuthControl = null; private ServerConnectionImpl(final LDAPClientContext clientContext) @@ -133,15 +221,22 @@ final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException { try addProxiedAuthControl(request); final ConnectionCompletionHandler<Result> outerHandler = new ConnectionCompletionHandler<Result>(resultHandler) { getConnection().add(request, resultHandler, intermediateResponseHandler); } catch (ErrorResultException e) @Override public void handleResult(final AsynchronousConnection connection) { resultHandler.handleErrorResult(e); final RequestCompletionHandler<Result> innerHandler = new RequestCompletionHandler<Result>(connection, resultHandler); connection.add(request, innerHandler, intermediateResponseHandler); } }; factory.getAsynchronousConnection(outerHandler); } @@ -156,6 +251,7 @@ final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException { if (request.getAuthenticationType() != ((byte) 0x80)) { // TODO: SASL authentication not implemented. @@ -166,20 +262,45 @@ } else { // Note that this connection has received a bind request: the // connection should be reverted back to anonymous when the client // unbinds. isUnbindRequired = true; try // Authenticate using a separate bind connection pool, because we // don't want to change the state of the pooled connection. final ConnectionCompletionHandler<BindResult> outerHandler = new ConnectionCompletionHandler<BindResult>(resultHandler) { getConnection().bind(request, resultHandler, @Override public void handleResult(final AsynchronousConnection connection) { final ResultHandler<BindResult> innerHandler = new ResultHandler<BindResult>() { @Override public final void handleErrorResult( final ErrorResultException error) { connection.close(); resultHandler.handleErrorResult(error); } @Override public final void handleResult(final BindResult result) { connection.close(); proxiedAuthControl = ProxiedAuthV2RequestControl .newControl("dn:" + request.getName()); resultHandler.handleResult(result); } }; connection.bind(request, innerHandler, intermediateResponseHandler); } catch (ErrorResultException e) { resultHandler.handleErrorResult(e); } }; proxiedAuthControl = null; bindFactory.getAsynchronousConnection(outerHandler); } } @@ -195,15 +316,23 @@ final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException { try addProxiedAuthControl(request); final ConnectionCompletionHandler<CompareResult> outerHandler = new ConnectionCompletionHandler<CompareResult>(resultHandler) { getConnection().compare(request, resultHandler, @Override public void handleResult(final AsynchronousConnection connection) { final RequestCompletionHandler<CompareResult> innerHandler = new RequestCompletionHandler<CompareResult>(connection, resultHandler); connection.compare(request, innerHandler, intermediateResponseHandler); } catch (ErrorResultException e) { resultHandler.handleErrorResult(e); } }; factory.getAsynchronousConnection(outerHandler); } @@ -215,8 +344,7 @@ public void handleConnectionClosed(final Integer requestContext, final UnbindRequest request) { // Client connection closed: release the proxy connection. close(); // Client connection closed. } @@ -228,8 +356,7 @@ public void handleConnectionDisconnected(final ResultCode resultCode, final String message) { // Client disconnected by server: release the proxy connection. close(); // Client disconnected by server. } @@ -240,8 +367,7 @@ @Override public void handleConnectionError(final Throwable error) { // Client connection failed: release the proxy connection. close(); // Client connection failed. } @@ -256,15 +382,23 @@ final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException { try addProxiedAuthControl(request); final ConnectionCompletionHandler<Result> outerHandler = new ConnectionCompletionHandler<Result>(resultHandler) { getConnection().delete(request, resultHandler, @Override public void handleResult(final AsynchronousConnection connection) { final RequestCompletionHandler<Result> innerHandler = new RequestCompletionHandler<Result>(connection, resultHandler); connection.delete(request, innerHandler, intermediateResponseHandler); } catch (ErrorResultException e) { resultHandler.handleErrorResult(e); } }; factory.getAsynchronousConnection(outerHandler); } @@ -296,15 +430,24 @@ else { // Forward all other extended operations. try addProxiedAuthControl(request); final ConnectionCompletionHandler<R> outerHandler = new ConnectionCompletionHandler<R>(resultHandler) { getConnection().extendedRequest(request, resultHandler, @Override public void handleResult(final AsynchronousConnection connection) { final RequestCompletionHandler<R> innerHandler = new RequestCompletionHandler<R>(connection, resultHandler); connection.extendedRequest(request, innerHandler, intermediateResponseHandler); } catch (ErrorResultException e) { resultHandler.handleErrorResult(e); } }; factory.getAsynchronousConnection(outerHandler); } } @@ -320,15 +463,23 @@ final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException { try addProxiedAuthControl(request); final ConnectionCompletionHandler<Result> outerHandler = new ConnectionCompletionHandler<Result>(resultHandler) { getConnection().modify(request, resultHandler, @Override public void handleResult(final AsynchronousConnection connection) { final RequestCompletionHandler<Result> innerHandler = new RequestCompletionHandler<Result>(connection, resultHandler); connection.modify(request, innerHandler, intermediateResponseHandler); } catch (ErrorResultException e) { resultHandler.handleErrorResult(e); } }; factory.getAsynchronousConnection(outerHandler); } @@ -343,15 +494,23 @@ final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException { try addProxiedAuthControl(request); final ConnectionCompletionHandler<Result> outerHandler = new ConnectionCompletionHandler<Result>(resultHandler) { getConnection().modifyDN(request, resultHandler, @Override public void handleResult(final AsynchronousConnection connection) { final RequestCompletionHandler<Result> innerHandler = new RequestCompletionHandler<Result>(connection, resultHandler); connection.modifyDN(request, innerHandler, intermediateResponseHandler); } catch (ErrorResultException e) { resultHandler.handleErrorResult(e); } }; factory.getAsynchronousConnection(outerHandler); } @@ -365,54 +524,33 @@ final IntermediateResponseHandler intermediateResponseHandler) throws UnsupportedOperationException { try addProxiedAuthControl(request); final ConnectionCompletionHandler<Result> outerHandler = new ConnectionCompletionHandler<Result>(resultHandler) { getConnection().search(request, resultHandler, @Override public void handleResult(final AsynchronousConnection connection) { final SearchRequestCompletionHandler innerHandler = new SearchRequestCompletionHandler(connection, resultHandler); connection.search(request, innerHandler, intermediateResponseHandler); } catch (ErrorResultException e) { resultHandler.handleErrorResult(e); } }; factory.getAsynchronousConnection(outerHandler); } private void close() private void addProxiedAuthControl(final Request request) { if (isUnbindRequired) final ProxiedAuthV2RequestControl control = proxiedAuthControl; if (control != null) { synchronized (this) { if (connection != null) { connection.bind(Requests.newSimpleBindRequest(), new ResultHandler<Result>() { public void handleErrorResult(ErrorResultException error) { // The rebind failed - this is bad because if the // connection is pooled it will remain authenticated as // the wrong user. handleResult(error.getResult()); } public void handleResult(Result result) { synchronized (ServerConnectionImpl.this) { if (connection != null) { connection.close(); } } } }); } } request.addControl(control); } } @@ -421,12 +559,15 @@ private final ConnectionFactory factory; private final ConnectionFactory bindFactory; private Proxy(final ConnectionFactory factory) private Proxy(final ConnectionFactory factory, final ConnectionFactory bindFactory) { this.factory = factory; this.bindFactory = bindFactory; } @@ -467,18 +608,25 @@ // Create load balancer. final List<ConnectionFactory> factories = new LinkedList<ConnectionFactory>(); final List<ConnectionFactory> bindFactories = new LinkedList<ConnectionFactory>(); for (int i = 2; i < args.length; i += 2) { final String remoteAddress = args[i]; final int remotePort = Integer.parseInt(args[i + 1]); // factories.add(Connections.newConnectionPool(new LDAPConnectionFactory( // remoteAddress, remotePort), Integer.MAX_VALUE)); factories.add(new LDAPConnectionFactory(remoteAddress, remotePort)); factories.add(Connections.newConnectionPool(new LDAPConnectionFactory( remoteAddress, remotePort), Integer.MAX_VALUE)); bindFactories.add(Connections.newConnectionPool( new LDAPConnectionFactory(remoteAddress, remotePort), Integer.MAX_VALUE)); } final RoundRobinLoadBalancingAlgorithm algorithm = new RoundRobinLoadBalancingAlgorithm( factories); final RoundRobinLoadBalancingAlgorithm bindAlgorithm = new RoundRobinLoadBalancingAlgorithm( bindFactories); final ConnectionFactory factory = Connections.newLoadBalancer(algorithm); final ConnectionFactory bindFactory = Connections .newLoadBalancer(bindAlgorithm); // Create listener. final LDAPListenerOptions options = new LDAPListenerOptions() @@ -486,8 +634,8 @@ LDAPListener listener = null; try { listener = new LDAPListener(localAddress, localPort, new Proxy(factory), options); listener = new LDAPListener(localAddress, localPort, new Proxy(factory, bindFactory), options); System.out.println("Press any key to stop the server..."); System.in.read(); } sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java
@@ -173,10 +173,10 @@ if (!isOperational.get() && (pendingConnectFuture == null || pendingConnectFuture.isDone())) { if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINEST)) if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.finest(String .format("Attempting connect on factory " + this)); StaticUtils.DEBUG_LOG.fine(String .format("Attempting reconnect to offline factory " + this)); } pendingConnectFuture = factory.getAsynchronousConnection(this); } @@ -189,23 +189,29 @@ if (isOperational.getAndSet(false)) { // Transition from online to offline. if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) { StaticUtils.DEBUG_LOG.warning(String.format("Connection factory " + factory + " is no longer operational: " + error.getMessage())); } synchronized (stateLock) { offlineFactoriesCount++; if (offlineFactoriesCount == 1) { // Enable monitoring. if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.fine(String .format("Starting monitoring thread")); } monitoringFuture = scheduler.scheduleWithFixedDelay( new MonitorRunnable(), 0, monitoringInterval, monitoringIntervalTimeUnit); } } if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.fine(String.format("Connection factory " + factory + " is no longer operational: " + error.getMessage())); } } } @@ -216,21 +222,27 @@ if (!isOperational.getAndSet(true)) { // Transition from offline to online. if (StaticUtils.DEBUG_LOG.isLoggable(Level.INFO)) { StaticUtils.DEBUG_LOG.info(String.format("Connection factory " + factory + " is now operational")); } synchronized (stateLock) { offlineFactoriesCount--; if (offlineFactoriesCount == 0) { if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.fine(String .format("Stopping monitoring thread")); } monitoringFuture.cancel(false); monitoringFuture = null; } } if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.fine(String.format("Connection factory " + factory + " is now operational")); } } } } @@ -278,14 +290,14 @@ /** * Creates a new abstract load balancing algorithm which will monitor offline * connection factories every 10 seconds using the default scheduler. * connection factories every second using the default scheduler. * * @param factories * The connection factories. */ AbstractLoadBalancingAlgorithm(final Collection<ConnectionFactory> factories) { this(factories, 10, TimeUnit.SECONDS, StaticUtils.getDefaultScheduler()); this(factories, 1, TimeUnit.SECONDS, StaticUtils.getDefaultScheduler()); } sdk/src/org/opends/sdk/ConnectionPool.java
@@ -30,8 +30,11 @@ import java.util.Collection; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import org.opends.sdk.requests.*; @@ -41,6 +44,7 @@ import com.sun.opends.sdk.util.AsynchronousFutureResult; import com.sun.opends.sdk.util.CompletedFutureResult; import com.sun.opends.sdk.util.StaticUtils; import com.sun.opends.sdk.util.Validator; @@ -49,36 +53,101 @@ */ final class ConnectionPool extends AbstractConnectionFactory { // Future used for waiting for pooled connections to become available. private static final class FuturePooledConnection extends AsynchronousFutureResult<AsynchronousConnection> /** * This result handler is invoked when an attempt to add a new connection to * the pool completes. */ private final class ConnectionResultHandler implements ResultHandler<AsynchronousConnection> { private FuturePooledConnection( final ResultHandler<? super AsynchronousConnection> handler) /** * {@inheritDoc} */ @Override public void handleErrorResult(final ErrorResultException error) { super(handler); // Connection attempt failed, so decrease the pool size. currentPoolSize.release(); if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.fine(String.format( "Connection attempt failed: " + error.getMessage() + " currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); } QueueElement holder; synchronized (queue) { if (queue.isEmpty() || !queue.getFirst().isWaitingFuture()) { // No waiting futures. return; } else { holder = queue.removeFirst(); } } // There was waiting future, so close it. holder.getWaitingFuture().handleErrorResult(error); } /** * {@inheritDoc} */ @Override public void handleResult(final AsynchronousConnection connection) { if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.fine(String.format( "Connection attempt succeeded: " + " currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); } publishConnection(connection); } } private final class PooledConnectionWapper implements AsynchronousConnection, ConnectionEventListener /** * A pooled connection is passed to the client. It wraps an underlying * "pooled" connection obtained from the underlying factory and lasts until * the client application closes this connection. More specifically, pooled * connections are not actually stored in the internal queue. */ private final class PooledConnection implements AsynchronousConnection { // Connection event listeners registed against this pooled connection should // have the same life time as the pooled connection. private final List<ConnectionEventListener> listeners = new CopyOnWriteArrayList<ConnectionEventListener>(); private final AsynchronousConnection connection; private volatile boolean isClosed; private final AtomicBoolean isClosed = new AtomicBoolean(false); private PooledConnectionWapper(final AsynchronousConnection connection) PooledConnection(final AsynchronousConnection connection) { this.connection = connection; this.connection.addConnectionEventListener(this); } /** * {@inheritDoc} */ @Override public FutureResult<Void> abandon(final AbandonRequest request) throws UnsupportedOperationException, IllegalStateException, NullPointerException @@ -92,6 +161,10 @@ /** * {@inheritDoc} */ @Override public FutureResult<Result> add(final AddRequest request, final ResultHandler<? super Result> handler) throws UnsupportedOperationException, IllegalStateException, @@ -106,6 +179,10 @@ /** * {@inheritDoc} */ @Override public FutureResult<Result> add(final AddRequest request, final ResultHandler<? super Result> resultHandler, final IntermediateResponseHandler intermediateResponseHandler) @@ -122,18 +199,28 @@ /** * {@inheritDoc} */ @Override public void addConnectionEventListener( final ConnectionEventListener listener) throws IllegalStateException, NullPointerException { Validator.ensureNotNull(listener); if (isClosed()) { throw new IllegalStateException(); } listeners.add(listener); } /** * {@inheritDoc} */ @Override public FutureResult<BindResult> bind(final BindRequest request, final ResultHandler<? super BindResult> handler) throws UnsupportedOperationException, IllegalStateException, @@ -148,6 +235,10 @@ /** * {@inheritDoc} */ @Override public FutureResult<BindResult> bind(final BindRequest request, final ResultHandler<? super BindResult> resultHandler, final IntermediateResponseHandler intermediateResponseHandler) @@ -164,47 +255,51 @@ /** * {@inheritDoc} */ @Override public void close() { synchronized (pool) if (!isClosed.compareAndSet(false, true)) { if (isClosed) { // Already closed. return; } isClosed = true; // Don't put invalid connections back in the pool. if (connection.isValid()) { releaseConnection(connection); return; publishConnection(connection); } } else { // The connection may have been disconnected by the remote server, but // the server may still be available. In order to avoid leaving pending // futures hanging indefinitely, we should try to reconnect immediately. // Connection is no longer valid. Close outside of lock connection.removeConnectionEventListener(this); // Close the dead connection. connection.close(); if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) { StaticUtils.DEBUG_LOG.warning(String.format( "Dead connection released and closed. " + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); } // Try to get a new connection to replace it. factory.getAsynchronousConnection(connectionResultHandler); if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) { StaticUtils.DEBUG_LOG.warning(String.format( "Reconnect attempt starting. " + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); "Connection no longer valid. " + "currentPoolSize=%d, poolSize=%d", poolSize - currentPoolSize.availablePermits(), poolSize)); } connectionFactory.getAsynchronousConnection(new ReconnectHandler()); } } /** * {@inheritDoc} */ @Override public void close(final UnbindRequest request, final String reason) throws NullPointerException { @@ -213,6 +308,10 @@ /** * {@inheritDoc} */ @Override public FutureResult<CompareResult> compare(final CompareRequest request, final ResultHandler<? super CompareResult> handler) throws UnsupportedOperationException, IllegalStateException, @@ -227,6 +326,10 @@ /** * {@inheritDoc} */ @Override public FutureResult<CompareResult> compare(final CompareRequest request, final ResultHandler<? super CompareResult> resultHandler, final IntermediateResponseHandler intermediateResponseHandler) @@ -243,48 +346,10 @@ public void handleConnectionClosed() { // Ignore - we intercept close via the close method. } public void handleConnectionError(final boolean isDisconnectNotification, final ErrorResultException error) { // Remove this connection from the pool if its in there. If not, // just ignore and wait for the user to close and we can deal with it // there. if (pool.remove(this)) { numConnections--; connection.removeConnectionEventListener(this); // FIXME: should still close the connection, but we need to be // careful that users of the pooled connection get a sensible // error if they continue to use it (i.e. not an NPE or ISE). if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) { StaticUtils.DEBUG_LOG.warning(String.format( "Connection error occured and removed from pool: " + error.getMessage() + " numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); } } } public void handleUnsolicitedNotification(final ExtendedResult notification) { // Ignore } /** * {@inheritDoc} */ @Override public FutureResult<Result> delete(final DeleteRequest request, final ResultHandler<? super Result> handler) throws UnsupportedOperationException, IllegalStateException, @@ -299,6 +364,10 @@ /** * {@inheritDoc} */ @Override public FutureResult<Result> delete(final DeleteRequest request, final ResultHandler<? super Result> resultHandler, final IntermediateResponseHandler intermediateResponseHandler) @@ -315,6 +384,10 @@ /** * {@inheritDoc} */ @Override public <R extends ExtendedResult> FutureResult<R> extendedRequest( final ExtendedRequest<R> request, final ResultHandler<? super R> handler) throws UnsupportedOperationException, IllegalStateException, @@ -329,6 +402,10 @@ /** * {@inheritDoc} */ @Override public <R extends ExtendedResult> FutureResult<R> extendedRequest( final ExtendedRequest<R> request, final ResultHandler<? super R> resultHandler, @@ -349,6 +426,7 @@ /** * {@inheritDoc} */ @Override public Connection getSynchronousConnection() { return new SynchronousConnection(this); @@ -359,20 +437,29 @@ /** * {@inheritDoc} */ @Override public boolean isClosed() { return isClosed; return isClosed.get(); } /** * {@inheritDoc} */ @Override public boolean isValid() { return !isClosed && connection.isValid(); return connection.isValid() && !isClosed(); } /** * {@inheritDoc} */ @Override public FutureResult<Result> modify(final ModifyRequest request, final ResultHandler<? super Result> handler) throws UnsupportedOperationException, IllegalStateException, @@ -387,6 +474,10 @@ /** * {@inheritDoc} */ @Override public FutureResult<Result> modify(final ModifyRequest request, final ResultHandler<? super Result> resultHandler, final IntermediateResponseHandler intermediateResponseHandler) @@ -403,6 +494,10 @@ /** * {@inheritDoc} */ @Override public FutureResult<Result> modifyDN(final ModifyDNRequest request, final ResultHandler<? super Result> handler) throws UnsupportedOperationException, IllegalStateException, @@ -417,6 +512,10 @@ /** * {@inheritDoc} */ @Override public FutureResult<Result> modifyDN(final ModifyDNRequest request, final ResultHandler<? super Result> resultHandler, final IntermediateResponseHandler intermediateResponseHandler) @@ -436,6 +535,7 @@ /** * {@inheritDoc} */ @Override public FutureResult<SearchResultEntry> readEntry(final DN name, final Collection<String> attributeDescriptions, final ResultHandler<? super SearchResultEntry> resultHandler) @@ -454,6 +554,7 @@ /** * {@inheritDoc} */ @Override public FutureResult<RootDSE> readRootDSE( final ResultHandler<? super RootDSE> handler) throws UnsupportedOperationException, IllegalStateException @@ -470,6 +571,7 @@ /** * {@inheritDoc} */ @Override public FutureResult<Schema> readSchema(final DN name, final ResultHandler<? super Schema> handler) throws UnsupportedOperationException, IllegalStateException @@ -486,6 +588,7 @@ /** * {@inheritDoc} */ @Override public FutureResult<Schema> readSchemaForEntry(final DN name, final ResultHandler<? super Schema> handler) throws UnsupportedOperationException, IllegalStateException @@ -499,17 +602,27 @@ /** * {@inheritDoc} */ @Override public void removeConnectionEventListener( final ConnectionEventListener listener) throws NullPointerException { Validator.ensureNotNull(listener); if (isClosed()) { throw new IllegalStateException(); } listeners.remove(listener); } /** * {@inheritDoc} */ @Override public FutureResult<Result> search(final SearchRequest request, final SearchResultHandler handler) throws UnsupportedOperationException, IllegalStateException, @@ -524,6 +637,10 @@ /** * {@inheritDoc} */ @Override public FutureResult<Result> search(final SearchRequest request, final SearchResultHandler resultHandler, final IntermediateResponseHandler intermediateResponseHandler) @@ -543,6 +660,7 @@ /** * {@inheritDoc} */ @Override public FutureResult<SearchResultEntry> searchSingleEntry( final SearchRequest request, final ResultHandler<? super SearchResultEntry> resultHandler) @@ -561,9 +679,10 @@ /** * {@inheritDoc} */ @Override public String toString() { StringBuilder builder = new StringBuilder(); final StringBuilder builder = new StringBuilder(); builder.append("PooledConnection("); builder.append(connection); builder.append(')'); @@ -573,63 +692,86 @@ private class ReconnectHandler implements ResultHandler<AsynchronousConnection> /** * A queue element is either a pending connection request future awaiting an * {@code AsynchronousConnection} or it is an unused * {@code AsynchronousConnection} awaiting a connection request. */ private static final class QueueElement { public void handleErrorResult(final ErrorResultException error) { // The reconnect failed. Fail the connect attempt. numConnections--; // The reconnect failed. The underlying connection factory // probably went // down. Just fail all pending futures synchronized (pool) { while (!pendingFutures.isEmpty()) { pendingFutures.poll().handleErrorResult(error); } } if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) { StaticUtils.DEBUG_LOG.warning(String.format( "Reconnect failed. Failed all pending futures: " + error.getMessage() + " numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); } private final Object value; QueueElement(final AsynchronousConnection connection) { this.value = connection; } public void handleResult(final AsynchronousConnection connection) QueueElement(final ResultHandler<? super AsynchronousConnection> handler) { if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.finest(String.format("Reconnect succeeded. " + " numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); this.value = new AsynchronousFutureResult<AsynchronousConnection>(handler); } synchronized (pool) AsynchronousConnection getWaitingConnection() { releaseConnection(connection); if (value instanceof AsynchronousConnection) { return (AsynchronousConnection) value; } else { throw new IllegalStateException(); } } private final ConnectionFactory connectionFactory; @SuppressWarnings("unchecked") AsynchronousFutureResult<AsynchronousConnection> getWaitingFuture() { if (value instanceof AsynchronousFutureResult) { return (AsynchronousFutureResult<AsynchronousConnection>) value; } else { throw new IllegalStateException(); } } private volatile int numConnections; boolean isWaitingFuture() { return value instanceof AsynchronousFutureResult; } public String toString() { return String.valueOf(value); } } // Guarded by queue. private final LinkedList<QueueElement> queue = new LinkedList<QueueElement>(); private final ConnectionFactory factory; private final int poolSize; // FIXME: should use a better collection than this - CLQ? private final Queue<AsynchronousConnection> pool; private final Semaphore currentPoolSize; private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures; private final ResultHandler<AsynchronousConnection> connectionResultHandler = new ConnectionResultHandler(); @@ -637,109 +779,16 @@ * Creates a new connection pool which will maintain {@code poolSize} * connections created using the provided connection factory. * * @param connectionFactory * @param factory * The connection factory to use for creating new connections. * @param poolSize * The maximum size of the connection pool. */ ConnectionPool(final ConnectionFactory connectionFactory, final int poolSize) ConnectionPool(final ConnectionFactory factory, final int poolSize) { this.connectionFactory = connectionFactory; this.factory = factory; this.poolSize = poolSize; this.pool = new ConcurrentLinkedQueue<AsynchronousConnection>(); this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>(); } @Override public synchronized FutureResult<AsynchronousConnection> getAsynchronousConnection( final ResultHandler<? super AsynchronousConnection> handler) { // This entire method is synchronized to ensure new connects are // done synchronously to avoid the "pending connect" case. AsynchronousConnection conn; synchronized (pool) { // Check to see if we have a connection in the pool conn = pool.poll(); if (conn == null) { // Pool was empty. Maybe a new connection if pool size is not // reached if (numConnections >= poolSize) { // We reached max # of conns so wait for a connection to // become available. final FuturePooledConnection future = new FuturePooledConnection( handler); pendingFutures.add(future); if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.finest(String.format( "No connections available. Wait-listed" + "numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); } return future; } } } if (conn == null) { try { // We can create a new connection. conn = connectionFactory.getAsynchronousConnection(null).get(); numConnections++; if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.finest(String.format( "New connection established and aquired. " + "numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); } } catch (final ErrorResultException e) { if (handler != null) { handler.handleErrorResult(e); } return new CompletedFutureResult<AsynchronousConnection>(e); } catch (final InterruptedException e) { final ErrorResultException error = new ErrorResultException(Responses .newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)); if (handler != null) { handler.handleErrorResult(error); } return new CompletedFutureResult<AsynchronousConnection>(error); } } else { if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.finest(String.format( "Connection aquired from pool. " + "numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); } } final PooledConnectionWapper pooledConnection = new PooledConnectionWapper( conn); if (handler != null) { handler.handleResult(pooledConnection); } return new CompletedFutureResult<AsynchronousConnection>(pooledConnection); this.currentPoolSize = new Semaphore(poolSize); } @@ -747,11 +796,59 @@ /** * {@inheritDoc} */ @Override public FutureResult<AsynchronousConnection> getAsynchronousConnection( final ResultHandler<? super AsynchronousConnection> handler) { QueueElement holder; synchronized (queue) { if (queue.isEmpty() || queue.getFirst().isWaitingFuture()) { holder = new QueueElement(handler); queue.add(holder); } else { holder = queue.removeFirst(); } } if (!holder.isWaitingFuture()) { // There was a completed connection attempt. final AsynchronousConnection connection = holder.getWaitingConnection(); final PooledConnection pooledConnection = new PooledConnection(connection); if (handler != null) { handler.handleResult(pooledConnection); } return new CompletedFutureResult<AsynchronousConnection>(pooledConnection); } else { // Grow the pool if needed. final FutureResult<AsynchronousConnection> future = holder .getWaitingFuture(); if (!future.isDone() && currentPoolSize.tryAcquire()) { factory.getAsynchronousConnection(connectionResultHandler); } return future; } } /** * {@inheritDoc} */ @Override public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("ConnectionPool("); builder.append(String.valueOf(connectionFactory)); builder.append(String.valueOf(factory)); builder.append(','); builder.append(poolSize); builder.append(')'); @@ -760,47 +857,25 @@ private void releaseConnection(final AsynchronousConnection connection) private void publishConnection(final AsynchronousConnection connection) { // See if there waiters pending. for (;;) QueueElement holder; synchronized (queue) { final PooledConnectionWapper pooledConnection = new PooledConnectionWapper( connection); final FuturePooledConnection future = pendingFutures.poll(); if (future == null) if (queue.isEmpty() || !queue.getFirst().isWaitingFuture()) { // No waiters - so drop out and add connection to pool. break; } future.handleResult(pooledConnection); if (!future.isCancelled()) { // The future was not cancelled and the connection was // accepted. if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.finest(String.format( "Connection released and directly " + "given to waiter. numConnections: %d, poolSize: %d, " + "pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); } holder = new QueueElement(connection); queue.add(holder); return; } else { holder = queue.removeFirst(); } } // No waiters. Put back in pool. pool.offer(connection); if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG.finest(String.format( "Connection released to pool. numConnections: %d, " + "poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); } // There was waiting future, so close it. final PooledConnection pooledConnection = new PooledConnection(connection); holder.getWaitingFuture().handleResult(pooledConnection); } } sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java
@@ -70,7 +70,7 @@ /** * Creates a new fail-over load balancing algorithm which will monitor offline * connection factories every 10 seconds using the default scheduler. * connection factories every 1 second using the default scheduler. * * @param factories * The ordered collection of connection factories. sdk/src/org/opends/sdk/RoundRobinLoadBalancingAlgorithm.java
@@ -69,7 +69,7 @@ /** * Creates a new round robin load balancing algorithm which will monitor * offline connection factories every 10 seconds using the default scheduler. * offline connection factories every 1 second using the default scheduler. * * @param factories * The ordered collection of connection factories. sdk/tests/unit-tests-testng/src/org/opends/sdk/ConnectionFactoryTestCase.java
@@ -36,14 +36,18 @@ import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import org.opends.sdk.requests.DigestMD5SASLBindRequest; import org.opends.sdk.requests.Requests; import org.opends.sdk.requests.SearchRequest; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import org.testng.annotations.DataProvider; import com.sun.opends.sdk.util.StaticUtils; import javax.net.ssl.SSLContext; @@ -101,8 +105,30 @@ /** * Disables logging before the tests. */ @BeforeClass() public void disableLogging() { StaticUtils.DEBUG_LOG.setLevel(Level.SEVERE); } /** * Re-enable logging after the tests. */ @AfterClass() public void enableLogging() { StaticUtils.DEBUG_LOG.setLevel(Level.INFO); } @DataProvider(name = "connectionFactories") public Object[][] getConnectyionFactories() throws Exception public Object[][] getConnectionFactories() throws Exception { Object[][] factories = new Object[21][1]; sdk/tests/unit-tests-testng/src/org/opends/sdk/LDAPListenerTestCase.java
@@ -32,13 +32,17 @@ import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import org.opends.sdk.requests.*; import org.opends.sdk.responses.*; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import com.sun.opends.sdk.util.StaticUtils; /** @@ -55,16 +59,18 @@ public void handleUnsolicitedNotification(ExtendedResult notification) @Override public void handleConnectionClosed() { errorMessage = "Unexpected call to handleUnsolicitedNotification"; errorMessage = "Unexpected call to handleConnectionClosed"; closeLatch.countDown(); } public void handleConnectionError(boolean isDisconnectNotification, ErrorResultException error) @Override public void handleConnectionError(final boolean isDisconnectNotification, final ErrorResultException error) { errorMessage = "Unexpected call to handleConnectionError"; closeLatch.countDown(); @@ -72,9 +78,10 @@ public void handleConnectionClosed() @Override public void handleUnsolicitedNotification(final ExtendedResult notification) { errorMessage = "Unexpected call to handleConnectionClosed"; errorMessage = "Unexpected call to handleUnsolicitedNotification"; closeLatch.countDown(); } } @@ -85,7 +92,7 @@ ServerConnection<Integer> { volatile LDAPClientContext context = null; volatile boolean isConnected = false; final CountDownLatch isConnected = new CountDownLatch(1); final CountDownLatch isClosed = new CountDownLatch(1); @@ -294,7 +301,7 @@ final LDAPClientContext clientContext) throws ErrorResultException { serverConnection.context = clientContext; serverConnection.isConnected = true; serverConnection.isConnected.countDown(); return serverConnection; } } @@ -302,6 +309,233 @@ /** * Disables logging before the tests. */ @BeforeClass() public void disableLogging() { StaticUtils.DEBUG_LOG.setLevel(Level.SEVERE); } /** * Re-enable logging after the tests. */ @AfterClass() public void enableLogging() { StaticUtils.DEBUG_LOG.setLevel(Level.INFO); } /** * Tests connection event listener. * * @throws Exception * If an unexpected error occurred. */ @Test public void testConnectionEventListenerClose() throws Exception { final MockServerConnection onlineServerConnection = new MockServerConnection(); final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory( onlineServerConnection); final LDAPListener onlineServerListener = new LDAPListener("localhost", TestCaseUtils.findFreePort(), onlineServerConnectionFactory); final Connection connection; try { // Connect and bind. connection = new LDAPConnectionFactory( onlineServerListener.getSocketAddress()).getConnection(); final MockConnectionEventListener listener = new MockConnectionEventListener() { @Override public void handleConnectionClosed() { closeLatch.countDown(); } }; connection.addConnectionEventListener(listener); Assert.assertEquals(listener.closeLatch.getCount(), 1); connection.close(); listener.closeLatch.await(); Assert.assertNull(listener.errorMessage); } finally { onlineServerListener.close(); } } /** * Tests connection event listener. * * @throws Exception * If an unexpected error occurred. */ @Test public void testConnectionEventListenerDisconnect() throws Exception { final MockServerConnection onlineServerConnection = new MockServerConnection(); final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory( onlineServerConnection); final LDAPListener onlineServerListener = new LDAPListener("localhost", TestCaseUtils.findFreePort(), onlineServerConnectionFactory); final Connection connection; try { // Connect and bind. connection = new LDAPConnectionFactory( onlineServerListener.getSocketAddress()).getConnection(); final MockConnectionEventListener listener = new MockConnectionEventListener() { @Override public void handleConnectionError( final boolean isDisconnectNotification, final ErrorResultException error) { if (isDisconnectNotification) { errorMessage = "Unexpected disconnect notification"; } closeLatch.countDown(); } }; connection.addConnectionEventListener(listener); Assert.assertEquals(listener.closeLatch.getCount(), 1); Assert.assertTrue(onlineServerConnection.isConnected .await(10, TimeUnit.SECONDS)); onlineServerConnection.context.disconnect(); listener.closeLatch.await(); Assert.assertNull(listener.errorMessage); connection.close(); } finally { onlineServerListener.close(); } } /** * Tests connection event listener. * * @throws Exception * If an unexpected error occurred. */ @Test public void testConnectionEventListenerDisconnectNotification() throws Exception { final MockServerConnection onlineServerConnection = new MockServerConnection(); final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory( onlineServerConnection); final LDAPListener onlineServerListener = new LDAPListener("localhost", TestCaseUtils.findFreePort(), onlineServerConnectionFactory); final Connection connection; try { // Connect and bind. connection = new LDAPConnectionFactory( onlineServerListener.getSocketAddress()).getConnection(); final MockConnectionEventListener listener = new MockConnectionEventListener() { @Override public void handleConnectionError( final boolean isDisconnectNotification, final ErrorResultException error) { if (!isDisconnectNotification || !error.getResult().getResultCode().equals(ResultCode.BUSY) || !error.getResult().getDiagnosticMessage().equals("test")) { errorMessage = "Missing disconnect notification: " + error; } closeLatch.countDown(); } }; connection.addConnectionEventListener(listener); Assert.assertEquals(listener.closeLatch.getCount(), 1); Assert.assertTrue(onlineServerConnection.isConnected .await(10, TimeUnit.SECONDS)); onlineServerConnection.context.disconnect(ResultCode.BUSY, "test"); listener.closeLatch.await(); Assert.assertNull(listener.errorMessage); connection.close(); } finally { onlineServerListener.close(); } } /** * Tests connection event listener. * * @throws Exception * If an unexpected error occurred. */ @Test public void testConnectionEventListenerUnbind() throws Exception { final MockServerConnection onlineServerConnection = new MockServerConnection(); final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory( onlineServerConnection); final LDAPListener onlineServerListener = new LDAPListener("localhost", TestCaseUtils.findFreePort(), onlineServerConnectionFactory); final Connection connection; try { // Connect and bind. connection = new LDAPConnectionFactory( onlineServerListener.getSocketAddress()).getConnection(); final MockConnectionEventListener listener = new MockConnectionEventListener() { @Override public void handleConnectionClosed() { closeLatch.countDown(); } }; connection.addConnectionEventListener(listener); Assert.assertEquals(listener.closeLatch.getCount(), 1); connection.close(Requests.newUnbindRequest(), "called from unit test"); listener.closeLatch.await(); Assert.assertNull(listener.errorMessage); } finally { onlineServerListener.close(); } } /** * Tests basic LDAP listener functionality. * * @throws Exception @@ -318,10 +552,15 @@ try { // Connect and close. new LDAPConnectionFactory(listener.getSocketAddress()).getConnection() .close(); final Connection connection = new LDAPConnectionFactory( listener.getSocketAddress()).getConnection(); Assert.assertTrue(serverConnection.isConnected); Assert.assertTrue(serverConnection.isConnected .await(10, TimeUnit.SECONDS)); Assert.assertEquals(serverConnection.isClosed.getCount(), 1); connection.close(); Assert.assertTrue(serverConnection.isClosed.await(10, TimeUnit.SECONDS)); } finally @@ -401,14 +640,18 @@ try { // Connect and close. new LDAPConnectionFactory(proxyListener.getSocketAddress()) .getConnection().close(); final Connection connection = new LDAPConnectionFactory( proxyListener.getSocketAddress()).getConnection(); Assert.assertTrue(proxyServerConnection.isConnected.await(10, TimeUnit.SECONDS)); Assert.assertTrue(onlineServerConnection.isConnected.await(10, TimeUnit.SECONDS)); // Wait for connect/close to complete. proxyServerConnection.isClosed.await(); connection.close(); Assert.assertTrue(proxyServerConnection.isConnected); Assert.assertTrue(onlineServerConnection.isConnected); proxyServerConnection.isClosed.await(); } finally { @@ -506,6 +749,11 @@ try { connection.bind("cn=test", "password"); Assert.assertTrue(proxyServerConnection.isConnected.await(10, TimeUnit.SECONDS)); Assert.assertTrue(onlineServerConnection.isConnected.await(10, TimeUnit.SECONDS)); } finally { @@ -514,9 +762,6 @@ // Wait for connect/close to complete. proxyServerConnection.isClosed.await(); Assert.assertTrue(proxyServerConnection.isConnected); Assert.assertTrue(onlineServerConnection.isConnected); } finally { @@ -602,14 +847,18 @@ try { // Connect and close. new LDAPConnectionFactory(proxyListener.getSocketAddress()) .getConnection().close(); final Connection connection = new LDAPConnectionFactory( proxyListener.getSocketAddress()).getConnection(); Assert.assertTrue(proxyServerConnection.isConnected.await(10, TimeUnit.SECONDS)); Assert.assertTrue(onlineServerConnection.isConnected.await(10, TimeUnit.SECONDS)); connection.close(); // Wait for connect/close to complete. proxyServerConnection.isClosed.await(); Assert.assertTrue(proxyServerConnection.isConnected); Assert.assertTrue(onlineServerConnection.isConnected); } finally { @@ -713,6 +962,11 @@ try { connection.bind("cn=test", "password"); Assert.assertTrue(proxyServerConnection.isConnected.await(10, TimeUnit.SECONDS)); Assert.assertTrue(onlineServerConnection.isConnected.await(10, TimeUnit.SECONDS)); } finally { @@ -721,9 +975,6 @@ // Wait for connect/close to complete. proxyServerConnection.isClosed.await(); Assert.assertTrue(proxyServerConnection.isConnected); Assert.assertTrue(onlineServerConnection.isConnected); } finally { @@ -763,7 +1014,7 @@ { connection.bind("cn=test", "password"); } catch (ErrorResultException e) catch (final ErrorResultException e) { connection.close(); throw e; @@ -785,7 +1036,7 @@ Assert .fail("Connection attempt to closed listener succeeded unexpectedly"); } catch (ConnectionException e) catch (final ConnectionException e) { // Expected. } @@ -795,7 +1046,7 @@ connection.bind("cn=test", "password"); Assert.fail("Bind attempt on closed connection succeeded unexpectedly"); } catch (ErrorResultException e) catch (final ErrorResultException e) { // Expected. Assert.assertFalse(connection.isValid()); @@ -808,199 +1059,4 @@ Assert.assertTrue(connection.isClosed()); } } /** * Tests connection event listener. * * @throws Exception * If an unexpected error occurred. */ @Test public void testConnectionEventListenerClose() throws Exception { final MockServerConnection onlineServerConnection = new MockServerConnection(); final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory( onlineServerConnection); final LDAPListener onlineServerListener = new LDAPListener("localhost", TestCaseUtils.findFreePort(), onlineServerConnectionFactory); final Connection connection; try { // Connect and bind. connection = new LDAPConnectionFactory( onlineServerListener.getSocketAddress()).getConnection(); MockConnectionEventListener listener = new MockConnectionEventListener() { public void handleConnectionClosed() { closeLatch.countDown(); } }; connection.addConnectionEventListener(listener); Assert.assertEquals(listener.closeLatch.getCount(), 1); connection.close(); listener.closeLatch.await(); Assert.assertNull(listener.errorMessage); } finally { onlineServerListener.close(); } } /** * Tests connection event listener. * * @throws Exception * If an unexpected error occurred. */ @Test public void testConnectionEventListenerUnbind() throws Exception { final MockServerConnection onlineServerConnection = new MockServerConnection(); final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory( onlineServerConnection); final LDAPListener onlineServerListener = new LDAPListener("localhost", TestCaseUtils.findFreePort(), onlineServerConnectionFactory); final Connection connection; try { // Connect and bind. connection = new LDAPConnectionFactory( onlineServerListener.getSocketAddress()).getConnection(); MockConnectionEventListener listener = new MockConnectionEventListener() { public void handleConnectionClosed() { closeLatch.countDown(); } }; connection.addConnectionEventListener(listener); Assert.assertEquals(listener.closeLatch.getCount(), 1); connection.close(Requests.newUnbindRequest(), "called from unit test"); listener.closeLatch.await(); Assert.assertNull(listener.errorMessage); } finally { onlineServerListener.close(); } } /** * Tests connection event listener. * * @throws Exception * If an unexpected error occurred. */ @Test public void testConnectionEventListenerDisconnect() throws Exception { final MockServerConnection onlineServerConnection = new MockServerConnection(); final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory( onlineServerConnection); final LDAPListener onlineServerListener = new LDAPListener("localhost", TestCaseUtils.findFreePort(), onlineServerConnectionFactory); final Connection connection; try { // Connect and bind. connection = new LDAPConnectionFactory( onlineServerListener.getSocketAddress()).getConnection(); MockConnectionEventListener listener = new MockConnectionEventListener() { public void handleConnectionError(boolean isDisconnectNotification, ErrorResultException error) { if (isDisconnectNotification) { errorMessage = "Unexpected disconnect notification"; } closeLatch.countDown(); } }; connection.addConnectionEventListener(listener); Assert.assertEquals(listener.closeLatch.getCount(), 1); onlineServerConnection.context.disconnect(); listener.closeLatch.await(); Assert.assertNull(listener.errorMessage); connection.close(); } finally { onlineServerListener.close(); } } /** * Tests connection event listener. * * @throws Exception * If an unexpected error occurred. */ @Test public void testConnectionEventListenerDisconnectNotification() throws Exception { final MockServerConnection onlineServerConnection = new MockServerConnection(); final MockServerConnectionFactory onlineServerConnectionFactory = new MockServerConnectionFactory( onlineServerConnection); final LDAPListener onlineServerListener = new LDAPListener("localhost", TestCaseUtils.findFreePort(), onlineServerConnectionFactory); final Connection connection; try { // Connect and bind. connection = new LDAPConnectionFactory( onlineServerListener.getSocketAddress()).getConnection(); MockConnectionEventListener listener = new MockConnectionEventListener() { public void handleConnectionError(boolean isDisconnectNotification, ErrorResultException error) { if (!isDisconnectNotification || !error.getResult().getResultCode().equals(ResultCode.BUSY) || !error.getResult().getDiagnosticMessage().equals("test")) { errorMessage = "Missing disconnect notification: " + error; } closeLatch.countDown(); } }; connection.addConnectionEventListener(listener); Assert.assertEquals(listener.closeLatch.getCount(), 1); onlineServerConnection.context.disconnect(ResultCode.BUSY, "test"); listener.closeLatch.await(); Assert.assertNull(listener.errorMessage); connection.close(); } finally { onlineServerListener.close(); } } } sdk/tests/unit-tests-testng/src/org/opends/sdk/OpenDSTestCase.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2009 Sun Microsystems, Inc. * Copyright 2009-2010 Sun Microsystems, Inc. */ package org.opends.sdk; @@ -55,8 +55,7 @@ // // This could be a problem if a subclass references a @DataProvider in // a super-class that provides static parameters, i.e. the parameters // are // not regenerated for each invocation of the DataProvider. // are not regenerated for each invocation of the DataProvider. // /**