From b0acea5e1ca30af24c2f976ee0dd8dc43d31ea58 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 31 Jan 2007 17:21:44 +0000
Subject: [PATCH] Fix for Issue 1162 : Synchronization server deadlock when using multiple masters and max delay feature

---
 opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java |   67 ++++++++++++++++++++++++++++++---
 1 files changed, 61 insertions(+), 6 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
index a8785c3..6c74065 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -103,6 +103,10 @@
   private ServerReader reader;
   private Semaphore sendWindow;
   private int sendWindowSize;
+  private boolean flowControl = false; // indicate that the server is
+                                       // flow controled and should
+                                       // be stopped from sending messsages.
+  private int saturationCount = 0;
 
   private static Map<ChangeNumber, ChangelogAckMessageList>
    changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
@@ -594,8 +598,9 @@
    * managed by this ServerHandler.
    *
    * @param update The update that must be added to the list of updates.
+   * @param sourceHandler The server that sent the update.
    */
-  public void add(UpdateMessage update)
+  public void add(UpdateMessage update, ServerHandler sourceHandler)
   {
     synchronized (msgQueue)
     {
@@ -617,6 +622,17 @@
         msgQueue.removeFirst();
       }
     }
+
+    if (isSaturated(update.getChangeNumber(), sourceHandler))
+    {
+      sourceHandler.setSaturated(true);
+    }
+
+  }
+
+  private void setSaturated(boolean value)
+  {
+    flowControl = value;
   }
 
   /**
@@ -630,6 +646,24 @@
   {
     boolean interrupted = true;
     UpdateMessage msg = getnextMessage();
+
+    /*
+     * When we remove a message from the queue we need to check if another
+     * server is waiting in flow control because this queue was too long.
+     * This check might cause a performance penalty an therefore it
+     * is not done for every message removed but only every few messages.
+     */
+    if (++saturationCount > 10)
+    {
+      saturationCount = 0;
+      try
+      {
+        changelogCache.checkAllSaturation();
+      }
+      catch (IOException e)
+      {
+      }
+    }
     do {
       try
       {
@@ -1143,19 +1177,40 @@
   }
 
   /**
+   * Decrement the protocol window, then check if it is necessary
+   * to send a WindowMessage and send it.
+   *
+   * @throws IOException when the session becomes unavailable.
+   */
+  public synchronized void decAndCheckWindow() throws IOException
+  {
+    rcvWindow--;
+    checkWindow();
+  }
+
+  /**
    * Check the protocol window and send WindowMessage if necessary.
    *
    * @throws IOException when the session becomes unavailable.
    */
   public synchronized void checkWindow() throws IOException
   {
-    rcvWindow--;
     if (rcvWindow < rcvWindowSizeHalf)
     {
-      WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
-      session.publish(msg);
-      outAckCount++;
-      rcvWindow += rcvWindowSizeHalf;
+      if (flowControl)
+      {
+        if (changelogCache.restartAfterSaturation(this))
+        {
+          flowControl = false;
+        }
+      }
+      if (!flowControl)
+      {
+        WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
+        session.publish(msg);
+        outAckCount++;
+        rcvWindow += rcvWindowSizeHalf;
+      }
     }
   }
 

--
Gitblit v1.10.0