mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
08.09.2014 b875ab3f7b327f797ec4532015e45da6ae3fff56
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -22,13 +22,14 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS
 *      Portions copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -53,6 +54,8 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachineEvent.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.util.StaticUtils.*;
@@ -76,12 +79,10 @@
  private final DN baseDN;
  /**
   * The Status analyzer that periodically verifies whether the connected DSs
   * are late. Using an AtomicReference to avoid leaking references to costly
   * threads.
   * Periodically verifies whether the connected DSs are late and publishes any
   * pending status messages.
   */
  private AtomicReference<StatusAnalyzer> statusAnalyzer =
      new AtomicReference<StatusAnalyzer>();
  private final StatusAnalyzer statusAnalyzer;
  /**
   * The monitoring publisher that periodically sends monitoring messages to the
@@ -168,6 +169,96 @@
  private int assuredTimeoutTimerPurgeCounter = 0;
  /**
   * Stores pending status messages such as DS change time heartbeats for future
   * forwarding to the rest of the topology. This class is required in order to
   * decouple inbound IO processing from outbound IO processing and avoid
   * potential inter-process deadlocks. In particular, the {@code ServerReader}
   * thread must not send messages.
   */
  private static class PendingStatusMessages
  {
    private final Map<Integer, ChangeTimeHeartbeatMsg> pendingHeartbeats =
        new HashMap<Integer, ChangeTimeHeartbeatMsg>(1);
    private final Map<Integer, MonitorMsg> pendingDSMonitorMsgs =
        new HashMap<Integer, MonitorMsg>(1);
    private final Map<Integer, MonitorMsg> pendingRSMonitorMsgs =
        new HashMap<Integer, MonitorMsg>(1);
    private boolean sendRSTopologyMsg;
    private boolean sendDSTopologyMsg;
    private int excludedDSForTopologyMsg = -1;
    /**
     * Enqueues a TopologyMsg for all the connected directory servers in order
     * to let them know the topology (every known DSs and RSs).
     *
     * @param excludedDS
     *          If not null, the topology message will not be sent to this DS.
     */
    private void enqueueTopoInfoToAllDSsExcept(DataServerHandler excludedDS)
    {
      int excludedServerId = excludedDS != null ? excludedDS.getServerId() : -1;
      if (sendDSTopologyMsg)
      {
        if (excludedServerId != excludedDSForTopologyMsg)
        {
          excludedDSForTopologyMsg = -1;
        }
      }
      else
      {
        sendDSTopologyMsg = true;
        excludedDSForTopologyMsg = excludedServerId;
      }
    }
    /**
     * Enqueues a TopologyMsg for all the connected replication servers in order
     * to let them know our connected LDAP servers.
     */
    private void enqueueTopoInfoToAllRSs()
    {
      sendRSTopologyMsg = true;
    }
    /**
     * Enqueues a ChangeTimeHeartbeatMsg received from a DS for forwarding to
     * all other RS instances.
     *
     * @param msg
     *          The heartbeat message.
     */
    private void enqueueChangeTimeHeartbeatMsg(ChangeTimeHeartbeatMsg msg)
    {
      pendingHeartbeats.put(msg.getCSN().getServerId(), msg);
    }
    private void enqueueDSMonitorMsg(int dsServerId, MonitorMsg msg)
    {
      pendingDSMonitorMsgs.put(dsServerId, msg);
    }
    private void enqueueRSMonitorMsg(int rsServerId, MonitorMsg msg)
    {
      pendingRSMonitorMsgs.put(rsServerId, msg);
    }
  }
  private final Object pendingStatusMessagesLock = new Object();
  /** @GuardedBy("pendingStatusMessagesLock") */
  private PendingStatusMessages pendingStatusMessages = new PendingStatusMessages();
  /**
   * Creates a new ReplicationServerDomain associated to the baseDN.
   *
   * @param baseDN
@@ -185,7 +276,8 @@
        + ") assured timer for domain \"" + baseDN + "\"", true);
    this.domainDB =
        localReplicationServer.getChangelogDB().getReplicationDomainDB();
    this.statusAnalyzer = new StatusAnalyzer(this);
    this.statusAnalyzer.start();
    DirectoryServer.registerMonitorProvider(this);
  }
@@ -712,7 +804,7 @@
   * @param ack The ack message received.
   * @param ackingServer The server handler of the server that sent the ack.
   */
  public void processAck(AckMsg ack, ServerHandler ackingServer)
  void processAck(AckMsg ack, ServerHandler ackingServer)
  {
    // Retrieve the expected acks info for the update matching the original
    // sent update.
@@ -1003,21 +1095,12 @@
        if (connectedRSs.containsKey(sHandler.getServerId()))
        {
          unregisterServerHandler(sHandler, shutdown, false);
        } else if (connectedDSs.containsKey(sHandler.getServerId()))
        }
        else if (connectedDSs.containsKey(sHandler.getServerId()))
        {
          // If this is the last DS for the domain,
          // shutdown the status analyzer
          if (connectedDSs.size() == 1)
          {
            if (debugEnabled())
            {
              debug("remote server " + sHandler
                  + " is the last DS to be stopped: stopping status analyzer");
            }
            stopStatusAnalyzer();
          }
          unregisterServerHandler(sHandler, shutdown, true);
        } else if (otherHandlers.contains(sHandler))
        }
        else if (otherHandlers.contains(sHandler))
        {
          unregisterOtherHandler(sHandler);
        }
