From e4076b2991c9604907f6a2f5ba2d526d0072adf6 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Fri, 30 Jul 2010 12:19:32 +0000
Subject: [PATCH] import-ldif: Change second phase import strategy in order to better handle large LDIF files with low memory. In first phase write buffer positions to index files instead of storing in memory and suffering OOME due to O(N) memory growth. In second phase, read buffer positions from index files and fall-back to batch import of indexes when the number of buffers for an index would cause OOME if they were all opened at once. Also, improve second phase progress statistics to report batch count, kb remaining/rate, and fix several race conditions in the statistics.

---
 opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java |  676 +++++++++++++++++++++++++++++++++----------------------
 1 files changed, 407 insertions(+), 269 deletions(-)

diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
index 48e0d8a..769652f 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -36,7 +36,6 @@
 
 import java.io.*;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1085,7 +1084,7 @@
       final int limit = Math.min(dbThreads, totList.size());
       for (int i = 0; i < limit; i++)
       {
-        buffers += totList.get(i).bufferIndexCount;
+        buffers += totList.get(i).numberOfBuffers;
       }
 
       readAheadSize = (int) (usableMemory / buffers);
@@ -1107,15 +1106,20 @@
       }
       else
       {
-        // Not enough memory.
-        final long minimumPhaseTwoBufferMemory = buffers
-            * MIN_READ_AHEAD_CACHE_SIZE;
-        Message message = ERR_IMPORT_LDIF_LACK_MEM.get(usableMemory,
-            minimumPhaseTwoBufferMemory + dbCacheSize);
-        throw new InitializationException(message);
+        // Not enough memory - will need to do batching for the biggest indexes.
+        readAheadSize = MIN_READ_AHEAD_CACHE_SIZE;
+        buffers = (int) (usableMemory / readAheadSize);
+
+        Message message = WARN_IMPORT_LDIF_LACK_MEM_PHASE_TWO.get(usableMemory);
+        logError(message);
+        break;
       }
     }
 
+    // Ensure that there are always two threads available for parallel
+    // processing of smaller indexes.
+    dbThreads = Math.max(2, dbThreads);
+
     Message message = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_MEM_REPORT.get(
         availableMemory, readAheadSize, buffers);
     logError(message);
@@ -1123,15 +1127,18 @@
     // Start indexing tasks.
     List<Future<Void>> futures = new LinkedList<Future<Void>>();
     ExecutorService dbService = Executors.newFixedThreadPool(dbThreads);
+    Semaphore permits = new Semaphore(buffers);
 
     // Start DN processing first.
     for (IndexManager dnMgr : DNIndexMgrList)
     {
-      futures.add(dbService.submit(new IndexDBWriteTask(dnMgr, readAheadSize)));
+      futures.add(dbService.submit(new IndexDBWriteTask(dnMgr, permits,
+          buffers, readAheadSize)));
     }
     for (IndexManager mgr : indexMgrList)
     {
-      futures.add(dbService.submit(new IndexDBWriteTask(mgr, readAheadSize)));
+      futures.add(dbService.submit(new IndexDBWriteTask(mgr, permits, buffers,
+          readAheadSize)));
     }
 
     for (Future<Void> result : futures)
@@ -1814,54 +1821,241 @@
     private final DatabaseEntry dbKey, dbValue;
     private final int cacheSize;
     private final Map<Integer, DNState> dnStateMap =
-                                               new HashMap<Integer, DNState>();
+      new HashMap<Integer, DNState>();
     private final Map<Integer, Index> indexMap = new HashMap<Integer, Index>();
