From 36191b70a96c298ad07cf9a9384cc42764ea957e Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 22 Apr 2009 06:22:39 +0000
Subject: [PATCH] Replication publishes information about the whole topology in cn=monitor. When cn=monitor is searched, replication therefore asks all Replication Servers for information about the replication state. This should always be fast unless a server hangs. In that case replication waits for 5 seconds, then issues an error message and goes on with the information it has received so far.
---
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 162 +++++++++++++++++------------------------------------
1 files changed, 52 insertions(+), 110 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 24888d5..ea99e4b 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -47,7 +47,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.Iterator;
@@ -65,7 +64,6 @@
import org.opends.server.types.Attributes;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
-import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
import java.util.Timer;
import java.util.TimerTask;
@@ -146,20 +144,12 @@
/* Monitor data management */
- // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
- private long monitorDataLifeTime = 500;
-
- /* Search op on monitor data is processed by a worker thread.
- * Requests are sent to the other RS,and responses are received by the
- * listener threads.
- * The worker thread is awoke on this semaphore, or on timeout.
- */
- Semaphore remoteMonitorResponsesSemaphore;
/**
* The monitor data consolidated over the topology.
*/
private MonitorData monitorData = new MonitorData();
private MonitorData wrkMonitorData;
+ private Object monitorDataLock = new Object();
/**
* The needed info for each received assured update message we are waiting
@@ -2255,23 +2245,30 @@
synchronized protected MonitorData computeMonitorData()
throws DirectoryException
{
- if (monitorData.getBuildDate() + monitorDataLifeTime > TimeThread.getTime())
- {
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " baseDn=" + baseDn + " getRemoteMonitorData in cache");
- // The current data are still valid. No need to renew them.
- return monitorData;
- }
+ // Update the monitorData of all domains if this was necessary.
+ replicationServer.computeMonitorData();
+ return monitorData;
+ }
- wrkMonitorData = new MonitorData();
- synchronized (wrkMonitorData)
+ /**
+ * Start collecting global monitoring information for this
+ * ReplicationServerDomain.
+ *
+ * @return The number of responses that should come back.
+ *
+ * @throws DirectoryException In case the monitoring information could
+ * not be collected.
+ */
+
+ int initializeMonitorData() throws DirectoryException
+ {
+ synchronized (monitorDataLock)
{
+ wrkMonitorData = new MonitorData();
if (debugEnabled())
TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " baseDn=" + baseDn + " Computing monitor data ");
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDn=" + baseDn + " Computing monitor data ");
// Let's process our directly connected LSes
// - in the ServerHandler for a given LS1, the stored state contains :
@@ -2299,7 +2296,7 @@
wrkMonitorData.setMaxCN(serverID, maxcn);
wrkMonitorData.setLDAPServerState(serverID, directlshState);
wrkMonitorData.setFirstMissingDate(serverID,
- directlsh.getApproxFirstMissingDate());
+ directlsh.getApproxFirstMissingDate());
}
// Then initialize the max CN for the LS that produced something
@@ -2319,44 +2316,35 @@
// and we need the remote ones.
if (debugEnabled())
TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " baseDn=" + baseDn + " Local monitor data: " +
- wrkMonitorData.toString());
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDn=" + baseDn + " Local monitor data: " +
+ wrkMonitorData.toString());
}
- // Send Request to the other Replication Servers
- if (remoteMonitorResponsesSemaphore == null)
- {
- remoteMonitorResponsesSemaphore = new Semaphore(0);
- short requestCnt = sendMonitorDataRequest();
- // Wait reponses from them or timeout
- waitMonitorDataResponses(requestCnt);
- } else
- {
- // The processing of renewing the monitor cache is already running
- // We'll make it sleeping until the end
- // TODO: unit test for this case.
- while (remoteMonitorResponsesSemaphore != null)
- {
- waitMonitorDataResponses(1);
- }
- }
+ // Send the request for remote monitor data to the other Replication Servers
+ return sendMonitorDataRequest();
+ }
+ /**
+ * Complete all the calculation when all monitoring information
+ * has been received.
+ */
+ void completeMonitorData()
+ {
wrkMonitorData.completeComputing();
// Store the new computed data as the reference
- synchronized (monitorData)
+ synchronized (monitorDataLock)
{
// Now we have the expected answers or an error occurred
monitorData = wrkMonitorData;
wrkMonitorData = null;
if (debugEnabled())
TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " baseDn=" + baseDn + " *** Computed MonitorData: " +
- monitorData.toString());
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDn=" + baseDn + " *** Computed MonitorData: " +
+ monitorData.toString());
}
- return monitorData;
}
/**
@@ -2364,10 +2352,10 @@
* @return the number of requests sent.
* @throws DirectoryException when a problem occurs.
*/
- protected short sendMonitorDataRequest()
+ protected int sendMonitorDataRequest()
throws DirectoryException
{
- short sent = 0;
+ int sent = 0;
try
{
for (ServerHandler rs : replicationServers.values())
@@ -2389,49 +2377,6 @@
}
/**
- * Wait for the expected count of received MonitorMsg.
- * @param expectedResponses The number of expected answers.
- * @throws DirectoryException When an error occurs.
- */
- protected void waitMonitorDataResponses(int expectedResponses)
- throws DirectoryException
- {
- try
- {
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " baseDn=" + baseDn +
- " waiting for " + expectedResponses + " expected monitor messages");
-
- boolean allPermitsAcquired =
- remoteMonitorResponsesSemaphore.tryAcquire(
- expectedResponses,
- (long) 5000, TimeUnit.MILLISECONDS);
-
- if (!allPermitsAcquired)
- {
- logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
- // let's go on in best effort even with limited data received.
- } else
- {
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " baseDn=" + baseDn +
- " Successfully received all " + expectedResponses +
- " expected monitor messages");
- }
- } catch (Exception e)
- {
- logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
- } finally
- {
- remoteMonitorResponsesSemaphore = null;
- }
- }
-
- /**
* Processes a Monitor message receives from a remote Replication Server
* and stores the data received.
*
@@ -2442,23 +2387,20 @@
if (debugEnabled())
TRACER.debugInfo(
"In " + this.replicationServer.getMonitorInstanceName() +
- "Receiving " + msg + " from " + msg.getsenderID() +
- remoteMonitorResponsesSemaphore);
-
- if (remoteMonitorResponsesSemaphore == null)
- {
- // Let's ignore the remote monitor data just received
- // since the computing processing has been ended.
- // An error - probably a timemout - occurred that was already logged
- logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
- Short.toString(msg.getsenderID())));
- return;
- }
+ "Receiving " + msg + " from " + msg.getsenderID());
try
{
- synchronized (wrkMonitorData)
+ synchronized (monitorDataLock)
{
+ if (wrkMonitorData == null)
+ {
+ // This is a response for an earlier request whose computing is
+ // already complete.
+ logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
+ Short.toString(msg.getsenderID())));
+ return;
+ }
// Here is the RS state : list <serverID, lastChangeNumber>
// For each LDAP Server, we keep the max CN across the RSes
ServerState replServerState = msg.getReplServerDbState();
@@ -2523,7 +2465,7 @@
// Decreases the number of expected responses and potentially
// wakes up the waiting requestor thread.
- remoteMonitorResponsesSemaphore.release();
+ replicationServer.responseReceived();
} catch (Exception e)
{
@@ -2532,7 +2474,7 @@
// If an exception occurs while processing one of the expected message,
// the processing is aborted and the waiting thread is awoke.
- remoteMonitorResponsesSemaphore.notifyAll();
+ replicationServer.responseReceivedAll();
}
}
--
Gitblit v1.10.0