From eb1275dc2c85f9e3db21bfd5a151d4bc0740d336 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Fri, 18 Jan 2008 10:18:24 +0000
Subject: [PATCH] Fix for 1288: Synchronization plugin should not create 10 listener threads for each baseDn
---
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java | 114 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
1 files changed, 112 insertions(+), 2 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index 001484d..a69f1b4 100644
--- a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -22,19 +22,22 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.plugin;
+import java.util.ArrayList;
import static org.opends.server.replication.plugin.
ReplicationRepairRequestControl.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationAddListener;
+import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.server.ConfigurationDeleteListener;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
@@ -82,6 +85,8 @@
extends SynchronizationProvider<ReplicationSynchronizationProviderCfg>
implements ConfigurationAddListener<ReplicationDomainCfg>,
ConfigurationDeleteListener<ReplicationDomainCfg>,
+ ConfigurationChangeListener
+ <ReplicationSynchronizationProviderCfg>,
BackupTaskListener, RestoreTaskListener, ImportTaskListener,
ExportTaskListener
{
@@ -89,6 +94,23 @@
private static Map<DN, ReplicationDomain> domains =
new HashMap<DN, ReplicationDomain>() ;
+ /**
+ * The queue of received update messages, to be treated by the ReplayThread
+ * threads.
+ */
+ private static LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue =
+ new LinkedBlockingQueue<UpdateToReplay>();
+
+ /**
+ * The list of ReplayThread threads.
+ */
+ private static List<ReplayThread> replayThreads =
+ new ArrayList<ReplayThread>();
+
+ /**
+ * The configurable number of replay threads.
+ */
+ private static int replayThreadNumber = 10;
/**
* Finds the domain for a given DN.
@@ -167,7 +189,16 @@
throws ConfigException
{
ReplicationDomain domain;
- domain = new ReplicationDomain(configuration);
+ domain = new ReplicationDomain(configuration, updateToReplayQueue);
+
+ if (domains.size() == 0)
+ {
+ /*
+ * Create the threads that will process incoming update messages
+ */
+ createReplayThreads();
+ }
+
domains.put(domain.getBaseDN(), domain);
domain.start();
return domain;
@@ -180,6 +211,12 @@
public static void deleteDomain(DN dn)
{
ReplicationDomain domain = domains.remove(dn);
+
+ // No replay threads running if no replication need
+ if (domains.size() == 0) {
+ stopReplayThreads();
+ }
+
if (domain != null)
domain.shutdown();
}
@@ -200,6 +237,12 @@
configuration.addReplicationDomainAddListener(this);
configuration.addReplicationDomainDeleteListener(this);
+ // Register as a root configuration listener so that we can be notified if
+ // number of replay threads is changed and apply changes.
+ configuration.addReplicationChangeListener(this);
+
+ replayThreadNumber = configuration.getNumUpdateReplayThreads();
+
// Create the list of domains that are already defined.
for (String name : configuration.listReplicationDomains())
{
@@ -228,6 +271,39 @@
}
/**
+ * Create the threads that will wait for incoming update messages.
+ */
+ private synchronized static void createReplayThreads()
+ {
+ replayThreads.clear();
+
+ for (int i = 0; i < replayThreadNumber; i++)
+ {
+ ReplayThread replayThread = new ReplayThread(updateToReplayQueue);
+ replayThread.start();
+ replayThreads.add(replayThread);
+ }
+ }
+
+ /**
+ * Stope the threads that are waiting for incoming update messages.
+ */
+ private synchronized static void stopReplayThreads()
+ {
+ // stop the replay threads
+ for (ReplayThread replayThread : replayThreads)
+ {
+ replayThread.shutdown();
+ }
+
+ for (ReplayThread replayThread : replayThreads)
+ {
+ replayThread.waitForShutdown();
+ }
+ replayThreads.clear();
+ }
+
+ /**
* {@inheritDoc}
*/
public boolean isConfigurationAddAcceptable(
@@ -431,6 +507,9 @@
@Override
public void finalizeSynchronizationProvider()
{
+ // Stop replay threads
+ stopReplayThreads();
+
// shutdown all the domains
for (ReplicationDomain domain : domains.values())
{
@@ -621,6 +700,37 @@
{
return replicationServerListener;
}
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean
+ isConfigurationChangeAcceptable(ReplicationSynchronizationProviderCfg
+ configuration,
+ List<Message> unacceptableReasons)
+ {
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public ConfigChangeResult
+ applyConfigurationChange
+ (ReplicationSynchronizationProviderCfg configuration)
+ {
+ int numUpdateRepayThread = configuration.getNumUpdateReplayThreads();
+
+ // Stop threads then restart new number of threads
+ stopReplayThreads();
+ replayThreadNumber = numUpdateRepayThread;
+ if (domains.size() > 0)
+ {
+ createReplayThreads();
+ }
+
+ return new ConfigChangeResult(ResultCode.SUCCESS, false);
+ }
}
--
Gitblit v1.10.0