+    private final Semaphore permits;
+    private final int maxPermits;
+    private final AtomicLong bytesRead = new AtomicLong();
+    private long lastBytesRead = 0;
+    private final AtomicInteger keyCount = new AtomicInteger();
+    private RandomAccessFile bufferFile = null;
+    private DataInputStream bufferIndexFile = null;
+    private int remainingBuffers;
+    private volatile int totalBatches;
+    private AtomicInteger batchNumber = new AtomicInteger();
+    private int nextBufferID;
+    private int ownedPermits;
+    private volatile boolean isRunning = false;
 
 
-    public IndexDBWriteTask(IndexManager indexMgr, int cacheSize)
+    /**
+     * Creates a new index DB writer.
+     *
+     * @param indexMgr
+     *          The index manager.
+     * @param permits
+     *          The semaphore used for restricting the number of buffer
+     *          allocations.
+     * @param maxPermits
+     *          The maximum number of buffers which can be allocated.
+     * @param cacheSize
+     *          The buffer cache size.
+     */
+    public IndexDBWriteTask(IndexManager indexMgr, Semaphore permits,
+        int maxPermits, int cacheSize)
     {
       this.indexMgr = indexMgr;
+      this.permits = permits;
+      this.maxPermits = maxPermits;
+      this.cacheSize = cacheSize;
+
       this.dbKey = new DatabaseEntry();
       this.dbValue = new DatabaseEntry();
-      this.cacheSize = cacheSize;
     }
 
 
-    private NavigableSet<IndexInputBuffer> initializeBuffers()
-      throws IOException
+
+    /**
+     * Initializes this task.
+     *
+     * @throws IOException
+     *           If an IO error occurred.
+     */
+    public void beginWriteTask() throws IOException
     {
-      NavigableSet<IndexInputBuffer> bufferSet =
-        new TreeSet<IndexInputBuffer>();
-      for (int i = 0; i < indexMgr.bufferIndexCount; i++)
+      bufferFile = new RandomAccessFile(indexMgr.getBufferFile(), "r");
+      bufferIndexFile = new DataInputStream(new BufferedInputStream(
+          new FileInputStream(indexMgr.getBufferIndexFile())));
+
+      remainingBuffers = indexMgr.getNumberOfBuffers();
+      totalBatches = (remainingBuffers / maxPermits) + 1;
+      batchNumber.set(0);
+      nextBufferID = 0;
+      ownedPermits = 0;
+
+      Message message = NOTE_JEB_IMPORT_LDIF_INDEX_STARTED.get(
+          indexMgr.getBufferFileName(), remainingBuffers, totalBatches);
+      logError(message);
+
+      indexMgr.setIndexDBWriteTask(this);
+      isRunning = true;
+    }
+
+
+
+    /**
+     * Returns the next batch of buffers to be processed, blocking until enough
+     * buffer permits are available.
+     *
+     * @return The next batch of buffers, or {@code null} if there are no more
+     *         buffers to be processed.
+     * @throws Exception
+     *           If an exception occurred.
+     */
+    public NavigableSet<IndexInputBuffer> getNextBufferBatch() throws Exception
+    {
+      // First release any previously acquired permits.
+      if (ownedPermits > 0)
       {
-        IndexInputBuffer b = new IndexInputBuffer(indexMgr,
-            indexMgr.bufferIndexBegin[i], indexMgr.bufferIndexEnd[i],
-            indexMgr.bufferIndexID[i]);
-        b.initializeCache(cacheSize);
-        bufferSet.add(b);
+        permits.release(ownedPermits);
+        ownedPermits = 0;
       }
 
-      // GC arrays.
-      indexMgr.bufferIndexBegin = null;
-      indexMgr.bufferIndexEnd = null;
-      indexMgr.bufferIndexID = null;
+      // Block until we can either get enough permits for all buffers, or the
+      // maximum number of permits.
+      final int permitRequest = Math.min(remainingBuffers, maxPermits);
+      if (permitRequest == 0)
+      {
+        // No more work to do.
+        return null;
+      }
+      permits.acquire(permitRequest);
 
-      return bufferSet;
+      // Update counters.
+      ownedPermits = permitRequest;
+      remainingBuffers -= permitRequest;
+      batchNumber.incrementAndGet();
+
+      // Create all the index buffers for the next batch.
+      final NavigableSet<IndexInputBuffer> buffers =
+        new TreeSet<IndexInputBuffer>();
+      for (int i = 0; i < permitRequest; i++)
+      {
+        final long bufferBegin = bufferIndexFile.readLong();
+        final long bufferEnd = bufferIndexFile.readLong();
+        final IndexInputBuffer b = new IndexInputBuffer(indexMgr,
+            bufferFile.getChannel(), bufferBegin, bufferEnd, nextBufferID++,
+            cacheSize);
+        buffers.add(b);
+      }
+
+      return buffers;
     }
 
 
