From 40e2acfd1e9676f3b63385b15075bf1395d4543e Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Fri, 01 Feb 2008 13:21:19 +0000
Subject: [PATCH] Fix 2598 - fixes for global replication monitoring

---
 opends/src/server/org/opends/server/replication/server/ServerHandler.java |  294 ++++++++++++++++++++++++++++++++++++++--------------------
 1 files changed, 193 insertions(+), 101 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 3b85cb1..0e759c0 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -45,6 +45,7 @@
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -124,12 +125,12 @@
 
 
   /**
-   * When this Handler is connected to a remote replication server
+   * When this Handler is related to a remote replication server
    * this collection will contain as many elements as there are
    * LDAP servers connected to the remote replication server.
    */
-  private List<LightweightServerHandler>
-     remoteLDAPservers = new ArrayList<LightweightServerHandler>();
+  private Map<Short, LightweightServerHandler> connectedServers =
+    new ConcurrentHashMap<Short, LightweightServerHandler>();
 
   /**
    * The time in milliseconds between heartbeats from the replication
@@ -200,6 +201,8 @@
     maxRcvWindow = windowSize;
     rcvWindow = windowSize;
     long localGenerationId = -1;
+    boolean handshakeOnly = false;
+
     try
     {
       if (baseDn != null)
@@ -244,6 +247,8 @@
         maxSendQueue = receivedMsg.getMaxSendQueue();
         heartbeatInterval = receivedMsg.getHeartbeatInterval();
 
+        handshakeOnly = receivedMsg.isHandshakeOnly();
+
         // The session initiator decides whether to use SSL.
         sslEncryption = receivedMsg.getSSLEncryption();
 
@@ -524,60 +529,70 @@
       replicationServerDomain = replicationServer.
               getReplicationServerDomain(this.baseDn,true);
 
-      boolean started;
-      if (serverIsLDAPserver)
+      if (!handshakeOnly)
       {
-        started = replicationServerDomain.startServer(this);
-      }
-      else
-      {
-        started = replicationServerDomain.startReplicationServer(this);
-      }
-
-      if (started)
-      {
-        // sendWindow MUST be created before starting the writer
-        sendWindow = new Semaphore(sendWindowSize);
-
-        writer = new ServerWriter(session, serverId,
-                this, replicationServerDomain);
-        reader = new ServerReader(session, serverId,
-                this, replicationServerDomain);
-
-        reader.start();
-        writer.start();
-
-        // Create a thread to send heartbeat messages.
-        if (heartbeatInterval > 0)
+        boolean started;
+        if (serverIsLDAPserver)
         {
-          heartbeatThread = new HeartbeatThread(
-              "replication Heartbeat to " + serverURL +
-              " for " + this.baseDn,
-              session, heartbeatInterval/3);
-          heartbeatThread.start();
+          started = replicationServerDomain.startServer(this);
+        }
+        else
+        {
+          started = replicationServerDomain.startReplicationServer(this);
         }
 
-
-        DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
-        DirectoryServer.registerMonitorProvider(this);
-      }
-      else
-      {
-        // the connection is not valid, close it.
-        try
+        if (started)
         {
-          if (debugEnabled())
+          // sendWindow MUST be created before starting the writer
+          sendWindow = new Semaphore(sendWindowSize);
+
+          writer = new ServerWriter(session, serverId,
+              this, replicationServerDomain);
+          reader = new ServerReader(session, serverId,
+              this, replicationServerDomain);
+
+          reader.start();
+          writer.start();
+
+          // Create a thread to send heartbeat messages.
+          if (heartbeatInterval > 0)
           {
-            TRACER.debugInfo("In " +
-              replicationServerDomain.getReplicationServer().
-              getMonitorInstanceName() + " RS failed to start locally " +
-              " the connection from serverID="+serverId);
+            heartbeatThread = new HeartbeatThread(
+                "replication Heartbeat to " + serverURL +
+                " for " + this.baseDn,
+                session, heartbeatInterval/3);
+            heartbeatThread.start();
           }
-          session.close();
-        } catch (IOException e1)
-        {
-          // ignore
+
+          DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
+          DirectoryServer.registerMonitorProvider(this);
         }
+        else
+        {
+          // the connection is not valid, close it.
+          try
+          {
+            if (debugEnabled())
+            {
+              TRACER.debugInfo("In " +
+                  replicationServerDomain.getReplicationServer().
+                  getMonitorInstanceName() + " RS failed to start locally " +
+                  " the connection from serverID="+serverId);
+            }
+            session.close();
+          } catch (IOException e1)
+          {
+            // ignore
+          }
+        }
+      }
+      else
+      {
+        // For a hanshakeOnly connection, let's only create a reader
+        // in order to detect the connection closure.
+        reader = new ServerReader(session, serverId,
+            this, replicationServerDomain);
+        reader.start();
       }
     }
     catch (Exception e)
@@ -842,22 +857,22 @@
   /**
    * Get the age of the older change that has not yet been replicated
    * to the server handled by this ServerHandler.
-   *
    * @return The age if the older change has not yet been replicated
    *         to the server handled by this ServerHandler.
    */
   public Long getApproxFirstMissingDate()
   {
-    // Get the older CN received
-    // From it, get the next sequence number
-    // Get the CN for the next sequence number
-    // If not present in the local RS db,
-    // then approximate with the older update time
-    ChangeNumber olderUpdateCN = getOlderUpdateCN();
-    if (olderUpdateCN == null)
-      return null;
+    Long result = (long)0;
 
-    return olderUpdateCN.getTime();
+    // Get the older CN received
+    ChangeNumber olderUpdateCN = getOlderUpdateCN();
+    if (olderUpdateCN != null)
+    {
+      // If not present in the local RS db,
+      // then approximate with the older update time
+      result=olderUpdateCN.getTime();
+    }
+    return result;
   }
 
   /**
@@ -874,29 +889,82 @@
 
   /**
    * Get the older Change Number for that server.
+   * Returns null when the queue is empty.
    * @return The older change number.
    */
   public ChangeNumber getOlderUpdateCN()
   {
+    ChangeNumber result = null;
     synchronized (msgQueue)
     {
       if (isFollowing())
       {
         if (msgQueue.isEmpty())
-          return null;
-
-        UpdateMessage msg = msgQueue.first();
-        return msg.getChangeNumber();
+        {
+          result=null;
+        }
+        else
+        {
+          UpdateMessage msg = msgQueue.first();
+          result = msg.getChangeNumber();
+        }
       }
       else
       {
         if (lateQueue.isEmpty())
-          return null;
+        {
+          // isFollowing is false AND lateQueue is empty
+          // We may be at the very moment when the writer has emptyed the
+          // lateQueue when it sent the last update. The writer will fill again
+          // the lateQueue when it will send the next update but we are not yet
+          // there. So let's take the last change not sent directly from
+          // the db.
 
-        UpdateMessage msg = lateQueue.first();
-        return msg.getChangeNumber();
+          ReplicationIteratorComparator comparator =
+            new ReplicationIteratorComparator();
+          SortedSet<ReplicationIterator> iteratorSortedSet =
+            new TreeSet<ReplicationIterator>(comparator);
+          try
+          {
+            // Build a list of candidates iterator (i.e. db i.e. server)
+            for (short serverId : replicationServerDomain.getServers())
+            {
+              // get the last already sent CN from that server
+              ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
+              // get an iterator in this server db from that last change
+              ReplicationIterator iterator =
+                replicationServerDomain.getChangelogIterator(serverId, lastCsn);
+              // if that iterator has changes, then it is a candidate
+              // it is added in the sorted list at a position given by its
+              // current change (see ReplicationIteratorComparator).
+              if ((iterator != null) && (iterator.getChange() != null))
+              {
+                iteratorSortedSet.add(iterator);
+              }
+            }
+            UpdateMessage msg = iteratorSortedSet.first().getChange();
+            result = msg.getChangeNumber();
+          }
+          catch(Exception e)
+          {
+            result=null;
+          }
+          finally
+          {
+            for (ReplicationIterator iterator : iteratorSortedSet)
+            {
+              iterator.releaseCursor();
+            }
+          }
+        }
+        else
+        {
+          UpdateMessage msg = lateQueue.first();
+          result = msg.getChangeNumber();
+        }
       }
     }
+    return result;
   }
 
   /**
@@ -958,7 +1026,7 @@
        */
       while (msgQueue.size() > maxQueueSize)
       {
-        following = false;
+        setFollowing(false);
         msgQueue.removeFirst();
       }
     }
