mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
31.21.2007 b0acea5e1ca30af24c2f976ee0dd8dc43d31ea58
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -103,6 +103,10 @@
  private ServerReader reader;
  private Semaphore sendWindow;
  private int sendWindowSize;
  private boolean flowControl = false; // indicate that the server is
                                       // flow controled and should
                                       // be stopped from sending messsages.
  private int saturationCount = 0;
  private static Map<ChangeNumber, ChangelogAckMessageList>
   changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
@@ -594,8 +598,9 @@
   * managed by this ServerHandler.
   *
   * @param update The update that must be added to the list of updates.
   * @param sourceHandler The server that sent the update.
   */
  public void add(UpdateMessage update)
  public void add(UpdateMessage update, ServerHandler sourceHandler)
  {
    synchronized (msgQueue)
    {
@@ -617,6 +622,17 @@
        msgQueue.removeFirst();
      }
    }
    if (isSaturated(update.getChangeNumber(), sourceHandler))
    {
      sourceHandler.setSaturated(true);
    }
  }
  private void setSaturated(boolean value)
  {
    flowControl = value;
  }
  /**
@@ -630,6 +646,24 @@
  {
    boolean interrupted = true;
    UpdateMessage msg = getnextMessage();
    /*
     * When we remove a message from the queue we need to check if another
     * server is waiting in flow control because this queue was too long.
     * This check might cause a performance penalty an therefore it
     * is not done for every message removed but only every few messages.
     */
    if (++saturationCount > 10)
    {
      saturationCount = 0;
      try
      {
        changelogCache.checkAllSaturation();
      }
      catch (IOException e)
      {
      }
    }
    do {
      try
      {
@@ -1143,19 +1177,40 @@
  }
  /**
   * Decrement the protocol window, then check if it is necessary
   * to send a WindowMessage and send it.
   *
   * @throws IOException when the session becomes unavailable.
   */
  public synchronized void decAndCheckWindow() throws IOException
  {
    rcvWindow--;
    checkWindow();
  }
  /**
   * Check the protocol window and send WindowMessage if necessary.
   *
   * @throws IOException when the session becomes unavailable.
   */
  public synchronized void checkWindow() throws IOException
  {
    rcvWindow--;
    if (rcvWindow < rcvWindowSizeHalf)
    {
      WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
      session.publish(msg);
      outAckCount++;
      rcvWindow += rcvWindowSizeHalf;
      if (flowControl)
      {
        if (changelogCache.restartAfterSaturation(this))
        {
          flowControl = false;
        }
      }
      if (!flowControl)
      {
        WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
        session.publish(msg);
        outAckCount++;
        rcvWindow += rcvWindowSizeHalf;
      }
    }
  }