mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Fabio Pistolesi
08.24.2015 4a371f6717c50f5e327efeeaeeeccd3f82e1f5b0
Partial fix for OPENDJ-2190 Delete not replicated after 36h of add/delete operations on replicated topology

Revise the default number of helper threads in replication.
The number of replication replay threads is insufficient to apply updates coming from replication at the rate at which the frontend processes requests.
Same as the worker threads, base their number on the number of CPUs in the system.

On replicated instances, the second instance can now keep up with incoming changes more easily.
The original problem could be seen on the machine I used by running the test ( 2 instances + client) restricted to 8 processors out of 32 (2 x 8 cores + MT).
The first instance drops from 500 to 450 Add/s because the second instance now uses more CPU: its usage rises from about half to about two thirds of what the first instance uses, while the first instance uses a bit less.
As for the number of worker threads, these are the results at full speed, running on all processors and on the restricted processor set:
Worker threads | All processors         | Processor set 0-7
---------------+------------------------+-----------------------
2 x Ncpus      | 700 ADD, 100K searches | 450 ADD, 31K searches
1.5 x Ncpus    | 700 ADD, 76K searches  | 450 ADD, 29K searches

On my laptop (4 core + MT), single instance, I see 32K (2 * Ncpus) instead of 29K (1.5 * Ncpus) searches.
By "Add" we mean "addrate load of add/delete spaced by 10000 entries".
5 files modified
80 ■■■■■ changed files
opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/ReplicationSynchronizationProviderConfiguration.xml 12 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/api/WorkQueue.java 11 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/jeb/ConfigurableEnvironment.java 24 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java 16 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java 17 ●●●●● patch | view | raw | blame | history
opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/ReplicationSynchronizationProviderConfiguration.xml
@@ -23,7 +23,7 @@
  !
  !
  !      Copyright 2007-2008 Sun Microsystems, Inc.
  !      Portions Copyright 2011 ForgeRock AS
  !      Portions Copyright 2011-2015 ForgeRock AS
  ! -->
<adm:managed-object name="replication-synchronization-provider"
  plural-name="replication-synchronization-providers"
@@ -88,11 +88,11 @@
      received for all the replication domains.
    </adm:description>
    <adm:default-behavior>
      <adm:defined>
        <adm:value>
          10
        </adm:value>
      </adm:defined>
      <adm:alias>
        <adm:synopsis>
          Let the server decide.
        </adm:synopsis>
      </adm:alias>
    </adm:default-behavior>
    <adm:syntax>
      <adm:integer lower-limit="1" upper-limit="65535"></adm:integer>
opendj-server-legacy/src/main/java/org/opends/server/api/WorkQueue.java
@@ -22,11 +22,12 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013-2014 ForgeRock AS.
 *      Portions Copyright 2013-2015 ForgeRock AS.
 */
