From 36191b70a96c298ad07cf9a9384cc42764ea957e Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 22 Apr 2009 06:22:39 +0000
Subject: [PATCH] The replication publish information about the whole topology in cn=monitor When cn=monitor is searched, the replication therefore asks informations about the replication state to all Replication Servers. This should always be fast unless a server is hanged. In such case the replication waits for 5 seconds then issue an error message and goes on with the information it has received at this time.

---
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java |  115 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 115 insertions(+), 0 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index cd75b10..fb991a8 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -43,6 +43,8 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -65,12 +67,14 @@
 import org.opends.server.types.BackupConfig;
 import org.opends.server.types.ConfigChangeResult;
 import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
 import org.opends.server.types.Entry;
 import org.opends.server.types.LDIFExportConfig;
 import org.opends.server.types.LDIFImportConfig;
 import org.opends.server.types.RestoreConfig;
 import org.opends.server.types.ResultCode;
 import org.opends.server.util.LDIFReader;
+import org.opends.server.util.TimeThread;
 
 import com.sleepycat.je.DatabaseException;
 
@@ -1094,4 +1098,115 @@
   {
     return replicationPort;
   }
+
+  // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
+  private long monitorDataLifeTime = 500;
+
+  /* The date of the last time they have been elaborated */
+  private long monitorDataLastBuildDate = 0;
+
+  /* Search op on monitor data is processed by a worker thread.
+   * Requests are sent to the other RS,and responses are received by the
+   * listener threads.
+   * The worker thread is awoke on this semaphore, or on timeout.
+   */
+  Semaphore remoteMonitorResponsesSemaphore = new Semaphore(0);
+
+  /**
+   * Trigger the computation of the Global Monitoring Data.
+   * This should be called by all the MonitorProviders that need
+   * the global monitoring data to be updated before they can
+   * publish their information to cn=monitor.
+   *
+   * This method will trigger the update of all the global monitoring
+   * information of all the base-DNs of this replication Server.
+   *
+   * @throws DirectoryException If the computation cannot be achieved.
+   */
+  public void computeMonitorData() throws DirectoryException
+  {
+    if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime())
+    {
+      if (debugEnabled())
+        TRACER.debugInfo(
+          "In " + getMonitorInstanceName() + " getRemoteMonitorData in cache");
+      // The current data are still valid. No need to renew them.
+      return;
+    }
+
+    remoteMonitorResponsesSemaphore.drainPermits();
+    int count = 0;
+    for (ReplicationServerDomain domain : baseDNs.values())
+    {
+      count += domain.initializeMonitorData();
+    }
+
+    // Wait for responses
+    waitMonitorDataResponses(count);
+
+    for (ReplicationServerDomain domain : baseDNs.values())
+    {
+      domain.completeMonitorData();
+    }
+  }
+
+  /**
+   * Wait for the expected count of received MonitorMsg.
+   * @param expectedResponses The number of expected answers.
+   * @throws DirectoryException When an error occurs.
+   */
+  private void waitMonitorDataResponses(int expectedResponses)
+    throws DirectoryException
+  {
+    try
+    {
+      if (debugEnabled())
+        TRACER.debugInfo(
+          "In " + getMonitorInstanceName() + " baseDn=" +
+          " waiting for " + expectedResponses + " expected monitor messages");
+
+      boolean allPermitsAcquired =
+        remoteMonitorResponsesSemaphore.tryAcquire(
+        expectedResponses,
+        (long) 5000, TimeUnit.MILLISECONDS);
+
+      if (!allPermitsAcquired)
+      {
+        monitorDataLastBuildDate = TimeThread.getTime();
+        logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
+      // let's go on in best effort even with limited data received.
+      } else
+      {
+        monitorDataLastBuildDate = TimeThread.getTime();
+        if (debugEnabled())
+          TRACER.debugInfo(
+            "In " + getMonitorInstanceName() + " baseDn=" +
+            " Successfully received all " + expectedResponses +
+            " expected monitor messages");
+      }
+    } catch (Exception e)
+    {
+      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
+    }
+  }
+
+
+  /**
+   * This should be called by each ReplicationServerDomain that receives
+   * a response to a monitor request message.
+   */
+  public void responseReceived()
+  {
+    remoteMonitorResponsesSemaphore.release();
+  }
+
+
+  /**
+   * This should be called when the Monitoring has failed and the
+   * Worker thread that is waiting for the result should be awaken.
+   */
+  public void responseReceivedAll()
+  {
+    remoteMonitorResponsesSemaphore.notifyAll();
+  }
 }

--
Gitblit v1.10.0