opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2008 Sun Microsystems, Inc. * Copyright 2009 Sun Microsystems, Inc. */ package org.opends.server.replication.protocol; @@ -186,7 +186,6 @@ if (reader.get() != MSG_TYPE_REPL_SERVER_MONITOR) throw new DataFormatException("input is not a valid " + this.getClass().getCanonicalName()); int pos = 1; // sender this.senderID = reader.getShort(); opends/src/server/org/opends/server/replication/server/MonitorData.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2008 Sun Microsystems, Inc. * Copyright 2009 Sun Microsystems, Inc. */ package org.opends.server.replication.server; @@ -193,7 +193,7 @@ } // Computes the missing changes counters for RS : // Sum the difference of seqnuence numbers for each element in the States. // Sum the difference of sequence numbers for each element in the States. for (short lsiSid : RSStates.keySet()) { @@ -390,4 +390,50 @@ } } /** * Returns an iterator on the serverId of the Replicas for which * we have monitoring data. * * @return The iterator. */ public Iterator<Short> ldapIterator() { return LDAPStates.keySet().iterator(); } /** * Returns an iterator on the serverId of the Replication Servers for which * we have monitoring data. * * @return The iterator. */ public Iterator<Short> rsIterator() { return RSStates.keySet().iterator(); } /** * Get the state of the RS server with the provided serverId. * * @param serverId The server ID. * @return The server state. */ public ServerState getRSStates(short serverId) { return RSStates.get(serverId); } /** * Get an approximation of the date of the first missing update. * * @param serverId The server ID. * @return The date. */ public long getRSApproxFirstMissingDate(short serverId) { Long res; if ((res = fmRSDate.get(serverId)) != null) return res; return 0; } } opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1427,6 +1427,58 @@ errorMsg.getDetails())); } else if (msg instanceof MonitorRequestMsg) { // If the request comes from a Directory Server we need to // build the full list of all servers in the topology // and send back a MonitorMsg with the full list of all the servers // in the topology. if (senderHandler.isLDAPserver()) { MonitorMsg returnMsg = new MonitorMsg(msg.getDestination(), msg.getsenderID()); try { returnMsg.setReplServerDbState(getDbServerState()); // Update the information we have about all servers // in the topology. MonitorData md = computeMonitorData(); // Add the informations about the Replicas currently in // the topology. Iterator<Short> it = md.ldapIterator(); while (it.hasNext()) { short replicaId = it.next(); returnMsg.setServerState( replicaId, md.getLDAPServerState(replicaId), md.getApproxFirstMissingDate(replicaId), true); } // Add the informations about the Replication Servers // currently in the topology. it = md.rsIterator(); while (it.hasNext()) { short replicaId = it.next(); returnMsg.setServerState( replicaId, md.getRSStates(replicaId), md.getRSApproxFirstMissingDate(replicaId), false); } } catch (DirectoryException e) { // If we can't compute the Monitor Information, send // back an empty message. } try { senderHandler.send(returnMsg); } catch (IOException e) { // the connection was closed. } return; } MonitorRequestMsg replServerMonitorRequestMsg = (MonitorRequestMsg) msg; opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -64,6 +64,7 @@ import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.SortedMap; import java.util.TreeMap; @@ -89,6 +90,8 @@ import org.opends.server.replication.protocol.HeartbeatMsg; import org.opends.server.replication.protocol.InitializeRequestMsg; import org.opends.server.replication.protocol.InitializeTargetMsg; import org.opends.server.replication.protocol.MonitorMsg; import org.opends.server.replication.protocol.MonitorRequestMsg; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.ProtocolVersion; import org.opends.server.replication.protocol.ReplicationMsg; @@ -287,7 +290,7 @@ // Indicates the date when the status changed. This may be used to indicate // the date the session with the current replication server started (when // status is NORMAL for instance). All the above assured monitoring fields // are also resetted each time the status is changed // are also reset each time the status is changed private Date lastStatusChangeDate = new Date(); /** @@ -302,6 +305,20 @@ private final ChangeNumberGenerator generator; /** * This object is used as a conditional event to be notified about * the reception of monitor information from the Replication Server. */ private Object monitorResponse = new Object(); /** * A Map containing of the ServerStates of all the replicas in the topology * as seen by the ReplicationServer the last time it was polled. */ private Map<Short, ServerState> replicaStates = new HashMap<Short, ServerState>(); /** * Returns the {@link ChangeNumberGenerator} that will be used to * generate {@link ChangeNumber} for this domain. * @@ -549,6 +566,35 @@ } /** * Gets the States of all the Replicas currently in the * Topology. * When this method is called, a Monitoring message will be sent * to the Replication to which this domain is currently connected * so that it computes a table containing information about * all Directory Servers in the topology. * This Computation involves communications will all the servers * currently connected and * * @return The States of all Replicas in the topology (except us) */ public Map<Short, ServerState> getReplicaStates() { // publish Monitor Request Message to the Replication Server broker.publish(new MonitorRequestMsg(serverID, broker.getRsServerId())); // wait for Response up to 10 seconds. try { synchronized (monitorResponse) { monitorResponse.wait(10000); } } catch (InterruptedException e) {} return replicaStates; } /** * Gets the info for RSs in the topology (except the one we are connected * to). * @return The info for RSs in the topology (except the one we are connected @@ -776,6 +822,25 @@ update = (UpdateMsg) msg; generator.adjust(update.getChangeNumber()); } else if (msg instanceof MonitorMsg) { // This is the response to a MonitorRequest that was sent earlier // build the replicaStates Map. replicaStates = new HashMap<Short, ServerState>(); MonitorMsg monitorMsg = (MonitorMsg) msg; Iterator<Short> it = monitorMsg.ldapIterator(); while (it.hasNext()) { short serverId = it.next(); replicaStates.put( serverId, monitorMsg.getLDAPServerState(serverId)); } // Notify the sender that the response was received. synchronized (monitorResponse) { monitorResponse.notify(); } } } catch (SocketTimeoutException e) { opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -29,6 +29,7 @@ import static org.testng.Assert.*; import java.util.List; import java.util.Map; import java.util.TreeSet; import java.util.concurrent.LinkedBlockingQueue; @@ -41,6 +42,7 @@ import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.common.DSInfo; import org.opends.server.replication.common.RSInfo; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ReplServerFakeConfiguration; @@ -66,6 +68,8 @@ int replServerID2 = 20; FakeReplicationDomain domain1 = null; FakeReplicationDomain domain2 = null; short domain1ServerId = 1; short domain2ServerId = 2; try { @@ -101,11 +105,11 @@ BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>(); domain1 = new FakeReplicationDomain( testService, (short) 1, servers, 100, 1000, rcvQueue1); testService, (short) domain1ServerId, servers, 100, 1000, rcvQueue1); BlockingQueue<UpdateMsg> rcvQueue2 = new LinkedBlockingQueue<UpdateMsg>(); domain2 = new FakeReplicationDomain( testService, (short) 2, servers, 100, 1000, rcvQueue2); testService, (short) domain2ServerId, servers, 100, 1000, rcvQueue2); /* * Publish a message from domain1, @@ -147,25 +151,34 @@ for (DSInfo serverInfo : domain1.getReplicasList()) { if (serverInfo.getDsId() == 2) if (serverInfo.getDsId() == domain2ServerId) assertTrue(serverInfo.getStatus() == ServerStatus.BAD_GEN_ID_STATUS); else { assertTrue(serverInfo.getDsId() == 1); assertTrue(serverInfo.getDsId() == domain1ServerId); assertTrue(serverInfo.getStatus() == ServerStatus.NORMAL_STATUS); } } for (DSInfo serverInfo : domain2.getReplicasList()) { if (serverInfo.getDsId() == 2) if (serverInfo.getDsId() == domain2ServerId) assertTrue(serverInfo.getStatus() == ServerStatus.BAD_GEN_ID_STATUS); else { assertTrue(serverInfo.getDsId() == 1); assertTrue(serverInfo.getDsId() == domain1ServerId); assertTrue(serverInfo.getStatus() == ServerStatus.NORMAL_STATUS); } } Map<Short, ServerState> states1 = domain1.getReplicaStates(); ServerState state2 = states1.get(domain2ServerId); assertNotNull(state2, "getReplicaStates is not showing DS2"); Map<Short, ServerState> states2 = domain1.getReplicaStates(); ServerState state1 = states2.get(domain1ServerId); assertNotNull(state1, "getReplicaStates is not showing DS1"); } finally {