From d7ab33028a671b16464920ecd0149065639adf3e Mon Sep 17 00:00:00 2001
From: boli <boli@localhost>
Date: Tue, 22 Dec 2009 05:55:54 +0000
Subject: [PATCH] Initial implementation of fail over connection factory
---
opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java | 234 +++++++-------
opendj-sdk/sdk/src/org/opends/sdk/LoadBalancingConnectionFactory.java | 46 ++
opendj-sdk/sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java | 26 +
opendj-sdk/sdk/src/org/opends/sdk/AsynchronousConnection.java | 16 +
opendj-sdk/sdk/src/com/sun/opends/sdk/tools/AuthenticatedConnectionFactory.java | 7
opendj-sdk/sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java | 31 +
opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java | 119 +++++++
opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java | 308 +++++++++---------
opendj-sdk/sdk/src/org/opends/sdk/LoadBalancingAlgorithm.java | 10
opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java | 2
opendj-sdk/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java | 109 +-----
11 files changed, 555 insertions(+), 353 deletions(-)
diff --git a/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java b/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
index 5d91e23..9e9f8f1 100644
--- a/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
+++ b/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
@@ -37,9 +37,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLContext;
@@ -61,7 +59,7 @@
import com.sun.grizzly.ssl.*;
import com.sun.grizzly.streams.StreamWriter;
import com.sun.opends.sdk.util.Validator;
-
+import com.sun.opends.sdk.util.StaticUtils;
/**
@@ -666,6 +664,8 @@
private final Object writeLock = new Object();
+ private final SearchRequest pingRequest;
+
/**
* Creates a new LDAP connection.
*
@@ -676,18 +676,22 @@
* @param schema
* The schema which will be used to decode responses from the
* server.
+ * @param pingRequest
+ * The search request to use to verify the validity of
+ * this connection.
* @param connFactory
* The associated connection factory.
*/
LDAPConnection(com.sun.grizzly.Connection<?> connection,
InetSocketAddress serverAddress, Schema schema,
- LDAPConnectionFactoryImpl connFactory)
+ SearchRequest pingRequest, LDAPConnectionFactoryImpl connFactory)
{
this.connection = connection;
this.serverAddress = serverAddress;
this.schema = schema;
this.connFactory = connFactory;
this.streamWriter = getFilterChainStreamWriter();
+ this.pingRequest = pingRequest;
}
@@ -809,16 +813,7 @@
NullPointerException
{
Validator.ensureNotNull(listener);
-
- synchronized (writeLock)
- {
- if (isClosed)
- {
- throw new IllegalStateException();
- }
-
- listeners.add(listener);
- }
+ listeners.add(listener);
}
@@ -1258,11 +1253,7 @@
ConnectionEventListener listener) throws NullPointerException
{
Validator.ensureNotNull(listener);
-
- synchronized (writeLock)
- {
- listeners.remove(listener);
- }
+ listeners.remove(listener);
}
@@ -1354,13 +1345,17 @@
private void close(UnbindRequest unbindRequest,
- boolean isDisconnectNotification, Result reason)
+ boolean isDisconnectNotification, Result reason)
{
+ boolean notifyClose = false;
+ boolean notifyErrorOccurred = false;
+
synchronized (writeLock)
{
- boolean notifyClose = false;
- boolean notifyErrorOccurred = false;
-
+ if(reason == null && unbindRequest == null)
+ {
+ System.out.println("Something is wrong!");
+ }
if (isClosed)
{
// Already closed.
@@ -1381,112 +1376,104 @@
if (connectionInvalidReason != null)
{
// Already invalid.
- if (notifyClose)
- {
- // TODO: uncomment if close notification is required.
- // for (ConnectionEventListener listener : listeners)
- // {
- // listener.connectionClosed(this);
- // }
- }
return;
}
- // First abort all outstanding requests.
- for (AbstractLDAPFutureResultImpl<?> future : pendingRequests
- .values())
- {
- if (pendingBindOrStartTLS <= 0)
- {
- ASN1StreamWriter asn1Writer = connFactory
- .getASN1Writer(streamWriter);
- int messageID = nextMsgID.getAndIncrement();
- AbandonRequest abandon = Requests.newAbandonRequest(future
- .getRequestID());
- try
- {
- LDAPEncoder.encodeAbandonRequest(asn1Writer, messageID,
- abandon);
- asn1Writer.flush();
- }
- catch (IOException e)
- {
- // Underlying channel probably blown up. Just ignore.
- }
- finally
- {
- connFactory.releaseASN1Writer(asn1Writer);
- }
- }
+ // Mark the connection as invalid.
+ connectionInvalidReason = reason;
+ }
- future.adaptErrorResult(reason);
- }
- pendingRequests.clear();
-
- // Now try cleanly closing the connection if possible.
- try
+ // First abort all outstanding requests.
+ for (AbstractLDAPFutureResultImpl<?> future : pendingRequests
+ .values())
+ {
+ if (pendingBindOrStartTLS <= 0)
{
ASN1StreamWriter asn1Writer = connFactory
.getASN1Writer(streamWriter);
- if (unbindRequest == null)
- {
- unbindRequest = Requests.newUnbindRequest();
- }
-
+ int messageID = nextMsgID.getAndIncrement();
+ AbandonRequest abandon = Requests.newAbandonRequest(future
+ .getRequestID());
try
{
- LDAPEncoder.encodeUnbindRequest(asn1Writer, nextMsgID
- .getAndIncrement(), unbindRequest);
+ LDAPEncoder.encodeAbandonRequest(asn1Writer, messageID,
+ abandon);
asn1Writer.flush();
}
+ catch (IOException e)
+ {
+ // Underlying channel probably blown up. Just ignore.
+ }
finally
{
connFactory.releaseASN1Writer(asn1Writer);
}
}
- catch (IOException e)
+
+ future.adaptErrorResult(reason);
+ }
+ pendingRequests.clear();
+
+ // Now try cleanly closing the connection if possible.
+ try
+ {
+ ASN1StreamWriter asn1Writer = connFactory
+ .getASN1Writer(streamWriter);
+ if (unbindRequest == null)
{
- // Underlying channel prob blown up. Just ignore.
+ unbindRequest = Requests.newUnbindRequest();
}
try
{
- streamWriter.close();
+ LDAPEncoder.encodeUnbindRequest(asn1Writer, nextMsgID
+ .getAndIncrement(), unbindRequest);
+ asn1Writer.flush();
}
- catch (IOException e)
+ finally
{
- // Ignore.
+ connFactory.releaseASN1Writer(asn1Writer);
}
+ }
+ catch (IOException e)
+ {
+ // Underlying channel prob blown up. Just ignore.
+ }
- try
- {
- connection.close();
- }
- catch (IOException e)
- {
- // Ignore.
- }
+ try
+ {
+ streamWriter.close();
+ }
+ catch (IOException e)
+ {
+ // Ignore.
+ }
- // Mark the connection as invalid.
- connectionInvalidReason = reason;
+ try
+ {
+ connection.close();
+ }
+ catch (IOException e)
+ {
+ // Ignore.
+ }
- // Notify listeners.
- if (notifyClose)
- {
- // TODO: uncomment if close notification is required.
- // for (ConnectionEventListener listener : listeners)
- // {
- // listener.connectionClosed(this);
- // }
- }
+ // Notify listeners.
+ if (notifyClose)
+ {
+ // TODO: uncomment if close notification is required.
+ // for (ConnectionEventListener listener : listeners)
+ // {
+ // listener.connectionClosed(this);
+ // }
+ }
- if (notifyErrorOccurred)
+ if (notifyErrorOccurred)
+ {
+ for (ConnectionEventListener listener : listeners)
{
- for (ConnectionEventListener listener : listeners)
- {
- listener.connectionErrorOccurred(isDisconnectNotification,
- ErrorResultException.wrap(reason));
- }
+ listener.connectionErrorOccurred(isDisconnectNotification,
+ ErrorResultException.wrap(reason));
}
}
}
@@ -1506,27 +1493,38 @@
*/
public boolean isClosed()
{
- synchronized (writeLock)
- {
- return isClosed;
- }
+ return isClosed;
}
- //
- //
- //
- // /**
- // * {@inheritDoc}
- // */
- // public boolean isValid() throws InterruptedException
- // {
- // synchronized (writeLock)
- // {
- // return connectionInvalidReason == null;
- // }
- // }
+ /**
+ * {@inheritDoc}
+ */
+ public boolean isValid()
+ {
+ if(!isClosed && connectionInvalidReason == null)
+ {
+ try
+ {
+ search(pingRequest, null, null).get(5, TimeUnit.SECONDS);
+ return true;
+ }
+ catch (ErrorResultException e)
+ {
+ StaticUtils.DEBUG_LOG.warning("Validity check returned error: " +
+ e.getResult());
+ }
+ catch (TimeoutException e)
+ {
+ StaticUtils.DEBUG_LOG.warning("Validity check timed out");
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ return false;
+ }
//
//
//
diff --git a/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java b/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java
index d56a8fb..f815431 100644
--- a/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java
+++ b/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java
@@ -460,7 +460,7 @@
.getDefaultFilterChainFactory().getFilterChainPattern());
LDAPConnection ldapConnection = new LDAPConnection(connection,
- socketAddress, options.getSchema(), this);
+ socketAddress, options.getSchema(), options.getPingRequest(), this);
ldapConnectionAttr.set(connection, ldapConnection);
return ldapConnection;
}
diff --git a/opendj-sdk/sdk/src/com/sun/opends/sdk/tools/AuthenticatedConnectionFactory.java b/opendj-sdk/sdk/src/com/sun/opends/sdk/tools/AuthenticatedConnectionFactory.java
index 2971d6b..82675de 100644
--- a/opendj-sdk/sdk/src/com/sun/opends/sdk/tools/AuthenticatedConnectionFactory.java
+++ b/opendj-sdk/sdk/src/com/sun/opends/sdk/tools/AuthenticatedConnectionFactory.java
@@ -309,6 +309,13 @@
+ public boolean isValid()
+ {
+ return connection.isValid();
+ }
+
+
+
public void close()
{
connection.close();
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java b/opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java
new file mode 100644
index 0000000..d5676dd
--- /dev/null
+++ b/opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java
@@ -0,0 +1,119 @@
+package org.opends.sdk;
+
+import com.sun.opends.sdk.util.Validator;
+
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: 3:49:17
+ * PM To change this template use File | Settings | File Templates.
+ */
+public abstract class AbstractLoadBalancingAlgorithm
+ implements LoadBalancingAlgorithm
+{
+ protected final List<MonitoredConnectionFactory> factoryList;
+
+ protected AbstractLoadBalancingAlgorithm(ConnectionFactory<?>... factories)
+ {
+ Validator.ensureNotNull(factories);
+ factoryList = new ArrayList<MonitoredConnectionFactory>(factories.length);
+ for(ConnectionFactory<?> f : factories)
+ {
+ factoryList.add(new MonitoredConnectionFactory(f));
+ }
+
+ new MonitorThread().start();
+ }
+
+ protected class MonitoredConnectionFactory
+ extends AbstractConnectionFactory<AsynchronousConnection>
+ implements ResultHandler<AsynchronousConnection>
+ {
+ private final ConnectionFactory<?> factory;
+ private volatile boolean isOperational;
+ private volatile FutureResult<?> pendingConnectFuture;
+
+ private MonitoredConnectionFactory(ConnectionFactory<?> factory)
+ {
+ this.factory = factory;
+ this.isOperational = true;
+ }
+
+ public boolean isOperational()
+ {
+ return isOperational;
+ }
+
+ public void handleErrorResult(ErrorResultException error)
+ {
+ isOperational = false;
+ }
+
+ public void handleResult(AsynchronousConnection result)
+ {
+ isOperational = true;
+ // TODO: Notify the server is back up
+ result.close();
+ }
+
+ public FutureResult<? extends AsynchronousConnection>
+ getAsynchronousConnection(
+ final ResultHandler<? super AsynchronousConnection> resultHandler)
+ {
+ ResultHandler handler = new ResultHandler<AsynchronousConnection>()
+ {
+ public void handleErrorResult(ErrorResultException error)
+ {
+ isOperational = false;
+ if(resultHandler != null)
+ {
+ resultHandler.handleErrorResult(error);
+ }
+ }
+
+ public void handleResult(AsynchronousConnection result)
+ {
+ isOperational = true;
+ if(resultHandler != null)
+ {
+ resultHandler.handleResult(result);
+ }
+ }
+ };
+ return factory.getAsynchronousConnection(handler);
+ }
+ }
+
+ private class MonitorThread extends Thread
+ {
+ private MonitorThread()
+ {
+ super("Connection Factory Health Monitor");
+ this.setDaemon(true);
+ }
+
+ public void run()
+ {
+ while(true)
+ {
+ for(MonitoredConnectionFactory f : factoryList)
+ {
+ if(!f.isOperational && (f.pendingConnectFuture == null ||
+ f.pendingConnectFuture.isDone()))
+ {
+ f.pendingConnectFuture = f.factory.getAsynchronousConnection(f);
+ }
+ }
+ try
+ {
+ sleep(10000);
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore and just go around again...
+ }
+ }
+ }
+ }
+}
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/AsynchronousConnection.java b/opendj-sdk/sdk/src/org/opends/sdk/AsynchronousConnection.java
index 00d2ba0..ecbc74d 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/AsynchronousConnection.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/AsynchronousConnection.java
@@ -31,6 +31,7 @@
import java.io.Closeable;
import java.util.Collection;
+import java.util.concurrent.TimeUnit;
import org.opends.sdk.requests.*;
import org.opends.sdk.responses.BindResult;
@@ -377,6 +378,21 @@
/**
+ * Returns true if the connection has not been closed and is still valid.
+ * The implementation shall submit a search on the connection or use some
+ * other mechanism that positively verifies the connection is still valid
+ * when this method is called.
+ *
+ * The query submitted by the driver to validate the connection shall be
+ * executed in the authentication context.
+ *
+ * @return {@code true} if the connection is valid, {@code false} otherwise.
+ */
+ boolean isValid();
+
+
+
+ /**
* Modifies an entry in the Directory Server using the provided modify
* request.
*
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java b/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java
index dc22fae..def5ef1 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -31,7 +31,9 @@
import java.util.Collection;
import java.util.Stack;
+import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import org.opends.sdk.requests.*;
@@ -58,40 +60,17 @@
private final int poolSize;
// FIXME: should use a better collection than this - CLQ?
- private final Stack<AsynchronousConnection> pool;
+ private final Queue<AsynchronousConnection> pool;
private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
- private final Object lock = new Object();
-
-
-
- private final class FutureNewConnection
- extends
- FutureResultTransformer<AsynchronousConnection, AsynchronousConnection>
- {
- private FutureNewConnection(
- ResultHandler<? super AsynchronousConnection> handler)
- {
- super(handler);
- }
-
-
-
- protected AsynchronousConnection transformResult(
- AsynchronousConnection result) throws ErrorResultException
- {
- return new PooledConnectionWapper(result);
- }
- }
-
private final class PooledConnectionWapper implements
AsynchronousConnection, ConnectionEventListener
{
- private AsynchronousConnection connection;
-
+ private final AsynchronousConnection connection;
+ private volatile boolean isClosed;
private PooledConnectionWapper(AsynchronousConnection connection)
@@ -106,7 +85,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -120,7 +99,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -134,7 +113,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -145,29 +124,26 @@
public void close()
{
- synchronized (lock)
+ synchronized (pool)
{
- try
+ if(isClosed)
{
- // Don't put closed connections back in the pool.
- if (connection.isClosed())
- {
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG
- .finest(String
- .format(
- "Dead connection released to pool. "
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures
- .size()));
- }
- return;
- }
+ return;
+ }
+ isClosed = true;
+ // Don't put closed connections back in the pool.
+ if (!connection.isValid())
+ {
+ numConnections--;
+ }
+ else
+ {
// See if there waiters pending.
for (;;)
{
+ PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
+ connection);
FuturePooledConnection future = pendingFutures.poll();
if (future == null)
@@ -176,8 +152,6 @@
break;
}
- PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
- connection);
future.handleResult(pooledConnection);
if (!future.isCancelled())
@@ -189,34 +163,45 @@
StaticUtils.DEBUG_LOG
.finest(String
.format(
- "Connection released to pool and directly "
- + "given to waiter. numConnections: %d, poolSize: %d, "
- + "pendingFutures: %d", numConnections,
- pool.size(), pendingFutures.size()));
+ "Connection released to pool and directly "
+ + "given to waiter. numConnections: %d, poolSize: %d, "
+ + "pendingFutures: %d", numConnections,
+ pool.size(), pendingFutures.size()));
}
return;
}
}
// No waiters. Put back in pool.
- pool.push(connection);
+ pool.offer(connection);
if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
{
StaticUtils.DEBUG_LOG
.finest(String
.format(
- "Connection released to pool. "
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures
- .size()));
+ "Connection released to pool and directly "
+ + "given to waiter. numConnections: %d, poolSize: %d, "
+ + "pendingFutures: %d", numConnections,
+ pool.size(), pendingFutures.size()));
}
- }
- finally
- {
- // Null out the underlying connection to prevent further use.
- connection = null;
+ return;
}
}
+
+ // Connection is no longer valid. Close outside of lock
+ connection.removeConnectionEventListener(this);
+ connection.close();
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "Dead connection released to pool. "
+ + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
+ }
+ return;
}
@@ -234,7 +219,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -248,7 +233,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -262,7 +247,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -276,7 +261,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -290,7 +275,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -305,7 +290,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -324,7 +309,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -343,7 +328,7 @@
throws UnsupportedOperationException, IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -359,7 +344,7 @@
ResultHandler<RootDSE> handler)
throws UnsupportedOperationException, IllegalStateException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -375,7 +360,7 @@
ResultHandler<Schema> handler)
throws UnsupportedOperationException, IllegalStateException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -391,7 +376,7 @@
ResultHandler<Schema> handler)
throws UnsupportedOperationException, IllegalStateException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -404,7 +389,7 @@
ConnectionEventListener listener) throws IllegalStateException,
NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -415,7 +400,7 @@
public void removeConnectionEventListener(
ConnectionEventListener listener) throws NullPointerException
{
- if (connection == null)
+ if (isClosed())
{
throw new IllegalStateException();
}
@@ -428,10 +413,13 @@
*/
public boolean isClosed()
{
- return connection == null;
+ return isClosed;
}
-
+ public boolean isValid()
+ {
+ return !isClosed && connection.isValid();
+ }
public void connectionReceivedUnsolicitedNotification(
GenericExtendedResult notification)
@@ -444,10 +432,10 @@
public void connectionErrorOccurred(
boolean isDisconnectNotification, ErrorResultException error)
{
- synchronized (lock)
+ // Remove this connection from the pool if its in there. If not, just
+ // ignore and wait for the user to close and we can deal with it there.
+ if(pool.remove(this))
{
- // Remove this connection from the pool if its in there
- pool.remove(this);
numConnections--;
connection.removeConnectionEventListener(this);
@@ -460,11 +448,11 @@
StaticUtils.DEBUG_LOG
.finest(String
.format(
- "Connection error occured: "
- + error.getMessage()
- + " numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures
- .size()));
+ "Connection error occured: "
+ + error.getMessage()
+ + " numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
}
}
}
@@ -510,86 +498,100 @@
{
this.connectionFactory = connectionFactory;
this.poolSize = poolSize;
- this.pool = new Stack<AsynchronousConnection>();
+ this.pool = new ConcurrentLinkedQueue<AsynchronousConnection>();
this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>();
}
- public FutureResult<AsynchronousConnection> getAsynchronousConnection(
- ResultHandler<? super AsynchronousConnection> handler)
+ public synchronized FutureResult<AsynchronousConnection>
+ getAsynchronousConnection(ResultHandler<? super AsynchronousConnection> handler)
{
- synchronized (lock)
+ // This entire method is synchronized to ensure new connects are done
+ // synchronously to avoid the "pending connect" case.
+ AsynchronousConnection conn;
+ synchronized(pool)
{
// Check to see if we have a connection in the pool
-
- if (!pool.isEmpty())
+ conn = pool.poll();
+ if (conn == null)
{
- AsynchronousConnection conn = pool.pop();
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ // Pool was empty. Maybe a new connection if pool size is not
+ // reached
+ if (numConnections >= poolSize)
{
- StaticUtils.DEBUG_LOG
- .finest(String
- .format(
- "Connection aquired from pool. "
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures
- .size()));
+ // We reached max # of conns so wait for a connection to become available.
+ FuturePooledConnection future = new FuturePooledConnection(
+ handler);
+ pendingFutures.add(future);
+
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "No connections available. Wait-listed"
+ + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
+ }
+
+ return future;
}
- PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
- conn);
- if (handler != null)
- {
- handler.handleResult(pooledConnection);
- }
- return new CompletedFutureResult<AsynchronousConnection>(
- pooledConnection);
- }
-
- // Pool was empty. Maybe a new connection if pool size is not
- // reached
- if (numConnections < poolSize)
- {
- // We can create a new connection.
- numConnections++;
-
- FutureNewConnection future = new FutureNewConnection(handler);
- future.setFutureResult(connectionFactory
- .getAsynchronousConnection(future));
-
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG
- .finest(String
- .format(
- "New connection established and aquired. "
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures
- .size()));
- }
-
- return future;
- }
- else
- {
- // Pool is full so wait for a connection to become available.
- FuturePooledConnection future = new FuturePooledConnection(
- handler);
- pendingFutures.add(future);
-
- if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
- {
- StaticUtils.DEBUG_LOG
- .finest(String
- .format(
- "No connections available. Wait-listed"
- + "numConnections: %d, poolSize: %d, pendingFutures: %d",
- numConnections, pool.size(), pendingFutures
- .size()));
- }
-
- return future;
}
}
+
+ if(conn == null)
+ {
+ try
+ {
+ // We can create a new connection.
+ conn = connectionFactory.getAsynchronousConnection(handler).get();
+ numConnections++;
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "New connection established and aquired. "
+ + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
+ }
+ }
+ catch (ErrorResultException e)
+ {
+ return new CompletedFutureResult<AsynchronousConnection>(e);
+ }
+ catch (InterruptedException e)
+ {
+ return new CompletedFutureResult<AsynchronousConnection>(
+ new ErrorResultException(Responses.newResult(
+ ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)));
+ }
+ }
+ else
+ {
+ if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+ {
+ StaticUtils.DEBUG_LOG
+ .finest(String
+ .format(
+ "Connection aquired from pool. "
+ + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+ numConnections, pool.size(), pendingFutures
+ .size()));
+ }
+ }
+
+ PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
+ conn);
+ if (handler != null)
+ {
+ handler.handleResult(pooledConnection);
+ }
+ return new CompletedFutureResult<AsynchronousConnection>(
+ pooledConnection);
+
}
}
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java b/opendj-sdk/sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java
new file mode 100644
index 0000000..cb5c309
--- /dev/null
+++ b/opendj-sdk/sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java
@@ -0,0 +1,26 @@
+package org.opends.sdk;
+
+/**
+ * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: 5:42:01
+ * PM To change this template use File | Settings | File Templates.
+ */
+public class FailoverLoadBalancingAlgorithm
+ extends AbstractLoadBalancingAlgorithm
+{
+ public FailoverLoadBalancingAlgorithm(ConnectionFactory<?>... factories)
+ {
+ super(factories);
+ }
+
+ public ConnectionFactory<?> getNextConnectionFactory()
+ {
+ for(MonitoredConnectionFactory f : factoryList)
+ {
+ if(f.isOperational())
+ {
+ return f;
+ }
+ }
+ return null;
+ }
+}
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java b/opendj-sdk/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
index 285633c..3d426cd 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
@@ -32,13 +32,13 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.opends.sdk.requests.*;
import org.opends.sdk.responses.*;
import org.opends.sdk.schema.Schema;
import com.sun.opends.sdk.util.FutureResultTransformer;
+import com.sun.opends.sdk.util.Validator;
@@ -46,47 +46,20 @@
* An heart beat connection factory can be used to create connections
* that sends a periodic search request to a Directory Server.
*/
-final class HeartBeatConnectionFactory extends
+public class HeartBeatConnectionFactory extends
AbstractConnectionFactory<AsynchronousConnection>
{
- private final SearchRequest heartBeat;
-
- private final long interval;
+ private final int interval;
private final List<AsynchronousConnectionImpl> activeConnections;
private final ConnectionFactory<?> parentFactory;
- private volatile boolean stopRequested;
-
// FIXME: use a single global scheduler?
- /**
- * Creates a new heart-beat connection factory which will create
- * connections using the provided connection factory and periodically
- * ping any created connections in order to detect that they are still
- * alive.
- *
- * @param connectionFactory
- * The connection factory to use for creating connections.
- * @param timeout
- * The time to wait between keepalive pings.
- * @param unit
- * The time unit of the timeout argument.
- */
- HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory,
- long timeout, TimeUnit unit)
- {
- this(connectionFactory, timeout, unit, DEFAULT_SEARCH);
- }
-
-
-
- private static final SearchRequest DEFAULT_SEARCH = Requests
- .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)",
- "1.1");
+ // FIXME: change timeout parameters to long+TimeUnit.
@@ -98,30 +71,18 @@
*
* @param connectionFactory
* The connection factory to use for creating connections.
- * @param timeout
- * The time to wait between keepalive pings.
- * @param unit
- * The time unit of the timeout argument.
- * @param heartBeat
- * The search request to use when pinging connections.
+ * @param interval
+ * The period between keepalive pings.
*/
- HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory,
- long timeout, TimeUnit unit, SearchRequest heartBeat)
+ public HeartBeatConnectionFactory(
+ ConnectionFactory<?> connectionFactory,
+ int interval)
{
- this.heartBeat = heartBeat;
- this.interval = unit.toMillis(timeout);
+ Validator.ensureNotNull(connectionFactory);
+ this.interval = interval;
this.activeConnections = new LinkedList<AsynchronousConnectionImpl>();
this.parentFactory = connectionFactory;
- Runtime.getRuntime().addShutdownHook(new Thread()
- {
- @Override
- public void run()
- {
- stopRequested = true;
- }
- });
-
new HeartBeatThread().start();
}
@@ -132,13 +93,11 @@
* operations.
*/
private final class AsynchronousConnectionImpl implements
- AsynchronousConnection, ConnectionEventListener,
- ResultHandler<Result>
+ AsynchronousConnection, ConnectionEventListener
{
private final AsynchronousConnection connection;
-
private AsynchronousConnectionImpl(AsynchronousConnection connection)
{
this.connection = connection;
@@ -352,7 +311,15 @@
return connection.isClosed();
}
-
+ /**
+ * {@inheritDoc}
+ */
+ public boolean isValid()
+ {
+ // Avoid extra pings... Let the next ping find out if this connection
+ // is still valid.
+ return connection.isValid();
+ }
public void connectionReceivedUnsolicitedNotification(
GenericExtendedResult notification)
@@ -371,31 +338,6 @@
activeConnections.remove(this);
}
}
-
-
-
- public void handleErrorResult(ErrorResultException error)
- {
- // TODO: I18N
- if (error instanceof TimeoutResultException)
- {
- close(Requests.newUnbindRequest(), "Heart beat timed out");
- }
- }
-
-
-
- public void handleResult(Result result)
- {
- // Do nothing
- }
-
-
-
- private void sendHeartBeat()
- {
- search(heartBeat, this, null);
- }
}
@@ -405,19 +347,24 @@
private HeartBeatThread()
{
super("Heart Beat Thread");
+ this.setDaemon(true);
}
public void run()
{
- while (!stopRequested)
+ while(true)
{
synchronized (activeConnections)
{
for (AsynchronousConnectionImpl connection : activeConnections)
{
- connection.sendHeartBeat();
+ if(!connection.isValid())
+ {
+ connection.close(Requests.newUnbindRequest(),
+ "Connection no longer valid");
+ }
}
}
try
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/LoadBalancingAlgorithm.java b/opendj-sdk/sdk/src/org/opends/sdk/LoadBalancingAlgorithm.java
new file mode 100644
index 0000000..f11cc7c
--- /dev/null
+++ b/opendj-sdk/sdk/src/org/opends/sdk/LoadBalancingAlgorithm.java
@@ -0,0 +1,10 @@
+package org.opends.sdk;
+
+/**
+ * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: 3:37:03
+ * PM To change this template use File | Settings | File Templates.
+ */
+public interface LoadBalancingAlgorithm
+{
+ public ConnectionFactory<?> getNextConnectionFactory();
+}
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/LoadBalancingConnectionFactory.java b/opendj-sdk/sdk/src/org/opends/sdk/LoadBalancingConnectionFactory.java
new file mode 100644
index 0000000..d03a624
--- /dev/null
+++ b/opendj-sdk/sdk/src/org/opends/sdk/LoadBalancingConnectionFactory.java
@@ -0,0 +1,46 @@
+package org.opends.sdk;
+
+import com.sun.opends.sdk.util.Validator;
+import com.sun.opends.sdk.util.AbstractFutureResult;
+
+import org.opends.sdk.responses.Responses;
+
+/**
+ * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: 3:23:52
+ * PM To change this template use File | Settings | File Templates.
+ */
+public class LoadBalancingConnectionFactory
+ extends AbstractConnectionFactory<AsynchronousConnection>
+{
+ private final LoadBalancingAlgorithm algorithm;
+
+ public LoadBalancingConnectionFactory(LoadBalancingAlgorithm algorithm)
+ {
+ Validator.ensureNotNull(algorithm);
+ this.algorithm = algorithm;
+ }
+
+ public FutureResult<? extends AsynchronousConnection>
+ getAsynchronousConnection(
+ ResultHandler<? super AsynchronousConnection> resultHandler)
+ {
+ ConnectionFactory<?> factory = algorithm.getNextConnectionFactory();
+ if(factory == null)
+ {
+ AbstractFutureResult<AsynchronousConnection> future =
+ new AbstractFutureResult<AsynchronousConnection>(resultHandler)
+ {
+ public int getRequestID()
+ {
+ return -1;
+ }
+ };
+ future.handleErrorResult(new ErrorResultException(
+ Responses.newResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR).
+ setDiagnosticMessage("No connection factories available")));
+ return future;
+ }
+
+ return factory.getAsynchronousConnection(resultHandler);
+ }
+}
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java b/opendj-sdk/sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java
index 1267b96..323a27d 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java
@@ -32,6 +32,9 @@
import javax.net.ssl.SSLContext;
import org.opends.sdk.schema.Schema;
+import org.opends.sdk.requests.SearchRequest;
+import org.opends.sdk.requests.Requests;
+import org.opends.sdk.SearchScope;
import com.sun.opends.sdk.util.Validator;
@@ -42,12 +45,18 @@
*/
public final class LDAPConnectionOptions
{
+ private static final SearchRequest DEFAULT_PING = Requests
+ .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)",
+ "1.1");
+
private Schema schema = Schema.getDefaultSchema();
private SSLContext sslContext = null;
private boolean useStartTLS = false;
+ private SearchRequest pingRequest = DEFAULT_PING;
+
/**
@@ -197,7 +206,29 @@
return this;
}
+ /**
+ * Retrieves the search request used to verify the validity of a
+ * LDAP connection. By default, the root DSE is retrieved without any
+ * attributes.
+ *
+ * @return The search request.
+ */
+ public SearchRequest getPingRequest()
+ {
+ return pingRequest;
+ }
+ /**
+ * Sets the search request used to verify the validity of a
+ * LDAP connection.
+ *
+ * @param pingRequest The search request that can be used to verify the
+ * validity of a LDAP connection.
+ */
+ public void setPingRequest(SearchRequest pingRequest)
+ {
+ this.pingRequest = pingRequest;
+ }
// Assigns the provided options to this set of options.
LDAPConnectionOptions assign(LDAPConnectionOptions options)
--
Gitblit v1.10.0