From 4a371f6717c50f5e327efeeaeeeccd3f82e1f5b0 Mon Sep 17 00:00:00 2001
From: Fabio Pistolesi <fabio.pistolesi@forgerock.com>
Date: Thu, 10 Dec 2015 10:38:32 +0000
Subject: [PATCH] Partial fix for OPENDJ-2190 Delete not replicated after 36h of add/delete operations on replicated topology

---
 opendj-server-legacy/src/main/java/org/opends/server/api/WorkQueue.java                                                                  |   11 ++---
 opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java                                                                  |   17 ++++++++
 opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java                                      |   16 +++++---
 opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/ReplicationSynchronizationProviderConfiguration.xml |   12 +++---
 opendj-server-legacy/src/main/java/org/opends/server/backends/jeb/ConfigurableEnvironment.java                                           |   24 ++++++------
 5 files changed, 49 insertions(+), 31 deletions(-)

diff --git a/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/ReplicationSynchronizationProviderConfiguration.xml b/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/ReplicationSynchronizationProviderConfiguration.xml
index f15fce5..7af8f89 100644
--- a/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/ReplicationSynchronizationProviderConfiguration.xml
+++ b/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/ReplicationSynchronizationProviderConfiguration.xml
@@ -23,7 +23,7 @@
   !
   !
   !      Copyright 2007-2008 Sun Microsystems, Inc.
-  !      Portions Copyright 2011 ForgeRock AS
+  !      Portions Copyright 2011-2015 ForgeRock AS
   ! -->
 <adm:managed-object name="replication-synchronization-provider"
   plural-name="replication-synchronization-providers"
@@ -88,11 +88,11 @@
       received for all the replication domains.
     </adm:description>
     <adm:default-behavior>
-      <adm:defined>
-        <adm:value>
-          10
-        </adm:value>
-      </adm:defined>
+      <adm:alias>
+        <adm:synopsis>
+          Let the server decide.
+        </adm:synopsis>
+      </adm:alias>
     </adm:default-behavior>
     <adm:syntax>
       <adm:integer lower-limit="1" upper-limit="65535"></adm:integer>
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/api/WorkQueue.java b/opendj-server-legacy/src/main/java/org/opends/server/api/WorkQueue.java
index a1815a4..d6800f5 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/api/WorkQueue.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/api/WorkQueue.java
@@ -22,11 +22,12 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2013-2014 ForgeRock AS.
+ *      Portions Copyright 2013-2015 ForgeRock AS.
  */
 package org.opends.server.api;
 
 import static org.opends.messages.CoreMessages.*;
+
 import org.forgerock.i18n.slf4j.LocalizedLogger;
 import org.forgerock.i18n.LocalizableMessage;
 import org.opends.server.admin.std.server.WorkQueueCfg;
@@ -34,8 +35,7 @@
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.InitializationException;
 import org.opends.server.types.Operation;
-
-
+import org.opends.server.util.Platform;
 
 /**
  * This class defines the structure and methods that must be
@@ -166,11 +166,8 @@
     else
     {
       // Automatically choose based on the number of processors.
-      int cpus = Runtime.getRuntime().availableProcessors();
-      int value = Math.max(24, cpus * 2);
-
+      int value = Platform.computeNumberOfThreads(16, 2);
       logger.debug(INFO_ERGONOMIC_SIZING_OF_WORKER_THREAD_POOL, value);
-
       return value;
     }
   }
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/jeb/ConfigurableEnvironment.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/jeb/ConfigurableEnvironment.java
index cf7511f..d1ad6aa 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/jeb/ConfigurableEnvironment.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/jeb/ConfigurableEnvironment.java
@@ -62,6 +62,7 @@
 import com.sleepycat.je.Durability;
 import com.sleepycat.je.EnvironmentConfig;
 import com.sleepycat.je.dbi.MemoryBudget;
+import org.opends.server.util.Platform;
 
 /** This class maps JE properties to configuration attributes. */
 public class ConfigurableEnvironment
