From 40e2acfd1e9676f3b63385b15075bf1395d4543e Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Fri, 01 Feb 2008 13:21:19 +0000
Subject: [PATCH] Fix 2598 - fixes for global replication monitoring

---
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |  400 +++++++++++++++++++++++++++++---------------------------
 1 files changed, 209 insertions(+), 191 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 89726d4..77e5d5c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -37,7 +37,6 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -129,8 +128,8 @@
 
   /* Monitor data management */
 
-  // TODO: Remote monitor data cache lifetime is 500 ms/should be configurable
-  private long remoteMonitorDataLifeTime = 500;
+  // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
+  private long monitorDataLifeTime = 500;
 
   /* Search op on monitor data is processed by a worker thread.
    * Requests are sent to the other RS,and responses are received by the
@@ -139,21 +138,11 @@
    */
   Semaphore remoteMonitorResponsesSemaphore;
 
-  /* The date of the last time they have been elaborated */
-  private long validityDate = 0;
-
-  // For each LDAP server, its server state
-  private HashMap<Short, ServerState> LDAPStates =
-    new HashMap<Short, ServerState>();
-
-  // For each LDAP server, the last CN it published
-  private HashMap<Short, ChangeNumber> maxCNs =
-    new HashMap<Short, ChangeNumber>();
-
-  // For each LDAP server, an approximation of the date of the first missing
-  // change
-  private HashMap<Short, Long> approxFirstMissingDate =
-    new HashMap<Short, Long>();
+  /**
+   * The monitor data consolidated over the topology.
+   */
+  private  MonitorData monitorData = new MonitorData();
+  private  MonitorData wrkMonitorData;
 
   /**
    * Creates a new ReplicationServerDomain associated to the DN baseDn.
@@ -166,13 +155,7 @@
   {
     this.baseDn = baseDn;
     this.replicationServer = replicationServer;
-
-    if (debugEnabled())
-      TRACER.debugInfo(
-        "In " + this.replicationServer.getMonitorInstanceName() +
-        " Created Cache for " + baseDn + " " +
-        stackTraceToSingleLineString(new Exception()));
-}
+  }
 
   /**
    * Add an update that has been received to the list of
@@ -366,6 +349,10 @@
         {
           replicationServers.remove(handler.getServerId());
           handler.stopHandler();
+
+          // Update the remote replication servers with our list
+          // of connected LDAP servers
+          sendReplServerInfo();
         }
       }
       else
@@ -374,12 +361,12 @@
         {
           connectedServers.remove(handler.getServerId());
           handler.stopHandler();
+
+          // Update the remote replication servers with our list
+          // of connected LDAP servers
+          sendReplServerInfo();
         }
       }
-
-      // Update the remote replication servers with our list
-      // of connected LDAP servers
-      sendReplServerInfo();
   }
 
   /**
@@ -578,7 +565,8 @@
    *
    * @param serverId Identifier of the server for which the iterator is created.
    * @param changeNumber Starting point for the iterator.
-   * @return the created ReplicationIterator.
+   * @return the created ReplicationIterator. Null when no DB is available
+   * for the provided server Id.
    */
   public ReplicationIterator getChangelogIterator(short serverId,
                     ChangeNumber changeNumber)
@@ -591,7 +579,8 @@
     {
       return handler.generateIterator(changeNumber);
     }
-    catch (Exception e) {
+    catch (Exception e)
+    {
      return null;
     }
   }
@@ -759,6 +748,7 @@
    */
   public void process(RoutableMessage msg, ServerHandler senderHandler)
   {
+
     // Test the message for which a ReplicationServer is expected
     // to be the destination
     if (msg.getDestination() == this.replicationServer.getServerId())
@@ -779,20 +769,33 @@
               replServerMonitorRequestMsg.getDestination(),
               replServerMonitorRequestMsg.getsenderID());
 
