From 1fbc9df5d0c44ae72c76dacc3c945d4f0d641ee6 Mon Sep 17 00:00:00 2001
From: boli <boli@localhost>
Date: Tue, 22 Dec 2009 05:55:54 +0000
Subject: [PATCH] Initial implementation of fail over connection factory

---
 sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java |  234 +++++++++++++++++++++++++++++-----------------------------
 1 files changed, 116 insertions(+), 118 deletions(-)

diff --git a/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java b/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
index 5d91e23..9e9f8f1 100644
--- a/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
+++ b/sdk/src/com/sun/opends/sdk/ldap/LDAPConnection.java
@@ -37,9 +37,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.ssl.SSLContext;
@@ -61,7 +59,7 @@
 import com.sun.grizzly.ssl.*;
 import com.sun.grizzly.streams.StreamWriter;
 import com.sun.opends.sdk.util.Validator;
-
+import com.sun.opends.sdk.util.StaticUtils;
 
 
 /**
@@ -666,6 +664,8 @@
 
   private final Object writeLock = new Object();
 
+  private final SearchRequest pingRequest;
+
   /**
    * Creates a new LDAP connection.
    *
@@ -676,18 +676,22 @@
    * @param schema
    *          The schema which will be used to decode responses from the
    *          server.
+   * @param pingRequest
+   *          The search request to use to verify the validity of
+   *          this connection.
    * @param connFactory
    *          The associated connection factory.
    */
   LDAPConnection(com.sun.grizzly.Connection<?> connection,
       InetSocketAddress serverAddress, Schema schema,
-      LDAPConnectionFactoryImpl connFactory)
+      SearchRequest pingRequest, LDAPConnectionFactoryImpl connFactory)
   {
     this.connection = connection;
     this.serverAddress = serverAddress;
     this.schema = schema;
     this.connFactory = connFactory;
     this.streamWriter = getFilterChainStreamWriter();
+    this.pingRequest = pingRequest;
   }
 
 
@@ -809,16 +813,7 @@
       NullPointerException
   {
     Validator.ensureNotNull(listener);
-
-    synchronized (writeLock)
-    {
-      if (isClosed)
-      {
-        throw new IllegalStateException();
-      }
-
-      listeners.add(listener);
-    }
+    listeners.add(listener);
   }
 
 
@@ -1258,11 +1253,7 @@
       ConnectionEventListener listener) throws NullPointerException
   {
     Validator.ensureNotNull(listener);
-
-    synchronized (writeLock)
-    {
-      listeners.remove(listener);
-    }
+    listeners.remove(listener);
   }
 
 
