From 2bc8d15a28fafab97cefafede06d6b7e738ae0fe Mon Sep 17 00:00:00 2001
From: matthew_swift <matthew_swift@localhost>
Date: Fri, 11 Dec 2009 18:45:45 +0000
Subject: [PATCH] Various incremental improvements.
---
sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java | 448 +++++++++++++++++++++++++++++++++++++++++--------------
1 files changed, 329 insertions(+), 119 deletions(-)
diff --git a/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java b/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
index bf237a3..498d486 100644
--- a/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
+++ b/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
@@ -27,6 +27,9 @@
package org.opends.sdk;
+
+
+import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CancellationException;
@@ -35,46 +38,89 @@
import java.util.concurrent.TimeoutException;
import org.opends.sdk.requests.*;
-import org.opends.sdk.responses.BindResult;
-import org.opends.sdk.responses.CompareResult;
-import org.opends.sdk.responses.GenericExtendedResult;
-import org.opends.sdk.responses.Result;
+import org.opends.sdk.responses.*;
+import org.opends.sdk.schema.Schema;
import com.sun.opends.sdk.util.Validator;
+
+
/**
- * An heart beat connection factory can be used to create
- * connections that sends a periodic search request to a Directory Server.
+ * An heart beat connection factory can be used to create connections
+ * that sends a periodic search request to a Directory Server.
*/
-public class HeartBeatConnectionFactory
- extends AbstractConnectionFactory<
- HeartBeatConnectionFactory.HeartBeatAsynchronousConnection> {
+public class HeartBeatConnectionFactory extends
+ AbstractConnectionFactory<AsynchronousConnection>
+{
private final SearchRequest heartBeat;
+
private final int interval;
- private final List<HeartBeatAsynchronousConnection> activeConnections;
+
+ private final List<AsynchronousConnectionImpl> activeConnections;
+
private final ConnectionFactory<?> parentFactory;
- private boolean stopRequested;
+ private volatile boolean stopRequested;
+
+
+ // FIXME: use a single global scheduler?
+
+ // FIXME: change timeout parameters to long+TimeUnit.
+
+ /**
+ * Creates a new heart-beat connection factory which will create
+ * connections using the provided connection factory and periodically
+ * ping any created connections in order to detect that they are still
+ * alive.
+ *
+ * @param connectionFactory
+ * The connection factory to use for creating connections.
+ * @param interval
+ * The period between keepalive pings.
+ */
public HeartBeatConnectionFactory(
- ConnectionFactory<?> parentFactory,
- int interval) {
- this(parentFactory, Requests.newSearchRequest("", SearchScope.BASE_OBJECT,
- "(objectClass=*)", "1.1"), interval);
+ ConnectionFactory<?> connectionFactory, int interval)
+ {
+ this(connectionFactory, DEFAULT_SEARCH, interval);
}
+
+
+ private static final SearchRequest DEFAULT_SEARCH = Requests
+ .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)",
+ "1.1");
+
+
+
+ /**
+ * Creates a new heart-beat connection factory which will create
+ * connections using the provided connection factory and periodically
+ * ping any created connections using the specified search request in
+ * order to detect that they are still alive.
+ *
+ * @param connectionFactory
+ * The connection factory to use for creating connections.
+ * @param heartBeat
+ * The search request to use when pinging connections.
+ * @param interval
+ * The period between keepalive pings.
+ */
public HeartBeatConnectionFactory(
- ConnectionFactory<?> parentFactory,
- SearchRequest heartBeat, int interval) {
- Validator.ensureNotNull(parentFactory, heartBeat);
+ ConnectionFactory<?> connectionFactory, SearchRequest heartBeat,
+ int interval)
+ {
+ Validator.ensureNotNull(connectionFactory, heartBeat);
this.heartBeat = heartBeat;
this.interval = interval;
- this.activeConnections = new LinkedList<HeartBeatAsynchronousConnection>();
- this.parentFactory = parentFactory;
+ this.activeConnections = new LinkedList<AsynchronousConnectionImpl>();
+ this.parentFactory = connectionFactory;
- Runtime.getRuntime().addShutdownHook(new Thread() {
+ Runtime.getRuntime().addShutdownHook(new Thread()
+ {
@Override
- public void run() {
+ public void run()
+ {
stopRequested = true;
}
});
@@ -82,116 +128,227 @@
new HeartBeatThread().start();
}
+
+
/**
* An asynchronous connection that sends heart beats and supports all
* operations.
*/
- public final class HeartBeatAsynchronousConnection
- implements AsynchronousConnection, ConnectionEventListener,
- ResultHandler<Result, Void> {
+ private final class AsynchronousConnectionImpl implements
+ AsynchronousConnection, ConnectionEventListener,
+ ResultHandler<Result, Void>
+ {
private final AsynchronousConnection connection;
- public HeartBeatAsynchronousConnection(AsynchronousConnection connection) {
+
+
+ private AsynchronousConnectionImpl(AsynchronousConnection connection)
+ {
this.connection = connection;
}
+
+
public void abandon(AbandonRequest request)
throws UnsupportedOperationException, IllegalStateException,
- NullPointerException {
+ NullPointerException
+ {
connection.abandon(request);
}
- public <P> ResultFuture<Result> add(
- AddRequest request,
+
+
+ public <P> ResultFuture<Result> add(AddRequest request,
ResultHandler<Result, P> handler, P p)
throws UnsupportedOperationException, IllegalStateException,
- NullPointerException {
+ NullPointerException
+ {
return connection.add(request, handler, p);
}
- public <P> ResultFuture<BindResult> bind(
- BindRequest request, ResultHandler<? super BindResult, P> handler, P p)
+
+
+ public <P> ResultFuture<BindResult> bind(BindRequest request,
+ ResultHandler<? super BindResult, P> handler, P p)
throws UnsupportedOperationException, IllegalStateException,
- NullPointerException {
+ NullPointerException
+ {
return connection.bind(request, handler, p);
}
- public void close() {
- synchronized (activeConnections) {
+
+
+ public void close()
+ {
+ synchronized (activeConnections)
+ {
connection.removeConnectionEventListener(this);
activeConnections.remove(this);
}
connection.close();
}
+
+
public void close(UnbindRequest request, String reason)
- throws NullPointerException {
- synchronized (activeConnections) {
+ throws NullPointerException
+ {
+ synchronized (activeConnections)
+ {
connection.removeConnectionEventListener(this);
activeConnections.remove(this);
}
connection.close(request, reason);
}
+
+
public <P> ResultFuture<CompareResult> compare(
- CompareRequest request, ResultHandler<? super CompareResult, P> handler,
- P p) throws UnsupportedOperationException, IllegalStateException,
- NullPointerException {
+ CompareRequest request,
+ ResultHandler<? super CompareResult, P> handler, P p)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
return connection.compare(request, handler, p);
}
- public <P> ResultFuture<Result> delete(
- DeleteRequest request,
- ResultHandler<Result, P> handler,
- P p)
+
+
+ public <P> ResultFuture<Result> delete(DeleteRequest request,
+ ResultHandler<Result, P> handler, P p)
throws UnsupportedOperationException, IllegalStateException,
- NullPointerException {
+ NullPointerException
+ {
return connection.delete(request, handler, p);
}
+
+
public <R extends Result, P> ResultFuture<R> extendedRequest(
- ExtendedRequest<R> request, ResultHandler<? super R, P> handler, P p)
+ ExtendedRequest<R> request,
+ ResultHandler<? super R, P> handler, P p)
throws UnsupportedOperationException, IllegalStateException,
- NullPointerException {
+ NullPointerException
+ {
return connection.extendedRequest(request, handler, p);
}
- public <P> ResultFuture<Result> modify(
- ModifyRequest request,
- ResultHandler<Result, P> handler,
- P p)
+
+
+ public <P> ResultFuture<Result> modify(ModifyRequest request,
+ ResultHandler<Result, P> handler, P p)
throws UnsupportedOperationException, IllegalStateException,
- NullPointerException {
+ NullPointerException
+ {
return connection.modify(request, handler, p);
}
- public <P> ResultFuture<Result> modifyDN(
- ModifyDNRequest request,
- ResultHandler<Result, P> handler,
- P p)
+
+
+ public <P> ResultFuture<Result> modifyDN(ModifyDNRequest request,
+ ResultHandler<Result, P> handler, P p)
throws UnsupportedOperationException, IllegalStateException,
- NullPointerException {
+ NullPointerException
+ {
return connection.modifyDN(request, handler, p);
}
- public <P> ResultFuture<Result> search(
- SearchRequest request, ResultHandler<Result, P> resultHandler,
+
+
+ public <P> ResultFuture<Result> search(SearchRequest request,
+ ResultHandler<Result, P> resultHandler,
SearchResultHandler<P> searchResultHandler, P p)
throws UnsupportedOperationException, IllegalStateException,
- NullPointerException {
- return connection.search(request, resultHandler, searchResultHandler, p);
+ NullPointerException
+ {
+ return connection.search(request, resultHandler,
+ searchResultHandler, p);
}
- public void addConnectionEventListener(ConnectionEventListener listener)
- throws IllegalStateException, NullPointerException {
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public <P> ResultFuture<SearchResultEntry> readEntry(DN name,
+ Collection<String> attributeDescriptions,
+ ResultHandler<? super SearchResultEntry, P> resultHandler, P p)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ return connection.readEntry(name, attributeDescriptions,
+ resultHandler, p);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public <P> ResultFuture<SearchResultEntry> searchSingleEntry(
+ SearchRequest request,
+ ResultHandler<? super SearchResultEntry, P> resultHandler, P p)
+ throws UnsupportedOperationException, IllegalStateException,
+ NullPointerException
+ {
+ return connection.searchSingleEntry(request, resultHandler, p);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public <P> ResultFuture<RootDSE> readRootDSE(
+ ResultHandler<RootDSE, P> handler, P p)
+ throws UnsupportedOperationException, IllegalStateException
+ {
+ return connection.readRootDSE(handler, p);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public <P> ResultFuture<Schema> readSchemaForEntry(DN name,
+ ResultHandler<Schema, P> handler, P p)
+ throws UnsupportedOperationException, IllegalStateException
+ {
+ return connection.readSchemaForEntry(name, handler, p);
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public <P> ResultFuture<Schema> readSchema(DN name,
+ ResultHandler<Schema, P> handler, P p)
+ throws UnsupportedOperationException, IllegalStateException
+ {
+ return connection.readSchema(name, handler, p);
+ }
+
+
+
+ public void addConnectionEventListener(
+ ConnectionEventListener listener) throws IllegalStateException,
+ NullPointerException
+ {
connection.addConnectionEventListener(listener);
}
- public void removeConnectionEventListener(ConnectionEventListener listener)
- throws NullPointerException {
+
+
+ public void removeConnectionEventListener(
+ ConnectionEventListener listener) throws NullPointerException
+ {
connection.removeConnectionEventListener(listener);
}
+
+
/**
* {@inheritDoc}
*/
@@ -200,62 +357,93 @@
return connection.isClosed();
}
+
+
public void connectionReceivedUnsolicitedNotification(
- GenericExtendedResult notification) {
+ GenericExtendedResult notification)
+ {
// Do nothing
}
+
+
public void connectionErrorOccurred(
- boolean isDisconnectNotification,
- ErrorResultException error) {
- synchronized (activeConnections) {
+ boolean isDisconnectNotification, ErrorResultException error)
+ {
+ synchronized (activeConnections)
+ {
connection.removeConnectionEventListener(this);
activeConnections.remove(this);
}
}
- public void handleErrorResult(Void aVoid, ErrorResultException error) {
+
+
+ public void handleErrorResult(Void aVoid, ErrorResultException error)
+ {
// TODO: I18N
- if(error instanceof OperationTimeoutException)
+ if (error instanceof TimeoutResultException)
{
close(Requests.newUnbindRequest(), "Heart beat timed out");
}
}
- public void handleResult(Void aVoid, Result result) {
+
+
+ public void handleResult(Void aVoid, Result result)
+ {
// Do nothing
}
- private void sendHeartBeat() {
+
+
+ private void sendHeartBeat()
+ {
search(heartBeat, this, null, null);
}
}
- private final class HeartBeatThread extends Thread {
- private HeartBeatThread() {
+
+
+ private final class HeartBeatThread extends Thread
+ {
+ private HeartBeatThread()
+ {
super("Heart Beat Thread");
}
- public void run() {
- while (!stopRequested) {
- synchronized (activeConnections) {
- for (HeartBeatAsynchronousConnection connection : activeConnections) {
+
+
+ public void run()
+ {
+ while (!stopRequested)
+ {
+ synchronized (activeConnections)
+ {
+ for (AsynchronousConnectionImpl connection : activeConnections)
+ {
connection.sendHeartBeat();
}
}
- try {
+ try
+ {
sleep(interval);
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e)
+ {
// Ignore
}
}
}
}
+
+
private final class ConnectionFutureImpl<P> implements
- ConnectionFuture<HeartBeatAsynchronousConnection>,
- ConnectionResultHandler<AsynchronousConnection, Void> {
- private volatile HeartBeatAsynchronousConnection heartBeatConnection;
+ ConnectionFuture<AsynchronousConnection>,
+ ConnectionResultHandler<AsynchronousConnection, Void>
+ {
+ private volatile AsynchronousConnectionImpl heartBeatConnection;
private volatile ErrorResultException exception;
@@ -263,102 +451,124 @@
private final CountDownLatch latch = new CountDownLatch(1);
- private final
- ConnectionResultHandler<? super HeartBeatAsynchronousConnection, P> handler;
+ private final ConnectionResultHandler<? super AsynchronousConnectionImpl, P> handler;
private final P p;
private boolean cancelled;
+
private ConnectionFutureImpl(
- ConnectionResultHandler<
- ? super HeartBeatAsynchronousConnection, P> handler,
- P p) {
+ ConnectionResultHandler<? super AsynchronousConnectionImpl, P> handler,
+ P p)
+ {
this.handler = handler;
this.p = p;
}
- public boolean cancel(boolean mayInterruptIfRunning) {
+
+ public boolean cancel(boolean mayInterruptIfRunning)
+ {
cancelled = connectFuture.cancel(mayInterruptIfRunning);
- if (cancelled) {
+ if (cancelled)
+ {
latch.countDown();
}
return cancelled;
}
- public HeartBeatAsynchronousConnection get()
- throws InterruptedException, ErrorResultException {
+
+ public AsynchronousConnectionImpl get()
+ throws InterruptedException, ErrorResultException
+ {
latch.await();
- if (cancelled) {
+ if (cancelled)
+ {
throw new CancellationException();
}
- if (exception != null) {
+ if (exception != null)
+ {
throw exception;
}
return heartBeatConnection;
}
- public HeartBeatAsynchronousConnection get(
- long timeout,
- TimeUnit unit) throws InterruptedException, TimeoutException,
- ErrorResultException {
+
+ public AsynchronousConnectionImpl get(long timeout, TimeUnit unit)
+ throws InterruptedException, TimeoutException,
+ ErrorResultException
+ {
latch.await(timeout, unit);
- if (cancelled) {
+ if (cancelled)
+ {
throw new CancellationException();
}
- if (exception != null) {
+ if (exception != null)
+ {
throw exception;
}
return heartBeatConnection;
}
- public boolean isCancelled() {
+
+ public boolean isCancelled()
+ {
return cancelled;
}
- public boolean isDone() {
+
+ public boolean isDone()
+ {
return latch.getCount() == 0;
}
- public void handleConnection(
- Void v,
- AsynchronousConnection connection) {
- heartBeatConnection = new HeartBeatAsynchronousConnection(connection);
- synchronized (activeConnections) {
+
+ public void handleConnection(Void v,
+ AsynchronousConnection connection)
+ {
+ heartBeatConnection = new AsynchronousConnectionImpl(connection);
+ synchronized (activeConnections)
+ {
connection.addConnectionEventListener(heartBeatConnection);
activeConnections.add(heartBeatConnection);
}
- if (handler != null) {
+ if (handler != null)
+ {
handler.handleConnection(p, heartBeatConnection);
}
latch.countDown();
}
- public void handleConnectionError(Void v, ErrorResultException error) {
+
+ public void handleConnectionError(Void v, ErrorResultException error)
+ {
exception = error;
- if (handler != null) {
+ if (handler != null)
+ {
handler.handleConnectionError(p, error);
}
latch.countDown();
}
}
- public <P> ConnectionFuture<? extends HeartBeatAsynchronousConnection>
- getAsynchronousConnection(
- ConnectionResultHandler<? super
- HeartBeatAsynchronousConnection, P> pConnectionResultHandler, P p) {
- ConnectionFutureImpl<P> future =
- new ConnectionFutureImpl<P>(pConnectionResultHandler, p);
- future.connectFuture =
- parentFactory.getAsynchronousConnection(future, null);
+
+
+ public <P> ConnectionFuture<AsynchronousConnection> getAsynchronousConnection(
+ ConnectionResultHandler<? super AsynchronousConnection, P> handler,
+ P p)
+ {
+ ConnectionFutureImpl<P> future = new ConnectionFutureImpl<P>(
+ handler, p);
+ future.connectFuture = parentFactory.getAsynchronousConnection(
+ future, null);
return future;
}
}
--
Gitblit v1.10.0