@@ -1083,6 +1151,13 @@
               }
             }
           }
+
+          // The loop below relies on the fact that it is sorted based
+          // on the currentChange of each iterator to consider the next
+          // change accross all servers.
+          // Hence it is necessary to remove and eventual add again an iterator
+          // when looping in order to keep consistent the order of the
+          // iterators (see ReplicationIteratorComparator.
           while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
           {
             ReplicationIterator iterator = iteratorSortedSet.first();
@@ -1107,7 +1182,7 @@
             {
               if (msgQueue.size() < maxQueueSize)
               {
-                following = true;
+                setFollowing(true);
               }
             }
           }
@@ -1119,7 +1194,7 @@
               if (msgQueue.contains(msg))
               {
                 /* we finally catched up with the regular queue */
-                following = true;
+                setFollowing(true);
                 lateQueue.clear();
                 UpdateMessage msg1;
                 do
@@ -1459,14 +1534,6 @@
       attributes.add(new Attribute("connected-to", this.replicationServerDomain.
           getReplicationServer().getMonitorInstanceName()));
 
-      // Add the oldest missing update
-      Long olderUpdateTime = this.getApproxFirstMissingDate();
-      if (olderUpdateTime != null)
-      {
-        Date date = new Date(olderUpdateTime);
-        attributes.add(new Attribute("approx-older-change-not-synchronized",
-          date.toString()));
-      }
     }
     else
     {
@@ -1477,27 +1544,42 @@
     attributes.add(new Attribute("base-dn",
                                  baseDn.toString()));
 
-    // Update stats
-
-    // Retrieves the topology counters
     if (serverIsLDAPserver)
     {
+      MonitorData md;
       try
       {
-        replicationServerDomain.retrievesRemoteMonitorData();
+        md = replicationServerDomain.getMonitorData();
+
+        // Oldest missing update
+        Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
+        if ((approxFirstMissingDate != null) && (approxFirstMissingDate>0))
+        {
+          Date date = new Date(approxFirstMissingDate);
+          attributes.add(new Attribute("approx-older-change-not-synchronized",
+              date.toString()));
+          attributes.add(
+              new Attribute("approx-older-change-not-synchronized-millis",
+                  String.valueOf(approxFirstMissingDate)));
+        }
+
+        // Missing changes
+        long missingChanges = md.getMissingChanges(serverId);
+        attributes.add(new Attribute("missing-changes",
+            String.valueOf(missingChanges)));
+
+        // Replication delay
+        long delay = md.getApproxDelay(serverId);
+        attributes.add(new Attribute("approximate-delay",
+            String.valueOf(delay)));
       }
       catch(Exception e)
       {
-        // FIXME: We failed retrieving the remote monitor data
+        // TODO: improve the log
+        // We failed retrieving the remote monitor data.
+        attributes.add(new Attribute("error",
+            stackTraceToSingleLineString(e)));
       }
-
-      // Compute the latency for the current SH
-      int missingChanges =
-        replicationServerDomain.getMissingChanges(serverState);
-
-      // add the latency attribute to our monitor data
-      attributes.add(new Attribute("missing-changes",
-          String.valueOf(missingChanges)));
     }
 
     // Deprecated
@@ -1532,8 +1614,6 @@
     attributes.add(new Attribute("waiting-changes",
         String.valueOf(getRcvMsgQueueSize())));
     // Age of oldest missing change
-    attributes.add(new Attribute("approximate-delay",
-                                 String.valueOf(getApproxDelay())));
 
     // Date of the oldest missing change
     long olderUpdateTime = getOlderUpdateTime();
@@ -1731,14 +1811,14 @@
      List<String> newRemoteLDAPservers = infoMsg.getConnectedServers();
      generationId = infoMsg.getGenerationId();
 
-     synchronized(remoteLDAPservers)
+     synchronized(connectedServers)
      {
        // Removes the existing structures
-       for (LightweightServerHandler lsh : remoteLDAPservers)
+       for (LightweightServerHandler lsh : connectedServers.values())
        {
          lsh.stopHandler();
        }
-       remoteLDAPservers.clear();
+       connectedServers.clear();
 
        // Creates the new structure according to the message received.
        for (String newConnectedServer : newRemoteLDAPservers)
@@ -1746,7 +1826,7 @@
          LightweightServerHandler lsh
          = new LightweightServerHandler(newConnectedServer, this);
          lsh.startHandler();
-         remoteLDAPservers.add(lsh);
+         connectedServers.put(lsh.getServerId(), lsh);
        }
      }
    }
@@ -1762,14 +1842,17 @@
     */
    public boolean isRemoteLDAPServer(short wantedServer)
    {
-     for (LightweightServerHandler server : remoteLDAPservers)
+     synchronized(connectedServers)
      {
-       if (wantedServer == server.getServerId())
+       for (LightweightServerHandler server : connectedServers.values())
        {
-         return true;
+         if (wantedServer == server.getServerId())
+         {
+           return true;
+         }
        }
+       return false;
      }
-     return false;
    }
 
    /**
@@ -1781,7 +1864,7 @@
     */
    public boolean hasRemoteLDAPServers()
    {
-     return !remoteLDAPservers.isEmpty();
+     return !connectedServers.isEmpty();
    }
 
   /**
@@ -1907,4 +1990,13 @@
   {
     return this.replicationServerDomain;
   }
+
+  /**
+   * Return a Set containing the servers known by this replicationServer.
+   * @return a set containing the servers known by this replicationServer.
+   */
+  public Set<Short> getConnectedServerIds()
+  {
+    return connectedServers.keySet();
+  }
 }

--
Gitblit v1.10.0