mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

boli
22.55.2009 1fbc9df5d0c44ae72c76dacc3c945d4f0d641ee6
Initial implementation of fail over connection factory
4 files added
7 files modified
908 ■■■■■ changed files
sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java 234 ●●●● patch | view | raw | blame | history
sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java 2 ●●● patch | view | raw | blame | history
sdk/src/com/sun/opends/sdk/tools/AuthenticatedConnectionFactory.java 7 ●●●●● patch | view | raw | blame | history
sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java 119 ●●●●● patch | view | raw | blame | history
sdk/src/org/opends/sdk/AsynchronousConnection.java 16 ●●●●● patch | view | raw | blame | history
sdk/src/org/opends/sdk/ConnectionPool.java 308 ●●●● patch | view | raw | blame | history
sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java 26 ●●●●● patch | view | raw | blame | history
sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java 109 ●●●● patch | view | raw | blame | history
sdk/src/org/opends/sdk/LoadBalancingAlgorithm.java 10 ●●●●● patch | view | raw | blame | history
sdk/src/org/opends/sdk/LoadBalancingConnectionFactory.java 46 ●●●●● patch | view | raw | blame | history
sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java 31 ●●●●● patch | view | raw | blame | history
sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
@@ -37,9 +37,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLContext;
@@ -61,7 +59,7 @@
import com.sun.grizzly.ssl.*;
import com.sun.grizzly.streams.StreamWriter;
import com.sun.opends.sdk.util.Validator;
import com.sun.opends.sdk.util.StaticUtils;
/**
@@ -666,6 +664,8 @@
  private final Object writeLock = new Object();
  private final SearchRequest pingRequest;
  /**
   * Creates a new LDAP connection.
   *
@@ -676,18 +676,22 @@
   * @param schema
   *          The schema which will be used to decode responses from the
   *          server.
   * @param pingRequest
   *          The search request to use to verify the validity of
   *          this connection.
   * @param connFactory
   *          The associated connection factory.
   */
  LDAPConnection(com.sun.grizzly.Connection<?> connection,
      InetSocketAddress serverAddress, Schema schema,
      LDAPConnectionFactoryImpl connFactory)
      SearchRequest pingRequest, LDAPConnectionFactoryImpl connFactory)
  {
    this.connection = connection;
    this.serverAddress = serverAddress;
    this.schema = schema;
    this.connFactory = connFactory;
    this.streamWriter = getFilterChainStreamWriter();
    this.pingRequest = pingRequest;
  }
@@ -809,16 +813,7 @@
      NullPointerException
  {
    Validator.ensureNotNull(listener);
    synchronized (writeLock)
    {
      if (isClosed)
      {
        throw new IllegalStateException();
      }
      listeners.add(listener);
    }
    listeners.add(listener);
  }
@@ -1258,11 +1253,7 @@
      ConnectionEventListener listener) throws NullPointerException
  {
    Validator.ensureNotNull(listener);
    synchronized (writeLock)
    {
      listeners.remove(listener);
    }
    listeners.remove(listener);
  }
