mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
12.32.2015 83325bf8cc3b480c00a01ee4c43391cd0238e041
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -1309,7 +1309,8 @@
            final List<ByteString> includeBranches = includeBranchesAsBytes(suffix);
            boolean success = cursor.next();
            while (success
                && !importConfiguration.isCancelled() && !isCanceled)
                && !importConfiguration.isCancelled()
                && !isCanceled)
            {
              final ByteString key = cursor.getKey();
              if (!includeBranches.contains(key))
@@ -1559,6 +1560,7 @@
    }
    /** Examine the DN for duplicates and missing parents. */
    @SuppressWarnings("javadoc")
    boolean dnSanityCheck(DN entryDN, Entry entry, Suffix suffix)
        throws StorageRuntimeException, InterruptedException
    {
@@ -1668,17 +1670,12 @@
    void flushIndexBuffers() throws InterruptedException, ExecutionException
    {
      final ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
      Iterator<Map.Entry<IndexKey, IndexOutputBuffer>> it = indexBufferMap.entrySet().iterator();
      while (it.hasNext())
      for (IndexOutputBuffer indexBuffer : indexBufferMap.values())
      {
        Map.Entry<IndexKey, IndexOutputBuffer> e = it.next();
        IndexKey indexKey = e.getKey();
        IndexOutputBuffer indexBuffer = e.getValue();
        it.remove();
        indexBuffer.setIndexKey(indexKey);
        indexBuffer.discard();
        futures.add(bufferSortService.submit(new SortTask(indexBuffer)));
      }
      indexBufferMap.clear();
      getAll(futures);
    }
@@ -1689,16 +1686,15 @@
      IndexOutputBuffer indexBuffer = indexBufferMap.get(indexKey);
      if (indexBuffer == null)
      {
        indexBuffer = getNewIndexBuffer(sizeNeeded);
        indexBuffer = getNewIndexBuffer(sizeNeeded, indexKey);
        indexBufferMap.put(indexKey, indexBuffer);
      }
      else if (!indexBuffer.isSpaceAvailable(key, entryID.longValue()))
      {
        // complete the current buffer...
        indexBuffer.setIndexKey(indexKey);
        bufferSortService.submit(new SortTask(indexBuffer));
        // ... and get a new one
        indexBuffer = getNewIndexBuffer(sizeNeeded);
        indexBuffer = getNewIndexBuffer(sizeNeeded, indexKey);
        indexBufferMap.put(indexKey, indexBuffer);
      }
      int indexID = getIndexID(container);
@@ -1706,7 +1702,7 @@
      return indexID;
    }
    IndexOutputBuffer getNewIndexBuffer(int size) throws InterruptedException
    IndexOutputBuffer getNewIndexBuffer(int size, IndexKey indexKey) throws InterruptedException
    {
      IndexOutputBuffer indexBuffer;
      if (size > bufferSize)
@@ -1726,6 +1722,7 @@
      {
        throw new InterruptedException("Cancel processing received.");
      }
      indexBuffer.setIndexKey(indexKey);
      return indexBuffer;
    }
@@ -1951,15 +1948,15 @@
    @Override
    public Void call() throws Exception
    {
      ByteStringBuilder key = null;
      ImportIDSet insertIDSet = null;
      ImportIDSet deleteIDSet = null;
      if (isCanceled)
      {
        return null;
      }
      final ByteStringBuilder key = new ByteStringBuilder(BYTE_BUFFER_CAPACITY);
      ImportIDSet insertIDSet = null;
      ImportIDSet deleteIDSet = null;
      Integer indexID = null;
      try
      {
        beginWriteTask();
@@ -1972,25 +1969,18 @@
            return null;
          }
          Integer indexID = null;
          while (!bufferSet.isEmpty())
          {
            IndexInputBuffer b = bufferSet.pollFirst();
            if (key == null)
            if (!b.sameKeyAndIndexID(key, indexID))
            {
              key = new ByteStringBuilder(b.getKeyLen());
              if (indexID != null)
              {
                // save the previous record
                addToDB(indexID, insertIDSet, deleteIDSet);
              }
              indexID = b.getIndexID();
              b.fetchKey(key);
              insertIDSet = newImportIDSet(key, indexID);
              deleteIDSet = newImportIDSet(key, indexID);
            }
            else if (b.compare(key, indexID) != 0)
            {
              addToDB(indexID, insertIDSet, deleteIDSet);
              keyCount.incrementAndGet();
              // this is a new record, reinitialize all
              indexID = b.getIndexID();
              b.fetchKey(key);
@@ -1998,6 +1988,7 @@
              deleteIDSet = newImportIDSet(key, indexID);
            }
            // merge all entryIds into the idSets
            b.mergeIDSet(insertIDSet);
            b.mergeIDSet(deleteIDSet);
@@ -2008,7 +1999,7 @@
            }
          }
          if (key != null)
          if (indexID != null)
          {
            addToDB(indexID, insertIDSet, deleteIDSet);
          }
