From 072b27a0e1f0ec459f388f5b62a82cd42c2476db Mon Sep 17 00:00:00 2001
From: Kai Reinhard <K.Reinhard@micromata.de>
Date: Fri, 28 Dec 2018 10:19:14 +0000
Subject: [PATCH] Job queuing...

---
 borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java |  126 +++++++++++++++++++++++++++++++++++++-----
 1 files changed, 111 insertions(+), 15 deletions(-)

diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
index a47410c..4c6717f 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
@@ -1,27 +1,123 @@
 package de.micromata.borgbutler.jobs;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
+import org.apache.commons.collections4.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 public class JobQueue {
-    private ConcurrentLinkedQueue<AbstractJob> queue = new ConcurrentLinkedQueue<>();
-    private List<AbstractJob> done = new LinkedList<>();
-    Executor executor = Executors.newSingleThreadExecutor();
+    private Logger log = LoggerFactory.getLogger(JobQueue.class);
+    private List<AbstractJob> queue = new ArrayList<>();
+    private List<AbstractJob> doneJobs = new LinkedList<>();
+    private ExecutorService executorService = Executors.newSingleThreadExecutor();
+    private Runner runner = new Runner();
 
-    public AbstractJob appendOrJoin(AbstractJob job) {
+    public int getQueueSize() {
+        return queue.size();
+    }
+
+    /**
+     * Appends the job if not alread in the queue. Starts the execution if no execution thread is already running.
+     *
+     * @param job
+     */
+    public void append(AbstractJob job) {
         synchronized (queue) {
-            if (queue.contains(job)) {
-                for (AbstractJob queuedJob : queue) {
-                    if (queuedJob.equals(job)) {
-                        return queuedJob;
-                    }
+            for (AbstractJob queuedJob : queue) {
+                if (Objects.equals(queuedJob.getId(), job.getId())) {
+                    log.info("Job is already in the queue, don't run twice (OK): " + job.getId());
+                    return;
                 }
             }
-            queue.add(job);
-            return job;
+            queue.add(job.setStatus(AbstractJob.Status.QUEUED));
+        }
+        run();
+    }
+
+    public AbstractJob getQueuedJob(Object id) {
+        for (AbstractJob job : queue) {
+            if (Objects.equals(job.getId(), id)) {
+                return job;
+            }
+        }
+        return null;
+    }
+
+    void waitForQueue(int seconds) {
+        int counter = seconds / 10;
+        while (CollectionUtils.isNotEmpty(queue) && counter > 0) {
+            try {
+                run(); // If not running!
+                Thread.sleep(100);
+            } catch (InterruptedException ex) {
+                log.error(ex.getMessage(), ex);
+            }
+        }
+    }
+
+    private void run() {
+        synchronized (executorService) {
+            if (!runner.running) {
+                log.info("Starting job executor...");
+                executorService.submit(runner);
+            }
+        }
+    }
+
+    private void organizeQueue() {
+        synchronized (queue) {
+            if (queue.isEmpty()) {
+                return;
+            }
+            Iterator<AbstractJob> it = queue.iterator();
+            while (it.hasNext()) {
+                AbstractJob job = it.next();
+                if (job.isFinished()) {
+                    it.remove();
+                    doneJobs.add(0, job);
+                }
+            }
+        }
+    }
+
+    private class Runner implements Runnable {
+        private boolean running;
+
+        @Override
+        public void run() {
+            running = true;
+            while (true) {
+                AbstractJob job = null;
+                synchronized (queue) {
+                    organizeQueue();
+                    if (queue.isEmpty()) {
+                        running = false;
+                        return;
+                    }
+                    for (AbstractJob queuedJob : queue) {
+                        if (queuedJob.getStatus() == AbstractJob.Status.QUEUED) {
+                            job = queuedJob;
+                            break;
+                        }
+                    }
+                }
+                if (job == null) {
+                    running = false;
+                    return;
+                }
+                try {
+                    log.info("Starting job: " + job.getId());
+                    job.setStatus(AbstractJob.Status.RUNNING);
+                    job.execute();
+                    job.setStatus(AbstractJob.Status.DONE);
+                } catch (Exception ex) {
+                    log.error("Error while executing job '" + job.getId() + "': " + ex.getMessage(), ex);
+                    job.setStatus(AbstractJob.Status.FAILED);
+                }
+            }
         }
     }
 }

--
Gitblit v1.10.0