mirror of https://github.com/micromata/borgbackup-butler.git

Kai Reinhard
28.52.2018 9e49542cbb5180ef95645926bc7f64b3d5684ad0
Job queueing...
4 files modified
117 ■■■■ changed files
borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java 37 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java 15 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java 41 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java 24 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java
@@ -3,10 +3,7 @@
import de.micromata.borgbutler.config.BorgRepoConfig;
import de.micromata.borgbutler.config.ConfigurationHandler;
import de.micromata.borgbutler.config.Definitions;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.exec.*;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.commons.lang3.StringUtils;
@@ -42,21 +39,13 @@
    public void execute(BorgCommand command) {
        synchronized (this) {
            //commandQueue.add(command);
            _execute(command);
        }
        /*
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                log.warn("Command '" + command.);
            }
        }*/
    }
    private void _execute(BorgCommand command) {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
        CommandLine cmdLine = new CommandLine(ConfigurationHandler.getConfiguration().getBorgCommand());
        cmdLine.addArgument(command.getCommand());
        if (command.getParams() != null) {
@@ -82,7 +71,27 @@
        ExecuteWatchdog watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
        executor.setWatchdog(watchdog);
        //  ExecuteResultHandler handler = new DefaultExecuteResultHandler();
        PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream);
        PumpStreamHandler streamHandler = new PumpStreamHandler(new LogOutputStream() {
            @Override
            protected void processLine(String line, int level) {
                try {
                    outputStream.write(line.getBytes());
                    outputStream.write("\n".getBytes());
                } catch (IOException ex) {
                    log.error(ex.getMessage(), ex);
                }
            }
        }, new LogOutputStream() {
            @Override
            protected void processLine(String line, int logLevel) {
                try {
                    errorOutputStream.write(line.getBytes());
                    errorOutputStream.write("\n".getBytes());
                } catch (IOException ex) {
                    log.error(ex.getMessage(), ex);
                }
            }
        });
        executor.setStreamHandler(streamHandler);
        String borgCall = cmdLine.getExecutable() + " " + StringUtils.join(cmdLine.getArguments(), " ");
        if (StringUtils.isNotBlank(command.getDescription())) {
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
@@ -36,6 +36,17 @@
            this.status = Status.CANCELLED;
        }
        this.cancelledRequested = true;
        cancelRunningProcess();
    }
    protected void setCancelled() {
        this.status = Status.CANCELLED;
    }
    /**
     * Hook called when cancellation of this job is requested, giving the job a chance to stop
     * work that is already in progress.
     * <p>
     * This default implementation does nothing, so stopping running work is not supported
     * unless a subclass overrides this method.
     */
    protected void cancelRunningProcess() {
    }
    /**
@@ -52,6 +63,10 @@
    }
    protected void failed() {
        if (this.status == Status.CANCELLED) {
            // do nothing. It's normal that cancelled jobs fail.
            return;
        }
        if (this.status != Status.RUNNING) {
            logger.error("Internal error, illegal state! You shouldn't set the job status to FAILED if not in status RUNNING: " + this.status);
        }
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
@@ -13,10 +13,11 @@
import java.nio.file.attribute.PosixFilePermissions;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.*;
public class JobQueueTest {
    private Logger log = LoggerFactory.getLogger(JobQueueTest.class);
    // Bash script with simple counter and forced error if second argument is a valid counter.
    private static String bashScript = "#!/bin/bash\n" +
            "COUNTER=0\n" +
            "while [ $COUNTER -lt $1 ]; do\n" +
@@ -72,7 +73,7 @@
        job1 = (TestJob) queue.getQueuedJob(10);
        assertEquals(AbstractJob.Status.QUEUED, job1.getStatus());
        job = (TestJob)queue.getQueuedJob(100);
        job = (TestJob) queue.getQueuedJob(100);
        job.cancel();
        assertEquals(AbstractJob.Status.CANCELLED, job.getStatus());
@@ -88,6 +89,42 @@
        check(((TestJob) doneJobs.get(3)), AbstractJob.Status.DONE, "10");
    }
    @Test
    void queueStopRunningProcessTest() {
        JobQueue queue = new JobQueue();
        assertEquals(0, queue.getQueueSize());
        queue.append(new TestJob(1000, file));
        queue.append(new TestJob(10, file));
        TestJob job = (TestJob) queue.getQueuedJob(1000);
        int counter = 100;
        while (!job.isExecuteStarted() && counter-- > 0) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException ex) {
                log.error(ex.getMessage(), ex);
            }
        }
        assertTrue(counter > 0);
        assertEquals(AbstractJob.Status.RUNNING, job.getStatus());
        job.cancel();
        counter = 100;
        while (job.getStatus() == AbstractJob.Status.RUNNING && counter-- > 0) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException ex) {
                log.error(ex.getMessage(), ex);
            }
        }
        assertTrue(counter > 0);
        assertEquals(AbstractJob.Status.CANCELLED, job.getStatus());
        job = (TestJob)queue.getQueuedJob(10);
        assertEquals("10\n", job.getResult());
        List<AbstractJob> doneJobs = queue.getDoneJobs();
        assertEquals(2, doneJobs.size());
        check(((TestJob) doneJobs.get(0)), AbstractJob.Status.DONE, null);
        check(((TestJob) doneJobs.get(1)), AbstractJob.Status.CANCELLED, null);
    }
    private void check(TestJob job, AbstractJob.Status status, String result) {
        assertEquals(status, job.getStatus());
        if (result != null) {
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
@@ -1,5 +1,6 @@
package de.micromata.borgbutler.jobs;
import lombok.Getter;
import org.apache.commons.exec.*;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.slf4j.Logger;
@@ -14,6 +15,9 @@
    private int time;
    private File counterScript;
    private int failOn = -1;
    private ExecuteWatchdog watchdog;
    @Getter
    private boolean executeStarted;
    TestJob(int time, File counterScript) {
        this(time, -1, counterScript);
@@ -31,6 +35,17 @@
    }
    @Override
    protected void cancelRunningProcess() {
        log.info("CancelRunningProcess: " + watchdog + ", " + getStatus());
        if (watchdog != null) {
            log.info("Cancelling job: " + getId());
            watchdog.destroyProcess();
            watchdog = null;
            setCancelled();
        }
    }
    @Override
    public String execute() {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
@@ -38,7 +53,7 @@
        cmdLine.addArgument(String.valueOf(this.time));
        cmdLine.addArgument(String.valueOf(this.failOn));
        DefaultExecutor executor = new DefaultExecutor();
        ExecuteWatchdog watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
        watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
        executor.setWatchdog(watchdog);
        PumpStreamHandler streamHandler = new PumpStreamHandler(new LogOutputStream() {
            @Override
@@ -64,12 +79,17 @@
            }
        });
        executor.setStreamHandler(streamHandler);
        if (isCancelledRequested()) {
            setCancelled();
            return null;
        }
        log.info("Executing '" + counterScript.getAbsolutePath() + " " + this.time + "'...");
        executeStarted = true;
        try {
            executor.execute(cmdLine);
        } catch (Exception ex) {
            failed();
            if (failOn < 0) {
            if (failOn < 0 && getStatus() != Status.CANCELLED) {
                log.error("Error while executing script: " + ex.getMessage(), ex);
            }
        }