mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

ludovicp
30.19.2010 e4076b2991c9604907f6a2f5ba2d526d0072adf6
import-ldif: Change second phase import strategy in order to better handle large LDIF files with low memory.
In first phase write buffer positions to index files instead of storing in memory and suffering OOME due to O(N) memory growth. In second phase, read buffer positions from index files and fall-back to batch import of indexes when the number of buffers for an index would cause OOME if they were all opened at once. Also, improve second phase progress statistics to report batch count, kb remaining/rate, and fix several race conditions in the statistics.
3 files modified
721 ■■■■■ changed files
opends/src/messages/messages/jeb.properties 10 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java 676 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexInputBuffer.java 35 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/jeb.properties
@@ -278,8 +278,8 @@
 the import process can start
SEVERE_ERR_JEB_IMPORT_THREAD_EXCEPTION_153=An error occurred in import thread \
 %s: %s. The thread can not continue
NOTICE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT_154=Index %s: bytes left = %d, \
key processed rate = %.1f/sec
NOTICE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT_154=Index %s %d%% complete: \
 remaining = %d kb, rate = %d kb/s; batch %d/%d
SEVERE_ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR_155=Unable to create the temporary \
 directory %s
SEVERE_ERR_JEB_INVALID_LOGGING_LEVEL_156=The database logging level string \
@@ -399,7 +399,7 @@
SEVERE_ERR_JEB_IMPORT_UNCAUGHT_EXCEPTION_210=The following error was received \
by the uncaught exception handler: %s
NOTICE_JEB_IMPORT_LDIF_INDEX_STARTED_211=Index %s phase two started processing \
%d buffers
%d buffers in %d batches
SEVERE_ERR_JEB_IMPORT_LDIF_REBUILD_INDEX_TASK_ERR_212=The following \
error was received while processing the rebuild index task: %s
NOTICE_JEB_IMPORT_ADJUST_THREAD_COUNT_213=Insufficient memory to allocate \
@@ -410,4 +410,6 @@
  the LDIF stream
SEVERE_ERR_JEB_IMPORT_BUFFER_IO_ERROR_216=I/O error occurred while reading \
  the index scratch file %s in the temporary directory
MILD_WARN_IMPORT_LDIF_LACK_MEM_PHASE_TWO_217=Insufficient free memory (%d bytes) to \
 perform import phase 2 in a single batch. Some indexes will be imported using \
 several batches which may result in reduced performance
opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -36,7 +36,6 @@
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -1085,7 +1084,7 @@
      final int limit = Math.min(dbThreads, totList.size());
      for (int i = 0; i < limit; i++)
      {
        buffers += totList.get(i).bufferIndexCount;
        buffers += totList.get(i).numberOfBuffers;
      }
      readAheadSize = (int) (usableMemory / buffers);
@@ -1107,15 +1106,20 @@
      }
      else
      {
        // Not enough memory.
        final long minimumPhaseTwoBufferMemory = buffers
            * MIN_READ_AHEAD_CACHE_SIZE;
        Message message = ERR_IMPORT_LDIF_LACK_MEM.get(usableMemory,
            minimumPhaseTwoBufferMemory + dbCacheSize);
        throw new InitializationException(message);
        // Not enough memory - will need to do batching for the biggest indexes.
        readAheadSize = MIN_READ_AHEAD_CACHE_SIZE;
        buffers = (int) (usableMemory / readAheadSize);
        Message message = WARN_IMPORT_LDIF_LACK_MEM_PHASE_TWO.get(usableMemory);
        logError(message);
        break;
      }
    }
    // Ensure that there are always two threads available for parallel
    // processing of smaller indexes.
    dbThreads = Math.max(2, dbThreads);
    Message message = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_MEM_REPORT.get(
        availableMemory, readAheadSize, buffers);
    logError(message);
@@ -1123,15 +1127,18 @@
    // Start indexing tasks.
    List<Future<Void>> futures = new LinkedList<Future<Void>>();
    ExecutorService dbService = Executors.newFixedThreadPool(dbThreads);
    Semaphore permits = new Semaphore(buffers);
    // Start DN processing first.
    for (IndexManager dnMgr : DNIndexMgrList)
    {
      futures.add(dbService.submit(new IndexDBWriteTask(dnMgr, readAheadSize)));
      futures.add(dbService.submit(new IndexDBWriteTask(dnMgr, permits,
          buffers, readAheadSize)));
    }
    for (IndexManager mgr : indexMgrList)
    {
      futures.add(dbService.submit(new IndexDBWriteTask(mgr, readAheadSize)));
      futures.add(dbService.submit(new IndexDBWriteTask(mgr, permits, buffers,
          readAheadSize)));
    }
    for (Future<Void> result : futures)
