borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java
@@ -3,10 +3,7 @@ import de.micromata.borgbutler.config.BorgRepoConfig; import de.micromata.borgbutler.config.ConfigurationHandler; import de.micromata.borgbutler.config.Definitions; import org.apache.commons.exec.CommandLine; import org.apache.commons.exec.DefaultExecutor; import org.apache.commons.exec.ExecuteWatchdog; import org.apache.commons.exec.PumpStreamHandler; import org.apache.commons.exec.*; import org.apache.commons.exec.environment.EnvironmentUtils; import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.commons.lang3.StringUtils; @@ -42,21 +39,13 @@ public void execute(BorgCommand command) { synchronized (this) { //commandQueue.add(command); _execute(command); } /* while (true) { try { Thread.sleep(1000); } catch (InterruptedException ex) { log.warn("Command '" + command.); } }*/ } private void _execute(BorgCommand command) { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream(); CommandLine cmdLine = new CommandLine(ConfigurationHandler.getConfiguration().getBorgCommand()); cmdLine.addArgument(command.getCommand()); if (command.getParams() != null) { @@ -82,7 +71,27 @@ ExecuteWatchdog watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT); executor.setWatchdog(watchdog); // ExecuteResultHandler handler = new DefaultExecuteResultHandler(); PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream); PumpStreamHandler streamHandler = new PumpStreamHandler(new LogOutputStream() { @Override protected void processLine(String line, int level) { try { outputStream.write(line.getBytes()); outputStream.write("\n".getBytes()); } catch (IOException ex) { log.error(ex.getMessage(), ex); } } }, new LogOutputStream() { @Override protected void processLine(String line, int logLevel) { try { errorOutputStream.write(line.getBytes()); errorOutputStream.write("\n".getBytes()); } catch (IOException ex) { log.error(ex.getMessage(), ex); } } }); executor.setStreamHandler(streamHandler); String borgCall = cmdLine.getExecutable() + " " + StringUtils.join(cmdLine.getArguments(), " "); if (StringUtils.isNotBlank(command.getDescription())) { borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
@@ -36,6 +36,17 @@ this.status = Status.CANCELLED; } this.cancelledRequested = true; cancelRunningProcess(); } protected void setCancelled() { this.status = Status.CANCELLED; } /** * Not supported if not implemented. */ protected void cancelRunningProcess() { } /** @@ -52,6 +63,10 @@ } protected void failed() { if (this.status == Status.CANCELLED) { // do nothing. It's normal that cancelled jobs fail. return; } if (this.status != Status.RUNNING) { logger.error("Internal error, illegal state! You shouldn't set the job status to FAILED if not in status RUNNING: " + this.status); } borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
@@ -13,10 +13,11 @@ import java.nio.file.attribute.PosixFilePermissions; import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.*; public class JobQueueTest { private Logger log = LoggerFactory.getLogger(JobQueueTest.class); // Bash script with simple counter and forced error if second argument is a valid counter. private static String bashScript = "#!/bin/bash\n" + "COUNTER=0\n" + "while [ $COUNTER -lt $1 ]; do\n" + @@ -72,7 +73,7 @@ job1 = (TestJob) queue.getQueuedJob(10); assertEquals(AbstractJob.Status.QUEUED, job1.getStatus()); job = (TestJob)queue.getQueuedJob(100); job = (TestJob) queue.getQueuedJob(100); job.cancel(); assertEquals(AbstractJob.Status.CANCELLED, job.getStatus()); @@ -88,6 +89,42 @@ check(((TestJob) doneJobs.get(3)), AbstractJob.Status.DONE, "10"); } @Test void queueStopRunningProcessTest() { JobQueue queue = new JobQueue(); assertEquals(0, queue.getQueueSize()); queue.append(new TestJob(1000, file)); queue.append(new TestJob(10, file)); TestJob job = (TestJob) queue.getQueuedJob(1000); int counter = 100; while (!job.isExecuteStarted() && counter-- > 0) { try { Thread.sleep(10); } catch (InterruptedException ex) { log.error(ex.getMessage(), ex); } } assertTrue(counter > 0); assertEquals(AbstractJob.Status.RUNNING, job.getStatus()); job.cancel(); counter = 100; while (job.getStatus() == AbstractJob.Status.RUNNING && counter-- > 0) { try { Thread.sleep(10); } catch (InterruptedException ex) { log.error(ex.getMessage(), ex); } } assertTrue(counter > 0); assertEquals(AbstractJob.Status.CANCELLED, job.getStatus()); job = (TestJob)queue.getQueuedJob(10); assertEquals("10\n", job.getResult()); List<AbstractJob> doneJobs = queue.getDoneJobs(); assertEquals(2, doneJobs.size()); check(((TestJob) doneJobs.get(0)), AbstractJob.Status.DONE, null); check(((TestJob) doneJobs.get(1)), AbstractJob.Status.CANCELLED, null); } private void check(TestJob job, AbstractJob.Status status, String result) { assertEquals(status, job.getStatus()); if (result != null) { borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
@@ -1,5 +1,6 @@ package de.micromata.borgbutler.jobs; import lombok.Getter; import org.apache.commons.exec.*; import org.apache.commons.io.output.ByteArrayOutputStream; import org.slf4j.Logger; @@ -14,6 +15,9 @@ private int time; private File counterScript; private int failOn = -1; private ExecuteWatchdog watchdog; @Getter private boolean executeStarted; TestJob(int time, File counterScript) { this(time, -1, counterScript); @@ -31,6 +35,17 @@ } @Override protected void cancelRunningProcess() { log.info("CancelRunningProcess: " + watchdog + ", " + getStatus()); if (watchdog != null) { log.info("Cancelling job: " + getId()); watchdog.destroyProcess(); watchdog = null; setCancelled(); } } @Override public String execute() { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream(); @@ -38,7 +53,7 @@ cmdLine.addArgument(String.valueOf(this.time)); cmdLine.addArgument(String.valueOf(this.failOn)); DefaultExecutor executor = new DefaultExecutor(); ExecuteWatchdog watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT); watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT); executor.setWatchdog(watchdog); PumpStreamHandler streamHandler = new PumpStreamHandler(new LogOutputStream() { @Override @@ -64,12 +79,17 @@ } }); executor.setStreamHandler(streamHandler); if (isCancelledRequested()) { setCancelled(); return null; } log.info("Executing '" + counterScript.getAbsolutePath() + " " + this.time + "'..."); executeStarted = true; try { executor.execute(cmdLine); } catch (Exception ex) { failed(); if (failOn < 0) { if (failOn < 0 && getStatus() != Status.CANCELLED) { log.error("Error while executing script: " + ex.getMessage(), ex); } }