mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noël Rouvignac
31.10.2016 48c52a7de8179e42aa4cb614a2e5e7f91ce1c79c
Prep work OPENDJ-3421 Implement ReplicationServiceDiscoverMechanism

Add CronExecutorService
2 files added
2 files modified
571 ■■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/core/DirectoryServer.java 13 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/core/ServerContext.java 10 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/util/CronExecutorService.java 384 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/util/CronExecutorServiceTest.java 164 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/core/DirectoryServer.java
@@ -17,6 +17,7 @@
package org.opends.server.core;
import static com.forgerock.opendj.cli.CommonArguments.*;
import static org.forgerock.util.Reject.*;
import static org.opends.messages.CoreMessages.*;
import static org.opends.messages.ToolMessages.*;
@@ -54,6 +55,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
@@ -68,6 +70,7 @@
import org.forgerock.opendj.config.server.ServerManagementContext;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.schema.Schema;
import org.forgerock.opendj.server.config.server.AlertHandlerCfg;
import org.forgerock.opendj.server.config.server.ConnectionHandlerCfg;
import org.forgerock.opendj.server.config.server.CryptoManagerCfg;
@@ -152,10 +155,10 @@
import org.opends.server.types.Operation;
import org.opends.server.types.Privilege;
import org.opends.server.types.RestoreConfig;
import org.forgerock.opendj.ldap.schema.Schema;
import org.opends.server.types.VirtualAttributeRule;
import org.opends.server.types.WritabilityMode;
import org.opends.server.util.BuildVersion;
import org.opends.server.util.CronExecutorService;
import org.opends.server.util.MultiOutputStream;
import org.opends.server.util.RuntimeInformation;
import org.opends.server.util.SetupUtils;
@@ -646,6 +649,7 @@
  private CommonAudit commonAudit;
  private Router httpRouter;
  private CronExecutorService cronExecutorService;
  /** Class that prints the version of OpenDJ server to System.out. */
  public static final class DirectoryServerVersionHandler implements VersionHandler
@@ -1054,6 +1058,12 @@
    {
      return directoryServer.cryptoManager;
    }
    @Override
    public ScheduledExecutorService getCronExecutorService()
    {
      return directoryServer.cronExecutorService;
    }
  }
  /**
@@ -1461,6 +1471,7 @@
      commonAudit = new CommonAudit(serverContext);
      httpRouter = new Router();
      cronExecutorService = new CronExecutorService();
      // Allow internal plugins to be registered.
      pluginConfigManager.initializePluginConfigManager();
opendj-server-legacy/src/main/java/org/opends/server/core/ServerContext.java
@@ -15,15 +15,17 @@
 */
package org.opends.server.core;
import java.util.concurrent.ScheduledExecutorService;
import org.forgerock.http.routing.Router;
import org.forgerock.opendj.config.server.ServerManagementContext;
import org.forgerock.opendj.ldap.schema.Schema;
import org.forgerock.opendj.server.config.server.RootCfg;
import org.opends.server.extensions.DiskSpaceMonitor;
import org.opends.server.loggers.CommonAudit;
import org.opends.server.schema.SchemaHandler;
import org.opends.server.types.CryptoManager;
import org.opends.server.types.DirectoryEnvironmentConfig;
import org.forgerock.opendj.ldap.schema.Schema;
/** Context for the server, giving access to global properties of the server. */
public interface ServerContext
@@ -127,4 +129,10 @@
   */
  CryptoManager getCryptoManager();
  /**
   * Returns the UNIX's cron-like executor service.
   *
   * @return the UNIX's cron-like executor service
   */
  ScheduledExecutorService getCronExecutorService();
}
opendj-server-legacy/src/main/java/org/opends/server/util/CronExecutorService.java
New file
@@ -0,0 +1,384 @@
/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2016 ForgeRock AS.
 */
