mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
08.09.2014 b875ab3f7b327f797ec4532015e45da6ae3fff56
Backport fix for OPENDJ-1354: replication threads BLOCKED in pendingChanges queue

Decouple writing of status messages (change time heartbeats, monitor, and topology msgs) from RS reader threads through the use of a simple event service. It is now the responsibility of the StatusAnalyzer thread to send status messages when notified to do so by the ReplicationServerDomain. In addition, the Monitor*Msgs are no longer routable since they were only ever sent directly between peers. This simplifies some of the request processing in ReplicationServerDomain.

This change does not attempt to solve potential deadlocks arising from transmission of assured replication acks, status changes, generation ID updates, windowing messages (which are deprecated), total update messages, and error messages.
9 files modified
1251 ■■■■■ changed files
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java 67 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java 93 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java 12 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 11 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 767 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 44 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerReader.java 17 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java 237 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions Copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
@@ -50,12 +50,24 @@
 * When RS2 receives a MonitorRequestMessage from RS1, RS2 responds with a
 * MonitorMsg.
 */
public class MonitorMsg extends RoutableMsg
public class MonitorMsg extends ReplicationMsg
{
  /**
   * Data structure to manage the state and the approximation
   * of the data of the first missing change for each LDAP server
   * connected to a Replication Server.
   * The destination server or servers of this message.
   */
  private final int destination;
  /**
   * The serverID of the server that sends this message.
   */
  private final int senderID;
  /**
   * Data structure to manage the state and the approximation of the data of the
   * first missing change for each LDAP server connected to a Replication
   * Server.
   */
  static class ServerData
  {
@@ -89,24 +101,7 @@
   */
  public MonitorMsg(int sender, int destination)
  {
    super(sender, destination);
  }
  /**
   * Sets the sender ID.
   * @param senderID The sender ID.
   */
  public void setSenderID(int senderID)
  {
    this.senderID = senderID;
  }
  /**
   * Sets the destination.
   * @param destination The destination.
   */
  public void setDestination(int destination)
  {
    this.senderID = sender;
    this.destination = destination;
  }
@@ -459,6 +454,32 @@
    return data.rsStates.keySet().iterator();
  }
  /**
   * Get the destination.
   *
   * @return the destination
   */
  public int getDestination()
  {
    return destination;
  }
  /**
   * Get the server ID of the server that sent this message.
   *
   * @return the server id
   */
  public int getSenderID()
  {
    return senderID;
  }
  /**
   * {@inheritDoc}
   */
opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
@@ -30,55 +30,70 @@
import java.util.zip.DataFormatException;
/**
 * This message is part of the replication protocol.
 * RS1 sends a MonitorRequestMsg to RS2 to request its monitoring
 * informations.
 * When RS2 receives a MonitorRequestMsg from RS1, RS2 responds with a
 * MonitorMessage.
 * This message is part of the replication protocol. RS1 sends a
 * MonitorRequestMsg to RS2 to request its monitoring information. When RS2
 * receives a MonitorRequestMsg from RS1, RS2 responds with a MonitorMessage.
 */
public class MonitorRequestMsg extends RoutableMsg
public class MonitorRequestMsg extends ReplicationMsg
{
  /**
   * The destination server or servers of this message.
   */
  private final int destination;
  /**
   * The serverID of the server that sends this message.
   */
  private final int senderID;
  /**
   * Creates a message.
   *
   * @param serverID The sender server of this message.
   * @param destination The server or servers targeted by this message.
   * @param serverID
   *          The sender server of this message.
   * @param destination
   *          The server or servers targeted by this message.
   */
  public MonitorRequestMsg(int serverID, int destination)
  {
    super(serverID, destination);
    this.senderID = serverID;
    this.destination = destination;
  }
  /**
   * Creates a new message by decoding the provided byte array.
   * @param in A byte array containing the encoded information for the message,
   * @throws DataFormatException If the in does not contain a properly,
   *                             encoded message.
   *
   * @param in
   *          A byte array containing the encoded information for the message,
   * @throws DataFormatException
   *           If the in does not contain a properly, encoded message.
   */
  public MonitorRequestMsg(byte[] in) throws DataFormatException
  {
    super();
    try
    {
      // First byte is the type
      if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR_REQUEST)
        throw new DataFormatException("input is not a valid " +
            this.getClass().getCanonicalName());
        throw new DataFormatException("input is not a valid "
            + this.getClass().getCanonicalName());
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderString = new String(in, pos, length, "UTF-8");
      this.senderID = Integer.valueOf(senderString);
      pos += length +1;
      pos += length + 1;
      // destination
      length = getNextLength(in, pos);
      String destinationString = new String(in, pos, length, "UTF-8");
      this.destination = Integer.valueOf(destinationString);
      pos += length +1;
      pos += length + 1;
    } catch (UnsupportedEncodingException e)
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
@@ -95,8 +110,7 @@
      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
      int length = 1 + senderBytes.length + 1
                     + destinationBytes.length + 1;
      int length = 1 + senderBytes.length + 1 + destinationBytes.length + 1;
      byte[] resultByteArray = new byte[length];
@@ -117,4 +131,41 @@
      return null;
    }
  }
  /**
   * Get the destination.
   *
   * @return the destination
   */
  public int getDestination()
  {
    return destination;
  }
  /**
   * Get the server ID of the server that sent this message.
   *
   * @return the server id
   */
  public int getSenderID()
  {
    return senderID;
  }
  /**
   * Returns a string representation of the message.
   *
   * @return the string representation of this message.
   */
  public String toString()
  {
    return "[" + getClass().getCanonicalName() + " sender=" + senderID
        + " destination=" + destination + "]";
  }
}
opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java
@@ -22,13 +22,19 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
/**
 * This is an abstract class of messages of the replication protocol
 * for message that needs to contain information about the server that
 * send them and the destination servers to which they should be sent.
 * This is an abstract class of messages of the replication protocol for message
 * that needs to contain information about the server that send them and the
 * destination servers to which they should be sent.
 * <p>
 * Routable messages are used when initializing a new replica from an existing
 * replica: the total update messages are sent across the topology from the
 * source replica to the target replica, possibly traversing one or two
 * replication servers in the process (e.g. DS1 -&gt; RS1 -&gt; RS2 -&gt; DS2).
 */
