From 6992706b3b92b889db4b3b603107a6ee9fd09f17 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 16 Sep 2011 17:36:51 +0000
Subject: [PATCH] Issue OPENDJ-262: Implement pass through authentication (PTA)

---
 opends/src/server/org/opends/server/extensions/LDAPPassThroughAuthenticationPolicyFactory.java |  555 ++++++++++++++++++++++++++++++++++++++++++------------
 1 files changed, 426 insertions(+), 129 deletions(-)

diff --git a/opends/src/server/org/opends/server/extensions/LDAPPassThroughAuthenticationPolicyFactory.java b/opends/src/server/org/opends/server/extensions/LDAPPassThroughAuthenticationPolicyFactory.java
index 6283310..1b32e4c 100644
--- a/opends/src/server/org/opends/server/extensions/LDAPPassThroughAuthenticationPolicyFactory.java
+++ b/opends/src/server/org/opends/server/extensions/LDAPPassThroughAuthenticationPolicyFactory.java
@@ -37,8 +37,7 @@
 import java.io.IOException;
 import java.net.*;
 import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Semaphore;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@@ -49,10 +48,7 @@
 import org.opends.messages.Message;
 import org.opends.server.admin.server.ConfigurationChangeListener;
 import org.opends.server.admin.std.server.*;
