From de7d8d4cfbf9fe9fa6dcc7fd054301f52e47f9eb Mon Sep 17 00:00:00 2001
From: Kai Reinhard <K.Reinhard@micromata.de>
Date: Fri, 28 Dec 2018 22:27:48 +0000
Subject: [PATCH] Job queueing...
---
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractCommandLineJob.java | 131 ++++++++++++++++++++++++++++++++
borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java | 75 ++----------------
2 files changed, 141 insertions(+), 65 deletions(-)
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractCommandLineJob.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractCommandLineJob.java
new file mode 100644
index 0000000..e4cc3ac
--- /dev/null
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractCommandLineJob.java
@@ -0,0 +1,131 @@
+package de.micromata.borgbutler.jobs;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.commons.exec.*;
+import org.apache.commons.exec.environment.EnvironmentUtils;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+/**
+ * A queue is important because Borg doesn't support parallel calls for one repository.
+ * For each repository one single queue is allocated.
+ */
+public abstract class AbstractCommandLineJob extends AbstractJob<String> {
+ private Logger log = LoggerFactory.getLogger(AbstractCommandLineJob.class);
+ private ExecuteWatchdog watchdog;
+ @Getter
+ private boolean executeStarted;
+ private CommandLine commandLine;
+ /**
+ * The command line as string. This property is also used as ID for detecting multiple borg calls.
+ */
+ private String commandLineAsString;
+ @Setter
+ private File workingDirectory;
+ @Setter
+ private String description;
+
+ protected abstract CommandLine buildCommandLine();
+
+ @Override
+ public Object getId() {
+ return getCommandLineAsString();
+ }
+
+ public String getCommandLineAsString() {
+ if (commandLine == null) {
+ commandLine = buildCommandLine();
+ }
+ if (commandLineAsString == null) {
+ commandLineAsString = commandLine.getExecutable() + " " + StringUtils.join(commandLine.getArguments(), " ");
+ }
+ return commandLineAsString;
+ }
+
+ @Override
+ public String execute() {
+ getCommandLineAsString();
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
+ DefaultExecutor executor = new DefaultExecutor();
+ if (workingDirectory != null) {
+ executor.setWorkingDirectory(workingDirectory);
+ }
+ //executor.setExitValue(2);
+ this.watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
+ executor.setWatchdog(watchdog);
+ // ExecuteResultHandler handler = new DefaultExecuteResultHandler();
+ PumpStreamHandler streamHandler = new PumpStreamHandler(new LogOutputStream() {
+ @Override
+ protected void processLine(String line, int level) {
+ //log.info(line);
+ try {
+ outputStream.write(line.getBytes());
+ outputStream.write("\n".getBytes());
+ } catch (IOException ex) {
+ log.error(ex.getMessage(), ex);
+ }
+ }
+ }, new LogOutputStream() {
+ @Override
+ protected void processLine(String line, int logLevel) {
+ //log.error(line);
+ try {
+ errorOutputStream.write(line.getBytes());
+ errorOutputStream.write("\n".getBytes());
+ } catch (IOException ex) {
+ log.error(ex.getMessage(), ex);
+ }
+ }
+ });
+ executor.setStreamHandler(streamHandler);
+
+ if (StringUtils.isNotBlank(this.description)) {
+ log.info(description);
+ }
+ log.info("Executing '" + commandLineAsString + "'...");
+ this.executeStarted = true;
+ try {
+ executor.execute(commandLine, getEnvironment());
+ afterSuccess();
+ } catch (Exception ex) {
+ failed();
+ afterFailure(ex);
+ }
+ return outputStream.toString(Charset.forName("UTF-8"));
+ }
+
+ @Override
+ protected void cancelRunningProcess() {
+ if (watchdog != null) {
+ log.info("Cancelling job: " + getId());
+ watchdog.destroyProcess();
+ watchdog = null;
+ setCancelled();
+ }
+ }
+
+ protected void afterSuccess() {
+ }
+
+ protected void afterFailure(Exception ex) {
+ }
+
+ protected Map<String, String> getEnvironment() throws IOException {
+ return null;
+ }
+
+ protected void addEnvironmentVariable(Map<String, String> env, String name, String value) {
+ if (StringUtils.isNotBlank(value)) {
+ EnvironmentUtils.addVariableToEnvironment(env, name + "=" + value);
+ }
+ }
+}
diff --git a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
index 8c2a163..ed9b98b 100644
--- a/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
+++ b/borgbutler-core/src/test/java/de/micromata/borgbutler/jobs/TestJob.java
@@ -1,23 +1,16 @@
package de.micromata.borgbutler.jobs;
-import lombok.Getter;
-import org.apache.commons.exec.*;
-import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.commons.exec.CommandLine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-public class TestJob extends AbstractJob<String> {
+public class TestJob extends AbstractCommandLineJob {
private Logger log = LoggerFactory.getLogger(TestJob.class);
private int time;
private File counterScript;
private int failOn = -1;
- private ExecuteWatchdog watchdog;
- @Getter
- private boolean executeStarted;
TestJob(int time, File counterScript) {
this(time, -1, counterScript);
@@ -35,65 +28,17 @@
}
@Override
- protected void cancelRunningProcess() {
- log.info("CancelRunningProcess: " + watchdog + ", " + getStatus());
- if (watchdog != null) {
- log.info("Cancelling job: " + getId());
- watchdog.destroyProcess();
- watchdog = null;
- setCancelled();
- }
+ protected CommandLine buildCommandLine() {
+ CommandLine commandLine = new CommandLine(counterScript.getAbsolutePath());
+ commandLine.addArgument(String.valueOf(this.time));
+ commandLine.addArgument(String.valueOf(this.failOn));
+ return commandLine;
}
@Override
- public String execute() {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
- CommandLine cmdLine = new CommandLine(counterScript.getAbsolutePath());
- cmdLine.addArgument(String.valueOf(this.time));
- cmdLine.addArgument(String.valueOf(this.failOn));
- DefaultExecutor executor = new DefaultExecutor();
- watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
- executor.setWatchdog(watchdog);
- PumpStreamHandler streamHandler = new PumpStreamHandler(new LogOutputStream() {
- @Override
- protected void processLine(String line, int level) {
- //log.info(line);
- try {
- outputStream.write(line.getBytes());
- outputStream.write("\n".getBytes());
- } catch (IOException ex) {
- log.error(ex.getMessage(), ex);
- }
- }
- }, new LogOutputStream() {
- @Override
- protected void processLine(String line, int logLevel) {
- //log.error(line);
- try {
- errorOutputStream.write(line.getBytes());
- errorOutputStream.write("\n".getBytes());
- } catch (IOException ex) {
- log.error(ex.getMessage(), ex);
- }
- }
- });
- executor.setStreamHandler(streamHandler);
- if (isCancelledRequested()) {
- setCancelled();
- return null;
+ protected void afterFailure(Exception ex) {
+ if (failOn < 0 && getStatus() != Status.CANCELLED) {
+ log.error("Error while executing script '" + getCommandLineAsString() + "': " + ex.getMessage(), ex);
}
- log.info("Executing '" + counterScript.getAbsolutePath() + " " + this.time + "'...");
- executeStarted = true;
- try {
- executor.execute(cmdLine);
- } catch (Exception ex) {
- failed();
- if (failOn < 0 && getStatus() != Status.CANCELLED) {
- log.error("Error while executing script: " + ex.getMessage(), ex);
- }
- }
- return outputStream.toString(Charset.forName("UTF-8"));
- //log.error(errorOutputStream.toString(Charset.forName("UTF-8")));
}
}
--
Gitblit v1.10.0