mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

boli
22.37.2009 9d6f5c9a5b7771e595892e6935cf1cc42012c4c6
Fixed some bugs with the connection pool. 
Moved heart beat back to heart beat connection factory from isValid method.
8 files modified
389 ■■■■■ changed files
sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java 40 ●●●● patch | view | raw | blame | history
sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java 2 ●●● patch | view | raw | blame | history
sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java 24 ●●●●● patch | view | raw | blame | history
sdk/src/org/opends/sdk/AsynchronousConnection.java 6 ●●●●● patch | view | raw | blame | history
sdk/src/org/opends/sdk/AuthenticatedConnectionFactory.java 9 ●●●●● patch | view | raw | blame | history
sdk/src/org/opends/sdk/ConnectionPool.java 192 ●●●●● patch | view | raw | blame | history
sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java 85 ●●●● patch | view | raw | blame | history
sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java 31 ●●●●● patch | view | raw | blame | history
sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
@@ -37,7 +37,9 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLContext;
@@ -59,7 +61,7 @@
import com.sun.grizzly.ssl.*;
import com.sun.grizzly.streams.StreamWriter;
import com.sun.opends.sdk.util.Validator;
import com.sun.opends.sdk.util.StaticUtils;
/**
@@ -664,8 +666,6 @@
  private final Object writeLock = new Object();
  private final SearchRequest pingRequest;
  /**
   * Creates a new LDAP connection.
   *
@@ -676,22 +676,18 @@
   * @param schema
   *          The schema which will be used to decode responses from the
   *          server.
   * @param pingRequest
   *          The search request to use to verify the validity of
   *          this connection.
   * @param connFactory
   *          The associated connection factory.
   */
  LDAPConnection(com.sun.grizzly.Connection<?> connection,
      InetSocketAddress serverAddress, Schema schema,
      SearchRequest pingRequest, LDAPConnectionFactoryImpl connFactory)
      LDAPConnectionFactoryImpl connFactory)
  {
    this.connection = connection;
    this.serverAddress = serverAddress;
    this.schema = schema;
    this.connFactory = connFactory;
    this.streamWriter = getFilterChainStreamWriter();
    this.pingRequest = pingRequest;
  }
@@ -1352,10 +1348,6 @@
    synchronized (writeLock)
    {
      if(reason == null && unbindRequest == null)
      {
        System.out.println("Something is wrong!");
      }
      if (isClosed)
      {
        // Already closed.
@@ -1503,27 +1495,7 @@
   */
  public boolean isValid()
  {
    if(!isClosed && connectionInvalidReason == null)
    {
      try
      {
        search(pingRequest, null, null).get(5, TimeUnit.SECONDS);
        return true;
      }
      catch (ErrorResultException e)
      {
        StaticUtils.DEBUG_LOG.warning("Validity check returned error: " +
                                      e.getResult());
      }
      catch (TimeoutException e)
      {
        StaticUtils.DEBUG_LOG.warning("Validity check timed out");
      }
      catch (InterruptedException e)
      {
      }
    }
    return false;
    return connectionInvalidReason == null && !isClosed;
  }
  //
  //
sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java
@@ -460,7 +460,7 @@
        .getDefaultFilterChainFactory().getFilterChainPattern());
    LDAPConnection ldapConnection = new LDAPConnection(connection,
        socketAddress, options.getSchema(), options.getPingRequest(), this);
        socketAddress, options.getSchema(), this);
    ldapConnectionAttr.set(connection, ldapConnection);
    return ldapConnection;
  }
sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java
@@ -1,9 +1,11 @@
package org.opends.sdk;
import com.sun.opends.sdk.util.Validator;
import com.sun.opends.sdk.util.StaticUtils;
import java.util.List;
import java.util.ArrayList;
import java.util.logging.Level;
/**
 * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: 3:49:17
@@ -70,6 +72,15 @@
          {
            resultHandler.handleErrorResult(error);
          }
          if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
          {
            StaticUtils.DEBUG_LOG
                .warning(String
                    .format(
                    "Connection factory " + factory +
                        " is no longer operational: "
                        + error.getMessage()));
          }
        }
        public void handleResult(AsynchronousConnection result)
@@ -79,6 +90,14 @@
          {
            resultHandler.handleResult(result);
          }
          if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
          {
            StaticUtils.DEBUG_LOG
                .warning(String
                    .format(
                    "Connection factory " + factory +
                        " is now operational"));
          }
        }
      };
      return factory.getAsynchronousConnection(handler);
@@ -102,6 +121,11 @@
          if(!f.isOperational && (f.pendingConnectFuture == null ||
                                  f.pendingConnectFuture.isDone()))
          {
            if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINEST))
            {
              StaticUtils.DEBUG_LOG
                  .finest(String.format("Attempting connect on factory " + f));
            }
            f.pendingConnectFuture = f.factory.getAsynchronousConnection(f);
          }
        }
sdk/src/org/opends/sdk/AsynchronousConnection.java
@@ -379,12 +379,6 @@
  /**
   * Returns true if the connection has not been closed and is still valid.
   * The implementation shall submit a search on the connection or use some
   * other mechanism that positively verifies the connection is still valid
   * when this method is called.
   *
   * The query submitted by the driver to validate the connection shall be
   * executed in the authentication context.
   *
   * @return {@code true} if the connection is valid, {@code false} otherwise.
   */
