From 36191b70a96c298ad07cf9a9384cc42764ea957e Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 22 Apr 2009 06:22:39 +0000
Subject: [PATCH] The replication publish information about the whole topology in cn=monitor When cn=monitor is searched, the replication therefore asks informations about the replication state to all Replication Servers. This should always be fast unless a server is hanged. In such case the replication waits for 5 seconds then issue an error message and goes on with the information it has received at this time.

---
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java                              |  115 +++++++++++++++++++
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java |    2 
 opends/src/server/org/opends/server/replication/server/ServerHandler.java                                  |   32 ++++-
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                        |  162 ++++++++------------------
 opends/src/server/org/opends/server/replication/server/MonitorData.java                                    |   31 ----
 5 files changed, 196 insertions(+), 146 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/MonitorData.java b/opends/src/server/org/opends/server/replication/server/MonitorData.java
index 0c96b2e..59f5e09 100644
--- a/opends/src/server/org/opends/server/replication/server/MonitorData.java
+++ b/opends/src/server/org/opends/server/replication/server/MonitorData.java
@@ -62,8 +62,6 @@
    *   date of the first missing change.
    */
 
-  /* The date of the last time they have been elaborated */
-  private long buildDate = 0;
 
   // For each LDAP server, its server state
   private ConcurrentHashMap<Short, ServerState> LDAPStates =
@@ -103,7 +101,7 @@
   {
     Long afmd = fmd.get(serverId);
     if ((afmd != null) && (afmd>0))
-      return ((this.getBuildDate() - afmd)/1000);
+      return (TimeThread.getTime() - afmd)/1000;
     else
       return 0;
   }
@@ -243,7 +241,6 @@
         TRACER.debugInfo(
           "Complete monitor data : Missing changes ("+ lsiSid +")=" + mds);
     }
-    this.setBuildDate(TimeThread.getTime());
     }
 
   /**
@@ -255,7 +252,6 @@
   {
     String mds = "Monitor data=\n";
 
-    mds+= "Build date=" + this.getBuildDate();
     // RS data
     Iterator<Short> rsite = fmRSDate.keySet().iterator();
     while (rsite.hasNext())
@@ -281,10 +277,9 @@
       ServerState ss = LDAPStates.get(sid);
       mds += "\nLSData(" + sid + ")=\t" + "state=[" + ss.toString()
       + "] afmd=" + this.getApproxFirstMissingDate(sid);
-      if (getBuildDate()>0)
-      {
-        mds += " missingDelay=" + this.getApproxDelay(sid);
-      }
+
+      mds += " missingDelay=" + this.getApproxDelay(sid);
+
       mds +=" missingCount=" + missingChanges.get(sid);
     }
 
@@ -304,24 +299,6 @@
   }
 
   /**
-   * Sets the build date of the data.
-   * @param buildDate The date.
-   */
-  public void setBuildDate(long buildDate)
-  {
-    this.buildDate = buildDate;
-  }
-
-  /**
-   * Returns the build date of the data.
-   * @return The date.
-   */
-  public long getBuildDate()
-  {
-    return buildDate;
-  }
-
-  /**
    * From a provided state, sets the max CN of the monitor data.
    * @param state the provided state.
    */
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index cd75b10..fb991a8 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -43,6 +43,8 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -65,12 +67,14 @@
 import org.opends.server.types.BackupConfig;
 import org.opends.server.types.ConfigChangeResult;
 import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
 import org.opends.server.types.Entry;
 import org.opends.server.types.LDIFExportConfig;
 import org.opends.server.types.LDIFImportConfig;
 import org.opends.server.types.RestoreConfig;
 import org.opends.server.types.ResultCode;
 import org.opends.server.util.LDIFReader;
+import org.opends.server.util.TimeThread;
 
 import com.sleepycat.je.DatabaseException;
 
@@ -1094,4 +1098,115 @@
   {
     return replicationPort;
   }
