mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
25.02.2014 21af6610b07617ecbf1b826310a2f244deb4d348
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -29,6 +29,7 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -75,12 +76,10 @@
  private final DN baseDN;
  /**
   * The Status analyzer that periodically verifies whether the connected DSs
   * are late. Using an AtomicReference to avoid leaking references to costly
   * threads.
   * Periodically verifies whether the connected DSs are late and publishes any
   * pending status messages.
   */
  private AtomicReference<StatusAnalyzer> statusAnalyzer =
      new AtomicReference<StatusAnalyzer>();
  private final StatusAnalyzer statusAnalyzer;
  /**
   * The monitoring publisher that periodically sends monitoring messages to the
@@ -166,6 +165,98 @@
   */
  private int assuredTimeoutTimerPurgeCounter = 0;
  /**
   * Stores pending status messages such as DS change time heartbeats for future
   * forwarding to the rest of the topology. This class is required in order to
   * decouple inbound IO processing from outbound IO processing and avoid
   * potential inter-process deadlocks. In particular, the {@code ServerReader}
   * thread must not send messages.
   */
  private static class PendingStatusMessages
  {
    private final Map<Integer, ChangeTimeHeartbeatMsg> pendingHeartbeats =
        new HashMap<Integer, ChangeTimeHeartbeatMsg>(1);
    private final Map<Integer, MonitorMsg> pendingDSMonitorMsgs =
        new HashMap<Integer, MonitorMsg>(1);
    private final Map<Integer, MonitorMsg> pendingRSMonitorMsgs =
        new HashMap<Integer, MonitorMsg>(1);
    private boolean sendRSTopologyMsg;
    private boolean sendDSTopologyMsg;
    private int excludedDSForTopologyMsg = -1;
    /**
     * Enqueues a TopologyMsg for all the connected directory servers in order
     * to let them know the topology (every known DSs and RSs).
     *
     * @param excludedDS
     *          If not null, the topology message will not be sent to this DS.
     */
    private void enqueueTopoInfoToAllDSsExcept(DataServerHandler excludedDS)
    {
      int excludedServerId = excludedDS != null ? excludedDS.getServerId() : -1;
      if (sendDSTopologyMsg)
      {
        if (excludedServerId != excludedDSForTopologyMsg)
        {
          excludedDSForTopologyMsg = -1;
        }
      }
      else
      {
        sendDSTopologyMsg = true;
        excludedDSForTopologyMsg = excludedServerId;
      }
    }
    /**
     * Enqueues a TopologyMsg for all the connected replication servers in order
     * to let them know our connected LDAP servers.
     */
    private void enqueueTopoInfoToAllRSs()
    {
      sendRSTopologyMsg = true;
    }
    /**
     * Enqueues a ChangeTimeHeartbeatMsg received from a DS for forwarding to
     * all other RS instances.
     *
     * @param msg
     *          The heartbeat message.
     */
    private void enqueueChangeTimeHeartbeatMsg(ChangeTimeHeartbeatMsg msg)
    {
      pendingHeartbeats.put(msg.getCSN().getServerId(), msg);
    }
    private void enqueueDSMonitorMsg(int dsServerId, MonitorMsg msg)
    {
      pendingDSMonitorMsgs.put(dsServerId, msg);
    }
    private void enqueueRSMonitorMsg(int rsServerId, MonitorMsg msg)
    {
      pendingRSMonitorMsgs.put(rsServerId, msg);
    }
  }
  private final Object pendingStatusMessagesLock = new Object();
  /** @GuardedBy("pendingStatusMessagesLock") */
  private PendingStatusMessages pendingStatusMessages = new PendingStatusMessages();
  /**
   * Creates a new ReplicationServerDomain associated to the baseDN.
   *
@@ -184,7 +275,8 @@
        + ") assured timer for domain \"" + baseDN + "\"", true);
    this.domainDB =
        localReplicationServer.getChangelogDB().getReplicationDomainDB();
    this.statusAnalyzer = new StatusAnalyzer(this);
    this.statusAnalyzer.start();
    DirectoryServer.registerMonitorProvider(this);
  }
@@ -704,7 +796,7 @@
   * @param ack The ack message received.
   * @param ackingServer The server handler of the server that sent the ack.
   */
  public void processAck(AckMsg ack, ServerHandler ackingServer)
  void processAck(AckMsg ack, ServerHandler ackingServer)
  {
    // Retrieve the expected acks info for the update matching the original
    // sent update.
@@ -990,21 +1082,12 @@
        if (connectedRSs.containsKey(sHandler.getServerId()))
        {
          unregisterServerHandler(sHandler, shutdown, false);
        } else if (connectedDSs.containsKey(sHandler.getServerId()))
        }
        else if (connectedDSs.containsKey(sHandler.getServerId()))
        {
          // If this is the last DS for the domain,
          // shutdown the status analyzer
          if (connectedDSs.size() == 1)
          {
            if (logger.isTraceEnabled())
            {
              debug("remote server " + sHandler
                  + " is the last DS to be stopped: stopping status analyzer");
            }
            stopStatusAnalyzer();
          }
          unregisterServerHandler(sHandler, shutdown, true);
        } else if (otherHandlers.contains(sHandler))
        }
        else if (otherHandlers.contains(sHandler))
        {
          unregisterOtherHandler(sHandler);
        }