package org.opends.server.util;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.server.api.DirectoryThread;
/**
 * Implements a {@link ScheduledExecutorService} on top of a {@link Executors#newCachedThreadPool()
 * cached thread pool} to achieve UNIX's cron-like capabilities.
 * <p>
 * This executor service is implemented with two underlying executor services:
 * <ul>
 * <li>{@link Executors#newSingleThreadScheduledExecutor() a single-thread scheduled executor} which
 * allows to start tasks based on a schedule</li>
 * <li>{@link Executors#newCachedThreadPool() a cached thread pool} which allows to run several
 * tasks concurrently, adjusting the thread pool size when necessary. In particular, when the number
 * of tasks to run concurrently is bigger than the tread pool size, then new threads are spawned.
 * The thread pool is then shrinked when threads sit idle with no tasks to execute.</li>
 * </ul>
 * <p>
 * All the tasks submitted to the current class are 1. scheduled by the single-thread scheduled
 * executor and 2. finally executed by the cached thread pool.
 * <p>
 * Because of this setup, the assumptions of
 * {@link #scheduleWithFixedDelay(Runnable, long, long, TimeUnit)} cannot be fulfilled, so calling
 * this method will throw a {@link UnsupportedOperationException}.
 * <p>
 * &nbsp;
 * <p>
 * Current (Nov. 2016) OpenDJ threads that may be replaced by using the current class:
 * <ul>
 * <li>Idle Time Limit Thread</li>
 * <li>LDAP Connection Finalizer for connection handler Administration Connector</li>
 * <li>LDAP Connection Finalizer for connection handler LDAP Connection Handler</li>
 * <li>Monitor Provider State Updater</li>
 * <li>Task Scheduler Thread</li>
 * <li>Rejected: Time Thread - high resolution thread</li>
 * </ul>
 */
