mirror of https://github.com/micromata/borgbackup-butler.git

Kai Reinhard
28.42.2018 ea69c28b8aa40b0de84e3ec52941d08ae9ef6cef
Job queueing...
4 files modified
69 ■■■■ changed files
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java 12 ●●●● patch | view | raw | blame | history
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java 13 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java 30 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java 14 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
@@ -3,8 +3,11 @@
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractJob {
    private Logger logger = LoggerFactory.getLogger(AbstractJob.class);
    public enum Status {DONE, RUNNING, QUEUED, STOPPED, FAILED}
    @Getter
    @Setter
@@ -19,10 +22,13 @@
    @Getter
    @Setter
    private String statusText;
    @Getter
    @Setter
    private String log;
    protected void failed() {
        if (this.status != Status.RUNNING) {
            logger.error("Internal error, illegal state! You shouldn't set the job status to FAILED if not in status RUNNING: " + this.status);
        }
        this.status = Status.FAILED;
    }
    /**
     *
     * @return true, if the job is done, stopped or failed. Otherwise false (if job is running or queued).
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
@@ -9,6 +9,7 @@
import java.util.concurrent.Executors;
public class JobQueue {
    private static final int MAX_DONE_JOBS_SIZE = 50;
    private Logger log = LoggerFactory.getLogger(JobQueue.class);
    private List<AbstractJob> queue = new ArrayList<>();
    private List<AbstractJob> doneJobs = new LinkedList<>();
@@ -19,6 +20,10 @@
        return queue.size();
    }
    public List<AbstractJob> getDoneJobs() {
        return Collections.unmodifiableList(doneJobs);
    }
    /**
     * Appends the job if not already in the queue. Starts the execution if no execution thread is already running.
     *
@@ -79,6 +84,9 @@
                    it.remove();
                    doneJobs.add(0, job);
                }
                while (doneJobs.size() > MAX_DONE_JOBS_SIZE) {
                    doneJobs.remove(doneJobs.size() - 1);
                }
            }
        }
    }
@@ -112,7 +120,10 @@
                    log.info("Starting job: " + job.getId());
                    job.setStatus(AbstractJob.Status.RUNNING);
                    job.execute();
                    job.setStatus(AbstractJob.Status.DONE);
                    if (!job.isFinished()) {
                        // Don't overwrite status failed set by job.
                        job.setStatus(AbstractJob.Status.DONE);
                    }
                } catch (Exception ex) {
                    log.error("Error while executing job '" + job.getId() + "': " + ex.getMessage(), ex);
                    job.setStatus(AbstractJob.Status.FAILED);
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
@@ -11,6 +11,7 @@
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -23,7 +24,7 @@
            "    echo Error on counter $COUNTER >&2\n" +
            "    exit 2\n" +
            "  fi\n" +
            "  sleep 0.01\n" +
            "  sleep 0.05\n" +
            "  echo The counter is $COUNTER >&2\n" +
            "  let COUNTER=COUNTER+1 \n" +
            "done\n" +
@@ -40,7 +41,7 @@
    }
    @Test
    void test() {
    void queueTest() {
        JobQueue queue = new JobQueue();
        assertEquals(0, queue.getQueueSize());
        queue.append(new TestJob(10, file));
@@ -49,20 +50,20 @@
        assertEquals(2, queue.getQueueSize());
        queue.append(new TestJob(10, file));
        assertEquals(2, queue.getQueueSize());
        TestJob job1 = (TestJob)queue.getQueuedJob(10);
        TestJob job = (TestJob) queue.getQueuedJob(10);
        int counter = 100;
        while (job1.getStatus() != AbstractJob.Status.RUNNING && counter-- > 0) {
        while (job.getStatus() != AbstractJob.Status.RUNNING && counter-- > 0) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException ex) {
                log.error(ex.getMessage(), ex);
            }
        }
        assertEquals(AbstractJob.Status.RUNNING, job1.getStatus());
        TestJob job2 = (TestJob)queue.getQueuedJob(5);
        assertEquals(AbstractJob.Status.QUEUED, job2.getStatus());
        assertEquals(AbstractJob.Status.RUNNING, job.getStatus());
        job = (TestJob) queue.getQueuedJob(5);
        assertEquals(AbstractJob.Status.QUEUED, job.getStatus());
        counter = 100;
        while (job2.getStatus() != AbstractJob.Status.RUNNING && counter-- > 0) {
        while (job.getStatus() != AbstractJob.Status.RUNNING && counter-- > 0) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException ex) {
@@ -70,9 +71,18 @@
            }
        }
        queue.append(new TestJob(10, file));
        TestJob job3 = (TestJob)queue.getQueuedJob(10);
        assertEquals(AbstractJob.Status.QUEUED, job3.getStatus());
        job = (TestJob) queue.getQueuedJob(10);
        assertEquals(AbstractJob.Status.QUEUED, job.getStatus());
        queue.waitForQueue(10);
        assertEquals(0, queue.getQueueSize());
        List<AbstractJob> doneJobs = queue.getDoneJobs();
        assertEquals(3, doneJobs.size());
        check(((TestJob)doneJobs.get(0)), AbstractJob.Status.DONE, "10");
        check(((TestJob)doneJobs.get(1)), AbstractJob.Status.FAILED, "10");
        check(((TestJob)doneJobs.get(2)), AbstractJob.Status.DONE, "10");
    }
    private void check(TestJob job, AbstractJob.Status status, String result) {
        assertEquals(status, job.getStatus());
    }
}
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
@@ -7,7 +7,6 @@
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
public class TestJob extends AbstractJob {
    private Logger log = LoggerFactory.getLogger(TestJob.class);
@@ -43,7 +42,7 @@
        PumpStreamHandler streamHandler = new PumpStreamHandler(new LogOutputStream() {
            @Override
            protected void processLine(String line, int level) {
                log.info(line);
                //log.info(line);
                try {
                    outputStream.write(line.getBytes());
                    outputStream.write("\n".getBytes());
@@ -54,7 +53,7 @@
        }, new LogOutputStream() {
            @Override
            protected void processLine(String line, int logLevel) {
                log.error(line);
                //log.error(line);
                try {
                    errorOutputStream.write(line.getBytes());
                    errorOutputStream.write("\n".getBytes());
@@ -68,9 +67,12 @@
        try {
            executor.execute(cmdLine);
        } catch (Exception ex) {
            log.error("Error while executing script: " + ex.getMessage(), ex);
            failed();
            if (failOn < 0) {
                log.error("Error while executing script: " + ex.getMessage(), ex);
            }
        }
        log.info(outputStream.toString(Charset.forName("UTF-8")));
        log.error(errorOutputStream.toString(Charset.forName("UTF-8")));
        //log.info(outputStream.toString(Charset.forName("UTF-8")));
        //log.error(errorOutputStream.toString(Charset.forName("UTF-8")));
    }
}