+
+    /**
+     * Finishes this task.
+     *
+     * @throws Exception
+     *           If an exception occurred.
+     */
+    public void endWriteTask() throws Exception
+    {
+      isRunning = false;
+
+      // First release any previously acquired permits.
+      if (ownedPermits > 0)
+      {
+        permits.release(ownedPermits);
+        ownedPermits = 0;
+      }
+
+      try
+      {
+        if (indexMgr.isDN2ID())
+        {
+          for (DNState dnState : dnStateMap.values())
+          {
+            dnState.flush();
+          }
+          Message msg = NOTE_JEB_IMPORT_LDIF_DN_CLOSE
+              .get(indexMgr.getDNCount());
+          logError(msg);
+        }
+        else
+        {
+          for (Index index : indexMap.values())
+          {
+            index.closeCursor();
+          }
+          Message message = NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE.get(indexMgr
+              .getBufferFileName());
+          logError(message);
+        }
+      }
+      finally
+      {
+        if (bufferFile != null)
+        {
+          try
+          {
+            bufferFile.close();
+          }
+          catch (IOException ignored)
+          {
+            // Ignore.
+          }
+        }
+
+        if (bufferIndexFile != null)
+        {
+          try
+          {
+            bufferIndexFile.close();
+          }
+          catch (IOException ignored)
+          {
+            // Ignore.
+          }
+        }
+
+        indexMgr.getBufferFile().delete();
+        indexMgr.getBufferIndexFile().delete();
+      }
+    }
+
+
+
+    /**
+     * Print out progress stats.
+     *
+     * @param deltaTime
+     *          The time since the last update.
+     */
+    public void printStats(long deltaTime)
+    {
+      if (isRunning)
+      {
+        final long bufferFileSize = indexMgr.getBufferFileSize();
+        final long tmpBytesRead = bytesRead.get();
+        final int currentBatch = batchNumber.get();
+
+        final long bytesReadInterval = tmpBytesRead - lastBytesRead;
+        final int bytesReadPercent = Math.round((100f * tmpBytesRead)
+            / bufferFileSize);
+
+        // Kilo and milli approximately cancel out.
+        final long kiloBytesRate = bytesReadInterval / deltaTime;
+        final long kiloBytesRemaining = (bufferFileSize - tmpBytesRead) / 1024;
+
+        Message message = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT.get(
+            indexMgr.getBufferFileName(), bytesReadPercent, kiloBytesRemaining,
+            kiloBytesRate, currentBatch, totalBatches);
+        logError(message);
+
+        lastBytesRead = tmpBytesRead;
+      }
+    }
+
+
+
     /**
      * {@inheritDoc}
      */
     public Void call() throws Exception
     {
-      Thread.setDefaultUncaughtExceptionHandler(
-             new DefaultExceptionHandler());
-      indexMgr.setStarted();
-      Message message =
-              NOTE_JEB_IMPORT_LDIF_INDEX_STARTED.get(indexMgr.getFileName(),
-                         indexMgr.bufferIndexCount);
-      logError(message);
+      Thread.setDefaultUncaughtExceptionHandler(new DefaultExceptionHandler());
 
       ByteBuffer key = null;
       ImportIDSet insertIDSet = null;
@@ -1870,142 +2064,110 @@
 
       try
       {
-        indexMgr.openIndexFile();
-        NavigableSet<IndexInputBuffer> bufferSet = initializeBuffers();
-        while (!bufferSet.isEmpty())
-        {
-          IndexInputBuffer b = bufferSet.pollFirst();
-          if (key == null)
-          {
-            indexID = b.getIndexID();
+        beginWriteTask();
 
-            if (indexMgr.isDN2ID())
+        NavigableSet<IndexInputBuffer> bufferSet;
+        while ((bufferSet = getNextBufferBatch()) != null)
+        {
+          while (!bufferSet.isEmpty())
+          {
+            IndexInputBuffer b = bufferSet.pollFirst();
+            if (key == null)
             {
-              insertIDSet = new ImportIDSet(1, 1, false);
-              deleteIDSet = new ImportIDSet(1, 1, false);
+              indexID = b.getIndexID();
+
+              if (indexMgr.isDN2ID())
+              {
+                insertIDSet = new ImportIDSet(1, 1, false);
+                deleteIDSet = new ImportIDSet(1, 1, false);
+              }
+              else
+              {
+                Index index = (Index) idContainerMap.get(indexID);
+                int limit = index.getIndexEntryLimit();
+                boolean doCount = index.getMaintainCount();
+                insertIDSet = new ImportIDSet(1, limit, doCount);
+                deleteIDSet = new ImportIDSet(1, limit, doCount);
+              }
+
+              key = ByteBuffer.allocate(b.getKeyLen());
+              key.flip();
+              b.getKey(key);
+
+              b.mergeIDSet(insertIDSet);
+              b.mergeIDSet(deleteIDSet);
+              insertIDSet.setKey(key);
+              deleteIDSet.setKey(key);
+            }
+            else if (b.compare(key, indexID) != 0)
+            {
+              addToDB(insertIDSet, deleteIDSet, indexID);
+              keyCount.incrementAndGet();
+
+              indexID = b.getIndexID();
+
+              if (indexMgr.isDN2ID())
+              {
+                insertIDSet = new ImportIDSet(1, 1, false);
+                deleteIDSet = new ImportIDSet(1, 1, false);
+              }
+              else
+              {
+                Index index = (Index) idContainerMap.get(indexID);
+                int limit = index.getIndexEntryLimit();
+                boolean doCount = index.getMaintainCount();
+                insertIDSet = new ImportIDSet(1, limit, doCount);
+                deleteIDSet = new ImportIDSet(1, limit, doCount);
+              }
+
+              key.clear();
+              if (b.getKeyLen() > key.capacity())
+              {
+                key = ByteBuffer.allocate(b.getKeyLen());
+              }
+              key.flip();
+              b.getKey(key);
+
+              b.mergeIDSet(insertIDSet);
+              b.mergeIDSet(deleteIDSet);
+              insertIDSet.setKey(key);
+              deleteIDSet.setKey(key);
             }
             else
             {
-              Index index = (Index) idContainerMap.get(indexID);
-              int limit = index.getIndexEntryLimit();
-              boolean doCount = index.getMaintainCount();
-              insertIDSet = new ImportIDSet(1, limit, doCount);
-              deleteIDSet = new ImportIDSet(1, limit, doCount);
+              b.mergeIDSet(insertIDSet);
+              b.mergeIDSet(deleteIDSet);
             }
 
-            key = ByteBuffer.allocate(b.getKeyLen());
-            key.flip();
-            b.getKey(key);
-
-            b.mergeIDSet(insertIDSet);
-            b.mergeIDSet(deleteIDSet);
-            insertIDSet.setKey(key);
-            deleteIDSet.setKey(key);
+            if (b.hasMoreData())
+            {
+              b.getNextRecord();
+              bufferSet.add(b);
+            }
           }
-          else if (b.compare(key, indexID) != 0)
+
+          if (key != null)
           {
             addToDB(insertIDSet, deleteIDSet, indexID);
-            indexMgr.incrementKeyCount();
-
-            indexID = b.getIndexID();
-
-            if (indexMgr.isDN2ID())
-            {
-              insertIDSet = new ImportIDSet(1, 1, false);
-              deleteIDSet = new ImportIDSet(1, 1, false);
-            }
-            else
-            {
-              Index index = (Index) idContainerMap.get(indexID);
-              int limit = index.getIndexEntryLimit();
-              boolean doCount = index.getMaintainCount();
-              insertIDSet = new ImportIDSet(1, limit, doCount);
-              deleteIDSet = new ImportIDSet(1, limit, doCount);
-            }
-
-            key.clear();
-            if (b.getKeyLen() > key.capacity())
-            {
-              key = ByteBuffer.allocate(b.getKeyLen());
-            }
-            key.flip();
-            b.getKey(key);
-
-            b.mergeIDSet(insertIDSet);
-            b.mergeIDSet(deleteIDSet);
-            insertIDSet.setKey(key);
-            deleteIDSet.setKey(key);
           }
-          else
-          {
-            b.mergeIDSet(insertIDSet);
-            b.mergeIDSet(deleteIDSet);
-          }
-
-          if(b.hasMoreData())
-          {
-            b.getNextRecord();
-            bufferSet.add(b);
-          }
-        }
-
-        if(key != null)
-        {
-          addToDB(insertIDSet, deleteIDSet, indexID);
         }
       }
       catch (Exception e)
       {
-        message =
-              ERR_JEB_IMPORT_LDIF_INDEX_WRITE_DB_ERR.get(indexMgr.getFileName(),
-                                                         e.getMessage());
+        Message message = ERR_JEB_IMPORT_LDIF_INDEX_WRITE_DB_ERR.get(
+            indexMgr.getBufferFileName(), e.getMessage());
         logError(message);
         e.printStackTrace();
         throw e;
       }
       finally
       {
-        cleanUP();
+        endWriteTask();
       }
       return null;
     }
 
 
-    private void cleanUP() throws DatabaseException, DirectoryException,
-      IOException
-    {
-      try
-      {
-        if(indexMgr.isDN2ID())
-        {
-          for(DNState dnState : dnStateMap.values())
-          {
-            dnState.flush();
-          }
-          Message msg =
-            NOTE_JEB_IMPORT_LDIF_DN_CLOSE.get(indexMgr.getDNCount());
-          logError(msg);
-        }
-        else
-        {
-          for(Index index : indexMap.values())
-          {
-            index.closeCursor();
-          }
-          Message message =
-            NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE.get(indexMgr.getFileName());
-          logError(message);
-        }
-      }
-      finally
-      {
-        indexMgr.setDone();
-        indexMgr.close();
-        indexMgr.deleteIndexFile();
-      }
-    }
-
-
     private void addToDB(ImportIDSet insertSet, ImportIDSet deleteSet,
                          int indexID) throws InterruptedException,
             DatabaseException, DirectoryException
@@ -2064,7 +2226,15 @@
     }
 
 
-      /**
+
+    private void addBytesRead(int bytesRead)
+    {
+      this.bytesRead.addAndGet(bytesRead);
+    }
+
+
+
+    /**
      * This class is used to by a index DB merge thread performing DN processing
      * to keep track of the state of individual DN2ID index processing.
      */
@@ -2371,15 +2541,15 @@
     private final int DRAIN_TO = 3;
     private final IndexManager indexMgr;
     private final BlockingQueue<IndexOutputBuffer> queue;
-    private final ByteArrayOutputStream insetByteStream =
+    private final ByteArrayOutputStream insertByteStream =
             new ByteArrayOutputStream(2 * bufferSize);
     private final ByteArrayOutputStream deleteByteStream =
             new ByteArrayOutputStream(2 * bufferSize);
+    private final DataOutputStream bufferStream;
+    private final DataOutputStream bufferIndexStream;
     private final byte[] tmpArray = new byte[8];
     private int insertKeyCount = 0, deleteKeyCount = 0;
-    private final DataOutputStream dataStream;
     private int bufferCount = 0;
-    private final File file;
     private final SortedSet<IndexOutputBuffer> indexSortedSet;
     private boolean poisonSeen = false;
 
@@ -2388,13 +2558,14 @@
                              IndexManager indexMgr) throws FileNotFoundException
     {
       this.queue = queue;
-      file = indexMgr.getFile();
       this.indexMgr = indexMgr;
-      BufferedOutputStream bufferedStream =
-              new BufferedOutputStream(new FileOutputStream(file),
-                                       READER_WRITER_BUFFER_SIZE);
-      dataStream = new DataOutputStream(bufferedStream);
-      indexSortedSet = new TreeSet<IndexOutputBuffer>();
+      this.bufferStream = new DataOutputStream(new BufferedOutputStream(
+          new FileOutputStream(indexMgr.getBufferFile()),
+          READER_WRITER_BUFFER_SIZE));
+      this.bufferIndexStream = new DataOutputStream(new BufferedOutputStream(
+          new FileOutputStream(indexMgr.getBufferIndexFile()),
+          READER_WRITER_BUFFER_SIZE));
+      this.indexSortedSet = new TreeSet<IndexOutputBuffer>();
     }
 
 