@@ -1354,13 +1345,17 @@
 
 
   private void close(UnbindRequest unbindRequest,
-      boolean isDisconnectNotification, Result reason)
+                     boolean isDisconnectNotification, Result reason)
   {
+    boolean notifyClose = false;
+    boolean notifyErrorOccurred = false;
+
     synchronized (writeLock)
     {
-      boolean notifyClose = false;
-      boolean notifyErrorOccurred = false;
-
+      if(reason == null && unbindRequest == null)
+      {
+        System.out.println("Something is wrong!");
+      }
       if (isClosed)
       {
         // Already closed.
@@ -1381,112 +1376,104 @@
       if (connectionInvalidReason != null)
       {
         // Already invalid.
-        if (notifyClose)
-        {
-          // TODO: uncomment if close notification is required.
-          // for (ConnectionEventListener listener : listeners)
-          // {
-          // listener.connectionClosed(this);
-          // }
-        }
         return;
       }
 
-      // First abort all outstanding requests.
-      for (AbstractLDAPFutureResultImpl<?> future : pendingRequests
-          .values())
-      {
-        if (pendingBindOrStartTLS <= 0)
-        {
-          ASN1StreamWriter asn1Writer = connFactory
-              .getASN1Writer(streamWriter);
-          int messageID = nextMsgID.getAndIncrement();
-          AbandonRequest abandon = Requests.newAbandonRequest(future
-              .getRequestID());
-          try
-          {
-            LDAPEncoder.encodeAbandonRequest(asn1Writer, messageID,
-                abandon);
-            asn1Writer.flush();
-          }
-          catch (IOException e)
-          {
-            // Underlying channel probably blown up. Just ignore.
-          }
-          finally
-          {
-            connFactory.releaseASN1Writer(asn1Writer);
-          }
-        }
+      // Mark the connection as invalid.
+      connectionInvalidReason = reason;
+    }
 
-        future.adaptErrorResult(reason);
-      }
-      pendingRequests.clear();
-
-      // Now try cleanly closing the connection if possible.
-      try
+    // First abort all outstanding requests.
+    for (AbstractLDAPFutureResultImpl<?> future : pendingRequests
+        .values())
+    {
+      if (pendingBindOrStartTLS <= 0)
       {
         ASN1StreamWriter asn1Writer = connFactory
             .getASN1Writer(streamWriter);
-        if (unbindRequest == null)
-        {
-          unbindRequest = Requests.newUnbindRequest();
-        }
-
+        int messageID = nextMsgID.getAndIncrement();
+        AbandonRequest abandon = Requests.newAbandonRequest(future
+            .getRequestID());
         try
         {
-          LDAPEncoder.encodeUnbindRequest(asn1Writer, nextMsgID
-              .getAndIncrement(), unbindRequest);
+          LDAPEncoder.encodeAbandonRequest(asn1Writer, messageID,
+                                           abandon);
           asn1Writer.flush();
         }
+        catch (IOException e)
+        {
+          // Underlying channel probably blown up. Just ignore.
+        }
         finally
         {
           connFactory.releaseASN1Writer(asn1Writer);
         }
       }
-      catch (IOException e)
+
+      future.adaptErrorResult(reason);
+    }
+    pendingRequests.clear();
+
+    // Now try cleanly closing the connection if possible.
+    try
+    {
+      ASN1StreamWriter asn1Writer = connFactory
+          .getASN1Writer(streamWriter);
+      if (unbindRequest == null)
       {
-        // Underlying channel prob blown up. Just ignore.
+        unbindRequest = Requests.newUnbindRequest();
       }
 
       try
       {
-        streamWriter.close();
+        LDAPEncoder.encodeUnbindRequest(asn1Writer, nextMsgID
+            .getAndIncrement(), unbindRequest);
+        asn1Writer.flush();
       }
-      catch (IOException e)
+      finally
       {
-        // Ignore.
+        connFactory.releaseASN1Writer(asn1Writer);
       }
+    }
+    catch (IOException e)
+    {
+      // Underlying channel prob blown up. Just ignore.
+    }
 
-      try
-      {
-        connection.close();
-      }
-      catch (IOException e)
-      {
-        // Ignore.
-      }
+    try
+    {
+      streamWriter.close();
+    }
+    catch (IOException e)
+    {
+      // Ignore.
+    }
 
-      // Mark the connection as invalid.
-      connectionInvalidReason = reason;
+    try
+    {
+      connection.close();
+    }
+    catch (IOException e)
+    {
+      // Ignore.
+    }
 
-      // Notify listeners.
-      if (notifyClose)
-      {
-        // TODO: uncomment if close notification is required.
-        // for (ConnectionEventListener listener : listeners)
-        // {
-        // listener.connectionClosed(this);
-        // }
-      }
+    // Notify listeners.
+    if (notifyClose)
+    {
+      // TODO: uncomment if close notification is required.
+      // for (ConnectionEventListener listener : listeners)
+      // {
+      // listener.connectionClosed(this);
+      // }
+    }
 
-      if (notifyErrorOccurred)
+    if (notifyErrorOccurred)
+    {
+      for (ConnectionEventListener listener : listeners)
       {
-        for (ConnectionEventListener listener : listeners)
-        {
-          listener.connectionErrorOccurred(isDisconnectNotification,
-              ErrorResultException.wrap(reason));
-        }
+        listener.connectionErrorOccurred(isDisconnectNotification,
+                                         ErrorResultException.wrap(reason));
       }
     }
   }
@@ -1506,27 +1493,38 @@
    */
   public boolean isClosed()
   {
-    synchronized (writeLock)
-    {
-      return isClosed;
-    }
+    return isClosed;
   }
 
 
 
-  //
-  //
-  //
-  // /**
-  // * {@inheritDoc}
-  // */
-  // public boolean isValid() throws InterruptedException
-  // {
-  // synchronized (writeLock)
-  // {
-  // return connectionInvalidReason == null;
-  // }
-  // }
+  /**
+   * {@inheritDoc}
+   */
+  public boolean isValid()
+  {
+    if(!isClosed && connectionInvalidReason == null)
+    {
+      try
+      {
+        search(pingRequest, null, null).get(5, TimeUnit.SECONDS);
+        return true;
+      }
+      catch (ErrorResultException e)
+      {
+        StaticUtils.DEBUG_LOG.warning("Validity check returned error: " +
+                                      e.getResult());
+      }
+      catch (TimeoutException e)
+      {
+        StaticUtils.DEBUG_LOG.warning("Validity check timed out");
+      }
+      catch (InterruptedException e)
+      {
+      }
+    }
+    return false;
+  }
   //
   //
   //

--
Gitblit v1.10.0