@@ -1814,54 +1821,241 @@
    private final DatabaseEntry dbKey, dbValue;
    private final int cacheSize;
    private final Map<Integer, DNState> dnStateMap =
                                               new HashMap<Integer, DNState>();
      new HashMap<Integer, DNState>();
    private final Map<Integer, Index> indexMap = new HashMap<Integer, Index>();
    private final Semaphore permits;
    private final int maxPermits;
    private final AtomicLong bytesRead = new AtomicLong();
    private long lastBytesRead = 0;
    private final AtomicInteger keyCount = new AtomicInteger();
    private RandomAccessFile bufferFile = null;
    private DataInputStream bufferIndexFile = null;
    private int remainingBuffers;
    private volatile int totalBatches;
    private AtomicInteger batchNumber = new AtomicInteger();
    private int nextBufferID;
    private int ownedPermits;
    private volatile boolean isRunning = false;
    public IndexDBWriteTask(IndexManager indexMgr, int cacheSize)
    /**
     * Creates a new index DB writer.
     *
     * @param indexMgr
     *          The index manager.
     * @param permits
     *          The semaphore used for restricting the number of buffer
     *          allocations.
     * @param maxPermits
     *          The maximum number of buffers which can be allocated.
     * @param cacheSize
     *          The buffer cache size.
     */
    public IndexDBWriteTask(IndexManager indexMgr, Semaphore permits,
        int maxPermits, int cacheSize)
    {
      this.indexMgr = indexMgr;
      this.permits = permits;
      this.maxPermits = maxPermits;
      this.cacheSize = cacheSize;
      this.dbKey = new DatabaseEntry();
      this.dbValue = new DatabaseEntry();
      this.cacheSize = cacheSize;
    }
    private NavigableSet<IndexInputBuffer> initializeBuffers()
      throws IOException
    /**
     * Initializes this task.
     *
     * @throws IOException
     *           If an IO error occurred.
     */
    public void beginWriteTask() throws IOException
    {
      NavigableSet<IndexInputBuffer> bufferSet =
        new TreeSet<IndexInputBuffer>();
      for (int i = 0; i < indexMgr.bufferIndexCount; i++)
      bufferFile = new RandomAccessFile(indexMgr.getBufferFile(), "r");
      bufferIndexFile = new DataInputStream(new BufferedInputStream(
          new FileInputStream(indexMgr.getBufferIndexFile())));
      remainingBuffers = indexMgr.getNumberOfBuffers();
      totalBatches = (remainingBuffers / maxPermits) + 1;
      batchNumber.set(0);
      nextBufferID = 0;
      ownedPermits = 0;
      Message message = NOTE_JEB_IMPORT_LDIF_INDEX_STARTED.get(
          indexMgr.getBufferFileName(), remainingBuffers, totalBatches);
      logError(message);
      indexMgr.setIndexDBWriteTask(this);
      isRunning = true;
    }
    /**
     * Returns the next batch of buffers to be processed, blocking until enough
     * buffer permits are available.
     *
     * @return The next batch of buffers, or {@code null} if there are no more
     *         buffers to be processed.
     * @throws Exception
     *           If an exception occurred.
     */
    public NavigableSet<IndexInputBuffer> getNextBufferBatch() throws Exception
    {
      // First release any previously acquired permits.
      if (ownedPermits > 0)
      {
        IndexInputBuffer b = new IndexInputBuffer(indexMgr,
            indexMgr.bufferIndexBegin[i], indexMgr.bufferIndexEnd[i],
            indexMgr.bufferIndexID[i]);
        b.initializeCache(cacheSize);
        bufferSet.add(b);
        permits.release(ownedPermits);
        ownedPermits = 0;
      }
      // GC arrays.
      indexMgr.bufferIndexBegin = null;
      indexMgr.bufferIndexEnd = null;
      indexMgr.bufferIndexID = null;
      // Block until we can either get enough permits for all buffers, or the
      // maximum number of permits.
      final int permitRequest = Math.min(remainingBuffers, maxPermits);
      if (permitRequest == 0)
      {
        // No more work to do.
        return null;
      }
      permits.acquire(permitRequest);
      return bufferSet;
      // Update counters.
      ownedPermits = permitRequest;
      remainingBuffers -= permitRequest;
      batchNumber.incrementAndGet();
      // Create all the index buffers for the next batch.
      final NavigableSet<IndexInputBuffer> buffers =
        new TreeSet<IndexInputBuffer>();
      for (int i = 0; i < permitRequest; i++)
      {
        final long bufferBegin = bufferIndexFile.readLong();
        final long bufferEnd = bufferIndexFile.readLong();
        final IndexInputBuffer b = new IndexInputBuffer(indexMgr,
            bufferFile.getChannel(), bufferBegin, bufferEnd, nextBufferID++,
            cacheSize);
        buffers.add(b);
      }
      return buffers;
    }
    /**
     * Finishes this task.
     *
     * @throws Exception
     *           If an exception occurred.
     */
    public void endWriteTask() throws Exception
    {
      isRunning = false;
      // First release any previously acquired permits.
      if (ownedPermits > 0)
      {
        permits.release(ownedPermits);
        ownedPermits = 0;
      }
      try
      {
        if (indexMgr.isDN2ID())
        {
          for (DNState dnState : dnStateMap.values())
          {
            dnState.flush();
          }
          Message msg = NOTE_JEB_IMPORT_LDIF_DN_CLOSE
              .get(indexMgr.getDNCount());
          logError(msg);
        }
        else
        {
          for (Index index : indexMap.values())
          {
            index.closeCursor();
          }
          Message message = NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE.get(indexMgr
              .getBufferFileName());
          logError(message);
        }
      }
      finally
      {
        if (bufferFile != null)
        {
          try
          {
            bufferFile.close();
          }
          catch (IOException ignored)
          {
            // Ignore.
          }
        }
        if (bufferIndexFile != null)
        {
          try
          {
            bufferIndexFile.close();
          }
          catch (IOException ignored)
          {
            // Ignore.
          }
        }
        indexMgr.getBufferFile().delete();
        indexMgr.getBufferIndexFile().delete();
      }
    }
    /**
     * Print out progress stats.
     *
     * @param deltaTime
     *          The time since the last update.
     */
    public void printStats(long deltaTime)
    {
      if (isRunning)
      {
        final long bufferFileSize = indexMgr.getBufferFileSize();
        final long tmpBytesRead = bytesRead.get();
        final int currentBatch = batchNumber.get();
        final long bytesReadInterval = tmpBytesRead - lastBytesRead;
        final int bytesReadPercent = Math.round((100f * tmpBytesRead)
            / bufferFileSize);
        // Kilo and milli approximately cancel out.
        final long kiloBytesRate = bytesReadInterval / deltaTime;
        final long kiloBytesRemaining = (bufferFileSize - tmpBytesRead) / 1024;
        Message message = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT.get(
            indexMgr.getBufferFileName(), bytesReadPercent, kiloBytesRemaining,
            kiloBytesRate, currentBatch, totalBatches);
        logError(message);
        lastBytesRead = tmpBytesRead;
      }
    }
    /**
     * {@inheritDoc}
     */
    public Void call() throws Exception
    {
      Thread.setDefaultUncaughtExceptionHandler(
             new DefaultExceptionHandler());
      indexMgr.setStarted();
      Message message =
              NOTE_JEB_IMPORT_LDIF_INDEX_STARTED.get(indexMgr.getFileName(),
                         indexMgr.bufferIndexCount);
      logError(message);
      Thread.setDefaultUncaughtExceptionHandler(new DefaultExceptionHandler());
      ByteBuffer key = null;
      ImportIDSet insertIDSet = null;
@@ -1870,142 +2064,110 @@
      try
      {
        indexMgr.openIndexFile();
        NavigableSet<IndexInputBuffer> bufferSet = initializeBuffers();
        while (!bufferSet.isEmpty())
        {
          IndexInputBuffer b = bufferSet.pollFirst();
          if (key == null)
          {
            indexID = b.getIndexID();
        beginWriteTask();
            if (indexMgr.isDN2ID())
        NavigableSet<IndexInputBuffer> bufferSet;
        while ((bufferSet = getNextBufferBatch()) != null)
        {
          while (!bufferSet.isEmpty())
          {
            IndexInputBuffer b = bufferSet.pollFirst();
            if (key == null)
            {
              insertIDSet = new ImportIDSet(1, 1, false);
              deleteIDSet = new ImportIDSet(1, 1, false);
              indexID = b.getIndexID();
              if (indexMgr.isDN2ID())
              {
                insertIDSet = new ImportIDSet(1, 1, false);
                deleteIDSet = new ImportIDSet(1, 1, false);
              }
              else
              {
                Index index = (Index) idContainerMap.get(indexID);
                int limit = index.getIndexEntryLimit();
                boolean doCount = index.getMaintainCount();
                insertIDSet = new ImportIDSet(1, limit, doCount);
                deleteIDSet = new ImportIDSet(1, limit, doCount);
              }
              key = ByteBuffer.allocate(b.getKeyLen());
              key.flip();
              b.getKey(key);
              b.mergeIDSet(insertIDSet);
              b.mergeIDSet(deleteIDSet);
              insertIDSet.setKey(key);
              deleteIDSet.setKey(key);
            }
            else if (b.compare(key, indexID) != 0)
            {
              addToDB(insertIDSet, deleteIDSet, indexID);
              keyCount.incrementAndGet();
              indexID = b.getIndexID();
              if (indexMgr.isDN2ID())
              {
                insertIDSet = new ImportIDSet(1, 1, false);
                deleteIDSet = new ImportIDSet(1, 1, false);
              }
              else
              {
                Index index = (Index) idContainerMap.get(indexID);
                int limit = index.getIndexEntryLimit();
                boolean doCount = index.getMaintainCount();
                insertIDSet = new ImportIDSet(1, limit, doCount);
                deleteIDSet = new ImportIDSet(1, limit, doCount);
              }
              key.clear();
              if (b.getKeyLen() > key.capacity())
              {
                key = ByteBuffer.allocate(b.getKeyLen());
              }
              key.flip();
              b.getKey(key);
              b.mergeIDSet(insertIDSet);
              b.mergeIDSet(deleteIDSet);
              insertIDSet.setKey(key);
              deleteIDSet.setKey(key);
            }
            else
            {
              Index index = (Index) idContainerMap.get(indexID);
              int limit = index.getIndexEntryLimit();
              boolean doCount = index.getMaintainCount();
              insertIDSet = new ImportIDSet(1, limit, doCount);
              deleteIDSet = new ImportIDSet(1, limit, doCount);
              b.mergeIDSet(insertIDSet);
              b.mergeIDSet(deleteIDSet);
            }
            key = ByteBuffer.allocate(b.getKeyLen());
            key.flip();
            b.getKey(key);
            b.mergeIDSet(insertIDSet);
            b.mergeIDSet(deleteIDSet);
            insertIDSet.setKey(key);
            deleteIDSet.setKey(key);
            if (b.hasMoreData())
            {
              b.getNextRecord();
              bufferSet.add(b);
            }
          }
          else if (b.compare(key, indexID) != 0)
          if (key != null)
          {
            addToDB(insertIDSet, deleteIDSet, indexID);
            indexMgr.incrementKeyCount();
            indexID = b.getIndexID();
            if (indexMgr.isDN2ID())
            {
              insertIDSet = new ImportIDSet(1, 1, false);
              deleteIDSet = new ImportIDSet(1, 1, false);
            }
            else
            {
              Index index = (Index) idContainerMap.get(indexID);
              int limit = index.getIndexEntryLimit();
              boolean doCount = index.getMaintainCount();
              insertIDSet = new ImportIDSet(1, limit, doCount);
              deleteIDSet = new ImportIDSet(1, limit, doCount);
            }
            key.clear();
            if (b.getKeyLen() > key.capacity())
            {
              key = ByteBuffer.allocate(b.getKeyLen());
            }
            key.flip();
            b.getKey(key);
            b.mergeIDSet(insertIDSet);
            b.mergeIDSet(deleteIDSet);
            insertIDSet.setKey(key);
            deleteIDSet.setKey(key);
          }
          else
          {
            b.mergeIDSet(insertIDSet);
            b.mergeIDSet(deleteIDSet);
          }
          if(b.hasMoreData())
          {
            b.getNextRecord();
            bufferSet.add(b);
          }
        }
        if(key != null)
        {
          addToDB(insertIDSet, deleteIDSet, indexID);
        }
      }
      catch (Exception e)
      {
        message =
              ERR_JEB_IMPORT_LDIF_INDEX_WRITE_DB_ERR.get(indexMgr.getFileName(),
                                                         e.getMessage());
        Message message = ERR_JEB_IMPORT_LDIF_INDEX_WRITE_DB_ERR.get(
            indexMgr.getBufferFileName(), e.getMessage());
        logError(message);
        e.printStackTrace();
        throw e;
      }
      finally
      {
        cleanUP();
        endWriteTask();
      }
      return null;
    }
    private void cleanUP() throws DatabaseException, DirectoryException,
      IOException
    {
      try
      {
        if(indexMgr.isDN2ID())
        {
          for(DNState dnState : dnStateMap.values())
          {
            dnState.flush();
          }
          Message msg =
            NOTE_JEB_IMPORT_LDIF_DN_CLOSE.get(indexMgr.getDNCount());
          logError(msg);
        }
        else
        {
          for(Index index : indexMap.values())
          {
            index.closeCursor();
          }
          Message message =
            NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE.get(indexMgr.getFileName());
          logError(message);
        }
      }
      finally
      {
        indexMgr.setDone();
        indexMgr.close();
        indexMgr.deleteIndexFile();
      }
    }
    private void addToDB(ImportIDSet insertSet, ImportIDSet deleteSet,
                         int indexID) throws InterruptedException,
            DatabaseException, DirectoryException
@@ -2064,7 +2226,15 @@
    }
      /**
    private void addBytesRead(int bytesRead)
    {
      this.bytesRead.addAndGet(bytesRead);
    }
    /**
     * This class is used to by a index DB merge thread performing DN processing
     * to keep track of the state of individual DN2ID index processing.
     */
@@ -2371,15 +2541,15 @@
    private final int DRAIN_TO = 3;
    private final IndexManager indexMgr;
    private final BlockingQueue<IndexOutputBuffer> queue;
    private final ByteArrayOutputStream insetByteStream =
    private final ByteArrayOutputStream insertByteStream =
            new ByteArrayOutputStream(2 * bufferSize);
    private final ByteArrayOutputStream deleteByteStream =
            new ByteArrayOutputStream(2 * bufferSize);
    private final DataOutputStream bufferStream;
    private final DataOutputStream bufferIndexStream;
    private final byte[] tmpArray = new byte[8];
    private int insertKeyCount = 0, deleteKeyCount = 0;
    private final DataOutputStream dataStream;
    private int bufferCount = 0;
    private final File file;
    private final SortedSet<IndexOutputBuffer> indexSortedSet;
    private boolean poisonSeen = false;
@@ -2388,13 +2558,14 @@
                             IndexManager indexMgr) throws FileNotFoundException
    {
      this.queue = queue;
      file = indexMgr.getFile();
      this.indexMgr = indexMgr;
      BufferedOutputStream bufferedStream =
              new BufferedOutputStream(new FileOutputStream(file),
                                       READER_WRITER_BUFFER_SIZE);
      dataStream = new DataOutputStream(bufferedStream);
      indexSortedSet = new TreeSet<IndexOutputBuffer>();
      this.bufferStream = new DataOutputStream(new BufferedOutputStream(
          new FileOutputStream(indexMgr.getBufferFile()),
          READER_WRITER_BUFFER_SIZE));
      this.bufferIndexStream = new DataOutputStream(new BufferedOutputStream(
          new FileOutputStream(indexMgr.getBufferIndexFile()),
          READER_WRITER_BUFFER_SIZE));
      this.indexSortedSet = new TreeSet<IndexOutputBuffer>();
    }
