From 6d36d6a0cda2cbbfd1fc9da38d42c1ac15c0b160 Mon Sep 17 00:00:00 2001
From: dugan <dugan@localhost>
Date: Wed, 13 Jan 2010 03:29:40 +0000
Subject: [PATCH] Issue 4476 large thread count and small heap cause OOME.

---
 opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java |  159 +++++++++++++++++++++++++++++++++++++++--------------
 opends/src/messages/messages/jeb.properties                               |    9 ++-
 2 files changed, 123 insertions(+), 45 deletions(-)

diff --git a/opends/src/messages/messages/jeb.properties b/opends/src/messages/messages/jeb.properties
index 444357e..f1f8cdf 100644
--- a/opends/src/messages/messages/jeb.properties
+++ b/opends/src/messages/messages/jeb.properties
@@ -20,7 +20,7 @@
 #
 # CDDL HEADER END
 #
-#      Copyright 2006-2009 Sun Microsystems, Inc.
+#      Copyright 2006-2010 Sun Microsystems, Inc.
 
 
 
@@ -180,8 +180,8 @@
 NOTICE_JEB_EXPORT_PROGRESS_REPORT_88=Exported %d records and skipped %d (recent \
  rate %.1f/sec)
 NOTICE_JEB_IMPORT_THREAD_COUNT_89=Import Thread Count: %d threads
-SEVERE_ERR_IMPORT_LDIF_LACK_MEM_90=Insufficient free memory to perform import. \
-At least %dMB of free memory is required
+SEVERE_ERR_IMPORT_LDIF_LACK_MEM_90=Insufficient free memory (%d bytes) to \
+perform import. At least %d bytes of free memory is required
 INFO_JEB_IMPORT_LDIF_PROCESSING_TIME_91=LDIF processing took %d seconds
 INFO_JEB_IMPORT_INDEX_PROCESSING_TIME_92=Index processing took %d seconds
 NOTICE_JEB_IMPORT_CLOSING_DATABASE_93=Flushing data to disk
@@ -400,3 +400,6 @@
 %d buffers
 SEVERE_ERR_JEB_IMPORT_LDIF_REBUILD_INDEX_TASK_ERR_212=The following \
 error was received while processing the rebuild index task: %s
+NOTICE_JEB_IMPORT_ADJUST_THREAD_COUNT_213=Insufficient memory to allocate \
+enough phase one buffers for use by %d threads. Lowering the number of threads \
+used to %d
diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
index c6cec58..ea17f69 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2008-2009 Sun Microsystems, Inc.
+ *      Copyright 2008-2010 Sun Microsystems, Inc.
  */
 
 package org.opends.server.backends.jeb.importLDIF;
@@ -68,19 +68,21 @@
   //Defaults for DB cache.
   private static final int MAX_DB_CACHE_SIZE = 8 * MB;
   private static final int MAX_DB_LOG_SIZE = 10 * MB;
+  private static final int MIN_DB_CACHE_SIZE = 4 * MB;
 
   //Defaults for LDIF reader buffers, min memory required to import and default
   //size for byte buffers.
-  private static final int READER_WRITER_BUFFER_SIZE = 2 * MB;
-  private static final int MIN_IMPORT_MEMORY_REQUIRED = 12 * MB;
+  private static final int READER_WRITER_BUFFER_SIZE = 1 * MB;
+  private static final int MIN_DB_CACHE_MEMORY = MAX_DB_CACHE_SIZE +
+                                                 MAX_DB_LOG_SIZE;
   private static final int BYTE_BUFFER_CAPACITY = 128;
 
   //Min and MAX sizes of phase one buffer.
-  private static final int MAX_BUFFER_SIZE = 48 * MB;
-  private static final int MIN_BUFFER_SIZE = 64 * KB;
+  private static final int MAX_BUFFER_SIZE = 100 * MB;
+  private static final int MIN_BUFFER_SIZE = 8 * KB;
 
   //Min size of phase two read-ahead cache.
-  private static final int MIN_READ_AHEAD_CACHE_SIZE = 1 * KB;
+  private static final int MIN_READ_AHEAD_CACHE_SIZE = 2 * KB;
 
   //Set aside this much for the JVM from free memory.
   private static final int JVM_MEM_PCT = 45;
@@ -111,7 +113,8 @@
   private final File tempDir;
 
   //Index and thread counts.
