mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
23.13.2009 df0e36ba23d4992009b1b694bb5cb37ba9587836
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -100,22 +100,6 @@
   */
  protected int inCount = 0;
  /**
   * Specifies the max receive queue for this handler.
   */
  protected int maxReceiveQueue = 0;
  /**
   * Specifies the max send queue for this handler.
   */
  protected int maxSendQueue = 0;
  /**
   * Specifies the max receive delay for this handler.
   */
  protected int maxReceiveDelay = 0;
  /**
   * Specifies the max send delay for this handler.
   */
  protected int maxSendDelay = 0;
  /**
   * Specifies the max queue size for this handler.
   */
  protected int maxQueueSize = 5000;
@@ -124,22 +108,6 @@
   */
  protected int maxQueueBytesSize = maxQueueSize * 100;
  /**
   * Specifies the max restart receive queue for this handler.
   */
  protected int restartReceiveQueue;
  /**
   * Specifies the max restart send queue for this handler.
   */
  protected int restartSendQueue;
  /**
   * Specifies the max restart receive delay for this handler.
   */
  protected int restartReceiveDelay;
  /**
   * Specifies the max restart send delay for this handler.
   */
  protected int restartSendDelay;
  /**
   * Specifies whether the consumer is following the producer (is not late).
   */
  protected boolean following = false;
@@ -152,11 +120,6 @@
   */
  private String serviceId = null;
  /**
   * Specifies whether the server is flow controlled and should be stopped from
   * sending messages.
   */
  protected boolean flowControl = false;
  /**
   * Specifies whether the consumer is still active or not.
   * If not active, the handler will not return any message.
   * Called at the beginning of shutdown process.
@@ -220,12 +183,6 @@
        msgQueue.removeFirst();
      }
    }
    if (isSaturated(update.getChangeNumber(), sourceHandler))
    {
      sourceHandler.setSaturated(true);
    }
  }
  /**
   * Set the shut down flag to true and returns the previous value of the flag.
@@ -708,92 +665,6 @@
  }
  /**
   * Check is this server is saturated (this server has already been
   * sent a bunch of updates and has not processed them so they are staying
   * in the message queue for this server an the size of the queue
   * for this server is above the configured limit.
   *
   * The limit can be defined in number of updates or with a maximum delay
   *
   * @param changeNumber The changenumber to use to make the delay calculations.
   * @param sourceHandler The ServerHandler which is sending the update.
   * @return true is saturated false if not saturated.
   */
  public boolean isSaturated(ChangeNumber changeNumber,
      MessageHandler sourceHandler)
  {
    synchronized (msgQueue)
    {
      int size = msgQueue.count();
      if ((maxReceiveQueue > 0) && (size >= maxReceiveQueue))
        return true;
      if ((sourceHandler.maxSendQueue > 0) &&
          (size >= sourceHandler.maxSendQueue))
        return true;
      if (!msgQueue.isEmpty())
      {
        UpdateMsg firstUpdate = msgQueue.first();
        if (firstUpdate != null)
        {
          long timeDiff = changeNumber.getTimeSec() -
          firstUpdate.getChangeNumber().getTimeSec();
          if ((maxReceiveDelay > 0) && (timeDiff >= maxReceiveDelay))
            return true;
          if ((sourceHandler.maxSendDelay > 0) &&
              (timeDiff >= sourceHandler.maxSendDelay))
            return true;
        }
      }
      return false;
    }
  }
  /**
   * Check that the size of the Server Handler messages Queue has lowered
   * below the limit and therefore allowing the reception of messages
   * from other servers to restart.
   * @param source The ServerHandler which was sending the update.
   *        can be null.
   * @return true if the processing can restart
   */
  public boolean restartAfterSaturation(MessageHandler source)
  {
    synchronized (msgQueue)
    {
      int queueSize = msgQueue.count();
      if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue))
        return false;
      if ((source != null) && (source.maxSendQueue > 0) &&
          (queueSize >= source.restartSendQueue))
        return false;
      if (!msgQueue.isEmpty())
      {
        UpdateMsg firstUpdate = msgQueue.first();
        UpdateMsg lastUpdate = msgQueue.last();
        if ((firstUpdate != null) && (lastUpdate != null))
        {
          long timeDiff = lastUpdate.getChangeNumber().getTimeSec() -
          firstUpdate.getChangeNumber().getTimeSec();
          if ((maxReceiveDelay > 0) && (timeDiff >= restartReceiveDelay))
            return false;
          if ((source != null) && (source.maxSendDelay > 0) && (timeDiff >=
            source.restartSendDelay))
            return false;
        }
      }
    }
    return true;
  }
  /**
   * Set that the consumer is now becoming inactive and thus getNextMessage
   * should not return any UpdateMsg any more.
   * @param active the provided state of the consumer.
@@ -812,11 +683,6 @@
    this.following = following;
  }
  private void setSaturated(boolean value)
  {
    flowControl = value;
  }
  /**
   * Set the initial value of the serverState for this handler.
   * Expected to be done once, then the state will be updated using