mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
26.01.2013 db6c2473d37efac054dd9d0d4c097da3942e4dff
opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -25,15 +25,13 @@
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.backends.jeb.importLDIF;
import static org.opends.messages.JebMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.util.DynamicConstants.BUILD_ID;
import static org.opends.server.util.DynamicConstants.REVISION_NUMBER;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.util.DynamicConstants.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.getFileForPath;
import static org.opends.server.util.StaticUtils.*;
import java.io.*;
import java.nio.ByteBuffer;
@@ -87,122 +85,133 @@
      + MAX_DB_LOG_SIZE;
  private static final int BYTE_BUFFER_CAPACITY = 128;
  //Min and MAX sizes of phase one buffer.
  /** Max size of phase one buffer. */
  private static final int MAX_BUFFER_SIZE = 2 * MB;
  /** Min size of phase one buffer. */
  private static final int MIN_BUFFER_SIZE = 4 * KB;
  //Min size of phase two read-ahead cache.
  /** Min size of phase two read-ahead cache. */
  private static final int MIN_READ_AHEAD_CACHE_SIZE = 2 * KB;
  //Small heap threshold used to give more memory to JVM to attempt OOM errors.
  /**
   * Small heap threshold used to give more memory to JVM to attempt OOM errors.
   */
  private static final int SMALL_HEAP_SIZE = 256 * MB;
  //The DN attribute type.
  /** The DN attribute type. */
  private static AttributeType dnType;
  static final IndexOutputBuffer.IndexComparator indexComparator =
      new IndexOutputBuffer.IndexComparator();
  //Phase one buffer and imported entries counts.
  /** Phase one buffer count. */
  private final AtomicInteger bufferCount = new AtomicInteger(0);
  /** Phase one imported entries count. */
  private final AtomicLong importCount = new AtomicLong(0);
  //Phase one buffer size in bytes.
  /** Phase one buffer size in bytes. */
  private int bufferSize;
  //Temp scratch directory.
  /** Temp scratch directory. */
  private final File tempDir;
  //Index and thread counts.
  /** Index count. */
  private final int indexCount;
  /** Thread count. */
  private int threadCount;
  //Set to true when validation is skipped.
  /** Set to true when validation is skipped. */
  private final boolean skipDNValidation;
  //Temporary environment used when DN validation is done in first phase.
  /** Temporary environment used when DN validation is done in first phase. */
  private final TmpEnv tmpEnv;
  //Root container.
  /** Root container. */
  private RootContainer rootContainer;
  //Import configuration.
  /** Import configuration. */
  private final LDIFImportConfig importConfiguration;
  //Backend configuration.
  /** Backend configuration. */
  private final LocalDBBackendCfg backendConfiguration;
  //LDIF reader.
  /** LDIF reader. */
  private LDIFReader reader;
  //Migrated entry count.
  /** Migrated entry count. */
  private int migratedCount;
  // Size in bytes of temporary env.
  /** Size in bytes of temporary env. */
  private long tmpEnvCacheSize;
  // Available memory at the start of the import.
  /** Available memory at the start of the import. */
  private long availableMemory;
  // Size in bytes of DB cache.
  /** Size in bytes of DB cache. */
  private long dbCacheSize;
  //The executor service used for the buffer sort tasks.
  /** The executor service used for the buffer sort tasks. */
  private ExecutorService bufferSortService;
  //The executor service used for the scratch file processing tasks.
  /** The executor service used for the scratch file processing tasks. */
  private ExecutorService scratchFileWriterService;
  //Queue of free index buffers -- used to re-cycle index buffers;
  /** Queue of free index buffers -- used to re-cycle index buffers. */
  private final BlockingQueue<IndexOutputBuffer> freeBufferQueue =
      new LinkedBlockingQueue<IndexOutputBuffer>();
  //Map of index keys to index buffers.  Used to allocate sorted
  //index buffers to a index writer thread.
  /**
   * Map of index keys to index buffers. Used to allocate sorted index buffers
   * to a index writer thread.
   */
  private final Map<IndexKey, BlockingQueue<IndexOutputBuffer>> indexKeyQueMap =
      new ConcurrentHashMap<IndexKey, BlockingQueue<IndexOutputBuffer>>();
  //Map of DB containers to index managers. Used to start phase 2.
  /** Map of DB containers to index managers. Used to start phase 2. */
  private final List<IndexManager> indexMgrList =
      new LinkedList<IndexManager>();
  //Map of DB containers to DN-based index managers. Used to start phase 2.
  /** Map of DB containers to DN-based index managers. Used to start phase 2. */
  private final List<IndexManager> DNIndexMgrList =
      new LinkedList<IndexManager>();
  //Futures used to indicate when the index file writers are done flushing
  //their work queues and have exited. End of phase one.
  /**
   * Futures used to indicate when the index file writers are done flushing
   * their work queues and have exited. End of phase one.
   */
  private final List<Future<?>> scratchFileWriterFutures;
  //List of index file writer tasks. Used to signal stopScratchFileWriters to
  //the index file writer tasks when the LDIF file has been done.
  /**
   * List of index file writer tasks. Used to signal stopScratchFileWriters to
   * the index file writer tasks when the LDIF file has been done.
   */
  private final List<ScratchFileWriterTask> scratchFileWriterList;
  //Map of DNs to Suffix objects.
  /** Map of DNs to Suffix objects. */
  private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<DN, Suffix>();
  //Map of container ids to database containers.
  /** Map of container ids to database containers. */
  private final ConcurrentHashMap<Integer, DatabaseContainer> idContainerMap =
      new ConcurrentHashMap<Integer, DatabaseContainer>();
  //Map of container ids to entry containers
  /** Map of container ids to entry containers. */
  private final ConcurrentHashMap<Integer, EntryContainer> idECMap =
      new ConcurrentHashMap<Integer, EntryContainer>();
  //Used to synchronize when a scratch file index writer is first setup.
  /** Used to synchronize when a scratch file index writer is first setup. */
  private final Object synObj = new Object();
  //Rebuild index manager used when rebuilding indexes.
  /** Rebuild index manager used when rebuilding indexes. */
  private final RebuildIndexManager rebuildManager;
  //Set to true if the backend was cleared.
  /** Set to true if the backend was cleared. */
  private boolean clearedBackend = false;
  //Used to shutdown import if an error occurs in phase one.
  /** Used to shutdown import if an error occurs in phase one. */
  private volatile boolean isCanceled = false;
  private volatile boolean isPhaseOneDone = false;
  //Number of phase one buffers
  /** Number of phase one buffers. */
  private int phaseOneBufferCount;
  static
