From 36191b70a96c298ad07cf9a9384cc42764ea957e Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 22 Apr 2009 06:22:39 +0000
Subject: [PATCH] The replication publishes information about the whole topology in cn=monitor. When cn=monitor is searched, the replication therefore asks all Replication Servers for information about the replication state. This should always be fast unless a server is hung. In that case the replication waits for 5 seconds, then issues an error message and goes on with the information it has received so far.
---
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 115 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 115 insertions(+), 0 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index cd75b10..fb991a8 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -43,6 +43,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -65,12 +67,14 @@
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.RestoreConfig;
import org.opends.server.types.ResultCode;
import org.opends.server.util.LDIFReader;
+import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
@@ -1094,4 +1098,115 @@
{
return replicationPort;
}
+
+ // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
+ // Lifetime of the cached monitor data: while
+ // monitorDataLastBuildDate + monitorDataLifeTime is later than
+ // TimeThread.getTime(), computeMonitorData() returns without refreshing.
+ private long monitorDataLifeTime = 500;
+
+ /* Value of TimeThread.getTime() recorded by waitMonitorDataResponses() when
+  * its wait ended, whether all responses arrived or the wait timed out.
+  * Zero until the first wait has completed.
+  */
+ private long monitorDataLastBuildDate = 0;
+
+ /* A search operation on the monitor data is processed by a worker thread.
+  * Requests are sent to the other RS, and the responses are received by the
+  * listener threads.
+  * The semaphore starts with no permit; responseReceived() releases one
+  * permit, and the worker thread blocks in waitMonitorDataResponses() until
+  * it has acquired the expected number of permits or the wait times out.
+  */
+ Semaphore remoteMonitorResponsesSemaphore = new Semaphore(0);
+
+ /**
+  * Refreshes the global monitoring data of every base DN handled by this
+  * replication server.
+  * <p>
+  * MonitorProviders that publish replication information under cn=monitor
+  * should call this method before reading that information. When the data
+  * built by the previous call are younger than the cache lifetime, the
+  * method returns at once and the cached data are kept.
+  *
+  * @throws DirectoryException If the computation cannot be achieved.
+  */
+ public void computeMonitorData() throws DirectoryException
+ {
+   final long cacheExpiryDate = monitorDataLastBuildDate + monitorDataLifeTime;
+   if (TimeThread.getTime() < cacheExpiryDate)
+   {
+     // The cached data have not expired yet: nothing to rebuild.
+     if (debugEnabled())
+     {
+       TRACER.debugInfo(
+           "In " + getMonitorInstanceName() + " getRemoteMonitorData in cache");
+     }
+     return;
+   }
+
+   // Throw away any permit still available from an earlier round so that
+   // only permits released from now on are counted.
+   remoteMonitorResponsesSemaphore.drainPermits();
+
+   // The sum of the values returned by initializeMonitorData() is the number
+   // of responses waited for (presumably one per request sent by the domain;
+   // confirm in ReplicationServerDomain).
+   int expectedResponses = 0;
+   for (ReplicationServerDomain rsDomain : baseDNs.values())
+   {
+     expectedResponses += rsDomain.initializeMonitorData();
+   }
+
+   // Blocks until every response has arrived or the wait times out.
+   waitMonitorDataResponses(expectedResponses);
+
+   for (ReplicationServerDomain rsDomain : baseDNs.values())
+   {
+     rsDomain.completeMonitorData();
+   }
+ }
+
+ /**
+  * Waits until the expected number of monitor responses has been received,
+  * or until a 5 second timeout expires, whichever comes first.
+  * <p>
+  * On timeout an error is logged and processing goes on in best effort with
+  * whatever data were received. In both cases the build date of the monitor
+  * data is updated. If the waiting thread is interrupted, its interrupt
+  * status is restored and an error is logged.
+  *
+  * @param expectedResponses The number of expected answers.
+  * @throws DirectoryException When an error occurs. The current
+  *         implementation logs failures instead of throwing.
+  */
+ private void waitMonitorDataResponses(int expectedResponses)
+   throws DirectoryException
+ {
+   try
+   {
+     if (debugEnabled())
+       TRACER.debugInfo(
+         "In " + getMonitorInstanceName() + " baseDn=" +
+         " waiting for " + expectedResponses + " expected monitor messages");
+
+     boolean allPermitsAcquired =
+       remoteMonitorResponsesSemaphore.tryAcquire(
+         expectedResponses,
+         5000L, TimeUnit.MILLISECONDS);
+
+     // The data are stamped as built whether or not every response arrived,
+     // so a timed-out round is not retried before the cache lifetime expires.
+     monitorDataLastBuildDate = TimeThread.getTime();
+
+     if (!allPermitsAcquired)
+     {
+       logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
+       // let's go on in best effort even with limited data received.
+     } else
+     {
+       if (debugEnabled())
+         TRACER.debugInfo(
+           "In " + getMonitorInstanceName() + " baseDn=" +
+           " Successfully received all " + expectedResponses +
+           " expected monitor messages");
+     }
+   } catch (InterruptedException e)
+   {
+     // Restore the interrupt status so that the code running this thread
+     // can still notice the interruption (e.g. during shutdown).
+     Thread.currentThread().interrupt();
+     logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
+   } catch (Exception e)
+   {
+     logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
+   }
+ }
+
+
+ /**
+  * Signals that one response to a monitor request message has arrived.
+  * Each ReplicationServerDomain should call this once per response it
+  * receives; every call makes one more permit available on the semaphore
+  * the waiting thread acquires from.
+  */
+ public void responseReceived()
+ {
+   remoteMonitorResponsesSemaphore.release(1);
+ }
+
+
+ /**
+  * Wakes up the worker thread that is waiting for monitor responses.
+  * This should be called when the monitoring has failed and waiting any
+  * longer for the remaining responses is pointless.
+  */
+ public void responseReceivedAll()
+ {
+   // Object.notifyAll() must not be used on the semaphore: the caller does
+   // not own its monitor, so the call throws IllegalMonitorStateException,
+   // and a thread blocked in tryAcquire() is not waiting on that monitor
+   // anyway. Releasing permits is what lets a pending tryAcquire() return.
+   // Assumes the expected response count is always far below this value.
+   final int wakeUpPermits = 1 << 20;
+
+   // Skip the release when plenty of permits are already available, so that
+   // repeated failures cannot overflow the semaphore's permit counter.
+   if (remoteMonitorResponsesSemaphore.availablePermits() < wakeUpPermits)
+   {
+     remoteMonitorResponsesSemaphore.release(wakeUpPermits);
+   }
+ }
}
--
Gitblit v1.10.0