mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
21.26.2013 f9fc57cba81bcf9a8259a8ea699eb999c0415397
Extracted ReplicationDomainMonitor class out of ReplicationServerDomain to increase its cohesion.


ReplicationDomainMonitor.java: ADDED
Extracted from ReplicationServerDomain.

ReplicationServerDomain.java:
Moved all the code managing the MonitorData to ReplicationDomainMonitor + added domainMonitor instance member.
In createGlobalTopologyMonitorMsg(), removed the MonitorData parameter.
Renamed mayResetGenerationId() to resetGenerationIdIfPossible().
Extracted methods setGenerationIdIfUnset() and getMessage().
In getConnectedDSs() and getConnectedRSs() returned unmodifiable maps.

MonitoringPublisher.java:
Renamed instance member replicationServerDomain to domain.
Consequences of the changes to ReplicationServerDomain.
1 files added
2 files modified
882 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java 22 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitor.java 388 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 472 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
@@ -54,8 +54,8 @@
   */
  private static final DebugTracer TRACER = getTracer();
  /** The domain we send monitoring for. */
  private final ReplicationServerDomain replicationServerDomain;
  /** The replication domain we send monitoring for. */
  private final ReplicationServerDomain domain;
  /** Sleep time (in ms) before sending new monitoring messages. */
  private volatile long period;
@@ -79,7 +79,7 @@
        + ") monitor publisher for domain \""
        + replicationServerDomain.getBaseDn() + "\"");
    this.replicationServerDomain = replicationServerDomain;
    this.domain = replicationServerDomain;
    this.period = period;
  }