public abstract class RoutableMsg extends ReplicationMsg
{
opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -104,7 +104,6 @@
          {
            break;
          }
          monitorMsg.setDestination(serverHandler.getServerId());
          try
          {
            serverHandler.send(monitorMsg);
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -829,16 +829,6 @@
      }
    }
    // Update threshold value for status analyzers
    final int newThreshold = config.getDegradedStatusThreshold();
    if (oldConfig.getDegradedStatusThreshold() != newThreshold)
    {
      for (ReplicationServerDomain domain : getReplicationServerDomains())
      {
        domain.updateDegradedStatusThreshold(newThreshold);
      }
    }
    // Update period value for monitoring publishers
    if (oldConfig.getMonitoringPeriod() != config.getMonitoringPeriod())
    {
@@ -970,7 +960,6 @@
  /**
   * Creates the backend associated to this replication server.
   * @throws ConfigException
   */
  private void createBackend() throws ConfigException
  {
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -22,13 +22,14 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS
 *      Portions copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -53,6 +54,8 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachineEvent.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.util.StaticUtils.*;
@@ -76,12 +79,10 @@
  private final DN baseDN;
  /**
   * The Status analyzer that periodically verifies whether the connected DSs
   * are late. Using an AtomicReference to avoid leaking references to costly
   * threads.
   * Periodically verifies whether the connected DSs are late and publishes any
   * pending status messages.
   */
  private AtomicReference<StatusAnalyzer> statusAnalyzer =
      new AtomicReference<StatusAnalyzer>();
  private final StatusAnalyzer statusAnalyzer;
  /**
   * The monitoring publisher that periodically sends monitoring messages to the
@@ -168,6 +169,96 @@
  private int assuredTimeoutTimerPurgeCounter = 0;
  /**
   * Stores pending status messages such as DS change time heartbeats for future
   * forwarding to the rest of the topology. This class is required in order to
   * decouple inbound IO processing from outbound IO processing and avoid
   * potential inter-process deadlocks. In particular, the {@code ServerReader}
   * thread must not send messages.
   */
  private static class PendingStatusMessages
  {
    private final Map<Integer, ChangeTimeHeartbeatMsg> pendingHeartbeats =
        new HashMap<Integer, ChangeTimeHeartbeatMsg>(1);
    private final Map<Integer, MonitorMsg> pendingDSMonitorMsgs =
        new HashMap<Integer, MonitorMsg>(1);
    private final Map<Integer, MonitorMsg> pendingRSMonitorMsgs =
        new HashMap<Integer, MonitorMsg>(1);
    private boolean sendRSTopologyMsg;
    private boolean sendDSTopologyMsg;
    private int excludedDSForTopologyMsg = -1;
    /**
     * Enqueues a TopologyMsg for all the connected directory servers in order
     * to let them know the topology (every known DSs and RSs).
     *
     * @param excludedDS
     *          If not null, the topology message will not be sent to this DS.
     */
    private void enqueueTopoInfoToAllDSsExcept(DataServerHandler excludedDS)
    {
      int excludedServerId = excludedDS != null ? excludedDS.getServerId() : -1;
      if (sendDSTopologyMsg)
      {
        if (excludedServerId != excludedDSForTopologyMsg)
        {
          excludedDSForTopologyMsg = -1;
        }
      }
      else
      {
        sendDSTopologyMsg = true;
        excludedDSForTopologyMsg = excludedServerId;
      }
    }
    /**
     * Enqueues a TopologyMsg for all the connected replication servers in order
     * to let them know our connected LDAP servers.
     */
    private void enqueueTopoInfoToAllRSs()
    {
      sendRSTopologyMsg = true;
    }
    /**
     * Enqueues a ChangeTimeHeartbeatMsg received from a DS for forwarding to
     * all other RS instances.
     *
     * @param msg
     *          The heartbeat message.
     */
    private void enqueueChangeTimeHeartbeatMsg(ChangeTimeHeartbeatMsg msg)
    {
      pendingHeartbeats.put(msg.getCSN().getServerId(), msg);
    }
    private void enqueueDSMonitorMsg(int dsServerId, MonitorMsg msg)
    {
      pendingDSMonitorMsgs.put(dsServerId, msg);
    }
    private void enqueueRSMonitorMsg(int rsServerId, MonitorMsg msg)
    {
      pendingRSMonitorMsgs.put(rsServerId, msg);
    }
  }
  private final Object pendingStatusMessagesLock = new Object();
  /** @GuardedBy("pendingStatusMessagesLock") */
  private PendingStatusMessages pendingStatusMessages = new PendingStatusMessages();
  /**
   * Creates a new ReplicationServerDomain associated to the baseDN.
   *
   * @param baseDN
@@ -185,7 +276,8 @@
        + ") assured timer for domain \"" + baseDN + "\"", true);
    this.domainDB =
        localReplicationServer.getChangelogDB().getReplicationDomainDB();
    this.statusAnalyzer = new StatusAnalyzer(this);
    this.statusAnalyzer.start();
    DirectoryServer.registerMonitorProvider(this);
  }
@@ -712,7 +804,7 @@
   * @param ack The ack message received.
   * @param ackingServer The server handler of the server that sent the ack.
   */
  public void processAck(AckMsg ack, ServerHandler ackingServer)
  void processAck(AckMsg ack, ServerHandler ackingServer)
  {
    // Retrieve the expected acks info for the update matching the original
    // sent update.
@@ -1003,21 +1095,12 @@
        if (connectedRSs.containsKey(sHandler.getServerId()))
        {
          unregisterServerHandler(sHandler, shutdown, false);
        } else if (connectedDSs.containsKey(sHandler.getServerId()))
        }
        else if (connectedDSs.containsKey(sHandler.getServerId()))
        {
          // If this is the last DS for the domain,
          // shutdown the status analyzer
          if (connectedDSs.size() == 1)
          {
            if (debugEnabled())
            {
              debug("remote server " + sHandler
                  + " is the last DS to be stopped: stopping status analyzer");
            }
            stopStatusAnalyzer();
          }
          unregisterServerHandler(sHandler, shutdown, true);
        } else if (otherHandlers.contains(sHandler))
        }
        else if (otherHandlers.contains(sHandler))
        {
          unregisterOtherHandler(sHandler);
        }
@@ -1052,15 +1135,19 @@
    resetGenerationIdIfPossible();
    if (!shutdown)
    {
      if (isDirectoryServer)
      synchronized (pendingStatusMessagesLock)
      {
        // Update the remote replication servers with our list
        // of connected LDAP servers
        sendTopoInfoToAllRSs();
        if (isDirectoryServer)
        {
          // Update the remote replication servers with our list
          // of connected LDAP servers
          pendingStatusMessages.enqueueTopoInfoToAllRSs();
        }
        // Warn our DSs that a RS or DS has quit (does not use this
        // handler as already removed from list)
        pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
      }
      // Warn our DSs that a RS or DS has quit (does not use this
      // handler as already removed from list)
      sendTopoInfoToAllDSsExcept(null);
      statusAnalyzer.notifyPendingStatusMessage();
    }
  }
