From 3a9e211d36ee94ff99941943b3b51e0f768624f5 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Fri, 06 Nov 2009 09:11:40 +0000
Subject: [PATCH] In order to support a more clever algorithm for the DS to choose his RS,  we introduce:

---
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java |   92 +++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 91 insertions(+), 1 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 983cffa..86e858b 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -55,11 +55,14 @@
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.MutableBoolean;
 import org.opends.server.replication.common.RSInfo;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.protocol.ChangeStatusMsg;
 import org.opends.server.replication.protocol.HeartbeatMonitor;
+import org.opends.server.replication.protocol.MonitorMsg;
+import org.opends.server.replication.protocol.MonitorRequestMsg;
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ProtocolVersion;
 import org.opends.server.replication.protocol.ReplServerStartDSMsg;
@@ -116,6 +119,28 @@
   private ReplicationDomain domain = null;
 
   /**
+   * This object is used as a conditional event to be notified about
+   * the reception of monitor information from the Replication Server.
+   */
+  private final MutableBoolean monitorResponse = new MutableBoolean(false);
+
+  /**
+   * A Map containing the ServerStates of all the replicas in the topology
+   * as seen by the ReplicationServer the last time it was polled or the last
+   * time it published monitoring information.
+   */
+  private HashMap<Integer, ServerState> replicaStates =
+    new HashMap<Integer, ServerState>();
+
+  /**
+   * A Map containing the ServerStates of all the replication servers in the
+   * topology as seen by the ReplicationServer the last time it was polled or
+   * the last time it published monitoring information.
+   */
+  private HashMap<Integer, ServerState> rsStates =
+    new HashMap<Integer, ServerState>();
+
+  /**
    * The expected duration in milliseconds between heartbeats received
    * from the replication server.  Zero means heartbeats are off.
    */
@@ -1918,6 +1943,37 @@
           // Try to find a suitable RS
           this.reStart(failingSession);
         }
+        else if (msg instanceof MonitorMsg)
+        {
+          // This is the response to a MonitorRequest that was sent earlier or
+          // the regular message of the monitoring publisher of the RS.
+
+          // Extract and store replicas ServerStates
+          replicaStates = new HashMap<Integer, ServerState>();
+          MonitorMsg monitorMsg = (MonitorMsg) msg;
+          Iterator<Integer> it = monitorMsg.ldapIterator();
+          while (it.hasNext())
+          {
+            int srvId = it.next();
+            replicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId));
+          }
+
+          // Notify the sender that the response was received.
+          synchronized (monitorResponse)
+          {
+            monitorResponse.set(true);
+            monitorResponse.notify();
+          }
+
+          // Extract and store replication servers ServerStates
+          rsStates = new HashMap<Integer, ServerState>();
+          it = monitorMsg.rsIterator();
+          while (it.hasNext())
+          {
+            int srvId = it.next();
+            rsStates.put(srvId, monitorMsg.getRSServerState(srvId));
+          }
+        }
         else
         {
           return msg;
@@ -1949,6 +2005,40 @@
   }
 
   /**
+   * Gets the States of all the Replicas currently in the
+   * Topology.
+   * When this method is called, a Monitoring message will be sent
+   * to the Replication Server to which this domain is currently connected
+   * so that it computes a table containing information about
+   * all Directory Servers in the topology.
+   * This Computation involves communications will all the servers
+   * currently connected and
+   *
+   * @return The States of all Replicas in the topology (except us)
+   */
+  public Map<Integer, ServerState> getReplicaStates()
+  {
+    monitorResponse.set(false);
+
+    // publish Monitor Request Message to the Replication Server
+    publish(new MonitorRequestMsg(serverId, getRsServerId()));
+
+    // wait for Response up to 10 seconds.
+    try
+    {
+      synchronized (monitorResponse)
+      {
+        if (monitorResponse.get() == false)
+        {
+          monitorResponse.wait(10000);
+        }
+      }
+    } catch (InterruptedException e)
+    {}
+    return replicaStates;
+  }
+
+  /**
    * This method allows to do the necessary computing for the window
    * management after treatment by the worker threads.
    *
@@ -2440,7 +2530,7 @@
     {
       ctHeartbeatPublisherThread =
         new CTHeartbeatPublisherThread(
-            "Replication CN Heartbeat Thread started for " +
+            "Replication CN Heartbeat sender for " +
             baseDn + " with " + getReplicationServer(),
             session, changeTimeHeartbeatSendInterval, serverId);
       ctHeartbeatPublisherThread.start();

--
Gitblit v1.10.0