-import org.opends.server.api.AuthenticationPolicy;
-import org.opends.server.api.AuthenticationPolicyFactory;
-import org.opends.server.api.AuthenticationPolicyState;
-import org.opends.server.api.TrustManagerProvider;
+import org.opends.server.api.*;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.loggers.debug.DebugLogger;
@@ -72,11 +68,353 @@
     AuthenticationPolicyFactory<LDAPPassThroughAuthenticationPolicyCfg>
 {
 
-  // TODO: retry operations transparently until all connections exhausted.
   // TODO: handle password policy response controls? AD?
-  // TODO: periodically ping offline servers in order to detect when they come
-  // back.
   // TODO: provide alternative cfg for search password.
+  // TODO: custom aliveness pings
+  // TODO: manage account lockout
+  // TODO: cache password
+
+  /**
+   * A simplistic load-balancer connection factory implementation using
+   * approximately round-robin balancing.
+   */
+  static abstract class AbstractLoadBalancer implements ConnectionFactory,
+      Runnable
+  {
+    /**
+     * A connection which automatically retries operations on other servers.
+     */
+    private final class FailoverConnection implements Connection
+    {
+      private Connection connection;
+      private MonitoredConnectionFactory factory;
+      private final int startIndex;
+      private int nextIndex;
+
+
+
+      private FailoverConnection(final int startIndex)
+          throws DirectoryException
+      {
+        this.startIndex = nextIndex = startIndex;
+
+        DirectoryException lastException = null;
+        do
+        {
+          factory = factories[nextIndex];
+          if (factory.isAvailable())
+          {
+            try
+            {
+              if (factory.isAvailable)
+              {
+                connection = factory.getConnection();
+                incrementNextIndex();
+                return;
+              }
+            }
+            catch (final DirectoryException e)
+            {
+              // Ignore this error and try the next factory.
+              if (debugEnabled())
+              {
+                TRACER.debugCaught(DebugLogLevel.ERROR, e);
+              }
+              lastException = e;
+            }
+          }
+          incrementNextIndex();
+        }
+        while (nextIndex != startIndex);
+
+        // All the factories have been tried so give up and throw the exception.
+        throw lastException;
+      }
+
+
+
+      /**
+       * {@inheritDoc}
+       */
+      @Override
+      public void close()
+      {
+        connection.close();
+      }
+
+
+
+      /**
+       * {@inheritDoc}
+       */
+      @Override
+      public ByteString search(final DN baseDN, final SearchScope scope,
+          final SearchFilter filter) throws DirectoryException
+      {
+        for (;;)
+        {
+          try
+          {
+            return connection.search(baseDN, scope, filter);
+          }
+          catch (final DirectoryException e)
+          {
+            if (debugEnabled())
+            {
+              TRACER.debugCaught(DebugLogLevel.ERROR, e);
+            }
+            handleDirectoryException(e);
+          }
+        }
+      }
+
+
+
+      /**
+       * {@inheritDoc}
+       */
+      @Override
+      public void simpleBind(final ByteString username,
+          final ByteString password) throws DirectoryException
+      {
+        for (;;)
+        {
+          try
+          {
+            connection.simpleBind(username, password);
+            return;
+          }
+          catch (final DirectoryException e)
+          {
+            if (debugEnabled())
+            {
+              TRACER.debugCaught(DebugLogLevel.ERROR, e);
+            }
+            handleDirectoryException(e);
+          }
+        }
+      }
+
+
+
+      private void handleDirectoryException(final DirectoryException e)
+          throws DirectoryException
+      {
+        // If the error does not indicate that the connection has failed, then
+        // pass this back to the caller.
+        if (!isFatalResultCode(e.getResultCode()))
+        {
+          throw e;
+        }
+
+        // The associated server is unavailable, so close the connection and
+        // try the next connection factory.
+        connection.close();
+        factory.isAvailable = false;
+
+        while (nextIndex != startIndex)
+        {
+          factory = factories[nextIndex];
+          if (factory.isAvailable())
+          {
+            try
+            {
+              if (factory.isAvailable)
+              {
+                connection = factory.getConnection();
+                incrementNextIndex();
+                return;
+              }
+            }
+            catch (final DirectoryException de)
+            {
+              // Ignore this error and try the next factory.
+              if (debugEnabled())
+              {
+                TRACER.debugCaught(DebugLogLevel.ERROR, de);
+              }
+            }
+          }
+          incrementNextIndex();
+        }
+
+        // All the factories have been tried so give up and throw the exception.
+        throw e;
+      }
+
+
+
+      private void incrementNextIndex()
+      {
+        // Try the next index.
+        if (++nextIndex == maxIndex)
+        {
+          nextIndex = 0;
+        }
+      }
+
+    }
+
+
+
+    /**
+     * A connection factory which caches its online/offline state in order to
+     * avoid unnecessary connection attempts when it is known to be offline.
+     */
+    private final class MonitoredConnectionFactory implements ConnectionFactory
+    {
+      private final ConnectionFactory factory;
+      private volatile boolean isAvailable = true;
+
+
+
+      private MonitoredConnectionFactory(final ConnectionFactory factory)
+      {
+        this.factory = factory;
+      }
+
+
+
+      /**
+       * {@inheritDoc}
+       */
+      @Override
+      public void close()
+      {
+        factory.close();
+      }
+
+
+
+      /**
+       * {@inheritDoc}
+       */
+      @Override
+      public Connection getConnection() throws DirectoryException
+      {
+        try
+        {
+          final Connection connection = factory.getConnection();
+          isAvailable = true;
+          return connection;
+        }
+        catch (final DirectoryException e)
+        {
+          if (debugEnabled())
+          {
+            TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          }
+          isAvailable = false;
+          throw e;
+        }
+      }
+
+
+
+      private boolean isAvailable()
+      {
+        return isAvailable;
+      }
+    }
+
+
+
+    private final MonitoredConnectionFactory[] factories;
+    private final int maxIndex;
+    private final ScheduledFuture<?> monitorFuture;
+
+
+
+    /**
+     * Creates a new abstract load-balancer.
+     *
+     * @param factories
+     *          The list of underlying connection factories.
+     * @param scheduler
+     *          The monitoring scheduler.
+     */
+    AbstractLoadBalancer(final ConnectionFactory[] factories,
+        final ScheduledExecutorService scheduler)
+    {
+      this.factories = new MonitoredConnectionFactory[factories.length];
+      this.maxIndex = factories.length;
+
+      for (int i = 0; i < maxIndex; i++)
+      {
+        this.factories[i] = new MonitoredConnectionFactory(factories[i]);
+      }
+
+      this.monitorFuture = scheduler.scheduleWithFixedDelay(this, 5, 5,
+          TimeUnit.SECONDS);
+    }
+
+
+
+    /**
+     * Close underlying connection pools.
+     */
+    @Override
+    public final void close()
+    {
+      monitorFuture.cancel(true);
+
+      for (final ConnectionFactory factory : factories)
+      {
+        factory.close();
+      }
+    }
+
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public final Connection getConnection() throws DirectoryException
+    {
+      final int startIndex = getStartIndex();
+      return new FailoverConnection(startIndex);
+    }
+
+
+
+    /**
+     * Try to connect to any offline connection factories.
+     */
+    @Override
+    public void run()
+    {
+      for (final MonitoredConnectionFactory factory : factories)
+      {
+        if (!factory.isAvailable())
+        {
+          try
+          {
+            factory.getConnection().close();
+          }
+          catch (final DirectoryException e)
+          {
+            if (debugEnabled())
+            {
+              TRACER.debugCaught(DebugLogLevel.ERROR, e);
+            }
+          }
+        }
+      }
+    }
+
+
+
+    /**
+     * Return the start which should be used for the next connection attempt.
+     *
+     * @return The start which should be used for the next connection attempt.
+     */
+    abstract int getStartIndex();
+
+  }
+
+
 
   /**
    * A factory which returns pre-authenticated connections for searches.
@@ -245,7 +583,7 @@
    * <p>
    * Package private for testing.
    */
-  static final class ConnectionPool implements ConnectionFactory, Closeable
+  static final class ConnectionPool implements ConnectionFactory
   {
 
     /**
@@ -445,13 +783,8 @@
    * <p>
    * Package private for testing.
    */
-  static final class FailoverConnectionFactory implements ConnectionFactory,
-      Closeable
+  static final class FailoverLoadBalancer extends AbstractLoadBalancer
   {
-    private final ConnectionFactory primary;
-    private final ConnectionFactory secondary;
-
-
 
     /**
      * Creates a new fail-over connection factory which will always try the
@@ -461,27 +794,14 @@
      *          The primary connection factory.
      * @param secondary
      *          The secondary connection factory.
+     * @param scheduler
+     *          The monitoring scheduler.
      */
-    FailoverConnectionFactory(final ConnectionFactory primary,
-        final ConnectionFactory secondary)
+    FailoverLoadBalancer(final ConnectionFactory primary,
+        final ConnectionFactory secondary,
+        final ScheduledExecutorService scheduler)
     {
-      this.primary = primary;
-      this.secondary = secondary;
-    }
-
-
-
-    /**
-     * Close underlying load-balancers.
-     */
-    @Override
-    public void close()
-    {
-      primary.close();
-      if (secondary != null)
-      {
-        secondary.close();
-      }
+      super(new ConnectionFactory[] { primary, secondary }, scheduler);
     }
 
 
@@ -490,24 +810,10 @@
      * {@inheritDoc}
      */
     @Override
-    public Connection getConnection() throws DirectoryException
+    int getStartIndex()
     {
-      if (secondary == null)
-      {
-        // No fail-over so just use the primary.
-        return primary.getConnection();
-      }
-      else
-      {
-        try
-        {
-          return primary.getConnection();
-        }
-        catch (final DirectoryException e)
-        {
-          return secondary.getConnection();
-        }
-      }
+      // Always start with the primaries.
+      return 0;
     }
 
   }
@@ -1116,9 +1422,9 @@
 
   /**
    * An interface for obtaining a connection factory for LDAP connections to a
-   * named LDAP server.
+   * named LDAP server and the monitoring scheduler.
    */
-  static interface LDAPConnectionFactoryProvider
+  static interface Provider
   {
     /**
      * Returns a connection factory which can be used for obtaining connections
@@ -1135,6 +1441,17 @@
      */
     ConnectionFactory getLDAPConnectionFactory(String host, int port,
         LDAPPassThroughAuthenticationPolicyCfg cfg);
+
+
+
+    /**
+     * Returns the scheduler which should be used to periodically ping
+     * connection factories to determine when they are online.
+     *
+     * @return The scheduler which should be used to periodically ping
+     *         connection factories to determine when they are online.
+     */
+    ScheduledExecutorService getScheduledExecutorService();
   }
 
 
@@ -1143,9 +1460,8 @@
    * A simplistic load-balancer connection factory implementation using
    * approximately round-robin balancing.
    */
-  static final class LoadBalancer implements ConnectionFactory, Closeable
+  static final class RoundRobinLoadBalancer extends AbstractLoadBalancer
   {
-    private final ConnectionFactory[] factories;
     private final AtomicInteger nextIndex = new AtomicInteger();
     private final int maxIndex;
 
@@ -1157,67 +1473,23 @@
      *
      * @param factories
      *          The list of underlying connection factories.
+     * @param scheduler
+     *          The monitoring scheduler.
      */
-    LoadBalancer(final ConnectionFactory[] factories)
+    RoundRobinLoadBalancer(final ConnectionFactory[] factories,
+        final ScheduledExecutorService scheduler)
     {
-      this.factories = factories;
+      super(factories, scheduler);
       this.maxIndex = factories.length;
     }
 
 
 
     /**
-     * Close underlying connection pools.
-     */
-    @Override
-    public void close()
-    {
-      for (final ConnectionFactory factory : factories)
-      {
-        factory.close();
-      }
-    }
-
-
-
-    /**
      * {@inheritDoc}
      */
     @Override
-    public Connection getConnection() throws DirectoryException
-    {
-      final int startIndex = getStartIndex();
-      int index = startIndex;
-      for (;;)
-      {
-        final ConnectionFactory factory = factories[index];
-
-        try
-        {
-          return factory.getConnection();
-        }
-        catch (final DirectoryException e)
-        {
-          // Try the next index.
-          if (++index == maxIndex)
-          {
-            index = 0;
-          }
-
-          // If all the factories have been tried then give up and throw the
-          // exception.
-          if (index == startIndex)
-          {
-            throw e;
-          }
-        }
-      }
-    }
-
-
-
-    // Determine the start index.
-    private int getStartIndex()
+    int getStartIndex()
     {
       // A round robin pool of one connection factories is unlikely in
       // practice and requires special treatment.
@@ -1512,8 +1784,8 @@
     // Current configuration.
     private LDAPPassThroughAuthenticationPolicyCfg cfg;
 
-    private FailoverConnectionFactory searchFactory = null;
-    private FailoverConnectionFactory bindFactory = null;
+    private ConnectionFactory searchFactory = null;
+    private ConnectionFactory bindFactory = null;
 
 
 
@@ -1642,8 +1914,10 @@
       // authenticated user.
 
       // Create load-balancers for primary servers.
-      final LoadBalancer primarySearchLoadBalancer;
-      final LoadBalancer primaryBindLoadBalancer;
+      final RoundRobinLoadBalancer primarySearchLoadBalancer;
+      final RoundRobinLoadBalancer primaryBindLoadBalancer;
+      final ScheduledExecutorService scheduler = provider
+          .getScheduledExecutorService();
 
       Set<String> servers = cfg.getPrimaryRemoteLDAPServer();
       ConnectionPool[] searchPool = new ConnectionPool[servers.size()];
@@ -1658,18 +1932,16 @@
                 cfg.getMappedSearchBindPassword()));
         bindPool[index++] = new ConnectionPool(factory);
       }
-      primarySearchLoadBalancer = new LoadBalancer(searchPool);
-      primaryBindLoadBalancer = new LoadBalancer(bindPool);
+      primarySearchLoadBalancer = new RoundRobinLoadBalancer(searchPool,
+          scheduler);
+      primaryBindLoadBalancer = new RoundRobinLoadBalancer(bindPool, scheduler);
 
       // Create load-balancers for secondary servers.
-      final LoadBalancer secondarySearchLoadBalancer;
-      final LoadBalancer secondaryBindLoadBalancer;
-
       servers = cfg.getSecondaryRemoteLDAPServer();
       if (servers.isEmpty())
       {
-        secondarySearchLoadBalancer = null;
-        secondaryBindLoadBalancer = null;
+        searchFactory = primarySearchLoadBalancer;
+        bindFactory = primaryBindLoadBalancer;
       }
       else
       {
@@ -1685,14 +1957,15 @@
                   cfg.getMappedSearchBindPassword()));
           bindPool[index++] = new ConnectionPool(factory);
         }
