From 60f6fbc31679a112958db1fbecc62e8195b127f6 Mon Sep 17 00:00:00 2001
From: Kai Reinhard <K.Reinhard@micromata.de>
Date: Fri, 28 Dec 2018 22:55:49 +0000
Subject: [PATCH] Job queueing...

---
 /dev/null                                                                              |  141 -----------------------
 borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractCommandLineJob.java |    8 
 borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java               |    6 
 borgbutler-core/src/main/java/de/micromata/borgbutler/BorgCommands.java                |   25 ++--
 borgbutler-core/src/main/java/de/micromata/borgbutler/BorgJob.java                     |   83 +++++++++++++
 borgbutler-core/src/main/java/de/micromata/borgbutler/BorgQueueExecutor.java           |   45 +++++++
 6 files changed, 148 insertions(+), 160 deletions(-)

diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgCommands.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgCommands.java
index 3599d83..ea308d8 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgCommands.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgCommands.java
@@ -35,11 +35,10 @@
         BorgCommand command = new BorgCommand()
                 .setParams("--version")
                 .setDescription("Getting borg version.");
-        execute(command);
+        String version = execute(command).getResult();
         if (command.getResultStatus() != BorgCommand.ResultStatus.OK) {
             return null;
         }
-        String version = command.getResponse();
         log.info("Borg version: " + version);
         return version;
     }
@@ -56,11 +55,11 @@
                 .setCommand("info")
                 .setParams("--json")
                 .setDescription("Loading info of repo '" + repoConfig.getDisplayName() + "'.");
-        execute(command);
+        String result = execute(command).getResult();
         if (command.getResultStatus() != BorgCommand.ResultStatus.OK) {
             return null;
         }
-        BorgRepoInfo repoInfo = JsonUtils.fromJson(BorgRepoInfo.class, command.getResponse());
+        BorgRepoInfo repoInfo = JsonUtils.fromJson(BorgRepoInfo.class, result);
         BorgRepository borgRepository = repoInfo.getRepository();
         Repository repository = new Repository()
                 .setId(borgRepository.getId())
@@ -88,12 +87,12 @@
                 .setCommand("list")
                 .setParams("--json")
                 .setDescription("Loading list of archives of repo '" + repoConfig.getDisplayName() + "'.");
-        execute(command);
+        String result = execute(command).getResult();
         if (command.getResultStatus() != BorgCommand.ResultStatus.OK) {
             log.error("Can't load archives from repo '" + repository.getName() + "'.");
             return;
         }
