From 2bbfdcf54303286798114aee270531f65c9c3a7c Mon Sep 17 00:00:00 2001
From: abobrov <abobrov@localhost>
Date: Fri, 09 Oct 2009 01:20:46 +0000
Subject: [PATCH] - CMT improvements to parallel request and response processing.

---
 opends/src/server/org/opends/server/protocols/ldap/LDAPRequestHandler.java |  145 +++++++++++++++++++++++++++++++----------------
 1 files changed, 95 insertions(+), 50 deletions(-)

diff --git a/opends/src/server/org/opends/server/protocols/ldap/LDAPRequestHandler.java b/opends/src/server/org/opends/server/protocols/ldap/LDAPRequestHandler.java
index 0301ec4..528679e 100644
--- a/opends/src/server/org/opends/server/protocols/ldap/LDAPRequestHandler.java
+++ b/opends/src/server/org/opends/server/protocols/ldap/LDAPRequestHandler.java
@@ -40,15 +40,16 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
+
 import java.util.LinkedList;
 import java.util.List;
-
 import org.opends.messages.Message;
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.api.ServerShutdownListener;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.loggers.ErrorLogger;
 import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.protocols.asn1.ASN1ByteChannelReader;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.types.DisconnectReason;
 import org.opends.server.types.InitializationException;
@@ -71,16 +72,6 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
-
-
-
-  /**
-   * The buffer size in bytes to use when reading data from a client.
-   */
-  public static final int BUFFER_SIZE = 8192;
-
-
-
   // Indicates whether the Directory Server is in the process of shutting down.
   private volatile boolean shutdownRequested = false;
 
@@ -89,12 +80,17 @@
 
   // The queue that will be used to hold the set of pending connections that
   // need to be registered with the selector.
+  // TODO: revisit, see Issue 4202.
   private List<LDAPClientConnection> pendingConnections =
     new LinkedList<LDAPClientConnection>();
 
   // Lock object for synchronizing access to the pending connections queue.
   private final Object pendingConnectionsLock = new Object();
 
+  // The list of connections ready for request processing.
+  private LinkedList<LDAPClientConnection> readyConnections =
+    new LinkedList<LDAPClientConnection>();
+
   // The selector that will be used to monitor the client connections.
   private final Selector selector;
 
@@ -180,6 +176,89 @@
     // loop, check for new requests, then check for new connections.
     while (!shutdownRequested)
     {
+      LDAPClientConnection readyConnection = null;
+      while ((readyConnection = readyConnections.poll()) != null)
+      {
+        try
+        {
+          ASN1ByteChannelReader asn1Reader = readyConnection.getASN1Reader();
+          boolean ldapMessageProcessed = false;
+          while (true)
+          {
+            if (asn1Reader.elementAvailable())
+            {
+              if (!ldapMessageProcessed)
+              {
+                if (readyConnection.processLDAPMessage(
+                    LDAPReader.readMessage(asn1Reader)))
+                {
+                  ldapMessageProcessed = true;
+                }
+                else
+                {
+                  break;
+                }
+              }
+              else
+              {
+                readyConnections.add(readyConnection);
+                break;
+              }
+            }
+            else
+            {
+              if (readyConnection.processDataRead() <= 0)
+              {
+                break;
+              }
+            }
+          }
+        }
+        catch (Exception e)
+        {
+          if (debugEnabled())
+          {
+            TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          }
+        }
+      }
+
+      // Check to see if we have any pending connections that need to be
+      // registered with the selector.
+      List<LDAPClientConnection> tmp = null;
+      synchronized (pendingConnectionsLock)
+      {
+        if (!pendingConnections.isEmpty())
+        {
+          tmp = pendingConnections;
+          pendingConnections = new LinkedList<LDAPClientConnection>();
+        }
+      }
+
+      if (tmp != null)
+      {
+        for (LDAPClientConnection c : tmp)
+        {
+          try
+          {
+            SocketChannel socketChannel = c.getSocketChannel();
+            socketChannel.configureBlocking(false);
+            socketChannel.register(selector, SelectionKey.OP_READ, c);
+          }
+          catch (Exception e)
+          {
+            if (debugEnabled())
+            {
+              TRACER.debugCaught(DebugLogLevel.ERROR, e);
+            }
+
+            c.disconnect(DisconnectReason.SERVER_ERROR, true,
+                ERR_LDAP_REQHANDLER_CANNOT_REGISTER.get(handlerName,
+                    String.valueOf(e)));
+          }
+        }
+      }
+
       // Create a copy of the selection keys which can be used in a
       // thread-safe manner by getClientConnections. This copy is only
       // updated once per loop, so may not be accurate.
@@ -226,10 +305,14 @@
 
                 try
                 {
-                  if (! clientConnection.processDataRead())
+                  int readResult = clientConnection.processDataRead();
+                  if (readResult < 0)
                   {
                     key.cancel();
                   }
+                  if (readResult > 0) {
+                    readyConnections.add(clientConnection);
+                  }
                 }
                 catch (Exception e)
                 {
@@ -307,43 +390,6 @@
           }
         }
       }
-
-
-      // Check to see if we have any pending connections that need to be
-      // registered with the selector.
-      List<LDAPClientConnection> tmp = null;
-      synchronized (pendingConnectionsLock)
-      {
-        if (!pendingConnections.isEmpty())
-        {
-          tmp = pendingConnections;
-          pendingConnections = new LinkedList<LDAPClientConnection>();
-        }
-      }
-
-      if (tmp != null)
-      {
-        for (LDAPClientConnection c : tmp)
-        {
-          try
-          {
-            SocketChannel socketChannel = c.getSocketChannel();
-            socketChannel.configureBlocking(false);
-            socketChannel.register(selector, SelectionKey.OP_READ, c);
-          }
-          catch (Exception e)
-          {
-            if (debugEnabled())
-            {
-              TRACER.debugCaught(DebugLogLevel.ERROR, e);
-            }
-
-            c.disconnect(DisconnectReason.SERVER_ERROR, true,
-                ERR_LDAP_REQHANDLER_CANNOT_REGISTER.get(handlerName,
-                    String.valueOf(e)));
-          }
-        }
-      }
     }
 
     // Disconnect all active connections.
@@ -439,7 +485,6 @@
       return false;
     }
 
-
     // Try to add the new connection to the queue.  If it succeeds, then wake
     // up the selector so it will be picked up right away.  Otherwise,
     // disconnect the client.

--
Gitblit v1.10.0