From d7ab33028a671b16464920ecd0149065639adf3e Mon Sep 17 00:00:00 2001
From: boli <boli@localhost>
Date: Tue, 22 Dec 2009 05:55:54 +0000
Subject: [PATCH] Initial implementation of fail over connection factory

---
 opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java                  |  234 +++++++-------
 opendj-sdk/sdk/src/org/opends/sdk/LoadBalancingConnectionFactory.java           |   46 ++
 opendj-sdk/sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java           |   26 +
 opendj-sdk/sdk/src/org/opends/sdk/AsynchronousConnection.java                   |   16 +
 opendj-sdk/sdk/src/com/sun/opends/sdk/tools/AuthenticatedConnectionFactory.java |    7 
 opendj-sdk/sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java               |   31 +
 opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java           |  119 +++++++
 opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java                           |  308 +++++++++---------
 opendj-sdk/sdk/src/org/opends/sdk/LoadBalancingAlgorithm.java                   |   10 
 opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java       |    2 
 opendj-sdk/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java               |  109 +-----
 11 files changed, 555 insertions(+), 353 deletions(-)

diff --git a/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java b/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
index 5d91e23..9e9f8f1 100644
--- a/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
+++ b/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
@@ -37,9 +37,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.ssl.SSLContext;
@@ -61,7 +59,7 @@
 import com.sun.grizzly.ssl.*;
 import com.sun.grizzly.streams.StreamWriter;
 import com.sun.opends.sdk.util.Validator;
-
+import com.sun.opends.sdk.util.StaticUtils;
 
 
 /**
@@ -666,6 +664,8 @@
 
   private final Object writeLock = new Object();
 
+  private final SearchRequest pingRequest;
+
   /**
    * Creates a new LDAP connection.
    *
@@ -676,18 +676,22 @@
    * @param schema
    *          The schema which will be used to decode responses from the
    *          server.
+   * @param pingRequest
+   *          The search request to use to verify the validity of
+   *          this connection.
    * @param connFactory
    *          The associated connection factory.
    */
   LDAPConnection(com.sun.grizzly.Connection<?> connection,
       InetSocketAddress serverAddress, Schema schema,
-      LDAPConnectionFactoryImpl connFactory)
+      SearchRequest pingRequest, LDAPConnectionFactoryImpl connFactory)
   {
     this.connection = connection;
     this.serverAddress = serverAddress;
     this.schema = schema;
     this.connFactory = connFactory;
     this.streamWriter = getFilterChainStreamWriter();
+    this.pingRequest = pingRequest;
   }
 
 
@@ -809,16 +813,7 @@
       NullPointerException
   {
     Validator.ensureNotNull(listener);
-
-    synchronized (writeLock)
-    {
-      if (isClosed)
-      {
-        throw new IllegalStateException();
-      }
-
-      listeners.add(listener);
-    }
+    listeners.add(listener);
   }
 
 
@@ -1258,11 +1253,7 @@
       ConnectionEventListener listener) throws NullPointerException
   {
     Validator.ensureNotNull(listener);
-
-    synchronized (writeLock)
-    {
-      listeners.remove(listener);
-    }
+    listeners.remove(listener);
   }
 
 