public class CronExecutorService implements ScheduledExecutorService
{
  /** Made necessary by the double executor services. */
  private static class FutureTaskResult<V> implements ScheduledFuture<V>
  {
    private volatile Future<V> executorFuture;
    private Future<?> schedulerFuture;
    public void setExecutorFuture(Future<V> executorFuture)
    {
      this.executorFuture = executorFuture;
    }
    public void setSchedulerFuture(Future<?> schedulerFuture)
    {
      this.schedulerFuture = schedulerFuture;
    }
    @Override
    public boolean cancel(boolean mayInterruptIfRunning)
    {
      return schedulerFuture.cancel(mayInterruptIfRunning)
          | (executorFuture != null && executorFuture.cancel(mayInterruptIfRunning));
    }
    @Override
    public V get() throws InterruptedException, ExecutionException
    {
      schedulerFuture.get();
      return executorFuture.get();
    }
    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
    {
      schedulerFuture.get(timeout, unit);
      return executorFuture.get(timeout, unit);
    }
    @Override
    public boolean isCancelled()
    {
      return schedulerFuture.isCancelled() || executorFuture.isCancelled();
    }
    @Override
    public boolean isDone()
    {
      return schedulerFuture.isDone() && executorFuture.isDone();
    }
    @Override
    public long getDelay(TimeUnit unit)
    {
      throw new UnsupportedOperationException("Not implemented");
    }
    @Override
    public int compareTo(Delayed o)
    {
      throw new UnsupportedOperationException("Not implemented");
    }
  }
  private final ExecutorService cronExecutorService;
  private final ScheduledExecutorService cronScheduler;
  /** Default constructor. */
  public CronExecutorService()
  {
    cronExecutorService = Executors.newCachedThreadPool(new ThreadFactory()
    {
      private final List<WeakReference<Thread>> workerThreads = new ArrayList<>();
      @Override
      public Thread newThread(Runnable r)
      {
        final Thread t = new DirectoryThread(r, "Cron worker thread - waiting for number");
        t.setDaemon(true);
        final int index = addThreadAndReturnWorkerThreadNumber(t);
        t.setName("Cron worker thread " + index);
        return t;
      }
      private synchronized int addThreadAndReturnWorkerThreadNumber(Thread t)
      {
        for (final ListIterator<WeakReference<Thread>> it = workerThreads.listIterator(); it.hasNext();)
        {
          final WeakReference<Thread> threadRef = it.next();
          if (threadRef.get() == null)
          {
            // Free slot, let's use it
            final int result = it.previousIndex();
            it.set(new WeakReference<>(t));
            return result;
          }
        }
        final int result = workerThreads.size();
        workerThreads.add(new WeakReference<>(t));
        return result;
      }
    });
    cronScheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactory()
    {
      @Override
      public Thread newThread(final Runnable r)
      {
        Thread t = new DirectoryThread(r, "Cron scheduler thread");
        t.setDaemon(true);
        return t;
      }
    });
  }
  @Override
  public void execute(final Runnable task)
  {
    /** Class specialized for this method. */
    class ExecuteRunnable implements Runnable
    {
      @Override
      public void run()
      {
        cronExecutorService.execute(task);
      }
    }
    cronScheduler.execute(new ExecuteRunnable());
  }
  @Override
  public Future<?> submit(final Runnable task)
  {
    return submit(task, null);
  }
  @Override
  public <T> Future<T> submit(final Runnable task, final T result)
  {
    final FutureTaskResult<T> futureResult = new FutureTaskResult<>();
    /** Class specialized for this method. */
    class SubmitRunnable implements Runnable
    {
      @Override
      public void run()
      {
        futureResult.setExecutorFuture(cronExecutorService.submit(task, result));
      }
    }
    futureResult.setSchedulerFuture(cronScheduler.submit(new SubmitRunnable(), null));
    return futureResult;
  }
  @Override
  public <T> Future<T> submit(final Callable<T> task)
  {
    final FutureTaskResult<T> futureResult = new FutureTaskResult<>();
    /** Class specialized for this method. */
    class SubmitCallableRunnable implements Runnable
    {
      @Override
      public void run()
      {
        futureResult.setExecutorFuture(cronExecutorService.submit(task));
      }
    }
    futureResult.setSchedulerFuture(cronScheduler.submit(new SubmitCallableRunnable()));
    return futureResult;
  }
  @Override
  public ScheduledFuture<?> scheduleAtFixedRate(final Runnable task, final long initialDelay, final long period,
      final TimeUnit unit)
  {
    /** Class specialized for this method. */
    class NoConcurrentRunRunnable implements Runnable
    {
      private AtomicBoolean isRunning = new AtomicBoolean();
      @Override
      public void run()
      {
        if (isRunning.compareAndSet(false, true))
        {
          try
          {
            task.run();
          }
          finally
          {
            isRunning.set(false);
          }
        }
      }
    }
    final FutureTaskResult<?> futureResult = new FutureTaskResult<>();
    /** Class specialized for this method. */
    class ScheduleAtFixedRateRunnable implements Runnable
    {
      @SuppressWarnings({ "rawtypes", "unchecked" })
      @Override
      public void run()
      {
        futureResult.setExecutorFuture((Future) cronExecutorService.submit(new NoConcurrentRunRunnable()));
      }
    }
    futureResult.setSchedulerFuture(
        cronScheduler.scheduleAtFixedRate(new ScheduleAtFixedRateRunnable(), initialDelay, period, unit));
    return futureResult;
  }
  @Override
  public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable task, final long initialDelay, final long delay,
      final TimeUnit unit)
  {
    throw new UnsupportedOperationException("Cannot schedule with fixed delay between each runs of the task");
  }
  @Override
  public <V> ScheduledFuture<V> schedule(final Callable<V> task, final long delay, final TimeUnit unit)
  {
    final FutureTaskResult<V> futureResult = new FutureTaskResult<>();
    /** Class specialized for this method. */
    class ScheduleRunnable implements Runnable
    {
      @Override
      public void run()
      {
        futureResult.setExecutorFuture(cronExecutorService.submit(task));
      }
    }
    futureResult.setSchedulerFuture(cronScheduler.schedule(new ScheduleRunnable(), delay, unit));
    return futureResult;
  }
  @Override
  public ScheduledFuture<?> schedule(final Runnable task, final long delay, final TimeUnit unit)
  {
    final FutureTaskResult<?> futureResult = new FutureTaskResult<>();
    /** Class specialized for this method. */
    class ScheduleCallableRunnable implements Runnable
    {
      @SuppressWarnings({ "rawtypes", "unchecked" })
      @Override
      public void run()
      {
        futureResult.setExecutorFuture((Future) cronExecutorService.submit(task));
      }
    }
    futureResult.setSchedulerFuture(cronScheduler.schedule(new ScheduleCallableRunnable(), delay, unit));
    return futureResult;
  }
  @Override
  public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException
  {
    throw new UnsupportedOperationException("Not implemented");
  }
  @Override
  public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
  {
    throw new UnsupportedOperationException("Not implemented");
  }
  @Override
  public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks, final long timeout,
      final TimeUnit unit) throws InterruptedException
  {
    throw new UnsupportedOperationException("Not implemented");
  }
  @Override
  public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks) throws InterruptedException
  {
    throw new UnsupportedOperationException("Not implemented");
  }
  @Override
  public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException
  {
    return cronScheduler.awaitTermination(timeout, unit) & cronExecutorService.awaitTermination(timeout, unit);
  }
  @Override
  public void shutdown()
  {
    cronScheduler.shutdown();
    cronExecutorService.shutdown();
  }
  @Override
  public List<Runnable> shutdownNow()
  {
    final List<Runnable> results = new ArrayList<>();
    results.addAll(cronScheduler.shutdownNow());
    results.addAll(cronExecutorService.shutdownNow());
    return results;
  }
  @Override
  public boolean isShutdown()
  {
    return cronScheduler.isShutdown() && cronExecutorService.isShutdown();
  }
  @Override
  public boolean isTerminated()
  {
    return cronScheduler.isTerminated() && cronExecutorService.isTerminated();
  }
}
opendj-server-legacy/src/test/java/org/opends/server/util/CronExecutorServiceTest.java
New file
@@ -0,0 +1,164 @@
/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2016 ForgeRock AS.
 */
