From 4d0faf5b8ad46e978a72d35a8f736f83fb61fd2d Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 30 Mar 2011 19:21:16 +0000
Subject: [PATCH] Fix issue OpenDJ-96: Replication server monitor data computation takes too long / blocks rest of server when another RS is cannot be reached

---
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |  655 ++++++++++++++++++++++++++++++-----------------------------
 1 files changed, 336 insertions(+), 319 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 92d4801..4f60457 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -35,17 +35,10 @@
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -57,39 +50,12 @@
 import org.opends.server.api.MonitorProvider;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.AssuredMode;
-import org.opends.server.replication.common.ChangeNumber;
-import org.opends.server.replication.common.DSInfo;
-import org.opends.server.replication.common.RSInfo;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.common.StatusMachineEvent;
-import org.opends.server.replication.protocol.AckMsg;
-import org.opends.server.replication.protocol.ChangeStatusMsg;
-import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
-import org.opends.server.replication.protocol.DoneMsg;
-import org.opends.server.replication.protocol.EntryMsg;
-import org.opends.server.replication.protocol.ErrorMsg;
-import org.opends.server.replication.protocol.InitializeRequestMsg;
-import org.opends.server.replication.protocol.InitializeTargetMsg;
-import org.opends.server.replication.protocol.InitializeRcvAckMsg;
-import org.opends.server.replication.protocol.MonitorMsg;
-import org.opends.server.replication.protocol.MonitorRequestMsg;
-import org.opends.server.replication.protocol.ProtocolVersion;
-import org.opends.server.replication.protocol.ResetGenerationIdMsg;
-import org.opends.server.replication.protocol.RoutableMsg;
-import org.opends.server.replication.protocol.TopologyMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.types.Attribute;
-import org.opends.server.types.AttributeBuilder;
-import org.opends.server.types.Attributes;
-import org.opends.server.types.DebugLogLevel;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.ResultCode;
+import org.opends.server.replication.common.*;
+import org.opends.server.replication.protocol.*;
+import org.opends.server.types.*;
+import org.opends.server.util.TimeThread;
 
 import com.sleepycat.je.DatabaseException;
-import org.opends.server.replication.server.
-  ReplicationServer.GlobalServerId;
 
 /**
  * This class define an in-memory cache that will be used to store
@@ -165,9 +131,47 @@
   /**
    * The monitor data consolidated over the topology.
    */
-  private MonitorData monitorData = new MonitorData();
-  private MonitorData wrkMonitorData;
-  private final Object monitorDataLock = new Object();
+  private volatile MonitorData monitorData = new MonitorData();
+
+  // This lock guards against multiple concurrent monitor data recalculation.
+  private final Object pendingMonitorLock = new Object();
+
+  // Guarded by pendingMonitorLock.
+  private long monitorDataLastBuildDate = 0;
+
+  // The set of replication servers which are already known to be slow to send
+  // monitor data.
+  //
+  // Guarded by pendingMonitorLock.
+  private Set<Integer> monitorDataLateServers = new HashSet<Integer>();
+
+  // This lock serializes updates to the pending monitor data.
+  private final Object pendingMonitorDataLock = new Object();
+
+  // Monitor data which is currently being calculated.
+  //
+  // Guarded by pendingMonitorDataLock.
+  private MonitorData pendingMonitorData;
+
+  // A set containing the IDs of servers from which we are currently expecting
+  // monitor responses. When a response is received from a server we remove the
+  // ID from this table, and count down the latch if the ID was in the table.
+  //
+  // Guarded by pendingMonitorDataLock.
+  private final Set<Integer> pendingMonitorDataServerIDs =
+    new HashSet<Integer>();
+
+  // This latch is non-null and is used in order to count incoming responses as
+  // they arrive. Since incoming response may arrive at any time, even when
+  // there is no pending monitor request, access to the latch must be guarded.
+  //
+  // Guarded by pendingMonitorDataLock.
+  private CountDownLatch pendingMonitorDataLatch = null;
+
+  // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
+  private final long monitorDataLifeTime = 500;
+
+
 
   /**
    * The needed info for each received assured update message we are waiting
@@ -188,7 +192,7 @@
   // every n number of treated assured messages
   private int assuredTimeoutTimerPurgeCounter = 0;
 
-  ServerState ctHeartbeatState = null;
+  private ServerState ctHeartbeatState = null;
 
   /**
    * Creates a new ReplicationServerDomain associated to the DN baseDn.
@@ -1207,7 +1211,7 @@
    * domain.
    * @param handler the provided handler to unregister.
    */