@@ -1405,94 +1492,64 @@
   *
   * @param msg
   *          The message received and to be processed.
   * @param msgEmitter
   *          The server handler of the server that emitted the message.
   * @param sender
   *          The server handler of the server that sent the message.
   */
  public void process(RoutableMsg msg, ServerHandler msgEmitter)
  void process(RoutableMsg msg, ServerHandler sender)
  {
    // Test the message for which a ReplicationServer is expected
    // to be the destination
    if (!(msg instanceof InitializeRequestMsg) &&
        !(msg instanceof InitializeTargetMsg) &&
        !(msg instanceof InitializeRcvAckMsg) &&
        !(msg instanceof EntryMsg) &&
        !(msg instanceof DoneMsg) &&
        (msg.getDestination() == this.localReplicationServer.getServerId()))
    if (msg.getDestination() == localReplicationServer.getServerId())
    {
      // Handle routable messages targeted at this RS.
      if (msg instanceof ErrorMsg)
      {
        ErrorMsg errorMsg = (ErrorMsg) msg;
        logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
      } else if (msg instanceof MonitorRequestMsg)
      {
        replyWithTopologyMonitorMsg(msg, msgEmitter);
      } else if (msg instanceof MonitorMsg)
      {
        MonitorMsg monitorMsg = (MonitorMsg) msg;
        domainMonitor.receiveMonitorDataResponse(monitorMsg,
            msgEmitter.getServerId());
      } else
      {
        replyWithUnroutableMsgType(msgEmitter, msg);
        logError(ERR_ERROR_MSG_RECEIVED.get(((ErrorMsg) msg).getDetails()));
      }
      return;
    }
    List<ServerHandler> servers = getDestinationServers(msg, msgEmitter);
    if (!servers.isEmpty())
    {
      forwardMsgToAllServers(msg, servers, msgEmitter);
      else
      {
        replyWithUnroutableMsgType(sender, msg);
      }
    }
    else
    {
      replyWithUnreachablePeerMsg(msgEmitter, msg);
      // Forward message not destined for this RS.
      List<ServerHandler> servers = getDestinationServers(msg, sender);
      if (!servers.isEmpty())
      {
        forwardMsgToAllServers(msg, servers, sender);
      }
      else
      {
        replyWithUnreachablePeerMsg(sender, msg);
      }
    }
  }
  private void replyWithTopologyMonitorMsg(RoutableMsg msg,
      ServerHandler msgEmitter)
  {
    /*
     * If the request comes from a Directory Server we need to build the full
     * list of all servers in the topology and send back a MonitorMsg with the
     * full list of all the servers in the topology.
     */
    if (msgEmitter.isDataServer())
    {
      // Monitoring information requested by a DS
      try
      {
        MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
            msg.getDestination(), msg.getSenderID(),
            domainMonitor.getMonitorData());
        msgEmitter.send(monitorMsg);
      }
      catch (IOException e)
      {
        // the connection was closed.
      }
    }
    else
    {
      // Monitoring information requested by a RS
      MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID());
      if (monitorMsg != null)
      {
        try
        {
          msgEmitter.send(monitorMsg);
        }
        catch (IOException e)
        {
          // We log the error. The requestor will detect a timeout or
          // any other failure on the connection.
          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(Integer.toString(msg
              .getDestination())));
        }
      }
    }
  /**
   * Responds to a monitor request message.
   *
   * @param msg
   *          The monitor request message.
   * @param sender
   *          The DS/RS which sent the monitor request.
   */
  void processMonitorRequestMsg(MonitorRequestMsg msg, ServerHandler sender)
  {
    enqueueMonitorMsg(msg, sender);
  }
  /**
   * Responds to a monitor message.
   *
   * @param msg
   *          The monitor message
   * @param sender
   *          The DS/RS which sent the monitor.
   */
  void processMonitorMsg(MonitorMsg msg, ServerHandler sender)
  {
    domainMonitor.receiveMonitorDataResponse(msg, sender.getServerId());
  }
  private void replyWithUnroutableMsgType(ServerHandler msgEmitter,
@@ -1553,7 +1610,7 @@
  }
  private void forwardMsgToAllServers(RoutableMsg msg,
      List<ServerHandler> servers, ServerHandler msgEmitter)
      List<ServerHandler> servers, ServerHandler sender)
  {
    for (ServerHandler targetHandler : servers)
    {
@@ -1579,13 +1636,13 @@
        ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), message);
        try
        {
          msgEmitter.send(errMsg);
          sender.send(errMsg);
        } catch (IOException ioe1)
        {
          // an error happened on the sender session trying to recover
          // from an error on the receiver session.
          // We don't have much solution left beside closing the sessions.
          stopServer(msgEmitter, false);
          stopServer(sender, false);
          stopServer(targetHandler, false);
        }
      // TODO Handle error properly (sender timeout in addition)
