From 21af6610b07617ecbf1b826310a2f244deb4d348 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Tue, 25 Mar 2014 15:02:51 +0000
Subject: [PATCH] Fix OPENDJ-1354 - replication threads BLOCKED in pendingChanges queue
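
The ServerReader thread of a replication domain could block while
sending topology, monitor and change-time heartbeat messages to the
rest of the topology, leaving replication threads BLOCKED on the
pendingChanges queue. Decouple inbound IO processing from outbound IO
processing: pending status messages are now recorded in a
PendingStatusMessages holder, guarded by a dedicated lock, and sent
asynchronously by the domain's StatusAnalyzer thread, which now runs
for the whole lifetime of the domain.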
---
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 695 ++++++++++++++++++++++++++++++++-------------------------
 1 file changed, 389 insertions(+), 306 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 02d6849..456f68c 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -29,6 +29,7 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -75,12 +76,10 @@
private final DN baseDN;
/**
- * The Status analyzer that periodically verifies whether the connected DSs
- * are late. Using an AtomicReference to avoid leaking references to costly
- * threads.
+ * Periodically verifies whether the connected DSs are late and publishes any
+ * pending status messages.
*/
- private AtomicReference<StatusAnalyzer> statusAnalyzer =
- new AtomicReference<StatusAnalyzer>();
+ private final StatusAnalyzer statusAnalyzer;
/**
* The monitoring publisher that periodically sends monitoring messages to the
@@ -166,6 +165,98 @@
*/
private int assuredTimeoutTimerPurgeCounter = 0;
+
+
+ /**
+ * Stores pending status messages such as DS change time heartbeats for future
+ * forwarding to the rest of the topology. This class is required in order to
+ * decouple inbound IO processing from outbound IO processing and avoid
+ * potential inter-process deadlocks. In particular, the {@code ServerReader}
+ * thread must not send messages.
+ */
+ private static class PendingStatusMessages
+ {
+ private final Map<Integer, ChangeTimeHeartbeatMsg> pendingHeartbeats =
+ new HashMap<Integer, ChangeTimeHeartbeatMsg>(1);
+ private final Map<Integer, MonitorMsg> pendingDSMonitorMsgs =
+ new HashMap<Integer, MonitorMsg>(1);
+ private final Map<Integer, MonitorMsg> pendingRSMonitorMsgs =
+ new HashMap<Integer, MonitorMsg>(1);
+ private boolean sendRSTopologyMsg;
+ private boolean sendDSTopologyMsg;
+ private int excludedDSForTopologyMsg = -1;
+
+
+
+ /**
+ * Enqueues a TopologyMsg for all the connected directory servers in order
+ * to let them know the topology (all known DSs and RSs).
+ *
+ * @param excludedDS
+ * If not null, the topology message will not be sent to this DS.
+ */
+ private void enqueueTopoInfoToAllDSsExcept(DataServerHandler excludedDS)
+ {
+ int excludedServerId = excludedDS != null ? excludedDS.getServerId() : -1;
+ if (sendDSTopologyMsg)
+ {
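+ // A topology message is already pending for a different excluded DS:
+ // clear the exclusion so that all connected DSs receive the update.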
+ if (excludedServerId != excludedDSForTopologyMsg)
+ {
+ excludedDSForTopologyMsg = -1;
+ }
+ }
+ else
+ {
+ sendDSTopologyMsg = true;
+ excludedDSForTopologyMsg = excludedServerId;
+ }
+ }
+
+
+
+ /**
+ * Enqueues a TopologyMsg for all the connected replication servers in order
+ * to let them know our connected LDAP servers.
+ */
+ private void enqueueTopoInfoToAllRSs()
+ {
+ sendRSTopologyMsg = true;
+ }
+
+
+
+ /**
+ * Enqueues a ChangeTimeHeartbeatMsg received from a DS for forwarding to
+ * all other RS instances.
+ *
+ * @param msg
+ * The heartbeat message.
+ */
+ private void enqueueChangeTimeHeartbeatMsg(ChangeTimeHeartbeatMsg msg)
+ {
+ pendingHeartbeats.put(msg.getCSN().getServerId(), msg);
+ }
+
+
+
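+ /**
+ * Enqueues a monitor response message for later sending to the specified
+ * directory server.
+ *
+ * @param dsServerId
+ * The server id of the destination DS.
+ * @param msg
+ * The monitor message.
+ */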
+ private void enqueueDSMonitorMsg(int dsServerId, MonitorMsg msg)
+ {
+ pendingDSMonitorMsgs.put(dsServerId, msg);
+ }
+
+
+
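+ /**
+ * Enqueues a monitor response message for later sending to the specified
+ * replication server.
+ *
+ * @param rsServerId
+ * The server id of the destination RS.
+ * @param msg
+ * The monitor message.
+ */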
+ private void enqueueRSMonitorMsg(int rsServerId, MonitorMsg msg)
+ {
+ pendingRSMonitorMsgs.put(rsServerId, msg);
+ }
+ }
+
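+ /** Protects {@code pendingStatusMessages}. */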
+ private final Object pendingStatusMessagesLock = new Object();
+
+ /** @GuardedBy("pendingStatusMessagesLock") */
+ private PendingStatusMessages pendingStatusMessages = new PendingStatusMessages();
+
/**
* Creates a new ReplicationServerDomain associated to the baseDN.
*
@@ -184,7 +275,8 @@
+ ") assured timer for domain \"" + baseDN + "\"", true);
this.domainDB =
localReplicationServer.getChangelogDB().getReplicationDomainDB();
-
+ this.statusAnalyzer = new StatusAnalyzer(this);
+ this.statusAnalyzer.start();
DirectoryServer.registerMonitorProvider(this);
}
@@ -704,7 +796,7 @@
* @param ack The ack message received.
* @param ackingServer The server handler of the server that sent the ack.
*/
- public void processAck(AckMsg ack, ServerHandler ackingServer)
+ void processAck(AckMsg ack, ServerHandler ackingServer)
{
// Retrieve the expected acks info for the update matching the original
// sent update.
@@ -990,21 +1082,12 @@
if (connectedRSs.containsKey(sHandler.getServerId()))
{
unregisterServerHandler(sHandler, shutdown, false);
- } else if (connectedDSs.containsKey(sHandler.getServerId()))
+ }
+ else if (connectedDSs.containsKey(sHandler.getServerId()))
{
- // If this is the last DS for the domain,
- // shutdown the status analyzer
- if (connectedDSs.size() == 1)
- {
- if (logger.isTraceEnabled())
- {
- debug("remote server " + sHandler
- + " is the last DS to be stopped: stopping status analyzer");
- }
- stopStatusAnalyzer();
- }
unregisterServerHandler(sHandler, shutdown, true);
- } else if (otherHandlers.contains(sHandler))
+ }
+ else if (otherHandlers.contains(sHandler))
{
unregisterOtherHandler(sHandler);
}
@@ -1038,15 +1121,19 @@
resetGenerationIdIfPossible();
if (!shutdown)
{
- if (isDirectoryServer)
+ synchronized (pendingStatusMessagesLock)
{
- // Update the remote replication servers with our list
- // of connected LDAP servers
- sendTopoInfoToAllRSs();
+ if (isDirectoryServer)
+ {
+ // Update the remote replication servers with our list
+ // of connected LDAP servers
+ pendingStatusMessages.enqueueTopoInfoToAllRSs();
+ }
+ // Warn our DSs that an RS or DS has quit (does not use this
+ // handler as it has already been removed from the list)
+ pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
}
- // Warn our DSs that a RS or DS has quit (does not use this
- // handler as already removed from list)
- sendTopoInfoToAllDSsExcept(null);
+ statusAnalyzer.notifyPendingStatusMessage();
}
}
@@ -1384,99 +1471,71 @@
return servers;
}
+
+
/**
* Processes a message coming from one server in the topology and potentially
* forwards it to one or all other servers.
*
* @param msg
* The message received and to be processed.
- * @param msgEmitter
- * The server handler of the server that emitted the message.
+ * @param sender
+ * The server handler of the server that sent the message.
*/
- public void process(RoutableMsg msg, ServerHandler msgEmitter)
+ void process(RoutableMsg msg, ServerHandler sender)
{
- // Test the message for which a ReplicationServer is expected
- // to be the destination
- if (!(msg instanceof InitializeRequestMsg) &&
- !(msg instanceof InitializeTargetMsg) &&
- !(msg instanceof InitializeRcvAckMsg) &&
- !(msg instanceof EntryMsg) &&
- !(msg instanceof DoneMsg) &&
- (msg.getDestination() == this.localReplicationServer.getServerId()))
+ if (msg.getDestination() == localReplicationServer.getServerId())
{
+ // Handle routable messages targeted at this RS.
if (msg instanceof ErrorMsg)
{
ErrorMsg errorMsg = (ErrorMsg) msg;
logger.error(ERR_ERROR_MSG_RECEIVED, errorMsg.getDetails());
- } else if (msg instanceof MonitorRequestMsg)
- {
- replyWithTopologyMonitorMsg(msg, msgEmitter);
- } else if (msg instanceof MonitorMsg)
- {
- MonitorMsg monitorMsg = (MonitorMsg) msg;
- domainMonitor.receiveMonitorDataResponse(monitorMsg,
- msgEmitter.getServerId());
- } else
- {
- replyWithUnroutableMsgType(msgEmitter, msg);
}
- return;
- }
-
- List<ServerHandler> servers = getDestinationServers(msg, msgEmitter);
- if (!servers.isEmpty())
- {
- forwardMsgToAllServers(msg, servers, msgEmitter);
+ else
+ {
+ replyWithUnroutableMsgType(sender, msg);
+ }
}
else
{
- replyWithUnreachablePeerMsg(msgEmitter, msg);
+ // Forward message not destined for this RS.
+ List<ServerHandler> servers = getDestinationServers(msg, sender);
+ if (!servers.isEmpty())
+ {
+ forwardMsgToAllServers(msg, servers, sender);
+ }
+ else
+ {
+ replyWithUnreachablePeerMsg(sender, msg);
+ }
}
}
- private void replyWithTopologyMonitorMsg(RoutableMsg msg,
- ServerHandler msgEmitter)
+ /**
+ * Responds to a monitor request message.
+ *
+ * @param msg
+ * The monitor request message.
+ * @param sender
+ * The DS/RS which sent the monitor request.
+ */
+ void processMonitorRequestMsg(MonitorRequestMsg msg, ServerHandler sender)
{
- /*
- * If the request comes from a Directory Server we need to build the full
- * list of all servers in the topology and send back a MonitorMsg with the
- * full list of all the servers in the topology.
- */
- if (msgEmitter.isDataServer())
- {
- // Monitoring information requested by a DS
- try
- {
- MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
- msg.getDestination(), msg.getSenderID(),
- domainMonitor.getMonitorData());
- msgEmitter.send(monitorMsg);
- }
- catch (IOException e)
- {
- // the connection was closed.
- }
- }
- else
- {
- // Monitoring information requested by a RS
- MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
- msg.getDestination(), msg.getSenderID());
+ enqueueMonitorMsg(msg, sender);
+ }
- if (monitorMsg != null)
- {
- try
- {
- msgEmitter.send(monitorMsg);
- }
- catch (IOException e)
- {
- // We log the error. The requestor will detect a timeout or
- // any other failure on the connection.
- logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, msg.getDestination());
- }
- }
- }
+ /**
+ * Responds to a monitor message.
+ *
+ * @param msg
+ * The monitor message.
+ * @param sender
+ * The DS/RS which sent the monitor message.
+ */
+ void processMonitorMsg(MonitorMsg msg, ServerHandler sender)
+ {
+ domainMonitor.receiveMonitorDataResponse(msg, sender.getServerId());
}
private void replyWithUnroutableMsgType(ServerHandler msgEmitter,
@@ -1532,7 +1591,7 @@
}
private void forwardMsgToAllServers(RoutableMsg msg,
- List<ServerHandler> servers, ServerHandler msgEmitter)
+ List<ServerHandler> servers, ServerHandler sender)
{
for (ServerHandler targetHandler : servers)
{
@@ -1557,13 +1616,13 @@
ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), message);
try
{
- msgEmitter.send(errMsg);
+ sender.send(errMsg);
} catch (IOException ioe1)
{
// an error happened on the sender session trying to recover
// from an error on the receiver session.
// We don't have much solution left beside closing the sessions.
- stopServer(msgEmitter, false);
+ stopServer(sender, false);
stopServer(targetHandler, false);
}
// TODO Handle error properly (sender timeout in addition)
@@ -1629,42 +1688,26 @@
-   * @return The newly created and filled MonitorMsg. Null if the current thread
-   *         was interrupted while attempting to get the domain lock.
+   * @return The newly created and filled MonitorMsg.
*/
- public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
+ private MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
{
- try
+ final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
+ monitorMsg.setReplServerDbState(getLatestServerState());
+
+ // Add the server state for each connected DS and RS.
+ for (DataServerHandler dsHandler : this.connectedDSs.values())
{
- // Lock domain as we need to go through connected servers list
- lock();
- }
- catch (InterruptedException e)
- {
- return null;
+ monitorMsg.setServerState(dsHandler.getServerId(),
+ dsHandler.getServerState(), dsHandler.getApproxFirstMissingDate(),
+ true);
}
- try
+ for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
{
- final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
- monitorMsg.setReplServerDbState(getLatestServerState());
-
- // Add the server state for each connected DS and RS.
- for (DataServerHandler dsHandler : this.connectedDSs.values())
- {
- monitorMsg.setServerState(dsHandler.getServerId(), dsHandler
- .getServerState(), dsHandler.getApproxFirstMissingDate(), true);
- }
-
- for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
- {
- monitorMsg.setServerState(rsHandler.getServerId(), rsHandler
- .getServerState(), rsHandler.getApproxFirstMissingDate(), false);
- }
-
- return monitorMsg;
+ monitorMsg.setServerState(rsHandler.getServerId(),
+ rsHandler.getServerState(), rsHandler.getApproxFirstMissingDate(),
+ false);
}
- finally
- {
- release();
- }
+ return monitorMsg;
}
/**
@@ -1678,6 +1721,7 @@
assuredTimeoutTimer.cancel();
stopAllServers(true);
+ statusAnalyzer.shutdown();
}
/**
@@ -1701,79 +1745,7 @@
return "ReplicationServerDomain " + baseDN;
}
- /**
- * Send a TopologyMsg to all the connected directory servers in order to let
- * them know the topology (every known DSs and RSs).
- *
- * @param notThisOne
- * If not null, the topology message will not be sent to this DS.
- */
- private void sendTopoInfoToAllDSsExcept(DataServerHandler notThisOne)
- {
- for (DataServerHandler dsHandler : connectedDSs.values())
- {
- if (dsHandler != notThisOne)
- // All except the supplied one
- {
- for (int i=1; i<=2; i++)
- {
- if (!dsHandler.shuttingDown()
- && dsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
- {
- TopologyMsg topoMsg =
- createTopologyMsgForDS(dsHandler.getServerId());
- try
- {
- dsHandler.sendTopoInfo(topoMsg);
- break;
- }
- catch (IOException e)
- {
- if (i == 2)
- {
- logger.error(ERR_EXCEPTION_SENDING_TOPO_INFO, baseDN.toNormalizedString(), "directory",
- dsHandler.getServerId(), e.getMessage());
- }
- }
- }
- sleep(100);
- }
- }
- }
- }
- /**
- * Send a TopologyMsg to all the connected replication servers
- * in order to let them know our connected LDAP servers.
- */
- private void sendTopoInfoToAllRSs()
- {
- TopologyMsg topoMsg = createTopologyMsgForRS();
- for (ReplicationServerHandler rsHandler : connectedRSs.values())
- {
- for (int i=1; i<=2; i++)
- {
- if (!rsHandler.shuttingDown()
- && rsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
- {
- try
- {
- rsHandler.sendTopoInfo(topoMsg);
- break;
- }
- catch (IOException e)
- {
- if (i == 2)
- {
- logger.error(ERR_EXCEPTION_SENDING_TOPO_INFO, baseDN.toNormalizedString(), "replication",
- rsHandler.getServerId(), e.getMessage());
- }
- }
- }
- sleep(100);
- }
- }
- }
/**
* Creates a TopologyMsg filled with information to be sent to a remote RS.
@@ -2031,7 +2003,7 @@
return;
}
- sendTopoInfoToAllExcept(senderHandler);
+ enqueueTopoInfoToAllExcept(senderHandler);
logger.info(NOTE_DIRECTORY_SERVER_CHANGED_STATUS,
senderHandler.getServerId(), baseDN.toNormalizedString(), newStatus);
@@ -2053,7 +2025,7 @@
* @param event The event to be used for new status computation
* @return True if we have been interrupted (must stop), false otherwise
*/
- public boolean changeStatus(DataServerHandler dsHandler,
+ private boolean changeStatus(DataServerHandler dsHandler,
StatusMachineEvent event)
{
try
@@ -2106,7 +2078,7 @@
return false;
}
- sendTopoInfoToAllExcept(dsHandler);
+ enqueueTopoInfoToAllExcept(dsHandler);
}
catch (Exception e)
{
@@ -2125,7 +2097,7 @@
*/
public void sendTopoInfoToAll()
{
- sendTopoInfoToAllExcept(null);
+ enqueueTopoInfoToAllExcept(null);
}
/**
@@ -2134,10 +2106,14 @@
* @param dsHandler
* if not null, the topology message will not be sent to this DS
*/
- private void sendTopoInfoToAllExcept(DataServerHandler dsHandler)
+ private void enqueueTopoInfoToAllExcept(DataServerHandler dsHandler)
{
- sendTopoInfoToAllDSsExcept(dsHandler);
- sendTopoInfoToAllRSs();
+ synchronized (pendingStatusMessagesLock)
+ {
+ pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(dsHandler);
+ pendingStatusMessages.enqueueTopoInfoToAllRSs();
+ }
+ statusAnalyzer.notifyPendingStatusMessage();
}
/**
@@ -2253,7 +2229,11 @@
* Sends the currently known topology information to every connected
* DS we have.
*/
- sendTopoInfoToAllDSsExcept(null);
+ synchronized (pendingStatusMessagesLock)
+ {
+ pendingStatusMessages.enqueueTopoInfoToAllDSsExcept(null);
+ }
+ statusAnalyzer.notifyPendingStatusMessage();
}
catch(Exception e)
{
@@ -2373,36 +2353,6 @@
}
/**
- * Starts the status analyzer for the domain if not already started.
- */
- private void startStatusAnalyzer()
- {
- int degradedStatusThreshold =
- localReplicationServer.getDegradedStatusThreshold();
- if (degradedStatusThreshold > 0) // 0 means no status analyzer
- {
- final StatusAnalyzer thread = new StatusAnalyzer(this);
- if (statusAnalyzer.compareAndSet(null, thread))
- {
- thread.start();
- }
- }
- }
-
- /**
- * Stops the status analyzer for the domain.
- */
- private void stopStatusAnalyzer()
- {
- final StatusAnalyzer thread = statusAnalyzer.get();
- if (thread != null && statusAnalyzer.compareAndSet(thread, null))
- {
- thread.shutdown();
- thread.waitForShutdown();
- }
- }
-
- /**
* Starts the monitoring publisher for the domain if not already started.
*/
private void startMonitoringPublisher()
@@ -2569,62 +2519,62 @@
}
+
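+ /**
+ * Attempts to send the provided topology message to a remote server,
+ * retrying once before logging an error.
+ *
+ * @param type
+ * The type of the remote server ("directory" or "replication"), only
+ * used for logging.
+ * @param handler
+ * The handler of the remote server to send the message to.
+ * @param msg
+ * The topology message to send.
+ */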
+ private void sendTopologyMsg(String type, ServerHandler handler,
+ TopologyMsg msg)
+ {
+ for (int i = 1; i <= 2; i++)
+ {
+ if (!handler.shuttingDown()
+ && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
+ {
+ try
+ {
+ handler.sendTopoInfo(msg);
+ break;
+ }
+ catch (IOException e)
+ {
+ if (i == 2)
+ {
+ logger.error(ERR_EXCEPTION_SENDING_TOPO_INFO,
+ baseDN.toNormalizedString(), type, handler.getServerId(),
+ e.getMessage());
+ }
+ }
+ }
+ sleep(100);
+ }
+ }
+
+
+
/**
* Processes a ChangeTimeHeartbeatMsg received, by storing the CSN (timestamp)
* value received, and forwarding the message to the other RSes.
* @param senderHandler The handler for the server that sent the heartbeat.
* @param msg The message to process.
*/
- public void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
+ void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
ChangeTimeHeartbeatMsg msg)
{
- try
+ domainDB.replicaHeartbeat(baseDN, msg.getCSN());
+ if (senderHandler.isDataServer())
{
- // Acquire lock on domain (see more details in comment of start() method
- // of ServerHandler)
- lock();
- }
- catch (InterruptedException ex)
- {
- // We can't deal with this here, so re-interrupt thread so that it is
- // caught during subsequent IO.
- Thread.currentThread().interrupt();
- return;
- }
-
- try
- {
- domainDB.replicaHeartbeat(baseDN, msg.getCSN());
- if (senderHandler.isDataServer())
+ /*
+ * If we are the first replication server notified, then forward the
+ * message to the remote replication servers.
+ */
+ synchronized (pendingStatusMessagesLock)
{
- // If we are the first replication server warned,
- // then forwards the message to the remote replication servers
- for (ReplicationServerHandler rsHandler : connectedRSs.values())
- {
- try
- {
- if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
- {
- rsHandler.send(msg);
- }
- }
- catch (IOException e)
- {
- logger.traceException(e);
- logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG,
- "Replication Server " + localReplicationServer.getReplicationPort() + " "
- + baseDN + " " + localReplicationServer.getServerId());
- stopServer(rsHandler, false);
- }
- }
+ pendingStatusMessages.enqueueChangeTimeHeartbeatMsg(msg);
}
- }
- finally
- {
- release();
+ statusAnalyzer.notifyPendingStatusMessage();
}
}
+
+
/**
* Get the latest (more recent) trim date of the changelog dbs associated
* to this domain.
@@ -2660,26 +2610,6 @@
}
/**
- * Update the status analyzer with the new threshold value.
- *
- * @param degradedStatusThreshold
- * The new threshold value.
- */
- void updateDegradedStatusThreshold(int degradedStatusThreshold)
- {
- if (degradedStatusThreshold == 0)
- {
- // Requested to stop analyzers
- stopStatusAnalyzer();
- }
- else if (statusAnalyzer.get() == null && connectedDSs.size() > 0)
- {
- // Requested to start analyzers with provided threshold value
- startStatusAnalyzer();
- }
- }
-
- /**
* Update the monitoring publisher with the new period value.
*
* @param period
@@ -2715,7 +2645,6 @@
*/
public void register(DataServerHandler dsHandler)
{
- startStatusAnalyzer();
startMonitoringPublisher();
// connected with new DS: store handler.
@@ -2723,7 +2652,7 @@
// Tell peer RSs and DSs a new DS just connected to us
// No need to re-send TopologyMsg to this just new DS
- sendTopoInfoToAllExcept(dsHandler);
+ enqueueTopoInfoToAllExcept(dsHandler);
}
/**
@@ -2765,7 +2694,7 @@
// test
if (degradedStatusThreshold > 0)
{
- for (DataServerHandler serverHandler : getConnectedDSs().values())
+ for (DataServerHandler serverHandler : connectedDSs.values())
{
// Get number of pending changes for this server
final int nChanges = serverHandler.getRcvMsgQueueSize();
@@ -2801,4 +2730,158 @@
}
}
}
+
+
+
+ /**
+ * Sends any enqueued status messages to the rest of the topology.
+ */
+ void sendPendingStatusMessages()
+ {
+ /*
+ * Take a snapshot of the pending status notifications in order to avoid
+ * holding the lock while performing IO. In addition, clear the
+ * notifications so that they are not sent again by the next invocation.
+ */
+ final PendingStatusMessages savedState;
+ synchronized (pendingStatusMessagesLock)
+ {
+ savedState = pendingStatusMessages;
+ pendingStatusMessages = new PendingStatusMessages();
+ }
+ sendPendingChangeTimeHeartbeatMsgs(savedState);
+ sendPendingTopologyMsgs(savedState);
+ sendPendingMonitorMsgs(savedState);
+ }
+
+
+
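+ /** Sends the enqueued monitor messages to their destination DSs and RSs. */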
+ private void sendPendingMonitorMsgs(final PendingStatusMessages pendingMsgs)
+ {
+ for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingDSMonitorMsgs
+ .entrySet())
+ {
+ ServerHandler ds = connectedDSs.get(msg.getKey());
+ if (ds != null)
+ {
+ try
+ {
+ ds.send(msg.getValue());
+ }
+ catch (IOException e)
+ {
+ // Ignore: connection closed.
+ }
+ }
+ }
+ for (Entry<Integer, MonitorMsg> msg : pendingMsgs.pendingRSMonitorMsgs
+ .entrySet())
+ {
+ ServerHandler rs = connectedRSs.get(msg.getKey());
+ if (rs != null)
+ {
+ try
+ {
+ rs.send(msg.getValue());
+ }
+ catch (IOException e)
+ {
+ // We log the error. The requestor will detect a timeout or
+ // any other failure on the connection.
+
+ // FIXME: why do we log for RSs but not DSs?
+ logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, msg.getValue()
+ .getDestination());
+ }
+ }
+ }
+ }
+
+
+
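+ /** Forwards the enqueued change time heartbeats to the connected RSs. */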
+ private void sendPendingChangeTimeHeartbeatMsgs(PendingStatusMessages pendingMsgs)
+ {
+ for (ChangeTimeHeartbeatMsg pendingHeartbeat : pendingMsgs.pendingHeartbeats
+ .values())
+ {
+ for (ReplicationServerHandler rsHandler : connectedRSs.values())
+ {
+ try
+ {
+ if (rsHandler.getProtocolVersion() >= REPLICATION_PROTOCOL_V3)
+ {
+ rsHandler.send(pendingHeartbeat);
+ }
+ }
+ catch (IOException e)
+ {
+ logger.traceException(e);
+ logger.error(ERR_CHANGELOG_ERROR_SENDING_MSG, "Replication Server "
+ + localReplicationServer.getReplicationPort() + " " + baseDN
+ + " " + localReplicationServer.getServerId());
+ stopServer(rsHandler, false);
+ }
+ }
+ }
+ }
+
+
+
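+ /** Sends the enqueued topology messages to the connected DSs and/or RSs. */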
+ private void sendPendingTopologyMsgs(PendingStatusMessages pendingMsgs)
+ {
+ if (pendingMsgs.sendDSTopologyMsg)
+ {
+ for (ServerHandler handler : connectedDSs.values())
+ {
+ if (handler.getServerId() != pendingMsgs.excludedDSForTopologyMsg)
+ {
+ final TopologyMsg topoMsg = createTopologyMsgForDS(handler
+ .getServerId());
+ sendTopologyMsg("directory", handler, topoMsg);
+ }
+ }
+ }
+
+ if (pendingMsgs.sendRSTopologyMsg)
+ {
+ final TopologyMsg topoMsg = createTopologyMsgForRS();
+ for (ServerHandler handler : connectedRSs.values())
+ {
+ sendTopologyMsg("replication", handler, topoMsg);
+ }
+ }
+ }
+
+
+
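+ /**
+ * Builds a monitor response message for the sender of a monitor request and
+ * enqueues it for sending by the status analyzer thread.
+ *
+ * @param msg
+ * The monitor request message.
+ * @param sender
+ * The DS/RS which sent the monitor request.
+ */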
+ private void enqueueMonitorMsg(MonitorRequestMsg msg, ServerHandler sender)
+ {
+ /*
+ * If the request comes from a Directory Server, we need to build the full
+ * list of all servers in the topology and send back a MonitorMsg
+ * containing that list.
+ */
+ if (sender.isDataServer())
+ {
+ MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
+ msg.getDestination(), msg.getSenderID(),
+ domainMonitor.getMonitorData());
+ synchronized (pendingStatusMessagesLock)
+ {
+ pendingStatusMessages.enqueueDSMonitorMsg(sender.getServerId(),
+ monitorMsg);
+ }
+ }
+ else
+ {
+ MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
+ msg.getDestination(), msg.getSenderID());
+ synchronized (pendingStatusMessagesLock)
+ {
+ pendingStatusMessages.enqueueRSMonitorMsg(sender.getServerId(),
+ monitorMsg);
+ }
+ }
+ statusAnalyzer.notifyPendingStatusMessage();
+ }
}
--
Gitblit v1.10.0