mirror of https://github.com/micromata/borgbackup-butler.git

Kai Reinhard
28.19.2018 072b27a0e1f0ec459f388f5b62a82cd42c2476db
Job queuing...
4 files modified
214 ■■■■■ changed files
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java 21 ●●●● patch | view | raw | blame | history
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java 120 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java 48 ●●●● patch | view | raw | blame | history
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java 25 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
@@ -1,5 +1,6 @@
package de.micromata.borgbutler.jobs;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
@@ -10,6 +11,7 @@
    private boolean stopRequested;
    @Getter
    @Setter(AccessLevel.PACKAGE)
    private Status status;
    @Getter
    @Setter
@@ -21,10 +23,23 @@
    @Setter
    private String log;
    protected void stopped() {
        this.status = Status.STOPPED;
    /**
     *
     * @return true, if the job is done, stopped or failed. Otherwise false (if job is running or queued).
     */
    public boolean isFinished() {
        if (status == Status.DONE || status == Status.STOPPED || status == Status.FAILED) {
            return true;
        }
        return false;
    }
    public abstract void execute() throws InterruptedException;
    public abstract void execute();
    /**
     * A job is identified by this id. If a job with the same id is already queued (not yet finished), this job will
     * not be added twice.
     *
     * @return the id identifying this job.
     */
    public abstract Object getId();
}
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
@@ -1,27 +1,123 @@
package de.micromata.borgbutler.jobs;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class JobQueue {
    private ConcurrentLinkedQueue<AbstractJob> queue = new ConcurrentLinkedQueue<>();
    private List<AbstractJob> done = new LinkedList<>();
    Executor executor = Executors.newSingleThreadExecutor();
    private Logger log = LoggerFactory.getLogger(JobQueue.class);
    private List<AbstractJob> queue = new ArrayList<>();
    private List<AbstractJob> doneJobs = new LinkedList<>();
    private ExecutorService executorService = Executors.newSingleThreadExecutor();
    private Runner runner = new Runner();
    public AbstractJob appendOrJoin(AbstractJob job) {
    /**
     * @return the number of jobs currently held in the queue list. Finished jobs are counted until
     * they are moved to the done list by organizeQueue().
     */
    public int getQueueSize() {
        return queue.size();
    }
    /**
     * Appends the job if not already in the queue. Starts the execution if no execution thread is already running.
     *
     * @param job
     */
    public void append(AbstractJob job) {
        synchronized (queue) {
            if (queue.contains(job)) {
                for (AbstractJob queuedJob : queue) {
                    if (queuedJob.equals(job)) {
                        return queuedJob;
                if (Objects.equals(queuedJob.getId(), job.getId())) {
                    log.info("Job is already in the queue, don't run twice (OK): " + job.getId());
                    return;
                    }
                }
            queue.add(job.setStatus(AbstractJob.Status.QUEUED));
            }
            queue.add(job);
        run();
    }
    public AbstractJob getQueuedJob(Object id) {
        for (AbstractJob job : queue) {
            if (Objects.equals(job.getId(), id)) {
            return job;
        }
    }
        return null;
    }
    void waitForQueue(int seconds) {
        int counter = seconds / 10;
        while (CollectionUtils.isNotEmpty(queue) && counter > 0) {
            try {
                run(); // If not running!
                Thread.sleep(100);
            } catch (InterruptedException ex) {
                log.error(ex.getMessage(), ex);
            }
        }
    }
    private void run() {
        synchronized (executorService) {
            if (!runner.running) {
                log.info("Starting job executor...");
                executorService.submit(runner);
            }
        }
    }
    private void organizeQueue() {
        synchronized (queue) {
            if (queue.isEmpty()) {
                return;
            }
            Iterator<AbstractJob> it = queue.iterator();
            while (it.hasNext()) {
                AbstractJob job = it.next();
                if (job.isFinished()) {
                    it.remove();
                    doneJobs.add(0, job);
                }
            }
        }
    }
    private class Runner implements Runnable {
        private boolean running;
        @Override
        public void run() {
            running = true;
            while (true) {
                AbstractJob job = null;
                synchronized (queue) {
                    organizeQueue();
                    if (queue.isEmpty()) {
                        running = false;
                        return;
                    }
                    for (AbstractJob queuedJob : queue) {
                        if (queuedJob.getStatus() == AbstractJob.Status.QUEUED) {
                            job = queuedJob;
                            break;
                        }
                    }
                }
                if (job == null) {
                    running = false;
                    return;
                }
                try {
                    log.info("Starting job: " + job.getId());
                    job.setStatus(AbstractJob.Status.RUNNING);
                    job.execute();
                    job.setStatus(AbstractJob.Status.DONE);
                } catch (Exception ex) {
                    log.error("Error while executing job '" + job.getId() + "': " + ex.getMessage(), ex);
                    job.setStatus(AbstractJob.Status.FAILED);
                }
            }
        }
    }
}
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
@@ -12,15 +12,22 @@
import java.nio.file.Files;
import java.nio.file.attribute.PosixFilePermissions;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class JobQueueTest {
    private Logger log = LoggerFactory.getLogger(JobQueueTest.class);
    private static String bashScript = "#!/bin/bash\n" +
            "COUNTER=0\n" +
            "while [  $COUNTER -lt $1 ]; do\n" +
            "  sleep 0.1\n" +
            "  echo The counter is $COUNTER\n" +
            "  if [ $COUNTER -eq $2 ]; then\n" +
            "    echo Error on counter $COUNTER >&2\n" +
            "    exit 2\n" +
            "  fi\n" +
            "  sleep 0.01\n" +
            "  echo The counter is $COUNTER >&2\n" +
            "  let COUNTER=COUNTER+1 \n" +
            "done";
            "done\n" +
            "echo $COUNTER\n";
    private static File file;
@@ -34,7 +41,38 @@
    // Exercises appending, de-duplication by job id and sequential execution of the job queue.
    @Test
    void test() {
        // Executes one job directly on the calling thread, without using the queue.
        TestJob job = new TestJob(10, file);
        job.execute();
        JobQueue queue = new JobQueue();
        assertEquals(0, queue.getQueueSize());
        queue.append(new TestJob(10, file));
        assertEquals(1, queue.getQueueSize());
        queue.append(new TestJob(5, 2, file));
        assertEquals(2, queue.getQueueSize());
        // A job with id 10 is already in the queue, so the size is expected to stay at 2.
        queue.append(new TestJob(10, file));
        assertEquals(2, queue.getQueueSize());
        TestJob job1 = (TestJob)queue.getQueuedJob(10);
        // Polls up to 100 times with 10 ms sleeps until the first job is reported as running.
        int counter = 100;
        while (job1.getStatus() != AbstractJob.Status.RUNNING && counter-- > 0) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException ex) {
                log.error(ex.getMessage(), ex);
            }
        }
        assertEquals(AbstractJob.Status.RUNNING, job1.getStatus());
        TestJob job2 = (TestJob)queue.getQueuedJob(5);
        // The second job must still be waiting while the first one is running.
        assertEquals(AbstractJob.Status.QUEUED, job2.getStatus());
        // Same polling for the second job.
        // NOTE(review): the outcome of this loop is not asserted; if the second job is not running
        // when the loop ends, the following steps presumably depend on timing - confirm.
        counter = 100;
        while (job2.getStatus() != AbstractJob.Status.RUNNING && counter-- > 0) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException ex) {
                log.error(ex.getMessage(), ex);
            }
        }
        // Appends a job with id 10 again and expects the job found under this id to be queued.
        queue.append(new TestJob(10, file));
        TestJob job3 = (TestJob)queue.getQueuedJob(10);
        assertEquals(AbstractJob.Status.QUEUED, job3.getStatus());
        queue.waitForQueue(10);
        assertEquals(0, queue.getQueueSize());
    }
}
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
@@ -13,17 +13,30 @@
    private Logger log = LoggerFactory.getLogger(TestJob.class);
    private int time;
    private File counterScript;
    private int failOn = -1;
    /**
     * Creates a test job with failOn set to -1.
     *
     * @param time          value returned by getId() and passed as first argument to the script.
     * @param counterScript the script file to execute.
     */
    TestJob(int time, File counterScript) {
        this(time, -1, counterScript);
    }
    TestJob(int time, int failOn, File counterScript) {
        this.time = time;
        this.failOn = failOn;
        this.counterScript = counterScript;
    }
    @Override
    public Object getId() {
        return time;
    }
    @Override
    public void execute() {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
        CommandLine cmdLine = new CommandLine(counterScript.getAbsolutePath());
        cmdLine.addArgument(String.valueOf(this.time));
        cmdLine.addArgument(String.valueOf(this.failOn));
        DefaultExecutor executor = new DefaultExecutor();
        ExecuteWatchdog watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
        executor.setWatchdog(watchdog);
@@ -38,6 +51,17 @@
                    log.error(ex.getMessage(), ex);
                }
            }
        }, new LogOutputStream() {
            @Override
            protected void processLine(String line, int logLevel) {
                log.error(line);
                try {
                    errorOutputStream.write(line.getBytes());
                    errorOutputStream.write("\n".getBytes());
                } catch (IOException ex) {
                    log.error(ex.getMessage(), ex);
                }
            }
        });
        executor.setStreamHandler(streamHandler);
        log.info("Executing '" + counterScript.getAbsolutePath() + " " + this.time + "'...");
@@ -47,5 +71,6 @@
            log.error("Error while executing script: " + ex.getMessage(), ex);
        }
        log.info(outputStream.toString(Charset.forName("UTF-8")));
        log.error(errorOutputStream.toString(Charset.forName("UTF-8")));
    }
}