From 23b59def043a8f71238ec5d73a393b32fb40f83c Mon Sep 17 00:00:00 2001
From: Ludovic Poitou <ludovic.poitou@forgerock.com>
Date: Thu, 14 Oct 2010 16:15:50 +0000
Subject: [PATCH] Commit from OpenDS, matthew_swift * add unit tests for ConnectionEventListeners.
---
sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java | 532 ++++++++++++++++------------------------------------------
1 files changed, 146 insertions(+), 386 deletions(-)
diff --git a/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java b/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
index 381cc43..77a29cb 100644
--- a/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
+++ b/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
@@ -29,16 +29,23 @@
-import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import org.opends.sdk.requests.*;
-import org.opends.sdk.responses.*;
-import org.opends.sdk.schema.Schema;
+import org.opends.sdk.requests.Requests;
+import org.opends.sdk.requests.SearchRequest;
+import org.opends.sdk.responses.ExtendedResult;
+import org.opends.sdk.responses.Result;
+import org.opends.sdk.responses.SearchResultEntry;
+import org.opends.sdk.responses.SearchResultReference;
+import com.sun.opends.sdk.util.AsynchronousConnectionDecorator;
import com.sun.opends.sdk.util.FutureResultTransformer;
+import com.sun.opends.sdk.util.StaticUtils;
+import com.sun.opends.sdk.util.Validator;
@@ -52,11 +59,10 @@
* An asynchronous connection that sends heart beats and supports all
* operations.
*/
- private final class AsynchronousConnectionImpl implements
- AsynchronousConnection, ConnectionEventListener, SearchResultHandler
+ private final class AsynchronousConnectionImpl extends
+ AsynchronousConnectionDecorator implements ConnectionEventListener,
+ SearchResultHandler
{
- private final AsynchronousConnection connection;
-
private long lastSuccessfulPing;
private FutureResult<Result> lastPingFuture;
@@ -65,187 +71,24 @@
private AsynchronousConnectionImpl(final AsynchronousConnection connection)
{
- this.connection = connection;
+ super(connection);
}
- public FutureResult<Void> abandon(final AbandonRequest request)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- return connection.abandon(request);
- }
-
-
-
- public FutureResult<Result> add(final AddRequest request,
- final ResultHandler<? super Result> handler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- return connection.add(request, handler);
- }
-
-
-
- public FutureResult<Result> add(final AddRequest request,
- final ResultHandler<? super Result> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- return connection
- .add(request, resultHandler, intermediateResponseHandler);
- }
-
-
-
- public void addConnectionEventListener(
- final ConnectionEventListener listener) throws IllegalStateException,
- NullPointerException
- {
- connection.addConnectionEventListener(listener);
- }
-
-
-
- public FutureResult<BindResult> bind(final BindRequest request,
- final ResultHandler<? super BindResult> handler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- return connection.bind(request, handler);
- }
-
-
-
- public FutureResult<BindResult> bind(final BindRequest request,
- final ResultHandler<? super BindResult> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- return connection.bind(request, resultHandler,
- intermediateResponseHandler);
- }
-
-
-
- public void close()
- {
- synchronized (activeConnections)
- {
- connection.removeConnectionEventListener(this);
- activeConnections.remove(this);
- }
- connection.close();
- }
-
-
-
- public void close(final UnbindRequest request, final String reason)
- throws NullPointerException
- {
- synchronized (activeConnections)
- {
- connection.removeConnectionEventListener(this);
- activeConnections.remove(this);
- }
- connection.close(request, reason);
- }
-
-
-
- public FutureResult<CompareResult> compare(final CompareRequest request,
- final ResultHandler<? super CompareResult> handler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- return connection.compare(request, handler);
- }
-
-
-
- public FutureResult<CompareResult> compare(final CompareRequest request,
- final ResultHandler<? super CompareResult> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- return connection.compare(request, resultHandler,
- intermediateResponseHandler);
- }
-
-
-
+ @Override
public void handleConnectionClosed()
{
- // Ignore - we intercept close through the close method.
+ notifyClosed();
}
+ @Override
public void handleConnectionError(final boolean isDisconnectNotification,
final ErrorResultException error)
{
- synchronized (activeConnections)
- {
- connection.removeConnectionEventListener(this);
- activeConnections.remove(this);
- }
- }
-
-
-
- public void handleUnsolicitedNotification(final ExtendedResult notification)
- {
- // Do nothing
- }
-
-
-
- public FutureResult<Result> delete(final DeleteRequest request,
- final ResultHandler<? super Result> handler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- return connection.delete(request, handler);
- }
-
-
-
- public FutureResult<Result> delete(final DeleteRequest request,
- final ResultHandler<? super Result> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- return connection.delete(request, resultHandler,
- intermediateResponseHandler);
- }
-
-
-
- public <R extends ExtendedResult> FutureResult<R> extendedRequest(
- final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- return connection.extendedRequest(request, handler);
- }
-
-
-
- public <R extends ExtendedResult> FutureResult<R> extendedRequest(
- final ExtendedRequest<R> request,
- final ResultHandler<? super R> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- return connection.extendedRequest(request, resultHandler,
- intermediateResponseHandler);
+ notifyClosed();
}
@@ -253,17 +96,8 @@
/**
* {@inheritDoc}
*/
- public Connection getSynchronousConnection()
- {
- return new SynchronousConnection(this);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- public boolean handleEntry(SearchResultEntry entry)
+ @Override
+ public boolean handleEntry(final SearchResultEntry entry)
{
// Ignore.
return true;
@@ -271,6 +105,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void handleErrorResult(final ErrorResultException error)
{
connection.close(Requests.newUnbindRequest(), "Heartbeat retured error: "
@@ -282,7 +120,8 @@
/**
* {@inheritDoc}
*/
- public boolean handleReference(SearchResultReference reference)
+ @Override
+ public boolean handleReference(final SearchResultReference reference)
{
// Ignore.
return true;
@@ -290,6 +129,10 @@
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void handleResult(final Result result)
{
lastSuccessfulPing = System.currentTimeMillis();
@@ -297,12 +140,10 @@
- /**
- * {@inheritDoc}
- */
- public boolean isClosed()
+ @Override
+ public void handleUnsolicitedNotification(final ExtendedResult notification)
{
- return connection.isClosed();
+ // Do nothing
}
@@ -310,55 +151,12 @@
/**
* {@inheritDoc}
*/
+ @Override
public boolean isValid()
{
return connection.isValid()
&& (lastSuccessfulPing <= 0 || System.currentTimeMillis()
- - lastSuccessfulPing < unit.toMillis(timeout) * 2);
- }
-
-
-
- public FutureResult<Result> modify(final ModifyRequest request,
- final ResultHandler<? super Result> handler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- return connection.modify(request, handler);
- }
-
-
-
- public FutureResult<Result> modify(final ModifyRequest request,
- final ResultHandler<? super Result> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- return connection.modify(request, resultHandler,
- intermediateResponseHandler);
- }
-
-
-
- public FutureResult<Result> modifyDN(final ModifyDNRequest request,
- final ResultHandler<? super Result> handler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- return connection.modifyDN(request, handler);
- }
-
-
-
- public FutureResult<Result> modifyDN(final ModifyDNRequest request,
- final ResultHandler<? super Result> resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- return connection.modifyDN(request, resultHandler,
- intermediateResponseHandler);
+ - lastSuccessfulPing < unit.toMillis(interval) * 2);
}
@@ -366,108 +164,32 @@
/**
* {@inheritDoc}
*/
- public FutureResult<SearchResultEntry> readEntry(final DN name,
- final Collection<String> attributeDescriptions,
- final ResultHandler<? super SearchResultEntry> resultHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- return connection.readEntry(name, attributeDescriptions, resultHandler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- public FutureResult<RootDSE> readRootDSE(
- final ResultHandler<? super RootDSE> handler)
- throws UnsupportedOperationException, IllegalStateException
- {
- return connection.readRootDSE(handler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- public FutureResult<Schema> readSchema(final DN name,
- final ResultHandler<? super Schema> handler)
- throws UnsupportedOperationException, IllegalStateException
- {
- return connection.readSchema(name, handler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- public FutureResult<Schema> readSchemaForEntry(final DN name,
- final ResultHandler<? super Schema> handler)
- throws UnsupportedOperationException, IllegalStateException
- {
- return connection.readSchemaForEntry(name, handler);
- }
-
-
-
- public void removeConnectionEventListener(
- final ConnectionEventListener listener) throws NullPointerException
- {
- connection.removeConnectionEventListener(listener);
- }
-
-
-
- public FutureResult<Result> search(final SearchRequest request,
- final SearchResultHandler handler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- return connection.search(request, handler);
- }
-
-
-
- public FutureResult<Result> search(final SearchRequest request,
- final SearchResultHandler resultHandler,
- final IntermediateResponseHandler intermediateResponseHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- return connection.search(request, resultHandler,
- intermediateResponseHandler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- public FutureResult<SearchResultEntry> searchSingleEntry(
- final SearchRequest request,
- final ResultHandler<? super SearchResultEntry> resultHandler)
- throws UnsupportedOperationException, IllegalStateException,
- NullPointerException
- {
- return connection.searchSingleEntry(request, resultHandler);
- }
-
-
-
- /**
- * {@inheritDoc}
- */
+ @Override
public String toString()
{
- StringBuilder builder = new StringBuilder();
+ final StringBuilder builder = new StringBuilder();
builder.append("HeartBeatConnection(");
builder.append(connection);
builder.append(')');
return builder.toString();
}
+
+
+
+ private void notifyClosed()
+ {
+ synchronized (activeConnections)
+ {
+ connection.removeConnectionEventListener(this);
+ activeConnections.remove(this);
+
+ if (activeConnections.isEmpty())
+ {
+ // This is the last active connection, so stop the heart beat.
+ heartBeatFuture.cancel(false);
+ }
+ }
+ }
}
@@ -497,6 +219,12 @@
synchronized (activeConnections)
{
connection.addConnectionEventListener(heartBeatConnection);
+ if (activeConnections.isEmpty())
+ {
+ // This is the first active connection, so start the heart beat.
+ heartBeatFuture = scheduler.scheduleWithFixedDelay(
+ new HeartBeatRunnable(), 0, interval, unit);
+ }
activeConnections.add(heartBeatConnection);
}
return heartBeatConnection;
@@ -506,12 +234,11 @@
- private final class HeartBeatThread extends Thread
+ private final class HeartBeatRunnable implements Runnable
{
- private HeartBeatThread()
+ private HeartBeatRunnable()
{
- super("Heart Beat Thread");
- this.setDaemon(true);
+ // Nothing to do.
}
@@ -519,35 +246,17 @@
@Override
public void run()
{
- long startTime;
- while (true)
+ synchronized (activeConnections)
{
- startTime = System.currentTimeMillis();
- synchronized (activeConnections)
+ for (final AsynchronousConnectionImpl connection : activeConnections)
{
- for (final AsynchronousConnectionImpl connection : activeConnections)
+ if (connection.lastPingFuture == null
+ || connection.lastPingFuture.isDone())
{
- if (connection.lastPingFuture == null
- || connection.lastPingFuture.isDone())
- {
- connection.lastPingFuture = connection.search(heartBeat,
- connection, null);
- }
+ connection.lastPingFuture = connection.search(heartBeat,
+ connection, null);
}
}
- try
- {
- final long sleepTime = unit.toMillis(timeout)
- - (System.currentTimeMillis() - startTime);
- if (sleepTime > 0)
- {
- sleep(sleepTime);
- }
- }
- catch (final InterruptedException e)
- {
- // Ignore
- }
}
}
}
@@ -556,37 +265,58 @@
private final SearchRequest heartBeat;
- private final long timeout;
+ private final long interval;
- // FIXME: use a single global scheduler?
+ private final ScheduledExecutorService scheduler;
private final TimeUnit unit;
private final List<AsynchronousConnectionImpl> activeConnections;
- private final ConnectionFactory parentFactory;
+ private final ConnectionFactory factory;
private static final SearchRequest DEFAULT_SEARCH = Requests
.newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)", "1.1");
+ private ScheduledFuture<?> heartBeatFuture;
+
/**
* Creates a new heart-beat connection factory which will create connections
* using the provided connection factory and periodically ping any created
- * connections in order to detect that they are still alive.
+ * connections in order to detect that they are still alive every 10 seconds
+ * using the default scheduler.
*
- * @param connectionFactory
+ * @param factory
* The connection factory to use for creating connections.
- * @param timeout
- * The time to wait between keepalive pings.
- * @param unit
- * The time unit of the timeout argument.
*/
- HeartBeatConnectionFactory(final ConnectionFactory connectionFactory,
- final long timeout, final TimeUnit unit)
+ HeartBeatConnectionFactory(final ConnectionFactory factory)
{
- this(connectionFactory, timeout, unit, DEFAULT_SEARCH);
+ this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, StaticUtils
+ .getDefaultScheduler());
+ }
+
+
+
+ /**
+ * Creates a new heart-beat connection factory which will create connections
+ * using the provided connection factory and periodically ping any created
+ * connections in order to detect that they are still alive using the
+ * specified frequency and the default scheduler.
+ *
+ * @param factory
+ * The connection factory to use for creating connections.
+ * @param interval
+ * The interval between keepalive pings.
+ * @param unit
+ * The time unit for the interval between keepalive pings.
+ */
+ HeartBeatConnectionFactory(final ConnectionFactory factory,
+ final long interval, final TimeUnit unit)
+ {
+ this(factory, interval, unit, DEFAULT_SEARCH, StaticUtils
+ .getDefaultScheduler());
}
@@ -597,25 +327,54 @@
* connections using the specified search request in order to detect that they
* are still alive.
*
- * @param connectionFactory
+ * @param factory
* The connection factory to use for creating connections.
- * @param timeout
- * The time to wait between keepalive pings.
+ * @param interval
+ * The interval between keepalive pings.
* @param unit
- * The time unit of the timeout argument.
+ * The time unit for the interval between keepalive pings.
* @param heartBeat
- * The search request to use when pinging connections.
+ * The search request to use for keepalive pings.
*/
- HeartBeatConnectionFactory(final ConnectionFactory connectionFactory,
- final long timeout, final TimeUnit unit, final SearchRequest heartBeat)
+ HeartBeatConnectionFactory(final ConnectionFactory factory,
+ final long interval, final TimeUnit unit, final SearchRequest heartBeat)
{
+ this(factory, interval, unit, heartBeat, StaticUtils.getDefaultScheduler());
+ }
+
+
+
+ /**
+ * Creates a new heart-beat connection factory which will create connections
+ * using the provided connection factory and periodically ping any created
+ * connections using the specified search request in order to detect that they
+ * are still alive.
+ *
+ * @param factory
+ * The connection factory to use for creating connections.
+ * @param interval
+ * The interval between keepalive pings.
+ * @param unit
+ * The time unit for the interval between keepalive pings.
+ * @param heartBeat
+ * The search request to use for keepalive pings.
+ * @param scheduler
+ * The scheduler which should for periodically sending keepalive
+ * pings.
+ */
+ HeartBeatConnectionFactory(final ConnectionFactory factory,
+ final long interval, final TimeUnit unit, final SearchRequest heartBeat,
+ final ScheduledExecutorService scheduler)
+ {
+ Validator.ensureNotNull(factory, heartBeat, unit, scheduler);
+ Validator.ensureTrue(interval >= 0, "negative timeout");
+
this.heartBeat = heartBeat;
- this.timeout = timeout;
+ this.interval = interval;
this.unit = unit;
this.activeConnections = new LinkedList<AsynchronousConnectionImpl>();
- this.parentFactory = connectionFactory;
-
- new HeartBeatThread().start();
+ this.factory = factory;
+ this.scheduler = scheduler;
}
@@ -625,7 +384,7 @@
final ResultHandler<? super AsynchronousConnection> handler)
{
final FutureResultImpl future = new FutureResultImpl(handler);
- future.setFutureResult(parentFactory.getAsynchronousConnection(future));
+ future.setFutureResult(factory.getAsynchronousConnection(future));
return future;
}
@@ -634,11 +393,12 @@
/**
* {@inheritDoc}
*/
+ @Override
public String toString()
{
final StringBuilder builder = new StringBuilder();
builder.append("HeartBeatConnectionFactory(");
- builder.append(String.valueOf(parentFactory));
+ builder.append(String.valueOf(factory));
builder.append(')');
return builder.toString();
}
--
Gitblit v1.10.0