From eb1275dc2c85f9e3db21bfd5a151d4bc0740d336 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Fri, 18 Jan 2008 10:18:24 +0000
Subject: [PATCH] Fix for 1288: Synchronization plugin should not create 10 listener threads for each baseDn

---
 opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java |  114 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 112 insertions(+), 2 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index 001484d..a69f1b4 100644
--- a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -22,19 +22,22 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.plugin;
 
+import java.util.ArrayList;
 import static org.opends.server.replication.plugin.
 ReplicationRepairRequestControl.*;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.opends.messages.Message;
 import org.opends.server.admin.server.ConfigurationAddListener;
+import org.opends.server.admin.server.ConfigurationChangeListener;
 import org.opends.server.admin.server.ConfigurationDeleteListener;
 import org.opends.server.admin.std.server.ReplicationDomainCfg;
 import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
@@ -82,6 +85,8 @@
        extends SynchronizationProvider<ReplicationSynchronizationProviderCfg>
        implements ConfigurationAddListener<ReplicationDomainCfg>,
                   ConfigurationDeleteListener<ReplicationDomainCfg>,
+                  ConfigurationChangeListener
+                  <ReplicationSynchronizationProviderCfg>,
                   BackupTaskListener, RestoreTaskListener, ImportTaskListener,
                   ExportTaskListener
 {
@@ -89,6 +94,23 @@
   private static Map<DN, ReplicationDomain> domains =
     new HashMap<DN, ReplicationDomain>() ;
 
+  /**
+   * The queue of received update messages, to be treated by the ReplayThread
+   * threads.
+   */
+  private static LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue =
+    new LinkedBlockingQueue<UpdateToReplay>();
+
+  /**
+   * The list of ReplayThread threads.
+   */
+  private static List<ReplayThread> replayThreads =
+    new ArrayList<ReplayThread>();
+
+  /**
+   * The configurable number of replay threads.
+   */
+  private static int replayThreadNumber = 10;
 
   /**
    * Finds the domain for a given DN.
@@ -167,7 +189,16 @@
       throws ConfigException
   {
     ReplicationDomain domain;
-    domain = new ReplicationDomain(configuration);
+    domain = new ReplicationDomain(configuration, updateToReplayQueue);
+
+    if (domains.size() == 0)
+    {
+      /*
+       * Create the threads that will process incoming update messages
+       */
+      createReplayThreads();
+    }
+
     domains.put(domain.getBaseDN(), domain);
     domain.start();
     return domain;
@@ -180,6 +211,12 @@
   public static void deleteDomain(DN dn)
   {
     ReplicationDomain domain = domains.remove(dn);
+
+    // No replay threads running if no replication need
+    if (domains.size() == 0) {
+      stopReplayThreads();
+    }
+
     if (domain != null)
       domain.shutdown();
   }
@@ -200,6 +237,12 @@
     configuration.addReplicationDomainAddListener(this);
     configuration.addReplicationDomainDeleteListener(this);
 
+    // Register as a root configuration listener so that we can be notified if
+    // number of replay threads is changed and apply changes.
+    configuration.addReplicationChangeListener(this);
+
+    replayThreadNumber = configuration.getNumUpdateReplayThreads();
+
     //  Create the list of domains that are already defined.
     for (String name : configuration.listReplicationDomains())
     {
@@ -228,6 +271,39 @@
   }
 
   /**
+   * Create the threads that will wait for incoming update messages.
+   */
+  private synchronized static void createReplayThreads()
+  {
+    replayThreads.clear();
+
+    for (int i = 0; i < replayThreadNumber; i++)
+    {
+      ReplayThread replayThread = new ReplayThread(updateToReplayQueue);
+      replayThread.start();
+      replayThreads.add(replayThread);
+    }
+  }
+
+  /**
+   * Stope the threads that are waiting for incoming update messages.
+   */
+  private synchronized static void stopReplayThreads()
+  {
+    //  stop the replay threads
+    for (ReplayThread replayThread : replayThreads)
+    {
+      replayThread.shutdown();
+    }
+
+    for (ReplayThread replayThread : replayThreads)
+    {
+      replayThread.waitForShutdown();
+    }
+    replayThreads.clear();
+  }
+
+  /**
    * {@inheritDoc}
    */
   public boolean isConfigurationAddAcceptable(
@@ -431,6 +507,9 @@
   @Override
   public void finalizeSynchronizationProvider()
   {
+    // Stop replay threads
+    stopReplayThreads();
+
     // shutdown all the domains
     for (ReplicationDomain domain : domains.values())
     {
@@ -621,6 +700,37 @@
   {
     return replicationServerListener;
   }
+
+  /**
+   * {@inheritDoc}
+   */
+  public boolean
+    isConfigurationChangeAcceptable(ReplicationSynchronizationProviderCfg
+    configuration,
+    List<Message> unacceptableReasons)
+  {
+    return true;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public ConfigChangeResult
+    applyConfigurationChange
+    (ReplicationSynchronizationProviderCfg configuration)
+  {
+    int numUpdateRepayThread = configuration.getNumUpdateReplayThreads();
+
+    // Stop threads then restart new number of threads
+    stopReplayThreads();
+    replayThreadNumber = numUpdateRepayThread;
+    if (domains.size() > 0)
+    {
+      createReplayThreads();
+    }
+
+    return new ConfigChangeResult(ResultCode.SUCCESS, false);
+  }
 }
 
 

--
Gitblit v1.10.0