From 3a9e211d36ee94ff99941943b3b51e0f768624f5 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Fri, 06 Nov 2009 09:11:40 +0000
Subject: [PATCH] In order to support a more clever algorithm for the DS to choose his RS,  we introduce:

---
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java |  236 ++++++++++++++++++++++++++++++++++++++++++++++++++--------
 1 files changed, 201 insertions(+), 35 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index b4403da..71f4386 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -49,8 +49,6 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 
 import org.opends.messages.Category;
 import org.opends.messages.Message;
@@ -101,6 +99,7 @@
 import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
 
 import com.sleepycat.je.DatabaseException;
+import java.util.Collections;
 
 /**
  * ReplicationServer Listener.
@@ -173,6 +172,10 @@
   // the DS in DEGRADED_STATUS. If value is 0, status analyzer is disabled
   private int degradedStatusThreshold = 5000;
 
+  // Number of milliseconds to wait before sending new monitoring messages.
+  // If value is 0, monitoring publisher is disabled
+  private long monitoringPublisherPeriod = 3000;
+
   // The handler of the draft change numbers database, the database used to
   // store the relation between a draft change number ('seqnum') and the
   // associated cookie.
@@ -211,6 +214,13 @@
   private int weight = 1;
 
   /**
+   * Holds the list of all replication servers instantiated in this VM.
+   * This allows unit tests to clean up the RS databases.
+   */
+  private static List<ReplicationServer> allInstances =
+    new ArrayList<ReplicationServer>();
+
+  /**
    * Creates a new Replication server using the provided configuration entry.
    *
    * @param configuration The configuration of this replication server.
@@ -254,6 +264,7 @@
     groupId = (byte)configuration.getGroupId();
     assuredTimeout = configuration.getAssuredTimeout();
     degradedStatusThreshold = configuration.getDegradedStatusThreshold();
+    monitoringPublisherPeriod = configuration.getMonitoringPeriod();
 
     replSessionSecurity = new ReplSessionSecurity();
     initialize(replicationPort);
@@ -274,8 +285,20 @@
     DirectoryServer.registerImportTaskListener(this);
 
     localPorts.add(replicationPort);
+
+    // Keep track of this new instance
+    allInstances.add(this);
   }
 
+  /**
+   * Get the list of every replication server instantiated in the current VM.
+   * The returned list is the live internal list, not a copy.
+   * @return The list of every replication server instantiated in this VM.
+   */
+  public static List<ReplicationServer> getAllInstances()
+  {
+    return allInstances;
+  }
 
   /**
    * The run method for the Listen thread.
@@ -850,7 +873,9 @@
       dbEnv.shutdown();
     }
 
-}
+    // Remove this instance from the global instance list
+    allInstances.remove(this);
+  }
 
 
   /**
@@ -1028,6 +1053,32 @@
       }
     }
 
+    // Update period value for monitoring publishers (stop them if requested
+    // value is 0)
+    if (monitoringPublisherPeriod != configuration.getMonitoringPeriod())
+    {
+      long oldMonitoringPeriod = monitoringPublisherPeriod;
+      monitoringPublisherPeriod = configuration.getMonitoringPeriod();
+      for(ReplicationServerDomain rsd : baseDNs.values())
+      {
+        if (monitoringPublisherPeriod == 0L)
+        {
+          // Requested to stop monitoring publishers
+          rsd.stopMonitoringPublisher();
+        } else if (rsd.isRunningMonitoringPublisher())
+        {
+          // Update the threshold value for this running monitoring publisher
+          rsd.updateMonitoringPublisher(monitoringPublisherPeriod);
+        } else if (oldMonitoringPeriod == 0L)
+        {
+          // Requested to start monitoring publishers with provided period value
+          if ( (rsd.getConnectedDSs().size() > 0) ||
+            (rsd.getConnectedRSs().size() > 0) )
+            rsd.startMonitoringPublisher();
+        }
+      }
+    }
+
     // Changed the group id ?
     byte newGroupId = (byte)configuration.getGroupId();
     if (newGroupId != groupId)
@@ -1044,7 +1095,10 @@
     if (weight != configuration.getWeight())
     {
       weight = configuration.getWeight();
-      // TODO: send new TopologyMsg
+      // Broadcast the new weight to the whole topology. This will make some
+      // DSs reconnect (if needed) to other RSs according to the new weight of
+      // this RS.
+      broadcastConfigChange();
     }
 
     if ((configuration.getReplicationDBDirectory() != null) &&
@@ -1057,6 +1111,19 @@
   }
 
   /**
+   * Broadcast a configuration change that just happened to the whole topology
+   * by having every domain in baseDNs send a TopologyMsg to its DSs and RSs.
+   */
+  private void broadcastConfigChange()
+  {
+    for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
+    {
+      replicationServerDomain.buildAndSendTopoInfoToDSs(null);
+      replicationServerDomain.buildAndSendTopoInfoToRSs();
+    }
+  }
+
+  /**
    * {@inheritDoc}
    */
   public boolean isConfigurationChangeAcceptable(
@@ -1345,6 +1412,15 @@
   }
 
   /**
+   * Get the monitoring publisher period, in milliseconds between messages.
+   * @return the monitoring publisher period; 0 means publisher disabled.
+   */
+  public long getMonitoringPublisherPeriod()
+  {
+    return monitoringPublisherPeriod;
+  }
+
+  /**
    * Compute the list of replication servers that are not any
    * more connected to this Replication Server and stop the
    * corresponding handlers.
@@ -1411,12 +1487,80 @@
   /* The date of the last time they have been elaborated */
   private long monitorDataLastBuildDate = 0;
 