@@ -2039,6 +2030,7 @@
    private void addToDB(int indexID, ImportIDSet insertSet, ImportIDSet deleteSet) throws DirectoryException
    {
      keyCount.incrementAndGet();
      if (indexMgr.isDN2ID())
      {
        addDN2ID(indexID, insertSet);
@@ -2414,29 +2406,29 @@
    private long writeIndexBuffer(IndexOutputBuffer indexBuffer) throws IOException
    {
      indexBuffer.setPosition(-1);
      resetStreams();
      long bufferLen = 0;
      final int numberKeys = indexBuffer.getNumberKeys();
      for (int i = 0; i < numberKeys; i++)
      {
        if (indexBuffer.getPosition() == -1)
        if (i == 0)
        {
          indexBuffer.setPosition(i);
          insertOrDeleteKey(indexBuffer, i);
          continue;
        }
        if (!indexBuffer.byteArraysEqual(i))
        {
          bufferLen += writeRecord(indexBuffer);
          // first record, initialize all
          indexBuffer.setPosition(i);
          resetStreams();
        }
        insertOrDeleteKeyCheckEntryLimit(indexBuffer, i);
        else if (!indexBuffer.sameKeyAndIndexID(i))
        {
          // this is a new record, save previous record ...
          bufferLen += writeRecord(indexBuffer);
          // ... and reinitialize all
          indexBuffer.setPosition(i);
          resetStreams();
        }
        appendNextEntryIDToStream(indexBuffer, i);
      }
      if (indexBuffer.getPosition() != -1)
      if (numberKeys > 0)
      {
        // save the last record
        bufferLen += writeRecord(indexBuffer);
      }
      return bufferLen;
@@ -2466,24 +2458,21 @@
      while (!indexSortedSet.isEmpty())
      {
        final IndexOutputBuffer b = indexSortedSet.pollFirst();
        if (saveKey == null)
        if (!b.sameKeyAndIndexID(saveKey, saveIndexID))
        {
          if (saveKey != null)
          {
            // save the previous record
            bufferLen += writeRecord(saveKey, saveIndexID);
            resetStreams();
          }
          // this is a new record, reinitialize all
          saveKey = b.getKey();
          saveIndexID = b.getIndexID();
          insertOrDeleteKey(b, b.getPosition());
        }
        else if (!b.recordsEqual(saveKey, saveIndexID))
        {
          bufferLen += writeRecord(saveKey, saveIndexID);
          resetStreams();
          saveKey = b.getKey();
          saveIndexID = b.getIndexID();
          insertOrDeleteKey(b, b.getPosition());
        }
        else
        {
          insertOrDeleteKeyCheckEntryLimit(b, b.getPosition());
        }
        appendNextEntryIDToStream(b, b.getPosition());
        if (b.hasMoreData())
        {
          b.nextRecord();
@@ -2505,28 +2494,16 @@
      deleteKeyCount = 0;
    }
    private void insertOrDeleteKey(IndexOutputBuffer indexBuffer, int position)
    private void appendNextEntryIDToStream(IndexOutputBuffer indexBuffer, int position)
    {
      if (indexBuffer.isInsertRecord(position))
      {
        indexBuffer.writeEntryID(insertByteStream, position);
        insertKeyCount++;
      }
      else
      {
        indexBuffer.writeEntryID(deleteByteStream, position);
        deleteKeyCount++;
      }
    }
    private void insertOrDeleteKeyCheckEntryLimit(IndexOutputBuffer indexBuffer, int position)
    {
      if (indexBuffer.isInsertRecord(position))
      {
        if (insertKeyCount++ <= indexMgr.getLimit())
        if (insertKeyCount++ <= indexMgr.getIndexEntryLimit())
        {
          indexBuffer.writeEntryID(insertByteStream, position);
        }
        // else do not bother appending, this value will not be read.
        // instead, a special value will be written to show the index entry limit is exceeded
      }
      else
      {
@@ -2537,8 +2514,9 @@
    private int writeByteStreams() throws IOException
    {
      if (insertKeyCount > indexMgr.getLimit())
      if (insertKeyCount > indexMgr.getIndexEntryLimit())
      {
        // special handling when index entry limit has been exceeded
        insertKeyCount = 1;
        insertByteStream.reset();
        insertByteStream.write(-1);
@@ -2671,21 +2649,21 @@
    private final String bufferFileName;
    private final File bufferIndexFile;
    private final boolean isDN2ID;
    private final int limit;
    private final int indexEntryLimit;
    private int numberOfBuffers;
    private long bufferFileSize;
    private long totalDNs;
    private volatile IndexDBWriteTask writer;
    private IndexManager(String fileName, boolean isDN2ID, int limit)
    private IndexManager(String fileName, boolean isDN2ID, int indexEntryLimit)
    {
      this.bufferFileName = fileName;
      this.bufferFile = new File(tempDir, bufferFileName);
      this.bufferIndexFile = new File(tempDir, bufferFileName + ".index");
      this.isDN2ID = isDN2ID;
      this.limit = limit > 0 ? limit : Integer.MAX_VALUE;
      this.indexEntryLimit = indexEntryLimit > 0 ? indexEntryLimit : Integer.MAX_VALUE;
    }
    private void setIndexDBWriteTask(IndexDBWriteTask writer)
@@ -2761,9 +2739,9 @@
      return bufferFileName;
    }
    private int getLimit()
    private int getIndexEntryLimit()
    {
      return limit;
      return indexEntryLimit;
    }
    /** {@inheritDoc} */