From 9ff69f265dae8f0647fb9c204fda070eafe25613 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 13 Jun 2012 21:05:11 +0000
Subject: [PATCH] Fix OPENDJ-520: Worker threads are too greedy when caching memory used for encoding/decoding entries and protocol messages

---
 opends/src/server/org/opends/server/backends/jeb/ID2Entry.java |  168 ++++++++++++++++++++++++++++++++++---------------------
 1 files changed, 104 insertions(+), 64 deletions(-)

diff --git a/opends/src/server/org/opends/server/backends/jeb/ID2Entry.java b/opends/src/server/org/opends/server/backends/jeb/ID2Entry.java
index 0a7c242..b90f833 100644
--- a/opends/src/server/org/opends/server/backends/jeb/ID2Entry.java
+++ b/opends/src/server/org/opends/server/backends/jeb/ID2Entry.java
@@ -23,11 +23,14 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
+ *      Portions copyright 2012 ForgeRock AS.
  */
 package org.opends.server.backends.jeb;
 import org.opends.messages.Message;
 
+import static org.opends.server.core.DirectoryServer.getMaxInternalBufferSize;
 import static org.opends.server.loggers.debug.DebugLogger.*;
+
 import org.opends.server.loggers.debug.DebugTracer;
 import static org.opends.messages.JebMessages.*;
 
@@ -61,30 +64,83 @@
    */
   private DataConfig dataConfig;
 
-  private static ThreadLocal<EntryCoder> entryCodingBuffers =
-      new ThreadLocal<EntryCoder>();
+  /**
+   * Cached encoding buffers.
+   */
+  private static final ThreadLocal<EntryCodec> ENTRY_CODEC_CACHE =
+      new ThreadLocal<EntryCodec>()
+  {
+    protected EntryCodec initialValue()
+    {
+      return new EntryCodec();
+    };
+  };
+
+  private static EntryCodec acquireEntryCodec()
+  {
+    EntryCodec codec = ENTRY_CODEC_CACHE.get();
+    if (codec.maxBufferSize != getMaxInternalBufferSize())
+    {
+      // Setting has changed, so recreate the codec.
+      codec = new EntryCodec();
+      ENTRY_CODEC_CACHE.set(codec);
+    }
+    return codec;
+  }
 
   /**
    * A cached set of ByteStringBuilder buffers and ASN1Writer used to encode
    * entries.
    */
