From 072b27a0e1f0ec459f388f5b62a82cd42c2476db Mon Sep 17 00:00:00 2001
From: Kai Reinhard <K.Reinhard@micromata.de>
Date: Fri, 28 Dec 2018 10:19:14 +0000
Subject: [PATCH] Job queuing...
---
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java | 126 +++++++++++++++++++++++++++++++++++++-----
1 files changed, 111 insertions(+), 15 deletions(-)
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
index a47410c..4c6717f 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
@@ -1,27 +1,123 @@
package de.micromata.borgbutler.jobs;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
+import org.apache.commons.collections4.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class JobQueue {
- private ConcurrentLinkedQueue<AbstractJob> queue = new ConcurrentLinkedQueue<>();
- private List<AbstractJob> done = new LinkedList<>();
- Executor executor = Executors.newSingleThreadExecutor();
+ private Logger log = LoggerFactory.getLogger(JobQueue.class);
+ private List<AbstractJob> queue = new ArrayList<>();
+ private List<AbstractJob> doneJobs = new LinkedList<>();
+ private ExecutorService executorService = Executors.newSingleThreadExecutor();
+ private Runner runner = new Runner();
- public AbstractJob appendOrJoin(AbstractJob job) {
+ public int getQueueSize() {
+ return queue.size();
+ }
+
+ /**
+ * Appends the job if not alread in the queue. Starts the execution if no execution thread is already running.
+ *
+ * @param job
+ */
+ public void append(AbstractJob job) {
synchronized (queue) {
- if (queue.contains(job)) {
- for (AbstractJob queuedJob : queue) {
- if (queuedJob.equals(job)) {
- return queuedJob;
- }
+ for (AbstractJob queuedJob : queue) {
+ if (Objects.equals(queuedJob.getId(), job.getId())) {
+ log.info("Job is already in the queue, don't run twice (OK): " + job.getId());
+ return;
}
}
- queue.add(job);
- return job;
+ queue.add(job.setStatus(AbstractJob.Status.QUEUED));
+ }
+ run();
+ }
+
+ public AbstractJob getQueuedJob(Object id) {
+ for (AbstractJob job : queue) {
+ if (Objects.equals(job.getId(), id)) {
+ return job;
+ }
+ }
+ return null;
+ }
+
+ void waitForQueue(int seconds) {
+ int counter = seconds / 10;
+ while (CollectionUtils.isNotEmpty(queue) && counter > 0) {
+ try {
+ run(); // If not running!
+ Thread.sleep(100);
+ } catch (InterruptedException ex) {
+ log.error(ex.getMessage(), ex);
+ }
+ }
+ }
+
+ private void run() {
+ synchronized (executorService) {
+ if (!runner.running) {
+ log.info("Starting job executor...");
+ executorService.submit(runner);
+ }
+ }
+ }
+
+ private void organizeQueue() {
+ synchronized (queue) {
+ if (queue.isEmpty()) {
+ return;
+ }
+ Iterator<AbstractJob> it = queue.iterator();
+ while (it.hasNext()) {
+ AbstractJob job = it.next();
+ if (job.isFinished()) {
+ it.remove();
+ doneJobs.add(0, job);
+ }
+ }
+ }
+ }
+
+ private class Runner implements Runnable {
+ private boolean running;
+
+ @Override
+ public void run() {
+ running = true;
+ while (true) {
+ AbstractJob job = null;
+ synchronized (queue) {
+ organizeQueue();
+ if (queue.isEmpty()) {
+ running = false;
+ return;
+ }
+ for (AbstractJob queuedJob : queue) {
+ if (queuedJob.getStatus() == AbstractJob.Status.QUEUED) {
+ job = queuedJob;
+ break;
+ }
+ }
+ }
+ if (job == null) {
+ running = false;
+ return;
+ }
+ try {
+ log.info("Starting job: " + job.getId());
+ job.setStatus(AbstractJob.Status.RUNNING);
+ job.execute();
+ job.setStatus(AbstractJob.Status.DONE);
+ } catch (Exception ex) {
+ log.error("Error while executing job '" + job.getId() + "': " + ex.getMessage(), ex);
+ job.setStatus(AbstractJob.Status.FAILED);
+ }
+ }
}
}
}
--
Gitblit v1.10.0