-  private final int indexCount, threadCount;
+  private final int indexCount;
+  private int threadCount;
 
   //Set to true when validation is skipped.
   private final boolean skipDNValidation;
@@ -131,8 +134,9 @@
   //Migrated entry count.
   private int migratedCount;
 
-  //Size in bytes of temporary env and DB cache.
-  private long tmpEnvCacheSize = 0, dbCacheSize = MAX_DB_CACHE_SIZE;
+  //Size in bytes of temporary env, DB cache, DB log buf size.
+  private long tmpEnvCacheSize = 0, dbCacheSize = MAX_DB_CACHE_SIZE,
+               dbLogBufSize = MAX_DB_LOG_SIZE;
 
   //The executor service used for the buffer sort tasks.
   private ExecutorService bufferSortService;
@@ -193,6 +197,7 @@
   //Number of phase one buffers
   private int phaseOneBufferCount;
 
+
   static
   {
     if ((dnType = DirectoryServer.getAttributeType("dn")) == null)
@@ -363,11 +368,28 @@
   }
 
 
+  private void adjustBufferSize(long availMem)
+  {
+     int oldThreadCount = threadCount;
+     for(;threadCount > 0; threadCount--)
+     {
+       phaseOneBufferCount = 2 * (indexCount * threadCount);
+       bufferSize = (int) (availMem / phaseOneBufferCount);
+       if(bufferSize >= MIN_BUFFER_SIZE)
+       {
+         break;
+       }
+     }
+     Message message =
+           NOTE_JEB_IMPORT_ADJUST_THREAD_COUNT.get(oldThreadCount, threadCount);
+     logError(message);
+  }
+
+
   private boolean getBufferSizes(long availMem)
   {
     boolean maxBuf = false;
-    long memory = availMem - (MAX_DB_CACHE_SIZE + MAX_DB_LOG_SIZE);
-    bufferSize = (int) (memory/ phaseOneBufferCount);
+    bufferSize = (int) (availMem/ phaseOneBufferCount);
     if(bufferSize >= MIN_BUFFER_SIZE)
     {
       if(bufferSize > MAX_BUFFER_SIZE)
@@ -378,10 +400,7 @@
     }
     else if(bufferSize < MIN_BUFFER_SIZE)
     {
-      Message message =
-              NOTE_JEB_IMPORT_LDIF_BUFF_SIZE_LESS_DEFAULT.get(MIN_BUFFER_SIZE);
-      logError(message);
-      bufferSize = MIN_BUFFER_SIZE;
+      adjustBufferSize(availMem);
     }
     return maxBuf;
   }
@@ -432,19 +451,73 @@
   private void adjustTmpEnvironmentMemory(long availableMemoryImport)
   {
     long additionalMem = availableMemoryImport -
-                         (phaseOneBufferCount * MAX_BUFFER_SIZE);
-    tmpEnvCacheSize += additionalMem;
-    if(!clearedBackend)
+            (phaseOneBufferCount * MAX_BUFFER_SIZE);
+    if(additionalMem > 0)
     {
-      //The DN cache probably needs to be smaller and the DB cache bigger
-      //because the dn2id is checked if the backend has not been cleared.
-      long additionalDBCache = (tmpEnvCacheSize * 85) / 100;
-      tmpEnvCacheSize -= additionalDBCache;
-      dbCacheSize += additionalDBCache;
+      tmpEnvCacheSize += additionalMem;
+      if(!clearedBackend)
+      {
+        //The DN cache probably needs to be smaller and the DB cache bigger
+        //because the dn2id is checked if the backend has not been cleared.
+        long additionalDBCache = (tmpEnvCacheSize * 85) / 100;
+        tmpEnvCacheSize -= additionalDBCache;
+        dbCacheSize += additionalDBCache;
+      }
     }
   }
 
 
