From 62b938d31692414f86c8f306b855e2f3bc7cf536 Mon Sep 17 00:00:00 2001
From: Kai Reinhard <K.Reinhard@micromata.de>
Date: Fri, 28 Dec 2018 11:16:30 +0000
Subject: [PATCH] Job queueing...

---
 borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java |    2 
 borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java  |    8 +++-
 borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java     |   74 +++++++++++++-----------------------
 borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java      |    7 ++-
 4 files changed, 38 insertions(+), 53 deletions(-)

diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
index 92515a6..9941110 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
@@ -6,7 +6,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractJob {
+import java.util.concurrent.Future;
+
+public abstract class AbstractJob<T> {
     private Logger logger = LoggerFactory.getLogger(AbstractJob.class);
     public enum Status {DONE, RUNNING, QUEUED, STOPPED, FAILED}
     @Getter
@@ -23,6 +25,8 @@
     @Setter
     private String statusText;
 
+    private Future<T> future;
+
     protected void failed() {
         if (this.status != Status.RUNNING) {
             logger.error("Internal error, illegal state! You shouldn't set the job status to FAILED if not in status RUNNING: " + this.status);
@@ -40,7 +44,7 @@
         return false;
     }
 
-    public abstract void execute();
+    public abstract T execute();
 
     /**
      * A job is identified by this id. If a job with the same id is already queued (not yet finished), this job will
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
index 3107c56..72d301f 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
@@ -5,16 +5,16 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.*;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-public class JobQueue {
+public class JobQueue<T> {
     private static final int MAX_DONE_JOBS_SIZE = 50;
     private Logger log = LoggerFactory.getLogger(JobQueue.class);
     private List<AbstractJob> queue = new ArrayList<>();
     private List<AbstractJob> doneJobs = new LinkedList<>();
     private ExecutorService executorService = Executors.newSingleThreadExecutor();
-    private Runner runner = new Runner();
 
     public int getQueueSize() {
         return queue.size();
@@ -38,8 +38,8 @@
                 }
             }
             queue.add(job.setStatus(AbstractJob.Status.QUEUED));
+            executorService.submit(new CallableTask(job));
         }
-        run();
     }
 
     public AbstractJob getQueuedJob(Object id) {
@@ -55,7 +55,6 @@
         int counter = seconds / 10;
         while (CollectionUtils.isNotEmpty(queue) && counter > 0) {
             try {
-                run(); // If not running!
                 Thread.sleep(100);
             } catch (InterruptedException ex) {
                 log.error(ex.getMessage(), ex);
@@ -63,15 +62,6 @@
         }
     }
 
-    private void run() {
-        synchronized (executorService) {
-            if (!runner.running) {
-                log.info("Starting job executor...");
-                executorService.submit(runner);
-            }
-        }
-    }
-
     private void organizeQueue() {
         synchronized (queue) {
             if (queue.isEmpty()) {
@@ -91,43 +81,33 @@
         }
     }
 
-    private class Runner implements Runnable {
-        private boolean running;
+    private class CallableTask implements Callable<T> {
+        private AbstractJob<T> job;
+
+        private CallableTask(AbstractJob<T> job) {
+            this.job = job;
+        }
 
         @Override
-        public void run() {
-            running = true;
-            while (true) {
-                AbstractJob job = null;
-                synchronized (queue) {
-                    organizeQueue();
-                    if (queue.isEmpty()) {
-                        running = false;
-                        return;
-                    }
-                    for (AbstractJob queuedJob : queue) {
-                        if (queuedJob.getStatus() == AbstractJob.Status.QUEUED) {
-                            job = queuedJob;
-                            break;
-                        }
-                    }
+        public T call() throws Exception {
+            if (job.isStopRequested()) {
+                job.setStatus(AbstractJob.Status.STOPPED);
+                return null;
+            }
+            try {
+                log.info("Starting job: " + job.getId());
+                job.setStatus(AbstractJob.Status.RUNNING);
+                T result = job.execute();
+                if (!job.isFinished()) {
+                    // Don't overwrite status failed set by job.
+                    job.setStatus(AbstractJob.Status.DONE);
                 }
-                if (job == null) {
-                    running = false;
-                    return;
-                }
-                try {
-                    log.info("Starting job: " + job.getId());
-                    job.setStatus(AbstractJob.Status.RUNNING);
-                    job.execute();
-                    if (!job.isFinished()) {
-                        // Don't overwrite status failed set by job.
-                        job.setStatus(AbstractJob.Status.DONE);
-                    }
-                } catch (Exception ex) {
-                    log.error("Error while executing job '" + job.getId() + "': " + ex.getMessage(), ex);
-                    job.setStatus(AbstractJob.Status.FAILED);
-                }
+                organizeQueue();
+                return result;
+            } catch (Exception ex) {
+                log.error("Error while executing job '" + job.getId() + "': " + ex.getMessage(), ex);
+                job.setStatus(AbstractJob.Status.FAILED);
+                return null;
             }
         }
     }
diff --git a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
index edd5e96..91ce40b 100644
--- a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
+++ b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
@@ -24,7 +24,7 @@
             "    echo Error on counter $COUNTER >&2\n" +
             "    exit 2\n" +
             "  fi\n" +
-            "  sleep 0.05\n" +
+            "  sleep 0.1\n" +
             "  echo The counter is $COUNTER >&2\n" +
             "  let COUNTER=COUNTER+1 \n" +
             "done\n" +
diff --git a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
index 1755cb9..fe79067 100644
--- a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
+++ b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
@@ -7,8 +7,9 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.Charset;
 
-public class TestJob extends AbstractJob {
+public class TestJob extends AbstractJob<String> {
     private Logger log = LoggerFactory.getLogger(TestJob.class);
     private int time;
     private File counterScript;
@@ -30,7 +31,7 @@
     }
 
     @Override
-    public void execute() {
+    public String execute() {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
         CommandLine cmdLine = new CommandLine(counterScript.getAbsolutePath());
@@ -72,7 +73,7 @@
                 log.error("Error while executing script: " + ex.getMessage(), ex);
             }
         }
-        //log.info(outputStream.toString(Charset.forName("UTF-8")));
+        return outputStream.toString(Charset.forName("UTF-8"));
         //log.error(errorOutputStream.toString(Charset.forName("UTF-8")));
     }
 }

--
Gitblit v1.10.0