From 48c52a7de8179e42aa4cb614a2e5e7f91ce1c79c Mon Sep 17 00:00:00 2001
From: Jean-Noël Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 02 Nov 2016 15:53:49 +0000
Subject: [PATCH] Prep work OPENDJ-3421 Implement ReplicationServiceDiscoverMechanism
---
opendj-server-legacy/src/main/java/org/opends/server/core/DirectoryServer.java | 13 +
opendj-server-legacy/src/main/java/org/opends/server/util/CronExecutorService.java | 384 ++++++++++++++++++++++++++++++++++++++
opendj-server-legacy/src/main/java/org/opends/server/core/ServerContext.java | 10
opendj-server-legacy/src/test/java/org/opends/server/util/CronExecutorServiceTest.java | 164 ++++++++++++++++
4 files changed, 569 insertions(+), 2 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/core/DirectoryServer.java b/opendj-server-legacy/src/main/java/org/opends/server/core/DirectoryServer.java
index c34cd06..8c92539 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/core/DirectoryServer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/core/DirectoryServer.java
@@ -17,6 +17,7 @@
package org.opends.server.core;
import static com.forgerock.opendj.cli.CommonArguments.*;
+
import static org.forgerock.util.Reject.*;
import static org.opends.messages.CoreMessages.*;
import static org.opends.messages.ToolMessages.*;
@@ -54,6 +55,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
@@ -68,6 +70,7 @@
import org.forgerock.opendj.config.server.ServerManagementContext;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.ResultCode;
+import org.forgerock.opendj.ldap.schema.Schema;
import org.forgerock.opendj.server.config.server.AlertHandlerCfg;
import org.forgerock.opendj.server.config.server.ConnectionHandlerCfg;
import org.forgerock.opendj.server.config.server.CryptoManagerCfg;
@@ -152,10 +155,10 @@
import org.opends.server.types.Operation;
import org.opends.server.types.Privilege;
import org.opends.server.types.RestoreConfig;
-import org.forgerock.opendj.ldap.schema.Schema;
import org.opends.server.types.VirtualAttributeRule;
import org.opends.server.types.WritabilityMode;
import org.opends.server.util.BuildVersion;
+import org.opends.server.util.CronExecutorService;
import org.opends.server.util.MultiOutputStream;
import org.opends.server.util.RuntimeInformation;
import org.opends.server.util.SetupUtils;
@@ -646,6 +649,7 @@
private CommonAudit commonAudit;
private Router httpRouter;
+ private CronExecutorService cronExecutorService;
/** Class that prints the version of OpenDJ server to System.out. */
public static final class DirectoryServerVersionHandler implements VersionHandler
@@ -1054,6 +1058,12 @@
{
return directoryServer.cryptoManager;
}
+
+ @Override
+ public ScheduledExecutorService getCronExecutorService()
+ {
+ return directoryServer.cronExecutorService;
+ }
}
/**
@@ -1461,6 +1471,7 @@
commonAudit = new CommonAudit(serverContext);
httpRouter = new Router();
+ cronExecutorService = new CronExecutorService();
// Allow internal plugins to be registered.
pluginConfigManager.initializePluginConfigManager();
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/core/ServerContext.java b/opendj-server-legacy/src/main/java/org/opends/server/core/ServerContext.java
index e17fad3..22e06ab 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/core/ServerContext.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/core/ServerContext.java
@@ -15,15 +15,17 @@
*/
package org.opends.server.core;
+import java.util.concurrent.ScheduledExecutorService;
+
import org.forgerock.http.routing.Router;
import org.forgerock.opendj.config.server.ServerManagementContext;
+import org.forgerock.opendj.ldap.schema.Schema;
import org.forgerock.opendj.server.config.server.RootCfg;
import org.opends.server.extensions.DiskSpaceMonitor;
import org.opends.server.loggers.CommonAudit;
import org.opends.server.schema.SchemaHandler;
import org.opends.server.types.CryptoManager;
import org.opends.server.types.DirectoryEnvironmentConfig;
-import org.forgerock.opendj.ldap.schema.Schema;
/** Context for the server, giving access to global properties of the server. */
public interface ServerContext
@@ -127,4 +129,10 @@
*/
CryptoManager getCryptoManager();
+ /**
+ * Returns the UNIX's cron-like executor service.
+ *
+ * @return the UNIX's cron-like executor service
+ */
+ ScheduledExecutorService getCronExecutorService();
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/util/CronExecutorService.java b/opendj-server-legacy/src/main/java/org/opends/server/util/CronExecutorService.java
new file mode 100644
index 0000000..6b74dc8
--- /dev/null
+++ b/opendj-server-legacy/src/main/java/org/opends/server/util/CronExecutorService.java
@@ -0,0 +1,384 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package org.opends.server.util;
+
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.opends.server.api.DirectoryThread;
+
+/**
+ * Implements a {@link ScheduledExecutorService} on top of a {@link Executors#newCachedThreadPool()
+ * cached thread pool} to achieve UNIX's cron-like capabilities.
+ * <p>
+ * This executor service is implemented with two underlying executor services:
+ * <ul>
+ * <li>{@link Executors#newSingleThreadScheduledExecutor() a single-thread scheduled executor} which
+ * allows to start tasks based on a schedule</li>
+ * <li>{@link Executors#newCachedThreadPool() a cached thread pool} which allows to run several
+ * tasks concurrently, adjusting the thread pool size when necessary. In particular, when the number
+ * of tasks to run concurrently is bigger than the tread pool size, then new threads are spawned.
+ * The thread pool is then shrinked when threads sit idle with no tasks to execute.</li>
+ * </ul>
+ * <p>
+ * All the tasks submitted to the current class are 1. scheduled by the single-thread scheduled
+ * executor and 2. finally executed by the cached thread pool.
+ * <p>
+ * Because of this setup, the assumptions of
+ * {@link #scheduleWithFixedDelay(Runnable, long, long, TimeUnit)} cannot be fulfilled, so calling
+ * this method will throw a {@link UnsupportedOperationException}.
+ * <p>
+ *
+ * <p>
+ * Current (Nov. 2016) OpenDJ threads that may be replaced by using the current class:
+ * <ul>
+ * <li>Idle Time Limit Thread</li>
+ * <li>LDAP Connection Finalizer for connection handler Administration Connector</li>
+ * <li>LDAP Connection Finalizer for connection handler LDAP Connection Handler</li>
+ * <li>Monitor Provider State Updater</li>
+ * <li>Task Scheduler Thread</li>
+ * <li>Rejected: Time Thread - high resolution thread</li>
+ * </ul>
+ */
+public class CronExecutorService implements ScheduledExecutorService
+{
+ /** Made necessary by the double executor services. */
+ private static class FutureTaskResult<V> implements ScheduledFuture<V>
+ {
+ private volatile Future<V> executorFuture;
+ private Future<?> schedulerFuture;
+
+ public void setExecutorFuture(Future<V> executorFuture)
+ {
+ this.executorFuture = executorFuture;
+ }
+
+ public void setSchedulerFuture(Future<?> schedulerFuture)
+ {
+ this.schedulerFuture = schedulerFuture;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning)
+ {
+ return schedulerFuture.cancel(mayInterruptIfRunning)
+ | (executorFuture != null && executorFuture.cancel(mayInterruptIfRunning));
+ }
+
+ @Override
+ public V get() throws InterruptedException, ExecutionException
+ {
+ schedulerFuture.get();
+ return executorFuture.get();
+ }
+
+ @Override
+ public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+ {
+ schedulerFuture.get(timeout, unit);
+ return executorFuture.get(timeout, unit);
+ }
+
+ @Override
+ public boolean isCancelled()
+ {
+ return schedulerFuture.isCancelled() || executorFuture.isCancelled();
+ }
+
+ @Override
+ public boolean isDone()
+ {
+ return schedulerFuture.isDone() && executorFuture.isDone();
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit)
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public int compareTo(Delayed o)
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+ }
+
+ private final ExecutorService cronExecutorService;
+ private final ScheduledExecutorService cronScheduler;
+
+ /** Default constructor. */
+ public CronExecutorService()
+ {
+ cronExecutorService = Executors.newCachedThreadPool(new ThreadFactory()
+ {
+ private final List<WeakReference<Thread>> workerThreads = new ArrayList<>();
+
+ @Override
+ public Thread newThread(Runnable r)
+ {
+ final Thread t = new DirectoryThread(r, "Cron worker thread - waiting for number");
+ t.setDaemon(true);
+ final int index = addThreadAndReturnWorkerThreadNumber(t);
+ t.setName("Cron worker thread " + index);
+ return t;
+ }
+
+ private synchronized int addThreadAndReturnWorkerThreadNumber(Thread t)
+ {
+ for (final ListIterator<WeakReference<Thread>> it = workerThreads.listIterator(); it.hasNext();)
+ {
+ final WeakReference<Thread> threadRef = it.next();
+ if (threadRef.get() == null)
+ {
+ // Free slot, let's use it
+ final int result = it.previousIndex();
+ it.set(new WeakReference<>(t));
+ return result;
+ }
+ }
+ final int result = workerThreads.size();
+ workerThreads.add(new WeakReference<>(t));
+ return result;
+ }
+ });
+ cronScheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactory()
+ {
+ @Override
+ public Thread newThread(final Runnable r)
+ {
+ Thread t = new DirectoryThread(r, "Cron scheduler thread");
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ }
+
+ @Override
+ public void execute(final Runnable task)
+ {
+ /** Class specialized for this method. */
+ class ExecuteRunnable implements Runnable
+ {
+ @Override
+ public void run()
+ {
+ cronExecutorService.execute(task);
+ }
+ }
+ cronScheduler.execute(new ExecuteRunnable());
+ }
+
+ @Override
+ public Future<?> submit(final Runnable task)
+ {
+ return submit(task, null);
+ }
+
+ @Override
+ public <T> Future<T> submit(final Runnable task, final T result)
+ {
+ final FutureTaskResult<T> futureResult = new FutureTaskResult<>();
+ /** Class specialized for this method. */
+ class SubmitRunnable implements Runnable
+ {
+ @Override
+ public void run()
+ {
+ futureResult.setExecutorFuture(cronExecutorService.submit(task, result));
+ }
+ }
+ futureResult.setSchedulerFuture(cronScheduler.submit(new SubmitRunnable(), null));
+ return futureResult;
+ }
+
+ @Override
+ public <T> Future<T> submit(final Callable<T> task)
+ {
+ final FutureTaskResult<T> futureResult = new FutureTaskResult<>();
+ /** Class specialized for this method. */
+ class SubmitCallableRunnable implements Runnable
+ {
+ @Override
+ public void run()
+ {
+ futureResult.setExecutorFuture(cronExecutorService.submit(task));
+ }
+ }
+ futureResult.setSchedulerFuture(cronScheduler.submit(new SubmitCallableRunnable()));
+ return futureResult;
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(final Runnable task, final long initialDelay, final long period,
+ final TimeUnit unit)
+ {
+ /** Class specialized for this method. */
+ class NoConcurrentRunRunnable implements Runnable
+ {
+ private AtomicBoolean isRunning = new AtomicBoolean();
+
+ @Override
+ public void run()
+ {
+ if (isRunning.compareAndSet(false, true))
+ {
+ try
+ {
+ task.run();
+ }
+ finally
+ {
+ isRunning.set(false);
+ }
+ }
+ }
+ }
+
+ final FutureTaskResult<?> futureResult = new FutureTaskResult<>();
+ /** Class specialized for this method. */
+ class ScheduleAtFixedRateRunnable implements Runnable
+ {
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public void run()
+ {
+ futureResult.setExecutorFuture((Future) cronExecutorService.submit(new NoConcurrentRunRunnable()));
+ }
+ }
+ futureResult.setSchedulerFuture(
+ cronScheduler.scheduleAtFixedRate(new ScheduleAtFixedRateRunnable(), initialDelay, period, unit));
+ return futureResult;
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable task, final long initialDelay, final long delay,
+ final TimeUnit unit)
+ {
+ throw new UnsupportedOperationException("Cannot schedule with fixed delay between each runs of the task");
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(final Callable<V> task, final long delay, final TimeUnit unit)
+ {
+ final FutureTaskResult<V> futureResult = new FutureTaskResult<>();
+ /** Class specialized for this method. */
+ class ScheduleRunnable implements Runnable
+ {
+ @Override
+ public void run()
+ {
+ futureResult.setExecutorFuture(cronExecutorService.submit(task));
+ }
+ }
+ futureResult.setSchedulerFuture(cronScheduler.schedule(new ScheduleRunnable(), delay, unit));
+ return futureResult;
+ }
+
+ @Override
+ public ScheduledFuture<?> schedule(final Runnable task, final long delay, final TimeUnit unit)
+ {
+ final FutureTaskResult<?> futureResult = new FutureTaskResult<>();
+ /** Class specialized for this method. */
+ class ScheduleCallableRunnable implements Runnable
+ {
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public void run()
+ {
+ futureResult.setExecutorFuture((Future) cronExecutorService.submit(task));
+ }
+ }
+ futureResult.setSchedulerFuture(cronScheduler.schedule(new ScheduleCallableRunnable(), delay, unit));
+ return futureResult;
+ }
+
+ @Override
+ public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks, final long timeout,
+ final TimeUnit unit) throws InterruptedException
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks) throws InterruptedException
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException
+ {
+ return cronScheduler.awaitTermination(timeout, unit) & cronExecutorService.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public void shutdown()
+ {
+ cronScheduler.shutdown();
+ cronExecutorService.shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow()
+ {
+ final List<Runnable> results = new ArrayList<>();
+ results.addAll(cronScheduler.shutdownNow());
+ results.addAll(cronExecutorService.shutdownNow());
+ return results;
+ }
+
+ @Override
+ public boolean isShutdown()
+ {
+ return cronScheduler.isShutdown() && cronExecutorService.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated()
+ {
+ return cronScheduler.isTerminated() && cronExecutorService.isTerminated();
+ }
+}
\ No newline at end of file
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/util/CronExecutorServiceTest.java b/opendj-server-legacy/src/test/java/org/opends/server/util/CronExecutorServiceTest.java
new file mode 100644
index 0000000..6e53031
--- /dev/null
+++ b/opendj-server-legacy/src/test/java/org/opends/server/util/CronExecutorServiceTest.java
@@ -0,0 +1,164 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package org.opends.server.util;
+
+import static java.util.concurrent.TimeUnit.*;
+
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.Callable;
+
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.util.TestTimer.CallableVoid;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@SuppressWarnings("javadoc")
+@Test(groups = "precommit", sequential = true)
+public class CronExecutorServiceTest extends DirectoryServerTestCase
+{
+ @BeforeClass
+ public void setUp() throws Exception
+ {
+ TestCaseUtils.startServer();
+ }
+
+ @Test
+ public void execute() throws Exception
+ {
+ final Runnable mock = mock(Runnable.class);
+ new CronExecutorService().execute(mock);
+
+ verifyRunnableInvokedOnce(mock);
+ }
+
+ @Test
+ public void submitRunnable() throws Exception
+ {
+ final Runnable mock = mock(Runnable.class);
+ new CronExecutorService().submit(mock);
+
+ verifyRunnableInvokedOnce(mock);
+ }
+
+ @Test
+ public void submitRunnableAndReturn() throws Exception
+ {
+ final Runnable mock = mock(Runnable.class);
+ new CronExecutorService().submit(mock, null);
+
+ verifyRunnableInvokedOnce(mock);
+ }
+
+ @Test
+ public void submitCallable() throws Exception
+ {
+ final Callable<Void> mock = mock(Callable.class);
+ new CronExecutorService().submit(mock);
+
+ verifyCallableInvokedOnce(mock);
+ }
+
+ @Test
+ public void scheduleRunnable() throws Exception
+ {
+ final Runnable mock = mock(Runnable.class);
+ new CronExecutorService().schedule(mock, 200, MILLISECONDS);
+
+ verifyNoMoreInteractions(mock);
+
+ Thread.sleep(SECONDS.toMillis(1));
+
+ verifyRunnableInvokedOnce(mock);
+ }
+
+ @Test
+ public void scheduleCallable() throws Exception
+ {
+ final Callable<Void> mock = mock(Callable.class);
+ new CronExecutorService().schedule(mock, 200, MILLISECONDS);
+
+ verifyNoMoreInteractions(mock);
+
+ Thread.sleep(SECONDS.toMillis(1));
+
+ maxOneSecond().repeatUntilSuccess(new CallableVoid()
+ {
+ @Override
+ public void call() throws Exception
+ {
+ verify(mock, atLeastOnce()).call();
+ verifyNoMoreInteractions(mock);
+ }
+ });
+ }
+
+ @Test
+ public void scheduleAtFixedRate() throws Exception
+ {
+ final Runnable mock = mock(Runnable.class);
+ new CronExecutorService().scheduleAtFixedRate(mock, 0 /* execute immediately */, 200, MILLISECONDS);
+
+ verifyNoMoreInteractions(mock);
+
+ Thread.sleep(MILLISECONDS.toMillis(200));
+
+ maxOneSecond().repeatUntilSuccess(new CallableVoid()
+ {
+ @Override
+ public void call() throws Exception
+ {
+ verify(mock, atLeastOnce()).run();
+ verifyNoMoreInteractions(mock);
+ }
+ });
+ }
+
+ private void verifyRunnableInvokedOnce(final Runnable mock) throws Exception, InterruptedException
+ {
+ maxOneSecond().repeatUntilSuccess(new CallableVoid()
+ {
+ @Override
+ public void call() throws Exception
+ {
+ verify(mock).run();
+ verifyNoMoreInteractions(mock);
+ }
+ });
+ }
+
+ private void verifyCallableInvokedOnce(final Callable<Void> mock) throws Exception, InterruptedException
+ {
+ maxOneSecond().repeatUntilSuccess(new CallableVoid()
+ {
+ @Override
+ public void call() throws Exception
+ {
+ verify(mock).call();
+ verifyNoMoreInteractions(mock);
+ }
+ });
+ }
+
+ private TestTimer maxOneSecond()
+ {
+ return new TestTimer.Builder()
+ .maxSleep(1, SECONDS)
+ .sleepTimes(10, MILLISECONDS)
+ .toTimer();
+ }
+}
--
Gitblit v1.10.0