From c7077670daca3b689ed75e4bf71dad0483af8473 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 19 Aug 2013 13:27:40 +0000
Subject: [PATCH] Avoided possible costly thread leaks in ReplicationServerDomain.
---
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 85 ++++++++++++++++++++----------------------
1 files changed, 40 insertions(+), 45 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 099f608..5b8859a 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -34,6 +34,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.opends.messages.Category;
@@ -75,17 +76,22 @@
public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg>
{
private final String baseDn;
+
/**
* The Status analyzer that periodically verifies whether the connected DSs
- * are late.
+ * are late. Using an AtomicReference to avoid leaking references to costly
+ * threads.
*/
- private StatusAnalyzer statusAnalyzer = null;
+ private AtomicReference<StatusAnalyzer> statusAnalyzer =
+ new AtomicReference<StatusAnalyzer>();
/**
* The monitoring publisher that periodically sends monitoring messages to the
- * topology.
+ * topology. Using an AtomicReference to avoid leaking references to costly
+ * threads.
*/
- private MonitoringPublisher monitoringPublisher = null;
+ private AtomicReference<MonitoringPublisher> monitoringPublisher =
+ new AtomicReference<MonitoringPublisher>();
/**
* The following map contains one balanced tree for each replica ID to which
@@ -2853,14 +2859,15 @@
*/
public void startStatusAnalyzer()
{
- if (!isRunningStatusAnalyzer())
- {
- int degradedStatusThreshold =
+ int degradedStatusThreshold =
localReplicationServer.getDegradedStatusThreshold();
- if (degradedStatusThreshold > 0) // 0 means no status analyzer
+ if (degradedStatusThreshold > 0) // 0 means no status analyzer
+ {
+ final StatusAnalyzer thread =
+ new StatusAnalyzer(this, degradedStatusThreshold);
+ if (statusAnalyzer.compareAndSet(null, thread))
{
- statusAnalyzer = new StatusAnalyzer(this, degradedStatusThreshold);
- statusAnalyzer.start();
+ thread.start();
}
}
}
@@ -2870,35 +2877,26 @@
*/
private void stopStatusAnalyzer()
{
- if (isRunningStatusAnalyzer())
+ final StatusAnalyzer thread = statusAnalyzer.get();
+ if (statusAnalyzer.compareAndSet(thread, null))
{
- statusAnalyzer.shutdown();
- statusAnalyzer.waitForShutdown();
- statusAnalyzer = null;
+ thread.shutdown();
+ thread.waitForShutdown();
}
}
/**
- * Tests if the status analyzer for this domain is running.
- * @return True if the status analyzer is running, false otherwise.
- */
- private boolean isRunningStatusAnalyzer()
- {
- return statusAnalyzer != null;
- }
-
- /**
* Starts the monitoring publisher for the domain if not already started.
*/
public void startMonitoringPublisher()
{
- if (!isRunningMonitoringPublisher())
+ long period = localReplicationServer.getMonitoringPublisherPeriod();
+ if (period > 0) // 0 means no monitoring publisher
{
- long period = localReplicationServer.getMonitoringPublisherPeriod();
- if (period > 0) // 0 means no monitoring publisher
+ final MonitoringPublisher thread = new MonitoringPublisher(this, period);
+ if (monitoringPublisher.compareAndSet(null, thread))
{
- monitoringPublisher = new MonitoringPublisher(this, period);
- monitoringPublisher.start();
+ thread.start();
}
}
}
@@ -2908,24 +2906,15 @@
*/
private void stopMonitoringPublisher()
{
- if (isRunningMonitoringPublisher())
+ final MonitoringPublisher thread = monitoringPublisher.get();
+ if (monitoringPublisher.compareAndSet(thread, null))
{
- monitoringPublisher.shutdown();
- monitoringPublisher.waitForShutdown();
- monitoringPublisher = null;
+ thread.shutdown();
+ thread.waitForShutdown();
}
}
/**
- * Tests if the monitoring publisher for this domain is running.
- * @return True if the monitoring publisher is running, false otherwise.
- */
- private boolean isRunningMonitoringPublisher()
- {
- return monitoringPublisher != null;
- }
-
- /**
* {@inheritDoc}
*/
@Override
@@ -3360,10 +3349,13 @@
{
// Requested to stop analyzers
stopStatusAnalyzer();
+ return;
}
- else if (isRunningStatusAnalyzer())
+
+ final StatusAnalyzer saThread = statusAnalyzer.get();
+ if (saThread != null) // it is running
{
- statusAnalyzer.setDegradedStatusThreshold(degradedStatusThreshold);
+ saThread.setDegradedStatusThreshold(degradedStatusThreshold);
}
else if (getConnectedDSs().size() > 0)
{
@@ -3384,10 +3376,13 @@
{
// Requested to stop monitoring publishers
stopMonitoringPublisher();
+ return;
}
- else if (isRunningMonitoringPublisher())
+
+ final MonitoringPublisher mpThread = monitoringPublisher.get();
+ if (mpThread != null) // it is running
{
- monitoringPublisher.setPeriod(period);
+ mpThread.setPeriod(period);
}
else if (getConnectedDSs().size() > 0 || getConnectedRSs().size() > 0)
{
--
Gitblit v1.10.0