mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Ludovic Poitou
14.15.2010 23b59def043a8f71238ec5d73a393b32fb40f83c
sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
@@ -29,16 +29,23 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.opends.sdk.requests.*;
import org.opends.sdk.responses.*;
import org.opends.sdk.schema.Schema;
import org.opends.sdk.requests.Requests;
import org.opends.sdk.requests.SearchRequest;
import org.opends.sdk.responses.ExtendedResult;
import org.opends.sdk.responses.Result;
import org.opends.sdk.responses.SearchResultEntry;
import org.opends.sdk.responses.SearchResultReference;
import com.sun.opends.sdk.util.AsynchronousConnectionDecorator;
import com.sun.opends.sdk.util.FutureResultTransformer;
import com.sun.opends.sdk.util.StaticUtils;
import com.sun.opends.sdk.util.Validator;
@@ -52,11 +59,10 @@
   * An asynchronous connection that sends heart beats and supports all
   * operations.
   */
  private final class AsynchronousConnectionImpl implements
      AsynchronousConnection, ConnectionEventListener, SearchResultHandler
  private final class AsynchronousConnectionImpl extends
      AsynchronousConnectionDecorator implements ConnectionEventListener,
      SearchResultHandler
  {
    // The wrapped connection that operations are forwarded to.
    private final AsynchronousConnection connection;
    // Time (System.currentTimeMillis()) of the latest successful heart beat;
    // isValid() treats a value <= 0 as "no heart beat recorded yet".
    private long lastSuccessfulPing;
    // Future of the most recently issued heart beat search; null until the
    // first ping has been sent (the heart beat task checks for null).
    private FutureResult<Result> lastPingFuture;
@@ -65,187 +71,24 @@
    /**
     * Creates a heart beat connection wrapping the provided connection.
     *
     * @param connection
     *          The connection to wrap.
     */
    private AsynchronousConnectionImpl(final AsynchronousConnection connection)
    {
      // NOTE(review): the field assignment and the super(...) call look like
      // two alternative revisions of this constructor; an explicit super(...)
      // call normally has to be the first statement - confirm which line is
      // intended.
      this.connection = connection;
      super(connection);
    }
    public FutureResult<Void> abandon(final AbandonRequest request)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.abandon(request);
    }
    public FutureResult<Result> add(final AddRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.add(request, handler);
    }
    public FutureResult<Result> add(final AddRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection
          .add(request, resultHandler, intermediateResponseHandler);
    }
    public void addConnectionEventListener(
        final ConnectionEventListener listener) throws IllegalStateException,
        NullPointerException
    {
      connection.addConnectionEventListener(listener);
    }
    public FutureResult<BindResult> bind(final BindRequest request,
        final ResultHandler<? super BindResult> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.bind(request, handler);
    }
    public FutureResult<BindResult> bind(final BindRequest request,
        final ResultHandler<? super BindResult> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.bind(request, resultHandler,
          intermediateResponseHandler);
    }
    public void close()
    {
      synchronized (activeConnections)
      {
        connection.removeConnectionEventListener(this);
        activeConnections.remove(this);
      }
      connection.close();
    }
    public void close(final UnbindRequest request, final String reason)
        throws NullPointerException
    {
      synchronized (activeConnections)
      {
        connection.removeConnectionEventListener(this);
        activeConnections.remove(this);
      }
      connection.close(request, reason);
    }
    public FutureResult<CompareResult> compare(final CompareRequest request,
        final ResultHandler<? super CompareResult> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.compare(request, handler);
    }
    public FutureResult<CompareResult> compare(final CompareRequest request,
        final ResultHandler<? super CompareResult> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.compare(request, resultHandler,
          intermediateResponseHandler);
    }
    @Override
    public void handleConnectionClosed()
    {
      // Ignore - we intercept close through the close method.
      notifyClosed();
    }
    @Override
    public void handleConnectionError(final boolean isDisconnectNotification,
        final ErrorResultException error)
    {
      synchronized (activeConnections)
      {
        connection.removeConnectionEventListener(this);
        activeConnections.remove(this);
      }
    }
    public void handleUnsolicitedNotification(final ExtendedResult notification)
    {
      // Do nothing
    }
    public FutureResult<Result> delete(final DeleteRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.delete(request, handler);
    }
    public FutureResult<Result> delete(final DeleteRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.delete(request, resultHandler,
          intermediateResponseHandler);
    }
    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
        final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.extendedRequest(request, handler);
    }
    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
        final ExtendedRequest<R> request,
        final ResultHandler<? super R> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.extendedRequest(request, resultHandler,
          intermediateResponseHandler);
      notifyClosed();
    }
