From 9e49542cbb5180ef95645926bc7f64b3d5684ad0 Mon Sep 17 00:00:00 2001
From: Kai Reinhard <K.Reinhard@micromata.de>
Date: Fri, 28 Dec 2018 21:52:23 +0000
Subject: [PATCH] Job queueing...

---
 borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java |   41 +++++++++++++++++++-
 borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java |   37 +++++++++++-------
 borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java  |   15 +++++++
 borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java      |   24 +++++++++++-
 4 files changed, 99 insertions(+), 18 deletions(-)

diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java
index d97b295..8cdb47d 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java
@@ -3,10 +3,7 @@
 import de.micromata.borgbutler.config.BorgRepoConfig;
 import de.micromata.borgbutler.config.ConfigurationHandler;
 import de.micromata.borgbutler.config.Definitions;
-import org.apache.commons.exec.CommandLine;
-import org.apache.commons.exec.DefaultExecutor;
-import org.apache.commons.exec.ExecuteWatchdog;
-import org.apache.commons.exec.PumpStreamHandler;
+import org.apache.commons.exec.*;
 import org.apache.commons.exec.environment.EnvironmentUtils;
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.commons.lang3.StringUtils;
@@ -42,21 +39,13 @@
 
     public void execute(BorgCommand command) {
         synchronized (this) {
-            //commandQueue.add(command);
             _execute(command);
         }
-        /*
-        while (true) {
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException ex) {
-                log.warn("Command '" + command.);
-            }
-        }*/
     }
 
     private void _execute(BorgCommand command) {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
         CommandLine cmdLine = new CommandLine(ConfigurationHandler.getConfiguration().getBorgCommand());
         cmdLine.addArgument(command.getCommand());
         if (command.getParams() != null) {
@@ -82,7 +71,27 @@
         ExecuteWatchdog watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
         executor.setWatchdog(watchdog);
         //  ExecuteResultHandler handler = new DefaultExecuteResultHandler();
-        PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream);
+        PumpStreamHandler streamHandler = new PumpStreamHandler(new LogOutputStream() {
+            @Override
+            protected void processLine(String line, int level) {
+                try {
+                    outputStream.write(line.getBytes());
+                    outputStream.write("\n".getBytes());
+                } catch (IOException ex) {
+                    log.error(ex.getMessage(), ex);
+                }
+            }
+        }, new LogOutputStream() {
+            @Override
+            protected void processLine(String line, int logLevel) {
+                try {
+                    errorOutputStream.write(line.getBytes());
+                    errorOutputStream.write("\n".getBytes());
+                } catch (IOException ex) {
+                    log.error(ex.getMessage(), ex);
+                }
+            }
+        });
         executor.setStreamHandler(streamHandler);
         String borgCall = cmdLine.getExecutable() + " " + StringUtils.join(cmdLine.getArguments(), " ");
         if (StringUtils.isNotBlank(command.getDescription())) {
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
index 2670b98..a66b32f 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
@@ -36,6 +36,17 @@
             this.status = Status.CANCELLED;
         }
         this.cancelledRequested = true;
+        cancelRunningProcess();
+    }
+
+    protected void setCancelled() {
+        this.status = Status.CANCELLED;
+    }
+
+    /**
+     * Not supported if not implemented.
+     */
+    protected void cancelRunningProcess() {
     }
 
     /**
@@ -52,6 +63,10 @@
     }
 
     protected void failed() {
+        if (this.status == Status.CANCELLED) {
+            // do nothing. It's normal that cancelled jobs fail.
+            return;
+        }
         if (this.status != Status.RUNNING) {
             logger.error("Internal error, illegal state! You shouldn't set the job status to FAILED if not in status RUNNING: " + this.status);
         }
diff --git a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
index 6a4e1c2..0eb4edf 100644
--- a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
+++ b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
@@ -13,10 +13,11 @@
 import java.nio.file.attribute.PosixFilePermissions;
 import java.util.List;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.*;
 
 public class JobQueueTest {
     private Logger log = LoggerFactory.getLogger(JobQueueTest.class);
+    // Bash script with simple counter and forced error if second argument is a valid counter.
     private static String bashScript = "#!/bin/bash\n" +
             "COUNTER=0\n" +
             "while [ $COUNTER -lt $1 ]; do\n" +
@@ -72,7 +73,7 @@
         job1 = (TestJob) queue.getQueuedJob(10);
         assertEquals(AbstractJob.Status.QUEUED, job1.getStatus());
 
-        job = (TestJob)queue.getQueuedJob(100);
+        job = (TestJob) queue.getQueuedJob(100);
         job.cancel();
         assertEquals(AbstractJob.Status.CANCELLED, job.getStatus());
 
@@ -88,6 +89,42 @@
         check(((TestJob) doneJobs.get(3)), AbstractJob.Status.DONE, "10");
     }
 
+    @Test
+    void queueStopRunningProcessTest() {
+        JobQueue queue = new JobQueue();
+        assertEquals(0, queue.getQueueSize());
+        queue.append(new TestJob(1000, file));
+        queue.append(new TestJob(10, file));
+        TestJob job = (TestJob) queue.getQueuedJob(1000);
+        int counter = 100;
+        while (!job.isExecuteStarted() && counter-- > 0) {
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException ex) {
+                log.error(ex.getMessage(), ex);
+            }
+        }
+        assertTrue(counter > 0);
+        assertEquals(AbstractJob.Status.RUNNING, job.getStatus());
+        job.cancel();
+        counter = 100;
+        while (job.getStatus() == AbstractJob.Status.RUNNING && counter-- > 0) {
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException ex) {
+                log.error(ex.getMessage(), ex);
+            }
+        }
+        assertTrue(counter > 0);
+        assertEquals(AbstractJob.Status.CANCELLED, job.getStatus());
+        job = (TestJob)queue.getQueuedJob(10);
+        assertEquals("10\n", job.getResult());
+        List<AbstractJob> doneJobs = queue.getDoneJobs();
+        assertEquals(2, doneJobs.size());
+        check(((TestJob) doneJobs.get(0)), AbstractJob.Status.DONE, null);
+        check(((TestJob) doneJobs.get(1)), AbstractJob.Status.CANCELLED, null);
+    }
+
     private void check(TestJob job, AbstractJob.Status status, String result) {
         assertEquals(status, job.getStatus());
         if (result != null) {
diff --git a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
index fe79067..8c2a163 100644
--- a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
+++ b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
@@ -1,5 +1,6 @@
 package de.micromata.borgbutler.jobs;
 
+import lombok.Getter;
 import org.apache.commons.exec.*;
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.slf4j.Logger;
@@ -14,6 +15,9 @@
     private int time;
     private File counterScript;
     private int failOn = -1;
+    private ExecuteWatchdog watchdog;
+    @Getter
+    private boolean executeStarted;
 
     TestJob(int time, File counterScript) {
         this(time, -1, counterScript);
@@ -31,6 +35,17 @@
     }
 
     @Override
+    protected void cancelRunningProcess() {
+        log.info("CancelRunningProcess: " + watchdog + ", " + getStatus());
+        if (watchdog != null) {
+            log.info("Cancelling job: " + getId());
+            watchdog.destroyProcess();
+            watchdog = null;
+            setCancelled();
+        }
+    }
+
+    @Override
     public String execute() {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
@@ -38,7 +53,7 @@
         cmdLine.addArgument(String.valueOf(this.time));
         cmdLine.addArgument(String.valueOf(this.failOn));
         DefaultExecutor executor = new DefaultExecutor();
-        ExecuteWatchdog watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
+        watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
         executor.setWatchdog(watchdog);
         PumpStreamHandler streamHandler = new PumpStreamHandler(new LogOutputStream() {
             @Override
@@ -64,12 +79,17 @@
             }
         });
         executor.setStreamHandler(streamHandler);
+        if (isCancelledRequested()) {
+            setCancelled();
+            return null;
+        }
         log.info("Executing '" + counterScript.getAbsolutePath() + " " + this.time + "'...");
+        executeStarted = true;
         try {
             executor.execute(cmdLine);
         } catch (Exception ex) {
             failed();
-            if (failOn < 0) {
+            if (failOn < 0 && getStatus() != Status.CANCELLED) {
                 log.error("Error while executing script: " + ex.getMessage(), ex);
             }
         }

--
Gitblit v1.10.0