mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
29.50.2016 e3a3030cd14ba12631b8c50d955ec800b247fb72
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -176,8 +176,23 @@
      final int indexCount = getIndexCount();
      final int nbBuffer = threadCount * indexCount * 2;
      final int bufferSize = computeBufferSize(nbBuffer);
      logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
      final int bufferSize;
      if (BufferPool.SUPPORTS_OFF_HEAP && importConfig.getOffHeapSize() > 0)
      {
        final long offHeapSize = importConfig.getOffHeapSize();
        bufferSize = (int) ((offHeapSize * MB) / nbBuffer);
        if (bufferSize < MIN_BUFFER_SIZE)
        {
          // Not enough memory.
          throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(offHeapSize * MB, nbBuffer * MIN_BUFFER_SIZE));
        }
        logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO, DB_CACHE_SIZE, offHeapSize, nbBuffer, bufferSize / KB);
      }
      else
      {
        bufferSize = computeBufferSize(nbBuffer);
        logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
      }
      logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION);
      logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
@@ -466,11 +481,6 @@
    private int computeBufferSize(int nbBuffer) throws InitializationException
    {
      if (BufferPool.SUPPORTS_OFF_HEAP)
      {
        return MAX_BUFFER_SIZE;
      }
      final long availableMemory = calculateAvailableMemory();
      logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffer);
@@ -809,12 +819,14 @@
    }
  }
  /** Default size for the off-heap memory dedicated to the phase one's buffer. */
  private static final int DEFAULT_OFFHEAP_SIZE = 700;
  /** Max size of phase one buffer. */
  private static final int MAX_BUFFER_SIZE = 2 * MB;
  /** Min size of phase one buffer. */
  private static final int MIN_BUFFER_SIZE = 4 * KB;
  /** DB cache size to use during import. */
  private static final int DB_CACHE_SIZE = 4 * MB;
  private static final int DB_CACHE_SIZE = 32 * MB;
  /** Required free memory for this importer. */
  private static final int REQUIRED_FREE_MEMORY = 50 * MB;
  /** LDIF reader. */
@@ -855,7 +867,7 @@
          {
            try
            {
              importStrategy.beforeImport(container);
              importStrategy.beforePhaseOne(container);
            }
            finally
            {
@@ -878,6 +890,8 @@
      throw new InterruptedException("Import processing canceled.");
    }
    importStrategy.afterPhaseOne();
    // Start phase two
    final long phaseTwoStartTime = System.currentTimeMillis();
    try (final PhaseTwoProgressReporter progressReporter = new PhaseTwoProgressReporter())
@@ -895,7 +909,7 @@
    // Finish import
    for(EntryContainer entryContainer : importedContainers.keySet())
    {
      importStrategy.afterImport(entryContainer);
      importStrategy.afterPhaseTwo(entryContainer);
    }
    phaseTwoTimeMs = System.currentTimeMillis() - phaseTwoStartTime;
  }
@@ -951,15 +965,20 @@
    abstract void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException;
    void beforeImport(EntryContainer entryContainer)
    void beforePhaseOne(EntryContainer entryContainer)
    {
      entryContainer.delete(asWriteableTransaction(importer));
      visitIndexes(entryContainer, setTrust(false, importer));
    }
    void afterPhaseOne()
    {
      closeSilently(bufferPool);
    }
    abstract Callable<Void> newPhaseTwoTask(TreeName treeName, Chunk source, PhaseTwoProgressReporter progressReporter);
    void afterImport(EntryContainer entryContainer)
    void afterPhaseTwo(EntryContainer entryContainer)
    {
      visitIndexes(entryContainer, setTrust(true, importer));
    }