@@ -1651,42 +1708,26 @@
   * @return The newly created and filled MonitorMsg. Null if the current thread
   *         was interrupted while attempting to get the domain lock.
   */
  public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
  private MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
  {
    try
    final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
    monitorMsg.setReplServerDbState(getLatestServerState());
    // Add the server state for each connected DS and RS.
    for (DataServerHandler dsHandler : this.connectedDSs.values())
    {
      // Lock domain as we need to go through connected servers list
      lock();
    }
    catch (InterruptedException e)
    {
      return null;
      monitorMsg.setServerState(dsHandler.getServerId(),
          dsHandler.getServerState(), dsHandler.getApproxFirstMissingDate(),
          true);
    }
    try
    for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
    {
      final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
      monitorMsg.setReplServerDbState(getLatestServerState());
      // Add the server state for each connected DS and RS.
      for (DataServerHandler dsHandler : this.connectedDSs.values())
      {
        monitorMsg.setServerState(dsHandler.getServerId(), dsHandler
            .getServerState(), dsHandler.getApproxFirstMissingDate(), true);
      }
      for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
      {
        monitorMsg.setServerState(rsHandler.getServerId(), rsHandler
            .getServerState(), rsHandler.getApproxFirstMissingDate(), false);
      }
      return monitorMsg;
      monitorMsg.setServerState(rsHandler.getServerId(),
          rsHandler.getServerState(), rsHandler.getApproxFirstMissingDate(),
          false);
    }
    finally
    {
      release();
    }
    return monitorMsg;
  }
  /**
@@ -1700,6 +1741,7 @@
    assuredTimeoutTimer.cancel();
    stopAllServers(true);
    statusAnalyzer.shutdown();
  }
  /**
@@ -1723,83 +1765,7 @@
    return "ReplicationServerDomain " + baseDN;
  }
  /**
   * Send a TopologyMsg to all the connected directory servers in order to let
   * them know the topology (every known DSs and RSs).
   *
   * @param notThisOne
   *          If not null, the topology message will not be sent to this DS.
   */
  private void sendTopoInfoToAllDSsExcept(DataServerHandler notThisOne)
  {
    for (DataServerHandler dsHandler : connectedDSs.values())
    {
      if (dsHandler != notThisOne)
      // All except the supplied one
      {
        for (int i=1; i<=2; i++)
        {
          if (!dsHandler.shuttingDown()
              && dsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
          {
            TopologyMsg topoMsg =
                createTopologyMsgForDS(dsHandler.getServerId());
            try
            {
              dsHandler.sendTopoInfo(topoMsg);
              break;
            }
            catch (IOException e)
            {
              if (i == 2)
              {
                Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
                    baseDN.toNormalizedString(), "directory",
                    Integer.toString(dsHandler.getServerId()), e.getMessage());
                logError(message);
              }
            }
          }
          sleep(100);
        }
      }
    }
  }
  /**
   * Send a TopologyMsg to all the connected replication servers
   * in order to let them know our connected LDAP servers.
   */
  private void sendTopoInfoToAllRSs()
  {
    TopologyMsg topoMsg = createTopologyMsgForRS();
    for (ReplicationServerHandler rsHandler : connectedRSs.values())
    {
      for (int i=1; i<=2; i++)
      {
        if (!rsHandler.shuttingDown()
            && rsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
        {
          try
          {
            rsHandler.sendTopoInfo(topoMsg);
            break;
          }
          catch (IOException e)
          {
            if (i == 2)
            {
              Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
                  baseDN.toNormalizedString(), "replication",
                  Integer.toString(rsHandler.getServerId()), e.getMessage());
              logError(message);
            }
          }
        }
        sleep(100);
      }
    }
  }
  /**
   * Creates a TopologyMsg filled with information to be sent to a remote RS.
@@ -2062,7 +2028,7 @@
        return;
      }
      sendTopoInfoToAllExcept(senderHandler);
      enqueueTopoInfoToAllExcept(senderHandler);
      Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
          senderHandler.getServerId(), baseDN.toNormalizedString(),
@@ -2087,7 +2053,7 @@
   * @param event The event to be used for new status computation
   * @return True if we have been interrupted (must stop), false otherwise
   */
  public boolean changeStatus(DataServerHandler dsHandler,
  private boolean changeStatus(DataServerHandler dsHandler,
      StatusMachineEvent event)
  {
    try
@@ -2142,7 +2108,7 @@
        return false;
      }
      sendTopoInfoToAllExcept(dsHandler);
      enqueueTopoInfoToAllExcept(dsHandler);
    }
    catch (Exception e)
    {
@@ -2162,7 +2128,7 @@
   */
  public void sendTopoInfoToAll()
  {
    sendTopoInfoToAllExcept(null);
    enqueueTopoInfoToAllExcept(null);
  }
  /**
@@ -2171,10 +2137,14 @@
   * @param dsHandler
   *          if not null, the topology message will not be sent to this DS
   */
  private void sendTopoInfoToAllExcept(DataServerHandler dsHandler)
  private void enqueueTopoInfoToAllExcept(DataServerHandler dsHandler)
  {
    sendTopoInfoToAllDSsExcept(dsHandler);
    sendTopoInfoToAllRSs();
    synchronized (pendingStatusMessagesLock)
    {
      pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(dsHandler);
      pendingStatusMessages.enqueueTopoInfoToAllRSs();
    }
    statusAnalyzer.notifyPendingStatusMessage();
  }
  /**
@@ -2293,7 +2263,11 @@
       * Sends the currently known topology information to every connected
       * DS we have.
       */
      sendTopoInfoToAllDSsExcept(null);
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
      }
      statusAnalyzer.notifyPendingStatusMessage();
    }
    catch(Exception e)
    {
@@ -2414,37 +2388,6 @@
  }
  /**
   * Starts the status analyzer for the domain if not already started.
   */
  private void startStatusAnalyzer()
  {
    int degradedStatusThreshold =
        localReplicationServer.getDegradedStatusThreshold();
    if (degradedStatusThreshold > 0) // 0 means no status analyzer
    {
      final StatusAnalyzer thread =
          new StatusAnalyzer(this, degradedStatusThreshold);
      if (statusAnalyzer.compareAndSet(null, thread))
      {
        thread.start();
      }
    }
  }
  /**
   * Stops the status analyzer for the domain.
   */
  private void stopStatusAnalyzer()
  {
    final StatusAnalyzer thread = statusAnalyzer.get();
    if (thread != null && statusAnalyzer.compareAndSet(thread, null))
    {
      thread.shutdown();
      thread.waitForShutdown();
    }
  }
  /**
   * Starts the monitoring publisher for the domain if not already started.
   */
  private void startMonitoringPublisher()
