mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
06.11.2009 b1ab1b61b2a5cb4a09cd2e727e05f42feb8ca669
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -49,8 +49,6 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -101,6 +99,7 @@
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import com.sleepycat.je.DatabaseException;
import java.util.Collections;
/**
 * ReplicationServer Listener.
@@ -173,6 +172,10 @@
  // the DS in DEGRADED_STATUS. If value is 0, status analyzer is disabled
  private int degradedStatusThreshold = 5000;
  // Number of milliseconds to wait before sending new monitoring messages.
  // If value is 0, monitoring publisher is disabled
  private long monitoringPublisherPeriod = 3000;
  // The handler of the draft change numbers database, the database used to
  // store the relation between a draft change number ('seqnum') and the
  // associated cookie.
@@ -211,6 +214,13 @@
  private int weight = 1;
  /**
   * Holds the list of all replication servers instantiated in this VM.
   * This allows to perform clean up of the RS databases in unit tests.
   */
  private static List<ReplicationServer> allInstances =
    new ArrayList<ReplicationServer>();
  /**
   * Creates a new Replication server using the provided configuration entry.
   *
   * @param configuration The configuration of this replication server.
@@ -254,6 +264,7 @@
    groupId = (byte)configuration.getGroupId();
    assuredTimeout = configuration.getAssuredTimeout();
    degradedStatusThreshold = configuration.getDegradedStatusThreshold();
    monitoringPublisherPeriod = configuration.getMonitoringPeriod();
    replSessionSecurity = new ReplSessionSecurity();
    initialize(replicationPort);
@@ -274,8 +285,20 @@
    DirectoryServer.registerImportTaskListener(this);
    localPorts.add(replicationPort);
    // Keep track of this new instance
    allInstances.add(this);
  }
  /**
   * Get the list of every replication servers instantiated in the current VM.
   * @return The list of every replication servers instantiated in the current
   * VM.
   */
  public static List<ReplicationServer> getAllInstances()
  {
    return allInstances;
  }
  /**
   * The run method for the Listen thread.
@@ -850,7 +873,9 @@
      dbEnv.shutdown();
    }
}
    // Remove this instance from the global instance list
    allInstances.remove(this);
  }
  /**
@@ -1028,6 +1053,32 @@
      }
    }
    // Update period value for monitoring publishers (stop them if requested
    // value is 0)
    if (monitoringPublisherPeriod != configuration.getMonitoringPeriod())
    {
      long oldMonitoringPeriod = monitoringPublisherPeriod;
      monitoringPublisherPeriod = configuration.getMonitoringPeriod();
      for(ReplicationServerDomain rsd : baseDNs.values())
      {
        if (monitoringPublisherPeriod == 0L)
        {
          // Requested to stop monitoring publishers
          rsd.stopMonitoringPublisher();
        } else if (rsd.isRunningMonitoringPublisher())
        {
          // Update the threshold value for this running monitoring publisher
          rsd.updateMonitoringPublisher(monitoringPublisherPeriod);
        } else if (oldMonitoringPeriod == 0L)
        {
          // Requested to start monitoring publishers with provided period value
          if ( (rsd.getConnectedDSs().size() > 0) ||
            (rsd.getConnectedRSs().size() > 0) )
            rsd.startMonitoringPublisher();
        }
      }
    }
    // Changed the group id ?
    byte newGroupId = (byte)configuration.getGroupId();
    if (newGroupId != groupId)
@@ -1044,7 +1095,10 @@
    if (weight != configuration.getWeight())
    {
      weight = configuration.getWeight();
      // TODO: send new TopologyMsg
      // Broadcast the new weight the the whole topology. This will make some
      // DSs reconnect (if needed) to other RSs according to the new weight of
      // this RS.
      broadcastConfigChange();
    }
    if ((configuration.getReplicationDBDirectory() != null) &&
@@ -1057,6 +1111,19 @@
  }
  /**
   * Broadcast a configuration change that just happened to the whole topology
   * by sending a TopologyMsg to every entity in the topology.
   */
  private void broadcastConfigChange()
  {
    for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
    {
      replicationServerDomain.buildAndSendTopoInfoToDSs(null);
      replicationServerDomain.buildAndSendTopoInfoToRSs();
    }
  }
  /**
   * {@inheritDoc}
   */
  public boolean isConfigurationChangeAcceptable(
@@ -1345,6 +1412,15 @@
  }
  /**
   * Get the monitoring publisher period value.
   * @return the monitoring publisher period value.
   */
  public long getMonitoringPublisherPeriod()
  {
    return monitoringPublisherPeriod;
  }
  /**
   * Compute the list of replication servers that are not any
   * more connected to this Replication Server and stop the
   * corresponding handlers.
@@ -1411,12 +1487,80 @@
  /* The date of the last time they have been elaborated */
  private long monitorDataLastBuildDate = 0;
  /* Search op on monitor data is processed by a worker thread.
   * Requests are sent to the other RS,and responses are received by the
   * listener threads.
   * The worker thread is awoke on this semaphore, or on timeout.
  /**
   * This uniquely identifies a server (handler) in the cross-domain topology.
   * Represents an identifier of a handler (in the whole RS) we have to wait a
   * monitoring message from before answering to a monitor request.
   */
  Semaphore remoteMonitorResponsesSemaphore = new Semaphore(0);
  public static class GlobalServerId {
    private int serverId = -1;
    private String baseDn = null;
    /**
     * Constructor for a global server id.
     * @param baseDn The dn of the RSD owning the handler.
     * @param serverId The handler id in the matching RSD.
     */
    public GlobalServerId(String baseDn, int serverId) {
      this.baseDn = baseDn;
      this.serverId = serverId;
    }
    /**
     * Get the server handler id.
     * @return the serverId
     */
    public int getServerId()
    {
      return serverId;
    }
    /**
     * Get the base dn.
     * @return the baseDn
     */
    public String getBaseDn()
    {
      return baseDn;
    }
    /**
     * Get the hascode.
     * @return The hashcode.
     */
    @Override
    public int hashCode()
    {
      int hash = 7;
      hash = 43 * hash + this.serverId;
      hash = 43 * hash + (this.baseDn != null ? this.baseDn.hashCode() : 0);
      return hash;
    }
    /**
     * Tests if the passed global server handler id represents the same server
     * handler as this one.
     * @param obj The object to test.
     * @return True if both identifiers are the same.
     */
    public boolean equals(Object obj) {
      if ( (obj == null) || (obj instanceof GlobalServerId))
        return false;
      GlobalServerId globalServerId = (GlobalServerId)obj;
      return ( globalServerId.baseDn.equals(baseDn) &&
        (globalServerId.serverId == serverId) );
    }
  }
  /**
   * This gives the list of server handlers we are willing to wait monitoring
   * message from. Each time a monitoring message is received by a server
   * handler, the matching server handler id is retired from the list. When the
   * list is empty, we received all expected monitoring messages.
   */
  private List<GlobalServerId> expectedMonitoringMsg = null;
  /**
   * Trigger the computation of the Global Monitoring Data.
@@ -1429,7 +1573,7 @@
   *
   * @throws DirectoryException If the computation cannot be achieved.
   */
  public void computeMonitorData() throws DirectoryException
  public synchronized void computeMonitorData() throws DirectoryException
  {
    if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime())
    {
@@ -1440,15 +1584,17 @@
      return;
    }
    remoteMonitorResponsesSemaphore.drainPermits();
    int count = 0;
    // Initialize the list of server handlers we expect monitoring messages from
    expectedMonitoringMsg =
      Collections.synchronizedList(new ArrayList<GlobalServerId>());
    for (ReplicationServerDomain domain : baseDNs.values())
    {
      count += domain.initializeMonitorData();
      domain.initializeMonitorData(expectedMonitoringMsg);
    }
    // Wait for responses
    waitMonitorDataResponses(count);
    waitMonitorDataResponses();
    for (ReplicationServerDomain domain : baseDNs.values())
    {
@@ -1457,38 +1603,51 @@
  }
  /**
   * Wait for the expected count of received MonitorMsg.
   * @param expectedResponses The number of expected answers.
   * Wait for the expected received MonitorMsg.
   * @throws DirectoryException When an error occurs.
   */
  private void waitMonitorDataResponses(int expectedResponses)
  private void waitMonitorDataResponses()
    throws DirectoryException
  {
    try
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + getMonitorInstanceName() + " baseDn=" +
          " waiting for " + expectedResponses + " expected monitor messages");
          "In " + getMonitorInstanceName() +
          " waiting for " + expectedMonitoringMsg.size() +
          " expected monitor messages");
      boolean allPermitsAcquired =
        remoteMonitorResponsesSemaphore.tryAcquire(
        expectedResponses,
        (long) 5000, TimeUnit.MILLISECONDS);
      if (!allPermitsAcquired)
      // Wait up to 5 seconds for every expected monitoring message to come
      // back.
      boolean allReceived = false;
      long startTime = TimeThread.getTime();
      long curTime = startTime;
      int maxTime = 5000;
      while ( (curTime - startTime) < maxTime )
      {
        monitorDataLastBuildDate = TimeThread.getTime();
        // Have every expected monitoring messages arrived ?
        if (expectedMonitoringMsg.size() == 0)
        {
          // Ok break the loop
          allReceived = true;
          break;
        }
        Thread.sleep(100);
        curTime = TimeThread.getTime();
      }
      monitorDataLastBuildDate = TimeThread.getTime();
      if (!allReceived)
      {
        logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
      // let's go on in best effort even with limited data received.
        // let's go on in best effort even with limited data received.
      } else
      {
        monitorDataLastBuildDate = TimeThread.getTime();
        if (debugEnabled())
          TRACER.debugInfo(
            "In " + getMonitorInstanceName() + " baseDn=" +
            " Successfully received all " + expectedResponses +
            " expected monitor messages");
            "In " + getMonitorInstanceName() +
            " Successfully received all expected monitor messages");
      }
    } catch (Exception e)
    {
@@ -1499,11 +1658,18 @@
  /**
   * This should be called by each ReplicationServerDomain that receives
   * a response to a monitor request message.
   * a response to a monitor request message. This may also be called when a
   * monitoring message is coming from a RS whose monitoring publisher thread
   * sent it. As monitoring messages (sent because of monitoring request or
   * because of monitoring publisher) have the same content, this is also ok
   * to mark ok the server when the monitoring message coms from a monitoring
   * publisher thread.
   * @param globalServerId The server handler that is receiving the
   * monitoring message.
   */
  public void responseReceived()
  public void responseReceived(GlobalServerId globalServerId)
  {
    remoteMonitorResponsesSemaphore.release();
    expectedMonitoringMsg.remove(globalServerId);
  }
@@ -1513,7 +1679,7 @@
   */
  public void responseReceivedAll()
  {
    remoteMonitorResponsesSemaphore.notifyAll();
    expectedMonitoringMsg.clear();
  }
  /**