From 7bdd54b23c2ba2f1facb885f312f6d55ce9549e6 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 19 Jan 2015 13:32:42 +0000
Subject: [PATCH] Code cleanup in jeb importer: - extracted methods - renamed methods - cleaned up comments/javadocs

---
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java          |  578 ++++++++++++++++++-------------------
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java |  281 ++++++++----------
 2 files changed, 411 insertions(+), 448 deletions(-)

diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
index 6610e83..d569381 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -174,7 +174,7 @@
    * Map of index keys to index buffers. Used to allocate sorted index buffers
    * to a index writer thread.
    */
-  private final Map<IndexKey, BlockingQueue<IndexOutputBuffer>> indexKeyQueMap =
+  private final Map<IndexKey, BlockingQueue<IndexOutputBuffer>> indexKeyQueueMap =
       new ConcurrentHashMap<IndexKey, BlockingQueue<IndexOutputBuffer>>();
 
   /** Map of DB containers to index managers. Used to start phase 2. */
@@ -1009,16 +1009,18 @@
   private void phaseOne() throws InterruptedException, ExecutionException
   {
     initializeIndexBuffers();
-    FirstPhaseProgressTask progressTask = new FirstPhaseProgressTask();
-    ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
-    timerService.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL, TimeUnit.MILLISECONDS);
+
+    final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
+    scheduleAtFixedRate(timerService, new FirstPhaseProgressTask());
     scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
     bufferSortService = Executors.newFixedThreadPool(threadCount);
-    ExecutorService execService = Executors.newFixedThreadPool(threadCount);
-    List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
+    final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
+
+    final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
     tasks.add(new MigrateExistingTask());
     getAll(execService.invokeAll(tasks));
     tasks.clear();
+
     if (importConfiguration.appendToExistingData()
         && importConfiguration.replaceExistingEntries())
     {
@@ -1036,49 +1038,60 @@
     }
     getAll(execService.invokeAll(tasks));
     tasks.clear();
+
     tasks.add(new MigrateExcludedTask());
     getAll(execService.invokeAll(tasks));
+
     stopScratchFileWriters();
     getAll(scratchFileWriterFutures);
 
-    // Shutdown the executor services
-    timerService.shutdown();
-    timerService.awaitTermination(30, TimeUnit.SECONDS);
-    execService.shutdown();
-    execService.awaitTermination(30, TimeUnit.SECONDS);
-    bufferSortService.shutdown();
-    bufferSortService.awaitTermination(30, TimeUnit.SECONDS);
-    scratchFileWriterService.shutdown();
-    scratchFileWriterService.awaitTermination(30, TimeUnit.SECONDS);
+    shutdownAll(timerService, execService, bufferSortService, scratchFileWriterService);
 
     // Try to clear as much memory as possible.
-    scratchFileWriterList.clear();
-    scratchFileWriterFutures.clear();
-    indexKeyQueMap.clear();
-    freeBufferQueue.clear();
+    clearAll(scratchFileWriterList, scratchFileWriterFutures, freeBufferQueue);
+    indexKeyQueueMap.clear();
+  }
+
+  private void scheduleAtFixedRate(ScheduledThreadPoolExecutor timerService, Runnable task)
+  {
+    timerService.scheduleAtFixedRate(task, TIMER_INTERVAL, TIMER_INTERVAL, TimeUnit.MILLISECONDS);
+  }
+
+  private void shutdownAll(ExecutorService... executorServices) throws InterruptedException
+  {
+    for (ExecutorService executorService : executorServices)
+    {
+      executorService.shutdown();
+    }
+    for (ExecutorService executorService : executorServices)
+    {
+      executorService.awaitTermination(30, TimeUnit.SECONDS);
+    }
+  }
+
+  private void clearAll(Collection<?>... cols)
+  {
+    for (Collection<?> col : cols)
+    {
+      col.clear();
+    }
   }
 
   private void phaseTwo() throws InterruptedException, ExecutionException
   {
-    SecondPhaseProgressTask progress2Task =
-        new SecondPhaseProgressTask(reader.getEntriesRead());
-    ScheduledThreadPoolExecutor timerService =
-        new ScheduledThreadPoolExecutor(1);
-    timerService.scheduleAtFixedRate(progress2Task, TIMER_INTERVAL,
-        TIMER_INTERVAL, TimeUnit.MILLISECONDS);
+    ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
+    scheduleAtFixedRate(timerService, new SecondPhaseProgressTask(reader.getEntriesRead()));
     try
     {
       processIndexFiles();
     }
     finally
     {
-      timerService.shutdown();
-      timerService.awaitTermination(30, TimeUnit.SECONDS);
+      shutdownAll(timerService);
     }
   }
 