@@ -299,27 +300,26 @@
       {
         Object value = method.invoke(cfg);
 
-        if (attrName.equals(ATTR_NUM_CLEANER_THREADS) && value == null)
+        if (value != null)
+        {
+          return String.valueOf(value);
+        }
+
+        if (attrName.equals(ATTR_NUM_CLEANER_THREADS))
         {
           // Automatically choose based on the number of processors. We will use
           // similar heuristics to those used to define the default number of
           // worker threads.
-          int cpus = Runtime.getRuntime().availableProcessors();
-          value = Integer.valueOf(Math.max(24, cpus * 2));
+          value = Platform.computeNumberOfThreads(8, 1.0f);
 
           logger.debug(INFO_ERGONOMIC_SIZING_OF_JE_CLEANER_THREADS,
               backendId, (Number) value);
         }
-        else if (attrName.equals(ATTR_NUM_LOCK_TABLES)
-            && value == null)
+        else if (attrName.equals(ATTR_NUM_LOCK_TABLES))
         {
-          // Automatically choose based on the number of processors.
-          // We'll assume that the user has also allowed automatic
-          // configuration of cleaners and workers.
-          int cpus = Runtime.getRuntime().availableProcessors();
-          int cleaners = Math.max(24, cpus * 2);
-          int workers = Math.max(24, cpus * 2);
-          BigInteger tmp = BigInteger.valueOf((cleaners + workers) * 2);
+          // Automatically choose based on the number of processors. We'll assume that the user has also allowed
+          // automatic configuration of cleaners and workers.
+          BigInteger tmp = BigInteger.valueOf(Platform.computeNumberOfThreads(1, 2));
           value = tmp.nextProbablePrime();
 
           logger.debug(INFO_ERGONOMIC_SIZING_OF_JE_LOCK_TABLES, backendId, (Number) value);
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java
index 3b9ecd6..3e66f86 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -81,6 +81,7 @@
 import org.opends.server.types.operation.PreOperationDeleteOperation;
 import org.opends.server.types.operation.PreOperationModifyDNOperation;
 import org.opends.server.types.operation.PreOperationModifyOperation;
+import org.opends.server.util.Platform;
 
 /**
  * This class is used to load the Replication code inside the JVM
@@ -277,7 +278,7 @@
     // number of replay threads is changed and apply changes.
     cfg.addReplicationChangeListener(this);
 
-    replayThreadNumber = cfg.getNumUpdateReplayThreads();
+    replayThreadNumber = getNumberOfReplayThreadsOrDefault(cfg);
     connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE);
 
     //  Create the list of domains that are already defined.
@@ -303,6 +304,12 @@
         ReplicationRepairRequestControl.OID_REPLICATION_REPAIR_CONTROL);
   }
 
+  private int getNumberOfReplayThreadsOrDefault(ReplicationSynchronizationProviderCfg cfg)
+  {
+    Integer value = cfg.getNumUpdateReplayThreads();
+    return value == null ? Platform.computeNumberOfThreads(16, 1.5f) : value;
+  }
+
   /**
    * Create the threads that will wait for incoming update messages.
    */
@@ -780,16 +787,13 @@
     return true;
   }
 
-  /** {@inheritDoc} */
   @Override
-  public ConfigChangeResult applyConfigurationChange(
-      ReplicationSynchronizationProviderCfg configuration)
+  public ConfigChangeResult applyConfigurationChange(ReplicationSynchronizationProviderCfg configuration)
   {
-    int numUpdateRepayThread = configuration.getNumUpdateReplayThreads();
 
     // Stop threads then restart new number of threads
     stopReplayThreads();
-    replayThreadNumber = numUpdateRepayThread;
+    replayThreadNumber = getNumberOfReplayThreadsOrDefault(configuration);
     if (!domains.isEmpty())
     {
       createReplayThreads();
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java b/opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java
index 3d07de1..0a3c247 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java
@@ -48,6 +48,7 @@
 import java.lang.reflect.Method;
 
 import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.util.Reject;
 
 import static org.opends.messages.UtilityMessages.*;
 import static org.opends.server.util.ServerConstants.CERTANDKEYGEN_PROVIDER;
@@ -609,4 +610,20 @@
   {
     return IMPL.getUsableMemoryForCaching();
   }
+
+  /**
+   * Computes the number of replay/worker/cleaner threads based on the number of cpus in the system.
+   * Allows for a multiplier to be specified and a minimum value to be returned if not enough processors
+   * are present in the system.
+   *
+   * @param minimumValue at least this value should be returned.
+   * @param cpuMultiplier the scaling multiplier of the number of threads to return
+   * @return the number of threads based on the number of cpus in the system.
+   * @throws IllegalArgumentException if {@code cpuMultiplier} is a non positive number
+   */
+  public static int computeNumberOfThreads(int minimumValue, float cpuMultiplier)
+  {
+    Reject.ifTrue(cpuMultiplier < 0, "Multiplier must be a positive number");
+    return Math.max(minimumValue, (int)(Runtime.getRuntime().availableProcessors() * cpuMultiplier));
+  }
 }

--
Gitblit v1.10.0