mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
04.00.2016 a58502a5729b4d4b36ec0731b89265f37770a781
Interrupt all running tasks when one task has failed.

When an error occurs, all running tasks must be interrupted so that the
database can be closed correctly.
1 file modified
32 ■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java 32 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -216,7 +216,7 @@
      }
      finally
      {
        sorter.shutdown();
        sorter.shutdownNow();
        if (OperatingSystem.isWindows())
        {
          // Try to force the JVM to close mmap()ed file so that they can be deleted.
@@ -584,6 +584,7 @@
            @Override
            public Void call() throws Exception
            {
              checkThreadNotInterrupted();
              EntryInformation entryInfo;
              while ((entryInfo = reader.readEntry(entryContainers)) != null && !importConfig.isCancelled())
              {
@@ -612,6 +613,7 @@
                {
                  reader.removePending(entry.getName());
                }
                checkThreadNotInterrupted();
              }
              return null;
            }
@@ -1014,6 +1016,7 @@
        @Override
        public Void call() throws Exception
        {
          checkThreadNotInterrupted();
          try (final MeteredCursor<ByteString, ByteString> unusued = source.flip())
          {
            // force flush
@@ -1248,7 +1251,8 @@
    }
    finally
    {
      executor.shutdown();
      executor.shutdownNow();
      executor.awaitTermination(5, TimeUnit.SECONDS);
    }
  }
@@ -1555,6 +1559,7 @@
           * NOTE: The resulting size of the FileRegion might be less than chunk.size() because of key de-duplication
           * performed by the CollectorCursor.
           */
          checkThreadNotInterrupted();
          final FileRegion region = new FileRegion(channel, startOffset, chunk.size());
          final int regionSize;
          try (final SequentialCursor<ByteString, ByteString> source =
@@ -1840,8 +1845,9 @@
        mmapBuffer = channel.map(MapMode.READ_WRITE, startOffset, size);
      }
      public int write(SequentialCursor<ByteString, ByteString> source) throws IOException
      public int write(SequentialCursor<ByteString, ByteString> source) throws IOException, InterruptedException
      {
        checkThreadNotInterrupted();
        while (source.next())
        {
          final ByteSequence key = source.getKey();
@@ -1850,6 +1856,7 @@
          PackedLong.writeCompactUnsigned(mmapBufferOS, value.length());
          key.copyTo(mmapBuffer);
          value.copyTo(mmapBuffer);
          checkThreadNotInterrupted();
        }
        return mmapBuffer.position();
      }
@@ -2206,8 +2213,9 @@
    }
    @Override
    public Void call()
    public Void call() throws InterruptedException
    {
      checkThreadNotInterrupted();
      try (final SequentialCursor<ByteString, ByteString> sourceCursor = trackCursorProgress(reporter, source.flip()))
      {
        copyIntoChunk(sourceCursor, asChunk(treeName, destination));
@@ -2233,8 +2241,9 @@
    }
    @Override
    public Void call()
    public Void call() throws InterruptedException
    {
      checkThreadNotInterrupted();
      try (final SequentialCursor<ByteString, ByteString> sourceCursor = trackCursorProgress(reporter, source.flip()))
      {
        final long nbRecords = copyIntoChunk(sourceCursor, asChunk(vlvIndex.getName(), destination));
@@ -2245,8 +2254,10 @@
  }
  private static long copyIntoChunk(SequentialCursor<ByteString, ByteString> source, Chunk destination)
      throws InterruptedException
  {
    long nbRecords = 0;
    checkThreadNotInterrupted();
    while (source.next())
    {
      if (!destination.put(source.getKey(), source.getValue()))
@@ -2254,6 +2265,7 @@
        throw new IllegalStateException("Destination chunk is full");
      }
      nbRecords++;
      checkThreadNotInterrupted();
    }
    return nbRecords;
  }
@@ -2302,10 +2314,12 @@
          final SequentialCursor<ByteString, ByteString> dn2idCursor =
              dn2id.openCursor(trackCursorProgress(reporter, chunkCursor), visitor))
      {
        checkThreadNotInterrupted();
        while (dn2idCursor.next())
        {
          dn2IdDestination.put(dn2idCursor.getKey(), dn2idCursor.getValue());
          totalNumberOfEntries++;
          checkThreadNotInterrupted();
        }
      }
      id2count.importPutTotalCount(asImporter(id2CountChunk), Math.max(0, totalNumberOfEntries));
@@ -2634,6 +2648,14 @@
    return results;
  }
  private static void checkThreadNotInterrupted() throws InterruptedException
  {
    if (Thread.interrupted())
    {
      throw new InterruptedException();
    }
  }
  /** Regularly report progress statistics from the registered list of {@link ProgressMetric}. */
  private static final class PhaseTwoProgressReporter implements Runnable, Closeable
  {