@@ -967,7 +986,8 @@
    final Chunk newExternalSortChunk(TreeName treeName) throws Exception
    {
      return new ExternalSortChunk(tempDir, treeName.toString(), bufferPool,
          newCollector(entryContainers.get(treeName.getBaseDN()), treeName), sorter);
          newPhaseOneCollector(entryContainers.get(treeName.getBaseDN()), treeName),
          newPhaseTwoCollector(entryContainers.get(treeName.getBaseDN()), treeName), sorter);
    }
    final Callable<Void> newChunkCopierTask(TreeName treeName, final Chunk source,
@@ -983,7 +1003,7 @@
      final ID2ChildrenCount id2count = entryContainer.getID2ChildrenCount();
      return new DN2IDImporterTask(progressReporter, importer, tempDir, bufferPool, entryContainer.getDN2ID(), source,
          id2count, newCollector(entryContainer, id2count.getName()), dn2idAlreadyImported);
          id2count, newPhaseTwoCollector(entryContainer, id2count.getName()), dn2idAlreadyImported);
    }
    final Callable<Void> newVLVIndexImporterTask(VLVIndex vlvIndex, final Chunk source,
@@ -1167,14 +1187,14 @@
    }
    @Override
    void beforeImport(EntryContainer entryContainer)
    void beforePhaseOne(EntryContainer entryContainer)
    {
      visitIndexes(entryContainer, visitOnlyIndexes(indexesToRebuild, setTrust(false, importer)));
      visitIndexes(entryContainer, visitOnlyIndexes(indexesToRebuild, deleteDatabase(importer)));
    }
    @Override
    void afterImport(EntryContainer entryContainer)
    void afterPhaseTwo(EntryContainer entryContainer)
    {
      visitIndexes(entryContainer, visitOnlyIndexes(indexesToRebuild, setTrust(true, importer)));
    }
@@ -1395,12 +1415,12 @@
    /** Provides buffer used to store and sort chunk of data. */
    private final BufferPool bufferPool;
    /** File containing the regions used to store the data. */
    private final File file;
    private final FileChannel channel;
    /** Pointer to the next available region in the file, typically at end of file. */
    private final AtomicLong filePosition = new AtomicLong();
    /** Collector used to reduces the number of duplicate keys during sort. */
    private final Collector<?, ByteString> deduplicator;
    private final Collector<?, ByteString> phaseOneDeduplicator;
    private final Collector<?, ByteString> phaseTwoDeduplicator;
    /** Keep track of pending sorting tasks. */
    private final CompletionService<MeteredCursor<ByteString, ByteString>> sorter;
    /** Keep track of currently opened chunks. */
@@ -1419,14 +1439,15 @@
      }
    };
    ExternalSortChunk(File tempDir, String name, BufferPool bufferPool, Collector<?, ByteString> collector,
        Executor sortExecutor) throws IOException
    ExternalSortChunk(File tempDir, String name, BufferPool bufferPool, Collector<?, ByteString> phaseOneDeduplicator,
        Collector<?, ByteString> phaseTwoDeduplicator, Executor sortExecutor) throws IOException
    {
      this.name = name;
      this.bufferPool = bufferPool;
      this.deduplicator = collector;
      this.file = new File(tempDir, name.replaceAll("\\W+", "_") + "_" + UUID.randomUUID().toString());
      this.channel = open(this.file.toPath(), CREATE, TRUNCATE_EXISTING, READ, WRITE, DELETE_ON_CLOSE);
      this.phaseOneDeduplicator = phaseOneDeduplicator;
      this.phaseTwoDeduplicator = phaseTwoDeduplicator;
      final File file = new File(tempDir, name.replaceAll("\\W+", "_") + "_" + UUID.randomUUID().toString());
      this.channel = open(file.toPath(), CREATE_NEW, SPARSE, READ, WRITE);
      this.sorter = new ExecutorCompletionService<>(sortExecutor);
    }
@@ -1478,7 +1499,7 @@
                }
                closeSilently(channel);
              }
            }, (Collector<?, ByteString>) deduplicator);
            }, (Collector<?, ByteString>) phaseTwoDeduplicator);
      }
      catch (ExecutionException | InterruptedException e)
      {
@@ -1520,7 +1541,7 @@
           */
          final Chunk persistentChunk = new FileRegionChunk(name, channel, startOffset, chunk.size());
          try (final SequentialCursor<ByteString, ByteString> source =
              new CollectorCursor<>(chunk.flip(), deduplicator))
              new CollectorCursor<>(chunk.flip(), phaseOneDeduplicator))
          {
            copyIntoChunk(source, persistentChunk);
          }
