From a58502a5729b4d4b36ec0731b89265f37770a781 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Mon, 09 May 2016 16:53:45 +0000
Subject: [PATCH] Interrupt all running tasks when one task has failed.
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java | 32 +++++++++++++++++++++++++++-----
1 files changed, 27 insertions(+), 5 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
index 5dca49c..6579e0a 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -216,7 +216,7 @@
}
finally
{
- sorter.shutdown();
+ sorter.shutdownNow();
if (OperatingSystem.isWindows())
{
// Try to force the JVM to close mmap()ed file so that they can be deleted.
@@ -584,6 +584,7 @@
@Override
public Void call() throws Exception
{
+ checkThreadNotInterrupted();
EntryInformation entryInfo;
while ((entryInfo = reader.readEntry(entryContainers)) != null && !importConfig.isCancelled())
{
@@ -612,6 +613,7 @@
{
reader.removePending(entry.getName());
}
+ checkThreadNotInterrupted();
}
return null;
}
@@ -1014,6 +1016,7 @@
@Override
public Void call() throws Exception
{
+ checkThreadNotInterrupted();
try (final MeteredCursor<ByteString, ByteString> unusued = source.flip())
{
// force flush
@@ -1248,7 +1251,8 @@
}
finally
{
- executor.shutdown();
+ executor.shutdownNow();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
}
}
@@ -1555,6 +1559,7 @@
* NOTE: The resulting size of the FileRegion might be less than chunk.size() because of key de-duplication
* performed by the CollectorCursor.
*/
+ checkThreadNotInterrupted();
final FileRegion region = new FileRegion(channel, startOffset, chunk.size());
final int regionSize;
try (final SequentialCursor<ByteString, ByteString> source =
@@ -1840,8 +1845,9 @@
mmapBuffer = channel.map(MapMode.READ_WRITE, startOffset, size);
}
- public int write(SequentialCursor<ByteString, ByteString> source) throws IOException
+ public int write(SequentialCursor<ByteString, ByteString> source) throws IOException, InterruptedException
{
+ checkThreadNotInterrupted();
while (source.next())
{
final ByteSequence key = source.getKey();
@@ -1850,6 +1856,7 @@
PackedLong.writeCompactUnsigned(mmapBufferOS, value.length());
key.copyTo(mmapBuffer);
value.copyTo(mmapBuffer);
+ checkThreadNotInterrupted();
}
return mmapBuffer.position();
}
@@ -2206,8 +2213,9 @@
}
@Override
- public Void call()
+ public Void call() throws InterruptedException
{
+ checkThreadNotInterrupted();
try (final SequentialCursor<ByteString, ByteString> sourceCursor = trackCursorProgress(reporter, source.flip()))
{
copyIntoChunk(sourceCursor, asChunk(treeName, destination));
@@ -2233,8 +2241,9 @@
}
@Override
- public Void call()
+ public Void call() throws InterruptedException
{
+ checkThreadNotInterrupted();
try (final SequentialCursor<ByteString, ByteString> sourceCursor = trackCursorProgress(reporter, source.flip()))
{
final long nbRecords = copyIntoChunk(sourceCursor, asChunk(vlvIndex.getName(), destination));
@@ -2245,8 +2254,10 @@
}
private static long copyIntoChunk(SequentialCursor<ByteString, ByteString> source, Chunk destination)
+ throws InterruptedException
{
long nbRecords = 0;
+ checkThreadNotInterrupted();
while (source.next())
{
if (!destination.put(source.getKey(), source.getValue()))
@@ -2254,6 +2265,7 @@
throw new IllegalStateException("Destination chunk is full");
}
nbRecords++;
+ checkThreadNotInterrupted();
}
return nbRecords;
}
@@ -2302,10 +2314,12 @@
final SequentialCursor<ByteString, ByteString> dn2idCursor =
dn2id.openCursor(trackCursorProgress(reporter, chunkCursor), visitor))
{
+ checkThreadNotInterrupted();
while (dn2idCursor.next())
{
dn2IdDestination.put(dn2idCursor.getKey(), dn2idCursor.getValue());
totalNumberOfEntries++;
+ checkThreadNotInterrupted();
}
}
id2count.importPutTotalCount(asImporter(id2CountChunk), Math.max(0, totalNumberOfEntries));
@@ -2634,6 +2648,14 @@
return results;
}
+ private static void checkThreadNotInterrupted() throws InterruptedException
+ {
+ if (Thread.interrupted())
+ {
+ throw new InterruptedException();
+ }
+ }
+
/** Regularly report progress statistics from the registered list of {@link ProgressMetric}. */
private static final class PhaseTwoProgressReporter implements Runnable, Closeable
{
--
Gitblit v1.10.0