mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
22.22.2009 2d7788a781b8a7ce58b7ec9f8a7a1ae9418ee28b
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -47,7 +47,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.Iterator;
@@ -65,7 +64,6 @@
import org.opends.server.types.Attributes;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
import java.util.Timer;
import java.util.TimerTask;
@@ -146,20 +144,12 @@
  /* Monitor data management */
  // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
  private long monitorDataLifeTime = 500;
  /* Search op on monitor data is processed by a worker thread.
   * Requests are sent to the other RS,and responses are received by the
   * listener threads.
   * The worker thread is awoke on this semaphore, or on timeout.
   */
  Semaphore remoteMonitorResponsesSemaphore;
  /**
   * The monitor data consolidated over the topology.
   */
  private MonitorData monitorData = new MonitorData();
  private MonitorData wrkMonitorData;
  private Object monitorDataLock = new Object();
  /**
   * The needed info for each received assured update message we are waiting
@@ -2255,23 +2245,30 @@
  synchronized protected MonitorData computeMonitorData()
    throws DirectoryException
  {
    if (monitorData.getBuildDate() + monitorDataLifeTime > TimeThread.getTime())
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn + " getRemoteMonitorData in cache");
      // The current data are still valid. No need to renew them.
      return monitorData;
    }
    // Update the monitorData of all domains if this was necessary.
    replicationServer.computeMonitorData();
    return monitorData;
  }
    wrkMonitorData = new MonitorData();
    synchronized (wrkMonitorData)
  /**
   * Start collecting global monitoring information for this
   * ReplicationServerDomain.
   *
   * @return The number of response that should come back.
   *
   * @throws DirectoryException In case the monitoring information could
   *                            not be collected.
   */
  int initializeMonitorData() throws DirectoryException
  {
    synchronized (monitorDataLock)
    {
      wrkMonitorData = new MonitorData();
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn + " Computing monitor data ");
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDn=" + baseDn + " Computing monitor data ");
      // Let's process our directly connected LSes
      // - in the ServerHandler for a given LS1, the stored state contains :
@@ -2299,7 +2296,7 @@
        wrkMonitorData.setMaxCN(serverID, maxcn);
        wrkMonitorData.setLDAPServerState(serverID, directlshState);
        wrkMonitorData.setFirstMissingDate(serverID,
          directlsh.getApproxFirstMissingDate());
            directlsh.getApproxFirstMissingDate());
      }
      // Then initialize the max CN for the LS that produced something
@@ -2319,44 +2316,35 @@
      // and we need the remote ones.
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn + " Local monitor data: " +
          wrkMonitorData.toString());
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDn=" + baseDn + " Local monitor data: " +
            wrkMonitorData.toString());
    }
    // Send Request to the other Replication Servers
    if (remoteMonitorResponsesSemaphore == null)
    {
      remoteMonitorResponsesSemaphore = new Semaphore(0);
      short requestCnt = sendMonitorDataRequest();
      // Wait reponses from them or timeout
      waitMonitorDataResponses(requestCnt);
    } else
    {
      // The processing of renewing the monitor cache is already running
      // We'll make it sleeping until the end
      // TODO: unit test for this case.
      while (remoteMonitorResponsesSemaphore != null)
      {
        waitMonitorDataResponses(1);
      }
    }
    // Send the request for remote monitor data to the
    return sendMonitorDataRequest();
  }
  /**
   * Complete all the calculation when all monitoring information
   * has been received.
   */
  void completeMonitorData()
  {
    wrkMonitorData.completeComputing();
    // Store the new computed data as the reference
    synchronized (monitorData)
    synchronized (monitorDataLock)
    {
      // Now we have the expected answers or an error occurred
      monitorData = wrkMonitorData;
      wrkMonitorData = null;
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn + " *** Computed MonitorData: " +
          monitorData.toString());
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDn=" + baseDn + " *** Computed MonitorData: " +
            monitorData.toString());
    }
    return monitorData;
  }
  /**
@@ -2364,10 +2352,10 @@
   * @return the number of requests sent.
   * @throws DirectoryException when a problem occurs.
   */
  protected short sendMonitorDataRequest()
  protected int sendMonitorDataRequest()
    throws DirectoryException
  {
    short sent = 0;
    int sent = 0;
    try
    {
      for (ServerHandler rs : replicationServers.values())
@@ -2389,49 +2377,6 @@
  }
  /**
   * Wait for the expected count of received MonitorMsg.
   * @param expectedResponses The number of expected answers.
   * @throws DirectoryException When an error occurs.
   */
  protected void waitMonitorDataResponses(int expectedResponses)
    throws DirectoryException
  {
    try
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn +
          " waiting for " + expectedResponses + " expected monitor messages");
      boolean allPermitsAcquired =
        remoteMonitorResponsesSemaphore.tryAcquire(
        expectedResponses,
        (long) 5000, TimeUnit.MILLISECONDS);
      if (!allPermitsAcquired)
      {
        logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
      // let's go on in best effort even with limited data received.
      } else
      {
        if (debugEnabled())
          TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDn=" + baseDn +
            " Successfully received all " + expectedResponses +
            " expected monitor messages");
      }
    } catch (Exception e)
    {
      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
    } finally
    {
      remoteMonitorResponsesSemaphore = null;
    }
  }
  /**
   * Processes a Monitor message receives from a remote Replication Server
   * and stores the data received.
   *
@@ -2442,23 +2387,20 @@
    if (debugEnabled())
      TRACER.debugInfo(
        "In " + this.replicationServer.getMonitorInstanceName() +
        "Receiving " + msg + " from " + msg.getsenderID() +
        remoteMonitorResponsesSemaphore);
    if (remoteMonitorResponsesSemaphore == null)
    {
      // Let's ignore the remote monitor data just received
      // since the computing processing has been ended.
      // An error - probably a timemout - occurred that was already logged
      logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
        Short.toString(msg.getsenderID())));
      return;
    }
        "Receiving " + msg + " from " + msg.getsenderID());
    try
    {
      synchronized (wrkMonitorData)
      synchronized (monitorDataLock)
      {
        if (wrkMonitorData == null)
        {
          // This is a response for an earlier request whose computing is
          // already complete.
          logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
                      Short.toString(msg.getsenderID())));
          return;
        }
        // Here is the RS state : list <serverID, lastChangeNumber>
        // For each LDAP Server, we keep the max CN across the RSes
        ServerState replServerState = msg.getReplServerDbState();
@@ -2523,7 +2465,7 @@
      // Decreases the number of expected responses and potentially
      // wakes up the waiting requestor thread.
      remoteMonitorResponsesSemaphore.release();
      replicationServer.responseReceived();
    } catch (Exception e)
    {
@@ -2532,7 +2474,7 @@
      // If an exception occurs while processing one of the expected message,
      // the processing is aborted and the waiting thread is awoke.
      remoteMonitorResponsesSemaphore.notifyAll();
      replicationServer.responseReceivedAll();
    }
  }