mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

ludovicp
30.19.2010 e4076b2991c9604907f6a2f5ba2d526d0072adf6
opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -36,7 +36,6 @@
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -1085,7 +1084,7 @@
      final int limit = Math.min(dbThreads, totList.size());
      for (int i = 0; i < limit; i++)
      {
        buffers += totList.get(i).bufferIndexCount;
        buffers += totList.get(i).numberOfBuffers;
      }
      readAheadSize = (int) (usableMemory / buffers);
@@ -1107,15 +1106,20 @@
      }
      else
      {
        // Not enough memory.
        final long minimumPhaseTwoBufferMemory = buffers
            * MIN_READ_AHEAD_CACHE_SIZE;
        Message message = ERR_IMPORT_LDIF_LACK_MEM.get(usableMemory,
            minimumPhaseTwoBufferMemory + dbCacheSize);
        throw new InitializationException(message);
        // Not enough memory - will need to do batching for the biggest indexes.
        readAheadSize = MIN_READ_AHEAD_CACHE_SIZE;
        buffers = (int) (usableMemory / readAheadSize);
        Message message = WARN_IMPORT_LDIF_LACK_MEM_PHASE_TWO.get(usableMemory);
        logError(message);
        break;
      }
    }
    // Ensure that there are always two threads available for parallel
    // processing of smaller indexes.
    dbThreads = Math.max(2, dbThreads);
    Message message = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_MEM_REPORT.get(
        availableMemory, readAheadSize, buffers);
    logError(message);
@@ -1123,15 +1127,18 @@
    // Start indexing tasks.
    List<Future<Void>> futures = new LinkedList<Future<Void>>();
    ExecutorService dbService = Executors.newFixedThreadPool(dbThreads);
    Semaphore permits = new Semaphore(buffers);
    // Start DN processing first.
    for (IndexManager dnMgr : DNIndexMgrList)
    {
      futures.add(dbService.submit(new IndexDBWriteTask(dnMgr, readAheadSize)));
      futures.add(dbService.submit(new IndexDBWriteTask(dnMgr, permits,
          buffers, readAheadSize)));
    }
    for (IndexManager mgr : indexMgrList)
    {
      futures.add(dbService.submit(new IndexDBWriteTask(mgr, readAheadSize)));
      futures.add(dbService.submit(new IndexDBWriteTask(mgr, permits, buffers,
          readAheadSize)));
    }
    for (Future<Void> result : futures)