@@ -2442,9 +2613,14 @@
              }
            }
            offset += bufferLen;
            indexMgr.addBuffer(beginOffset, offset, bufferCount);
            // Write buffer index information.
            bufferIndexStream.writeLong(beginOffset);
            bufferIndexStream.writeLong(offset);
            bufferCount++;
            Importer.this.bufferCount.incrementAndGet();
            if(poisonSeen)
            {
              break;
@@ -2454,17 +2630,17 @@
      }
      catch (IOException e)
      {
        Message message =
                ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR.get(
                    file.getAbsolutePath(), e.getMessage());
        Message message = ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR.get(indexMgr
            .getBufferFile().getAbsolutePath(), e.getMessage());
        logError(message);
        isPhaseOneCanceled = true;
        throw e;
      }
      finally
      {
        dataStream.close();
        indexMgr.setFileLength();
        bufferStream.close();
        bufferIndexStream.close();
        indexMgr.setBufferInfo(bufferCount, indexMgr.getBufferFile().length());
      }
      return null;
    }
@@ -2476,7 +2652,7 @@
      int numberKeys = indexBuffer.getNumberKeys();
      indexBuffer.setPosition(-1);
      long bufferLen = 0;
      insetByteStream.reset(); insertKeyCount = 0;
      insertByteStream.reset(); insertKeyCount = 0;
      deleteByteStream.reset(); deleteKeyCount = 0;
      for(int i = 0; i < numberKeys; i++)
      {
@@ -2485,7 +2661,7 @@
          indexBuffer.setPosition(i);
          if(indexBuffer.isInsert(i))
          {
            indexBuffer.writeID(insetByteStream, i);
            indexBuffer.writeID(insertByteStream, i);
            insertKeyCount++;
          }
          else
@@ -2499,14 +2675,14 @@
        {
          bufferLen += writeRecord(indexBuffer);
          indexBuffer.setPosition(i);
          insetByteStream.reset();insertKeyCount = 0;
          insertByteStream.reset();insertKeyCount = 0;
          deleteByteStream.reset();deleteKeyCount = 0;
        }
        if(indexBuffer.isInsert(i))
        {
          if(insertKeyCount++ <= indexMgr.getLimit())
          {
            indexBuffer.writeID(insetByteStream, i);
            indexBuffer.writeID(insertByteStream, i);
          }
        }
        else
@@ -2528,7 +2704,7 @@
    {
      long id = 0;
      long bufferLen = 0;
      insetByteStream.reset(); insertKeyCount = 0;
      insertByteStream.reset(); insertKeyCount = 0;
      deleteByteStream.reset(); deleteKeyCount = 0;
      for(IndexOutputBuffer b : buffers)
      {
@@ -2555,7 +2731,7 @@
          saveIndexID = b.getIndexID();
          if(b.isInsert(b.getPosition()))
          {
            b.writeID(insetByteStream, b.getPosition());
            b.writeID(insertByteStream, b.getPosition());
            insertKeyCount++;
          }
          else
@@ -2569,7 +2745,7 @@
          if(!b.compare(saveKey, saveIndexID))
          {
            bufferLen += writeRecord(saveKey, saveIndexID);
            insetByteStream.reset();
            insertByteStream.reset();
            deleteByteStream.reset();
            insertKeyCount = 0;
            deleteKeyCount = 0;
@@ -2577,7 +2753,7 @@
            saveIndexID =  b.getIndexID();
            if(b.isInsert(b.getPosition()))
            {
              b.writeID(insetByteStream, b.getPosition());
              b.writeID(insertByteStream, b.getPosition());
              insertKeyCount++;
            }
            else
@@ -2592,7 +2768,7 @@
            {
              if(insertKeyCount++ <= indexMgr.getLimit())
              {
                b.writeID(insetByteStream, b.getPosition());
                b.writeID(insertByteStream, b.getPosition());
              }
            }
            else
@@ -2621,23 +2797,23 @@
      if(insertKeyCount > indexMgr.getLimit())
      {
        insertKeyCount = 1;
        insetByteStream.reset();
        insertByteStream.reset();
        PackedInteger.writeInt(tmpArray, 0, -1);
        insetByteStream.write(tmpArray, 0, 1);
        insertByteStream.write(tmpArray, 0, 1);
      }
      int insertSize = PackedInteger.getWriteIntLength(insertKeyCount);
      PackedInteger.writeInt(tmpArray, 0, insertKeyCount);
      dataStream.write(tmpArray, 0, insertSize);
      if(insetByteStream.size() > 0)
      bufferStream.write(tmpArray, 0, insertSize);
      if(insertByteStream.size() > 0)
      {
        insetByteStream.writeTo(dataStream);
        insertByteStream.writeTo(bufferStream);
      }
      int deleteSize = PackedInteger.getWriteIntLength(deleteKeyCount);
      PackedInteger.writeInt(tmpArray, 0, deleteKeyCount);
      dataStream.write(tmpArray, 0, deleteSize);
      bufferStream.write(tmpArray, 0, deleteSize);
      if(deleteByteStream.size() > 0)
      {
        deleteByteStream.writeTo(dataStream);
        deleteByteStream.writeTo(bufferStream);
      }
      return insertSize + deleteSize;
    }
@@ -2645,10 +2821,10 @@
    private int writeHeader(int indexID, int keySize) throws IOException
    {
      dataStream.writeInt(indexID);
      bufferStream.writeInt(indexID);
      int packedSize = PackedInteger.getWriteIntLength(keySize);
      PackedInteger.writeInt(tmpArray, 0, keySize);
      dataStream.write(tmpArray, 0, packedSize);
      bufferStream.write(tmpArray, 0, packedSize);
      return packedSize;
    }
@@ -2657,9 +2833,9 @@
    {
      int keySize = b.getKeySize();
      int packedSize = writeHeader(b.getIndexID(), keySize);
      b.writeKey(dataStream);
      b.writeKey(bufferStream);
      packedSize += writeByteStreams();
      return (packedSize + keySize + insetByteStream.size() +
      return (packedSize + keySize + insertByteStream.size() +
              deleteByteStream.size() + 4);
    }
@@ -2667,9 +2843,9 @@
    private int writeRecord(byte[] k, int indexID) throws IOException
    {
      int packedSize = writeHeader(indexID, k.length);
      dataStream.write(k);
      bufferStream.write(k);
      packedSize += writeByteStreams();
      return (packedSize + k.length + insetByteStream.size() +
      return (packedSize + k.length + insertByteStream.size() +
              deleteByteStream.size() + 4);
    }
  }
@@ -2768,89 +2944,60 @@
   */
  final class IndexManager implements Comparable<IndexManager>
  {
    private static final int BUFFER_SIZE = 128;
    private final File file;
    private RandomAccessFile rFile = null;
    private long fileLength, bytesRead = 0;
    private boolean done = false, started = false;
    private final File bufferFile;
    private final String bufferFileName;
    private final File bufferIndexFile;
    private final String bufferIndexFileName;
    private long bufferFileSize;
    private long totalDNS;
    private AtomicInteger keyCount = new AtomicInteger(0);
    private final String fileName;
    private final boolean isDN;
    private final int limit;
    private long[] bufferIndexBegin = new long[BUFFER_SIZE];
    private long[] bufferIndexEnd   = new long[BUFFER_SIZE];
    private int[]  bufferIndexID    = new int[BUFFER_SIZE];
    private int    bufferIndexCount = 0;
    private int numberOfBuffers = 0;
    private volatile IndexDBWriteTask writer = null;
    private IndexManager(String fileName, boolean isDN, int limit)
    {
      file = new File(tempDir, fileName);
      this.fileName = fileName;
      this.bufferFileName = fileName;
      this.bufferIndexFileName = fileName + ".index";
      this.bufferFile = new File(tempDir, bufferFileName);
      this.bufferIndexFile = new File(tempDir, bufferIndexFileName);
      this.isDN = isDN;
      this.limit = limit;
    }
    private void openIndexFile() throws FileNotFoundException
    private void setIndexDBWriteTask(IndexDBWriteTask writer)
    {
      rFile = new RandomAccessFile(file, "r");
      this.writer = writer;
    }
    /**
     * Returns the file channel associated with this index manager.
     *
     * @return The file channel associated with this index manager.
     */
    FileChannel getChannel()
    private File getBufferFile()
    {
      return rFile.getChannel();
      return bufferFile;
    }
    private void addBuffer(long begin, long end, int id)
    private long getBufferFileSize()
    {
      int size = bufferIndexBegin.length;
      if (bufferIndexCount >= size)
      {
        size += BUFFER_SIZE;
        bufferIndexBegin = Arrays.copyOf(bufferIndexBegin, size);
        bufferIndexEnd = Arrays.copyOf(bufferIndexEnd, size);
        bufferIndexID = Arrays.copyOf(bufferIndexID, size);
      }
      bufferIndexBegin[bufferIndexCount] = begin;
      bufferIndexEnd[bufferIndexCount] = end;
      bufferIndexID[bufferIndexCount] = id;
      bufferIndexCount++;
      return bufferFileSize;
    }
    private File getFile()
    private File getBufferIndexFile()
    {
      return file;
      return bufferIndexFile;
    }
    private boolean deleteIndexFile()
    private void setBufferInfo(int numberOfBuffers, long bufferFileSize)
    {
      return file.delete();
    }
    private void close() throws IOException
    {
      rFile.close();
    }
    private void setFileLength()
    {
      this.fileLength = file.length();
      this.numberOfBuffers = numberOfBuffers;
      this.bufferFileSize = bufferFileSize;
    }
@@ -2863,25 +3010,16 @@
     */
    void addBytesRead(int bytesRead)
    {
      this.bytesRead += bytesRead;
    }
    private void setDone()
    {
      this.done = true;
    }
    private void setStarted()
    {
      started = true;
      if (writer != null)
      {
        writer.addBytesRead(bytesRead);
      }
    }
    private void addTotDNCount(int delta)
    {
      this.totalDNS += delta;
      totalDNS += delta;
    }
@@ -2899,30 +3037,21 @@
    private void printStats(long deltaTime)
    {
      if(!done && started)
      if (writer != null)
      {
        float rate = 1000f * keyCount.getAndSet(0) / deltaTime;
        Message message = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT.get(fileName,
                (fileLength - bytesRead), rate);
        logError(message);
        writer.printStats(deltaTime);
      }
    }
    private void incrementKeyCount()
    {
      keyCount.incrementAndGet();
    }
    /**
     * Returns the file name associated with this index manager.
     *
     * @return The file name associated with this index manager.
     */
    String getFileName()
    String getBufferFileName()
    {
      return fileName;
      return bufferFileName;
    }
@@ -2937,7 +3066,14 @@
     */
    public int compareTo(IndexManager mgr)
    {
      return bufferIndexCount - mgr.bufferIndexCount;
      return numberOfBuffers - mgr.numberOfBuffers;
    }
    private int getNumberOfBuffers()
    {
      return numberOfBuffers;
    }
  }
@@ -4130,19 +4266,21 @@
    private long latestCount;
      /**
    /**
     * Create a new import progress task.
     *
     * @param  latestCount The latest count of entries processed in phase one.
     * @param latestCount
     *          The latest count of entries processed in phase one.
     */
    public SecondPhaseProgressTask (long latestCount)
    public SecondPhaseProgressTask(long latestCount)
    {
      previousTime = System.currentTimeMillis();
      this.latestCount = latestCount;
      try
      {
        previousStats =
                rootContainer.getEnvironmentStats(new StatsConfig());
        previousStats = rootContainer.getEnvironmentStats(new StatsConfig());
      }
      catch (DatabaseException e)
      {
opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexInputBuffer.java
@@ -50,12 +50,13 @@
public final class IndexInputBuffer implements Comparable<IndexInputBuffer>
{
  private final IndexManager indexMgr;
  private final FileChannel channel;
  private final long begin;
  private final long end;
  private final int id;
  private long offset;
  private ByteBuffer cache;
  private final ByteBuffer cache;
  private Integer indexID = null;
  private ByteBuffer keyBuf = ByteBuffer.allocate(128);
@@ -80,35 +81,30 @@
   *
   * @param indexMgr
   *          The index manager.
   * @param channel
   *          The index file channel.
   * @param begin
   *          The position of the start of the buffer in the scratch file.
   * @param end
   *          The position of the end of the buffer in the scratch file.
   * @param id
   *          The index ID.
   * @param cacheSize
   *          The cache size.
   * @throws IOException
   *           If an IO error occurred when priming the cache.
   */
  public IndexInputBuffer(IndexManager indexMgr, long begin, long end, int id)
  public IndexInputBuffer(IndexManager indexMgr, FileChannel channel,
      long begin, long end, int id, int cacheSize) throws IOException
  {
    this.indexMgr = indexMgr;
    this.channel = channel;
    this.begin = begin;
    this.end = end;
    this.offset = 0;
    this.id = id;
  }
    this.cache = ByteBuffer.allocate(Math.max(cacheSize - 384, 256));
  /**
   * Initializes this index input buffer using the provided cache size.
   *
   * @param cacheSize
   *          The cache size.
   * @throws IOException
   *           If an IO error occurred.
   */
  void initializeCache(long cacheSize) throws IOException
  {
    cache = ByteBuffer.allocate((int) Math.max(cacheSize - 256, 256));
    loadCache();
    cache.flip();
    keyBuf.flip();
@@ -118,8 +114,7 @@
  private void loadCache() throws IOException
  {
    FileChannel fileChannel = indexMgr.getChannel();
    fileChannel.position(begin + offset);
    channel.position(begin + offset);
    long leftToRead = end - (begin + offset);
    long bytesToRead;
    if (leftToRead < cache.remaining())
@@ -134,7 +129,7 @@
    int bytesRead = 0;
    while (bytesRead < bytesToRead)
    {
      bytesRead += fileChannel.read(cache);
      bytesRead += channel.read(cache);
    }
    offset += bytesRead;
    indexMgr.addBytesRead(bytesRead);
@@ -199,7 +194,7 @@
      catch (IOException ex)
      {
        Message message = ERR_JEB_IMPORT_BUFFER_IO_ERROR.get(indexMgr
            .getFileName());
            .getBufferFileName());
        logError(message);
        ex.printStackTrace();
        System.exit(1);