-  /* Search op on monitor data is processed by a worker thread.
-   * Requests are sent to the other RS,and responses are received by the
-   * listener threads.
-   * The worker thread is awoke on this semaphore, or on timeout.
+  /**
+   * This uniquely identifies a server (handler) in the cross-domain topology.
+   * Represents an identifier of a handler (in the whole RS) we have to wait a
+   * monitoring message from before answering to a monitor request.
    */
-  Semaphore remoteMonitorResponsesSemaphore = new Semaphore(0);
+  public static class GlobalServerId {
+
+    private final int serverId;
+    private final String baseDn;
+
+    /**
+     * Constructor for a global server id.
+     * @param baseDn The dn of the RSD owning the handler.
+     * @param serverId The handler id in the matching RSD.
+     */
+    public GlobalServerId(String baseDn, int serverId) {
+      this.baseDn = baseDn;
+      this.serverId = serverId;
+    }
+
+    /**
+     * Get the server handler id.
+     * @return the serverId
+     */
+    public int getServerId()
+    {
+      return serverId;
+    }
+
+    /**
+     * Get the base dn.
+     * @return the baseDn
+     */
+    public String getBaseDn()
+    {
+      return baseDn;
+    }
+
+    /**
+     * Get the hash code, computed from the same fields equals() compares.
+     * @return The hashcode.
+     */
+    @Override
+    public int hashCode()
+    {
+      int hash = 7;
+      hash = 43 * hash + this.serverId;
+      hash = 43 * hash + (this.baseDn != null ? this.baseDn.hashCode() : 0);
+      return hash;
+    }
+
+    /**
+     * Tests if the passed global server handler id represents the same server
+     * handler as this one.
+     * @param obj The object to test.
+     * @return True if obj is a GlobalServerId with the same baseDn and id.
+     */
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof GlobalServerId)) // also rejects null
+        return false;
+      GlobalServerId other = (GlobalServerId)obj;
+      return (other.serverId == serverId) &&
+        (baseDn == null ? other.baseDn == null : baseDn.equals(other.baseDn));
+    }
+  }
+
+  /**
+   * This gives the list of server handlers we are waiting for a monitoring
+   * message from. Each time a monitoring message is received by a server
+   * handler, the matching server handler id is removed from the list. When
+   * the list is empty, all expected monitoring messages have been received.
+   */
+  private List<GlobalServerId> expectedMonitoringMsg = null;
 
   /**
    * Trigger the computation of the Global Monitoring Data.
@@ -1429,7 +1573,7 @@
    *
    * @throws DirectoryException If the computation cannot be achieved.
    */
