| New file |
| | |
| | | /* |
| | | * The contents of this file are subject to the terms of the Common Development and |
| | | * Distribution License (the License). You may not use this file except in compliance with the |
| | | * License. |
| | | * |
| | | * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the |
| | | * specific language governing permission and limitations under the License. |
| | | * |
| | | * When distributing Covered Software, include this CDDL Header Notice in each file and include |
| | | * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL |
| | | * Header, with the fields enclosed by brackets [] replaced by your own identifying |
| | | * information: "Portions Copyright [year] [name of copyright owner]". |
| | | * |
| | | * Copyright 2016 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.util; |
| | | |
| | | import java.lang.ref.WeakReference; |
| | | import java.util.ArrayList; |
| | | import java.util.Collection; |
| | | import java.util.List; |
| | | import java.util.ListIterator; |
| | | import java.util.concurrent.Callable; |
| | | import java.util.concurrent.Delayed; |
| | | import java.util.concurrent.ExecutionException; |
| | | import java.util.concurrent.ExecutorService; |
| | | import java.util.concurrent.Executors; |
| | | import java.util.concurrent.Future; |
| | | import java.util.concurrent.ScheduledExecutorService; |
| | | import java.util.concurrent.ScheduledFuture; |
| | | import java.util.concurrent.ScheduledThreadPoolExecutor; |
| | | import java.util.concurrent.ThreadFactory; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.TimeoutException; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | |
| | | import org.opends.server.api.DirectoryThread; |
| | | |
| | | /** |
| | | * Implements a {@link ScheduledExecutorService} on top of a {@link Executors#newCachedThreadPool() |
| | | * cached thread pool} to achieve UNIX's cron-like capabilities. |
| | | * <p> |
| | | * This executor service is implemented with two underlying executor services: |
| | | * <ul> |
| | | * <li>{@link Executors#newSingleThreadScheduledExecutor() a single-thread scheduled executor} which |
| | | * allows to start tasks based on a schedule</li> |
| | | * <li>{@link Executors#newCachedThreadPool() a cached thread pool} which allows to run several |
| | | * tasks concurrently, adjusting the thread pool size when necessary. In particular, when the number |
| | | * of tasks to run concurrently is bigger than the tread pool size, then new threads are spawned. |
| | | * The thread pool is then shrinked when threads sit idle with no tasks to execute.</li> |
| | | * </ul> |
| | | * <p> |
| | | * All the tasks submitted to the current class are 1. scheduled by the single-thread scheduled |
| | | * executor and 2. finally executed by the cached thread pool. |
| | | * <p> |
| | | * Because of this setup, the assumptions of |
| | | * {@link #scheduleWithFixedDelay(Runnable, long, long, TimeUnit)} cannot be fulfilled, so calling |
| | | * this method will throw a {@link UnsupportedOperationException}. |
| | | * <p> |
| | | * |
| | | * <p> |
| | | * Current (Nov. 2016) OpenDJ threads that may be replaced by using the current class: |
| | | * <ul> |
| | | * <li>Idle Time Limit Thread</li> |
| | | * <li>LDAP Connection Finalizer for connection handler Administration Connector</li> |
| | | * <li>LDAP Connection Finalizer for connection handler LDAP Connection Handler</li> |
| | | * <li>Monitor Provider State Updater</li> |
| | | * <li>Task Scheduler Thread</li> |
| | | * <li>Rejected: Time Thread - high resolution thread</li> |
| | | * </ul> |
| | | */ |
| | | public class CronExecutorService implements ScheduledExecutorService |
| | | { |
| | | /** Made necessary by the double executor services. */ |
| | | private static class FutureTaskResult<V> implements ScheduledFuture<V> |
| | | { |
| | | private volatile Future<V> executorFuture; |
| | | private Future<?> schedulerFuture; |
| | | |
| | | public void setExecutorFuture(Future<V> executorFuture) |
| | | { |
| | | this.executorFuture = executorFuture; |
| | | } |
| | | |
| | | public void setSchedulerFuture(Future<?> schedulerFuture) |
| | | { |
| | | this.schedulerFuture = schedulerFuture; |
| | | } |
| | | |
| | | @Override |
| | | public boolean cancel(boolean mayInterruptIfRunning) |
| | | { |
| | | return schedulerFuture.cancel(mayInterruptIfRunning) |
| | | | (executorFuture != null && executorFuture.cancel(mayInterruptIfRunning)); |
| | | } |
| | | |
| | | @Override |
| | | public V get() throws InterruptedException, ExecutionException |
| | | { |
| | | schedulerFuture.get(); |
| | | return executorFuture.get(); |
| | | } |
| | | |
| | | @Override |
| | | public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException |
| | | { |
| | | schedulerFuture.get(timeout, unit); |
| | | return executorFuture.get(timeout, unit); |
| | | } |
| | | |
| | | @Override |
| | | public boolean isCancelled() |
| | | { |
| | | return schedulerFuture.isCancelled() || executorFuture.isCancelled(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean isDone() |
| | | { |
| | | return schedulerFuture.isDone() && executorFuture.isDone(); |
| | | } |
| | | |
| | | @Override |
| | | public long getDelay(TimeUnit unit) |
| | | { |
| | | throw new UnsupportedOperationException("Not implemented"); |
| | | } |
| | | |
| | | @Override |
| | | public int compareTo(Delayed o) |
| | | { |
| | | throw new UnsupportedOperationException("Not implemented"); |
| | | } |
| | | } |
| | | |
| | | private final ExecutorService cronExecutorService; |
| | | private final ScheduledExecutorService cronScheduler; |
| | | |
| | | /** Default constructor. */ |
| | | public CronExecutorService() |
| | | { |
| | | cronExecutorService = Executors.newCachedThreadPool(new ThreadFactory() |
| | | { |
| | | private final List<WeakReference<Thread>> workerThreads = new ArrayList<>(); |
| | | |
| | | @Override |
| | | public Thread newThread(Runnable r) |
| | | { |
| | | final Thread t = new DirectoryThread(r, "Cron worker thread - waiting for number"); |
| | | t.setDaemon(true); |
| | | final int index = addThreadAndReturnWorkerThreadNumber(t); |
| | | t.setName("Cron worker thread " + index); |
| | | return t; |
| | | } |
| | | |
| | | private synchronized int addThreadAndReturnWorkerThreadNumber(Thread t) |
| | | { |
| | | for (final ListIterator<WeakReference<Thread>> it = workerThreads.listIterator(); it.hasNext();) |
| | | { |
| | | final WeakReference<Thread> threadRef = it.next(); |
| | | if (threadRef.get() == null) |
| | | { |
| | | // Free slot, let's use it |
| | | final int result = it.previousIndex(); |
| | | it.set(new WeakReference<>(t)); |
| | | return result; |
| | | } |
| | | } |
| | | final int result = workerThreads.size(); |
| | | workerThreads.add(new WeakReference<>(t)); |
| | | return result; |
| | | } |
| | | }); |
| | | cronScheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactory() |
| | | { |
| | | @Override |
| | | public Thread newThread(final Runnable r) |
| | | { |
| | | Thread t = new DirectoryThread(r, "Cron scheduler thread"); |
| | | t.setDaemon(true); |
| | | return t; |
| | | } |
| | | }); |
| | | } |
| | | |
| | | @Override |
| | | public void execute(final Runnable task) |
| | | { |
| | | /** Class specialized for this method. */ |
| | | class ExecuteRunnable implements Runnable |
| | | { |
| | | @Override |
| | | public void run() |
| | | { |
| | | cronExecutorService.execute(task); |
| | | } |
| | | } |
| | | cronScheduler.execute(new ExecuteRunnable()); |
| | | } |
| | | |
| | | @Override |
| | | public Future<?> submit(final Runnable task) |
| | | { |
| | | return submit(task, null); |
| | | } |
| | | |
| | | @Override |
| | | public <T> Future<T> submit(final Runnable task, final T result) |
| | | { |
| | | final FutureTaskResult<T> futureResult = new FutureTaskResult<>(); |
| | | /** Class specialized for this method. */ |
| | | class SubmitRunnable implements Runnable |
| | | { |
| | | @Override |
| | | public void run() |
| | | { |
| | | futureResult.setExecutorFuture(cronExecutorService.submit(task, result)); |
| | | } |
| | | } |
| | | futureResult.setSchedulerFuture(cronScheduler.submit(new SubmitRunnable(), null)); |
| | | return futureResult; |
| | | } |
| | | |
| | | @Override |
| | | public <T> Future<T> submit(final Callable<T> task) |
| | | { |
| | | final FutureTaskResult<T> futureResult = new FutureTaskResult<>(); |
| | | /** Class specialized for this method. */ |
| | | class SubmitCallableRunnable implements Runnable |
| | | { |
| | | @Override |
| | | public void run() |
| | | { |
| | | futureResult.setExecutorFuture(cronExecutorService.submit(task)); |
| | | } |
| | | } |
| | | futureResult.setSchedulerFuture(cronScheduler.submit(new SubmitCallableRunnable())); |
| | | return futureResult; |
| | | } |
| | | |
| | | @Override |
| | | public ScheduledFuture<?> scheduleAtFixedRate(final Runnable task, final long initialDelay, final long period, |
| | | final TimeUnit unit) |
| | | { |
| | | /** Class specialized for this method. */ |
| | | class NoConcurrentRunRunnable implements Runnable |
| | | { |
| | | private AtomicBoolean isRunning = new AtomicBoolean(); |
| | | |
| | | @Override |
| | | public void run() |
| | | { |
| | | if (isRunning.compareAndSet(false, true)) |
| | | { |
| | | try |
| | | { |
| | | task.run(); |
| | | } |
| | | finally |
| | | { |
| | | isRunning.set(false); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | final FutureTaskResult<?> futureResult = new FutureTaskResult<>(); |
| | | /** Class specialized for this method. */ |
| | | class ScheduleAtFixedRateRunnable implements Runnable |
| | | { |
| | | @SuppressWarnings({ "rawtypes", "unchecked" }) |
| | | @Override |
| | | public void run() |
| | | { |
| | | futureResult.setExecutorFuture((Future) cronExecutorService.submit(new NoConcurrentRunRunnable())); |
| | | } |
| | | } |
| | | futureResult.setSchedulerFuture( |
| | | cronScheduler.scheduleAtFixedRate(new ScheduleAtFixedRateRunnable(), initialDelay, period, unit)); |
| | | return futureResult; |
| | | } |
| | | |
| | | @Override |
| | | public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable task, final long initialDelay, final long delay, |
| | | final TimeUnit unit) |
| | | { |
| | | throw new UnsupportedOperationException("Cannot schedule with fixed delay between each runs of the task"); |
| | | } |
| | | |
| | | @Override |
| | | public <V> ScheduledFuture<V> schedule(final Callable<V> task, final long delay, final TimeUnit unit) |
| | | { |
| | | final FutureTaskResult<V> futureResult = new FutureTaskResult<>(); |
| | | /** Class specialized for this method. */ |
| | | class ScheduleRunnable implements Runnable |
| | | { |
| | | @Override |
| | | public void run() |
| | | { |
| | | futureResult.setExecutorFuture(cronExecutorService.submit(task)); |
| | | } |
| | | } |
| | | futureResult.setSchedulerFuture(cronScheduler.schedule(new ScheduleRunnable(), delay, unit)); |
| | | return futureResult; |
| | | } |
| | | |
| | | @Override |
| | | public ScheduledFuture<?> schedule(final Runnable task, final long delay, final TimeUnit unit) |
| | | { |
| | | final FutureTaskResult<?> futureResult = new FutureTaskResult<>(); |
| | | /** Class specialized for this method. */ |
| | | class ScheduleCallableRunnable implements Runnable |
| | | { |
| | | @SuppressWarnings({ "rawtypes", "unchecked" }) |
| | | @Override |
| | | public void run() |
| | | { |
| | | futureResult.setExecutorFuture((Future) cronExecutorService.submit(task)); |
| | | } |
| | | } |
| | | futureResult.setSchedulerFuture(cronScheduler.schedule(new ScheduleCallableRunnable(), delay, unit)); |
| | | return futureResult; |
| | | } |
| | | |
| | | @Override |
| | | public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) |
| | | throws InterruptedException, ExecutionException, TimeoutException |
| | | { |
| | | throw new UnsupportedOperationException("Not implemented"); |
| | | } |
| | | |
| | | @Override |
| | | public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException |
| | | { |
| | | throw new UnsupportedOperationException("Not implemented"); |
| | | } |
| | | |
| | | @Override |
| | | public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks, final long timeout, |
| | | final TimeUnit unit) throws InterruptedException |
| | | { |
| | | throw new UnsupportedOperationException("Not implemented"); |
| | | } |
| | | |
| | | @Override |
| | | public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks) throws InterruptedException |
| | | { |
| | | throw new UnsupportedOperationException("Not implemented"); |
| | | } |
| | | |
| | | @Override |
| | | public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException |
| | | { |
| | | return cronScheduler.awaitTermination(timeout, unit) & cronExecutorService.awaitTermination(timeout, unit); |
| | | } |
| | | |
| | | @Override |
| | | public void shutdown() |
| | | { |
| | | cronScheduler.shutdown(); |
| | | cronExecutorService.shutdown(); |
| | | } |
| | | |
| | | @Override |
| | | public List<Runnable> shutdownNow() |
| | | { |
| | | final List<Runnable> results = new ArrayList<>(); |
| | | results.addAll(cronScheduler.shutdownNow()); |
| | | results.addAll(cronExecutorService.shutdownNow()); |
| | | return results; |
| | | } |
| | | |
| | | @Override |
| | | public boolean isShutdown() |
| | | { |
| | | return cronScheduler.isShutdown() && cronExecutorService.isShutdown(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean isTerminated() |
| | | { |
| | | return cronScheduler.isTerminated() && cronExecutorService.isTerminated(); |
| | | } |
| | | } |