mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

matthew_swift
15.58.2009 388f25a9dc58704ea19a333ba9a28054d48590b1
sdk/src/com/sun/opends/sdk/ldap/AbstractResultFutureImpl.java
@@ -29,236 +29,97 @@
import java.util.concurrent.*;
import java.util.logging.Level;
import org.opends.sdk.ErrorResultException;
import org.opends.sdk.ResultCode;
import org.opends.sdk.ResultFuture;
import org.opends.sdk.FutureResult;
import org.opends.sdk.ResultHandler;
import org.opends.sdk.requests.Requests;
import org.opends.sdk.responses.Responses;
import org.opends.sdk.responses.Result;
import com.sun.opends.sdk.util.StaticUtils;
import com.sun.opends.sdk.util.AbstractFutureResult;
/**
 * Abstract result future implementation.
 *
 * @param <S>
 *          The type of result returned by this future.
 */
public abstract class AbstractResultFutureImpl<R extends Result>
    implements ResultFuture<R>, Runnable
abstract class AbstractResultFutureImpl<S extends Result> extends
    AbstractFutureResult<S> implements FutureResult<S>
{
  private final LDAPConnection connection;
  private final ResultHandler<? super R> handler;
  private final ExecutorService handlerExecutor;
  private final int messageID;
  // Use a semaphore instead of a lock because semaphores can be
  // released by different thread to acquirer.
  private final Semaphore invokerLock;
  private final CountDownLatch latch = new CountDownLatch(1);
  private volatile boolean isCancelled = false;
  private volatile R result = null;
  /**
   * Creates a new LDAP result future.
   *
   * @param messageID
   *          The request message ID.
   * @param handler
   *          The result handler, maybe {@code null}.
   * @param connection
   *          The client connection.
   */
  AbstractResultFutureImpl(int messageID,
      ResultHandler<? super R> handler, LDAPConnection connection,
      ExecutorService handlerExecutor)
      ResultHandler<? super S> handler, LDAPConnection connection)
  {
    super(handler);
    this.messageID = messageID;
    this.handler = handler;
    this.connection = connection;
    this.handlerExecutor = handlerExecutor;
    if (handlerExecutor == null)
    {
      invokerLock = null;
    }
    else
    {
      invokerLock = new Semaphore(1);
    }
  }
  public synchronized boolean cancel(boolean b)
  /**
   * {@inheritDoc}
   */
  protected final ErrorResultException handleCancelRequest()
  {
    if (!isDone())
    {
      isCancelled = true;
      connection.abandon(Requests.newAbandonRequest(messageID));
      latch.countDown();
      return true;
    }
    else
    {
      return false;
    }
    connection.abandon(Requests.newAbandonRequest(messageID));
    return null;
  }
  public R get() throws InterruptedException, ErrorResultException
  {
    latch.await();
    return get0();
  }
  public R get(long timeout, TimeUnit unit)
      throws InterruptedException, TimeoutException,
      ErrorResultException
  {
    if (!latch.await(timeout, unit))
    {
      throw new TimeoutException();
    }
    return get0();
  }
  public int getMessageID()
  /**
   * {@inheritDoc}
   */
  public final int getRequestID()
  {
    return messageID;
  }
  public boolean isCancelled()
  final void adaptErrorResult(Result result)
  {
    return isCancelled;
    S errorResult = newErrorResult(result.getResultCode(), result
        .getDiagnosticMessage(), result.getCause());
    setResultOrError(errorResult);
  }
  public boolean isDone()
  {
    return latch.getCount() == 0;
  }
  public void run()
  final void setResultOrError(S result)
  {
    if (result.getResultCode().isExceptional())
    {
      ErrorResultException e = ErrorResultException.wrap(result);
      handler.handleErrorResult(e);
      handleErrorResult(ErrorResultException.wrap(result));
    }
    else
    {
      handler.handleResult(result);
      handleResult(result);
    }
  }
  final void handleErrorResult(Result result)
  {
    R errorResult = newErrorResult(result.getResultCode(), result
        .getDiagnosticMessage(), result.getCause());
    handleResult(errorResult);
  }
  abstract R newErrorResult(ResultCode resultCode,
  abstract S newErrorResult(ResultCode resultCode,
      String diagnosticMessage, Throwable cause);
  final void handleResult(R result)
  {
    if (!isDone())
    {
      this.result = result;
      if (handler != null)
      {
        invokeHandler(this);
      }
      latch.countDown();
    }
  }
  final void invokeHandler(final Runnable runnable)
  {
    try
    {
      if (handlerExecutor == null)
      {
        runnable.run();
      }
      else
      {
        invokerLock.acquire();
        try
        {
          handlerExecutor.submit(new Runnable()
          {
            public void run()
            {
              try
              {
                runnable.run();
              }
              finally
              {
                invokerLock.release();
              }
            }
          });
        }
        catch (Exception e)
        {
          invokerLock.release();
        }
      }
    }
    catch (InterruptedException e)
    {
      // Thread has been interrupted so give up.
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
      {
        StaticUtils.DEBUG_LOG.warning(String.format(
            "Invoke thread interrupted: %s", StaticUtils
                .getExceptionMessage(e)));
      }
      // Reset interrupt status.
      Thread.currentThread().interrupt();
    }
  }
  private R get0() throws ErrorResultException
  {
    if (isCancelled())
    {
      throw ErrorResultException.wrap(Responses
          .newResult(ResultCode.CLIENT_SIDE_USER_CANCELLED));
    }
    else if (result.getResultCode().isExceptional())
    {
      throw ErrorResultException.wrap(result);
    }
    else
    {
      return result;
    }
  }
}