mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
12.32.2015 83325bf8cc3b480c00a01ee4c43391cd0238e041
Simplified loops in import/rebuild-index code.


Importer.java:
Set the indexKey on the IndexOutputBuffer inside indexBufferMap field + added an IndexKey parameter to getNewIndexBuffer() + simplified code in flushIndexBuffers() as a consequence
In IndexDBWriteTask:
- In call() always create the key buffer, removed duplicated code in a loop.
- Moved call to keyCount.incrementAndGet() from call() to addToDB().
In ScratchFileWriterTask class:
- In call(), removed duplicated code in a loop
- Removed insertOrDeleteKey(), superseded by the more complete insertOrDeleteKeyCheckEntryLimit()
- Renamed insertOrDeleteKeyCheckEntryLimit() to appendNextEntryIDToStream()
- In writeIndexBuffer() and writeIndexBuffers(), simplified the code
In IndexManager class, renamed field limit to indexEntryLimit + changed getter.

IndexInputBuffer.java:
Changed compare(ByteStringBuilder, Integer) to sameKeyAndIndexID(ByteStringBuilder, Integer) + made it null safe

IndexOutputBuffer.java:
Changed recordsEqual(byte[], int) to sameKeyAndIndexID(byte[], int) + made it null safe
Renamed byteArraysEqual() to sameKeyAndIndexID().
3 files modified
157 ■■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java 130 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexInputBuffer.java 17 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexOutputBuffer.java 10 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -1309,7 +1309,8 @@
            final List<ByteString> includeBranches = includeBranchesAsBytes(suffix);
            boolean success = cursor.next();
            while (success
                && !importConfiguration.isCancelled() && !isCanceled)
                && !importConfiguration.isCancelled()
                && !isCanceled)
            {
              final ByteString key = cursor.getKey();
              if (!includeBranches.contains(key))
@@ -1559,6 +1560,7 @@
    }
    /** Examine the DN for duplicates and missing parents. */
    @SuppressWarnings("javadoc")
    boolean dnSanityCheck(DN entryDN, Entry entry, Suffix suffix)
        throws StorageRuntimeException, InterruptedException
    {
@@ -1668,17 +1670,12 @@
    void flushIndexBuffers() throws InterruptedException, ExecutionException
    {
      final ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
      Iterator<Map.Entry<IndexKey, IndexOutputBuffer>> it = indexBufferMap.entrySet().iterator();
      while (it.hasNext())
      for (IndexOutputBuffer indexBuffer : indexBufferMap.values())
      {
        Map.Entry<IndexKey, IndexOutputBuffer> e = it.next();
        IndexKey indexKey = e.getKey();
        IndexOutputBuffer indexBuffer = e.getValue();
        it.remove();
        indexBuffer.setIndexKey(indexKey);
        indexBuffer.discard();
        futures.add(bufferSortService.submit(new SortTask(indexBuffer)));
      }
      indexBufferMap.clear();
      getAll(futures);
    }
@@ -1689,16 +1686,15 @@
      IndexOutputBuffer indexBuffer = indexBufferMap.get(indexKey);
      if (indexBuffer == null)
      {
        indexBuffer = getNewIndexBuffer(sizeNeeded);
        indexBuffer = getNewIndexBuffer(sizeNeeded, indexKey);
        indexBufferMap.put(indexKey, indexBuffer);
      }
      else if (!indexBuffer.isSpaceAvailable(key, entryID.longValue()))
      {
        // complete the current buffer...
        indexBuffer.setIndexKey(indexKey);
        bufferSortService.submit(new SortTask(indexBuffer));
        // ... and get a new one
        indexBuffer = getNewIndexBuffer(sizeNeeded);
        indexBuffer = getNewIndexBuffer(sizeNeeded, indexKey);
        indexBufferMap.put(indexKey, indexBuffer);
      }
      int indexID = getIndexID(container);
@@ -1706,7 +1702,7 @@
      return indexID;
    }
    IndexOutputBuffer getNewIndexBuffer(int size) throws InterruptedException
    IndexOutputBuffer getNewIndexBuffer(int size, IndexKey indexKey) throws InterruptedException
    {
      IndexOutputBuffer indexBuffer;
      if (size > bufferSize)
@@ -1726,6 +1722,7 @@
      {
        throw new InterruptedException("Cancel processing received.");
      }
      indexBuffer.setIndexKey(indexKey);
      return indexBuffer;
    }
