mirror of https://github.com/micromata/borgbackup-butler.git

Kai Reinhard
28.55.2018 60f6fbc31679a112958db1fbecc62e8195b127f6
Job queueing...
1 files deleted
3 files modified
2 files added
308 ■■■■ changed files
borgbutler-core/src/main/java/de/micromata/borgbutler/BorgCommands.java 25 ●●●● patch | view | raw | blame | history
borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java 141 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/main/java/de/micromata/borgbutler/BorgJob.java 83 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/main/java/de/micromata/borgbutler/BorgQueueExecutor.java 45 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractCommandLineJob.java 8 ●●●● patch | view | raw | blame | history
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java 6 ●●●●● patch | view | raw | blame | history
borgbutler-core/src/main/java/de/micromata/borgbutler/BorgCommands.java
@@ -35,11 +35,10 @@
        BorgCommand command = new BorgCommand()
                .setParams("--version")
                .setDescription("Getting borg version.");
        execute(command);
        String version = execute(command).getResult();
        if (command.getResultStatus() != BorgCommand.ResultStatus.OK) {
            return null;
        }
        String version = command.getResponse();
        log.info("Borg version: " + version);
        return version;
    }
@@ -56,11 +55,11 @@
                .setCommand("info")
                .setParams("--json")
                .setDescription("Loading info of repo '" + repoConfig.getDisplayName() + "'.");
        execute(command);
        String result = execute(command).getResult();
        if (command.getResultStatus() != BorgCommand.ResultStatus.OK) {
            return null;
        }
        BorgRepoInfo repoInfo = JsonUtils.fromJson(BorgRepoInfo.class, command.getResponse());
        BorgRepoInfo repoInfo = JsonUtils.fromJson(BorgRepoInfo.class, result);
        BorgRepository borgRepository = repoInfo.getRepository();
        Repository repository = new Repository()
                .setId(borgRepository.getId())
@@ -88,12 +87,12 @@
                .setCommand("list")
                .setParams("--json")
                .setDescription("Loading list of archives of repo '" + repoConfig.getDisplayName() + "'.");
        execute(command);
        String result = execute(command).getResult();
        if (command.getResultStatus() != BorgCommand.ResultStatus.OK) {
            log.error("Can't load archives from repo '" + repository.getName() + "'.");
            return;
        }
        BorgRepoList repoList = JsonUtils.fromJson(BorgRepoList.class, command.getResponse());
        BorgRepoList repoList = JsonUtils.fromJson(BorgRepoList.class, result);
        if (repoList == null || CollectionUtils.isEmpty(repoList.getArchives())) {
            log.error("Can't load archives from repo '" + repository.getName() + "'.");
            return;
@@ -129,11 +128,11 @@
                .setArchive(archive.getName())
                .setParams("--json")
                .setDescription("Loading info of archive '" + archive.getName() + "' of repo '" + repoConfig.getDisplayName() + "'.");
        execute(command);
        String result = execute(command).getResult();
        if (command.getResultStatus() != BorgCommand.ResultStatus.OK) {
            return;
        }
        BorgArchiveInfo archiveInfo = JsonUtils.fromJson(BorgArchiveInfo.class, command.getResponse());
        BorgArchiveInfo archiveInfo = JsonUtils.fromJson(BorgArchiveInfo.class, result);
        if (archiveInfo == null) {
            log.error("Archive '" + command.getRepoArchive() + "' not found.");
            return;
@@ -170,12 +169,12 @@
                .setArchive(archive.getName())
                .setParams("--json-lines")
                .setDescription("Loading list of files of archive '" + archive.getName() + "' of repo '" + repoConfig.getDisplayName() + "'.");
        execute(command);
        String result = execute(command).getResult();
        List<BorgFilesystemItem> content = new ArrayList<>();
        if (command.getResultStatus() != BorgCommand.ResultStatus.OK) {
            return content;
        }
        try (Scanner scanner = new Scanner(command.getResponse())) {
        try (Scanner scanner = new Scanner(result)) {
            while (scanner.hasNextLine()) {
                String json = scanner.nextLine();
                BorgFilesystemItem item = JsonUtils.fromJson(BorgFilesystemItem.class, json);
@@ -209,14 +208,14 @@
                .setArchive(archive.getName())
                .setArgs(path)
                .setDescription("Extract content of archive '" + archive.getName()
                                + "' of repo '" + repoConfig.getDisplayName() + "': "
                        + "' of repo '" + repoConfig.getDisplayName() + "': "
                        + path);
        execute(command);
        return restoreDir;
    }
    private static void execute(BorgCommand command) {
    private static BorgJob execute(BorgCommand command) {
        Validate.notNull(command);
        BorgExecutorQueue.getQueue(command.getRepoConfig()).execute(command);
        return BorgQueueExecutor.getInstance().execute(command);
    }
}
borgbutler-core/src/main/java/de/micromata/borgbutler/BorgExecutorQueue.java
File was deleted
borgbutler-core/src/main/java/de/micromata/borgbutler/BorgJob.java
New file
@@ -0,0 +1,83 @@
package de.micromata.borgbutler;
import de.micromata.borgbutler.config.BorgRepoConfig;
import de.micromata.borgbutler.config.ConfigurationHandler;
import de.micromata.borgbutler.config.Definitions;
import de.micromata.borgbutler.jobs.AbstractCommandLineJob;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
/**
 * A queue is important because Borg doesn't support parallel calls for one repository.
 * For each repository one single queue is allocated.
 */
public class BorgJob extends AbstractCommandLineJob {
    private Logger log = LoggerFactory.getLogger(BorgJob.class);
    private BorgCommand command;
    public BorgJob(BorgCommand command) {
        this.command = command;
        setWorkingDirectory(command.getWorkingDir());
        setDescription(command.getDescription());
    }
    @Override
    protected CommandLine buildCommandLine() {
        CommandLine commandLine = new CommandLine(ConfigurationHandler.getConfiguration().getBorgCommand());
        commandLine.addArgument(command.getCommand());
        if (command.getParams() != null) {
            for (String param : command.getParams()) {
                if (param != null)
                    commandLine.addArgument(param);
            }
        }
        if (command.getRepoArchive() != null) {
            commandLine.addArgument(command.getRepoArchive());
        }
        if (command.getArgs() != null) {
            for (String arg : command.getArgs()) {
                if (arg != null)
                    commandLine.addArgument(arg);
            }
        }
        return commandLine;
    }
    @Override
    protected void afterSuccess() {
        command.setResultStatus(BorgCommand.ResultStatus.OK);
        command.setResponse(outputStream.toString(Definitions.STD_CHARSET));
    }
    @Override
    protected void afterFailure(Exception ex) {
        command.setResultStatus(BorgCommand.ResultStatus.ERROR);
        command.setResponse(outputStream.toString(Definitions.STD_CHARSET));
        log.error("Response: " + command.getAbbreviatedResponse());
    }
    @Override
    protected Map<String, String> getEnvironment() throws IOException {
        BorgRepoConfig repoConfig = command.getRepoConfig();
        if (repoConfig == null) {
            return null;
        }
        Map<String, String> env = EnvironmentUtils.getProcEnvironment();
        addEnvironmentVariable(env, "BORG_REPO", repoConfig.getRepo());
        addEnvironmentVariable(env, "BORG_RSH", repoConfig.getRsh());
        addEnvironmentVariable(env, "BORG_PASSPHRASE", repoConfig.getPassphrase());
        String passcommand = repoConfig.getPasswordCommand();
        if (StringUtils.isNotBlank(passcommand)) {
            // For MacOS BORG_PASSCOMMAND="security find-generic-password -a $USER -s borg-passphrase -w"
            passcommand = passcommand.replace("$USER", System.getProperty("user.name"));
            addEnvironmentVariable(env, "BORG_PASSCOMMAND", passcommand);
        }
        return env;
    }
}
borgbutler-core/src/main/java/de/micromata/borgbutler/BorgQueueExecutor.java
New file
@@ -0,0 +1,45 @@
package de.micromata.borgbutler;
import de.micromata.borgbutler.config.BorgRepoConfig;
import de.micromata.borgbutler.jobs.JobQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * A queue is important because Borg doesn't support parallel calls for one repository.
 * For each repository one single queue is allocated.
 */
public class BorgQueueExecutor {
    private Logger log = LoggerFactory.getLogger(BorgQueueExecutor.class);
    private static final BorgQueueExecutor instance = new BorgQueueExecutor();
    public static BorgQueueExecutor getInstance() {
        return instance;
    }
    // key is the repo name.
    private Map<String, JobQueue> queueMap = new HashMap<>();
    private JobQueue getQueue(BorgRepoConfig config) {
        synchronized (queueMap) {
            String queueName = config != null ? config.getRepo() : "--NO_REPO--";
            JobQueue queue = queueMap.get(queueName);
            if (queue == null) {
                queue = new JobQueue();
                queueMap.put(queueName, queue);
            }
            return queue;
        }
    }
    public BorgJob execute(BorgCommand command) {
        BorgJob job = new BorgJob(command);
        return (BorgJob)getQueue(command.getRepoConfig()).append(job);
    }
    private BorgQueueExecutor() {
    }
}
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/AbstractCommandLineJob.java
@@ -1,5 +1,6 @@
package de.micromata.borgbutler.jobs;
import de.micromata.borgbutler.config.Definitions;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.exec.*;
@@ -11,7 +12,6 @@
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Map;
/**
@@ -32,6 +32,8 @@
    private File workingDirectory;
    @Setter
    private String description;
    protected ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    protected ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
    protected abstract CommandLine buildCommandLine();
@@ -53,8 +55,6 @@
    @Override
    public String execute() {
        getCommandLineAsString();
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        ByteArrayOutputStream errorOutputStream = new ByteArrayOutputStream();
        DefaultExecutor executor = new DefaultExecutor();
        if (workingDirectory != null) {
            executor.setWorkingDirectory(workingDirectory);
@@ -100,7 +100,7 @@
            failed();
            afterFailure(ex);
        }
        return outputStream.toString(Charset.forName("UTF-8"));
        return outputStream.toString(Definitions.STD_CHARSET);
    }
    @Override
borgbutler-core/src/main/java/de/micromata/borgbutler/jobs/JobQueue.java
@@ -28,17 +28,19 @@
     * Appends the job if not already in the queue. Starts the execution if no execution thread is already running.
     *
     * @param job
     * @return The given job (if it's not already running or queued), otherwise the already running or queued job.
     */
    public void append(AbstractJob job) {
    public AbstractJob append(AbstractJob job) {
        synchronized (queue) {
            for (AbstractJob queuedJob : queue) {
                if (Objects.equals(queuedJob.getId(), job.getId())) {
                    log.info("Job is already in the queue, don't run twice (OK): " + job.getId());
                    return;
                    return queuedJob;
                }
            }
            queue.add(job.setStatus(AbstractJob.Status.QUEUED));
            job.setFuture(executorService.submit(new CallableTask(job)));
            return job;
        }
    }