-        BorgRepoList repoList = JsonUtils.fromJson(BorgRepoList.class, command.getResponse());
+        BorgRepoList repoList = JsonUtils.fromJson(BorgRepoList.class, result);
         if (repoList == null || CollectionUtils.isEmpty(repoList.getArchives())) {
             log.error("Can't load archives from repo '" + repository.getName() + "'.");
             return;
@@ -129,11 +128,11 @@
                 .setArchive(archive.getName())
                 .setParams("--json")
                 .setDescription("Loading info of archive '" + archive.getName() + "' of repo '" + repoConfig.getDisplayName() + "'.");
-        execute(command);
+        String result = execute(command).getResult();
         if (command.getResultStatus() != BorgCommand.ResultStatus.OK) {
             return;
         }
-        BorgArchiveInfo archiveInfo = JsonUtils.fromJson(BorgArchiveInfo.class, command.getResponse());
+        BorgArchiveInfo archiveInfo = JsonUtils.fromJson(BorgArchiveInfo.class, result);
         if (archiveInfo == null) {
             log.error("Archive '" + command.getRepoArchive() + "' not found.");
             return;
@@ -170,12 +169,12 @@
                 .setArchive(archive.getName())
                 .setParams("--json-lines")
                 .setDescription("Loading list of files of archive '" + archive.getName() + "' of repo '" + repoConfig.getDisplayName() + "'.");
-        execute(command);
+        String result = execute(command).getResult();
         List<BorgFilesystemItem> content = new ArrayList<>();
         if (command.getResultStatus() != BorgCommand.ResultStatus.OK) {
             return content;
         }
-        try (Scanner scanner = new Scanner(command.getResponse())) {
+        try (Scanner scanner = new Scanner(result)) {
             while (scanner.hasNextLine()) {
                 String json = scanner.nextLine();
                 BorgFilesystemItem item = JsonUtils.fromJson(BorgFilesystemItem.class, json);
@@ -209,14 +208,14 @@
                 .setArchive(archive.getName())
                 .setArgs(path)
                 .setDescription("Extract content of archive '" + archive.getName()
-                                + "' of repo '" + repoConfig.getDisplayName() + "': "
+                        + "' of repo '" + repoConfig.getDisplayName() + "': "
                         + path);
         execute(command);
         return restoreDir;
     }
 
-    private static void execute(BorgCommand command) {
+    private static BorgJob execute(BorgCommand command) {
         Validate.notNull(command);
-        BorgExecutorQueue.getQueue(command.getRepoConfig()).execute(command);
+        return BorgQueueExecutor.getInstance().execute(command);
     }
 }
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java
deleted file mode 100644
index 8cdb47d..0000000
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java
+++ /dev/null
@@ -1,141 +0,0 @@
-package de.micromata.borgbutler;
-
-import de.micromata.borgbutler.config.BorgRepoConfig;
-import de.micromata.borgbutler.config.ConfigurationHandler;
-import de.micromata.borgbutler.config.Definitions;
-import org.apache.commons.exec.*;
-import org.apache.commons.exec.environment.EnvironmentUtils;
-import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A queue is important because Borg doesn't support parallel calls for one repository.
- * For each repository one single queue is allocated.
- */
-public class BorgExecutorQueue {
-    private Logger log = LoggerFactory.getLogger(BorgExecutorQueue.class);
-    // key is the repo name.
-    private static Map<String, BorgExecutorQueue> queueMap = new HashMap<>();
-
-    public static BorgExecutorQueue getQueue(BorgRepoConfig config) {
-        synchronized (queueMap) {
-            String queueName = config != null ? config.getRepo() : "--NO_REPO--";
-            BorgExecutorQueue queue = queueMap.get(queueName);
-            if (queue == null) {
-                queue = new BorgExecutorQueue();
-                queueMap.put(queueName, queue);
-            }
-            return queue;
-        }
-    }
-
-    //private ConcurrentLinkedQueue<BorgCommand> commandQueue = new ConcurrentLinkedQueue<>();
-
-    public void execute(BorgCommand command) {
-        synchronized (this) {
-            _execute(command);
-        }
-    }
-
-    private void _execute(BorgCommand command) {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
-        CommandLine cmdLine = new CommandLine(ConfigurationHandler.getConfiguration().getBorgCommand());
-        cmdLine.addArgument(command.getCommand());
-        if (command.getParams() != null) {
-            for (String param : command.getParams()) {
-                if (param != null)
-                    cmdLine.addArgument(param);
-            }
-        }
-        if (command.getRepoArchive() != null) {
-            cmdLine.addArgument(command.getRepoArchive());
-        }
-        if (command.getArgs() != null) {
-            for (String arg : command.getArgs()) {
-                if (arg != null)
-                    cmdLine.addArgument(arg);
-            }
-        }
-        DefaultExecutor executor = new DefaultExecutor();
-        if (command.getWorkingDir() != null) {
-            executor.setWorkingDirectory(command.getWorkingDir());
-        }
-        //executor.setExitValue(2);
-        ExecuteWatchdog watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
-        executor.setWatchdog(watchdog);
-        //  ExecuteResultHandler handler = new DefaultExecuteResultHandler();
-        PumpStreamHandler streamHandler = new PumpStreamHandler(new LogOutputStream() {
-            @Override
-            protected void processLine(String line, int level) {
-                try {
-                    outputStream.write(line.getBytes());
-                    outputStream.write("\n".getBytes());
-                } catch (IOException ex) {
-                    log.error(ex.getMessage(), ex);
-                }
-            }
-        }, new LogOutputStream() {
-            @Override
-            protected void processLine(String line, int logLevel) {
-                try {
-                    errorOutputStream.write(line.getBytes());
-                    errorOutputStream.write("\n".getBytes());
-                } catch (IOException ex) {
-                    log.error(ex.getMessage(), ex);
-                }
-            }
-        });
-        executor.setStreamHandler(streamHandler);
-        String borgCall = cmdLine.getExecutable() + " " + StringUtils.join(cmdLine.getArguments(), " ");
-        if (StringUtils.isNotBlank(command.getDescription())) {
-            log.info(command.getDescription());
-        }
-        log.info("Executing '" + borgCall + "'...");
-        try {
-            executor.execute(cmdLine, getEnvironment(command.getRepoConfig()));
-            command.setResultStatus(BorgCommand.ResultStatus.OK);
-        } catch (Exception ex) {
-            log.error("Error while creating environment for borg call '" + borgCall + "': " + ex.getMessage(), ex);
-            command.setResultStatus(BorgCommand.ResultStatus.ERROR);
-        }
-        command.setResponse(outputStream.toString(Definitions.STD_CHARSET));
-        if (command.getResultStatus() == BorgCommand.ResultStatus.ERROR) {
-            log.error("Response: " + command.getAbbreviatedResponse());
-        }
-    }
-
-
-    private Map<String, String> getEnvironment(BorgRepoConfig repoConfig) throws IOException {
-        if (repoConfig == null) {
-            return null;
-        }
-        Map<String, String> env = EnvironmentUtils.getProcEnvironment();
-        addEnvironmentVariable(env, "BORG_REPO", repoConfig.getRepo());
-        addEnvironmentVariable(env, "BORG_RSH", repoConfig.getRsh());
-        addEnvironmentVariable(env, "BORG_PASSPHRASE", repoConfig.getPassphrase());
-        String passcommand = repoConfig.getPasswordCommand();
-        if (StringUtils.isNotBlank(passcommand)) {
-            // For MacOS BORG_PASSCOMMAND="security find-generic-password -a $USER -s borg-passphrase -w"
-            passcommand = passcommand.replace("$USER", System.getProperty("user.name"));
-            addEnvironmentVariable(env, "BORG_PASSCOMMAND", passcommand);
-        }
-        return env;
-    }
-
-    private void addEnvironmentVariable(Map<String, String> env, String name, String value) {
-        if (StringUtils.isNotBlank(value)) {
-            EnvironmentUtils.addVariableToEnvironment(env, name + "=" + value);
-        }
-    }
-
-    private BorgExecutorQueue() {
-
-    }
-}
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgJob.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgJob.java
new file mode 100644
index 0000000..474b49b
--- /dev/null
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgJob.java
@@ -0,0 +1,83 @@
+package de.micromata.borgbutler;
+
+import de.micromata.borgbutler.config.BorgRepoConfig;
+import de.micromata.borgbutler.config.ConfigurationHandler;
+import de.micromata.borgbutler.config.Definitions;
+import de.micromata.borgbutler.jobs.AbstractCommandLineJob;
+import org.apache.commons.exec.CommandLine;
+import org.apache.commons.exec.environment.EnvironmentUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * A queue is important because Borg doesn't support parallel calls for one repository.
+ * For each repository one single queue is allocated.
+ */
+public class BorgJob extends AbstractCommandLineJob {
+    private Logger log = LoggerFactory.getLogger(BorgJob.class);
+    private BorgCommand command;
+
+    public BorgJob(BorgCommand command) {
+        this.command = command;
+        setWorkingDirectory(command.getWorkingDir());
+        setDescription(command.getDescription());
+    }
+
+    @Override
+    protected CommandLine buildCommandLine() {
+        CommandLine commandLine = new CommandLine(ConfigurationHandler.getConfiguration().getBorgCommand());
+        commandLine.addArgument(command.getCommand());
+        if (command.getParams() != null) {
+            for (String param : command.getParams()) {
+                if (param != null)
+                    commandLine.addArgument(param);
+            }
+        }
+        if (command.getRepoArchive() != null) {
+            commandLine.addArgument(command.getRepoArchive());
+        }
+        if (command.getArgs() != null) {
+            for (String arg : command.getArgs()) {
+                if (arg != null)
+                    commandLine.addArgument(arg);
+            }
+        }
+        return commandLine;
+    }
+
+    @Override
+    protected void afterSuccess() {
+        command.setResultStatus(BorgCommand.ResultStatus.OK);
+        command.setResponse(outputStream.toString(Definitions.STD_CHARSET));
+    }
+
+    @Override
+    protected void afterFailure(Exception ex) {
+        command.setResultStatus(BorgCommand.ResultStatus.ERROR);
+        command.setResponse(outputStream.toString(Definitions.STD_CHARSET));
+        log.error("Response: " + command.getAbbreviatedResponse());
+    }
+
+    @Override
+    protected Map<String, String> getEnvironment() throws IOException {
+        BorgRepoConfig repoConfig = command.getRepoConfig();
+        if (repoConfig == null) {
+            return null;
+        }
+        Map<String, String> env = EnvironmentUtils.getProcEnvironment();
+        addEnvironmentVariable(env, "BORG_REPO", repoConfig.getRepo());
+        addEnvironmentVariable(env, "BORG_RSH", repoConfig.getRsh());
+        addEnvironmentVariable(env, "BORG_PASSPHRASE", repoConfig.getPassphrase());
+        String passcommand = repoConfig.getPasswordCommand();
+        if (StringUtils.isNotBlank(passcommand)) {
+            // For MacOS BORG_PASSCOMMAND="security find-generic-password -a $USER -s borg-passphrase -w"
+            passcommand = passcommand.replace("$USER", System.getProperty("user.name"));
+            addEnvironmentVariable(env, "BORG_PASSCOMMAND", passcommand);
+        }
+        return env;
+    }
+}
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgQueueExecutor.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgQueueExecutor.java
new file mode 100644
index 0000000..96daca2
--- /dev/null
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/BorgQueueExecutor.java
@@ -0,0 +1,45 @@
+package de.micromata.borgbutler;
+
+import de.micromata.borgbutler.config.BorgRepoConfig;
+import de.micromata.borgbutler.jobs.JobQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A queue is important because Borg doesn't support parallel calls for one repository.
+ * For each repository one single queue is allocated.
+ */
+public class BorgQueueExecutor {
+    private Logger log = LoggerFactory.getLogger(BorgQueueExecutor.class);
+    private static final BorgQueueExecutor instance = new BorgQueueExecutor();
+
+    public static BorgQueueExecutor getInstance() {
+        return instance;
+    }
+
+    // key is the repo name.
+    private Map<String, JobQueue> queueMap = new HashMap<>();
+
+    private JobQueue getQueue(BorgRepoConfig config) {
+        synchronized (queueMap) {
+            String queueName = config != null ? config.getRepo() : "--NO_REPO--";
+            JobQueue queue = queueMap.get(queueName);
+            if (queue == null) {
+                queue = new JobQueue();
+                queueMap.put(queueName, queue);
+            }
+            return queue;
+        }
+    }
+
+    public BorgJob execute(BorgCommand command) {
+        BorgJob job = new BorgJob(command);
+        return (BorgJob)getQueue(command.getRepoConfig()).append(job);
+    }
+
+    private BorgQueueExecutor() {
+    }
+}
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractCommandLineJob.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractCommandLineJob.java
index e4cc3ac..c8745ca 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractCommandLineJob.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractCommandLineJob.java
@@ -1,5 +1,6 @@
 package de.micromata.borgbutler.jobs;
 
+import de.micromata.borgbutler.config.Definitions;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.commons.exec.*;
@@ -11,7 +12,6 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.charset.Charset;
 import java.util.Map;
 
 /**
@@ -32,6 +32,8 @@
     private File workingDirectory;
     @Setter
     private String description;
+    protected ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    protected ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
 
     protected abstract CommandLine buildCommandLine();
 
@@ -53,8 +55,6 @@
     @Override
     public String execute() {
         getCommandLineAsString();
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
         DefaultExecutor executor = new DefaultExecutor();
         if (workingDirectory != null) {
             executor.setWorkingDirectory(workingDirectory);
@@ -100,7 +100,7 @@
             failed();
             afterFailure(ex);
         }
-        return outputStream.toString(Charset.forName("UTF-8"));
+        return outputStream.toString(Definitions.STD_CHARSET);
     }
 
     @Override
diff --git a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
index 1935f0a..7ee6634 100644
--- a/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
+++ b/borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
@@ -28,17 +28,19 @@
      * Appends the job if not alread in the queue. Starts the execution if no execution thread is already running.
      *
      * @param job
+     * @return The given job (if it's not already running or queued), otherwise the already running or queued job.
      */
-    public void append(AbstractJob job) {
+    public AbstractJob append(AbstractJob job) {
         synchronized (queue) {
             for (AbstractJob queuedJob : queue) {
                 if (Objects.equals(queuedJob.getId(), job.getId())) {
                     log.info("Job is already in the queue, don't run twice (OK): " + job.getId());
-                    return;
+                    return queuedJob;
                 }
             }
             queue.add(job.setStatus(AbstractJob.Status.QUEUED));
             job.setFuture(executorService.submit(new CallableTask(job)));
+            return job;
         }
     }
 

--
Gitblit v1.10.0