From 36191b70a96c298ad07cf9a9384cc42764ea957e Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 22 Apr 2009 06:22:39 +0000
Subject: [PATCH] The replication publish information about the whole topology in cn=monitor When cn=monitor is searched, the replication therefore asks informations about the replication state to all Replication Servers. This should always be fast unless a server is hanged. In such case the replication waits for 5 seconds then issue an error message and goes on with the information it has received at this time.

---
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |  162 +++++++++++++++++------------------------------------
 1 files changed, 52 insertions(+), 110 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 24888d5..ea99e4b 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -47,7 +47,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.Iterator;
 
@@ -65,7 +64,6 @@
 import org.opends.server.types.Attributes;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.ResultCode;
-import org.opends.server.util.TimeThread;
 import com.sleepycat.je.DatabaseException;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -146,20 +144,12 @@
 
   /* Monitor data management */
 
-  // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
-  private long monitorDataLifeTime = 500;
-
-  /* Search op on monitor data is processed by a worker thread.
-   * Requests are sent to the other RS,and responses are received by the
-   * listener threads.
-   * The worker thread is awoke on this semaphore, or on timeout.
-   */
-  Semaphore remoteMonitorResponsesSemaphore;
   /**
    * The monitor data consolidated over the topology.
    */
   private MonitorData monitorData = new MonitorData();
   private MonitorData wrkMonitorData;
+  private Object monitorDataLock = new Object();
 
   /**
    * The needed info for each received assured update message we are waiting
@@ -2255,23 +2245,30 @@
   synchronized protected MonitorData computeMonitorData()
     throws DirectoryException
   {
-    if (monitorData.getBuildDate() + monitorDataLifeTime > TimeThread.getTime())
-    {
-      if (debugEnabled())
-        TRACER.debugInfo(
-          "In " + this.replicationServer.getMonitorInstanceName() +
-          " baseDn=" + baseDn + " getRemoteMonitorData in cache");
-      // The current data are still valid. No need to renew them.
-      return monitorData;
-    }
+    // Update the monitorData of all domains if this was necessary.
+    replicationServer.computeMonitorData();
+    return monitorData;
+  }
 
-    wrkMonitorData = new MonitorData();
-    synchronized (wrkMonitorData)
+  /**
+   * Start collecting global monitoring information for this
+   * ReplicationServerDomain.
+   *
+   * @return The number of response that should come back.
+   *
+   * @throws DirectoryException In case the monitoring information could
+   *                            not be collected.
+   */
+
+  int initializeMonitorData() throws DirectoryException
+  {
+    synchronized (monitorDataLock)
     {
+      wrkMonitorData = new MonitorData();
       if (debugEnabled())
         TRACER.debugInfo(
-          "In " + this.replicationServer.getMonitorInstanceName() +
-          " baseDn=" + baseDn + " Computing monitor data ");
+            "In " + this.replicationServer.getMonitorInstanceName() +
+            " baseDn=" + baseDn + " Computing monitor data ");
 
       // Let's process our directly connected LSes
       // - in the ServerHandler for a given LS1, the stored state contains :
@@ -2299,7 +2296,7 @@
         wrkMonitorData.setMaxCN(serverID, maxcn);
         wrkMonitorData.setLDAPServerState(serverID, directlshState);
         wrkMonitorData.setFirstMissingDate(serverID,
-          directlsh.getApproxFirstMissingDate());
+            directlsh.getApproxFirstMissingDate());
       }
 
       // Then initialize the max CN for the LS that produced something
@@ -2319,44 +2316,35 @@
       // and we need the remote ones.
       if (debugEnabled())
         TRACER.debugInfo(
-          "In " + this.replicationServer.getMonitorInstanceName() +
-          " baseDn=" + baseDn + " Local monitor data: " +
-          wrkMonitorData.toString());
+            "In " + this.replicationServer.getMonitorInstanceName() +
+            " baseDn=" + baseDn + " Local monitor data: " +
+            wrkMonitorData.toString());
     }
 
-    // Send Request to the other Replication Servers
-    if (remoteMonitorResponsesSemaphore == null)
-    {
-      remoteMonitorResponsesSemaphore = new Semaphore(0);
-      short requestCnt = sendMonitorDataRequest();
-      // Wait reponses from them or timeout
-      waitMonitorDataResponses(requestCnt);
-    } else
-    {
-      // The processing of renewing the monitor cache is already running
-      // We'll make it sleeping until the end
-      // TODO: unit test for this case.
-      while (remoteMonitorResponsesSemaphore != null)
-      {
-        waitMonitorDataResponses(1);
-      }
-    }
+    // Send the request for remote monitor data to the
+    return sendMonitorDataRequest();
+  }
 