@@ -1354,13 +1345,17 @@
 
 
   private void close(UnbindRequest unbindRequest,
-      boolean isDisconnectNotification, Result reason)
+                     boolean isDisconnectNotification, Result reason)
   {
+    boolean notifyClose = false;
+    boolean notifyErrorOccurred = false;
+
     synchronized (writeLock)
     {
-      boolean notifyClose = false;
-      boolean notifyErrorOccurred = false;
-
+      if(reason == null && unbindRequest == null)
+      {
+        System.out.println("Something is wrong!");
+      }
       if (isClosed)
       {
         // Already closed.
@@ -1381,112 +1376,104 @@
       if (connectionInvalidReason != null)
       {
         // Already invalid.
-        if (notifyClose)
-        {
-          // TODO: uncomment if close notification is required.
-          // for (ConnectionEventListener listener : listeners)
-          // {
-          // listener.connectionClosed(this);
-          // }
-        }
         return;
       }
 
-      // First abort all outstanding requests.
-      for (AbstractLDAPFutureResultImpl<?> future : pendingRequests
-          .values())
-      {
-        if (pendingBindOrStartTLS <= 0)
-        {
-          ASN1StreamWriter asn1Writer = connFactory
-              .getASN1Writer(streamWriter);
-          int messageID = nextMsgID.getAndIncrement();
-          AbandonRequest abandon = Requests.newAbandonRequest(future
-              .getRequestID());
-          try
-          {
-            LDAPEncoder.encodeAbandonRequest(asn1Writer, messageID,
-                abandon);
-            asn1Writer.flush();
-          }
-          catch (IOException e)
-          {
-            // Underlying channel probably blown up. Just ignore.
-          }
-          finally
-          {
-            connFactory.releaseASN1Writer(asn1Writer);
-          }
-        }
+      // Mark the connection as invalid.
+      connectionInvalidReason = reason;
+    }
 
-        future.adaptErrorResult(reason);
-      }
-      pendingRequests.clear();
-
-      // Now try cleanly closing the connection if possible.
-      try
+    // First abort all outstanding requests.
+    for (AbstractLDAPFutureResultImpl<?> future : pendingRequests
+        .values())
+    {
+      if (pendingBindOrStartTLS <= 0)
       {
         ASN1StreamWriter asn1Writer = connFactory
             .getASN1Writer(streamWriter);
-        if (unbindRequest == null)
-        {
-          unbindRequest = Requests.newUnbindRequest();
-        }
-
+        int messageID = nextMsgID.getAndIncrement();
+        AbandonRequest abandon = Requests.newAbandonRequest(future
+            .getRequestID());
         try
         {
-          LDAPEncoder.encodeUnbindRequest(asn1Writer, nextMsgID
-              .getAndIncrement(), unbindRequest);
+          LDAPEncoder.encodeAbandonRequest(asn1Writer, messageID,
+                                           abandon);
           asn1Writer.flush();
         }
+        catch (IOException e)
+        {
+          // Underlying channel probably blown up. Just ignore.
+        }
         finally
         {
           connFactory.releaseASN1Writer(asn1Writer);
         }
       }
-      catch (IOException e)
+
+      future.adaptErrorResult(reason);
+    }
+    pendingRequests.clear();
+
+    // Now try cleanly closing the connection if possible.
+    try
+    {
+      ASN1StreamWriter asn1Writer = connFactory
+          .getASN1Writer(streamWriter);
+      if (unbindRequest == null)
       {
-        // Underlying channel prob blown up. Just ignore.
+        unbindRequest = Requests.newUnbindRequest();
       }
 
       try
       {
-        streamWriter.close();
+        LDAPEncoder.encodeUnbindRequest(asn1Writer, nextMsgID
+            .getAndIncrement(), unbindRequest);
+        asn1Writer.flush();
       }
-      catch (IOException e)
+      finally
       {
-        // Ignore.
+        connFactory.releaseASN1Writer(asn1Writer);
       }
+    }
+    catch (IOException e)
+    {
+      // Underlying channel prob blown up. Just ignore.
+    }
 
-      try
-      {
-        connection.close();
-      }
-      catch (IOException e)
-      {
-        // Ignore.
-      }
+    try
+    {
+      streamWriter.close();
+    }
+    catch (IOException e)
+    {
+      // Ignore.
+    }
 
-      // Mark the connection as invalid.
-      connectionInvalidReason = reason;
+    try
+    {
+      connection.close();
+    }
+    catch (IOException e)
+    {
+      // Ignore.
+    }
 
-      // Notify listeners.
-      if (notifyClose)
-      {
-        // TODO: uncomment if close notification is required.
-        // for (ConnectionEventListener listener : listeners)
-        // {
-        // listener.connectionClosed(this);
-        // }
-      }
+    // Notify listeners.
+    if (notifyClose)
+    {
+      // TODO: uncomment if close notification is required.
+      // for (ConnectionEventListener listener : listeners)
+      // {
+      // listener.connectionClosed(this);
+      // }
+    }
 
-      if (notifyErrorOccurred)
+    if (notifyErrorOccurred)
+    {
+      for (ConnectionEventListener listener : listeners)
       {
-        for (ConnectionEventListener listener : listeners)
-        {
-          listener.connectionErrorOccurred(isDisconnectNotification,
-              ErrorResultException.wrap(reason));
-        }
+        listener.connectionErrorOccurred(isDisconnectNotification,
+                                         ErrorResultException.wrap(reason));
       }
     }
   }
@@ -1506,27 +1493,38 @@
    */
   public boolean isClosed()
   {
-    synchronized (writeLock)
-    {
-      return isClosed;
-    }
+    return isClosed;
   }
 
 
 
-  //
-  //
-  //
-  // /**
-  // * {@inheritDoc}
-  // */
-  // public boolean isValid() throws InterruptedException
-  // {
-  // synchronized (writeLock)
-  // {
-  // return connectionInvalidReason == null;
-  // }
-  // }
+  /**
+   * {@inheritDoc}
+   */
+  public boolean isValid()
+  {
+    if(!isClosed && connectionInvalidReason == null)
+    {
+      try
+      {
+        search(pingRequest, null, null).get(5, TimeUnit.SECONDS);
+        return true;
+      }
+      catch (ErrorResultException e)
+      {
+        StaticUtils.DEBUG_LOG.warning("Validity check returned error: " +
+                                      e.getResult());
+      }
+      catch (TimeoutException e)
+      {
+        StaticUtils.DEBUG_LOG.warning("Validity check timed out");
+      }
+      catch (InterruptedException e)
+      {
+      }
+    }
+    return false;
+  }
   //
   //
   //
diff --git a/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java b/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java
index d56a8fb..f815431 100644
--- a/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java
+++ b/opendj-sdk/sdk/src/com/sun/opends/sdk/ldap/LDAPConnectionFactoryImpl.java
@@ -460,7 +460,7 @@
         .getDefaultFilterChainFactory().getFilterChainPattern());
 
     LDAPConnection ldapConnection = new LDAPConnection(connection,
-        socketAddress, options.getSchema(), this);
+        socketAddress, options.getSchema(), options.getPingRequest(), this);
     ldapConnectionAttr.set(connection, ldapConnection);
     return ldapConnection;
   }