+
+  // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
+  private long monitorDataLifeTime = 500;
+
+  /* The date of the last time they have been elaborated */
+  private long monitorDataLastBuildDate = 0;
+
+  /* Search op on monitor data is processed by a worker thread.
+   * Requests are sent to the other RS,and responses are received by the
+   * listener threads.
+   * The worker thread is awoke on this semaphore, or on timeout.
+   */
+  Semaphore remoteMonitorResponsesSemaphore = new Semaphore(0);
+
+  /**
+   * Trigger the computation of the Global Monitoring Data.
+   * This should be called by all the MonitorProviders that need
+   * the global monitoring data to be updated before they can
+   * publish their information to cn=monitor.
+   *
+   * This method will trigger the update of all the global monitoring
+   * information of all the base-DNs of this replication Server.
+   *
+   * @throws DirectoryException If the computation cannot be achieved.
+   */
+  public void computeMonitorData() throws DirectoryException
+  {
+    if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime())
+    {
+      if (debugEnabled())
+        TRACER.debugInfo(
+          "In " + getMonitorInstanceName() + " getRemoteMonitorData in cache");
+      // The current data are still valid. No need to renew them.
+      return;
+    }
+
+    remoteMonitorResponsesSemaphore.drainPermits();
+    int count = 0;
+    for (ReplicationServerDomain domain : baseDNs.values())
+    {
+      count += domain.initializeMonitorData();
+    }
+
+    // Wait for responses
+    waitMonitorDataResponses(count);
+
+    for (ReplicationServerDomain domain : baseDNs.values())
+    {
+      domain.completeMonitorData();
+    }
+  }
+
+  /**
+   * Wait for the expected count of received MonitorMsg.
+   * @param expectedResponses The number of expected answers.
+   * @throws DirectoryException When an error occurs.
+   */
+  private void waitMonitorDataResponses(int expectedResponses)
+    throws DirectoryException
+  {
+    try
+    {
+      if (debugEnabled())
+        TRACER.debugInfo(
+          "In " + getMonitorInstanceName() + " baseDn=" +
+          " waiting for " + expectedResponses + " expected monitor messages");
+
+      boolean allPermitsAcquired =
+        remoteMonitorResponsesSemaphore.tryAcquire(
+        expectedResponses,
+        (long) 5000, TimeUnit.MILLISECONDS);
+
+      if (!allPermitsAcquired)
+      {
+        monitorDataLastBuildDate = TimeThread.getTime();
+        logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
+      // let's go on in best effort even with limited data received.
+      } else
+      {
+        monitorDataLastBuildDate = TimeThread.getTime();
+        if (debugEnabled())
+          TRACER.debugInfo(
+            "In " + getMonitorInstanceName() + " baseDn=" +
+            " Successfully received all " + expectedResponses +
+            " expected monitor messages");
+      }
+    } catch (Exception e)
+    {
+      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
+    }
+  }
+
+
+  /**
+   * This should be called by each ReplicationServerDomain that receives
+   * a response to a monitor request message.
+   */
+  public void responseReceived()
+  {
+    remoteMonitorResponsesSemaphore.release();
+  }
+
+
+  /**
+   * This should be called when the Monitoring has failed and the
+   * Worker thread that is waiting for the result should be awaken.
+   */
+  public void responseReceivedAll()
+  {
+    remoteMonitorResponsesSemaphore.notifyAll();
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 24888d5..ea99e4b 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -47,7 +47,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.Iterator;
 
@@ -65,7 +64,6 @@
 import org.opends.server.types.Attributes;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.ResultCode;
-import org.opends.server.util.TimeThread;
 import com.sleepycat.je.DatabaseException;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -146,20 +144,12 @@
 
   /* Monitor data management */
 
-  // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
-  private long monitorDataLifeTime = 500;
-
-  /* Search op on monitor data is processed by a worker thread.
-   * Requests are sent to the other RS,and responses are received by the
-   * listener threads.
-   * The worker thread is awoke on this semaphore, or on timeout.
-   */
-  Semaphore remoteMonitorResponsesSemaphore;
   /**
    * The monitor data consolidated over the topology.
    */
   private MonitorData monitorData = new MonitorData();
   private MonitorData wrkMonitorData;
+  private Object monitorDataLock = new Object();
 
   /**
    * The needed info for each received assured update message we are waiting
@@ -2255,23 +2245,30 @@
   synchronized protected MonitorData computeMonitorData()
     throws DirectoryException
   {
-    if (monitorData.getBuildDate() + monitorDataLifeTime > TimeThread.getTime())
-    {
-      if (debugEnabled())
-        TRACER.debugInfo(
-          "In " + this.replicationServer.getMonitorInstanceName() +
-          " baseDn=" + baseDn + " getRemoteMonitorData in cache");
-      // The current data are still valid. No need to renew them.
-      return monitorData;
-    }
+    // Update the monitorData of all domains if this was necessary.
+    replicationServer.computeMonitorData();
+    return monitorData;
+  }
 
-    wrkMonitorData = new MonitorData();
-    synchronized (wrkMonitorData)
+  /**
+   * Start collecting global monitoring information for this
+   * ReplicationServerDomain.
+   *
+   * @return The number of response that should come back.
+   *
+   * @throws DirectoryException In case the monitoring information could
+   *                            not be collected.
+   */
+
+  int initializeMonitorData() throws DirectoryException
+  {
+    synchronized (monitorDataLock)
     {
+      wrkMonitorData = new MonitorData();
       if (debugEnabled())
         TRACER.debugInfo(
-          "In " + this.replicationServer.getMonitorInstanceName() +
-          " baseDn=" + baseDn + " Computing monitor data ");
+            "In " + this.replicationServer.getMonitorInstanceName() +
+            " baseDn=" + baseDn + " Computing monitor data ");
 
       // Let's process our directly connected LSes
       // - in the ServerHandler for a given LS1, the stored state contains :
@@ -2299,7 +2296,7 @@
         wrkMonitorData.setMaxCN(serverID, maxcn);
         wrkMonitorData.setLDAPServerState(serverID, directlshState);
         wrkMonitorData.setFirstMissingDate(serverID,
-          directlsh.getApproxFirstMissingDate());
+            directlsh.getApproxFirstMissingDate());
       }
 
       // Then initialize the max CN for the LS that produced something