@@ -1553,8 +1574,6 @@
     */
    static final class InMemorySortedChunk implements Chunk, Comparator<Integer>
    {
      private static final int INT_SIZE = Integer.SIZE / Byte.SIZE;
      private final String metricName;
      private final BufferPool bufferPool;
      private final Buffer buffer;
@@ -1574,13 +1593,11 @@
      @Override
      public boolean put(ByteSequence key, ByteSequence value)
      {
        final int keyHeaderSize = PackedLong.getEncodedSize(key.length());
        final int valueHeaderSize = PackedLong.getEncodedSize(value.length());
        final int keyRecordSize = keyHeaderSize + key.length();
        final int recordSize = keyRecordSize + valueHeaderSize + value.length();
        final int keyRecordSize = INT_SIZE + key.length();
        final int recordSize = keyRecordSize + INT_SIZE + value.length();
        dataPos -= recordSize;
        final int recordDataPos = dataPos;
        int recordDataPos = dataPos;
        final int recordIndexPos = indexPos;
        indexPos += INT_SIZE;
@@ -1596,19 +1613,18 @@
        // Write record offset
        buffer.writeInt(recordIndexPos, recordDataPos);
        final int valuePos = writeDataAt(recordDataPos, key);
        writeDataAt(valuePos, value);
        buffer.writeInt(recordDataPos, key.length());
        recordDataPos += INT_SIZE;
        buffer.writeInt(recordDataPos, value.length());
        recordDataPos += INT_SIZE;
        buffer.writeByteSequence(recordDataPos, key);
        recordDataPos += key.length();
        buffer.writeByteSequence(recordDataPos, value);
        return true;
      }
      private int writeDataAt(int offset, ByteSequence data)
      {
        final int headerSize = buffer.writeCompactUnsignedLong(offset, data.length());
        buffer.writeByteSequence(offset + headerSize, data);
        return offset + headerSize + data.length();
      }
      @Override
      public long size()
      {
@@ -1660,11 +1676,10 @@
          return 0;
        }
        // Compare Keys
        final int keyLengthA = (int) buffer.readCompactUnsignedLong(iOffsetA);
        final int keyOffsetA = iOffsetA + PackedLong.getEncodedSize(keyLengthA);
        final int keyLengthB = (int) buffer.readCompactUnsignedLong(iOffsetB);
        final int keyOffsetB = iOffsetB + PackedLong.getEncodedSize(keyLengthB);
        final int keyLengthA = buffer.readInt(iOffsetA);
        final int keyLengthB = buffer.readInt(iOffsetB);
        final int keyOffsetA = iOffsetA + 2 * INT_SIZE;
        final int keyOffsetB = iOffsetB + 2 * INT_SIZE;
        return buffer.compare(keyOffsetA, keyLengthA, keyOffsetB, keyLengthB);
      }
@@ -1685,19 +1700,19 @@
            key = value = null;
            return false;
          }
          final int recordOffset = buffer.readInt(indexOffset);
          int recordOffset = buffer.readInt(indexOffset);
          final int keyLength = (int) buffer.readCompactUnsignedLong(recordOffset);
          final int keyHeaderSize = PackedLong.getEncodedSize(keyLength);
          key = buffer.readByteString(recordOffset + keyHeaderSize, keyLength);
          final int keyLength = buffer.readInt(recordOffset);
          recordOffset += 4;
          final int valueLength = buffer.readInt(recordOffset);
          recordOffset += 4;
          final int valueOffset = recordOffset + keyHeaderSize + keyLength;
          final int valueLength = (int) buffer.readCompactUnsignedLong(valueOffset);
          final int valueHeaderSize = PackedLong.getEncodedSize(valueLength);
          value = buffer.readByteString(valueOffset + valueHeaderSize, valueLength);
          key = buffer.readByteString(recordOffset, keyLength);
          recordOffset += key.length();
          value = buffer.readByteString(recordOffset, valueLength);
          indexOffset += INT_SIZE;
          bytesRead += keyHeaderSize + keyLength + valueHeaderSize + valueLength;
          bytesRead += (2 * INT_SIZE) + keyLength + valueLength;
          return true;
        }
@@ -1833,11 +1848,6 @@
      public MeteredCursor<ByteString, ByteString> flip()
      {
        size = mmapBuffer.position();
        /*
         * We force OS to write dirty pages now so that they don't accumulate. Indeed, huge number of dirty pages might
         * cause the OS to freeze the producer of those dirty pages (this importer) while it is swapping-out the pages.
         */
        mmapBuffer.force();
        mmapBuffer = null;
        return new FileRegionChunkCursor(startOffset, size);
      }
@@ -2297,7 +2307,7 @@
    {
      final Chunk id2CountChunk =
          new ExternalSortChunk(tempDir, id2count.getName().toString(), bufferPool, id2countCollector,
              sameThreadExecutor());
              id2countCollector, sameThreadExecutor());
      long totalNumberOfEntries = 0;
      final TreeVisitor<ChildrenCount> visitor = new ID2CountTreeVisitorImporter(asImporter(id2CountChunk));