@@ -107,15 +107,10 @@
        }
        // Send global topology information to peer DSs
        MonitorData monitorData = replicationServerDomain
            .computeDomainMonitorData();
        MonitorMsg monitorMsg = domain.createGlobalTopologyMonitorMsg(0, 0);
        final int localServerId = domain.getLocalRSServerId();
        MonitorMsg monitorMsg = replicationServerDomain
            .createGlobalTopologyMonitorMsg(0, 0, monitorData);
        int localServerId = replicationServerDomain.getLocalRSServerId();
        for (ServerHandler serverHandler : replicationServerDomain
            .getConnectedDSs().values())
        for (ServerHandler serverHandler : domain.getConnectedDSs().values())
        {
          // Set the right sender and destination ids
          monitorMsg.setSenderID(localServerId);
@@ -203,8 +198,7 @@
  private String getMessage(String message)
  {
    return "In RS " + replicationServerDomain.getLocalRSServerId()
        + ", for base dn " + replicationServerDomain.getBaseDn() + ": "
        + message;
    return "In RS " + domain.getLocalRSServerId() + ", for base dn "
        + domain.getBaseDn() + ": " + message;
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitor.java
New file
@@ -0,0 +1,388 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.server;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.opends.messages.Message;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.util.TimeThread;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class maintains monitor data for a replication domain.
 */
class ReplicationDomainMonitor
{
  /**
   * The monitor data consolidated over the topology.
   */
  private volatile MonitorData monitorData = new MonitorData();
  /**
   * This lock guards against multiple concurrent monitor data recalculation.
   */
  private final Object pendingMonitorLock = new Object();
  /** Guarded by pendingMonitorLock. */
  private long monitorDataLastBuildDate = 0;
  /**
   * The set of replication servers which are already known to be slow to send
   * monitor data.
   * <p>
   * Guarded by pendingMonitorLock.
   */
  private final Set<Integer> monitorDataLateServers = new HashSet<Integer>();
  /** This lock serializes updates to the pending monitor data. */
  private final Object pendingMonitorDataLock = new Object();
  /**
   * Monitor data which is currently being calculated.
   * <p>
   * Guarded by pendingMonitorDataLock.
   */
  private MonitorData pendingMonitorData;
  /**
   * A set containing the IDs of servers from which we are currently expecting
   * monitor responses. When a response is received from a server we remove the
   * ID from this table, and count down the latch if the ID was in the table.
   * <p>
   * Guarded by pendingMonitorDataLock.
   */
  private final Set<Integer> pendingMonitorDataServerIDs =
      new HashSet<Integer>();
  /**
   * This latch is non-null and is used in order to count incoming responses as
   * they arrive. Since incoming response may arrive at any time, even when
   * there is no pending monitor request, access to the latch must be guarded.
   * <p>
   * Guarded by pendingMonitorDataLock.
   */
  private CountDownLatch pendingMonitorDataLatch = null;
  /**
   * TODO: Remote monitor data cache lifetime is 500ms/should be configurable.
   */
  private final long monitorDataLifeTime = 500;
  /**
   * The replication domain monitored by this class.
   */
  private final ReplicationServerDomain domain;
  /**
   * Builds an object of this class.
   *
   * @param replicationDomain
   *          The replication domain that will be monitored by this class
   */
  public ReplicationDomainMonitor(ReplicationServerDomain replicationDomain)
  {
    this.domain = replicationDomain;
  }
  /**
   * Returns the latest monitor data available for this replication server
   * domain.
   *
   * @return The latest monitor data available for this replication server
   *         domain, which is never {@code null}.
   */
  public MonitorData getMonitorData()
  {
    return monitorData;
  }
  /**
   * Recomputes the monitor data for this replication server domain.
   *
   * @return The recomputed monitor data for this replication server domain.
   * @throws InterruptedException
   *           If this thread is interrupted while waiting for a response.
   */
  public MonitorData computeDomainMonitorData() throws InterruptedException
  {
    // Only allow monitor recalculation at a time.
    synchronized (pendingMonitorLock)
    {
      if ((monitorDataLastBuildDate + monitorDataLifeTime) < TimeThread
          .getTime())
      {
        try
        {
          // Prevent out of band monitor responses from updating our pending
          // table until we are ready.
          synchronized (pendingMonitorDataLock)
          {
            // Clear the pending monitor data.
            pendingMonitorDataServerIDs.clear();
            pendingMonitorData = new MonitorData();
            initializePendingMonitorData();
            // Send the monitor requests to the connected replication servers.
            for (ServerHandler rs : domain.getConnectedRSs().values())
            {
              final int serverId = rs.getServerId();
              MonitorRequestMsg msg =
                  new MonitorRequestMsg(domain.getLocalRSServerId(), serverId);
              try
              {
                rs.send(msg);
                // Only register this server ID to pending table if we were able
                // to send the message.
                pendingMonitorDataServerIDs.add(serverId);
              }
              catch (IOException e)
              {
                // Log a message and do a best effort from here.
                Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get(
                    domain.getBaseDn(), serverId, e.getMessage());
                logError(message);
              }
            }
            // Create the pending response latch based on the number of expected
            // monitor responses.
            pendingMonitorDataLatch =
                new CountDownLatch(pendingMonitorDataServerIDs.size());
          }
          // Wait for the responses to come back.
          pendingMonitorDataLatch.await(5, TimeUnit.SECONDS);
          // Log messages for replication servers that have gone or come back.
          synchronized (pendingMonitorDataLock)
          {
            // Log servers that have come back.
            for (int serverId : monitorDataLateServers)
            {
              // Ensure that we only log once per server: don't fill the
              // error log with repeated messages.
              if (!pendingMonitorDataServerIDs.contains(serverId))
              {
                logError(NOTE_MONITOR_DATA_RECEIVED.get(
                    domain.getBaseDn(), serverId));
              }
            }
            // Log servers that have gone away.
            for (int serverId : pendingMonitorDataServerIDs)
            {
              // Ensure that we only log once per server: don't fill the
              // error log with repeated messages.
              if (!monitorDataLateServers.contains(serverId))
              {
                logError(WARN_MISSING_REMOTE_MONITOR_DATA.get(
                    domain.getBaseDn(), serverId));
              }
            }
            // Remember which servers were late this time.
            monitorDataLateServers.clear();
            monitorDataLateServers.addAll(pendingMonitorDataServerIDs);
          }
          // Store the new computed data as the reference
          synchronized (pendingMonitorDataLock)
          {
            // Now we have the expected answers or an error occurred
            pendingMonitorData.completeComputing();
            monitorData = pendingMonitorData;
            monitorDataLastBuildDate = TimeThread.getTime();
          }
        }
        finally
        {
          synchronized (pendingMonitorDataLock)
          {
            // Clear pending state.
            pendingMonitorData = null;
            pendingMonitorDataLatch = null;
            pendingMonitorDataServerIDs.clear();
          }
        }
      }
    }
    return monitorData;
  }
  /**
   * Start collecting global monitoring information for the replication domain.
   */
  private void initializePendingMonitorData()
  {
    // Let's process our directly connected DS
    // - in the ServerHandler for a given DS1, the stored state contains :
    // -- the max CN produced by DS1
    // -- the last CN consumed by DS1 from DS2..n
    // - in the RSdomain/dbHandler, the built-in state contains :
    // -- the max CN produced by each server
    // So for a given DS connected we can take the state and the max from
    // the DS/state.
    for (ServerHandler ds : domain.getConnectedDSs().values())
    {
      final int serverId = ds.getServerId();
      final ServerState dsState = ds.getServerState().duplicate();
      ChangeNumber maxcn = dsState.getChangeNumber(serverId);
      if (maxcn == null)
      {
        // This directly connected LS has never produced any change
        maxcn = new ChangeNumber(0, 0, serverId);
      }
      pendingMonitorData.setMaxCN(serverId, maxcn);
      pendingMonitorData.setLDAPServerState(serverId, dsState);
      pendingMonitorData.setFirstMissingDate(serverId,
          ds.getApproxFirstMissingDate());
    }
    // Then initialize the max CN for the LS that produced something
    // - from our own local db state
    // - whatever they are directly or indirectly connected
    final ServerState dbServerState = domain.getDbServerState();
    pendingMonitorData.setRSState(domain.getLocalRSServerId(), dbServerState);
    for (int serverId : dbServerState)
    {
      ChangeNumber storedCN = dbServerState.getChangeNumber(serverId);
      pendingMonitorData.setMaxCN(serverId, storedCN);
    }
  }
  /**
   * Processes a Monitor message receives from a remote Replication Server and
   * stores the data received.
   *
   * @param msg
   *          The message to be processed.
   * @param serverId
   *          server handler that is receiving the message.
   */
  public void receiveMonitorDataResponse(MonitorMsg msg, int serverId)
  {
    synchronized (pendingMonitorDataLock)
    {
      if (pendingMonitorData == null)
      {
        // This is a response for an earlier request whose computing is
        // already complete.
        logError(INFO_IGNORING_REMOTE_MONITOR_DATA.get(domain.getBaseDn(),
            msg.getSenderID()));
        return;
      }
      try
      {
        // Here is the RS state : list <serverID, lastChangeNumber>
        // For each LDAP Server, we keep the max CN across the RSes
        ServerState replServerState = msg.getReplServerDbState();
        pendingMonitorData.setMaxCNs(replServerState);
        // store the remote RS states.
        pendingMonitorData.setRSState(msg.getSenderID(), replServerState);
        // Store the remote LDAP servers states
        for (int dsServerId : toIterable(msg.ldapIterator()))
        {
          ServerState dsServerState = msg.getLDAPServerState(dsServerId);
          pendingMonitorData.setMaxCNs(dsServerState);
          pendingMonitorData.setLDAPServerState(dsServerId, dsServerState);
          pendingMonitorData.setFirstMissingDate(dsServerId,
              msg.getLDAPApproxFirstMissingDate(dsServerId));
        }
        // Process the latency reported by the remote RSi on its connections
        // to the other RSes
        for (int rsServerId : toIterable(msg.rsIterator()))
        {
          long newFmd = msg.getRSApproxFirstMissingDate(rsServerId);
          if (rsServerId == domain.getLocalRSServerId())
          {
            // this is the latency of the remote RSi regarding the current RS
            // let's update the first missing date of my connected LS
            for (DataServerHandler ds : domain.getConnectedDSs().values())
            {
              int connectedServerId = ds.getServerId();
              pendingMonitorData.setFirstMissingDate(connectedServerId, newFmd);
            }
          }
          else
          {
            // this is the latency of the remote RSi regarding another RSj
            // let's update the latency of the LSes connected to RSj
            ReplicationServerHandler rsjHdr =
                domain.getConnectedRSs().get(rsServerId);
            if (rsjHdr != null)
            {
              for (int remoteServerId : rsjHdr.getConnectedDirectoryServerIds())
              {
                pendingMonitorData.setFirstMissingDate(remoteServerId, newFmd);
              }
            }
          }
        }
      }
      catch (RuntimeException e)
      {
        // FIXME: do we really expect these???
        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(
            e.getMessage() + stackTraceToSingleLineString(e)));
      }
      finally
      {
        // Decreases the number of expected responses and potentially
        // wakes up the waiting requester thread.
        if (pendingMonitorDataServerIDs.remove(serverId))
        {
          pendingMonitorDataLatch.countDown();
        }
      }
    }
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -32,7 +32,6 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
@@ -51,7 +50,6 @@
import org.opends.server.replication.server.changelog.api.ReplicationIterator;
import org.opends.server.replication.server.changelog.je.DbHandler;
import org.opends.server.types.*;
import org.opends.server.util.TimeThread;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -92,6 +90,11 @@
   */
  private AtomicReference<MonitoringPublisher> monitoringPublisher =
      new AtomicReference<MonitoringPublisher>();
  /**
   * Maintains monitor data for the current domain.
   */
  private ReplicationDomainMonitor domainMonitor =
      new ReplicationDomainMonitor(this);
  /**
   * The following map contains one balanced tree for each replica ID to which
@@ -128,63 +131,6 @@
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
  // Monitor data management
  /**
   * The monitor data consolidated over the topology.
   */
  private volatile MonitorData monitorData = new MonitorData();
  /**
   * This lock guards against multiple concurrent monitor data recalculation.
   */
  private final Object pendingMonitorLock = new Object();
  /** Guarded by pendingMonitorLock. */
  private long monitorDataLastBuildDate = 0;
  /**
   * The set of replication servers which are already known to be slow to send
   * monitor data.
   * <p>
   * Guarded by pendingMonitorLock.
   */
  private final Set<Integer> monitorDataLateServers = new HashSet<Integer>();
  /** This lock serializes updates to the pending monitor data. */
  private final Object pendingMonitorDataLock = new Object();
  /**
   * Monitor data which is currently being calculated. Guarded by
   * pendingMonitorDataLock.
   */
  private MonitorData pendingMonitorData;
  /**
   * A set containing the IDs of servers from which we are currently expecting
   * monitor responses. When a response is received from a server we remove the
   * ID from this table, and count down the latch if the ID was in the table.
   * <p>
   * Guarded by pendingMonitorDataLock.
   */
  private final Set<Integer> pendingMonitorDataServerIDs =
    new HashSet<Integer>();
  /**
   * This latch is non-null and is used in order to count incoming responses as
   * they arrive. Since incoming response may arrive at any time, even when
   * there is no pending monitor request, access to the latch must be guarded.
   * <p>
   * Guarded by pendingMonitorDataLock.
   */
  private CountDownLatch pendingMonitorDataLatch = null;
  /**
   * TODO: Remote monitor data cache lifetime is 500ms/should be configurable.
   */
  private final long monitorDataLifeTime = 500;
  /**
   * The needed info for each received assured update message we are waiting
   * acks for.
@@ -251,11 +197,7 @@
    sourceHandler.updateServerState(update);
    sourceHandler.incrementInCount();
    if (generationId < 0)
    {
      generationId = sourceHandler.getGenerationId();
    }
    setGenerationIdIfUnset(sourceHandler.getGenerationId());
    /**
     * If this is an assured message (a message requesting ack), we must
@@ -424,19 +366,17 @@
        {
          if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
          {
            TRACER.debugInfo("In " + this + " for dn " + baseDn + ", update "
                + update.getChangeNumber()
            TRACER.debugInfo(getMessage("update " + update.getChangeNumber()
                + " will not be sent to directory server "
                + dsHandler.getServerId() + " with generation id "
                + dsHandler.getGenerationId() + " different from local "
                + "generation id " + generationId);
                + "generation id " + generationId));
          }
          if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
          {
            TRACER.debugInfo("In RS " + localReplicationServer.getServerId()
                + " for dn " + baseDn + ", update " + update.getChangeNumber()
            TRACER.debugInfo(getMessage("update " + update.getChangeNumber()
                + " will not be sent to directory server "
                + dsHandler.getServerId() + " as it is in full update");
                + dsHandler.getServerId() + " as it is in full update"));
          }
        }
@@ -869,11 +809,9 @@
          ServerHandler origServer = expectedAcksInfo.getRequesterServer();
          if (debugEnabled())
          {
            TRACER.debugInfo("In RS " + localReplicationServer.getServerId()
                    + " for "+ baseDn
                    + ", sending timeout for assured update with change "
                    + " number " + cn + " to server id "
                    + origServer.getServerId());
            TRACER.debugInfo(getMessage(
                "sending timeout for assured update with change number " + cn
                + " to server id " + origServer.getServerId()));
          }
          try
          {
@@ -1112,8 +1050,7 @@
    unregisterServerHandler(sHandler);
    sHandler.shutdown();
    // Check if generation id has to be reset
    mayResetGenerationId();
    resetGenerationIdIfPossible();
    if (!shutdown)
    {
      if (isDirectoryServer)
@@ -1211,16 +1148,13 @@
   * - traverse replicationServers list and test for each if DS are connected
   * So it strongly relies on the directoryServers list
   */
  private void mayResetGenerationId()
  private void resetGenerationIdIfPossible()
  {
    String prefix =
        "In RS " + this.localReplicationServer.getMonitorInstanceName()
            + " for " + baseDn + " ";
    if (debugEnabled())
    {
      TRACER.debugInfo(prefix + "mayResetGenerationId generationIdSavedStatus="
          + generationIdSavedStatus);
      TRACER.debugInfo(getMessage(
          "mayResetGenerationId generationIdSavedStatus="
          + generationIdSavedStatus));
    }
    // If there is no more any LDAP server connected to this domain in the
@@ -1235,9 +1169,9 @@
        {
          if (debugEnabled())
          {
            TRACER.debugInfo(prefix + "mayResetGenerationId skip RS "
            TRACER.debugInfo(getMessage("mayResetGenerationId skip RS "
                + rsHandler.getMonitorInstanceName()
                + " that has different genId");
                + " that has different genId"));
          }
        }
        else if (rsHandler.hasRemoteLDAPServers())
@@ -1246,10 +1180,10 @@
          if (debugEnabled())
          {
            TRACER.debugInfo(prefix + "mayResetGenerationId RS "
            TRACER.debugInfo(getMessage("mayResetGenerationId RS "
                + rsHandler.getMonitorInstanceName()
                + " has ldap servers connected to it"
                + " - will not reset generationId");
                + " - will not reset generationId"));
          }
          break;
        }
@@ -1261,13 +1195,13 @@
      if (debugEnabled())
      {
        TRACER.debugInfo(prefix + "has ldap servers connected to it"
            + " - will not reset generationId");
        TRACER.debugInfo(getMessage("has ldap servers connected to it"
            + " - will not reset generationId"));
      }
    }
    if (!ldapServersConnectedInTheTopology
        && !this.generationIdSavedStatus
        && !generationIdSavedStatus
        && generationId != -1)
    {
      changeGenerationId(-1, false);
@@ -1544,7 +1478,8 @@
      } else if (msg instanceof MonitorMsg)
      {
        MonitorMsg monitorMsg = (MonitorMsg) msg;
        receivesMonitorDataResponse(monitorMsg, msgEmitter.getServerId());
        domainMonitor.receiveMonitorDataResponse(monitorMsg,
            msgEmitter.getServerId());
      } else
      {
        replyWithUnroutableMsgType(msgEmitter, msg);
@@ -1573,10 +1508,10 @@
    if (msgEmitter.isDataServer())
    {
      // Monitoring information requested by a DS
      MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID(), monitorData);
      try
      {
        MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
            msg.getDestination(), msg.getSenderID());
        msgEmitter.send(monitorMsg);
      }
      catch (IOException e)
@@ -1711,33 +1646,25 @@
   *          The sender of this message.
   * @param destination
   *          The destination of this message.
   * @param monitorData
   *          The domain monitor data which should be used for the message.
   * @return The newly created and filled MonitorMsg. Null if a problem occurred
   *         during message creation.
   */
  public MonitorMsg createGlobalTopologyMonitorMsg(
      int sender, int destination, MonitorData monitorData)
  public MonitorMsg createGlobalTopologyMonitorMsg(int sender, int destination)
  {
    final MonitorMsg returnMsg = new MonitorMsg(sender, destination);
    returnMsg.setReplServerDbState(getDbServerState());
    // Add the informations about the Replicas currently in the topology.
    Iterator<Integer> it = monitorData.ldapIterator();
    while (it.hasNext())
    // Add the server state for each DS and RS currently in the topology.
    final MonitorData monitorData = getDomainMonitorData();
    for (int replicaId : toIterable(monitorData.ldapIterator()))
    {
      int replicaId = it.next();
      returnMsg.setServerState(replicaId,
          monitorData.getLDAPServerState(replicaId),
          monitorData.getApproxFirstMissingDate(replicaId), true);
    }
    // Add the information about the RSs currently in the topology.
    it = monitorData.rsIterator();
    while (it.hasNext())
    for (int replicaId : toIterable(monitorData.rsIterator()))
    {
      int replicaId = it.next();
      returnMsg.setServerState(replicaId,
          monitorData.getRSStates(replicaId),
          monitorData.getRSApproxFirstMissingDate(replicaId), false);
@@ -1774,27 +1701,22 @@
    try
    {
      MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
      final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
      monitorMsg.setReplServerDbState(getDbServerState());
      // Populate for each connected LDAP Server
      // from the states stored in the serverHandler.
      // - the server state
      // - the older missing change
      // Add the server state for each connected DS and RS.
      for (DataServerHandler dsHandler : this.connectedDSs.values())
      {
        monitorMsg.setServerState(dsHandler.getServerId(), dsHandler
            .getServerState(), dsHandler.getApproxFirstMissingDate(), true);
      }
      // Same for the connected RS
      for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
      {
        monitorMsg.setServerState(rsHandler.getServerId(), rsHandler
            .getServerState(), rsHandler.getApproxFirstMissingDate(), false);
      }
      // Populate the RS state in the msg from the DbState
      monitorMsg.setReplServerDbState(getDbServerState());
      return monitorMsg;
    }
    finally
@@ -2060,7 +1982,6 @@
      if (this.generationId != generationId)
      {
        // we are changing of genId
        clearDbs();
        this.generationId = generationId;
@@ -2187,10 +2108,8 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo(
          "In RS " + getLocalRSServerId() +
          " Receiving ChangeStatusMsg from " + senderHandler.getServerId() +
          " for baseDn " + baseDn + ":\n" + csMsg);
      TRACER.debugInfo(getMessage("receiving ChangeStatusMsg from "
          + senderHandler.getServerId() + ":\n" + csMsg));
    }
    try
@@ -2421,9 +2340,8 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In RS " + getLocalRSServerId()
          + " Receiving TopologyMsg from " + rsHandler.getServerId()
          + " for baseDn " + baseDn + ":\n" + topoMsg);
      TRACER.debugInfo(getMessage("receiving TopologyMsg from "
          + rsHandler.getServerId() + ":\n" + topoMsg));
    }
    try
@@ -2448,12 +2366,8 @@
      // Handle generation id
      if (allowResetGenId)
      {
        // Check if generation id has to be reset
        mayResetGenerationId();
        if (generationId < 0)
        {
          generationId = rsHandler.getGenerationId();
        }
        resetGenerationIdIfPossible();
        setGenerationIdIfUnset(rsHandler.getGenerationId());
      }
      if (isDifferentGenerationId(rsHandler.getGenerationId()))
@@ -2487,10 +2401,13 @@
    }
  }
  /* =======================
   * Monitor Data generation
   * =======================
   */
  private void setGenerationIdIfUnset(long generationId)
  {
    if (this.generationId < 0)
    {
      this.generationId = generationId;
    }
  }
  /**
   * Returns the latest monitor data available for this replication server
@@ -2501,274 +2418,7 @@
   */
  MonitorData getDomainMonitorData()
  {
    return monitorData;
  }
  /**
   * Recomputes the monitor data for this replication server domain.
   *
   * @return The recomputed monitor data for this replication server domain.
   * @throws InterruptedException
   *           If this thread is interrupted while waiting for a response.
   */
  MonitorData computeDomainMonitorData() throws InterruptedException
  {
    // Only allow monitor recalculation at a time.
    synchronized (pendingMonitorLock)
    {
      if ((monitorDataLastBuildDate + monitorDataLifeTime) < TimeThread
          .getTime())
      {
        try
        {
          // Prevent out of band monitor responses from updating our pending
          // table until we are ready.
          synchronized (pendingMonitorDataLock)
          {
            // Clear the pending monitor data.
            pendingMonitorDataServerIDs.clear();
            pendingMonitorData = new MonitorData();
            // Initialize the monitor data.
            initializePendingMonitorData();
            // Send the monitor requests to the connected replication servers.
            for (ReplicationServerHandler rs : connectedRSs.values())
            {
              // Add server ID to pending table.
              int serverId = rs.getServerId();
              MonitorRequestMsg msg = new MonitorRequestMsg(
                  this.localReplicationServer.getServerId(), serverId);
              try
              {
                rs.send(msg);
                // Only register this server ID if we were able to send the
                // message.
                pendingMonitorDataServerIDs.add(serverId);
              }
              catch (IOException e)
              {
                // Log a message and do a best effort from here.
                Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST
                    .get(baseDn, serverId, e.getMessage());
                logError(message);
              }
            }
            // Create the pending response latch based on the number of expected
            // monitor responses.
            pendingMonitorDataLatch = new CountDownLatch(
                pendingMonitorDataServerIDs.size());
          }
          // Wait for the responses to come back.
          pendingMonitorDataLatch.await(5, TimeUnit.SECONDS);
          // Log messages for replication servers that have gone or come back.
          synchronized (pendingMonitorDataLock)
          {
            // Log servers that have come back.
            for (int serverId : monitorDataLateServers)
            {
              // Ensure that we only log once per server: don't fill the
              // error log with repeated messages.
              if (!pendingMonitorDataServerIDs.contains(serverId))
              {
                logError(NOTE_MONITOR_DATA_RECEIVED.get(baseDn,
                    serverId));
              }
            }
            // Log servers that have gone away.
            for (int serverId : pendingMonitorDataServerIDs)
            {
              // Ensure that we only log once per server: don't fill the
              // error log with repeated messages.
              if (!monitorDataLateServers.contains(serverId))
              {
                logError(WARN_MISSING_REMOTE_MONITOR_DATA.get(baseDn,
                    serverId));
              }
            }
            // Remember which servers were late this time.
            monitorDataLateServers.clear();
            monitorDataLateServers.addAll(pendingMonitorDataServerIDs);
          }
          // Store the new computed data as the reference
          synchronized (pendingMonitorDataLock)
          {
            // Now we have the expected answers or an error occurred
            pendingMonitorData.completeComputing();
            monitorData = pendingMonitorData;
            monitorDataLastBuildDate = TimeThread.getTime();
          }
        }
        finally
        {
          synchronized (pendingMonitorDataLock)
          {
            // Clear pending state.
            pendingMonitorData = null;
            pendingMonitorDataLatch = null;
            pendingMonitorDataServerIDs.clear();
          }
        }
      }
    }
    return monitorData;
  }
  /**
   * Start collecting global monitoring information for this
   * ReplicationServerDomain.
   */
  private void initializePendingMonitorData()
  {
    // Let's process our directly connected DS
    // - in the ServerHandler for a given DS1, the stored state contains :
    // - the max CN produced by DS1
    // - the last CN consumed by DS1 from DS2..n
    // - in the RSdomain/dbHandler, the built-in state contains :
    // - the max CN produced by each server
    // So for a given DS connected we can take the state and the max from
    // the DS/state.
    for (ServerHandler ds : connectedDSs.values())
    {
      int serverID = ds.getServerId();
      // the state comes from the state stored in the SH
      ServerState dsState = ds.getServerState().duplicate();
      // the max CN sent by that LS also comes from the SH
      ChangeNumber maxcn = dsState.getChangeNumber(serverID);
      if (maxcn == null)
      {
        // This directly connected LS has never produced any change
        maxcn = new ChangeNumber(0, 0, serverID);
      }
      pendingMonitorData.setMaxCN(serverID, maxcn);
      pendingMonitorData.setLDAPServerState(serverID, dsState);
      pendingMonitorData.setFirstMissingDate(serverID,
          ds.getApproxFirstMissingDate());
    }
    // Then initialize the max CN for the LS that produced something
    // - from our own local db state
    // - whatever they are directly or indirectly connected
    ServerState dbServerState = getDbServerState();
    pendingMonitorData.setRSState(localReplicationServer.getServerId(),
        dbServerState);
    for (int serverId : dbServerState) {
      ChangeNumber storedCN = dbServerState.getChangeNumber(serverId);
      pendingMonitorData.setMaxCN(serverId, storedCN);
    }
  }
  /**
   * Processes a Monitor message receives from a remote Replication Server and
   * stores the data received.
   *
   * @param msg
   *          The message to be processed.
   * @param serverId
   *          server handler that is receiving the message.
   */
  private void receivesMonitorDataResponse(MonitorMsg msg, int serverId)
  {
    synchronized (pendingMonitorDataLock)
    {
      if (pendingMonitorData == null)
      {
        // This is a response for an earlier request whose computing is
        // already complete.
        logError(INFO_IGNORING_REMOTE_MONITOR_DATA.get(baseDn,
            msg.getSenderID()));
        return;
      }
      try
      {
        // Here is the RS state : list <serverID, lastChangeNumber>
        // For each LDAP Server, we keep the max CN across the RSes
        ServerState replServerState = msg.getReplServerDbState();
        pendingMonitorData.setMaxCNs(replServerState);
        // store the remote RS states.
        pendingMonitorData.setRSState(msg.getSenderID(), replServerState);
        // Store the remote LDAP servers states
        Iterator<Integer> dsServerIdIterator = msg.ldapIterator();
        while (dsServerIdIterator.hasNext())
        {
          int dsServerId = dsServerIdIterator.next();
          ServerState dsServerState = msg.getLDAPServerState(dsServerId);
          pendingMonitorData.setMaxCNs(dsServerState);
          pendingMonitorData.setLDAPServerState(dsServerId, dsServerState);
          pendingMonitorData.setFirstMissingDate(dsServerId,
              msg.getLDAPApproxFirstMissingDate(dsServerId));
        }
        // Process the latency reported by the remote RSi on its connections
        // to the other RSes
        Iterator<Integer> rsServerIdIterator = msg.rsIterator();
        while (rsServerIdIterator.hasNext())
        {
          int rsServerId = rsServerIdIterator.next();
          long newFmd = msg.getRSApproxFirstMissingDate(rsServerId);
          if (rsServerId == localReplicationServer.getServerId())
          {
            // this is the latency of the remote RSi regarding the current RS
            // let's update the fmd of my connected LS
            for (DataServerHandler connectedDS : connectedDSs.values())
            {
              int connectedServerId = connectedDS.getServerId();
              pendingMonitorData.setFirstMissingDate(connectedServerId, newFmd);
            }
          }
          else
          {
            // this is the latency of the remote RSi regarding another RSj
            // let's update the latency of the LSes connected to RSj
            ReplicationServerHandler rsjHdr = connectedRSs.get(rsServerId);
            if (rsjHdr != null)
            {
              for (int remoteServerId : rsjHdr.getConnectedDirectoryServerIds())
              {
                pendingMonitorData.setFirstMissingDate(remoteServerId, newFmd);
              }
            }
          }
        }
      }
      catch (RuntimeException e)
      {
        // FIXME: do we really expect these???
        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(
            e.getMessage() + stackTraceToSingleLineString(e)));
      }
      finally
      {
        // Decreases the number of expected responses and potentially
        // wakes up the waiting requester thread.
        if (pendingMonitorDataServerIDs.remove(serverId))
        {
          pendingMonitorDataLatch.countDown();
        }
      }
    }
    return domainMonitor.getMonitorData();
  }
  /**
@@ -2791,7 +2441,7 @@
   */
  public Map<Integer, DataServerHandler> getConnectedDSs()
  {
    return connectedDSs;
    return Collections.unmodifiableMap(connectedDSs);
  }
  /**
@@ -2800,7 +2450,7 @@
   */
  public Map<Integer, ReplicationServerHandler> getConnectedRSs()
  {
    return connectedRSs;
    return Collections.unmodifiableMap(connectedRSs);
  }
@@ -2960,19 +2610,13 @@
        String.valueOf(localReplicationServer.getServerId())));
    attributes.add(Attributes.create("replication-server-port",
        String.valueOf(localReplicationServer.getReplicationPort())));
    // Add all the base DNs that are known by this replication server.
    attributes.add(Attributes.create("domain-name", baseDn));
    // Publish to monitor the generation ID by replicationServerDomain
    attributes.add(Attributes.create("generation-id",
        baseDn + " " + generationId));
    MonitorData md = getDomainMonitorData();
    // Missing changes
    long missingChanges = md.getMissingChangesRS(localReplicationServer
        .getServerId());
    long missingChanges = getDomainMonitorData().getMissingChangesRS(
        localReplicationServer.getServerId());
    attributes.add(Attributes.create("missing-changes",
        String.valueOf(missingChanges)));
@@ -3439,4 +3083,10 @@
    // connection): store handler.
    connectedRSs.put(rsHandler.getServerId(), rsHandler);
  }
  private String getMessage(String message)
  {
    return "In RS " + localReplicationServer.getServerId() + " for " + baseDn
        + ": " + message;
  }
}