sdk/src/org/opends/sdk/AuthenticatedConnectionFactory.java
@@ -253,6 +253,15 @@
    /**
     * {@inheritDoc}
     */
    public boolean isValid() {
      return connection.isValid();
    }
    /**
     * {@inheritDoc}
     */
    public boolean isClosed()
    {
      return connection.isClosed();
sdk/src/org/opends/sdk/ConnectionPool.java
@@ -132,58 +132,10 @@
        }
        isClosed = true;
        // Don't put closed connections back in the pool.
        if (!connection.isValid())
        // Don't put invalid connections back in the pool.
        if (connection.isValid())
        {
          numConnections--;
        }
        else
        {
          // See if there waiters pending.
          for (;;)
          {
            PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
                connection);
            FuturePooledConnection future = pendingFutures.poll();
            if (future == null)
            {
              // No waiters - so drop out and add connection to pool.
              break;
            }
            future.handleResult(pooledConnection);
            if (!future.isCancelled())
            {
              // The future was not cancelled and the connection was
              // accepted.
              if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
              {
                StaticUtils.DEBUG_LOG
                    .finest(String
                        .format(
                        "Connection released to pool and directly "
                        + "given to waiter. numConnections: %d, poolSize: %d, "
                        + "pendingFutures: %d", numConnections,
                        pool.size(), pendingFutures.size()));
              }
              return;
            }
          }
          // No waiters. Put back in pool.
          pool.offer(connection);
          if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
          {
            StaticUtils.DEBUG_LOG
                .finest(String
                    .format(
                    "Connection released to pool and directly "
                    + "given to waiter. numConnections: %d, poolSize: %d, "
                    + "pendingFutures: %d", numConnections,
                    pool.size(), pendingFutures.size()));
          }
          releaseConnection(connection);
          return;
        }
      }
@@ -191,19 +143,31 @@
      // Connection is no longer valid. Close outside of lock
      connection.removeConnectionEventListener(this);
      connection.close();
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
      {
        StaticUtils.DEBUG_LOG
            .finest(String
            .warning(String
                .format(
                "Dead connection released to pool. "
                + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                "Dead connection released and closed. "
                    + "numConnections: %d, poolSize: %d, " +
                    "pendingFutures: %d",
                numConnections, pool.size(), pendingFutures
                    .size()));
      }
      return;
    }
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
      {
        StaticUtils.DEBUG_LOG
            .warning(String
                .format(
                "Reconnect attempt starting. "
                    + "numConnections: %d, poolSize: %d, " +
                    "pendingFutures: %d",
                numConnections, pool.size(), pendingFutures
                    .size()));
      }
      connectionFactory.getAsynchronousConnection(new ReconnectHandler());
    }
    public void close(UnbindRequest request, String reason)
