mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

boli
22.37.2009 9d6f5c9a5b7771e595892e6935cf1cc42012c4c6
sdk/src/org/opends/sdk/ConnectionPool.java
@@ -132,58 +132,10 @@
        }
        isClosed = true;
        // Don't put closed connections back in the pool.
        if (!connection.isValid())
        // Don't put invalid connections back in the pool.
        if (connection.isValid())
        {
          numConnections--;
        }
        else
        {
          // See if there waiters pending.
          for (;;)
          {
            PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
                connection);
            FuturePooledConnection future = pendingFutures.poll();
            if (future == null)
            {
              // No waiters - so drop out and add connection to pool.
              break;
            }
            future.handleResult(pooledConnection);
            if (!future.isCancelled())
            {
              // The future was not cancelled and the connection was
              // accepted.
              if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
              {
                StaticUtils.DEBUG_LOG
                    .finest(String
                        .format(
                        "Connection released to pool and directly "
                        + "given to waiter. numConnections: %d, poolSize: %d, "
                        + "pendingFutures: %d", numConnections,
                        pool.size(), pendingFutures.size()));
              }
              return;
            }
          }
          // No waiters. Put back in pool.
          pool.offer(connection);
          if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
          {
            StaticUtils.DEBUG_LOG
                .finest(String
                    .format(
                    "Connection released to pool and directly "
                    + "given to waiter. numConnections: %d, poolSize: %d, "
                    + "pendingFutures: %d", numConnections,
                    pool.size(), pendingFutures.size()));
          }
          releaseConnection(connection);
          return;
        }
      }
@@ -191,19 +143,31 @@
      // Connection is no longer valid. Close outside of lock
      connection.removeConnectionEventListener(this);
      connection.close();
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
      {
        StaticUtils.DEBUG_LOG
            .finest(String
            .warning(String
                .format(
                "Dead connection released to pool. "
                + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                "Dead connection released and closed. "
                    + "numConnections: %d, poolSize: %d, " +
                    "pendingFutures: %d",
                numConnections, pool.size(), pendingFutures
                    .size()));
      }
      return;
    }
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
      {
        StaticUtils.DEBUG_LOG
            .warning(String
                .format(
                "Reconnect attempt starting. "
                    + "numConnections: %d, poolSize: %d, " +
                    "pendingFutures: %d",
                numConnections, pool.size(), pendingFutures
                    .size()));
      }
      connectionFactory.getAsynchronousConnection(new ReconnectHandler());
    }
    public void close(UnbindRequest request, String reason)
@@ -443,12 +407,12 @@
        // careful that users of the pooled connection get a sensible
        // error if they continue to use it (i.e. not an NPE or ISE).
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
        {
          StaticUtils.DEBUG_LOG
              .finest(String
              .warning(String
                  .format(
                  "Connection error occured: "
                  "Connection error occured and removed from pool: "
                  + error.getMessage()
                  + " numConnections: %d, poolSize: %d, pendingFutures: %d",
                  numConnections, pool.size(), pendingFutures
@@ -458,7 +422,52 @@
    }
  }
  private class ReconnectHandler
      implements ResultHandler<AsynchronousConnection>
  {
    public void handleErrorResult(ErrorResultException error) {
      // The reconnect failed. Fail the connect attempt.
      numConnections --;
      // The reconnect failed. The underlying connection factory probably went
      // down. Just fail all pending futures
      synchronized (pool)
      {
        while(!pendingFutures.isEmpty())
        {
          pendingFutures.poll().handleErrorResult(error);
        }
      }
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
      {
        StaticUtils.DEBUG_LOG
            .warning(String
                .format(
                "Reconnect failed. Failed all pending futures: "
                    + error.getMessage()
                    + " numConnections: %d, poolSize: %d, pendingFutures: %d",
                numConnections, pool.size(), pendingFutures
                    .size()));
      }
    }
    public void handleResult(AsynchronousConnection connection) {
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      {
        StaticUtils.DEBUG_LOG
            .finest(String
                .format(
                "Reconnect succeded. "
                    + " numConnections: %d, poolSize: %d, pendingFutures: %d",
                numConnections, pool.size(), pendingFutures
                    .size()));
      }
      synchronized (pool)
      {
        releaseConnection(connection);
      }
    }
  }
  // Future used for waiting for pooled connections to become available.
  private static final class FuturePooledConnection extends
@@ -482,6 +491,54 @@
  }
  private void releaseConnection(AsynchronousConnection connection)
  {
    // See if there waiters pending.
    for (;;)
    {
      PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
          connection);
      FuturePooledConnection future = pendingFutures.poll();
      if (future == null)
      {
        // No waiters - so drop out and add connection to pool.
        break;
      }
      future.handleResult(pooledConnection);
      if (!future.isCancelled())
      {
        // The future was not cancelled and the connection was
        // accepted.
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG
              .finest(String
                  .format(
                  "Connection released and directly "
                      + "given to waiter. numConnections: %d, poolSize: %d, "
                      + "pendingFutures: %d", numConnections,
                  pool.size(), pendingFutures.size()));
        }
        return;
      }
    }
    // No waiters. Put back in pool.
    pool.offer(connection);
    if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
    {
      StaticUtils.DEBUG_LOG
          .finest(String
              .format(
              "Connection released to pool. numConnections: %d, " +
                  "poolSize: %d, pendingFutures: %d", numConnections,
              pool.size(), pendingFutures.size()));
    }
  }
  /**
@@ -546,7 +603,7 @@
      try
      {
        // We can create a new connection.
        conn = connectionFactory.getAsynchronousConnection(handler).get();
        conn = connectionFactory.getAsynchronousConnection(null).get();
        numConnections++;
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
@@ -561,13 +618,22 @@
      }
      catch (ErrorResultException e)
      {
        if (handler != null)
        {
          handler.handleErrorResult(e);
        }
        return new CompletedFutureResult<AsynchronousConnection>(e);
      }
      catch (InterruptedException e)
      {
        return new CompletedFutureResult<AsynchronousConnection>(
        ErrorResultException error =
            new ErrorResultException(Responses.newResult(
                ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)));
                ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e));
        if (handler != null)
        {
          handler.handleErrorResult(error);
        }
        return new CompletedFutureResult<AsynchronousConnection>(error);
      }
    }
    else