diff --git a/opendj-sdk/sdk/src/com/sun/opends/sdk/tools/AuthenticatedConnectionFactory.java b/opendj-sdk/sdk/src/com/sun/opends/sdk/tools/AuthenticatedConnectionFactory.java
index 2971d6b..82675de 100644
--- a/opendj-sdk/sdk/src/com/sun/opends/sdk/tools/AuthenticatedConnectionFactory.java
+++ b/opendj-sdk/sdk/src/com/sun/opends/sdk/tools/AuthenticatedConnectionFactory.java
@@ -309,6 +309,13 @@
 
 
 
+    public boolean isValid()
+    {
+      return connection.isValid();
+    }
+
+
+
     public void close()
     {
       connection.close();
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java b/opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java
new file mode 100644
index 0000000..d5676dd
--- /dev/null
+++ b/opendj-sdk/sdk/src/org/opends/sdk/AbstractLoadBalancingAlgorithm.java
@@ -0,0 +1,119 @@
+package org.opends.sdk;
+
+import com.sun.opends.sdk.util.Validator;
+
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: 3:49:17
+ * PM To change this template use File | Settings | File Templates.
+ */
+public abstract class AbstractLoadBalancingAlgorithm
+    implements LoadBalancingAlgorithm
+{
+  protected final List<MonitoredConnectionFactory> factoryList;
+
+  protected AbstractLoadBalancingAlgorithm(ConnectionFactory<?>... factories)
+  {
+    Validator.ensureNotNull(factories);
+    factoryList = new ArrayList<MonitoredConnectionFactory>(factories.length);
+    for(ConnectionFactory<?> f : factories)
+    {
+      factoryList.add(new MonitoredConnectionFactory(f));
+    }
+
+    new MonitorThread().start();
+  }
+
+  protected class MonitoredConnectionFactory
+      extends AbstractConnectionFactory<AsynchronousConnection>
+      implements ResultHandler<AsynchronousConnection>
+  {
+    private final ConnectionFactory<?> factory;
+    private volatile boolean isOperational;
+    private volatile FutureResult<?> pendingConnectFuture;
+
+    private MonitoredConnectionFactory(ConnectionFactory<?> factory)
+    {
+      this.factory = factory;
+      this.isOperational = true;
+    }
+
+    public boolean isOperational()
+    {
+      return isOperational;
+    }
+
+    public void handleErrorResult(ErrorResultException error)
+    {
+      isOperational = false;
+    }
+
+    public void handleResult(AsynchronousConnection result)
+    {
+      isOperational = true;
+      // TODO: Notify the server is back up
+      result.close();
+    }
+
+    public FutureResult<? extends AsynchronousConnection>
+      getAsynchronousConnection(
+        final ResultHandler<? super AsynchronousConnection> resultHandler)
+    {
+      ResultHandler handler = new ResultHandler<AsynchronousConnection>()
+      {
+        public void handleErrorResult(ErrorResultException error)
+        {
+          isOperational = false;
+          if(resultHandler != null)
+          {
+            resultHandler.handleErrorResult(error);
+          }
+        }
+
+        public void handleResult(AsynchronousConnection result)
+        {
+          isOperational = true;
+          if(resultHandler != null)
+          {
+            resultHandler.handleResult(result);
+          }
+        }
+      };
+      return factory.getAsynchronousConnection(handler);
+    }
+  }
+
+  private class MonitorThread extends Thread
+  {
+    private MonitorThread()
+    {
+      super("Connection Factory Health Monitor");
+      this.setDaemon(true);
+    }
+
+    public void run()
+    {
+      while(true)
+      {
+        for(MonitoredConnectionFactory f : factoryList)
+        {
+          if(!f.isOperational && (f.pendingConnectFuture == null ||
+                                  f.pendingConnectFuture.isDone()))
+          {
+            f.pendingConnectFuture = f.factory.getAsynchronousConnection(f);
+          }
+        }
+        try
+        {
+          sleep(10000);
+        }
+        catch (InterruptedException e)
+        {
+          // Ignore and just go around again...
+        }
+      }
+    }
+  }
+}
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/AsynchronousConnection.java b/opendj-sdk/sdk/src/org/opends/sdk/AsynchronousConnection.java
index 00d2ba0..ecbc74d 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/AsynchronousConnection.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/AsynchronousConnection.java
@@ -31,6 +31,7 @@
 
 import java.io.Closeable;
 import java.util.Collection;
+import java.util.concurrent.TimeUnit;
 
 import org.opends.sdk.requests.*;
 import org.opends.sdk.responses.BindResult;
@@ -377,6 +378,21 @@
 
 
   /**
+   * Returns true if the connection has not been closed and is still valid.
+   * The implementation shall submit a search on the connection or use some
+   * other mechanism that positively verifies the connection is still valid
+   * when this method is called.
+   *
+   * The query submitted by the driver to validate the connection shall be
+   * executed in the authentication context.
+   *
+   * @return {@code true} if the connection is valid, {@code false} otherwise.
+   */
+  boolean isValid();
+
+
+  
+  /**
    * Modifies an entry in the Directory Server using the provided modify
    * request.
    *
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java b/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java
index dc22fae..def5ef1 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/ConnectionPool.java
@@ -31,7 +31,9 @@
 
 import java.util.Collection;
 import java.util.Stack;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.logging.Level;
 
 import org.opends.sdk.requests.*;
@@ -58,40 +60,17 @@
   private final int poolSize;
 
   // FIXME: should use a better collection than this - CLQ?
-  private final Stack<AsynchronousConnection> pool;
+  private final Queue<AsynchronousConnection> pool;
 
   private final ConcurrentLinkedQueue<FuturePooledConnection> pendingFutures;
 
-  private final Object lock = new Object();
-
-
-
-  private final class FutureNewConnection
-      extends
-      FutureResultTransformer<AsynchronousConnection, AsynchronousConnection>
-  {
-    private FutureNewConnection(
-        ResultHandler<? super AsynchronousConnection> handler)
-    {
-      super(handler);
-    }
-
-
-
-    protected AsynchronousConnection transformResult(
-        AsynchronousConnection result) throws ErrorResultException
-    {
-      return new PooledConnectionWapper(result);
-    }
-  }
-
 
 
   private final class PooledConnectionWapper implements
       AsynchronousConnection, ConnectionEventListener
   {
-    private AsynchronousConnection connection;
-
+    private final AsynchronousConnection connection;
+    private volatile boolean isClosed;
 
 
     private PooledConnectionWapper(AsynchronousConnection connection)
@@ -106,7 +85,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -120,7 +99,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -134,7 +113,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -145,29 +124,26 @@
 
     public void close()
     {
-      synchronized (lock)
+      synchronized (pool)
       {
-        try
+        if(isClosed)
         {
-          // Don't put closed connections back in the pool.
-          if (connection.isClosed())
-          {
-            if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-            {
-              StaticUtils.DEBUG_LOG
-                  .finest(String
-                      .format(
-                          "Dead connection released to pool. "
-                              + "numConnections: %d, poolSize: %d, pendingFutures: %d",
-                          numConnections, pool.size(), pendingFutures
-                              .size()));
-            }
-            return;
-          }
+          return;
+        }
+        isClosed = true;
 
+        // Don't put closed connections back in the pool.
+        if (!connection.isValid())
+        {
+          numConnections--;
+        }
+        else
+        {
           // See if there waiters pending.
           for (;;)
           {
+            PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
+                connection);
             FuturePooledConnection future = pendingFutures.poll();
 
             if (future == null)
@@ -176,8 +152,6 @@
               break;
             }
 
-            PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
-                connection);
             future.handleResult(pooledConnection);
 
             if (!future.isCancelled())
@@ -189,34 +163,45 @@
                 StaticUtils.DEBUG_LOG
                     .finest(String
                         .format(
-                            "Connection released to pool and directly "
-                                + "given to waiter. numConnections: %d, poolSize: %d, "
-                                + "pendingFutures: %d", numConnections,
-                            pool.size(), pendingFutures.size()));
+                        "Connection released to pool and directly "
+                        + "given to waiter. numConnections: %d, poolSize: %d, "
+                        + "pendingFutures: %d", numConnections,
+                        pool.size(), pendingFutures.size()));
               }
               return;
             }
           }
 
           // No waiters. Put back in pool.
-          pool.push(connection);
+          pool.offer(connection);
           if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
           {
             StaticUtils.DEBUG_LOG
                 .finest(String
                     .format(
-                        "Connection released to pool. "
-                            + "numConnections: %d, poolSize: %d, pendingFutures: %d",
-                        numConnections, pool.size(), pendingFutures
-                            .size()));
+                    "Connection released to pool and directly "
+                    + "given to waiter. numConnections: %d, poolSize: %d, "
+                    + "pendingFutures: %d", numConnections,
+                    pool.size(), pendingFutures.size()));
           }
-        }
-        finally
-        {
-          // Null out the underlying connection to prevent further use.
-          connection = null;
+          return;
         }
       }
+
+      // Connection is no longer valid. Close outside of lock
+      connection.removeConnectionEventListener(this);
+      connection.close();
+      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+      {
+        StaticUtils.DEBUG_LOG
+            .finest(String
+                .format(
+                "Dead connection released to pool. "
+                + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+                numConnections, pool.size(), pendingFutures
+                    .size()));
+      }
+      return;
     }
 
 
@@ -234,7 +219,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -248,7 +233,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -262,7 +247,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -276,7 +261,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -290,7 +275,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -305,7 +290,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -324,7 +309,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -343,7 +328,7 @@
         throws UnsupportedOperationException, IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -359,7 +344,7 @@
         ResultHandler<RootDSE> handler)
         throws UnsupportedOperationException, IllegalStateException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -375,7 +360,7 @@
         ResultHandler<Schema> handler)
         throws UnsupportedOperationException, IllegalStateException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -391,7 +376,7 @@
         ResultHandler<Schema> handler)
         throws UnsupportedOperationException, IllegalStateException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -404,7 +389,7 @@
         ConnectionEventListener listener) throws IllegalStateException,
         NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -415,7 +400,7 @@
     public void removeConnectionEventListener(
         ConnectionEventListener listener) throws NullPointerException
     {
-      if (connection == null)
+      if (isClosed())
       {
         throw new IllegalStateException();
       }
@@ -428,10 +413,13 @@
      */
     public boolean isClosed()
     {
-      return connection == null;
+      return isClosed;
     }
 
-
+    public boolean isValid()
+    {
+      return !isClosed && connection.isValid();
+    }
 
     public void connectionReceivedUnsolicitedNotification(
         GenericExtendedResult notification)
@@ -444,10 +432,10 @@
     public void connectionErrorOccurred(
         boolean isDisconnectNotification, ErrorResultException error)
     {
-      synchronized (lock)
+      // Remove this connection from the pool if its in there. If not, just
+      // ignore and wait for the user to close and we can deal with it there.
+      if(pool.remove(this))
       {
-        // Remove this connection from the pool if its in there
-        pool.remove(this);
         numConnections--;
         connection.removeConnectionEventListener(this);
 
@@ -460,11 +448,11 @@
           StaticUtils.DEBUG_LOG
               .finest(String
                   .format(
-                      "Connection error occured: "
-                          + error.getMessage()
-                          + " numConnections: %d, poolSize: %d, pendingFutures: %d",
-                      numConnections, pool.size(), pendingFutures
-                          .size()));
+                  "Connection error occured: "
+                  + error.getMessage()
+                  + " numConnections: %d, poolSize: %d, pendingFutures: %d",
+                  numConnections, pool.size(), pendingFutures
+                      .size()));
         }
       }
     }