-  protected void unregisterServerHandler(ServerHandler handler)
+  private void unregisterServerHandler(ServerHandler handler)
   {
     if (handler.isReplicationServer())
     {
@@ -1228,7 +1232,7 @@
    * - traverse replicationServers list and test for each if DS are connected
    * So it strongly relies on the directoryServers list
    */
-  protected void mayResetGenerationId()
+  private void mayResetGenerationId()
   {
     if (debugEnabled())
       TRACER.debugInfo(
@@ -1523,7 +1527,7 @@
    * @param senderHandler The handler of the server that published this message.
    * @return The list of destination handlers.
    */
-  protected List<ServerHandler> getDestinationServers(RoutableMsg msg,
+  private List<ServerHandler> getDestinationServers(RoutableMsg msg,
     ServerHandler senderHandler)
   {
     List<ServerHandler> servers =
@@ -1618,16 +1622,16 @@
         if (senderHandler.isDataServer())
         {
           // Monitoring information requested by a DS
-          MonitorMsg monitorMsg =
-            createGlobalTopologyMonitorMsg(
-                msg.getDestination(), msg.getSenderID(), false);
+          MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
+              msg.getDestination(), msg.getSenderID(), monitorData);
 
-           if (monitorMsg != null)
+          if (monitorMsg != null)
           {
             try
             {
               senderHandler.send(monitorMsg);
-            } catch (IOException e)
+            }
+            catch (IOException e)
             {
               // the connection was closed.
             }
@@ -1656,12 +1660,8 @@
         }
       } else if (msg instanceof MonitorMsg)
       {
-        MonitorMsg monitorMsg =
-          (MonitorMsg) msg;
-
-        GlobalServerId globalServerId =
-          new GlobalServerId(baseDn, senderHandler.getServerId());
-        receivesMonitorDataResponse(monitorMsg, globalServerId);
+        MonitorMsg monitorMsg = (MonitorMsg) msg;
+        receivesMonitorDataResponse(monitorMsg, senderHandler.getServerId());
       } else
       {
         logError(NOTE_ERR_ROUTING_TO_SERVER.get(
@@ -1758,61 +1758,51 @@
 
   }
 
+
+
   /**
    * Creates a new monitor message including monitoring information for the
    * whole topology.
-   * @param sender The sender of this message.
-   * @param destination The destination of this message.
-   * @param  updateMonitorData A boolean indicating if the monitor data should
-   *                           be updated. If false the last monitoring data
-   *                           that was computed will be returned. This is
-   *                           acceptable for most cases because the monitoring
-   *                           thread computes the monitoring data frequently.
-   *                           If true is used the calling thread may be
-   *                           blocked for a while.
+   *
+   * @param sender
+   *          The sender of this message.
+   * @param destination
+   *          The destination of this message.
+   * @param monitorData
+   *          The domain monitor data which should be used for the message.
    * @return The newly created and filled MonitorMsg. Null if a problem occurred
-   * during message creation.
+   *         during message creation.
    */
   public MonitorMsg createGlobalTopologyMonitorMsg(
-      int sender, int destination, boolean updateMonitorData)
+      int sender, int destination, MonitorData monitorData)
   {
     MonitorMsg returnMsg =
       new MonitorMsg(sender, destination);
 
-    try
-    {
-      returnMsg.setReplServerDbState(getDbServerState());
-      // Update the information we have about all servers
-      // in the topology.
-      MonitorData md = computeMonitorData(updateMonitorData);
+    returnMsg.setReplServerDbState(getDbServerState());
 
-      // Add the informations about the Replicas currently in
-      // the topology.
-      Iterator<Integer> it = md.ldapIterator();
-      while (it.hasNext())
-      {
-        int replicaId = it.next();
-        returnMsg.setServerState(
-            replicaId, md.getLDAPServerState(replicaId),
-            md.getApproxFirstMissingDate(replicaId), true);
-      }
-
-      // Add the informations about the Replication Servers
-      // currently in the topology.
-      it = md.rsIterator();
-      while (it.hasNext())
-      {
-        int replicaId = it.next();
-        returnMsg.setServerState(
-            replicaId, md.getRSStates(replicaId),
-            md.getRSApproxFirstMissingDate(replicaId), false);
-      }
-    }
-    catch (DirectoryException e)
+    // Add the informations about the Replicas currently in
+    // the topology.
+    Iterator<Integer> it = monitorData.ldapIterator();
+    while (it.hasNext())
     {
-      // If we can't compute the Monitor Information, send
-      // back an empty message.
+      int replicaId = it.next();
+      returnMsg.setServerState(replicaId,
+          monitorData.getLDAPServerState(replicaId),
+          monitorData.getApproxFirstMissingDate(replicaId), true);
     }
+
+    // Add the informations about the Replication Servers
+    // currently in the topology.
+    it = monitorData.rsIterator();
+    while (it.hasNext())
+    {
+      int replicaId = it.next();
+      returnMsg.setServerState(replicaId,
+          monitorData.getRSStates(replicaId),
+          monitorData.getRSApproxFirstMissingDate(replicaId), false);
+    }
+
     return returnMsg;
   }
 
@@ -2559,192 +2549,233 @@
    * Monitor Data generation
    * =======================
    */
-  /**
-   * Retrieves the global monitor data.
-   * @param  updateMonitorData A boolean indicating if the monitor data should
-   *                           be updated. If false the last monitoring data
-   *                           that was computed will be returned. This is
-   *                           acceptable for most cases because the monitoring
-   *                           thread computes the monitoring data frequently.
-   *                           If true is used the calling thread may be
-   *                           blocked for a while.
-   * @return The monitor data.
-   * @throws DirectoryException When an error occurs.
-   */
-  protected MonitorData computeMonitorData(boolean updateMonitorData)
-    throws DirectoryException
-  {
-    synchronized (monitoringLock)
-    {
-      if (updateMonitorData)
-      {
-        // Update the monitorData of ALL domains if this was necessary.
-        replicationServer.computeMonitorData();
-      }
 
-      // Returns the monitorData of THIS domain
-      return monitorData;
-    }
+  /**
+   * Returns the latest monitor data available for this replication server
+   * domain.
+   *
+   * @return The latest monitor data available for this replication server
+   *         domain, which is never {@code null}.
+   */
+  MonitorData getDomainMonitorData()
+  {
+    return monitorData;
   }
 
+
+
+  /**
+   * Recomputes the monitor data for this replication server domain.
+   *
+   * @return The recomputed monitor data for this replication server domain.
+   * @throws InterruptedException
+   *           If this thread is interrupted while waiting for a response.
+   */
+  MonitorData computeDomainMonitorData() throws InterruptedException
+  {
+    // Only allow monitor recalculation at a time.
+    synchronized (pendingMonitorLock)
+    {
+      if ((monitorDataLastBuildDate + monitorDataLifeTime) < TimeThread
+          .getTime())
+      {
+        try
+        {
+          // Prevent out of band monitor responses from updating our pending
+          // table until we are ready.
+          synchronized (pendingMonitorDataLock)
+          {
+            // Clear the pending monitor data.
+            pendingMonitorDataServerIDs.clear();
+            pendingMonitorData = new MonitorData();
+
+            // Initialize the monitor data.
+            initializePendingMonitorData();
+
+            // Send the monitor requests to the connected replication servers.
+            for (ReplicationServerHandler rs : replicationServers.values())
+            {
+              // Add server ID to pending table.
+              int serverId = rs.getServerId();
+
+              MonitorRequestMsg msg = new MonitorRequestMsg(
+                  this.replicationServer.getServerId(), serverId);
+              try
+              {
+                rs.send(msg);
+
+                // Only register this server ID if we were able to send the
+                // message.
+                pendingMonitorDataServerIDs.add(serverId);
+              }
+              catch (IOException e)
+              {
+                // Log a message and do a best effort from here.
+                Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST
+                    .get(baseDn, serverId, e.getMessage());
+                logError(message);
+              }
+            }
+
+            // Create the pending response latch based on the number of expected
+            // monitor responses.
+            pendingMonitorDataLatch = new CountDownLatch(
+                pendingMonitorDataServerIDs.size());
+          }
+
+          // Wait for the responses to come back.
+          pendingMonitorDataLatch.await(5, TimeUnit.SECONDS);
+
+          // Log messages for replication servers that have gone or come back.
+          synchronized (pendingMonitorDataLock)
+          {
+            // Log servers that have come back.
+            for (int serverId : monitorDataLateServers)
+            {
+              // Ensure that we only log once per server: don't fill the
+              // error log with repeated messages.
+              if (!pendingMonitorDataServerIDs.contains(serverId))
+              {
+                logError(NOTE_MONITOR_DATA_RECEIVED.get(baseDn,
+                    serverId));
+              }
+            }
+
+            // Log servers that have gone away.
+            for (int serverId : pendingMonitorDataServerIDs)
+            {
+              // Ensure that we only log once per server: don't fill the
+              // error log with repeated messages.
+              if (!monitorDataLateServers.contains(serverId))
+              {
+                logError(ERR_MISSING_REMOTE_MONITOR_DATA.get(baseDn,
+                    serverId));
+              }
+            }
+
+            // Remember which servers were late this time.
+            monitorDataLateServers.clear();
+            monitorDataLateServers.addAll(pendingMonitorDataServerIDs);
+          }
+
+          // Store the new computed data as the reference
+          synchronized (pendingMonitorDataLock)
+          {
+            // Now we have the expected answers or an error occurred
+            pendingMonitorData.completeComputing();
+            monitorData = pendingMonitorData;
+            monitorDataLastBuildDate = TimeThread.getTime();
+          }
+        }
+        finally
+        {
+          synchronized (pendingMonitorDataLock)
+          {
+            // Clear pending state.
+            pendingMonitorData = null;
+            pendingMonitorDataLatch = null;
+            pendingMonitorDataServerIDs.clear();
+          }
+        }
+      }
+    }
+
+    return monitorData;
+  }
+
+
+
   /**
    * Start collecting global monitoring information for this
    * ReplicationServerDomain.
    *
-   * @param expectedMonitoringMsg The list of server handler we have to wait a
-   * monitoring message from. Will be filled as necessary by this method.
+   * @throws DirectoryException
+   *           In case the monitoring information could not be collected.
+   */
+
+  private void initializePendingMonitorData()
+  {
+    // Let's process our directly connected LSes
+    // - in the ServerHandler for a given LS1, the stored state contains :
+    // - the max CN produced by LS1
+    // - the last CN consumed by LS1 from LS2..n
+    // - in the RSdomain/dbHandler, the built-in state contains :
+    // - the max CN produced by each server
+    // So for a given LS connected we can take the state and the max from
+    // the LS/state.
+
+    for (ServerHandler directlsh : directoryServers.values())
+    {
+      int serverID = directlsh.getServerId();
+
+      // the state comes from the state stored in the SH
+      ServerState directlshState = directlsh.getServerState()
+          .duplicate();
+
+      // the max CN sent by that LS also comes from the SH
+      ChangeNumber maxcn = directlshState
+          .getMaxChangeNumber(serverID);
+      if (maxcn == null)
+      {
+        // This directly connected LS has never produced any change
+        maxcn = new ChangeNumber(0, 0, serverID);
+      }
+      pendingMonitorData.setMaxCN(serverID, maxcn);
+      pendingMonitorData.setLDAPServerState(serverID, directlshState);
+      pendingMonitorData.setFirstMissingDate(serverID,
+          directlsh.getApproxFirstMissingDate());
+    }
+
+    // Then initialize the max CN for the LS that produced something
+    // - from our own local db state
+    // - whatever they are directly or indirectly connected
+    ServerState dbServerState = getDbServerState();
+    pendingMonitorData.setRSState(replicationServer.getServerId(),
+        dbServerState);
+    Iterator<Integer> it = dbServerState.iterator();
+    while (it.hasNext())
+    {
+      int sid = it.next();
+      ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid);
+      pendingMonitorData.setMaxCN(sid, storedCN);
+    }
+  }
+
+
+
+  /**
+   * Processes a Monitor message receives from a remote Replication Server and
+   * stores the data received.
    *
-   * @throws DirectoryException In case the monitoring information could
-   *                            not be collected.
-   */
-
-  void initializeMonitorData(List<GlobalServerId> expectedMonitoringMsg)
-    throws DirectoryException
-  {
-    synchronized (monitorDataLock)
-    {
-      wrkMonitorData = new MonitorData();
-      if (debugEnabled())
-        TRACER.debugInfo(
-            "In " + this.replicationServer.getMonitorInstanceName() +
-            " baseDn=" + baseDn + " Computing monitor data ");
-
-      // Let's process our directly connected LSes
-      // - in the ServerHandler for a given LS1, the stored state contains :
-      //   - the max CN produced by LS1
-      //   - the last CN consumed by LS1 from LS2..n
-      // - in the RSdomain/dbHandler, the built-in state contains :
-      //   - the max CN produced by each server
-      // So for a given LS connected we can take the state and the max from
-      // the LS/state.
-
-      for (ServerHandler directlsh : directoryServers.values())
-      {
-        int serverID = directlsh.getServerId();
-
-        // the state comes from the state stored in the SH
-        ServerState directlshState = directlsh.getServerState().duplicate();
-
-        // the max CN sent by that LS also comes from the SH
-        ChangeNumber maxcn = directlshState.getMaxChangeNumber(serverID);
-        if (maxcn == null)
-        {
-          // This directly connected LS has never produced any change
-          maxcn = new ChangeNumber(0, 0, serverID);
-        }
-        wrkMonitorData.setMaxCN(serverID, maxcn);
-        wrkMonitorData.setLDAPServerState(serverID, directlshState);
-        wrkMonitorData.setFirstMissingDate(serverID,
-            directlsh.getApproxFirstMissingDate());
-      }
-
-      // Then initialize the max CN for the LS that produced something
-      // - from our own local db state
-      // - whatever they are directly or indirectly connected
-      ServerState dbServerState = getDbServerState();
-      wrkMonitorData.setRSState(replicationServer.getServerId(), dbServerState);
-      Iterator<Integer> it = dbServerState.iterator();
-      while (it.hasNext())
-      {
-        int sid = it.next();
-        ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid);
-        wrkMonitorData.setMaxCN(sid, storedCN);
-      }
-
-      // Now we have used all available local informations
-      // and we need the remote ones.
-      if (debugEnabled())
-        TRACER.debugInfo(
-            "In " + this.replicationServer.getMonitorInstanceName() +
-            " baseDn=" + baseDn + " Local monitor data: " +
-            wrkMonitorData.toString());
-    }
-
-    // Send the request for remote monitor data to the
-    sendMonitorDataRequest(expectedMonitoringMsg);
-  }
-
-  /**
-   * Complete all the calculation when all monitoring information
-   * has been received.
-   */
-  void completeMonitorData()
-  {
-    // Store the new computed data as the reference
-    synchronized (monitorDataLock)
-    {
-      // Now we have the expected answers or an error occurred
-      wrkMonitorData.completeComputing();
-      monitorData = wrkMonitorData;
-      wrkMonitorData = null;
-      if (debugEnabled())
-        TRACER.debugInfo(
-            "In " + this.replicationServer.getMonitorInstanceName() +
-            " baseDn=" + baseDn + " *** Computed MonitorData: " +
-            monitorData.toString());
-    }
-  }
-
-  /**
-   * Sends a MonitorRequest message to all connected RS.
-   * @param expectedMonitoringMsg The list of server handler we have to wait a
-   * monitoring message from. Will be filled as necessary by this method.
-   * @throws DirectoryException when a problem occurs.
-   */
-  protected void sendMonitorDataRequest(
-    List<GlobalServerId> expectedMonitoringMsg)
-    throws DirectoryException
-  {
-    try
-    {
-      for (ServerHandler rs : replicationServers.values())
-      {
-        int serverId = rs.getServerId();
-        // Store the fact that we expect a MonitoringMsg back from this server
-        expectedMonitoringMsg.add(new GlobalServerId(baseDn, serverId));
-        MonitorRequestMsg msg =
-          new MonitorRequestMsg(this.replicationServer.getServerId(),
-          serverId);
-        rs.send(msg);
-      }
-    } catch (Exception e)
-    {
-      Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get();
-      logError(message);
-      throw new DirectoryException(ResultCode.OTHER,
-        message, e);
-    }
-  }
-
-  /**
-   * Processes a Monitor message receives from a remote Replication Server
-   * and stores the data received.
-   *
-   * @param msg The message to be processed.
-   * @param globalServerHandlerId server handler that is receiving the message.
+   * @param msg
+   *          The message to be processed.
+   * @param globalServerHandlerId
+   *          server handler that is receiving the message.
    */
   private void receivesMonitorDataResponse(MonitorMsg msg,
-    GlobalServerId globalServerId)
+      int serverId)
   {
-    try
+    synchronized (pendingMonitorDataLock)
     {
-      synchronized (monitorDataLock)
+      if (pendingMonitorData == null)
       {
-        if (wrkMonitorData == null)
-        {
-          // This is a response for an earlier request whose computing is
-          // already complete.
-          logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
-              Integer.toString(msg.getSenderID())));
-          return;
-        }
+        // This is a response for an earlier request whose computing is
+        // already complete.
+        logError(INFO_IGNORING_REMOTE_MONITOR_DATA.get(baseDn,
+            msg.getSenderID()));
+        return;
+      }
+
+      try
+      {
         // Here is the RS state : list <serverID, lastChangeNumber>
         // For each LDAP Server, we keep the max CN across the RSes
         ServerState replServerState = msg.getReplServerDbState();
-        wrkMonitorData.setMaxCNs(replServerState);
+        pendingMonitorData.setMaxCNs(replServerState);
 
         // store the remote RS states.
-        wrkMonitorData.setRSState(msg.getSenderID(), replServerState);
+        pendingMonitorData.setRSState(msg.getSenderID(),
+            replServerState);
 
         // Store the remote LDAP servers states
         Iterator<Integer> lsidIterator = msg.ldapIterator();
@@ -2752,10 +2783,10 @@
         {
           int sid = lsidIterator.next();
           ServerState dsServerState = msg.getLDAPServerState(sid);
-          wrkMonitorData.setMaxCNs(dsServerState);
-          wrkMonitorData.setLDAPServerState(sid, dsServerState);
-          wrkMonitorData.setFirstMissingDate(sid,
-            msg.getLDAPApproxFirstMissingDate(sid));
+          pendingMonitorData.setMaxCNs(dsServerState);
+          pendingMonitorData.setLDAPServerState(sid, dsServerState);
+          pendingMonitorData.setFirstMissingDate(sid,
+              msg.getLDAPApproxFirstMissingDate(sid));
         }
 
         // Process the latency reported by the remote RSi on its connections
@@ -2768,50 +2799,49 @@
           {
             // this is the latency of the remote RSi regarding the current RS
             // let's update the fmd of my connected LS
-            for (ServerHandler connectedlsh : directoryServers.values())
+            for (ServerHandler connectedlsh : directoryServers
+                .values())
             {
               int connectedlsid = connectedlsh.getServerId();
               Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
-              wrkMonitorData.setFirstMissingDate(connectedlsid, newfmd);
+              pendingMonitorData.setFirstMissingDate(connectedlsid,
+                  newfmd);
             }
-          } else
+          }
+          else
           {
             // this is the latency of the remote RSi regarding another RSj
             // let's update the latency of the LSes connected to RSj
-            ReplicationServerHandler rsjHdr = replicationServers.get(rsid);
+            ReplicationServerHandler rsjHdr = replicationServers
+                .get(rsid);
             if (rsjHdr != null)
             {
-              for (int remotelsid : rsjHdr.getConnectedDirectoryServerIds())
+              for (int remotelsid : rsjHdr
+                  .getConnectedDirectoryServerIds())
               {
                 Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
-                wrkMonitorData.setFirstMissingDate(remotelsid, newfmd);
+                pendingMonitorData.setFirstMissingDate(remotelsid,
+                    newfmd);
               }
             }
           }
         }
-        if (debugEnabled())
+      }
+      catch (RuntimeException e)
+      {
+        // FIXME: do we really expect these???
+        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e
+            .getMessage() + stackTraceToSingleLineString(e)));
+      }
+      finally
+      {
+        // Decreases the number of expected responses and potentially
+        // wakes up the waiting requestor thread.
+        if (pendingMonitorDataServerIDs.remove(serverId))
         {
-          if (debugEnabled())
-            TRACER.debugInfo(
-              "In " + this +
-              " baseDn=" + baseDn +
-              " Processed msg from " + msg.getSenderID() +
-              " New monitor data: " + wrkMonitorData.toString());
+          pendingMonitorDataLatch.countDown();
         }
       }
