mirror of https://github.com/micromata/borgbackup-butler.git

Kai Reinhard
28.51.2018 f9f5572180ea5a2b47201933895dfcd10077aa4f
Job queueing...
4 files added
1 file modified
156 ■■■■■ changed files
borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java 8 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java 30 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java 27 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java 40 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java 51 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java
@@ -1,11 +1,11 @@
package de.micromata.borgbutler;
import de.micromata.borgbutler.config.BorgRepoConfig;
import de.micromata.borgbutler.config.Configuration;
import de.micromata.borgbutler.config.ConfigurationHandler;
import de.micromata.borgbutler.config.Definitions;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.io.output.ByteArrayOutputStream;
@@ -16,7 +16,6 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * A queue is important because Borg doesn't support parallel calls for one repository.
@@ -80,8 +79,8 @@
            executor.setWorkingDirectory(command.getWorkingDir());
        }
        //executor.setExitValue(2);
        //ExecuteWatchdog watchdog = new ExecuteWatchdog(60000);
        //executor.setWatchdog(watchdog);
        ExecuteWatchdog watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
        executor.setWatchdog(watchdog);
        //  ExecuteResultHandler handler = new DefaultExecuteResultHandler();
        PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream);
        executor.setStreamHandler(streamHandler);
@@ -101,7 +100,6 @@
        if (command.getResultStatus() == BorgCommand.ResultStatus.ERROR) {
            log.error("Response: " + command.getAbbreviatedResponse());
        }
    }
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
New file
@@ -0,0 +1,30 @@
package de.micromata.borgbutler.jobs;
import lombok.Getter;
import lombok.Setter;
public abstract class AbstractJob {
    public enum Status {DONE, RUNNING, QUEUED, STOPPED, FAILED}
    @Getter
    @Setter
    private boolean stopRequested;
    @Getter
    private Status status;
    @Getter
    @Setter
    private String title;
    @Getter
    @Setter
    private String statusText;
    @Getter
    @Setter
    private String log;
    protected void stopped() {
        this.status = Status.STOPPED;
    }
    public abstract void execute() throws InterruptedException;
}
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
New file
@@ -0,0 +1,27 @@
package de.micromata.borgbutler.jobs;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class JobQueue {
    private ConcurrentLinkedQueue<AbstractJob> queue = new ConcurrentLinkedQueue<>();
    private List<AbstractJob> done = new LinkedList<>();
    Executor executor = Executors.newSingleThreadExecutor();
    public AbstractJob appendOrJoin(AbstractJob job) {
        synchronized (queue) {
            if (queue.contains(job)) {
                for (AbstractJob queuedJob : queue) {
                    if (queuedJob.equals(job)) {
                        return queuedJob;
                    }
                }
            }
            queue.add(job);
            return job;
        }
    }
}
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
New file
@@ -0,0 +1,40 @@
package de.micromata.borgbutler.jobs;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.attribute.PosixFilePermissions;
public class JobQueueTest {
    private Logger log = LoggerFactory.getLogger(JobQueueTest.class);
    private static String bashScript = "#!/bin/bash\n" +
            "COUNTER=0\n" +
            "while [  $COUNTER -lt $1 ]; do\n" +
            "  sleep 0.1\n" +
            "  echo The counter is $COUNTER\n" +
            "  let COUNTER=COUNTER+1 \n" +
            "done";
    private static File file;
    @BeforeAll
    static void createScript() throws IOException {
        file = File.createTempFile("counter", ".sh");
        file.deleteOnExit();
        FileUtils.write(file, bashScript, Charset.forName("UTF-8"));
        Files.setPosixFilePermissions(file.toPath(), PosixFilePermissions.fromString("rwxr-xr-x"));
    }
    @Test
    void test() {
        TestJob job = new TestJob(10, file);
        job.execute();
    }
}
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
New file
@@ -0,0 +1,51 @@
package de.micromata.borgbutler.jobs;
import org.apache.commons.exec.*;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
public class TestJob extends AbstractJob {
    private Logger log = LoggerFactory.getLogger(TestJob.class);
    private int time;
    private File counterScript;
    TestJob(int time, File counterScript) {
        this.time = time;
        this.counterScript = counterScript;
    }
    @Override
    public void execute() {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        CommandLine cmdLine = new CommandLine(counterScript.getAbsolutePath());
        cmdLine.addArgument(String.valueOf(this.time));
        DefaultExecutor executor = new DefaultExecutor();
        ExecuteWatchdog watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
        executor.setWatchdog(watchdog);
        PumpStreamHandler streamHandler = new PumpStreamHandler(new LogOutputStream() {
            @Override
            protected void processLine(String line, int level) {
                log.info(line);
                try {
                    outputStream.write(line.getBytes());
                    outputStream.write("\n".getBytes());
                } catch (IOException ex) {
                    log.error(ex.getMessage(), ex);
                }
            }
        });
        executor.setStreamHandler(streamHandler);
        log.info("Executing '" + counterScript.getAbsolutePath() + " " + this.time + "'...");
        try {
            executor.execute(cmdLine);
        } catch (Exception ex) {
            log.error("Error while executing script: " + ex.getMessage(), ex);
        }
        log.info(outputStream.toString(Charset.forName("UTF-8")));
    }
}