From 60f6fbc31679a112958db1fbecc62e8195b127f6 Mon Sep 17 00:00:00 2001
From: Kai Reinhard <K.Reinhard@micromata.de>
Date: Fri, 28 Dec 2018 22:55:49 +0000
Subject: [PATCH] Job queueing...
---
/dev/null | 141 -----------------------
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractCommandLineJob.java | 8
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java | 6
borgbutler-core/src/main/java/de/micromata/borgbutler/BorgCommands.java | 25 ++--
borgbutler-core/src/main/java/de/micromata/borgbutler/BorgJob.java | 83 +++++++++++++
borgbutler-core/src/main/java/de/micromata/borgbutler/BorgQueueExecutor.java | 45 +++++++
6 files changed, 148 insertions(+), 160 deletions(-)
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgCommands.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgCommands.java
index 3599d83..ea308d8 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgCommands.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgCommands.java
@@ -35,11 +35,10 @@
BorgCommand command = new BorgCommand()
.setParams("--version")
.setDescription("Getting borg version.");
- execute(command);
+ String version = execute(command).getResult();
if (command.getResultStatus() != BorgCommand.ResultStatus.OK) {
return null;
}
- String version = command.getResponse();
log.info("Borg version: " + version);
return version;
}
@@ -56,11 +55,11 @@
.setCommand("info")
.setParams("--json")
.setDescription("Loading info of repo '" + repoConfig.getDisplayName() + "'.");
- execute(command);
+ String result = execute(command).getResult();
if (command.getResultStatus() != BorgCommand.ResultStatus.OK) {
return null;
}
- BorgRepoInfo repoInfo = JsonUtils.fromJson(BorgRepoInfo.class, command.getResponse());
+ BorgRepoInfo repoInfo = JsonUtils.fromJson(BorgRepoInfo.class, result);
BorgRepository borgRepository = repoInfo.getRepository();
Repository repository = new Repository()
.setId(borgRepository.getId())
@@ -88,12 +87,12 @@
.setCommand("list")
.setParams("--json")
.setDescription("Loading list of archives of repo '" + repoConfig.getDisplayName() + "'.");
- execute(command);
+ String result = execute(command).getResult();
if (command.getResultStatus() != BorgCommand.ResultStatus.OK) {
log.error("Can't load archives from repo '" + repository.getName() + "'.");
return;
}
- BorgRepoList repoList = JsonUtils.fromJson(BorgRepoList.class, command.getResponse());
+ BorgRepoList repoList = JsonUtils.fromJson(BorgRepoList.class, result);
if (repoList == null || CollectionUtils.isEmpty(repoList.getArchives())) {
log.error("Can't load archives from repo '" + repository.getName() + "'.");
return;
@@ -129,11 +128,11 @@
.setArchive(archive.getName())
.setParams("--json")
.setDescription("Loading info of archive '" + archive.getName() + "' of repo '" + repoConfig.getDisplayName() + "'.");
- execute(command);
+ String result = execute(command).getResult();
if (command.getResultStatus() != BorgCommand.ResultStatus.OK) {
return;
}
- BorgArchiveInfo archiveInfo = JsonUtils.fromJson(BorgArchiveInfo.class, command.getResponse());
+ BorgArchiveInfo archiveInfo = JsonUtils.fromJson(BorgArchiveInfo.class, result);
if (archiveInfo == null) {
log.error("Archive '" + command.getRepoArchive() + "' not found.");
return;
@@ -170,12 +169,12 @@
.setArchive(archive.getName())
.setParams("--json-lines")
.setDescription("Loading list of files of archive '" + archive.getName() + "' of repo '" + repoConfig.getDisplayName() + "'.");
- execute(command);
+ String result = execute(command).getResult();
List<BorgFilesystemItem> content = new ArrayList<>();
if (command.getResultStatus() != BorgCommand.ResultStatus.OK) {
return content;
}
- try (Scanner scanner = new Scanner(command.getResponse())) {
+ try (Scanner scanner = new Scanner(result)) {
while (scanner.hasNextLine()) {
String json = scanner.nextLine();
BorgFilesystemItem item = JsonUtils.fromJson(BorgFilesystemItem.class, json);
@@ -209,14 +208,14 @@
.setArchive(archive.getName())
.setArgs(path)
.setDescription("Extract content of archive '" + archive.getName()
- + "' of repo '" + repoConfig.getDisplayName() + "': "
+ + "' of repo '" + repoConfig.getDisplayName() + "': "
+ path);
execute(command);
return restoreDir;
}
- private static void execute(BorgCommand command) {
+ private static BorgJob execute(BorgCommand command) {
Validate.notNull(command);
- BorgExecutorQueue.getQueue(command.getRepoConfig()).execute(command);
+ return BorgQueueExecutor.getInstance().execute(command);
}
}
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java
deleted file mode 100644
index 8cdb47d..0000000
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java
+++ /dev/null
@@ -1,141 +0,0 @@
-package de.micromata.borgbutler;
-
-import de.micromata.borgbutler.config.BorgRepoConfig;
-import de.micromata.borgbutler.config.ConfigurationHandler;
-import de.micromata.borgbutler.config.Definitions;
-import org.apache.commons.exec.*;
-import org.apache.commons.exec.environment.EnvironmentUtils;
-import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A queue is important because Borg doesn't support parallel calls for one repository.
- * For each repository one single queue is allocated.
- */
-public class BorgExecutorQueue {
- private Logger log = LoggerFactory.getLogger(BorgExecutorQueue.class);
- // key is the repo name.
- private static Map<String, BorgExecutorQueue> queueMap = new HashMap<>();
-
- public static BorgExecutorQueue getQueue(BorgRepoConfig config) {
- synchronized (queueMap) {
- String queueName = config != null ? config.getRepo() : "--NO_REPO--";
- BorgExecutorQueue queue = queueMap.get(queueName);
- if (queue == null) {
- queue = new BorgExecutorQueue();
- queueMap.put(queueName, queue);
- }
- return queue;
- }
- }
-
- //private ConcurrentLinkedQueue<BorgCommand> commandQueue = new ConcurrentLinkedQueue<>();
-
- public void execute(BorgCommand command) {
- synchronized (this) {
- _execute(command);
- }
- }
-
- private void _execute(BorgCommand command) {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
- CommandLine cmdLine = new CommandLine(ConfigurationHandler.getConfiguration().getBorgCommand());
- cmdLine.addArgument(command.getCommand());
- if (command.getParams() != null) {
- for (String param : command.getParams()) {
- if (param != null)
- cmdLine.addArgument(param);
- }
- }
- if (command.getRepoArchive() != null) {
- cmdLine.addArgument(command.getRepoArchive());
- }
- if (command.getArgs() != null) {
- for (String arg : command.getArgs()) {
- if (arg != null)
- cmdLine.addArgument(arg);
- }
- }
- DefaultExecutor executor = new DefaultExecutor();
- if (command.getWorkingDir() != null) {
- executor.setWorkingDirectory(command.getWorkingDir());
- }
- //executor.setExitValue(2);
- ExecuteWatchdog watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
- executor.setWatchdog(watchdog);
- // ExecuteResultHandler handler = new DefaultExecuteResultHandler();
- PumpStreamHandler streamHandler = new PumpStreamHandler(new LogOutputStream() {
- @Override
- protected void processLine(String line, int level) {
- try {
- outputStream.write(line.getBytes());
- outputStream.write("\n".getBytes());
- } catch (IOException ex) {
- log.error(ex.getMessage(), ex);
- }
- }
- }, new LogOutputStream() {
- @Override
- protected void processLine(String line, int logLevel) {
- try {
- errorOutputStream.write(line.getBytes());
- errorOutputStream.write("\n".getBytes());
- } catch (IOException ex) {
- log.error(ex.getMessage(), ex);
- }
- }
- });
- executor.setStreamHandler(streamHandler);
- String borgCall = cmdLine.getExecutable() + " " + StringUtils.join(cmdLine.getArguments(), " ");
- if (StringUtils.isNotBlank(command.getDescription())) {
- log.info(command.getDescription());
- }
- log.info("Executing '" + borgCall + "'...");
- try {
- executor.execute(cmdLine, getEnvironment(command.getRepoConfig()));
- command.setResultStatus(BorgCommand.ResultStatus.OK);
- } catch (Exception ex) {
- log.error("Error while creating environment for borg call '" + borgCall + "': " + ex.getMessage(), ex);
- command.setResultStatus(BorgCommand.ResultStatus.ERROR);
- }
- command.setResponse(outputStream.toString(Definitions.STD_CHARSET));
- if (command.getResultStatus() == BorgCommand.ResultStatus.ERROR) {
- log.error("Response: " + command.getAbbreviatedResponse());
- }
- }
-
-
- private Map<String, String> getEnvironment(BorgRepoConfig repoConfig) throws IOException {
- if (repoConfig == null) {
- return null;
- }
- Map<String, String> env = EnvironmentUtils.getProcEnvironment();
- addEnvironmentVariable(env, "BORG_REPO", repoConfig.getRepo());
- addEnvironmentVariable(env, "BORG_RSH", repoConfig.getRsh());
- addEnvironmentVariable(env, "BORG_PASSPHRASE", repoConfig.getPassphrase());
- String passcommand = repoConfig.getPasswordCommand();
- if (StringUtils.isNotBlank(passcommand)) {
- // For MacOS BORG_PASSCOMMAND="security find-generic-password -a $USER -s borg-passphrase -w"
- passcommand = passcommand.replace("$USER", System.getProperty("user.name"));
- addEnvironmentVariable(env, "BORG_PASSCOMMAND", passcommand);
- }
- return env;
- }
-
- private void addEnvironmentVariable(Map<String, String> env, String name, String value) {
- if (StringUtils.isNotBlank(value)) {
- EnvironmentUtils.addVariableToEnvironment(env, name + "=" + value);
- }
- }
-
- private BorgExecutorQueue() {
-
- }
-}
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgJob.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgJob.java
new file mode 100644
index 0000000..474b49b
--- /dev/null
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgJob.java
@@ -0,0 +1,83 @@
+package de.micromata.borgbutler;
+
+import de.micromata.borgbutler.config.BorgRepoConfig;
+import de.micromata.borgbutler.config.ConfigurationHandler;
+import de.micromata.borgbutler.config.Definitions;
+import de.micromata.borgbutler.jobs.AbstractCommandLineJob;
+import org.apache.commons.exec.CommandLine;
+import org.apache.commons.exec.environment.EnvironmentUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * A queue is important because Borg doesn't support parallel calls for one repository.
+ * For each repository one single queue is allocated.
+ */
+public class BorgJob extends AbstractCommandLineJob {
+ private Logger log = LoggerFactory.getLogger(BorgJob.class);
+ private BorgCommand command;
+
+ public BorgJob(BorgCommand command) {
+ this.command = command;
+ setWorkingDirectory(command.getWorkingDir());
+ setDescription(command.getDescription());
+ }
+
+ @Override
+ protected CommandLine buildCommandLine() {
+ CommandLine commandLine = new CommandLine(ConfigurationHandler.getConfiguration().getBorgCommand());
+ commandLine.addArgument(command.getCommand());
+ if (command.getParams() != null) {
+ for (String param : command.getParams()) {
+ if (param != null)
+ commandLine.addArgument(param);
+ }
+ }
+ if (command.getRepoArchive() != null) {
+ commandLine.addArgument(command.getRepoArchive());
+ }
+ if (command.getArgs() != null) {
+ for (String arg : command.getArgs()) {
+ if (arg != null)
+ commandLine.addArgument(arg);
+ }
+ }
+ return commandLine;
+ }
+
+ @Override
+ protected void afterSuccess() {
+ command.setResultStatus(BorgCommand.ResultStatus.OK);
+ command.setResponse(outputStream.toString(Definitions.STD_CHARSET));
+ }
+
+ @Override
+ protected void afterFailure(Exception ex) {
+ command.setResultStatus(BorgCommand.ResultStatus.ERROR);
+ command.setResponse(outputStream.toString(Definitions.STD_CHARSET));
+ log.error("Response: " + command.getAbbreviatedResponse());
+ }
+
+ @Override
+ protected Map<String, String> getEnvironment() throws IOException {
+ BorgRepoConfig repoConfig = command.getRepoConfig();
+ if (repoConfig == null) {
+ return null;
+ }
+ Map<String, String> env = EnvironmentUtils.getProcEnvironment();
+ addEnvironmentVariable(env, "BORG_REPO", repoConfig.getRepo());
+ addEnvironmentVariable(env, "BORG_RSH", repoConfig.getRsh());
+ addEnvironmentVariable(env, "BORG_PASSPHRASE", repoConfig.getPassphrase());
+ String passcommand = repoConfig.getPasswordCommand();
+ if (StringUtils.isNotBlank(passcommand)) {
+ // For MacOS BORG_PASSCOMMAND="security find-generic-password -a $USER -s borg-passphrase -w"
+ passcommand = passcommand.replace("$USER", System.getProperty("user.name"));
+ addEnvironmentVariable(env, "BORG_PASSCOMMAND", passcommand);
+ }
+ return env;
+ }
+}
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgQueueExecutor.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgQueueExecutor.java
new file mode 100644
index 0000000..96daca2
--- /dev/null
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgQueueExecutor.java
@@ -0,0 +1,45 @@
+package de.micromata.borgbutler;
+
+import de.micromata.borgbutler.config.BorgRepoConfig;
+import de.micromata.borgbutler.jobs.JobQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A queue is important because Borg doesn't support parallel calls for one repository.
+ * For each repository one single queue is allocated.
+ */
+public class BorgQueueExecutor {
+ private Logger log = LoggerFactory.getLogger(BorgQueueExecutor.class);
+ private static final BorgQueueExecutor instance = new BorgQueueExecutor();
+
+ public static BorgQueueExecutor getInstance() {
+ return instance;
+ }
+
+ // key is the repo name.
+ private Map<String, JobQueue> queueMap = new HashMap<>();
+
+ private JobQueue getQueue(BorgRepoConfig config) {
+ synchronized (queueMap) {
+ String queueName = config != null ? config.getRepo() : "--NO_REPO--";
+ JobQueue queue = queueMap.get(queueName);
+ if (queue == null) {
+ queue = new JobQueue();
+ queueMap.put(queueName, queue);
+ }
+ return queue;
+ }
+ }
+
+ public BorgJob execute(BorgCommand command) {
+ BorgJob job = new BorgJob(command);
+ return (BorgJob)getQueue(command.getRepoConfig()).append(job);
+ }
+
+ private BorgQueueExecutor() {
+ }
+}
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractCommandLineJob.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractCommandLineJob.java
index e4cc3ac..c8745ca 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractCommandLineJob.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractCommandLineJob.java
@@ -1,5 +1,6 @@
package de.micromata.borgbutler.jobs;
+import de.micromata.borgbutler.config.Definitions;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.exec.*;
@@ -11,7 +12,6 @@
import java.io.File;
import java.io.IOException;
-import java.nio.charset.Charset;
import java.util.Map;
/**
@@ -32,6 +32,8 @@
private File workingDirectory;
@Setter
private String description;
+ protected ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ protected ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
protected abstract CommandLine buildCommandLine();
@@ -53,8 +55,6 @@
@Override
public String execute() {
getCommandLineAsString();
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
DefaultExecutor executor = new DefaultExecutor();
if (workingDirectory != null) {
executor.setWorkingDirectory(workingDirectory);
@@ -100,7 +100,7 @@
failed();
afterFailure(ex);
}
- return outputStream.toString(Charset.forName("UTF-8"));
+ return outputStream.toString(Definitions.STD_CHARSET);
}
@Override
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
index 1935f0a..7ee6634 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
@@ -28,17 +28,19 @@
* Appends the job if not alread in the queue. Starts the execution if no execution thread is already running.
*
* @param job
+ * @return The given job (if it's not already running or queued), otherwise the already running or queued job.
*/
- public void append(AbstractJob job) {
+ public AbstractJob append(AbstractJob job) {
synchronized (queue) {
for (AbstractJob queuedJob : queue) {
if (Objects.equals(queuedJob.getId(), job.getId())) {
log.info("Job is already in the queue, don't run twice (OK): " + job.getId());
- return;
+ return queuedJob;
}
}
queue.add(job.setStatus(AbstractJob.Status.QUEUED));
job.setFuture(executorService.submit(new CallableTask(job)));
+ return job;
}
}
--
Gitblit v1.10.0