package de.micromata.borgbutler.jobs; import org.apache.commons.collections4.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class JobQueue { private Logger log = LoggerFactory.getLogger(JobQueue.class); private List queue = new ArrayList<>(); private List doneJobs = new LinkedList<>(); private ExecutorService executorService = Executors.newSingleThreadExecutor(); private Runner runner = new Runner(); public int getQueueSize() { return queue.size(); } /** * Appends the job if not alread in the queue. Starts the execution if no execution thread is already running. * * @param job */ public void append(AbstractJob job) { synchronized (queue) { for (AbstractJob queuedJob : queue) { if (Objects.equals(queuedJob.getId(), job.getId())) { log.info("Job is already in the queue, don't run twice (OK): " + job.getId()); return; } } queue.add(job.setStatus(AbstractJob.Status.QUEUED)); } run(); } public AbstractJob getQueuedJob(Object id) { for (AbstractJob job : queue) { if (Objects.equals(job.getId(), id)) { return job; } } return null; } void waitForQueue(int seconds) { int counter = seconds / 10; while (CollectionUtils.isNotEmpty(queue) && counter > 0) { try { run(); // If not running! Thread.sleep(100); } catch (InterruptedException ex) { log.error(ex.getMessage(), ex); } } } private void run() { synchronized (executorService) { if (!runner.running) { log.info("Starting job executor..."); executorService.submit(runner); } } } private void organizeQueue() { synchronized (queue) { if (queue.isEmpty()) { return; } Iterator it = queue.iterator(); while (it.hasNext()) { AbstractJob job = it.next(); if (job.isFinished()) { it.remove(); doneJobs.add(0, job); } } } } private class Runner implements Runnable { private boolean running; @Override public void run() { running = true; while (true) { AbstractJob job = null; synchronized (queue) { organizeQueue(); if (queue.isEmpty()) { running = false; return; } for (AbstractJob queuedJob : queue) { if (queuedJob.getStatus() == AbstractJob.Status.QUEUED) { job = queuedJob; break; } } } if (job == null) { running = false; return; } try { log.info("Starting job: " + job.getId()); job.setStatus(AbstractJob.Status.RUNNING); job.execute(); job.setStatus(AbstractJob.Status.DONE); } catch (Exception ex) { log.error("Error while executing job '" + job.getId() + "': " + ex.getMessage(), ex); job.setStatus(AbstractJob.Status.FAILED); } } } } }