mirror of https://github.com/micromata/borgbackup-butler.git

Kai Reinhard
28.42.2018 ea69c28b8aa40b0de84e3ec52941d08ae9ef6cef
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package de.micromata.borgbutler.jobs;
 
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class JobQueue {
    private static final int MAX_DONE_JOBS_SIZE = 50;
    private Logger log = LoggerFactory.getLogger(JobQueue.class);
    private List<AbstractJob> queue = new ArrayList<>();
    private List<AbstractJob> doneJobs = new LinkedList<>();
    private ExecutorService executorService = Executors.newSingleThreadExecutor();
    private Runner runner = new Runner();
 
    public int getQueueSize() {
        return queue.size();
    }
 
    public List<AbstractJob> getDoneJobs() {
        return Collections.unmodifiableList(doneJobs);
    }
 
    /**
     * Appends the job if not alread in the queue. Starts the execution if no execution thread is already running.
     *
     * @param job
     */
    public void append(AbstractJob job) {
        synchronized (queue) {
            for (AbstractJob queuedJob : queue) {
                if (Objects.equals(queuedJob.getId(), job.getId())) {
                    log.info("Job is already in the queue, don't run twice (OK): " + job.getId());
                    return;
                }
            }
            queue.add(job.setStatus(AbstractJob.Status.QUEUED));
        }
        run();
    }
 
    public AbstractJob getQueuedJob(Object id) {
        for (AbstractJob job : queue) {
            if (Objects.equals(job.getId(), id)) {
                return job;
            }
        }
        return null;
    }
 
    void waitForQueue(int seconds) {
        int counter = seconds / 10;
        while (CollectionUtils.isNotEmpty(queue) && counter > 0) {
            try {
                run(); // If not running!
                Thread.sleep(100);
            } catch (InterruptedException ex) {
                log.error(ex.getMessage(), ex);
            }
        }
    }
 
    private void run() {
        synchronized (executorService) {
            if (!runner.running) {
                log.info("Starting job executor...");
                executorService.submit(runner);
            }
        }
    }
 
    private void organizeQueue() {
        synchronized (queue) {
            if (queue.isEmpty()) {
                return;
            }
            Iterator<AbstractJob> it = queue.iterator();
            while (it.hasNext()) {
                AbstractJob job = it.next();
                if (job.isFinished()) {
                    it.remove();
                    doneJobs.add(0, job);
                }
                while (doneJobs.size() > MAX_DONE_JOBS_SIZE) {
                    doneJobs.remove(doneJobs.size() - 1);
                }
            }
        }
    }
 
    private class Runner implements Runnable {
        private boolean running;
 
        @Override
        public void run() {
            running = true;
            while (true) {
                AbstractJob job = null;
                synchronized (queue) {
                    organizeQueue();
                    if (queue.isEmpty()) {
                        running = false;
                        return;
                    }
                    for (AbstractJob queuedJob : queue) {
                        if (queuedJob.getStatus() == AbstractJob.Status.QUEUED) {
                            job = queuedJob;
                            break;
                        }
                    }
                }
                if (job == null) {
                    running = false;
                    return;
                }
                try {
                    log.info("Starting job: " + job.getId());
                    job.setStatus(AbstractJob.Status.RUNNING);
                    job.execute();
                    if (!job.isFinished()) {
                        // Don't overwrite status failed set by job.
                        job.setStatus(AbstractJob.Status.DONE);
                    }
                } catch (Exception ex) {
                    log.error("Error while executing job '" + job.getId() + "': " + ex.getMessage(), ex);
                    job.setStatus(AbstractJob.Status.FAILED);
                }
            }
        }
    }
}