From 23b59def043a8f71238ec5d73a393b32fb40f83c Mon Sep 17 00:00:00 2001
From: Ludovic Poitou <ludovic.poitou@forgerock.com>
Date: Thu, 14 Oct 2010 16:15:50 +0000
Subject: [PATCH] Commit from OpenDS, matthew_swift * add unit tests for ConnectionEventListeners.

---
 sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java |  532 ++++++++++++++++------------------------------------------
 1 files changed, 146 insertions(+), 386 deletions(-)

diff --git a/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java b/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
index 381cc43..77a29cb 100644
--- a/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
+++ b/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
@@ -29,16 +29,23 @@
 
 
 
-import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
-import org.opends.sdk.requests.*;
-import org.opends.sdk.responses.*;
-import org.opends.sdk.schema.Schema;
+import org.opends.sdk.requests.Requests;
+import org.opends.sdk.requests.SearchRequest;
+import org.opends.sdk.responses.ExtendedResult;
+import org.opends.sdk.responses.Result;
+import org.opends.sdk.responses.SearchResultEntry;
+import org.opends.sdk.responses.SearchResultReference;
 
+import com.sun.opends.sdk.util.AsynchronousConnectionDecorator;
 import com.sun.opends.sdk.util.FutureResultTransformer;
+import com.sun.opends.sdk.util.StaticUtils;
+import com.sun.opends.sdk.util.Validator;
 
 
 
@@ -52,11 +59,10 @@
    * An asynchronous connection that sends heart beats and supports all
    * operations.
    */
-  private final class AsynchronousConnectionImpl implements
-      AsynchronousConnection, ConnectionEventListener, SearchResultHandler
+  private final class AsynchronousConnectionImpl extends
+      AsynchronousConnectionDecorator implements ConnectionEventListener,
+      SearchResultHandler
   {
-    private final AsynchronousConnection connection;
-
     private long lastSuccessfulPing;
 
     private FutureResult<Result> lastPingFuture;
@@ -65,187 +71,24 @@
 
     private AsynchronousConnectionImpl(final AsynchronousConnection connection)
     {
-      this.connection = connection;
+      super(connection);
     }
 
 
 
-    public FutureResult<Void> abandon(final AbandonRequest request)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      return connection.abandon(request);
-    }
-
-
-
-    public FutureResult<Result> add(final AddRequest request,
-        final ResultHandler<? super Result> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      return connection.add(request, handler);
-    }
-
-
-
-    public FutureResult<Result> add(final AddRequest request,
-        final ResultHandler<? super Result> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      return connection
-          .add(request, resultHandler, intermediateResponseHandler);
-    }
-
-
-
-    public void addConnectionEventListener(
-        final ConnectionEventListener listener) throws IllegalStateException,
-        NullPointerException
-    {
-      connection.addConnectionEventListener(listener);
-    }
-
-
-
-    public FutureResult<BindResult> bind(final BindRequest request,
-        final ResultHandler<? super BindResult> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      return connection.bind(request, handler);
-    }
-
-
-
-    public FutureResult<BindResult> bind(final BindRequest request,
-        final ResultHandler<? super BindResult> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      return connection.bind(request, resultHandler,
-          intermediateResponseHandler);
-    }
-
-
-
-    public void close()
-    {
-      synchronized (activeConnections)
-      {
-        connection.removeConnectionEventListener(this);
-        activeConnections.remove(this);
-      }
-      connection.close();
-    }
-
-
-
-    public void close(final UnbindRequest request, final String reason)
-        throws NullPointerException
-    {
-      synchronized (activeConnections)
-      {
-        connection.removeConnectionEventListener(this);
-        activeConnections.remove(this);
-      }
-      connection.close(request, reason);
-    }
-
-
-
-    public FutureResult<CompareResult> compare(final CompareRequest request,
-        final ResultHandler<? super CompareResult> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      return connection.compare(request, handler);
-    }
-
-
-
-    public FutureResult<CompareResult> compare(final CompareRequest request,
-        final ResultHandler<? super CompareResult> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      return connection.compare(request, resultHandler,
-          intermediateResponseHandler);
-    }
-
-
-
+    @Override
     public void handleConnectionClosed()
     {
-      // Ignore - we intercept close through the close method.
+      notifyClosed();
     }
 
 
 