-        secondarySearchLoadBalancer = new LoadBalancer(searchPool);
-        secondaryBindLoadBalancer = new LoadBalancer(bindPool);
+        final RoundRobinLoadBalancer secondarySearchLoadBalancer =
+          new RoundRobinLoadBalancer(searchPool, scheduler);
+        final RoundRobinLoadBalancer secondaryBindLoadBalancer =
+          new RoundRobinLoadBalancer(bindPool, scheduler);
+        searchFactory = new FailoverLoadBalancer(primarySearchLoadBalancer,
+            secondarySearchLoadBalancer, scheduler);
+        bindFactory = new FailoverLoadBalancer(primaryBindLoadBalancer,
+            secondaryBindLoadBalancer, scheduler);
       }
-
-      searchFactory = new FailoverConnectionFactory(primarySearchLoadBalancer,
-          secondarySearchLoadBalancer);
-      bindFactory = new FailoverConnectionFactory(primaryBindLoadBalancer,
-          secondaryBindLoadBalancer);
     }
 
 
@@ -1725,15 +1998,32 @@
   }
 
   // The provider which should be used by policies to create LDAP connections.
-  private final LDAPConnectionFactoryProvider provider;
+  private final Provider provider;
 
   /**
    * The default LDAP connection factory provider.
    */