@@ -1814,54 +1821,241 @@
    private final DatabaseEntry dbKey, dbValue;
    private final int cacheSize;
    private final Map<Integer, DNState> dnStateMap =
                                               new HashMap<Integer, DNState>();
      new HashMap<Integer, DNState>();
    private final Map<Integer, Index> indexMap = new HashMap<Integer, Index>();
    private final Semaphore permits;
    private final int maxPermits;
    private final AtomicLong bytesRead = new AtomicLong();
    private long lastBytesRead = 0;
    private final AtomicInteger keyCount = new AtomicInteger();
    private RandomAccessFile bufferFile = null;
    private DataInputStream bufferIndexFile = null;
    private int remainingBuffers;
    private volatile int totalBatches;
    private AtomicInteger batchNumber = new AtomicInteger();
    private int nextBufferID;
    private int ownedPermits;
    private volatile boolean isRunning = false;
    public IndexDBWriteTask(IndexManager indexMgr, int cacheSize)
    /**
     * Creates a new index DB writer.
     *
     * @param indexMgr
     *          The index manager.
     * @param permits
     *          The semaphore used for restricting the number of buffer
     *          allocations.
     * @param maxPermits
     *          The maximum number of buffers which can be allocated.
     * @param cacheSize
     *          The buffer cache size.
     */
    public IndexDBWriteTask(IndexManager indexMgr, Semaphore permits,
        int maxPermits, int cacheSize)
    {
      this.indexMgr = indexMgr;
      this.permits = permits;
      this.maxPermits = maxPermits;
      this.cacheSize = cacheSize;
      this.dbKey = new DatabaseEntry();
      this.dbValue = new DatabaseEntry();
      this.cacheSize = cacheSize;
    }
    private NavigableSet<IndexInputBuffer> initializeBuffers()
      throws IOException
    /**
     * Initializes this task.
     *
     * @throws IOException
     *           If an IO error occurred.
     */
    public void beginWriteTask() throws IOException
    {
      NavigableSet<IndexInputBuffer> bufferSet =
        new TreeSet<IndexInputBuffer>();
      for (int i = 0; i < indexMgr.bufferIndexCount; i++)
      bufferFile = new RandomAccessFile(indexMgr.getBufferFile(), "r");
      bufferIndexFile = new DataInputStream(new BufferedInputStream(
          new FileInputStream(indexMgr.getBufferIndexFile())));
      remainingBuffers = indexMgr.getNumberOfBuffers();
      totalBatches = (remainingBuffers / maxPermits) + 1;
      batchNumber.set(0);
      nextBufferID = 0;
      ownedPermits = 0;
      Message message = NOTE_JEB_IMPORT_LDIF_INDEX_STARTED.get(
          indexMgr.getBufferFileName(), remainingBuffers, totalBatches);
      logError(message);
      indexMgr.setIndexDBWriteTask(this);
      isRunning = true;
    }
    /**
     * Returns the next batch of buffers to be processed, blocking until enough
     * buffer permits are available.
     *
     * @return The next batch of buffers, or {@code null} if there are no more
     *         buffers to be processed.
     * @throws Exception
     *           If an exception occurred.
     */
    public NavigableSet<IndexInputBuffer> getNextBufferBatch() throws Exception
    {
      // First release any previously acquired permits.
      if (ownedPermits > 0)
      {
        IndexInputBuffer b = new IndexInputBuffer(indexMgr,
            indexMgr.bufferIndexBegin[i], indexMgr.bufferIndexEnd[i],
            indexMgr.bufferIndexID[i]);
        b.initializeCache(cacheSize);
        bufferSet.add(b);
        permits.release(ownedPermits);
        ownedPermits = 0;
      }
      // GC arrays.
      indexMgr.bufferIndexBegin = null;
      indexMgr.bufferIndexEnd = null;
      indexMgr.bufferIndexID = null;
      // Block until we can either get enough permits for all buffers, or the
      // maximum number of permits.
      final int permitRequest = Math.min(remainingBuffers, maxPermits);
      if (permitRequest == 0)
      {
        // No more work to do.
        return null;
      }
      permits.acquire(permitRequest);
      return bufferSet;
      // Update counters.
      ownedPermits = permitRequest;
      remainingBuffers -= permitRequest;
      batchNumber.incrementAndGet();
      // Create all the index buffers for the next batch.
      final NavigableSet<IndexInputBuffer> buffers =
        new TreeSet<IndexInputBuffer>();
      for (int i = 0; i < permitRequest; i++)
      {
        final long bufferBegin = bufferIndexFile.readLong();
        final long bufferEnd = bufferIndexFile.readLong();
        final IndexInputBuffer b = new IndexInputBuffer(indexMgr,
            bufferFile.getChannel(), bufferBegin, bufferEnd, nextBufferID++,
            cacheSize);
        buffers.add(b);
      }
      return buffers;
    }
    /**
     * Finishes this task.
     *
     * @throws Exception
     *           If an exception occurred.
     */
    public void endWriteTask() throws Exception
    {
      isRunning = false;
      // First release any previously acquired permits.
      if (ownedPermits > 0)
      {
        permits.release(ownedPermits);
        ownedPermits = 0;
      }
      try
      {
        if (indexMgr.isDN2ID())
        {
          for (DNState dnState : dnStateMap.values())
          {
            dnState.flush();
          }
          Message msg = NOTE_JEB_IMPORT_LDIF_DN_CLOSE
              .get(indexMgr.getDNCount());
          logError(msg);
        }
        else
        {
          for (Index index : indexMap.values())
          {
            index.closeCursor();
          }
          Message message = NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE.get(indexMgr
              .getBufferFileName());
          logError(message);
        }
      }
      finally
      {
        if (bufferFile != null)
        {
          try
          {
            bufferFile.close();
          }
          catch (IOException ignored)
          {
            // Ignore.
          }
        }
        if (bufferIndexFile != null)
        {
          try
          {
            bufferIndexFile.close();
          }
          catch (IOException ignored)
          {
            // Ignore.
          }
        }
        indexMgr.getBufferFile().delete();
        indexMgr.getBufferIndexFile().delete();
      }
    }
    /**
     * Print out progress stats.
     *
     * @param deltaTime
     *          The time since the last update.
     */
    public void printStats(long deltaTime)
    {
      if (isRunning)
      {
        final long bufferFileSize = indexMgr.getBufferFileSize();
        final long tmpBytesRead = bytesRead.get();
        final int currentBatch = batchNumber.get();
        final long bytesReadInterval = tmpBytesRead - lastBytesRead;
        final int bytesReadPercent = Math.round((100f * tmpBytesRead)
            / bufferFileSize);
        // Kilo and milli approximately cancel out.
        final long kiloBytesRate = bytesReadInterval / deltaTime;
        final long kiloBytesRemaining = (bufferFileSize - tmpBytesRead) / 1024;
        Message message = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT.get(
            indexMgr.getBufferFileName(), bytesReadPercent, kiloBytesRemaining,
            kiloBytesRate, currentBatch, totalBatches);
        logError(message);
        lastBytesRead = tmpBytesRead;
      }
    }
    /**
     * {@inheritDoc}
     */
    public Void call() throws Exception
    {
      Thread.setDefaultUncaughtExceptionHandler(
             new DefaultExceptionHandler());
      indexMgr.setStarted();
      Message message =
              NOTE_JEB_IMPORT_LDIF_INDEX_STARTED.get(indexMgr.getFileName(),
                         indexMgr.bufferIndexCount);
      logError(message);
      Thread.setDefaultUncaughtExceptionHandler(new DefaultExceptionHandler());
      ByteBuffer key = null;
      ImportIDSet insertIDSet = null;
@@ -1870,142 +2064,110 @@
      try
      {
        indexMgr.openIndexFile();
        NavigableSet<IndexInputBuffer> bufferSet = initializeBuffers();
        while (!bufferSet.isEmpty())
        {
          IndexInputBuffer b = bufferSet.pollFirst();
          if (key == null)
          {
            indexID = b.getIndexID();
        beginWriteTask();
            if (indexMgr.isDN2ID())
        NavigableSet<IndexInputBuffer> bufferSet;
        while ((bufferSet = getNextBufferBatch()) != null)
        {
          while (!bufferSet.isEmpty())
          {
            IndexInputBuffer b = bufferSet.pollFirst();
            if (key == null)
            {
              insertIDSet = new ImportIDSet(1, 1, false);
              deleteIDSet = new ImportIDSet(1, 1, false);
              indexID = b.getIndexID();
              if (indexMgr.isDN2ID())
              {
                insertIDSet = new ImportIDSet(1, 1, false);
                deleteIDSet = new ImportIDSet(1, 1, false);
              }
              else
              {
                Index index = (Index) idContainerMap.get(indexID);
                int limit = index.getIndexEntryLimit();
                boolean doCount = index.getMaintainCount();
                insertIDSet = new ImportIDSet(1, limit, doCount);
                deleteIDSet = new ImportIDSet(1, limit, doCount);
              }
              key = ByteBuffer.allocate(b.getKeyLen());
              key.flip();
              b.getKey(key);
              b.mergeIDSet(insertIDSet);
              b.mergeIDSet(deleteIDSet);
              insertIDSet.setKey(key);
              deleteIDSet.setKey(key);
            }
            else if (b.compare(key, indexID) != 0)
            {
              addToDB(insertIDSet, deleteIDSet, indexID);
              keyCount.incrementAndGet();
              indexID = b.getIndexID();
              if (indexMgr.isDN2ID())
              {
                insertIDSet = new ImportIDSet(1, 1, false);
                deleteIDSet = new ImportIDSet(1, 1, false);
              }
              else
              {
                Index index = (Index) idContainerMap.get(indexID);
                int limit = index.getIndexEntryLimit();
                boolean doCount = index.getMaintainCount();
                insertIDSet = new ImportIDSet(1, limit, doCount);
                deleteIDSet = new ImportIDSet(1, limit, doCount);
              }
              key.clear();
              if (b.getKeyLen() > key.capacity())
              {
                key = ByteBuffer.allocate(b.getKeyLen());
              }
              key.flip();
              b.getKey(key);
              b.mergeIDSet(insertIDSet);
              b.mergeIDSet(deleteIDSet);
              insertIDSet.setKey(key);
              deleteIDSet.setKey(key);
            }
            else
            {
              Index index = (Index) idContainerMap.get(indexID);
              int limit = index.getIndexEntryLimit();
              boolean doCount = index.getMaintainCount();
              insertIDSet = new ImportIDSet(1, limit, doCount);
              deleteIDSet = new ImportIDSet(1, limit, doCount);
              b.mergeIDSet(insertIDSet);
              b.mergeIDSet(deleteIDSet);
            }
            key = ByteBuffer.allocate(b.getKeyLen());
            key.flip();
            b.getKey(key);
            b.mergeIDSet(insertIDSet);
            b.mergeIDSet(deleteIDSet);
            insertIDSet.setKey(key);
            deleteIDSet.setKey(key);
            if (b.hasMoreData())
            {
              b.getNextRecord();
              bufferSet.add(b);
            }
          }
          else if (b.compare(key, indexID) != 0)
          if (key != null)
          {
            addToDB(insertIDSet, deleteIDSet, indexID);
            indexMgr.incrementKeyCount();
            indexID = b.getIndexID();
            if (indexMgr.isDN2ID())
            {
              insertIDSet = new ImportIDSet(1, 1, false);
              deleteIDSet = new ImportIDSet(1, 1, false);
            }
            else
            {
              Index index = (Index) idContainerMap.get(indexID);
              int limit = index.getIndexEntryLimit();
              boolean doCount = index.getMaintainCount();
              insertIDSet = new ImportIDSet(1, limit, doCount);
              deleteIDSet = new ImportIDSet(1, limit, doCount);
            }
            key.clear();
            if (b.getKeyLen() > key.capacity())
            {
              key = ByteBuffer.allocate(b.getKeyLen());
            }
            key.flip();
            b.getKey(key);
            b.mergeIDSet(insertIDSet);
            b.mergeIDSet(deleteIDSet);
            insertIDSet.setKey(key);
            deleteIDSet.setKey(key);
          }
          else
          {
            b.mergeIDSet(insertIDSet);
            b.mergeIDSet(deleteIDSet);
          }
          if(b.hasMoreData())
          {
            b.getNextRecord();
            bufferSet.add(b);
          }
        }
        if(key != null)
        {
          addToDB(insertIDSet, deleteIDSet, indexID);
        }
      }
      catch (Exception e)
      {
        message =
              ERR_JEB_IMPORT_LDIF_INDEX_WRITE_DB_ERR.get(indexMgr.getFileName(),
                                                         e.getMessage());
        Message message = ERR_JEB_IMPORT_LDIF_INDEX_WRITE_DB_ERR.get(
            indexMgr.getBufferFileName(), e.getMessage());
        logError(message);
        e.printStackTrace();
        throw e;
      }
      finally
      {
        cleanUP();
        endWriteTask();
      }
      return null;
    }
    private void cleanUP() throws DatabaseException, DirectoryException,
      IOException
    {
      try
      {
        if(indexMgr.isDN2ID())
        {
          for(DNState dnState : dnStateMap.values())
          {
            dnState.flush();
          }
          Message msg =
            NOTE_JEB_IMPORT_LDIF_DN_CLOSE.get(indexMgr.getDNCount());
          logError(msg);
        }
        else
        {
          for(Index index : indexMap.values())
          {
            index.closeCursor();
          }
          Message message =
            NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE.get(indexMgr.getFileName());
          logError(message);
        }
      }
      finally
      {
        indexMgr.setDone();
        indexMgr.close();
        indexMgr.deleteIndexFile();
      }
    }
    private void addToDB(ImportIDSet insertSet, ImportIDSet deleteSet,
                         int indexID) throws InterruptedException,
            DatabaseException, DirectoryException
@@ -2064,7 +2226,15 @@
    }
      /**
    private void addBytesRead(int bytesRead)
    {
      this.bytesRead.addAndGet(bytesRead);
    }
    /**
     * This class is used by an index DB merge thread performing DN processing
     * to keep track of the state of individual DN2ID index processing.
     */
@@ -2371,15 +2541,15 @@
    private final int DRAIN_TO = 3;
    private final IndexManager indexMgr;
    private final BlockingQueue<IndexOutputBuffer> queue;
    private final ByteArrayOutputStream insetByteStream =
    private final ByteArrayOutputStream insertByteStream =
            new ByteArrayOutputStream(2 * bufferSize);
    private final ByteArrayOutputStream deleteByteStream =
            new ByteArrayOutputStream(2 * bufferSize);
    private final DataOutputStream bufferStream;
    private final DataOutputStream bufferIndexStream;
    private final byte[] tmpArray = new byte[8];
    private int insertKeyCount = 0, deleteKeyCount = 0;
    private final DataOutputStream dataStream;
    private int bufferCount = 0;
    private final File file;
    private final SortedSet<IndexOutputBuffer> indexSortedSet;
    private boolean poisonSeen = false;
@@ -2388,13 +2558,14 @@
                             IndexManager indexMgr) throws FileNotFoundException
    {
      this.queue = queue;
      file = indexMgr.getFile();
      this.indexMgr = indexMgr;
      BufferedOutputStream bufferedStream =
              new BufferedOutputStream(new FileOutputStream(file),
                                       READER_WRITER_BUFFER_SIZE);
      dataStream = new DataOutputStream(bufferedStream);
      indexSortedSet = new TreeSet<IndexOutputBuffer>();
      this.bufferStream = new DataOutputStream(new BufferedOutputStream(
          new FileOutputStream(indexMgr.getBufferFile()),
          READER_WRITER_BUFFER_SIZE));
      this.bufferIndexStream = new DataOutputStream(new BufferedOutputStream(
          new FileOutputStream(indexMgr.getBufferIndexFile()),
          READER_WRITER_BUFFER_SIZE));
      this.indexSortedSet = new TreeSet<IndexOutputBuffer>();
    }
