From 3a9e211d36ee94ff99941943b3b51e0f768624f5 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Fri, 06 Nov 2009 09:11:40 +0000
Subject: [PATCH] In order to support a more clever algorithm for the DS to choose its RS, we introduce:

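- a MonitoringPublisher per ReplicationServerDomain that periodically
  sends monitoring messages to the topology;
- a GlobalServerId (baseDn + server id) used to track the servers from
  which a MonitorMsg is still expected;
- createGlobalTopologyMonitorMsg() and createLocalTopologyMonitorMsg()
  to build the monitoring information returned to DSs and RSs;
- the RS weight, now propagated in the RSInfo sent in TopologyMsg.
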
---
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |  378 ++++++++++++++++++++++++++++++++++++-----------------
 1 files changed, 255 insertions(+), 123 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 4ef072c..ab1da75 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -85,6 +85,8 @@
 import org.opends.server.types.ResultCode;
 
 import com.sleepycat.je.DatabaseException;
+import org.opends.server.replication.server.
+  ReplicationServer.GlobalServerId;
 
 /**
  * This class define an in-memory cache that will be used to store
@@ -109,6 +111,10 @@
   // late or not
   private StatusAnalyzer statusAnalyzer = null;
 
+  // The monitoring publisher that periodically sends monitoring messages to the
+  // topology
+  private MonitoringPublisher monitoringPublisher = null;
+
   /*
    * The following map contains one balanced tree for each replica ID
    * to which we are currently publishing
@@ -1066,6 +1072,17 @@
           // Try doing job anyway...
         }
 
+        // Stop useless monitoring publisher if no more RS or DS in domain
+        if ((directoryServers.size() + replicationServers.size()) == 1)
+        {
+          if (debugEnabled())
+            TRACER.debugInfo("In " +
+              replicationServer.getMonitorInstanceName() +
+              " remote server " + handler.getMonitorInstanceName() + " is " +
+              "the last RS/DS to be stopped: stopping monitoring publisher");
+          stopMonitoringPublisher();
+        }
+
         if (handler.isReplicationServer())
         {
           if (replicationServers.containsValue(handler))
@@ -1082,44 +1099,39 @@
               buildAndSendTopoInfoToDSs(null);
             }
           }
-        } else
+        } else if (directoryServers.containsValue(handler))
         {
-          if (directoryServers.containsValue(handler))
+          // If this is the last DS for the domain,
+          // shutdown the status analyzer
+          if (directoryServers.size() == 1)
           {
-            // If this is the last DS for the domain,
-            // shutdown the status analyzer
-            if (directoryServers.size() == 1)
-            {
-              if (debugEnabled())
-                TRACER.debugInfo("In " +
-                    replicationServer.getMonitorInstanceName() +
-                    " remote server " + handler.getMonitorInstanceName() +
+            if (debugEnabled())
+              TRACER.debugInfo("In " +
+                replicationServer.getMonitorInstanceName() +
+                " remote server " + handler.getMonitorInstanceName() +
                 " is the last DS to be stopped: stopping status analyzer");
-              stopStatusAnalyzer();
-            }
+            stopStatusAnalyzer();
+          }
 
-            unregisterServerHandler(handler);
-            handler.shutdown();
+          unregisterServerHandler(handler);
+          handler.shutdown();
 
-            // Check if generation id has to be reset
-            mayResetGenerationId();
+          // Check if generation id has to be reset
+          mayResetGenerationId();
+          if (!shutdown)
+          {
             // Update the remote replication servers with our list
             // of connected LDAP servers
-            if (!shutdown)
-            {
-              buildAndSendTopoInfoToRSs();
-              // Warn our DSs that a RS or DS has quit (does not use this
-              // handler as already removed from list)
-              buildAndSendTopoInfoToDSs(null);
-            }
+            buildAndSendTopoInfoToRSs();
+            // Warn our DSs that a RS or DS has quit (does not use this
+            // handler as already removed from list)
+            buildAndSendTopoInfoToDSs(null);
           }
-          else if (otherHandlers.contains(handler))
-          {
-            unRegisterHandler(handler);
-            handler.shutdown();
-          }
+        } else if (otherHandlers.contains(handler))
+        {
+          unRegisterHandler(handler);
+          handler.shutdown();
         }
-
       }
       catch(Exception e)
       {
@@ -1581,99 +1593,51 @@
         // in the topology.
         if (senderHandler.isDataServer())
         {
-          MonitorMsg returnMsg =
-            new MonitorMsg(msg.getDestination(), msg.getsenderID());
+          // Monitoring information requested by a DS
+          MonitorMsg monitorMsg =
+            createGlobalTopologyMonitorMsg(msg.getDestination(),
+            msg.getsenderID());
 
-          try
+          if (monitorMsg != null)
           {
-            returnMsg.setReplServerDbState(getDbServerState());
-            // Update the information we have about all servers
-            // in the topology.
-            MonitorData md = computeMonitorData();
-
-            // Add the informations about the Replicas currently in
-            // the topology.
-            Iterator<Integer> it = md.ldapIterator();
-            while (it.hasNext())
+            try
             {
-              int replicaId = it.next();
-              returnMsg.setServerState(
-                  replicaId, md.getLDAPServerState(replicaId),
-                  md.getApproxFirstMissingDate(replicaId), true);
-            }
-
-            // Add the informations about the Replication Servers
-            // currently in the topology.
-            it = md.rsIterator();
-            while (it.hasNext())
+              senderHandler.send(monitorMsg);
+            } catch (IOException e)
             {
-              int replicaId = it.next();
-              returnMsg.setServerState(
-                  replicaId, md.getRSStates(replicaId),
-                  md.getRSApproxFirstMissingDate(replicaId), false);
+              // the connection was closed.
             }
           }
-          catch (DirectoryException e)
-          {
-            // If we can't compute the Monitor Information, send
-            // back an empty message.
-          }
-          try
-          {
-            senderHandler.send(returnMsg);
-          } catch (IOException e)
-          {
-            // the connection was closed.
-          }
           return;
-        }
-
-        MonitorMsg monitorMsg =
-          new MonitorMsg(msg.getDestination(), msg.getsenderID());
-
-        // Populate for each connected LDAP Server
-        // from the states stored in the serverHandler.
-        // - the server state
-        // - the older missing change
-        for (DataServerHandler lsh : this.directoryServers.values())
+        } else
         {
-          monitorMsg.setServerState(
-            lsh.getServerId(),
-            lsh.getServerState(),
-            lsh.getApproxFirstMissingDate(),
-            true);
-        }
+          // Monitoring information requested by an RS
+          MonitorMsg monitorMsg =
+            createLocalTopologyMonitorMsg(msg.getDestination(),
+            msg.getsenderID());
 
-        // Same for the connected RS
-        for (ReplicationServerHandler rsh : this.replicationServers.values())
-        {
-          monitorMsg.setServerState(
-            rsh.getServerId(),
-            rsh.getServerState(),
-            rsh.getApproxFirstMissingDate(),
-            false);
-        }
-
-        // Populate the RS state in the msg from the DbState
-        monitorMsg.setReplServerDbState(this.getDbServerState());
-
-
-        try
-        {
-          senderHandler.send(monitorMsg);
-        } catch (Exception e)
-        {
-          // We log the error. The requestor will detect a timeout or
-          // any other failure on the connection.
-          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
-              Integer.toString((msg.getDestination()))));
+          if (monitorMsg != null)
+          {
+            try
+            {
+              senderHandler.send(monitorMsg);
+            } catch (Exception e)
+            {
+              // We log the error. The requestor will detect a timeout or
+              // any other failure on the connection.
+              logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
+                  Integer.toString((msg.getDestination()))));
+            }
+          }
         }
       } else if (msg instanceof MonitorMsg)
       {
         MonitorMsg monitorMsg =
           (MonitorMsg) msg;
 
-        receivesMonitorDataResponse(monitorMsg);
+        GlobalServerId globalServerId =
+          new GlobalServerId(baseDn, senderHandler.getServerId());
+        receivesMonitorDataResponse(monitorMsg, globalServerId);
       } else
       {
         logError(NOTE_ERR_ROUTING_TO_SERVER.get(
@@ -1775,6 +1739,116 @@
   }
 
   /**
+   * Creates a new monitor message including monitoring information for the
+   * whole topology.
+   * @param sender The sender of this message.
+   * @param destination The destination of this message.
+   * @return The newly created and filled MonitorMsg. If the monitor data
+   * could not be computed, the returned message may be incomplete.
+   */
+  public MonitorMsg createGlobalTopologyMonitorMsg(int sender, int destination)
+  {
+    MonitorMsg returnMsg =
+      new MonitorMsg(sender, destination);
+
+    try
+    {
+      returnMsg.setReplServerDbState(getDbServerState());
+      // Update the information we have about all servers
+      // in the topology.
+      MonitorData md = computeMonitorData();
+
+      // Add the information about the Replicas currently in
+      // the topology.
+      Iterator<Integer> it = md.ldapIterator();
+      while (it.hasNext())
+      {
+        int replicaId = it.next();
+        returnMsg.setServerState(
+            replicaId, md.getLDAPServerState(replicaId),
+            md.getApproxFirstMissingDate(replicaId), true);
+      }
+
+      // Add the information about the Replication Servers
+      // currently in the topology.
+      it = md.rsIterator();
+      while (it.hasNext())
+      {
+        int replicaId = it.next();
+        returnMsg.setServerState(
+            replicaId, md.getRSStates(replicaId),
+            md.getRSApproxFirstMissingDate(replicaId), false);
+      }
+    }
+    catch (DirectoryException e)
+    {
+      // If we can't compute the Monitor Information, send
+      // back an empty message.
+    }
+    return returnMsg;
+  }
+
+  /**
+   * Creates a new monitor message including monitoring information for the
+   * topology directly connected to this RS. This includes information for:
+   * - local RS
+   * - all direct DSs
+   * - all direct RSs
+   * @param sender The sender of this message.
+   * @param destination The destination of this message.
+   * @return The newly created and filled MonitorMsg. Null if a problem occurred
+   * during message creation.
+   */
+  public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
+  {
+    MonitorMsg monitorMsg = null;
+
+    try {
+
+      // Lock domain as we need to go through connected servers list
+      lock();
+
+      monitorMsg = new MonitorMsg(sender, destination);
+
+
+      // Populate for each connected LDAP Server
+      // from the states stored in the serverHandler.
+      // - the server state
+      // - the older missing change
+      for (DataServerHandler lsh : this.directoryServers.values())
+      {
+        monitorMsg.setServerState(
+          lsh.getServerId(),
+          lsh.getServerState(),
+          lsh.getApproxFirstMissingDate(),
+          true);
+      }
+
+      // Same for the connected RS
+      for (ReplicationServerHandler rsh : this.replicationServers.values())
+      {
+        monitorMsg.setServerState(
+          rsh.getServerId(),
+          rsh.getServerState(),
+          rsh.getApproxFirstMissingDate(),
+          false);
+      }
+
+      // Populate the RS state in the msg from the DbState
+      monitorMsg.setReplServerDbState(this.getDbServerState());
+    } catch(InterruptedException e)
+    {
+      // Interrupted while acquiring the lock: null will be returned.
+    } finally
+    {
+      if (hasLock())
+        release();
+    }
+
+    return monitorMsg;
+  }
+
+  /**
    * Shutdown this ReplicationServerDomain.
    */
   public void shutdown()
@@ -1831,8 +1905,7 @@
 
   /**
    * Send a TopologyMsg to all the connected directory servers in order to
-   * let.
-   * them know the topology (every known DSs and RSs)
+   * let them know the topology (every known DSs and RSs).
    * @param notThisOne If not null, the topology message will not be sent to
    * this passed server.
    */
@@ -1931,10 +2004,11 @@
       dsInfos.add(serverHandler.toDSInfo());
     }
 
-    // Create info for us (local RS)
+    // Create info for the local RS
     List<RSInfo> rsInfos = new ArrayList<RSInfo>();
     RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
-      generationId, replicationServer.getGroupId());
+      generationId, replicationServer.getGroupId(),
+      replicationServer.getWeight());
     rsInfos.add(localRSInfo);
 
     return new TopologyMsg(dsInfos, rsInfos);
@@ -1965,7 +2039,8 @@
 
     // Add our own info (local RS)
     RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
-      generationId, replicationServer.getGroupId());
+      generationId, replicationServer.getGroupId(),
+      replicationServer.getWeight());
     rsInfos.add(localRSInfo);
 
     // Go through every peer RSs (and get their connected DSs), also add info
@@ -2471,13 +2546,15 @@
    * Start collecting global monitoring information for this
    * ReplicationServerDomain.
    *
-   * @return The number of response that should come back.
+   * @param expectedMonitoringMsg The list of global server ids we expect a
+   * monitoring message from. Will be filled as necessary by this method.
    *
    * @throws DirectoryException In case the monitoring information could
    *                            not be collected.
    */
 
-  int initializeMonitorData() throws DirectoryException
+  void initializeMonitorData(List<GlobalServerId> expectedMonitoringMsg)
+    throws DirectoryException
   {
     synchronized (monitorDataLock)
     {
@@ -2539,7 +2616,7 @@
     }
 
     // Send the request for remote monitor data to the
-    return sendMonitorDataRequest();
+    sendMonitorDataRequest(expectedMonitoringMsg);
   }
 
   /**
@@ -2566,22 +2643,25 @@
 
   /**
    * Sends a MonitorRequest message to all connected RS.
-   * @return the number of requests sent.
+   * @param expectedMonitoringMsg The list of global server ids we expect a
+   * monitoring message from. Will be filled as necessary by this method.
    * @throws DirectoryException when a problem occurs.
    */
-  protected int sendMonitorDataRequest()
+  protected void sendMonitorDataRequest(
+    List<GlobalServerId> expectedMonitoringMsg)
     throws DirectoryException
   {
-    int sent = 0;
     try
     {
       for (ServerHandler rs : replicationServers.values())
       {
+        int serverId = rs.getServerId();
         MonitorRequestMsg msg =
           new MonitorRequestMsg(this.replicationServer.getServerId(),
-          rs.getServerId());
+          serverId);
         rs.send(msg);
-        sent++;
+        // Store the fact that we expect a MonitoringMsg back from this server
+        expectedMonitoringMsg.add(new GlobalServerId(baseDn, serverId));
       }
     } catch (Exception e)
     {
@@ -2590,7 +2670,6 @@
       throw new DirectoryException(ResultCode.OTHER,
         message, e);
     }
-    return sent;
   }
 
   /**
@@ -2598,8 +2677,10 @@
    * and stores the data received.
    *
    * @param msg The message to be processed.
+   * @param globalServerId The global id of the server that sent the message.
    */
-  public void receivesMonitorDataResponse(MonitorMsg msg)
+  private void receivesMonitorDataResponse(MonitorMsg msg,
+    GlobalServerId globalServerId)
   {
     try
     {
@@ -2677,7 +2758,7 @@
 
       // Decreases the number of expected responses and potentially
       // wakes up the waiting requestor thread.
-      replicationServer.responseReceived();
+      replicationServer.responseReceived(globalServerId);
 
     } catch (Exception e)
     {
@@ -2832,6 +2913,57 @@
   }
 
   /**
+   * Starts the monitoring publisher for the domain.
+   */
+  public void startMonitoringPublisher()
+  {
+    if (monitoringPublisher == null)
+    {
+      long period =
+        replicationServer.getMonitoringPublisherPeriod();
+      if (period > 0) // 0 means no monitoring publisher
+      {
+        monitoringPublisher = new MonitoringPublisher(this, period);
+        monitoringPublisher.start();
+      }
+    }
+  }
+
+  /**
+   * Stops the monitoring publisher for the domain.
+   */
+  public void stopMonitoringPublisher()
+  {
+    if (monitoringPublisher != null)
+    {
+      monitoringPublisher.shutdown();
+      monitoringPublisher.waitForShutdown();
+      monitoringPublisher = null;
+    }
+  }
+
+  /**
+   * Tests if the monitoring publisher for this domain is running.
+   * @return True if the monitoring publisher is running, false otherwise.
+   */
+  public boolean isRunningMonitoringPublisher()
+  {
+    return (monitoringPublisher != null);
+  }
+
+  /**
+   * Updates the monitoring publisher with the new period value.
+   * @param period The new period value.
+   */
+  public void updateMonitoringPublisher(long period)
+  {
+    if (monitoringPublisher != null)
+    {
+      monitoringPublisher.setPeriod(period);
+    }
+  }
+
+  /**
    * {@inheritDoc}
    */
   @Override

--
Gitblit v1.10.0