mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
31.21.2007 7d707268291c0ab54d36780248f5252dc16c3970
opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
@@ -42,7 +42,6 @@
import org.opends.server.synchronization.protocol.UpdateMessage;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
@@ -192,15 +191,11 @@
    /*
     * Push the message to the changelog servers
     */
    HashSet<ServerHandler> saturatedHandler = new HashSet<ServerHandler>();
    if (!sourceHandler.isChangelogServer())
    {
      for (ServerHandler handler : changelogServers.values())
      {
        handler.add(update);
        if (handler.isSaturated(update.getChangeNumber(), sourceHandler))
          saturatedHandler.add(handler);
        handler.add(update, sourceHandler);
      }
    }
@@ -215,45 +210,11 @@
        continue;
      }
      handler.add(update);
      if (handler.isSaturated(update.getChangeNumber(), sourceHandler))
        saturatedHandler.add(handler);
      handler.add(update, sourceHandler);
    }
    /*
     * Check if some flow control must be done.
     * Flow control is done by stopping the reader thread of this
     * server. The messages will therefore accumulate in the TCP socket
     * and will block the writer thread(s) on the other side.
     */
    while (!saturatedHandler.isEmpty())
    {
      HashSet<ServerHandler> restartedHandler = new HashSet<ServerHandler>();
      for (ServerHandler handler : saturatedHandler)
      {
        if (handler.restartAfterSaturation(sourceHandler))
        {
          restartedHandler.add(handler);
        }
      }
      for (ServerHandler handler : restartedHandler)
        saturatedHandler.remove(handler);
      synchronized(flowControlLock)
      {
        if (!saturatedHandler.isEmpty())
        {
          try
          {
            flowControlLock.wait(100);
          } catch (Exception e)
          {
            // just loop
          }
        }
      }
    }
  }
  /**
@@ -568,4 +529,44 @@
  {
    return "ChangelogCache " + baseDn;
  }
  /**
   * Check if some server Handler should be removed from flow control state.
   * @throws IOException If an error happened.
   */
  public void checkAllSaturation() throws IOException
  {
    for (ServerHandler handler : changelogServers.values())
    {
      handler.checkWindow();
    }
    for (ServerHandler handler : connectedServers.values())
    {
      handler.checkWindow();
    }
  }
  /**
   * Check if a server that was in flow control can now restart
   * sending updates.
   * @param sourceHandler The server that must be checked.
   * @return true if the server can restart sending changes.
   *         false if the server can't restart sending changes.
   */
  public boolean restartAfterSaturation(ServerHandler sourceHandler)
  {
    for (ServerHandler handler : changelogServers.values())
    {
      if (!handler.restartAfterSaturation(sourceHandler))
          return false;
    }
    for (ServerHandler handler : connectedServers.values())
    {
      if (!handler.restartAfterSaturation(sourceHandler))
          return false;
    }
    return true;
  }
}