package org.opends.server.util;
import static java.util.concurrent.TimeUnit.*;
import static org.mockito.Mockito.*;
import java.util.concurrent.Callable;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.util.TestTimer.CallableVoid;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@SuppressWarnings("javadoc")
@Test(groups = "precommit", sequential = true)
public class CronExecutorServiceTest extends DirectoryServerTestCase
{
  @BeforeClass
  public void setUp() throws Exception
  {
    TestCaseUtils.startServer();
  }
  @Test
  public void execute() throws Exception
  {
    final Runnable mock = mock(Runnable.class);
    new CronExecutorService().execute(mock);
    verifyRunnableInvokedOnce(mock);
  }
  @Test
  public void submitRunnable() throws Exception
  {
    final Runnable mock = mock(Runnable.class);
    new CronExecutorService().submit(mock);
    verifyRunnableInvokedOnce(mock);
  }
  @Test
  public void submitRunnableAndReturn() throws Exception
  {
    final Runnable mock = mock(Runnable.class);
    new CronExecutorService().submit(mock, null);
    verifyRunnableInvokedOnce(mock);
  }
  @Test
  public void submitCallable() throws Exception
  {
    final Callable<Void> mock = mock(Callable.class);
    new CronExecutorService().submit(mock);
    verifyCallableInvokedOnce(mock);
  }
  @Test
  public void scheduleRunnable() throws Exception
  {
    final Runnable mock = mock(Runnable.class);
    new CronExecutorService().schedule(mock, 200, MILLISECONDS);
    verifyNoMoreInteractions(mock);
    Thread.sleep(SECONDS.toMillis(1));
    verifyRunnableInvokedOnce(mock);
  }
  @Test
  public void scheduleCallable() throws Exception
  {
    final Callable<Void> mock = mock(Callable.class);
    new CronExecutorService().schedule(mock, 200, MILLISECONDS);
    verifyNoMoreInteractions(mock);
    Thread.sleep(SECONDS.toMillis(1));
    maxOneSecond().repeatUntilSuccess(new CallableVoid()
    {
      @Override
      public void call() throws Exception
      {
        verify(mock, atLeastOnce()).call();
        verifyNoMoreInteractions(mock);
      }
    });
  }
  @Test
  public void scheduleAtFixedRate() throws Exception
  {
    final Runnable mock = mock(Runnable.class);
    new CronExecutorService().scheduleAtFixedRate(mock, 0 /* execute immediately */, 200, MILLISECONDS);
    verifyNoMoreInteractions(mock);
    Thread.sleep(MILLISECONDS.toMillis(200));
    maxOneSecond().repeatUntilSuccess(new CallableVoid()
    {
      @Override
      public void call() throws Exception
      {
        verify(mock, atLeastOnce()).run();
        verifyNoMoreInteractions(mock);
      }
    });
  }
  private void verifyRunnableInvokedOnce(final Runnable mock) throws Exception, InterruptedException
  {
    maxOneSecond().repeatUntilSuccess(new CallableVoid()
    {
      @Override
      public void call() throws Exception
      {
        verify(mock).run();
        verifyNoMoreInteractions(mock);
      }
    });
  }
  private void verifyCallableInvokedOnce(final Callable<Void> mock) throws Exception, InterruptedException
  {
    maxOneSecond().repeatUntilSuccess(new CallableVoid()
    {
      @Override
      public void call() throws Exception
      {
        verify(mock).call();
        verifyNoMoreInteractions(mock);
      }
    });
  }
  private TestTimer maxOneSecond()
  {
    return new TestTimer.Builder()
        .maxSleep(1, SECONDS)
        .sleepTimes(10, MILLISECONDS)
        .toTimer();
  }
}