From 21af6610b07617ecbf1b826310a2f244deb4d348 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Tue, 25 Mar 2014 15:02:51 +0000
Subject: [PATCH] Fix OPENDJ-1354 - replication threads BLOCKED in pendingChanges queue

---
 opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |  695 ++++++++++++++++++++++++++++++++-------------------------
 1 files changed, 389 insertions(+), 306 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 02d6849..456f68c 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -29,6 +29,7 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.*;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
@@ -75,12 +76,10 @@
   private final DN baseDN;
 
   /**
-   * The Status analyzer that periodically verifies whether the connected DSs
-   * are late. Using an AtomicReference to avoid leaking references to costly
-   * threads.
+   * Periodically verifies whether the connected DSs are late and publishes any
+   * pending status messages.
    */
-  private AtomicReference<StatusAnalyzer> statusAnalyzer =
-      new AtomicReference<StatusAnalyzer>();
+  private final StatusAnalyzer statusAnalyzer;
 
   /**
    * The monitoring publisher that periodically sends monitoring messages to the
@@ -166,6 +165,98 @@
    */
   private int assuredTimeoutTimerPurgeCounter = 0;
 
+
+
+  /**
+   * Stores pending status messages such as DS change time heartbeats for future
+   * forwarding to the rest of the topology. This class is required in order to
+   * decouple inbound IO processing from outbound IO processing and avoid
+   * potential inter-process deadlocks. In particular, the {@code ServerReader}
+   * thread must not send messages.
+   */
+  private static class PendingStatusMessages
+  {
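+    // Maps are keyed by server id, so a newer pending message for a given
+    // server replaces any older one that has not been sent yet.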
+    private final Map<Integer, ChangeTimeHeartbeatMsg> pendingHeartbeats =
+        new HashMap<Integer, ChangeTimeHeartbeatMsg>(1);
+    private final Map<Integer, MonitorMsg> pendingDSMonitorMsgs =
+        new HashMap<Integer, MonitorMsg>(1);
+    private final Map<Integer, MonitorMsg> pendingRSMonitorMsgs =
+        new HashMap<Integer, MonitorMsg>(1);
+    private boolean sendRSTopologyMsg;
+    private boolean sendDSTopologyMsg;
+    private int excludedDSForTopologyMsg = -1;
+
+
+
+    /**
+     * Enqueues a TopologyMsg for all the connected directory servers in order
+     * to let them know the topology (all known DSs and RSs).
+     *
+     * @param excludedDS
+     *          If not null, the topology message will not be sent to this DS.
+     */
+    private void enqueueTopoInfoToAllDSsExcept(DataServerHandler excludedDS)
+    {
+      int excludedServerId = excludedDS != null ? excludedDS.getServerId() : -1;
+      if (sendDSTopologyMsg)
+      {
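+        // A topology message is already pending: if this request excludes a
+        // different DS (or none), widen the broadcast to all connected DSs.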
+        if (excludedServerId != excludedDSForTopologyMsg)
+        {
+          excludedDSForTopologyMsg = -1;
+        }
+      }
+      else
+      {
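+        // First pending request: remember which DS, if any, to skip.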
+        sendDSTopologyMsg = true;
+        excludedDSForTopologyMsg = excludedServerId;
+      }
+    }
+
+
+
+    /**
+     * Enqueues a TopologyMsg for all the connected replication servers in order
+     * to let them know our connected LDAP servers.
+     */
+    private void enqueueTopoInfoToAllRSs()
+    {
+      sendRSTopologyMsg = true;
+    }
+
+
+
+    /**
+     * Enqueues a ChangeTimeHeartbeatMsg received from a DS for forwarding to
+     * all other RS instances.
+     *
+     * @param msg
+     *          The heartbeat message.
+     */
+    private void enqueueChangeTimeHeartbeatMsg(ChangeTimeHeartbeatMsg msg)
+    {
+      pendingHeartbeats.put(msg.getCSN().getServerId(), msg);
+    }
+
+
+
+    private void enqueueDSMonitorMsg(int dsServerId, MonitorMsg msg)
+    {
+      pendingDSMonitorMsgs.put(dsServerId, msg);
+    }
+
+
+
+    private void enqueueRSMonitorMsg(int rsServerId, MonitorMsg msg)
+    {
+      pendingRSMonitorMsgs.put(rsServerId, msg);
+    }
+  }
+
+  private final Object pendingStatusMessagesLock = new Object();
+
+  /** @GuardedBy("pendingStatusMessagesLock") */
+  private PendingStatusMessages pendingStatusMessages = new PendingStatusMessages();
+
   /**
    * Creates a new ReplicationServerDomain associated to the baseDN.
    *
@@ -184,7 +275,8 @@
         + ") assured timer for domain \"" + baseDN + "\"", true);
     this.domainDB =
         localReplicationServer.getChangelogDB().getReplicationDomainDB();
-
+    this.statusAnalyzer = new StatusAnalyzer(this);
+    this.statusAnalyzer.start();
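+    // The status analyzer runs for the lifetime of the domain: besides
+    // checking for degraded DSs, it publishes pending status messages.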
     DirectoryServer.registerMonitorProvider(this);
   }
 
@@ -704,7 +796,7 @@
    * @param ack The ack message received.
    * @param ackingServer The server handler of the server that sent the ack.
    */
-  public void processAck(AckMsg ack, ServerHandler ackingServer)
+  void processAck(AckMsg ack, ServerHandler ackingServer)
   {
     // Retrieve the expected acks info for the update matching the original
     // sent update.
@@ -990,21 +1082,12 @@
         if (connectedRSs.containsKey(sHandler.getServerId()))
         {
           unregisterServerHandler(sHandler, shutdown, false);
-        } else if (connectedDSs.containsKey(sHandler.getServerId()))
+        }
+        else if (connectedDSs.containsKey(sHandler.getServerId()))
         {
-          // If this is the last DS for the domain,
-          // shutdown the status analyzer
-          if (connectedDSs.size() == 1)
-          {
-            if (logger.isTraceEnabled())
-            {
-              debug("remote server " + sHandler
-                  + " is the last DS to be stopped: stopping status analyzer");
-            }
-            stopStatusAnalyzer();
-          }
           unregisterServerHandler(sHandler, shutdown, true);
-        } else if (otherHandlers.contains(sHandler))
+        }
+        else if (otherHandlers.contains(sHandler))
         {
           unregisterOtherHandler(sHandler);
         }
@@ -1038,15 +1121,19 @@
     resetGenerationIdIfPossible();
     if (!shutdown)
     {
-      if (isDirectoryServer)
+      synchronized (pendingStatusMessagesLock)
       {
-        // Update the remote replication servers with our list
-        // of connected LDAP servers
-        sendTopoInfoToAllRSs();
+        if (isDirectoryServer)
+        {
+          // Update the remote replication servers with our list
+          // of connected LDAP servers
+          pendingStatusMessages.enqueueTopoInfoToAllRSs();
+        }
+        // Warn our DSs that an RS or DS has quit (this call does not use the
+        // handler, as it has already been removed from the list).
+        pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
       }
-      // Warn our DSs that a RS or DS has quit (does not use this
-      // handler as already removed from list)
-      sendTopoInfoToAllDSsExcept(null);
+      statusAnalyzer.notifyPendingStatusMessage();
     }
   }
 
@@ -1384,99 +1471,71 @@
     return servers;
   }
 
+
+
   /**
    * Processes a message coming from one server in the topology and potentially
    * forwards it to one or all other servers.
    *
    * @param msg
    *          The message received and to be processed.
-   * @param msgEmitter
-   *          The server handler of the server that emitted the message.
+   * @param sender
+   *          The server handler of the server that sent the message.
    */
-  public void process(RoutableMsg msg, ServerHandler msgEmitter)
+  void process(RoutableMsg msg, ServerHandler sender)
   {
-    // Test the message for which a ReplicationServer is expected
-    // to be the destination
-    if (!(msg instanceof InitializeRequestMsg) &&
-        !(msg instanceof InitializeTargetMsg) &&
-        !(msg instanceof InitializeRcvAckMsg) &&
-        !(msg instanceof EntryMsg) &&
-        !(msg instanceof DoneMsg) &&
-        (msg.getDestination() == this.localReplicationServer.getServerId()))
+    if (msg.getDestination() == localReplicationServer.getServerId())
     {
+      // Handle routable messages targeted at this RS.
       if (msg instanceof ErrorMsg)
       {
         ErrorMsg errorMsg = (ErrorMsg) msg;
         logger.error(ERR_ERROR_MSG_RECEIVED, errorMsg.getDetails());
-      } else if (msg instanceof MonitorRequestMsg)
-      {
-        replyWithTopologyMonitorMsg(msg, msgEmitter);
-      } else if (msg instanceof MonitorMsg)
-      {
-        MonitorMsg monitorMsg = (MonitorMsg) msg;
-        domainMonitor.receiveMonitorDataResponse(monitorMsg,
-            msgEmitter.getServerId());
-      } else
-      {
-        replyWithUnroutableMsgType(msgEmitter, msg);
       }
-      return;
-    }
-
-    List<ServerHandler> servers = getDestinationServers(msg, msgEmitter);
-    if (!servers.isEmpty())
-    {
-      forwardMsgToAllServers(msg, servers, msgEmitter);
+      else
+      {
+        replyWithUnroutableMsgType(sender, msg);
+      }
     }
     else
     {
-      replyWithUnreachablePeerMsg(msgEmitter, msg);
+      // Forward message not destined for this RS.
+      List<ServerHandler> servers = getDestinationServers(msg, sender);
+      if (!servers.isEmpty())
+      {
+        forwardMsgToAllServers(msg, servers, sender);
+      }
+      else
+      {
+        replyWithUnreachablePeerMsg(sender, msg);
+      }
     }
   }
 
-  private void replyWithTopologyMonitorMsg(RoutableMsg msg,
-      ServerHandler msgEmitter)
+  /**
+   * Responds to a monitor request message.
+   *
+   * @param msg
+   *          The monitor request message.
+   * @param sender
+   *          The DS/RS which sent the monitor request.
+   */
+  void processMonitorRequestMsg(MonitorRequestMsg msg, ServerHandler sender)
   {
-    /*
-     * If the request comes from a Directory Server we need to build the full
-     * list of all servers in the topology and send back a MonitorMsg with the
-     * full list of all the servers in the topology.
-     */
-    if (msgEmitter.isDataServer())
-    {
-      // Monitoring information requested by a DS
-      try
-      {
-        MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
-            msg.getDestination(), msg.getSenderID(),
-            domainMonitor.getMonitorData());
-        msgEmitter.send(monitorMsg);
-      }
-      catch (IOException e)
-      {
-        // the connection was closed.
-      }
-    }
-    else
-    {
-      // Monitoring information requested by a RS
-      MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
-          msg.getDestination(), msg.getSenderID());
+    enqueueMonitorMsg(msg, sender);
+  }
 
-      if (monitorMsg != null)
-      {
-        try
-        {
-          msgEmitter.send(monitorMsg);
-        }
-        catch (IOException e)
-        {
-          // We log the error. The requestor will detect a timeout or
-          // any other failure on the connection.
-          logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, msg.getDestination());
-        }
-      }
-    }
+  /**
+   * Processes a monitor message.
+   *
+   * @param msg
+   *          The monitor message.
+   * @param sender
+   *          The DS/RS which sent the monitor message.
+   */
+  void processMonitorMsg(MonitorMsg msg, ServerHandler sender)
+  {
+    domainMonitor.receiveMonitorDataResponse(msg, sender.getServerId());
   }
 
   private void replyWithUnroutableMsgType(ServerHandler msgEmitter,
@@ -1532,7 +1591,7 @@
   }
 
   private void forwardMsgToAllServers(RoutableMsg msg,
-      List<ServerHandler> servers, ServerHandler msgEmitter)
+      List<ServerHandler> servers, ServerHandler sender)
   {
     for (ServerHandler targetHandler : servers)
     {
@@ -1557,13 +1616,13 @@
         ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), message);
         try
         {
-          msgEmitter.send(errMsg);
+          sender.send(errMsg);
         } catch (IOException ioe1)
         {
           // an error happened on the sender session trying to recover
           // from an error on the receiver session.
           // We don't have much solution left beside closing the sessions.
-          stopServer(msgEmitter, false);
+          stopServer(sender, false);
           stopServer(targetHandler, false);
         }
       // TODO Handle error properly (sender timeout in addition)
@@ -1629,42 +1688,26 @@
-   * @return The newly created and filled MonitorMsg. Null if the current thread
-   *         was interrupted while attempting to get the domain lock.
+   * @return The newly created and filled MonitorMsg.
    */
-  public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
+  private MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
   {
-    try
+    final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
+    monitorMsg.setReplServerDbState(getLatestServerState());
+
+    // Add the server state for each connected DS and RS.
+    for (DataServerHandler dsHandler : this.connectedDSs.values())
     {
-      // Lock domain as we need to go through connected servers list
-      lock();
-    }
-    catch (InterruptedException e)
-    {
-      return null;
+      monitorMsg.setServerState(dsHandler.getServerId(),
+          dsHandler.getServerState(), dsHandler.getApproxFirstMissingDate(),
+          true);
     }
 
-    try
+    for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
     {
-      final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
-      monitorMsg.setReplServerDbState(getLatestServerState());
-
-      // Add the server state for each connected DS and RS.
-      for (DataServerHandler dsHandler : this.connectedDSs.values())
-      {
-        monitorMsg.setServerState(dsHandler.getServerId(), dsHandler
-            .getServerState(), dsHandler.getApproxFirstMissingDate(), true);
-      }
-
-      for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
-      {
-        monitorMsg.setServerState(rsHandler.getServerId(), rsHandler
-            .getServerState(), rsHandler.getApproxFirstMissingDate(), false);
-      }
-
-      return monitorMsg;
+      monitorMsg.setServerState(rsHandler.getServerId(),
+          rsHandler.getServerState(), rsHandler.getApproxFirstMissingDate(),
+          false);
     }
-    finally
-    {
-      release();
-    }
+    return monitorMsg;
   }
 
   /**
@@ -1678,6 +1721,7 @@
     assuredTimeoutTimer.cancel();
 
     stopAllServers(true);
+    statusAnalyzer.shutdown();
   }
 
   /**
@@ -1701,79 +1745,7 @@
     return "ReplicationServerDomain " + baseDN;
   }
 
-  /**
-   * Send a TopologyMsg to all the connected directory servers in order to let
-   * them know the topology (every known DSs and RSs).
-   *
-   * @param notThisOne
-   *          If not null, the topology message will not be sent to this DS.
-   */
-  private void sendTopoInfoToAllDSsExcept(DataServerHandler notThisOne)
-  {
-    for (DataServerHandler dsHandler : connectedDSs.values())
-    {
-      if (dsHandler != notThisOne)
-      // All except the supplied one
-      {
-        for (int i=1; i<=2; i++)
-        {
-          if (!dsHandler.shuttingDown()
-              && dsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
-          {
-            TopologyMsg topoMsg =
-                createTopologyMsgForDS(dsHandler.getServerId());
-            try
-            {
-              dsHandler.sendTopoInfo(topoMsg);
-              break;
-            }
-            catch (IOException e)
-            {
-              if (i == 2)
-              {
-                logger.error(ERR_EXCEPTION_SENDING_TOPO_INFO, baseDN.toNormalizedString(), "directory",
-                    dsHandler.getServerId(), e.getMessage());
-              }
-            }
-          }
-          sleep(100);
-        }
-      }
-    }
-  }
 
-  /**
-   * Send a TopologyMsg to all the connected replication servers
-   * in order to let them know our connected LDAP servers.
-   */
-  private void sendTopoInfoToAllRSs()
-  {
-    TopologyMsg topoMsg = createTopologyMsgForRS();
-    for (ReplicationServerHandler rsHandler : connectedRSs.values())
-    {
-      for (int i=1; i<=2; i++)
-      {
-        if (!rsHandler.shuttingDown()
-            && rsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
-        {
-          try
-          {
-            rsHandler.sendTopoInfo(topoMsg);
-            break;
-          }
-          catch (IOException e)
-          {
-            if (i == 2)
-            {
-              logger.error(ERR_EXCEPTION_SENDING_TOPO_INFO, baseDN.toNormalizedString(), "replication",
-                  rsHandler.getServerId(), e.getMessage());
-            }
-          }
-        }
-        sleep(100);
-      }
-    }
-  }
 
   /**
    * Creates a TopologyMsg filled with information to be sent to a remote RS.
@@ -2031,7 +2003,7 @@
         return;
       }
 
-      sendTopoInfoToAllExcept(senderHandler);
+      enqueueTopoInfoToAllExcept(senderHandler);
 
       logger.info(NOTE_DIRECTORY_SERVER_CHANGED_STATUS,
           senderHandler.getServerId(), baseDN.toNormalizedString(), newStatus);
@@ -2053,7 +2025,7 @@
    * @param event The event to be used for new status computation
    * @return True if we have been interrupted (must stop), false otherwise
    */
-  public boolean changeStatus(DataServerHandler dsHandler,
+  private boolean changeStatus(DataServerHandler dsHandler,
       StatusMachineEvent event)
   {
     try
@@ -2106,7 +2078,7 @@
         return false;
       }
 
-      sendTopoInfoToAllExcept(dsHandler);
+      enqueueTopoInfoToAllExcept(dsHandler);
     }
     catch (Exception e)
     {
@@ -2125,7 +2097,7 @@
    */
   public void sendTopoInfoToAll()
   {
-    sendTopoInfoToAllExcept(null);
+    enqueueTopoInfoToAllExcept(null);
   }
 
   /**
@@ -2134,10 +2106,14 @@
    * @param dsHandler
    *          if not null, the topology message will not be sent to this DS
    */
-  private void sendTopoInfoToAllExcept(DataServerHandler dsHandler)
+  private void enqueueTopoInfoToAllExcept(DataServerHandler dsHandler)
   {
-    sendTopoInfoToAllDSsExcept(dsHandler);
-    sendTopoInfoToAllRSs();
+    synchronized (pendingStatusMessagesLock)
+    {
+      pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(dsHandler);
+      pendingStatusMessages.enqueueTopoInfoToAllRSs();
+    }
+    statusAnalyzer.notifyPendingStatusMessage();
   }
 
   /**
@@ -2253,7 +2229,11 @@
        * Sends the currently known topology information to every connected
        * DS we have.
        */
-      sendTopoInfoToAllDSsExcept(null);
+      synchronized (pendingStatusMessagesLock)
+      {
+        pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
+      }
+      statusAnalyzer.notifyPendingStatusMessage();
     }
     catch(Exception e)
     {
@@ -2373,36 +2353,6 @@
   }
 
   /**
-   * Starts the status analyzer for the domain if not already started.
-   */
-  private void startStatusAnalyzer()
-  {
-    int degradedStatusThreshold =
-        localReplicationServer.getDegradedStatusThreshold();
-    if (degradedStatusThreshold > 0) // 0 means no status analyzer
-    {
-      final StatusAnalyzer thread = new StatusAnalyzer(this);
-      if (statusAnalyzer.compareAndSet(null, thread))
-      {
-        thread.start();
-      }
-    }
-  }
-
-  /**
-   * Stops the status analyzer for the domain.
-   */
-  private void stopStatusAnalyzer()
-  {
-    final StatusAnalyzer thread = statusAnalyzer.get();
-    if (thread != null && statusAnalyzer.compareAndSet(thread, null))
-    {
-      thread.shutdown();
-      thread.waitForShutdown();
-    }
-  }
-
-  /**
    * Starts the monitoring publisher for the domain if not already started.
    */
   private void startMonitoringPublisher()
@@ -2569,62 +2519,62 @@
   }
 
 
+
+  private void sendTopologyMsg(String type, ServerHandler handler,
+      TopologyMsg msg)
+  {
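+    // Try to send at most twice, pausing 100ms between attempts, and only
+    // log an error if the second attempt also fails.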
+    for (int i = 1; i <= 2; i++)
+    {
+      if (!handler.shuttingDown()
+          && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
+      {
+        try
+        {
+          handler.sendTopoInfo(msg);
+          break;
+        }
+        catch (IOException e)
+        {
+          if (i == 2)
+          {
+            logger.error(ERR_EXCEPTION_SENDING_TOPO_INFO,
+                baseDN.toNormalizedString(), type, handler.getServerId(),
+                e.getMessage());
+          }
+        }
+      }
+      sleep(100);
+    }
+  }
+
+
+
   /**
    * Processes a ChangeTimeHeartbeatMsg received, by storing the CSN (timestamp)
    * value received, and forwarding the message to the other RSes.
    * @param senderHandler The handler for the server that sent the heartbeat.
    * @param msg The message to process.
    */
-  public void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
+  void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
       ChangeTimeHeartbeatMsg msg)
   {
-    try
+    domainDB.replicaHeartbeat(baseDN, msg.getCSN());
+    if (senderHandler.isDataServer())
     {
-      // Acquire lock on domain (see more details in comment of start() method
-      // of ServerHandler)
-      lock();
-    }
-    catch (InterruptedException ex)
-    {
-      // We can't deal with this here, so re-interrupt thread so that it is
-      // caught during subsequent IO.
-      Thread.currentThread().interrupt();
-      return;
-    }
-
-    try
-    {
-      domainDB.replicaHeartbeat(baseDN, msg.getCSN());
-      if (senderHandler.isDataServer())
+      /*
+       * If we are the first replication server notified, then forward the
+       * message to the remote replication servers.
+       */
+      synchronized (pendingStatusMessagesLock)
       {
-        // If we are the first replication server warned,
-        // then forwards the message to the remote replication servers
-        for (ReplicationServerHandler rsHandler : connectedRSs.values())
-        {
-          try
-          {
-            if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
-            {
-              rsHandler.send(msg);
-            }
-          }
-          catch (IOException e)
-          {
-            logger.traceException(e);
-            logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG,
-                "Replication Server " + localReplicationServer.getReplicationPort() + " "
-                    + baseDN + " " + localReplicationServer.getServerId());
-            stopServer(rsHandler, false);
-          }
-        }
+        pendingStatusMessages.enqueueChangeTimeHeartbeatMsg(msg);
       }
-    }
-    finally
-    {
-      release();
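+      // Wake up the status analyzer so that it forwards the heartbeat
+      // outside of the ServerReader thread.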
+      statusAnalyzer.notifyPendingStatusMessage();
     }
   }
 
+
+
   /**
    * Get the latest (more recent) trim date of the changelog dbs associated
    * to this domain.
@@ -2660,26 +2610,6 @@
   }
 
   /**
-   * Update the status analyzer with the new threshold value.
-   *
-   * @param degradedStatusThreshold
-   *          The new threshold value.
-   */
-  void updateDegradedStatusThreshold(int degradedStatusThreshold)
-  {
-    if (degradedStatusThreshold == 0)
-    {
-      // Requested to stop analyzers
-      stopStatusAnalyzer();
-    }
-    else if (statusAnalyzer.get() == null && connectedDSs.size() > 0)
-    {
-      // Requested to start analyzers with provided threshold value
-      startStatusAnalyzer();
-    }
-  }
-
-  /**
    * Update the monitoring publisher with the new period value.
    *
    * @param period
@@ -2715,7 +2645,6 @@
    */
   public void register(DataServerHandler dsHandler)
   {
-    startStatusAnalyzer();
     startMonitoringPublisher();
 
     // connected with new DS: store handler.
@@ -2723,7 +2652,7 @@
 
     // Tell peer RSs and DSs a new DS just connected to us
     // No need to re-send TopologyMsg to this just new DS
-    sendTopoInfoToAllExcept(dsHandler);
+    enqueueTopoInfoToAllExcept(dsHandler);
   }
 
   /**
@@ -2765,7 +2694,7 @@
     // test
     if (degradedStatusThreshold > 0)
     {
-      for (DataServerHandler serverHandler : getConnectedDSs().values())
+      for (DataServerHandler serverHandler : connectedDSs.values())
       {
         // Get number of pending changes for this server
         final int nChanges = serverHandler.getRcvMsgQueueSize();
@@ -2801,4 +2730,158 @@
       }
     }
   }
+
+
+
+  /**
+   * Sends any enqueued status messages to the rest of the topology.
+   */
+  void sendPendingStatusMessages()
+  {
+    /*
+     * Take a snapshot of the pending status messages and swap in a fresh
+     * instance, so that the lock is not held while the messages are sent and
+     * so that they are not sent again on the next pass.
+     */
+    final PendingStatusMessages savedState;
+    synchronized (pendingStatusMessagesLock)
+    {
+      savedState = pendingStatusMessages;
+      pendingStatusMessages = new PendingStatusMessages();
+    }
+    sendPendingChangeTimeHeartbeatMsgs(savedState);
+    sendPendingTopologyMsgs(savedState);
+    sendPendingMonitorMsgs(savedState);
+  }
+
+
+
+  private void sendPendingMonitorMsgs(final PendingStatusMessages pendingMsgs)
+  {
+    for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingDSMonitorMsgs
+        .entrySet())
+    {
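+      // The DS may have disconnected since the monitor message was enqueued.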
+      ServerHandler ds = connectedDSs.get(msg.getKey());
+      if (ds != null)
+      {
+        try
+        {
+          ds.send(msg.getValue());
+        }
+        catch (IOException e)
+        {
+          // Ignore: connection closed.
+        }
+      }
+    }
+    for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingRSMonitorMsgs
+        .entrySet())
+    {
+      ServerHandler rs = connectedRSs.get(msg.getKey());
+      if (rs != null)
+      {
+        try
+        {
+          rs.send(msg.getValue());
+        }
+        catch (IOException e)
+        {
+          // We log the error. The requestor will detect a timeout or
+          // any other failure on the connection.
+
+          // FIXME: why do we log for RSs but not DSs?
+          logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, msg.getValue()
+              .getDestination());
+        }
+      }
+    }
+  }
+
+
+
+  private void sendPendingChangeTimeHeartbeatMsgs(PendingStatusMessages pendingMsgs)
+  {
+    for (ChangeTimeHeartbeatMsg pendingHeartbeat : pendingMsgs.pendingHeartbeats
+        .values())
+    {
+      for (ReplicationServerHandler rsHandler : connectedRSs.values())
+      {
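+        // Only forward to RSs using replication protocol V3 or later.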
+        try
+        {
+          if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
+          {
+            rsHandler.send(pendingHeartbeat);
+          }
+        }
+        catch (IOException e)
+        {
+          logger.traceException(e);
+          logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, "Replication Server "
+              + localReplicationServer.getReplicationPort() + " " + baseDN
+              + " " + localReplicationServer.getServerId());
+          stopServer(rsHandler, false);
+        }
+      }
+    }
+  }
+
+
+
+  private void sendPendingTopologyMsgs(PendingStatusMessages pendingMsgs)
+  {
+    if (pendingMsgs.sendDSTopologyMsg)
+    {
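+      // excludedDSForTopologyMsg is -1 when the message must be sent to all DSs.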
+      for (ServerHandler handler : connectedDSs.values())
+      {
+        if (handler.getServerId() != pendingMsgs.excludedDSForTopologyMsg)
+        {
+          final TopologyMsg topoMsg = createTopologyMsgForDS(handler
+              .getServerId());
+          sendTopologyMsg("directory", handler, topoMsg);
+        }
+      }
+    }
+
+    if (pendingMsgs.sendRSTopologyMsg)
+    {
+      final TopologyMsg topoMsg = createTopologyMsgForRS();
+      for (ServerHandler handler : connectedRSs.values())
+      {
+        sendTopologyMsg("replication", handler, topoMsg);
+      }
+    }
+  }
+
+
+
+  private void enqueueMonitorMsg(MonitorRequestMsg msg, ServerHandler sender)
+  {
+    /*
+     * If the request comes from a Directory Server, build the full list of
+     * all servers in the topology and send back a MonitorMsg containing it.
+     */
+    if (sender.isDataServer())
+    {
+      MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
+          msg.getDestination(), msg.getSenderID(),
+          domainMonitor.getMonitorData());
+      synchronized (pendingStatusMessagesLock)
+      {
+        pendingStatusMessages.enqueueDSMonitorMsg(sender.getServerId(),
+            monitorMsg);
+      }
+    }
+    else
+    {
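+      // Monitoring information requested by another RS: reply with this RS's
+      // local view of the topology.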
+      MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
+          msg.getDestination(), msg.getSenderID());
+      synchronized (pendingStatusMessagesLock)
+      {
+        pendingStatusMessages.enqueueRSMonitorMsg(sender.getServerId(),
+            monitorMsg);
+      }
+    }
+    statusAnalyzer.notifyPendingStatusMessage();
+  }
 }

--
Gitblit v1.10.0