@@ -1354,13 +1345,17 @@
  private void close(UnbindRequest unbindRequest,
      boolean isDisconnectNotification, Result reason)
                     boolean isDisconnectNotification, Result reason)
  {
    boolean notifyClose = false;
    boolean notifyErrorOccurred = false;
    synchronized (writeLock)
    {
      boolean notifyClose = false;
      boolean notifyErrorOccurred = false;
      if(reason == null && unbindRequest == null)
      {
        System.out.println("Something is wrong!");
      }
      if (isClosed)
      {
        // Already closed.
@@ -1381,112 +1376,104 @@
      if (connectionInvalidReason != null)
      {
        // Already invalid.
        if (notifyClose)
        {
          // TODO: uncomment if close notification is required.
          // for (ConnectionEventListener listener : listeners)
          // {
          // listener.connectionClosed(this);
          // }
        }
        return;
      }
      // First abort all outstanding requests.
      for (AbstractLDAPFutureResultImpl<?> future : pendingRequests
          .values())
      {
        if (pendingBindOrStartTLS <= 0)
        {
          ASN1StreamWriter asn1Writer = connFactory
              .getASN1Writer(streamWriter);
          int messageID = nextMsgID.getAndIncrement();
          AbandonRequest abandon = Requests.newAbandonRequest(future
              .getRequestID());
          try
          {
            LDAPEncoder.encodeAbandonRequest(asn1Writer, messageID,
                abandon);
            asn1Writer.flush();
          }
          catch (IOException e)
          {
            // Underlying channel probably blown up. Just ignore.
          }
          finally
          {
            connFactory.releaseASN1Writer(asn1Writer);
          }
        }
      // Mark the connection as invalid.
      connectionInvalidReason = reason;
    }
        future.adaptErrorResult(reason);
      }
      pendingRequests.clear();
      // Now try cleanly closing the connection if possible.
      try
    // First abort all outstanding requests.
    for (AbstractLDAPFutureResultImpl<?> future : pendingRequests
        .values())
    {
      if (pendingBindOrStartTLS <= 0)
      {
        ASN1StreamWriter asn1Writer = connFactory
            .getASN1Writer(streamWriter);
        if (unbindRequest == null)
        {
          unbindRequest = Requests.newUnbindRequest();
        }
        int messageID = nextMsgID.getAndIncrement();
        AbandonRequest abandon = Requests.newAbandonRequest(future
            .getRequestID());
        try
        {
          LDAPEncoder.encodeUnbindRequest(asn1Writer, nextMsgID
              .getAndIncrement(), unbindRequest);
          LDAPEncoder.encodeAbandonRequest(asn1Writer, messageID,
                                           abandon);
          asn1Writer.flush();
        }
        catch (IOException e)
        {
          // Underlying channel probably blown up. Just ignore.
        }
        finally
        {
          connFactory.releaseASN1Writer(asn1Writer);
        }
      }
      catch (IOException e)
      future.adaptErrorResult(reason);
    }
    pendingRequests.clear();
    // Now try cleanly closing the connection if possible.
    try
    {
      ASN1StreamWriter asn1Writer = connFactory
          .getASN1Writer(streamWriter);
      if (unbindRequest == null)
      {
        // Underlying channel prob blown up. Just ignore.
        unbindRequest = Requests.newUnbindRequest();
      }
      try
      {
        streamWriter.close();
        LDAPEncoder.encodeUnbindRequest(asn1Writer, nextMsgID
            .getAndIncrement(), unbindRequest);
        asn1Writer.flush();
      }
      catch (IOException e)
      finally
      {
        // Ignore.
        connFactory.releaseASN1Writer(asn1Writer);
      }
    }
    catch (IOException e)
    {
      // Underlying channel prob blown up. Just ignore.
    }
      try
      {
        connection.close();
      }
      catch (IOException e)
      {
        // Ignore.
      }
    try
    {
      streamWriter.close();
    }
    catch (IOException e)
    {
      // Ignore.
    }
      // Mark the connection as invalid.
      connectionInvalidReason = reason;
    try
    {
      connection.close();
    }
    catch (IOException e)
    {
      // Ignore.
    }
      // Notify listeners.
      if (notifyClose)
      {
        // TODO: uncomment if close notification is required.
        // for (ConnectionEventListener listener : listeners)
        // {
        // listener.connectionClosed(this);
        // }
      }
    // Notify listeners.
    if (notifyClose)
    {
      // TODO: uncomment if close notification is required.
      // for (ConnectionEventListener listener : listeners)
      // {
      // listener.connectionClosed(this);
      // }
    }
      if (notifyErrorOccurred)
    if (notifyErrorOccurred)
    {
      for (ConnectionEventListener listener : listeners)
      {
        for (ConnectionEventListener listener : listeners)
        {
          listener.connectionErrorOccurred(isDisconnectNotification,
              ErrorResultException.wrap(reason));
        }
        listener.connectionErrorOccurred(isDisconnectNotification,
                                         ErrorResultException.wrap(reason));
      }
    }
  }
@@ -1506,27 +1493,38 @@
   */
  public boolean isClosed()
  {
    synchronized (writeLock)
    {
      return isClosed;
    }
    return isClosed;
  }
  //
  //
  //
  // /**
  // * {@inheritDoc}
  // */
  // public boolean isValid() throws InterruptedException
  // {
  // synchronized (writeLock)
  // {
  // return connectionInvalidReason == null;
  // }
  // }
  /**
   * {@inheritDoc}
   */
  public boolean isValid()
  {
    if(!isClosed && connectionInvalidReason == null)
    {
      try
      {
        search(pingRequest, null, null).get(5, TimeUnit.SECONDS);
        return true;
      }
      catch (ErrorResultException e)
      {
        StaticUtils.DEBUG_LOG.warning("Validity check returned error: " +
                                      e.getResult());
      }
      catch (TimeoutException e)
      {
        StaticUtils.DEBUG_LOG.warning("Validity check timed out");
      }
      catch (InterruptedException e)
      {
      }
    }
    return false;
  }
  //
  //
  //
sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java
@@ -460,7 +460,7 @@
        .getDefaultFilterChainFactory().getFilterChainPattern());
    LDAPConnection ldapConnection = new LDAPConnection(connection,
        socketAddress, options.getSchema(), this);
        socketAddress, options.getSchema(), options.getPingRequest(), this);
    ldapConnectionAttr.set(connection, ldapConnection);
    return ldapConnection;
  }