@@ -1052,15 +1135,19 @@
    resetGenerationIdIfPossible();
    if (!shutdown)
    {
      if (isDirectoryServer)
      synchronized (pendingStatusMessagesLock)
      {
        // Update the remote replication servers with our list
        // of connected LDAP servers
        sendTopoInfoToAllRSs();
        if (isDirectoryServer)
        {
          // Update the remote replication servers with our list
          // of connected LDAP servers
          pendingStatusMessages.enqueueTopoInfoToAllRSs();
        }
        // Warn our DSs that a RS or DS has quit (does not use this
        // handler as already removed from list)
        pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
      }
      // Warn our DSs that a RS or DS has quit (does not use this
      // handler as already removed from list)
      sendTopoInfoToAllDSsExcept(null);
      statusAnalyzer.notifyPendingStatusMessage();
    }
  }
@@ -1405,94 +1492,64 @@
   *
   * @param msg
   *          The message received and to be processed.
   * @param msgEmitter
   *          The server handler of the server that emitted the message.
   * @param sender
   *          The server handler of the server that sent the message.
   */
  public void process(RoutableMsg msg, ServerHandler msgEmitter)
  void process(RoutableMsg msg, ServerHandler sender)
  {
    // Test the message for which a ReplicationServer is expected
    // to be the destination
    if (!(msg instanceof InitializeRequestMsg) &&
        !(msg instanceof InitializeTargetMsg) &&
        !(msg instanceof InitializeRcvAckMsg) &&
        !(msg instanceof EntryMsg) &&
        !(msg instanceof DoneMsg) &&
        (msg.getDestination() == this.localReplicationServer.getServerId()))
    if (msg.getDestination() == localReplicationServer.getServerId())
    {
      // Handle routable messages targeted at this RS.
      if (msg instanceof ErrorMsg)
      {
        ErrorMsg errorMsg = (ErrorMsg) msg;
        logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
      } else if (msg instanceof MonitorRequestMsg)
      {
        replyWithTopologyMonitorMsg(msg, msgEmitter);
      } else if (msg instanceof MonitorMsg)
      {
        MonitorMsg monitorMsg = (MonitorMsg) msg;
        domainMonitor.receiveMonitorDataResponse(monitorMsg,
            msgEmitter.getServerId());
      } else
      {
        replyWithUnroutableMsgType(msgEmitter, msg);
        logError(ERR_ERROR_MSG_RECEIVED.get(((ErrorMsg) msg).getDetails()));
      }
      return;
    }
    List<ServerHandler> servers = getDestinationServers(msg, msgEmitter);
    if (!servers.isEmpty())
    {
      forwardMsgToAllServers(msg, servers, msgEmitter);
      else
      {
        replyWithUnroutableMsgType(sender, msg);
      }
    }
    else
    {
      replyWithUnreachablePeerMsg(msgEmitter, msg);
      // Forward message not destined for this RS.
      List<ServerHandler> servers = getDestinationServers(msg, sender);
      if (!servers.isEmpty())
      {
        forwardMsgToAllServers(msg, servers, sender);
      }
      else
      {
        replyWithUnreachablePeerMsg(sender, msg);
      }
    }
  }
  private void replyWithTopologyMonitorMsg(RoutableMsg msg,
      ServerHandler msgEmitter)
  {
    /*
     * If the request comes from a Directory Server we need to build the full
     * list of all servers in the topology and send back a MonitorMsg with the
     * full list of all the servers in the topology.
     */
    if (msgEmitter.isDataServer())
    {
      // Monitoring information requested by a DS
      try
      {
        MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
            msg.getDestination(), msg.getSenderID(),
            domainMonitor.getMonitorData());
        msgEmitter.send(monitorMsg);
      }
      catch (IOException e)
      {
        // the connection was closed.
      }
    }
    else
    {
      // Monitoring information requested by a RS
      MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID());
      if (monitorMsg != null)
      {
        try
        {
          msgEmitter.send(monitorMsg);
        }
        catch (IOException e)
        {
          // We log the error. The requestor will detect a timeout or
          // any other failure on the connection.
          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(Integer.toString(msg
              .getDestination())));
        }
      }
    }
  /**
   * Responds to a monitor request message.
   *
   * @param msg
   *          The monitor request message.
   * @param sender
   *          The DS/RS which sent the monitor request.
   */
  void processMonitorRequestMsg(MonitorRequestMsg msg, ServerHandler sender)
  {
    enqueueMonitorMsg(msg, sender);
  }
  /**
   * Responds to a monitor message.
   *
   * @param msg
   *          The monitor message
   * @param sender
   *          The DS/RS which sent the monitor.
   */
  void processMonitorMsg(MonitorMsg msg, ServerHandler sender)
  {
    domainMonitor.receiveMonitorDataResponse(msg, sender.getServerId());
  }
  private void replyWithUnroutableMsgType(ServerHandler msgEmitter,
@@ -1553,7 +1610,7 @@
  }
  private void forwardMsgToAllServers(RoutableMsg msg,
      List<ServerHandler> servers, ServerHandler msgEmitter)
      List<ServerHandler> servers, ServerHandler sender)
  {
    for (ServerHandler targetHandler : servers)
    {
@@ -1579,13 +1636,13 @@
        ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), message);
        try
        {
          msgEmitter.send(errMsg);
          sender.send(errMsg);
        } catch (IOException ioe1)
        {
          // an error happened on the sender session trying to recover
          // from an error on the receiver session.
          // We don't have much solution left beside closing the sessions.
          stopServer(msgEmitter, false);
          stopServer(sender, false);
          stopServer(targetHandler, false);
        }
      // TODO Handle error properly (sender timeout in addition)
