opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java
@@ -67,8 +67,7 @@ * LDAP connection factory implementation. */ public final class LDAPConnectionFactoryImpl extends AbstractConnectionFactory<AsynchronousConnection> implements ConnectionFactory<AsynchronousConnection> AbstractConnectionFactory implements ConnectionFactory { private final class LDAPTransport extends AbstractLDAPTransport { @@ -373,7 +372,7 @@ * {@inheritDoc} */ public FutureResult<AsynchronousConnection> getAsynchronousConnection( ResultHandler<? super AsynchronousConnection> handler) ResultHandler<AsynchronousConnection> handler) { FutureResultImpl future = new FutureResultImpl(handler); opendj-sdk/sdk/src/com/sun/opends/sdk/tools/ArgumentParserConnectionFactory.java
@@ -65,8 +65,7 @@ * A connection factory designed for use with command line tools. */ final class ArgumentParserConnectionFactory extends AbstractConnectionFactory<AsynchronousConnection> implements ConnectionFactory<AsynchronousConnection> AbstractConnectionFactory implements ConnectionFactory { /** * End Of Line. @@ -174,7 +173,7 @@ private SSLContext sslContext; private ConnectionFactory<? extends AsynchronousConnection> connFactory; private ConnectionFactory connFactory; private BindRequest bindRequest = null; @@ -349,8 +348,8 @@ /** * {@inheritDoc} */ public FutureResult<? extends AsynchronousConnection> getAsynchronousConnection( ResultHandler<? super AsynchronousConnection> handler) public FutureResult<AsynchronousConnection> getAsynchronousConnection( ResultHandler<AsynchronousConnection> handler) { return connFactory.getAsynchronousConnection(handler); } @@ -364,8 +363,8 @@ // Couldn't have at the same time bindPassword and bindPasswordFile if (bindPasswordArg.isPresent() && bindPasswordFileArg.isPresent()) { LocalizableMessage message = ERR_TOOL_CONFLICTING_ARGS .get(bindPasswordArg.getLongIdentifier(), bindPasswordFileArg LocalizableMessage message = ERR_TOOL_CONFLICTING_ARGS.get( bindPasswordArg.getLongIdentifier(), bindPasswordFileArg .getLongIdentifier()); throw new ArgumentException(message); } @@ -374,23 +373,24 @@ // trustStore related arg if (trustAllArg.isPresent() && trustStorePathArg.isPresent()) { LocalizableMessage message = ERR_TOOL_CONFLICTING_ARGS.get(trustAllArg .getLongIdentifier(), trustStorePathArg.getLongIdentifier()); LocalizableMessage message = ERR_TOOL_CONFLICTING_ARGS.get( trustAllArg.getLongIdentifier(), trustStorePathArg .getLongIdentifier()); throw new ArgumentException(message); } if (trustAllArg.isPresent() && trustStorePasswordArg.isPresent()) { LocalizableMessage message = ERR_TOOL_CONFLICTING_ARGS.get(trustAllArg .getLongIdentifier(), trustStorePasswordArg .getLongIdentifier()); LocalizableMessage message = ERR_TOOL_CONFLICTING_ARGS.get( trustAllArg.getLongIdentifier(), trustStorePasswordArg .getLongIdentifier()); throw new ArgumentException(message); } if (trustAllArg.isPresent() && trustStorePasswordFileArg.isPresent()) { LocalizableMessage message = ERR_TOOL_CONFLICTING_ARGS.get(trustAllArg .getLongIdentifier(), trustStorePasswordFileArg .getLongIdentifier()); LocalizableMessage message = ERR_TOOL_CONFLICTING_ARGS.get( trustAllArg.getLongIdentifier(), trustStorePasswordFileArg .getLongIdentifier()); throw new ArgumentException(message); } @@ -411,7 +411,8 @@ String value = trustStorePathArg.getValue(); if (!canRead(trustStorePathArg.getValue())) { LocalizableMessage message = ERR_CANNOT_READ_TRUSTSTORE.get(value); LocalizableMessage message = ERR_CANNOT_READ_TRUSTSTORE .get(value); throw new ArgumentException(message); } } @@ -422,7 +423,8 @@ String value = keyStorePathArg.getValue(); if (!canRead(trustStorePathArg.getValue())) { LocalizableMessage message = ERR_CANNOT_READ_KEYSTORE.get(value); LocalizableMessage message = ERR_CANNOT_READ_KEYSTORE .get(value); throw new ArgumentException(message); } } @@ -431,8 +433,9 @@ // useSSLArg if (useStartTLSArg.isPresent() && useSSLArg.isPresent()) { LocalizableMessage message = ERR_TOOL_CONFLICTING_ARGS.get(useStartTLSArg .getLongIdentifier(), useSSLArg.getLongIdentifier()); LocalizableMessage message = ERR_TOOL_CONFLICTING_ARGS.get( useStartTLSArg.getLongIdentifier(), useSSLArg .getLongIdentifier()); throw new ArgumentException(message); } @@ -492,8 +495,8 @@ } catch (CLIException e) { throw new ArgumentException(LocalizableMessage.raw("Error reading input: " + e.toString())); throw new ArgumentException(LocalizableMessage .raw("Error reading input: " + e.toString())); } if (bindRequest != null) { @@ -522,7 +525,8 @@ if (bindDnArg.isPresent() || bindPasswordFileArg.isPresent() || bindPasswordArg.isPresent()) { return Requests.newSimpleBindRequest(getBindDN(), getPassword()); return Requests .newSimpleBindRequest(getBindDN(), getPassword()); } return null; } @@ -559,12 +563,14 @@ { if (sslContext == null) { LocalizableMessage message = ERR_TOOL_SASLEXTERNAL_NEEDS_SSL_OR_TLS.get(); LocalizableMessage message = ERR_TOOL_SASLEXTERNAL_NEEDS_SSL_OR_TLS .get(); throw new ArgumentException(message); } if (!keyStorePathArg.isPresent() && getKeyStore() == null) { LocalizableMessage message = ERR_TOOL_SASLEXTERNAL_NEEDS_KEYSTORE.get(); LocalizableMessage message = ERR_TOOL_SASLEXTERNAL_NEEDS_KEYSTORE .get(); throw new ArgumentException(message); } return new ExternalSASLBindRequest(getAuthzID()); @@ -591,9 +597,9 @@ } else if (app.isInteractive()) { value = app.readInput(LocalizableMessage.raw("Bind DN:"), bindDnArg .getDefaultValue() == null ? value : bindDnArg .getDefaultValue()); value = app.readInput(LocalizableMessage.raw("Bind DN:"), bindDnArg.getDefaultValue() == null ? value : bindDnArg .getDefaultValue()); } try @@ -626,13 +632,15 @@ } if (value == null && app.isInteractive()) { value = app.readInput(LocalizableMessage.raw("Authentication ID:"), value = app.readInput(LocalizableMessage .raw("Authentication ID:"), bindDnArg.getDefaultValue() == null ? null : "dn: " + bindDnArg.getDefaultValue()); } if (value == null) { LocalizableMessage message = ERR_LDAPAUTH_SASL_AUTHID_REQUIRED.get(mech); LocalizableMessage message = ERR_LDAPAUTH_SASL_AUTHID_REQUIRED .get(mech); throw new ArgumentException(message); } return value; @@ -676,7 +684,8 @@ } if (value.length() == 0 && app.isInteractive()) { value = app.readLineOfInput(LocalizableMessage.raw("Bind Password:")); value = app.readLineOfInput(LocalizableMessage .raw("Bind Password:")); } return ByteString.valueOf(value); opendj-sdk/sdk/src/com/sun/opends/sdk/tools/AuthenticatedConnectionFactory.java
@@ -67,69 +67,15 @@ * then the connection attempt will fail and an {@code * ErrorResultException} will be thrown. */ final class AuthenticatedConnectionFactory implements ConnectionFactory<AuthenticatedConnectionFactory.AuthenticatedAsynchronousConnection> final class AuthenticatedConnectionFactory extends AbstractConnectionFactory implements ConnectionFactory { // We implement the factory using the pimpl idiom in order have // cleaner Javadoc which does not expose implementation methods from // AbstractConnectionFactory. private static final class Impl extends AbstractConnectionFactory<AuthenticatedConnectionFactory.AuthenticatedAsynchronousConnection> implements ConnectionFactory<AuthenticatedConnectionFactory.AuthenticatedAsynchronousConnection> { private final BindRequest request; private final BindRequest request; private final ConnectionFactory<?> parentFactory; private final ConnectionFactory parentFactory; private boolean allowRebinds = false; private Impl(ConnectionFactory<?> factory, BindRequest request) throws NullPointerException { Validator.ensureNotNull(factory, request); this.parentFactory = factory; // FIXME: should do a defensive copy. this.request = request; } /** * {@inheritDoc} */ public FutureResult<AuthenticatedAsynchronousConnection> getAsynchronousConnection( ResultHandler<? super AuthenticatedAsynchronousConnection> handler) { FutureResultImpl future = new FutureResultImpl(request, handler); future.futureConnectionResult.setFutureResult(parentFactory .getAsynchronousConnection(future.futureConnectionResult)); return future.futureBindResult; } /** * {@inheritDoc} */ public AuthenticatedConnection getConnection() throws ErrorResultException { return new AuthenticatedConnection( blockingGetAsynchronousConnection()); } } private final Impl impl; private boolean allowRebinds = false; @@ -555,17 +501,21 @@ * @throws NullPointerException * If {@code factory} or {@code request} was {@code null}. */ public AuthenticatedConnectionFactory(ConnectionFactory<?> factory, public AuthenticatedConnectionFactory(ConnectionFactory factory, BindRequest request) throws NullPointerException { impl = new Impl(factory, request); Validator.ensureNotNull(factory, request); this.parentFactory = factory; // FIXME: should do a defensive copy. this.request = request; } private static final class FutureResultImpl { private final FutureResultTransformer<BindResult, AuthenticatedAsynchronousConnection> futureBindResult; private final FutureResultTransformer<BindResult, AsynchronousConnection> futureBindResult; private final RecursiveFutureResult<AsynchronousConnection, BindResult> futureConnectionResult; @@ -575,12 +525,11 @@ private FutureResultImpl( BindRequest request, ResultHandler<? super AuthenticatedAsynchronousConnection> handler) private FutureResultImpl(BindRequest request, ResultHandler<AsynchronousConnection> handler) { this.bindRequest = request; this.futureBindResult = new FutureResultTransformer<BindResult, AuthenticatedAsynchronousConnection>( this.futureBindResult = new FutureResultTransformer<BindResult, AsynchronousConnection>( handler) { @@ -647,7 +596,7 @@ public AuthenticatedConnectionFactory setRebindAllowed( boolean allowRebinds) { impl.allowRebinds = allowRebinds; this.allowRebinds = allowRebinds; return this; } @@ -666,15 +615,18 @@ */ public boolean isRebindAllowed() { return impl.allowRebinds; return allowRebinds; } public FutureResult<AuthenticatedAsynchronousConnection> getAsynchronousConnection( ResultHandler<? super AuthenticatedAsynchronousConnection> handler) public FutureResult<AsynchronousConnection> getAsynchronousConnection( ResultHandler<AsynchronousConnection> handler) { return impl.getAsynchronousConnection(handler); FutureResultImpl future = new FutureResultImpl(request, handler); future.futureConnectionResult.setFutureResult(parentFactory .getAsynchronousConnection(future.futureConnectionResult)); return future.futureBindResult; } @@ -682,7 +634,8 @@ public AuthenticatedConnection getConnection() throws ErrorResultException { return impl.getConnection(); return new AuthenticatedConnection( (AuthenticatedAsynchronousConnection) blockingGetAsynchronousConnection()); } } opendj-sdk/sdk/src/com/sun/opends/sdk/tools/ModRate.java
@@ -254,7 +254,7 @@ WorkerThread<?> newWorkerThread(AsynchronousConnection connection, ConnectionFactory<?> connectionFactory) ConnectionFactory connectionFactory) { return new ModifyWorkerThread(connection, connectionFactory); } @@ -277,7 +277,7 @@ private ModifyWorkerThread(AsynchronousConnection connection, ConnectionFactory<?> connectionFactory) ConnectionFactory connectionFactory) { super(connection, connectionFactory); } opendj-sdk/sdk/src/com/sun/opends/sdk/tools/PerformanceRunner.java
@@ -224,7 +224,7 @@ final int run(ConnectionFactory<?> connectionFactory) final int run(ConnectionFactory connectionFactory) { List<Thread> threads = new ArrayList<Thread>(); @@ -291,7 +291,7 @@ abstract WorkerThread<?> newWorkerThread( AsynchronousConnection connection, ConnectionFactory<?> connectionFactory); ConnectionFactory connectionFactory); @@ -357,12 +357,12 @@ private final AsynchronousConnection connection; private final ConnectionFactory<?> connectionFactory; private final ConnectionFactory connectionFactory; WorkerThread(AsynchronousConnection connection, ConnectionFactory<?> connectionFactory) ConnectionFactory connectionFactory) { super("Worker Thread"); this.connection = connection; opendj-sdk/sdk/src/com/sun/opends/sdk/tools/SearchRate.java
@@ -309,7 +309,7 @@ WorkerThread<?> newWorkerThread(AsynchronousConnection connection, ConnectionFactory<?> connectionFactory) ConnectionFactory connectionFactory) { return new SearchWorkerThread(connection, connectionFactory); } @@ -357,7 +357,7 @@ private SearchWorkerThread(AsynchronousConnection connection, ConnectionFactory<?> connectionFactory) ConnectionFactory connectionFactory) { super(connection, connectionFactory); } opendj-sdk/sdk/src/org/opends/sdk/AbstractConnectionFactory.java
@@ -38,13 +38,9 @@ * This class provides a skeletal implementation of the {@code * ConnectionFactory} interface, to minimize the effort required to * implement this interface. * * @param <C> * The type of asynchronous connection returned by this * connection factory. */ public abstract class AbstractConnectionFactory<C extends AsynchronousConnection> implements ConnectionFactory<C> public abstract class AbstractConnectionFactory implements ConnectionFactory { /** * Creates a new abstract connection factory. @@ -59,8 +55,8 @@ /** * {@inheritDoc} */ public abstract FutureResult<? extends C> getAsynchronousConnection( ResultHandler<? super C> handler); public abstract FutureResult<AsynchronousConnection> getAsynchronousConnection( ResultHandler<AsynchronousConnection> handler); @@ -101,10 +97,10 @@ * @throws ErrorResultException * If the connection request failed for some reason. */ protected final C blockingGetAsynchronousConnection() protected final AsynchronousConnection blockingGetAsynchronousConnection() throws ErrorResultException { FutureResult<? extends C> future = getAsynchronousConnection(null); FutureResult<AsynchronousConnection> future = getAsynchronousConnection(null); try { return future.get(); opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java
@@ -51,12 +51,12 @@ protected AbstractLoadBalancingAlgorithm( ConnectionFactory<?>... factories) ConnectionFactory... factories) { Validator.ensureNotNull((Object[]) factories); factoryList = new ArrayList<MonitoredConnectionFactory>( factories.length); for (ConnectionFactory<?> f : factories) for (ConnectionFactory f : factories) { factoryList.add(new MonitoredConnectionFactory(f)); } @@ -67,10 +67,10 @@ protected class MonitoredConnectionFactory extends AbstractConnectionFactory<AsynchronousConnection> implements AbstractConnectionFactory implements ResultHandler<AsynchronousConnection> { private final ConnectionFactory<?> factory; private final ConnectionFactory factory; private volatile boolean isOperational; @@ -78,7 +78,7 @@ private MonitoredConnectionFactory(ConnectionFactory<?> factory) private MonitoredConnectionFactory(ConnectionFactory factory) { this.factory = factory; this.isOperational = true; @@ -109,8 +109,8 @@ public FutureResult<? extends AsynchronousConnection> getAsynchronousConnection( final ResultHandler<? super AsynchronousConnection> resultHandler) public FutureResult<AsynchronousConnection> getAsynchronousConnection( final ResultHandler<AsynchronousConnection> resultHandler) { ResultHandler<AsynchronousConnection> handler = new ResultHandler<AsynchronousConnection>() { opendj-sdk/sdk/src/org/opends/sdk/AsynchronousConnection.java
@@ -31,7 +31,6 @@ import java.io.Closeable; import java.util.Collection; import java.util.concurrent.TimeUnit; import org.opends.sdk.requests.*; import org.opends.sdk.responses.BindResult; @@ -385,7 +384,7 @@ boolean isValid(); /** * Modifies an entry in the Directory Server using the provided modify * request. opendj-sdk/sdk/src/org/opends/sdk/AuthenticatedConnectionFactory.java
@@ -57,12 +57,12 @@ * ErrorResultException} will be thrown. */ final class AuthenticatedConnectionFactory extends AbstractConnectionFactory<AsynchronousConnection> AbstractConnectionFactory { private final BindRequest request; private final ConnectionFactory<?> parentFactory; private final ConnectionFactory parentFactory; @@ -77,7 +77,7 @@ * @param request * The Bind request to use for authentication. */ AuthenticatedConnectionFactory(ConnectionFactory<?> factory, AuthenticatedConnectionFactory(ConnectionFactory factory, BindRequest request) throws NullPointerException { this.parentFactory = factory; @@ -92,7 +92,7 @@ * {@inheritDoc} */ public FutureResult<AsynchronousConnection> getAsynchronousConnection( ResultHandler<? super AsynchronousConnection> handler) ResultHandler<AsynchronousConnection> handler) { FutureResultImpl future = new FutureResultImpl(request, handler); future.futureConnectionResult.setFutureResult(parentFactory @@ -253,7 +253,8 @@ /** * {@inheritDoc} */ public boolean isValid() { public boolean isValid() { return connection.isValid(); } opendj-sdk/sdk/src/org/opends/sdk/ConnectionFactory.java
@@ -49,11 +49,8 @@ * connection. Applications should aim to close connections as soon as * possible in order to avoid resource contention. * * @param <C> * The type of asynchronous connection returned by this * connection factory. */ public interface ConnectionFactory<C extends AsynchronousConnection> public interface ConnectionFactory { /** * Returns a connection to the Directory Server associated with this @@ -83,6 +80,6 @@ * @return A future which can be used to retrieve the asynchronous * connection. */ FutureResult<? extends C> getAsynchronousConnection( ResultHandler<? super C> handler); FutureResult<AsynchronousConnection> getAsynchronousConnection( ResultHandler<AsynchronousConnection> handler); } opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -30,10 +30,8 @@ import java.util.Collection; import java.util.Stack; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import org.opends.sdk.requests.*; @@ -42,7 +40,6 @@ import com.sun.opends.sdk.util.AbstractFutureResult; import com.sun.opends.sdk.util.CompletedFutureResult; import com.sun.opends.sdk.util.FutureResultTransformer; import com.sun.opends.sdk.util.StaticUtils; @@ -50,10 +47,9 @@ /** * A simple connection pool implementation. */ final class ConnectionPool extends AbstractConnectionFactory<AsynchronousConnection> final class ConnectionPool extends AbstractConnectionFactory { private final ConnectionFactory<?> connectionFactory; private final ConnectionFactory connectionFactory; private volatile int numConnections; @@ -70,9 +66,11 @@ AsynchronousConnection, ConnectionEventListener { private final AsynchronousConnection connection; private volatile boolean isClosed; private PooledConnectionWapper(AsynchronousConnection connection) { this.connection = connection; @@ -126,7 +124,7 @@ { synchronized (pool) { if(isClosed) if (isClosed) { return; } @@ -145,31 +143,27 @@ connection.close(); if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) { StaticUtils.DEBUG_LOG .warning(String .format( "Dead connection released and closed. " + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); StaticUtils.DEBUG_LOG.warning(String.format( "Dead connection released and closed. " + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); } if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) { StaticUtils.DEBUG_LOG .warning(String .format( "Reconnect attempt starting. " + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); StaticUtils.DEBUG_LOG.warning(String.format( "Reconnect attempt starting. " + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); } connectionFactory.getAsynchronousConnection(new ReconnectHandler()); connectionFactory .getAsynchronousConnection(new ReconnectHandler()); } public void close(UnbindRequest request, String reason) throws NullPointerException { @@ -380,11 +374,15 @@ return isClosed; } public boolean isValid() { return !isClosed && connection.isValid(); } public void connectionReceivedUnsolicitedNotification( GenericExtendedResult notification) { @@ -396,9 +394,11 @@ public void connectionErrorOccurred( boolean isDisconnectNotification, ErrorResultException error) { // Remove this connection from the pool if its in there. If not, just // ignore and wait for the user to close and we can deal with it there. if(pool.remove(this)) // Remove this connection from the pool if its in there. If not, // just // ignore and wait for the user to close and we can deal with it // there. if (pool.remove(this)) { numConnections--; connection.removeConnectionEventListener(this); @@ -412,27 +412,31 @@ StaticUtils.DEBUG_LOG .warning(String .format( "Connection error occured and removed from pool: " + error.getMessage() + " numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); "Connection error occured and removed from pool: " + error.getMessage() + " numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); } } } } private class ReconnectHandler implements ResultHandler<AsynchronousConnection> private class ReconnectHandler implements ResultHandler<AsynchronousConnection> { public void handleErrorResult(ErrorResultException error) { public void handleErrorResult(ErrorResultException error) { // The reconnect failed. Fail the connect attempt. numConnections --; // The reconnect failed. The underlying connection factory probably went numConnections--; // The reconnect failed. The underlying connection factory // probably went // down. Just fail all pending futures synchronized (pool) { while(!pendingFutures.isEmpty()) while (!pendingFutures.isEmpty()) { pendingFutures.poll().handleErrorResult(error); } @@ -442,25 +446,26 @@ StaticUtils.DEBUG_LOG .warning(String .format( "Reconnect failed. Failed all pending futures: " + error.getMessage() + " numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); "Reconnect failed. Failed all pending futures: " + error.getMessage() + " numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); } } public void handleResult(AsynchronousConnection connection) { public void handleResult(AsynchronousConnection connection) { if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG .finest(String .format( "Reconnect succeded. " + " numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); "Reconnect succeded. " + " numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); } synchronized (pool) { @@ -469,6 +474,8 @@ } } // Future used for waiting for pooled connections to become available. private static final class FuturePooledConnection extends AbstractFutureResult<AsynchronousConnection> @@ -491,6 +498,8 @@ } private void releaseConnection(AsynchronousConnection connection) { // See if there waiters pending. @@ -517,10 +526,10 @@ StaticUtils.DEBUG_LOG .finest(String .format( "Connection released and directly " + "given to waiter. numConnections: %d, poolSize: %d, " + "pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); "Connection released and directly " + "given to waiter. numConnections: %d, poolSize: %d, " + "pendingFutures: %d", numConnections, pool .size(), pendingFutures.size())); } return; } @@ -530,12 +539,10 @@ pool.offer(connection); if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG .finest(String .format( "Connection released to pool. numConnections: %d, " + "poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); StaticUtils.DEBUG_LOG.finest(String.format( "Connection released to pool. numConnections: %d, " + "poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); } } @@ -551,7 +558,7 @@ * @param poolSize * The maximum size of the connection pool. */ ConnectionPool(ConnectionFactory<?> connectionFactory, int poolSize) ConnectionPool(ConnectionFactory connectionFactory, int poolSize) { this.connectionFactory = connectionFactory; this.poolSize = poolSize; @@ -561,13 +568,14 @@ public synchronized FutureResult<AsynchronousConnection> getAsynchronousConnection(ResultHandler<? super AsynchronousConnection> handler) public synchronized FutureResult<AsynchronousConnection> getAsynchronousConnection( ResultHandler<AsynchronousConnection> handler) { // This entire method is synchronized to ensure new connects are done // This entire method is synchronized to ensure new connects are // done // synchronously to avoid the "pending connect" case. AsynchronousConnection conn; synchronized(pool) synchronized (pool) { // Check to see if we have a connection in the pool conn = pool.poll(); @@ -577,7 +585,8 @@ // reached if (numConnections >= poolSize) { // We reached max # of conns so wait for a connection to become available. // We reached max # of conns so wait for a connection to // become available. FuturePooledConnection future = new FuturePooledConnection( handler); pendingFutures.add(future); @@ -587,10 +596,10 @@ StaticUtils.DEBUG_LOG .finest(String .format( "No connections available. Wait-listed" + "numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); "No connections available. Wait-listed" + "numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); } return future; @@ -598,7 +607,7 @@ } } if(conn == null) if (conn == null) { try { @@ -610,10 +619,10 @@ StaticUtils.DEBUG_LOG .finest(String .format( "New connection established and aquired. " + "numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); "New connection established and aquired. " + "numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); } } catch (ErrorResultException e) @@ -626,9 +635,8 @@ } catch (InterruptedException e) { ErrorResultException error = new ErrorResultException(Responses.newResult( ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)); ErrorResultException error = new ErrorResultException(Responses .newResult(ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)); if (handler != null) { handler.handleErrorResult(error); @@ -643,10 +651,9 @@ StaticUtils.DEBUG_LOG .finest(String .format( "Connection aquired from pool. " + "numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); "Connection aquired from pool. " + "numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); } } opendj-sdk/sdk/src/org/opends/sdk/Connections.java
@@ -75,8 +75,8 @@ * @throws NullPointerException * If {@code factory} or {@code request} was {@code null}. */ public static ConnectionFactory<AsynchronousConnection> newAuthenticatedConnectionFactory( ConnectionFactory<?> factory, BindRequest request) public static ConnectionFactory newAuthenticatedConnectionFactory( ConnectionFactory factory, BindRequest request) throws NullPointerException { Validator.ensureNotNull(factory, request); @@ -101,8 +101,8 @@ * @throws NullPointerException * If {@code factory} was {@code null}. */ public static ConnectionFactory<AsynchronousConnection> newConnectionPool( ConnectionFactory<?> factory, int poolSize) public static ConnectionFactory newConnectionPool( ConnectionFactory factory, int poolSize) throws IllegalArgumentException, NullPointerException { Validator.ensureNotNull(factory); @@ -130,8 +130,8 @@ * @throws NullPointerException * If {@code factory} or {@code unit} was {@code null}. */ public static ConnectionFactory<AsynchronousConnection> newHeartBeatConnectionFactory( ConnectionFactory<?> factory, long timeout, TimeUnit unit) public static ConnectionFactory newHeartBeatConnectionFactory( ConnectionFactory factory, long timeout, TimeUnit unit) throws IllegalArgumentException, NullPointerException { Validator.ensureNotNull(factory, unit); @@ -163,8 +163,8 @@ * If {@code factory}, {@code unit}, or {@code heartBeat} * was {@code null}. */ public static ConnectionFactory<AsynchronousConnection> newHeartBeatConnectionFactory( ConnectionFactory<?> factory, long timeout, TimeUnit unit, public static ConnectionFactory newHeartBeatConnectionFactory( ConnectionFactory factory, long timeout, TimeUnit unit, SearchRequest heartBeat) throws IllegalArgumentException, NullPointerException { opendj-sdk/sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java
@@ -1,22 +1,25 @@ package org.opends.sdk; /** * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: 5:42:01 * PM To change this template use File | Settings | File Templates. * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: * 5:42:01 PM To change this template use File | Settings | File * Templates. */ public class FailoverLoadBalancingAlgorithm extends AbstractLoadBalancingAlgorithm public class FailoverLoadBalancingAlgorithm extends AbstractLoadBalancingAlgorithm { public FailoverLoadBalancingAlgorithm(ConnectionFactory<?>... factories) public FailoverLoadBalancingAlgorithm(ConnectionFactory... factories) { super(factories); } public ConnectionFactory<?> getNextConnectionFactory() public ConnectionFactory getNextConnectionFactory() { for(MonitoredConnectionFactory f : factoryList) for (MonitoredConnectionFactory f : factoryList) { if(f.isOperational()) if (f.isOperational()) { return f; } opendj-sdk/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
@@ -33,7 +33,6 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.opends.sdk.requests.*; import org.opends.sdk.responses.*; @@ -48,7 +47,7 @@ * that sends a periodic search request to a Directory Server. */ final class HeartBeatConnectionFactory extends AbstractConnectionFactory<AsynchronousConnection> AbstractConnectionFactory { private final SearchRequest heartBeat; @@ -58,7 +57,7 @@ private final List<AsynchronousConnectionImpl> activeConnections; private final ConnectionFactory<?> parentFactory; private final ConnectionFactory parentFactory; @@ -77,7 +76,7 @@ * @param unit * The time unit of the timeout argument. */ HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory, HeartBeatConnectionFactory(ConnectionFactory connectionFactory, long timeout, TimeUnit unit) { this(connectionFactory, timeout, unit, DEFAULT_SEARCH); @@ -106,7 +105,7 @@ * @param heartBeat * The search request to use when pinging connections. */ HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory, HeartBeatConnectionFactory(ConnectionFactory connectionFactory, long timeout, TimeUnit unit, SearchRequest heartBeat) { this.heartBeat = heartBeat; @@ -125,13 +124,17 @@ * operations. */ private final class AsynchronousConnectionImpl implements AsynchronousConnection, ConnectionEventListener, ResultHandler<Result> AsynchronousConnection, ConnectionEventListener, ResultHandler<Result> { private final AsynchronousConnection connection; private long lastSuccessfulPing; private FutureResult<Result> lastPingFuture; private AsynchronousConnectionImpl(AsynchronousConnection connection) { this.connection = connection; @@ -345,16 +348,20 @@ return connection.isClosed(); } /** * {@inheritDoc} */ public boolean isValid() { return connection.isValid() && (lastSuccessfulPing <= 0 || System.currentTimeMillis() - lastSuccessfulPing < unit.toMillis(timeout) * 2); return connection.isValid() && (lastSuccessfulPing <= 0 || System.currentTimeMillis() - lastSuccessfulPing < unit.toMillis(timeout) * 2); } public void connectionReceivedUnsolicitedNotification( GenericExtendedResult notification) { @@ -373,13 +380,19 @@ } } public void handleErrorResult(ErrorResultException error) { public void handleErrorResult(ErrorResultException error) { connection.close(Requests.newUnbindRequest(), "Heartbeat retured error: " + error); } public void handleResult(Result result) { lastSuccessfulPing = System.currentTimeMillis(); public void handleResult(Result result) { lastSuccessfulPing = System.currentTimeMillis(); } } @@ -398,25 +411,25 @@ public void run() { long startTime; while(true) while (true) { startTime = System.currentTimeMillis(); synchronized (activeConnections) { for (AsynchronousConnectionImpl connection : activeConnections) { if(connection.lastPingFuture == null || connection.lastPingFuture.isDone()) if (connection.lastPingFuture == null || connection.lastPingFuture.isDone()) { connection.lastPingFuture = connection.search(heartBeat, connection, null); connection.lastPingFuture = connection.search(heartBeat, connection, null); } } } try { sleep(unit.toMillis(timeout) - (System.currentTimeMillis() - startTime)); sleep(unit.toMillis(timeout) - (System.currentTimeMillis() - startTime)); } catch (InterruptedException e) { @@ -464,7 +477,7 @@ public FutureResult<AsynchronousConnection> getAsynchronousConnection( ResultHandler<? super AsynchronousConnection> handler) ResultHandler<AsynchronousConnection> handler) { FutureResultImpl future = new FutureResultImpl(handler); future.setFutureResult(parentFactory opendj-sdk/sdk/src/org/opends/sdk/LoadBalancingAlgorithm.java
@@ -1,10 +1,11 @@ package org.opends.sdk; /** * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: 3:37:03 * PM To change this template use File | Settings | File Templates. * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: * 3:37:03 PM To change this template use File | Settings | File * Templates. */ public interface LoadBalancingAlgorithm { public ConnectionFactory<?> getNextConnectionFactory(); public ConnectionFactory getNextConnectionFactory(); } opendj-sdk/sdk/src/org/opends/sdk/LoadBalancingConnectionFactory.java
@@ -1,43 +1,51 @@ package org.opends.sdk; import com.sun.opends.sdk.util.Validator; import com.sun.opends.sdk.util.AbstractFutureResult; import org.opends.sdk.responses.Responses; /** * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: 3:23:52 * PM To change this template use File | Settings | File Templates. * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: * 3:23:52 PM To change this template use File | Settings | File * Templates. */ public class LoadBalancingConnectionFactory extends AbstractConnectionFactory<AsynchronousConnection> public class LoadBalancingConnectionFactory extends AbstractConnectionFactory { private final LoadBalancingAlgorithm algorithm; public LoadBalancingConnectionFactory(LoadBalancingAlgorithm algorithm) { Validator.ensureNotNull(algorithm); this.algorithm = algorithm; } public FutureResult<? extends AsynchronousConnection> getAsynchronousConnection( ResultHandler<? super AsynchronousConnection> resultHandler) public FutureResult<AsynchronousConnection> getAsynchronousConnection( ResultHandler<AsynchronousConnection> resultHandler) { ConnectionFactory<?> factory = algorithm.getNextConnectionFactory(); if(factory == null) ConnectionFactory factory = algorithm.getNextConnectionFactory(); if (factory == null) { AbstractFutureResult<AsynchronousConnection> future = new AbstractFutureResult<AsynchronousConnection>(resultHandler) AbstractFutureResult<AsynchronousConnection> future = new AbstractFutureResult<AsynchronousConnection>( resultHandler) { public int getRequestID() { return -1; } }; future.handleErrorResult(new ErrorResultException( Responses.newResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR). setDiagnosticMessage("No connection factories available"))); future.handleErrorResult(new ErrorResultException(Responses .newResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR) .setDiagnosticMessage("No connection factories available"))); return future; } opendj-sdk/sdk/src/org/opends/sdk/ldap/LDAPConnectionFactory.java
@@ -37,8 +37,7 @@ /** * LDAP connection factory implementation. */ public final class LDAPConnectionFactory implements ConnectionFactory<AsynchronousConnection> public final class LDAPConnectionFactory implements ConnectionFactory { // We implement the factory using the pimpl idiom in order have // cleaner Javadoc which does not expose implementation methods from @@ -115,7 +114,7 @@ public FutureResult<AsynchronousConnection> getAsynchronousConnection( ResultHandler<? super AsynchronousConnection> handler) ResultHandler<AsynchronousConnection> handler) { return impl.getAsynchronousConnection(handler); }