From 2bbfdcf54303286798114aee270531f65c9c3a7c Mon Sep 17 00:00:00 2001
From: abobrov <abobrov@localhost>
Date: Fri, 09 Oct 2009 01:20:46 +0000
Subject: [PATCH] - CMT improvements to parallel request and response processing.

---
 opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java |  195 +++++++++++++++++++++---------------------------
 1 files changed, 86 insertions(+), 109 deletions(-)

diff --git a/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java b/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
index b14755b..f808e41 100644
--- a/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
+++ b/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
@@ -181,10 +181,7 @@
 
   // The number of operations performed on this connection.
   // Used to compare with the resource limits of the network group.
-  private long operationsPerformed;
-
-  // Lock on the number of operations
-  private final Object operationsPerformedLock;
+  private final AtomicLong operationsPerformed;
 
   // The port on the client from which this connection originated.
   private final int clientPort;
@@ -216,10 +213,6 @@
   // in progress.
   private final Object opsInProgressLock;
 
-  // The lock used to provide threadsafe access when sending data to the
-  // client.
-  private final Object transmitLock;
-
   // The socket channel with which this client connection is associated.
   private final SocketChannel clientChannel;
 
@@ -236,9 +229,7 @@
 
   private ASN1ByteChannelReader asn1Reader;
 
-  private ASN1Writer asn1Writer;
-
-  private static int APPLICATION_BUFFER_SIZE = 4096;
+  private int APPLICATION_BUFFER_SIZE = 4096;
 
   private final RedirectingByteChannel saslChannel;
   private final RedirectingByteChannel tlsChannel;
@@ -246,6 +237,28 @@
   private volatile ConnectionSecurityProvider tlsPendingProvider = null;
   private volatile ConnectionSecurityProvider saslPendingProvider = null;
 
+  /**
+   * Thread local storage to provide each worker thread with its
+   * own ASN1Writer to allow for parallel, non-blocking encoding.
+   */
+  private final ThreadLocal<ASN1Writer> threadLocalASN1Writer =
+          new ThreadLocal<ASN1Writer>()
+  {
+    @Override
+    protected ASN1Writer initialValue()
+    {
+      if (isSecure())
+      {
+        int appBufSize = activeProvider.getAppBufSize();
+        return ASN1.getWriter(saslChannel, appBufSize);
+      }
+      else
+      {
+        return ASN1.getWriter(saslChannel,
+          APPLICATION_BUFFER_SIZE);
+      }
+    }
+  };
 
   /**
    * Creates a new LDAP client connection with the provided information.
@@ -270,7 +283,6 @@
 
     this.clientChannel = clientChannel;
     opsInProgressLock = new Object();
-    transmitLock = new Object();
     ldapVersion = 3;
     requestHandler = null;
     lastCompletionTime = new AtomicLong(TimeThread.getTime());
@@ -278,8 +290,7 @@
     connectionValid = true;
     disconnectRequested = false;
     operationsInProgress = new ConcurrentHashMap<Integer, Operation>();
-    operationsPerformed = 0;
-    operationsPerformedLock = new Object();
+    operationsPerformed = new AtomicLong(0);
     keepStats = connectionHandler.keepStats();
     this.protocol = protocol;
     writeSelector = new AtomicReference<Selector>();
@@ -299,6 +310,8 @@
       this.useNanoTime=DirectoryServer.getUseNanoTime();
     }
 
+    APPLICATION_BUFFER_SIZE = connectionHandler.getBufferSize();
+
     tlsChannel =
         RedirectingByteChannel.getRedirectingByteChannel(clientChannel);
     saslChannel =
@@ -794,42 +807,24 @@
    */
   public void sendLDAPMessage(LDAPMessage message)
   {
-    // Get the buffer used by this thread.
+    // Get the writer used by this thread.
+    ASN1Writer asn1Writer = threadLocalASN1Writer.get();
     try
     {
-      // Make sure that we can only send one message at a time. This
-      // locking will not have any impact on the ability to read
-      // requests from the client.
-      synchronized (transmitLock)
+      message.write(asn1Writer);
+      asn1Writer.flush();
+
+      if (debugEnabled())
       {
-        if (asn1Writer == null)
-        {
-          if (isSecure())
-          {
-            int appBufSize = activeProvider.getAppBufSize();
-            asn1Writer = ASN1.getWriter(saslChannel, appBufSize);
-          }
-          else
-          {
-            asn1Writer =
-                ASN1.getWriter(saslChannel, APPLICATION_BUFFER_SIZE);
-          }
-        }
+        TRACER.debugProtocolElement(DebugLogLevel.VERBOSE,
+          message.toString());
+      }
 
-        message.write(asn1Writer);
-        asn1Writer.flush();
-        if(debugEnabled())
-        {
-          TRACER.debugProtocolElement(DebugLogLevel.VERBOSE,
-              message.toString());
-        }
-
-        if (keepStats)
-        {
-          // TODO hard-coded for now, flush probably needs to
-          // return how many bytes were flushed.
-          statTracker.updateMessageWritten(message, 4096);
-        }
+      if (keepStats)
+      {
+        // TODO hard-coded for now, flush probably needs to
+        // return how many bytes were flushed.
+        statTracker.updateMessageWritten(message, 4096);
       }
     }
     catch (Exception e)
@@ -1078,9 +1073,12 @@
               message);
         }
 
