mirror of https://github.com/micromata/borgbackup-butler.git

Kai Reinhard
14.33.2019 53299b4114d02190d306c921d1685ebb3e6ad7c8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package de.micromata.borgbutler.jobs;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class JobQueue<T> {
    private static final int MAX_OLD_JOBS_SIZE = 50;
    private static long jobSequence = 0;
    private Logger log = LoggerFactory.getLogger(JobQueue.class);
    private List<AbstractJob<T>> queue = new ArrayList<>();
    /**
     * Finished, failed and cancelled jobs.
     */
    private List<AbstractJob<T>> oldJobs = new LinkedList<>();
    private ExecutorService executorService = Executors.newSingleThreadExecutor();
 
    private static synchronized void setNextJobId(AbstractJob<?> job) {
        job.setUniqueJobNumber(jobSequence++);
    }
 
    /**
     * @return the number of running and queued jobs of this queue or 0 if no job is in the queue.
     */
    public int getQueueSize() {
        return queue.size();
    }
 
    /**
     * @return the number of old jobs (done, failed or cancelled) stored. The size of stored old jobs is limited.
     */
    public int getOldJobsSize() {
        return oldJobs.size();
    }
 
    public Iterator<AbstractJob<T>> getQueueIterator() {
        synchronized (queue) {
            return Collections.unmodifiableList(queue).iterator();
        }
    }
 
    /**
     * Searches only for queued jobs (not done jobs).
     *
     * @param uniqueJobNumber
     * @return The job if any job with the given unique job number is queued, otherwise null.
     */
    public AbstractJob<T> getQueuedJobByUniqueJobNumber(long uniqueJobNumber) {
        synchronized (queue) {
            Iterator<AbstractJob<T>> it = queue.iterator();
            while (it.hasNext()) {
                AbstractJob<T> job = it.next();
                if (job.getUniqueJobNumber() == uniqueJobNumber) {
                    return job;
                }
            }
        }
        return null;
    }
 
    /**
     * Appends the job if not alread in the queue. Starts the execution if no execution thread is already running.
     *
     * @param job
     * @return The given job (if it's not already running or queued), otherwise the already running or queued job.
     */
    public AbstractJob<T> append(AbstractJob<T> job) {
        synchronized (queue) {
            for (AbstractJob<T> queuedJob : queue) {
                if (Objects.equals(queuedJob.getId(), job.getId())) {
                    log.info("Job is already in the queue, don't run twice (OK): " + job.getId());
                    return queuedJob;
                }
            }
            setNextJobId(job);
            queue.add(job.setStatus(AbstractJob.Status.QUEUED));
        }
        job.setFuture(executorService.submit(new CallableTask(job)));
        return job;
    }
 
    public AbstractJob getQueuedJob(Object id) {
        synchronized (queue) {
            for (AbstractJob job : queue) {
                if (Objects.equals(job.getId(), id)) {
                    return job;
                }
            }
        }
        return null;
    }
 
    /**
     * Removes all finished or cancelled jobs from the queued list and adds them to the list of done jobs.
     * Will be called automatically after finishing a job.
     * <br>
     * You should call this method after cancelling a job.
     */
    public void refreshQueue() {
        synchronized (queue) {
            if (queue.isEmpty()) {
                return;
            }
            Iterator<AbstractJob<T>> it = queue.iterator();
            while (it.hasNext()) {
                AbstractJob<T> job = it.next();
                if (job.isFinished()) {
                    it.remove();
                    synchronized (oldJobs) {
                        oldJobs.add(0, job);
                    }
                }
                synchronized (oldJobs) {
                    while (oldJobs.size() > MAX_OLD_JOBS_SIZE) {
                        oldJobs.remove(oldJobs.size() - 1);
                    }
                }
            }
        }
    }
 
    private class CallableTask implements Callable<JobResult<T>> {
        private AbstractJob<T> job;
 
        private CallableTask(AbstractJob<T> job) {
            this.job = job;
        }
 
        @Override
        public JobResult<T> call() throws Exception {
            if (job.isCancellationRequested()) {
                job.setStatus(AbstractJob.Status.CANCELLED);
                return null;
            }
            try {
                log.info("Starting job: " + job.getId());
                job.setStatus(AbstractJob.Status.RUNNING);
                JobResult<T> result = job.execute();
                if (!job.isFinished()) {
                    // Don't overwrite status failed set by job.
                    job.setStatus(AbstractJob.Status.DONE);
                }
                if (job.isCancellationRequested() && job.getStatus() != AbstractJob.Status.CANCELLED) {
                    log.info("Job #" + job.getUniqueJobNumber() + " cancelled: " + job.getId());
                    job.setCancelled();
                }
                refreshQueue();
                return result;
            } catch (Exception ex) {
                log.error("Error while executing job '" + job.getId() + "': " + ex.getMessage(), ex);
                job.setStatus(AbstractJob.Status.FAILED);
                return null;
            }
        }
    }
 
    List<AbstractJob<T>> getOldJobs() {
        return oldJobs;
    }
}