@@ -443,12 +407,12 @@
        // careful that users of the pooled connection get a sensible
        // error if they continue to use it (i.e. not an NPE or ISE).
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
        {
          StaticUtils.DEBUG_LOG
              .finest(String
              .warning(String
                  .format(
                  "Connection error occured: "
                  "Connection error occured and removed from pool: "
                  + error.getMessage()
                  + " numConnections: %d, poolSize: %d, pendingFutures: %d",
                  numConnections, pool.size(), pendingFutures
@@ -458,7 +422,52 @@
    }
  }
  private class ReconnectHandler
      implements ResultHandler<AsynchronousConnection>
  {
    public void handleErrorResult(ErrorResultException error) {
      // The reconnect failed. Fail the connect attempt.
      numConnections --;
      // The reconnect failed. The underlying connection factory probably went
      // down. Just fail all pending futures
      synchronized (pool)
      {
        while(!pendingFutures.isEmpty())
        {
          pendingFutures.poll().handleErrorResult(error);
        }
      }
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.WARNING))
      {
        StaticUtils.DEBUG_LOG
            .warning(String
                .format(
                "Reconnect failed. Failed all pending futures: "
                    + error.getMessage()
                    + " numConnections: %d, poolSize: %d, pendingFutures: %d",
                numConnections, pool.size(), pendingFutures
                    .size()));
      }
    }
    public void handleResult(AsynchronousConnection connection) {
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      {
        StaticUtils.DEBUG_LOG
            .finest(String
                .format(
                "Reconnect succeded. "
                    + " numConnections: %d, poolSize: %d, pendingFutures: %d",
                numConnections, pool.size(), pendingFutures
                    .size()));
      }
      synchronized (pool)
      {
        releaseConnection(connection);
      }
    }
  }
  // Future used for waiting for pooled connections to become available.
  private static final class FuturePooledConnection extends
@@ -482,6 +491,54 @@
  }
  private void releaseConnection(AsynchronousConnection connection)
  {
    // See if there waiters pending.
    for (;;)
    {
      PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
          connection);
      FuturePooledConnection future = pendingFutures.poll();
      if (future == null)
      {
        // No waiters - so drop out and add connection to pool.
        break;
      }
      future.handleResult(pooledConnection);
      if (!future.isCancelled())
      {
        // The future was not cancelled and the connection was
        // accepted.
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG
              .finest(String
                  .format(
                  "Connection released and directly "
                      + "given to waiter. numConnections: %d, poolSize: %d, "
                      + "pendingFutures: %d", numConnections,
                  pool.size(), pendingFutures.size()));
        }
        return;
      }
    }
    // No waiters. Put back in pool.
    pool.offer(connection);
    if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
    {
      StaticUtils.DEBUG_LOG
          .finest(String
              .format(
              "Connection released to pool. numConnections: %d, " +
                  "poolSize: %d, pendingFutures: %d", numConnections,
              pool.size(), pendingFutures.size()));
    }
  }
  /**
@@ -546,7 +603,7 @@
      try
      {
        // We can create a new connection.
        conn = connectionFactory.getAsynchronousConnection(handler).get();
        conn = connectionFactory.getAsynchronousConnection(null).get();
        numConnections++;
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
@@ -561,13 +618,22 @@
      }
      catch (ErrorResultException e)
      {
        if (handler != null)
        {
          handler.handleErrorResult(e);
        }
        return new CompletedFutureResult<AsynchronousConnection>(e);
      }
      catch (InterruptedException e)
      {
        return new CompletedFutureResult<AsynchronousConnection>(
        ErrorResultException error =
            new ErrorResultException(Responses.newResult(
                ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)));
                ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e));
        if (handler != null)
        {
          handler.handleErrorResult(error);
        }
        return new CompletedFutureResult<AsynchronousConnection>(error);
      }
    }
    else
sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
@@ -32,13 +32,14 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opends.sdk.requests.*;
import org.opends.sdk.responses.*;
import org.opends.sdk.schema.Schema;
import com.sun.opends.sdk.util.FutureResultTransformer;
import com.sun.opends.sdk.util.Validator;
@@ -46,10 +47,14 @@
 * A heart beat connection factory can be used to create connections
 * that send a periodic search request to a Directory Server.
 */
public class HeartBeatConnectionFactory extends
final class HeartBeatConnectionFactory extends
    AbstractConnectionFactory<AsynchronousConnection>
{
  private final int interval;
  private final SearchRequest heartBeat;
  private final long timeout;
  private final TimeUnit unit;
  private final List<AsynchronousConnectionImpl> activeConnections;
@@ -59,7 +64,30 @@
  // FIXME: use a single global scheduler?
  // FIXME: change timeout parameters to long+TimeUnit.
  /**
   * Creates a new heart-beat connection factory which will create
   * connections using the provided connection factory and periodically
   * ping any created connections in order to detect that they are still
   * alive.
   *
   * @param connectionFactory
   *          The connection factory to use for creating connections.
   * @param timeout
   *          The time to wait between keepalive pings.
   * @param unit
   *          The time unit of the timeout argument.
   */
  HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory,
      long timeout, TimeUnit unit)
  {
    // Delegate to the four-argument constructor, supplying DEFAULT_SEARCH
    // as the heart beat search request.
    this(connectionFactory, timeout, unit, DEFAULT_SEARCH);
  }
  private static final SearchRequest DEFAULT_SEARCH = Requests
      .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)",
          "1.1");
