From a58502a5729b4d4b36ec0731b89265f37770a781 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 09 May 2016 16:53:45 +0000
Subject: [PATCH] Interrupt all running tasks when one task has failed.

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java |   32 +++++++++++++++++++++++++++-----
 1 files changed, 27 insertions(+), 5 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
index 5dca49c..6579e0a 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -216,7 +216,7 @@
       }
       finally
       {
-        sorter.shutdown();
+        sorter.shutdownNow();
         if (OperatingSystem.isWindows())
         {
           // Try to force the JVM to close mmap()ed file so that they can be deleted.
@@ -584,6 +584,7 @@
             @Override
             public Void call() throws Exception
             {
+              checkThreadNotInterrupted();
               EntryInformation entryInfo;
               while ((entryInfo = reader.readEntry(entryContainers)) != null && !importConfig.isCancelled())
               {
@@ -612,6 +613,7 @@
                 {
                   reader.removePending(entry.getName());
                 }
+                checkThreadNotInterrupted();
               }
               return null;
             }
@@ -1014,6 +1016,7 @@
         @Override
         public Void call() throws Exception
         {
+          checkThreadNotInterrupted();
           try (final MeteredCursor<ByteString, ByteString> unusued = source.flip())
           {
             // force flush
@@ -1248,7 +1251,8 @@
     }
     finally
     {
-      executor.shutdown();
+      executor.shutdownNow();
+      executor.awaitTermination(5, TimeUnit.SECONDS);
     }
   }
 
@@ -1555,6 +1559,7 @@
            * NOTE: The resulting size of the FileRegion might be less than chunk.size() because of key de-duplication
            * performed by the CollectorCursor.
            */
+          checkThreadNotInterrupted();
           final FileRegion region = new FileRegion(channel, startOffset, chunk.size());
           final int regionSize;
           try (final SequentialCursor<ByteString, ByteString> source =
@@ -1840,8 +1845,9 @@
         mmapBuffer = channel.map(MapMode.READ_WRITE, startOffset, size);
       }
 
-      public int write(SequentialCursor<ByteString, ByteString> source) throws IOException
+      public int write(SequentialCursor<ByteString, ByteString> source) throws IOException, InterruptedException
       {
+        checkThreadNotInterrupted();
         while (source.next())
         {
           final ByteSequence key = source.getKey();
@@ -1850,6 +1856,7 @@
           PackedLong.writeCompactUnsigned(mmapBufferOS, value.length());
           key.copyTo(mmapBuffer);
           value.copyTo(mmapBuffer);
+          checkThreadNotInterrupted();
         }
         return mmapBuffer.position();
       }
@@ -2206,8 +2213,9 @@
     }
 
     @Override
-    public Void call()
+    public Void call() throws InterruptedException
     {
+      checkThreadNotInterrupted();
       try (final SequentialCursor<ByteString, ByteString> sourceCursor = trackCursorProgress(reporter, source.flip()))
       {
         copyIntoChunk(sourceCursor, asChunk(treeName, destination));
@@ -2233,8 +2241,9 @@
     }
 
     @Override
-    public Void call()
+    public Void call() throws InterruptedException
     {
+      checkThreadNotInterrupted();
       try (final SequentialCursor<ByteString, ByteString> sourceCursor = trackCursorProgress(reporter, source.flip()))
       {
         final long nbRecords = copyIntoChunk(sourceCursor, asChunk(vlvIndex.getName(), destination));
@@ -2245,8 +2254,10 @@
   }
 
   private static long copyIntoChunk(SequentialCursor<ByteString, ByteString> source, Chunk destination)
+      throws InterruptedException
   {
     long nbRecords = 0;
+    checkThreadNotInterrupted();
     while (source.next())
     {
       if (!destination.put(source.getKey(), source.getValue()))
@@ -2254,6 +2265,7 @@
         throw new IllegalStateException("Destination chunk is full");
       }
       nbRecords++;
+      checkThreadNotInterrupted();
     }
     return nbRecords;
   }
@@ -2302,10 +2314,12 @@
           final SequentialCursor<ByteString, ByteString> dn2idCursor =
               dn2id.openCursor(trackCursorProgress(reporter, chunkCursor), visitor))
       {
+        checkThreadNotInterrupted();
         while (dn2idCursor.next())
         {
           dn2IdDestination.put(dn2idCursor.getKey(), dn2idCursor.getValue());
           totalNumberOfEntries++;
+          checkThreadNotInterrupted();
         }
       }
       id2count.importPutTotalCount(asImporter(id2CountChunk), Math.max(0, totalNumberOfEntries));
@@ -2634,6 +2648,14 @@
     return results;
   }
 
+  private static void checkThreadNotInterrupted() throws InterruptedException
+  {
+    if (Thread.interrupted())
+    {
+      throw new InterruptedException();
+    }
+  }
+
   /** Regularly report progress statistics from the registered list of {@link ProgressMetric}. */
   private static final class PhaseTwoProgressReporter implements Runnable, Closeable
   {

--
Gitblit v1.10.0