@@ -1951,15 +1948,15 @@
    @Override
    public Void call() throws Exception
    {
      ByteStringBuilder key = null;
      ImportIDSet insertIDSet = null;
      ImportIDSet deleteIDSet = null;
      if (isCanceled)
      {
        return null;
      }
      final ByteStringBuilder key = new ByteStringBuilder(BYTE_BUFFER_CAPACITY);
      ImportIDSet insertIDSet = null;
      ImportIDSet deleteIDSet = null;
      Integer indexID = null;
      try
      {
        beginWriteTask();
@@ -1972,25 +1969,18 @@
            return null;
          }
          Integer indexID = null;
          while (!bufferSet.isEmpty())
          {
            IndexInputBuffer b = bufferSet.pollFirst();
            if (key == null)
            if (!b.sameKeyAndIndexID(key, indexID))
            {
              key = new ByteStringBuilder(b.getKeyLen());
              indexID = b.getIndexID();
              b.fetchKey(key);
              insertIDSet = newImportIDSet(key, indexID);
              deleteIDSet = newImportIDSet(key, indexID);
            }
            else if (b.compare(key, indexID) != 0)
              if (indexID != null)
            {
                // save the previous record
              addToDB(indexID, insertIDSet, deleteIDSet);
              keyCount.incrementAndGet();
              }
              // this is a new record, reinitialize all
              indexID = b.getIndexID();
              b.fetchKey(key);
@@ -1998,6 +1988,7 @@
              deleteIDSet = newImportIDSet(key, indexID);
            }
            // merge all entryIds into the idSets
            b.mergeIDSet(insertIDSet);
            b.mergeIDSet(deleteIDSet);
@@ -2008,7 +1999,7 @@
            }
          }
          if (key != null)
          if (indexID != null)
          {
            addToDB(indexID, insertIDSet, deleteIDSet);
          }