@@ -2471,7 +2481,13 @@
     * threads which will access the put() method. If underestimated, {@link #put(ByteSequence, ByteSequence)} might
     * lead to unordered copy. If overestimated, extra memory is wasted.
     */
    private static final int QUEUE_SIZE = 1024;
    private static final int QUEUE_SIZE = 128;
    /**
     * Maximum queued entry size. Beyond this size, entry will not be queued but written directly to the storage in
     * order to limit the heap size requirement for import.
     */
    private static final int ENTRY_MAX_SIZE = 32 * KB;
    private final NavigableMap<ByteSequence, ByteSequence> pendingRecords = new TreeMap<>();
    private final int queueSize;
@@ -2486,6 +2502,11 @@
    @Override
    public synchronized boolean put(ByteSequence key, ByteSequence value)
    {
      if ((key.length() + value.length()) >= ENTRY_MAX_SIZE)
      {
          return delegate.put(key, value);
      }
      pendingRecords.put(key, value);
      if (pendingRecords.size() == queueSize)
      {
@@ -2700,12 +2721,8 @@
    int readInt(int position);
    long readCompactUnsignedLong(int position);
    ByteString readByteString(int position, int length);
    int writeCompactUnsignedLong(int position, long value);
    void writeByteSequence(int position, ByteSequence data);
    int length();
@@ -2808,14 +2825,6 @@
      private final long address;
      private final int size;
      private int position;
      private final InputStream asInputStream = new InputStream()
      {
        @Override
        public int read() throws IOException
        {
          return UNSAFE.getByte(address + position++) & 0xFF;
        }
      };
      private final OutputStream asOutputStream = new OutputStream()
      {
        @Override
@@ -2823,6 +2832,18 @@
        {
          UNSAFE.putByte(address + position++, (byte) (value & 0xFF));
        }
        @Override
        public void write(byte[] b) throws IOException {
            UNSAFE.copyMemory(b, BYTE_ARRAY_OFFSET, null, address + position, b.length);
            position += b.length;
        }
        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            UNSAFE.copyMemory(b, BYTE_ARRAY_OFFSET + off, null, address + position, len);
            position += b.length;
        }
      };
      private boolean closed;
@@ -2845,42 +2866,17 @@
      }
      @Override
      public int writeCompactUnsignedLong(final int position, long value)
      public void writeByteSequence(final int position, ByteSequence data)
      {
        try
        {
          this.position = position;
          return PackedLong.writeCompactUnsigned(asOutputStream, value);
        }
        catch (IOException e)
        {
          throw new StorageRuntimeException(e);
        }
      }
      @Override
      public long readCompactUnsignedLong(final int position)
      {
        Reject.ifFalse(position + data.length() <= size);
        this.position = position;
        try
        {
          return PackedLong.readCompactUnsignedLong(asInputStream);
          data.copyTo(asOutputStream);
        }
        catch (IOException e)
        catch(IOException e)
        {
          throw new IllegalStateException(e);
        }
      }
      @Override
      public void writeByteSequence(int position, ByteSequence data)
      {
        Reject.ifFalse(position + data.length() <= size);
        long offset = address + position;
        for(int i = 0 ; i < data.length() ; i++)
        {
          UNSAFE.putByte(offset++, data.byteAt(i));
          throw new StorageRuntimeException(e);
        }
      }
@@ -2931,23 +2927,6 @@
    static final class HeapBuffer implements Buffer
    {
      private final ByteBuffer buffer;
      private final OutputStream asOutputStream = new OutputStream()
      {
        @Override
        public void write(int b) throws IOException
        {
          buffer.put((byte) (b & 0xFF));
        }
      };
      private final InputStream asInputStream = new InputStream()
      {
        @Override
        public int read() throws IOException
        {
          return buffer.get() & 0xFF;
        }
      };
      HeapBuffer(int size)
      {
@@ -2967,34 +2946,6 @@
      }
      @Override
      public int writeCompactUnsignedLong(final int position, long value)
      {
        buffer.position(position);
        try
        {
          return PackedLong.writeCompactUnsigned(asOutputStream, value);
        }
        catch (IOException e)
        {
          throw new StorageRuntimeException(e);
        }
      }
      @Override
      public long readCompactUnsignedLong(final int position)
      {
        buffer.position(position);
        try
        {
          return PackedLong.readCompactUnsignedLong(asInputStream);
        }
        catch (IOException e)
        {
          throw new IllegalArgumentException(e);
        }
      }
      @Override
      public void writeByteSequence(int position, ByteSequence data)
      {
        buffer.position(position);
@@ -3065,7 +3016,8 @@
   * Get a new {@link Collector} which can be used to merge encoded values. The types of values to merged is deduced
   * from the {@link TreeName}
   */
  private static Collector<?, ByteString> newCollector(final EntryContainer entryContainer, final TreeName treeName)
  private static Collector<?, ByteString> newPhaseTwoCollector(final EntryContainer entryContainer,
      final TreeName treeName)
  {
    final DefaultIndex index = getIndex(entryContainer, treeName);
    if (index != null)
@@ -3086,6 +3038,18 @@
    throw new IllegalArgumentException("Unknown tree: " + treeName);
  }
  private static Collector<?, ByteString> newPhaseOneCollector(final EntryContainer entryContainer,
      final TreeName treeName)
  {
    final DefaultIndex index = getIndex(entryContainer, treeName);
    if (index != null)
    {
      // key conflicts == merge EntryIDSets
      return new EntryIDsCollector(index);
    }
    return newPhaseTwoCollector(entryContainer, treeName);
  }
  private static boolean isDN2ID(TreeName treeName)
  {
    return SuffixContainer.DN2ID_INDEX_NAME.equals(treeName.getIndexId());
@@ -3212,6 +3176,80 @@
  }
  /**
   * {@link Collector} that accepts encoded {@link EntryIDSet} objects and
   * produces a {@link ByteString} representing the merged {@link EntryIDSet}.
   */
  static final class EntryIDsCollector implements Collector<LongArray, ByteString>
  {
    private final DefaultIndex index;
    private final int indexLimit;
    EntryIDsCollector(DefaultIndex index)
    {
      this.index = index;
      this.indexLimit = index.getIndexEntryLimit();
    }
    @Override
    public LongArray get()
    {
      return new LongArray();
    }
    @Override
    public LongArray accept(LongArray resultContainer, ByteString value)
    {
      if (resultContainer.size() < indexLimit)
      {
        resultContainer.add(value.toLong());
      }
      /*
       * else EntryIDSet is above index entry limits, discard additional values
       * to avoid blowing up memory now, then discard all entries in merge()
       */
      return resultContainer;
    }
    @Override
    public ByteString merge(LongArray resultContainer)
    {
      if (resultContainer.size() >= indexLimit)
      {
        return index.toValue(EntryIDSet.newUndefinedSet());
      }
      return index.toValue(EntryIDSet.newDefinedSet(resultContainer.get()));
    }
  }
  /** Simple long array primitive wrapper. */
  private static final class LongArray
  {
    private long[] values = new long[16];
    private int size;
    void add(long value)
    {
      if (size == values.length)
      {
        values = Arrays.copyOf(values, values.length * 2);
      }
      values[size++] = value;
    }
    int size()
    {
      return size;
    }
    long[] get()
    {
      values = Arrays.copyOf(values, size);
      Arrays.sort(values);
      return values;
    }
  }
  /**
   * {@link Collector} that accepts encoded {@link EntryIDSet} objects and produces a {@link ByteString} representing
   * the merged {@link EntryIDSet}.
   */
@@ -3264,27 +3302,26 @@
    private EntryIDSet buildEntryIDSet(Collection<ByteString> encodedIDSets)
    {
      final long[] entryIDs = new long[indexLimit];
      // accumulate in array
      int i = 0;
      for (ByteString encodedIDSet : encodedIDSets)
      {
      final List<EntryIDSet> idSets = new ArrayList<>(encodedIDSets.size());
      int mergedSize = 0;
      for(ByteString encodedIDSet :encodedIDSets) {
        final EntryIDSet entryIDSet = index.decodeValue(ByteString.empty(), encodedIDSet);
        if (!entryIDSet.isDefined() || i + entryIDSet.size() >= indexLimit)
        mergedSize += entryIDSet.size();
        if (!entryIDSet.isDefined() || mergedSize >= indexLimit)
        {
          // above index entry limit
          return EntryIDSet.newUndefinedSet();
        }
        for (EntryID entryID : entryIDSet)
        {
          entryIDs[i++] = entryID.longValue();
        }
        idSets.add(entryIDSet);
      }
      Arrays.sort(entryIDs, 0, i);
      return EntryIDSet.newDefinedSet(Arrays.copyOf(entryIDs, i));
      final long[] entryIDs = new long[mergedSize];
      int offset = 0;
      for(EntryIDSet idSet : idSets) {
        offset += idSet.copyTo(entryIDs, offset);
      }
      Arrays.sort(entryIDs);
      return EntryIDSet.newDefinedSet(entryIDs);
    }
  }