@@ -71,15 +99,19 @@
   *
   * @param connectionFactory
   *          The connection factory to use for creating connections.
   * @param interval
   *          The period between keepalive pings.
   * @param timeout
   *          The time to wait between keepalive pings.
   * @param unit
   *          The time unit of the timeout argument.
   * @param heartBeat
   *          The search request to use when pinging connections.
   */
  public HeartBeatConnectionFactory(
      ConnectionFactory<?> connectionFactory,
      int interval)
  HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory,
      long timeout, TimeUnit unit, SearchRequest heartBeat)
  {
    Validator.ensureNotNull(connectionFactory);
    this.interval = interval;
    this.heartBeat = heartBeat;
    this.timeout = timeout;
    this.unit = unit;
    this.activeConnections = new LinkedList<AsynchronousConnectionImpl>();
    this.parentFactory = connectionFactory;
@@ -93,9 +125,11 @@
   * operations.
   */
  private final class AsynchronousConnectionImpl implements
      AsynchronousConnection, ConnectionEventListener
      AsynchronousConnection, ConnectionEventListener, ResultHandler<Result>
  {
    private final AsynchronousConnection connection;
    private long lastSuccessfulPing;
    private FutureResult<Result> lastPingFuture;
    private AsynchronousConnectionImpl(AsynchronousConnection connection)
@@ -316,9 +350,9 @@
     */
    public boolean isValid()
    {
      // Avoid extra pings... Let the next ping find out if this connection
      // is still valid.
      return connection.isValid();
      return connection.isValid() && (lastSuccessfulPing <= 0 ||
          System.currentTimeMillis() - lastSuccessfulPing <
              unit.toMillis(timeout) * 2);
    }
    public void connectionReceivedUnsolicitedNotification(
@@ -338,6 +372,15 @@
        activeConnections.remove(this);
      }
    }
    public void handleErrorResult(ErrorResultException error) {
      connection.close(Requests.newUnbindRequest(),
          "Heartbeat retured error: " + error);
    }
    public void handleResult(Result result) {
        lastSuccessfulPing = System.currentTimeMillis();
    }
  }
@@ -354,22 +397,26 @@
    public void run()
    {
      long startTime;
      while(true)
      {
        startTime = System.currentTimeMillis();
        synchronized (activeConnections)
        {
          for (AsynchronousConnectionImpl connection : activeConnections)
          {
            if(!connection.isValid())
            if(connection.lastPingFuture == null ||
                connection.lastPingFuture.isDone())
            {
              connection.close(Requests.newUnbindRequest(),
                               "Connection no longer valid");
              connection.lastPingFuture =
                  connection.search(heartBeat, connection, null);
            }
          }
        }
        try
        {
          sleep(interval);
          sleep(unit.toMillis(timeout) -
              (System.currentTimeMillis() - startTime));
        }
        catch (InterruptedException e)
        {
sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java
@@ -32,9 +32,6 @@
import javax.net.ssl.SSLContext;
import org.opends.sdk.schema.Schema;
import org.opends.sdk.requests.SearchRequest;
import org.opends.sdk.requests.Requests;
import org.opends.sdk.SearchScope;
import com.sun.opends.sdk.util.Validator;
@@ -45,18 +42,12 @@
 */
public final class LDAPConnectionOptions
{
  private static final SearchRequest DEFAULT_PING = Requests
      .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)",
                        "1.1");
  private Schema schema = Schema.getDefaultSchema();
  private SSLContext sslContext = null;
  private boolean useStartTLS = false;
  private SearchRequest pingRequest = DEFAULT_PING;
  /**
@@ -206,29 +197,7 @@
    return this;
  }
  /**
   * Retrieves the search request used to verify the validity of a
   * LDAP connection. By default, the root DSE is retrieved without any
   * attributes.
   *
   * @return The search request.
   */
  public SearchRequest getPingRequest()
  {
    return pingRequest;
  }
  /**
   * Sets the search request used to verify the validity of a
   * LDAP connection.
   *
   * @param pingRequest The search request that can be used to verify the
   *                    validity of a LDAP connection.
   */
  public void setPingRequest(SearchRequest pingRequest)
  {
    this.pingRequest = pingRequest;
  }
  // Assigns the provided options to this set of options.
  LDAPConnectionOptions assign(LDAPConnectionOptions options)