@@ -624,8 +633,10 @@
    }
  }
  //Mainly used to support multiple suffixes. Each index in each suffix gets
  //an unique ID to identify which DB it needs to go to in phase two processing.
  /**
   * Mainly used to support multiple suffixes. Each index in each suffix gets an
   * unique ID to identify which DB it needs to go to in phase two processing.
   */
  private void generateIndexID(Suffix suffix)
  {
    for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix
@@ -728,7 +739,6 @@
           * There are no branches in the explicitly defined include list under
           * this base DN. Skip this base DN all together.
           */
          return null;
        }
@@ -1792,9 +1802,11 @@
      }
      else if (!indexBuffer.isSpaceAvailable(key, entryID.longValue()))
      {
        // complete the current buffer...
        indexBuffer.setComparator(comparator);
        indexBuffer.setIndexKey(indexKey);
        bufferSortService.submit(new SortTask(indexBuffer));
        // ... and get a new one
        indexBuffer = getNewIndexBuffer(sizeNeeded);
        indexBufferMap.put(indexKey, indexBuffer);
      }
@@ -4102,7 +4114,7 @@
      }
      catch (DatabaseException e)
      {
        // Unlikely to happen and not critical.
      }
      previousProcessed = entriesProcessed;
      previousTime = latestTime;