From c3420bec486f1921ea67fab4b1019ef06a0cea16 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 23 Mar 2009 09:04:22 +0000
Subject: [PATCH] Fix for 3889 : Replication domain don't have access to the Replica ServerStates

---
 opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java                                   |    3 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java |   25 ++++-
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                        |   52 +++++++++++++
 opends/src/server/org/opends/server/replication/server/MonitorData.java                                    |   50 ++++++++++++
 opends/src/server/org/opends/server/replication/service/ReplicationDomain.java                             |   67 ++++++++++++++++
 5 files changed, 186 insertions(+), 11 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
index 1b61c9b..a1dd7b0 100644
--- a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2008 Sun Microsystems, Inc.
+ *      Copyright 2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.protocol;
 
@@ -186,7 +186,6 @@
     if (reader.get() != MSG_TYPE_REPL_SERVER_MONITOR)
       throw new DataFormatException("input is not a valid " +
           this.getClass().getCanonicalName());
-    int pos = 1;
 
     // sender
     this.senderID = reader.getShort();
diff --git a/opends/src/server/org/opends/server/replication/server/MonitorData.java b/opends/src/server/org/opends/server/replication/server/MonitorData.java
index 25b9992..81a1473 100644
--- a/opends/src/server/org/opends/server/replication/server/MonitorData.java
+++ b/opends/src/server/org/opends/server/replication/server/MonitorData.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2008 Sun Microsystems, Inc.
+ *      Copyright 2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 
@@ -193,7 +193,7 @@
     }
 
     // Computes the missing changes counters for RS :
-    // Sum the difference of seqnuence numbers for each element in the States.
+    // Sum the difference of sequence numbers for each element in the States.
 
     for (short lsiSid : RSStates.keySet())
     {
@@ -390,4 +390,50 @@
     }
   }
 