-        // Populate the RS state in the msg from the DbState
-        monitorMsg.setReplServerState(this.getDbServerState());
-
         // Populate for each connected LDAP Server
         // from the states stored in the serverHandler.
         // - the server state
         // - the older missing change
         for (ServerHandler lsh : this.connectedServers.values())
         {
-          monitorMsg.setLDAPServerState(
+          monitorMsg.setServerState(
               lsh.getServerId(),
               lsh.getServerState(),
-              lsh.getApproxFirstMissingDate());
+              lsh.getApproxFirstMissingDate(),
+              true);
         }
+
+        // Same for the connected RS
+        for (ServerHandler rsh : this.replicationServers.values())
+        {
+          monitorMsg.setServerState(
+              rsh.getServerId(),
+              rsh.getServerState(),
+              rsh.getApproxFirstMissingDate(),
+              false);
+        }
+
+        // Populate the RS state in the msg from the DbState
+        monitorMsg.setReplServerDbState(this.getDbServerState());
+
+
         try
         {
           senderHandler.send(monitorMsg);
@@ -1305,118 +1308,135 @@
       }
     }
 
-    /*
+    /* =======================
      * Monitor Data generation
+     * =======================
      */
 
     /**
-     * Retrieves the remote monitor data.
-     *
+     * Retrieves the global monitor data.
+     * @return The monitor data.
      * @throws DirectoryException When an error occurs.
      */
-    protected void retrievesRemoteMonitorData()
+    synchronized protected MonitorData getMonitorData()
       throws DirectoryException
     {
-      if (validityDate > TimeThread.getTime())
+      if (monitorData.getBuildDate() + monitorDataLifeTime
+          > TimeThread.getTime())
       {
-        // The current data are still valid. No need to renew them.
-        return;
+        if (debugEnabled())
+          TRACER.debugInfo(
+          "In " + this.replicationServer.getMonitorInstanceName() +
+          " baseDn=" + baseDn + " getRemoteMonitorData in cache");
+       // The current data are still valid. No need to renew them.
+        // FIXME
+        return null;
       }
 
-      // Clean
-      this.LDAPStates.clear();
-      this.maxCNs.clear();
-
-      // Init the maxCNs of our direct LDAP servers from our own dbstate
-      for (ServerHandler rs : connectedServers.values())
+      wrkMonitorData = new MonitorData();
+      synchronized(wrkMonitorData)
       {
-        short serverID = rs.getServerId();
-        ChangeNumber cn = rs.getServerState().getMaxChangeNumber(serverID);
-        if (cn == null)
-        {
-          // we have nothing in db for that server
-          cn = new ChangeNumber(0, 0 , serverID);
-        }
-        this.maxCNs.put(serverID, cn);
-      }
+        if (debugEnabled())
+          TRACER.debugInfo(
+          "In " + this.replicationServer.getMonitorInstanceName() +
+          " baseDn=" + baseDn + " Computing monitor data ");
 
-      ServerState replServerState = this.getDbServerState();
-      Iterator<Short> it = replServerState.iterator();
-      while (it.hasNext())
-      {
-        short sid = it.next();
-        ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
-        ChangeNumber maxCN = this.maxCNs.get(sid);
-        if ((maxCN != null) && (receivedCN.newer(maxCN)))
+        // Let's process our directly connected LSes
+        // - in the ServerHandler for a given LS1, the stored state contains :
+        //   - the max CN produced by LS1
+        //   - the last CN consumed by LS1 from LS2..n
+        // - in the RSdomain/dbHandler, the built-in state contains :
+        //   - the max CN produced by each server
+        // So for a given LS connected we can take the state and the max from
+        // the LS/state.
+
+        for (ServerHandler directlsh : connectedServers.values())
         {
-          // We found a newer one
-          this.maxCNs.remove(sid);
-          this.maxCNs.put(sid, receivedCN);
+          short serverID = directlsh.getServerId();
+
+          // the state comes from the state stored in the SH
+          ServerState directlshState = directlsh.getServerState().duplicate();
+
+          // the max CN sent by that LS also comes from the SH
+          ChangeNumber maxcn = directlshState.getMaxChangeNumber(serverID);
+          if (maxcn == null)
+          {
+            // This directly connected LS has never produced any change
+            maxcn = new ChangeNumber(0, 0 , serverID);
+          }
+          wrkMonitorData.setMaxCN(serverID, maxcn);
+          wrkMonitorData.setLDAPServerState(serverID, directlshState);
+          wrkMonitorData.setFirstMissingDate(serverID, directlsh.
+                                             getApproxFirstMissingDate());
         }
+
+        // Then initialize the max CN for the LS that produced something
+        // - from our own local db state
+        // - whatever they are directly or undirectly connected
+        ServerState dbServerState = getDbServerState();
+        Iterator<Short> it = dbServerState.iterator();
+        while (it.hasNext())
+        {
+          short sid = it.next();
+          ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid);
+          wrkMonitorData.setMaxCN(sid, storedCN);
+        }
+
+        // Now we have used all available local informations
+        // and we need the remote ones.
+        if (debugEnabled())
+          TRACER.debugInfo(
+            "In " + this.replicationServer.getMonitorInstanceName() +
+            " baseDn=" + baseDn + " Local monitor data: " +
+            wrkMonitorData.toString());
       }
 
       // Send Request to the other Replication Servers
       if (remoteMonitorResponsesSemaphore == null)
       {
-        remoteMonitorResponsesSemaphore = new Semaphore(
-            replicationServers.size() -1);
-
-        sendMonitorDataRequest();
-
+        remoteMonitorResponsesSemaphore = new Semaphore(0);
+        short requestCnt = sendMonitorDataRequest();
         // Wait reponses from them or timeout
-        waitMonitorDataResponses(replicationServers.size());
+        waitMonitorDataResponses(requestCnt);
       }
       else
       {
         // The processing of renewing the monitor cache is already running
         // We'll make it sleeping until the end
+        // TODO: unit test for this case.
         while (remoteMonitorResponsesSemaphore!=null)
         {
           waitMonitorDataResponses(1);
         }
       }
 