sdk/src/com/sun/opends/sdk/tools/AuthenticatedConnectionFactory.java
@@ -309,6 +309,13 @@
    public boolean isValid()
    {
      return connection.isValid();
    }
    public void close()
    {
      connection.close();
sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java
New file
@@ -0,0 +1,119 @@
package org.opends.sdk;
import com.sun.opends.sdk.util.Validator;
import java.util.List;
import java.util.ArrayList;
/**
 * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: 3:49:17
 * PM To change this template use File | Settings | File Templates.
 */
public abstract class AbstractLoadBalancingAlgorithm
    implements LoadBalancingAlgorithm
{
  protected final List<MonitoredConnectionFactory> factoryList;
  protected AbstractLoadBalancingAlgorithm(ConnectionFactory<?>... factories)
  {
    Validator.ensureNotNull(factories);
    factoryList = new ArrayList<MonitoredConnectionFactory>(factories.length);
    for(ConnectionFactory<?> f : factories)
    {
      factoryList.add(new MonitoredConnectionFactory(f));
    }
    new MonitorThread().start();
  }
  protected class MonitoredConnectionFactory
      extends AbstractConnectionFactory<AsynchronousConnection>
      implements ResultHandler<AsynchronousConnection>
  {
    private final ConnectionFactory<?> factory;
    private volatile boolean isOperational;
    private volatile FutureResult<?> pendingConnectFuture;
    private MonitoredConnectionFactory(ConnectionFactory<?> factory)
    {
      this.factory = factory;
      this.isOperational = true;
    }
    public boolean isOperational()
    {
      return isOperational;
    }
    public void handleErrorResult(ErrorResultException error)
    {
      isOperational = false;
    }
    public void handleResult(AsynchronousConnection result)
    {
      isOperational = true;
      // TODO: Notify the server is back up
      result.close();
    }
    public FutureResult<? extends AsynchronousConnection>
      getAsynchronousConnection(
        final ResultHandler<? super AsynchronousConnection> resultHandler)
    {
      ResultHandler handler = new ResultHandler<AsynchronousConnection>()
      {
        public void handleErrorResult(ErrorResultException error)
        {
          isOperational = false;
          if(resultHandler != null)
          {
            resultHandler.handleErrorResult(error);
          }
        }
        public void handleResult(AsynchronousConnection result)
        {
          isOperational = true;
          if(resultHandler != null)
          {
            resultHandler.handleResult(result);
          }
        }
      };
      return factory.getAsynchronousConnection(handler);
    }
  }
  private class MonitorThread extends Thread
  {
    private MonitorThread()
    {
      super("Connection Factory Health Monitor");
      this.setDaemon(true);
    }
    public void run()
    {
      while(true)
      {
        for(MonitoredConnectionFactory f : factoryList)
        {
          if(!f.isOperational && (f.pendingConnectFuture == null ||
                                  f.pendingConnectFuture.isDone()))
          {
            f.pendingConnectFuture = f.factory.getAsynchronousConnection(f);
          }
        }
        try
        {
          sleep(10000);
        }
        catch (InterruptedException e)
        {
          // Ignore and just go around again...
        }
      }
    }
  }
}
sdk/src/org/opends/sdk/AsynchronousConnection.java
@@ -31,6 +31,7 @@
import java.io.Closeable;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.opends.sdk.requests.*;
import org.opends.sdk.responses.BindResult;
@@ -377,6 +378,21 @@
  /**
   * Returns true if the connection has not been closed and is still valid.
   * The implementation shall submit a search on the connection or use some
   * other mechanism that positively verifies the connection is still valid
   * when this method is called.
   *
   * The query submitted by the driver to validate the connection shall be
   * executed in the authentication context.
   *
   * @return {@code true} if the connection is valid, {@code false} otherwise.
   */
  boolean isValid();
  /**
   * Modifies an entry in the Directory Server using the provided modify
   * request.
   *
sdk/src/org/opends/sdk/ConnectionPool.java
@@ -31,7 +31,9 @@
import java.util.Collection;
import java.util.Stack;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import org.opends.sdk.requests.*;
@@ -58,40 +60,17 @@
  private final int poolSize;
  // FIXME: should use a better collection than this - CLQ?
  private final Stack<AsynchronousConnection> pool;
  private final Queue<AsynchronousConnection> pool;
  private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
  private final Object lock = new Object();
  private final class FutureNewConnection
      extends
      FutureResultTransformer<AsynchronousConnection, AsynchronousConnection>
  {
    private FutureNewConnection(
        ResultHandler<? super AsynchronousConnection> handler)
    {
      super(handler);
    }
    protected AsynchronousConnection transformResult(
        AsynchronousConnection result) throws ErrorResultException
    {
      return new PooledConnectionWapper(result);
    }
  }
  private final class PooledConnectionWapper implements
      AsynchronousConnection, ConnectionEventListener
  {
    private AsynchronousConnection connection;
    private final AsynchronousConnection connection;
    private volatile boolean isClosed;
    private PooledConnectionWapper(AsynchronousConnection connection)
@@ -106,7 +85,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -120,7 +99,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -134,7 +113,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -145,29 +124,26 @@
    public void close()
    {
      synchronized (lock)
      synchronized (pool)
      {
        try
        if(isClosed)
        {
          // Don't put closed connections back in the pool.
          if (connection.isClosed())
          {
            if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
            {
              StaticUtils.DEBUG_LOG
                  .finest(String
                      .format(
                          "Dead connection released to pool. "
                              + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                          numConnections, pool.size(), pendingFutures
                              .size()));
            }
            return;
          }
          return;
        }
        isClosed = true;
        // Don't put closed connections back in the pool.
        if (!connection.isValid())
        {
          numConnections--;
        }
        else
        {
          // See if there are waiters pending.
          for (;;)
          {
            PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
                connection);
            FuturePooledConnection future = pendingFutures.poll();
            if (future == null)
@@ -176,8 +152,6 @@
              break;
            }
            PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
                connection);
            future.handleResult(pooledConnection);
            if (!future.isCancelled())
@@ -189,34 +163,45 @@
                StaticUtils.DEBUG_LOG
                    .finest(String
                        .format(
                            "Connection released to pool and directly "
                                + "given to waiter. numConnections: %d, poolSize: %d, "
                                + "pendingFutures: %d", numConnections,
                            pool.size(), pendingFutures.size()));
                        "Connection released to pool and directly "
                        + "given to waiter. numConnections: %d, poolSize: %d, "
                        + "pendingFutures: %d", numConnections,
                        pool.size(), pendingFutures.size()));
              }
              return;
            }
          }
          // No waiters. Put back in pool.
          pool.push(connection);
          pool.offer(connection);
          if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
          {
            StaticUtils.DEBUG_LOG
                .finest(String
                    .format(
                        "Connection released to pool. "
                            + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                        numConnections, pool.size(), pendingFutures
                            .size()));
                    "Connection released to pool and directly "
                    + "given to waiter. numConnections: %d, poolSize: %d, "
                    + "pendingFutures: %d", numConnections,
                    pool.size(), pendingFutures.size()));
          }
        }
        finally
        {
          // Null out the underlying connection to prevent further use.
          connection = null;
          return;
        }
      }
      // Connection is no longer valid. Close outside of lock
      connection.removeConnectionEventListener(this);
      connection.close();
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      {
        StaticUtils.DEBUG_LOG
            .finest(String
                .format(
                "Dead connection released to pool. "
                + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                numConnections, pool.size(), pendingFutures
                    .size()));
      }
      return;
    }
@@ -234,7 +219,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -248,7 +233,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -262,7 +247,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -276,7 +261,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -290,7 +275,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -305,7 +290,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -324,7 +309,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -343,7 +328,7 @@
        throws UnsupportedOperationException, IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -359,7 +344,7 @@
        ResultHandler<RootDSE> handler)
        throws UnsupportedOperationException, IllegalStateException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -375,7 +360,7 @@
        ResultHandler<Schema> handler)
        throws UnsupportedOperationException, IllegalStateException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -391,7 +376,7 @@
        ResultHandler<Schema> handler)
        throws UnsupportedOperationException, IllegalStateException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -404,7 +389,7 @@
        ConnectionEventListener listener) throws IllegalStateException,
        NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -415,7 +400,7 @@
    public void removeConnectionEventListener(
        ConnectionEventListener listener) throws NullPointerException
    {
      if (connection == null)
      if (isClosed())
      {
        throw new IllegalStateException();
      }
@@ -428,10 +413,13 @@
     */
    public boolean isClosed()
    {
      return connection == null;
      return isClosed;
    }
    public boolean isValid()
    {
      return !isClosed && connection.isValid();
    }
    public void connectionReceivedUnsolicitedNotification(
        GenericExtendedResult notification)
@@ -444,10 +432,10 @@
    public void connectionErrorOccurred(
        boolean isDisconnectNotification, ErrorResultException error)
    {
      synchronized (lock)
      // Remove this connection from the pool if it is in there. If not, just
      // ignore it and wait for the user to close it; we can deal with it there.
      if(pool.remove(this))
      {
        // Remove this connection from the pool if its in there
        pool.remove(this);
        numConnections--;
        connection.removeConnectionEventListener(this);
@@ -460,11 +448,11 @@
          StaticUtils.DEBUG_LOG
              .finest(String
                  .format(
                      "Connection error occured: "
                          + error.getMessage()
                          + " numConnections: %d, poolSize: %d, pendingFutures: %d",
                      numConnections, pool.size(), pendingFutures
                          .size()));
                  "Connection error occured: "
                  + error.getMessage()
                  + " numConnections: %d, poolSize: %d, pendingFutures: %d",
                  numConnections, pool.size(), pendingFutures
                      .size()));
        }
      }
    }