@@ -510,86 +498,100 @@
   {
     this.connectionFactory = connectionFactory;
     this.poolSize = poolSize;
-    this.pool = new Stack<AsynchronousConnection>();
+    this.pool = new ConcurrentLinkedQueue<AsynchronousConnection>();
     this.pendingFutures = new ConcurrentLinkedQueue<FuturePooledConnection>();
   }
 
 
 
-  public FutureResult<AsynchronousConnection> getAsynchronousConnection(
-      ResultHandler<? super AsynchronousConnection> handler)
+  public synchronized FutureResult<AsynchronousConnection>
+  getAsynchronousConnection(ResultHandler<? super AsynchronousConnection> handler)
   {
-    synchronized (lock)
+    // This entire method is synchronized to ensure new connects are done
+    // synchronously to avoid the "pending connect" case.
+    AsynchronousConnection conn;
+    synchronized(pool)
     {
       // Check to see if we have a connection in the pool
-
-      if (!pool.isEmpty())
+      conn = pool.poll();
+      if (conn == null)
       {
-        AsynchronousConnection conn = pool.pop();
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+        // Pool was empty. Maybe a new connection if pool size is not
+        // reached
+        if (numConnections >= poolSize)
         {
-          StaticUtils.DEBUG_LOG
-              .finest(String
-                  .format(
-                      "Connection aquired from pool. "
-                          + "numConnections: %d, poolSize: %d, pendingFutures: %d",
-                      numConnections, pool.size(), pendingFutures
-                          .size()));
+          // We reached max # of conns so wait for a connection to become available.
+          FuturePooledConnection future = new FuturePooledConnection(
+              handler);
+          pendingFutures.add(future);
+
+          if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+          {
+            StaticUtils.DEBUG_LOG
+                .finest(String
+                    .format(
+                    "No connections available. Wait-listed"
+                    + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+                    numConnections, pool.size(), pendingFutures
+                        .size()));
+          }
+
+          return future;
         }
-        PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
-            conn);
-        if (handler != null)
-        {
-          handler.handleResult(pooledConnection);
-        }
-        return new CompletedFutureResult<AsynchronousConnection>(
-            pooledConnection);
-      }
-
-      // Pool was empty. Maybe a new connection if pool size is not
-      // reached
-      if (numConnections < poolSize)
-      {
-        // We can create a new connection.
-        numConnections++;
-
-        FutureNewConnection future = new FutureNewConnection(handler);
-        future.setFutureResult(connectionFactory
-            .getAsynchronousConnection(future));
-
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-        {
-          StaticUtils.DEBUG_LOG
-              .finest(String
-                  .format(
-                      "New connection established and aquired. "
-                          + "numConnections: %d, poolSize: %d, pendingFutures: %d",
-                      numConnections, pool.size(), pendingFutures
-                          .size()));
-        }
-
-        return future;
-      }
-      else
-      {
-        // Pool is full so wait for a connection to become available.
-        FuturePooledConnection future = new FuturePooledConnection(
-            handler);
-        pendingFutures.add(future);
-
-        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
-        {
-          StaticUtils.DEBUG_LOG
-              .finest(String
-                  .format(
-                      "No connections available. Wait-listed"
-                          + "numConnections: %d, poolSize: %d, pendingFutures: %d",
-                      numConnections, pool.size(), pendingFutures
-                          .size()));
-        }
-
-        return future;
       }
     }
