From 40e2acfd1e9676f3b63385b15075bf1395d4543e Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Fri, 01 Feb 2008 13:21:19 +0000
Subject: [PATCH] Fix 2598 - fixes for global replication monitoring
---
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 400 +++++++++++++++++++++++++++++---------------------------
1 files changed, 209 insertions(+), 191 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 89726d4..77e5d5c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -37,7 +37,6 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -129,8 +128,8 @@
/* Monitor data management */
- // TODO: Remote monitor data cache lifetime is 500 ms/should be configurable
- private long remoteMonitorDataLifeTime = 500;
+ // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
+ private long monitorDataLifeTime = 500;
/* Search op on monitor data is processed by a worker thread.
* Requests are sent to the other RS,and responses are received by the
@@ -139,21 +138,11 @@
*/
Semaphore remoteMonitorResponsesSemaphore;
- /* The date of the last time they have been elaborated */
- private long validityDate = 0;
-
- // For each LDAP server, its server state
- private HashMap<Short, ServerState> LDAPStates =
- new HashMap<Short, ServerState>();
-
- // For each LDAP server, the last CN it published
- private HashMap<Short, ChangeNumber> maxCNs =
- new HashMap<Short, ChangeNumber>();
-
- // For each LDAP server, an approximation of the date of the first missing
- // change
- private HashMap<Short, Long> approxFirstMissingDate =
- new HashMap<Short, Long>();
+ /**
+ * The monitor data consolidated over the topology.
+ */
+ private MonitorData monitorData = new MonitorData();
+ private MonitorData wrkMonitorData;
/**
* Creates a new ReplicationServerDomain associated to the DN baseDn.
@@ -166,13 +155,7 @@
{
this.baseDn = baseDn;
this.replicationServer = replicationServer;
-
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " Created Cache for " + baseDn + " " +
- stackTraceToSingleLineString(new Exception()));
-}
+ }
/**
* Add an update that has been received to the list of
@@ -366,6 +349,10 @@
{
replicationServers.remove(handler.getServerId());
handler.stopHandler();
+
+ // Update the remote replication servers with our list
+ // of connected LDAP servers
+ sendReplServerInfo();
}
}
else
@@ -374,12 +361,12 @@
{
connectedServers.remove(handler.getServerId());
handler.stopHandler();
+
+ // Update the remote replication servers with our list
+ // of connected LDAP servers
+ sendReplServerInfo();
}
}
-
- // Update the remote replication servers with our list
- // of connected LDAP servers
- sendReplServerInfo();
}
/**
@@ -578,7 +565,8 @@
*
* @param serverId Identifier of the server for which the iterator is created.
* @param changeNumber Starting point for the iterator.
- * @return the created ReplicationIterator.
+ * @return the created ReplicationIterator. Null when no DB is available
+ * for the provided server Id.
*/
public ReplicationIterator getChangelogIterator(short serverId,
ChangeNumber changeNumber)
@@ -591,7 +579,8 @@
{
return handler.generateIterator(changeNumber);
}
- catch (Exception e) {
+ catch (Exception e)
+ {
return null;
}
}
@@ -759,6 +748,7 @@
*/
public void process(RoutableMessage msg, ServerHandler senderHandler)
{
+
// Test the message for which a ReplicationServer is expected
// to be the destination
if (msg.getDestination() == this.replicationServer.getServerId())
@@ -779,20 +769,33 @@
replServerMonitorRequestMsg.getDestination(),
replServerMonitorRequestMsg.getsenderID());
- // Populate the RS state in the msg from the DbState
- monitorMsg.setReplServerState(this.getDbServerState());
-
// Populate for each connected LDAP Server
// from the states stored in the serverHandler.
// - the server state
// - the older missing change
for (ServerHandler lsh : this.connectedServers.values())
{
- monitorMsg.setLDAPServerState(
+ monitorMsg.setServerState(
lsh.getServerId(),
lsh.getServerState(),
- lsh.getApproxFirstMissingDate());
+ lsh.getApproxFirstMissingDate(),
+ true);
}
+
+ // Same for the connected RS
+ for (ServerHandler rsh : this.replicationServers.values())
+ {
+ monitorMsg.setServerState(
+ rsh.getServerId(),
+ rsh.getServerState(),
+ rsh.getApproxFirstMissingDate(),
+ false);
+ }
+
+ // Populate the RS state in the msg from the DbState
+ monitorMsg.setReplServerDbState(this.getDbServerState());
+
+
try
{
senderHandler.send(monitorMsg);
@@ -1305,118 +1308,135 @@
}
}
- /*
+ /* =======================
* Monitor Data generation
+ * =======================
*/
/**
- * Retrieves the remote monitor data.
- *
+ * Retrieves the global monitor data.
+ * @return The monitor data; the cached copy while it is still valid.
* @throws DirectoryException When an error occurs.
*/
- protected void retrievesRemoteMonitorData()
+ synchronized protected MonitorData getMonitorData()
throws DirectoryException
{
- if (validityDate > TimeThread.getTime())
+ if (monitorData.getBuildDate() + monitorDataLifeTime
+ > TimeThread.getTime())
{
- // The current data are still valid. No need to renew them.
- return;
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDn=" + baseDn + " getRemoteMonitorData in cache");
+ // The current data are still valid. No need to renew them.
+ // Hand back the cached copy rather than null.
+ return monitorData;
}
- // Clean
- this.LDAPStates.clear();
- this.maxCNs.clear();
-
- // Init the maxCNs of our direct LDAP servers from our own dbstate
- for (ServerHandler rs : connectedServers.values())
+ wrkMonitorData = new MonitorData();
+ synchronized(wrkMonitorData)
{
- short serverID = rs.getServerId();
- ChangeNumber cn = rs.getServerState().getMaxChangeNumber(serverID);
- if (cn == null)
- {
- // we have nothing in db for that server
- cn = new ChangeNumber(0, 0 , serverID);
- }
- this.maxCNs.put(serverID, cn);
- }
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDn=" + baseDn + " Computing monitor data ");
- ServerState replServerState = this.getDbServerState();
- Iterator<Short> it = replServerState.iterator();
- while (it.hasNext())
- {
- short sid = it.next();
- ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
- ChangeNumber maxCN = this.maxCNs.get(sid);
- if ((maxCN != null) && (receivedCN.newer(maxCN)))
+ // Let's process our directly connected LSes
+ // - in the ServerHandler for a given LS1, the stored state contains :
+ // - the max CN produced by LS1
+ // - the last CN consumed by LS1 from LS2..n
+ // - in the RSdomain/dbHandler, the built-in state contains :
+ // - the max CN produced by each server
+ // So for a given LS connected we can take the state and the max from
+ // the LS/state.
+
+ for (ServerHandler directlsh : connectedServers.values())
{
- // We found a newer one
- this.maxCNs.remove(sid);
- this.maxCNs.put(sid, receivedCN);
+ short serverID = directlsh.getServerId();
+
+ // the state comes from the state stored in the SH
+ ServerState directlshState = directlsh.getServerState().duplicate();
+
+ // the max CN sent by that LS also comes from the SH
+ ChangeNumber maxcn = directlshState.getMaxChangeNumber(serverID);
+ if (maxcn == null)
+ {
+ // This directly connected LS has never produced any change
+ maxcn = new ChangeNumber(0, 0 , serverID);
+ }
+ wrkMonitorData.setMaxCN(serverID, maxcn);
+ wrkMonitorData.setLDAPServerState(serverID, directlshState);
+ wrkMonitorData.setFirstMissingDate(serverID, directlsh.
+ getApproxFirstMissingDate());
}
+
+ // Then initialize the max CN for the LS that produced something
+ // - from our own local db state
+ // - whether they are directly or indirectly connected
+ ServerState dbServerState = getDbServerState();
+ Iterator<Short> it = dbServerState.iterator();
+ while (it.hasNext())
+ {
+ short sid = it.next();
+ ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid);
+ wrkMonitorData.setMaxCN(sid, storedCN);
+ }
+
+ // Now we have used all available local information
+ // and we need the remote ones.
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDn=" + baseDn + " Local monitor data: " +
+ wrkMonitorData.toString());
}
// Send Request to the other Replication Servers
if (remoteMonitorResponsesSemaphore == null)
{
- remoteMonitorResponsesSemaphore = new Semaphore(
- replicationServers.size() -1);
-
- sendMonitorDataRequest();
-
+ remoteMonitorResponsesSemaphore = new Semaphore(0);
+ short requestCnt = sendMonitorDataRequest();
// Wait reponses from them or timeout
- waitMonitorDataResponses(replicationServers.size());
+ waitMonitorDataResponses(requestCnt);
}
else
{
// The processing of renewing the monitor cache is already running
// We'll make it sleeping until the end
+ // TODO: unit test for this case.
while (remoteMonitorResponsesSemaphore!=null)
{
waitMonitorDataResponses(1);
}
}
- // Now we have the expected answers of an error occured
- validityDate = TimeThread.getTime() + remoteMonitorDataLifeTime;
+ wrkMonitorData.completeComputing();
- if (debugEnabled())
+ // Store the new computed data as the reference
+ synchronized(monitorData)
{
- debugMonitorData();
- }
- }
-
- private void debugMonitorData()
- {
- String mds = " Monitor data=";
- Iterator<Short> ite = LDAPStates.keySet().iterator();
- while (ite.hasNext())
- {
- Short sid = ite.next();
- ServerState ss = LDAPStates.get(sid);
- mds += " LDAPState(" + sid + ")=" + ss.toString();
- }
- Iterator<Short> itc = maxCNs.keySet().iterator();
- while (itc.hasNext())
- {
- Short sid = itc.next();
- ChangeNumber cn = maxCNs.get(sid);
- mds += " maxCNs(" + sid + ")=" + cn.toString();
- }
-
- mds += "--";
- TRACER.debugInfo(
+ // Now we have the expected answers or an error occurred
+ monitorData = wrkMonitorData;
+ wrkMonitorData = null;
+ if (debugEnabled())
+ TRACER.debugInfo(
"In " + this.replicationServer.getMonitorInstanceName() +
- " baseDN=" + baseDn +
- mds);
+ " baseDn=" + baseDn + " *** Computed MonitorData: " +
+ monitorData.toString());
+ }
+ return monitorData;
}
+
/**
* Sends a MonitorRequest message to all connected RS.
+ * @return the number of requests sent.
* @throws DirectoryException when a problem occurs.
*/
- protected void sendMonitorDataRequest()
+ protected short sendMonitorDataRequest()
throws DirectoryException
{
+ short sent=0;
try
{
for (ServerHandler rs : replicationServers.values())
@@ -1425,6 +1445,7 @@
MonitorRequestMessage(this.replicationServer.getServerId(),
rs.getServerId());
rs.send(msg);
+ sent++;
}
}
catch(Exception e)
@@ -1434,6 +1455,7 @@
throw new DirectoryException(ResultCode.OTHER,
message, e);
}
+ return sent;
}
/**
@@ -1446,21 +1468,30 @@
{
try
{
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDn=" + baseDn +
+ " waiting for " + expectedResponses
+ + " expected monitor messages");
+
boolean allPermitsAcquired =
remoteMonitorResponsesSemaphore.tryAcquire(
expectedResponses,
- (long) 500, TimeUnit.MILLISECONDS);
+ (long) 5000, TimeUnit.MILLISECONDS);
if (!allPermitsAcquired)
{
logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
+ // FIXME let's go on in best effort even with limited data received.
}
else
{
if (debugEnabled())
TRACER.debugInfo(
"In " + this.replicationServer.getMonitorInstanceName() +
- "Successfully received all " + replicationServers.size()
+ " baseDn=" + baseDn +
+ " Successfully received all " + expectedResponses
+ " expected monitor messages");
}
}
@@ -1482,48 +1513,94 @@
*/
public void receivesMonitorDataResponse(MonitorMessage msg)
{
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ "Receiving " + msg + " from " + msg.getsenderID() +
+ remoteMonitorResponsesSemaphore);
+
if (remoteMonitorResponsesSemaphore == null)
{
- // Ignoring the remote monitor data because an error occured previously
+ // FIXME
+ logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ "Receiving " + msg + " from " + msg.getsenderID() +
+ " remoteMonitorResponsesSemaphore should not be null"));
+ // Ignoring the remote monitor data because an error occurred
+ // previously
return;
}
try
{
- // Here is the RS state : list <serverID, lastChangeNumber>
- // For each LDAP Server, we keep the max CN accross the RSes
- ServerState replServerState = msg.getReplServerState();
- Iterator<Short> it = replServerState.iterator();
- while (it.hasNext())
+ synchronized(wrkMonitorData)
{
- short sid = it.next();
- ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
- ChangeNumber maxCN = this.maxCNs.get(sid);
- if (receivedCN.newer(maxCN))
- {
- // We found a newer one
- this.maxCNs.remove(sid);
- this.maxCNs.put(sid, receivedCN);
- }
- }
+ // Here is the RS state : list <serverID, lastChangeNumber>
+ // For each LDAP Server, we keep the max CN across the RSes
+ ServerState replServerState = msg.getReplServerDbState();
+ wrkMonitorData.setMaxCNs(replServerState);
- // Store the LDAP servers states
- Iterator<Short> sidIterator = msg.iterator();
- while (sidIterator.hasNext())
- {
- short sid = sidIterator.next();
- ServerState ss = msg.getLDAPServerState(sid);
- this.LDAPStates.put(sid, ss);
- this.approxFirstMissingDate.put(sid,
- msg.getApproxFirstMissingDate(sid));
+ // Store the remote LDAP servers states
+ Iterator<Short> lsidIterator = msg.ldapIterator();
+ while (lsidIterator.hasNext())
+ {
+ short sid = lsidIterator.next();
+ wrkMonitorData.setLDAPServerState(sid,
+ msg.getLDAPServerState(sid).duplicate());
+ wrkMonitorData.setFirstMissingDate(sid,
+ msg.getLDAPApproxFirstMissingDate(sid));
+ }
+
+ // Process the latency reported by the remote RSi on its connections
+ // to the other RSes
+ Iterator<Short> rsidIterator = msg.rsIterator();
+ while (rsidIterator.hasNext())
+ {
+ short rsid = rsidIterator.next();
+ if (rsid == replicationServer.getServerId())
+ {
+ // this is the latency of the remote RSi regarding the current RS
+ // let's update the fmd of my connected LS
+ for (ServerHandler connectedlsh : connectedServers.values())
+ {
+ short connectedlsid = connectedlsh.getServerId();
+ Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
+ wrkMonitorData.setFirstMissingDate(connectedlsid, newfmd);
+ }
+ }
+ else
+ {
+ // this is the latency of the remote RSi regarding another RSj
+ // let's update the latency of the LSes connected to RSj
+ ServerHandler rsjHdr = replicationServers.get(rsid);
+ for(short remotelsid : rsjHdr.getConnectedServerIds())
+ {
+ Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
+ wrkMonitorData.setFirstMissingDate(remotelsid, newfmd);
+ }
+ }
+ }
+ if (debugEnabled())
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDn=" + baseDn +
+ " Processed msg from " + msg.getsenderID() +
+ " New monitor data: " + wrkMonitorData.toString());
+ }
}
// Decreases the number of expected responses and potentially
// wakes up the waiting requestor thread.
remoteMonitorResponsesSemaphore.release();
+
}
catch (Exception e)
{
+ logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage() +
+ stackTraceToSingleLineString(e)));
+
// If an exception occurs while processing one of the expected message,
// the processing is aborted and the waiting thread is awoke.
remoteMonitorResponsesSemaphore.notifyAll();
@@ -1531,65 +1608,6 @@
}
/**
- * Get the state of the LDAP server with the provided serverId.
- * @param serverId The server ID.
- * @return The server state.
- */
- public ServerState getServerState(short serverId)
- {
- return LDAPStates.get(serverId);
- }
-
- /**
- * Get the highest know change number of the LDAP server with the provided
- * serverId.
- * @param serverId The server ID.
- * @return The highest change number.
- */
- public ChangeNumber getMaxCN(short serverId)
- {
- return maxCNs.get(serverId);
- }
-
- /**
- * Get an approximation of the date of the oldest missing changes.
- * serverId.
- * @param serverId The server ID.
- * @return The approximation of the date of the oldest missing change.
- */
- public Long getApproxFirstMissingDate(short serverId)
- {
- return approxFirstMissingDate.get(serverId);
- }
-
- /**
- * Get the number of missing change for the server with the provided state.
- * @param state The provided server state.
- * @return The number of missing changes.
- */
- public int getMissingChanges(ServerState state)
- {
- // Traverse the max Cn transmitted by each server
- // For each server, get the highest CN know from the current server
- // Sum the difference betwenn the max and the last
- int missingChanges = 0;
- Iterator<Short> itc = maxCNs.keySet().iterator();
- while (itc.hasNext())
- {
- Short sid = itc.next();
- ChangeNumber maxCN = maxCNs.get(sid);
- ChangeNumber last = state.getMaxChangeNumber(sid);
- if (last == null)
- {
- last = new ChangeNumber(0,0, sid);
- }
- int missingChangesFromSID = ChangeNumber.diffSeqNum(maxCN, last);
- missingChanges += missingChangesFromSID;
- }
- return missingChanges;
- }
-
- /**
* Set the purge delay on all the db Handlers for this Domain
* of Replicaiton.
*
--
Gitblit v1.10.0