From 4a371f6717c50f5e327efeeaeeeccd3f82e1f5b0 Mon Sep 17 00:00:00 2001
From: Fabio Pistolesi <fabio.pistolesi@forgerock.com>
Date: Thu, 10 Dec 2015 10:38:32 +0000
Subject: [PATCH] Partial fix for OPENDJ-2190 Delete not replicated after 36h of add/delete operations on replicated topology
---
opendj-server-legacy/src/main/java/org/opends/server/api/WorkQueue.java | 11 ++---
opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java | 17 ++++++++
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java | 16 +++++---
opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/ReplicationSynchronizationProviderConfiguration.xml | 12 +++---
opendj-server-legacy/src/main/java/org/opends/server/backends/jeb/ConfigurableEnvironment.java | 24 ++++++------
5 files changed, 49 insertions(+), 31 deletions(-)
diff --git a/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/ReplicationSynchronizationProviderConfiguration.xml b/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/ReplicationSynchronizationProviderConfiguration.xml
index f15fce5..7af8f89 100644
--- a/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/ReplicationSynchronizationProviderConfiguration.xml
+++ b/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/ReplicationSynchronizationProviderConfiguration.xml
@@ -23,7 +23,7 @@
!
!
! Copyright 2007-2008 Sun Microsystems, Inc.
- ! Portions Copyright 2011 ForgeRock AS
+ ! Portions Copyright 2011-2015 ForgeRock AS
! -->
<adm:managed-object name="replication-synchronization-provider"
plural-name="replication-synchronization-providers"
@@ -88,11 +88,11 @@
received for all the replication domains.
</adm:description>
<adm:default-behavior>
- <adm:defined>
- <adm:value>
- 10
- </adm:value>
- </adm:defined>
+ <adm:alias>
+ <adm:synopsis>
+ Let the server decide.
+ </adm:synopsis>
+ </adm:alias>
</adm:default-behavior>
<adm:syntax>
<adm:integer lower-limit="1" upper-limit="65535"></adm:integer>
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/api/WorkQueue.java b/opendj-server-legacy/src/main/java/org/opends/server/api/WorkQueue.java
index a1815a4..d6800f5 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/api/WorkQueue.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/api/WorkQueue.java
@@ -22,11 +22,12 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2013-2014 ForgeRock AS.
+ * Portions Copyright 2013-2015 ForgeRock AS.
*/
package org.opends.server.api;
import static org.opends.messages.CoreMessages.*;
+
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.i18n.LocalizableMessage;
import org.opends.server.admin.std.server.WorkQueueCfg;
@@ -34,8 +35,7 @@
import org.opends.server.types.DirectoryException;
import org.opends.server.types.InitializationException;
import org.opends.server.types.Operation;
-
-
+import org.opends.server.util.Platform;
/**
* This class defines the structure and methods that must be
@@ -166,11 +166,8 @@
else
{
// Automatically choose based on the number of processors.
- int cpus = Runtime.getRuntime().availableProcessors();
- int value = Math.max(24, cpus * 2);
-
+ int value = Platform.computeNumberOfThreads(16, 2);
logger.debug(INFO_ERGONOMIC_SIZING_OF_WORKER_THREAD_POOL, value);
-
return value;
}
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/jeb/ConfigurableEnvironment.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/jeb/ConfigurableEnvironment.java
index cf7511f..d1ad6aa 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/jeb/ConfigurableEnvironment.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/jeb/ConfigurableEnvironment.java
@@ -62,6 +62,7 @@
import com.sleepycat.je.Durability;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.dbi.MemoryBudget;
+import org.opends.server.util.Platform;
/** This class maps JE properties to configuration attributes. */
public class ConfigurableEnvironment
@@ -299,27 +300,26 @@
{
Object value = method.invoke(cfg);
- if (attrName.equals(ATTR_NUM_CLEANER_THREADS) && value == null)
+ if (value != null)
+ {
+ return String.valueOf(value);
+ }
+
+ if (attrName.equals(ATTR_NUM_CLEANER_THREADS))
{
// Automatically choose based on the number of processors. We will use
// similar heuristics to those used to define the default number of
// worker threads.
- int cpus = Runtime.getRuntime().availableProcessors();
- value = Integer.valueOf(Math.max(24, cpus * 2));
+ value = Platform.computeNumberOfThreads(8, 1.0f);
logger.debug(INFO_ERGONOMIC_SIZING_OF_JE_CLEANER_THREADS,
backendId, (Number) value);
}
- else if (attrName.equals(ATTR_NUM_LOCK_TABLES)
- && value == null)
+ else if (attrName.equals(ATTR_NUM_LOCK_TABLES))
{
- // Automatically choose based on the number of processors.
- // We'll assume that the user has also allowed automatic
- // configuration of cleaners and workers.
- int cpus = Runtime.getRuntime().availableProcessors();
- int cleaners = Math.max(24, cpus * 2);
- int workers = Math.max(24, cpus * 2);
- BigInteger tmp = BigInteger.valueOf((cleaners + workers) * 2);
+ // Automatically choose based on the number of processors. We'll assume that the user has also allowed
+ // automatic configuration of cleaners and workers.
+ BigInteger tmp = BigInteger.valueOf(Platform.computeNumberOfThreads(1, 2));
value = tmp.nextProbablePrime();
logger.debug(INFO_ERGONOMIC_SIZING_OF_JE_LOCK_TABLES, backendId, (Number) value);
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java
index 3b9ecd6..3e66f86 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -81,6 +81,7 @@
import org.opends.server.types.operation.PreOperationDeleteOperation;
import org.opends.server.types.operation.PreOperationModifyDNOperation;
import org.opends.server.types.operation.PreOperationModifyOperation;
+import org.opends.server.util.Platform;
/**
* This class is used to load the Replication code inside the JVM
@@ -277,7 +278,7 @@
// number of replay threads is changed and apply changes.
cfg.addReplicationChangeListener(this);
- replayThreadNumber = cfg.getNumUpdateReplayThreads();
+ replayThreadNumber = getNumberOfReplayThreadsOrDefault(cfg);
connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE);
// Create the list of domains that are already defined.
@@ -303,6 +304,12 @@
ReplicationRepairRequestControl.OID_REPLICATION_REPAIR_CONTROL);
}
+ private int getNumberOfReplayThreadsOrDefault(ReplicationSynchronizationProviderCfg cfg)
+ {
+ Integer value = cfg.getNumUpdateReplayThreads();
+ return value == null ? Platform.computeNumberOfThreads(16, 1.5f) : value;
+ }
+
/**
* Create the threads that will wait for incoming update messages.
*/
@@ -780,16 +787,13 @@
return true;
}
- /** {@inheritDoc} */
@Override
- public ConfigChangeResult applyConfigurationChange(
- ReplicationSynchronizationProviderCfg configuration)
+ public ConfigChangeResult applyConfigurationChange(ReplicationSynchronizationProviderCfg configuration)
{
- int numUpdateRepayThread = configuration.getNumUpdateReplayThreads();
// Stop threads then restart new number of threads
stopReplayThreads();
- replayThreadNumber = numUpdateRepayThread;
+ replayThreadNumber = getNumberOfReplayThreadsOrDefault(configuration);
if (!domains.isEmpty())
{
createReplayThreads();
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java b/opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java
index 3d07de1..0a3c247 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/util/Platform.java
@@ -48,6 +48,7 @@
import java.lang.reflect.Method;
import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.util.Reject;
import static org.opends.messages.UtilityMessages.*;
import static org.opends.server.util.ServerConstants.CERTANDKEYGEN_PROVIDER;
@@ -609,4 +610,20 @@
{
return IMPL.getUsableMemoryForCaching();
}
+
+ /**
+ * Computes the number of replay/worker/cleaner threads based on the number of cpus in the system.
+ * Allows for a multiplier to be specified and a minimum value to be returned if not enough processors
+ * are present in the system.
+ *
+ * @param minimumValue at least this value should be returned.
+ * @param cpuMultiplier the scaling multiplier of the number of threads to return
+ * @return the number of threads based on the number of cpus in the system.
+ * @throws IllegalArgumentException if {@code cpuMultiplier} is a non positive number
+ */
+ public static int computeNumberOfThreads(int minimumValue, float cpuMultiplier)
+ {
+ Reject.ifTrue(cpuMultiplier < 0, "Multiplier must be a positive number");
+ return Math.max(minimumValue, (int)(Runtime.getRuntime().availableProcessors() * cpuMultiplier));
+ }
}
--
Gitblit v1.10.0