+
+    if(conn == null)
+    {
+      try
+      {
+        // We can create a new connection.
+        conn = connectionFactory.getAsynchronousConnection(handler).get();
+        numConnections++;
+        if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+        {
+          StaticUtils.DEBUG_LOG
+              .finest(String
+                  .format(
+                  "New connection established and aquired. "
+                  + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+                  numConnections, pool.size(), pendingFutures
+                      .size()));
+        }
+      }
+      catch (ErrorResultException e)
+      {
+        return new CompletedFutureResult<AsynchronousConnection>(e);
+      }
+      catch (InterruptedException e)
+      {
+        return new CompletedFutureResult<AsynchronousConnection>(
+            new ErrorResultException(Responses.newResult(
+                ResultCode.CLIENT_SIDE_LOCAL_ERROR).setCause(e)));
+      }
+    }
+    else
+    {
+      if (StaticUtils.DEBUG_LOG.isLoggable(Level.FINE))
+      {
+        StaticUtils.DEBUG_LOG
+            .finest(String
+                .format(
+                "Connection aquired from pool. "
+                + "numConnections: %d, poolSize: %d, pendingFutures: %d",
+                numConnections, pool.size(), pendingFutures
+                    .size()));
+      }
+    }
+
+    PooledConnectionWapper pooledConnection = new PooledConnectionWapper(
+        conn);
+    if (handler != null)
+    {
+      handler.handleResult(pooledConnection);
+    }
+    return new CompletedFutureResult<AsynchronousConnection>(
+        pooledConnection);
+
   }
 }
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java b/opendj-sdk/sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java
new file mode 100644
index 0000000..cb5c309
--- /dev/null
+++ b/opendj-sdk/sdk/src/org/opends/sdk/FailoverLoadBalancingAlgorithm.java
@@ -0,0 +1,26 @@
+package org.opends.sdk;
+
+/**
+ * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: 5:42:01
+ * PM To change this template use File | Settings | File Templates.
+ */
+public class FailoverLoadBalancingAlgorithm
+    extends AbstractLoadBalancingAlgorithm
+{
+  public FailoverLoadBalancingAlgorithm(ConnectionFactory<?>... factories)
+  {
+    super(factories);
+  }
+
+  public ConnectionFactory<?> getNextConnectionFactory()
+  {
+    for(MonitoredConnectionFactory f : factoryList)
+    {
+      if(f.isOperational())
+      {
+        return f;
+      }
+    }
+    return null;
+  }
+}
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java b/opendj-sdk/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
index 285633c..3d426cd 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/HeartBeatConnectionFactory.java
@@ -32,13 +32,13 @@
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 import org.opends.sdk.requests.*;
 import org.opends.sdk.responses.*;
 import org.opends.sdk.schema.Schema;
 
 import com.sun.opends.sdk.util.FutureResultTransformer;