-  private static class EntryCoder
+  private static class EntryCodec
   {
-    ByteStringBuilder encodedBuffer;
-    private ByteStringBuilder entryBuffer;
-    private ByteStringBuilder compressedEntryBuffer;
-    private ASN1Writer writer;
+    private static final int BUFFER_INIT_SIZE = 512;
 
-    private EntryCoder()
+    private final ByteStringBuilder encodedBuffer = new ByteStringBuilder();
+    private final ByteStringBuilder entryBuffer = new ByteStringBuilder();
+    private final ByteStringBuilder compressedEntryBuffer =
+        new ByteStringBuilder();
+    private final ASN1Writer writer;
+    private final int maxBufferSize;
+
+    private EntryCodec()
     {
-      encodedBuffer = new ByteStringBuilder();
-      entryBuffer = new ByteStringBuilder();
-      compressedEntryBuffer = new ByteStringBuilder();
-      writer = ASN1.getWriter(encodedBuffer);
+      this.maxBufferSize = getMaxInternalBufferSize();
+      this.writer = ASN1.getWriter(encodedBuffer, maxBufferSize);
+    }
+
+    private void release()
+    {
+      try
+      {
+        writer.close(); // Clears encodedBuffer as well.
+      }
+      catch (Exception ignored)
+      {
+        // Unreachable.
+      }
+
+      if (entryBuffer.capacity() < maxBufferSize)
+      {
+        entryBuffer.clear();
+      }
+      else
+      {
+        entryBuffer.clear(BUFFER_INIT_SIZE);
+      }
+
+      if (compressedEntryBuffer.capacity() < maxBufferSize)
+      {
+        compressedEntryBuffer.clear();
+      }
+      else
+      {
+        compressedEntryBuffer.clear(BUFFER_INIT_SIZE);
+      }
     }
 
     private Entry decode(ByteString bytes, CompressedSchema compressedSchema)
-        throws DirectoryException,ASN1Exception,LDAPException,
+        throws DirectoryException, ASN1Exception, LDAPException,
         DataFormatException, IOException
     {
       // Get the format version.
@@ -102,14 +158,8 @@
 
       // See if it was compressed.
       int uncompressedSize = (int)reader.readInteger();
-
       if(uncompressedSize > 0)
       {
-        // We will use the cached buffers to avoid allocations.
-        // Reset the buffers;
-        entryBuffer.clear();
-        compressedEntryBuffer.clear();
-
         // It was compressed.
         reader.readOctetString(compressedEntryBuffer);
         CryptoManager cryptoManager = DirectoryServer.getCryptoManager();
@@ -125,7 +175,7 @@
       else
       {
         // Since we don't have to do any decompression, we can just decode
-        // the entry off the
+        // the entry directly.
         ByteString encodedEntry = reader.readOctetString();
         return Entry.decode(encodedEntry.asReader(), compressedSchema);
       }
@@ -149,11 +199,6 @@
     private void encodeVolatile(Entry entry, DataConfig dataConfig)
         throws DirectoryException
     {
-      // Reset the buffers;
-      encodedBuffer.clear();
-      entryBuffer.clear();
-      compressedEntryBuffer.clear();
-
       // Encode the entry for later use.
       entry.encode(entryBuffer, dataConfig.getEntryEncodeConfig());
 
@@ -189,7 +234,6 @@
         {
           TRACER.debugCaught(DebugLogLevel.ERROR, ioe);
         }
-
       }
     }
   }
@@ -270,18 +314,19 @@
    * @throws DirectoryException If a Directory Server error occurs.
    * @throws IOException if an error occurs while reading the ASN1 sequence.
    */
-  static public Entry entryFromDatabase(ByteString bytes,
-                                        CompressedSchema compressedSchema)
-      throws DirectoryException,ASN1Exception,LDAPException,
-      DataFormatException,IOException
+  public static Entry entryFromDatabase(ByteString bytes,
+      CompressedSchema compressedSchema) throws DirectoryException,
+      ASN1Exception, LDAPException, DataFormatException, IOException
   {
-    EntryCoder coder = entryCodingBuffers.get();
-    if(coder == null)
+    EntryCodec codec = acquireEntryCodec();
+    try
     {
-      coder = new EntryCoder();
-      entryCodingBuffers.set(coder);
+      return codec.decode(bytes, compressedSchema);
     }
-    return coder.decode(bytes, compressedSchema);
+    finally
+    {
+      codec.release();
+    }
   }
 
   /**
@@ -294,19 +339,22 @@
    * @throws  DirectoryException  If a problem occurs while attempting to encode
    *                              the entry.
    */
-  static public ByteString entryToDatabase(Entry entry, DataConfig dataConfig)
+  public static ByteString entryToDatabase(Entry entry, DataConfig dataConfig)
       throws DirectoryException
   {
-    EntryCoder coder = entryCodingBuffers.get();
-    if(coder == null)
+    EntryCodec codec = acquireEntryCodec();
+    try
     {
-      coder = new EntryCoder();
-      entryCodingBuffers.set(coder);
+      return codec.encodeCopy(entry, dataConfig);
     }
-
-    return coder.encodeCopy(entry, dataConfig);
+    finally
+    {
+      codec.release();
+    }
   }
 
+
+
   /**
    * Insert a record into the entry database.
    *
@@ -323,21 +371,17 @@
        throws DatabaseException, DirectoryException
   {
     DatabaseEntry key = id.getDatabaseEntry();
-    EntryCoder coder = entryCodingBuffers.get();
-    if(coder == null)
+    EntryCodec codec = acquireEntryCodec();
+    try
     {
-      coder = new EntryCoder();
-      entryCodingBuffers.set(coder);
+      DatabaseEntry data = codec.encodeInternal(entry, dataConfig);
+      OperationStatus status = insert(txn, key, data);
+      return (status == OperationStatus.SUCCESS);
     }
-    DatabaseEntry data = coder.encodeInternal(entry, dataConfig);
-
-    OperationStatus status;
-    status = insert(txn, key, data);
-    if (status != OperationStatus.SUCCESS)
+    finally
     {
-      return false;
+      codec.release();
     }
-    return true;
   }
 
   /**
@@ -355,21 +399,17 @@
        throws DatabaseException, DirectoryException
   {
     DatabaseEntry key = id.getDatabaseEntry();
-    EntryCoder coder = entryCodingBuffers.get();
-    if(coder == null)
+    EntryCodec codec = acquireEntryCodec();
+    try
     {
-      coder = new EntryCoder();
-      entryCodingBuffers.set(coder);
+      DatabaseEntry data = codec.encodeInternal(entry, dataConfig);
+      OperationStatus status = put(txn, key, data);
+      return (status == OperationStatus.SUCCESS);
     }
-    DatabaseEntry data = coder.encodeInternal(entry, dataConfig);
-
-    OperationStatus status;
-    status = put(txn, key, data);
-    if (status != OperationStatus.SUCCESS)
+    finally
     {
-      return false;
+      codec.release();
     }
-    return true;
   }
 
   /**

--
Gitblit v1.10.0