-  public void computeMonitorData() throws DirectoryException
+  public synchronized void computeMonitorData() throws DirectoryException
   {
     if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime())
     {
@@ -1440,15 +1584,17 @@
       return;
     }
 
-    remoteMonitorResponsesSemaphore.drainPermits();
-    int count = 0;
+    // Initialize the list of server handlers we expect monitoring messages from
+    expectedMonitoringMsg =
+      Collections.synchronizedList(new ArrayList<GlobalServerId>());
+
     for (ReplicationServerDomain domain : baseDNs.values())
     {
-      count += domain.initializeMonitorData();
+      domain.initializeMonitorData(expectedMonitoringMsg);
     }
 
     // Wait for responses
-    waitMonitorDataResponses(count);
+    waitMonitorDataResponses();
 
     for (ReplicationServerDomain domain : baseDNs.values())
     {
@@ -1457,38 +1603,51 @@
   }
 
   /**
-   * Wait for the expected count of received MonitorMsg.
-   * @param expectedResponses The number of expected answers.
+   * Wait for the expected received MonitorMsg.
    * @throws DirectoryException When an error occurs.
    */
-  private void waitMonitorDataResponses(int expectedResponses)
+  private void waitMonitorDataResponses()
     throws DirectoryException
   {
     try
     {
       if (debugEnabled())
         TRACER.debugInfo(
-          "In " + getMonitorInstanceName() + " baseDn=" +
-          " waiting for " + expectedResponses + " expected monitor messages");
+          "In " + getMonitorInstanceName() +
+          " waiting for " + expectedMonitoringMsg.size() +
+          " expected monitor messages");
 
-      boolean allPermitsAcquired =
-        remoteMonitorResponsesSemaphore.tryAcquire(
-        expectedResponses,
-        (long) 5000, TimeUnit.MILLISECONDS);
-
-      if (!allPermitsAcquired)
+      // Wait up to 5 seconds for every expected monitoring message to come
+      // back.
+      boolean allReceived = false;
+      long startTime = TimeThread.getTime();
+      long curTime = startTime;
+      int maxTime = 5000;
+      while ( (curTime - startTime) < maxTime )
       {
-        monitorDataLastBuildDate = TimeThread.getTime();
+        // Have every expected monitoring messages arrived ?
+        if (expectedMonitoringMsg.size() == 0)
+        {
+          // Ok break the loop
+          allReceived = true;
+          break;
+        }
+        Thread.sleep(100);
+        curTime = TimeThread.getTime();
+      }
+
+      monitorDataLastBuildDate = TimeThread.getTime();
+
+      if (!allReceived)
+      {
         logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
-      // let's go on in best effort even with limited data received.
+        // let's go on in best effort even with limited data received.
       } else
       {
-        monitorDataLastBuildDate = TimeThread.getTime();
         if (debugEnabled())
           TRACER.debugInfo(
-            "In " + getMonitorInstanceName() + " baseDn=" +
-            " Successfully received all " + expectedResponses +
-            " expected monitor messages");
+            "In " + getMonitorInstanceName() +
+            " Successfully received all expected monitor messages");
       }
     } catch (Exception e)
     {
@@ -1499,11 +1658,18 @@
 
   /**
    * This should be called by each ReplicationServerDomain that receives
-   * a response to a monitor request message.
+   * a response to a monitor request message. This may also be called when a
+   * monitoring message is coming from a RS whose monitoring publisher thread
+   * sent it. As monitoring messages (sent because of monitoring request or
+   * because of monitoring publisher) have the same content, this is also ok
+   * to mark ok the server when the monitoring message coms from a monitoring
+   * publisher thread.
+   * @param globalServerId The server handler that is receiving the
+   * monitoring message.
    */
-  public void responseReceived()
+  public void responseReceived(GlobalServerId globalServerId)
   {
-    remoteMonitorResponsesSemaphore.release();
+    expectedMonitoringMsg.remove(globalServerId);
   }
 
 
@@ -1513,7 +1679,7 @@
    */
   public void responseReceivedAll()
   {
-    remoteMonitorResponsesSemaphore.notifyAll();
+    expectedMonitoringMsg.clear(); // empty list: nothing left to wait for
   }
 
   /**

--
Gitblit v1.10.0