mirror of https://github.com/micromata/borgbackup-butler.git

Kai Reinhard
06.03.2019 bf7101f797e8f2a6c3b966762afb1ec063c4428e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package de.micromata.borgbutler.jobs;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class JobQueue<T> {
    private static final int MAX_DONE_JOBS_SIZE = 50;
    private static long jobSequence = 0;
    private Logger log = LoggerFactory.getLogger(JobQueue.class);
    private List<AbstractJob<T>> queue = new ArrayList<>();
    private List<AbstractJob<T>> doneJobs = new LinkedList<>();
    private ExecutorService executorService = Executors.newSingleThreadExecutor();
 
    private static synchronized void setNextJobId(AbstractJob<?> job) {
        job.setUniqueJobNumber(jobSequence++);
    }
 
    public int getQueueSize() {
        return queue.size();
    }
 
    public Iterator<AbstractJob<T>> getQueueIterator() {
        return Collections.unmodifiableList(queue).iterator();
    }
 
    /**
     * Searches only for queued jobs (not done jobs).
     *
     * @param uniqueJobNumber
     * @return The job if any job with the given unique job number is queued, otherwise null.
     */
    public AbstractJob<T> getQueuedJobByUniqueJobNumber(long uniqueJobNumber) {
        Iterator<AbstractJob<T>> it = queue.iterator();
        while (it.hasNext()) {
            AbstractJob<T> job = it.next();
            if (job.getUniqueJobNumber() == uniqueJobNumber) {
                return job;
            }
        }
        return null;
    }
 
    /**
     * Appends the job if not alread in the queue. Starts the execution if no execution thread is already running.
     *
     * @param job
     * @return The given job (if it's not already running or queued), otherwise the already running or queued job.
     */
    public AbstractJob<T> append(AbstractJob<T> job) {
        synchronized (queue) {
            for (AbstractJob<T> queuedJob : queue) {
                if (Objects.equals(queuedJob.getId(), job.getId())) {
                    log.info("Job is already in the queue, don't run twice (OK): " + job.getId());
                    return queuedJob;
                }
            }
            setNextJobId(job);
            queue.add(job.setStatus(AbstractJob.Status.QUEUED));
        }
        job.setFuture(executorService.submit(new CallableTask(job)));
        return job;
    }
 
    public AbstractJob getQueuedJob(Object id) {
        synchronized (queue) {
            for (AbstractJob job : queue) {
                if (Objects.equals(job.getId(), id)) {
                    return job;
                }
            }
        }
        return null;
    }
 
    /**
     * Removes all finished or cancelled jobs from the queued list and adds them to the list of done jobs.
     * Will be called automatically after finishing a job.
     * <br>
     * You should call this method after cancelling a job.
     */
    public void refreshQueue() {
        synchronized (queue) {
            if (queue.isEmpty()) {
                return;
            }
            Iterator<AbstractJob<T>> it = queue.iterator();
            while (it.hasNext()) {
                AbstractJob<T> job = it.next();
                if (job.isFinished()) {
                    it.remove();
                    synchronized (doneJobs) {
                        doneJobs.add(0, job);
                    }
                }
                synchronized (doneJobs) {
                    while (doneJobs.size() > MAX_DONE_JOBS_SIZE) {
                        doneJobs.remove(doneJobs.size() - 1);
                    }
                }
            }
        }
    }
 
    private class CallableTask implements Callable<JobResult<T>> {
        private AbstractJob<T> job;
 
        private CallableTask(AbstractJob<T> job) {
            this.job = job;
        }
 
        @Override
        public JobResult<T> call() throws Exception {
            if (job.isCancellationRequested()) {
                job.setStatus(AbstractJob.Status.CANCELLED);
                return null;
            }
            try {
                log.info("Starting job: " + job.getId());
                job.setStatus(AbstractJob.Status.RUNNING);
                JobResult<T> result = job.execute();
                if (!job.isFinished()) {
                    // Don't overwrite status failed set by job.
                    job.setStatus(AbstractJob.Status.DONE);
                }
                if (job.isCancellationRequested() && job.getStatus() != AbstractJob.Status.CANCELLED) {
                    log.info("Job #" + job.getUniqueJobNumber() + " cancelled: " + job.getId());
                    job.setCancelled();
                }
                refreshQueue();
                return result;
            } catch (Exception ex) {
                log.error("Error while executing job '" + job.getId() + "': " + ex.getMessage(), ex);
                job.setStatus(AbstractJob.Status.FAILED);
                return null;
            }
        }
    }
}