mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
23.04.2009 c3420bec486f1921ea67fab4b1019ef06a0cea16
Fix for 3889 : Replication domain don't have access to the Replica ServerStates

This change adds a getReplicaStates() method to the ReplicationDomain
This allows Replicas to know about the State of the other Replicas in the topology.
5 files modified
197 ■■■■■ changed files
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MonitorData.java 50 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 52 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 67 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java 25 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Copyright 2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -186,7 +186,6 @@
    if (reader.get() != MSG_TYPE_REPL_SERVER_MONITOR)
      throw new DataFormatException("input is not a valid " +
          this.getClass().getCanonicalName());
    int pos = 1;
    // sender
    this.senderID = reader.getShort();
opends/src/server/org/opends/server/replication/server/MonitorData.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Copyright 2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
@@ -193,7 +193,7 @@
    }
    // Computes the missing changes counters for RS :
    // Sum the difference of seqnuence numbers for each element in the States.
    // Sum the difference of sequence numbers for each element in the States.
    for (short lsiSid : RSStates.keySet())
    {
@@ -390,4 +390,50 @@
    }
  }
  /**
   * Returns an iterator on the serverId of the Replicas for which
   * we have monitoring data.
   *
   * @return The iterator.
   */
  public Iterator<Short> ldapIterator()
  {
    return LDAPStates.keySet().iterator();
  }
  /**
   * Returns an iterator on the serverId of the Replication Servers for which
   * we have monitoring data.
   *
   * @return The iterator.
   */
  public Iterator<Short> rsIterator()
  {
    return RSStates.keySet().iterator();
  }
  /**
   * Get the state of the RS server with the provided serverId.
   *
   * @param serverId The server ID.
   * @return The server state.
   */
  public ServerState getRSStates(short serverId)
  {
    return RSStates.get(serverId);
  }
  /**
   * Get an approximation of the date of the first missing update.
   *
   * @param serverId The server ID.
   * @return The date.
   */
  public long getRSApproxFirstMissingDate(short serverId)
  {
    Long res;
    if ((res = fmRSDate.get(serverId)) != null)
      return res;
    return 0;
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1427,6 +1427,58 @@
          errorMsg.getDetails()));
      } else if (msg instanceof MonitorRequestMsg)
      {
        // If the request comes from a Directory Server we need to
        // build the full list of all servers in the topology
        // and send back a MonitorMsg with the full list of all the servers
        // in the topology.
        if (senderHandler.isLDAPserver())
        {
          MonitorMsg returnMsg =
            new MonitorMsg(msg.getDestination(), msg.getsenderID());
          try
          {
            returnMsg.setReplServerDbState(getDbServerState());
            // Update the information we have about all servers
            // in the topology.
            MonitorData md = computeMonitorData();
            // Add the informations about the Replicas currently in
            // the topology.
            Iterator<Short> it = md.ldapIterator();
            while (it.hasNext())
            {
              short replicaId = it.next();
              returnMsg.setServerState(
                  replicaId, md.getLDAPServerState(replicaId),
                  md.getApproxFirstMissingDate(replicaId), true);
            }
            // Add the informations about the Replication Servers
            // currently in the topology.
            it = md.rsIterator();
            while (it.hasNext())
            {
              short replicaId = it.next();
              returnMsg.setServerState(
                  replicaId, md.getRSStates(replicaId),
                  md.getRSApproxFirstMissingDate(replicaId), false);
            }
          }
          catch (DirectoryException e)
          {
            // If we can't compute the Monitor Information, send
            // back an empty message.
          }
          try
          {
            senderHandler.send(returnMsg);
          } catch (IOException e)
          {
            // the connection was closed.
          }
          return;
        }
        MonitorRequestMsg replServerMonitorRequestMsg =
          (MonitorRequestMsg) msg;
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -64,6 +64,7 @@
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -89,6 +90,8 @@
import org.opends.server.replication.protocol.HeartbeatMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -287,7 +290,7 @@
  // Indicates the date when the status changed. This may be used to indicate
  // the date the session with the current replication server started (when
  // status is NORMAL for instance). All the above assured monitoring fields
  // are also resetted each time the status is changed
  // are also reset each time the status is changed
  private Date lastStatusChangeDate = new Date();
  /**
@@ -302,6 +305,20 @@
  private final ChangeNumberGenerator generator;
  /**
   * This object is used as a conditional event to be notified about
   * the reception of monitor information from the Replication Server.
   */
  private Object monitorResponse = new Object();
  /**
   * A Map containing of the ServerStates of all the replicas in the topology
   * as seen by the ReplicationServer the last time it was polled.
   */
  private Map<Short, ServerState> replicaStates =
    new HashMap<Short, ServerState>();
  /**
   * Returns the {@link ChangeNumberGenerator} that will be used to
   * generate {@link ChangeNumber} for this domain.
   *
@@ -549,6 +566,35 @@
  }
  /**
   * Gets the States of all the Replicas currently in the
   * Topology.
   * When this method is called, a Monitoring message will be sent
   * to the Replication to which this domain is currently connected
   * so that it computes a table containing information about
   * all Directory Servers in the topology.
   * This Computation involves communications will all the servers
   * currently connected and
   *
   * @return The States of all Replicas in the topology (except us)
   */
  public Map<Short, ServerState> getReplicaStates()
  {
    // publish Monitor Request Message to the Replication Server
    broker.publish(new MonitorRequestMsg(serverID, broker.getRsServerId()));
    // wait for Response up to 10 seconds.
    try
    {
      synchronized (monitorResponse)
      {
        monitorResponse.wait(10000);
      }
    } catch (InterruptedException e)
    {}
    return replicaStates;
  }
  /**
   * Gets the info for RSs in the topology (except the one we are connected
   * to).
   * @return The info for RSs in the topology (except the one we are connected
@@ -776,6 +822,25 @@
          update = (UpdateMsg) msg;
          generator.adjust(update.getChangeNumber());
        }
        else if (msg instanceof MonitorMsg)
        {
          // This is the response to a MonitorRequest that was sent earlier
          // build the replicaStates Map.
          replicaStates = new HashMap<Short, ServerState>();
          MonitorMsg monitorMsg = (MonitorMsg) msg;
          Iterator<Short> it = monitorMsg.ldapIterator();
          while (it.hasNext())
          {
            short serverId = it.next();
            replicaStates.put(
                serverId, monitorMsg.getLDAPServerState(serverId));
          }
          // Notify the sender that the response was received.
          synchronized (monitorResponse)
          {
            monitorResponse.notify();
          }
        }
      }
      catch (SocketTimeoutException e)
      {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -29,6 +29,7 @@
import static org.testng.Assert.*;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
@@ -41,6 +42,7 @@
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
@@ -66,6 +68,8 @@
    int replServerID2 = 20;
    FakeReplicationDomain domain1 = null;
    FakeReplicationDomain domain2 = null;
    short domain1ServerId = 1;
    short domain2ServerId = 2;
    try
    {
@@ -101,11 +105,11 @@
      BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<UpdateMsg>();
      domain1 = new FakeReplicationDomain(
          testService, (short) 1, servers, 100, 1000, rcvQueue1);
          testService, (short) domain1ServerId, servers, 100, 1000, rcvQueue1);
      BlockingQueue<UpdateMsg> rcvQueue2 = new LinkedBlockingQueue<UpdateMsg>();
      domain2 = new FakeReplicationDomain(
          testService, (short) 2, servers, 100, 1000, rcvQueue2);
          testService, (short) domain2ServerId, servers, 100, 1000, rcvQueue2);
      /*
       * Publish a message from domain1,
@@ -147,25 +151,34 @@
      for (DSInfo serverInfo : domain1.getReplicasList())
      {
        if (serverInfo.getDsId() == 2)
        if (serverInfo.getDsId() == domain2ServerId)
          assertTrue(serverInfo.getStatus() == ServerStatus.BAD_GEN_ID_STATUS);
        else
        {
          assertTrue(serverInfo.getDsId() == 1);
          assertTrue(serverInfo.getDsId() == domain1ServerId);
          assertTrue(serverInfo.getStatus() == ServerStatus.NORMAL_STATUS);
        }
      }
      for (DSInfo serverInfo : domain2.getReplicasList())
      {
        if (serverInfo.getDsId() == 2)
        if (serverInfo.getDsId() == domain2ServerId)
          assertTrue(serverInfo.getStatus() == ServerStatus.BAD_GEN_ID_STATUS);
        else
        {
          assertTrue(serverInfo.getDsId() == 1);
          assertTrue(serverInfo.getDsId() == domain1ServerId);
          assertTrue(serverInfo.getStatus() == ServerStatus.NORMAL_STATUS);
        }
      }
      Map<Short, ServerState> states1 = domain1.getReplicaStates();
      ServerState state2 = states1.get(domain2ServerId);
      assertNotNull(state2, "getReplicaStates is not showing DS2");
      Map<Short, ServerState> states2 = domain1.getReplicaStates();
      ServerState state1 = states2.get(domain1ServerId);
      assertNotNull(state1, "getReplicaStates is not showing DS1");
    }
    finally
    {