mirror of https://github.com/micromata/borgbackup-butler.git

Kai Reinhard
28.12.2018 62b938d31692414f86c8f306b855e2f3bc7cf536
Job queueing...
4 files modified
71 ■■■■■ changed files
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java 8 ●●●● patch | view | raw | blame | history
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java 54 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java 2 ●●● patch | view | raw | blame | history
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java 7 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
@@ -6,7 +6,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractJob {
import java.util.concurrent.Future;
public abstract class AbstractJob<T> {
    private Logger logger = LoggerFactory.getLogger(AbstractJob.class);
    public enum Status {DONE, RUNNING, QUEUED, STOPPED, FAILED}
    @Getter
@@ -23,6 +25,8 @@
    @Setter
    private String statusText;
    private Future<T> future;
    protected void failed() {
        if (this.status != Status.RUNNING) {
            logger.error("Internal error, illegal state! You shouldn't set the job status to FAILED if not in status RUNNING: " + this.status);
@@ -40,7 +44,7 @@
        return false;
    }
    public abstract void execute();
    public abstract T execute();
    /**
     * A job is identified by this id. If a job with the same id is already queued (not yet finished), this job will
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
@@ -5,16 +5,16 @@
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class JobQueue {
public class JobQueue<T> {
    private static final int MAX_DONE_JOBS_SIZE = 50;
    private Logger log = LoggerFactory.getLogger(JobQueue.class);
    private List<AbstractJob> queue = new ArrayList<>();
    private List<AbstractJob> doneJobs = new LinkedList<>();
    private ExecutorService executorService = Executors.newSingleThreadExecutor();
    private Runner runner = new Runner();
    public int getQueueSize() {
        return queue.size();
@@ -38,8 +38,8 @@
                }
            }
            queue.add(job.setStatus(AbstractJob.Status.QUEUED));
            executorService.submit(new CallableTask(job));
        }
        run();
    }
    public AbstractJob getQueuedJob(Object id) {
@@ -55,7 +55,6 @@
        int counter = seconds / 10;
        while (CollectionUtils.isNotEmpty(queue) && counter > 0) {
            try {
                run(); // If not running!
                Thread.sleep(100);
            } catch (InterruptedException ex) {
                log.error(ex.getMessage(), ex);
@@ -63,15 +62,6 @@
        }
    }
    /**
     * Submits the shared {@code runner} to the executor service unless the runner
     * already reports itself as running.
     * <p>
     * The check and the submit are performed while holding the monitor of
     * {@code executorService}, so concurrent callers of this method are serialized.
     * NOTE(review): the {@code running} flag is read here but set by the runner
     * itself; presumably it only becomes true once the executor thread has actually
     * started the runner - confirm whether a second submit in that window matters.
     */
    private void run() {
        synchronized (executorService) {
            // Only hand the runner to the executor if it is not already active.
            if (!runner.running) {
                log.info("Starting job executor...");
                executorService.submit(runner);
            }
        }
    }
    private void organizeQueue() {
        synchronized (queue) {
            if (queue.isEmpty()) {
@@ -91,43 +81,33 @@
        }
    }
    private class Runner implements Runnable {
        private boolean running;
    private class CallableTask implements Callable<T> {
        private AbstractJob<T> job;
        private CallableTask(AbstractJob<T> job) {
            this.job = job;
        }
        @Override
        public void run() {
            running = true;
            while (true) {
                AbstractJob job = null;
                synchronized (queue) {
                    organizeQueue();
                    if (queue.isEmpty()) {
                        running = false;
                        return;
                    }
                    for (AbstractJob queuedJob : queue) {
                        if (queuedJob.getStatus() == AbstractJob.Status.QUEUED) {
                            job = queuedJob;
                            break;
                        }
                    }
                }
                if (job == null) {
                    running = false;
                    return;
        public T call() throws Exception {
            if (job.isStopRequested()) {
                job.setStatus(AbstractJob.Status.STOPPED);
                return null;
                }
                try {
                    log.info("Starting job: " + job.getId());
                    job.setStatus(AbstractJob.Status.RUNNING);
                    job.execute();
                T result = job.execute();
                    if (!job.isFinished()) {
                        // Don't overwrite status failed set by job.
                        job.setStatus(AbstractJob.Status.DONE);
                    }
                organizeQueue();
                return result;
                } catch (Exception ex) {
                    log.error("Error while executing job '" + job.getId() + "': " + ex.getMessage(), ex);
                    job.setStatus(AbstractJob.Status.FAILED);
                }
                return null;
            }
        }
    }
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
@@ -24,7 +24,7 @@
            "    echo Error on counter $COUNTER >&2\n" +
            "    exit 2\n" +
            "  fi\n" +
            "  sleep 0.05\n" +
            "  sleep 0.1\n" +
            "  echo The counter is $COUNTER >&2\n" +
            "  let COUNTER=COUNTER+1 \n" +
            "done\n" +
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
@@ -7,8 +7,9 @@
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
public class TestJob extends AbstractJob {
public class TestJob extends AbstractJob<String> {
    private Logger log = LoggerFactory.getLogger(TestJob.class);
    private int time;
    private File counterScript;
@@ -30,7 +31,7 @@
    }
    @Override
    public void execute() {
    public String execute() {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
        CommandLine cmdLine = new CommandLine(counterScript.getAbsolutePath());
@@ -72,7 +73,7 @@
                log.error("Error while executing script: " + ex.getMessage(), ex);
            }
        }
        //log.info(outputStream.toString(Charset.forName("UTF-8")));
        return outputStream.toString(Charset.forName("UTF-8"));
        //log.error(errorOutputStream.toString(Charset.forName("UTF-8")));
    }
}