@@ -510,86 +498,100 @@
  {
    this.connectionFactory = connectionFactory;
    this.poolSize = poolSize;
    this.pool = new Stack<AsynchronousConnection>();
    this.pool = new ConcurrentLinkedQueue<AsynchronousConnection>();
    this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>();
  }
  public FutureResult<AsynchronousConnection> getAsynchronousConnection(
      ResultHandler<? super AsynchronousConnection> handler)
  public synchronized FutureResult<AsynchronousConnection>
  getAsynchronousConnection(ResultHandler<? super AsynchronousConnection> handler)
  {
    synchronized (lock)
    // This entire method is synchronized to ensure new connects are done
    // synchronously to avoid the "pending connect" case.
    AsynchronousConnection conn;
    synchronized(pool)
    {
      // Check to see if we have a connection in the pool
      if (!pool.isEmpty())
      conn = pool.poll();
      if (conn == null)
      {
        AsynchronousConnection conn = pool.pop();
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        // Pool was empty. Maybe a new connection if pool size is not
        // reached
        if (numConnections >= poolSize)
        {
          StaticUtils.DEBUG_LOG
              .finest(String
                  .format(
                      "Connection aquired from pool. "
                          + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                      numConnections, pool.size(), pendingFutures
                          .size()));
          // We reached max # of conns so wait for a connection to become available.
          FuturePooledConnection future = new FuturePooledConnection(
              handler);
          pendingFutures.add(future);
          if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
          {
            StaticUtils.DEBUG_LOG
                .finest(String
                    .format(
                    "No connections available. Wait-listed"
                    + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                    numConnections, pool.size(), pendingFutures
                        .size()));
          }
          return future;
        }
        PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
            conn);
        if (handler != null)
        {
          handler.handleResult(pooledConnection);
        }
        return new CompletedFutureResult<AsynchronousConnection>(
            pooledConnection);
      }
      // Pool was empty. Maybe a new connection if pool size is not
      // reached
      if (numConnections < poolSize)
      {
        // We can create a new connection.
        numConnections++;
        FutureNewConnection future = new FutureNewConnection(handler);
        future.setFutureResult(connectionFactory
            .getAsynchronousConnection(future));
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG
              .finest(String
                  .format(
                      "New connection established and aquired. "
                          + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                      numConnections, pool.size(), pendingFutures
                          .size()));
        }
        return future;
      }
      else
      {
        // Pool is full so wait for a connection to become available.
        FuturePooledConnection future = new FuturePooledConnection(
            handler);
        pendingFutures.add(future);
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG
              .finest(String
                  .format(
                      "No connections available. Wait-listed"
                          + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                      numConnections, pool.size(), pendingFutures
                          .size()));
        }
        return future;
      }
    }
    if(conn == null)
    {
      try
      {
        // We can create a new connection.
        conn = connectionFactory.getAsynchronousConnection(handler).get();
        numConnections++;
        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
        {
          StaticUtils.DEBUG_LOG
              .finest(String
                  .format(
                  "New connection established and aquired. "
                  + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                  numConnections, pool.size(), pendingFutures
                      .size()));
        }
      }
      catch (ErrorResultException e)
      {
        return new CompletedFutureResult<AsynchronousConnection>(e);
      }
      catch (InterruptedException e)
      {
        return new CompletedFutureResult<AsynchronousConnection>(
            new ErrorResultException(Responses.newResult(
                ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)));
      }
    }
    else
    {
      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
      {
        StaticUtils.DEBUG_LOG
            .finest(String
                .format(
                "Connection aquired from pool. "
                + "numConnections: %d, poolSize: %d, pendingFutures: %d",
                numConnections, pool.size(), pendingFutures
                    .size()));
      }
    }
    PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
        conn);
    if (handler != null)
    {
      handler.handleResult(pooledConnection);
    }
    return new CompletedFutureResult<AsynchronousConnection>(
        pooledConnection);
  }
}
sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java
New file
@@ -0,0 +1,26 @@
package org.opends.sdk;
/**
 * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: 5:42:01
 * PM To change this template use File | Settings | File Templates.
 */
