mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
30.21.2011 4d0faf5b8ad46e978a72d35a8f736f83fb61fd2d
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -35,17 +35,10 @@
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@@ -57,39 +50,12 @@
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.DoneMsg;
import org.opends.server.replication.protocol.EntryMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.InitializeRcvAckMsg;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.Attributes;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.types.*;
import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
import org.opends.server.replication.server.
  ReplicationServer.GlobalServerId;
/**
 * This class define an in-memory cache that will be used to store
@@ -165,9 +131,47 @@
  /**
   * The monitor data consolidated over the topology.
   */
  private MonitorData monitorData = new MonitorData();
  private MonitorData wrkMonitorData;
  private final Object monitorDataLock = new Object();
  private volatile MonitorData monitorData = new MonitorData();
  // This lock guards against multiple concurrent monitor data recalculation.
  private final Object pendingMonitorLock = new Object();
  // Guarded by pendingMonitorLock.
  private long monitorDataLastBuildDate = 0;
  // The set of replication servers which are already known to be slow to send
  // monitor data.
  //
  // Guarded by pendingMonitorLock.
  private Set<Integer> monitorDataLateServers = new HashSet<Integer>();
  // This lock serializes updates to the pending monitor data.
  private final Object pendingMonitorDataLock = new Object();
  // Monitor data which is currently being calculated.
  //
  // Guarded by pendingMonitorDataLock.
  private MonitorData pendingMonitorData;
  // A set containing the IDs of servers from which we are currently expecting
  // monitor responses. When a response is received from a server we remove the
  // ID from this table, and count down the latch if the ID was in the table.
  //
  // Guarded by pendingMonitorDataLock.
  private final Set<Integer> pendingMonitorDataServerIDs =
    new HashSet<Integer>();
  // This latch is non-null and is used in order to count incoming responses as
  // they arrive. Since incoming response may arrive at any time, even when
  // there is no pending monitor request, access to the latch must be guarded.
  //
  // Guarded by pendingMonitorDataLock.
  private CountDownLatch pendingMonitorDataLatch = null;
  // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
  private final long monitorDataLifeTime = 500;
  /**
   * The needed info for each received assured update message we are waiting
@@ -188,7 +192,7 @@
  // every n number of treated assured messages
  private int assuredTimeoutTimerPurgeCounter = 0;
  ServerState ctHeartbeatState = null;
  private ServerState ctHeartbeatState = null;
  /**
   * Creates a new ReplicationServerDomain associated to the DN baseDn.
@@ -1207,7 +1211,7 @@
   * domain.
   * @param handler the provided handler to unregister.
   */
  protected void unregisterServerHandler(ServerHandler handler)
  private void unregisterServerHandler(ServerHandler handler)
  {
    if (handler.isReplicationServer())
    {
@@ -1228,7 +1232,7 @@
   * - traverse replicationServers list and test for each if DS are connected
   * So it strongly relies on the directoryServers list
   */
  protected void mayResetGenerationId()
  private void mayResetGenerationId()
  {
    if (debugEnabled())
      TRACER.debugInfo(
@@ -1523,7 +1527,7 @@
   * @param senderHandler The handler of the server that published this message.
   * @return The list of destination handlers.
   */
  protected List<ServerHandler> getDestinationServers(RoutableMsg msg,
  private List<ServerHandler> getDestinationServers(RoutableMsg msg,
    ServerHandler senderHandler)
  {
    List<ServerHandler> servers =
@@ -1618,16 +1622,16 @@
        if (senderHandler.isDataServer())
        {
          // Monitoring information requested by a DS
          MonitorMsg monitorMsg =
            createGlobalTopologyMonitorMsg(
                msg.getDestination(), msg.getSenderID(), false);
          MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
              msg.getDestination(), msg.getSenderID(), monitorData);
           if (monitorMsg != null)
          if (monitorMsg != null)
          {
            try
            {
              senderHandler.send(monitorMsg);
            } catch (IOException e)
            }
            catch (IOException e)
            {
              // the connection was closed.
            }
@@ -1656,12 +1660,8 @@
        }
      } else if (msg instanceof MonitorMsg)
      {
        MonitorMsg monitorMsg =
          (MonitorMsg) msg;
        GlobalServerId globalServerId =
          new GlobalServerId(baseDn, senderHandler.getServerId());
        receivesMonitorDataResponse(monitorMsg, globalServerId);
        MonitorMsg monitorMsg = (MonitorMsg) msg;
        receivesMonitorDataResponse(monitorMsg, senderHandler.getServerId());
      } else
      {
        logError(NOTE_ERR_ROUTING_TO_SERVER.get(
@@ -1758,61 +1758,51 @@
  }
  /**
   * Creates a new monitor message including monitoring information for the
   * whole topology.
   * @param sender The sender of this message.
   * @param destination The destination of this message.
   * @param  updateMonitorData A boolean indicating if the monitor data should
   *                           be updated. If false the last monitoring data
   *                           that was computed will be returned. This is
   *                           acceptable for most cases because the monitoring
   *                           thread computes the monitoring data frequently.
   *                           If true is used the calling thread may be
   *                           blocked for a while.
   *
   * @param sender
   *          The sender of this message.
   * @param destination
   *          The destination of this message.
   * @param monitorData
   *          The domain monitor data which should be used for the message.
   * @return The newly created and filled MonitorMsg. Null if a problem occurred
   * during message creation.
   *         during message creation.
   */
  public MonitorMsg createGlobalTopologyMonitorMsg(
      int sender, int destination, boolean updateMonitorData)
      int sender, int destination, MonitorData monitorData)
  {
    MonitorMsg returnMsg =
      new MonitorMsg(sender, destination);
    try
    {
      returnMsg.setReplServerDbState(getDbServerState());
      // Update the information we have about all servers
      // in the topology.
      MonitorData md = computeMonitorData(updateMonitorData);
    returnMsg.setReplServerDbState(getDbServerState());
      // Add the informations about the Replicas currently in
      // the topology.
      Iterator<Integer> it = md.ldapIterator();
      while (it.hasNext())
      {
        int replicaId = it.next();
        returnMsg.setServerState(
            replicaId, md.getLDAPServerState(replicaId),
            md.getApproxFirstMissingDate(replicaId), true);
      }
      // Add the informations about the Replication Servers
      // currently in the topology.
      it = md.rsIterator();
      while (it.hasNext())
      {
        int replicaId = it.next();
        returnMsg.setServerState(
            replicaId, md.getRSStates(replicaId),
            md.getRSApproxFirstMissingDate(replicaId), false);
      }
    }
    catch (DirectoryException e)
    // Add the informations about the Replicas currently in
    // the topology.
    Iterator<Integer> it = monitorData.ldapIterator();
    while (it.hasNext())
    {
      // If we can't compute the Monitor Information, send
      // back an empty message.
      int replicaId = it.next();
      returnMsg.setServerState(replicaId,
          monitorData.getLDAPServerState(replicaId),
          monitorData.getApproxFirstMissingDate(replicaId), true);
    }
    // Add the informations about the Replication Servers
    // currently in the topology.
    it = monitorData.rsIterator();
    while (it.hasNext())
    {
      int replicaId = it.next();
      returnMsg.setServerState(replicaId,
          monitorData.getRSStates(replicaId),
          monitorData.getRSApproxFirstMissingDate(replicaId), false);
    }
    return returnMsg;
  }
@@ -2559,192 +2549,233 @@
   * Monitor Data generation
   * =======================
   */
  /**
   * Retrieves the global monitor data.
   * @param  updateMonitorData A boolean indicating if the monitor data should
   *                           be updated. If false the last monitoring data
   *                           that was computed will be returned. This is
   *                           acceptable for most cases because the monitoring
   *                           thread computes the monitoring data frequently.
   *                           If true is used the calling thread may be
   *                           blocked for a while.
   * @return The monitor data.
   * @throws DirectoryException When an error occurs.
   */
  protected MonitorData computeMonitorData(boolean updateMonitorData)
    throws DirectoryException
  {
    synchronized (monitoringLock)
    {
      if (updateMonitorData)
      {
        // Update the monitorData of ALL domains if this was necessary.
        replicationServer.computeMonitorData();
      }
      // Returns the monitorData of THIS domain
      return monitorData;
    }
  /**
   * Returns the latest monitor data available for this replication server
   * domain.
   *
   * @return The latest monitor data available for this replication server
   *         domain, which is never {@code null}.
   */
  MonitorData getDomainMonitorData()
  {
    return monitorData;
  }
  /**
   * Recomputes the monitor data for this replication server domain.
   *
   * @return The recomputed monitor data for this replication server domain.
   * @throws InterruptedException
   *           If this thread is interrupted while waiting for a response.
   */
  MonitorData computeDomainMonitorData() throws InterruptedException
  {
    // Only allow monitor recalculation at a time.
    synchronized (pendingMonitorLock)
    {
      if ((monitorDataLastBuildDate + monitorDataLifeTime) < TimeThread
          .getTime())
      {
        try
        {
          // Prevent out of band monitor responses from updating our pending
          // table until we are ready.
          synchronized (pendingMonitorDataLock)
          {
            // Clear the pending monitor data.
            pendingMonitorDataServerIDs.clear();
            pendingMonitorData = new MonitorData();
            // Initialize the monitor data.
            initializePendingMonitorData();
            // Send the monitor requests to the connected replication servers.
            for (ReplicationServerHandler rs : replicationServers.values())
            {
              // Add server ID to pending table.
              int serverId = rs.getServerId();
              MonitorRequestMsg msg = new MonitorRequestMsg(
                  this.replicationServer.getServerId(), serverId);
              try
              {
                rs.send(msg);
                // Only register this server ID if we were able to send the
                // message.
                pendingMonitorDataServerIDs.add(serverId);
              }
              catch (IOException e)
              {
                // Log a message and do a best effort from here.
                Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST
                    .get(baseDn, serverId, e.getMessage());
                logError(message);
              }
            }
            // Create the pending response latch based on the number of expected
            // monitor responses.
            pendingMonitorDataLatch = new CountDownLatch(
                pendingMonitorDataServerIDs.size());
          }
          // Wait for the responses to come back.
          pendingMonitorDataLatch.await(5, TimeUnit.SECONDS);
          // Log messages for replication servers that have gone or come back.
          synchronized (pendingMonitorDataLock)
          {
            // Log servers that have come back.
            for (int serverId : monitorDataLateServers)
            {
              // Ensure that we only log once per server: don't fill the
              // error log with repeated messages.
              if (!pendingMonitorDataServerIDs.contains(serverId))
              {
                logError(NOTE_MONITOR_DATA_RECEIVED.get(baseDn,
                    serverId));
              }
            }
            // Log servers that have gone away.
            for (int serverId : pendingMonitorDataServerIDs)
            {
              // Ensure that we only log once per server: don't fill the
              // error log with repeated messages.
              if (!monitorDataLateServers.contains(serverId))
              {
                logError(ERR_MISSING_REMOTE_MONITOR_DATA.get(baseDn,
                    serverId));
              }
            }
            // Remember which servers were late this time.
            monitorDataLateServers.clear();
            monitorDataLateServers.addAll(pendingMonitorDataServerIDs);
          }
          // Store the new computed data as the reference
          synchronized (pendingMonitorDataLock)
          {
            // Now we have the expected answers or an error occurred
            pendingMonitorData.completeComputing();
            monitorData = pendingMonitorData;
            monitorDataLastBuildDate = TimeThread.getTime();
          }
        }
        finally
        {
          synchronized (pendingMonitorDataLock)
          {
            // Clear pending state.
            pendingMonitorData = null;
            pendingMonitorDataLatch = null;
            pendingMonitorDataServerIDs.clear();
          }
        }
      }
    }
    return monitorData;
  }
  /**
   * Start collecting global monitoring information for this
   * ReplicationServerDomain.
   *
   * @param expectedMonitoringMsg The list of server handler we have to wait a
   * monitoring message from. Will be filled as necessary by this method.
   * @throws DirectoryException
   *           In case the monitoring information could not be collected.
   */
  private void initializePendingMonitorData()
  {
    // Let's process our directly connected LSes
    // - in the ServerHandler for a given LS1, the stored state contains :
    // - the max CN produced by LS1
    // - the last CN consumed by LS1 from LS2..n
    // - in the RSdomain/dbHandler, the built-in state contains :
    // - the max CN produced by each server
    // So for a given LS connected we can take the state and the max from
    // the LS/state.
    for (ServerHandler directlsh : directoryServers.values())
    {
      int serverID = directlsh.getServerId();
      // the state comes from the state stored in the SH
      ServerState directlshState = directlsh.getServerState()
          .duplicate();
      // the max CN sent by that LS also comes from the SH
      ChangeNumber maxcn = directlshState
          .getMaxChangeNumber(serverID);
      if (maxcn == null)
      {
        // This directly connected LS has never produced any change
        maxcn = new ChangeNumber(0, 0, serverID);
      }
      pendingMonitorData.setMaxCN(serverID, maxcn);
      pendingMonitorData.setLDAPServerState(serverID, directlshState);
      pendingMonitorData.setFirstMissingDate(serverID,
          directlsh.getApproxFirstMissingDate());
    }
    // Then initialize the max CN for the LS that produced something
    // - from our own local db state
    // - whatever they are directly or indirectly connected
    ServerState dbServerState = getDbServerState();
    pendingMonitorData.setRSState(replicationServer.getServerId(),
        dbServerState);
    Iterator<Integer> it = dbServerState.iterator();
    while (it.hasNext())
    {
      int sid = it.next();
      ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid);
      pendingMonitorData.setMaxCN(sid, storedCN);
    }
  }
  /**
   * Processes a Monitor message receives from a remote Replication Server and
   * stores the data received.
   *
   * @throws DirectoryException In case the monitoring information could
   *                            not be collected.
   */
  void initializeMonitorData(List<GlobalServerId> expectedMonitoringMsg)
    throws DirectoryException
  {
    synchronized (monitorDataLock)
    {
      wrkMonitorData = new MonitorData();
      if (debugEnabled())
        TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDn=" + baseDn + " Computing monitor data ");
      // Let's process our directly connected LSes
      // - in the ServerHandler for a given LS1, the stored state contains :
      //   - the max CN produced by LS1
      //   - the last CN consumed by LS1 from LS2..n
      // - in the RSdomain/dbHandler, the built-in state contains :
      //   - the max CN produced by each server
      // So for a given LS connected we can take the state and the max from
      // the LS/state.
      for (ServerHandler directlsh : directoryServers.values())
      {
        int serverID = directlsh.getServerId();
        // the state comes from the state stored in the SH
        ServerState directlshState = directlsh.getServerState().duplicate();
        // the max CN sent by that LS also comes from the SH
        ChangeNumber maxcn = directlshState.getMaxChangeNumber(serverID);
        if (maxcn == null)
        {
          // This directly connected LS has never produced any change
          maxcn = new ChangeNumber(0, 0, serverID);
        }
        wrkMonitorData.setMaxCN(serverID, maxcn);
        wrkMonitorData.setLDAPServerState(serverID, directlshState);
        wrkMonitorData.setFirstMissingDate(serverID,
            directlsh.getApproxFirstMissingDate());
      }
      // Then initialize the max CN for the LS that produced something
      // - from our own local db state
      // - whatever they are directly or indirectly connected
      ServerState dbServerState = getDbServerState();
      wrkMonitorData.setRSState(replicationServer.getServerId(), dbServerState);
      Iterator<Integer> it = dbServerState.iterator();
      while (it.hasNext())
      {
        int sid = it.next();
        ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid);
        wrkMonitorData.setMaxCN(sid, storedCN);
      }
      // Now we have used all available local informations
      // and we need the remote ones.
      if (debugEnabled())
        TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDn=" + baseDn + " Local monitor data: " +
            wrkMonitorData.toString());
    }
    // Send the request for remote monitor data to the
    sendMonitorDataRequest(expectedMonitoringMsg);
  }
  /**
   * Complete all the calculation when all monitoring information
   * has been received.
   */
  void completeMonitorData()
  {
    // Store the new computed data as the reference
    synchronized (monitorDataLock)
    {
      // Now we have the expected answers or an error occurred
      wrkMonitorData.completeComputing();
      monitorData = wrkMonitorData;
      wrkMonitorData = null;
      if (debugEnabled())
        TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDn=" + baseDn + " *** Computed MonitorData: " +
            monitorData.toString());
    }
  }
  /**
   * Sends a MonitorRequest message to all connected RS.
   * @param expectedMonitoringMsg The list of server handler we have to wait a
   * monitoring message from. Will be filled as necessary by this method.
   * @throws DirectoryException when a problem occurs.
   */
  protected void sendMonitorDataRequest(
    List<GlobalServerId> expectedMonitoringMsg)
    throws DirectoryException
  {
    try
    {
      for (ServerHandler rs : replicationServers.values())
      {
        int serverId = rs.getServerId();
        // Store the fact that we expect a MonitoringMsg back from this server
        expectedMonitoringMsg.add(new GlobalServerId(baseDn, serverId));
        MonitorRequestMsg msg =
          new MonitorRequestMsg(this.replicationServer.getServerId(),
          serverId);
        rs.send(msg);
      }
    } catch (Exception e)
    {
      Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get();
      logError(message);
      throw new DirectoryException(ResultCode.OTHER,
        message, e);
    }
  }
  /**
   * Processes a Monitor message receives from a remote Replication Server
   * and stores the data received.
   *
   * @param msg The message to be processed.
   * @param globalServerHandlerId server handler that is receiving the message.
   * @param msg
   *          The message to be processed.
   * @param globalServerHandlerId
   *          server handler that is receiving the message.
   */
  private void receivesMonitorDataResponse(MonitorMsg msg,
    GlobalServerId globalServerId)
      int serverId)
  {
    try
    synchronized (pendingMonitorDataLock)
    {
      synchronized (monitorDataLock)
      if (pendingMonitorData == null)
      {
        if (wrkMonitorData == null)
        {
          // This is a response for an earlier request whose computing is
          // already complete.
          logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
              Integer.toString(msg.getSenderID())));
          return;
        }
        // This is a response for an earlier request whose computing is
        // already complete.
        logError(INFO_IGNORING_REMOTE_MONITOR_DATA.get(baseDn,
            msg.getSenderID()));
        return;
      }
      try
      {
        // Here is the RS state : list <serverID, lastChangeNumber>
        // For each LDAP Server, we keep the max CN across the RSes
        ServerState replServerState = msg.getReplServerDbState();
        wrkMonitorData.setMaxCNs(replServerState);
        pendingMonitorData.setMaxCNs(replServerState);
        // store the remote RS states.
        wrkMonitorData.setRSState(msg.getSenderID(), replServerState);
        pendingMonitorData.setRSState(msg.getSenderID(),
            replServerState);
        // Store the remote LDAP servers states
        Iterator<Integer> lsidIterator = msg.ldapIterator();
@@ -2752,10 +2783,10 @@
        {
          int sid = lsidIterator.next();
          ServerState dsServerState = msg.getLDAPServerState(sid);
          wrkMonitorData.setMaxCNs(dsServerState);
          wrkMonitorData.setLDAPServerState(sid, dsServerState);
          wrkMonitorData.setFirstMissingDate(sid,
            msg.getLDAPApproxFirstMissingDate(sid));
          pendingMonitorData.setMaxCNs(dsServerState);
          pendingMonitorData.setLDAPServerState(sid, dsServerState);
          pendingMonitorData.setFirstMissingDate(sid,
              msg.getLDAPApproxFirstMissingDate(sid));
        }
        // Process the latency reported by the remote RSi on its connections
@@ -2768,50 +2799,49 @@
          {
            // this is the latency of the remote RSi regarding the current RS
            // let's update the fmd of my connected LS
            for (ServerHandler connectedlsh : directoryServers.values())
            for (ServerHandler connectedlsh : directoryServers
                .values())
            {
              int connectedlsid = connectedlsh.getServerId();
              Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
              wrkMonitorData.setFirstMissingDate(connectedlsid, newfmd);
              pendingMonitorData.setFirstMissingDate(connectedlsid,
                  newfmd);
            }
          } else
          }
          else
          {
            // this is the latency of the remote RSi regarding another RSj
            // let's update the latency of the LSes connected to RSj
            ReplicationServerHandler rsjHdr = replicationServers.get(rsid);
            ReplicationServerHandler rsjHdr = replicationServers
                .get(rsid);
            if (rsjHdr != null)
            {
              for (int remotelsid : rsjHdr.getConnectedDirectoryServerIds())
              for (int remotelsid : rsjHdr
                  .getConnectedDirectoryServerIds())
              {
                Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
                wrkMonitorData.setFirstMissingDate(remotelsid, newfmd);
                pendingMonitorData.setFirstMissingDate(remotelsid,
                    newfmd);
              }
            }
          }
        }
        if (debugEnabled())
      }
      catch (RuntimeException e)
      {
        // FIXME: do we really expect these???
        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e
            .getMessage() + stackTraceToSingleLineString(e)));
      }
      finally
      {
        // Decreases the number of expected responses and potentially
        // wakes up the waiting requestor thread.
        if (pendingMonitorDataServerIDs.remove(serverId))
        {
          if (debugEnabled())
            TRACER.debugInfo(
              "In " + this +
              " baseDn=" + baseDn +
              " Processed msg from " + msg.getSenderID() +
              " New monitor data: " + wrkMonitorData.toString());
          pendingMonitorDataLatch.countDown();
        }
      }
      // Decreases the number of expected responses and potentially
      // wakes up the waiting requestor thread.
      replicationServer.responseReceived(globalServerId);
    } catch (Exception e)
    {
      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage() +
        stackTraceToSingleLineString(e)));
      // If an exception occurs while processing one of the expected message,
      // the processing is aborted and the waiting thread is awoke.
      replicationServer.responseReceivedAll();
    }
  }
