mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
22.22.2009 36191b70a96c298ad07cf9a9384cc42764ea957e
The replication publishes information about the whole topology in cn=monitor.
When cn=monitor is searched, the replication therefore asks all Replication
Servers for information about the replication state. This should always be
fast unless a server hangs. In such a case the replication waits for
5 seconds, then issues an error message and goes on with the
information it has received by that time.

In some cases, however, it seems that some servers do not send back their
responses fast enough, thus provoking errors in the logs.
I was not able to reproduce this problem, but I discovered some inefficiency in
the replication monitoring code that can lead to querying the other servers
several times, which may be what produces the problem.

This change therefore improves the monitoring code so that we only wait once if
a server is very slow to answer.
5 files modified
342 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/MonitorData.java 31 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 115 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 162 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 32 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MonitorData.java
@@ -62,8 +62,6 @@
   *   date of the first missing change.
   */
  /* The date of the last time they have been elaborated */
  private long buildDate = 0;
  // For each LDAP server, its server state
  private ConcurrentHashMap<Short, ServerState> LDAPStates =
@@ -103,7 +101,7 @@
  {
    Long afmd = fmd.get(serverId);
    if ((afmd != null) && (afmd>0))
      return ((this.getBuildDate() - afmd)/1000);
      return (TimeThread.getTime() - afmd)/1000;
    else
      return 0;
  }
@@ -243,7 +241,6 @@
        TRACER.debugInfo(
          "Complete monitor data : Missing changes ("+ lsiSid +")=" + mds);
    }
    this.setBuildDate(TimeThread.getTime());
    }
  /**
@@ -255,7 +252,6 @@
  {
    String mds = "Monitor data=\n";
    mds+= "Build date=" + this.getBuildDate();
    // RS data
    Iterator<Short> rsite = fmRSDate.keySet().iterator();
    while (rsite.hasNext())
@@ -281,10 +277,9 @@
      ServerState ss = LDAPStates.get(sid);
      mds += "\nLSData(" + sid + ")=\t" + "state=[" + ss.toString()
      + "] afmd=" + this.getApproxFirstMissingDate(sid);
      if (getBuildDate()>0)
      {
        mds += " missingDelay=" + this.getApproxDelay(sid);
      }
      mds += " missingDelay=" + this.getApproxDelay(sid);
      mds +=" missingCount=" + missingChanges.get(sid);
    }
@@ -304,24 +299,6 @@
  }
  /**
   * Sets the build date of the data.
   * @param buildDate The date.
   */
  public void setBuildDate(long buildDate)
  {
    this.buildDate = buildDate;
  }
  /**
   * Returns the build date of the data.
   * @return The date.
   */
  public long getBuildDate()
  {
    return buildDate;
  }
  /**
   * From a provided state, sets the max CN of the monitor data.
   * @param state the provided state.
   */
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -43,6 +43,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -65,12 +67,14 @@
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.RestoreConfig;
import org.opends.server.types.ResultCode;
import org.opends.server.util.LDIFReader;
import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
@@ -1094,4 +1098,115 @@
  {
    return replicationPort;
  }
  // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
  private long monitorDataLifeTime = 500;
  /* The date of the last time they have been elaborated */
  private long monitorDataLastBuildDate = 0;
  /* Search op on monitor data is processed by a worker thread.
   * Requests are sent to the other RS,and responses are received by the
   * listener threads.
   * The worker thread is awoke on this semaphore, or on timeout.
   */
  Semaphore remoteMonitorResponsesSemaphore = new Semaphore(0);
  /**
   * Trigger the computation of the Global Monitoring Data.
   * This should be called by all the MonitorProviders that need
   * the global monitoring data to be updated before they can
   * publish their information to cn=monitor.
   *
   * This method will trigger the update of all the global monitoring
   * information of all the base-DNs of this replication Server.
   *
   * @throws DirectoryException If the computation cannot be achieved.
   */
  public void computeMonitorData() throws DirectoryException
  {
    if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime())
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + getMonitorInstanceName() + " getRemoteMonitorData in cache");
      // The current data are still valid. No need to renew them.
      return;
    }
    remoteMonitorResponsesSemaphore.drainPermits();
    int count = 0;
    for (ReplicationServerDomain domain : baseDNs.values())
    {
      count += domain.initializeMonitorData();
    }
    // Wait for responses
    waitMonitorDataResponses(count);
    for (ReplicationServerDomain domain : baseDNs.values())
    {
      domain.completeMonitorData();
    }
  }
  /**
   * Wait for the expected count of received MonitorMsg.
   * @param expectedResponses The number of expected answers.
   * @throws DirectoryException When an error occurs.
   */
  private void waitMonitorDataResponses(int expectedResponses)
    throws DirectoryException
  {
    try
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + getMonitorInstanceName() + " baseDn=" +
          " waiting for " + expectedResponses + " expected monitor messages");
      boolean allPermitsAcquired =
        remoteMonitorResponsesSemaphore.tryAcquire(
        expectedResponses,
        (long) 5000, TimeUnit.MILLISECONDS);
      if (!allPermitsAcquired)
      {
        monitorDataLastBuildDate = TimeThread.getTime();
        logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
      // let's go on in best effort even with limited data received.
      } else
      {
        monitorDataLastBuildDate = TimeThread.getTime();
        if (debugEnabled())
          TRACER.debugInfo(
            "In " + getMonitorInstanceName() + " baseDn=" +
            " Successfully received all " + expectedResponses +
            " expected monitor messages");
      }
    } catch (Exception e)
    {
      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
    }
  }
  /**
   * This should be called by each ReplicationServerDomain that receives
   * a response to a monitor request message.
   */
  public void responseReceived()
  {
    remoteMonitorResponsesSemaphore.release();
  }
  /**
   * This should be called when the Monitoring has failed and the
   * Worker thread that is waiting for the result should be awaken.
   */
  public void responseReceivedAll()
  {
    remoteMonitorResponsesSemaphore.notifyAll();
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -47,7 +47,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.Iterator;
@@ -65,7 +64,6 @@
import org.opends.server.types.Attributes;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
import java.util.Timer;
import java.util.TimerTask;
@@ -146,20 +144,12 @@
  /* Monitor data management */
  // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
  private long monitorDataLifeTime = 500;
  /* Search op on monitor data is processed by a worker thread.
   * Requests are sent to the other RS,and responses are received by the
   * listener threads.
   * The worker thread is awoke on this semaphore, or on timeout.
   */
  Semaphore remoteMonitorResponsesSemaphore;
  /**
   * The monitor data consolidated over the topology.
   */
  private MonitorData monitorData = new MonitorData();
  private MonitorData wrkMonitorData;
  private Object monitorDataLock = new Object();
  /**
   * The needed info for each received assured update message we are waiting
@@ -2255,23 +2245,30 @@
  synchronized protected MonitorData computeMonitorData()
    throws DirectoryException
  {
    if (monitorData.getBuildDate() + monitorDataLifeTime > TimeThread.getTime())
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn + " getRemoteMonitorData in cache");
      // The current data are still valid. No need to renew them.
      return monitorData;
    }
    // Update the monitorData of all domains if this was necessary.
    replicationServer.computeMonitorData();
    return monitorData;
  }
    wrkMonitorData = new MonitorData();
    synchronized (wrkMonitorData)
  /**
   * Start collecting global monitoring information for this
   * ReplicationServerDomain.
   *
   * @return The number of response that should come back.
   *
   * @throws DirectoryException In case the monitoring information could
   *                            not be collected.
   */
  int initializeMonitorData() throws DirectoryException
  {
    synchronized (monitorDataLock)
    {
      wrkMonitorData = new MonitorData();
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn + " Computing monitor data ");
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDn=" + baseDn + " Computing monitor data ");
      // Let's process our directly connected LSes
      // - in the ServerHandler for a given LS1, the stored state contains :
@@ -2299,7 +2296,7 @@
        wrkMonitorData.setMaxCN(serverID, maxcn);
        wrkMonitorData.setLDAPServerState(serverID, directlshState);
        wrkMonitorData.setFirstMissingDate(serverID,
          directlsh.getApproxFirstMissingDate());
            directlsh.getApproxFirstMissingDate());
      }
      // Then initialize the max CN for the LS that produced something