public class FailoverLoadBalancingAlgorithm
    extends AbstractLoadBalancingAlgorithm
{
  public FailoverLoadBalancingAlgorithm(ConnectionFactory<?>... factories)
  {
    super(factories);
  }
  public ConnectionFactory<?> getNextConnectionFactory()
  {
    for(MonitoredConnectionFactory f : factoryList)
    {
      if(f.isOperational())
      {
        return f;
      }
    }
    return null;
  }
}
sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
@@ -32,13 +32,13 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.opends.sdk.requests.*;
import org.opends.sdk.responses.*;
import org.opends.sdk.schema.Schema;
import com.sun.opends.sdk.util.FutureResultTransformer;
import com.sun.opends.sdk.util.Validator;
@@ -46,47 +46,20 @@
 * A heart beat connection factory can be used to create connections
 * that send a periodic search request to a Directory Server.
 */
final class HeartBeatConnectionFactory extends
public class HeartBeatConnectionFactory extends
    AbstractConnectionFactory<AsynchronousConnection>
{
  private final SearchRequest heartBeat;
  private final long interval;
  private final int interval;
  private final List<AsynchronousConnectionImpl> activeConnections;
  private final ConnectionFactory<?> parentFactory;
  private volatile boolean stopRequested;
  // FIXME: use a single global scheduler?
  /**
   * Creates a new heart-beat connection factory which will create
   * connections using the provided connection factory and periodically
   * ping any created connections in order to detect that they are still
   * alive.
   *
   * @param connectionFactory
   *          The connection factory to use for creating connections.
   * @param timeout
   *          The time to wait between keepalive pings.
   * @param unit
   *          The time unit of the timeout argument.
   */
  HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory,
      long timeout, TimeUnit unit)
  {
    this(connectionFactory, timeout, unit, DEFAULT_SEARCH);
  }
  private static final SearchRequest DEFAULT_SEARCH = Requests
      .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)",
          "1.1");
  // FIXME: change timeout parameters to long+TimeUnit.
