From 48c52a7de8179e42aa4cb614a2e5e7f91ce1c79c Mon Sep 17 00:00:00 2001
From: Jean-Noël Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 02 Nov 2016 15:53:49 +0000
Subject: [PATCH] Prep work OPENDJ-3421 Implement ReplicationServiceDiscoverMechanism

---
 opendj-server-legacy/src/main/java/org/opends/server/core/DirectoryServer.java         |   13 +
 opendj-server-legacy/src/main/java/org/opends/server/util/CronExecutorService.java     |  384 ++++++++++++++++++++++++++++++++++++++
 opendj-server-legacy/src/main/java/org/opends/server/core/ServerContext.java           |   10 
 opendj-server-legacy/src/test/java/org/opends/server/util/CronExecutorServiceTest.java |  164 ++++++++++++++++
 4 files changed, 569 insertions(+), 2 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/core/DirectoryServer.java b/opendj-server-legacy/src/main/java/org/opends/server/core/DirectoryServer.java
index c34cd06..8c92539 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/core/DirectoryServer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/core/DirectoryServer.java
@@ -17,6 +17,7 @@
 package org.opends.server.core;
 
 import static com.forgerock.opendj.cli.CommonArguments.*;
+
 import static org.forgerock.util.Reject.*;
 import static org.opends.messages.CoreMessages.*;
 import static org.opends.messages.ToolMessages.*;
@@ -54,6 +55,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.MBeanServer;
@@ -68,6 +70,7 @@
 import org.forgerock.opendj.config.server.ServerManagementContext;
 import org.forgerock.opendj.ldap.DN;
 import org.forgerock.opendj.ldap.ResultCode;
+import org.forgerock.opendj.ldap.schema.Schema;
 import org.forgerock.opendj.server.config.server.AlertHandlerCfg;
 import org.forgerock.opendj.server.config.server.ConnectionHandlerCfg;
 import org.forgerock.opendj.server.config.server.CryptoManagerCfg;
@@ -152,10 +155,10 @@
 import org.opends.server.types.Operation;
 import org.opends.server.types.Privilege;
 import org.opends.server.types.RestoreConfig;
-import org.forgerock.opendj.ldap.schema.Schema;
 import org.opends.server.types.VirtualAttributeRule;
 import org.opends.server.types.WritabilityMode;
 import org.opends.server.util.BuildVersion;
+import org.opends.server.util.CronExecutorService;
 import org.opends.server.util.MultiOutputStream;
 import org.opends.server.util.RuntimeInformation;
 import org.opends.server.util.SetupUtils;
@@ -646,6 +649,7 @@
   private CommonAudit commonAudit;
 
   private Router httpRouter;
+  private CronExecutorService cronExecutorService;
 
   /** Class that prints the version of OpenDJ server to System.out. */
   public static final class DirectoryServerVersionHandler implements VersionHandler
@@ -1054,6 +1058,12 @@
     {
       return directoryServer.cryptoManager;
     }
+
+    @Override
+    public ScheduledExecutorService getCronExecutorService()
+    {
+      return directoryServer.cronExecutorService;
+    }
   }
 
   /**
@@ -1461,6 +1471,7 @@
 
       commonAudit = new CommonAudit(serverContext);
       httpRouter = new Router();
+      cronExecutorService = new CronExecutorService();
 
       // Allow internal plugins to be registered.
       pluginConfigManager.initializePluginConfigManager();
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/core/ServerContext.java b/opendj-server-legacy/src/main/java/org/opends/server/core/ServerContext.java
index e17fad3..22e06ab 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/core/ServerContext.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/core/ServerContext.java
@@ -15,15 +15,17 @@
  */
 package org.opends.server.core;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.forgerock.http.routing.Router;
 import org.forgerock.opendj.config.server.ServerManagementContext;
+import org.forgerock.opendj.ldap.schema.Schema;
 import org.forgerock.opendj.server.config.server.RootCfg;
 import org.opends.server.extensions.DiskSpaceMonitor;
 import org.opends.server.loggers.CommonAudit;
 import org.opends.server.schema.SchemaHandler;
 import org.opends.server.types.CryptoManager;
 import org.opends.server.types.DirectoryEnvironmentConfig;
