From b875ab3f7b327f797ec4532015e45da6ae3fff56 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Tue, 08 Apr 2014 09:09:25 +0000
Subject: [PATCH] Backport fix for OPENDJ-1354: replication threads BLOCKED in pendingChanges queue

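The ServerReader threads were performing blocking sends of status
messages (topology information, change time heartbeats and monitoring
responses) to remote peers, which could lead to inter-process
deadlocks. This backport decouples inbound IO processing from outbound
IO processing: reader threads now enqueue pending status messages in
the ReplicationServerDomain and notify the StatusAnalyzer thread, which
broadcasts them asynchronously in addition to performing its periodic
degraded status checks.

The pattern is roughly the following minimal, self-contained sketch
(hypothetical names; the real implementation lives in
ReplicationServerDomain.PendingStatusMessages and StatusAnalyzer):

    import java.util.ArrayDeque;
    import java.util.Queue;

    class StatusMessagePublisher implements Runnable
    {
      private final Object lock = new Object();
      private Queue<String> pending = new ArrayDeque<String>();

      // Called by reader threads: cheap, and never performs blocking writes.
      void enqueue(String msg)
      {
        synchronized (lock)
        {
          pending.add(msg);
          lock.notifyAll(); // Wake the publisher thread.
        }
      }

      // Publisher thread: snapshot under the lock, then send outside of it.
      public void run()
      {
        try
        {
          while (true)
          {
            Queue<String> snapshot;
            synchronized (lock)
            {
              while (pending.isEmpty())
              {
                lock.wait();
              }
              snapshot = pending;
              pending = new ArrayDeque<String>();
            }
            for (String msg : snapshot)
            {
              // Stands in for handler.send(msg) in the real code.
              System.out.println("sending " + msg);
            }
          }
        }
        catch (InterruptedException e)
        {
          // Shutdown requested: exit the publisher loop.
        }
      }
    }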
---
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java       |   11 
 opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java            |   67 ++-
 opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java     |    3 
 opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java          |  237 ++++------
 opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java     |   93 +++-
 opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java           |   12 
 opends/src/server/org/opends/server/replication/server/ServerHandler.java           |   44 +
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |  767 +++++++++++++++++++++---------------
 opends/src/server/org/opends/server/replication/server/ServerReader.java            |   17 
 9 files changed, 710 insertions(+), 541 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
index 9b55637..a25690c 100644
--- a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2009 Sun Microsystems, Inc.
- *      Portions Copyright 2013 ForgeRock AS.
+ *      Portions Copyright 2013-2014 ForgeRock AS.
  */
 package org.opends.server.replication.protocol;
 
@@ -50,12 +50,24 @@
  * When RS2 receives a MonitorRequestMessage from RS1, RS2 responds with a
  * MonitorMsg.
  */
-public class MonitorMsg extends RoutableMsg
+public class MonitorMsg extends ReplicationMsg
 {
   /**
-   * Data structure to manage the state and the approximation
-   * of the data of the first missing change for each LDAP server
-   * connected to a Replication Server.
+   * The destination server or servers of this message.
+   */
+  private final int destination;
+
+  /**
+   * The serverID of the server that sends this message.
+   */
+  private final int senderID;
+
+
+
+  /**
+   * Data structure to manage the state and the approximation of the data of the
+   * first missing change for each LDAP server connected to a Replication
+   * Server.
    */
   static class ServerData
   {
@@ -89,24 +101,7 @@
    */
   public MonitorMsg(int sender, int destination)
   {
-    super(sender, destination);
-  }
-
-  /**
-   * Sets the sender ID.
-   * @param senderID The sender ID.
-   */
-  public void setSenderID(int senderID)
-  {
-    this.senderID = senderID;
-  }
-
-  /**
-   * Sets the destination.
-   * @param destination The destination.
-   */
-  public void setDestination(int destination)
-  {
+    this.senderID = sender;
     this.destination = destination;
   }
 
@@ -459,6 +454,32 @@
     return data.rsStates.keySet().iterator();
   }
 
+
+
+  /**
+   * Get the destination.
+   *
+   * @return the destination
+   */
+  public int getDestination()
+  {
+    return destination;
+  }
+
+
+
+  /**
+   * Get the server ID of the server that sent this message.
+   *
+   * @return the server id
+   */
+  public int getSenderID()
+  {
+    return senderID;
+  }
+
+
+
   /**
    * {@inheritDoc}
    */
diff --git a/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java b/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
index baf8e26..cfe790c 100644
--- a/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2008-2009 Sun Microsystems, Inc.
- *      Portions copyright 2013 ForgeRock AS.
+ *      Portions copyright 2013-2014 ForgeRock AS.
  */
 package org.opends.server.replication.protocol;
 
@@ -30,55 +30,70 @@
 import java.util.zip.DataFormatException;
 
 /**
- * This message is part of the replication protocol.
- * RS1 sends a MonitorRequestMsg to RS2 to request its monitoring
- * informations.
- * When RS2 receives a MonitorRequestMsg from RS1, RS2 responds with a
- * MonitorMessage.
+ * This message is part of the replication protocol. RS1 sends a
+ * MonitorRequestMsg to RS2 to request its monitoring information. When RS2
+ * receives a MonitorRequestMsg from RS1, RS2 responds with a MonitorMsg.
  */
-public class MonitorRequestMsg extends RoutableMsg
+public class MonitorRequestMsg extends ReplicationMsg
 {
   /**
+   * The destination server or servers of this message.
+   */
+  private final int destination;
+
+  /**
+   * The serverID of the server that sends this message.
+   */
+  private final int senderID;
+
+
+
+  /**
    * Creates a message.
    *
-   * @param serverID The sender server of this message.
-   * @param destination The server or servers targeted by this message.
+   * @param serverID
+   *          The sender server of this message.
+   * @param destination
+   *          The server or servers targeted by this message.
    */
   public MonitorRequestMsg(int serverID, int destination)
   {
-    super(serverID, destination);
+    this.senderID = serverID;
+    this.destination = destination;
   }
 
   /**
    * Creates a new message by decoding the provided byte array.
-   * @param in A byte array containing the encoded information for the message,
-   * @throws DataFormatException If the in does not contain a properly,
-   *                             encoded message.
+   *
+   * @param in
+   *          A byte array containing the encoded information for the message.
+   * @throws DataFormatException
+   *           If the byte array does not contain a properly encoded message.
    */
   public MonitorRequestMsg(byte[] in) throws DataFormatException
   {
-    super();
     try
     {
       // First byte is the type
       if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR_REQUEST)
-        throw new DataFormatException("input is not a valid " +
-            this.getClass().getCanonicalName());
+        throw new DataFormatException("input is not a valid "
+            + this.getClass().getCanonicalName());
       int pos = 1;
 
       // sender
       int length = getNextLength(in, pos);
       String senderString = new String(in, pos, length, "UTF-8");
       this.senderID = Integer.valueOf(senderString);
-      pos += length +1;
+      pos += length + 1;
 
       // destination
       length = getNextLength(in, pos);
       String destinationString = new String(in, pos, length, "UTF-8");
       this.destination = Integer.valueOf(destinationString);
-      pos += length +1;
+      pos += length + 1;
 
-    } catch (UnsupportedEncodingException e)
+    }
+    catch (UnsupportedEncodingException e)
     {
       throw new DataFormatException("UTF-8 is not supported by this jvm.");
     }
@@ -95,8 +110,7 @@
       byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
       byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
 
-      int length = 1 + senderBytes.length + 1
-                     + destinationBytes.length + 1;
+      int length = 1 + senderBytes.length + 1 + destinationBytes.length + 1;
 
       byte[] resultByteArray = new byte[length];
 
@@ -117,4 +131,41 @@
       return null;
     }
   }
+
+
+
+  /**
+   * Get the destination.
+   *
+   * @return the destination
+   */
+  public int getDestination()
+  {
+    return destination;
+  }
+
+
+
+  /**
+   * Get the server ID of the server that sent this message.
+   *
+   * @return the server id
+   */
+  public int getSenderID()
+  {
+    return senderID;
+  }
+
+
+
+  /**
+   * Returns a string representation of the message.
+   *
+   * @return the string representation of this message.
+   */
+  public String toString()
+  {
+    return "[" + getClass().getCanonicalName() + " sender=" + senderID
+        + " destination=" + destination + "]";
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java b/opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java
index 345a00c..74608e1 100644
--- a/opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java
@@ -22,13 +22,19 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
+ *      Portions copyright 2014 ForgeRock AS.
  */
 package org.opends.server.replication.protocol;
 
 /**
- * This is an abstract class of messages of the replication protocol
- * for message that needs to contain information about the server that
- * send them and the destination servers to which they should be sent.
+ * This is an abstract class for messages of the replication protocol which
+ * need to contain information about the server that sent them and the
+ * destination servers to which they should be sent.
+ * <p>
+ * Routable messages are used when initializing a new replica from an existing
+ * replica: the total update messages are sent across the topology from the
+ * source replica to the target replica, possibly traversing one or two
+ * replication servers in the process (e.g. DS1 -&gt; RS1 -&gt; RS2 -&gt; DS2).
  */
 public abstract class RoutableMsg extends ReplicationMsg
 {
diff --git a/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java b/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
index 111d635..1a2b514 100644
--- a/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
+++ b/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2009-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2013 ForgeRock AS
+ *      Portions Copyright 2011-2014 ForgeRock AS
  */
 package org.opends.server.replication.server;
 
@@ -104,7 +104,6 @@
           {
             break;
           }
-          monitorMsg.setDestination(serverHandler.getServerId());
           try
           {
             serverHandler.send(monitorMsg);
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index bf00104..69a16d8 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -829,16 +829,6 @@
       }
     }
 
-    // Update threshold value for status analyzers
-    final int newThreshold = config.getDegradedStatusThreshold();
-    if (oldConfig.getDegradedStatusThreshold() != newThreshold)
-    {
-      for (ReplicationServerDomain domain : getReplicationServerDomains())
-      {
-        domain.updateDegradedStatusThreshold(newThreshold);
-      }
-    }
-
     // Update period value for monitoring publishers
     if (oldConfig.getMonitoringPeriod() != config.getMonitoringPeriod())
     {
@@ -970,7 +960,6 @@
 
   /**
    * Creates the backend associated to this replication server.
-   * @throws ConfigException
    */
   private void createBackend() throws ConfigException
   {
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 9121ec8..67ffa6c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -22,13 +22,14 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
- *      Portions copyright 2011-2013 ForgeRock AS
+ *      Portions copyright 2011-2014 ForgeRock AS
  */
 package org.opends.server.replication.server;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.*;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
@@ -53,6 +54,8 @@
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.common.ServerStatus.*;
+import static org.opends.server.replication.common.StatusMachineEvent.*;
 import static org.opends.server.replication.protocol.ProtocolVersion.*;
 import static org.opends.server.util.StaticUtils.*;
 
@@ -76,12 +79,10 @@
   private final DN baseDN;
 
   /**
-   * The Status analyzer that periodically verifies whether the connected DSs
-   * are late. Using an AtomicReference to avoid leaking references to costly
-   * threads.
+   * Periodically verifies whether the connected DSs are late and publishes any
+   * pending status messages.
    */
-  private AtomicReference<StatusAnalyzer> statusAnalyzer =
-      new AtomicReference<StatusAnalyzer>();
+  private final StatusAnalyzer statusAnalyzer;
 
   /**
    * The monitoring publisher that periodically sends monitoring messages to the
@@ -168,6 +169,96 @@
   private int assuredTimeoutTimerPurgeCounter = 0;
 
   /**
+   * Stores pending status messages such as DS change time heartbeats for future
+   * forwarding to the rest of the topology. This class is required in order to
+   * decouple inbound IO processing from outbound IO processing and avoid
+   * potential inter-process deadlocks. In particular, the {@code ServerReader}
+   * thread must not send messages.
+   */
+  private static class PendingStatusMessages
+  {
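+    // Messages are keyed by server id so that a newly enqueued heartbeat or
+    // monitor message simply replaces any message still pending for the same
+    // server, coalescing redundant updates.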
+    private final Map<Integer, ChangeTimeHeartbeatMsg> pendingHeartbeats =
+        new HashMap<Integer, ChangeTimeHeartbeatMsg>(1);
+    private final Map<Integer, MonitorMsg> pendingDSMonitorMsgs =
+        new HashMap<Integer, MonitorMsg>(1);
+    private final Map<Integer, MonitorMsg> pendingRSMonitorMsgs =
+        new HashMap<Integer, MonitorMsg>(1);
+    private boolean sendRSTopologyMsg;
+    private boolean sendDSTopologyMsg;
+    private int excludedDSForTopologyMsg = -1;
+
+
+
+    /**
+     * Enqueues a TopologyMsg for all the connected directory servers in order
+     * to let them know the topology (all known DSs and RSs).
+     *
+     * @param excludedDS
+     *          If not null, the topology message will not be sent to this DS.
+     */
+    private void enqueueTopoInfoToAllDSsExcept(DataServerHandler excludedDS)
+    {
+      int excludedServerId = excludedDS != null ? excludedDS.getServerId() : -1;
+      if (sendDSTopologyMsg)
+      {
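+        // A topology message is already pending. If this request excludes a
+        // different DS (or none at all), widen the pending broadcast so that
+        // every connected DS receives it.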
+        if (excludedServerId != excludedDSForTopologyMsg)
+        {
+          excludedDSForTopologyMsg = -1;
+        }
+      }
+      else
+      {
+        sendDSTopologyMsg = true;
+        excludedDSForTopologyMsg = excludedServerId;
+      }
+    }
+
+
+
+    /**
+     * Enqueues a TopologyMsg for all the connected replication servers in order
+     * to let them know our connected LDAP servers.
+     */
+    private void enqueueTopoInfoToAllRSs()
+    {
+      sendRSTopologyMsg = true;
+    }
+
+
+
+    /**
+     * Enqueues a ChangeTimeHeartbeatMsg received from a DS for forwarding to
+     * all other RS instances.
+     *
+     * @param msg
+     *          The heartbeat message.
+     */
+    private void enqueueChangeTimeHeartbeatMsg(ChangeTimeHeartbeatMsg msg)
+    {
+      pendingHeartbeats.put(msg.getCSN().getServerId(), msg);
+    }
+
+
+
+    private void enqueueDSMonitorMsg(int dsServerId, MonitorMsg msg)
+    {
+      pendingDSMonitorMsgs.put(dsServerId, msg);
+    }
+
+
+
+    private void enqueueRSMonitorMsg(int rsServerId, MonitorMsg msg)
+    {
+      pendingRSMonitorMsgs.put(rsServerId, msg);
+    }
+  }
+
+  private final Object pendingStatusMessagesLock = new Object();
+
+  /** @GuardedBy("pendingStatusMessagesLock") */
+  private PendingStatusMessages pendingStatusMessages =
+      new PendingStatusMessages();
+
+  /**
    * Creates a new ReplicationServerDomain associated to the baseDN.
    *
    * @param baseDN
@@ -185,7 +276,8 @@
         + ") assured timer for domain \"" + baseDN + "\"", true);
     this.domainDB =
         localReplicationServer.getChangelogDB().getReplicationDomainDB();
-
+    this.statusAnalyzer = new StatusAnalyzer(this);
+    this.statusAnalyzer.start();
     DirectoryServer.registerMonitorProvider(this);
   }
 
@@ -712,7 +804,7 @@
    * @param ack The ack message received.
    * @param ackingServer The server handler of the server that sent the ack.
    */
-  public void processAck(AckMsg ack, ServerHandler ackingServer)
+  void processAck(AckMsg ack, ServerHandler ackingServer)
   {
     // Retrieve the expected acks info for the update matching the original
     // sent update.
@@ -1003,21 +1095,12 @@
         if (connectedRSs.containsKey(sHandler.getServerId()))
         {
           unregisterServerHandler(sHandler, shutdown, false);
-        } else if (connectedDSs.containsKey(sHandler.getServerId()))
+        }
+        else if (connectedDSs.containsKey(sHandler.getServerId()))
         {
-          // If this is the last DS for the domain,
-          // shutdown the status analyzer
-          if (connectedDSs.size() == 1)
-          {
-            if (debugEnabled())
-            {
-              debug("remote server " + sHandler
-                  + " is the last DS to be stopped: stopping status analyzer");
-            }
-            stopStatusAnalyzer();
-          }
           unregisterServerHandler(sHandler, shutdown, true);
-        } else if (otherHandlers.contains(sHandler))
+        }
+        else if (otherHandlers.contains(sHandler))
         {
           unregisterOtherHandler(sHandler);
         }
@@ -1052,15 +1135,19 @@
     resetGenerationIdIfPossible();
     if (!shutdown)
     {
-      if (isDirectoryServer)
+      synchronized (pendingStatusMessagesLock)
       {
-        // Update the remote replication servers with our list
-        // of connected LDAP servers
-        sendTopoInfoToAllRSs();
+        if (isDirectoryServer)
+        {
+          // Update the remote replication servers with our list
+          // of connected LDAP servers
+          pendingStatusMessages.enqueueTopoInfoToAllRSs();
+        }
+        // Warn our DSs that a RS or DS has quit (this handler is not used
+        // since it has already been removed from the connected servers list)
+        pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
       }
-      // Warn our DSs that a RS or DS has quit (does not use this
-      // handler as already removed from list)
-      sendTopoInfoToAllDSsExcept(null);
+      statusAnalyzer.notifyPendingStatusMessage();
     }
   }
 
@@ -1405,94 +1492,64 @@
    *
    * @param msg
    *          The message received and to be processed.
-   * @param msgEmitter
-   *          The server handler of the server that emitted the message.
+   * @param sender
+   *          The server handler of the server that sent the message.
    */
-  public void process(RoutableMsg msg, ServerHandler msgEmitter)
+  void process(RoutableMsg msg, ServerHandler sender)
   {
-    // Test the message for which a ReplicationServer is expected
-    // to be the destination
-    if (!(msg instanceof InitializeRequestMsg) &&
-        !(msg instanceof InitializeTargetMsg) &&
-        !(msg instanceof InitializeRcvAckMsg) &&
-        !(msg instanceof EntryMsg) &&
-        !(msg instanceof DoneMsg) &&
-        (msg.getDestination() == this.localReplicationServer.getServerId()))
+    if (msg.getDestination() == localReplicationServer.getServerId())
     {
+      // Handle routable messages targeted at this RS.
       if (msg instanceof ErrorMsg)
       {
-        ErrorMsg errorMsg = (ErrorMsg) msg;
-        logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
-      } else if (msg instanceof MonitorRequestMsg)
-      {
-        replyWithTopologyMonitorMsg(msg, msgEmitter);
-      } else if (msg instanceof MonitorMsg)
-      {
-        MonitorMsg monitorMsg = (MonitorMsg) msg;
-        domainMonitor.receiveMonitorDataResponse(monitorMsg,
-            msgEmitter.getServerId());
-      } else
-      {
-        replyWithUnroutableMsgType(msgEmitter, msg);
+        logError(ERR_ERROR_MSG_RECEIVED.get(((ErrorMsg) msg).getDetails()));
       }
-      return;
-    }
-
-    List<ServerHandler> servers = getDestinationServers(msg, msgEmitter);
-    if (!servers.isEmpty())
-    {
-      forwardMsgToAllServers(msg, servers, msgEmitter);
+      else
+      {
+        replyWithUnroutableMsgType(sender, msg);
+      }
     }
     else
     {
-      replyWithUnreachablePeerMsg(msgEmitter, msg);
+      // Forward message not destined for this RS.
+      List<ServerHandler> servers = getDestinationServers(msg, sender);
+      if (!servers.isEmpty())
+      {
+        forwardMsgToAllServers(msg, servers, sender);
+      }
+      else
+      {
+        replyWithUnreachablePeerMsg(sender, msg);
+      }
     }
   }
 
-  private void replyWithTopologyMonitorMsg(RoutableMsg msg,
-      ServerHandler msgEmitter)
-  {
-    /*
-     * If the request comes from a Directory Server we need to build the full
-     * list of all servers in the topology and send back a MonitorMsg with the
-     * full list of all the servers in the topology.
-     */
-    if (msgEmitter.isDataServer())
-    {
-      // Monitoring information requested by a DS
-      try
-      {
-        MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
-            msg.getDestination(), msg.getSenderID(),
-            domainMonitor.getMonitorData());
-        msgEmitter.send(monitorMsg);
-      }
-      catch (IOException e)
-      {
-        // the connection was closed.
-      }
-    }
-    else
-    {
-      // Monitoring information requested by a RS
-      MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
-          msg.getDestination(), msg.getSenderID());
 
-      if (monitorMsg != null)
-      {
-        try
-        {
-          msgEmitter.send(monitorMsg);
-        }
-        catch (IOException e)
-        {
-          // We log the error. The requestor will detect a timeout or
-          // any other failure on the connection.
-          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(Integer.toString(msg
-              .getDestination())));
-        }
-      }
-    }
+
+  /**
+   * Responds to a monitor request message.
+   *
+   * @param msg
+   *          The monitor request message.
+   * @param sender
+   *          The DS/RS which sent the monitor request.
+   */
+  void processMonitorRequestMsg(MonitorRequestMsg msg, ServerHandler sender)
+  {
+    enqueueMonitorMsg(msg, sender);
+  }
+
+  /**
+   * Responds to a monitor message.
+   *
+   * @param msg
+   *          The monitor message.
+   * @param sender
+   *          The DS/RS which sent the monitor message.
+   */
+  void processMonitorMsg(MonitorMsg msg, ServerHandler sender)
+  {
+    domainMonitor.receiveMonitorDataResponse(msg, sender.getServerId());
   }
 
   private void replyWithUnroutableMsgType(ServerHandler msgEmitter,
@@ -1553,7 +1610,7 @@
   }
 
   private void forwardMsgToAllServers(RoutableMsg msg,
-      List<ServerHandler> servers, ServerHandler msgEmitter)
+      List<ServerHandler> servers, ServerHandler sender)
   {
     for (ServerHandler targetHandler : servers)
     {
@@ -1579,13 +1636,13 @@
         ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), message);
         try
         {
-          msgEmitter.send(errMsg);
+          sender.send(errMsg);
         } catch (IOException ioe1)
         {
           // an error happened on the sender session trying to recover
           // from an error on the receiver session.
           // We don't have much solution left beside closing the sessions.
-          stopServer(msgEmitter, false);
+          stopServer(sender, false);
           stopServer(targetHandler, false);
         }
       // TODO Handle error properly (sender timeout in addition)
@@ -1651,42 +1708,26 @@
-   * @return The newly created and filled MonitorMsg. Null if the current thread
-   *         was interrupted while attempting to get the domain lock.
+   * @return The newly created and filled MonitorMsg.
    */
-  public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
+  private MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
   {
-    try
+    final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
+    monitorMsg.setReplServerDbState(getLatestServerState());
+
+    // Add the server state for each connected DS and RS.
+    for (DataServerHandler dsHandler : this.connectedDSs.values())
     {
-      // Lock domain as we need to go through connected servers list
-      lock();
-    }
-    catch (InterruptedException e)
-    {
-      return null;
+      monitorMsg.setServerState(dsHandler.getServerId(),
+          dsHandler.getServerState(), dsHandler.getApproxFirstMissingDate(),
+          true);
     }
 
-    try
+    for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
     {
-      final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
-      monitorMsg.setReplServerDbState(getLatestServerState());
-
-      // Add the server state for each connected DS and RS.
-      for (DataServerHandler dsHandler : this.connectedDSs.values())
-      {
-        monitorMsg.setServerState(dsHandler.getServerId(), dsHandler
-            .getServerState(), dsHandler.getApproxFirstMissingDate(), true);
-      }
-
-      for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
-      {
-        monitorMsg.setServerState(rsHandler.getServerId(), rsHandler
-            .getServerState(), rsHandler.getApproxFirstMissingDate(), false);
-      }
-
-      return monitorMsg;
+      monitorMsg.setServerState(rsHandler.getServerId(),
+          rsHandler.getServerState(), rsHandler.getApproxFirstMissingDate(),
+          false);
     }
-    finally
-    {
-      release();
-    }
+    return monitorMsg;
   }
 
   /**
@@ -1700,6 +1741,7 @@
     assuredTimeoutTimer.cancel();
 
     stopAllServers(true);
+    statusAnalyzer.shutdown();
   }
 
   /**
@@ -1723,83 +1765,7 @@
     return "ReplicationServerDomain " + baseDN;
   }
 
-  /**
-   * Send a TopologyMsg to all the connected directory servers in order to let
-   * them know the topology (every known DSs and RSs).
-   *
-   * @param notThisOne
-   *          If not null, the topology message will not be sent to this DS.
-   */
-  private void sendTopoInfoToAllDSsExcept(DataServerHandler notThisOne)
-  {
-    for (DataServerHandler dsHandler : connectedDSs.values())
-    {
-      if (dsHandler != notThisOne)
-      // All except the supplied one
-      {
-        for (int i=1; i<=2; i++)
-        {
-          if (!dsHandler.shuttingDown()
-              && dsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
-          {
-            TopologyMsg topoMsg =
-                createTopologyMsgForDS(dsHandler.getServerId());
-            try
-            {
-              dsHandler.sendTopoInfo(topoMsg);
-              break;
-            }
-            catch (IOException e)
-            {
-              if (i == 2)
-              {
-                Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
-                    baseDN.toNormalizedString(), "directory",
-                    Integer.toString(dsHandler.getServerId()), e.getMessage());
-                logError(message);
-              }
-            }
-          }
-          sleep(100);
-        }
-      }
-    }
-  }
 
-  /**
-   * Send a TopologyMsg to all the connected replication servers
-   * in order to let them know our connected LDAP servers.
-   */
-  private void sendTopoInfoToAllRSs()
-  {
-    TopologyMsg topoMsg = createTopologyMsgForRS();
-    for (ReplicationServerHandler rsHandler : connectedRSs.values())
-    {
-      for (int i=1; i<=2; i++)
-      {
-        if (!rsHandler.shuttingDown()
-            && rsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
-        {
-          try
-          {
-            rsHandler.sendTopoInfo(topoMsg);
-            break;
-          }
-          catch (IOException e)
-          {
-            if (i == 2)
-            {
-              Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
-                  baseDN.toNormalizedString(), "replication",
-                  Integer.toString(rsHandler.getServerId()), e.getMessage());
-              logError(message);
-            }
-          }
-        }
-        sleep(100);
-      }
-    }
-  }
 
   /**
    * Creates a TopologyMsg filled with information to be sent to a remote RS.
@@ -2062,7 +2028,7 @@
         return;
       }
 
-      sendTopoInfoToAllExcept(senderHandler);
+      enqueueTopoInfoToAllExcept(senderHandler);
 
       Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
           senderHandler.getServerId(), baseDN.toNormalizedString(),
@@ -2087,7 +2053,7 @@
    * @param event The event to be used for new status computation
    * @return True if we have been interrupted (must stop), false otherwise
    */
-  public boolean changeStatus(DataServerHandler dsHandler,
+  private boolean changeStatus(DataServerHandler dsHandler,
       StatusMachineEvent event)
   {
     try
@@ -2142,7 +2108,7 @@
         return false;
       }
 
-      sendTopoInfoToAllExcept(dsHandler);
+      enqueueTopoInfoToAllExcept(dsHandler);
     }
     catch (Exception e)
     {
@@ -2162,7 +2128,7 @@
    */
   public void sendTopoInfoToAll()
   {
-    sendTopoInfoToAllExcept(null);
+    enqueueTopoInfoToAllExcept(null);
   }
 
   /**
@@ -2171,10 +2137,14 @@
    * @param dsHandler
    *          if not null, the topology message will not be sent to this DS
    */
-  private void sendTopoInfoToAllExcept(DataServerHandler dsHandler)
+  private void enqueueTopoInfoToAllExcept(DataServerHandler dsHandler)
   {
-    sendTopoInfoToAllDSsExcept(dsHandler);
-    sendTopoInfoToAllRSs();
+    synchronized (pendingStatusMessagesLock)
+    {
+      pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(dsHandler);
+      pendingStatusMessages.enqueueTopoInfoToAllRSs();
+    }
+    statusAnalyzer.notifyPendingStatusMessage();
   }
 
   /**
@@ -2293,7 +2263,11 @@
        * Sends the currently known topology information to every connected
        * DS we have.
        */
-      sendTopoInfoToAllDSsExcept(null);
+      synchronized (pendingStatusMessagesLock)
+      {
+        pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
+      }
+      statusAnalyzer.notifyPendingStatusMessage();
     }
     catch(Exception e)
     {
@@ -2414,37 +2388,6 @@
   }
 
   /**
-   * Starts the status analyzer for the domain if not already started.
-   */
-  private void startStatusAnalyzer()
-  {
-    int degradedStatusThreshold =
-        localReplicationServer.getDegradedStatusThreshold();
-    if (degradedStatusThreshold > 0) // 0 means no status analyzer
-    {
-      final StatusAnalyzer thread =
-          new StatusAnalyzer(this, degradedStatusThreshold);
-      if (statusAnalyzer.compareAndSet(null, thread))
-      {
-        thread.start();
-      }
-    }
-  }
-
-  /**
-   * Stops the status analyzer for the domain.
-   */
-  private void stopStatusAnalyzer()
-  {
-    final StatusAnalyzer thread = statusAnalyzer.get();
-    if (thread != null && statusAnalyzer.compareAndSet(thread, null))
-    {
-      thread.shutdown();
-      thread.waitForShutdown();
-    }
-  }
-
-  /**
    * Starts the monitoring publisher for the domain if not already started.
    */
   private void startMonitoringPublisher()
@@ -2611,63 +2554,62 @@
   }
 
 
+
+  private void sendTopologyMsg(String type, ServerHandler handler,
+      TopologyMsg msg)
+  {
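+    // Attempt the send at most twice, pausing briefly before retrying, and
+    // only log an error if the second attempt also fails.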
+    for (int i = 1; i <= 2; i++)
+    {
+      if (!handler.shuttingDown()
+          && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
+      {
+        try
+        {
+          handler.sendTopoInfo(msg);
+          break;
+        }
+        catch (IOException e)
+        {
+          if (i == 2)
+          {
+            logError(ERR_EXCEPTION_SENDING_TOPO_INFO.get(
+                baseDN.toNormalizedString(), type,
+                String.valueOf(handler.getServerId()), e.getMessage()));
+          }
+        }
+      }
+      sleep(100);
+    }
+  }
+
+
+
   /**
    * Processes a ChangeTimeHeartbeatMsg received, by storing the CSN (timestamp)
    * value received, and forwarding the message to the other RSes.
    * @param senderHandler The handler for the server that sent the heartbeat.
    * @param msg The message to process.
    */
-  public void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
+  void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
       ChangeTimeHeartbeatMsg msg)
   {
-    try
+    domainDB.replicaHeartbeat(baseDN, msg.getCSN());
+    if (senderHandler.isDataServer())
     {
-      // Acquire lock on domain (see more details in comment of start() method
-      // of ServerHandler)
-      lock();
-    }
-    catch (InterruptedException ex)
-    {
-      // We can't deal with this here, so re-interrupt thread so that it is
-      // caught during subsequent IO.
-      Thread.currentThread().interrupt();
-      return;
-    }
-
-    try
-    {
-      domainDB.replicaHeartbeat(baseDN, msg.getCSN());
-      if (senderHandler.isDataServer())
+      /*
+       * If we are the first replication server to be notified, then forward
+       * the message to the remote replication servers.
+       */
+      synchronized (pendingStatusMessagesLock)
       {
-        // If we are the first replication server warned,
-        // then forwards the message to the remote replication servers
-        for (ReplicationServerHandler rsHandler : connectedRSs.values())
-        {
-          try
-          {
-            if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
-            {
-              rsHandler.send(msg);
-            }
-          }
-          catch (IOException e)
-          {
-            TRACER.debugCaught(DebugLogLevel.ERROR, e);
-            logError(ERR_CHANGELOG_ERROR_SENDING_MSG
-                .get("Replication Server "
-                    + localReplicationServer.getReplicationPort() + " "
-                    + baseDN + " " + localReplicationServer.getServerId()));
-            stopServer(rsHandler, false);
-          }
-        }
+        pendingStatusMessages.enqueueChangeTimeHeartbeatMsg(msg);
       }
-    }
-    finally
-    {
-      release();
+      statusAnalyzer.notifyPendingStatusMessage();
     }
   }
 
+
+
   /**
    * Get the latest (more recent) trim date of the changelog dbs associated
    * to this domain.
@@ -2703,33 +2645,6 @@
   }
 
   /**
-   * Update the status analyzer with the new threshold value.
-   *
-   * @param degradedStatusThreshold
-   *          The new threshold value.
-   */
-  void updateDegradedStatusThreshold(int degradedStatusThreshold)
-  {
-    if (degradedStatusThreshold == 0)
-    {
-      // Requested to stop analyzers
-      stopStatusAnalyzer();
-      return;
-    }
-
-    final StatusAnalyzer saThread = statusAnalyzer.get();
-    if (saThread != null) // it is running
-    {
-      saThread.setDegradedStatusThreshold(degradedStatusThreshold);
-    }
-    else if (connectedDSs.size() > 0)
-    {
-      // Requested to start analyzers with provided threshold value
-      startStatusAnalyzer();
-    }
-  }
-
-  /**
    * Update the monitoring publisher with the new period value.
    *
    * @param period
@@ -2765,7 +2680,6 @@
    */
   public void register(DataServerHandler dsHandler)
   {
-    startStatusAnalyzer();
     startMonitoringPublisher();
 
     // connected with new DS: store handler.
@@ -2773,7 +2687,7 @@
 
     // Tell peer RSs and DSs a new DS just connected to us
     // No need to re-send TopologyMsg to this just new DS
-    sendTopoInfoToAllExcept(dsHandler);
+    enqueueTopoInfoToAllExcept(dsHandler);
   }
 
   /**
@@ -2798,4 +2712,211 @@
         + " and port=" + localReplicationServer.getReplicationPort()
         + ": " + message);
   }
+
+
+
+  /**
+   * Goes through each connected DS, gets the number of pending changes queued
+   * for it, and changes its status if the degraded status threshold has been
+   * crossed or uncrossed.
+   */
+  void checkDSDegradedStatus()
+  {
+    final int degradedStatusThreshold = localReplicationServer
+        .getDegradedStatusThreshold();
+    // A threshold value of 0 means that the degraded status mechanism is
+    // disabled, in which case there is nothing to check.
+    if (degradedStatusThreshold > 0)
+    {
+      for (DataServerHandler serverHandler : connectedDSs.values())
+      {
+        // Get number of pending changes for this server
+        final int nChanges = serverHandler.getRcvMsgQueueSize();
+        if (debugEnabled())
+        {
+          TRACER.debugInfo("In RS " + getLocalRSServerId() + ", for baseDN="
+              + getBaseDN() + ": " + "Status analyzer: DS "
+              + serverHandler.getServerId() + " has " + nChanges
+              + " message(s) in writer queue.");
+        }
+
+        // Check the status to know whether a change is relevant before taking
+        // the RSD lock. If we attempt to change the status when the current
+        // status does not allow it, this will be noticed by the changeStatus()
+        // method. This way the lock is only taken when actually needed, rather
+        // than on every timeout.
+        if (nChanges >= degradedStatusThreshold)
+        {
+          if (serverHandler.getStatus() == NORMAL_STATUS
+              && changeStatus(serverHandler, TO_DEGRADED_STATUS_EVENT))
+          {
+            break; // Interrupted.
+          }
+        }
+        else
+        {
+          if (serverHandler.getStatus() == DEGRADED_STATUS
+              && changeStatus(serverHandler, TO_NORMAL_STATUS_EVENT))
+          {
+            break; // Interrupted.
+          }
+        }
+      }
+    }
+  }
+
+
+
+  /**
+   * Sends any enqueued status messages to the rest of the topology.
+   */
+  void sendPendingStatusMessages()
+  {
+    /*
+     * Take a snapshot of pending status notifications in order to avoid holding
+     * the broadcast lock for too long. In addition, clear the notifications so
+     * that they are not resent the next time.
+     */
+    final PendingStatusMessages savedState;
+    synchronized (pendingStatusMessagesLock)
+    {
+      savedState = pendingStatusMessages;
+      pendingStatusMessages = new PendingStatusMessages();
+    }
+    sendPendingChangeTimeHeartbeatMsgs(savedState);
+    sendPendingTopologyMsgs(savedState);
+    sendPendingMonitorMsgs(savedState);
+  }
+
+
+
+  private void sendPendingMonitorMsgs(final PendingStatusMessages pendingMsgs)
+  {
+    for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingDSMonitorMsgs
+        .entrySet())
+    {
+      ServerHandler ds = connectedDSs.get(msg.getKey());
+      if (ds != null)
+      {
+        try
+        {
+          ds.send(msg.getValue());
+        }
+        catch (IOException e)
+        {
+          // Ignore: connection closed.
+        }
+      }
+    }
+    for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingRSMonitorMsgs
+        .entrySet())
+    {
+      ServerHandler rs = connectedRSs.get(msg.getKey());
+      if (rs != null)
+      {
+        try
+        {
+          rs.send(msg.getValue());
+        }
+        catch (IOException e)
+        {
+          // We log the error. The requestor will detect a timeout or
+          // any other failure on the connection.
+
+          // FIXME: why do we log for RSs but not DSs?
+          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(String.valueOf(msg
+              .getValue().getDestination())));
+        }
+      }
+    }
+  }
+
+
+
+  private void sendPendingChangeTimeHeartbeatMsgs(
+      PendingStatusMessages pendingMsgs)
+  {
+    for (ChangeTimeHeartbeatMsg pendingHeartbeat : pendingMsgs.pendingHeartbeats
+        .values())
+    {
+      for (ReplicationServerHandler rsHandler : connectedRSs.values())
+      {
+        try
+        {
+          if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
+          {
+            rsHandler.send(pendingHeartbeat);
+          }
+        }
+        catch (IOException e)
+        {
+          TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get("Replication Server "
+              + localReplicationServer.getReplicationPort() + " " + baseDN
+              + " " + localReplicationServer.getServerId()));
+          stopServer(rsHandler, false);
+        }
+      }
+    }
+  }
+
+
+
+  private void sendPendingTopologyMsgs(PendingStatusMessages pendingMsgs)
+  {
+    if (pendingMsgs.sendDSTopologyMsg)
+    {
+      for (ServerHandler handler : connectedDSs.values())
+      {
+        if (handler.getServerId() != pendingMsgs.excludedDSForTopologyMsg)
+        {
+          final TopologyMsg topoMsg = createTopologyMsgForDS(handler
+              .getServerId());
+          sendTopologyMsg("directory", handler, topoMsg);
+        }
+      }
+    }
+
+    if (pendingMsgs.sendRSTopologyMsg)
+    {
+      final TopologyMsg topoMsg = createTopologyMsgForRS();
+      for (ServerHandler handler : connectedRSs.values())
+      {
+        sendTopologyMsg("replication", handler, topoMsg);
+      }
+    }
+  }
+
+
+
+  private void enqueueMonitorMsg(MonitorRequestMsg msg, ServerHandler sender)
+  {
+    /*
+     * If the request comes from a Directory Server, then build the full list
+     * of all servers in the topology and send back a MonitorMsg containing it.
+     */
+    if (sender.isDataServer())
+    {
+      MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
+          msg.getDestination(), msg.getSenderID(),
+          domainMonitor.getMonitorData());
+      synchronized (pendingStatusMessagesLock)
+      {
+        pendingStatusMessages.enqueueDSMonitorMsg(sender.getServerId(),
+            monitorMsg);
+      }
+    }
+    else
+    {
+      MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
+          msg.getDestination(), msg.getSenderID());
+      synchronized (pendingStatusMessagesLock)
+      {
+        pendingStatusMessages.enqueueRSMonitorMsg(sender.getServerId(),
+            monitorMsg);
+      }
+    }
+    statusAnalyzer.notifyPendingStatusMessage();
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index c7efe83..9c165a8 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -27,6 +27,7 @@
 package org.opends.server.replication.server;
 
 import java.io.IOException;
+
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Semaphore;
@@ -484,7 +485,7 @@
    */
   public ReplicationServerDomain getDomain()
   {
-    return this.replicationServerDomain;
+    return replicationServerDomain;
   }
 
   /**
@@ -847,21 +848,45 @@
    *
    * @param msg The message to be processed.
    */
-  public void process(RoutableMsg msg)
+  void process(RoutableMsg msg)
   {
     if (debugEnabled())
+    {
       TRACER.debugInfo("In "
           + replicationServerDomain.getLocalRSMonitorInstanceName() + " "
           + this + " processes routable msg received:" + msg);
+    }
     replicationServerDomain.process(msg, this);
   }
 
   /**
+   * Responds to a monitor request message.
+   *
+   * @param msg
+   *          The monitor request message.
+   */
+  void processMonitorRequestMsg(MonitorRequestMsg msg)
+  {
+    replicationServerDomain.processMonitorRequestMsg(msg, this);
+  }
+
+  /**
+   * Responds to a monitor message.
+   *
+   * @param msg
+   *          The monitor message.
+   */
+  void processMonitorMsg(MonitorMsg msg)
+  {
+    replicationServerDomain.processMonitorMsg(msg, this);
+  }
+
+  /**
    * Processes a change time heartbeat msg.
    *
    * @param msg The message to be processed.
    */
-  public void process(ChangeTimeHeartbeatMsg msg)
+  void process(ChangeTimeHeartbeatMsg msg)
   {
     if (debugEnabled())
       TRACER.debugInfo("In "
@@ -925,15 +950,6 @@
   }
 
   /**
-   * Sets the replication server domain associated.
-   * @param rsd The provided replication server domain.
-   */
-  protected void setReplicationServerDomain(ReplicationServerDomain rsd)
-  {
-    this.replicationServerDomain = rsd;
-  }
-
-  /**
    * Sets the window size when used when sending to the remote.
    * @param size The provided window size.
    */
@@ -1179,7 +1195,7 @@
    * Process a Ack message received.
    * @param ack the message received.
    */
-  public void processAck(AckMsg ack)
+  void processAck(AckMsg ack)
   {
     if (replicationServerDomain!=null)
       replicationServerDomain.processAck(ack, this);
@@ -1200,7 +1216,7 @@
    * Process a ResetGenerationIdMsg message received.
    * @param msg the message received.
    */
-  public void processResetGenId(ResetGenerationIdMsg msg)
+  void processResetGenId(ResetGenerationIdMsg msg)
   {
     if (replicationServerDomain!=null)
       replicationServerDomain.resetGenerationId(this, msg);
diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index ed74a43..6e85261 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -180,8 +180,23 @@
           } else if (msg instanceof WindowMsg)
           {
             handler.updateWindow((WindowMsg) msg);
-          } else if (msg instanceof RoutableMsg)
+          }
+          else if (msg instanceof MonitorRequestMsg)
           {
+            handler.processMonitorRequestMsg((MonitorRequestMsg) msg);
+          }
+          else if (msg instanceof MonitorMsg)
+          {
+            handler.processMonitorMsg((MonitorMsg) msg);
+          }
+          else if (msg instanceof RoutableMsg)
+          {
+            /*
+             * Note that we handle monitor messages separately since they in
+             * fact never need "routing" and are instead sent directly between
+             * connected peers. Doing so allows us to more clearly decouple
+             * write IO from the reader thread (see OPENDJ-1354).
+             */
             handler.process((RoutableMsg) msg);
           } else if (msg instanceof ResetGenerationIdMsg)
           {
diff --git a/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java b/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
index 34185c6..721ec23 100644
--- a/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
+++ b/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -22,18 +22,19 @@
  *
  *
  *      Copyright 2008-2009 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2013 ForgeRock AS
+ *      Portions Copyright 2011-2014 ForgeRock AS
  */
 package org.opends.server.replication.server;
 
+
+
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.StatusMachineEvent;
-import org.opends.server.types.DebugLogLevel;
 
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.replication.common.ServerStatus.*;
-import static org.opends.server.replication.common.StatusMachineEvent.*;
+
 
 /**
  * This thread is in charge of periodically determining if the connected
@@ -44,46 +45,45 @@
  * the threshold is uncrossed, the status analyzer must make the DS status
  * change back to NORMAL_STATUS. To have meaning of status, please refer to
  * ServerStatus class.
+ * <p>
+ * In addition, this thread is responsible for publishing any pending status
+ * messages.
  */
-public class StatusAnalyzer extends DirectoryThread
+class StatusAnalyzer extends DirectoryThread
 {
-
-  private volatile boolean shutdown = false;
-
   /**
    * The tracer object for the debug logger.
    */
   private static final DebugTracer TRACER = getTracer();
 
-  private final ReplicationServerDomain replicationServerDomain;
-  private volatile int degradedStatusThreshold = -1;
-
   /** Sleep time for the thread, in ms. */
   private static final int STATUS_ANALYZER_SLEEP_TIME = 5000;
 
-  private volatile boolean done = false;
+  private final ReplicationServerDomain replicationServerDomain;
+  private final Object eventMonitor = new Object();
+  private boolean pendingStatusMessage = false;
+  private long nextCheckDSDegradedStatusTime;
 
-  private final Object shutdownLock = new Object();
+
 
   /**
    * Create a StatusAnalyzer.
-   * @param replicationServerDomain The ReplicationServerDomain the status
-   *        analyzer is for.
-   * @param degradedStatusThreshold The pending changes threshold value to be
-   * used for putting a DS in DEGRADED_STATUS.
+   *
+   * @param replicationServerDomain
+   *          The ReplicationServerDomain the status analyzer is for.
    */
-  public StatusAnalyzer(ReplicationServerDomain replicationServerDomain,
-    int degradedStatusThreshold)
+  StatusAnalyzer(ReplicationServerDomain replicationServerDomain)
   {
     super("Replication server RS("
         + replicationServerDomain.getLocalRSServerId()
-        + ") delay monitor for domain \"" + replicationServerDomain.getBaseDN()
+        + ") status monitor for domain \""
+        + replicationServerDomain.getBaseDN()
         + "\"");
-
     this.replicationServerDomain = replicationServerDomain;
-    this.degradedStatusThreshold = degradedStatusThreshold;
   }
 
+
+
   /**
    * Analyzes if servers are late or not, and change their status accordingly.
    */
@@ -96,79 +96,55 @@
           getMessage("Directory server status analyzer starting."));
     }
 
-    while (!shutdown)
+    try
     {
-      synchronized (shutdownLock)
+      while (true)
       {
-        if (!shutdown)
+        final boolean statusBroadcastRequested;
+        synchronized (eventMonitor)
         {
-          try
+          if (!isShutdownInitiated() && !pendingStatusMessage)
           {
-            shutdownLock.wait(STATUS_ANALYZER_SLEEP_TIME);
+            eventMonitor.wait(STATUS_ANALYZER_SLEEP_TIME);
           }
-          catch (InterruptedException e)
-          {
-            // Server shutdown monitor may interrupt slow threads.
-            if (debugEnabled())
-            {
-              TRACER.debugCaught(DebugLogLevel.ERROR, e);
-            }
-            shutdown = true;
-            break;
-          }
-        }
-      }
-
-      // Go through each connected DS, get the number of pending changes we have
-      // for it and change status accordingly if threshold value is
-      // crossed/uncrossed
-      for (DataServerHandler serverHandler :
-        replicationServerDomain.getConnectedDSs().values())
-      {
-        // Get number of pending changes for this server
-        int nChanges = serverHandler.getRcvMsgQueueSize();
-        if (debugEnabled())
-        {
-          TRACER.debugInfo(getMessage("Status analyzer: DS "
-              + serverHandler.getServerId() + " has " + nChanges
-              + " message(s) in writer queue."));
+          statusBroadcastRequested = pendingStatusMessage;
+          pendingStatusMessage = false;
         }
 
-        // Check status to know if it is relevant to change the status. Do not
-        // take RSD lock to test. If we attempt to change the status whereas
-        // the current status does allow it, this will be noticed by
-        // the changeStatusFromStatusAnalyzer() method. This allows to take the
-        // lock roughly only when needed versus every sleep time timeout.
-        if (degradedStatusThreshold > 0)
-          // Threshold value = 0 means no status analyzer (no degrading system)
-          // we should not have that as the status analyzer thread should not be
-          // created if this is the case, but for sanity purpose, we add this
-          // test
+        if (isShutdownInitiated())
         {
-          if (nChanges >= degradedStatusThreshold)
-          {
-            if (serverHandler.getStatus() == NORMAL_STATUS
-                && isInterrupted(serverHandler, TO_DEGRADED_STATUS_EVENT))
-            {
-              break;
-            }
-          }
-          else
-          {
-            if (serverHandler.getStatus() == DEGRADED_STATUS
-                && isInterrupted(serverHandler, TO_NORMAL_STATUS_EVENT))
-            {
-              break;
-            }
-          }
+          break;
+        }
+
+        // Broadcast heartbeats, topology messages, etc. if requested.
+        if (statusBroadcastRequested)
+        {
+          replicationServerDomain.sendPendingStatusMessages();
+        }
+
+        /*
+         * Check the degraded status for connected DS instances only if
+         * sufficient time has passed. The current time is not cached because
+         * the call to checkDSDegradedStatus may take some time.
+         */
+        if (nextCheckDSDegradedStatusTime < System.currentTimeMillis())
+        {
+          replicationServerDomain.checkDSDegradedStatus();
+          nextCheckDSDegradedStatusTime = System.currentTimeMillis()
+              + STATUS_ANALYZER_SLEEP_TIME;
         }
       }
     }
+    catch (InterruptedException e)
+    {
+      // Forcefully stopped.
+    }
 
-    done = true;
     TRACER.debugInfo(getMessage("Status analyzer is terminated."));
   }
 
+
+
   private String getMessage(String message)
   {
     return "In RS " + replicationServerDomain.getLocalRSServerId()
@@ -176,75 +152,50 @@
         + message;
   }
 
-  private boolean isInterrupted(DataServerHandler serverHandler,
-      StatusMachineEvent event)
-  {
-    if (replicationServerDomain.changeStatus(serverHandler, event))
-    {
-      // Finish job and let thread die
-      TRACER.debugInfo(
-          getMessage("Status analyzer has been interrupted and will die."));
-      return true;
-    }
-    return false;
-  }
+
 
   /**
    * Stops the thread.
    */
-  public void shutdown()
+  void shutdown()
   {
-    synchronized (shutdownLock)
-    {
-      shutdown = true;
-      shutdownLock.notifyAll();
-
-      if (debugEnabled())
-      {
-        TRACER.debugInfo(getMessage("Shutting down status analyzer."));
-      }
-    }
-  }
-
-  /**
-   * Waits for analyzer death. If not finished within 2 seconds,
-   * forces interruption
-   */
-  public void waitForShutdown()
-  {
-    try
-    {
-      int FACTOR = 40; // Wait for 2 seconds before interrupting the thread
-      int n = 0;
-      while (!done && this.isAlive())
-      {
-        Thread.sleep(50);
-        n++;
-        if (n >= FACTOR)
-        {
-          TRACER.debugInfo(getMessage("Interrupting status analyzer."));
-          interrupt();
-        }
-      }
-    } catch (InterruptedException e)
-    {
-      // exit the loop if this thread is interrupted.
-    }
-  }
-
-  /**
-   * Sets the threshold value.
-   * @param degradedStatusThreshold The new threshold value.
-   */
-  public void setDegradedStatusThreshold(int degradedStatusThreshold)
-  {
+    initiateShutdown();
     if (debugEnabled())
     {
-      TRACER.debugInfo(getMessage(
-          "Directory server status analyzer changing threshold value to "
-              + degradedStatusThreshold));
+      TRACER.debugInfo(getMessage("Shutting down status analyzer."));
     }
+    synchronized (eventMonitor)
+    {
+      eventMonitor.notifyAll();
+    }
+    try
+    {
+      join(2000);
+    }
+    catch (InterruptedException e)
+    {
+      // Trapped: forcefully stop the thread.
+    }
+    if (isAlive())
+    {
+      // The join timed out or was interrupted so attempt to forcefully stop the
+      // analyzer.
+      interrupt();
+    }
+  }
 
-    this.degradedStatusThreshold = degradedStatusThreshold;
+
+
+  /**
+   * Requests that a topology state related message be broadcast to the rest of
+   * the topology. Messages include DS heartbeats, topology information, etc.
+   */
+  void notifyPendingStatusMessage()
+  {
+    synchronized (eventMonitor)
+    {
+      pendingStatusMessage = true;
+      eventMonitor.notifyAll();
+    }
   }
 }

--
Gitblit v1.10.0