@@ -1038,15 +1121,19 @@
    resetGenerationIdIfPossible();
    if (!shutdown)
    {
      if (isDirectoryServer)
      synchronized (pendingStatusMessagesLock)
      {
        // Update the remote replication servers with our list
        // of connected LDAP servers
        sendTopoInfoToAllRSs();
        if (isDirectoryServer)
        {
          // Update the remote replication servers with our list
          // of connected LDAP servers
          pendingStatusMessages.enqueueTopoInfoToAllRSs();
        }
        // Warn our DSs that a RS or DS has quit (does not use this
        // handler as already removed from list)
        pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
      }
      // Warn our DSs that a RS or DS has quit (does not use this
      // handler as already removed from list)
      sendTopoInfoToAllDSsExcept(null);
      statusAnalyzer.notifyPendingStatusMessage();
    }
  }
@@ -1384,99 +1471,71 @@
    return servers;
  }
  /**
   * Processes a message coming from one server in the topology and potentially
   * forwards it to one or all other servers.
   *
   * @param msg
   *          The message received and to be processed.
   * @param msgEmitter
   *          The server handler of the server that emitted the message.
   * @param sender
   *          The server handler of the server that sent the message.
   */
  public void process(RoutableMsg msg, ServerHandler msgEmitter)
  void process(RoutableMsg msg, ServerHandler sender)
  {
    // Test the message for which a ReplicationServer is expected
    // to be the destination
    if (!(msg instanceof InitializeRequestMsg) &&
        !(msg instanceof InitializeTargetMsg) &&
        !(msg instanceof InitializeRcvAckMsg) &&
        !(msg instanceof EntryMsg) &&
        !(msg instanceof DoneMsg) &&
        (msg.getDestination() == this.localReplicationServer.getServerId()))
    if (msg.getDestination() == localReplicationServer.getServerId())
    {
      // Handle routable messages targeted at this RS.
      if (msg instanceof ErrorMsg)
      {
        ErrorMsg errorMsg = (ErrorMsg) msg;
        logger.error(ERR_ERROR_MSG_RECEIVED, errorMsg.getDetails());
      } else if (msg instanceof MonitorRequestMsg)
      {
        replyWithTopologyMonitorMsg(msg, msgEmitter);
      } else if (msg instanceof MonitorMsg)
      {
        MonitorMsg monitorMsg = (MonitorMsg) msg;
        domainMonitor.receiveMonitorDataResponse(monitorMsg,
            msgEmitter.getServerId());
      } else
      {
        replyWithUnroutableMsgType(msgEmitter, msg);
      }
      return;
    }
    List<ServerHandler> servers = getDestinationServers(msg, msgEmitter);
    if (!servers.isEmpty())
    {
      forwardMsgToAllServers(msg, servers, msgEmitter);
      else
      {
        replyWithUnroutableMsgType(sender, msg);
      }
    }
    else
    {
      replyWithUnreachablePeerMsg(msgEmitter, msg);
      // Forward message not destined for this RS.
      List<ServerHandler> servers = getDestinationServers(msg, sender);
      if (!servers.isEmpty())
      {
        forwardMsgToAllServers(msg, servers, sender);
      }
      else
      {
        replyWithUnreachablePeerMsg(sender, msg);
      }
    }
  }
  private void replyWithTopologyMonitorMsg(RoutableMsg msg,
      ServerHandler msgEmitter)
  /**
   * Responds to a monitor request message.
   *
   * @param msg
   *          The monitor request message.
   * @param sender
   *          The DS/RS which sent the monitor request.
   */
  void processMonitorRequestMsg(MonitorRequestMsg msg, ServerHandler sender)
  {
    /*
     * If the request comes from a Directory Server we need to build the full
     * list of all servers in the topology and send back a MonitorMsg with the
     * full list of all the servers in the topology.
     */
    if (msgEmitter.isDataServer())
    {
      // Monitoring information requested by a DS
      try
      {
        MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
            msg.getDestination(), msg.getSenderID(),
            domainMonitor.getMonitorData());
        msgEmitter.send(monitorMsg);
      }
      catch (IOException e)
      {
        // the connection was closed.
      }
    }
    else
    {
      // Monitoring information requested by a RS
      MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID());
    enqueueMonitorMsg(msg, sender);
  }
      if (monitorMsg != null)
      {
        try
        {
          msgEmitter.send(monitorMsg);
        }
        catch (IOException e)
        {
          // We log the error. The requestor will detect a timeout or
          // any other failure on the connection.
          logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, msg.getDestination());
        }
      }
    }
  /**
   * Responds to a monitor message.
   *
   * @param msg
   *          The monitor message
   * @param sender
   *          The DS/RS which sent the monitor.
   */
  void processMonitorMsg(MonitorMsg msg, ServerHandler sender)
  {
    domainMonitor.receiveMonitorDataResponse(msg, sender.getServerId());
  }
  private void replyWithUnroutableMsgType(ServerHandler msgEmitter,
@@ -1532,7 +1591,7 @@
  }
  private void forwardMsgToAllServers(RoutableMsg msg,
      List<ServerHandler> servers, ServerHandler msgEmitter)
      List<ServerHandler> servers, ServerHandler sender)
  {
    for (ServerHandler targetHandler : servers)
    {
@@ -1557,13 +1616,13 @@
        ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), message);
        try
        {
          msgEmitter.send(errMsg);
          sender.send(errMsg);
        } catch (IOException ioe1)
        {
          // an error happened on the sender session trying to recover
          // from an error on the receiver session.
          // We don't have much solution left beside closing the sessions.
          stopServer(msgEmitter, false);
          stopServer(sender, false);
          stopServer(targetHandler, false);
        }
      // TODO Handle error properly (sender timeout in addition)
