| | |
| | | package de.micromata.borgbutler.jobs; |
| | | |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.concurrent.Executor; |
| | | import org.apache.commons.collections4.CollectionUtils; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | |
| | | import java.util.*; |
| | | import java.util.concurrent.ExecutorService; |
| | | import java.util.concurrent.Executors; |
| | | |
| | | public class JobQueue { |
| | | private ConcurrentLinkedQueue<AbstractJob> queue = new ConcurrentLinkedQueue<>(); |
| | | private List<AbstractJob> done = new LinkedList<>(); |
| | | Executor executor = Executors.newSingleThreadExecutor(); |
| | | private Logger log = LoggerFactory.getLogger(JobQueue.class); |
| | | private List<AbstractJob> queue = new ArrayList<>(); |
| | | private List<AbstractJob> doneJobs = new LinkedList<>(); |
| | | private ExecutorService executorService = Executors.newSingleThreadExecutor(); |
| | | private Runner runner = new Runner(); |
| | | |
| | | public AbstractJob appendOrJoin(AbstractJob job) { |
| | | public int getQueueSize() { |
| | | return queue.size(); |
| | | } |
| | | |
| | | /** |
| | | * Appends the job if not alread in the queue. Starts the execution if no execution thread is already running. |
| | | * |
| | | * @param job |
| | | */ |
| | | public void append(AbstractJob job) { |
| | | synchronized (queue) { |
| | | if (queue.contains(job)) { |
| | | for (AbstractJob queuedJob : queue) { |
| | | if (queuedJob.equals(job)) { |
| | | return queuedJob; |
| | | } |
| | | for (AbstractJob queuedJob : queue) { |
| | | if (Objects.equals(queuedJob.getId(), job.getId())) { |
| | | log.info("Job is already in the queue, don't run twice (OK): " + job.getId()); |
| | | return; |
| | | } |
| | | } |
| | | queue.add(job); |
| | | return job; |
| | | queue.add(job.setStatus(AbstractJob.Status.QUEUED)); |
| | | } |
| | | run(); |
| | | } |
| | | |
| | | public AbstractJob getQueuedJob(Object id) { |
| | | for (AbstractJob job : queue) { |
| | | if (Objects.equals(job.getId(), id)) { |
| | | return job; |
| | | } |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | void waitForQueue(int seconds) { |
| | | int counter = seconds / 10; |
| | | while (CollectionUtils.isNotEmpty(queue) && counter > 0) { |
| | | try { |
| | | run(); // If not running! |
| | | Thread.sleep(100); |
| | | } catch (InterruptedException ex) { |
| | | log.error(ex.getMessage(), ex); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void run() { |
| | | synchronized (executorService) { |
| | | if (!runner.running) { |
| | | log.info("Starting job executor..."); |
| | | executorService.submit(runner); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void organizeQueue() { |
| | | synchronized (queue) { |
| | | if (queue.isEmpty()) { |
| | | return; |
| | | } |
| | | Iterator<AbstractJob> it = queue.iterator(); |
| | | while (it.hasNext()) { |
| | | AbstractJob job = it.next(); |
| | | if (job.isFinished()) { |
| | | it.remove(); |
| | | doneJobs.add(0, job); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | private class Runner implements Runnable { |
| | | private boolean running; |
| | | |
| | | @Override |
| | | public void run() { |
| | | running = true; |
| | | while (true) { |
| | | AbstractJob job = null; |
| | | synchronized (queue) { |
| | | organizeQueue(); |
| | | if (queue.isEmpty()) { |
| | | running = false; |
| | | return; |
| | | } |
| | | for (AbstractJob queuedJob : queue) { |
| | | if (queuedJob.getStatus() == AbstractJob.Status.QUEUED) { |
| | | job = queuedJob; |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | if (job == null) { |
| | | running = false; |
| | | return; |
| | | } |
| | | try { |
| | | log.info("Starting job: " + job.getId()); |
| | | job.setStatus(AbstractJob.Status.RUNNING); |
| | | job.execute(); |
| | | job.setStatus(AbstractJob.Status.DONE); |
| | | } catch (Exception ex) { |
| | | log.error("Error while executing job '" + job.getId() + "': " + ex.getMessage(), ex); |
| | | job.setStatus(AbstractJob.Status.FAILED); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |