mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
31.21.2007 b0acea5e1ca30af24c2f976ee0dd8dc43d31ea58
Fix for Issue 1162 : Synchronization server deadlock when using multiple masters and max delay feature

When using multiple masters and configuring the server with maxSendDelays, the
synchronization server sometimes stops all activity and never resumes it.

The problem was a deadlock between the maximum delay mechanism and the protocol window mechanism.

This deadlock occurs because the reader threads of the synchronization server are blocked when
the maximum configured delay is reached, and therefore can no longer process
WindowMessages.

The solution is to no longer block the reader threads, and instead to stop sending WindowMessages
to the servers that are creating too much delay, so that those servers are blocked.

The writer threads then need to check whether WindowMessages must be sent again once
their queues become smaller than the configured threshold.

With this fix, the following test passes:
org.opends.server.synchronization.changelog.ChangelogTest.multipleWriterMultipleReader
This commit therefore also enables this test.
4 files modified
157 ■■■■■ changed files
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java 85 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java 67 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java 3 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java 2 ●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
@@ -42,7 +42,6 @@
import org.opends.server.synchronization.protocol.UpdateMessage;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
@@ -192,15 +191,11 @@
    /*
     * Push the message to the changelog servers
     */
    HashSet<ServerHandler> saturatedHandler = new HashSet<ServerHandler>();
    if (!sourceHandler.isChangelogServer())
    {
      for (ServerHandler handler : changelogServers.values())
      {
        handler.add(update);
        if (handler.isSaturated(update.getChangeNumber(), sourceHandler))
          saturatedHandler.add(handler);
        handler.add(update, sourceHandler);
      }
    }
@@ -215,45 +210,11 @@
        continue;
      }
      handler.add(update);
      if (handler.isSaturated(update.getChangeNumber(), sourceHandler))
        saturatedHandler.add(handler);
      handler.add(update, sourceHandler);
    }
    /*
     * Check if some flow control must be done.
     * Flow control is done by stopping the reader thread of this
     * server. The messages will therefore accumulate in the TCP socket
     * and will block the writer thread(s) on the other side.
     */
    while (!saturatedHandler.isEmpty())
    {
      HashSet<ServerHandler> restartedHandler = new HashSet<ServerHandler>();
      for (ServerHandler handler : saturatedHandler)
      {
        if (handler.restartAfterSaturation(sourceHandler))
        {
          restartedHandler.add(handler);
        }
      }
      for (ServerHandler handler : restartedHandler)
        saturatedHandler.remove(handler);
      synchronized(flowControlLock)
      {
        if (!saturatedHandler.isEmpty())
        {
          try
          {
            flowControlLock.wait(100);
          } catch (Exception e)
          {
            // just loop
          }
        }
      }
    }
  }
  /**
@@ -568,4 +529,44 @@
  {
    return "ChangelogCache " + baseDn;
  }
  /**
   * Check whether some server handlers can be removed from the flow control
   * state.
   * <p>
   * This calls {@code checkWindow()} on every handler in
   * {@code changelogServers}, then on every handler in
   * {@code connectedServers}. It is {@code checkWindow()} that may clear a
   * handler's flow control flag and send it a new WindowMessage.
   * <p>
   * NOTE(review): an IOException thrown for one handler aborts the loop, so
   * the remaining handlers are not checked during this call.
   *
   * @throws IOException If {@code checkWindow()} fails for one of the
   *                     handlers.
   */
  public void checkAllSaturation() throws IOException
  {
    for (ServerHandler handler : changelogServers.values())
    {
      handler.checkWindow();
    }
    for (ServerHandler handler : connectedServers.values())
    {
      handler.checkWindow();
    }
  }
  /**
   * Check if a server that was in flow control can now restart
   * sending updates.
   * <p>
   * Every handler in {@code changelogServers}, then every handler in
   * {@code connectedServers}, is asked whether {@code sourceHandler} may
   * restart. The check stops at the first handler that refuses.
   *
   * @param sourceHandler The server that must be checked.
   * @return true if every handler allows the server to restart sending
   *         changes; false as soon as one handler does not.
   */
  public boolean restartAfterSaturation(ServerHandler sourceHandler)
  {
    for (ServerHandler handler : changelogServers.values())
    {
      if (!handler.restartAfterSaturation(sourceHandler))
          return false;
    }
    for (ServerHandler handler : connectedServers.values())
    {
      if (!handler.restartAfterSaturation(sourceHandler))
          return false;
    }
    return true;
  }
}
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -103,6 +103,10 @@
  private ServerReader reader;
  private Semaphore sendWindow;
  private int sendWindowSize;
  private boolean flowControl = false; // indicate that the server is
                                       // flow controled and should
                                       // be stopped from sending messsages.
  private int saturationCount = 0;
  private static Map<ChangeNumber, ChangelogAckMessageList>
   changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
