/* * The contents of this file are subject to the terms of the Common Development and * Distribution License (the License). You may not use this file except in compliance with the * License. * * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the * specific language governing permission and limitations under the License. * * When distributing Covered Software, include this CDDL Header Notice in each file and include * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL * Header, with the fields enclosed by brackets [] replaced by your own identifying * information: "Portions Copyright [year] [name of copyright owner]". * * Copyright 2016 ForgeRock AS. */ package org.opends.server.util; import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.ListIterator; import java.util.concurrent.Callable; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.opends.server.api.DirectoryThread; /** * Implements a {@link ScheduledExecutorService} on top of a {@link Executors#newCachedThreadPool() * cached thread pool} to achieve UNIX's cron-like capabilities. *

* This executor service is implemented with two underlying executor services: *

*

* All the tasks submitted to the current class are 1. scheduled by the single-thread scheduled * executor and 2. finally executed by the cached thread pool. *

* Because of this setup, the assumptions of * {@link #scheduleWithFixedDelay(Runnable, long, long, TimeUnit)} cannot be fulfilled, so calling * this method will throw a {@link UnsupportedOperationException}. *

*   *

* Current (Nov. 2016) OpenDJ threads that may be replaced by using the current class: *

*/ public class CronExecutorService implements ScheduledExecutorService { /** Made necessary by the double executor services. */ private static class FutureTaskResult implements ScheduledFuture { private volatile Future executorFuture; private Future schedulerFuture; public void setExecutorFuture(Future executorFuture) { this.executorFuture = executorFuture; } public void setSchedulerFuture(Future schedulerFuture) { this.schedulerFuture = schedulerFuture; } @Override public boolean cancel(boolean mayInterruptIfRunning) { return schedulerFuture.cancel(mayInterruptIfRunning) | (executorFuture != null && executorFuture.cancel(mayInterruptIfRunning)); } @Override public V get() throws InterruptedException, ExecutionException { schedulerFuture.get(); return executorFuture.get(); } @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { schedulerFuture.get(timeout, unit); return executorFuture.get(timeout, unit); } @Override public boolean isCancelled() { return schedulerFuture.isCancelled() || executorFuture.isCancelled(); } @Override public boolean isDone() { return schedulerFuture.isDone() && executorFuture.isDone(); } @Override public long getDelay(TimeUnit unit) { throw new UnsupportedOperationException("Not implemented"); } @Override public int compareTo(Delayed o) { throw new UnsupportedOperationException("Not implemented"); } } private final ExecutorService cronExecutorService; private final ScheduledExecutorService cronScheduler; /** Default constructor. */ public CronExecutorService() { cronExecutorService = Executors.newCachedThreadPool(new ThreadFactory() { private final List> workerThreads = new ArrayList<>(); @Override public Thread newThread(Runnable r) { final Thread t = new DirectoryThread(r, "Cron worker thread - waiting for number"); t.setDaemon(true); final int index = addThreadAndReturnWorkerThreadNumber(t); t.setName("Cron worker thread " + index); return t; } private synchronized int addThreadAndReturnWorkerThreadNumber(Thread t) { for (final ListIterator> it = workerThreads.listIterator(); it.hasNext();) { final WeakReference threadRef = it.next(); if (threadRef.get() == null) { // Free slot, let's use it final int result = it.previousIndex(); it.set(new WeakReference<>(t)); return result; } } final int result = workerThreads.size(); workerThreads.add(new WeakReference<>(t)); return result; } }); cronScheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(final Runnable r) { Thread t = new DirectoryThread(r, "Cron scheduler thread"); t.setDaemon(true); return t; } }); } @Override public void execute(final Runnable task) { /** Class specialized for this method. */ class ExecuteRunnable implements Runnable { @Override public void run() { cronExecutorService.execute(task); } } cronScheduler.execute(new ExecuteRunnable()); } @Override public Future submit(final Runnable task) { return submit(task, null); } @Override public Future submit(final Runnable task, final T result) { final FutureTaskResult futureResult = new FutureTaskResult<>(); /** Class specialized for this method. */ class SubmitRunnable implements Runnable { @Override public void run() { futureResult.setExecutorFuture(cronExecutorService.submit(task, result)); } } futureResult.setSchedulerFuture(cronScheduler.submit(new SubmitRunnable(), null)); return futureResult; } @Override public Future submit(final Callable task) { final FutureTaskResult futureResult = new FutureTaskResult<>(); /** Class specialized for this method. */ class SubmitCallableRunnable implements Runnable { @Override public void run() { futureResult.setExecutorFuture(cronExecutorService.submit(task)); } } futureResult.setSchedulerFuture(cronScheduler.submit(new SubmitCallableRunnable())); return futureResult; } @Override public ScheduledFuture scheduleAtFixedRate(final Runnable task, final long initialDelay, final long period, final TimeUnit unit) { /** Class specialized for this method. */ class NoConcurrentRunRunnable implements Runnable { private AtomicBoolean isRunning = new AtomicBoolean(); @Override public void run() { if (isRunning.compareAndSet(false, true)) { try { task.run(); } finally { isRunning.set(false); } } } } final FutureTaskResult futureResult = new FutureTaskResult<>(); /** Class specialized for this method. */ class ScheduleAtFixedRateRunnable implements Runnable { @SuppressWarnings({ "rawtypes", "unchecked" }) @Override public void run() { futureResult.setExecutorFuture((Future) cronExecutorService.submit(new NoConcurrentRunRunnable())); } } futureResult.setSchedulerFuture( cronScheduler.scheduleAtFixedRate(new ScheduleAtFixedRateRunnable(), initialDelay, period, unit)); return futureResult; } @Override public ScheduledFuture scheduleWithFixedDelay(final Runnable task, final long initialDelay, final long delay, final TimeUnit unit) { throw new UnsupportedOperationException("Cannot schedule with fixed delay between each runs of the task"); } @Override public ScheduledFuture schedule(final Callable task, final long delay, final TimeUnit unit) { final FutureTaskResult futureResult = new FutureTaskResult<>(); /** Class specialized for this method. */ class ScheduleRunnable implements Runnable { @Override public void run() { futureResult.setExecutorFuture(cronExecutorService.submit(task)); } } futureResult.setSchedulerFuture(cronScheduler.schedule(new ScheduleRunnable(), delay, unit)); return futureResult; } @Override public ScheduledFuture schedule(final Runnable task, final long delay, final TimeUnit unit) { final FutureTaskResult futureResult = new FutureTaskResult<>(); /** Class specialized for this method. */ class ScheduleCallableRunnable implements Runnable { @SuppressWarnings({ "rawtypes", "unchecked" }) @Override public void run() { futureResult.setExecutorFuture((Future) cronExecutorService.submit(task)); } } futureResult.setSchedulerFuture(cronScheduler.schedule(new ScheduleCallableRunnable(), delay, unit)); return futureResult; } @Override public T invokeAny(final Collection> tasks, final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { throw new UnsupportedOperationException("Not implemented"); } @Override public T invokeAny(final Collection> tasks) throws InterruptedException, ExecutionException { throw new UnsupportedOperationException("Not implemented"); } @Override public List> invokeAll(final Collection> tasks, final long timeout, final TimeUnit unit) throws InterruptedException { throw new UnsupportedOperationException("Not implemented"); } @Override public List> invokeAll(final Collection> tasks) throws InterruptedException { throw new UnsupportedOperationException("Not implemented"); } @Override public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException { return cronScheduler.awaitTermination(timeout, unit) & cronExecutorService.awaitTermination(timeout, unit); } @Override public void shutdown() { cronScheduler.shutdown(); cronExecutorService.shutdown(); } @Override public List shutdownNow() { final List results = new ArrayList<>(); results.addAll(cronScheduler.shutdownNow()); results.addAll(cronExecutorService.shutdownNow()); return results; } @Override public boolean isShutdown() { return cronScheduler.isShutdown() && cronExecutorService.isShutdown(); } @Override public boolean isTerminated() { return cronScheduler.isTerminated() && cronExecutorService.isTerminated(); } }