@@ -2846,6 +2876,8 @@
  {
    return replicationServers;
  }
  /**
   * A synchronization mechanism is created to insure exclusive access to the
   * domain. The goal is to have a consistent view of the topology by locking
@@ -2868,11 +2900,6 @@
  private ReentrantLock lock = new ReentrantLock();
  /**
   * This lock is used to protect the monitoring computing.
   */
  private final Object monitoringLock = new Object();
  /**
   * This lock is used to protect the generationid variable.
   */
  private final Object generationIDLock = new Object();
@@ -3073,23 +3100,13 @@
    builder.add(baseDn.toString() + " " + generationId);
    attributes.add(builder.toAttribute());
    try
    {
      MonitorData md = computeMonitorData(true);
    MonitorData md = getDomainMonitorData();
      // Missing changes
      long missingChanges =
        md.getMissingChangesRS(replicationServer.getServerId());
      attributes.add(Attributes.create("missing-changes", String.valueOf(
        missingChanges)));
    }
    catch (Exception e)
    {
      Message message =
        ERR_ERROR_RETRIEVING_MONITOR_DATA.get(stackTraceToSingleLineString(e));
      // We failed retrieving the monitor data.
      attributes.add(Attributes.create("error", message.toString()));
    }
    // Missing changes
    long missingChanges = md.getMissingChangesRS(replicationServer
        .getServerId());
    attributes.add(Attributes.create("missing-changes",
        String.valueOf(missingChanges)));
    return attributes;
  }