@@ -98,30 +71,18 @@
   *
   * @param connectionFactory
   *          The connection factory to use for creating connections.
   * @param timeout
   *          The time to wait between keepalive pings.
   * @param unit
   *          The time unit of the timeout argument.
   * @param heartBeat
   *          The search request to use when pinging connections.
   * @param interval
   *          The period between keepalive pings.
   */
  HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory,
      long timeout, TimeUnit unit, SearchRequest heartBeat)
  public HeartBeatConnectionFactory(
      ConnectionFactory<?> connectionFactory,
      int interval)
  {
    this.heartBeat = heartBeat;
    this.interval = unit.toMillis(timeout);
    Validator.ensureNotNull(connectionFactory);
    this.interval = interval;
    this.activeConnections = new LinkedList<AsynchronousConnectionImpl>();
    this.parentFactory = connectionFactory;
    Runtime.getRuntime().addShutdownHook(new Thread()
    {
      @Override
      public void run()
      {
        stopRequested = true;
      }
    });
    new HeartBeatThread().start();
  }
@@ -132,13 +93,11 @@
   * operations.
   */
  private final class AsynchronousConnectionImpl implements
      AsynchronousConnection, ConnectionEventListener,
      ResultHandler<Result>
      AsynchronousConnection, ConnectionEventListener
  {
    private final AsynchronousConnection connection;
    private AsynchronousConnectionImpl(AsynchronousConnection connection)
    {
      this.connection = connection;
@@ -352,7 +311,15 @@
      return connection.isClosed();
    }
    /**
     * {@inheritDoc}
     */
    public boolean isValid()
    {
      // Avoid extra pings... Let the next ping find out if this connection
      // is still valid.
      return connection.isValid();
    }
    public void connectionReceivedUnsolicitedNotification(
        GenericExtendedResult notification)
@@ -371,31 +338,6 @@
        activeConnections.remove(this);
      }
    }
    public void handleErrorResult(ErrorResultException error)
    {
      // TODO: I18N
      if (error instanceof TimeoutResultException)
      {
        close(Requests.newUnbindRequest(), "Heart beat timed out");
      }
    }
    public void handleResult(Result result)
    {
      // Do nothing
    }
    private void sendHeartBeat()
    {
      search(heartBeat, this, null);
    }
  }