@@ -1651,42 +1708,26 @@
   * @return The newly created and filled MonitorMsg. Null if the current thread
   *         was interrupted while attempting to get the domain lock.
   */
  public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
  private MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
  {
    try
    final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
    monitorMsg.setReplServerDbState(getLatestServerState());
    // Add the server state for each connected DS and RS.
    for (DataServerHandler dsHandler : this.connectedDSs.values())
    {
      // Lock domain as we need to go through connected servers list
      lock();
    }
    catch (InterruptedException e)
    {
      return null;
      monitorMsg.setServerState(dsHandler.getServerId(),
          dsHandler.getServerState(), dsHandler.getApproxFirstMissingDate(),
          true);
    }
    try
    for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
    {
      final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
      monitorMsg.setReplServerDbState(getLatestServerState());
      // Add the server state for each connected DS and RS.
      for (DataServerHandler dsHandler : this.connectedDSs.values())
      {
        monitorMsg.setServerState(dsHandler.getServerId(), dsHandler
            .getServerState(), dsHandler.getApproxFirstMissingDate(), true);
      }
      for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
      {
        monitorMsg.setServerState(rsHandler.getServerId(), rsHandler
            .getServerState(), rsHandler.getApproxFirstMissingDate(), false);
      }
      return monitorMsg;
      monitorMsg.setServerState(rsHandler.getServerId(),
          rsHandler.getServerState(), rsHandler.getApproxFirstMissingDate(),
          false);
    }
    finally
    {
      release();
    }
    return monitorMsg;
  }
  /**
@@ -1700,6 +1741,7 @@
    assuredTimeoutTimer.cancel();
    stopAllServers(true);
    statusAnalyzer.shutdown();
  }
  /**
@@ -1723,83 +1765,7 @@
    return "ReplicationServerDomain " + baseDN;
  }
  /**
   * Send a TopologyMsg to all the connected directory servers in order to let
   * them know the topology (every known DSs and RSs).
   *
   * @param notThisOne
   *          If not null, the topology message will not be sent to this DS.
   */
  private void sendTopoInfoToAllDSsExcept(DataServerHandler notThisOne)
  {
    for (DataServerHandler dsHandler : connectedDSs.values())
    {
      if (dsHandler != notThisOne)
      // All except the supplied one
      {
        for (int i=1; i<=2; i++)
        {
          if (!dsHandler.shuttingDown()
              && dsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
          {
            TopologyMsg topoMsg =
                createTopologyMsgForDS(dsHandler.getServerId());
            try
            {
              dsHandler.sendTopoInfo(topoMsg);
              break;
            }
            catch (IOException e)
            {
              if (i == 2)
              {
                Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
                    baseDN.toNormalizedString(), "directory",
                    Integer.toString(dsHandler.getServerId()), e.getMessage());
                logError(message);
              }
            }
          }
          sleep(100);
        }
      }
    }
  }
  /**
   * Send a TopologyMsg to all the connected replication servers
   * in order to let them know our connected LDAP servers.
   */
  private void sendTopoInfoToAllRSs()
  {
    TopologyMsg topoMsg = createTopologyMsgForRS();
    for (ReplicationServerHandler rsHandler : connectedRSs.values())
    {
      for (int i=1; i<=2; i++)
      {
        if (!rsHandler.shuttingDown()
            && rsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
        {
          try
          {
            rsHandler.sendTopoInfo(topoMsg);
            break;
          }
          catch (IOException e)
          {
            if (i == 2)
            {
              Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
                  baseDN.toNormalizedString(), "replication",
                  Integer.toString(rsHandler.getServerId()), e.getMessage());
              logError(message);
            }
          }
        }
        sleep(100);
      }
    }
  }
  /**
   * Creates a TopologyMsg filled with information to be sent to a remote RS.
@@ -2062,7 +2028,7 @@
        return;
      }
      sendTopoInfoToAllExcept(senderHandler);
      enqueueTopoInfoToAllExcept(senderHandler);
      Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
          senderHandler.getServerId(), baseDN.toNormalizedString(),
@@ -2087,7 +2053,7 @@
   * @param event The event to be used for new status computation
   * @return True if we have been interrupted (must stop), false otherwise
   */
  public boolean changeStatus(DataServerHandler dsHandler,
  private boolean changeStatus(DataServerHandler dsHandler,
      StatusMachineEvent event)
  {
    try
@@ -2142,7 +2108,7 @@
        return false;
      }
      sendTopoInfoToAllExcept(dsHandler);
      enqueueTopoInfoToAllExcept(dsHandler);
    }
    catch (Exception e)
    {
@@ -2162,7 +2128,7 @@
   */
  public void sendTopoInfoToAll()
  {
    sendTopoInfoToAllExcept(null);
    enqueueTopoInfoToAllExcept(null);
  }
  /**
@@ -2171,10 +2137,14 @@
   * @param dsHandler
   *          if not null, the topology message will not be sent to this DS
   */
  private void sendTopoInfoToAllExcept(DataServerHandler dsHandler)
  private void enqueueTopoInfoToAllExcept(DataServerHandler dsHandler)
  {
    sendTopoInfoToAllDSsExcept(dsHandler);
    sendTopoInfoToAllRSs();
    synchronized (pendingStatusMessagesLock)
    {
      pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(dsHandler);
      pendingStatusMessages.enqueueTopoInfoToAllRSs();
    }
    statusAnalyzer.notifyPendingStatusMessage();
  }
  /**
@@ -2293,7 +2263,11 @@
       * Sends the currently known topology information to every connected
       * DS we have.
       */
      sendTopoInfoToAllDSsExcept(null);
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
      }
      statusAnalyzer.notifyPendingStatusMessage();
    }
    catch(Exception e)
    {
@@ -2414,37 +2388,6 @@
  }
  /**
   * Starts the status analyzer for the domain if not already started.
   */
  private void startStatusAnalyzer()
  {
    int degradedStatusThreshold =
        localReplicationServer.getDegradedStatusThreshold();
    if (degradedStatusThreshold > 0) // 0 means no status analyzer
    {
      final StatusAnalyzer thread =
          new StatusAnalyzer(this, degradedStatusThreshold);
      if (statusAnalyzer.compareAndSet(null, thread))
      {
        thread.start();
      }
    }
  }
  /**
   * Stops the status analyzer for the domain.
   */
  private void stopStatusAnalyzer()
  {
    final StatusAnalyzer thread = statusAnalyzer.get();
    if (thread != null && statusAnalyzer.compareAndSet(thread, null))
    {
      thread.shutdown();
      thread.waitForShutdown();
    }
  }
  /**
   * Starts the monitoring publisher for the domain if not already started.
   */
  private void startMonitoringPublisher()
