mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

boli
22.37.2009 9d6f5c9a5b7771e595892e6935cf1cc42012c4c6
sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
@@ -32,13 +32,14 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opends.sdk.requests.*;
import org.opends.sdk.responses.*;
import org.opends.sdk.schema.Schema;
import com.sun.opends.sdk.util.FutureResultTransformer;
import com.sun.opends.sdk.util.Validator;
@@ -46,10 +47,14 @@
 * A heart beat connection factory can be used to create connections
 * that send a periodic search request to a Directory Server.
 */
public class HeartBeatConnectionFactory extends
final class HeartBeatConnectionFactory extends
    AbstractConnectionFactory<AsynchronousConnection>
{
  private final int interval;
  private final SearchRequest heartBeat;
  private final long timeout;
  private final TimeUnit unit;
  private final List<AsynchronousConnectionImpl> activeConnections;
@@ -59,7 +64,30 @@
  // FIXME: use a single global scheduler?
  // FIXME: change timeout parameters to long+TimeUnit.
  /**
   * Creates a new heart-beat connection factory which will create
   * connections using the provided connection factory and periodically
   * ping any created connections in order to detect that they are still
   * alive.
   *
   * @param connectionFactory
   *          The connection factory to use for creating connections.
   * @param timeout
   *          The time to wait between keepalive pings.
   * @param unit
   *          The time unit of the timeout argument.
   */
  HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory,
      long timeout, TimeUnit unit)
  {
    this(connectionFactory, timeout, unit, DEFAULT_SEARCH);
  }
  private static final SearchRequest DEFAULT_SEARCH = Requests
      .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)",
          "1.1");
@@ -71,15 +99,19 @@
   *
   * @param connectionFactory
   *          The connection factory to use for creating connections.
   * @param interval
   *          The period between keepalive pings.
   * @param timeout
   *          The time to wait between keepalive pings.
   * @param unit
   *          The time unit of the timeout argument.
   * @param heartBeat
   *          The search request to use when pinging connections.
   */
  public HeartBeatConnectionFactory(
      ConnectionFactory<?> connectionFactory,
      int interval)
  HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory,
      long timeout, TimeUnit unit, SearchRequest heartBeat)
  {
    Validator.ensureNotNull(connectionFactory);
    this.interval = interval;
    this.heartBeat = heartBeat;
    this.timeout = timeout;
    this.unit = unit;
    this.activeConnections = new LinkedList<AsynchronousConnectionImpl>();
    this.parentFactory = connectionFactory;
@@ -93,9 +125,11 @@
   * operations.
   */
  private final class AsynchronousConnectionImpl implements
      AsynchronousConnection, ConnectionEventListener
      AsynchronousConnection, ConnectionEventListener, ResultHandler<Result>
  {
    private final AsynchronousConnection connection;
    private long lastSuccessfulPing;
    private FutureResult<Result> lastPingFuture;
    private AsynchronousConnectionImpl(AsynchronousConnection connection)
@@ -316,9 +350,9 @@
     */
    public boolean isValid()
    {
      // Avoid extra pings... Let the next ping find out if this connection
      // is still valid.
      return connection.isValid();
      return connection.isValid() && (lastSuccessfulPing <= 0 ||
          System.currentTimeMillis() - lastSuccessfulPing <
              unit.toMillis(timeout) * 2);
    }
    public void connectionReceivedUnsolicitedNotification(
@@ -338,6 +372,15 @@
        activeConnections.remove(this);
      }
    }
    public void handleErrorResult(ErrorResultException error) {
      connection.close(Requests.newUnbindRequest(),
          "Heartbeat retured error: " + error);
    }
    public void handleResult(Result result) {
        lastSuccessfulPing = System.currentTimeMillis();
    }
  }
@@ -354,22 +397,26 @@
    public void run()
    {
      long startTime;
      while(true)
      {
        startTime = System.currentTimeMillis();
        synchronized (activeConnections)
        {
          for (AsynchronousConnectionImpl connection : activeConnections)
          {
            if(!connection.isValid())
            if(connection.lastPingFuture == null ||
                connection.lastPingFuture.isDone())
            {
              connection.close(Requests.newUnbindRequest(),
                               "Connection no longer valid");
              connection.lastPingFuture =
                  connection.search(heartBeat, connection, null);
            }
          }
        }
        try
        {
          sleep(interval);
          sleep(unit.toMillis(timeout) -
              (System.currentTimeMillis() - startTime));
        }
        catch (InterruptedException e)
        {