| | |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.concurrent.CountDownLatch; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | |
| | | import org.opends.messages.Category; |
| | |
| | | public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg> |
| | | { |
| | | private final String baseDn; |
| | | |
| | | /** |
| | | * The Status analyzer that periodically verifies whether the connected DSs |
| | | * are late. |
| | | * are late. Using an AtomicReference to avoid leaking references to costly |
| | | * threads. |
| | | */ |
| | | private StatusAnalyzer statusAnalyzer = null; |
| | | private AtomicReference<StatusAnalyzer> statusAnalyzer = |
| | | new AtomicReference<StatusAnalyzer>(); |
| | | |
| | | /** |
| | | * The monitoring publisher that periodically sends monitoring messages to the |
| | | * topology. |
| | | * topology. Using an AtomicReference to avoid leaking references to costly |
| | | * threads. |
| | | */ |
| | | private MonitoringPublisher monitoringPublisher = null; |
| | | private AtomicReference<MonitoringPublisher> monitoringPublisher = |
| | | new AtomicReference<MonitoringPublisher>(); |
| | | |
| | | /** |
| | | * The following map contains one balanced tree for each replica ID to which |
| | |
| | | */ |
| | | public void startStatusAnalyzer() |
| | | { |
| | | if (!isRunningStatusAnalyzer()) |
| | | { |
| | | int degradedStatusThreshold = |
| | | int degradedStatusThreshold = |
| | | localReplicationServer.getDegradedStatusThreshold(); |
| | | if (degradedStatusThreshold > 0) // 0 means no status analyzer |
| | | if (degradedStatusThreshold > 0) // 0 means no status analyzer |
| | | { |
| | | final StatusAnalyzer thread = |
| | | new StatusAnalyzer(this, degradedStatusThreshold); |
| | | if (statusAnalyzer.compareAndSet(null, thread)) |
| | | { |
| | | statusAnalyzer = new StatusAnalyzer(this, degradedStatusThreshold); |
| | | statusAnalyzer.start(); |
| | | thread.start(); |
| | | } |
| | | } |
| | | } |
| | |
| | | */ |
| | | private void stopStatusAnalyzer() |
| | | { |
| | | if (isRunningStatusAnalyzer()) |
| | | final StatusAnalyzer thread = statusAnalyzer.get(); |
| | | if (statusAnalyzer.compareAndSet(thread, null)) |
| | | { |
| | | statusAnalyzer.shutdown(); |
| | | statusAnalyzer.waitForShutdown(); |
| | | statusAnalyzer = null; |
| | | thread.shutdown(); |
| | | thread.waitForShutdown(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Tests if the status analyzer for this domain is running. |
| | | * @return True if the status analyzer is running, false otherwise. |
| | | */ |
| | | private boolean isRunningStatusAnalyzer() |
| | | { |
| | | return statusAnalyzer != null; |
| | | } |
| | | |
| | | /** |
| | | * Starts the monitoring publisher for the domain if not already started. |
| | | */ |
| | | public void startMonitoringPublisher() |
| | | { |
| | | if (!isRunningMonitoringPublisher()) |
| | | long period = localReplicationServer.getMonitoringPublisherPeriod(); |
| | | if (period > 0) // 0 means no monitoring publisher |
| | | { |
| | | long period = localReplicationServer.getMonitoringPublisherPeriod(); |
| | | if (period > 0) // 0 means no monitoring publisher |
| | | final MonitoringPublisher thread = new MonitoringPublisher(this, period); |
| | | if (monitoringPublisher.compareAndSet(null, thread)) |
| | | { |
| | | monitoringPublisher = new MonitoringPublisher(this, period); |
| | | monitoringPublisher.start(); |
| | | thread.start(); |
| | | } |
| | | } |
| | | } |
| | |
| | | */ |
| | | private void stopMonitoringPublisher() |
| | | { |
| | | if (isRunningMonitoringPublisher()) |
| | | final MonitoringPublisher thread = monitoringPublisher.get(); |
| | | if (monitoringPublisher.compareAndSet(thread, null)) |
| | | { |
| | | monitoringPublisher.shutdown(); |
| | | monitoringPublisher.waitForShutdown(); |
| | | monitoringPublisher = null; |
| | | thread.shutdown(); |
| | | thread.waitForShutdown(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Tests if the monitoring publisher for this domain is running. |
| | | * @return True if the monitoring publisher is running, false otherwise. |
| | | */ |
| | | private boolean isRunningMonitoringPublisher() |
| | | { |
| | | return monitoringPublisher != null; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | |
| | | { |
| | | // Requested to stop analyzers |
| | | stopStatusAnalyzer(); |
| | | return; |
| | | } |
| | | else if (isRunningStatusAnalyzer()) |
| | | |
| | | final StatusAnalyzer saThread = statusAnalyzer.get(); |
| | | if (saThread != null) // it is running |
| | | { |
| | | statusAnalyzer.setDegradedStatusThreshold(degradedStatusThreshold); |
| | | saThread.setDegradedStatusThreshold(degradedStatusThreshold); |
| | | } |
| | | else if (getConnectedDSs().size() > 0) |
| | | { |
| | |
| | | { |
| | | // Requested to stop monitoring publishers |
| | | stopMonitoringPublisher(); |
| | | return; |
| | | } |
| | | else if (isRunningMonitoringPublisher()) |
| | | |
| | | final MonitoringPublisher mpThread = monitoringPublisher.get(); |
| | | if (mpThread != null) // it is running |
| | | { |
| | | monitoringPublisher.setPeriod(period); |
| | | mpThread.setPeriod(period); |
| | | } |
| | | else if (getConnectedDSs().size() > 0 || getConnectedRSs().size() > 0) |
| | | { |