@@ -2319,44 +2316,35 @@
       // and we need the remote ones.
       if (debugEnabled())
         TRACER.debugInfo(
-          "In " + this.replicationServer.getMonitorInstanceName() +
-          " baseDn=" + baseDn + " Local monitor data: " +
-          wrkMonitorData.toString());
+            "In " + this.replicationServer.getMonitorInstanceName() +
+            " baseDn=" + baseDn + " Local monitor data: " +
+            wrkMonitorData.toString());
     }
 
-    // Send Request to the other Replication Servers
-    if (remoteMonitorResponsesSemaphore == null)
-    {
-      remoteMonitorResponsesSemaphore = new Semaphore(0);
-      short requestCnt = sendMonitorDataRequest();
-      // Wait reponses from them or timeout
-      waitMonitorDataResponses(requestCnt);
-    } else
-    {
-      // The processing of renewing the monitor cache is already running
-      // We'll make it sleeping until the end
-      // TODO: unit test for this case.
-      while (remoteMonitorResponsesSemaphore != null)
-      {
-        waitMonitorDataResponses(1);
-      }
-    }
+    // Send the request for remote monitor data to the
+    return sendMonitorDataRequest();
+  }
 
+  /**
+   * Complete all the calculation when all monitoring information
+   * has been received.
+   */
+  void completeMonitorData()
+  {
     wrkMonitorData.completeComputing();
 
     // Store the new computed data as the reference
-    synchronized (monitorData)
+    synchronized (monitorDataLock)
     {
       // Now we have the expected answers or an error occurred
       monitorData = wrkMonitorData;
       wrkMonitorData = null;
       if (debugEnabled())
         TRACER.debugInfo(
-          "In " + this.replicationServer.getMonitorInstanceName() +
-          " baseDn=" + baseDn + " *** Computed MonitorData: " +
-          monitorData.toString());
+            "In " + this.replicationServer.getMonitorInstanceName() +
+            " baseDn=" + baseDn + " *** Computed MonitorData: " +
+            monitorData.toString());
     }
-    return monitorData;
   }
 
   /**
@@ -2364,10 +2352,10 @@
    * @return the number of requests sent.
    * @throws DirectoryException when a problem occurs.
    */