@@ -2319,44 +2316,35 @@
      // and we need the remote ones.
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn + " Local monitor data: " +
          wrkMonitorData.toString());
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDn=" + baseDn + " Local monitor data: " +
            wrkMonitorData.toString());
    }
    // Send Request to the other Replication Servers
    if (remoteMonitorResponsesSemaphore == null)
    {
      remoteMonitorResponsesSemaphore = new Semaphore(0);
      short requestCnt = sendMonitorDataRequest();
      // Wait reponses from them or timeout
      waitMonitorDataResponses(requestCnt);
    } else
    {
      // The processing of renewing the monitor cache is already running
      // We'll make it sleeping until the end
      // TODO: unit test for this case.
      while (remoteMonitorResponsesSemaphore != null)
      {
        waitMonitorDataResponses(1);
      }
    }
    // Send the request for remote monitor data to the
    return sendMonitorDataRequest();
  }
  /**
   * Complete all the calculation when all monitoring information
   * has been received.
   */
  void completeMonitorData()
  {
    wrkMonitorData.completeComputing();
    // Store the new computed data as the reference
    synchronized (monitorData)
    synchronized (monitorDataLock)
    {
      // Now we have the expected answers or an error occurred
      monitorData = wrkMonitorData;
      wrkMonitorData = null;
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn + " *** Computed MonitorData: " +
          monitorData.toString());
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDn=" + baseDn + " *** Computed MonitorData: " +
            monitorData.toString());
    }
    return monitorData;
  }
  /**
@@ -2364,10 +2352,10 @@
   * @return the number of requests sent.
   * @throws DirectoryException when a problem occurs.
   */
  protected short sendMonitorDataRequest()
  protected int sendMonitorDataRequest()
    throws DirectoryException
  {
    short sent = 0;
    int sent = 0;
    try
    {
      for (ServerHandler rs : replicationServers.values())
@@ -2389,49 +2377,6 @@
  }
  /**
   * Wait for the expected count of received MonitorMsg.
   * @param expectedResponses The number of expected answers.
   * @throws DirectoryException When an error occurs.
   */
  protected void waitMonitorDataResponses(int expectedResponses)
    throws DirectoryException
  {
    try
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn +
          " waiting for " + expectedResponses + " expected monitor messages");
      boolean allPermitsAcquired =
        remoteMonitorResponsesSemaphore.tryAcquire(
        expectedResponses,
        (long) 5000, TimeUnit.MILLISECONDS);
      if (!allPermitsAcquired)
      {
        logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
      // let's go on in best effort even with limited data received.
      } else
      {
        if (debugEnabled())
          TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDn=" + baseDn +
            " Successfully received all " + expectedResponses +
            " expected monitor messages");
      }
    } catch (Exception e)
    {
      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
    } finally
    {
      remoteMonitorResponsesSemaphore = null;
    }
  }
  /**
   * Processes a Monitor message receives from a remote Replication Server
   * and stores the data received.
   *
@@ -2442,23 +2387,20 @@
    if (debugEnabled())
      TRACER.debugInfo(
        "In " + this.replicationServer.getMonitorInstanceName() +
        "Receiving " + msg + " from " + msg.getsenderID() +
        remoteMonitorResponsesSemaphore);
    if (remoteMonitorResponsesSemaphore == null)
    {
      // Let's ignore the remote monitor data just received
      // since the computing processing has been ended.
      // An error - probably a timemout - occurred that was already logged
      logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
        Short.toString(msg.getsenderID())));
      return;
    }
        "Receiving " + msg + " from " + msg.getsenderID());
    try
    {
      synchronized (wrkMonitorData)
      synchronized (monitorDataLock)
      {
        if (wrkMonitorData == null)
        {
          // This is a response for an earlier request whose computing is
          // already complete.
          logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
                      Short.toString(msg.getsenderID())));
          return;
        }
        // Here is the RS state : list <serverID, lastChangeNumber>
        // For each LDAP Server, we keep the max CN across the RSes
        ServerState replServerState = msg.getReplServerDbState();
@@ -2523,7 +2465,7 @@
      // Decreases the number of expected responses and potentially
      // wakes up the waiting requestor thread.
      remoteMonitorResponsesSemaphore.release();
      replicationServer.responseReceived();
    } catch (Exception e)
    {
@@ -2532,7 +2474,7 @@
      // If an exception occurs while processing one of the expected message,
      // the processing is aborted and the waiting thread is awoke.
      remoteMonitorResponsesSemaphore.notifyAll();
      replicationServer.responseReceivedAll();
    }
  }
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -2064,6 +2064,18 @@
        long delay = md.getApproxDelay(serverId);
        attributes.add(Attributes.create("approximate-delay", String
            .valueOf(delay)));
        /* get the Server State */
        AttributeBuilder builder = new AttributeBuilder("server-state");
        ServerState state = md.getLDAPServerState(serverId);
        if (state != null)
        {
          for (String str : state.toStringSet())
          {
            builder.add(str);
          }
          attributes.add(builder.toAttribute());
        }
      }
      else
      {
@@ -2071,6 +2083,18 @@
        long missingChanges = md.getMissingChangesRS(serverId);
        attributes.add(Attributes.create("missing-changes", String
            .valueOf(missingChanges)));
        /* get the Server State */
        AttributeBuilder builder = new AttributeBuilder("server-state");
        ServerState state = md.getRSStates(serverId);
        if (state != null)
        {
          for (String str : state.toStringSet())
          {
            builder.add(str);
          }
          attributes.add(builder.toAttribute());
        }
      }
    }
    catch (Exception e)
@@ -2131,14 +2155,6 @@
    attributes.add(Attributes.create("current-rcv-window", String
        .valueOf(rcvWindow)));
    /* get the Server State */
    AttributeBuilder builder = new AttributeBuilder("server-state");
    for (String str : serverState.toStringSet())
    {
      builder.add(str);
    }
    attributes.add(builder.toAttribute());
    // Encryption
    attributes.add(Attributes.create("ssl-encryption", String
        .valueOf(session.isEncrypted())));
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -175,7 +175,7 @@
      ServerState state2 = states1.get(domain2ServerId);
      assertNotNull(state2, "getReplicaStates is not showing DS2");
      Map<Short, ServerState> states2 = domain1.getReplicaStates();
      Map<Short, ServerState> states2 = domain2.getReplicaStates();
      ServerState state1 = states2.get(domain1ServerId);
      assertNotNull(state1, "getReplicaStates is not showing DS1");