+  /**
+   * Returns an iterator on the serverId of the Replicas for which
+   * we have monitoring data.
+   *
+   * @return The iterator.
+   */
+  public Iterator<Short> ldapIterator()
+  {
+    return LDAPStates.keySet().iterator();
+  }
+
+  /**
+   * Returns an iterator on the serverId of the Replication Servers for which
+   * we have monitoring data.
+   *
+   * @return The iterator.
+   */
+  public Iterator<Short> rsIterator()
+  {
+    return RSStates.keySet().iterator();
+  }
+
+  /**
+   * Get the state of the RS server with the provided serverId.
+   *
+   * @param serverId The server ID.
+   * @return The server state.
+   */
+  public ServerState getRSStates(short serverId)
+  {
+    return RSStates.get(serverId);
+  }
+
+  /**
+   * Get an approximation of the date of the first missing update.
+   *
+   * @param serverId The server ID.
+   * @return The date.
+   */
+  public long getRSApproxFirstMissingDate(short serverId)
+  {
+    Long res;
+    if ((res = fmRSDate.get(serverId)) != null)
+      return res;
+    return 0;
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index d7a71f5..a9ade5f 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1427,6 +1427,58 @@
           errorMsg.getDetails()));
       } else if (msg instanceof MonitorRequestMsg)
       {
+        // If the request comes from a Directory Server we need to
+        // build the full list of all servers in the topology
+        // and send back a MonitorMsg with the full list of all the servers
+        // in the topology.
+        if (senderHandler.isLDAPserver())
+        {
+          MonitorMsg returnMsg =
+            new MonitorMsg(msg.getDestination(), msg.getsenderID());
+          try
+          {
+            returnMsg.setReplServerDbState(getDbServerState());
+            // Update the information we have about all servers
+            // in the topology.
+            MonitorData md = computeMonitorData();
+
+            // Add the informations about the Replicas currently in
+            // the topology.
+            Iterator<Short> it = md.ldapIterator();
+            while (it.hasNext())
+            {
+              short replicaId = it.next();
+              returnMsg.setServerState(
+                  replicaId, md.getLDAPServerState(replicaId),
+                  md.getApproxFirstMissingDate(replicaId), true);
+            }
+
+            // Add the informations about the Replication Servers
+            // currently in the topology.
+            it = md.rsIterator();
+            while (it.hasNext())
+            {
+              short replicaId = it.next();
+              returnMsg.setServerState(
+                  replicaId, md.getRSStates(replicaId),
+                  md.getRSApproxFirstMissingDate(replicaId), false);
+            }
+          }
+          catch (DirectoryException e)
+          {
+            // If we can't compute the Monitor Information, send
+            // back an empty message.
+          }
+          try
+          {
+            senderHandler.send(returnMsg);
+          } catch (IOException e)
+          {
+            // the connection was closed.
+          }
+          return;
+        }
+
         MonitorRequestMsg replServerMonitorRequestMsg =
           (MonitorRequestMsg) msg;
 
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 4cb0350..4d0a28e 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -64,6 +64,7 @@
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -89,6 +90,8 @@
 import org.opends.server.replication.protocol.HeartbeatMsg;
 import org.opends.server.replication.protocol.InitializeRequestMsg;
 import org.opends.server.replication.protocol.InitializeTargetMsg;
+import org.opends.server.replication.protocol.MonitorMsg;
+import org.opends.server.replication.protocol.MonitorRequestMsg;
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ProtocolVersion;
 import org.opends.server.replication.protocol.ReplicationMsg;
@@ -287,7 +290,7 @@
   // Indicates the date when the status changed. This may be used to indicate
   // the date the session with the current replication server started (when
   // status is NORMAL for instance). All the above assured monitoring fields
-  // are also resetted each time the status is changed
+  // are also reset each time the status is changed
   private Date lastStatusChangeDate = new Date();
 
   /**
@@ -302,6 +305,20 @@
   private final ChangeNumberGenerator generator;
 
   /**
+   * This object is used as a conditional event to be notified about
+   * the reception of monitor information from the Replication Server.
+   */
+  private Object monitorResponse = new Object();
+
+
+  /**
+   * A Map containing of the ServerStates of all the replicas in the topology
+   * as seen by the ReplicationServer the last time it was polled.
+   */
+  private Map<Short, ServerState> replicaStates =
+    new HashMap<Short, ServerState>();
+
+  /**
    * Returns the {@link ChangeNumberGenerator} that will be used to
    * generate {@link ChangeNumber} for this domain.
    *
@@ -549,6 +566,35 @@
   }
 
   /**
+   * Gets the States of all the Replicas currently in the
+   * Topology.
+   * When this method is called, a Monitoring message will be sent
+   * to the Replication to which this domain is currently connected
+   * so that it computes a table containing information about
+   * all Directory Servers in the topology.
+   * This Computation involves communications will all the servers
+   * currently connected and
+   *
+   * @return The States of all Replicas in the topology (except us)
+   */
+  public Map<Short, ServerState> getReplicaStates()
+  {
+    // publish Monitor Request Message to the Replication Server
+    broker.publish(new MonitorRequestMsg(serverID, broker.getRsServerId()));
+
+    // wait for Response up to 10 seconds.
+    try
+    {
+      synchronized (monitorResponse)
+      {
+        monitorResponse.wait(10000);
+      }
+    } catch (InterruptedException e)
+    {}
+    return replicaStates;
+  }
+
+  /**
    * Gets the info for RSs in the topology (except the one we are connected
    * to).
    * @return The info for RSs in the topology (except the one we are connected
@@ -776,6 +822,25 @@
           update = (UpdateMsg) msg;
           generator.adjust(update.getChangeNumber());
         }
+        else if (msg instanceof MonitorMsg)
+        {
+          // This is the response to a MonitorRequest that was sent earlier
+          // build the replicaStates Map.
+          replicaStates = new HashMap<Short, ServerState>();
+          MonitorMsg monitorMsg = (MonitorMsg) msg;
+          Iterator<Short> it = monitorMsg.ldapIterator();
+          while (it.hasNext())
+          {
+            short serverId = it.next();
+            replicaStates.put(
+                serverId, monitorMsg.getLDAPServerState(serverId));
+          }
+          // Notify the sender that the response was received.
+          synchronized (monitorResponse)
+          {
+            monitorResponse.notify();
+          }
+        }
       }
       catch (SocketTimeoutException e)
       {
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
index 65d6784..c2859b4 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -29,6 +29,7 @@
 import static org.testng.Assert.*;
 
 import java.util.List;
+import java.util.Map;
 import java.util.TreeSet;
 
 import java.util.concurrent.LinkedBlockingQueue;
@@ -41,6 +42,7 @@
 import org.opends.server.replication.ReplicationTestCase;
 import org.opends.server.replication.common.DSInfo;
 import org.opends.server.replication.common.RSInfo;
+import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ReplServerFakeConfiguration;
@@ -66,6 +68,8 @@
     int replServerID2 = 20;
     FakeReplicationDomain domain1 = null;
     FakeReplicationDomain domain2 = null;
+    short domain1ServerId = 1;
+    short domain2ServerId = 2;
 
     try
     {
@@ -101,11 +105,11 @@
 
       BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>();
       domain1 = new FakeReplicationDomain(
-          testService, (short) 1, servers, 100, 1000, rcvQueue1);
+          testService, (short) domain1ServerId, servers, 100, 1000, rcvQueue1);
 
       BlockingQueue<UpdateMsg> rcvQueue2 = new LinkedBlockingQueue<UpdateMsg>();
       domain2 = new FakeReplicationDomain(
-          testService, (short) 2, servers, 100, 1000, rcvQueue2);
+          testService, (short) domain2ServerId, servers, 100, 1000, rcvQueue2);
 
       /*
        * Publish a message from domain1,
@@ -147,25 +151,34 @@
 
       for (DSInfo serverInfo : domain1.getReplicasList())
       {
-        if (serverInfo.getDsId() == 2)
+        if (serverInfo.getDsId() == domain2ServerId)
           assertTrue(serverInfo.getStatus() == ServerStatus.BAD_GEN_ID_STATUS);
         else
         {
-          assertTrue(serverInfo.getDsId() == 1);
+          assertTrue(serverInfo.getDsId() == domain1ServerId);
           assertTrue(serverInfo.getStatus() == ServerStatus.NORMAL_STATUS);
         }
       }
 
       for (DSInfo serverInfo : domain2.getReplicasList())
       {
-        if (serverInfo.getDsId() == 2)
+        if (serverInfo.getDsId() == domain2ServerId)
           assertTrue(serverInfo.getStatus() == ServerStatus.BAD_GEN_ID_STATUS);
         else
         {
-          assertTrue(serverInfo.getDsId() == 1);
+          assertTrue(serverInfo.getDsId() == domain1ServerId);
           assertTrue(serverInfo.getStatus() == ServerStatus.NORMAL_STATUS);
         }
       }
+
+      Map<Short, ServerState> states1 = domain1.getReplicaStates();
+      ServerState state2 = states1.get(domain2ServerId);
+      assertNotNull(state2, "getReplicaStates is not showing DS2");
+
+      Map<Short, ServerState> states2 = domain1.getReplicaStates();
+      ServerState state1 = states2.get(domain1ServerId);
+      assertNotNull(state1, "getReplicaStates is not showing DS1");
+
     }
     finally
     {

--
Gitblit v1.10.0