@@ -2442,9 +2613,14 @@
              }
            }
            offset += bufferLen;
            indexMgr.addBuffer(beginOffset, offset, bufferCount);
            // Write buffer index information.
            bufferIndexStream.writeLong(beginOffset);
            bufferIndexStream.writeLong(offset);
            bufferCount++;
            Importer.this.bufferCount.incrementAndGet();
            if(poisonSeen)
            {
              break;
@@ -2454,17 +2630,17 @@
      }
      catch (IOException e)
      {
        Message message =
                ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR.get(
                    file.getAbsolutePath(), e.getMessage());
        Message message = ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR.get(indexMgr
            .getBufferFile().getAbsolutePath(), e.getMessage());
        logError(message);
        isPhaseOneCanceled = true;
        throw e;
      }
      finally
      {
        dataStream.close();
        indexMgr.setFileLength();
        bufferStream.close();
        bufferIndexStream.close();
        indexMgr.setBufferInfo(bufferCount, indexMgr.getBufferFile().length());
      }
      return null;
    }
@@ -2476,7 +2652,7 @@
      int numberKeys = indexBuffer.getNumberKeys();
      indexBuffer.setPosition(-1);
      long bufferLen = 0;
      insetByteStream.reset(); insertKeyCount = 0;
      insertByteStream.reset(); insertKeyCount = 0;
      deleteByteStream.reset(); deleteKeyCount = 0;
      for(int i = 0; i < numberKeys; i++)
      {
@@ -2485,7 +2661,7 @@
          indexBuffer.setPosition(i);
          if(indexBuffer.isInsert(i))
          {
            indexBuffer.writeID(insetByteStream, i);
            indexBuffer.writeID(insertByteStream, i);
            insertKeyCount++;
          }
          else
@@ -2499,14 +2675,14 @@
        {
          bufferLen += writeRecord(indexBuffer);
          indexBuffer.setPosition(i);
          insetByteStream.reset();insertKeyCount = 0;
          insertByteStream.reset();insertKeyCount = 0;
          deleteByteStream.reset();deleteKeyCount = 0;
        }
        if(indexBuffer.isInsert(i))
        {
          if(insertKeyCount++ <= indexMgr.getLimit())
          {
            indexBuffer.writeID(insetByteStream, i);
            indexBuffer.writeID(insertByteStream, i);
          }
        }
        else
@@ -2528,7 +2704,7 @@
    {
      long id = 0;
      long bufferLen = 0;
      insetByteStream.reset(); insertKeyCount = 0;
      insertByteStream.reset(); insertKeyCount = 0;
      deleteByteStream.reset(); deleteKeyCount = 0;
      for(IndexOutputBuffer b : buffers)
      {
@@ -2555,7 +2731,7 @@
          saveIndexID = b.getIndexID();
          if(b.isInsert(b.getPosition()))
          {
            b.writeID(insetByteStream, b.getPosition());
            b.writeID(insertByteStream, b.getPosition());
            insertKeyCount++;
          }
          else
@@ -2569,7 +2745,7 @@
          if(!b.compare(saveKey, saveIndexID))
          {
            bufferLen += writeRecord(saveKey, saveIndexID);
            insetByteStream.reset();
            insertByteStream.reset();
            deleteByteStream.reset();
            insertKeyCount = 0;
            deleteKeyCount = 0;
@@ -2577,7 +2753,7 @@
            saveIndexID =  b.getIndexID();
            if(b.isInsert(b.getPosition()))
            {
              b.writeID(insetByteStream, b.getPosition());
              b.writeID(insertByteStream, b.getPosition());
              insertKeyCount++;
            }
            else
@@ -2592,7 +2768,7 @@
            {
              if(insertKeyCount++ <= indexMgr.getLimit())
              {
                b.writeID(insetByteStream, b.getPosition());
                b.writeID(insertByteStream, b.getPosition());
              }
            }
            else
@@ -2621,23 +2797,23 @@
      if(insertKeyCount > indexMgr.getLimit())
      {
        insertKeyCount = 1;
        insetByteStream.reset();
        insertByteStream.reset();
        PackedInteger.writeInt(tmpArray, 0, -1);
        insetByteStream.write(tmpArray, 0, 1);
        insertByteStream.write(tmpArray, 0, 1);
      }
      int insertSize = PackedInteger.getWriteIntLength(insertKeyCount);
      PackedInteger.writeInt(tmpArray, 0, insertKeyCount);
      dataStream.write(tmpArray, 0, insertSize);
      if(insetByteStream.size() > 0)
      bufferStream.write(tmpArray, 0, insertSize);
      if(insertByteStream.size() > 0)
      {
        insetByteStream.writeTo(dataStream);
        insertByteStream.writeTo(bufferStream);
      }
      int deleteSize = PackedInteger.getWriteIntLength(deleteKeyCount);
      PackedInteger.writeInt(tmpArray, 0, deleteKeyCount);
      dataStream.write(tmpArray, 0, deleteSize);
      bufferStream.write(tmpArray, 0, deleteSize);
      if(deleteByteStream.size() > 0)
      {
        deleteByteStream.writeTo(dataStream);
        deleteByteStream.writeTo(bufferStream);
      }
      return insertSize + deleteSize;
    }
@@ -2645,10 +2821,10 @@
    private int writeHeader(int indexID, int keySize) throws IOException
    {
      dataStream.writeInt(indexID);
      bufferStream.writeInt(indexID);
      int packedSize = PackedInteger.getWriteIntLength(keySize);
      PackedInteger.writeInt(tmpArray, 0, keySize);
      dataStream.write(tmpArray, 0, packedSize);
      bufferStream.write(tmpArray, 0, packedSize);
      return packedSize;
    }
@@ -2657,9 +2833,9 @@
    {
      int keySize = b.getKeySize();
      int packedSize = writeHeader(b.getIndexID(), keySize);
      b.writeKey(dataStream);
      b.writeKey(bufferStream);
      packedSize += writeByteStreams();
      return (packedSize + keySize + insetByteStream.size() +
      return (packedSize + keySize + insertByteStream.size() +
              deleteByteStream.size() + 4);
    }
@@ -2667,9 +2843,9 @@
    private int writeRecord(byte[] k, int indexID) throws IOException
    {
      int packedSize = writeHeader(indexID, k.length);
      dataStream.write(k);
      bufferStream.write(k);
      packedSize += writeByteStreams();
      return (packedSize + k.length + insetByteStream.size() +
      return (packedSize + k.length + insertByteStream.size() +
              deleteByteStream.size() + 4);
    }
  }
@@ -2768,89 +2944,60 @@
   */
  final class IndexManager implements Comparable<IndexManager>
  {
    private static final int BUFFER_SIZE = 128;
    private final File file;
    private RandomAccessFile rFile = null;
    private long fileLength, bytesRead = 0;
    private boolean done = false, started = false;
    private final File bufferFile;
    private final String bufferFileName;
    private final File bufferIndexFile;
    private final String bufferIndexFileName;
    private long bufferFileSize;
    private long totalDNS;
    private AtomicInteger keyCount = new AtomicInteger(0);
    private final String fileName;
    private final boolean isDN;
    private final int limit;
    private long[] bufferIndexBegin = new long[BUFFER_SIZE];
    private long[] bufferIndexEnd   = new long[BUFFER_SIZE];
    private int[]  bufferIndexID    = new int[BUFFER_SIZE];
    private int    bufferIndexCount = 0;
    private int numberOfBuffers = 0;
    private volatile IndexDBWriteTask writer = null;
    private IndexManager(String fileName, boolean isDN, int limit)
    {
      file = new File(tempDir, fileName);
      this.fileName = fileName;
      this.bufferFileName = fileName;
      this.bufferIndexFileName = fileName + ".index";
      this.bufferFile = new File(tempDir, bufferFileName);
      this.bufferIndexFile = new File(tempDir, bufferIndexFileName);
      this.isDN = isDN;
      this.limit = limit;
    }
    private void openIndexFile() throws FileNotFoundException
    private void setIndexDBWriteTask(IndexDBWriteTask writer)
    {
      rFile = new RandomAccessFile(file, "r");
      this.writer = writer;
    }
    /**
     * Returns the file channel associated with this index manager.
     *
     * @return The file channel associated with this index manager.
     */
    FileChannel getChannel()
    private File getBufferFile()
    {
      return rFile.getChannel();
      return bufferFile;
    }
    private void addBuffer(long begin, long end, int id)
    private long getBufferFileSize()
    {
      int size = bufferIndexBegin.length;
      if (bufferIndexCount >= size)
      {
        size += BUFFER_SIZE;
        bufferIndexBegin = Arrays.copyOf(bufferIndexBegin, size);
        bufferIndexEnd = Arrays.copyOf(bufferIndexEnd, size);
        bufferIndexID = Arrays.copyOf(bufferIndexID, size);
      }
      bufferIndexBegin[bufferIndexCount] = begin;
      bufferIndexEnd[bufferIndexCount] = end;
      bufferIndexID[bufferIndexCount] = id;
      bufferIndexCount++;
      return bufferFileSize;
    }
    private File getFile()
    private File getBufferIndexFile()
    {
      return file;
      return bufferIndexFile;
    }
    private boolean deleteIndexFile()
    private void setBufferInfo(int numberOfBuffers, long bufferFileSize)
    {
      return file.delete();
    }
    private void close() throws IOException
    {
      rFile.close();
    }
    private void setFileLength()
    {
      this.fileLength = file.length();
      this.numberOfBuffers = numberOfBuffers;
      this.bufferFileSize = bufferFileSize;
    }
@@ -2863,25 +3010,16 @@
     */
    void addBytesRead(int bytesRead)
    {
      this.bytesRead += bytesRead;
    }
    private void setDone()
    {
      this.done = true;
    }
    private void setStarted()
    {
      started = true;
      if (writer != null)
      {
        writer.addBytesRead(bytesRead);
      }
    }
    private void addTotDNCount(int delta)
    {
      this.totalDNS += delta;
      totalDNS += delta;
    }
@@ -2899,30 +3037,21 @@
    private void printStats(long deltaTime)
    {
      if(!done && started)
      if (writer != null)
      {
        float rate = 1000f * keyCount.getAndSet(0) / deltaTime;
        Message message = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT.get(fileName,
                (fileLength - bytesRead), rate);
        logError(message);
        writer.printStats(deltaTime);
      }
    }
    private void incrementKeyCount()
    {
      keyCount.incrementAndGet();
    }
    /**
     * Returns the file name associated with this index manager.
     *
     * @return The file name associated with this index manager.
     */
    String getFileName()
    String getBufferFileName()
    {
      return fileName;
      return bufferFileName;
    }
@@ -2937,7 +3066,14 @@
     */
    public int compareTo(IndexManager mgr)
    {
      return bufferIndexCount - mgr.bufferIndexCount;
      return numberOfBuffers - mgr.numberOfBuffers;
    }
    private int getNumberOfBuffers()
    {
      return numberOfBuffers;
    }
  }
@@ -4130,19 +4266,21 @@
    private long latestCount;
      /**
    /**
     * Create a new import progress task.
     *
     * @param  latestCount The latest count of entries processed in phase one.
     * @param latestCount
     *          The latest count of entries processed in phase one.
     */
    public SecondPhaseProgressTask (long latestCount)
    public SecondPhaseProgressTask(long latestCount)
    {
      previousTime = System.currentTimeMillis();
      this.latestCount = latestCount;
      try
      {
        previousStats =
                rootContainer.getEnvironmentStats(new StatsConfig());
        previousStats = rootContainer.getEnvironmentStats(new StatsConfig());
      }
      catch (DatabaseException e)
      {