-  private void processIndexFiles() throws InterruptedException,
-      ExecutionException
+  private void processIndexFiles() throws InterruptedException, ExecutionException
   {
     if (bufferCount.get() == 0)
     {
@@ -1096,15 +1109,15 @@
     int buffers;
     while (true)
     {
-      final List<IndexManager> totList = new ArrayList<IndexManager>(DNIndexMgrList);
-      totList.addAll(indexMgrList);
-      Collections.sort(totList, Collections.reverseOrder());
+      final List<IndexManager> allIndexMgrs = new ArrayList<IndexManager>(DNIndexMgrList);
+      allIndexMgrs.addAll(indexMgrList);
+      Collections.sort(allIndexMgrs, Collections.reverseOrder());
 
       buffers = 0;
-      final int limit = Math.min(dbThreads, totList.size());
+      final int limit = Math.min(dbThreads, allIndexMgrs.size());
       for (int i = 0; i < limit; i++)
       {
-        buffers += totList.get(i).numberOfBuffers;
+        buffers += allIndexMgrs.get(i).numberOfBuffers;
       }
 
       readAheadSize = (int) (usableMemory / buffers);
@@ -1135,7 +1148,7 @@
       }
     }
 
-    // Ensure that there are always two threads available for parallel
+    // Ensure that there are minimum two threads available for parallel
     // processing of smaller indexes.
     dbThreads = Math.max(2, dbThreads);
 
@@ -1147,17 +1160,19 @@
     Semaphore permits = new Semaphore(buffers);
 
     // Start DN processing first.
-    for (IndexManager dnMgr : DNIndexMgrList)
-    {
-      futures.add(dbService.submit(new IndexDBWriteTask(dnMgr, permits, buffers, readAheadSize)));
-    }
-    for (IndexManager mgr : indexMgrList)
-    {
-      futures.add(dbService.submit(new IndexDBWriteTask(mgr, permits, buffers, readAheadSize)));
-    }
+    submitIndexDBWriteTasks(DNIndexMgrList, dbService, permits, buffers, readAheadSize, futures);
+    submitIndexDBWriteTasks(indexMgrList, dbService, permits, buffers, readAheadSize, futures);
     getAll(futures);
+    shutdownAll(dbService);
+  }
 
-    dbService.shutdown();
+  private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, ExecutorService dbService, Semaphore permits,
+      int buffers, int readAheadSize, List<Future<Void>> futures)
+  {
+    for (IndexManager indexMgr : indexMgrs)
+    {
+      futures.add(dbService.submit(new IndexDBWriteTask(indexMgr, permits, buffers, readAheadSize)));
+    }
   }
 
   private <T> void getAll(List<Future<T>> futures) throws InterruptedException, ExecutionException
@@ -1170,16 +1185,14 @@
 
   private void stopScratchFileWriters()
   {
-    IndexOutputBuffer indexBuffer = new IndexOutputBuffer(0);
+    final IndexOutputBuffer stopProcessing = IndexOutputBuffer.poison();
     for (ScratchFileWriterTask task : scratchFileWriterList)
     {
-      task.queue.add(indexBuffer);
+      task.queue.add(stopProcessing);
     }
   }
 
-  /**
-   * Task used to migrate excluded branch.
-   */
+  /** Task used to migrate excluded branch. */
   private final class MigrateExcludedTask extends ImportTask
   {
 
@@ -1244,9 +1257,7 @@
     }
   }
 