-  private static final LDAPConnectionFactoryProvider DEFAULT_PROVIDER =
-    new LDAPConnectionFactoryProvider()
+  private static final Provider DEFAULT_PROVIDER = new Provider()
   {
 
+    // Global scheduler used for periodically monitoring connection factories in
+    // order to detect when they are online.
+    private final ScheduledExecutorService scheduler = Executors
+        .newScheduledThreadPool(2, new ThreadFactory()
+        {
+
+          @Override
+          public Thread newThread(final Runnable r)
+          {
+            final Thread t = new DirectoryThread(r,
+                "LDAP PTA connection monitor thread");
+            t.setDaemon(true);
+            return t;
+          }
+        });
+
+
+
     @Override
     public ConnectionFactory getLDAPConnectionFactory(final String host,
         final int port, final LDAPPassThroughAuthenticationPolicyCfg cfg)
@@ -1741,6 +2031,14 @@
       return new LDAPConnectionFactory(host, port, cfg);
     }
 
+
+
+    @Override
+    public ScheduledExecutorService getScheduledExecutorService()
+    {
+      return scheduler;
+    }
+
   };
 
 
@@ -1843,8 +2141,7 @@
    *          The LDAP connection factory provider implementation which LDAP PTA
    *          authentication policies will use.
    */
-  LDAPPassThroughAuthenticationPolicyFactory(
-      final LDAPConnectionFactoryProvider provider)
+  LDAPPassThroughAuthenticationPolicyFactory(final Provider provider)
   {
     this.provider = provider;
   }

--
Gitblit v1.10.0