mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

matthew_swift
28.47.2010 f2160f4bd1c8ac67e5a86a6710d431e8932877f9
sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
@@ -43,89 +43,17 @@
/**
 * An heart beat connection factory can be used to create connections
 * that sends a periodic search request to a Directory Server.
 * An heart beat connection factory can be used to create connections that sends
 * a periodic search request to a Directory Server.
 */
final class HeartBeatConnectionFactory extends
    AbstractConnectionFactory
final class HeartBeatConnectionFactory extends AbstractConnectionFactory
{
  private final SearchRequest heartBeat;
  private final long timeout;
  private final TimeUnit unit;
  private final List<AsynchronousConnectionImpl> activeConnections;
  private final ConnectionFactory parentFactory;
  // FIXME: use a single global scheduler?
  /**
   * Creates a new heart-beat connection factory which will create
   * connections using the provided connection factory and periodically
   * ping any created connections in order to detect that they are still
   * alive.
   *
   * @param connectionFactory
   *          The connection factory to use for creating connections.
   * @param timeout
   *          The time to wait between keepalive pings.
   * @param unit
   *          The time unit of the timeout argument.
   */
  HeartBeatConnectionFactory(ConnectionFactory connectionFactory,
      long timeout, TimeUnit unit)
  {
    this(connectionFactory, timeout, unit, DEFAULT_SEARCH);
  }
  private static final SearchRequest DEFAULT_SEARCH = Requests
      .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)",
          "1.1");
  /**
   * Creates a new heart-beat connection factory which will create
   * connections using the provided connection factory and periodically
   * ping any created connections using the specified search request in
   * order to detect that they are still alive.
   *
   * @param connectionFactory
   *          The connection factory to use for creating connections.
   * @param timeout
   *          The time to wait between keepalive pings.
   * @param unit
   *          The time unit of the timeout argument.
   * @param heartBeat
   *          The search request to use when pinging connections.
   */
  HeartBeatConnectionFactory(ConnectionFactory connectionFactory,
      long timeout, TimeUnit unit, SearchRequest heartBeat)
  {
    this.heartBeat = heartBeat;
    this.timeout = timeout;
    this.unit = unit;
    this.activeConnections = new LinkedList<AsynchronousConnectionImpl>();
    this.parentFactory = connectionFactory;
    new HeartBeatThread().start();
  }
  /**
   * An asynchronous connection that sends heart beats and supports all
   * operations.
   */
  private final class AsynchronousConnectionImpl implements
      AsynchronousConnection, ConnectionEventListener,
      ResultHandler<Result>
      AsynchronousConnection, ConnectionEventListener, ResultHandler<Result>
  {
    private final AsynchronousConnection connection;
@@ -135,24 +63,24 @@
    private AsynchronousConnectionImpl(AsynchronousConnection connection)
    private AsynchronousConnectionImpl(final AsynchronousConnection connection)
    {
      this.connection = connection;
    }
    public void abandon(AbandonRequest request)
    public FutureResult<Void> abandon(final AbandonRequest request)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      connection.abandon(request);
      return connection.abandon(request);
    }
    public FutureResult<Result> add(AddRequest request,
        ResultHandler<Result> handler)
    public FutureResult<Result> add(final AddRequest request,
        final ResultHandler<Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
@@ -161,8 +89,29 @@
    public FutureResult<BindResult> bind(BindRequest request,
        ResultHandler<? super BindResult> handler)
    public FutureResult<Result> add(final AddRequest request,
        final ResultHandler<Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection
          .add(request, resultHandler, intermediateResponseHandler);
    }
    public void addConnectionEventListener(
        final ConnectionEventListener listener) throws IllegalStateException,
        NullPointerException
    {
      connection.addConnectionEventListener(listener);
    }
    public FutureResult<BindResult> bind(final BindRequest request,
        final ResultHandler<? super BindResult> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
@@ -171,6 +120,18 @@
    public FutureResult<BindResult> bind(final BindRequest request,
        final ResultHandler<? super BindResult> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.bind(request, resultHandler,
          intermediateResponseHandler);
    }
    public void close()
    {
      synchronized (activeConnections)
@@ -183,7 +144,7 @@
    public void close(UnbindRequest request, String reason)
    public void close(final UnbindRequest request, final String reason)
        throws NullPointerException
    {
      synchronized (activeConnections)
@@ -196,8 +157,8 @@
    public FutureResult<CompareResult> compare(CompareRequest request,
        ResultHandler<? super CompareResult> handler)
    public FutureResult<CompareResult> compare(final CompareRequest request,
        final ResultHandler<? super CompareResult> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
@@ -206,8 +167,47 @@
    public FutureResult<Result> delete(DeleteRequest request,
        ResultHandler<Result> handler)
    public FutureResult<CompareResult> compare(final CompareRequest request,
        final ResultHandler<? super CompareResult> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.compare(request, resultHandler,
          intermediateResponseHandler);
    }
    public void connectionClosed()
    {
      // Ignore - we intercept close through the close method.
    }
    public void connectionErrorOccurred(final boolean isDisconnectNotification,
        final ErrorResultException error)
    {
      synchronized (activeConnections)
      {
        connection.removeConnectionEventListener(this);
        activeConnections.remove(this);
      }
    }
    public void connectionReceivedUnsolicitedNotification(
        final ExtendedResult notification)
    {
      // Do nothing
    }
    public FutureResult<Result> delete(final DeleteRequest request,
        final ResultHandler<Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
@@ -216,8 +216,20 @@
    public <R extends Result> FutureResult<R> extendedRequest(
        ExtendedRequest<R> request, ResultHandler<? super R> handler)
    public FutureResult<Result> delete(final DeleteRequest request,
        final ResultHandler<Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.delete(request, resultHandler,
          intermediateResponseHandler);
    }
    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
        final ExtendedRequest<R> request, final ResultHandler<? super R> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
@@ -226,34 +238,15 @@
    public FutureResult<Result> modify(ModifyRequest request,
        ResultHandler<Result> handler)
    public <R extends ExtendedResult> FutureResult<R> extendedRequest(
        final ExtendedRequest<R> request,
        final ResultHandler<? super R> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.modify(request, handler);
    }
    public FutureResult<Result> modifyDN(ModifyDNRequest request,
        ResultHandler<Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.modifyDN(request, handler);
    }
    public FutureResult<Result> search(SearchRequest request,
        ResultHandler<Result> resultHandler,
        SearchResultHandler searchResultHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.search(request, resultHandler,
          searchResultHandler);
      return connection.extendedRequest(request, resultHandler,
          intermediateResponseHandler);
    }
@@ -261,81 +254,24 @@
    /**
     * {@inheritDoc}
     */
    public FutureResult<SearchResultEntry> readEntry(DN name,
        Collection<String> attributeDescriptions,
        ResultHandler<? super SearchResultEntry> resultHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    public Connection getSynchronousConnection()
    {
      return connection.readEntry(name, attributeDescriptions,
          resultHandler);
      return new SynchronousConnection(this);
    }
    /**
     * {@inheritDoc}
     */
    public FutureResult<SearchResultEntry> searchSingleEntry(
        SearchRequest request,
        ResultHandler<? super SearchResultEntry> resultHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    public void handleErrorResult(final ErrorResultException error)
    {
      return connection.searchSingleEntry(request, resultHandler);
      connection.close(Requests.newUnbindRequest(), "Heartbeat retured error: "
          + error);
    }
    /**
     * {@inheritDoc}
     */
    public FutureResult<RootDSE> readRootDSE(
        ResultHandler<RootDSE> handler)
        throws UnsupportedOperationException, IllegalStateException
    public void handleResult(final Result result)
    {
      return connection.readRootDSE(handler);
    }
    /**
     * {@inheritDoc}
     */
    public FutureResult<Schema> readSchemaForEntry(DN name,
        ResultHandler<Schema> handler)
        throws UnsupportedOperationException, IllegalStateException
    {
      return connection.readSchemaForEntry(name, handler);
    }
    /**
     * {@inheritDoc}
     */
    public FutureResult<Schema> readSchema(DN name,
        ResultHandler<Schema> handler)
        throws UnsupportedOperationException, IllegalStateException
    {
      return connection.readSchema(name, handler);
    }
    public void addConnectionEventListener(
        ConnectionEventListener listener) throws IllegalStateException,
        NullPointerException
    {
      connection.addConnectionEventListener(listener);
    }
    public void removeConnectionEventListener(
        ConnectionEventListener listener) throws NullPointerException
    {
      connection.removeConnectionEventListener(listener);
      lastSuccessfulPing = System.currentTimeMillis();
    }
@@ -362,38 +298,178 @@
    public void connectionReceivedUnsolicitedNotification(
        GenericExtendedResult notification)
    public FutureResult<Result> modify(final ModifyRequest request,
        final ResultHandler<Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      // Do nothing
      return connection.modify(request, handler);
    }
    public void connectionErrorOccurred(
        boolean isDisconnectNotification, ErrorResultException error)
    public FutureResult<Result> modify(final ModifyRequest request,
        final ResultHandler<Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.modify(request, resultHandler,
          intermediateResponseHandler);
    }
    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
        final ResultHandler<Result> handler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.modifyDN(request, handler);
    }
    public FutureResult<Result> modifyDN(final ModifyDNRequest request,
        final ResultHandler<Result> resultHandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.modifyDN(request, resultHandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    public FutureResult<SearchResultEntry> readEntry(final DN name,
        final Collection<String> attributeDescriptions,
        final ResultHandler<? super SearchResultEntry> resultHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.readEntry(name, attributeDescriptions, resultHandler);
    }
    /**
     * {@inheritDoc}
     */
    public FutureResult<RootDSE> readRootDSE(
        final ResultHandler<RootDSE> handler)
        throws UnsupportedOperationException, IllegalStateException
    {
      return connection.readRootDSE(handler);
    }
    /**
     * {@inheritDoc}
     */
    public FutureResult<Schema> readSchema(final DN name,
        final ResultHandler<Schema> handler)
        throws UnsupportedOperationException, IllegalStateException
    {
      return connection.readSchema(name, handler);
    }
    /**
     * {@inheritDoc}
     */
    public FutureResult<Schema> readSchemaForEntry(final DN name,
        final ResultHandler<Schema> handler)
        throws UnsupportedOperationException, IllegalStateException
    {
      return connection.readSchemaForEntry(name, handler);
    }
    public void removeConnectionEventListener(
        final ConnectionEventListener listener) throws NullPointerException
    {
      connection.removeConnectionEventListener(listener);
    }
    public FutureResult<Result> search(final SearchRequest request,
        final ResultHandler<Result> resultHandler,
        final SearchResultHandler searchResultHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.search(request, resultHandler, searchResultHandler);
    }
    public FutureResult<Result> search(final SearchRequest request,
        final ResultHandler<Result> resultHandler,
        final SearchResultHandler searchResulthandler,
        final IntermediateResponseHandler intermediateResponseHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.search(request, resultHandler, searchResulthandler,
          intermediateResponseHandler);
    }
    /**
     * {@inheritDoc}
     */
    public FutureResult<SearchResultEntry> searchSingleEntry(
        final SearchRequest request,
        final ResultHandler<? super SearchResultEntry> resultHandler)
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      return connection.searchSingleEntry(request, resultHandler);
    }
  }
  private final class FutureResultImpl extends
      FutureResultTransformer<AsynchronousConnection, AsynchronousConnection>
      implements FutureResult<AsynchronousConnection>,
      ResultHandler<AsynchronousConnection>
  {
    private FutureResultImpl(
        final ResultHandler<? super AsynchronousConnection> handler)
    {
      super(handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    protected AsynchronousConnection transformResult(
        final AsynchronousConnection connection) throws ErrorResultException
    {
      final AsynchronousConnectionImpl heartBeatConnection = new AsynchronousConnectionImpl(
          connection);
      synchronized (activeConnections)
      {
        connection.removeConnectionEventListener(this);
        activeConnections.remove(this);
        connection.addConnectionEventListener(heartBeatConnection);
        activeConnections.add(heartBeatConnection);
      }
      return heartBeatConnection;
    }
    public void handleErrorResult(ErrorResultException error)
    {
      connection.close(Requests.newUnbindRequest(),
          "Heartbeat retured error: " + error);
    }
    public void handleResult(Result result)
    {
      lastSuccessfulPing = System.currentTimeMillis();
    }
  }
@@ -408,6 +484,7 @@
    @Override
    public void run()
    {
      long startTime;
@@ -416,7 +493,7 @@
        startTime = System.currentTimeMillis();
        synchronized (activeConnections)
        {
          for (AsynchronousConnectionImpl connection : activeConnections)
          for (final AsynchronousConnectionImpl connection : activeConnections)
          {
            if (connection.lastPingFuture == null
                || connection.lastPingFuture.isDone())
@@ -428,10 +505,14 @@
        }
        try
        {
          sleep(unit.toMillis(timeout)
              - (System.currentTimeMillis() - startTime));
          final long sleepTime = unit.toMillis(timeout)
              - (System.currentTimeMillis() - startTime);
          if (sleepTime > 0)
          {
            sleep(sleepTime);
          }
        }
        catch (InterruptedException e)
        catch (final InterruptedException e)
        {
          // Ignore
        }
@@ -441,47 +522,78 @@
  private final class FutureResultImpl
      extends
      FutureResultTransformer<AsynchronousConnection, AsynchronousConnection>
      implements FutureResult<AsynchronousConnection>,
      ResultHandler<AsynchronousConnection>
  private final SearchRequest heartBeat;
  private final long timeout;
  // FIXME: use a single global scheduler?
  private final TimeUnit unit;
  private final List<AsynchronousConnectionImpl> activeConnections;
  private final ConnectionFactory parentFactory;
  private static final SearchRequest DEFAULT_SEARCH = Requests
      .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)", "1.1");
  /**
   * Creates a new heart-beat connection factory which will create connections
   * using the provided connection factory and periodically ping any created
   * connections in order to detect that they are still alive.
   *
   * @param connectionFactory
   *          The connection factory to use for creating connections.
   * @param timeout
   *          The time to wait between keepalive pings.
   * @param unit
   *          The time unit of the timeout argument.
   */
  HeartBeatConnectionFactory(final ConnectionFactory connectionFactory,
      final long timeout, final TimeUnit unit)
  {
    private FutureResultImpl(
        ResultHandler<? super AsynchronousConnection> handler)
    {
      super(handler);
    }
    /**
     * {@inheritDoc}
     */
    protected AsynchronousConnection transformResult(
        AsynchronousConnection connection) throws ErrorResultException
    {
      AsynchronousConnectionImpl heartBeatConnection = new AsynchronousConnectionImpl(
          connection);
      synchronized (activeConnections)
      {
        connection.addConnectionEventListener(heartBeatConnection);
        activeConnections.add(heartBeatConnection);
      }
      return heartBeatConnection;
    }
    this(connectionFactory, timeout, unit, DEFAULT_SEARCH);
  }
  public FutureResult<AsynchronousConnection> getAsynchronousConnection(
      ResultHandler<AsynchronousConnection> handler)
  /**
   * Creates a new heart-beat connection factory which will create connections
   * using the provided connection factory and periodically ping any created
   * connections using the specified search request in order to detect that they
   * are still alive.
   *
   * @param connectionFactory
   *          The connection factory to use for creating connections.
   * @param timeout
   *          The time to wait between keepalive pings.
   * @param unit
   *          The time unit of the timeout argument.
   * @param heartBeat
   *          The search request to use when pinging connections.
   */
  HeartBeatConnectionFactory(final ConnectionFactory connectionFactory,
      final long timeout, final TimeUnit unit, final SearchRequest heartBeat)
  {
    FutureResultImpl future = new FutureResultImpl(handler);
    future.setFutureResult(parentFactory
        .getAsynchronousConnection(future));
    this.heartBeat = heartBeat;
    this.timeout = timeout;
    this.unit = unit;
    this.activeConnections = new LinkedList<AsynchronousConnectionImpl>();
    this.parentFactory = connectionFactory;
    new HeartBeatThread().start();
  }
  @Override
  public FutureResult<AsynchronousConnection> getAsynchronousConnection(
      final ResultHandler<AsynchronousConnection> handler)
  {
    final FutureResultImpl future = new FutureResultImpl(handler);
    future.setFutureResult(parentFactory.getAsynchronousConnection(future));
    return future;
  }
}