+  /**
+   * Complete all the calculation when all monitoring information
+   * has been received.
+   */
+  void completeMonitorData()
+  {
     wrkMonitorData.completeComputing();
 
     // Store the new computed data as the reference
-    synchronized (monitorData)
+    synchronized (monitorDataLock)
     {
       // Now we have the expected answers or an error occurred
       monitorData = wrkMonitorData;
       wrkMonitorData = null;
       if (debugEnabled())
         TRACER.debugInfo(
-          "In " + this.replicationServer.getMonitorInstanceName() +
-          " baseDn=" + baseDn + " *** Computed MonitorData: " +
-          monitorData.toString());
+            "In " + this.replicationServer.getMonitorInstanceName() +
+            " baseDn=" + baseDn + " *** Computed MonitorData: " +
+            monitorData.toString());
     }
-    return monitorData;
   }
 
   /**
@@ -2364,10 +2352,10 @@
    * @return the number of requests sent.
    * @throws DirectoryException when a problem occurs.
    */
-  protected short sendMonitorDataRequest()
+  protected int sendMonitorDataRequest()
     throws DirectoryException
   {
-    short sent = 0;
+    int sent = 0;
     try
     {
       for (ServerHandler rs : replicationServers.values())
@@ -2389,49 +2377,6 @@
   }
 
   /**
-   * Wait for the expected count of received MonitorMsg.
-   * @param expectedResponses The number of expected answers.
-   * @throws DirectoryException When an error occurs.
-   */
-  protected void waitMonitorDataResponses(int expectedResponses)
-    throws DirectoryException
-  {
-    try
-    {
-      if (debugEnabled())
-        TRACER.debugInfo(
-          "In " + this.replicationServer.getMonitorInstanceName() +
-          " baseDn=" + baseDn +
-          " waiting for " + expectedResponses + " expected monitor messages");
-
-      boolean allPermitsAcquired =
-        remoteMonitorResponsesSemaphore.tryAcquire(
-        expectedResponses,
-        (long) 5000, TimeUnit.MILLISECONDS);
-
-      if (!allPermitsAcquired)
-      {
-        logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
-      // let's go on in best effort even with limited data received.
-      } else
-      {
-        if (debugEnabled())
-          TRACER.debugInfo(
-            "In " + this.replicationServer.getMonitorInstanceName() +
-            " baseDn=" + baseDn +
-            " Successfully received all " + expectedResponses +
-            " expected monitor messages");
-      }
-    } catch (Exception e)
-    {
-      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
-    } finally
-    {
-      remoteMonitorResponsesSemaphore = null;
-    }
-  }
-
-  /**
    * Processes a Monitor message receives from a remote Replication Server
    * and stores the data received.
    *
@@ -2442,23 +2387,20 @@
     if (debugEnabled())
       TRACER.debugInfo(
         "In " + this.replicationServer.getMonitorInstanceName() +
-        "Receiving " + msg + " from " + msg.getsenderID() +
-        remoteMonitorResponsesSemaphore);
-
-    if (remoteMonitorResponsesSemaphore == null)
-    {
-      // Let's ignore the remote monitor data just received
-      // since the computing processing has been ended.
-      // An error - probably a timemout - occurred that was already logged
-      logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
-        Short.toString(msg.getsenderID())));
-      return;
-    }
+        "Receiving " + msg + " from " + msg.getsenderID());
 
     try
     {
-      synchronized (wrkMonitorData)
+      synchronized (monitorDataLock)
       {
+        if (wrkMonitorData == null)
+        {
+          // This is a response for an earlier request whose computing is
+          // already complete.
+          logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
+                      Short.toString(msg.getsenderID())));
+          return;
+        }
         // Here is the RS state : list <serverID, lastChangeNumber>
         // For each LDAP Server, we keep the max CN across the RSes
         ServerState replServerState = msg.getReplServerDbState();
@@ -2523,7 +2465,7 @@
 
       // Decreases the number of expected responses and potentially
       // wakes up the waiting requestor thread.
-      remoteMonitorResponsesSemaphore.release();
+      replicationServer.responseReceived();
 
     } catch (Exception e)
     {
@@ -2532,7 +2474,7 @@
 
       // If an exception occurs while processing one of the expected message,
       // the processing is aborted and the waiting thread is awoke.
-      remoteMonitorResponsesSemaphore.notifyAll();
+      replicationServer.responseReceivedAll();
     }
   }
 

--
Gitblit v1.10.0