mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
06.11.2009 3a9e211d36ee94ff99941943b3b51e0f768624f5
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -55,11 +55,14 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.MutableBoolean;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.HeartbeatMonitor;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartDSMsg;
@@ -116,6 +119,28 @@
  private ReplicationDomain domain = null;
  /**
   * This object is used as a conditional event to be notified about
   * the reception of monitor information from the Replication Server.
   */
  private final MutableBoolean monitorResponse = new MutableBoolean(false);
  /**
   * A Map containing the ServerStates of all the replicas in the topology
   * as seen by the ReplicationServer the last time it was polled or the last
   * time it published monitoring information.
   */
  private HashMap<Integer, ServerState> replicaStates =
    new HashMap<Integer, ServerState>();
  /**
   * A Map containing the ServerStates of all the replication servers in the
   * topology as seen by the ReplicationServer the last time it was polled or
   * the last time it published monitoring information.
   */
  private HashMap<Integer, ServerState> rsStates =
    new HashMap<Integer, ServerState>();
  /**
   * The expected duration in milliseconds between heartbeats received
   * from the replication server.  Zero means heartbeats are off.
   */
@@ -1918,6 +1943,37 @@
          // Try to find a suitable RS
          this.reStart(failingSession);
        }
        else if (msg instanceof MonitorMsg)
        {
          // This is the response to a MonitorRequest that was sent earlier or
          // the regular message of the monitoring publisher of the RS.
          // Extract and store replicas ServerStates
          replicaStates = new HashMap<Integer, ServerState>();
          MonitorMsg monitorMsg = (MonitorMsg) msg;
          Iterator<Integer> it = monitorMsg.ldapIterator();
          while (it.hasNext())
          {
            int srvId = it.next();
            replicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId));
          }
          // Notify the sender that the response was received.
          synchronized (monitorResponse)
          {
            monitorResponse.set(true);
            monitorResponse.notify();
          }
          // Extract and store replication servers ServerStates
          rsStates = new HashMap<Integer, ServerState>();
          it = monitorMsg.rsIterator();
          while (it.hasNext())
          {
            int srvId = it.next();
            rsStates.put(srvId, monitorMsg.getRSServerState(srvId));
          }
        }
        else
        {
          return msg;
@@ -1949,6 +2005,40 @@
  }
  /**
   * Gets the States of all the Replicas currently in the
   * Topology.
   * When this method is called, a Monitoring message will be sent
   * to the Replication Server to which this domain is currently connected
   * so that it computes a table containing information about
   * all Directory Servers in the topology.
   * This Computation involves communications will all the servers
   * currently connected and
   *
   * @return The States of all Replicas in the topology (except us)
   */
  public Map<Integer, ServerState> getReplicaStates()
  {
    monitorResponse.set(false);
    // publish Monitor Request Message to the Replication Server
    publish(new MonitorRequestMsg(serverId, getRsServerId()));
    // wait for Response up to 10 seconds.
    try
    {
      synchronized (monitorResponse)
      {
        if (monitorResponse.get() == false)
        {
          monitorResponse.wait(10000);
        }
      }
    } catch (InterruptedException e)
    {}
    return replicaStates;
  }
  /**
   * This method allows to do the necessary computing for the window
   * management after treatment by the worker threads.
   *
@@ -2440,7 +2530,7 @@
    {
      ctHeartbeatPublisherThread =
        new CTHeartbeatPublisherThread(
            "Replication CN Heartbeat Thread started for " +
            "Replication CN Heartbeat sender for " +
            baseDn + " with " + getReplicationServer(),
            session, changeTimeHeartbeatSendInterval, serverId);
      ctHeartbeatPublisherThread.start();