From f9f5572180ea5a2b47201933895dfcd10077aa4f Mon Sep 17 00:00:00 2001
From: Kai Reinhard <K.Reinhard@micromata.de>
Date: Fri, 28 Dec 2018 07:51:47 +0000
Subject: [PATCH] Job queueing...
---
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java | 40 +++++++++++++
borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java | 8 +-
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java | 30 ++++++++++
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java | 27 +++++++++
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java | 51 +++++++++++++++++
5 files changed, 151 insertions(+), 5 deletions(-)
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java
index 81124d7..d97b295 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java
@@ -1,11 +1,11 @@
package de.micromata.borgbutler;
import de.micromata.borgbutler.config.BorgRepoConfig;
-import de.micromata.borgbutler.config.Configuration;
import de.micromata.borgbutler.config.ConfigurationHandler;
import de.micromata.borgbutler.config.Definitions;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
+import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.io.output.ByteArrayOutputStream;
@@ -16,7 +16,6 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
/**
* A queue is important because Borg doesn't support parallel calls for one repository.
@@ -80,8 +79,8 @@
executor.setWorkingDirectory(command.getWorkingDir());
}
//executor.setExitValue(2);
- //ExecuteWatchdog watchdog = new ExecuteWatchdog(60000);
- //executor.setWatchdog(watchdog);
+ ExecuteWatchdog watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
+ executor.setWatchdog(watchdog);
// ExecuteResultHandler handler = new DefaultExecuteResultHandler();
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream);
executor.setStreamHandler(streamHandler);
@@ -101,7 +100,6 @@
if (command.getResultStatus() == BorgCommand.ResultStatus.ERROR) {
log.error("Response: " + command.getAbbreviatedResponse());
}
-
}
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
new file mode 100644
index 0000000..e1cd523
--- /dev/null
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractJob.java
@@ -0,0 +1,30 @@
+package de.micromata.borgbutler.jobs;
+
+import lombok.Getter;
+import lombok.Setter;
+
+public abstract class AbstractJob {
+ public enum Status {DONE, RUNNING, QUEUED, STOPPED, FAILED}
+ @Getter
+ @Setter
+ private boolean stopRequested;
+
+ @Getter
+ private Status status;
+ @Getter
+ @Setter
+ private String title;
+ @Getter
+ @Setter
+ private String statusText;
+ @Getter
+ @Setter
+ private String log;
+
+ protected void stopped() {
+ this.status = Status.STOPPED;
+ }
+
+ public abstract void execute() throws InterruptedException;
+
+}
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
new file mode 100644
index 0000000..a47410c
--- /dev/null
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
@@ -0,0 +1,27 @@
+package de.micromata.borgbutler.jobs;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+public class JobQueue {
+ private ConcurrentLinkedQueue<AbstractJob> queue = new ConcurrentLinkedQueue<>();
+ private List<AbstractJob> done = new LinkedList<>();
+ Executor executor = Executors.newSingleThreadExecutor();
+
+ public AbstractJob appendOrJoin(AbstractJob job) {
+ synchronized (queue) {
+ if (queue.contains(job)) {
+ for (AbstractJob queuedJob : queue) {
+ if (queuedJob.equals(job)) {
+ return queuedJob;
+ }
+ }
+ }
+ queue.add(job);
+ return job;
+ }
+ }
+}
diff --git a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
new file mode 100644
index 0000000..63fa15a
--- /dev/null
+++ b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/JobQueueTest.java
@@ -0,0 +1,40 @@
+package de.micromata.borgbutler.jobs;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.attribute.PosixFilePermissions;
+
+public class JobQueueTest {
+ private Logger log = LoggerFactory.getLogger(JobQueueTest.class);
+ private static String bashScript = "#!/bin/bash\n" +
+ "COUNTER=0\n" +
+ "while [ $COUNTER -lt $1 ]; do\n" +
+ " sleep 0.1\n" +
+ " echo The counter is $COUNTER\n" +
+ " let COUNTER=COUNTER+1 \n" +
+ "done";
+
+ private static File file;
+
+ @BeforeAll
+ static void createScript() throws IOException {
+ file = File.createTempFile("counter", ".sh");
+ file.deleteOnExit();
+ FileUtils.write(file, bashScript, Charset.forName("UTF-8"));
+ Files.setPosixFilePermissions(file.toPath(), PosixFilePermissions.fromString("rwxr-xr-x"));
+ }
+
+ @Test
+ void test() {
+ TestJob job = new TestJob(10, file);
+ job.execute();
+ }
+}
diff --git a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
new file mode 100644
index 0000000..60e6e1e
--- /dev/null
+++ b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
@@ -0,0 +1,51 @@
+package de.micromata.borgbutler.jobs;
+
+import org.apache.commons.exec.*;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+public class TestJob extends AbstractJob {
+ private Logger log = LoggerFactory.getLogger(TestJob.class);
+ private int time;
+ private File counterScript;
+
+ TestJob(int time, File counterScript) {
+ this.time = time;
+ this.counterScript = counterScript;
+ }
+
+ @Override
+ public void execute() {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ CommandLine cmdLine = new CommandLine(counterScript.getAbsolutePath());
+ cmdLine.addArgument(String.valueOf(this.time));
+ DefaultExecutor executor = new DefaultExecutor();
+ ExecuteWatchdog watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
+ executor.setWatchdog(watchdog);
+ PumpStreamHandler streamHandler = new PumpStreamHandler(new LogOutputStream() {
+ @Override
+ protected void processLine(String line, int level) {
+ log.info(line);
+ try {
+ outputStream.write(line.getBytes());
+ outputStream.write("\n".getBytes());
+ } catch (IOException ex) {
+ log.error(ex.getMessage(), ex);
+ }
+ }
+ });
+ executor.setStreamHandler(streamHandler);
+ log.info("Executing '" + counterScript.getAbsolutePath() + " " + this.time + "'...");
+ try {
+ executor.execute(cmdLine);
+ } catch (Exception ex) {
+ log.error("Error while executing script: " + ex.getMessage(), ex);
+ }
+ log.info(outputStream.toString(Charset.forName("UTF-8")));
+ }
+}
--
Gitblit v1.10.0