-      // Now we have the expected answers of an error occured
-      validityDate = TimeThread.getTime() + remoteMonitorDataLifeTime;
+      wrkMonitorData.completeComputing();
 
-      if (debugEnabled())
+      // Store the new computed data as the reference
+      synchronized(monitorData)
       {
-        debugMonitorData();
-      }
-    }
-
-    private void debugMonitorData()
-    {
-      String mds = " Monitor data=";
-      Iterator<Short> ite = LDAPStates.keySet().iterator();
-      while (ite.hasNext())
-      {
-        Short sid = ite.next();
-        ServerState ss = LDAPStates.get(sid);
-        mds += " LDAPState(" + sid + ")=" + ss.toString();
-      }
-      Iterator<Short> itc = maxCNs.keySet().iterator();
-      while (itc.hasNext())
-      {
-        Short sid = itc.next();
-        ChangeNumber cn = maxCNs.get(sid);
-        mds += " maxCNs(" + sid + ")=" + cn.toString();
-      }
-
-      mds += "--";
-      TRACER.debugInfo(
+        // Now we have the expected answers or an error occured
+        monitorData = wrkMonitorData;
+        wrkMonitorData = null;
+        if (debugEnabled())
+          TRACER.debugInfo(
           "In " + this.replicationServer.getMonitorInstanceName() +
-          " baseDN=" + baseDn +
-          mds);
+          " baseDn=" + baseDn + " *** Computed MonitorData: " +
+          monitorData.toString());
+      }
+      return monitorData;
     }
 
+
     /**
      * Sends a MonitorRequest message to all connected RS.
+     * @return the number of requests sent.
      * @throws DirectoryException when a problem occurs.
      */