package org.opends.server.api;
import static org.opends.messages.CoreMessages.*;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.i18n.LocalizableMessage;
import org.opends.server.admin.std.server.WorkQueueCfg;
@@ -34,8 +35,7 @@
import org.opends.server.types.DirectoryException;
import org.opends.server.types.InitializationException;
import org.opends.server.types.Operation;
import org.opends.server.util.Platform;
/**
 * This class defines the structure and methods that must be
@@ -166,11 +166,8 @@
    else
    {
      // Automatically choose based on the number of processors.
      int cpus = Runtime.getRuntime().availableProcessors();
      int value = Math.max(24, cpus * 2);
      int value = Platform.computeNumberOfThreads(16, 2);
      logger.debug(INFO_ERGONOMIC_SIZING_OF_WORKER_THREAD_POOL, value);
      return value;
    }
  }
opendj-server-legacy/src/main/java/org/opends/server/backends/jeb/ConfigurableEnvironment.java
@@ -62,6 +62,7 @@
import com.sleepycat.je.Durability;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.dbi.MemoryBudget;
import org.opends.server.util.Platform;
/** This class maps JE properties to configuration attributes. */
public class ConfigurableEnvironment
@@ -299,27 +300,26 @@
      {
        Object value = method.invoke(cfg);
        if (attrName.equals(ATTR_NUM_CLEANER_THREADS) && value == null)
        if (value != null)
        {
          return String.valueOf(value);
        }
        if (attrName.equals(ATTR_NUM_CLEANER_THREADS))
        {
          // Automatically choose based on the number of processors. We will use
          // similar heuristics to those used to define the default number of
          // worker threads.
          int cpus = Runtime.getRuntime().availableProcessors();
          value = Integer.valueOf(Math.max(24, cpus * 2));
          value = Platform.computeNumberOfThreads(8, 1.0f);
          logger.debug(INFO_ERGONOMIC_SIZING_OF_JE_CLEANER_THREADS,
              backendId, (Number) value);
        }
        else if (attrName.equals(ATTR_NUM_LOCK_TABLES)
            && value == null)
        else if (attrName.equals(ATTR_NUM_LOCK_TABLES))
        {
          // Automatically choose based on the number of processors.
          // We'll assume that the user has also allowed automatic
          // configuration of cleaners and workers.
          int cpus = Runtime.getRuntime().availableProcessors();
          int cleaners = Math.max(24, cpus * 2);
          int workers = Math.max(24, cpus * 2);
          BigInteger tmp = BigInteger.valueOf((cleaners + workers) * 2);
          // Automatically choose based on the number of processors. We'll assume that the user has also allowed
          // automatic configuration of cleaners and workers.
          BigInteger tmp = BigInteger.valueOf(Platform.computeNumberOfThreads(1, 2));
          value = tmp.nextProbablePrime();
          logger.debug(INFO_ERGONOMIC_SIZING_OF_JE_LOCK_TABLES, backendId, (Number) value);
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -81,6 +81,7 @@
import org.opends.server.types.operation.PreOperationDeleteOperation;
import org.opends.server.types.operation.PreOperationModifyDNOperation;
import org.opends.server.types.operation.PreOperationModifyOperation;
import org.opends.server.util.Platform;
/**
 * This class is used to load the Replication code inside the JVM
@@ -277,7 +278,7 @@
    // number of replay threads is changed and apply changes.
    cfg.addReplicationChangeListener(this);
    replayThreadNumber = cfg.getNumUpdateReplayThreads();
    replayThreadNumber = getNumberOfReplayThreadsOrDefault(cfg);
    connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE);
    //  Create the list of domains that are already defined.
@@ -303,6 +304,12 @@
        ReplicationRepairRequestControl.OID_REPLICATION_REPAIR_CONTROL);
  }
  private int getNumberOfReplayThreadsOrDefault(ReplicationSynchronizationProviderCfg cfg)
  {
    Integer value = cfg.getNumUpdateReplayThreads();
    return value == null ? Platform.computeNumberOfThreads(16, 1.5f) : value;
  }
  /**
   * Create the threads that will wait for incoming update messages.
   */
@@ -780,16 +787,13 @@
    return true;
  }
  /** {@inheritDoc} */
  @Override
  public ConfigChangeResult applyConfigurationChange(
      ReplicationSynchronizationProviderCfg configuration)
  public ConfigChangeResult applyConfigurationChange(ReplicationSynchronizationProviderCfg configuration)
  {
    int numUpdateRepayThread = configuration.getNumUpdateReplayThreads();
    // Stop threads then restart new number of threads
    stopReplayThreads();
    replayThreadNumber = numUpdateRepayThread;
    replayThreadNumber = getNumberOfReplayThreadsOrDefault(configuration);
    if (!domains.isEmpty())
    {
      createReplayThreads();
opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java
@@ -48,6 +48,7 @@
import java.lang.reflect.Method;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.util.Reject;
import static org.opends.messages.UtilityMessages.*;
import static org.opends.server.util.ServerConstants.CERTANDKEYGEN_PROVIDER;
@@ -609,4 +610,20 @@
  {
    return IMPL.getUsableMemoryForCaching();
  }
  /**
   * Computes the number of replay/worker/cleaner threads based on the number of cpus in the system.
   * Allows for a multiplier to be specified and a minimum value to be returned if not enough processors
   * are present in the system.
   *
   * @param minimumValue at least this value should be returned.
   * @param cpuMultiplier the scaling multiplier of the number of threads to return
   * @return the number of threads based on the number of cpus in the system.
   * @throws IllegalArgumentException if {@code cpuMultiplier} is a non positive number
   */
  public static int computeNumberOfThreads(int minimumValue, float cpuMultiplier)
  {
    Reject.ifTrue(cpuMultiplier < 0, "Multiplier must be a positive number");
    return Math.max(minimumValue, (int)(Runtime.getRuntime().availableProcessors() * cpuMultiplier));
  }
}