@@ -2611,63 +2554,62 @@
  }
  private void sendTopologyMsg(String type, ServerHandler handler,
      TopologyMsg msg)
  {
    for (int i = 1; i <= 2; i++)
    {
      if (!handler.shuttingDown()
          && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
      {
        try
        {
          handler.sendTopoInfo(msg);
          break;
        }
        catch (IOException e)
        {
          if (i == 2)
          {
            logError(ERR_EXCEPTION_SENDING_TOPO_INFO.get(
                baseDN.toNormalizedString(), type,
                String.valueOf(handler.getServerId()), e.getMessage()));
          }
        }
      }
      sleep(100);
    }
  }
  /**
   * Processes a ChangeTimeHeartbeatMsg received, by storing the CSN (timestamp)
   * value received, and forwarding the message to the other RSes.
   * @param senderHandler The handler for the server that sent the heartbeat.
   * @param msg The message to process.
   */
  public void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
  void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
      ChangeTimeHeartbeatMsg msg)
  {
    try
    domainDB.replicaHeartbeat(baseDN, msg.getCSN());
    if (senderHandler.isDataServer())
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    }
    catch (InterruptedException ex)
    {
      // We can't deal with this here, so re-interrupt thread so that it is
      // caught during subsequent IO.
      Thread.currentThread().interrupt();
      return;
    }
    try
    {
      domainDB.replicaHeartbeat(baseDN, msg.getCSN());
      if (senderHandler.isDataServer())
      /*
       * If we are the first replication server warned, then forward the message
       * to the remote replication servers.
       */
      synchronized (pendingStatusMessagesLock)
      {
        // If we are the first replication server warned,
        // then forwards the message to the remote replication servers
        for (ReplicationServerHandler rsHandler : connectedRSs.values())
        {
          try
          {
            if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
            {
              rsHandler.send(msg);
            }
          }
          catch (IOException e)
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
            logError(ERR_CHANGELOG_ERROR_SENDING_MSG
                .get("Replication Server "
                    + localReplicationServer.getReplicationPort() + " "
                    + baseDN + " " + localReplicationServer.getServerId()));
            stopServer(rsHandler, false);
          }
        }
        pendingStatusMessages.enqueueChangeTimeHeartbeatMsg(msg);
      }
    }
    finally
    {
      release();
      statusAnalyzer.notifyPendingStatusMessage();
    }
  }
  /**
   * Get the latest (more recent) trim date of the changelog dbs associated
   * to this domain.
@@ -2703,33 +2645,6 @@
  }
  /**
   * Update the status analyzer with the new threshold value.
   *
   * @param degradedStatusThreshold
   *          The new threshold value.
   */
  void updateDegradedStatusThreshold(int degradedStatusThreshold)
  {
    if (degradedStatusThreshold == 0)
    {
      // Requested to stop analyzers
      stopStatusAnalyzer();
      return;
    }
    final StatusAnalyzer saThread = statusAnalyzer.get();
    if (saThread != null) // it is running
    {
      saThread.setDegradedStatusThreshold(degradedStatusThreshold);
    }
    else if (connectedDSs.size() > 0)
    {
      // Requested to start analyzers with provided threshold value
      startStatusAnalyzer();
    }
  }
  /**
   * Update the monitoring publisher with the new period value.
   *
   * @param period
@@ -2765,7 +2680,6 @@
   */
  public void register(DataServerHandler dsHandler)
  {
    startStatusAnalyzer();
    startMonitoringPublisher();
    // connected with new DS: store handler.
@@ -2773,7 +2687,7 @@
    // Tell peer RSs and DSs a new DS just connected to us
    // No need to re-send TopologyMsg to this just new DS
    sendTopoInfoToAllExcept(dsHandler);
    enqueueTopoInfoToAllExcept(dsHandler);
  }
  /**
@@ -2798,4 +2712,211 @@
        + " and port=" + localReplicationServer.getReplicationPort()
        + ": " + message);
  }
  /**
   * Go through each connected DS, get the number of pending changes we have for
   * it and change status accordingly if threshold value is crossed/uncrossed.
   */
  void checkDSDegradedStatus()
  {
    final int degradedStatusThreshold = localReplicationServer
        .getDegradedStatusThreshold();
    // Threshold value = 0 means no status analyzer (no degrading system)
    // we should not have that as the status analyzer thread should not be
    // created if this is the case, but for sanity purpose, we add this
    // test
    if (degradedStatusThreshold > 0)
    {
      for (DataServerHandler serverHandler : connectedDSs.values())
      {
        // Get number of pending changes for this server
        final int nChanges = serverHandler.getRcvMsgQueueSize();
        if (debugEnabled())
        {
          TRACER.debugInfo("In RS " + getLocalRSServerId() + ", for baseDN="
              + getBaseDN() + ": " + "Status analyzer: DS "
              + serverHandler.getServerId() + " has " + nChanges
              + " message(s) in writer queue.");
        }
        // Check status to know if it is relevant to change the status. Do not
        // take RSD lock to test. If we attempt to change the status whereas
        // the current status does allow it, this will be noticed by
        // the changeStatusFromStatusAnalyzer() method. This allows to take the
        // lock roughly only when needed versus every sleep time timeout.
        if (nChanges >= degradedStatusThreshold)
        {
          if (serverHandler.getStatus() == NORMAL_STATUS
              && changeStatus(serverHandler, TO_DEGRADED_STATUS_EVENT))
          {
            break; // Interrupted.
          }
        }
        else
        {
          if (serverHandler.getStatus() == DEGRADED_STATUS
              && changeStatus(serverHandler, TO_NORMAL_STATUS_EVENT))
          {
            break; // Interrupted.
          }
        }
      }
    }
  }
  /**
   * Sends any enqueued status messages to the rest of the topology.
   */
  void sendPendingStatusMessages()
  {
    /*
     * Take a snapshot of pending status notifications in order to avoid holding
     * the broadcast lock for too long. In addition, clear the notifications so
     * that they are not resent the next time.
     */
    final PendingStatusMessages savedState;
    synchronized (pendingStatusMessagesLock)
    {
      savedState = pendingStatusMessages;
      pendingStatusMessages = new PendingStatusMessages();
    }
    sendPendingChangeTimeHeartbeatMsgs(savedState);
    sendPendingTopologyMsgs(savedState);
    sendPendingMonitorMsgs(savedState);
  }
  private void sendPendingMonitorMsgs(final PendingStatusMessages pendingMsgs)
  {
    for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingDSMonitorMsgs
        .entrySet())
    {
      ServerHandler ds = connectedDSs.get(msg.getKey());
      if (ds != null)
      {
        try
        {
          ds.send(msg.getValue());
        }
        catch (IOException e)
        {
          // Ignore: connection closed.
        }
      }
    }
    for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingRSMonitorMsgs
        .entrySet())
    {
      ServerHandler rs = connectedRSs.get(msg.getKey());
      if (rs != null)
      {
        try
        {
          rs.send(msg.getValue());
        }
        catch (IOException e)
        {
          // We log the error. The requestor will detect a timeout or
          // any other failure on the connection.
          // FIXME: why do we log for RSs but not DSs?
          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(String.valueOf(msg
              .getValue().getDestination())));
        }
      }
    }
  }
  private void sendPendingChangeTimeHeartbeatMsgs(PendingStatusMessages pendingMsgs)
  {
    for (ChangeTimeHeartbeatMsg pendingHeartbeat : pendingMsgs.pendingHeartbeats
        .values())
    {
      for (ReplicationServerHandler rsHandler : connectedRSs.values())
      {
        try
        {
          if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
          {
            rsHandler.send(pendingHeartbeat);
          }
        }
        catch (IOException e)
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get("Replication Server "
              + localReplicationServer.getReplicationPort() + " " + baseDN
              + " " + localReplicationServer.getServerId()));
          stopServer(rsHandler, false);
        }
      }
    }
  }
  private void sendPendingTopologyMsgs(PendingStatusMessages pendingMsgs)
  {
    if (pendingMsgs.sendDSTopologyMsg)
    {
      for (ServerHandler handler : connectedDSs.values())
      {
        if (handler.getServerId() != pendingMsgs.excludedDSForTopologyMsg)
        {
          final TopologyMsg topoMsg = createTopologyMsgForDS(handler
              .getServerId());
          sendTopologyMsg("directory", handler, topoMsg);
        }
      }
    }
    if (pendingMsgs.sendRSTopologyMsg)
    {
      final TopologyMsg topoMsg = createTopologyMsgForRS();
      for (ServerHandler handler : connectedRSs.values())
      {
        sendTopologyMsg("replication", handler, topoMsg);
      }
    }
  }
  private void enqueueMonitorMsg(MonitorRequestMsg msg, ServerHandler sender)
  {
    /*
     * If the request comes from a Directory Server we need to build the full
     * list of all servers in the topology and send back a MonitorMsg with the
     * full list of all the servers in the topology.
     */
    if (sender.isDataServer())
    {
      MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID(),
          domainMonitor.getMonitorData());
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueDSMonitorMsg(sender.getServerId(),
            monitorMsg);
      }
    }
    else
    {
      MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
          msg.getDestination(), msg.getSenderID());
      synchronized (pendingStatusMessagesLock)
      {
        pendingStatusMessages.enqueueRSMonitorMsg(sender.getServerId(),
            monitorMsg);
      }
    }
    statusAnalyzer.notifyPendingStatusMessage();
  }
}
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -27,6 +27,7 @@
package org.opends.server.replication.server;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Semaphore;
@@ -484,7 +485,7 @@
   */
  public ReplicationServerDomain getDomain()
  {
    return this.replicationServerDomain;
    return replicationServerDomain;
  }
  /**
@@ -847,21 +848,45 @@
   *
   * @param msg The message to be processed.
   */
  public void process(RoutableMsg msg)
  void process(RoutableMsg msg)
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In "
          + replicationServerDomain.getLocalRSMonitorInstanceName() + " "
          + this + " processes routable msg received:" + msg);
    }
    replicationServerDomain.process(msg, this);
  }
  /**
   * Responds to a monitor request message.
   *
   * @param msg
   *          The monitor request message.
   */
  void processMonitorRequestMsg(MonitorRequestMsg msg)
  {
    replicationServerDomain.processMonitorRequestMsg(msg, this);
  }
  /**
   * Responds to a monitor message.
   *
   * @param msg
   *          The monitor message.
   */
  void processMonitorMsg(MonitorMsg msg)
  {
    replicationServerDomain.processMonitorMsg(msg, this);
  }
  /**
   * Processes a change time heartbeat msg.
   *
   * @param msg The message to be processed.
   */
  public void process(ChangeTimeHeartbeatMsg msg)
  void process(ChangeTimeHeartbeatMsg msg)
  {
    if (debugEnabled())
      TRACER.debugInfo("In "
@@ -925,15 +950,6 @@
  }
  /**
   * Sets the replication server domain associated.
   * @param rsd The provided replication server domain.
   */
  protected void setReplicationServerDomain(ReplicationServerDomain rsd)
  {
    this.replicationServerDomain = rsd;
  }
  /**
   * Sets the window size when used when sending to the remote.
   * @param size The provided window size.
   */
@@ -1179,7 +1195,7 @@
   * Process a Ack message received.
   * @param ack the message received.
   */
  public void processAck(AckMsg ack)
  void processAck(AckMsg ack)
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.processAck(ack, this);
@@ -1200,7 +1216,7 @@
   * Process a ResetGenerationIdMsg message received.
   * @param msg the message received.
   */
  public void processResetGenId(ResetGenerationIdMsg msg)
  void processResetGenId(ResetGenerationIdMsg msg)
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.resetGenerationId(this, msg);
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -180,8 +180,23 @@
          } else if (msg instanceof WindowMsg)
          {
            handler.updateWindow((WindowMsg) msg);
          } else if (msg instanceof RoutableMsg)
          }
          else if (msg instanceof MonitorRequestMsg)
          {
            handler.processMonitorRequestMsg((MonitorRequestMsg) msg);
          }
          else if (msg instanceof MonitorMsg)
          {
            handler.processMonitorMsg((MonitorMsg) msg);
          }
          else if (msg instanceof RoutableMsg)
          {
            /*
             * Note that we handle monitor messages separately since they in
             * fact never need "routing" and are instead sent directly between
             * connected peers. Doing so allows us to more clearly decouple
             * write IO from the reader thread (see OPENDJ-1354).
             */
            handler.process((RoutableMsg) msg);
          } else if (msg instanceof ResetGenerationIdMsg)
          {
opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -22,18 +22,19 @@
 *
 *
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.types.DebugLogLevel;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachineEvent.*;
/**
 * This thread is in charge of periodically determining if the connected
@@ -44,46 +45,45 @@
 * the threshold is uncrossed, the status analyzer must make the DS status
 * change back to NORMAL_STATUS. To have meaning of status, please refer to
 * ServerStatus class.
 * <p>
 * In addition, this thread is responsible for publishing any pending status
 * messages.
 */
public class StatusAnalyzer extends DirectoryThread
class StatusAnalyzer extends DirectoryThread
{
  private volatile boolean shutdown = false;
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private final ReplicationServerDomain replicationServerDomain;
  private volatile int degradedStatusThreshold = -1;
  /** Sleep time for the thread, in ms. */
  private static final int STATUS_ANALYZER_SLEEP_TIME = 5000;
  private volatile boolean done = false;
  private final ReplicationServerDomain replicationServerDomain;
  private final Object eventMonitor = new Object();
  private boolean pendingStatusMessage = false;
  private long nextCheckDSDegradedStatusTime;
  private final Object shutdownLock = new Object();
  /**
   * Create a StatusAnalyzer.
   * @param replicationServerDomain The ReplicationServerDomain the status
   *        analyzer is for.
   * @param degradedStatusThreshold The pending changes threshold value to be
   * used for putting a DS in DEGRADED_STATUS.
   *
   * @param replicationServerDomain
   *          The ReplicationServerDomain the status analyzer is for.
   */
  public StatusAnalyzer(ReplicationServerDomain replicationServerDomain,
    int degradedStatusThreshold)
  StatusAnalyzer(ReplicationServerDomain replicationServerDomain)
  {
    super("Replication server RS("
        + replicationServerDomain.getLocalRSServerId()
        + ") delay monitor for domain \"" + replicationServerDomain.getBaseDN()
        + ") status monitor for domain \""
        + replicationServerDomain.getBaseDN()
        + "\"");
    this.replicationServerDomain = replicationServerDomain;
    this.degradedStatusThreshold = degradedStatusThreshold;
  }
  /**
   * Analyzes if servers are late or not, and change their status accordingly.
   */
@@ -96,79 +96,55 @@
          getMessage("Directory server status analyzer starting."));
    }
    while (!shutdown)
    try
    {
      synchronized (shutdownLock)
      while (true)
      {
        if (!shutdown)
        final boolean requestStatusBroadcastWasRequested;
        synchronized (eventMonitor)
        {
          try
          if (!isShutdownInitiated() && !pendingStatusMessage)
          {
            shutdownLock.wait(STATUS_ANALYZER_SLEEP_TIME);
            eventMonitor.wait(STATUS_ANALYZER_SLEEP_TIME);
          }
          catch (InterruptedException e)
          {
            // Server shutdown monitor may interrupt slow threads.
            if (debugEnabled())
            {
              TRACER.debugCaught(DebugLogLevel.ERROR, e);
            }
            shutdown = true;
            break;
          }
        }
      }
      // Go through each connected DS, get the number of pending changes we have
      // for it and change status accordingly if threshold value is
      // crossed/uncrossed
      for (DataServerHandler serverHandler :
        replicationServerDomain.getConnectedDSs().values())
      {
        // Get number of pending changes for this server
        int nChanges = serverHandler.getRcvMsgQueueSize();
        if (debugEnabled())
        {
          TRACER.debugInfo(getMessage("Status analyzer: DS "
              + serverHandler.getServerId() + " has " + nChanges
              + " message(s) in writer queue."));
          requestStatusBroadcastWasRequested = pendingStatusMessage;
          pendingStatusMessage = false;
        }
        // Check status to know if it is relevant to change the status. Do not
        // take RSD lock to test. If we attempt to change the status whereas
        // the current status does allow it, this will be noticed by
        // the changeStatusFromStatusAnalyzer() method. This allows to take the
        // lock roughly only when needed versus every sleep time timeout.
        if (degradedStatusThreshold > 0)
          // Threshold value = 0 means no status analyzer (no degrading system)
          // we should not have that as the status analyzer thread should not be
          // created if this is the case, but for sanity purpose, we add this
          // test
        if (isShutdownInitiated())
        {
          if (nChanges >= degradedStatusThreshold)
          {
            if (serverHandler.getStatus() == NORMAL_STATUS
                && isInterrupted(serverHandler, TO_DEGRADED_STATUS_EVENT))
            {
              break;
            }
          }
          else
          {
            if (serverHandler.getStatus() == DEGRADED_STATUS
                && isInterrupted(serverHandler, TO_NORMAL_STATUS_EVENT))
            {
              break;
            }
          }
          break;
        }
        // Broadcast heartbeats, topology messages, etc if requested.
        if (requestStatusBroadcastWasRequested)
        {
          replicationServerDomain.sendPendingStatusMessages();
        }
        /*
         * Check the degraded status for connected DS instances only if
         * sufficient time has passed. The current time is not cached because
         * the call to checkDSDegradedStatus may take some time.
         */
        if (nextCheckDSDegradedStatusTime < System.currentTimeMillis())
        {
          replicationServerDomain.checkDSDegradedStatus();
          nextCheckDSDegradedStatusTime = System.currentTimeMillis()
              + STATUS_ANALYZER_SLEEP_TIME;
        }
      }
    }
    catch (InterruptedException e)
    {
      // Forcefully stopped.
    }
    done = true;
    TRACER.debugInfo(getMessage("Status analyzer is terminated."));
  }
  private String getMessage(String message)
  {
    return "In RS " + replicationServerDomain.getLocalRSServerId()
@@ -176,75 +152,50 @@
        + message;
  }
  private boolean isInterrupted(DataServerHandler serverHandler,
      StatusMachineEvent event)
  {
    if (replicationServerDomain.changeStatus(serverHandler, event))
    {
      // Finish job and let thread die
      TRACER.debugInfo(
          getMessage("Status analyzer has been interrupted and will die."));
      return true;
    }
    return false;
  }
  /**
   * Stops the thread.
   */
  public void shutdown()
  void shutdown()
  {
    synchronized (shutdownLock)
    {
      shutdown = true;
      shutdownLock.notifyAll();
      if (debugEnabled())
      {
        TRACER.debugInfo(getMessage("Shutting down status analyzer."));
      }
    }
  }
  /**
   * Waits for analyzer death. If not finished within 2 seconds,
   * forces interruption
   */
  public void waitForShutdown()
  {
    try
    {
      int FACTOR = 40; // Wait for 2 seconds before interrupting the thread
      int n = 0;
      while (!done && this.isAlive())
      {
        Thread.sleep(50);
        n++;
        if (n >= FACTOR)
        {
          TRACER.debugInfo(getMessage("Interrupting status analyzer."));
          interrupt();
        }
      }
    } catch (InterruptedException e)
    {
      // exit the loop if this thread is interrupted.
    }
  }
  /**
   * Sets the threshold value.
   * @param degradedStatusThreshold The new threshold value.
   */
  public void setDegradedStatusThreshold(int degradedStatusThreshold)
  {
    initiateShutdown();
    if (debugEnabled())
    {
      TRACER.debugInfo(getMessage(
          "Directory server status analyzer changing threshold value to "
              + degradedStatusThreshold));
      TRACER.debugInfo(getMessage("Shutting down status analyzer."));
    }
    synchronized (eventMonitor)
    {
      eventMonitor.notifyAll();
    }
    try
    {
      join(2000);
    }
    catch (InterruptedException e)
    {
      // Trapped: forcefully stop the thread.
    }
    if (isAlive())
    {
      // The join timed out or was interrupted so attempt to forcefully stop the
      // analyzer.
      interrupt();
    }
  }
    this.degradedStatusThreshold = degradedStatusThreshold;
  /**
   * Requests that a topology state related message be broadcast to the rest of
   * the topology. Messages include DS heartbeats, topology information, etc.
   */
  void notifyPendingStatusMessage()
  {
    synchronized (eventMonitor)
    {
      pendingStatusMessage = true;
      eventMonitor.notifyAll();
    }
  }
}