| | |
| | | import org.slf4j.LoggerFactory; |
| | | |
| | | import java.util.*; |
| | | import java.util.concurrent.Callable; |
| | | import java.util.concurrent.ExecutorService; |
| | | import java.util.concurrent.Executors; |
| | | |
| | | public class JobQueue { |
| | | public class JobQueue<T> { |
| | | private static final int MAX_DONE_JOBS_SIZE = 50; |
| | | private Logger log = LoggerFactory.getLogger(JobQueue.class); |
| | | private List<AbstractJob> queue = new ArrayList<>(); |
| | | private List<AbstractJob> doneJobs = new LinkedList<>(); |
| | | private ExecutorService executorService = Executors.newSingleThreadExecutor(); |
| | | private Runner runner = new Runner(); |
| | | |
| | | public int getQueueSize() { |
| | | return queue.size(); |
| | |
| | | } |
| | | } |
| | | queue.add(job.setStatus(AbstractJob.Status.QUEUED)); |
| | | executorService.submit(new CallableTask(job)); |
| | | } |
| | | run(); |
| | | } |
| | | |
| | | public AbstractJob getQueuedJob(Object id) { |
| | |
| | | int counter = seconds / 10; |
| | | while (CollectionUtils.isNotEmpty(queue) && counter > 0) { |
| | | try { |
| | | run(); // If not running! |
| | | Thread.sleep(100); |
| | | } catch (InterruptedException ex) { |
| | | log.error(ex.getMessage(), ex); |
| | |
| | | } |
| | | } |
| | | |
/**
 * Hands the shared {@code runner} to {@code executorService} unless the runner
 * already reports itself as running.
 * <p>
 * The check of {@code runner.running} and the submit both happen while holding
 * the monitor of {@code executorService}.
 * <p>
 * NOTE(review): in the code visible here the flag is set to {@code true} inside
 * {@code Runner.run()} rather than in this method, so two calls arriving before
 * the runner has started could both submit it; confirm whether that is intended.
 * NOTE(review): {@code running} is written by the runner outside this monitor;
 * verify that the read below is guaranteed to see those writes.
 */
| | | private void run() { |
| | | synchronized (executorService) { |
| | | if (!runner.running) { |
// Runner is not marked as running: log and queue it on the executor.
| | | log.info("Starting job executor..."); |
| | | executorService.submit(runner); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void organizeQueue() { |
| | | synchronized (queue) { |
| | | if (queue.isEmpty()) { |
| | |
| | | } |
| | | } |
| | | |
| | | private class Runner implements Runnable { |
| | | private boolean running; |
| | | private class CallableTask implements Callable<T> { |
| | | private AbstractJob<T> job; |
| | | |
| | | private CallableTask(AbstractJob<T> job) { |
| | | this.job = job; |
| | | } |
| | | |
| | | @Override |
| | | public void run() { |
| | | running = true; |
| | | while (true) { |
| | | AbstractJob job = null; |
| | | synchronized (queue) { |
| | | organizeQueue(); |
| | | if (queue.isEmpty()) { |
| | | running = false; |
| | | return; |
| | | } |
| | | for (AbstractJob queuedJob : queue) { |
| | | if (queuedJob.getStatus() == AbstractJob.Status.QUEUED) { |
| | | job = queuedJob; |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | if (job == null) { |
| | | running = false; |
| | | return; |
| | | public T call() throws Exception { |
| | | if (job.isStopRequested()) { |
| | | job.setStatus(AbstractJob.Status.STOPPED); |
| | | return null; |
| | | } |
| | | try { |
| | | log.info("Starting job: " + job.getId()); |
| | | job.setStatus(AbstractJob.Status.RUNNING); |
| | | job.execute(); |
| | | T result = job.execute(); |
| | | if (!job.isFinished()) { |
| | | // Don't overwrite status failed set by job. |
| | | job.setStatus(AbstractJob.Status.DONE); |
| | | } |
| | | organizeQueue(); |
| | | return result; |
| | | } catch (Exception ex) { |
| | | log.error("Error while executing job '" + job.getId() + "': " + ex.getMessage(), ex); |
| | | job.setStatus(AbstractJob.Status.FAILED); |
| | | } |
| | | return null; |
| | | } |
| | | } |
| | | } |