@@ -2611,63 +2554,62 @@
  }
  private void sendTopologyMsg(String type, ServerHandler handler,
      TopologyMsg msg)
  {
    for (int i = 1; i <= 2; i++)
    {
      if (!handler.shuttingDown()
          && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
      {
        try
        {
          handler.sendTopoInfo(msg);
          break;
        }
        catch (IOException e)
        {
          if (i == 2)
          {
            logError(ERR_EXCEPTION_SENDING_TOPO_INFO.get(
                baseDN.toNormalizedString(), type,
                String.valueOf(handler.getServerId()), e.getMessage()));
          }
        }
      }
      sleep(100);
    }
  }
  /**
   * Processes a ChangeTimeHeartbeatMsg received, by storing the CSN (timestamp)
   * value received, and forwarding the message to the other RSes.
   * @param senderHandler The handler for the server that sent the heartbeat.
   * @param msg The message to process.
   */
  public void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
  void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
      ChangeTimeHeartbeatMsg msg)
  {
    try
    domainDB.replicaHeartbeat(baseDN, msg.getCSN());
    if (senderHandler.isDataServer())
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    }
    catch (InterruptedException ex)
    {
      // We can't deal with this here, so re-interrupt thread so that it is
      // caught during subsequent IO.
      Thread.currentThread().interrupt();
      return;
    }
    try
    {
      domainDB.replicaHeartbeat(baseDN, msg.getCSN());
      if (senderHandler.isDataServer())
      /*
       * If we are the first replication server warned, then forward the message
       * to the remote replication servers.
       */
      synchronized (pendingStatusMessagesLock)
      {
        // If we are the first replication server warned,
        // then forwards the message to the remote replication servers
        for (ReplicationServerHandler rsHandler : connectedRSs.values())
        {
          try
          {
            if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
            {
              rsHandler.send(msg);
            }
          }
          catch (IOException e)
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
            logError(ERR_CHANGELOG_ERROR_SENDING_MSG
                .get("Replication Server "
                    + localReplicationServer.getReplicationPort() + " "
                    + baseDN + " " + localReplicationServer.getServerId()));
            stopServer(rsHandler, false);
          }
        }
        pendingStatusMessages.enqueueChangeTimeHeartbeatMsg(msg);
      }
    }
    finally
    {
      release();
      statusAnalyzer.notifyPendingStatusMessage();
    }
  }
  /**
   * Get the latest (more recent) trim date of the changelog dbs associated
   * to this domain.
@@ -2703,33 +2645,6 @@
  }
  /**
   * Update the status analyzer with the new threshold value.
   *
   * @param degradedStatusThreshold
   *          The new threshold value.
   */
  void updateDegradedStatusThreshold(int degradedStatusThreshold)
  {
    if (degradedStatusThreshold == 0)
    {
      // Requested to stop analyzers
      stopStatusAnalyzer();
      return;
    }
    final StatusAnalyzer saThread = statusAnalyzer.get();
    if (saThread != null) // it is running
    {
      saThread.setDegradedStatusThreshold(degradedStatusThreshold);
    }
    else if (connectedDSs.size() > 0)
    {
      // Requested to start analyzers with provided threshold value
      startStatusAnalyzer();
    }
  }
  /**
   * Update the monitoring publisher with the new period value.
   *
   * @param period
@@ -2765,7 +2680,6 @@
   */
  public void register(DataServerHandler dsHandler)
  {
    startStatusAnalyzer();
    startMonitoringPublisher();
    // connected with new DS: store handler.
@@ -2773,7 +2687,7 @@
    // Tell peer RSs and DSs a new DS just connected to us
    // No need to re-send TopologyMsg to this just new DS
    sendTopoInfoToAllExcept(dsHandler);
    enqueueTopoInfoToAllExcept(dsHandler);
  }
  /**
@@ -2798,4 +2712,211 @@
        + " and port=" + localReplicationServer.getReplicationPort()
        + ": " + message);
  }
  /**
   * Go through each connected DS, get the number of pending changes we have for
   * it and change status accordingly if threshold value is crossed/uncrossed.
   */
  void checkDSDegradedStatus()
  {
    final int degradedStatusThreshold = localReplicationServer
        .getDegradedStatusThreshold();
    // Threshold value = 0 means no status analyzer (no degrading system)
    // we should not have that as the status analyzer thread should not be
    // created if this is the case, but for sanity purpose, we add this
    // test
    if (degradedStatusThreshold > 0)
    {
      for (DataServerHandler serverHandler : connectedDSs.values())
      {
        // Get number of pending changes for this server
        final int nChanges = serverHandler.getRcvMsgQueueSize();
        if (debugEnabled())
        {
          TRACER.debugInfo("In RS " + getLocalRSServerId() + ", for baseDN="
              + getBaseDN() + ": " + "Status analyzer: DS "
              + serverHandler.getServerId() + " has " + nChanges
              + " message(s) in writer queue.");
        }
        // Check status to know if it is relevant to change the status. Do not
        // take RSD lock to test. If we attempt to change the status whereas
        // the current status does allow it, this will be noticed by
        // the changeStatusFromStatusAnalyzer() method. This allows to take the
        // lock roughly only when needed versus every sleep time timeout.
        if (nChanges >= degradedStatusThreshold)
        {
          if (serverHandler.getStatus() == NORMAL_STATUS
              && changeStatus(serverHandler, TO_DEGRADED_STATUS_EVENT))
          {
            break; // Interrupted.
          }
        }
        else
        {
          if (serverHandler.getStatus() == DEGRADED_STATUS
              && changeStatus(serverHandler, TO_NORMAL_STATUS_EVENT))
          {
            break; // Interrupted.
          }
        }
      }
    }
  }
  /**
   * Sends any enqueued status messages to the rest of the topology.
   */
  void sendPendingStatusMessages()
  {
    /*
     * Take a snapshot of pending status notifications in order to avoid holding
     * the broadcast lock for too long. In addition, clear the notifications so
     * that they are not resent the next time.
     */
    final PendingStatusMessages savedState;
    synchronized (pendingStatusMessagesLock)
    {
      savedState = pendingStatusMessages;
      pendingStatusMessages = new PendingStatusMessages();
    }
    sendPendingChangeTimeHeartbeatMsgs(savedState);
    sendPendingTopologyMsgs(savedState);
    sendPendingMonitorMsgs(savedState);
  }
  private void sendPendingMonitorMsgs(final PendingStatusMessages pendingMsgs)
  {
    for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingDSMonitorMsgs
        .entrySet())
    {
      ServerHandler ds = connectedDSs.get(msg.getKey());
      if (ds != null)
      {
        try
        {
          ds.send(msg.getValue());
        }
        catch (IOException e)
        {
          // Ignore: connection closed.
        }
      }
    }
    for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingRSMonitorMsgs
        .entrySet())
    {
      ServerHandler rs = connectedRSs.get(msg.getKey());
      if (rs != null)
      {
        try
        {
          rs.send(msg.getValue());
        }
        catch (IOException e)
        {
          // We log the error. The requestor will detect a timeout or
          // any other failure on the connection.
          // FIXME: why do we log for RSs but not DSs?
          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(String.valueOf(msg
              .getValue().getDestination())));
        }
      }
    }
  }
  private void sendPendingChangeTimeHeartbeatMsgs(PendingStatusMessages pendingMsgs)
  {
    for (ChangeTimeHeartbeatMsg pendingHeartbeat : pendingMsgs.pendingHeartbeats
        .values())
    {
      for (ReplicationServerHandler rsHandler : connectedRSs.values())
      {
        try
        {
          if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
          {
            rsHandler.send(pendingHeartbeat);
          }
        }
        catch (IOException e)
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get("Replication Server "
              + localReplicationServer.getReplicationPort() + " " + baseDN
              + " " + localReplicationServer.getServerId()));
          stopServer(rsHandler, false);
        }
      }
    }
  }
  private void sendPendingTopologyMsgs(PendingStatusMessages pendingMsgs)
  {
    if (pendingMsgs.sendDSTopologyMsg)
    {
      for (ServerHandler handler : connectedDSs.values())
      {
        if (handler.getServerId() != pendingMsgs.excludedDSForTopologyMsg)
        {
          final TopologyMsg topoMsg = createTopologyMsgForDS(handler
              .getServerId());
          sendTopologyMsg("directory", handler, topoMsg);
        }
      }
    }
    if (pendingMsgs.sendRSTopologyMsg)
    {
      final TopologyMsg topoMsg = createTopologyMsgForRS();
      for (ServerHandler handler : connectedRSs.values())
      {
        sendTopologyMsg("replication", handler, topoMsg);
      }
    }
  }
  private void enqueueMonitorMsg(MonitorRequestMsg msg, ServerHandler sender)
  {
    /*
     * If the request comes from a Directory Server we need to build the full
     * list of all servers in the topology and send back a MonitorMsg with the
     * full list of all the servers in the topology.
     */
    if (sender.isDataServer())
    {
      MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID(),
          domainMonitor.getMonitorData());
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueDSMonitorMsg(sender.getServerId(),
            monitorMsg);
      }
    }
    else
    {
      MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID());
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueRSMonitorMsg(sender.getServerId(),
            monitorMsg);
      }
    }
    statusAnalyzer.notifyPendingStatusMessage();
  }
}