+  private long defaultMemoryCalc(long availMem)
+          throws InitializationException
+  {
+    long bufMem = 0;
+    if(availMem < (MIN_DB_CACHE_MEMORY + MIN_DB_CACHE_SIZE))
+    {
+      long minCacheSize = MIN_DB_CACHE_SIZE;
+      if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null) {
+        minCacheSize = 500 *KB;
+      }
+      dbCacheSize = minCacheSize;
+      tmpEnvCacheSize = minCacheSize;
+      dbLogBufSize = 0;
+      bufMem = availMem - 2 * minCacheSize;
+      if(bufMem < 0 || (bufMem < (2 * indexCount) * MIN_BUFFER_SIZE)) {
+        Message message =
+              ERR_IMPORT_LDIF_LACK_MEM.get(availMem,
+                 ((2 * indexCount) * MIN_BUFFER_SIZE) + 2 * MIN_DB_CACHE_SIZE);
+        throw new InitializationException(message);
+      }
+    }
+    else
+    {
+      bufMem = getTmpEnvironmentMemory(availMem);
+    }
+    return bufMem;
+  }
+
+  private long skipDNValidationCalc(long availMem)
+          throws InitializationException
+  {
+    long bufMem = availMem;
+    if(availMem < (MIN_DB_CACHE_MEMORY))
+    {
+      long minCacheSize = MIN_DB_CACHE_SIZE;
+      if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null) {
+        minCacheSize = 500 *KB;
+      }
+      dbCacheSize = minCacheSize;
+      dbLogBufSize = 0;
+      bufMem = availMem - minCacheSize;
+      if(bufMem < 0 || (bufMem < (2 * indexCount) * MIN_BUFFER_SIZE)) {
+        Message message =
+              ERR_IMPORT_LDIF_LACK_MEM.get(availMem,
+                  ((2 * indexCount) * MIN_BUFFER_SIZE) + MIN_DB_CACHE_SIZE);
+        throw new InitializationException(message);
+      }
+    }
+    return bufMem;
+  }
+
   /**
    * Calculate buffer sizes and initialize JEB properties based on memory.
    *
@@ -469,25 +542,24 @@
       {
         importMemPct -= 15;
       }
-      long availableMemoryImport = (totFreeMemory * importMemPct) / 100;
+      long phaseOneBufferMemory = 0;
       if(!skipDNValidation)
       {
-        availableMemoryImport = getTmpEnvironmentMemory(availableMemoryImport);
+        phaseOneBufferMemory =
+                       defaultMemoryCalc((totFreeMemory * importMemPct) / 100);
       }
-      boolean maxBuffers = getBufferSizes(availableMemoryImport);
+      else
+      {
+        phaseOneBufferMemory =
+                     skipDNValidationCalc((totFreeMemory * importMemPct) / 100);
+      }
+      boolean maxBuffers = getBufferSizes(phaseOneBufferMemory);
+      //Give any extra memory to the temp environment cache if there is any.
       if(!skipDNValidation && maxBuffers)
       {
-        adjustTmpEnvironmentMemory(availableMemoryImport);
+        adjustTmpEnvironmentMemory(phaseOneBufferMemory);
       }
-      if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) == null)
-      {
-          if (availableMemoryImport < MIN_IMPORT_MEMORY_REQUIRED)
-          {
-              message = ERR_IMPORT_LDIF_LACK_MEM.get(16);
-              throw new InitializationException(message);
-          }
-      }
-      message = NOTE_JEB_IMPORT_LDIF_TOT_MEM_BUF.get(availableMemoryImport,
+      message = NOTE_JEB_IMPORT_LDIF_TOT_MEM_BUF.get(phaseOneBufferMemory,
               phaseOneBufferCount);
       logError(message);
       if(tmpEnvCacheSize > 0)
@@ -497,14 +569,17 @@
       }
       envConfig.setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
       envConfig.setConfigParam(EnvironmentConfig.MAX_MEMORY,
-                                                    Long.toString(dbCacheSize));
+              Long.toString(dbCacheSize));
       message = NOTE_JEB_IMPORT_LDIF_DB_MEM_BUF_INFO.get(dbCacheSize,
-                                                         bufferSize);
+              bufferSize);
       logError(message);
-      envConfig.setConfigParam(EnvironmentConfig.LOG_TOTAL_BUFFER_BYTES,
-                               Long.toString(MAX_DB_LOG_SIZE));
-      message = NOTE_JEB_IMPORT_LDIF_LOG_BYTES.get(MAX_DB_LOG_SIZE);
-      logError(message);
+      if(dbLogBufSize > 0)
+      {
+        envConfig.setConfigParam(EnvironmentConfig.LOG_TOTAL_BUFFER_BYTES,
+                Long.toString(MAX_DB_LOG_SIZE));
+        message = NOTE_JEB_IMPORT_LDIF_LOG_BYTES.get(MAX_DB_LOG_SIZE);
+        logError(message);
+      }
   }
 
 

--
Gitblit v1.10.0