From 83325bf8cc3b480c00a01ee4c43391cd0238e041 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 12 Mar 2015 16:32:12 +0000
Subject: [PATCH] Simplified loops in import/rebuild-index code.

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java |  138 +++++++++++++++++++--------------------------
 1 files changed, 58 insertions(+), 80 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
index 520cf1e..e685a1d 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -1309,7 +1309,8 @@
             final List<ByteString> includeBranches = includeBranchesAsBytes(suffix);
             boolean success = cursor.next();
             while (success
-                && !importConfiguration.isCancelled() && !isCanceled)
+                && !importConfiguration.isCancelled()
+                && !isCanceled)
             {
               final ByteString key = cursor.getKey();
               if (!includeBranches.contains(key))
@@ -1559,6 +1560,7 @@
     }
 
     /** Examine the DN for duplicates and missing parents. */
+    @SuppressWarnings("javadoc")
     boolean dnSanityCheck(DN entryDN, Entry entry, Suffix suffix)
         throws StorageRuntimeException, InterruptedException
     {
@@ -1668,17 +1670,12 @@
     void flushIndexBuffers() throws InterruptedException, ExecutionException
     {
       final ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
-      Iterator<Map.Entry<IndexKey, IndexOutputBuffer>> it = indexBufferMap.entrySet().iterator();
-      while (it.hasNext())
+      for (IndexOutputBuffer indexBuffer : indexBufferMap.values())
       {
-        Map.Entry<IndexKey, IndexOutputBuffer> e = it.next();
-        IndexKey indexKey = e.getKey();
-        IndexOutputBuffer indexBuffer = e.getValue();
-        it.remove();
-        indexBuffer.setIndexKey(indexKey);
         indexBuffer.discard();
         futures.add(bufferSortService.submit(new SortTask(indexBuffer)));
       }
+      indexBufferMap.clear();
       getAll(futures);
     }
 
@@ -1689,16 +1686,15 @@
       IndexOutputBuffer indexBuffer = indexBufferMap.get(indexKey);
       if (indexBuffer == null)
       {
-        indexBuffer = getNewIndexBuffer(sizeNeeded);
+        indexBuffer = getNewIndexBuffer(sizeNeeded, indexKey);
         indexBufferMap.put(indexKey, indexBuffer);
       }
       else if (!indexBuffer.isSpaceAvailable(key, entryID.longValue()))
       {
         // complete the current buffer...
-        indexBuffer.setIndexKey(indexKey);
         bufferSortService.submit(new SortTask(indexBuffer));
         // ... and get a new one
-        indexBuffer = getNewIndexBuffer(sizeNeeded);
+        indexBuffer = getNewIndexBuffer(sizeNeeded, indexKey);
         indexBufferMap.put(indexKey, indexBuffer);
       }
       int indexID = getIndexID(container);
@@ -1706,7 +1702,7 @@
       return indexID;
     }
 
-    IndexOutputBuffer getNewIndexBuffer(int size) throws InterruptedException
+    IndexOutputBuffer getNewIndexBuffer(int size, IndexKey indexKey) throws InterruptedException
     {
       IndexOutputBuffer indexBuffer;
       if (size > bufferSize)
@@ -1726,6 +1722,7 @@
       {
         throw new InterruptedException("Cancel processing received.");
       }
+      indexBuffer.setIndexKey(indexKey);
       return indexBuffer;
     }
 
@@ -1951,15 +1948,15 @@
     @Override
     public Void call() throws Exception
     {
-      ByteStringBuilder key = null;
-      ImportIDSet insertIDSet = null;
-      ImportIDSet deleteIDSet = null;
-
       if (isCanceled)
       {
         return null;
       }
 
+      final ByteStringBuilder key = new ByteStringBuilder(BYTE_BUFFER_CAPACITY);
+      ImportIDSet insertIDSet = null;
+      ImportIDSet deleteIDSet = null;
+      Integer indexID = null;
       try
       {
         beginWriteTask();
@@ -1972,25 +1969,18 @@
             return null;
           }
 
-          Integer indexID = null;
           while (!bufferSet.isEmpty())
           {
             IndexInputBuffer b = bufferSet.pollFirst();
-            if (key == null)
+            if (!b.sameKeyAndIndexID(key, indexID))
             {
-              key = new ByteStringBuilder(b.getKeyLen());
+              if (indexID != null)
+              {
+                // save the previous record
+                addToDB(indexID, insertIDSet, deleteIDSet);
+              }
 
-              indexID = b.getIndexID();
-              b.fetchKey(key);
-
-              insertIDSet = newImportIDSet(key, indexID);
-              deleteIDSet = newImportIDSet(key, indexID);
-            }
-            else if (b.compare(key, indexID) != 0)
-            {
-              addToDB(indexID, insertIDSet, deleteIDSet);
-              keyCount.incrementAndGet();
-
+              // this is a new record, reinitialize all
               indexID = b.getIndexID();
               b.fetchKey(key);
 
@@ -1998,6 +1988,7 @@
               deleteIDSet = newImportIDSet(key, indexID);
             }
 
+            // merge all entryIds into the idSets
             b.mergeIDSet(insertIDSet);
             b.mergeIDSet(deleteIDSet);
 
@@ -2008,7 +1999,7 @@
             }
           }
 
-          if (key != null)
+          if (indexID != null)
           {
             addToDB(indexID, insertIDSet, deleteIDSet);
           }