+        // Add the operation to the list of operations in progress for
+        // this connection.
+        Operation op = operationsInProgress.putIfAbsent(messageID, operation);
+
         // See if there is already an operation in progress with the
         // same message ID. If so, then we can't allow it.
-        Operation op = operationsInProgress.get(messageID);
         if (op != null)
         {
           Message message =
@@ -1089,10 +1087,6 @@
               message);
         }
 
-        // Add the operation to the list of operations in progress for
-        // this connection.
-        operationsInProgress.put(messageID, operation);
-
         // Try to add the operation to the work queue,
         // or run it synchronously (typically for the administration
         // connector)
@@ -1399,12 +1393,19 @@
   @Override
   public long getNumberOfOperations()
   {
-    long tmpNumberOfOperations;
-    synchronized (operationsPerformedLock)
-    {
-      tmpNumberOfOperations = operationsPerformed;
-    }
-    return tmpNumberOfOperations;
+    return operationsPerformed.get();
+  }
+
+
+
+  /**
+   * Returns the ASN1 reader for this connection.
+   *
+   * @return the ASN1 reader for this connection
+   */
+  protected ASN1ByteChannelReader getASN1Reader()
+  {
+    return asn1Reader;
   }
 
 
@@ -1412,61 +1413,40 @@
   /**
    * Process data read.
    *
-   * @return {@code true} if this connection is still valid.
+   * @return number of bytes read if this connection is still valid
+   *         or negative integer to indicate an error otherwise
    */
-  public boolean processDataRead()
+  public int processDataRead()
   {
-    while (true)
+    if (bindOrStartTLSInProgress.get())
     {
-      if(bindOrStartTLSInProgress.get())
-      {
-        // We should wait for the bind or startTLS to finish before
-        // reading any more data off the socket.
-        return true;
-      }
+      // We should wait for the bind or startTLS to finish before
+      // reading any more data off the socket.
+      return 0;
+    }
 
-      try
+    try
+    {
+      int result = asn1Reader.processChannelData();
+      if (result < 0)
       {
-        int result = asn1Reader.processChannelData();
-        if (result < 0)
-        {
-          // The connection has been closed by the client. Disconnect
-          // and return.
-          disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
-          return false;
-        }
-        if (result == 0)
-        {
-          // We have read all the data that there is to read right now
-          // (or there wasn't any in the first place). Just return and
-          // wait for future notification.
-          return true;
-        }
-        else
-        {
-          // Decode all complete elements from the last read
-          while (asn1Reader.elementAvailable())
-          {
-            if (!processLDAPMessage(LDAPReader.readMessage(asn1Reader)))
-            {
-              // Fatal connection error - client disconnected.
-              return false;
-            }
-          }
-        }
+        // The connection has been closed by the client. Disconnect
+        // and return.
+        disconnect(DisconnectReason.CLIENT_DISCONNECT, false, null);
+        return -1;
       }
-      catch (Exception e)
+      return result;
+    }
+    catch (Exception e)
+    {
+      if (debugEnabled())
       {
-        if (debugEnabled())
-        {
-          TRACER.debugCaught(DebugLogLevel.ERROR, e);
-        }
-        Message m =
-            ERR_LDAP_CLIENT_DECODE_LDAP_MESSAGE_FAILED.get(String
-                .valueOf(e));
-        disconnect(DisconnectReason.PROTOCOL_ERROR, false, m);
-        return false;
+        TRACER.debugCaught(DebugLogLevel.ERROR, e);
       }
+      Message m =
+        ERR_LDAP_CLIENT_DECODE_LDAP_MESSAGE_FAILED.get(String.valueOf(e));
+      disconnect(DisconnectReason.PROTOCOL_ERROR, true, m);
+      return -1;
     }
   }
 
@@ -1485,17 +1465,14 @@
    *         error and the client has been disconnected as a result, or
    *         if the client unbound from the server.
    */
-  private boolean processLDAPMessage(LDAPMessage message)
+  public boolean processLDAPMessage(LDAPMessage message)
   {
     if (keepStats)
     {
       statTracker.updateMessageRead(message);
       this.getNetworkGroup().updateMessageRead(message);
     }
-    synchronized (operationsPerformedLock)
-    {
-      operationsPerformed++;
-    }
+    operationsPerformed.getAndIncrement();
 
     List<Control> opControls = message.getControls();
 

--
Gitblit v1.10.0