-
-      // Decreases the number of expected responses and potentially
-      // wakes up the waiting requestor thread.
-      replicationServer.responseReceived(globalServerId);
-
-    } catch (Exception e)
-    {
-      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage() +
-        stackTraceToSingleLineString(e)));
-
-      // If an exception occurs while processing one of the expected message,
-      // the processing is aborted and the waiting thread is awoke.
-      replicationServer.responseReceivedAll();
     }
   }
 
@@ -2846,6 +2876,8 @@
   {
     return replicationServers;
   }
+
+
   /**
    * A synchronization mechanism is created to insure exclusive access to the
    * domain. The goal is to have a consistent view of the topology by locking
@@ -2868,11 +2900,6 @@
   private ReentrantLock lock = new ReentrantLock();
 
   /**
-   * This lock is used to protect the monitoring computing.
-   */
-  private final Object monitoringLock = new Object();
-
-  /**
    * This lock is used to protect the generationid variable.
    */
   private final Object generationIDLock = new Object();
@@ -3073,23 +3100,13 @@
     builder.add(baseDn.toString() + " " + generationId);
     attributes.add(builder.toAttribute());
 
-    try
-    {
-      MonitorData md = computeMonitorData(true);
+    MonitorData md = getDomainMonitorData();
 
-      // Missing changes
-      long missingChanges =
-        md.getMissingChangesRS(replicationServer.getServerId());
-      attributes.add(Attributes.create("missing-changes", String.valueOf(
-        missingChanges)));
-    }
-    catch (Exception e)
-    {
-      Message message =
-        ERR_ERROR_RETRIEVING_MONITOR_DATA.get(stackTraceToSingleLineString(e));
-      // We failed retrieving the monitor data.
-      attributes.add(Attributes.create("error", message.toString()));
-    }
+    // Missing changes
+    long missingChanges = md.getMissingChangesRS(replicationServer
+        .getServerId());
+    attributes.add(Attributes.create("missing-changes",
+        String.valueOf(missingChanges)));
 
     return attributes;
   }

--
Gitblit v1.10.0