@@ -405,19 +347,24 @@
    private HeartBeatThread()
    {
      super("Heart Beat Thread");
      this.setDaemon(true);
    }
    public void run()
    {
      while (!stopRequested)
      while(true)
      {
        synchronized (activeConnections)
        {
          for (AsynchronousConnectionImpl connection : activeConnections)
          {
            connection.sendHeartBeat();
            if(!connection.isValid())
            {
              connection.close(Requests.newUnbindRequest(),
                               "Connection no longer valid");
            }
          }
        }
        try
sdk/src/org/opends/sdk/LoadBalancingAlgorithm.java
New file
@@ -0,0 +1,10 @@
package org.opends.sdk;
/**
 * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: 3:37:03
 * PM To change this template use File | Settings | File Templates.
 */
public interface LoadBalancingAlgorithm
{
  public ConnectionFactory<?> getNextConnectionFactory();
}
sdk/src/org/opends/sdk/LoadBalancingConnectionFactory.java
New file
@@ -0,0 +1,46 @@
package org.opends.sdk;
import com.sun.opends.sdk.util.Validator;
import com.sun.opends.sdk.util.AbstractFutureResult;
import org.opends.sdk.responses.Responses;
/**
 * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: 3:23:52
 * PM To change this template use File | Settings | File Templates.
 */
public class LoadBalancingConnectionFactory
    extends AbstractConnectionFactory<AsynchronousConnection>
{
  private final LoadBalancingAlgorithm algorithm;
  public LoadBalancingConnectionFactory(LoadBalancingAlgorithm algorithm)
  {
    Validator.ensureNotNull(algorithm);
    this.algorithm = algorithm;
  }
  public FutureResult<? extends AsynchronousConnection>
  getAsynchronousConnection(
      ResultHandler<? super AsynchronousConnection> resultHandler)
  {
    ConnectionFactory<?> factory = algorithm.getNextConnectionFactory();
    if(factory == null)
    {
      AbstractFutureResult<AsynchronousConnection> future =
          new AbstractFutureResult<AsynchronousConnection>(resultHandler)
      {
        public int getRequestID()
        {
          return -1;
        }
      };
      future.handleErrorResult(new ErrorResultException(
          Responses.newResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR).
              setDiagnosticMessage("No connection factories available")));
      return future;
    }
    return factory.getAsynchronousConnection(resultHandler);
  }
}
sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java
@@ -32,6 +32,9 @@
import javax.net.ssl.SSLContext;
import org.opends.sdk.schema.Schema;
import org.opends.sdk.requests.SearchRequest;
import org.opends.sdk.requests.Requests;
import org.opends.sdk.SearchScope;
import com.sun.opends.sdk.util.Validator;
@@ -42,12 +45,18 @@
 */
public final class LDAPConnectionOptions
{
  private static final SearchRequest DEFAULT_PING = Requests
      .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)",
                        "1.1");
  private Schema schema = Schema.getDefaultSchema();
  private SSLContext sslContext = null;
  private boolean useStartTLS = false;
  private SearchRequest pingRequest = DEFAULT_PING;
  /**
@@ -197,7 +206,29 @@
    return this;
  }
  /**
   * Retrieves the search request used to verify the validity of a
   * LDAP connection. By default, the root DSE is retrieved without any
   * attributes.
   *
   * @return The search request.
   */
  public SearchRequest getPingRequest()
  {
    return pingRequest;
  }
  /**
   * Sets the search request used to verify the validity of a
   * LDAP connection.
   *
   * @param pingRequest The search request that can be used to verify the
   *                    validity of a LDAP connection.
   */
  public void setPingRequest(SearchRequest pingRequest)
  {
    this.pingRequest = pingRequest;
  }
  // Assigns the provided options to this set of options.
  LDAPConnectionOptions assign(LDAPConnectionOptions options)