@@ -253,17 +96,8 @@
    /**
     * {@inheritDoc}
     */
    public Connection getSynchronousConnection()
    {
      return new SynchronousConnection(this);
    }
    /**
     * {@inheritDoc}
     */
    public boolean handleEntry(SearchResultEntry entry)
    @Override
    public boolean handleEntry(final SearchResultEntry entry)
    {
      // Ignore.
      return true;
@@ -271,6 +105,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public void handleErrorResult(final ErrorResultException error)
    {
      connection.close(Requests.newUnbindRequest(), "Heartbeat retured error: "
@@ -282,7 +120,8 @@
    /**
     * {@inheritDoc}
     */
    public boolean handleReference(SearchResultReference reference)
    @Override
    public boolean handleReference(final SearchResultReference reference)
    {
      // Ignore.
      return true;
@@ -290,6 +129,10 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public void handleResult(final Result result)
    {
      lastSuccessfulPing = System.currentTimeMillis();
@@ -297,12 +140,10 @@
    /**
     * {@inheritDoc}
     */
    // NOTE(review): two method headers (isClosed and
    // handleUnsolicitedNotification) share this single body, and the body both
    // returns connection.isClosed() and carries a "do nothing" comment. This
    // looks like two revisions merged without diff markers - confirm which
    // definition is intended before editing.
    public boolean isClosed()
    @Override
    public void handleUnsolicitedNotification(final ExtendedResult notification)
    {
      return connection.isClosed();
      // Do nothing
    }
@@ -310,55 +151,12 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isValid()
    {
      return connection.isValid()
          && (lastSuccessfulPing <= 0 || System.currentTimeMillis()
              - lastSuccessfulPing < unit.toMillis(timeout) * 2);
    }
    public FutureResult<Result> modify(final ModifyRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.modify(request, handler);
    }
    public FutureResult<Result> modify(final ModifyRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.modify(request, resultHandler,
          intermediateResponseHandler);
    }
    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
        final ResultHandler<? super Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.modifyDN(request, handler);
    }
    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
        final ResultHandler<? super Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.modifyDN(request, resultHandler,
          intermediateResponseHandler);
              - lastSuccessfulPing < unit.toMillis(interval) * 2);
    }
@@ -366,108 +164,32 @@
    /**
     * {@inheritDoc}
     */
    public FutureResult<SearchResultEntry> readEntry(final DN name,
        final Collection<String> attributeDescriptions,
        final ResultHandler<? super SearchResultEntry> resultHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.readEntry(name, attributeDescriptions, resultHandler);
    }
    /**
     * {@inheritDoc}
     */
    public FutureResult<RootDSE> readRootDSE(
        final ResultHandler<? super RootDSE> handler)
        throws UnsupportedOperationException, IllegalStateException
    {
      return connection.readRootDSE(handler);
    }
    /**
     * {@inheritDoc}
     */
    public FutureResult<Schema> readSchema(final DN name,
        final ResultHandler<? super Schema> handler)
        throws UnsupportedOperationException, IllegalStateException
    {
      return connection.readSchema(name, handler);
    }
    /**
     * {@inheritDoc}
     */
    public FutureResult<Schema> readSchemaForEntry(final DN name,
        final ResultHandler<? super Schema> handler)
        throws UnsupportedOperationException, IllegalStateException
    {
      return connection.readSchemaForEntry(name, handler);
    }
    public void removeConnectionEventListener(
        final ConnectionEventListener listener) throws NullPointerException
    {
      connection.removeConnectionEventListener(listener);
    }
    public FutureResult<Result> search(final SearchRequest request,
        final SearchResultHandler handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.search(request, handler);
    }
    public FutureResult<Result> search(final SearchRequest request,
        final SearchResultHandler resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.search(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    public FutureResult<SearchResultEntry> searchSingleEntry(
        final SearchRequest request,
        final ResultHandler<? super SearchResultEntry> resultHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.searchSingleEntry(request, resultHandler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String toString()
    {
      StringBuilder builder = new StringBuilder();
      final StringBuilder builder = new StringBuilder();
      builder.append("HeartBeatConnection(");
      builder.append(connection);
      builder.append(')');
      return builder.toString();
    }
    private void notifyClosed()
    {
      synchronized (activeConnections)
      {
        connection.removeConnectionEventListener(this);
        activeConnections.remove(this);
        if (activeConnections.isEmpty())
        {
          // This is the last active connection, so stop the heart beat.
          heartBeatFuture.cancel(false);
        }
      }
    }
  }
@@ -497,6 +219,12 @@
      synchronized (activeConnections)
      {
        connection.addConnectionEventListener(heartBeatConnection);
        if (activeConnections.isEmpty())
        {
          // This is the first active connection, so start the heart beat.
          heartBeatFuture = scheduler.scheduleWithFixedDelay(
              new HeartBeatRunnable(), 0, interval, unit);
        }
        activeConnections.add(heartBeatConnection);
      }
      return heartBeatConnection;
@@ -506,12 +234,11 @@
  private final class HeartBeatThread extends Thread
  private final class HeartBeatRunnable implements Runnable
  {
    // NOTE(review): two constructor headers (HeartBeatThread and
    // HeartBeatRunnable) share this body, and the body both configures a named
    // daemon thread and states there is nothing to do. This looks like two
    // revisions merged without diff markers - confirm which one is intended.
    private HeartBeatThread()
    private HeartBeatRunnable()
    {
      super("Heart Beat Thread");
      this.setDaemon(true);
      // Nothing to do.
    }
@@ -519,35 +246,17 @@
    /**
     * Walks the active connections while holding the {@code activeConnections}
     * monitor and issues a heart beat search on every connection whose previous
     * ping is absent or already done.
     * <p>
     * NOTE(review): this body reads like two revisions merged without diff
     * markers (an endless loop that sleeps between rounds next to a single
     * synchronized pass, with duplicated loop and condition headers); confirm
     * which variant is intended before changing any statement.
     */
    @Override
    public void run()
    {
      long startTime;
      while (true)
      synchronized (activeConnections)
      {
        startTime = System.currentTimeMillis();
        synchronized (activeConnections)
        for (final AsynchronousConnectionImpl connection : activeConnections)
        {
          for (final AsynchronousConnectionImpl connection : activeConnections)
          // Only ping when no earlier ping is still outstanding.
          if (connection.lastPingFuture == null
              || connection.lastPingFuture.isDone())
          {
            if (connection.lastPingFuture == null
                || connection.lastPingFuture.isDone())
            {
              // The connection is passed as the search result handler for its
              // own heart beat.
              connection.lastPingFuture = connection.search(heartBeat,
                  connection, null);
            }
            connection.lastPingFuture = connection.search(heartBeat,
                connection, null);
          }
        }
        try
        {
          // Sleep for whatever is left of the timeout after this round.
          final long sleepTime = unit.toMillis(timeout)
              - (System.currentTimeMillis() - startTime);
          if (sleepTime > 0)
          {
            sleep(sleepTime);
          }
        }
        catch (final InterruptedException e)
        {
          // Ignore
        }
      }
    }
  }
@@ -556,37 +265,58 @@
  // Search request sent as the heart beat ping.
  private final SearchRequest heartBeat;
  // NOTE(review): "timeout" and "interval" both appear as the ping period in
  // different places; they look like the old and new name of the same setting.
  private final long timeout;
  // Delay passed to the scheduler when the heart beat task is started.
  private final long interval;
  // FIXME: use a single global scheduler?
  // Executor used to schedule the periodic heart beat task.
  private final ScheduledExecutorService scheduler;
  // Time unit applied to the ping period.
  private final TimeUnit unit;
  // Connections currently being pinged; also used as the monitor guarding the
  // heart beat bookkeeping.
  private final List<AsynchronousConnectionImpl> activeConnections;
  // NOTE(review): "parentFactory" and "factory" both appear as the underlying
  // connection factory; they look like the old and new name of the same field.
  private final ConnectionFactory parentFactory;
  private final ConnectionFactory factory;
  // Heart beat search used when the caller does not supply one: a base-object
  // search on the empty DN with filter (objectClass=*) requesting "1.1".
  private static final SearchRequest DEFAULT_SEARCH = Requests
      .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)", "1.1");
  // Handle of the scheduled heart beat task; assigned when the first active
  // connection is registered and cancelled when the last one is removed.
  private ScheduledFuture<?> heartBeatFuture;
  /**
   * Creates a new heart-beat connection factory which will create connections
   * using the provided connection factory and periodically ping any created
   * connections in order to detect that they are still alive.
   * connections in order to detect that they are still alive every 10 seconds
   * using the default scheduler.
   *
   * @param connectionFactory
   * @param factory
   *          The connection factory to use for creating connections.
   * @param timeout
   *          The time to wait between keepalive pings.
   * @param unit
   *          The time unit of the timeout argument.
   */
  // NOTE(review): two constructor headers (a three-argument form and a
  // single-argument form) share this body, and the body contains two this(...)
  // delegations. This looks like two revisions merged without diff markers -
  // confirm which one is intended and trim the Javadoc above to match.
  HeartBeatConnectionFactory(final ConnectionFactory connectionFactory,
      final long timeout, final TimeUnit unit)
  HeartBeatConnectionFactory(final ConnectionFactory factory)
  {
    this(connectionFactory, timeout, unit, DEFAULT_SEARCH);
    // Single-argument form: ping every 10 seconds with the default search and
    // the default scheduler.
    this(factory, 10, TimeUnit.SECONDS, DEFAULT_SEARCH, StaticUtils
        .getDefaultScheduler());
  }
  /**
   * Creates a new heart-beat connection factory which will create connections
   * using the provided connection factory and periodically ping any created
   * connections in order to detect that they are still alive using the
   * specified frequency and the default scheduler.
   *
   * @param factory
   *          The connection factory to use for creating connections.
   * @param interval
   *          The interval between keepalive pings.
   * @param unit
   *          The time unit for the interval between keepalive pings.
   */
  HeartBeatConnectionFactory(final ConnectionFactory factory,
      final long interval, final TimeUnit unit)
  {
    this(factory, interval, unit, DEFAULT_SEARCH, StaticUtils
        .getDefaultScheduler());
  }
@@ -597,25 +327,54 @@
   * connections using the specified search request in order to detect that they
   * are still alive.
   *
   * @param connectionFactory
   * @param factory
   *          The connection factory to use for creating connections.
   * @param timeout
   *          The time to wait between keepalive pings.
   * @param interval
   *          The interval between keepalive pings.
   * @param unit
   *          The time unit of the timeout argument.
   *          The time unit for the interval between keepalive pings.
   * @param heartBeat
   *          The search request to use when pinging connections.
   *          The search request to use for keepalive pings.
   */
  // NOTE(review): two headers for the same four-argument constructor appear
  // here (parameters named connectionFactory/timeout and factory/interval);
  // the body only uses the second set of names - confirm which header is
  // intended.
  HeartBeatConnectionFactory(final ConnectionFactory connectionFactory,
      final long timeout, final TimeUnit unit, final SearchRequest heartBeat)
  HeartBeatConnectionFactory(final ConnectionFactory factory,
      final long interval, final TimeUnit unit, final SearchRequest heartBeat)
  {
    // Delegate to the full constructor, supplying the default scheduler.
    this(factory, interval, unit, heartBeat, StaticUtils.getDefaultScheduler());
  }
  /**
   * Creates a new heart-beat connection factory which will create connections
   * using the provided connection factory and periodically ping any created
   * connections using the specified search request in order to detect that they
   * are still alive.
   *
   * @param factory
   *          The connection factory to use for creating connections.
   * @param interval
   *          The interval between keepalive pings.
   * @param unit
   *          The time unit for the interval between keepalive pings.
   * @param heartBeat
   *          The search request to use for keepalive pings.
   * @param scheduler
   *          The scheduler which should be used for periodically sending
   *          keepalive pings.
   */
  HeartBeatConnectionFactory(final ConnectionFactory factory,
      final long interval, final TimeUnit unit, final SearchRequest heartBeat,
      final ScheduledExecutorService scheduler)
  {
    Validator.ensureNotNull(factory, heartBeat, unit, scheduler);
    // NOTE(review): this accepts an interval of 0, but the heart beat is later
    // started with scheduleWithFixedDelay, which rejects a non-positive delay;
    // the message also says "timeout" although the argument is the interval.
    Validator.ensureTrue(interval >= 0, "negative timeout");
    this.heartBeat = heartBeat;
    // NOTE(review): "timeout" and "connectionFactory" are not parameters of
    // this constructor, and a thread is started here although the heart beat
    // is also scheduled on first use. These lines look like remnants of an
    // earlier revision - confirm before relying on them.
    this.timeout = timeout;
    this.interval = interval;
    this.unit = unit;
    this.activeConnections = new LinkedList<AsynchronousConnectionImpl>();
    this.parentFactory = connectionFactory;
    new HeartBeatThread().start();
    this.factory = factory;
    this.scheduler = scheduler;
  }
@@ -625,7 +384,7 @@
      final ResultHandler<? super AsynchronousConnection> handler)
  {
    final FutureResultImpl future = new FutureResultImpl(handler);
    future.setFutureResult(parentFactory.getAsynchronousConnection(future));
    future.setFutureResult(factory.getAsynchronousConnection(future));
    return future;
  }
@@ -634,11 +393,12 @@
  /**
   * Returns a description of this factory of the form
   * {@code HeartBeatConnectionFactory(...)}, containing the string form of the
   * underlying connection factory.
   */
  @Override
  public String toString()
  {
    final StringBuilder builder = new StringBuilder();
    builder.append("HeartBeatConnectionFactory(");
    // NOTE(review): both parentFactory and factory are appended back to back
    // with no separator; they look like the old and new name of the same
    // field - confirm which single append is intended.
    builder.append(String.valueOf(parentFactory));
    builder.append(String.valueOf(factory));
    builder.append(')');
    return builder.toString();
  }