+import com.sun.opends.sdk.util.Validator;
 
 
 
@@ -46,47 +46,20 @@
  * An heart beat connection factory can be used to create connections
  * that sends a periodic search request to a Directory Server.
  */
-final class HeartBeatConnectionFactory extends
+public class HeartBeatConnectionFactory extends
     AbstractConnectionFactory<AsynchronousConnection>
 {
-  private final SearchRequest heartBeat;
-
-  private final long interval;
+  private final int interval;
 
   private final List<AsynchronousConnectionImpl> activeConnections;
 
   private final ConnectionFactory<?> parentFactory;
 
-  private volatile boolean stopRequested;
-
 
 
   // FIXME: use a single global scheduler?
 
-  /**
-   * Creates a new heart-beat connection factory which will create
-   * connections using the provided connection factory and periodically
-   * ping any created connections in order to detect that they are still
-   * alive.
-   *
-   * @param connectionFactory
-   *          The connection factory to use for creating connections.
-   * @param timeout
-   *          The time to wait between keepalive pings.
-   * @param unit
-   *          The time unit of the timeout argument.
-   */
-  HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory,
-      long timeout, TimeUnit unit)
-  {
-    this(connectionFactory, timeout, unit, DEFAULT_SEARCH);
-  }
-
-
-
-  private static final SearchRequest DEFAULT_SEARCH = Requests
-      .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)",
-          "1.1");
+  // FIXME: change timeout parameters to long+TimeUnit.
 
 
 
