mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
22.22.2009 2d7788a781b8a7ce58b7ec9f8a7a1ae9418ee28b
opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitorData.java
@@ -62,8 +62,6 @@
   *   date of the first missing change.
   */
  /* The date of the last time they have been elaborated */
  private long buildDate = 0;
  // For each LDAP server, its server state
  private ConcurrentHashMap<Short, ServerState> LDAPStates =
@@ -103,7 +101,7 @@
  {
    Long afmd = fmd.get(serverId);
    if ((afmd != null) && (afmd>0))
      return ((this.getBuildDate() - afmd)/1000);
      return (TimeThread.getTime() - afmd)/1000;
    else
      return 0;
  }
@@ -243,7 +241,6 @@
        TRACER.debugInfo(
          "Complete monitor data : Missing changes ("+ lsiSid +")=" + mds);
    }
    this.setBuildDate(TimeThread.getTime());
    }
  /**
@@ -255,7 +252,6 @@
  {
    String mds = "Monitor data=\n";
    mds+= "Build date=" + this.getBuildDate();
    // RS data
    Iterator<Short> rsite = fmRSDate.keySet().iterator();
    while (rsite.hasNext())
@@ -281,10 +277,9 @@
      ServerState ss = LDAPStates.get(sid);
      mds += "\nLSData(" + sid + ")=\t" + "state=[" + ss.toString()
      + "] afmd=" + this.getApproxFirstMissingDate(sid);
      if (getBuildDate()>0)
      {
        mds += " missingDelay=" + this.getApproxDelay(sid);
      }
      mds += " missingDelay=" + this.getApproxDelay(sid);
      mds +=" missingCount=" + missingChanges.get(sid);
    }
@@ -304,24 +299,6 @@
  }
  /**
   * Sets the build date of the data.
   * @param buildDate The date.
   */
  public void setBuildDate(long buildDate)
  {
    this.buildDate = buildDate;
  }
  /**
   * Returns the build date of the data.
   * @return The date.
   */
  public long getBuildDate()
  {
    return buildDate;
  }
  /**
   * From a provided state, sets the max CN of the monitor data.
   * @param state the provided state.
   */
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -43,6 +43,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -65,12 +67,14 @@
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.RestoreConfig;
import org.opends.server.types.ResultCode;
import org.opends.server.util.LDIFReader;
import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
@@ -1094,4 +1098,115 @@
  {
    return replicationPort;
  }
  // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
  private long monitorDataLifeTime = 500;
  /* The date of the last time they have been elaborated */
  private long monitorDataLastBuildDate = 0;
  /* Search op on monitor data is processed by a worker thread.
   * Requests are sent to the other RS,and responses are received by the
   * listener threads.
   * The worker thread is awoke on this semaphore, or on timeout.
   */
  Semaphore remoteMonitorResponsesSemaphore = new Semaphore(0);
  /**
   * Trigger the computation of the Global Monitoring Data.
   * This should be called by all the MonitorProviders that need
   * the global monitoring data to be updated before they can
   * publish their information to cn=monitor.
   *
   * This method will trigger the update of all the global monitoring
   * information of all the base-DNs of this replication Server.
   *
   * @throws DirectoryException If the computation cannot be achieved.
   */
  public void computeMonitorData() throws DirectoryException
  {
    if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime())
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + getMonitorInstanceName() + " getRemoteMonitorData in cache");
      // The current data are still valid. No need to renew them.
      return;
    }
    remoteMonitorResponsesSemaphore.drainPermits();
    int count = 0;
    for (ReplicationServerDomain domain : baseDNs.values())
    {
      count += domain.initializeMonitorData();
    }
    // Wait for responses
    waitMonitorDataResponses(count);
    for (ReplicationServerDomain domain : baseDNs.values())
    {
      domain.completeMonitorData();
    }
  }
  /**
   * Wait for the expected count of received MonitorMsg.
   * @param expectedResponses The number of expected answers.
   * @throws DirectoryException When an error occurs.
   */
  private void waitMonitorDataResponses(int expectedResponses)
    throws DirectoryException
  {
    try
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + getMonitorInstanceName() + " baseDn=" +
          " waiting for " + expectedResponses + " expected monitor messages");
      boolean allPermitsAcquired =
        remoteMonitorResponsesSemaphore.tryAcquire(
        expectedResponses,
        (long) 5000, TimeUnit.MILLISECONDS);
      if (!allPermitsAcquired)
      {
        monitorDataLastBuildDate = TimeThread.getTime();
        logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
      // let's go on in best effort even with limited data received.
      } else
      {
        monitorDataLastBuildDate = TimeThread.getTime();
        if (debugEnabled())
          TRACER.debugInfo(
            "In " + getMonitorInstanceName() + " baseDn=" +
            " Successfully received all " + expectedResponses +
            " expected monitor messages");
      }
    } catch (Exception e)
    {
      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
    }
  }
  /**
   * This should be called by each ReplicationServerDomain that receives
   * a response to a monitor request message.
   */
  public void responseReceived()
  {
    remoteMonitorResponsesSemaphore.release();
  }
  /**
   * This should be called when the Monitoring has failed and the
   * Worker thread that is waiting for the result should be awaken.
   */
  public void responseReceivedAll()
  {
    remoteMonitorResponsesSemaphore.notifyAll();
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -47,7 +47,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.Iterator;
@@ -65,7 +64,6 @@
import org.opends.server.types.Attributes;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
import java.util.Timer;
import java.util.TimerTask;
@@ -146,20 +144,12 @@
  /* Monitor data management */
  // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
  private long monitorDataLifeTime = 500;
  /* Search op on monitor data is processed by a worker thread.
   * Requests are sent to the other RS,and responses are received by the
   * listener threads.
   * The worker thread is awoke on this semaphore, or on timeout.
   */
  Semaphore remoteMonitorResponsesSemaphore;
  /**
   * The monitor data consolidated over the topology.
   */
  private MonitorData monitorData = new MonitorData();
  private MonitorData wrkMonitorData;
  private Object monitorDataLock = new Object();
  /**
   * The needed info for each received assured update message we are waiting
@@ -2255,23 +2245,30 @@
  synchronized protected MonitorData computeMonitorData()
    throws DirectoryException
  {
    if (monitorData.getBuildDate() + monitorDataLifeTime > TimeThread.getTime())
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn + " getRemoteMonitorData in cache");
      // The current data are still valid. No need to renew them.
      return monitorData;
    }
    // Update the monitorData of all domains if this was necessary.
    replicationServer.computeMonitorData();
    return monitorData;
  }
    wrkMonitorData = new MonitorData();
    synchronized (wrkMonitorData)
  /**
   * Start collecting global monitoring information for this
   * ReplicationServerDomain.
   *
   * @return The number of response that should come back.
   *
   * @throws DirectoryException In case the monitoring information could
   *                            not be collected.
   */
  int initializeMonitorData() throws DirectoryException
  {
    synchronized (monitorDataLock)
    {
      wrkMonitorData = new MonitorData();
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn + " Computing monitor data ");
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDn=" + baseDn + " Computing monitor data ");
      // Let's process our directly connected LSes
      // - in the ServerHandler for a given LS1, the stored state contains :
@@ -2299,7 +2296,7 @@
        wrkMonitorData.setMaxCN(serverID, maxcn);
        wrkMonitorData.setLDAPServerState(serverID, directlshState);
        wrkMonitorData.setFirstMissingDate(serverID,
          directlsh.getApproxFirstMissingDate());
            directlsh.getApproxFirstMissingDate());
      }
      // Then initialize the max CN for the LS that produced something
