From 9ff69f265dae8f0647fb9c204fda070eafe25613 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 13 Jun 2012 21:05:11 +0000
Subject: [PATCH] Fix OPENDJ-520: Worker threads are too greedy when caching memory used for encoding/decoding entries and protocol messages

---
 opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java |  147 +++++++++++++++++++++++++-----------------------
 1 files changed, 76 insertions(+), 71 deletions(-)

diff --git a/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java b/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
index 3af5e14..8a460d4 100644
--- a/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
+++ b/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
@@ -31,6 +31,7 @@
 
 import static org.opends.messages.CoreMessages.ERR_ENQUEUE_BIND_IN_PROGRESS;
 import static org.opends.messages.ProtocolMessages.*;
+import static org.opends.server.core.DirectoryServer.getMaxInternalBufferSize;
 import static org.opends.server.loggers.AccessLogger.logDisconnect;
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
@@ -48,11 +49,9 @@
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
@@ -79,10 +78,10 @@
  * connection handler and have its requests decoded by an LDAP request
  * handler.
  */
-
-public class LDAPClientConnection extends ClientConnection implements
+public final class LDAPClientConnection extends ClientConnection implements
     TLSCapableConnection
 {
+
   /**
    * A runnable whose task is to close down all IO related channels
    * associated with a client connection after a small delay.
@@ -194,8 +193,8 @@
       if (selector == null)
       {
         // The client connection does not provide a selector, so we'll
-        // fall back
-        // to a more inefficient way that will work without a selector.
+        // fall back to a more inefficient way that will work without a
+        // selector.
         while (byteBuffer.hasRemaining()
                && (System.currentTimeMillis() < stopTime))
         {
@@ -278,6 +277,48 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
+  /**
+   * Thread local ASN1Writer and buffer.
+   */
+  private static final class ASN1WriterHolder
+  {
+    private final ASN1Writer writer;
+    private final ByteStringBuilder buffer;
+    private final int maxBufferSize;
+
+    private ASN1WriterHolder()
+    {
+      this.buffer = new ByteStringBuilder();
+      this.maxBufferSize = getMaxInternalBufferSize();
+      this.writer = ASN1.getWriter(buffer, maxBufferSize);
+    }
+  }
+
+  // Cached ASN1 writer: a thread can only write to one connection at a time.
+  private static final ThreadLocal<ASN1WriterHolder> ASN1_WRITER_CACHE =
+      new ThreadLocal<ASN1WriterHolder>()
+  {
+    /**
+     * {@inheritDoc}
+     */
+    protected ASN1WriterHolder initialValue()
+    {
+      return new ASN1WriterHolder();
+    }
+  };
+
+  private ASN1WriterHolder getASN1Writer()
+  {
+    ASN1WriterHolder holder = ASN1_WRITER_CACHE.get();
+    if (holder.maxBufferSize != getMaxInternalBufferSize())
+    {
+      // Setting has changed, so recreate the holder.
+      holder = new ASN1WriterHolder();
+      ASN1_WRITER_CACHE.set(holder);
+    }
+    return holder;
+  }
+
   // The time that the last operation was completed.
   private final AtomicLong lastCompletionTime;
 
@@ -321,10 +362,6 @@
   // connection.
   private final LDAPConnectionHandler connectionHandler;
 
-  // The reference to the request handler with which this connection is
-  // associated.
-  private final LDAPRequestHandler requestHandler;
-
   // The statistics tracker associated with this client connection.
   private final LDAPStatistics statTracker;
   private boolean useNanoTime=false;
@@ -354,17 +391,12 @@
   // client has connected.
   private final String serverAddress;
 
-  // Holds the mapping between the worker thread and ASN1
-  // writer to allow for parallel, non-blocking encoding.
-  private Map<Thread,ASN1Writer> asn1WriterMap;
+
 
   private ASN1ByteChannelReader asn1Reader;
-
   private final int bufferSize;
-
   private final RedirectingByteChannel saslChannel;
   private final RedirectingByteChannel tlsChannel;
-  private final ReentrantLock writeLock;
   private volatile ConnectionSecurityProvider activeProvider = null;
   private volatile ConnectionSecurityProvider tlsPendingProvider = null;
   private volatile ConnectionSecurityProvider saslPendingProvider = null;
@@ -381,7 +413,7 @@
    * @param  protocol String representing the protocol (LDAP or LDAP+SSL).
    * @throws DirectoryException If SSL initialisation fails.
    */
-  public LDAPClientConnection(LDAPConnectionHandler connectionHandler,
+  LDAPClientConnection(LDAPConnectionHandler connectionHandler,
       SocketChannel clientChannel, String protocol) throws DirectoryException
   {
     this.connectionHandler = connectionHandler;
@@ -394,7 +426,6 @@
     timeoutClientChannel = new TimeoutWriteByteChannel();
     opsInProgressLock = new Object();
     ldapVersion = 3;
-    requestHandler = null;
     lastCompletionTime = new AtomicLong(TimeThread.getTime());
     nextOperationID = new AtomicLong(0);
     connectionValid = true;
@@ -430,9 +461,6 @@
     this.asn1Reader =
         ASN1.getReader(saslChannel, bufferSize, connectionHandler
             .getMaxRequestSize());
-    writeLock = new ReentrantLock();
-
-    asn1WriterMap = new ConcurrentHashMap<Thread,ASN1Writer>();
 
     if (connectionHandler.useSSL())
     {
@@ -477,21 +505,6 @@
 
 
   /**
-   * Retrieves the request handler that will read requests for this
-   * client connection.
-   *
-   * @return The request handler that will read requests for this client
-   *         connection, or <CODE>null</CODE> if none has been assigned
-   *         yet.
-   */
-  public LDAPRequestHandler getRequestHandler()
-  {
-    return requestHandler;
-  }
-
-
-
-  /**
    * Retrieves the socket channel that can be used to communicate with
    * the client.
    *
@@ -632,20 +645,6 @@
 
 
   /**
-   * Retrieves the next operation ID that should be used for this
-   * connection.
-   *
-   * @return The next operation ID that should be used for this
-   *         connection.
-   */
-  public long nextOperationID()
-  {
-    return nextOperationID.getAndIncrement();
-  }
-
-
-
-  /**
    * Sends a response to the client based on the information in the
    * provided operation.
    *
@@ -930,22 +929,14 @@
    * @param message
    *          The LDAP message to send to the client.
    */
-  public void sendLDAPMessage(LDAPMessage message)
+  private void sendLDAPMessage(LDAPMessage message)
   {
-    // Get the writer used by this thread.
-    Thread currentThread = Thread.currentThread();
-    ASN1Writer asn1Writer = asn1WriterMap.get(currentThread);
+    // Use a thread local writer.
+    final ASN1WriterHolder holder = getASN1Writer();
     try
     {
-      if (asn1Writer == null)
-      {
-        asn1Writer = ASN1.getWriter(saslChannel, writeLock,
-                  bufferSize);
-        asn1WriterMap.put(currentThread, asn1Writer);
-      }
-
-      message.write(asn1Writer);
-      asn1Writer.flush();
+      message.write(holder.writer);
+      holder.buffer.copyTo(saslChannel);
 
       if (debugEnabled())
       {
@@ -971,7 +962,21 @@
       disconnect(DisconnectReason.SERVER_ERROR, false, null);
       return;
     }
-  }
+    finally
+    {
+      // Clear and reset all of the internal buffers ready for the next usage.
+      try
+      {
+        // The ASN1Writer is based on a ByteStringBuilder so closing will cause
+        // the internal buffers to be resized if needed.
+        holder.writer.close();
+      }
+      catch (IOException ignored)
+      {
+        // Unreachable.
+      }
+    }
+ }
 
 
 
@@ -1187,7 +1192,7 @@
    *           client already has reached the maximum allowed concurrent
    *           requests).
    */
-  public void addOperationInProgress(AbstractOperation operation)
+  private void addOperationInProgress(AbstractOperation operation)
       throws DirectoryException
   {
     int messageID = operation.getMessageID();
@@ -1539,7 +1544,7 @@
    *
    * @return the ASN1 reader for this connection
    */
-  protected ASN1ByteChannelReader getASN1Reader()
+  ASN1ByteChannelReader getASN1Reader()
   {
     return asn1Reader;
   }
@@ -1552,7 +1557,7 @@
    * @return number of bytes read if this connection is still valid
    *         or negative integer to indicate an error otherwise
    */
-  public int processDataRead()
+  int processDataRead()
   {
     if (bindOrStartTLSInProgress.get())
     {
@@ -1617,7 +1622,7 @@
    *         error and the client has been disconnected as a result, or
    *         if the client unbound from the server.
    */
-  public boolean processLDAPMessage(LDAPMessage message)
+  boolean processLDAPMessage(LDAPMessage message)
   {
     if (keepStats)
     {
@@ -2580,7 +2585,7 @@
   /**
    * Enable the provider that is inactive.
    */
-  public void enableTLS()
+  private void enableTLS()
   {
     activeProvider = tlsPendingProvider;
     tlsChannel.redirect(tlsPendingProvider);
@@ -2595,7 +2600,7 @@
    * @param sslProvider
    *          The provider to set the security provider to.
    */
-  public void enableSSL(ConnectionSecurityProvider sslProvider)
+  private void enableSSL(ConnectionSecurityProvider sslProvider)
   {
     activeProvider = sslProvider;
     tlsChannel.redirect(sslProvider);
@@ -2606,7 +2611,7 @@
   /**
    * Enable the SASL provider that is currently inactive or pending.
    */
-  public void enableSASL()
+  private void enableSASL()
   {
     activeProvider = saslPendingProvider;
     saslChannel.redirect(saslPendingProvider);

--
Gitblit v1.10.0