@@ -98,30 +71,18 @@
    *
    * @param connectionFactory
    *          The connection factory to use for creating connections.
-   * @param timeout
-   *          The time to wait between keepalive pings.
-   * @param unit
-   *          The time unit of the timeout argument.
-   * @param heartBeat
-   *          The search request to use when pinging connections.
+   * @param interval
+   *          The period between keepalive pings.
    */
-  HeartBeatConnectionFactory(ConnectionFactory<?> connectionFactory,
-      long timeout, TimeUnit unit, SearchRequest heartBeat)
+  public HeartBeatConnectionFactory(
+      ConnectionFactory<?> connectionFactory,
+      int interval)
   {
-    this.heartBeat = heartBeat;
-    this.interval = unit.toMillis(timeout);
+    Validator.ensureNotNull(connectionFactory);
+    this.interval = interval;
     this.activeConnections = new LinkedList<AsynchronousConnectionImpl>();
     this.parentFactory = connectionFactory;
 
-    Runtime.getRuntime().addShutdownHook(new Thread()
-    {
-      @Override
-      public void run()
-      {
-        stopRequested = true;
-      }
-    });
-
     new HeartBeatThread().start();
   }
 
@@ -132,13 +93,11 @@
    * operations.
    */
   private final class AsynchronousConnectionImpl implements
-      AsynchronousConnection, ConnectionEventListener,
-      ResultHandler<Result>
+      AsynchronousConnection, ConnectionEventListener
   {
     private final AsynchronousConnection connection;
 
 
-
     private AsynchronousConnectionImpl(AsynchronousConnection connection)
     {
       this.connection = connection;
@@ -352,7 +311,15 @@
       return connection.isClosed();
     }
 
-
+    /**
+     * {@inheritDoc}
+     */
+    public boolean isValid()
+    {
+      // Avoid extra pings... Let the next ping find out if this connection
+      // is still valid.
+      return connection.isValid();
+    }
 
     public void connectionReceivedUnsolicitedNotification(
         GenericExtendedResult notification)
@@ -371,31 +338,6 @@
         activeConnections.remove(this);
       }
     }
-
-
-
-    public void handleErrorResult(ErrorResultException error)
-    {
-      // TODO: I18N
-      if (error instanceof TimeoutResultException)
-      {
-        close(Requests.newUnbindRequest(), "Heart beat timed out");
-      }
-    }
-
-
-
-    public void handleResult(Result result)
-    {
-      // Do nothing
-    }
-
-
-
-    private void sendHeartBeat()
-    {
-      search(heartBeat, this, null);
-    }
   }
 
 