-    protected void sendMonitorDataRequest()
+    protected short sendMonitorDataRequest()
       throws DirectoryException
     {
+      short sent=0;
       try
       {
         for (ServerHandler rs : replicationServers.values())
@@ -1425,6 +1445,7 @@
             MonitorRequestMessage(this.replicationServer.getServerId(),
               rs.getServerId());
           rs.send(msg);
+          sent++;
         }
       }
       catch(Exception e)
@@ -1434,6 +1455,7 @@
         throw new DirectoryException(ResultCode.OTHER,
             message, e);
       }
+      return sent;
     }
 
     /**
@@ -1446,21 +1468,30 @@
     {
       try
       {
+        if (debugEnabled())
+          TRACER.debugInfo(
+          "In " + this.replicationServer.getMonitorInstanceName() +
+          " baseDn=" + baseDn +
+          " waiting for " + expectedResponses
+          + " expected monitor messages");
+
         boolean allPermitsAcquired =
           remoteMonitorResponsesSemaphore.tryAcquire(
               expectedResponses,
-              (long) 500, TimeUnit.MILLISECONDS);
+              (long) 5000, TimeUnit.MILLISECONDS);
 
         if (!allPermitsAcquired)
         {
           logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
+          // FIXME let's go on in best effort even with limited data received.
         }
         else
         {
           if (debugEnabled())
             TRACER.debugInfo(
             "In " + this.replicationServer.getMonitorInstanceName() +
-            "Successfully received all " + replicationServers.size()
+            " baseDn=" + baseDn +
+            " Successfully received all " + expectedResponses
             + " expected monitor messages");
         }
       }
@@ -1482,48 +1513,94 @@
      */
     public void receivesMonitorDataResponse(MonitorMessage msg)
     {
+      if (debugEnabled())
+        TRACER.debugInfo(
+        "In " + this.replicationServer.getMonitorInstanceName() +
+        "Receiving " + msg + " from " + msg.getsenderID() +
+        remoteMonitorResponsesSemaphore);
+
       if (remoteMonitorResponsesSemaphore == null)
       {
-        // Ignoring the remote monitor data because an error occured previously
+        // FIXME
+        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(
+            "In " + this.replicationServer.getMonitorInstanceName() +
+            "Receiving " + msg + " from " + msg.getsenderID() +
+            " remoteMonitorResponsesSemaphore should not be null"));
+        // Ignoring the remote monitor data because an error occured
+        // previously
         return;
       }
 
       try
       {
-        // Here is the RS state : list <serverID, lastChangeNumber>
-        // For each LDAP Server, we keep the max CN accross the RSes
-        ServerState replServerState = msg.getReplServerState();
-        Iterator<Short> it = replServerState.iterator();
-        while (it.hasNext())
+        synchronized(wrkMonitorData)
         {
-          short sid = it.next();
-          ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
-          ChangeNumber maxCN = this.maxCNs.get(sid);
-          if (receivedCN.newer(maxCN))
-          {
-            // We found a newer one
-            this.maxCNs.remove(sid);
-            this.maxCNs.put(sid, receivedCN);
-          }
-        }
+          // Here is the RS state : list <serverID, lastChangeNumber>
+          // For each LDAP Server, we keep the max CN accross the RSes
+          ServerState replServerState = msg.getReplServerDbState();
+          wrkMonitorData.setMaxCNs(replServerState);
 
-        // Store the LDAP servers states
-        Iterator<Short> sidIterator = msg.iterator();
-        while (sidIterator.hasNext())
-        {
-          short sid = sidIterator.next();
-          ServerState ss = msg.getLDAPServerState(sid);
-          this.LDAPStates.put(sid, ss);
-          this.approxFirstMissingDate.put(sid,
-              msg.getApproxFirstMissingDate(sid));
+          // Store the remote LDAP servers states
+          Iterator<Short> lsidIterator = msg.ldapIterator();
+          while (lsidIterator.hasNext())
+          {
+            short sid = lsidIterator.next();
+            wrkMonitorData.setLDAPServerState(sid,
+                msg.getLDAPServerState(sid).duplicate());
+            wrkMonitorData.setFirstMissingDate(sid,
+                msg.getLDAPApproxFirstMissingDate(sid));
+          }
+
+          // Process the latency reported by the remote RSi on its connections
+          // to the other RSes
+          Iterator<Short> rsidIterator = msg.rsIterator();
+          while (rsidIterator.hasNext())
+          {
+            short rsid = rsidIterator.next();
+            if (rsid == replicationServer.getServerId())
+            {
+              // this is the latency of the remote RSi regarding the current RS
+              // let's update the fmd of my connected LS
+              for (ServerHandler connectedlsh : connectedServers.values())
+              {
+                short connectedlsid = connectedlsh.getServerId();
+                Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
+                wrkMonitorData.setFirstMissingDate(connectedlsid, newfmd);
+              }
+            }
+            else
+            {
+              // this is the latency of the remote RSi regarding another RSj
+              // let's update the latency of the LSes connected to RSj
+              ServerHandler rsjHdr = replicationServers.get(rsid);
+              for(short remotelsid : rsjHdr.getConnectedServerIds())
+              {
+                Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
+                wrkMonitorData.setFirstMissingDate(remotelsid, newfmd);
+              }
+            }
+          }
+          if (debugEnabled())
+          {
+            if (debugEnabled())
+              TRACER.debugInfo(
+              "In " + this.replicationServer.getMonitorInstanceName() +
+              " baseDn=" + baseDn +
+              " Processed msg from " + msg.getsenderID() +
+              " New monitor data: " + wrkMonitorData.toString());
+          }
         }
 
         // Decreases the number of expected responses and potentially
         // wakes up the waiting requestor thread.
         remoteMonitorResponsesSemaphore.release();
+
       }
       catch (Exception e)
       {
+        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage() +
+            stackTraceToSingleLineString(e)));
+
         // If an exception occurs while processing one of the expected message,
         // the processing is aborted and the waiting thread is awoke.
         remoteMonitorResponsesSemaphore.notifyAll();
