mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
31.21.2007 b0acea5e1ca30af24c2f976ee0dd8dc43d31ea58
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
@@ -42,7 +42,6 @@
import org.opends.server.synchronization.protocol.UpdateMessage;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
@@ -192,15 +191,11 @@
    /*
     * Push the message to the changelog servers
     */
    HashSet<ServerHandler> saturatedHandler = new HashSet<ServerHandler>();
    if (!sourceHandler.isChangelogServer())
    {
      for (ServerHandler handler : changelogServers.values())
      {
        handler.add(update);
        if (handler.isSaturated(update.getChangeNumber(), sourceHandler))
          saturatedHandler.add(handler);
        handler.add(update, sourceHandler);
      }
    }
@@ -215,45 +210,11 @@
        continue;
      }
      handler.add(update);
      if (handler.isSaturated(update.getChangeNumber(), sourceHandler))
        saturatedHandler.add(handler);
      handler.add(update, sourceHandler);
    }
    /*
     * Check if some flow control must be done.
     * Flow control is done by stopping the reader thread of this
     * server. The messages will therefore accumulate in the TCP socket
     * and will block the writer thread(s) on the other side.
     */
    while (!saturatedHandler.isEmpty())
    {
      HashSet<ServerHandler> restartedHandler = new HashSet<ServerHandler>();
      for (ServerHandler handler : saturatedHandler)
      {
        if (handler.restartAfterSaturation(sourceHandler))
        {
          restartedHandler.add(handler);
        }
      }
      for (ServerHandler handler : restartedHandler)
        saturatedHandler.remove(handler);
      synchronized(flowControlLock)
      {
        if (!saturatedHandler.isEmpty())
        {
          try
          {
            flowControlLock.wait(100);
          } catch (Exception e)
          {
            // just loop
          }
        }
      }
    }
  }
  /**
@@ -568,4 +529,44 @@
  {
    return "ChangelogCache " + baseDn;
  }
  /**
   * Check if some server Handler should be removed from flow control state.
   * @throws IOException If an error happened.
   */
  public void checkAllSaturation() throws IOException
  {
    for (ServerHandler handler : changelogServers.values())
    {
      handler.checkWindow();
    }
    for (ServerHandler handler : connectedServers.values())
    {
      handler.checkWindow();
    }
  }
  /**
   * Check if a server that was in flow control can now restart
   * sending updates.
   * @param sourceHandler The server that must be checked.
   * @return true if the server can restart sending changes.
   *         false if the server can't restart sending changes.
   */
  public boolean restartAfterSaturation(ServerHandler sourceHandler)
  {
    for (ServerHandler handler : changelogServers.values())
    {
      if (!handler.restartAfterSaturation(sourceHandler))
          return false;
    }
    for (ServerHandler handler : connectedServers.values())
    {
      if (!handler.restartAfterSaturation(sourceHandler))
          return false;
    }
    return true;
  }
}
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -103,6 +103,10 @@
  private ServerReader reader;
  private Semaphore sendWindow;
  private int sendWindowSize;
  private boolean flowControl = false; // indicate that the server is
                                       // flow controled and should
                                       // be stopped from sending messsages.
  private int saturationCount = 0;
  private static Map<ChangeNumber, ChangelogAckMessageList>
   changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
@@ -594,8 +598,9 @@
   * managed by this ServerHandler.
   *
   * @param update The update that must be added to the list of updates.
   * @param sourceHandler The server that sent the update.
   */
  public void add(UpdateMessage update)
  public void add(UpdateMessage update, ServerHandler sourceHandler)
  {
    synchronized (msgQueue)
    {
@@ -617,6 +622,17 @@
        msgQueue.removeFirst();
      }
    }
    if (isSaturated(update.getChangeNumber(), sourceHandler))
    {
      sourceHandler.setSaturated(true);
    }
  }
  private void setSaturated(boolean value)
  {
    flowControl = value;
  }
  /**
@@ -630,6 +646,24 @@
  {
    boolean interrupted = true;
    UpdateMessage msg = getnextMessage();
    /*
     * When we remove a message from the queue we need to check if another
     * server is waiting in flow control because this queue was too long.
     * This check might cause a performance penalty an therefore it
     * is not done for every message removed but only every few messages.
     */
    if (++saturationCount > 10)
    {
      saturationCount = 0;
      try
      {
        changelogCache.checkAllSaturation();
      }
      catch (IOException e)
      {
      }
    }
    do {
      try
      {
@@ -1143,19 +1177,40 @@
  }
  /**
   * Decrement the protocol window, then check if it is necessary
   * to send a WindowMessage and send it.
   *
   * @throws IOException when the session becomes unavailable.
   */
  public synchronized void decAndCheckWindow() throws IOException
  {
    rcvWindow--;
    checkWindow();
  }
  /**
   * Check the protocol window and send WindowMessage if necessary.
   *
   * @throws IOException when the session becomes unavailable.
   */
  public synchronized void checkWindow() throws IOException
  {
    rcvWindow--;
    if (rcvWindow < rcvWindowSizeHalf)
    {
      WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
      session.publish(msg);
      outAckCount++;
      rcvWindow += rcvWindowSizeHalf;
      if (flowControl)
      {
        if (changelogCache.restartAfterSaturation(this))
        {
          flowControl = false;
        }
      }
      if (!flowControl)
      {
        WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
        session.publish(msg);
        outAckCount++;
        rcvWindow += rcvWindowSizeHalf;
      }
    }
  }
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
@@ -102,12 +102,13 @@
        if (msg instanceof AckMessage)
        {
          AckMessage ack = (AckMessage) msg;
          handler.checkWindow();
          changelogCache.ack(ack, serverId);
        }
        else if (msg instanceof UpdateMessage)
        {
          UpdateMessage update = (UpdateMessage) msg;
          handler.checkWindow();
          handler.decAndCheckWindow();
          changelogCache.put(update, handler);
        }
        else if (msg instanceof WindowMessage)
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -464,7 +464,7 @@
   * This test is sconfigured for a relatively low stress
   * but can be changed using TOTAL_MSG and THREADS consts.
   */
  @Test(enabled=false, groups="slow")
  @Test(enabled=true, groups="slow")
  public void multipleWriterMultipleReader() throws Exception
  {
    ChangelogBroker server = null;