From c3420bec486f1921ea67fab4b1019ef06a0cea16 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 23 Mar 2009 09:04:22 +0000
Subject: [PATCH] Fix for 3889 : Replication domain don't have access to the Replica ServerStates
---
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java | 3
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java | 25 ++++-
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 52 +++++++++++++
opends/src/server/org/opends/server/replication/server/MonitorData.java | 50 ++++++++++++
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 67 ++++++++++++++++
5 files changed, 186 insertions(+), 11 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
index 1b61c9b..a1dd7b0 100644
--- a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2008 Sun Microsystems, Inc.
+ * Copyright 2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -186,7 +186,6 @@
if (reader.get() != MSG_TYPE_REPL_SERVER_MONITOR)
throw new DataFormatException("input is not a valid " +
this.getClass().getCanonicalName());
- int pos = 1;
// sender
this.senderID = reader.getShort();
diff --git a/opends/src/server/org/opends/server/replication/server/MonitorData.java b/opends/src/server/org/opends/server/replication/server/MonitorData.java
index 25b9992..81a1473 100644
--- a/opends/src/server/org/opends/server/replication/server/MonitorData.java
+++ b/opends/src/server/org/opends/server/replication/server/MonitorData.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2008 Sun Microsystems, Inc.
+ * Copyright 2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
@@ -193,7 +193,7 @@
}
// Computes the missing changes counters for RS :
- // Sum the difference of seqnuence numbers for each element in the States.
+ // Sum the difference of sequence numbers for each element in the States.
for (short lsiSid : RSStates.keySet())
{
@@ -390,4 +390,50 @@
}
}
+ /**
+ * Returns an iterator on the serverId of the Replicas for which
+ * we have monitoring data.
+ *
+ * @return The iterator.
+ */
+ public Iterator<Short> ldapIterator()
+ {
+ return LDAPStates.keySet().iterator();
+ }
+
+ /**
+ * Returns an iterator on the serverId of the Replication Servers for which
+ * we have monitoring data.
+ *
+ * @return The iterator.
+ */
+ public Iterator<Short> rsIterator()
+ {
+ return RSStates.keySet().iterator();
+ }
+
+ /**
+ * Get the state of the RS server with the provided serverId.
+ *
+ * @param serverId The server ID.
+ * @return The server state.
+ */
+ public ServerState getRSStates(short serverId)
+ {
+ return RSStates.get(serverId);
+ }
+
+ /**
+ * Get an approximation of the date of the first missing update.
+ *
+ * @param serverId The server ID.
+ * @return The date.
+ */
+ public long getRSApproxFirstMissingDate(short serverId)
+ {
+ Long res;
+ if ((res = fmRSDate.get(serverId)) != null)
+ return res;
+ return 0;
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index d7a71f5..a9ade5f 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1427,6 +1427,58 @@
errorMsg.getDetails()));
} else if (msg instanceof MonitorRequestMsg)
{
+ // If the request comes from a Directory Server we need to
+ // build the full list of all servers in the topology
+ // and send back a MonitorMsg with the full list of all the servers
+ // in the topology.
+ if (senderHandler.isLDAPserver())
+ {
+ MonitorMsg returnMsg =
+ new MonitorMsg(msg.getDestination(), msg.getsenderID());
+ try
+ {
+ returnMsg.setReplServerDbState(getDbServerState());
+ // Update the information we have about all servers
+ // in the topology.
+ MonitorData md = computeMonitorData();
+
+ // Add the informations about the Replicas currently in
+ // the topology.
+ Iterator<Short> it = md.ldapIterator();
+ while (it.hasNext())
+ {
+ short replicaId = it.next();
+ returnMsg.setServerState(
+ replicaId, md.getLDAPServerState(replicaId),
+ md.getApproxFirstMissingDate(replicaId), true);
+ }
+
+ // Add the informations about the Replication Servers
+ // currently in the topology.
+ it = md.rsIterator();
+ while (it.hasNext())
+ {
+ short replicaId = it.next();
+ returnMsg.setServerState(
+ replicaId, md.getRSStates(replicaId),
+ md.getRSApproxFirstMissingDate(replicaId), false);
+ }
+ }
+ catch (DirectoryException e)
+ {
+ // If we can't compute the Monitor Information, send
+ // back an empty message.
+ }
+ try
+ {
+ senderHandler.send(returnMsg);
+ } catch (IOException e)
+ {
+ // the connection was closed.
+ }
+ return;
+ }
+
MonitorRequestMsg replServerMonitorRequestMsg =
(MonitorRequestMsg) msg;
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 4cb0350..4d0a28e 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -64,6 +64,7 @@
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Date;
+import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -89,6 +90,8 @@
import org.opends.server.replication.protocol.HeartbeatMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
+import org.opends.server.replication.protocol.MonitorMsg;
+import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -287,7 +290,7 @@
// Indicates the date when the status changed. This may be used to indicate
// the date the session with the current replication server started (when
// status is NORMAL for instance). All the above assured monitoring fields
- // are also resetted each time the status is changed
+ // are also reset each time the status is changed
private Date lastStatusChangeDate = new Date();
/**
@@ -302,6 +305,20 @@
private final ChangeNumberGenerator generator;
/**
+ * This object is used as a conditional event to be notified about
+ * the reception of monitor information from the Replication Server.
+ */
+ private Object monitorResponse = new Object();
+
+
+ /**
+ * A Map containing of the ServerStates of all the replicas in the topology
+ * as seen by the ReplicationServer the last time it was polled.
+ */
+ private Map<Short, ServerState> replicaStates =
+ new HashMap<Short, ServerState>();
+
+ /**
* Returns the {@link ChangeNumberGenerator} that will be used to
* generate {@link ChangeNumber} for this domain.
*
@@ -549,6 +566,35 @@
}
/**
+ * Gets the States of all the Replicas currently in the
+ * Topology.
+ * When this method is called, a Monitoring message will be sent
+ * to the Replication to which this domain is currently connected
+ * so that it computes a table containing information about
+ * all Directory Servers in the topology.
+ * This Computation involves communications will all the servers
+ * currently connected and
+ *
+ * @return The States of all Replicas in the topology (except us)
+ */
+ public Map<Short, ServerState> getReplicaStates()
+ {
+ // publish Monitor Request Message to the Replication Server
+ broker.publish(new MonitorRequestMsg(serverID, broker.getRsServerId()));
+
+ // wait for Response up to 10 seconds.
+ try
+ {
+ synchronized (monitorResponse)
+ {
+ monitorResponse.wait(10000);
+ }
+ } catch (InterruptedException e)
+ {}
+ return replicaStates;
+ }
+
+ /**
* Gets the info for RSs in the topology (except the one we are connected
* to).
* @return The info for RSs in the topology (except the one we are connected
@@ -776,6 +822,25 @@
update = (UpdateMsg) msg;
generator.adjust(update.getChangeNumber());
}
+ else if (msg instanceof MonitorMsg)
+ {
+ // This is the response to a MonitorRequest that was sent earlier
+ // build the replicaStates Map.
+ replicaStates = new HashMap<Short, ServerState>();
+ MonitorMsg monitorMsg = (MonitorMsg) msg;
+ Iterator<Short> it = monitorMsg.ldapIterator();
+ while (it.hasNext())
+ {
+ short serverId = it.next();
+ replicaStates.put(
+ serverId, monitorMsg.getLDAPServerState(serverId));
+ }
+ // Notify the sender that the response was received.
+ synchronized (monitorResponse)
+ {
+ monitorResponse.notify();
+ }
+ }
}
catch (SocketTimeoutException e)
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
index 65d6784..c2859b4 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -29,6 +29,7 @@
import static org.testng.Assert.*;
import java.util.List;
+import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
@@ -41,6 +42,7 @@
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
+import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
@@ -66,6 +68,8 @@
int replServerID2 = 20;
FakeReplicationDomain domain1 = null;
FakeReplicationDomain domain2 = null;
+ short domain1ServerId = 1;
+ short domain2ServerId = 2;
try
{
@@ -101,11 +105,11 @@
BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>();
domain1 = new FakeReplicationDomain(
- testService, (short) 1, servers, 100, 1000, rcvQueue1);
+ testService, (short) domain1ServerId, servers, 100, 1000, rcvQueue1);
BlockingQueue<UpdateMsg> rcvQueue2 = new LinkedBlockingQueue<UpdateMsg>();
domain2 = new FakeReplicationDomain(
- testService, (short) 2, servers, 100, 1000, rcvQueue2);
+ testService, (short) domain2ServerId, servers, 100, 1000, rcvQueue2);
/*
* Publish a message from domain1,
@@ -147,25 +151,34 @@
for (DSInfo serverInfo : domain1.getReplicasList())
{
- if (serverInfo.getDsId() == 2)
+ if (serverInfo.getDsId() == domain2ServerId)
assertTrue(serverInfo.getStatus() == ServerStatus.BAD_GEN_ID_STATUS);
else
{
- assertTrue(serverInfo.getDsId() == 1);
+ assertTrue(serverInfo.getDsId() == domain1ServerId);
assertTrue(serverInfo.getStatus() == ServerStatus.NORMAL_STATUS);
}
}
for (DSInfo serverInfo : domain2.getReplicasList())
{
- if (serverInfo.getDsId() == 2)
+ if (serverInfo.getDsId() == domain2ServerId)
assertTrue(serverInfo.getStatus() == ServerStatus.BAD_GEN_ID_STATUS);
else
{
- assertTrue(serverInfo.getDsId() == 1);
+ assertTrue(serverInfo.getDsId() == domain1ServerId);
assertTrue(serverInfo.getStatus() == ServerStatus.NORMAL_STATUS);
}
}
+
+ Map<Short, ServerState> states1 = domain1.getReplicaStates();
+ ServerState state2 = states1.get(domain2ServerId);
+ assertNotNull(state2, "getReplicaStates is not showing DS2");
+
+ Map<Short, ServerState> states2 = domain1.getReplicaStates();
+ ServerState state1 = states2.get(domain1ServerId);
+ assertNotNull(state1, "getReplicaStates is not showing DS1");
+
}
finally
{
--
Gitblit v1.10.0