mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
29.31.2011 32034d853f3a284424ccfa87b6de210f1ca814e1
opendj3/opendj-ldap-sdk/src/main/java/org/forgerock/opendj/ldap/HeartBeatConnectionFactory.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS.
 */
package org.forgerock.opendj.ldap;
@@ -42,7 +43,10 @@
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.responses.SearchResultReference;
import com.forgerock.opendj.util.*;
import com.forgerock.opendj.util.ConnectionDecorator;
import com.forgerock.opendj.util.FutureResultTransformer;
import com.forgerock.opendj.util.StaticUtils;
import com.forgerock.opendj.util.Validator;
@@ -50,15 +54,13 @@
 * A heart beat connection factory can be used to create connections that send
 * a periodic search request to a Directory Server.
 */
final class HeartBeatConnectionFactory extends AbstractConnectionFactory
final class HeartBeatConnectionFactory implements ConnectionFactory
{
  /**
   * An asynchronous connection that sends heart beats and supports all
   * operations.
   * A connection that sends heart beats and supports all operations.
   */
  private final class AsynchronousConnectionImpl extends
      AsynchronousConnectionDecorator implements ConnectionEventListener,
      SearchResultHandler
  private final class ConnectionImpl extends ConnectionDecorator implements
      ConnectionEventListener, SearchResultHandler
  {
    private long lastSuccessfulPing;
@@ -66,7 +68,7 @@
    private AsynchronousConnectionImpl(final AsynchronousConnection connection)
    private ConnectionImpl(final Connection connection)
    {
      // NOTE(review): the two signature lines above appear to be the
      // before/after forms of the same constructor in this diff view.
      // The connection being wrapped is handed straight to the superclass.
      super(connection);
    }
@@ -192,12 +194,11 @@
  private final class FutureResultImpl extends
      FutureResultTransformer<AsynchronousConnection, AsynchronousConnection>
      implements ResultHandler<AsynchronousConnection>
      FutureResultTransformer<Connection, Connection> implements
      ResultHandler<Connection>
  {
    private FutureResultImpl(
        final ResultHandler<? super AsynchronousConnection> handler)
    private FutureResultImpl(final ResultHandler<? super Connection> handler)
    {
      // NOTE(review): the two signature forms above appear to be the
      // before/after versions of the same constructor in this diff view.
      // The supplied result handler is passed on to the superclass.
      super(handler);
    }
@@ -208,23 +209,10 @@
     * {@inheritDoc}
     */
    @Override
    protected AsynchronousConnection transformResult(
        final AsynchronousConnection connection) throws ErrorResultException
    protected Connection transformResult(final Connection connection)
        throws ErrorResultException
    {
      // NOTE(review): this span is a diff rendering, so two versions of the
      // method are interleaved. The first body below wraps the connection
      // inline: it registers the wrapper as a connection event listener,
      // schedules the heart beat task if no connections were active, adds the
      // wrapper to activeConnections, and returns it.
      final AsynchronousConnectionImpl heartBeatConnection = new AsynchronousConnectionImpl(
          connection);
      synchronized (activeConnections)
      {
        connection.addConnectionEventListener(heartBeatConnection);
        if (activeConnections.isEmpty())
        {
          // This is the first active connection, so start the heart beat.
          heartBeatFuture = scheduler.scheduleWithFixedDelay(
              new HeartBeatRunnable(), 0, interval, unit);
        }
        activeConnections.add(heartBeatConnection);
      }
      return heartBeatConnection;
      // The second form delegates the same sequence of steps to
      // adaptConnection(), which performs them with ConnectionImpl.
      return adaptConnection(connection);
    }
  }
@@ -245,13 +233,13 @@
    {
      synchronized (activeConnections)
      {
        for (final AsynchronousConnectionImpl connection : activeConnections)
        for (final ConnectionImpl connection : activeConnections)
        {
          if (connection.lastPingFuture == null
              || connection.lastPingFuture.isDone())
          {
            connection.lastPingFuture = connection.search(heartBeat,
                connection, null);
            connection.lastPingFuture = connection.searchAsync(heartBeat, null,
                connection);
          }
        }
      }
@@ -268,7 +256,7 @@
  private final TimeUnit unit;
  private final List<AsynchronousConnectionImpl> activeConnections;
  private final List<ConnectionImpl> activeConnections;
  private final ConnectionFactory factory;
@@ -369,19 +357,34 @@
    this.heartBeat = heartBeat;
    this.interval = interval;
    this.unit = unit;
    this.activeConnections = new LinkedList<AsynchronousConnectionImpl>();
    this.activeConnections = new LinkedList<ConnectionImpl>();
    this.factory = factory;
    this.scheduler = scheduler;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public FutureResult<AsynchronousConnection> getAsynchronousConnection(
      final ResultHandler<? super AsynchronousConnection> handler)
  public Connection getConnection() throws ErrorResultException,
      InterruptedException
  {
    // NOTE(review): the getAsynchronousConnection signature above appears to
    // be the removed side of this diff view; this body belongs to
    // getConnection().
    // Obtain a connection from the wrapped factory, then decorate it with
    // heart beat support and register it via adaptConnection().
    return adaptConnection(factory.getConnection());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public FutureResult<Connection> getConnectionAsync(
      final ResultHandler<? super Connection> handler)
  {
    // The future wraps the caller's handler and is itself passed to the
    // wrapped factory as the result handler; the factory's own future is
    // recorded on it through setFutureResult().
    // Presumably FutureResultTransformer applies transformResult() before
    // notifying the caller's handler -- confirm against that class.
    final FutureResultImpl future = new FutureResultImpl(handler);
    // NOTE(review): the next two lines appear to be the before/after forms of
    // the same call in this diff view (getAsynchronousConnection vs.
    // getConnectionAsync).
    future.setFutureResult(factory.getAsynchronousConnection(future));
    future.setFutureResult(factory.getConnectionAsync(future));
    return future;
  }
@@ -399,4 +402,23 @@
    builder.append(')');
    return builder.toString();
  }
  private Connection adaptConnection(final Connection connection)
  {
    final ConnectionImpl heartBeatConnection = new ConnectionImpl(connection);
    synchronized (activeConnections)
    {
      connection.addConnectionEventListener(heartBeatConnection);
      if (activeConnections.isEmpty())
      {
        // This is the first active connection, so start the heart beat.
        heartBeatFuture = scheduler.scheduleWithFixedDelay(
            new HeartBeatRunnable(), 0, interval, unit);
      }
      activeConnections.add(heartBeatConnection);
    }
    return heartBeatConnection;
  }
}