@@ -2039,6 +2030,7 @@
    private void addToDB(int indexID, ImportIDSet insertSet, ImportIDSet deleteSet) throws DirectoryException
    {
      keyCount.incrementAndGet();
      if (indexMgr.isDN2ID())
      {
        addDN2ID(indexID, insertSet);
@@ -2414,29 +2406,29 @@
    private long writeIndexBuffer(IndexOutputBuffer indexBuffer) throws IOException
    {
      indexBuffer.setPosition(-1);
      resetStreams();
      long bufferLen = 0;
      final int numberKeys = indexBuffer.getNumberKeys();
      for (int i = 0; i < numberKeys; i++)
      {
        if (indexBuffer.getPosition() == -1)
        if (i == 0)
        {
          indexBuffer.setPosition(i);
          insertOrDeleteKey(indexBuffer, i);
          continue;
        }
        if (!indexBuffer.byteArraysEqual(i))
        {
          bufferLen += writeRecord(indexBuffer);
          // first record, initialize all
          indexBuffer.setPosition(i);
          resetStreams();
        }
        insertOrDeleteKeyCheckEntryLimit(indexBuffer, i);
      }
      if (indexBuffer.getPosition() != -1)
        else if (!indexBuffer.sameKeyAndIndexID(i))
      {
          // this is a new record, save previous record ...
          bufferLen += writeRecord(indexBuffer);
          // ... and reinitialize all
          indexBuffer.setPosition(i);
          resetStreams();
        }
        appendNextEntryIDToStream(indexBuffer, i);
      }
      if (numberKeys > 0)
      {
        // save the last record
        bufferLen += writeRecord(indexBuffer);
      }
      return bufferLen;
@@ -2466,24 +2458,21 @@
      while (!indexSortedSet.isEmpty())
      {
        final IndexOutputBuffer b = indexSortedSet.pollFirst();
        if (saveKey == null)
        if (!b.sameKeyAndIndexID(saveKey, saveIndexID))
        {
          saveKey = b.getKey();
          saveIndexID = b.getIndexID();
          insertOrDeleteKey(b, b.getPosition());
        }
        else if (!b.recordsEqual(saveKey, saveIndexID))
          if (saveKey != null)
        {
            // save the previous record
          bufferLen += writeRecord(saveKey, saveIndexID);
          resetStreams();
          }
          // this is a new record, reinitialize all
          saveKey = b.getKey();
          saveIndexID = b.getIndexID();
          insertOrDeleteKey(b, b.getPosition());
        }
        else
        {
          insertOrDeleteKeyCheckEntryLimit(b, b.getPosition());
        }
        appendNextEntryIDToStream(b, b.getPosition());
        if (b.hasMoreData())
        {
          b.nextRecord();
@@ -2505,28 +2494,16 @@
      deleteKeyCount = 0;
    }
    private void insertOrDeleteKey(IndexOutputBuffer indexBuffer, int position)
    private void appendNextEntryIDToStream(IndexOutputBuffer indexBuffer, int position)
    {
      if (indexBuffer.isInsertRecord(position))
      {
        indexBuffer.writeEntryID(insertByteStream, position);
        insertKeyCount++;
      }
      else
      {
        indexBuffer.writeEntryID(deleteByteStream, position);
        deleteKeyCount++;
      }
    }
    private void insertOrDeleteKeyCheckEntryLimit(IndexOutputBuffer indexBuffer, int position)
    {
      if (indexBuffer.isInsertRecord(position))
      {
        if (insertKeyCount++ <= indexMgr.getLimit())
        if (insertKeyCount++ <= indexMgr.getIndexEntryLimit())
        {
          indexBuffer.writeEntryID(insertByteStream, position);
        }
        // else do not bother appending, this value will not be read.
        // instead, a special value will be written to show the index entry limit is exceeded
      }
      else
      {
@@ -2537,8 +2514,9 @@
    private int writeByteStreams() throws IOException
    {
      if (insertKeyCount > indexMgr.getLimit())
      if (insertKeyCount > indexMgr.getIndexEntryLimit())
      {
        // special handling when index entry limit has been exceeded
        insertKeyCount = 1;
        insertByteStream.reset();
        insertByteStream.write(-1);
@@ -2671,21 +2649,21 @@
    private final String bufferFileName;
    private final File bufferIndexFile;
    private final boolean isDN2ID;
    private final int limit;
    private final int indexEntryLimit;
    private int numberOfBuffers;
    private long bufferFileSize;
    private long totalDNs;
    private volatile IndexDBWriteTask writer;
    private IndexManager(String fileName, boolean isDN2ID, int limit)
    private IndexManager(String fileName, boolean isDN2ID, int indexEntryLimit)
    {
      this.bufferFileName = fileName;
      this.bufferFile = new File(tempDir, bufferFileName);
      this.bufferIndexFile = new File(tempDir, bufferFileName + ".index");
      this.isDN2ID = isDN2ID;
      this.limit = limit > 0 ? limit : Integer.MAX_VALUE;
      this.indexEntryLimit = indexEntryLimit > 0 ? indexEntryLimit : Integer.MAX_VALUE;
    }
    private void setIndexDBWriteTask(IndexDBWriteTask writer)
@@ -2761,9 +2739,9 @@
      return bufferFileName;
    }
    private int getLimit()
    private int getIndexEntryLimit()
    {
      return limit;
      return indexEntryLimit;
    }
    /** {@inheritDoc} */
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexInputBuffer.java
@@ -289,22 +289,17 @@
  /**
   * Compares this buffer with the provided key and index ID.
   *
   * @param cKey
   * @param key
   *          The key.
   * @param cIndexID
   * @param indexID
   *          The index ID.
   * @return A negative number if this buffer is less than the provided key and
   *         index ID, a positive number if this buffer is greater, or zero if
   *         it is the same.
   * @return true if this buffer represents the same key and indexID, false otherwise.
   */
  int compare(ByteStringBuilder cKey, Integer cIndexID)
  boolean sameKeyAndIndexID(final ByteStringBuilder key, Integer indexID)
  {
    ensureRecordFetched();
    if (Importer.indexComparator.compare(keyBuffer, cKey) == 0)
    {
      return (indexID.intValue() == cIndexID.intValue()) ? 0 : 1;
    }
    return 1;
    return Importer.indexComparator.compare(keyBuffer, key) == 0
        && this.indexID.equals(indexID);
  }
  /** {@inheritDoc} */
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexOutputBuffer.java
@@ -449,13 +449,19 @@
   * @param bIndexID The index key to compare.
   * @return <CODE>True</CODE> if the byte arrays are equal.
   */
  public boolean recordsEqual(byte[] b, int bIndexID)
  public boolean sameKeyAndIndexID(byte[] b, int bIndexID)
  {
    if (b == null)
    {
      return false;
    }
    int offset = getOffset(position);
    int indexID = getIndexIDFromOffset(offset);
    offset += REC_OVERHEAD + LONG_SIZE;
    int keyLen = readInt(buffer, offset);
    int key = INT_SIZE + offset;
    return indexComparator.compare(buffer, key, keyLen, b, b.length) == 0
        && indexID == bIndexID;
  }
@@ -533,7 +539,7 @@
   * @param position The index pointing to the byte array to compare.
   * @return {@code true} if the byte arrays are equal, or {@code false} otherwise
   */
  public boolean byteArraysEqual(int position)
  public boolean sameKeyAndIndexID(int position)
  {
    return compare(position, this.position) == 0;
  }