@@ -1629,42 +1688,26 @@
   * @return The newly created and filled MonitorMsg. Null if the current thread
   *         was interrupted while attempting to get the domain lock.
   */
  public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
  private MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
  {
    try
    final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
    monitorMsg.setReplServerDbState(getLatestServerState());
    // Add the server state for each connected DS and RS.
    for (DataServerHandler dsHandler : this.connectedDSs.values())
    {
      // Lock domain as we need to go through connected servers list
      lock();
    }
    catch (InterruptedException e)
    {
      return null;
      monitorMsg.setServerState(dsHandler.getServerId(),
          dsHandler.getServerState(), dsHandler.getApproxFirstMissingDate(),
          true);
    }
    try
    for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
    {
      final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
      monitorMsg.setReplServerDbState(getLatestServerState());
      // Add the server state for each connected DS and RS.
      for (DataServerHandler dsHandler : this.connectedDSs.values())
      {
        monitorMsg.setServerState(dsHandler.getServerId(), dsHandler
            .getServerState(), dsHandler.getApproxFirstMissingDate(), true);
      }
      for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
      {
        monitorMsg.setServerState(rsHandler.getServerId(), rsHandler
            .getServerState(), rsHandler.getApproxFirstMissingDate(), false);
      }
      return monitorMsg;
      monitorMsg.setServerState(rsHandler.getServerId(),
          rsHandler.getServerState(), rsHandler.getApproxFirstMissingDate(),
          false);
    }
    finally
    {
      release();
    }
    return monitorMsg;
  }
  /**
@@ -1678,6 +1721,7 @@
    assuredTimeoutTimer.cancel();
    stopAllServers(true);
    statusAnalyzer.shutdown();
  }
  /**
@@ -1701,79 +1745,7 @@
    return "ReplicationServerDomain " + baseDN;
  }
  /**
   * Send a TopologyMsg to all the connected directory servers in order to let
   * them know the topology (every known DSs and RSs).
   *
   * @param notThisOne
   *          If not null, the topology message will not be sent to this DS.
   */
  private void sendTopoInfoToAllDSsExcept(DataServerHandler notThisOne)
  {
    for (DataServerHandler dsHandler : connectedDSs.values())
    {
      if (dsHandler != notThisOne)
      // All except the supplied one
      {
        for (int i=1; i<=2; i++)
        {
          if (!dsHandler.shuttingDown()
              && dsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
          {
            TopologyMsg topoMsg =
                createTopologyMsgForDS(dsHandler.getServerId());
            try
            {
              dsHandler.sendTopoInfo(topoMsg);
              break;
            }
            catch (IOException e)
            {
              if (i == 2)
              {
                logger.error(ERR_EXCEPTION_SENDING_TOPO_INFO, baseDN.toNormalizedString(), "directory",
                    dsHandler.getServerId(), e.getMessage());
              }
            }
          }
          sleep(100);
        }
      }
    }
  }
  /**
   * Send a TopologyMsg to all the connected replication servers
   * in order to let them know our connected LDAP servers.
   */
  private void sendTopoInfoToAllRSs()
  {
    TopologyMsg topoMsg = createTopologyMsgForRS();
    for (ReplicationServerHandler rsHandler : connectedRSs.values())
    {
      for (int i=1; i<=2; i++)
      {
        if (!rsHandler.shuttingDown()
            && rsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
        {
          try
          {
            rsHandler.sendTopoInfo(topoMsg);
            break;
          }
          catch (IOException e)
          {
            if (i == 2)
            {
              logger.error(ERR_EXCEPTION_SENDING_TOPO_INFO, baseDN.toNormalizedString(), "replication",
                  rsHandler.getServerId(), e.getMessage());
            }
          }
        }
        sleep(100);
      }
    }
  }
  /**
   * Creates a TopologyMsg filled with information to be sent to a remote RS.
@@ -2031,7 +2003,7 @@
        return;
      }
      sendTopoInfoToAllExcept(senderHandler);
      enqueueTopoInfoToAllExcept(senderHandler);
      logger.info(NOTE_DIRECTORY_SERVER_CHANGED_STATUS,
          senderHandler.getServerId(), baseDN.toNormalizedString(), newStatus);
@@ -2053,7 +2025,7 @@
   * @param event The event to be used for new status computation
   * @return True if we have been interrupted (must stop), false otherwise
   */
  public boolean changeStatus(DataServerHandler dsHandler,
  private boolean changeStatus(DataServerHandler dsHandler,
      StatusMachineEvent event)
  {
    try
@@ -2106,7 +2078,7 @@
        return false;
      }
      sendTopoInfoToAllExcept(dsHandler);
      enqueueTopoInfoToAllExcept(dsHandler);
    }
    catch (Exception e)
    {
@@ -2125,7 +2097,7 @@
   */
  public void sendTopoInfoToAll()
  {
    sendTopoInfoToAllExcept(null);
    enqueueTopoInfoToAllExcept(null);
  }
  /**
@@ -2134,10 +2106,14 @@
   * @param dsHandler
   *          if not null, the topology message will not be sent to this DS
   */
  private void sendTopoInfoToAllExcept(DataServerHandler dsHandler)
  private void enqueueTopoInfoToAllExcept(DataServerHandler dsHandler)
  {
    sendTopoInfoToAllDSsExcept(dsHandler);
    sendTopoInfoToAllRSs();
    synchronized (pendingStatusMessagesLock)
    {
      pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(dsHandler);
      pendingStatusMessages.enqueueTopoInfoToAllRSs();
    }
    statusAnalyzer.notifyPendingStatusMessage();
  }
  /**
@@ -2253,7 +2229,11 @@
       * Sends the currently known topology information to every connected
       * DS we have.
       */
      sendTopoInfoToAllDSsExcept(null);
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
      }
      statusAnalyzer.notifyPendingStatusMessage();
    }
    catch(Exception e)
    {
@@ -2373,36 +2353,6 @@
  }
  /**
   * Starts the status analyzer for the domain if not already started.
   */
  private void startStatusAnalyzer()
  {
    int degradedStatusThreshold =
        localReplicationServer.getDegradedStatusThreshold();
    if (degradedStatusThreshold > 0) // 0 means no status analyzer
    {
      final StatusAnalyzer thread = new StatusAnalyzer(this);
      if (statusAnalyzer.compareAndSet(null, thread))
      {
        thread.start();
      }
    }
  }
  /**
   * Stops the status analyzer for the domain.
   */
  private void stopStatusAnalyzer()
  {
    final StatusAnalyzer thread = statusAnalyzer.get();
    if (thread != null && statusAnalyzer.compareAndSet(thread, null))
    {
      thread.shutdown();
      thread.waitForShutdown();
    }
  }
  /**
   * Starts the monitoring publisher for the domain if not already started.
   */
  private void startMonitoringPublisher()
@@ -2569,62 +2519,62 @@
  }
  private void sendTopologyMsg(String type, ServerHandler handler,
      TopologyMsg msg)
  {
    for (int i = 1; i <= 2; i++)
    {
      if (!handler.shuttingDown()
          && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
      {
        try
        {
          handler.sendTopoInfo(msg);
          break;
        }
        catch (IOException e)
        {
          if (i == 2)
          {
            logger.error(ERR_EXCEPTION_SENDING_TOPO_INFO,
                baseDN.toNormalizedString(), type, handler.getServerId(),
                e.getMessage());
          }
        }
      }
      sleep(100);
    }
  }
  /**
   * Processes a ChangeTimeHeartbeatMsg received, by storing the CSN (timestamp)
   * value received, and forwarding the message to the other RSes.
   * @param senderHandler The handler for the server that sent the heartbeat.
   * @param msg The message to process.
   */
  public void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
  void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
      ChangeTimeHeartbeatMsg msg)
  {
    try
    domainDB.replicaHeartbeat(baseDN, msg.getCSN());
    if (senderHandler.isDataServer())
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    }
    catch (InterruptedException ex)
    {
      // We can't deal with this here, so re-interrupt thread so that it is
      // caught during subsequent IO.
      Thread.currentThread().interrupt();
      return;
    }
    try
    {
      domainDB.replicaHeartbeat(baseDN, msg.getCSN());
      if (senderHandler.isDataServer())
      /*
       * If we are the first replication server warned, then forward the message
       * to the remote replication servers.
       */
      synchronized (pendingStatusMessagesLock)
      {
        // If we are the first replication server warned,
        // then forwards the message to the remote replication servers
        for (ReplicationServerHandler rsHandler : connectedRSs.values())
        {
          try
          {
            if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
            {
              rsHandler.send(msg);
            }
          }
          catch (IOException e)
          {
            logger.traceException(e);
            logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG,
                "Replication Server " + localReplicationServer.getReplicationPort() + " "
                    + baseDN + " " + localReplicationServer.getServerId());
            stopServer(rsHandler, false);
          }
        }
        pendingStatusMessages.enqueueChangeTimeHeartbeatMsg(msg);
      }
    }
    finally
    {
      release();
      statusAnalyzer.notifyPendingStatusMessage();
    }
  }
  /**
   * Get the latest (more recent) trim date of the changelog dbs associated
   * to this domain.
@@ -2660,26 +2610,6 @@
  }
  /**
   * Update the status analyzer with the new threshold value.
   *
   * @param degradedStatusThreshold
   *          The new threshold value.
   */
  void updateDegradedStatusThreshold(int degradedStatusThreshold)
  {
    if (degradedStatusThreshold == 0)
    {
      // Requested to stop analyzers
      stopStatusAnalyzer();
    }
    else if (statusAnalyzer.get() == null && connectedDSs.size() > 0)
    {
      // Requested to start analyzers with provided threshold value
      startStatusAnalyzer();
    }
  }
  /**
   * Update the monitoring publisher with the new period value.
   *
   * @param period
@@ -2715,7 +2645,6 @@
   */
  public void register(DataServerHandler dsHandler)
  {
    startStatusAnalyzer();
    startMonitoringPublisher();
    // connected with new DS: store handler.
@@ -2723,7 +2652,7 @@
    // Tell peer RSs and DSs a new DS just connected to us
    // No need to re-send TopologyMsg to this just new DS
    sendTopoInfoToAllExcept(dsHandler);
    enqueueTopoInfoToAllExcept(dsHandler);
  }
  /**
@@ -2765,7 +2694,7 @@
    // test
    if (degradedStatusThreshold > 0)
    {
      for (DataServerHandler serverHandler : getConnectedDSs().values())
      for (DataServerHandler serverHandler : connectedDSs.values())
      {
        // Get number of pending changes for this server
        final int nChanges = serverHandler.getRcvMsgQueueSize();
@@ -2801,4 +2730,158 @@
      }
    }
  }
  /**
   * Sends any enqueued status messages to the rest of the topology.
   */
  void sendPendingStatusMessages()
  {
    /*
     * Take a snapshot of pending status notifications in order to avoid holding
     * the broadcast lock for too long. In addition, clear the notifications so
     * that they are not resent the next time.
     */
    final PendingStatusMessages savedState;
    synchronized (pendingStatusMessagesLock)
    {
      savedState = pendingStatusMessages;
      pendingStatusMessages = new PendingStatusMessages();
    }
    sendPendingChangeTimeHeartbeatMsgs(savedState);
    sendPendingTopologyMsgs(savedState);
    sendPendingMonitorMsgs(savedState);
  }
  private void sendPendingMonitorMsgs(final PendingStatusMessages pendingMsgs)
  {
    for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingDSMonitorMsgs
        .entrySet())
    {
      ServerHandler ds = connectedDSs.get(msg.getKey());
      if (ds != null)
      {
        try
        {
          ds.send(msg.getValue());
        }
        catch (IOException e)
        {
          // Ignore: connection closed.
        }
      }
    }
    for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingRSMonitorMsgs
        .entrySet())
    {
      ServerHandler rs = connectedRSs.get(msg.getKey());
      if (rs != null)
      {
        try
        {
          rs.send(msg.getValue());
        }
        catch (IOException e)
        {
          // We log the error. The requestor will detect a timeout or
          // any other failure on the connection.
          // FIXME: why do we log for RSs but not DSs?
          logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, msg.getValue()
              .getDestination());
        }
      }
    }
  }
  private void sendPendingChangeTimeHeartbeatMsgs(PendingStatusMessages pendingMsgs)
  {
    for (ChangeTimeHeartbeatMsg pendingHeartbeat : pendingMsgs.pendingHeartbeats
        .values())
    {
      for (ReplicationServerHandler rsHandler : connectedRSs.values())
      {
        try
        {
          if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
          {
            rsHandler.send(pendingHeartbeat);
          }
        }
        catch (IOException e)
        {
          logger.traceException(e);
          logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, "Replication Server "
              + localReplicationServer.getReplicationPort() + " " + baseDN
              + " " + localReplicationServer.getServerId());
          stopServer(rsHandler, false);
        }
      }
    }
  }
  private void sendPendingTopologyMsgs(PendingStatusMessages pendingMsgs)
  {
    if (pendingMsgs.sendDSTopologyMsg)
    {
      for (ServerHandler handler : connectedDSs.values())
      {
        if (handler.getServerId() != pendingMsgs.excludedDSForTopologyMsg)
        {
          final TopologyMsg topoMsg = createTopologyMsgForDS(handler
              .getServerId());
          sendTopologyMsg("directory", handler, topoMsg);
        }
      }
    }
    if (pendingMsgs.sendRSTopologyMsg)
    {
      final TopologyMsg topoMsg = createTopologyMsgForRS();
      for (ServerHandler handler : connectedRSs.values())
      {
        sendTopologyMsg("replication", handler, topoMsg);
      }
    }
  }
  private void enqueueMonitorMsg(MonitorRequestMsg msg, ServerHandler sender)
  {
    /*
     * If the request comes from a Directory Server we need to build the full
     * list of all servers in the topology and send back a MonitorMsg with the
     * full list of all the servers in the topology.
     */
    if (sender.isDataServer())
    {
      MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID(),
          domainMonitor.getMonitorData());
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueDSMonitorMsg(sender.getServerId(),
            monitorMsg);
      }
    }
    else
    {
      MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID());
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueRSMonitorMsg(sender.getServerId(),
            monitorMsg);
      }
    }
    statusAnalyzer.notifyPendingStatusMessage();
  }
}