mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
19.27.2013 c7077670daca3b689ed75e4bf71dad0483af8473
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -34,6 +34,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.opends.messages.Category;
@@ -75,17 +76,22 @@
public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg>
{
  private final String baseDn;
  /**
   * The Status analyzer that periodically verifies whether the connected DSs
   * are late.
   * are late. Using an AtomicReference to avoid leaking references to costly
   * threads.
   */
  private StatusAnalyzer statusAnalyzer = null;
  private AtomicReference<StatusAnalyzer> statusAnalyzer =
      new AtomicReference<StatusAnalyzer>();
  /**
   * The monitoring publisher that periodically sends monitoring messages to the
   * topology.
   * topology. Using an AtomicReference to avoid leaking references to costly
   * threads.
   */
  private MonitoringPublisher monitoringPublisher = null;
  private AtomicReference<MonitoringPublisher> monitoringPublisher =
      new AtomicReference<MonitoringPublisher>();
  /**
   * The following map contains one balanced tree for each replica ID to which
@@ -2853,14 +2859,15 @@
   */
  public void startStatusAnalyzer()
  {
    if (!isRunningStatusAnalyzer())
    {
      int degradedStatusThreshold =
    int degradedStatusThreshold =
        localReplicationServer.getDegradedStatusThreshold();
      if (degradedStatusThreshold > 0) // 0 means no status analyzer
    if (degradedStatusThreshold > 0) // 0 means no status analyzer
    {
      final StatusAnalyzer thread =
          new StatusAnalyzer(this, degradedStatusThreshold);
      if (statusAnalyzer.compareAndSet(null, thread))
      {
        statusAnalyzer = new StatusAnalyzer(this, degradedStatusThreshold);
        statusAnalyzer.start();
        thread.start();
      }
    }
  }
@@ -2870,35 +2877,26 @@
   */
  private void stopStatusAnalyzer()
  {
    if (isRunningStatusAnalyzer())
    final StatusAnalyzer thread = statusAnalyzer.get();
    if (statusAnalyzer.compareAndSet(thread, null))
    {
      statusAnalyzer.shutdown();
      statusAnalyzer.waitForShutdown();
      statusAnalyzer = null;
      thread.shutdown();
      thread.waitForShutdown();
    }
  }
  /**
   * Tests if the status analyzer for this domain is running.
   * @return True if the status analyzer is running, false otherwise.
   */
  private boolean isRunningStatusAnalyzer()
  {
    return statusAnalyzer != null;
  }
  /**
   * Starts the monitoring publisher for the domain if not already started.
   */
  public void startMonitoringPublisher()
  {
    if (!isRunningMonitoringPublisher())
    long period = localReplicationServer.getMonitoringPublisherPeriod();
    if (period > 0) // 0 means no monitoring publisher
    {
      long period = localReplicationServer.getMonitoringPublisherPeriod();
      if (period > 0) // 0 means no monitoring publisher
      final MonitoringPublisher thread = new MonitoringPublisher(this, period);
      if (monitoringPublisher.compareAndSet(null, thread))
      {
        monitoringPublisher = new MonitoringPublisher(this, period);
        monitoringPublisher.start();
        thread.start();
      }
    }
  }
@@ -2908,24 +2906,15 @@
   */
  private void stopMonitoringPublisher()
  {
    if (isRunningMonitoringPublisher())
    final MonitoringPublisher thread = monitoringPublisher.get();
    if (monitoringPublisher.compareAndSet(thread, null))
    {
      monitoringPublisher.shutdown();
      monitoringPublisher.waitForShutdown();
      monitoringPublisher = null;
      thread.shutdown();
      thread.waitForShutdown();
    }
  }
  /**
   * Tests if the monitoring publisher for this domain is running.
   * @return True if the monitoring publisher is running, false otherwise.
   */
  private boolean isRunningMonitoringPublisher()
  {
    return monitoringPublisher != null;
  }
  /**
   * {@inheritDoc}
   */
  @Override
@@ -3360,10 +3349,13 @@
    {
      // Requested to stop analyzers
      stopStatusAnalyzer();
      return;
    }
    else if (isRunningStatusAnalyzer())
    final StatusAnalyzer saThread = statusAnalyzer.get();
    if (saThread != null) // it is running
    {
      statusAnalyzer.setDegradedStatusThreshold(degradedStatusThreshold);
      saThread.setDegradedStatusThreshold(degradedStatusThreshold);
    }
    else if (getConnectedDSs().size() > 0)
    {
@@ -3384,10 +3376,13 @@
    {
      // Requested to stop monitoring publishers
      stopMonitoringPublisher();
      return;
    }
    else if (isRunningMonitoringPublisher())
    final MonitoringPublisher mpThread = monitoringPublisher.get();
    if (mpThread != null) // it is running
    {
      monitoringPublisher.setPeriod(period);
      mpThread.setPeriod(period);
    }
    else if (getConnectedDSs().size() > 0 || getConnectedRSs().size() > 0)
    {