-  /**
-   * Task to migrate existing entries.
-   */
+  /** Task to migrate existing entries. */
   private final class MigrateExistingTask extends ImportTask
   {
 
@@ -1363,8 +1374,7 @@
         {
           if (importConfiguration.isCancelled() || isCanceled)
           {
-            IndexOutputBuffer indexBuffer = new IndexOutputBuffer(0);
-            freeBufferQueue.add(indexBuffer);
+            freeBufferQueue.add(IndexOutputBuffer.poison());
             return null;
           }
           oldEntry = null;
@@ -1480,8 +1490,7 @@
         {
           if (importConfiguration.isCancelled() || isCanceled)
           {
-            IndexOutputBuffer indexBuffer = new IndexOutputBuffer(0);
-            freeBufferQueue.add(indexBuffer);
+            freeBufferQueue.add(IndexOutputBuffer.poison());
             return null;
           }
           Entry entry = reader.readEntry(dnSuffixMap, entryInfo);
@@ -1639,7 +1648,7 @@
         setIterator.remove();
         indexBuffer.setComparator(indexComparator);
         indexBuffer.setIndexKey(indexKey);
-        indexBuffer.setDiscard();
+        indexBuffer.discard();
         Future<Void> future = bufferSortService.submit(new SortTask(indexBuffer));
         future.get();
       }
@@ -1677,7 +1686,7 @@
       if (size > bufferSize)
       {
         indexBuffer = new IndexOutputBuffer(size);
-        indexBuffer.setDiscard();
+        indexBuffer.discard();
       }
       else
       {
@@ -2378,16 +2387,14 @@
     private final int DRAIN_TO = 3;
     private final IndexManager indexMgr;
     private final BlockingQueue<IndexOutputBuffer> queue;
-    private final ByteArrayOutputStream insertByteStream =
-        new ByteArrayOutputStream(2 * bufferSize);
-    private final ByteArrayOutputStream deleteByteStream =
-        new ByteArrayOutputStream(2 * bufferSize);
+    private final ByteArrayOutputStream insertByteStream = new ByteArrayOutputStream(2 * bufferSize);
+    private final ByteArrayOutputStream deleteByteStream = new ByteArrayOutputStream(2 * bufferSize);
     private final DataOutputStream bufferStream;
     private final DataOutputStream bufferIndexStream;
     private final byte[] tmpArray = new byte[8];
+    private final TreeSet<IndexOutputBuffer> indexSortedSet = new TreeSet<IndexOutputBuffer>();
     private int insertKeyCount, deleteKeyCount;
     private int bufferCount;
-    private final SortedSet<IndexOutputBuffer> indexSortedSet;
     private boolean poisonSeen;
 
     public ScratchFileWriterTask(BlockingQueue<IndexOutputBuffer> queue,
@@ -2395,13 +2402,13 @@
     {
       this.queue = queue;
       this.indexMgr = indexMgr;
-      this.bufferStream =
-          new DataOutputStream(new BufferedOutputStream(new FileOutputStream(
-              indexMgr.getBufferFile()), READER_WRITER_BUFFER_SIZE));
-      this.bufferIndexStream =
-          new DataOutputStream(new BufferedOutputStream(new FileOutputStream(
-              indexMgr.getBufferIndexFile()), READER_WRITER_BUFFER_SIZE));
-      this.indexSortedSet = new TreeSet<IndexOutputBuffer>();
+      this.bufferStream = newDataOutputStream(indexMgr.getBufferFile());
+      this.bufferIndexStream = newDataOutputStream(indexMgr.getBufferIndexFile());
+    }
+
+    private DataOutputStream newDataOutputStream(File file) throws FileNotFoundException
+    {
+      return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file), READER_WRITER_BUFFER_SIZE));
     }
 
     /** {@inheritDoc} */
@@ -2424,7 +2431,7 @@
             bufferLen = writeIndexBuffers(l);
             for (IndexOutputBuffer id : l)
             {
-              if (!id.isDiscard())
+              if (!id.isDiscarded())
               {
                 id.reset();
                 freeBufferQueue.add(id);
@@ -2439,7 +2446,7 @@
               break;
             }
             bufferLen = writeIndexBuffer(indexBuffer);
-            if (!indexBuffer.isDiscard())
+            if (!indexBuffer.isDiscarded())
             {
               indexBuffer.reset();
               freeBufferQueue.add(indexBuffer);
@@ -2475,54 +2482,28 @@
       return null;
     }
 
-    private long writeIndexBuffer(IndexOutputBuffer indexBuffer)
-        throws IOException
+    private long writeIndexBuffer(IndexOutputBuffer indexBuffer) throws IOException
     {
-      int numberKeys = indexBuffer.getNumberKeys();
       indexBuffer.setPosition(-1);
+      resetStreams();
+
       long bufferLen = 0;
-      insertByteStream.reset();
-      insertKeyCount = 0;
-      deleteByteStream.reset();
-      deleteKeyCount = 0;
+      final int numberKeys = indexBuffer.getNumberKeys();
       for (int i = 0; i < numberKeys; i++)
       {
         if (indexBuffer.getPosition() == -1)
         {
           indexBuffer.setPosition(i);
-          if (indexBuffer.isInsert(i))
-          {
-            indexBuffer.writeID(insertByteStream, i);
-            insertKeyCount++;
-          }
-          else
-          {
-            indexBuffer.writeID(deleteByteStream, i);
-            deleteKeyCount++;
-          }
+          insertOrDeleteKey(indexBuffer, i);
           continue;
         }
         if (!indexBuffer.compare(i))
         {
           bufferLen += writeRecord(indexBuffer);
           indexBuffer.setPosition(i);
-          insertByteStream.reset();
-          insertKeyCount = 0;
-          deleteByteStream.reset();
-          deleteKeyCount = 0;
+          resetStreams();
         }
-        if (indexBuffer.isInsert(i))
-        {
-          if (insertKeyCount++ <= indexMgr.getLimit())
-          {
-            indexBuffer.writeID(insertByteStream, i);
-          }
-        }
-        else
-        {
-          indexBuffer.writeID(deleteByteStream, i);
-          deleteKeyCount++;
-        }
+        insertOrDeleteKeyCheckEntryLimit(indexBuffer, i);
       }
       if (indexBuffer.getPosition() != -1)
       {
@@ -2531,15 +2512,12 @@
       return bufferLen;
     }
 
-    private long writeIndexBuffers(List<IndexOutputBuffer> buffers)
-        throws IOException
+    private long writeIndexBuffers(List<IndexOutputBuffer> buffers) throws IOException
     {
+      resetStreams();
+
       long id = 0;
       long bufferLen = 0;
-      insertByteStream.reset();
-      insertKeyCount = 0;
-      deleteByteStream.reset();
-      deleteKeyCount = 0;
       for (IndexOutputBuffer b : buffers)
       {
         if (b.isPoison())
@@ -2557,58 +2535,28 @@
       int saveIndexID = 0;
       while (!indexSortedSet.isEmpty())
       {
-        IndexOutputBuffer b = indexSortedSet.first();
-        indexSortedSet.remove(b);
+        final IndexOutputBuffer b = indexSortedSet.pollFirst();
         if (saveKey == null)
         {
           saveKey = b.getKey();
           saveIndexID = b.getIndexID();
-          if (b.isInsert(b.getPosition()))
-          {
-            b.writeID(insertByteStream, b.getPosition());
-            insertKeyCount++;
-          }
-          else
-          {
-            b.writeID(deleteByteStream, b.getPosition());
-            deleteKeyCount++;
-          }
+          insertOrDeleteKey(b, b.getPosition());
         }
         else if (!b.compare(saveKey, saveIndexID))
         {
           bufferLen += writeRecord(saveKey, saveIndexID);
-          insertByteStream.reset();
-          deleteByteStream.reset();
-          insertKeyCount = 0;
-          deleteKeyCount = 0;
+          resetStreams();
           saveKey = b.getKey();
           saveIndexID = b.getIndexID();
-          if (b.isInsert(b.getPosition()))
-          {
-            b.writeID(insertByteStream, b.getPosition());
-            insertKeyCount++;
-          }
-          else
-          {
-            b.writeID(deleteByteStream, b.getPosition());
-            deleteKeyCount++;
-          }
-        }
-        else if (b.isInsert(b.getPosition()))
-        {
-          if (insertKeyCount++ <= indexMgr.getLimit())
-          {
-            b.writeID(insertByteStream, b.getPosition());
-          }
+          insertOrDeleteKey(b, b.getPosition());
         }
         else
         {
-          b.writeID(deleteByteStream, b.getPosition());
-          deleteKeyCount++;
+          insertOrDeleteKeyCheckEntryLimit(b, b.getPosition());
         }
         if (b.hasMoreData())
         {
-          b.getNextRecord();
+          b.nextRecord();
           indexSortedSet.add(b);
         }
       }
@@ -2619,6 +2567,44 @@
       return bufferLen;
     }
 
+    private void resetStreams()
+    {
+      insertByteStream.reset();
+      insertKeyCount = 0;
+      deleteByteStream.reset();
+      deleteKeyCount = 0;
+    }
+
+    private void insertOrDeleteKey(IndexOutputBuffer indexBuffer, int i)
+    {
+      if (indexBuffer.isInsertRecord(i))
+      {
+        indexBuffer.writeID(insertByteStream, i);
+        insertKeyCount++;
+      }
+      else
+      {
+        indexBuffer.writeID(deleteByteStream, i);
+        deleteKeyCount++;
+      }
+    }
+
+    private void insertOrDeleteKeyCheckEntryLimit(IndexOutputBuffer indexBuffer, int i)
+    {
+      if (indexBuffer.isInsertRecord(i))
+      {
+        if (insertKeyCount++ <= indexMgr.getLimit())
+        {
+          indexBuffer.writeID(insertByteStream, i);
+        }
+      }
+      else
+      {
+        indexBuffer.writeID(deleteByteStream, i);
+        deleteKeyCount++;
+      }
+    }
+
     private int writeByteStreams() throws IOException
     {
       if (insertKeyCount > indexMgr.getLimit())
@@ -2699,12 +2685,11 @@
         return null;
       }
       indexBuffer.sort();
-      if (!indexKeyQueMap.containsKey(indexBuffer.getIndexKey()))
+      if (!indexKeyQueueMap.containsKey(indexBuffer.getIndexKey()))
       {
         createIndexWriterTask(indexBuffer.getIndexKey());
       }
-      BlockingQueue<IndexOutputBuffer> q = indexKeyQueMap.get(indexBuffer.getIndexKey());
-      q.add(indexBuffer);
+      indexKeyQueueMap.get(indexBuffer.getIndexKey()).add(indexBuffer);
       return null;
     }
 
@@ -2712,14 +2697,13 @@
     {
       synchronized (synObj)
       {
-        if (indexKeyQueMap.containsKey(indexKey))
+        if (indexKeyQueueMap.containsKey(indexKey))
         {
           return;
         }
-        boolean isDN = ImportIndexType.DN.equals(indexKey.getIndexType());
-        IndexManager indexMgr = new IndexManager(
-            indexKey.getName(), isDN, indexKey.getEntryLimit());
-        if (isDN)
+        boolean isDN2ID = ImportIndexType.DN.equals(indexKey.getIndexType());
+        IndexManager indexMgr = new IndexManager(indexKey.getName(), isDN2ID, indexKey.getEntryLimit());
+        if (isDN2ID)
         {
           DNIndexMgrList.add(indexMgr);
         }
@@ -2727,44 +2711,44 @@
         {
           indexMgrList.add(indexMgr);
         }
-        BlockingQueue<IndexOutputBuffer> newQue =
+        BlockingQueue<IndexOutputBuffer> newQueue =
             new ArrayBlockingQueue<IndexOutputBuffer>(phaseOneBufferCount);
-        ScratchFileWriterTask indexWriter = new ScratchFileWriterTask(newQue, indexMgr);
+        ScratchFileWriterTask indexWriter = new ScratchFileWriterTask(newQueue, indexMgr);
         scratchFileWriterList.add(indexWriter);
         scratchFileWriterFutures.add(scratchFileWriterService.submit(indexWriter));
-        indexKeyQueMap.put(indexKey, newQue);
+        indexKeyQueueMap.put(indexKey, newQueue);
       }
     }
   }
 
   /**
-   * The index manager class has several functions: 1. It used to carry
-   * information about index processing created in phase one to phase two. 2. It
-   * collects statistics about phase two processing for each index. 3. It
-   * manages opening and closing the scratch index files.
+   * The index manager class has several functions:
+   * <ol>
+   * <li>It is used to carry information about index processing created in phase one to phase two</li>
+   * <li>It collects statistics about phase two processing for each index</li>
+   * <li>It manages opening and closing the scratch index files</li>
+   * </ol>
    */
   final class IndexManager implements Comparable<IndexManager>
   {
     private final File bufferFile;
     private final String bufferFileName;
     private final File bufferIndexFile;
-    private final String bufferIndexFileName;
-    private long bufferFileSize;
-    private long totalDNS;
-    private final boolean isDN;
+    private final boolean isDN2ID;
     private final int limit;
+
     private int numberOfBuffers;
+    private long bufferFileSize;
+    private long totalDNs;
     private volatile IndexDBWriteTask writer;
 
-    private IndexManager(String fileName, boolean isDN, int limit)
+    private IndexManager(String fileName, boolean isDN2ID, int limit)
     {
       this.bufferFileName = fileName;
-      this.bufferIndexFileName = fileName + ".index";
-
       this.bufferFile = new File(tempDir, bufferFileName);
-      this.bufferIndexFile = new File(tempDir, bufferIndexFileName);
+      this.bufferIndexFile = new File(tempDir, bufferFileName + ".index");
 
-      this.isDN = isDN;
+      this.isDN2ID = isDN2ID;
       this.limit = limit > 0 ? limit : Integer.MAX_VALUE;
     }
 
@@ -2810,17 +2794,17 @@
 
     private void addTotDNCount(int delta)
     {
-      totalDNS += delta;
+      totalDNs += delta;
     }
 
     private long getDNCount()
     {
-      return totalDNS;
+      return totalDNs;
     }
 
     private boolean isDN2ID()
     {
-      return isDN;
+      return isDN2ID;
     }
 
     private void printStats(long deltaTime)
@@ -3302,13 +3286,10 @@
         ExecutionException
     {
       initializeIndexBuffers();
-      RebuildFirstPhaseProgressTask progressTask = new RebuildFirstPhaseProgressTask();
-      Timer timer = new Timer();
-      timer.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
+      Timer timer = scheduleAtFixedRate(new RebuildFirstPhaseProgressTask());
       scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
       bufferSortService = Executors.newFixedThreadPool(threadCount);
-      ExecutorService rebuildIndexService =
-          Executors.newFixedThreadPool(threadCount);
+      ExecutorService rebuildIndexService = Executors.newFixedThreadPool(threadCount);
       List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
       for (int i = 0; i < threadCount; i++)
       {
@@ -3320,29 +3301,31 @@
       getAll(scratchFileWriterFutures);
 
       // Try to clear as much memory as possible.
-      rebuildIndexService.shutdown();
-      rebuildIndexService.awaitTermination(30, TimeUnit.SECONDS);
-      bufferSortService.shutdown();
-      bufferSortService.awaitTermination(30, TimeUnit.SECONDS);
-      scratchFileWriterService.shutdown();
-      scratchFileWriterService.awaitTermination(30, TimeUnit.SECONDS);
+      shutdownAll(rebuildIndexService, bufferSortService, scratchFileWriterService);
       timer.cancel();
 
-      tasks.clear();
-      results.clear();
-      scratchFileWriterList.clear();
-      scratchFileWriterFutures.clear();
-      indexKeyQueMap.clear();
-      freeBufferQueue.clear();
+      clearAll(tasks, results, scratchFileWriterList, scratchFileWriterFutures, freeBufferQueue);
+      indexKeyQueueMap.clear();
     }
 
     private void phaseTwo() throws InterruptedException, ExecutionException
     {
-      SecondPhaseProgressTask progressTask = new SecondPhaseProgressTask(entriesProcessed.get());
-      Timer timer2 = new Timer();
-      timer2.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
-      processIndexFiles();
-      timer2.cancel();
+      final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask(entriesProcessed.get()));
+      try
+      {
+        processIndexFiles();
+      }
+      finally
+      {
+        timer.cancel();
+      }
+    }
+
+    private Timer scheduleAtFixedRate(TimerTask task)
+    {
+      final Timer timer = new Timer();
+      timer.scheduleAtFixedRate(task, TIMER_INTERVAL, TIMER_INTERVAL);
+      return timer;
     }
 
     private int getIndexCount() throws ConfigException, JebException,
@@ -3365,126 +3348,127 @@
     private int getRebuildListIndexCount(LocalDBBackendCfg cfg)
         throws JebException, ConfigException, InitializationException
     {
-      int indexCount = 0;
-      List<String> rebuildList = rebuildConfig.getRebuildList();
-      if (!rebuildList.isEmpty())
+      final List<String> rebuildList = rebuildConfig.getRebuildList();
+      if (rebuildList.isEmpty())
       {
-        for (String index : rebuildList)
+        return 0;
+      }
+
+      int indexCount = 0;
+      for (String index : rebuildList)
+      {
+        final String lowerName = index.toLowerCase();
+        if ("dn2id".equals(lowerName))
         {
-          String lowerName = index.toLowerCase();
-          if ("dn2id".equals(lowerName))
+          indexCount += 3;
+        }
+        else if ("dn2uri".equals(lowerName))
+        {
+          indexCount++;
+        }
+        else if (lowerName.startsWith("vlv."))
+        {
+          if (lowerName.length() < 5)
           {
-            indexCount += 3;
+            throw new JebException(ERR_JEB_VLV_INDEX_NOT_CONFIGURED.get(lowerName));
           }
-          else if ("dn2uri".equals(lowerName))
-          {
-            indexCount++;
-          }
-          else if (lowerName.startsWith("vlv."))
-          {
-            if (lowerName.length() < 5)
-            {
-              LocalizableMessage msg = ERR_JEB_VLV_INDEX_NOT_CONFIGURED.get(lowerName);
-              throw new JebException(msg);
-            }
-            indexCount++;
-          }
-          else if ("id2subtree".equals(lowerName)
-              || "id2children".equals(lowerName))
+          indexCount++;
+        }
+        else if ("id2subtree".equals(lowerName)
+            || "id2children".equals(lowerName))
+        {
+          LocalizableMessage msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
+          throw new InitializationException(msg);
+        }
+        else
+        {
+          String[] attrIndexParts = lowerName.split("\\.");
+          if (attrIndexParts.length <= 0 || attrIndexParts.length > 3)
           {
             LocalizableMessage msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
             throw new InitializationException(msg);
           }
-          else
+          AttributeType attrType = DirectoryServer.getAttributeType(attrIndexParts[0]);
+          if (attrType == null)
           {
-            String[] attrIndexParts = lowerName.split("\\.");
-            if (attrIndexParts.length <= 0 || attrIndexParts.length > 3)
+            LocalizableMessage msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
+            throw new InitializationException(msg);
+          }
+          if (attrIndexParts.length != 1)
+          {
+            String indexType = attrIndexParts[1];
+            if (attrIndexParts.length == 2)
             {
-              LocalizableMessage msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
-              throw new InitializationException(msg);
-            }
-            AttributeType attrType = DirectoryServer.getAttributeType(attrIndexParts[0]);
-            if (attrType == null)
-            {
-              LocalizableMessage msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
-              throw new InitializationException(msg);
-            }
-            if (attrIndexParts.length != 1)
-            {
-              String indexType = attrIndexParts[1];
-              if (attrIndexParts.length == 2)
+              if ("presence".equals(indexType)
+                  || "equality".equals(indexType)
+                  || "substring".equals(indexType)
+                  || "ordering".equals(indexType)
+                  || "approximate".equals(indexType))
               {
-                if ("presence".equals(indexType)
-                    || "equality".equals(indexType)
-                    || "substring".equals(indexType)
-                    || "ordering".equals(indexType)
-                    || "approximate".equals(indexType))
-                {
-                  indexCount++;
-                }
-                else
-                {
-                  LocalizableMessage msg =
-                      ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
-                  throw new InitializationException(msg);
-                }
-              }
-              else
-              {
-                if (!findExtensibleMatchingRule(cfg, indexType + "." + attrIndexParts[2]))
-                {
-                  LocalizableMessage msg =
-                      ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
-                  throw new InitializationException(msg);
-                }
                 indexCount++;
               }
-            }
-            else
-            {
-              boolean found = false;
-              for (final String idx : cfg.listLocalDBIndexes())
-              {
-                if (!idx.equalsIgnoreCase(index))
-                {
-                  continue;
-                }
-                found = true;
-                LocalDBIndexCfg indexCfg = cfg.getLocalDBIndex(idx);
-                SortedSet<IndexType> indexType = indexCfg.getIndexType();
-                if (indexType.contains(EQUALITY)
-                    || indexType.contains(ORDERING)
-                    || indexType.contains(PRESENCE)
-                    || indexType.contains(SUBSTRING)
-                    || indexType.contains(APPROXIMATE))
-                {
-                  indexCount++;
-                }
-                if (indexType.contains(EXTENSIBLE))
-                {
-                  Set<String> extensibleRules = indexCfg.getIndexExtensibleMatchingRule();
-                  boolean shared = false;
-                  for (final String exRule : extensibleRules)
-                  {
-                    if (exRule.endsWith(".sub"))
-                    {
-                      indexCount++;
-                    }
-                    else if (!shared)
-                    {
-                      shared = true;
-                      indexCount++;
-                    }
-                  }
-                }
-              }
-              if (!found)
+              else
               {
                 LocalizableMessage msg =
                     ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
                 throw new InitializationException(msg);
               }
             }
+            else
+            {
+              if (!findExtensibleMatchingRule(cfg, indexType + "." + attrIndexParts[2]))
+              {
+                LocalizableMessage msg =
+                    ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
+                throw new InitializationException(msg);
+              }
+              indexCount++;
+            }
+          }
+          else
+          {
+            boolean found = false;
+            for (final String idx : cfg.listLocalDBIndexes())
+            {
+              if (!idx.equalsIgnoreCase(index))
+              {
+                continue;
+              }
+              found = true;
+              LocalDBIndexCfg indexCfg = cfg.getLocalDBIndex(idx);
+              SortedSet<IndexType> indexType = indexCfg.getIndexType();
+              if (indexType.contains(EQUALITY)
+                  || indexType.contains(ORDERING)
+                  || indexType.contains(PRESENCE)
+                  || indexType.contains(SUBSTRING)
+                  || indexType.contains(APPROXIMATE))
+              {
+                indexCount++;
+              }
+              if (indexType.contains(EXTENSIBLE))
+              {
+                Set<String> extensibleRules = indexCfg.getIndexExtensibleMatchingRule();
+                boolean shared = false;
+                for (final String exRule : extensibleRules)
+                {
+                  if (exRule.endsWith(".sub"))
+                  {
+                    indexCount++;
+                  }
+                  else if (!shared)
+                  {
+                    shared = true;
+                    indexCount++;
+                  }
+                }
+              }
+            }
+            if (!found)
+            {
+              LocalizableMessage msg =
+                  ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
+              throw new InitializationException(msg);
+            }
           }
         }
       }
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java
index 4c251cb..e5fdcd8 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/IndexOutputBuffer.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2009-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2013 ForgeRock AS.
+ *      Portions Copyright 2013-2015 ForgeRock AS.
  */
 package org.opends.server.backends.jeb.importLDIF;
 
@@ -63,9 +63,7 @@
  */
 public final class IndexOutputBuffer implements Comparable<IndexOutputBuffer> {
 
-  /**
-   * Enumeration used when sorting a buffer.
-   */
+  /** Enumeration used when sorting a buffer. */
   private enum CompareOp {
     LT, GT, LE, GE, EQ
   }
@@ -84,12 +82,11 @@
 
   /** The size of a buffer. */
   private final int size;
-
   /** Byte array holding the actual buffer data. */
   private final byte buffer[];
 
   /**
-   * id is used to break a tie (keys equal) when the buffers are being merged
+   * Used to break a tie (keys equal) when the buffers are being merged
    * for writing to the index scratch file.
    */
   private long id;
@@ -97,25 +94,22 @@
   /** Temporary buffer used to store integer values. */
   private final byte[] intBytes = new byte[INT_SIZE];
 
-  /** keyOffset - offSet where next key is written. */
-  private int keyOffset = 0;
-  /** recordOffset- offSet where next value record is written. */
-  private int recordOffset = 0;
-  /** bytesLeft - amount of bytes left in the buffer. */
-  private int bytesLeft = 0;
-
-  /** keys - number of keys in the buffer. */
-  private int keys = 0;
-  /**
-   * position - used to iterate over the buffer when writing to a scratch file.
-   */
-  private int position = 0;
+  /** OffSet where next key is written. */
+  private int keyOffset;
+  /** OffSet where next value record is written. */
+  private int recordOffset;
+  /** Amount of bytes left in the buffer. */
+  private int bytesLeft;
+  /** Number of keys in the buffer. */
+  private int keys;
+  /** Used to iterate over the buffer when writing to a scratch file. */
+  private int position;
 
   /** The comparator to use sort the keys. */
   private ComparatorBuffer<byte[]> comparator;
 
   /**
-   * This is used to make sure that an instance of this class is put on the
+   * Used to make sure that an instance of this class is put on the
    * correct scratch file writer work queue for processing.
    */
   private Importer.IndexKey indexKey;
@@ -134,7 +128,7 @@
    * importer/rebuild index process is doing phase one cleanup and flushing
    * buffers not completed.
    */
-  private boolean discard = false;
+  private boolean discarded;
 
 
   /**
@@ -163,6 +157,15 @@
     indexKey = null;
   }
 
+  /**
+   * Creates a new poison buffer. Poison buffers are used to stop the processing of import tasks.
+   *
+   * @return a new poison buffer
+   */
+  public static IndexOutputBuffer poison()
+  {
+    return new IndexOutputBuffer(0);
+  }
 
   /**
    * Set the ID of a buffer to the specified value.
@@ -199,28 +202,25 @@
     return size == 0;
   }
 
-
   /**
-   * Determines of a buffer should be re-cycled.
+   * Determines if buffer should be re-cycled by calling {@link #reset()}.
    *
    * @return {@code true} if buffer should be recycled, or {@code false} if it
    *          should not.
    */
-  public boolean isDiscard()
+  public boolean isDiscarded()
   {
-    return discard;
+    return discarded;
   }
 
-
   /**
-   * Set the discard flag to {@code true}.
+   * Sets the discarded flag to {@code true}.
    */
-  public void setDiscard()
+  public void discard()
   {
-    discard = true;
+    discarded = true;
   }
 
-
   /**
    * Returns {@code true} if there is enough space available to write the
    * specified byte array in the buffer. It returns {@code false} otherwise.
@@ -367,7 +367,7 @@
    * @return {@code true} if the record is an insert record, or {@code false}
    *          if it is a delete record.
    */
-  public boolean isInsert(int index)
+  public boolean isInsertRecord(int index)
   {
     int recOffset = getIntegerValue(index * INT_SIZE);
     return buffer[recOffset] != DEL;
@@ -503,14 +503,8 @@
     offset += PackedInteger.getReadIntLength(buffer, offset);
     int keyLen = PackedInteger.readInt(buffer, offset);
     int key = PackedInteger.getReadIntLength(buffer, offset) + offset;
-    if( comparator.compare(buffer, key, keyLen, b, b.length) == 0)
-    {
-      if(indexID == bIndexID)
-      {
-        return true;
-      }
-    }
-    return false;
+    return comparator.compare(buffer, key, keyLen, b, b.length) == 0
+        && indexID == bIndexID;
   }
 
 
@@ -527,46 +521,50 @@
   @Override
   public int compareTo(IndexOutputBuffer b)
   {
-    ByteBuffer keyBuf = b.getKeyBuf(b.position);
+    final ByteBuffer keyBuf = b.getKeyBuf(b.position);
     int offset = getIntegerValue(position * INT_SIZE);
     int indexID = getIntegerValue(offset + 1);
     offset += REC_OVERHEAD;
     offset += PackedInteger.getReadIntLength(buffer, offset);
     int keyLen = PackedInteger.readInt(buffer, offset);
     int key = PackedInteger.getReadIntLength(buffer, offset) + offset;
-    int returnCode = comparator.compare(buffer, key, keyLen, keyBuf.array(),
-                                        keyBuf.limit());
-    if(returnCode == 0)
+
+    final int cmp = comparator.compare(buffer, key, keyLen, keyBuf.array(), keyBuf.limit());
+    if (cmp != 0)
     {
-      int bIndexID = b.getIndexID();
-      if(indexID == bIndexID)
-      {
-        long otherBufferID = b.getBufferID();
-        //This is tested in a tree set remove when a buffer is removed from
-        //the tree set.
-        if(this.id == otherBufferID)
-        {
-          returnCode = 0;
-        }
-        else if(this.id < otherBufferID)
-        {
-          returnCode = -1;
-        }
-        else
-        {
-          returnCode = 1;
-        }
-      }
-      else if(indexID < bIndexID)
-      {
-        returnCode = -1;
-      }
-      else
-      {
-        returnCode = 1;
-      }
+      return cmp;
     }
-    return returnCode;
+
+    final int bIndexID = b.getIndexID();
+    if (indexID == bIndexID)
+    {
+      // This is tested in a tree set remove when a buffer is removed from the tree set.
+      return compare(this.id, b.getBufferID());
+    }
+    else if (indexID < bIndexID)
+    {
+      return -1;
+    }
+    else
+    {
+      return 1;
+    }
+  }
+
+  private int compare(long l1, long l2)
+  {
+    if (l1 == l2)
+    {
+      return 0;
+    }
+    else if (l1 < l2)
+    {
+      return -1;
+    }
+    else
+    {
+      return 1;
+    }
   }
 
 
@@ -587,21 +585,18 @@
     dataStream.write(buffer, offSet, keyLen);
   }
 
-
-   /**
+  /**
    * Compare the byte array at the current position with the byte array at the
    * specified index.
    *
    * @param i The index pointing to the byte array to compare.
-   * @return {@code true} if the byte arrays are equal, or {@code false}
-    *                     otherwise.
+   * @return {@code true} if the byte arrays are equal, or {@code false} otherwise
    */
   public boolean compare(int i)
   {
-      return is(i, position, CompareOp.EQ);
+    return is(i, position, CompareOp.EQ);
   }
 
-
   /**
    * Return the current number of keys.
    *
@@ -612,7 +607,6 @@
     return keys;
   }
 
-
   /**
    * Return {@code true} if the buffer has more data to process, or
    * {@code false} otherwise. Used when iterating over the buffer writing the
@@ -622,20 +616,18 @@
    *         {@code false} otherwise.
    */
   public boolean hasMoreData()
-   {
-     return (position + 1) < keys;
-   }
-
+  {
+    return position + 1 < keys;
+  }
 
   /**
    * Advance the position pointer to the next record in the buffer. Used when
    * iterating over the buffer examining keys.
    */
-   public void getNextRecord()
-   {
-     position++;
-   }
-
+  public void nextRecord()
+  {
+    position++;
+  }
 
   private byte[] getIntBytes(int val)
   {
@@ -661,9 +653,9 @@
 
   private int med3(int a, int b, int c)
   {
-    return (is(a,b, CompareOp.LT) ?
+    return is(a, b, CompareOp.LT) ?
            (is(b,c,CompareOp.LT) ? b : is(a,c,CompareOp.LT) ? c : a) :
-           (is(b,c,CompareOp.GT) ? b :is(a,c,CompareOp.GT) ? c : a));
+           (is(b,c,CompareOp.GT) ? b : is(a,c,CompareOp.GT) ? c : a);
   }
 
 
@@ -671,8 +663,12 @@
   {
     if (len < 7) {
       for (int i=off; i<len+off; i++)
+      {
         for (int j=i; j>off && is(j-1, j, CompareOp.GT); j--)
+        {
           swap(j, j-1);
+        }
+      }
       return;
     }
 
@@ -695,20 +691,26 @@
     int a = off, b = a, c = off + len - 1, d = c;
     while(true)
     {
-      while ((b <= c) && is(b, mKey, CompareOp.LE, mIndexID))
+      while (b <= c && is(b, mKey, CompareOp.LE, mIndexID))
       {
         if (is(b, mKey, CompareOp.EQ, mIndexID))
+        {
           swap(a++, b);
+        }
         b++;
       }
       while (c >= b && is(c, mKey, CompareOp.GE, mIndexID))
       {
         if (is(c, mKey, CompareOp.EQ, mIndexID))
+        {
           swap(c, d--);
+        }
         c--;
       }
       if (b > c)
+      {
         break;
+      }
       swap(b++, c--);
     }
 
@@ -719,11 +721,17 @@
     s = Math.min(d-c,   n-d-1);
     vectorSwap(b, n-s, s);
 
+    s = b - a;
     // Recursively sort non-partition-elements
-    if ((s = b-a) > 1)
+    if (s > 1)
+    {
       sort(off, s);
-    if ((s = d-c) > 1)
+    }
+    s = d - c;
+    if (s > 1)
+    {
       sort(n-s, s);
+    }
   }
 
 
@@ -740,31 +748,28 @@
   private void vectorSwap(int a, int b, int n)
   {
     for (int i=0; i<n; i++, a++, b++)
+    {
       swap(a, b);
+    }
   }
 
 
   private boolean evaluateReturnCode(int rc, CompareOp op)
   {
-    boolean returnCode = false;
     switch(op) {
     case LT:
-      returnCode = rc < 0;
-      break;
+      return rc < 0;
     case GT:
-      returnCode = rc > 0;
-      break;
+      return rc > 0;
     case LE:
-      returnCode = rc <= 0;
-      break;
+      return rc <= 0;
     case GE:
-      returnCode = rc >= 0;
-      break;
+      return rc >= 0;
     case EQ:
-      returnCode = rc == 0;
-      break;
+      return rc == 0;
+    default:
+      return false;
     }
-    return returnCode;
   }
 
 
@@ -835,8 +840,7 @@
    * Implementation of ComparatorBuffer interface. Used to compare keys when
    * they are non-DN indexes.
    */
-  public static
-  class IndexComparator implements IndexOutputBuffer.ComparatorBuffer<byte[]>
+  public static class IndexComparator implements IndexOutputBuffer.ComparatorBuffer<byte[]>
   {
 
     /**
@@ -869,34 +873,9 @@
           return -1;
         }
       }
-      //The arrays are equal, make sure they are in the same index since
-      //multiple suffixes might have the same key.
-      if(length == otherLength)
-      {
-        if(indexID == otherIndexID)
-        {
-          return 0;
-        }
-        else if(indexID > otherIndexID)
-        {
-          return 1;
-        }
-        else
-        {
-          return -1;
-        }
-      }
-      if (length > otherLength)
-      {
-        return 1;
-      }
-      else
-      {
-        return -1;
-      }
+      return compareLengthThenIndexID(length, indexID, otherLength, otherIndexID);
     }
 
-
     /**
      * Compare an offset in an byte array with the specified byte array,
      * using the DN compare algorithm.   The specified index ID is used in the
@@ -928,24 +907,20 @@
           return -1;
         }
       }
-      //The arrays are equal, make sure they are in the same index since
-      //multiple suffixes might have the same key.
-      if(length == otherLength)
+      return compareLengthThenIndexID(length, indexID, otherLength, otherIndexID);
+    }
+
+    /**
+     * The arrays are equal, make sure they are in the same index
+     * since multiple suffixes might have the same key.
+     */
+    private int compareLengthThenIndexID(int length, int indexID, int otherLength, int otherIndexID)
+    {
+      if (length == otherLength)
       {
-        if(indexID == otherIndexID)
-        {
-          return 0;
-        }
-        else if(indexID > otherIndexID)
-        {
-          return 1;
-        }
-        else
-        {
-          return -1;
-        }
+        return compare(indexID, otherIndexID);
       }
-      if (length > otherLength)
+      else if (length > otherLength)
       {
         return 1;
       }
@@ -955,7 +930,6 @@
       }
     }
 
-
     /**
      * Compare an offset in an byte array with the specified byte array,
      * using the DN compare algorithm.
@@ -984,11 +958,16 @@
           return -1;
         }
       }
-      if(length == otherLength)
+      return compare(length, otherLength);
+    }
+
+    private int compare(int i1, int i2)
+    {
+      if (i1 == i2)
       {
         return 0;
       }
-      if (length > otherLength)
+      else if (i1 > i2)
       {
         return 1;
       }

--
Gitblit v1.10.0