mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
04.00.2016 a58502a5729b4d4b36ec0731b89265f37770a781
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -216,7 +216,7 @@
      }
      finally
      {
        sorter.shutdown();
        sorter.shutdownNow();
        if (OperatingSystem.isWindows())
        {
          // Try to force the JVM to close mmap()ed file so that they can be deleted.
@@ -584,6 +584,7 @@
            @Override
            public Void call() throws Exception
            {
              checkThreadNotInterrupted();
              EntryInformation entryInfo;
              while ((entryInfo = reader.readEntry(entryContainers)) != null && !importConfig.isCancelled())
              {
@@ -612,6 +613,7 @@
                {
                  reader.removePending(entry.getName());
                }
                checkThreadNotInterrupted();
              }
              return null;
            }
@@ -1014,6 +1016,7 @@
        @Override
        public Void call() throws Exception
        {
          checkThreadNotInterrupted();
          try (final MeteredCursor<ByteString, ByteString> unusued = source.flip())
          {
            // force flush
@@ -1248,7 +1251,8 @@
    }
    finally
    {
      executor.shutdown();
      executor.shutdownNow();
      executor.awaitTermination(5, TimeUnit.SECONDS);
    }
  }
@@ -1555,6 +1559,7 @@
           * NOTE: The resulting size of the FileRegion might be less than chunk.size() because of key de-duplication
           * performed by the CollectorCursor.
           */
          checkThreadNotInterrupted();
          final FileRegion region = new FileRegion(channel, startOffset, chunk.size());
          final int regionSize;
          try (final SequentialCursor<ByteString, ByteString> source =
@@ -1840,8 +1845,9 @@
        mmapBuffer = channel.map(MapMode.READ_WRITE, startOffset, size);
      }
      public int write(SequentialCursor<ByteString, ByteString> source) throws IOException
      public int write(SequentialCursor<ByteString, ByteString> source) throws IOException, InterruptedException
      {
        checkThreadNotInterrupted();
        while (source.next())
        {
          final ByteSequence key = source.getKey();
@@ -1850,6 +1856,7 @@
          PackedLong.writeCompactUnsigned(mmapBufferOS, value.length());
          key.copyTo(mmapBuffer);
          value.copyTo(mmapBuffer);
          checkThreadNotInterrupted();
        }
        return mmapBuffer.position();
      }
@@ -2206,8 +2213,9 @@
    }
    @Override
    public Void call()
    public Void call() throws InterruptedException
    {
      checkThreadNotInterrupted();
      try (final SequentialCursor<ByteString, ByteString> sourceCursor = trackCursorProgress(reporter, source.flip()))
      {
        copyIntoChunk(sourceCursor, asChunk(treeName, destination));
@@ -2233,8 +2241,9 @@
    }
    @Override
    public Void call()
    public Void call() throws InterruptedException
    {
      checkThreadNotInterrupted();
      try (final SequentialCursor<ByteString, ByteString> sourceCursor = trackCursorProgress(reporter, source.flip()))
      {
        final long nbRecords = copyIntoChunk(sourceCursor, asChunk(vlvIndex.getName(), destination));
@@ -2245,8 +2254,10 @@
  }
  private static long copyIntoChunk(SequentialCursor<ByteString, ByteString> source, Chunk destination)
      throws InterruptedException
  {
    long nbRecords = 0;
    checkThreadNotInterrupted();
    while (source.next())
    {
      if (!destination.put(source.getKey(), source.getValue()))
@@ -2254,6 +2265,7 @@
        throw new IllegalStateException("Destination chunk is full");
      }
      nbRecords++;
      checkThreadNotInterrupted();
    }
    return nbRecords;
  }
@@ -2302,10 +2314,12 @@
          final SequentialCursor<ByteString, ByteString> dn2idCursor =
              dn2id.openCursor(trackCursorProgress(reporter, chunkCursor), visitor))
      {
        checkThreadNotInterrupted();
        while (dn2idCursor.next())
        {
          dn2IdDestination.put(dn2idCursor.getKey(), dn2idCursor.getValue());
          totalNumberOfEntries++;
          checkThreadNotInterrupted();
        }
      }
      id2count.importPutTotalCount(asImporter(id2CountChunk), Math.max(0, totalNumberOfEntries));
@@ -2634,6 +2648,14 @@
    return results;
  }
  private static void checkThreadNotInterrupted() throws InterruptedException
  {
    if (Thread.interrupted())
    {
      throw new InterruptedException();
    }
  }
  /** Regularly report progress statistics from the registered list of {@link ProgressMetric}. */
  private static final class PhaseTwoProgressReporter implements Runnable, Closeable
  {