-import org.forgerock.opendj.ldap.schema.Schema;
 
 /** Context for the server, giving access to global properties of the server. */
 public interface ServerContext
@@ -127,4 +129,10 @@
    */
   CryptoManager getCryptoManager();
 
+  /**
+   * Returns the UNIX's cron-like executor service.
+   *
+   * @return the UNIX's cron-like executor service
+   */
+  ScheduledExecutorService getCronExecutorService();
 }
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/util/CronExecutorService.java b/opendj-server-legacy/src/main/java/org/opends/server/util/CronExecutorService.java
new file mode 100644
index 0000000..6b74dc8
--- /dev/null
+++ b/opendj-server-legacy/src/main/java/org/opends/server/util/CronExecutorService.java
@@ -0,0 +1,384 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package org.opends.server.util;
+
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.opends.server.api.DirectoryThread;
+
+/**
+ * Implements a {@link ScheduledExecutorService} on top of a {@link Executors#newCachedThreadPool()
+ * cached thread pool} to achieve UNIX's cron-like capabilities.
+ * <p>
+ * This executor service is implemented with two underlying executor services:
+ * <ul>
+ * <li>{@link Executors#newSingleThreadScheduledExecutor() a single-thread scheduled executor} which
+ * allows to start tasks based on a schedule</li>
+ * <li>{@link Executors#newCachedThreadPool() a cached thread pool} which allows to run several
+ * tasks concurrently, adjusting the thread pool size when necessary. In particular, when the number
+ * of tasks to run concurrently is bigger than the tread pool size, then new threads are spawned.
+ * The thread pool is then shrinked when threads sit idle with no tasks to execute.</li>
+ * </ul>
+ * <p>
+ * All the tasks submitted to the current class are 1. scheduled by the single-thread scheduled
+ * executor and 2. finally executed by the cached thread pool.
+ * <p>
+ * Because of this setup, the assumptions of
+ * {@link #scheduleWithFixedDelay(Runnable, long, long, TimeUnit)} cannot be fulfilled, so calling
+ * this method will throw a {@link UnsupportedOperationException}.
+ * <p>
+ * &nbsp;
+ * <p>
+ * Current (Nov. 2016) OpenDJ threads that may be replaced by using the current class:
+ * <ul>
+ * <li>Idle Time Limit Thread</li>
+ * <li>LDAP Connection Finalizer for connection handler Administration Connector</li>
+ * <li>LDAP Connection Finalizer for connection handler LDAP Connection Handler</li>
+ * <li>Monitor Provider State Updater</li>
+ * <li>Task Scheduler Thread</li>
+ * <li>Rejected: Time Thread - high resolution thread</li>
+ * </ul>
+ */
+public class CronExecutorService implements ScheduledExecutorService
+{
+  /** Made necessary by the double executor services. */
+  private static class FutureTaskResult<V> implements ScheduledFuture<V>
+  {
+    private volatile Future<V> executorFuture;
+    private Future<?> schedulerFuture;
+
+    public void setExecutorFuture(Future<V> executorFuture)
+    {
+      this.executorFuture = executorFuture;
+    }
+
+    public void setSchedulerFuture(Future<?> schedulerFuture)
+    {
+      this.schedulerFuture = schedulerFuture;
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning)
+    {
+      return schedulerFuture.cancel(mayInterruptIfRunning)
+          | (executorFuture != null && executorFuture.cancel(mayInterruptIfRunning));
+    }
+
+    @Override
+    public V get() throws InterruptedException, ExecutionException
+    {
+      schedulerFuture.get();
+      return executorFuture.get();
+    }
+
+    @Override
+    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+    {
+      schedulerFuture.get(timeout, unit);
+      return executorFuture.get(timeout, unit);
+    }
+
+    @Override
+    public boolean isCancelled()
+    {
+      return schedulerFuture.isCancelled() || executorFuture.isCancelled();
+    }
+
+    @Override
+    public boolean isDone()
+    {
+      return schedulerFuture.isDone() && executorFuture.isDone();
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit)
+    {
+      throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public int compareTo(Delayed o)
+    {
+      throw new UnsupportedOperationException("Not implemented");
+    }
+  }
+
+  private final ExecutorService cronExecutorService;
+  private final ScheduledExecutorService cronScheduler;
+
+  /** Default constructor. */
+  public CronExecutorService()
+  {
+    cronExecutorService = Executors.newCachedThreadPool(new ThreadFactory()
+    {
+      private final List<WeakReference<Thread>> workerThreads = new ArrayList<>();
+
+      @Override
+      public Thread newThread(Runnable r)
+      {
+        final Thread t = new DirectoryThread(r, "Cron worker thread - waiting for number");
+        t.setDaemon(true);
+        final int index = addThreadAndReturnWorkerThreadNumber(t);
+        t.setName("Cron worker thread " + index);
+        return t;
+      }
+
+      private synchronized int addThreadAndReturnWorkerThreadNumber(Thread t)
+      {
+        for (final ListIterator<WeakReference<Thread>> it = workerThreads.listIterator(); it.hasNext();)
+        {
+          final WeakReference<Thread> threadRef = it.next();
+          if (threadRef.get() == null)
+          {
+            // Free slot, let's use it
+            final int result = it.previousIndex();
+            it.set(new WeakReference<>(t));
+            return result;
+          }
+        }
+        final int result = workerThreads.size();
+        workerThreads.add(new WeakReference<>(t));
+        return result;
+      }
+    });
+    cronScheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactory()
+    {
+      @Override
+      public Thread newThread(final Runnable r)
+      {
+        Thread t = new DirectoryThread(r, "Cron scheduler thread");
+        t.setDaemon(true);
+        return t;
+      }
+    });
+  }
+
+  @Override
+  public void execute(final Runnable task)
+  {
+    /** Class specialized for this method. */
+    class ExecuteRunnable implements Runnable
+    {
+      @Override
+      public void run()
+      {
+        cronExecutorService.execute(task);
+      }
+    }
+    cronScheduler.execute(new ExecuteRunnable());
+  }
+
+  @Override
+  public Future<?> submit(final Runnable task)
+  {
+    return submit(task, null);
+  }
+
+  @Override
+  public <T> Future<T> submit(final Runnable task, final T result)
+  {
+    final FutureTaskResult<T> futureResult = new FutureTaskResult<>();
+    /** Class specialized for this method. */
+    class SubmitRunnable implements Runnable
+    {
+      @Override
+      public void run()
+      {
+        futureResult.setExecutorFuture(cronExecutorService.submit(task, result));
+      }
+    }
+    futureResult.setSchedulerFuture(cronScheduler.submit(new SubmitRunnable(), null));
+    return futureResult;
+  }
+
+  @Override
+  public <T> Future<T> submit(final Callable<T> task)
+  {
+    final FutureTaskResult<T> futureResult = new FutureTaskResult<>();
+    /** Class specialized for this method. */
+    class SubmitCallableRunnable implements Runnable
+    {
+      @Override
+      public void run()
+      {
+        futureResult.setExecutorFuture(cronExecutorService.submit(task));
+      }
+    }
+    futureResult.setSchedulerFuture(cronScheduler.submit(new SubmitCallableRunnable()));
+    return futureResult;
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleAtFixedRate(final Runnable task, final long initialDelay, final long period,
+      final TimeUnit unit)
+  {
+    /** Class specialized for this method. */
+    class NoConcurrentRunRunnable implements Runnable
+    {
+      private AtomicBoolean isRunning = new AtomicBoolean();
+
+      @Override
+      public void run()
+      {
+        if (isRunning.compareAndSet(false, true))
+        {
+          try
+          {
+            task.run();
+          }
+          finally
+          {
+            isRunning.set(false);
+          }
+        }
+      }
+    }
+
+    final FutureTaskResult<?> futureResult = new FutureTaskResult<>();
+    /** Class specialized for this method. */
+    class ScheduleAtFixedRateRunnable implements Runnable
+    {
+      @SuppressWarnings({ "rawtypes", "unchecked" })
+      @Override
+      public void run()
+      {
+        futureResult.setExecutorFuture((Future) cronExecutorService.submit(new NoConcurrentRunRunnable()));
+      }
+    }
+    futureResult.setSchedulerFuture(
+        cronScheduler.scheduleAtFixedRate(new ScheduleAtFixedRateRunnable(), initialDelay, period, unit));
+    return futureResult;
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable task, final long initialDelay, final long delay,
+      final TimeUnit unit)
+  {
+    throw new UnsupportedOperationException("Cannot schedule with fixed delay between each runs of the task");
+  }
+
+  @Override
+  public <V> ScheduledFuture<V> schedule(final Callable<V> task, final long delay, final TimeUnit unit)
+  {
+    final FutureTaskResult<V> futureResult = new FutureTaskResult<>();
+    /** Class specialized for this method. */
+    class ScheduleRunnable implements Runnable
+    {
+      @Override
+      public void run()
+      {
+        futureResult.setExecutorFuture(cronExecutorService.submit(task));
+      }
+    }
+    futureResult.setSchedulerFuture(cronScheduler.schedule(new ScheduleRunnable(), delay, unit));
+    return futureResult;
+  }
+
+  @Override
+  public ScheduledFuture<?> schedule(final Runnable task, final long delay, final TimeUnit unit)
+  {
+    final FutureTaskResult<?> futureResult = new FutureTaskResult<>();
+    /** Class specialized for this method. */
+    class ScheduleCallableRunnable implements Runnable
+    {
+      @SuppressWarnings({ "rawtypes", "unchecked" })
+      @Override
+      public void run()
+      {
+        futureResult.setExecutorFuture((Future) cronExecutorService.submit(task));
+      }
+    }
+    futureResult.setSchedulerFuture(cronScheduler.schedule(new ScheduleCallableRunnable(), delay, unit));
+    return futureResult;
+  }
+
+  @Override
+  public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException
+  {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
+  {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks, final long timeout,
+      final TimeUnit unit) throws InterruptedException
+  {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks) throws InterruptedException
+  {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException
+  {
+    return cronScheduler.awaitTermination(timeout, unit) & cronExecutorService.awaitTermination(timeout, unit);
+  }
+
+  @Override
+  public void shutdown()
+  {
+    cronScheduler.shutdown();
+    cronExecutorService.shutdown();
+  }
+
+  @Override
+  public List<Runnable> shutdownNow()
+  {
+    final List<Runnable> results = new ArrayList<>();
+    results.addAll(cronScheduler.shutdownNow());
+    results.addAll(cronExecutorService.shutdownNow());
+    return results;
+  }
+
+  @Override
+  public boolean isShutdown()
+  {
+    return cronScheduler.isShutdown() && cronExecutorService.isShutdown();
+  }
+
+  @Override
+  public boolean isTerminated()
+  {
+    return cronScheduler.isTerminated() && cronExecutorService.isTerminated();
+  }
+}
\ No newline at end of file
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/util/CronExecutorServiceTest.java b/opendj-server-legacy/src/test/java/org/opends/server/util/CronExecutorServiceTest.java
new file mode 100644
index 0000000..6e53031
--- /dev/null
+++ b/opendj-server-legacy/src/test/java/org/opends/server/util/CronExecutorServiceTest.java
@@ -0,0 +1,164 @@
+/*
+ * The contents of this file are subject to the terms of the Common Development and
+ * Distribution License (the License). You may not use this file except in compliance with the
+ * License.
+ *
+ * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
+ * specific language governing permission and limitations under the License.
+ *
+ * When distributing Covered Software, include this CDDL Header Notice in each file and include
+ * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
+ * Header, with the fields enclosed by brackets [] replaced by your own identifying
+ * information: "Portions Copyright [year] [name of copyright owner]".
+ *
+ * Copyright 2016 ForgeRock AS.
+ */
+package org.opends.server.util;
+
+import static java.util.concurrent.TimeUnit.*;
+
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.Callable;
+
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.util.TestTimer.CallableVoid;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@SuppressWarnings("javadoc")
+@Test(groups = "precommit", sequential = true)
+public class CronExecutorServiceTest extends DirectoryServerTestCase
+{
+  @BeforeClass
+  public void setUp() throws Exception
+  {
+    TestCaseUtils.startServer();
+  }
+
+  @Test
+  public void execute() throws Exception
+  {
+    final Runnable mock = mock(Runnable.class);
+    new CronExecutorService().execute(mock);
+
+    verifyRunnableInvokedOnce(mock);
+  }
+
+  @Test
+  public void submitRunnable() throws Exception
+  {
+    final Runnable mock = mock(Runnable.class);
+    new CronExecutorService().submit(mock);
+
+    verifyRunnableInvokedOnce(mock);
+  }
+
+  @Test
+  public void submitRunnableAndReturn() throws Exception
+  {
+    final Runnable mock = mock(Runnable.class);
+    new CronExecutorService().submit(mock, null);
+
+    verifyRunnableInvokedOnce(mock);
+  }
+
+  @Test
+  public void submitCallable() throws Exception
+  {
+    final Callable<Void> mock = mock(Callable.class);
+    new CronExecutorService().submit(mock);
+
+    verifyCallableInvokedOnce(mock);
+  }
+
+  @Test
+  public void scheduleRunnable() throws Exception
+  {
+    final Runnable mock = mock(Runnable.class);
+    new CronExecutorService().schedule(mock, 200, MILLISECONDS);
+
+    verifyNoMoreInteractions(mock);
+
+    Thread.sleep(SECONDS.toMillis(1));
+
+    verifyRunnableInvokedOnce(mock);
+  }
+
+  @Test
+  public void scheduleCallable() throws Exception
+  {
+    final Callable<Void> mock = mock(Callable.class);
+    new CronExecutorService().schedule(mock, 200, MILLISECONDS);
+
+    verifyNoMoreInteractions(mock);
+
+    Thread.sleep(SECONDS.toMillis(1));
+
+    maxOneSecond().repeatUntilSuccess(new CallableVoid()
+    {
+      @Override
+      public void call() throws Exception
+      {
+        verify(mock, atLeastOnce()).call();
+        verifyNoMoreInteractions(mock);
+      }
+    });
+  }
+
+  @Test
+  public void scheduleAtFixedRate() throws Exception
+  {
+    final Runnable mock = mock(Runnable.class);
+    new CronExecutorService().scheduleAtFixedRate(mock, 0 /* execute immediately */, 200, MILLISECONDS);
+
+    verifyNoMoreInteractions(mock);
+
+    Thread.sleep(MILLISECONDS.toMillis(200));
+
+    maxOneSecond().repeatUntilSuccess(new CallableVoid()
+    {
+      @Override
+      public void call() throws Exception
+      {
+        verify(mock, atLeastOnce()).run();
+        verifyNoMoreInteractions(mock);
+      }
+    });
+  }
+
+  private void verifyRunnableInvokedOnce(final Runnable mock) throws Exception, InterruptedException
+  {
+    maxOneSecond().repeatUntilSuccess(new CallableVoid()
+    {
+      @Override
+      public void call() throws Exception
+      {
+        verify(mock).run();
+        verifyNoMoreInteractions(mock);
+      }
+    });
+  }
+
+  private void verifyCallableInvokedOnce(final Callable<Void> mock) throws Exception, InterruptedException
+  {
+    maxOneSecond().repeatUntilSuccess(new CallableVoid()
+    {
+      @Override
+      public void call() throws Exception
+      {
+        verify(mock).call();
+        verifyNoMoreInteractions(mock);
+      }
+    });
+  }
+
+  private TestTimer maxOneSecond()
+  {
+    return new TestTimer.Builder()
+        .maxSleep(1, SECONDS)
+        .sleepTimes(10, MILLISECONDS)
+        .toTimer();
+  }
+}

--
Gitblit v1.10.0