From c7077670daca3b689ed75e4bf71dad0483af8473 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 19 Aug 2013 13:27:40 +0000
Subject: [PATCH] Avoided possible costly thread leaks in ReplicationServerDomain.

---
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |   85 ++++++++++++++++++++----------------------
 1 files changed, 40 insertions(+), 45 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 099f608..5b8859a 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -34,6 +34,7 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.opends.messages.Category;
@@ -75,17 +76,22 @@
 public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg>
 {
   private final String baseDn;
+
   /**
    * The Status analyzer that periodically verifies whether the connected DSs
-   * are late.
+   * are late. Using an AtomicReference to avoid leaking references to costly
+   * threads.
    */
-  private StatusAnalyzer statusAnalyzer = null;
+  private AtomicReference<StatusAnalyzer> statusAnalyzer =
+      new AtomicReference<StatusAnalyzer>();
 
   /**
    * The monitoring publisher that periodically sends monitoring messages to the
-   * topology.
+   * topology. Using an AtomicReference to avoid leaking references to costly
+   * threads.
    */
-  private MonitoringPublisher monitoringPublisher = null;
+  private AtomicReference<MonitoringPublisher> monitoringPublisher =
+      new AtomicReference<MonitoringPublisher>();
 
   /**
    * The following map contains one balanced tree for each replica ID to which
@@ -2853,14 +2859,15 @@
    */
   public void startStatusAnalyzer()
   {
-    if (!isRunningStatusAnalyzer())
-    {
-      int degradedStatusThreshold =
+    int degradedStatusThreshold =
         localReplicationServer.getDegradedStatusThreshold();
-      if (degradedStatusThreshold > 0) // 0 means no status analyzer
+    if (degradedStatusThreshold > 0) // 0 means no status analyzer
+    {
+      final StatusAnalyzer thread =
+          new StatusAnalyzer(this, degradedStatusThreshold);
+      if (statusAnalyzer.compareAndSet(null, thread))
       {
-        statusAnalyzer = new StatusAnalyzer(this, degradedStatusThreshold);
-        statusAnalyzer.start();
+        thread.start();
       }
     }
   }
@@ -2870,35 +2877,26 @@
    */
   private void stopStatusAnalyzer()
   {
-    if (isRunningStatusAnalyzer())
+    final StatusAnalyzer thread = statusAnalyzer.get();
+    if (statusAnalyzer.compareAndSet(thread, null))
     {
-      statusAnalyzer.shutdown();
-      statusAnalyzer.waitForShutdown();
-      statusAnalyzer = null;
+      thread.shutdown();
+      thread.waitForShutdown();
     }
   }
 
   /**
-   * Tests if the status analyzer for this domain is running.
-   * @return True if the status analyzer is running, false otherwise.
-   */
-  private boolean isRunningStatusAnalyzer()
-  {
-    return statusAnalyzer != null;
-  }
-
-  /**
    * Starts the monitoring publisher for the domain if not already started.
    */
   public void startMonitoringPublisher()
   {
-    if (!isRunningMonitoringPublisher())
+    long period = localReplicationServer.getMonitoringPublisherPeriod();
+    if (period > 0) // 0 means no monitoring publisher
     {
-      long period = localReplicationServer.getMonitoringPublisherPeriod();
-      if (period > 0) // 0 means no monitoring publisher
+      final MonitoringPublisher thread = new MonitoringPublisher(this, period);
+      if (monitoringPublisher.compareAndSet(null, thread))
       {
-        monitoringPublisher = new MonitoringPublisher(this, period);
-        monitoringPublisher.start();
+        thread.start();
       }
     }
   }
@@ -2908,24 +2906,15 @@
    */
   private void stopMonitoringPublisher()
   {
-    if (isRunningMonitoringPublisher())
+    final MonitoringPublisher thread = monitoringPublisher.get();
+    if (monitoringPublisher.compareAndSet(thread, null))
     {
-      monitoringPublisher.shutdown();
-      monitoringPublisher.waitForShutdown();
-      monitoringPublisher = null;
+      thread.shutdown();
+      thread.waitForShutdown();
     }
   }
 
   /**
-   * Tests if the monitoring publisher for this domain is running.
-   * @return True if the monitoring publisher is running, false otherwise.
-   */
-  private boolean isRunningMonitoringPublisher()
-  {
-    return monitoringPublisher != null;
-  }
-
-  /**
    * {@inheritDoc}
    */
   @Override
@@ -3360,10 +3349,13 @@
     {
       // Requested to stop analyzers
       stopStatusAnalyzer();
+      return;
     }
-    else if (isRunningStatusAnalyzer())
+
+    final StatusAnalyzer saThread = statusAnalyzer.get();
+    if (saThread != null) // it is running
     {
-      statusAnalyzer.setDegradedStatusThreshold(degradedStatusThreshold);
+      saThread.setDegradedStatusThreshold(degradedStatusThreshold);
     }
     else if (getConnectedDSs().size() > 0)
     {
@@ -3384,10 +3376,13 @@
     {
       // Requested to stop monitoring publishers
       stopMonitoringPublisher();
+      return;
     }
-    else if (isRunningMonitoringPublisher())
+
+    final MonitoringPublisher mpThread = monitoringPublisher.get();
+    if (mpThread != null) // it is running
     {
-      monitoringPublisher.setPeriod(period);
+      mpThread.setPeriod(period);
     }
     else if (getConnectedDSs().size() > 0 || getConnectedRSs().size() > 0)
     {

--
Gitblit v1.10.0