@@ -594,8 +598,9 @@
   * managed by this ServerHandler.
   *
   * @param update The update that must be added to the list of updates.
   * @param sourceHandler The server that sent the update.
   */
  public void add(UpdateMessage update)
  public void add(UpdateMessage update, ServerHandler sourceHandler)
  {
    synchronized (msgQueue)
    {
@@ -617,6 +622,17 @@
        msgQueue.removeFirst();
      }
    }
    if (isSaturated(update.getChangeNumber(), sourceHandler))
    {
      sourceHandler.setSaturated(true);
    }
  }
  /**
   * Set or clear the flow control state of this handler.
   * <p>
   * While the flag is set, {@code checkWindow()} does not send WindowMessages
   * to this server until {@code changelogCache.restartAfterSaturation(this)}
   * allows it to restart. This throttles a server whose updates are
   * saturating other handlers' queues.
   * <p>
   * NOTE(review): this setter is not synchronized and {@code flowControl} is
   * a plain (non-volatile) field. It is invoked on the source handler from
   * {@code add()}, while {@code checkWindow()} reads the flag under this
   * handler's monitor, presumably from a writer thread via
   * {@code checkAllSaturation()}. Consider declaring the field volatile so
   * the update is guaranteed to be visible; avoid making this method
   * synchronized, since that could block reader threads again.
   *
   * @param value true to put the server in flow control, false to release it.
   */
  private void setSaturated(boolean value)
  {
    flowControl = value;
  }
  /**
@@ -630,6 +646,24 @@
  {
    boolean interrupted = true;
    UpdateMessage msg = getnextMessage();
    /*
     * When we remove a message from the queue we need to check if another
     * server is waiting in flow control because this queue was too long.
     * This check might cause a performance penalty an therefore it
     * is not done for every message removed but only every few messages.
     */
    if (++saturationCount > 10)
    {
      saturationCount = 0;
      try
      {
        changelogCache.checkAllSaturation();
      }
      catch (IOException e)
      {
      }
    }
    do {
      try
      {
@@ -1143,19 +1177,40 @@
  }
  /**
   * Decrement the receive window by one, then check if it is necessary
   * to send a WindowMessage and send it.
   * <p>
   * The ServerReader calls this once for each UpdateMessage it receives.
   * Because {@code checkWindow()} is also synchronized on this handler, the
   * nested call simply re-enters the same monitor.
   *
   * @throws IOException when the session becomes unavailable.
   */
  public synchronized void decAndCheckWindow() throws IOException
  {
    rcvWindow--;
    checkWindow();
  }
  /**
   * Check the protocol window and send WindowMessage if necessary.
   *
   * @throws IOException when the session becomes unavailable.
   */
  public synchronized void checkWindow() throws IOException
  {
    rcvWindow--;
    if (rcvWindow < rcvWindowSizeHalf)
    {
      WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
      session.publish(msg);
      outAckCount++;
      rcvWindow += rcvWindowSizeHalf;
      if (flowControl)
      {
        if (changelogCache.restartAfterSaturation(this))
        {
          flowControl = false;
        }
      }
      if (!flowControl)
      {
        WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
        session.publish(msg);
        outAckCount++;
        rcvWindow += rcvWindowSizeHalf;
      }
    }
  }
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
@@ -102,12 +102,13 @@
        if (msg instanceof AckMessage)
        {
          AckMessage ack = (AckMessage) msg;
          handler.checkWindow();
          changelogCache.ack(ack, serverId);
        }
        else if (msg instanceof UpdateMessage)
        {
          UpdateMessage update = (UpdateMessage) msg;
          handler.checkWindow();
          handler.decAndCheckWindow();
          changelogCache.put(update, handler);
        }
        else if (msg instanceof WindowMessage)
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -464,7 +464,7 @@
   * This test is sconfigured for a relatively low stress
   * but can be changed using TOTAL_MSG and THREADS consts.
   */
  @Test(enabled=false, groups="slow")
  @Test(enabled=true, groups="slow")
  public void multipleWriterMultipleReader() throws Exception
  {
    ChangelogBroker server = null;