/*
* The contents of this file are subject to the terms of the Common Development and
* Distribution License (the License). You may not use this file except in compliance with the
* License.
*
* You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
* specific language governing permission and limitations under the License.
*
* When distributing Covered Software, include this CDDL Header Notice in each file and include
* the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
* Header, with the fields enclosed by brackets [] replaced by your own identifying
* information: "Portions Copyright [year] [name of copyright owner]".
*
* Copyright 2016 ForgeRock AS.
*/
package org.opends.server.util;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.server.api.DirectoryThread;
/**
* Implements a {@link ScheduledExecutorService} on top of a {@link Executors#newCachedThreadPool()
* cached thread pool} to achieve UNIX's cron-like capabilities.
*
* This executor service is implemented with two underlying executor services:
*
* - {@link Executors#newSingleThreadScheduledExecutor() a single-thread scheduled executor} which
* allows to start tasks based on a schedule
* - {@link Executors#newCachedThreadPool() a cached thread pool} which allows running several
* tasks concurrently, adjusting the thread pool size when necessary. In particular, when the number
* of tasks to run concurrently is bigger than the thread pool size, then new threads are spawned.
* The thread pool is then shrunk when threads sit idle with no tasks to execute.
*
*
* All the tasks submitted to the current class are 1. scheduled by the single-thread scheduled
* executor and 2. finally executed by the cached thread pool.
*
* Because of this setup, the assumptions of
* {@link #scheduleWithFixedDelay(Runnable, long, long, TimeUnit)} cannot be fulfilled, so calling
* this method will throw a {@link UnsupportedOperationException}.
*
*
*
* Current (Nov. 2016) OpenDJ threads that may be replaced by using the current class:
*
* - Idle Time Limit Thread
* - LDAP Connection Finalizer for connection handler Administration Connector
* - LDAP Connection Finalizer for connection handler LDAP Connection Handler
* - Monitor Provider State Updater
* - Task Scheduler Thread
* - Rejected: Time Thread - high resolution thread
*
*/
public class CronExecutorService implements ScheduledExecutorService
{
/** Made necessary by the double executor services. */
private static class FutureTaskResult implements ScheduledFuture
{
private volatile Future executorFuture;
private Future> schedulerFuture;
public void setExecutorFuture(Future executorFuture)
{
this.executorFuture = executorFuture;
}
public void setSchedulerFuture(Future> schedulerFuture)
{
this.schedulerFuture = schedulerFuture;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
return schedulerFuture.cancel(mayInterruptIfRunning)
| (executorFuture != null && executorFuture.cancel(mayInterruptIfRunning));
}
@Override
public V get() throws InterruptedException, ExecutionException
{
schedulerFuture.get();
return executorFuture.get();
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
schedulerFuture.get(timeout, unit);
return executorFuture.get(timeout, unit);
}
@Override
public boolean isCancelled()
{
return schedulerFuture.isCancelled() || executorFuture.isCancelled();
}
@Override
public boolean isDone()
{
return schedulerFuture.isDone() && executorFuture.isDone();
}
@Override
public long getDelay(TimeUnit unit)
{
throw new UnsupportedOperationException("Not implemented");
}
@Override
public int compareTo(Delayed o)
{
throw new UnsupportedOperationException("Not implemented");
}
}
private final ExecutorService cronExecutorService;
private final ScheduledExecutorService cronScheduler;
/** Default constructor. */
public CronExecutorService()
{
cronExecutorService = Executors.newCachedThreadPool(new ThreadFactory()
{
private final List> workerThreads = new ArrayList<>();
@Override
public Thread newThread(Runnable r)
{
final Thread t = new DirectoryThread(r, "Cron worker thread - waiting for number");
t.setDaemon(true);
final int index = addThreadAndReturnWorkerThreadNumber(t);
t.setName("Cron worker thread " + index);
return t;
}
private synchronized int addThreadAndReturnWorkerThreadNumber(Thread t)
{
for (final ListIterator> it = workerThreads.listIterator(); it.hasNext();)
{
final WeakReference threadRef = it.next();
if (threadRef.get() == null)
{
// Free slot, let's use it
final int result = it.previousIndex();
it.set(new WeakReference<>(t));
return result;
}
}
final int result = workerThreads.size();
workerThreads.add(new WeakReference<>(t));
return result;
}
});
cronScheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactory()
{
@Override
public Thread newThread(final Runnable r)
{
Thread t = new DirectoryThread(r, "Cron scheduler thread");
t.setDaemon(true);
return t;
}
});
}
@Override
public void execute(final Runnable task)
{
/** Class specialized for this method. */
class ExecuteRunnable implements Runnable
{
@Override
public void run()
{
cronExecutorService.execute(task);
}
}
cronScheduler.execute(new ExecuteRunnable());
}
@Override
public Future> submit(final Runnable task)
{
return submit(task, null);
}
@Override
public Future submit(final Runnable task, final T result)
{
final FutureTaskResult futureResult = new FutureTaskResult<>();
/** Class specialized for this method. */
class SubmitRunnable implements Runnable
{
@Override
public void run()
{
futureResult.setExecutorFuture(cronExecutorService.submit(task, result));
}
}
futureResult.setSchedulerFuture(cronScheduler.submit(new SubmitRunnable(), null));
return futureResult;
}
@Override
public Future submit(final Callable task)
{
final FutureTaskResult futureResult = new FutureTaskResult<>();
/** Class specialized for this method. */
class SubmitCallableRunnable implements Runnable
{
@Override
public void run()
{
futureResult.setExecutorFuture(cronExecutorService.submit(task));
}
}
futureResult.setSchedulerFuture(cronScheduler.submit(new SubmitCallableRunnable()));
return futureResult;
}
@Override
public ScheduledFuture> scheduleAtFixedRate(final Runnable task, final long initialDelay, final long period,
final TimeUnit unit)
{
/** Class specialized for this method. */
class NoConcurrentRunRunnable implements Runnable
{
private AtomicBoolean isRunning = new AtomicBoolean();
@Override
public void run()
{
if (isRunning.compareAndSet(false, true))
{
try
{
task.run();
}
finally
{
isRunning.set(false);
}
}
}
}
final FutureTaskResult> futureResult = new FutureTaskResult<>();
/** Class specialized for this method. */
class ScheduleAtFixedRateRunnable implements Runnable
{
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public void run()
{
futureResult.setExecutorFuture((Future) cronExecutorService.submit(new NoConcurrentRunRunnable()));
}
}
futureResult.setSchedulerFuture(
cronScheduler.scheduleAtFixedRate(new ScheduleAtFixedRateRunnable(), initialDelay, period, unit));
return futureResult;
}
@Override
public ScheduledFuture> scheduleWithFixedDelay(final Runnable task, final long initialDelay, final long delay,
final TimeUnit unit)
{
throw new UnsupportedOperationException("Cannot schedule with fixed delay between each runs of the task");
}
@Override
public ScheduledFuture schedule(final Callable task, final long delay, final TimeUnit unit)
{
final FutureTaskResult futureResult = new FutureTaskResult<>();
/** Class specialized for this method. */
class ScheduleRunnable implements Runnable
{
@Override
public void run()
{
futureResult.setExecutorFuture(cronExecutorService.submit(task));
}
}
futureResult.setSchedulerFuture(cronScheduler.schedule(new ScheduleRunnable(), delay, unit));
return futureResult;
}
@Override
public ScheduledFuture> schedule(final Runnable task, final long delay, final TimeUnit unit)
{
final FutureTaskResult> futureResult = new FutureTaskResult<>();
/** Class specialized for this method. */
class ScheduleCallableRunnable implements Runnable
{
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public void run()
{
futureResult.setExecutorFuture((Future) cronExecutorService.submit(task));
}
}
futureResult.setSchedulerFuture(cronScheduler.schedule(new ScheduleCallableRunnable(), delay, unit));
return futureResult;
}
@Override
public T invokeAny(final Collection extends Callable> tasks, final long timeout, final TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException
{
throw new UnsupportedOperationException("Not implemented");
}
@Override
public T invokeAny(final Collection extends Callable> tasks) throws InterruptedException, ExecutionException
{
throw new UnsupportedOperationException("Not implemented");
}
@Override
public List> invokeAll(final Collection extends Callable> tasks, final long timeout,
final TimeUnit unit) throws InterruptedException
{
throw new UnsupportedOperationException("Not implemented");
}
@Override
public List> invokeAll(final Collection extends Callable> tasks) throws InterruptedException
{
throw new UnsupportedOperationException("Not implemented");
}
@Override
public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException
{
return cronScheduler.awaitTermination(timeout, unit) & cronExecutorService.awaitTermination(timeout, unit);
}
@Override
public void shutdown()
{
cronScheduler.shutdown();
cronExecutorService.shutdown();
}
@Override
public List shutdownNow()
{
final List results = new ArrayList<>();
results.addAll(cronScheduler.shutdownNow());
results.addAll(cronExecutorService.shutdownNow());
return results;
}
@Override
public boolean isShutdown()
{
return cronScheduler.isShutdown() && cronExecutorService.isShutdown();
}
@Override
public boolean isTerminated()
{
return cronScheduler.isTerminated() && cronExecutorService.isTerminated();
}
}