From 9e49542cbb5180ef95645926bc7f64b3d5684ad0 Mon Sep 17 00:00:00 2001
From: Kai Reinhard <K.Reinhard@micromata.de>
Date: Fri, 28 Dec 2018 21:52:23 +0000
Subject: [PATCH] Job queueing...
---
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java | 41 +++++++++++++++++++-
borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java | 37 +++++++++++-------
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java | 15 +++++++
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java | 24 +++++++++++-
4 files changed, 99 insertions(+), 18 deletions(-)
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java
index d97b295..8cdb47d 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java
@@ -3,10 +3,7 @@
import de.micromata.borgbutler.config.BorgRepoConfig;
import de.micromata.borgbutler.config.ConfigurationHandler;
import de.micromata.borgbutler.config.Definitions;
-import org.apache.commons.exec.CommandLine;
-import org.apache.commons.exec.DefaultExecutor;
-import org.apache.commons.exec.ExecuteWatchdog;
-import org.apache.commons.exec.PumpStreamHandler;
+import org.apache.commons.exec.*;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.commons.lang3.StringUtils;
@@ -42,21 +39,13 @@
public void execute(BorgCommand command) {
synchronized (this) {
- //commandQueue.add(command);
_execute(command);
}
- /*
- while (true) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ex) {
- log.warn("Command '" + command.);
- }
- }*/
}
private void _execute(BorgCommand command) {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
CommandLine cmdLine = new CommandLine(ConfigurationHandler.getConfiguration().getBorgCommand());
cmdLine.addArgument(command.getCommand());
if (command.getParams() != null) {
@@ -82,7 +71,27 @@
ExecuteWatchdog watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
executor.setWatchdog(watchdog);
// ExecuteResultHandler handler = new DefaultExecuteResultHandler();
- PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream);
+ PumpStreamHandler streamHandler = new PumpStreamHandler(new LogOutputStream() {
+ @Override
+ protected void processLine(String line, int level) {
+ try {
+ outputStream.write(line.getBytes());
+ outputStream.write("\n".getBytes());
+ } catch (IOException ex) {
+ log.error(ex.getMessage(), ex);
+ }
+ }
+ }, new LogOutputStream() {
+ @Override
+ protected void processLine(String line, int logLevel) {
+ try {
+ errorOutputStream.write(line.getBytes());
+ errorOutputStream.write("\n".getBytes());
+ } catch (IOException ex) {
+ log.error(ex.getMessage(), ex);
+ }
+ }
+ });
executor.setStreamHandler(streamHandler);
String borgCall = cmdLine.getExecutable() + " " + StringUtils.join(cmdLine.getArguments(), " ");
if (StringUtils.isNotBlank(command.getDescription())) {
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
index 2670b98..a66b32f 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
@@ -36,6 +36,17 @@
this.status = Status.CANCELLED;
}
this.cancelledRequested = true;
+ cancelRunningProcess();
+ }
+
+ protected void setCancelled() {
+ this.status = Status.CANCELLED;
+ }
+
+ /**
+ * Not supported if not implemented.
+ */
+ protected void cancelRunningProcess() {
}
/**
@@ -52,6 +63,10 @@
}
protected void failed() {
+ if (this.status == Status.CANCELLED) {
+ // do nothing. It's normal that cancelled jobs fail.
+ return;
+ }
if (this.status != Status.RUNNING) {
logger.error("Internal error, illegal state! You shouldn't set the job status to FAILED if not in status RUNNING: " + this.status);
}
diff --git a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
index 6a4e1c2..0eb4edf 100644
--- a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
+++ b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
@@ -13,10 +13,11 @@
import java.nio.file.attribute.PosixFilePermissions;
import java.util.List;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.*;
public class JobQueueTest {
private Logger log = LoggerFactory.getLogger(JobQueueTest.class);
+ // Bash script with simple counter and forced error if second argument is a valid counter.
private static String bashScript = "#!/bin/bash\n" +
"COUNTER=0\n" +
"while [ $COUNTER -lt $1 ]; do\n" +
@@ -72,7 +73,7 @@
job1 = (TestJob) queue.getQueuedJob(10);
assertEquals(AbstractJob.Status.QUEUED, job1.getStatus());
- job = (TestJob)queue.getQueuedJob(100);
+ job = (TestJob) queue.getQueuedJob(100);
job.cancel();
assertEquals(AbstractJob.Status.CANCELLED, job.getStatus());
@@ -88,6 +89,42 @@
check(((TestJob) doneJobs.get(3)), AbstractJob.Status.DONE, "10");
}
+ @Test
+ void queueStopRunningProcessTest() {
+ JobQueue queue = new JobQueue();
+ assertEquals(0, queue.getQueueSize());
+ queue.append(new TestJob(1000, file));
+ queue.append(new TestJob(10, file));
+ TestJob job = (TestJob) queue.getQueuedJob(1000);
+ int counter = 100;
+ while (!job.isExecuteStarted() && counter-- > 0) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ex) {
+ log.error(ex.getMessage(), ex);
+ }
+ }
+ assertTrue(counter > 0);
+ assertEquals(AbstractJob.Status.RUNNING, job.getStatus());
+ job.cancel();
+ counter = 100;
+ while (job.getStatus() == AbstractJob.Status.RUNNING && counter-- > 0) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ex) {
+ log.error(ex.getMessage(), ex);
+ }
+ }
+ assertTrue(counter > 0);
+ assertEquals(AbstractJob.Status.CANCELLED, job.getStatus());
+ job = (TestJob)queue.getQueuedJob(10);
+ assertEquals("10\n", job.getResult());
+ List<AbstractJob> doneJobs = queue.getDoneJobs();
+ assertEquals(2, doneJobs.size());
+ check(((TestJob) doneJobs.get(0)), AbstractJob.Status.DONE, null);
+ check(((TestJob) doneJobs.get(1)), AbstractJob.Status.CANCELLED, null);
+ }
+
private void check(TestJob job, AbstractJob.Status status, String result) {
assertEquals(status, job.getStatus());
if (result != null) {
diff --git a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
index fe79067..8c2a163 100644
--- a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
+++ b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
@@ -1,5 +1,6 @@
package de.micromata.borgbutler.jobs;
+import lombok.Getter;
import org.apache.commons.exec.*;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.slf4j.Logger;
@@ -14,6 +15,9 @@
private int time;
private File counterScript;
private int failOn = -1;
+ private ExecuteWatchdog watchdog;
+ @Getter
+ private boolean executeStarted;
TestJob(int time, File counterScript) {
this(time, -1, counterScript);
@@ -31,6 +35,17 @@
}
@Override
+ protected void cancelRunningProcess() {
+ log.info("CancelRunningProcess: " + watchdog + ", " + getStatus());
+ if (watchdog != null) {
+ log.info("Cancelling job: " + getId());
+ watchdog.destroyProcess();
+ watchdog = null;
+ setCancelled();
+ }
+ }
+
+ @Override
public String execute() {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
@@ -38,7 +53,7 @@
cmdLine.addArgument(String.valueOf(this.time));
cmdLine.addArgument(String.valueOf(this.failOn));
DefaultExecutor executor = new DefaultExecutor();
- ExecuteWatchdog watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
+ watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
executor.setWatchdog(watchdog);
PumpStreamHandler streamHandler = new PumpStreamHandler(new LogOutputStream() {
@Override
@@ -64,12 +79,17 @@
}
});
executor.setStreamHandler(streamHandler);
+ if (isCancelledRequested()) {
+ setCancelled();
+ return null;
+ }
log.info("Executing '" + counterScript.getAbsolutePath() + " " + this.time + "'...");
+ executeStarted = true;
try {
executor.execute(cmdLine);
} catch (Exception ex) {
failed();
- if (failOn < 0) {
+ if (failOn < 0 && getStatus() != Status.CANCELLED) {
log.error("Error while executing script: " + ex.getMessage(), ex);
}
}
--
Gitblit v1.10.0