mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

dugan
24.59.2008 8e2715e335ec8386e5e0285f702bad7e19d26197
Setting import thread count to a high number (2048) throws exceptions. Issue 2333.
3 files modified
29 ■■■■■ changed files
opendj-sdk/opends/src/messages/messages/jeb.properties 2 ●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java 25 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java 2 ●●● patch | view | raw | blame | history
opendj-sdk/opends/src/messages/messages/jeb.properties
@@ -181,7 +181,7 @@
 seconds (average rate %.1f/sec)
NOTICE_JEB_EXPORT_PROGRESS_REPORT_88=Exported %d records and skipped %d (recent \
 rate %.1f/sec)
INFO_JEB_IMPORT_THREAD_COUNT_89=Import Thread Count: %d threads
NOTICE_JEB_IMPORT_THREAD_COUNT_89=Import Thread Count: %d threads
INFO_JEB_IMPORT_BUFFER_SIZE_90=Buffer size per thread = %,d
INFO_JEB_IMPORT_LDIF_PROCESSING_TIME_91=LDIF processing took %d seconds
INFO_JEB_IMPORT_INDEX_PROCESSING_TIME_92=Index processing took %d seconds
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
@@ -75,7 +75,7 @@
  private final static int TREEMAP_ENTRY_OVERHEAD = 29;
  private final static int KEY_ELEMENT_OVERHEAD = 28;
  //Count of number of worker threads.
  //Import worker thread count.
  private int importThreadCount;
  //Used to prevent memory flush
@@ -149,7 +149,7 @@
    } else {
      iter = elementMap.tailMap(nextElem).keySet().iterator();
    }
    while((memoryUsage + extraBytes > memoryLimit) && limitFlush) {
    while(((memoryUsage + extraBytes) > memoryLimit) && limitFlush) {
      if(iter.hasNext()) {
        KeyHashElement curElem = iter.next();
        //Never flush undefined elements.
@@ -184,27 +184,22 @@
  }
  /**
   * Writes all of the buffer elements to DB. The specific id is used to
   * share the buffer among the worker threads so this function can be
   * multi-threaded.
   * Writes all of the buffer elements to DB. Thread id 0 always performs the
   * flushing.
   *
   * @param id The thread id.
   * @throws DatabaseException If an error occurred during the insert.
   */
  void flushAll(int id) throws DatabaseException {
    if(id != 0) {
      return;
    }
    TreeSet<KeyHashElement>  tSet =
            new TreeSet<KeyHashElement>(elementMap.keySet());
    Iterator<KeyHashElement> iter = tSet.iterator();
    int i=0;
    while(iter.hasNext()) {
      KeyHashElement curElem = iter.next();
    for (KeyHashElement curElem : tSet) {
      Index index = curElem.getIndex();
      //Each thread handles a piece of the buffer based on its thread id.
      if((i % importThreadCount) == id) {
        index.insert(null, new DatabaseEntry(curElem.getKey()),
                curElem.getIDSet());
      }
      i++;
      index.insert(null, new DatabaseEntry(curElem.getKey()),
              curElem.getIDSet());
    }
  }
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -221,7 +221,7 @@
      message = NOTE_JEB_IMPORT_STARTING.get(DirectoryServer.getVersionString(),
                                                     BUILD_ID, REVISION_NUMBER);
      logError(message);
      message = INFO_JEB_IMPORT_THREAD_COUNT.get(importThreadCount);
      message = NOTE_JEB_IMPORT_THREAD_COUNT.get(importThreadCount);
      logError(message);
      RuntimeInformation.logInfo();
      for (EntryContainer entryContainer : rootContainer.getEntryContainers()) {