@@ -2039,6 +2030,7 @@
 
     private void addToDB(int indexID, ImportIDSet insertSet, ImportIDSet deleteSet) throws DirectoryException
     {
+      keyCount.incrementAndGet();
       if (indexMgr.isDN2ID())
       {
         addDN2ID(indexID, insertSet);
@@ -2414,29 +2406,29 @@
 
     private long writeIndexBuffer(IndexOutputBuffer indexBuffer) throws IOException
     {
-      indexBuffer.setPosition(-1);
-      resetStreams();
-
       long bufferLen = 0;
       final int numberKeys = indexBuffer.getNumberKeys();
       for (int i = 0; i < numberKeys; i++)
       {
-        if (indexBuffer.getPosition() == -1)
+        if (i == 0)
         {
-          indexBuffer.setPosition(i);
-          insertOrDeleteKey(indexBuffer, i);
-          continue;
-        }
-        if (!indexBuffer.byteArraysEqual(i))
-        {
-          bufferLen += writeRecord(indexBuffer);
+          // first record, initialize all
           indexBuffer.setPosition(i);
           resetStreams();
         }
-        insertOrDeleteKeyCheckEntryLimit(indexBuffer, i);
+        else if (!indexBuffer.sameKeyAndIndexID(i))
+        {
+          // this is a new record, save previous record ...
+          bufferLen += writeRecord(indexBuffer);
+          // ... and reinitialize all
+          indexBuffer.setPosition(i);
+          resetStreams();
+        }
+        appendNextEntryIDToStream(indexBuffer, i);
       }
-      if (indexBuffer.getPosition() != -1)
+      if (numberKeys > 0)
       {
+        // save the last record
         bufferLen += writeRecord(indexBuffer);
       }
       return bufferLen;
@@ -2466,24 +2458,21 @@
       while (!indexSortedSet.isEmpty())
       {
         final IndexOutputBuffer b = indexSortedSet.pollFirst();
-        if (saveKey == null)
+        if (!b.sameKeyAndIndexID(saveKey, saveIndexID))
         {
+          if (saveKey != null)
+          {
+            // save the previous record
+            bufferLen += writeRecord(saveKey, saveIndexID);
+            resetStreams();
+          }
+          // this is a new record, reinitialize all
           saveKey = b.getKey();
           saveIndexID = b.getIndexID();
-          insertOrDeleteKey(b, b.getPosition());
         }
-        else if (!b.recordsEqual(saveKey, saveIndexID))
-        {
-          bufferLen += writeRecord(saveKey, saveIndexID);
-          resetStreams();
-          saveKey = b.getKey();
-          saveIndexID = b.getIndexID();
-          insertOrDeleteKey(b, b.getPosition());
-        }
-        else
-        {
-          insertOrDeleteKeyCheckEntryLimit(b, b.getPosition());
-        }
+
+        appendNextEntryIDToStream(b, b.getPosition());
+
         if (b.hasMoreData())
         {
           b.nextRecord();
@@ -2505,28 +2494,16 @@
       deleteKeyCount = 0;
     }
 
-    private void insertOrDeleteKey(IndexOutputBuffer indexBuffer, int position)
+    private void appendNextEntryIDToStream(IndexOutputBuffer indexBuffer, int position)
     {
       if (indexBuffer.isInsertRecord(position))
       {
-        indexBuffer.writeEntryID(insertByteStream, position);
-        insertKeyCount++;
-      }
-      else
-      {
-        indexBuffer.writeEntryID(deleteByteStream, position);
-        deleteKeyCount++;
-      }
-    }
-
-    private void insertOrDeleteKeyCheckEntryLimit(IndexOutputBuffer indexBuffer, int position)
-    {
-      if (indexBuffer.isInsertRecord(position))
-      {
-        if (insertKeyCount++ <= indexMgr.getLimit())
+        if (insertKeyCount++ <= indexMgr.getIndexEntryLimit())
         {
           indexBuffer.writeEntryID(insertByteStream, position);
         }
+        // else do not bother appending, this value will not be read.
+        // instead, a special value will be written to show the index entry limit is exceeded
       }
       else
       {
@@ -2537,8 +2514,9 @@
 
     private int writeByteStreams() throws IOException
     {
-      if (insertKeyCount > indexMgr.getLimit())
+      if (insertKeyCount > indexMgr.getIndexEntryLimit())
       {
+        // special handling when index entry limit has been exceeded
         insertKeyCount = 1;
         insertByteStream.reset();
         insertByteStream.write(-1);
@@ -2671,21 +2649,21 @@
     private final String bufferFileName;
     private final File bufferIndexFile;
     private final boolean isDN2ID;
-    private final int limit;
+    private final int indexEntryLimit;
 
     private int numberOfBuffers;
     private long bufferFileSize;
     private long totalDNs;
     private volatile IndexDBWriteTask writer;
 
-    private IndexManager(String fileName, boolean isDN2ID, int limit)
+    private IndexManager(String fileName, boolean isDN2ID, int indexEntryLimit)
     {
       this.bufferFileName = fileName;
       this.bufferFile = new File(tempDir, bufferFileName);
       this.bufferIndexFile = new File(tempDir, bufferFileName + ".index");
 
       this.isDN2ID = isDN2ID;
-      this.limit = limit > 0 ? limit : Integer.MAX_VALUE;
+      this.indexEntryLimit = indexEntryLimit > 0 ? indexEntryLimit : Integer.MAX_VALUE;
     }
 
     private void setIndexDBWriteTask(IndexDBWriteTask writer)
@@ -2761,9 +2739,9 @@
       return bufferFileName;
     }
 
-    private int getLimit()
+    private int getIndexEntryLimit()
     {
-      return limit;
+      return indexEntryLimit;
     }
 
     /** {@inheritDoc} */

--
Gitblit v1.10.0