| sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java | ●●●●● patch | view | raw | blame | history | |
| sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java | ●●●●● patch | view | raw | blame | history | |
| sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java | ●●●●● patch | view | raw | blame | history | |
| sdk/src/org/opends/sdk/AsynchronousConnection.java | ●●●●● patch | view | raw | blame | history | |
| sdk/src/org/opends/sdk/AuthenticatedConnectionFactory.java | ●●●●● patch | view | raw | blame | history | |
| sdk/src/org/opends/sdk/ConnectionPool.java | ●●●●● patch | view | raw | blame | history | |
| sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java | ●●●●● patch | view | raw | blame | history | |
| sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java | ●●●●● patch | view | raw | blame | history |
sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
@@ -37,7 +37,9 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLContext; @@ -59,7 +61,7 @@ import com.sun.grizzly.ssl.*; import com.sun.grizzly.streams.StreamWriter; import com.sun.opends.sdk.util.Validator; import com.sun.opends.sdk.util.StaticUtils; /** @@ -664,8 +666,6 @@ private final Object writeLock = new Object(); private final SearchRequest pingRequest; /** * Creates a new LDAP connection. * @@ -676,22 +676,18 @@ * @param schema * The schema which will be used to decode responses from the * server. * @param pingRequest * The search request to use to verify the validity of * this connection. * @param connFactory * The associated connection factory. */ LDAPConnection(com.sun.grizzly.Connection<?> connection, InetSocketAddress serverAddress, Schema schema, SearchRequest pingRequest, LDAPConnectionFactoryImpl connFactory) LDAPConnectionFactoryImpl connFactory) { this.connection = connection; this.serverAddress = serverAddress; this.schema = schema; this.connFactory = connFactory; this.streamWriter = getFilterChainStreamWriter(); this.pingRequest = pingRequest; } @@ -1352,10 +1348,6 @@ synchronized (writeLock) { if(reason == null && unbindRequest == null) { System.out.println("Something is wrong!"); } if (isClosed) { // Already closed. @@ -1503,27 +1495,7 @@ */ public boolean isValid() { if(!isClosed && connectionInvalidReason == null) { try { search(pingRequest, null, null).get(5, TimeUnit.SECONDS); return true; } catch (ErrorResultException e) { StaticUtils.DEBUG_LOG.warning("Validity check returned error: " + e.getResult()); } catch (TimeoutException e) { StaticUtils.DEBUG_LOG.warning("Validity check timed out"); } catch (InterruptedException e) { } } return false; return connectionInvalidReason == null && !isClosed; } // // sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java
@@ -460,7 +460,7 @@ .getDefaultFilterChainFactory().getFilterChainPattern()); LDAPConnection ldapConnection = new LDAPConnection(connection, socketAddress, options.getSchema(), options.getPingRequest(), this); socketAddress, options.getSchema(), this); ldapConnectionAttr.set(connection, ldapConnection); return ldapConnection; } sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java
@@ -1,9 +1,11 @@ package org.opends.sdk; import com.sun.opends.sdk.util.Validator; import com.sun.opends.sdk.util.StaticUtils; import java.util.List; import java.util.ArrayList; import java.util.logging.Level; /** * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: 3:49:17 @@ -70,6 +72,15 @@ { resultHandler.handleErrorResult(error); } if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) { StaticUtils.DEBUG_LOG .warning(String .format( "Connection factory " + factory + " is no longer operational: " + error.getMessage())); } } public void handleResult(AsynchronousConnection result) @@ -79,6 +90,14 @@ { resultHandler.handleResult(result); } if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) { StaticUtils.DEBUG_LOG .warning(String .format( "Connection factory " + factory + " is now operational")); } } }; return factory.getAsynchronousConnection(handler); @@ -102,6 +121,11 @@ if(!f.isOperational && (f.pendingConnectFuture == null || f.pendingConnectFuture.isDone())) { if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINEST)) { StaticUtils.DEBUG_LOG .finest(String.format("Attempting connect on factory " + f)); } f.pendingConnectFuture = f.factory.getAsynchronousConnection(f); } } sdk/src/org/opends/sdk/AsynchronousConnection.java
@@ -379,12 +379,6 @@ /** * Returns true if the connection has not been closed and is still valid. * The implementation shall submit a search on the connection or use some * other mechanism that positively verifies the connection is still valid * when this method is called. * * The query submitted by the driver to validate the connection shall be * executed in the authentication context. * * @return {@code true} if the connection is valid, {@code false} otherwise. */ sdk/src/org/opends/sdk/AuthenticatedConnectionFactory.java
@@ -253,6 +253,15 @@ /** * {@inheritDoc} */ public boolean isValid() { return connection.isValid(); } /** * {@inheritDoc} */ public boolean isClosed() { return connection.isClosed(); sdk/src/org/opends/sdk/ConnectionPool.java
@@ -132,58 +132,10 @@ } isClosed = true; // Don't put closed connections back in the pool. if (!connection.isValid()) // Don't put invalid connections back in the pool. if (connection.isValid()) { numConnections--; } else { // See if there waiters pending. for (;;) { PooledConnectionWapper pooledConnection = new PooledConnectionWapper( connection); FuturePooledConnection future = pendingFutures.poll(); if (future == null) { // No waiters - so drop out and add connection to pool. break; } future.handleResult(pooledConnection); if (!future.isCancelled()) { // The future was not cancelled and the connection was // accepted. if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG .finest(String .format( "Connection released to pool and directly " + "given to waiter. numConnections: %d, poolSize: %d, " + "pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); } return; } } // No waiters. Put back in pool. pool.offer(connection); if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG .finest(String .format( "Connection released to pool and directly " + "given to waiter. numConnections: %d, poolSize: %d, " + "pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); } releaseConnection(connection); return; } } @@ -191,19 +143,31 @@ // Connection is no longer valid. Close outside of lock connection.removeConnectionEventListener(this); connection.close(); if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) { StaticUtils.DEBUG_LOG .finest(String .warning(String .format( "Dead connection released to pool. " + "numConnections: %d, poolSize: %d, pendingFutures: %d", "Dead connection released and closed. " + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); } return; } if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) { StaticUtils.DEBUG_LOG .warning(String .format( "Reconnect attempt starting. " + "numConnections: %d, poolSize: %d, " + "pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); } connectionFactory.getAsynchronousConnection(new ReconnectHandler()); } public void close(UnbindRequest request, String reason) @@ -443,12 +407,12 @@ // careful that users of the pooled connection get a sensible // error if they continue to use it (i.e. not an NPE or ISE). if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) { StaticUtils.DEBUG_LOG .finest(String .warning(String .format( "Connection error occured: " "Connection error occured and removed from pool: " + error.getMessage() + " numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures @@ -458,7 +422,52 @@ } } private class ReconnectHandler implements ResultHandler<AsynchronousConnection> { public void handleErrorResult(ErrorResultException error) { // The reconnect failed. Fail the connect attempt. numConnections --; // The reconnect failed. The underlying connection factory probably went // down. Just fail all pending futures synchronized (pool) { while(!pendingFutures.isEmpty()) { pendingFutures.poll().handleErrorResult(error); } } if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING)) { StaticUtils.DEBUG_LOG .warning(String .format( "Reconnect failed. Failed all pending futures: " + error.getMessage() + " numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); } } public void handleResult(AsynchronousConnection connection) { if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG .finest(String .format( "Reconnect succeded. " + " numConnections: %d, poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures .size())); } synchronized (pool) { releaseConnection(connection); } } } // Future used for waiting for pooled connections to become available. private static final class FuturePooledConnection extends @@ -482,6 +491,54 @@ } private void releaseConnection(AsynchronousConnection connection) { // See if there waiters pending. for (;;) { PooledConnectionWapper pooledConnection = new PooledConnectionWapper( connection); FuturePooledConnection future = pendingFutures.poll(); if (future == null) { // No waiters - so drop out and add connection to pool. break; } future.handleResult(pooledConnection); if (!future.isCancelled()) { // The future was not cancelled and the connection was // accepted. if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG .finest(String .format( "Connection released and directly " + "given to waiter. numConnections: %d, poolSize: %d, " + "pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); } return; } } // No waiters. Put back in pool. pool.offer(connection); if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { StaticUtils.DEBUG_LOG .finest(String .format( "Connection released to pool. numConnections: %d, " + "poolSize: %d, pendingFutures: %d", numConnections, pool.size(), pendingFutures.size())); } } /** @@ -546,7 +603,7 @@ try { // We can create a new connection. conn = connectionFactory.getAsynchronousConnection(handler).get(); conn = connectionFactory.getAsynchronousConnection(null).get(); numConnections++; if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE)) { @@ -561,13 +618,22 @@ } catch (ErrorResultException e) { if (handler != null) { handler.handleErrorResult(e); } return new CompletedFutureResult<AsynchronousConnection>(e); } catch (InterruptedException e) { return new CompletedFutureResult<AsynchronousConnection>( ErrorResultException error = new ErrorResultException(Responses.newResult( ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e))); ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)); if (handler != null) { handler.handleErrorResult(error); } return new CompletedFutureResult<AsynchronousConnection>(error); } } else sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
@@ -32,13 +32,14 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.opends.sdk.requests.*; import org.opends.sdk.responses.*; import org.opends.sdk.schema.Schema; import com.sun.opends.sdk.util.FutureResultTransformer; import com.sun.opends.sdk.util.Validator; @@ -46,10 +47,14 @@ * An heart beat connection factory can be used to create connections * that sends a periodic search request to a Directory Server. */ public class HeartBeatConnectionFactory extends final class HeartBeatConnectionFactory extends AbstractConnectionFactory<AsynchronousConnection> { private final int interval; private final SearchRequest heartBeat; private final long timeout; private final TimeUnit unit; private final List<AsynchronousConnectionImpl> activeConnections; @@ -59,7 +64,30 @@ // FIXME: use a single global scheduler? // FIXME: change timeout parameters to long+TimeUnit. /** * Creates a new heart-beat connection factory which will create * connections using the provided connection factory and periodically * ping any created connections in order to detect that they are still * alive. * * @param connectionFactory * The connection factory to use for creating connections. * @param timeout * The time to wait between keepalive pings. * @param unit * The time unit of the timeout argument. */ HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory, long timeout, TimeUnit unit) { this(connectionFactory, timeout, unit, DEFAULT_SEARCH); } private static final SearchRequest DEFAULT_SEARCH = Requests .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)", "1.1"); @@ -71,15 +99,19 @@ * * @param connectionFactory * The connection factory to use for creating connections. * @param interval * The period between keepalive pings. * @param timeout * The time to wait between keepalive pings. * @param unit * The time unit of the timeout argument. * @param heartBeat * The search request to use when pinging connections. */ public HeartBeatConnectionFactory( ConnectionFactory<?> connectionFactory, int interval) HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory, long timeout, TimeUnit unit, SearchRequest heartBeat) { Validator.ensureNotNull(connectionFactory); this.interval = interval; this.heartBeat = heartBeat; this.timeout = timeout; this.unit = unit; this.activeConnections = new LinkedList<AsynchronousConnectionImpl>(); this.parentFactory = connectionFactory; @@ -93,9 +125,11 @@ * operations. */ private final class AsynchronousConnectionImpl implements AsynchronousConnection, ConnectionEventListener AsynchronousConnection, ConnectionEventListener, ResultHandler<Result> { private final AsynchronousConnection connection; private long lastSuccessfulPing; private FutureResult<Result> lastPingFuture; private AsynchronousConnectionImpl(AsynchronousConnection connection) @@ -316,9 +350,9 @@ */ public boolean isValid() { // Avoid extra pings... Let the next ping find out if this connection // is still valid. return connection.isValid(); return connection.isValid() && (lastSuccessfulPing <= 0 || System.currentTimeMillis() - lastSuccessfulPing < unit.toMillis(timeout) * 2); } public void connectionReceivedUnsolicitedNotification( @@ -338,6 +372,15 @@ activeConnections.remove(this); } } public void handleErrorResult(ErrorResultException error) { connection.close(Requests.newUnbindRequest(), "Heartbeat retured error: " + error); } public void handleResult(Result result) { lastSuccessfulPing = System.currentTimeMillis(); } } @@ -354,22 +397,26 @@ public void run() { long startTime; while(true) { startTime = System.currentTimeMillis(); synchronized (activeConnections) { for (AsynchronousConnectionImpl connection : activeConnections) { if(!connection.isValid()) if(connection.lastPingFuture == null || connection.lastPingFuture.isDone()) { connection.close(Requests.newUnbindRequest(), "Connection no longer valid"); connection.lastPingFuture = connection.search(heartBeat, connection, null); } } } try { sleep(interval); sleep(unit.toMillis(timeout) - (System.currentTimeMillis() - startTime)); } catch (InterruptedException e) { sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java
@@ -32,9 +32,6 @@ import javax.net.ssl.SSLContext; import org.opends.sdk.schema.Schema; import org.opends.sdk.requests.SearchRequest; import org.opends.sdk.requests.Requests; import org.opends.sdk.SearchScope; import com.sun.opends.sdk.util.Validator; @@ -45,18 +42,12 @@ */ public final class LDAPConnectionOptions { private static final SearchRequest DEFAULT_PING = Requests .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)", "1.1"); private Schema schema = Schema.getDefaultSchema(); private SSLContext sslContext = null; private boolean useStartTLS = false; private SearchRequest pingRequest = DEFAULT_PING; /** @@ -206,29 +197,7 @@ return this; } /** * Retrieves the search request used to verify the validity of a * LDAP connection. By default, the root DSE is retrieved without any * attributes. * * @return The search request. */ public SearchRequest getPingRequest() { return pingRequest; } /** * Sets the search request used to verify the validity of a * LDAP connection. * * @param pingRequest The search request that can be used to verify the * validity of a LDAP connection. */ public void setPingRequest(SearchRequest pingRequest) { this.pingRequest = pingRequest; } // Assigns the provided options to this set of options. LDAPConnectionOptions assign(LDAPConnectionOptions options)