@@ -1531,65 +1608,6 @@
     }
 
     /**
-     * Get the state of the LDAP server with the provided serverId.
-     * @param serverId The server ID.
-     * @return The server state.
-     */
-    public ServerState getServerState(short serverId)
-    {
-      return LDAPStates.get(serverId);
-    }
-
-    /**
-     * Get the highest know change number of the LDAP server with the provided
-     * serverId.
-     * @param serverId The server ID.
-     * @return The highest change number.
-     */
-    public ChangeNumber getMaxCN(short serverId)
-    {
-      return maxCNs.get(serverId);
-    }
-
-    /**
-     * Get an approximation of the date of the oldest missing changes.
-     * serverId.
-     * @param serverId The server ID.
-     * @return The approximation of the date of the oldest missing change.
-     */
-    public Long getApproxFirstMissingDate(short serverId)
-    {
-      return approxFirstMissingDate.get(serverId);
-    }
-
-    /**
-     * Get the number of missing change for the server with the provided state.
-     * @param state The provided server state.
-     * @return The number of missing changes.
-     */
-    public int getMissingChanges(ServerState state)
-    {
-      // Traverse the max Cn transmitted by each server
-      // For each server, get the highest CN know from the current server
-      // Sum the difference betwenn the max and the last
-      int missingChanges = 0;
-      Iterator<Short> itc = maxCNs.keySet().iterator();
-      while (itc.hasNext())
-      {
-        Short sid = itc.next();
-        ChangeNumber maxCN = maxCNs.get(sid);
-        ChangeNumber last = state.getMaxChangeNumber(sid);
-        if (last == null)
-        {
-          last = new ChangeNumber(0,0, sid);
-        }
-        int missingChangesFromSID = ChangeNumber.diffSeqNum(maxCN, last);
-        missingChanges += missingChangesFromSID;
-      }
-      return missingChanges;
-    }
-
-    /**
      * Set the purge delay on all the db Handlers for this Domain
      * of Replicaiton.
      *

--
Gitblit v1.10.0