+    @Override
     public void handleConnectionError(final boolean isDisconnectNotification,
         final ErrorResultException error)
     {
-      synchronized (activeConnections)
-      {
-        connection.removeConnectionEventListener(this);
-        activeConnections.remove(this);
-      }
-    }
-
-
-
-    public void handleUnsolicitedNotification(final ExtendedResult notification)
-    {
-      // Do nothing
-    }
-
-
-
-    public FutureResult<Result> delete(final DeleteRequest request,
-        final ResultHandler<? super Result> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      return connection.delete(request, handler);
-    }
-
-
-
-    public FutureResult<Result> delete(final DeleteRequest request,
-        final ResultHandler<? super Result> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      return connection.delete(request, resultHandler,
-          intermediateResponseHandler);
-    }
-
-
-
-    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
-        final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      return connection.extendedRequest(request, handler);
-    }
-
-
-
-    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
-        final ExtendedRequest<R> request,
-        final ResultHandler<? super R> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      return connection.extendedRequest(request, resultHandler,
-          intermediateResponseHandler);
+      notifyClosed();
     }
 
 
@@ -253,17 +96,8 @@
     /**
      * {@inheritDoc}
      */
-    public Connection getSynchronousConnection()
-    {
-      return new SynchronousConnection(this);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    public boolean handleEntry(SearchResultEntry entry)
+    @Override
+    public boolean handleEntry(final SearchResultEntry entry)
     {
       // Ignore.
       return true;
@@ -271,6 +105,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public void handleErrorResult(final ErrorResultException error)
     {
       connection.close(Requests.newUnbindRequest(), "Heartbeat retured error: "
@@ -282,7 +120,8 @@
     /**
      * {@inheritDoc}
      */
-    public boolean handleReference(SearchResultReference reference)
+    @Override
+    public boolean handleReference(final SearchResultReference reference)
     {
       // Ignore.
       return true;
@@ -290,6 +129,10 @@
 
 
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public void handleResult(final Result result)
     {
       lastSuccessfulPing = System.currentTimeMillis();
@@ -297,12 +140,10 @@
 
 
 
-    /**
-     * {@inheritDoc}
-     */
-    public boolean isClosed()
+    @Override
+    public void handleUnsolicitedNotification(final ExtendedResult notification)
     {
-      return connection.isClosed();
+      // Do nothing
     }
 
 
@@ -310,55 +151,12 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public boolean isValid()
     {
       return connection.isValid()
           && (lastSuccessfulPing <= 0 || System.currentTimeMillis()
-              - lastSuccessfulPing < unit.toMillis(timeout) * 2);
-    }
-
-
-
-    public FutureResult<Result> modify(final ModifyRequest request,
-        final ResultHandler<? super Result> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      return connection.modify(request, handler);
-    }
-
-
-
-    public FutureResult<Result> modify(final ModifyRequest request,
-        final ResultHandler<? super Result> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      return connection.modify(request, resultHandler,
-          intermediateResponseHandler);
-    }
-
-
-
-    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
-        final ResultHandler<? super Result> handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      return connection.modifyDN(request, handler);
-    }
-
-
-
-    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
-        final ResultHandler<? super Result> resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      return connection.modifyDN(request, resultHandler,
-          intermediateResponseHandler);
+              - lastSuccessfulPing < unit.toMillis(interval) * 2);
     }
 
 
@@ -366,108 +164,32 @@
     /**
      * {@inheritDoc}
      */
-    public FutureResult<SearchResultEntry> readEntry(final DN name,
-        final Collection<String> attributeDescriptions,
-        final ResultHandler<? super SearchResultEntry> resultHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      return connection.readEntry(name, attributeDescriptions, resultHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    public FutureResult<RootDSE> readRootDSE(
-        final ResultHandler<? super RootDSE> handler)
-        throws UnsupportedOperationException, IllegalStateException
-    {
-      return connection.readRootDSE(handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    public FutureResult<Schema> readSchema(final DN name,
-        final ResultHandler<? super Schema> handler)
-        throws UnsupportedOperationException, IllegalStateException
-    {
-      return connection.readSchema(name, handler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    public FutureResult<Schema> readSchemaForEntry(final DN name,
-        final ResultHandler<? super Schema> handler)
-        throws UnsupportedOperationException, IllegalStateException
-    {
-      return connection.readSchemaForEntry(name, handler);
-    }
-
-
-
-    public void removeConnectionEventListener(
-        final ConnectionEventListener listener) throws NullPointerException
-    {
-      connection.removeConnectionEventListener(listener);
-    }
-
-
-
-    public FutureResult<Result> search(final SearchRequest request,
-        final SearchResultHandler handler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      return connection.search(request, handler);
-    }
-
-
-
-    public FutureResult<Result> search(final SearchRequest request,
-        final SearchResultHandler resultHandler,
-        final IntermediateResponseHandler intermediateResponseHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      return connection.search(request, resultHandler,
-          intermediateResponseHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
-    public FutureResult<SearchResultEntry> searchSingleEntry(
-        final SearchRequest request,
-        final ResultHandler<? super SearchResultEntry> resultHandler)
-        throws UnsupportedOperationException, IllegalStateException,
-        NullPointerException
-    {
-      return connection.searchSingleEntry(request, resultHandler);
-    }
-
-
-
-    /**
-     * {@inheritDoc}
-     */
+    @Override
     public String toString()
     {
-      StringBuilder builder = new StringBuilder();
+      final StringBuilder builder = new StringBuilder();
       builder.append("HeartBeatConnection(");
       builder.append(connection);
       builder.append(')');
       return builder.toString();
     }
+
+
+
+    private void notifyClosed()
+    {
+      synchronized (activeConnections)
+      {
+        connection.removeConnectionEventListener(this);
+        activeConnections.remove(this);
+
+        if (activeConnections.isEmpty())
+        {
+          // This is the last active connection, so stop the heart beat.
+          heartBeatFuture.cancel(false);
+        }
+      }
+    }
   }
 
 
@@ -497,6 +219,12 @@
       synchronized (activeConnections)
       {
         connection.addConnectionEventListener(heartBeatConnection);
+        if (activeConnections.isEmpty())
+        {
+          // This is the first active connection, so start the heart beat.
+          heartBeatFuture = scheduler.scheduleWithFixedDelay(
+              new HeartBeatRunnable(), 0, interval, unit);
+        }
         activeConnections.add(heartBeatConnection);
       }
       return heartBeatConnection;
@@ -506,12 +234,11 @@
 
 
 
-  private final class HeartBeatThread extends Thread
+  private final class HeartBeatRunnable implements Runnable
   {
-    private HeartBeatThread()
+    private HeartBeatRunnable()
     {
-      super("Heart Beat Thread");
-      this.setDaemon(true);
+      // Nothing to do.
     }
 
 
@@ -519,35 +246,17 @@
     @Override
     public void run()
     {
-      long startTime;
-      while (true)
+      synchronized (activeConnections)
       {
-        startTime = System.currentTimeMillis();
-        synchronized (activeConnections)
+        for (final AsynchronousConnectionImpl connection : activeConnections)
         {
-          for (final AsynchronousConnectionImpl connection : activeConnections)
+          if (connection.lastPingFuture == null
+              || connection.lastPingFuture.isDone())
           {
-            if (connection.lastPingFuture == null
-                || connection.lastPingFuture.isDone())
-            {
-              connection.lastPingFuture = connection.search(heartBeat,
-                  connection, null);
-            }
+            connection.lastPingFuture = connection.search(heartBeat,
+                connection, null);
           }
         }
-        try
-        {
-          final long sleepTime = unit.toMillis(timeout)
-              - (System.currentTimeMillis() - startTime);
-          if (sleepTime > 0)
-          {
-            sleep(sleepTime);
-          }
-        }
-        catch (final InterruptedException e)
-        {
-          // Ignore
-        }
       }
     }
   }
@@ -556,37 +265,58 @@
 
   private final SearchRequest heartBeat;
 
-  private final long timeout;
+  private final long interval;
 
-  // FIXME: use a single global scheduler?
+  private final ScheduledExecutorService scheduler;
 
   private final TimeUnit unit;
 
   private final List<AsynchronousConnectionImpl> activeConnections;
 
-  private final ConnectionFactory parentFactory;
+  private final ConnectionFactory factory;
 
   private static final SearchRequest DEFAULT_SEARCH = Requests
       .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)", "1.1");
 
+  private ScheduledFuture<?> heartBeatFuture;
+
 
 
   /**
    * Creates a new heart-beat connection factory which will create connections
    * using the provided connection factory and periodically ping any created
-   * connections in order to detect that they are still alive.
+   * connections in order to detect that they are still alive every 10 seconds
+   * using the default scheduler.
    *
-   * @param connectionFactory
+   * @param factory
    *          The connection factory to use for creating connections.
-   * @param timeout
-   *          The time to wait between keepalive pings.
-   * @param unit
-   *          The time unit of the timeout argument.
    */
-  HeartBeatConnectionFactory(final ConnectionFactory connectionFactory,
-      final long timeout, final TimeUnit unit)
+  HeartBeatConnectionFactory(final ConnectionFactory factory)
   {
-    this(connectionFactory, timeout, unit, DEFAULT_SEARCH);
+    this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, StaticUtils
+        .getDefaultScheduler());
+  }
+
+
+
+  /**
+   * Creates a new heart-beat connection factory which will create connections
+   * using the provided connection factory and periodically ping any created
+   * connections in order to detect that they are still alive using the
+   * specified frequency and the default scheduler.
+   *
+   * @param factory
+   *          The connection factory to use for creating connections.
+   * @param interval
+   *          The interval between keepalive pings.
+   * @param unit
+   *          The time unit for the interval between keepalive pings.
+   */
+  HeartBeatConnectionFactory(final ConnectionFactory factory,
+      final long interval, final TimeUnit unit)
+  {
+    this(factory, interval, unit, DEFAULT_SEARCH, StaticUtils
+        .getDefaultScheduler());
   }
 
 
@@ -597,25 +327,54 @@
    * connections using the specified search request in order to detect that they
    * are still alive.
    *
-   * @param connectionFactory
+   * @param factory
    *          The connection factory to use for creating connections.
-   * @param timeout
-   *          The time to wait between keepalive pings.
+   * @param interval
+   *          The interval between keepalive pings.
    * @param unit
-   *          The time unit of the timeout argument.
+   *          The time unit for the interval between keepalive pings.
    * @param heartBeat
-   *          The search request to use when pinging connections.
+   *          The search request to use for keepalive pings.
    */
-  HeartBeatConnectionFactory(final ConnectionFactory connectionFactory,
-      final long timeout, final TimeUnit unit, final SearchRequest heartBeat)
+  HeartBeatConnectionFactory(final ConnectionFactory factory,
+      final long interval, final TimeUnit unit, final SearchRequest heartBeat)
   {
+    this(factory, interval, unit, heartBeat, StaticUtils.getDefaultScheduler());
+  }
+
+
+
+  /**
+   * Creates a new heart-beat connection factory which will create connections
+   * using the provided connection factory and periodically ping any created
+   * connections using the specified search request in order to detect that they
+   * are still alive.
+   *
+   * @param factory
+   *          The connection factory to use for creating connections.
+   * @param interval
+   *          The interval between keepalive pings.
+   * @param unit
+   *          The time unit for the interval between keepalive pings.
+   * @param heartBeat
+   *          The search request to use for keepalive pings.
+   * @param scheduler
+   *          The scheduler which should for periodically sending keepalive
+   *          pings.
+   */
+  HeartBeatConnectionFactory(final ConnectionFactory factory,
+      final long interval, final TimeUnit unit, final SearchRequest heartBeat,
+      final ScheduledExecutorService scheduler)
+  {
+    Validator.ensureNotNull(factory, heartBeat, unit, scheduler);
+    Validator.ensureTrue(interval >= 0, "negative timeout");
+
     this.heartBeat = heartBeat;
-    this.timeout = timeout;
+    this.interval = interval;
     this.unit = unit;
     this.activeConnections = new LinkedList<AsynchronousConnectionImpl>();
-    this.parentFactory = connectionFactory;
-
-    new HeartBeatThread().start();
+    this.factory = factory;
+    this.scheduler = scheduler;
   }
 
 
@@ -625,7 +384,7 @@
       final ResultHandler<? super AsynchronousConnection> handler)
   {
     final FutureResultImpl future = new FutureResultImpl(handler);
-    future.setFutureResult(parentFactory.getAsynchronousConnection(future));
+    future.setFutureResult(factory.getAsynchronousConnection(future));
     return future;
   }
 
@@ -634,11 +393,12 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public String toString()
   {
     final StringBuilder builder = new StringBuilder();
     builder.append("HeartBeatConnectionFactory(");
-    builder.append(String.valueOf(parentFactory));
+    builder.append(String.valueOf(factory));
     builder.append(')');
     return builder.toString();
   }

--
Gitblit v1.10.0