From 36191b70a96c298ad07cf9a9384cc42764ea957e Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 22 Apr 2009 06:22:39 +0000
Subject: [PATCH] The replication publish information about the whole topology in cn=monitor When cn=monitor is searched, the replication therefore asks informations about the replication state to all Replication Servers. This should always be fast unless a server is hanged. In such case the replication waits for 5 seconds then issue an error message and goes on with the information it has received at this time.
---
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 115 +++++++++++++++++++
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java | 2
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 32 ++++-
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 162 ++++++++------------------
opends/src/server/org/opends/server/replication/server/MonitorData.java | 31 ----
5 files changed, 196 insertions(+), 146 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/MonitorData.java b/opends/src/server/org/opends/server/replication/server/MonitorData.java
index 0c96b2e..59f5e09 100644
--- a/opends/src/server/org/opends/server/replication/server/MonitorData.java
+++ b/opends/src/server/org/opends/server/replication/server/MonitorData.java
@@ -62,8 +62,6 @@
* date of the first missing change.
*/
- /* The date of the last time they have been elaborated */
- private long buildDate = 0;
// For each LDAP server, its server state
private ConcurrentHashMap<Short, ServerState> LDAPStates =
@@ -103,7 +101,7 @@
{
Long afmd = fmd.get(serverId);
if ((afmd != null) && (afmd>0))
- return ((this.getBuildDate() - afmd)/1000);
+ return (TimeThread.getTime() - afmd)/1000;
else
return 0;
}
@@ -243,7 +241,6 @@
TRACER.debugInfo(
"Complete monitor data : Missing changes ("+ lsiSid +")=" + mds);
}
- this.setBuildDate(TimeThread.getTime());
}
/**
@@ -255,7 +252,6 @@
{
String mds = "Monitor data=\n";
- mds+= "Build date=" + this.getBuildDate();
// RS data
Iterator<Short> rsite = fmRSDate.keySet().iterator();
while (rsite.hasNext())
@@ -281,10 +277,9 @@
ServerState ss = LDAPStates.get(sid);
mds += "\nLSData(" + sid + ")=\t" + "state=[" + ss.toString()
+ "] afmd=" + this.getApproxFirstMissingDate(sid);
- if (getBuildDate()>0)
- {
- mds += " missingDelay=" + this.getApproxDelay(sid);
- }
+
+ mds += " missingDelay=" + this.getApproxDelay(sid);
+
mds +=" missingCount=" + missingChanges.get(sid);
}
@@ -304,24 +299,6 @@
}
/**
- * Sets the build date of the data.
- * @param buildDate The date.
- */
- public void setBuildDate(long buildDate)
- {
- this.buildDate = buildDate;
- }
-
- /**
- * Returns the build date of the data.
- * @return The date.
- */
- public long getBuildDate()
- {
- return buildDate;
- }
-
- /**
* From a provided state, sets the max CN of the monitor data.
* @param state the provided state.
*/
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index cd75b10..fb991a8 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -43,6 +43,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -65,12 +67,14 @@
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.RestoreConfig;
import org.opends.server.types.ResultCode;
import org.opends.server.util.LDIFReader;
+import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
@@ -1094,4 +1098,115 @@
{
return replicationPort;
}
+
+ // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
+ private long monitorDataLifeTime = 500;
+
+ /* The date of the last time they have been elaborated */
+ private long monitorDataLastBuildDate = 0;
+
+ /* Search op on monitor data is processed by a worker thread.
+ * Requests are sent to the other RS,and responses are received by the
+ * listener threads.
+ * The worker thread is awoke on this semaphore, or on timeout.
+ */
+ Semaphore remoteMonitorResponsesSemaphore = new Semaphore(0);
+
+ /**
+ * Trigger the computation of the Global Monitoring Data.
+ * This should be called by all the MonitorProviders that need
+ * the global monitoring data to be updated before they can
+ * publish their information to cn=monitor.
+ *
+ * This method will trigger the update of all the global monitoring
+ * information of all the base-DNs of this replication Server.
+ *
+ * @throws DirectoryException If the computation cannot be achieved.
+ */
+ public void computeMonitorData() throws DirectoryException
+ {
+ if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime())
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + getMonitorInstanceName() + " getRemoteMonitorData in cache");
+ // The current data are still valid. No need to renew them.
+ return;
+ }
+
+ remoteMonitorResponsesSemaphore.drainPermits();
+ int count = 0;
+ for (ReplicationServerDomain domain : baseDNs.values())
+ {
+ count += domain.initializeMonitorData();
+ }
+
+ // Wait for responses
+ waitMonitorDataResponses(count);
+
+ for (ReplicationServerDomain domain : baseDNs.values())
+ {
+ domain.completeMonitorData();
+ }
+ }
+
+ /**
+ * Wait for the expected count of received MonitorMsg.
+ * @param expectedResponses The number of expected answers.
+ * @throws DirectoryException When an error occurs.
+ */
+ private void waitMonitorDataResponses(int expectedResponses)
+ throws DirectoryException
+ {
+ try
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + getMonitorInstanceName() + " baseDn=" +
+ " waiting for " + expectedResponses + " expected monitor messages");
+
+ boolean allPermitsAcquired =
+ remoteMonitorResponsesSemaphore.tryAcquire(
+ expectedResponses,
+ (long) 5000, TimeUnit.MILLISECONDS);
+
+ if (!allPermitsAcquired)
+ {
+ monitorDataLastBuildDate = TimeThread.getTime();
+ logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
+ // let's go on in best effort even with limited data received.
+ } else
+ {
+ monitorDataLastBuildDate = TimeThread.getTime();
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + getMonitorInstanceName() + " baseDn=" +
+ " Successfully received all " + expectedResponses +
+ " expected monitor messages");
+ }
+ } catch (Exception e)
+ {
+ logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
+ }
+ }
+
+
+ /**
+ * This should be called by each ReplicationServerDomain that receives
+ * a response to a monitor request message.
+ */
+ public void responseReceived()
+ {
+ remoteMonitorResponsesSemaphore.release();
+ }
+
+
+ /**
+ * This should be called when the Monitoring has failed and the
+ * Worker thread that is waiting for the result should be awaken.
+ */
+ public void responseReceivedAll()
+ {
+ remoteMonitorResponsesSemaphore.notifyAll();
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 24888d5..ea99e4b 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -47,7 +47,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.Iterator;
@@ -65,7 +64,6 @@
import org.opends.server.types.Attributes;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
-import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
import java.util.Timer;
import java.util.TimerTask;
@@ -146,20 +144,12 @@
/* Monitor data management */
- // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
- private long monitorDataLifeTime = 500;
-
- /* Search op on monitor data is processed by a worker thread.
- * Requests are sent to the other RS,and responses are received by the
- * listener threads.
- * The worker thread is awoke on this semaphore, or on timeout.
- */
- Semaphore remoteMonitorResponsesSemaphore;
/**
* The monitor data consolidated over the topology.
*/
private MonitorData monitorData = new MonitorData();
private MonitorData wrkMonitorData;
+ private Object monitorDataLock = new Object();
/**
* The needed info for each received assured update message we are waiting
@@ -2255,23 +2245,30 @@
synchronized protected MonitorData computeMonitorData()
throws DirectoryException
{
- if (monitorData.getBuildDate() + monitorDataLifeTime > TimeThread.getTime())
- {
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " baseDn=" + baseDn + " getRemoteMonitorData in cache");
- // The current data are still valid. No need to renew them.
- return monitorData;
- }
+ // Update the monitorData of all domains if this was necessary.
+ replicationServer.computeMonitorData();
+ return monitorData;
+ }
- wrkMonitorData = new MonitorData();
- synchronized (wrkMonitorData)
+ /**
+ * Start collecting global monitoring information for this
+ * ReplicationServerDomain.
+ *
+ * @return The number of response that should come back.
+ *
+ * @throws DirectoryException In case the monitoring information could
+ * not be collected.
+ */
+
+ int initializeMonitorData() throws DirectoryException
+ {
+ synchronized (monitorDataLock)
{
+ wrkMonitorData = new MonitorData();
if (debugEnabled())
TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " baseDn=" + baseDn + " Computing monitor data ");
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDn=" + baseDn + " Computing monitor data ");
// Let's process our directly connected LSes
// - in the ServerHandler for a given LS1, the stored state contains :
@@ -2299,7 +2296,7 @@
wrkMonitorData.setMaxCN(serverID, maxcn);
wrkMonitorData.setLDAPServerState(serverID, directlshState);
wrkMonitorData.setFirstMissingDate(serverID,
- directlsh.getApproxFirstMissingDate());
+ directlsh.getApproxFirstMissingDate());
}
// Then initialize the max CN for the LS that produced something
@@ -2319,44 +2316,35 @@
// and we need the remote ones.
if (debugEnabled())
TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " baseDn=" + baseDn + " Local monitor data: " +
- wrkMonitorData.toString());
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDn=" + baseDn + " Local monitor data: " +
+ wrkMonitorData.toString());
}
- // Send Request to the other Replication Servers
- if (remoteMonitorResponsesSemaphore == null)
- {
- remoteMonitorResponsesSemaphore = new Semaphore(0);
- short requestCnt = sendMonitorDataRequest();
- // Wait reponses from them or timeout
- waitMonitorDataResponses(requestCnt);
- } else
- {
- // The processing of renewing the monitor cache is already running
- // We'll make it sleeping until the end
- // TODO: unit test for this case.
- while (remoteMonitorResponsesSemaphore != null)
- {
- waitMonitorDataResponses(1);
- }
- }
+ // Send the request for remote monitor data to the
+ return sendMonitorDataRequest();
+ }
+ /**
+ * Complete all the calculation when all monitoring information
+ * has been received.
+ */
+ void completeMonitorData()
+ {
wrkMonitorData.completeComputing();
// Store the new computed data as the reference
- synchronized (monitorData)
+ synchronized (monitorDataLock)
{
// Now we have the expected answers or an error occurred
monitorData = wrkMonitorData;
wrkMonitorData = null;
if (debugEnabled())
TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " baseDn=" + baseDn + " *** Computed MonitorData: " +
- monitorData.toString());
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDn=" + baseDn + " *** Computed MonitorData: " +
+ monitorData.toString());
}
- return monitorData;
}
/**
@@ -2364,10 +2352,10 @@
* @return the number of requests sent.
* @throws DirectoryException when a problem occurs.
*/
- protected short sendMonitorDataRequest()
+ protected int sendMonitorDataRequest()
throws DirectoryException
{
- short sent = 0;
+ int sent = 0;
try
{
for (ServerHandler rs : replicationServers.values())
@@ -2389,49 +2377,6 @@
}
/**
- * Wait for the expected count of received MonitorMsg.
- * @param expectedResponses The number of expected answers.
- * @throws DirectoryException When an error occurs.
- */
- protected void waitMonitorDataResponses(int expectedResponses)
- throws DirectoryException
- {
- try
- {
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " baseDn=" + baseDn +
- " waiting for " + expectedResponses + " expected monitor messages");
-
- boolean allPermitsAcquired =
- remoteMonitorResponsesSemaphore.tryAcquire(
- expectedResponses,
- (long) 5000, TimeUnit.MILLISECONDS);
-
- if (!allPermitsAcquired)
- {
- logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
- // let's go on in best effort even with limited data received.
- } else
- {
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " baseDn=" + baseDn +
- " Successfully received all " + expectedResponses +
- " expected monitor messages");
- }
- } catch (Exception e)
- {
- logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
- } finally
- {
- remoteMonitorResponsesSemaphore = null;
- }
- }
-
- /**
* Processes a Monitor message receives from a remote Replication Server
* and stores the data received.
*
@@ -2442,23 +2387,20 @@
if (debugEnabled())
TRACER.debugInfo(
"In " + this.replicationServer.getMonitorInstanceName() +
- "Receiving " + msg + " from " + msg.getsenderID() +
- remoteMonitorResponsesSemaphore);
-
- if (remoteMonitorResponsesSemaphore == null)
- {
- // Let's ignore the remote monitor data just received
- // since the computing processing has been ended.
- // An error - probably a timemout - occurred that was already logged
- logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
- Short.toString(msg.getsenderID())));
- return;
- }
+ "Receiving " + msg + " from " + msg.getsenderID());
try
{
- synchronized (wrkMonitorData)
+ synchronized (monitorDataLock)
{
+ if (wrkMonitorData == null)
+ {
+ // This is a response for an earlier request whose computing is
+ // already complete.
+ logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
+ Short.toString(msg.getsenderID())));
+ return;
+ }
// Here is the RS state : list <serverID, lastChangeNumber>
// For each LDAP Server, we keep the max CN across the RSes
ServerState replServerState = msg.getReplServerDbState();
@@ -2523,7 +2465,7 @@
// Decreases the number of expected responses and potentially
// wakes up the waiting requestor thread.
- remoteMonitorResponsesSemaphore.release();
+ replicationServer.responseReceived();
} catch (Exception e)
{
@@ -2532,7 +2474,7 @@
// If an exception occurs while processing one of the expected message,
// the processing is aborted and the waiting thread is awoke.
- remoteMonitorResponsesSemaphore.notifyAll();
+ replicationServer.responseReceivedAll();
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 081cba3..b6c4e3d 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -2064,6 +2064,18 @@
long delay = md.getApproxDelay(serverId);
attributes.add(Attributes.create("approximate-delay", String
.valueOf(delay)));
+
+ /* get the Server State */
+ AttributeBuilder builder = new AttributeBuilder("server-state");
+ ServerState state = md.getLDAPServerState(serverId);
+ if (state != null)
+ {
+ for (String str : state.toStringSet())
+ {
+ builder.add(str);
+ }
+ attributes.add(builder.toAttribute());
+ }
}
else
{
@@ -2071,6 +2083,18 @@
long missingChanges = md.getMissingChangesRS(serverId);
attributes.add(Attributes.create("missing-changes", String
.valueOf(missingChanges)));
+
+ /* get the Server State */
+ AttributeBuilder builder = new AttributeBuilder("server-state");
+ ServerState state = md.getRSStates(serverId);
+ if (state != null)
+ {
+ for (String str : state.toStringSet())
+ {
+ builder.add(str);
+ }
+ attributes.add(builder.toAttribute());
+ }
}
}
catch (Exception e)
@@ -2131,14 +2155,6 @@
attributes.add(Attributes.create("current-rcv-window", String
.valueOf(rcvWindow)));
- /* get the Server State */
- AttributeBuilder builder = new AttributeBuilder("server-state");
- for (String str : serverState.toStringSet())
- {
- builder.add(str);
- }
- attributes.add(builder.toAttribute());
-
// Encryption
attributes.add(Attributes.create("ssl-encryption", String
.valueOf(session.isEncrypted())));
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
index e668820..641b14c 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -175,7 +175,7 @@
ServerState state2 = states1.get(domain2ServerId);
assertNotNull(state2, "getReplicaStates is not showing DS2");
- Map<Short, ServerState> states2 = domain1.getReplicaStates();
+ Map<Short, ServerState> states2 = domain2.getReplicaStates();
ServerState state1 = states2.get(domain1ServerId);
assertNotNull(state1, "getReplicaStates is not showing DS1");
--
Gitblit v1.10.0