From 4d0faf5b8ad46e978a72d35a8f736f83fb61fd2d Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 30 Mar 2011 19:21:16 +0000
Subject: [PATCH] Fix issue OpenDJ-96: Replication server monitor data computation takes too long / blocks rest of server when another RS is cannot be reached
---
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 655 ++++++++++++++++++++++++++++++-----------------------------
1 files changed, 336 insertions(+), 319 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 92d4801..4f60457 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -35,17 +35,10 @@
import java.io.IOException;
import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@@ -57,39 +50,12 @@
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.AssuredMode;
-import org.opends.server.replication.common.ChangeNumber;
-import org.opends.server.replication.common.DSInfo;
-import org.opends.server.replication.common.RSInfo;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.common.StatusMachineEvent;
-import org.opends.server.replication.protocol.AckMsg;
-import org.opends.server.replication.protocol.ChangeStatusMsg;
-import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
-import org.opends.server.replication.protocol.DoneMsg;
-import org.opends.server.replication.protocol.EntryMsg;
-import org.opends.server.replication.protocol.ErrorMsg;
-import org.opends.server.replication.protocol.InitializeRequestMsg;
-import org.opends.server.replication.protocol.InitializeTargetMsg;
-import org.opends.server.replication.protocol.InitializeRcvAckMsg;
-import org.opends.server.replication.protocol.MonitorMsg;
-import org.opends.server.replication.protocol.MonitorRequestMsg;
-import org.opends.server.replication.protocol.ProtocolVersion;
-import org.opends.server.replication.protocol.ResetGenerationIdMsg;
-import org.opends.server.replication.protocol.RoutableMsg;
-import org.opends.server.replication.protocol.TopologyMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.types.Attribute;
-import org.opends.server.types.AttributeBuilder;
-import org.opends.server.types.Attributes;
-import org.opends.server.types.DebugLogLevel;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.ResultCode;
+import org.opends.server.replication.common.*;
+import org.opends.server.replication.protocol.*;
+import org.opends.server.types.*;
+import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
-import org.opends.server.replication.server.
- ReplicationServer.GlobalServerId;
/**
* This class define an in-memory cache that will be used to store
@@ -165,9 +131,47 @@
/**
* The monitor data consolidated over the topology.
*/
- private MonitorData monitorData = new MonitorData();
- private MonitorData wrkMonitorData;
- private final Object monitorDataLock = new Object();
+ private volatile MonitorData monitorData = new MonitorData();
+
+ // This lock guards against multiple concurrent monitor data recalculation.
+ private final Object pendingMonitorLock = new Object();
+
+ // Guarded by pendingMonitorLock.
+ private long monitorDataLastBuildDate = 0;
+
+ // The set of replication servers which are already known to be slow to send
+ // monitor data.
+ //
+ // Guarded by pendingMonitorLock.
+ private Set<Integer> monitorDataLateServers = new HashSet<Integer>();
+
+ // This lock serializes updates to the pending monitor data.
+ private final Object pendingMonitorDataLock = new Object();
+
+ // Monitor data which is currently being calculated.
+ //
+ // Guarded by pendingMonitorDataLock.
+ private MonitorData pendingMonitorData;
+
+ // A set containing the IDs of servers from which we are currently expecting
+ // monitor responses. When a response is received from a server we remove the
+ // ID from this table, and count down the latch if the ID was in the table.
+ //
+ // Guarded by pendingMonitorDataLock.
+ private final Set<Integer> pendingMonitorDataServerIDs =
+ new HashSet<Integer>();
+
+ // This latch is non-null and is used in order to count incoming responses as
+ // they arrive. Since incoming response may arrive at any time, even when
+ // there is no pending monitor request, access to the latch must be guarded.
+ //
+ // Guarded by pendingMonitorDataLock.
+ private CountDownLatch pendingMonitorDataLatch = null;
+
+ // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
+ private final long monitorDataLifeTime = 500;
+
+
/**
* The needed info for each received assured update message we are waiting
@@ -188,7 +192,7 @@
// every n number of treated assured messages
private int assuredTimeoutTimerPurgeCounter = 0;
- ServerState ctHeartbeatState = null;
+ private ServerState ctHeartbeatState = null;
/**
* Creates a new ReplicationServerDomain associated to the DN baseDn.
@@ -1207,7 +1211,7 @@
* domain.
* @param handler the provided handler to unregister.
*/
- protected void unregisterServerHandler(ServerHandler handler)
+ private void unregisterServerHandler(ServerHandler handler)
{
if (handler.isReplicationServer())
{
@@ -1228,7 +1232,7 @@
* - traverse replicationServers list and test for each if DS are connected
* So it strongly relies on the directoryServers list
*/
- protected void mayResetGenerationId()
+ private void mayResetGenerationId()
{
if (debugEnabled())
TRACER.debugInfo(
@@ -1523,7 +1527,7 @@
* @param senderHandler The handler of the server that published this message.
* @return The list of destination handlers.
*/
- protected List<ServerHandler> getDestinationServers(RoutableMsg msg,
+ private List<ServerHandler> getDestinationServers(RoutableMsg msg,
ServerHandler senderHandler)
{
List<ServerHandler> servers =
@@ -1618,16 +1622,16 @@
if (senderHandler.isDataServer())
{
// Monitoring information requested by a DS
- MonitorMsg monitorMsg =
- createGlobalTopologyMonitorMsg(
- msg.getDestination(), msg.getSenderID(), false);
+ MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
+ msg.getDestination(), msg.getSenderID(), monitorData);
- if (monitorMsg != null)
+ if (monitorMsg != null)
{
try
{
senderHandler.send(monitorMsg);
- } catch (IOException e)
+ }
+ catch (IOException e)
{
// the connection was closed.
}
@@ -1656,12 +1660,8 @@
}
} else if (msg instanceof MonitorMsg)
{
- MonitorMsg monitorMsg =
- (MonitorMsg) msg;
-
- GlobalServerId globalServerId =
- new GlobalServerId(baseDn, senderHandler.getServerId());
- receivesMonitorDataResponse(monitorMsg, globalServerId);
+ MonitorMsg monitorMsg = (MonitorMsg) msg;
+ receivesMonitorDataResponse(monitorMsg, senderHandler.getServerId());
} else
{
logError(NOTE_ERR_ROUTING_TO_SERVER.get(
@@ -1758,61 +1758,51 @@
}
+
+
/**
* Creates a new monitor message including monitoring information for the
* whole topology.
- * @param sender The sender of this message.
- * @param destination The destination of this message.
- * @param updateMonitorData A boolean indicating if the monitor data should
- * be updated. If false the last monitoring data
- * that was computed will be returned. This is
- * acceptable for most cases because the monitoring
- * thread computes the monitoring data frequently.
- * If true is used the calling thread may be
- * blocked for a while.
+ *
+ * @param sender
+ * The sender of this message.
+ * @param destination
+ * The destination of this message.
+ * @param monitorData
+ * The domain monitor data which should be used for the message.
* @return The newly created and filled MonitorMsg. Null if a problem occurred
- * during message creation.
+ * during message creation.
*/
public MonitorMsg createGlobalTopologyMonitorMsg(
- int sender, int destination, boolean updateMonitorData)
+ int sender, int destination, MonitorData monitorData)
{
MonitorMsg returnMsg =
new MonitorMsg(sender, destination);
- try
- {
- returnMsg.setReplServerDbState(getDbServerState());
- // Update the information we have about all servers
- // in the topology.
- MonitorData md = computeMonitorData(updateMonitorData);
+ returnMsg.setReplServerDbState(getDbServerState());
- // Add the informations about the Replicas currently in
- // the topology.
- Iterator<Integer> it = md.ldapIterator();
- while (it.hasNext())
- {
- int replicaId = it.next();
- returnMsg.setServerState(
- replicaId, md.getLDAPServerState(replicaId),
- md.getApproxFirstMissingDate(replicaId), true);
- }
-
- // Add the informations about the Replication Servers
- // currently in the topology.
- it = md.rsIterator();
- while (it.hasNext())
- {
- int replicaId = it.next();
- returnMsg.setServerState(
- replicaId, md.getRSStates(replicaId),
- md.getRSApproxFirstMissingDate(replicaId), false);
- }
- }
- catch (DirectoryException e)
+ // Add the informations about the Replicas currently in
+ // the topology.
+ Iterator<Integer> it = monitorData.ldapIterator();
+ while (it.hasNext())
{
- // If we can't compute the Monitor Information, send
- // back an empty message.
+ int replicaId = it.next();
+ returnMsg.setServerState(replicaId,
+ monitorData.getLDAPServerState(replicaId),
+ monitorData.getApproxFirstMissingDate(replicaId), true);
}
+
+ // Add the informations about the Replication Servers
+ // currently in the topology.
+ it = monitorData.rsIterator();
+ while (it.hasNext())
+ {
+ int replicaId = it.next();
+ returnMsg.setServerState(replicaId,
+ monitorData.getRSStates(replicaId),
+ monitorData.getRSApproxFirstMissingDate(replicaId), false);
+ }
+
return returnMsg;
}
@@ -2559,192 +2549,233 @@
* Monitor Data generation
* =======================
*/
- /**
- * Retrieves the global monitor data.
- * @param updateMonitorData A boolean indicating if the monitor data should
- * be updated. If false the last monitoring data
- * that was computed will be returned. This is
- * acceptable for most cases because the monitoring
- * thread computes the monitoring data frequently.
- * If true is used the calling thread may be
- * blocked for a while.
- * @return The monitor data.
- * @throws DirectoryException When an error occurs.
- */
- protected MonitorData computeMonitorData(boolean updateMonitorData)
- throws DirectoryException
- {
- synchronized (monitoringLock)
- {
- if (updateMonitorData)
- {
- // Update the monitorData of ALL domains if this was necessary.
- replicationServer.computeMonitorData();
- }
- // Returns the monitorData of THIS domain
- return monitorData;
- }
+ /**
+ * Returns the latest monitor data available for this replication server
+ * domain.
+ *
+ * @return The latest monitor data available for this replication server
+ * domain, which is never {@code null}.
+ */
+ MonitorData getDomainMonitorData()
+ {
+ return monitorData;
}
+
+
+ /**
+ * Recomputes the monitor data for this replication server domain.
+ *
+ * @return The recomputed monitor data for this replication server domain.
+ * @throws InterruptedException
+ * If this thread is interrupted while waiting for a response.
+ */
+ MonitorData computeDomainMonitorData() throws InterruptedException
+ {
+ // Only allow monitor recalculation at a time.
+ synchronized (pendingMonitorLock)
+ {
+ if ((monitorDataLastBuildDate + monitorDataLifeTime) < TimeThread
+ .getTime())
+ {
+ try
+ {
+ // Prevent out of band monitor responses from updating our pending
+ // table until we are ready.
+ synchronized (pendingMonitorDataLock)
+ {
+ // Clear the pending monitor data.
+ pendingMonitorDataServerIDs.clear();
+ pendingMonitorData = new MonitorData();
+
+ // Initialize the monitor data.
+ initializePendingMonitorData();
+
+ // Send the monitor requests to the connected replication servers.
+ for (ReplicationServerHandler rs : replicationServers.values())
+ {
+ // Add server ID to pending table.
+ int serverId = rs.getServerId();
+
+ MonitorRequestMsg msg = new MonitorRequestMsg(
+ this.replicationServer.getServerId(), serverId);
+ try
+ {
+ rs.send(msg);
+
+ // Only register this server ID if we were able to send the
+ // message.
+ pendingMonitorDataServerIDs.add(serverId);
+ }
+ catch (IOException e)
+ {
+ // Log a message and do a best effort from here.
+ Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST
+ .get(baseDn, serverId, e.getMessage());
+ logError(message);
+ }
+ }
+
+ // Create the pending response latch based on the number of expected
+ // monitor responses.
+ pendingMonitorDataLatch = new CountDownLatch(
+ pendingMonitorDataServerIDs.size());
+ }
+
+ // Wait for the responses to come back.
+ pendingMonitorDataLatch.await(5, TimeUnit.SECONDS);
+
+ // Log messages for replication servers that have gone or come back.
+ synchronized (pendingMonitorDataLock)
+ {
+ // Log servers that have come back.
+ for (int serverId : monitorDataLateServers)
+ {
+ // Ensure that we only log once per server: don't fill the
+ // error log with repeated messages.
+ if (!pendingMonitorDataServerIDs.contains(serverId))
+ {
+ logError(NOTE_MONITOR_DATA_RECEIVED.get(baseDn,
+ serverId));
+ }
+ }
+
+ // Log servers that have gone away.
+ for (int serverId : pendingMonitorDataServerIDs)
+ {
+ // Ensure that we only log once per server: don't fill the
+ // error log with repeated messages.
+ if (!monitorDataLateServers.contains(serverId))
+ {
+ logError(ERR_MISSING_REMOTE_MONITOR_DATA.get(baseDn,
+ serverId));
+ }
+ }
+
+ // Remember which servers were late this time.
+ monitorDataLateServers.clear();
+ monitorDataLateServers.addAll(pendingMonitorDataServerIDs);
+ }
+
+ // Store the new computed data as the reference
+ synchronized (pendingMonitorDataLock)
+ {
+ // Now we have the expected answers or an error occurred
+ pendingMonitorData.completeComputing();
+ monitorData = pendingMonitorData;
+ monitorDataLastBuildDate = TimeThread.getTime();
+ }
+ }
+ finally
+ {
+ synchronized (pendingMonitorDataLock)
+ {
+ // Clear pending state.
+ pendingMonitorData = null;
+ pendingMonitorDataLatch = null;
+ pendingMonitorDataServerIDs.clear();
+ }
+ }
+ }
+ }
+
+ return monitorData;
+ }
+
+
+
/**
* Start collecting global monitoring information for this
* ReplicationServerDomain.
*
- * @param expectedMonitoringMsg The list of server handler we have to wait a
- * monitoring message from. Will be filled as necessary by this method.
+ * @throws DirectoryException
+ * In case the monitoring information could not be collected.
+ */
+
+ private void initializePendingMonitorData()
+ {
+ // Let's process our directly connected LSes
+ // - in the ServerHandler for a given LS1, the stored state contains :
+ // - the max CN produced by LS1
+ // - the last CN consumed by LS1 from LS2..n
+ // - in the RSdomain/dbHandler, the built-in state contains :
+ // - the max CN produced by each server
+ // So for a given LS connected we can take the state and the max from
+ // the LS/state.
+
+ for (ServerHandler directlsh : directoryServers.values())
+ {
+ int serverID = directlsh.getServerId();
+
+ // the state comes from the state stored in the SH
+ ServerState directlshState = directlsh.getServerState()
+ .duplicate();
+
+ // the max CN sent by that LS also comes from the SH
+ ChangeNumber maxcn = directlshState
+ .getMaxChangeNumber(serverID);
+ if (maxcn == null)
+ {
+ // This directly connected LS has never produced any change
+ maxcn = new ChangeNumber(0, 0, serverID);
+ }
+ pendingMonitorData.setMaxCN(serverID, maxcn);
+ pendingMonitorData.setLDAPServerState(serverID, directlshState);
+ pendingMonitorData.setFirstMissingDate(serverID,
+ directlsh.getApproxFirstMissingDate());
+ }
+
+ // Then initialize the max CN for the LS that produced something
+ // - from our own local db state
+ // - whatever they are directly or indirectly connected
+ ServerState dbServerState = getDbServerState();
+ pendingMonitorData.setRSState(replicationServer.getServerId(),
+ dbServerState);
+ Iterator<Integer> it = dbServerState.iterator();
+ while (it.hasNext())
+ {
+ int sid = it.next();
+ ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid);
+ pendingMonitorData.setMaxCN(sid, storedCN);
+ }
+ }
+
+
+
+ /**
+ * Processes a Monitor message receives from a remote Replication Server and
+ * stores the data received.
*
- * @throws DirectoryException In case the monitoring information could
- * not be collected.
- */
-
- void initializeMonitorData(List<GlobalServerId> expectedMonitoringMsg)
- throws DirectoryException
- {
- synchronized (monitorDataLock)
- {
- wrkMonitorData = new MonitorData();
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " baseDn=" + baseDn + " Computing monitor data ");
-
- // Let's process our directly connected LSes
- // - in the ServerHandler for a given LS1, the stored state contains :
- // - the max CN produced by LS1
- // - the last CN consumed by LS1 from LS2..n
- // - in the RSdomain/dbHandler, the built-in state contains :
- // - the max CN produced by each server
- // So for a given LS connected we can take the state and the max from
- // the LS/state.
-
- for (ServerHandler directlsh : directoryServers.values())
- {
- int serverID = directlsh.getServerId();
-
- // the state comes from the state stored in the SH
- ServerState directlshState = directlsh.getServerState().duplicate();
-
- // the max CN sent by that LS also comes from the SH
- ChangeNumber maxcn = directlshState.getMaxChangeNumber(serverID);
- if (maxcn == null)
- {
- // This directly connected LS has never produced any change
- maxcn = new ChangeNumber(0, 0, serverID);
- }
- wrkMonitorData.setMaxCN(serverID, maxcn);
- wrkMonitorData.setLDAPServerState(serverID, directlshState);
- wrkMonitorData.setFirstMissingDate(serverID,
- directlsh.getApproxFirstMissingDate());
- }
-
- // Then initialize the max CN for the LS that produced something
- // - from our own local db state
- // - whatever they are directly or indirectly connected
- ServerState dbServerState = getDbServerState();
- wrkMonitorData.setRSState(replicationServer.getServerId(), dbServerState);
- Iterator<Integer> it = dbServerState.iterator();
- while (it.hasNext())
- {
- int sid = it.next();
- ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid);
- wrkMonitorData.setMaxCN(sid, storedCN);
- }
-
- // Now we have used all available local informations
- // and we need the remote ones.
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " baseDn=" + baseDn + " Local monitor data: " +
- wrkMonitorData.toString());
- }
-
- // Send the request for remote monitor data to the
- sendMonitorDataRequest(expectedMonitoringMsg);
- }
-
- /**
- * Complete all the calculation when all monitoring information
- * has been received.
- */
- void completeMonitorData()
- {
- // Store the new computed data as the reference
- synchronized (monitorDataLock)
- {
- // Now we have the expected answers or an error occurred
- wrkMonitorData.completeComputing();
- monitorData = wrkMonitorData;
- wrkMonitorData = null;
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " baseDn=" + baseDn + " *** Computed MonitorData: " +
- monitorData.toString());
- }
- }
-
- /**
- * Sends a MonitorRequest message to all connected RS.
- * @param expectedMonitoringMsg The list of server handler we have to wait a
- * monitoring message from. Will be filled as necessary by this method.
- * @throws DirectoryException when a problem occurs.
- */
- protected void sendMonitorDataRequest(
- List<GlobalServerId> expectedMonitoringMsg)
- throws DirectoryException
- {
- try
- {
- for (ServerHandler rs : replicationServers.values())
- {
- int serverId = rs.getServerId();
- // Store the fact that we expect a MonitoringMsg back from this server
- expectedMonitoringMsg.add(new GlobalServerId(baseDn, serverId));
- MonitorRequestMsg msg =
- new MonitorRequestMsg(this.replicationServer.getServerId(),
- serverId);
- rs.send(msg);
- }
- } catch (Exception e)
- {
- Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get();
- logError(message);
- throw new DirectoryException(ResultCode.OTHER,
- message, e);
- }
- }
-
- /**
- * Processes a Monitor message receives from a remote Replication Server
- * and stores the data received.
- *
- * @param msg The message to be processed.
- * @param globalServerHandlerId server handler that is receiving the message.
+ * @param msg
+ * The message to be processed.
+ * @param globalServerHandlerId
+ * server handler that is receiving the message.
*/
private void receivesMonitorDataResponse(MonitorMsg msg,
- GlobalServerId globalServerId)
+ int serverId)
{
- try
+ synchronized (pendingMonitorDataLock)
{
- synchronized (monitorDataLock)
+ if (pendingMonitorData == null)
{
- if (wrkMonitorData == null)
- {
- // This is a response for an earlier request whose computing is
- // already complete.
- logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
- Integer.toString(msg.getSenderID())));
- return;
- }
+ // This is a response for an earlier request whose computing is
+ // already complete.
+ logError(INFO_IGNORING_REMOTE_MONITOR_DATA.get(baseDn,
+ msg.getSenderID()));
+ return;
+ }
+
+ try
+ {
// Here is the RS state : list <serverID, lastChangeNumber>
// For each LDAP Server, we keep the max CN across the RSes
ServerState replServerState = msg.getReplServerDbState();
- wrkMonitorData.setMaxCNs(replServerState);
+ pendingMonitorData.setMaxCNs(replServerState);
// store the remote RS states.
- wrkMonitorData.setRSState(msg.getSenderID(), replServerState);
+ pendingMonitorData.setRSState(msg.getSenderID(),
+ replServerState);
// Store the remote LDAP servers states
Iterator<Integer> lsidIterator = msg.ldapIterator();
@@ -2752,10 +2783,10 @@
{
int sid = lsidIterator.next();
ServerState dsServerState = msg.getLDAPServerState(sid);
- wrkMonitorData.setMaxCNs(dsServerState);
- wrkMonitorData.setLDAPServerState(sid, dsServerState);
- wrkMonitorData.setFirstMissingDate(sid,
- msg.getLDAPApproxFirstMissingDate(sid));
+ pendingMonitorData.setMaxCNs(dsServerState);
+ pendingMonitorData.setLDAPServerState(sid, dsServerState);
+ pendingMonitorData.setFirstMissingDate(sid,
+ msg.getLDAPApproxFirstMissingDate(sid));
}
// Process the latency reported by the remote RSi on its connections
@@ -2768,50 +2799,49 @@
{
// this is the latency of the remote RSi regarding the current RS
// let's update the fmd of my connected LS
- for (ServerHandler connectedlsh : directoryServers.values())
+ for (ServerHandler connectedlsh : directoryServers
+ .values())
{
int connectedlsid = connectedlsh.getServerId();
Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
- wrkMonitorData.setFirstMissingDate(connectedlsid, newfmd);
+ pendingMonitorData.setFirstMissingDate(connectedlsid,
+ newfmd);
}
- } else
+ }
+ else
{
// this is the latency of the remote RSi regarding another RSj
// let's update the latency of the LSes connected to RSj
- ReplicationServerHandler rsjHdr = replicationServers.get(rsid);
+ ReplicationServerHandler rsjHdr = replicationServers
+ .get(rsid);
if (rsjHdr != null)
{
- for (int remotelsid : rsjHdr.getConnectedDirectoryServerIds())
+ for (int remotelsid : rsjHdr
+ .getConnectedDirectoryServerIds())
{
Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
- wrkMonitorData.setFirstMissingDate(remotelsid, newfmd);
+ pendingMonitorData.setFirstMissingDate(remotelsid,
+ newfmd);
}
}
}
}
- if (debugEnabled())
+ }
+ catch (RuntimeException e)
+ {
+ // FIXME: do we really expect these???
+ logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e
+ .getMessage() + stackTraceToSingleLineString(e)));
+ }
+ finally
+ {
+ // Decreases the number of expected responses and potentially
+ // wakes up the waiting requestor thread.
+ if (pendingMonitorDataServerIDs.remove(serverId))
{
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this +
- " baseDn=" + baseDn +
- " Processed msg from " + msg.getSenderID() +
- " New monitor data: " + wrkMonitorData.toString());
+ pendingMonitorDataLatch.countDown();
}
}
-
- // Decreases the number of expected responses and potentially
- // wakes up the waiting requestor thread.
- replicationServer.responseReceived(globalServerId);
-
- } catch (Exception e)
- {
- logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage() +
- stackTraceToSingleLineString(e)));
-
- // If an exception occurs while processing one of the expected message,
- // the processing is aborted and the waiting thread is awoke.
- replicationServer.responseReceivedAll();
}
}
@@ -2846,6 +2876,8 @@
{
return replicationServers;
}
+
+
/**
* A synchronization mechanism is created to insure exclusive access to the
* domain. The goal is to have a consistent view of the topology by locking
@@ -2868,11 +2900,6 @@
private ReentrantLock lock = new ReentrantLock();
/**
- * This lock is used to protect the monitoring computing.
- */
- private final Object monitoringLock = new Object();
-
- /**
* This lock is used to protect the generationid variable.
*/
private final Object generationIDLock = new Object();
@@ -3073,23 +3100,13 @@
builder.add(baseDn.toString() + " " + generationId);
attributes.add(builder.toAttribute());
- try
- {
- MonitorData md = computeMonitorData(true);
+ MonitorData md = getDomainMonitorData();
- // Missing changes
- long missingChanges =
- md.getMissingChangesRS(replicationServer.getServerId());
- attributes.add(Attributes.create("missing-changes", String.valueOf(
- missingChanges)));
- }
- catch (Exception e)
- {
- Message message =
- ERR_ERROR_RETRIEVING_MONITOR_DATA.get(stackTraceToSingleLineString(e));
- // We failed retrieving the monitor data.
- attributes.add(Attributes.create("error", message.toString()));
- }
+ // Missing changes
+ long missingChanges = md.getMissingChangesRS(replicationServer
+ .getServerId());
+ attributes.add(Attributes.create("missing-changes",
+ String.valueOf(missingChanges)));
return attributes;
}
--
Gitblit v1.10.0