mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
22.22.2009 36191b70a96c298ad07cf9a9384cc42764ea957e
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -43,6 +43,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -65,12 +67,14 @@
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.RestoreConfig;
import org.opends.server.types.ResultCode;
import org.opends.server.util.LDIFReader;
import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
@@ -1094,4 +1098,115 @@
  {
    return replicationPort;
  }
  // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
  private long monitorDataLifeTime = 500;
  /* The date of the last time they have been elaborated */
  private long monitorDataLastBuildDate = 0;
  /* Search op on monitor data is processed by a worker thread.
   * Requests are sent to the other RS,and responses are received by the
   * listener threads.
   * The worker thread is awoke on this semaphore, or on timeout.
   */
  Semaphore remoteMonitorResponsesSemaphore = new Semaphore(0);
  /**
   * Trigger the computation of the Global Monitoring Data.
   * This should be called by all the MonitorProviders that need
   * the global monitoring data to be updated before they can
   * publish their information to cn=monitor.
   *
   * This method will trigger the update of all the global monitoring
   * information of all the base-DNs of this replication Server.
   *
   * @throws DirectoryException If the computation cannot be achieved.
   */
  public void computeMonitorData() throws DirectoryException
  {
    if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime())
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + getMonitorInstanceName() + " getRemoteMonitorData in cache");
      // The current data are still valid. No need to renew them.
      return;
    }
    remoteMonitorResponsesSemaphore.drainPermits();
    int count = 0;
    for (ReplicationServerDomain domain : baseDNs.values())
    {
      count += domain.initializeMonitorData();
    }
    // Wait for responses
    waitMonitorDataResponses(count);
    for (ReplicationServerDomain domain : baseDNs.values())
    {
      domain.completeMonitorData();
    }
  }
  /**
   * Wait for the expected count of received MonitorMsg.
   * @param expectedResponses The number of expected answers.
   * @throws DirectoryException When an error occurs.
   */
  private void waitMonitorDataResponses(int expectedResponses)
    throws DirectoryException
  {
    try
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + getMonitorInstanceName() + " baseDn=" +
          " waiting for " + expectedResponses + " expected monitor messages");
      boolean allPermitsAcquired =
        remoteMonitorResponsesSemaphore.tryAcquire(
        expectedResponses,
        (long) 5000, TimeUnit.MILLISECONDS);
      if (!allPermitsAcquired)
      {
        monitorDataLastBuildDate = TimeThread.getTime();
        logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
      // let's go on in best effort even with limited data received.
      } else
      {
        monitorDataLastBuildDate = TimeThread.getTime();
        if (debugEnabled())
          TRACER.debugInfo(
            "In " + getMonitorInstanceName() + " baseDn=" +
            " Successfully received all " + expectedResponses +
            " expected monitor messages");
      }
    } catch (Exception e)
    {
      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
    }
  }
  /**
   * This should be called by each ReplicationServerDomain that receives
   * a response to a monitor request message.
   */
  public void responseReceived()
  {
    remoteMonitorResponsesSemaphore.release();
  }
  /**
   * This should be called when the Monitoring has failed and the
   * Worker thread that is waiting for the result should be awaken.
   */
  public void responseReceivedAll()
  {
    remoteMonitorResponsesSemaphore.notifyAll();
  }
}