-  protected short sendMonitorDataRequest()
+  protected int sendMonitorDataRequest()
     throws DirectoryException
   {
-    short sent = 0;
+    int sent = 0;
     try
     {
       for (ServerHandler rs : replicationServers.values())
@@ -2389,49 +2377,6 @@
   }
 
   /**
-   * Wait for the expected count of received MonitorMsg.
-   * @param expectedResponses The number of expected answers.
-   * @throws DirectoryException When an error occurs.
-   */
-  protected void waitMonitorDataResponses(int expectedResponses)
-    throws DirectoryException
-  {
-    try
-    {
-      if (debugEnabled())
-        TRACER.debugInfo(
-          "In " + this.replicationServer.getMonitorInstanceName() +
-          " baseDn=" + baseDn +
-          " waiting for " + expectedResponses + " expected monitor messages");
-
-      boolean allPermitsAcquired =
-        remoteMonitorResponsesSemaphore.tryAcquire(
-        expectedResponses,
-        (long) 5000, TimeUnit.MILLISECONDS);
-
-      if (!allPermitsAcquired)
-      {
-        logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
-      // let's go on in best effort even with limited data received.
-      } else
-      {
-        if (debugEnabled())
-          TRACER.debugInfo(
-            "In " + this.replicationServer.getMonitorInstanceName() +
-            " baseDn=" + baseDn +
-            " Successfully received all " + expectedResponses +
-            " expected monitor messages");
-      }
-    } catch (Exception e)
-    {
-      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
-    } finally
-    {
-      remoteMonitorResponsesSemaphore = null;
-    }
-  }
-
-  /**
    * Processes a Monitor message receives from a remote Replication Server
    * and stores the data received.
    *
@@ -2442,23 +2387,20 @@
     if (debugEnabled())
       TRACER.debugInfo(
         "In " + this.replicationServer.getMonitorInstanceName() +
-        "Receiving " + msg + " from " + msg.getsenderID() +
-        remoteMonitorResponsesSemaphore);
-
-    if (remoteMonitorResponsesSemaphore == null)
-    {
-      // Let's ignore the remote monitor data just received
-      // since the computing processing has been ended.
-      // An error - probably a timemout - occurred that was already logged
-      logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
-        Short.toString(msg.getsenderID())));
-      return;
-    }
+        "Receiving " + msg + " from " + msg.getsenderID());
 
     try
     {
-      synchronized (wrkMonitorData)
+      synchronized (monitorDataLock)
       {
+        if (wrkMonitorData == null)
+        {
+          // This is a response for an earlier request whose computing is
+          // already complete.
+          logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
+                      Short.toString(msg.getsenderID())));
+          return;
+        }
         // Here is the RS state : list <serverID, lastChangeNumber>
         // For each LDAP Server, we keep the max CN across the RSes
         ServerState replServerState = msg.getReplServerDbState();
@@ -2523,7 +2465,7 @@
 
       // Decreases the number of expected responses and potentially
       // wakes up the waiting requestor thread.
-      remoteMonitorResponsesSemaphore.release();
+      replicationServer.responseReceived();
 
     } catch (Exception e)
     {
@@ -2532,7 +2474,7 @@
 
       // If an exception occurs while processing one of the expected message,
       // the processing is aborted and the waiting thread is awoke.
-      remoteMonitorResponsesSemaphore.notifyAll();
+      replicationServer.responseReceivedAll();
     }
   }
 
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 081cba3..b6c4e3d 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -2064,6 +2064,18 @@
         long delay = md.getApproxDelay(serverId);
         attributes.add(Attributes.create("approximate-delay", String
             .valueOf(delay)));
+
+        /* get the Server State */
+        AttributeBuilder builder = new AttributeBuilder("server-state");
+        ServerState state = md.getLDAPServerState(serverId);
+        if (state != null)
+        {
+          for (String str : state.toStringSet())
+          {
+            builder.add(str);
+          }
+          attributes.add(builder.toAttribute());
+        }
       }
       else
       {
@@ -2071,6 +2083,18 @@
         long missingChanges = md.getMissingChangesRS(serverId);
         attributes.add(Attributes.create("missing-changes", String
             .valueOf(missingChanges)));
+
+        /* get the Server State */
+        AttributeBuilder builder = new AttributeBuilder("server-state");
+        ServerState state = md.getRSStates(serverId);
+        if (state != null)
+        {
+          for (String str : state.toStringSet())
+          {
+            builder.add(str);
+          }
+          attributes.add(builder.toAttribute());
+        }
       }
     }
     catch (Exception e)
@@ -2131,14 +2155,6 @@
     attributes.add(Attributes.create("current-rcv-window", String
         .valueOf(rcvWindow)));
 
-    /* get the Server State */
-    AttributeBuilder builder = new AttributeBuilder("server-state");
-    for (String str : serverState.toStringSet())
-    {
-      builder.add(str);
-    }
-    attributes.add(builder.toAttribute());
-
     // Encryption
     attributes.add(Attributes.create("ssl-encryption", String
         .valueOf(session.isEncrypted())));
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
index e668820..641b14c 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -175,7 +175,7 @@
       ServerState state2 = states1.get(domain2ServerId);
       assertNotNull(state2, "getReplicaStates is not showing DS2");
 
-      Map<Short, ServerState> states2 = domain1.getReplicaStates();
+      Map<Short, ServerState> states2 = domain2.getReplicaStates();
       ServerState state1 = states2.get(domain1ServerId);
       assertNotNull(state1, "getReplicaStates is not showing DS1");
 

--
Gitblit v1.10.0