@@ -2442,9 +2613,14 @@
               }
             }
             offset += bufferLen;
-            indexMgr.addBuffer(beginOffset, offset, bufferCount);
+
+            // Write buffer index information.
+            bufferIndexStream.writeLong(beginOffset);
+            bufferIndexStream.writeLong(offset);
+
             bufferCount++;
             Importer.this.bufferCount.incrementAndGet();
+
             if(poisonSeen)
             {
               break;
@@ -2454,17 +2630,17 @@
       }
       catch (IOException e)
       {
-        Message message =
-                ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR.get(
-                    file.getAbsolutePath(), e.getMessage());
+        Message message = ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR.get(indexMgr
+            .getBufferFile().getAbsolutePath(), e.getMessage());
         logError(message);
         isPhaseOneCanceled = true;
         throw e;
       }
       finally
       {
-        dataStream.close();
-        indexMgr.setFileLength();
+        bufferStream.close();
+        bufferIndexStream.close();
+        indexMgr.setBufferInfo(bufferCount, indexMgr.getBufferFile().length());
       }
       return null;
     }
@@ -2476,7 +2652,7 @@
       int numberKeys = indexBuffer.getNumberKeys();
       indexBuffer.setPosition(-1);
       long bufferLen = 0;
-      insetByteStream.reset(); insertKeyCount = 0;
+      insertByteStream.reset(); insertKeyCount = 0;
       deleteByteStream.reset(); deleteKeyCount = 0;
       for(int i = 0; i < numberKeys; i++)
       {
@@ -2485,7 +2661,7 @@
           indexBuffer.setPosition(i);
           if(indexBuffer.isInsert(i))
           {
-            indexBuffer.writeID(insetByteStream, i);
+            indexBuffer.writeID(insertByteStream, i);
             insertKeyCount++;
           }
           else
@@ -2499,14 +2675,14 @@
         {
           bufferLen += writeRecord(indexBuffer);
           indexBuffer.setPosition(i);
-          insetByteStream.reset();insertKeyCount = 0;
+          insertByteStream.reset();insertKeyCount = 0;
           deleteByteStream.reset();deleteKeyCount = 0;
         }
         if(indexBuffer.isInsert(i))
         {
           if(insertKeyCount++ <= indexMgr.getLimit())
           {
-            indexBuffer.writeID(insetByteStream, i);
+            indexBuffer.writeID(insertByteStream, i);
           }
         }
         else
@@ -2528,7 +2704,7 @@
     {
       long id = 0;
       long bufferLen = 0;
-      insetByteStream.reset(); insertKeyCount = 0;
+      insertByteStream.reset(); insertKeyCount = 0;
       deleteByteStream.reset(); deleteKeyCount = 0;
       for(IndexOutputBuffer b : buffers)
       {
@@ -2555,7 +2731,7 @@
           saveIndexID = b.getIndexID();
           if(b.isInsert(b.getPosition()))
           {
-            b.writeID(insetByteStream, b.getPosition());
+            b.writeID(insertByteStream, b.getPosition());
             insertKeyCount++;
           }
           else
@@ -2569,7 +2745,7 @@
           if(!b.compare(saveKey, saveIndexID))
           {
             bufferLen += writeRecord(saveKey, saveIndexID);
-            insetByteStream.reset();
+            insertByteStream.reset();
             deleteByteStream.reset();
             insertKeyCount = 0;
             deleteKeyCount = 0;
@@ -2577,7 +2753,7 @@
             saveIndexID =  b.getIndexID();
             if(b.isInsert(b.getPosition()))
             {
-              b.writeID(insetByteStream, b.getPosition());
+              b.writeID(insertByteStream, b.getPosition());
               insertKeyCount++;
             }
             else
@@ -2592,7 +2768,7 @@
             {
               if(insertKeyCount++ <= indexMgr.getLimit())
               {
-                b.writeID(insetByteStream, b.getPosition());
+                b.writeID(insertByteStream, b.getPosition());
               }
             }
             else
@@ -2621,23 +2797,23 @@
       if(insertKeyCount > indexMgr.getLimit())
       {
         insertKeyCount = 1;
-        insetByteStream.reset();
+        insertByteStream.reset();
         PackedInteger.writeInt(tmpArray, 0, -1);
-        insetByteStream.write(tmpArray, 0, 1);
+        insertByteStream.write(tmpArray, 0, 1);
       }
       int insertSize = PackedInteger.getWriteIntLength(insertKeyCount);
       PackedInteger.writeInt(tmpArray, 0, insertKeyCount);
-      dataStream.write(tmpArray, 0, insertSize);
-      if(insetByteStream.size() > 0)
+      bufferStream.write(tmpArray, 0, insertSize);
+      if(insertByteStream.size() > 0)
       {
-        insetByteStream.writeTo(dataStream);
+        insertByteStream.writeTo(bufferStream);
       }
       int deleteSize = PackedInteger.getWriteIntLength(deleteKeyCount);
       PackedInteger.writeInt(tmpArray, 0, deleteKeyCount);
-      dataStream.write(tmpArray, 0, deleteSize);
+      bufferStream.write(tmpArray, 0, deleteSize);
       if(deleteByteStream.size() > 0)
       {
-        deleteByteStream.writeTo(dataStream);
+        deleteByteStream.writeTo(bufferStream);
       }
       return insertSize + deleteSize;
     }
@@ -2645,10 +2821,10 @@
 
     private int writeHeader(int indexID, int keySize) throws IOException
     {
-      dataStream.writeInt(indexID);
+      bufferStream.writeInt(indexID);
       int packedSize = PackedInteger.getWriteIntLength(keySize);
       PackedInteger.writeInt(tmpArray, 0, keySize);
-      dataStream.write(tmpArray, 0, packedSize);
+      bufferStream.write(tmpArray, 0, packedSize);
       return packedSize;
     }
 
@@ -2657,9 +2833,9 @@
     {
       int keySize = b.getKeySize();
       int packedSize = writeHeader(b.getIndexID(), keySize);
-      b.writeKey(dataStream);
+      b.writeKey(bufferStream);
       packedSize += writeByteStreams();
-      return (packedSize + keySize + insetByteStream.size() +
+      return (packedSize + keySize + insertByteStream.size() +
               deleteByteStream.size() + 4);
     }
 
@@ -2667,9 +2843,9 @@
     private int writeRecord(byte[] k, int indexID) throws IOException
     {
       int packedSize = writeHeader(indexID, k.length);
-      dataStream.write(k);
+      bufferStream.write(k);
       packedSize += writeByteStreams();
-      return (packedSize + k.length + insetByteStream.size() +
+      return (packedSize + k.length + insertByteStream.size() +
               deleteByteStream.size() + 4);
     }
   }
@@ -2768,89 +2944,60 @@
    */
   final class IndexManager implements Comparable<IndexManager>
   {
-    private static final int BUFFER_SIZE = 128;
-
-    private final File file;
-    private RandomAccessFile rFile = null;
-    private long fileLength, bytesRead = 0;
-    private boolean done = false, started = false;
+    private final File bufferFile;
+    private final String bufferFileName;
+    private final File bufferIndexFile;
+    private final String bufferIndexFileName;
+    private long bufferFileSize;
     private long totalDNS;
-    private AtomicInteger keyCount = new AtomicInteger(0);
-    private final String fileName;
     private final boolean isDN;
     private final int limit;
-
-    private long[] bufferIndexBegin = new long[BUFFER_SIZE];
-    private long[] bufferIndexEnd   = new long[BUFFER_SIZE];
-    private int[]  bufferIndexID    = new int[BUFFER_SIZE];
-    private int    bufferIndexCount = 0;
+    private int numberOfBuffers = 0;
+    private volatile IndexDBWriteTask writer = null;
 
     private IndexManager(String fileName, boolean isDN, int limit)
     {
-      file = new File(tempDir, fileName);
-      this.fileName = fileName;
+      this.bufferFileName = fileName;
+      this.bufferIndexFileName = fileName + ".index";
+
+      this.bufferFile = new File(tempDir, bufferFileName);
+      this.bufferIndexFile = new File(tempDir, bufferIndexFileName);
+
       this.isDN = isDN;
       this.limit = limit;
     }
 
 
-    private void openIndexFile() throws FileNotFoundException
+    private void setIndexDBWriteTask(IndexDBWriteTask writer)
     {
-      rFile = new RandomAccessFile(file, "r");
+      this.writer = writer;
     }
 
 
-    /**
-     * Returns the file channel associated with this index manager.
-     *
-     * @return The file channel associated with this index manager.
-     */
-    FileChannel getChannel()
+    private File getBufferFile()
     {
-      return rFile.getChannel();
+      return bufferFile;
     }
 
 
 
-    private void addBuffer(long begin, long end, int id)
+    private long getBufferFileSize()
     {
-      int size = bufferIndexBegin.length;
-      if (bufferIndexCount >= size)
-      {
-        size += BUFFER_SIZE;
-        bufferIndexBegin = Arrays.copyOf(bufferIndexBegin, size);
-        bufferIndexEnd = Arrays.copyOf(bufferIndexEnd, size);
-        bufferIndexID = Arrays.copyOf(bufferIndexID, size);
-      }
-      bufferIndexBegin[bufferIndexCount] = begin;
-      bufferIndexEnd[bufferIndexCount] = end;
-      bufferIndexID[bufferIndexCount] = id;
-      bufferIndexCount++;
+      return bufferFileSize;
     }
 
 
 
-    private File getFile()
+    private File getBufferIndexFile()
     {
-      return file;
+      return bufferIndexFile;
     }
 
 
-    private boolean deleteIndexFile()
+    private void setBufferInfo(int numberOfBuffers, long bufferFileSize)
     {
-      return file.delete();
-    }
-
-
-    private void close() throws IOException
-    {
-      rFile.close();
-    }
-
-
-    private void setFileLength()
-    {
-      this.fileLength = file.length();
+      this.numberOfBuffers = numberOfBuffers;
+      this.bufferFileSize = bufferFileSize;
     }
 
 
@@ -2863,25 +3010,16 @@
      */
     void addBytesRead(int bytesRead)
     {
-      this.bytesRead += bytesRead;
-    }
-
-
-    private void setDone()
-    {
-      this.done = true;
-    }
-
-
-    private void setStarted()
-    {
-      started = true;
+      if (writer != null)
+      {
+        writer.addBytesRead(bytesRead);
+      }
     }
 
 
     private void addTotDNCount(int delta)
     {
-      this.totalDNS += delta;
+      totalDNS += delta;
     }
 
 
@@ -2899,30 +3037,21 @@
 
     private void printStats(long deltaTime)
     {
-      if(!done && started)
+      if (writer != null)
       {
-        float rate = 1000f * keyCount.getAndSet(0) / deltaTime;
-        Message message = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT.get(fileName,
-                (fileLength - bytesRead), rate);
-        logError(message);
+        writer.printStats(deltaTime);
       }
     }
 
 
-    private void incrementKeyCount()
-    {
-      keyCount.incrementAndGet();
-    }
-
-
     /**
      * Returns the file name associated with this index manager.
      *
      * @return The file name associated with this index manager.
      */
-    String getFileName()
+    String getBufferFileName()
     {
-      return fileName;
+      return bufferFileName;
     }
 
 
@@ -2937,7 +3066,14 @@
      */
     public int compareTo(IndexManager mgr)
     {
-      return bufferIndexCount - mgr.bufferIndexCount;
+      return numberOfBuffers - mgr.numberOfBuffers;
+    }
+
+
+
+    private int getNumberOfBuffers()
+    {
+      return numberOfBuffers;
     }
   }
 
@@ -4130,19 +4266,21 @@
 
     private long latestCount;
 
-      /**
+
+
+    /**
      * Create a new import progress task.
      *
-     * @param  latestCount The latest count of entries processed in phase one.
+     * @param latestCount
+     *          The latest count of entries processed in phase one.
      */
-    public SecondPhaseProgressTask (long latestCount)
+    public SecondPhaseProgressTask(long latestCount)
     {
       previousTime = System.currentTimeMillis();
       this.latestCount = latestCount;
       try
       {
-        previousStats =
-                rootContainer.getEnvironmentStats(new StatsConfig());
+        previousStats = rootContainer.getEnvironmentStats(new StatsConfig());
       }
       catch (DatabaseException e)
       {

--
Gitblit v1.10.0