mirror of https://github.com/micromata/borgbackup-butler.git

Kai Reinhard
05.03.2019 67dd1243073e766178dd70dd2f45aa5bc77ec529
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package de.micromata.borgbutler.jobs;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class JobQueue<T> {
    private static final int MAX_DONE_JOBS_SIZE = 50;
    private static long jobSequence = 0;
    private Logger log = LoggerFactory.getLogger(JobQueue.class);
    private List<AbstractJob<T>> queue = new ArrayList<>();
    private List<AbstractJob<T>> doneJobs = new LinkedList<>();
    private ExecutorService executorService = Executors.newSingleThreadExecutor();
 
    private  static synchronized void setNextJobId(AbstractJob<?> job) {
        job.setUniqueJobNumber(jobSequence++);
    }
 
    public int getQueueSize() {
        return queue.size();
    }
 
    public Iterator<AbstractJob<T>> getQueueIterator() {
        return Collections.unmodifiableList(queue).iterator();
    }
 
    /**
     * Searches only for queued jobs (not done jobs).
     * @param uniqueJobNumber
     * @return The job if any job with the given unique job number is queued, otherwise null.
     */
    public AbstractJob<T> getQueuedJobByUniqueJobNumber(long uniqueJobNumber) {
        Iterator<AbstractJob<T>> it = queue.iterator();
        while (it.hasNext()) {
            AbstractJob<T> job = it.next();
            if (job.getUniqueJobNumber() == uniqueJobNumber) {
                return job;
            }
        }
        return null;
    }
 
    /**
     * Appends the job if not alread in the queue. Starts the execution if no execution thread is already running.
     *
     * @param job
     * @return The given job (if it's not already running or queued), otherwise the already running or queued job.
     */
    public AbstractJob<T> append(AbstractJob<T> job) {
        synchronized (queue) {
            for (AbstractJob<T> queuedJob : queue) {
                if (Objects.equals(queuedJob.getId(), job.getId())) {
                    log.info("Job is already in the queue, don't run twice (OK): " + job.getId());
                    return queuedJob;
                }
            }
            setNextJobId(job);
            queue.add(job.setStatus(AbstractJob.Status.QUEUED));
        }
        job.setFuture(executorService.submit(new CallableTask(job)));
        return job;
    }
 
    public AbstractJob getQueuedJob(Object id) {
        synchronized (queue) {
            for (AbstractJob job : queue) {
                if (Objects.equals(job.getId(), id)) {
                    return job;
                }
            }
        }
        return null;
    }
 
    private void organizeQueue() {
        synchronized (queue) {
            if (queue.isEmpty()) {
                return;
            }
            Iterator<AbstractJob<T>> it = queue.iterator();
            while (it.hasNext()) {
                AbstractJob<T> job = it.next();
                if (job.isFinished()) {
                    it.remove();
                    synchronized (doneJobs) {
                        doneJobs.add(0, job);
                    }
                }
                synchronized (doneJobs) {
                    while (doneJobs.size() > MAX_DONE_JOBS_SIZE) {
                        doneJobs.remove(doneJobs.size() - 1);
                    }
                }
            }
        }
    }
 
    private class CallableTask implements Callable<JobResult<T>> {
        private AbstractJob<T> job;
 
        private CallableTask(AbstractJob<T> job) {
            this.job = job;
        }
 
        @Override
        public JobResult<T> call() throws Exception {
            if (job.isCancellationRequested()) {
                job.setStatus(AbstractJob.Status.CANCELLED);
                return null;
            }
            try {
                log.info("Starting job: " + job.getId());
                job.setStatus(AbstractJob.Status.RUNNING);
                JobResult<T> result = job.execute();
                if (!job.isFinished()) {
                    // Don't overwrite status failed set by job.
                    job.setStatus(AbstractJob.Status.DONE);
                }
                organizeQueue();
                return result;
            } catch (Exception ex) {
                log.error("Error while executing job '" + job.getId() + "': " + ex.getMessage(), ex);
                job.setStatus(AbstractJob.Status.FAILED);
                return null;
            }
        }
    }
}