From 62b938d31692414f86c8f306b855e2f3bc7cf536 Mon Sep 17 00:00:00 2001
From: Kai Reinhard <K.Reinhard@micromata.de>
Date: Fri, 28 Dec 2018 11:16:30 +0000
Subject: [PATCH] Job queueing...
---
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java | 2
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java | 8 +++-
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java | 74 +++++++++++++-----------------------
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java | 7 ++-
4 files changed, 38 insertions(+), 53 deletions(-)
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
index 92515a6..9941110 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
@@ -6,7 +6,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class AbstractJob {
+import java.util.concurrent.Future;
+
+public abstract class AbstractJob<T> {
private Logger logger = LoggerFactory.getLogger(AbstractJob.class);
public enum Status {DONE, RUNNING, QUEUED, STOPPED, FAILED}
@Getter
@@ -23,6 +25,8 @@
@Setter
private String statusText;
+ private Future<T> future;
+
protected void failed() {
if (this.status != Status.RUNNING) {
logger.error("Internal error, illegal state! You shouldn't set the job status to FAILED if not in status RUNNING: " + this.status);
@@ -40,7 +44,7 @@
return false;
}
- public abstract void execute();
+ public abstract T execute();
/**
* A job is identified by this id. If a job with the same id is already queued (not yet finished), this job will
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
index 3107c56..72d301f 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
@@ -5,16 +5,16 @@
import org.slf4j.LoggerFactory;
import java.util.*;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-public class JobQueue {
+public class JobQueue<T> {
private static final int MAX_DONE_JOBS_SIZE = 50;
private Logger log = LoggerFactory.getLogger(JobQueue.class);
private List<AbstractJob> queue = new ArrayList<>();
private List<AbstractJob> doneJobs = new LinkedList<>();
private ExecutorService executorService = Executors.newSingleThreadExecutor();
- private Runner runner = new Runner();
public int getQueueSize() {
return queue.size();
@@ -38,8 +38,8 @@
}
}
queue.add(job.setStatus(AbstractJob.Status.QUEUED));
+ executorService.submit(new CallableTask(job));
}
- run();
}
public AbstractJob getQueuedJob(Object id) {
@@ -55,7 +55,6 @@
int counter = seconds / 10;
while (CollectionUtils.isNotEmpty(queue) && counter > 0) {
try {
- run(); // If not running!
Thread.sleep(100);
} catch (InterruptedException ex) {
log.error(ex.getMessage(), ex);
@@ -63,15 +62,6 @@
}
}
- private void run() {
- synchronized (executorService) {
- if (!runner.running) {
- log.info("Starting job executor...");
- executorService.submit(runner);
- }
- }
- }
-
private void organizeQueue() {
synchronized (queue) {
if (queue.isEmpty()) {
@@ -91,43 +81,33 @@
}
}
- private class Runner implements Runnable {
- private boolean running;
+ private class CallableTask implements Callable<T> {
+ private AbstractJob<T> job;
+
+ private CallableTask(AbstractJob<T> job) {
+ this.job = job;
+ }
@Override
- public void run() {
- running = true;
- while (true) {
- AbstractJob job = null;
- synchronized (queue) {
- organizeQueue();
- if (queue.isEmpty()) {
- running = false;
- return;
- }
- for (AbstractJob queuedJob : queue) {
- if (queuedJob.getStatus() == AbstractJob.Status.QUEUED) {
- job = queuedJob;
- break;
- }
- }
+ public T call() throws Exception {
+ if (job.isStopRequested()) {
+ job.setStatus(AbstractJob.Status.STOPPED);
+ return null;
+ }
+ try {
+ log.info("Starting job: " + job.getId());
+ job.setStatus(AbstractJob.Status.RUNNING);
+ T result = job.execute();
+ if (!job.isFinished()) {
+ // Don't overwrite status failed set by job.
+ job.setStatus(AbstractJob.Status.DONE);
}
- if (job == null) {
- running = false;
- return;
- }
- try {
- log.info("Starting job: " + job.getId());
- job.setStatus(AbstractJob.Status.RUNNING);
- job.execute();
- if (!job.isFinished()) {
- // Don't overwrite status failed set by job.
- job.setStatus(AbstractJob.Status.DONE);
- }
- } catch (Exception ex) {
- log.error("Error while executing job '" + job.getId() + "': " + ex.getMessage(), ex);
- job.setStatus(AbstractJob.Status.FAILED);
- }
+ organizeQueue();
+ return result;
+ } catch (Exception ex) {
+ log.error("Error while executing job '" + job.getId() + "': " + ex.getMessage(), ex);
+ job.setStatus(AbstractJob.Status.FAILED);
+ return null;
}
}
}
diff --git a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
index edd5e96..91ce40b 100644
--- a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
+++ b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
@@ -24,7 +24,7 @@
" echo Error on counter $COUNTER >&2\n" +
" exit 2\n" +
" fi\n" +
- " sleep 0.05\n" +
+ " sleep 0.1\n" +
" echo The counter is $COUNTER >&2\n" +
" let COUNTER=COUNTER+1 \n" +
"done\n" +
diff --git a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
index 1755cb9..fe79067 100644
--- a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
+++ b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
@@ -7,8 +7,9 @@
import java.io.File;
import java.io.IOException;
+import java.nio.charset.Charset;
-public class TestJob extends AbstractJob {
+public class TestJob extends AbstractJob<String> {
private Logger log = LoggerFactory.getLogger(TestJob.class);
private int time;
private File counterScript;
@@ -30,7 +31,7 @@
}
@Override
- public void execute() {
+ public String execute() {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
CommandLine cmdLine = new CommandLine(counterScript.getAbsolutePath());
@@ -72,7 +73,7 @@
log.error("Error while executing script: " + ex.getMessage(), ex);
}
}
- //log.info(outputStream.toString(Charset.forName("UTF-8")));
+ return outputStream.toString(Charset.forName("UTF-8"));
//log.error(errorOutputStream.toString(Charset.forName("UTF-8")));
}
}
--
Gitblit v1.10.0