From 3a9e211d36ee94ff99941943b3b51e0f768624f5 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Fri, 06 Nov 2009 09:11:40 +0000
Subject: [PATCH] In order to support a more clever algorithm for the DS to choose his RS, we introduce:
---
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 92 +++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 91 insertions(+), 1 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 983cffa..86e858b 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -55,11 +55,14 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.MutableBoolean;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.HeartbeatMonitor;
+import org.opends.server.replication.protocol.MonitorMsg;
+import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartDSMsg;
@@ -116,6 +119,28 @@
private ReplicationDomain domain = null;
/**
+ * This object is used as a conditional event to be notified about
+ * the reception of monitor information from the Replication Server.
+ */
+ private final MutableBoolean monitorResponse = new MutableBoolean(false);
+
+ /**
+ * A Map containing the ServerStates of all the replicas in the topology
+ * as seen by the ReplicationServer the last time it was polled or the last
+ * time it published monitoring information.
+ */
+ private HashMap<Integer, ServerState> replicaStates =
+ new HashMap<Integer, ServerState>();
+
+ /**
+ * A Map containing the ServerStates of all the replication servers in the
+ * topology as seen by the ReplicationServer the last time it was polled or
+ * the last time it published monitoring information.
+ */
+ private HashMap<Integer, ServerState> rsStates =
+ new HashMap<Integer, ServerState>();
+
+ /**
* The expected duration in milliseconds between heartbeats received
* from the replication server. Zero means heartbeats are off.
*/
@@ -1918,6 +1943,37 @@
// Try to find a suitable RS
this.reStart(failingSession);
}
+ else if (msg instanceof MonitorMsg)
+ {
+ // This is the response to a MonitorRequest that was sent earlier or
+ // the regular message of the monitoring publisher of the RS.
+
+ // Extract and store replicas ServerStates
+ replicaStates = new HashMap<Integer, ServerState>();
+ MonitorMsg monitorMsg = (MonitorMsg) msg;
+ Iterator<Integer> it = monitorMsg.ldapIterator();
+ while (it.hasNext())
+ {
+ int srvId = it.next();
+ replicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId));
+ }
+
+ // Notify the sender that the response was received.
+ synchronized (monitorResponse)
+ {
+ monitorResponse.set(true);
+ monitorResponse.notify();
+ }
+
+ // Extract and store replication servers ServerStates
+ rsStates = new HashMap<Integer, ServerState>();
+ it = monitorMsg.rsIterator();
+ while (it.hasNext())
+ {
+ int srvId = it.next();
+ rsStates.put(srvId, monitorMsg.getRSServerState(srvId));
+ }
+ }
else
{
return msg;
@@ -1949,6 +2005,40 @@
}
/**
+ * Gets the States of all the Replicas currently in the
+ * Topology.
+ * When this method is called, a Monitoring message will be sent
+ * to the Replication Server to which this domain is currently connected
+ * so that it computes a table containing information about
+ * all Directory Servers in the topology.
+ * This Computation involves communications will all the servers
+ * currently connected and
+ *
+ * @return The States of all Replicas in the topology (except us)
+ */
+ public Map<Integer, ServerState> getReplicaStates()
+ {
+ monitorResponse.set(false);
+
+ // publish Monitor Request Message to the Replication Server
+ publish(new MonitorRequestMsg(serverId, getRsServerId()));
+
+ // wait for Response up to 10 seconds.
+ try
+ {
+ synchronized (monitorResponse)
+ {
+ if (monitorResponse.get() == false)
+ {
+ monitorResponse.wait(10000);
+ }
+ }
+ } catch (InterruptedException e)
+ {}
+ return replicaStates;
+ }
+
+ /**
* This method allows to do the necessary computing for the window
* management after treatment by the worker threads.
*
@@ -2440,7 +2530,7 @@
{
ctHeartbeatPublisherThread =
new CTHeartbeatPublisherThread(
- "Replication CN Heartbeat Thread started for " +
+ "Replication CN Heartbeat sender for " +
baseDn + " with " + getReplicationServer(),
session, changeTimeHeartbeatSendInterval, serverId);
ctHeartbeatPublisherThread.start();
--
Gitblit v1.10.0