From b0acea5e1ca30af24c2f976ee0dd8dc43d31ea58 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 31 Jan 2007 17:21:44 +0000
Subject: [PATCH] Fix for Issue 1162 : Synchronization server deadlock when using multiple masters and max delay feature

---
 opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java                          |    3 
 opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java                         |   67 ++++++++++++++++++++--
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java |    2 
 opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java                        |   85 ++++++++++++++--------------
 4 files changed, 107 insertions(+), 50 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
index bb29fc6..b54336a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
@@ -42,7 +42,6 @@
 import org.opends.server.synchronization.protocol.UpdateMessage;
 
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
@@ -192,15 +191,11 @@
     /*
      * Push the message to the changelog servers
      */
-    HashSet<ServerHandler> saturatedHandler = new HashSet<ServerHandler>();
     if (!sourceHandler.isChangelogServer())
     {
       for (ServerHandler handler : changelogServers.values())
       {
-        handler.add(update);
-
-        if (handler.isSaturated(update.getChangeNumber(), sourceHandler))
-          saturatedHandler.add(handler);
+        handler.add(update, sourceHandler);
       }
     }
 
@@ -215,45 +210,11 @@
         continue;
       }
 
-      handler.add(update);
-
-      if (handler.isSaturated(update.getChangeNumber(), sourceHandler))
-        saturatedHandler.add(handler);
+      handler.add(update, sourceHandler);
     }
 
-    /*
-     * Check if some flow control must be done.
-     * Flow control is done by stopping the reader thread of this
-     * server. The messages will therefore accumulate in the TCP socket
-     * and will block the writer thread(s) on the other side.
-     */
-    while (!saturatedHandler.isEmpty())
-    {
-      HashSet<ServerHandler> restartedHandler = new HashSet<ServerHandler>();
-      for (ServerHandler handler : saturatedHandler)
-      {
-        if (handler.restartAfterSaturation(sourceHandler))
-        {
-          restartedHandler.add(handler);
-        }
-      }
-      for (ServerHandler handler : restartedHandler)
-        saturatedHandler.remove(handler);
 
-      synchronized(flowControlLock)
-      {
-        if (!saturatedHandler.isEmpty())
-        {
-          try
-          {
-            flowControlLock.wait(100);
-          } catch (Exception e)
-          {
-            // just loop
-          }
-        }
-      }
-    }
+
   }
 
   /**
@@ -568,4 +529,44 @@
   {
     return "ChangelogCache " + baseDn;
   }
+
+  /**
+   * Check if some server Handler should be removed from flow control state.
+   * @throws IOException If an error happened.
+   */
+  public void checkAllSaturation() throws IOException
+  {
+    for (ServerHandler handler : changelogServers.values())
+    {
+      handler.checkWindow();
+    }
+
+    for (ServerHandler handler : connectedServers.values())
+    {
+      handler.checkWindow();
+    }
+  }
+
+  /**
+   * Check if a server that was in flow control can now restart
+   * sending updates.
+   * @param sourceHandler The server that must be checked.
+   * @return true if the server can restart sending changes.
+   *         false if the server can't restart sending changes.
+   */
+  public boolean restartAfterSaturation(ServerHandler sourceHandler)
+  {
+    for (ServerHandler handler : changelogServers.values())
+    {
+      if (!handler.restartAfterSaturation(sourceHandler))
+          return false;
+    }
+
+    for (ServerHandler handler : connectedServers.values())
+    {
+      if (!handler.restartAfterSaturation(sourceHandler))
+          return false;
+    }
+    return true;
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
index a8785c3..6c74065 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -103,6 +103,10 @@
   private ServerReader reader;
   private Semaphore sendWindow;
   private int sendWindowSize;
+  private boolean flowControl = false; // indicate that the server is
+                                       // flow controled and should
+                                       // be stopped from sending messsages.
+  private int saturationCount = 0;
 
   private static Map<ChangeNumber, ChangelogAckMessageList>
    changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
@@ -594,8 +598,9 @@
    * managed by this ServerHandler.
    *
    * @param update The update that must be added to the list of updates.
+   * @param sourceHandler The server that sent the update.
    */
-  public void add(UpdateMessage update)
+  public void add(UpdateMessage update, ServerHandler sourceHandler)
   {
     synchronized (msgQueue)
     {
@@ -617,6 +622,17 @@
         msgQueue.removeFirst();
       }
     }
+
+    if (isSaturated(update.getChangeNumber(), sourceHandler))
+    {
+      sourceHandler.setSaturated(true);
+    }
+
+  }
+
+  private void setSaturated(boolean value)
+  {
+    flowControl = value;
   }
 
   /**
@@ -630,6 +646,24 @@
   {
     boolean interrupted = true;
     UpdateMessage msg = getnextMessage();
+
+    /*
+     * When we remove a message from the queue we need to check if another
+     * server is waiting in flow control because this queue was too long.
+     * This check might cause a performance penalty an therefore it
+     * is not done for every message removed but only every few messages.
+     */
+    if (++saturationCount > 10)
+    {
+      saturationCount = 0;
+      try
+      {
+        changelogCache.checkAllSaturation();
+      }
+      catch (IOException e)
+      {
+      }
+    }
     do {
       try
       {
@@ -1143,19 +1177,40 @@
   }
 
   /**
+   * Decrement the protocol window, then check if it is necessary
+   * to send a WindowMessage and send it.
+   *
+   * @throws IOException when the session becomes unavailable.
+   */
+  public synchronized void decAndCheckWindow() throws IOException
+  {
+    rcvWindow--;
+    checkWindow();
+  }
+
+  /**
    * Check the protocol window and send WindowMessage if necessary.
    *
    * @throws IOException when the session becomes unavailable.
    */
   public synchronized void checkWindow() throws IOException
   {
-    rcvWindow--;
     if (rcvWindow < rcvWindowSizeHalf)
     {
-      WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
-      session.publish(msg);
-      outAckCount++;
-      rcvWindow += rcvWindowSizeHalf;
+      if (flowControl)
+      {
+        if (changelogCache.restartAfterSaturation(this))
+        {
+          flowControl = false;
+        }
+      }
+      if (!flowControl)
+      {
+        WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
+        session.publish(msg);
+        outAckCount++;
+        rcvWindow += rcvWindowSizeHalf;
+      }
     }
   }
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
index 94cd21c..1435bd2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
@@ -102,12 +102,13 @@
         if (msg instanceof AckMessage)
         {
           AckMessage ack = (AckMessage) msg;
+          handler.checkWindow();
           changelogCache.ack(ack, serverId);
         }
         else if (msg instanceof UpdateMessage)
         {
           UpdateMessage update = (UpdateMessage) msg;
-          handler.checkWindow();
+          handler.decAndCheckWindow();
           changelogCache.put(update, handler);
         }
         else if (msg instanceof WindowMessage)
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
index 743aa4a..e34d027 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -464,7 +464,7 @@
    * This test is sconfigured for a relatively low stress
    * but can be changed using TOTAL_MSG and THREADS consts.
    */
-  @Test(enabled=false, groups="slow")
+  @Test(enabled=true, groups="slow")
   public void multipleWriterMultipleReader() throws Exception
   {
     ChangelogBroker server = null;

--
Gitblit v1.10.0