From 072b27a0e1f0ec459f388f5b62a82cd42c2476db Mon Sep 17 00:00:00 2001
From: Kai Reinhard <K.Reinhard@micromata.de>
Date: Fri, 28 Dec 2018 10:19:14 +0000
Subject: [PATCH] Job queuing...
---
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java | 50 +++++++++++-
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java | 21 ++++
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java | 126 +++++++++++++++++++++++++++---
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java | 25 ++++++
4 files changed, 198 insertions(+), 24 deletions(-)
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
index e1cd523..743c175 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
@@ -1,5 +1,6 @@
package de.micromata.borgbutler.jobs;
+import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
@@ -10,6 +11,7 @@
private boolean stopRequested;
@Getter
+ @Setter(AccessLevel.PACKAGE)
private Status status;
@Getter
@Setter
@@ -21,10 +23,23 @@
@Setter
private String log;
- protected void stopped() {
- this.status = Status.STOPPED;
+ /**
+ *
+ * @return true, if the job is done, stopped or failed. Otherwise false (if job is running or queued).
+ */
+ public boolean isFinished() {
+ if (status == Status.DONE || status == Status.STOPPED || status == Status.FAILED) {
+ return true;
+ }
+ return false;
}
- public abstract void execute() throws InterruptedException;
+ public abstract void execute();
+ /**
+ * A job is identified by this id. If a job with the same id is already queued (not yet finished), this job will
+ * not be added twice.
+ * @return
+ */
+ public abstract Object getId();
}
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
index a47410c..4c6717f 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
@@ -1,27 +1,123 @@
package de.micromata.borgbutler.jobs;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
+import org.apache.commons.collections4.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class JobQueue {
- private ConcurrentLinkedQueue<AbstractJob> queue = new ConcurrentLinkedQueue<>();
- private List<AbstractJob> done = new LinkedList<>();
- Executor executor = Executors.newSingleThreadExecutor();
+ private Logger log = LoggerFactory.getLogger(JobQueue.class);
+ private List<AbstractJob> queue = new ArrayList<>();
+ private List<AbstractJob> doneJobs = new LinkedList<>();
+ private ExecutorService executorService = Executors.newSingleThreadExecutor();
+ private Runner runner = new Runner();
- public AbstractJob appendOrJoin(AbstractJob job) {
+ public int getQueueSize() {
+ return queue.size();
+ }
+
+ /**
+ * Appends the job if not alread in the queue. Starts the execution if no execution thread is already running.
+ *
+ * @param job
+ */
+ public void append(AbstractJob job) {
synchronized (queue) {
- if (queue.contains(job)) {
- for (AbstractJob queuedJob : queue) {
- if (queuedJob.equals(job)) {
- return queuedJob;
- }
+ for (AbstractJob queuedJob : queue) {
+ if (Objects.equals(queuedJob.getId(), job.getId())) {
+ log.info("Job is already in the queue, don't run twice (OK): " + job.getId());
+ return;
}
}
- queue.add(job);
- return job;
+ queue.add(job.setStatus(AbstractJob.Status.QUEUED));
+ }
+ run();
+ }
+
+ public AbstractJob getQueuedJob(Object id) {
+ for (AbstractJob job : queue) {
+ if (Objects.equals(job.getId(), id)) {
+ return job;
+ }
+ }
+ return null;
+ }
+
+ void waitForQueue(int seconds) {
+ int counter = seconds / 10;
+ while (CollectionUtils.isNotEmpty(queue) && counter > 0) {
+ try {
+ run(); // If not running!
+ Thread.sleep(100);
+ } catch (InterruptedException ex) {
+ log.error(ex.getMessage(), ex);
+ }
+ }
+ }
+
+ private void run() {
+ synchronized (executorService) {
+ if (!runner.running) {
+ log.info("Starting job executor...");
+ executorService.submit(runner);
+ }
+ }
+ }
+
+ private void organizeQueue() {
+ synchronized (queue) {
+ if (queue.isEmpty()) {
+ return;
+ }
+ Iterator<AbstractJob> it = queue.iterator();
+ while (it.hasNext()) {
+ AbstractJob job = it.next();
+ if (job.isFinished()) {
+ it.remove();
+ doneJobs.add(0, job);
+ }
+ }
+ }
+ }
+
+ private class Runner implements Runnable {
+ private boolean running;
+
+ @Override
+ public void run() {
+ running = true;
+ while (true) {
+ AbstractJob job = null;
+ synchronized (queue) {
+ organizeQueue();
+ if (queue.isEmpty()) {
+ running = false;
+ return;
+ }
+ for (AbstractJob queuedJob : queue) {
+ if (queuedJob.getStatus() == AbstractJob.Status.QUEUED) {
+ job = queuedJob;
+ break;
+ }
+ }
+ }
+ if (job == null) {
+ running = false;
+ return;
+ }
+ try {
+ log.info("Starting job: " + job.getId());
+ job.setStatus(AbstractJob.Status.RUNNING);
+ job.execute();
+ job.setStatus(AbstractJob.Status.DONE);
+ } catch (Exception ex) {
+ log.error("Error while executing job '" + job.getId() + "': " + ex.getMessage(), ex);
+ job.setStatus(AbstractJob.Status.FAILED);
+ }
+ }
}
}
}
diff --git a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
index 63fa15a..230b0c9 100644
--- a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
+++ b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
@@ -12,15 +12,22 @@
import java.nio.file.Files;
import java.nio.file.attribute.PosixFilePermissions;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
public class JobQueueTest {
private Logger log = LoggerFactory.getLogger(JobQueueTest.class);
private static String bashScript = "#!/bin/bash\n" +
"COUNTER=0\n" +
- "while [ $COUNTER -lt $1 ]; do\n" +
- " sleep 0.1\n" +
- " echo The counter is $COUNTER\n" +
+ "while [ $COUNTER -lt $1 ]; do\n" +
+ " if [ $COUNTER -eq $2 ]; then\n" +
+ " echo Error on counter $COUNTER >&2\n" +
+ " exit 2\n" +
+ " fi\n" +
+ " sleep 0.01\n" +
+ " echo The counter is $COUNTER >&2\n" +
" let COUNTER=COUNTER+1 \n" +
- "done";
+ "done\n" +
+ "echo $COUNTER\n";
private static File file;
@@ -34,7 +41,38 @@
@Test
void test() {
- TestJob job = new TestJob(10, file);
- job.execute();
+ JobQueue queue = new JobQueue();
+ assertEquals(0, queue.getQueueSize());
+ queue.append(new TestJob(10, file));
+ assertEquals(1, queue.getQueueSize());
+ queue.append(new TestJob(5, 2, file));
+ assertEquals(2, queue.getQueueSize());
+ queue.append(new TestJob(10, file));
+ assertEquals(2, queue.getQueueSize());
+ TestJob job1 = (TestJob)queue.getQueuedJob(10);
+ int counter = 100;
+ while (job1.getStatus() != AbstractJob.Status.RUNNING && counter-- > 0) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ex) {
+ log.error(ex.getMessage(), ex);
+ }
+ }
+ assertEquals(AbstractJob.Status.RUNNING, job1.getStatus());
+ TestJob job2 = (TestJob)queue.getQueuedJob(5);
+ assertEquals(AbstractJob.Status.QUEUED, job2.getStatus());
+ counter = 100;
+ while (job2.getStatus() != AbstractJob.Status.RUNNING && counter-- > 0) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ex) {
+ log.error(ex.getMessage(), ex);
+ }
+ }
+ queue.append(new TestJob(10, file));
+ TestJob job3 = (TestJob)queue.getQueuedJob(10);
+ assertEquals(AbstractJob.Status.QUEUED, job3.getStatus());
+ queue.waitForQueue(10);
+ assertEquals(0, queue.getQueueSize());
}
}
diff --git a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
index 60e6e1e..e253f47 100644
--- a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
+++ b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
@@ -13,17 +13,30 @@
private Logger log = LoggerFactory.getLogger(TestJob.class);
private int time;
private File counterScript;
+ private int failOn = -1;
TestJob(int time, File counterScript) {
+ this(time, -1, counterScript);
+ }
+
+ TestJob(int time, int failOn, File counterScript) {
this.time = time;
+ this.failOn = failOn;
this.counterScript = counterScript;
}
@Override
+ public Object getId() {
+ return time;
+ }
+
+ @Override
public void execute() {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
CommandLine cmdLine = new CommandLine(counterScript.getAbsolutePath());
cmdLine.addArgument(String.valueOf(this.time));
+ cmdLine.addArgument(String.valueOf(this.failOn));
DefaultExecutor executor = new DefaultExecutor();
ExecuteWatchdog watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
executor.setWatchdog(watchdog);
@@ -38,6 +51,17 @@
log.error(ex.getMessage(), ex);
}
}
+ }, new LogOutputStream() {
+ @Override
+ protected void processLine(String line, int logLevel) {
+ log.error(line);
+ try {
+ errorOutputStream.write(line.getBytes());
+ errorOutputStream.write("\n".getBytes());
+ } catch (IOException ex) {
+ log.error(ex.getMessage(), ex);
+ }
+ }
});
executor.setStreamHandler(streamHandler);
log.info("Executing '" + counterScript.getAbsolutePath() + " " + this.time + "'...");
@@ -47,5 +71,6 @@
log.error("Error while executing script: " + ex.getMessage(), ex);
}
log.info(outputStream.toString(Charset.forName("UTF-8")));
+ log.error(errorOutputStream.toString(Charset.forName("UTF-8")));
}
}
--
Gitblit v1.10.0