@@ -405,19 +347,24 @@
     private HeartBeatThread()
     {
       super("Heart Beat Thread");
+      this.setDaemon(true);
     }
 
 
 
     public void run()
     {
-      while (!stopRequested)
+      while(true)
       {
         synchronized (activeConnections)
         {
           for (AsynchronousConnectionImpl connection : activeConnections)
           {
-            connection.sendHeartBeat();
+            if(!connection.isValid())
+            {
+              connection.close(Requests.newUnbindRequest(),
+                               "Connection no longer valid");
+            }
           }
         }
         try
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/LoadBalancingAlgorithm.java b/opendj-sdk/sdk/src/org/opends/sdk/LoadBalancingAlgorithm.java
new file mode 100644
index 0000000..f11cc7c
--- /dev/null
+++ b/opendj-sdk/sdk/src/org/opends/sdk/LoadBalancingAlgorithm.java
@@ -0,0 +1,10 @@
+package org.opends.sdk;
+
+/**
+ * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: 3:37:03
+ * PM To change this template use File | Settings | File Templates.
+ */
+public interface LoadBalancingAlgorithm
+{
+  public ConnectionFactory<?> getNextConnectionFactory();
+}
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/LoadBalancingConnectionFactory.java b/opendj-sdk/sdk/src/org/opends/sdk/LoadBalancingConnectionFactory.java
new file mode 100644
index 0000000..d03a624
--- /dev/null
+++ b/opendj-sdk/sdk/src/org/opends/sdk/LoadBalancingConnectionFactory.java
@@ -0,0 +1,46 @@
+package org.opends.sdk;
+
+import com.sun.opends.sdk.util.Validator;
+import com.sun.opends.sdk.util.AbstractFutureResult;
+
+import org.opends.sdk.responses.Responses;
+
+/**
+ * Created by IntelliJ IDEA. User: digitalperk Date: Dec 15, 2009 Time: 3:23:52
+ * PM To change this template use File | Settings | File Templates.
+ */
+public class LoadBalancingConnectionFactory
+    extends AbstractConnectionFactory<AsynchronousConnection>
+{
+  private final LoadBalancingAlgorithm algorithm;
+
+  public LoadBalancingConnectionFactory(LoadBalancingAlgorithm algorithm)
+  {
+    Validator.ensureNotNull(algorithm);
+    this.algorithm = algorithm;
+  }
+
+  public FutureResult<? extends AsynchronousConnection>
+  getAsynchronousConnection(
+      ResultHandler<? super AsynchronousConnection> resultHandler)
+  {
+    ConnectionFactory<?> factory = algorithm.getNextConnectionFactory();
+    if(factory == null)
+    {
+      AbstractFutureResult<AsynchronousConnection> future =
+          new AbstractFutureResult<AsynchronousConnection>(resultHandler)
+      {
+        public int getRequestID()
+        {
+          return -1;
+        }
+      };
+      future.handleErrorResult(new ErrorResultException(
+          Responses.newResult(ResultCode.CLIENT_SIDE_CONNECT_ERROR).
+              setDiagnosticMessage("No connection factories available")));
+      return future;
+    }
+
+    return factory.getAsynchronousConnection(resultHandler);
+  }
+}
diff --git a/opendj-sdk/sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java b/opendj-sdk/sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java
index 1267b96..323a27d 100644
--- a/opendj-sdk/sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java
+++ b/opendj-sdk/sdk/src/org/opends/sdk/ldap/LDAPConnectionOptions.java
@@ -32,6 +32,9 @@
 import javax.net.ssl.SSLContext;
 
 import org.opends.sdk.schema.Schema;
+import org.opends.sdk.requests.SearchRequest;
+import org.opends.sdk.requests.Requests;
+import org.opends.sdk.SearchScope;
 
 import com.sun.opends.sdk.util.Validator;
 
@@ -42,12 +45,18 @@
  */
 public final class LDAPConnectionOptions
 {
+  private static final SearchRequest DEFAULT_PING = Requests
+      .newSearchRequest("", SearchScope.BASE_OBJECT, "(objectClass=*)",
+                        "1.1");
+
   private Schema schema = Schema.getDefaultSchema();
 
   private SSLContext sslContext = null;
 
   private boolean useStartTLS = false;
 
+  private SearchRequest pingRequest = DEFAULT_PING;
+
 
 
   /**
@@ -197,7 +206,29 @@
     return this;
   }
 
+  /**
+   * Retrieves the search request used to verify the validity of a
+   * LDAP connection. By default, the root DSE is retrieved without any
+   * attributes.
+   *
+   * @return The search request.
+   */
+  public SearchRequest getPingRequest()
+  {
+    return pingRequest;
+  }
 
+  /**
+   * Sets the search request used to verify the validity of a
+   * LDAP connection.
+   *
+   * @param pingRequest The search request that can be used to verify the
+   *                    validity of a LDAP connection.
+   */
+  public void setPingRequest(SearchRequest pingRequest)
+  {
+    this.pingRequest = pingRequest;
+  }
 
   // Assigns the provided options to this set of options.
   LDAPConnectionOptions assign(LDAPConnectionOptions options)

--
Gitblit v1.10.0