borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
@@ -1,5 +1,6 @@ package de.micromata.borgbutler.jobs; import lombok.AccessLevel; import lombok.Getter; import lombok.Setter; @@ -10,6 +11,7 @@ private boolean stopRequested; @Getter @Setter(AccessLevel.PACKAGE) private Status status; @Getter @Setter @@ -21,10 +23,23 @@ @Setter private String log; protected void stopped() { this.status = Status.STOPPED; /** * Tells whether this job has reached one of the states {@code DONE}, {@code STOPPED} or {@code FAILED}. * * @return true, if the status is DONE, STOPPED or FAILED. Otherwise false (any other status, including no status set yet). */ public boolean isFinished() { if (status == Status.DONE || status == Status.STOPPED || status == Status.FAILED) { return true; } return false; } public abstract void execute() throws InterruptedException; public abstract void execute(); /** * A job is identified by this id. If a job with the same id is already queued (not yet finished), this job will * not be added twice. * * @return the id identifying this job. */ public abstract Object getId(); } borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
package de.micromata.borgbutler.jobs;

import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Queue executing {@link AbstractJob}s one after another on a single worker thread.
 * <p>
 * Jobs are identified by {@link AbstractJob#getId()}; a job whose id is already present in the queue is not
 * added a second time. Finished jobs are moved from the queue to an internal list of done jobs.
 * <p>
 * Thread-safety: the {@code queue} list is used as the single monitor guarding the queue itself, the list of
 * done jobs and the {@code running} flag of the worker.
 */
public class JobQueue {
    /** Pause between two checks of the queue in {@link #waitForQueue(int)}. */
    private static final long POLL_INTERVAL_MILLIS = 100;

    private Logger log = LoggerFactory.getLogger(JobQueue.class);
    private final List<AbstractJob> queue = new ArrayList<>();
    private final List<AbstractJob> doneJobs = new LinkedList<>();
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private final Runner runner = new Runner();

    /**
     * @return the number of jobs currently held in the queue (finished jobs not yet moved to the done list
     * are included).
     */
    public int getQueueSize() {
        synchronized (queue) {
            return queue.size();
        }
    }

    /**
     * Appends the job if no unfinished job with the same id is already in the queue. Starts the execution if no
     * execution thread is already running.
     *
     * @param job the job to queue; its status is set to {@code QUEUED} when it is added.
     */
    public void append(AbstractJob job) {
        synchronized (queue) {
            // Move finished jobs away first, so that only unfinished jobs can block a job with the same id.
            organizeQueue();
            for (AbstractJob queuedJob : queue) {
                if (Objects.equals(queuedJob.getId(), job.getId())) {
                    log.info("Job is already in the queue, don't run twice (OK): " + job.getId());
                    return;
                }
            }
            // Two statements on purpose: does not depend on the setter returning the job (chained setters).
            job.setStatus(AbstractJob.Status.QUEUED);
            queue.add(job);
        }
        run();
    }

    /**
     * Looks up a job in the queue by its id.
     *
     * @param id the id as returned by {@link AbstractJob#getId()}.
     * @return the first job in the queue with an equal id, or null if there is none.
     */
    public AbstractJob getQueuedJob(Object id) {
        // Locked, because the worker thread removes finished jobs from the list concurrently.
        synchronized (queue) {
            for (AbstractJob job : queue) {
                if (Objects.equals(job.getId(), id)) {
                    return job;
                }
            }
        }
        return null;
    }

    /**
     * Blocks until the queue is empty, but no longer than the given number of seconds. Returns early if the
     * calling thread is interrupted (the interrupt flag is restored in this case).
     *
     * @param seconds maximum time to wait in seconds.
     */
    void waitForQueue(int seconds) {
        final long deadline = System.nanoTime() + seconds * 1_000_000_000L;
        while (System.nanoTime() - deadline < 0) {
            synchronized (queue) {
                if (!CollectionUtils.isNotEmpty(queue)) {
                    return;
                }
            }
            try {
                run(); // If not running!
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                log.error(ex.getMessage(), ex);
                return;
            }
        }
    }

    /**
     * Submits the worker to the executor unless it is already marked as running. The flag is set here (and not
     * by the worker itself), so that two calls in a row can't submit the worker twice.
     */
    private void run() {
        synchronized (queue) {
            if (runner.running) {
                return;
            }
            runner.running = true;
            log.info("Starting job executor...");
            executorService.submit(runner);
        }
    }

    /**
     * Moves all finished jobs from the queue to the head of the list of done jobs.
     */
    private void organizeQueue() {
        synchronized (queue) {
            if (queue.isEmpty()) {
                return;
            }
            Iterator<AbstractJob> it = queue.iterator();
            while (it.hasNext()) {
                AbstractJob job = it.next();
                if (job.isFinished()) {
                    it.remove();
                    doneJobs.add(0, job);
                }
            }
        }
    }

    /**
     * Cleans up the queue and picks the next job to execute. If there is nothing left to do, the worker is
     * marked as not running while the queue lock is still held, so a concurrent {@link #append(AbstractJob)}
     * either is seen by this call or restarts the worker afterwards.
     *
     * @return the first job with status {@code QUEUED}, or null if there is none.
     */
    private AbstractJob nextQueuedJob() {
        synchronized (queue) {
            organizeQueue();
            for (AbstractJob queuedJob : queue) {
                if (queuedJob.getStatus() == AbstractJob.Status.QUEUED) {
                    return queuedJob;
                }
            }
            runner.running = false;
            return null;
        }
    }

    /**
     * Worker executing the queued jobs sequentially until no job with status {@code QUEUED} is left.
     */
    private class Runner implements Runnable {
        /** Guarded by the {@code queue} monitor. */
        private boolean running;

        @Override
        public void run() {
            while (true) {
                AbstractJob job = nextQueuedJob();
                if (job == null) {
                    return;
                }
                try {
                    log.info("Starting job: " + job.getId());
                    job.setStatus(AbstractJob.Status.RUNNING);
                    job.execute();
                    job.setStatus(AbstractJob.Status.DONE);
                } catch (Exception ex) {
                    // A failing job must not stop the worker: mark it as failed and continue with the next one.
                    log.error("Error while executing job '" + job.getId() + "': " + ex.getMessage(), ex);
                    job.setStatus(AbstractJob.Status.FAILED);
                }
            }
        }
    }
}
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
@@ -12,15 +12,22 @@ import java.nio.file.Files; import java.nio.file.attribute.PosixFilePermissions; import static org.junit.jupiter.api.Assertions.assertEquals; public class JobQueueTest { private Logger log = LoggerFactory.getLogger(JobQueueTest.class); private static String bashScript = "#!/bin/bash\n" + "COUNTER=0\n" + "while [ $COUNTER -lt $1 ]; do\n" + " sleep 0.1\n" + " echo The counter is $COUNTER\n" + "while [ $COUNTER -lt $1 ]; do\n" + " if [ $COUNTER -eq $2 ]; then\n" + " echo Error on counter $COUNTER >&2\n" + " exit 2\n" + " fi\n" + " sleep 0.01\n" + " echo The counter is $COUNTER >&2\n" + " let COUNTER=COUNTER+1 \n" + "done"; "done\n" + "echo $COUNTER\n"; private static File file; @@ -34,7 +41,38 @@ /** * Appends test jobs to a {@link JobQueue} and checks queue size and job states: a job whose id is already in the queue is not added a second time, the second job stays QUEUED while the first one is RUNNING, and the queue is empty after {@code waitForQueue}. */ @Test void test() { TestJob job = new TestJob(10, file); job.execute(); JobQueue queue = new JobQueue(); assertEquals(0, queue.getQueueSize()); queue.append(new TestJob(10, file)); assertEquals(1, queue.getQueueSize()); queue.append(new TestJob(5, 2, file)); assertEquals(2, queue.getQueueSize()); /* Same id (10) as the first job: the queue size is expected to stay at 2. */ queue.append(new TestJob(10, file)); assertEquals(2, queue.getQueueSize()); TestJob job1 = (TestJob)queue.getQueuedJob(10); /* Poll up to 100 times with 10 ms pauses until job1 is reported as RUNNING. */ int counter = 100; while (job1.getStatus() != AbstractJob.Status.RUNNING && counter-- > 0) { try { Thread.sleep(10); } catch (InterruptedException ex) { log.error(ex.getMessage(), ex); } } assertEquals(AbstractJob.Status.RUNNING, job1.getStatus()); TestJob job2 = (TestJob)queue.getQueuedJob(5); assertEquals(AbstractJob.Status.QUEUED, job2.getStatus()); /* Same polling for job2; NOTE(review): the result is not asserted, so the following steps presumably rely on job2 having started (and job1 having left the queue) - confirm. */ counter = 100; while (job2.getStatus() != AbstractJob.Status.RUNNING && counter-- > 0) { try { Thread.sleep(10); } catch (InterruptedException ex) { log.error(ex.getMessage(), ex); } } queue.append(new TestJob(10, file)); TestJob job3 = (TestJob)queue.getQueuedJob(10); assertEquals(AbstractJob.Status.QUEUED, job3.getStatus()); queue.waitForQueue(10); assertEquals(0, queue.getQueueSize()); } } borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
@@ -13,17 +13,30 @@ private Logger log = LoggerFactory.getLogger(TestJob.class); private int time; private File counterScript; private int failOn = -1; /** * Creates a test job with {@code failOn} set to -1. * * @param time value passed to the script as first argument; also used as the id of this job. * @param counterScript the script file to execute. */ TestJob(int time, File counterScript) { this(time, -1, counterScript); } /** * @param time value passed to the script as first argument; also used as the id of this job. * @param failOn value passed to the script as second argument. * @param counterScript the script file to execute. */ TestJob(int time, int failOn, File counterScript) { this.time = time; this.failOn = failOn; this.counterScript = counterScript; } /** * @return the {@code time} value of this job, so two test jobs with the same time share the same id. */ @Override public Object getId() { return time; } /** * Builds the command line from the script path plus {@code time} and {@code failOn}, attaches a watchdog with infinite timeout and a stream handler collecting the script output, and logs the collected standard and error output at the end. */ @Override public void execute() { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream(); CommandLine cmdLine = new CommandLine(counterScript.getAbsolutePath()); cmdLine.addArgument(String.valueOf(this.time)); cmdLine.addArgument(String.valueOf(this.failOn)); DefaultExecutor executor = new DefaultExecutor(); ExecuteWatchdog watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT); executor.setWatchdog(watchdog); @@ -38,6 +51,17 @@ log.error(ex.getMessage(), ex); } } }, /* Second stream: every line is logged as error and appended to errorOutputStream. */ new LogOutputStream() { @Override protected void processLine(String line, int logLevel) { log.error(line); try { /* NOTE(review): getBytes() without charset uses the platform default, while the buffer is decoded as UTF-8 below - confirm this is intended. */ errorOutputStream.write(line.getBytes()); errorOutputStream.write("\n".getBytes()); } catch (IOException ex) { log.error(ex.getMessage(), ex); } } }); executor.setStreamHandler(streamHandler); log.info("Executing '" + counterScript.getAbsolutePath() + " " + this.time + "'..."); @@ -47,5 +71,6 @@ log.error("Error while executing script: " + ex.getMessage(), ex); } log.info(outputStream.toString(Charset.forName("UTF-8"))); log.error(errorOutputStream.toString(Charset.forName("UTF-8"))); } }