@@ -2319,44 +2316,35 @@
      // and we need the remote ones.
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn + " Local monitor data: " +
          wrkMonitorData.toString());
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDn=" + baseDn + " Local monitor data: " +
            wrkMonitorData.toString());
    }
    // Send Request to the other Replication Servers
    if (remoteMonitorResponsesSemaphore == null)
    {
      remoteMonitorResponsesSemaphore = new Semaphore(0);
      short requestCnt = sendMonitorDataRequest();
      // Wait reponses from them or timeout
      waitMonitorDataResponses(requestCnt);
    } else
    {
      // The processing of renewing the monitor cache is already running
      // We'll make it sleeping until the end
      // TODO: unit test for this case.
      while (remoteMonitorResponsesSemaphore != null)
      {
        waitMonitorDataResponses(1);
      }
    }
    // Send the request for remote monitor data to the
    return sendMonitorDataRequest();
  }
  /**
   * Complete all the calculation when all monitoring information
   * has been received.
   */
  void completeMonitorData()
  {
    wrkMonitorData.completeComputing();
    // Store the new computed data as the reference
    synchronized (monitorData)
    synchronized (monitorDataLock)
    {
      // Now we have the expected answers or an error occurred
      monitorData = wrkMonitorData;
      wrkMonitorData = null;
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn + " *** Computed MonitorData: " +
          monitorData.toString());
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDn=" + baseDn + " *** Computed MonitorData: " +
            monitorData.toString());
    }
    return monitorData;
  }
  /**
@@ -2364,10 +2352,10 @@
   * @return the number of requests sent.
   * @throws DirectoryException when a problem occurs.
   */
  protected short sendMonitorDataRequest()
  protected int sendMonitorDataRequest()
    throws DirectoryException
  {
    short sent = 0;
    int sent = 0;
    try
    {
      for (ServerHandler rs : replicationServers.values())
@@ -2389,49 +2377,6 @@
  }
  /**
   * Wait for the expected count of received MonitorMsg.
   * @param expectedResponses The number of expected answers.
   * @throws DirectoryException When an error occurs.
   */
  protected void waitMonitorDataResponses(int expectedResponses)
    throws DirectoryException
  {
    try
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn +
          " waiting for " + expectedResponses + " expected monitor messages");
      boolean allPermitsAcquired =
        remoteMonitorResponsesSemaphore.tryAcquire(
        expectedResponses,
        (long) 5000, TimeUnit.MILLISECONDS);
      if (!allPermitsAcquired)
      {
        logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
      // let's go on in best effort even with limited data received.
      } else
      {
        if (debugEnabled())
          TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDn=" + baseDn +
            " Successfully received all " + expectedResponses +
            " expected monitor messages");
      }
    } catch (Exception e)
    {
      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
    } finally
    {
      remoteMonitorResponsesSemaphore = null;
    }
  }
  /**
   * Processes a Monitor message receives from a remote Replication Server
   * and stores the data received.
   *
@@ -2442,23 +2387,20 @@
    if (debugEnabled())
      TRACER.debugInfo(
        "In " + this.replicationServer.getMonitorInstanceName() +
        "Receiving " + msg + " from " + msg.getsenderID() +
        remoteMonitorResponsesSemaphore);
    if (remoteMonitorResponsesSemaphore == null)
    {
      // Let's ignore the remote monitor data just received
      // since the computing processing has been ended.
      // An error - probably a timemout - occurred that was already logged
      logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
        Short.toString(msg.getsenderID())));
      return;
    }
        "Receiving " + msg + " from " + msg.getsenderID());
    try
    {
      synchronized (wrkMonitorData)
      synchronized (monitorDataLock)
      {
        if (wrkMonitorData == null)
        {
          // This is a response for an earlier request whose computing is
          // already complete.
          logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
                      Short.toString(msg.getsenderID())));
          return;
        }
        // Here is the RS state : list <serverID, lastChangeNumber>
        // For each LDAP Server, we keep the max CN across the RSes
        ServerState replServerState = msg.getReplServerDbState();
@@ -2523,7 +2465,7 @@
      // Decreases the number of expected responses and potentially
      // wakes up the waiting requestor thread.
      remoteMonitorResponsesSemaphore.release();
      replicationServer.responseReceived();
    } catch (Exception e)
    {
@@ -2532,7 +2474,7 @@
      // If an exception occurs while processing one of the expected message,
      // the processing is aborted and the waiting thread is awoke.
      remoteMonitorResponsesSemaphore.notifyAll();
      replicationServer.responseReceivedAll();
    }
  }
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -2064,6 +2064,18 @@
        long delay = md.getApproxDelay(serverId);
        attributes.add(Attributes.create("approximate-delay", String
            .valueOf(delay)));
        /* get the Server State */
        AttributeBuilder builder = new AttributeBuilder("server-state");
        ServerState state = md.getLDAPServerState(serverId);
        if (state != null)
        {
          for (String str : state.toStringSet())
          {
            builder.add(str);
          }
          attributes.add(builder.toAttribute());
        }
      }
      else
      {
@@ -2071,6 +2083,18 @@
        long missingChanges = md.getMissingChangesRS(serverId);
        attributes.add(Attributes.create("missing-changes", String
            .valueOf(missingChanges)));
        /* get the Server State */
        AttributeBuilder builder = new AttributeBuilder("server-state");
        ServerState state = md.getRSStates(serverId);
        if (state != null)
        {
          for (String str : state.toStringSet())
          {
            builder.add(str);
          }
          attributes.add(builder.toAttribute());
        }
      }
    }
    catch (Exception e)
@@ -2131,14 +2155,6 @@
    attributes.add(Attributes.create("current-rcv-window", String
        .valueOf(rcvWindow)));
    /* get the Server State */
    AttributeBuilder builder = new AttributeBuilder("server-state");
    for (String str : serverState.toStringSet())
    {
      builder.add(str);
    }
    attributes.add(builder.toAttribute());
    // Encryption
    attributes.add(Attributes.create("ssl-encryption", String
        .valueOf(session.isEncrypted())));
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -175,7 +175,7 @@
      ServerState state2 = states1.get(domain2ServerId);
      assertNotNull(state2, "getReplicaStates is not showing DS2");
      Map<Short, ServerState> states2 = domain1.getReplicaStates();
      Map<Short, ServerState> states2 = domain2.getReplicaStates();
      ServerState state1 = states2.get(domain1ServerId);
      assertNotNull(state1, "getReplicaStates is not showing DS1");