From ea69c28b8aa40b0de84e3ec52941d08ae9ef6cef Mon Sep 17 00:00:00 2001
From: Kai Reinhard <K.Reinhard@micromata.de>
Date: Fri, 28 Dec 2018 10:42:03 +0000
Subject: [PATCH] Job queueing...
---
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java | 30 ++++++++++-----
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java | 12 ++++-
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java | 13 ++++++
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java | 14 ++++---
4 files changed, 49 insertions(+), 20 deletions(-)
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
index 743c175..92515a6 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
@@ -3,8 +3,11 @@
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class AbstractJob {
+ private Logger logger = LoggerFactory.getLogger(AbstractJob.class);
public enum Status {DONE, RUNNING, QUEUED, STOPPED, FAILED}
@Getter
@Setter
@@ -19,10 +22,13 @@
@Getter
@Setter
private String statusText;
- @Getter
- @Setter
- private String log;
+ protected void failed() {
+ if (this.status != Status.RUNNING) {
+ logger.error("Internal error, illegal state! You shouldn't set the job status to FAILED if not in status RUNNING: " + this.status);
+ }
+ this.status = Status.FAILED;
+ }
/**
*
* @return true, if the job is done, stopped or failed. Otherwise false (if job is running or queued).
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
index 4c6717f..3107c56 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
@@ -9,6 +9,7 @@
import java.util.concurrent.Executors;
public class JobQueue {
+ private static final int MAX_DONE_JOBS_SIZE = 50;
private Logger log = LoggerFactory.getLogger(JobQueue.class);
private List<AbstractJob> queue = new ArrayList<>();
private List<AbstractJob> doneJobs = new LinkedList<>();
@@ -19,6 +20,10 @@
return queue.size();
}
+ public List<AbstractJob> getDoneJobs() {
+ return Collections.unmodifiableList(doneJobs);
+ }
+
/**
* Appends the job if not alread in the queue. Starts the execution if no execution thread is already running.
*
@@ -79,6 +84,9 @@
it.remove();
doneJobs.add(0, job);
}
+ while (doneJobs.size() > MAX_DONE_JOBS_SIZE) {
+ doneJobs.remove(doneJobs.size() - 1);
+ }
}
}
}
@@ -112,7 +120,10 @@
log.info("Starting job: " + job.getId());
job.setStatus(AbstractJob.Status.RUNNING);
job.execute();
- job.setStatus(AbstractJob.Status.DONE);
+ if (!job.isFinished()) {
+ // Don't overwrite status failed set by job.
+ job.setStatus(AbstractJob.Status.DONE);
+ }
} catch (Exception ex) {
log.error("Error while executing job '" + job.getId() + "': " + ex.getMessage(), ex);
job.setStatus(AbstractJob.Status.FAILED);
diff --git a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
index 230b0c9..edd5e96 100644
--- a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
+++ b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
@@ -11,6 +11,7 @@
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.attribute.PosixFilePermissions;
+import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -23,7 +24,7 @@
" echo Error on counter $COUNTER >&2\n" +
" exit 2\n" +
" fi\n" +
- " sleep 0.01\n" +
+ " sleep 0.05\n" +
" echo The counter is $COUNTER >&2\n" +
" let COUNTER=COUNTER+1 \n" +
"done\n" +
@@ -40,7 +41,7 @@
}
@Test
- void test() {
+ void queueTest() {
JobQueue queue = new JobQueue();
assertEquals(0, queue.getQueueSize());
queue.append(new TestJob(10, file));
@@ -49,20 +50,20 @@
assertEquals(2, queue.getQueueSize());
queue.append(new TestJob(10, file));
assertEquals(2, queue.getQueueSize());
- TestJob job1 = (TestJob)queue.getQueuedJob(10);
+ TestJob job = (TestJob) queue.getQueuedJob(10);
int counter = 100;
- while (job1.getStatus() != AbstractJob.Status.RUNNING && counter-- > 0) {
+ while (job.getStatus() != AbstractJob.Status.RUNNING && counter-- > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException ex) {
log.error(ex.getMessage(), ex);
}
}
- assertEquals(AbstractJob.Status.RUNNING, job1.getStatus());
- TestJob job2 = (TestJob)queue.getQueuedJob(5);
- assertEquals(AbstractJob.Status.QUEUED, job2.getStatus());
+ assertEquals(AbstractJob.Status.RUNNING, job.getStatus());
+ job = (TestJob) queue.getQueuedJob(5);
+ assertEquals(AbstractJob.Status.QUEUED, job.getStatus());
counter = 100;
- while (job2.getStatus() != AbstractJob.Status.RUNNING && counter-- > 0) {
+ while (job.getStatus() != AbstractJob.Status.RUNNING && counter-- > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException ex) {
@@ -70,9 +71,18 @@
}
}
queue.append(new TestJob(10, file));
- TestJob job3 = (TestJob)queue.getQueuedJob(10);
- assertEquals(AbstractJob.Status.QUEUED, job3.getStatus());
+ job = (TestJob) queue.getQueuedJob(10);
+ assertEquals(AbstractJob.Status.QUEUED, job.getStatus());
queue.waitForQueue(10);
assertEquals(0, queue.getQueueSize());
+ List<AbstractJob> doneJobs = queue.getDoneJobs();
+ assertEquals(3, doneJobs.size());
+ check(((TestJob)doneJobs.get(0)), AbstractJob.Status.DONE, "10");
+ check(((TestJob)doneJobs.get(1)), AbstractJob.Status.FAILED, "10");
+ check(((TestJob)doneJobs.get(2)), AbstractJob.Status.DONE, "10");
+ }
+
+ private void check(TestJob job, AbstractJob.Status status, String result) {
+ assertEquals(status, job.getStatus());
}
}
diff --git a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
index e253f47..1755cb9 100644
--- a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
+++ b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
@@ -7,7 +7,6 @@
import java.io.File;
import java.io.IOException;
-import java.nio.charset.Charset;
public class TestJob extends AbstractJob {
private Logger log = LoggerFactory.getLogger(TestJob.class);
@@ -43,7 +42,7 @@
PumpStreamHandler streamHandler = new PumpStreamHandler(new LogOutputStream() {
@Override
protected void processLine(String line, int level) {
- log.info(line);
+ //log.info(line);
try {
outputStream.write(line.getBytes());
outputStream.write("\n".getBytes());
@@ -54,7 +53,7 @@
}, new LogOutputStream() {
@Override
protected void processLine(String line, int logLevel) {
- log.error(line);
+ //log.error(line);
try {
errorOutputStream.write(line.getBytes());
errorOutputStream.write("\n".getBytes());
@@ -68,9 +67,12 @@
try {
executor.execute(cmdLine);
} catch (Exception ex) {
- log.error("Error while executing script: " + ex.getMessage(), ex);
+ failed();
+ if (failOn < 0) {
+ log.error("Error while executing script: " + ex.getMessage(), ex);
+ }
}
- log.info(outputStream.toString(Charset.forName("UTF-8")));
- log.error(errorOutputStream.toString(Charset.forName("UTF-8")));
+ //log.info(outputStream.toString(Charset.forName("UTF-8")));
+ //log.error(errorOutputStream.toString(Charset.forName("UTF-8")));
}
}
--
Gitblit v1.10.0