From b0acea5e1ca30af24c2f976ee0dd8dc43d31ea58 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 31 Jan 2007 17:21:44 +0000
Subject: [PATCH] Fix for Issue 1162 : Synchronization server deadlock when using multiple masters and max delay feature
---
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java | 67 ++++++++++++++++++++++++++++++---
1 files changed, 61 insertions(+), 6 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
index a8785c3..6c74065 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -103,6 +103,10 @@
private ServerReader reader;
private Semaphore sendWindow;
private int sendWindowSize;
+ private boolean flowControl = false; // indicate that the server is
+ // flow controled and should
+ // be stopped from sending messsages.
+ private int saturationCount = 0;
private static Map<ChangeNumber, ChangelogAckMessageList>
changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
@@ -594,8 +598,9 @@
* managed by this ServerHandler.
*
* @param update The update that must be added to the list of updates.
+ * @param sourceHandler The server that sent the update.
*/
- public void add(UpdateMessage update)
+ public void add(UpdateMessage update, ServerHandler sourceHandler)
{
synchronized (msgQueue)
{
@@ -617,6 +622,17 @@
msgQueue.removeFirst();
}
}
+
+ if (isSaturated(update.getChangeNumber(), sourceHandler))
+ {
+ sourceHandler.setSaturated(true);
+ }
+
+ }
+
+ private void setSaturated(boolean value)
+ {
+ flowControl = value;
}
/**
@@ -630,6 +646,24 @@
{
boolean interrupted = true;
UpdateMessage msg = getnextMessage();
+
+ /*
+ * When we remove a message from the queue we need to check if another
+ * server is waiting in flow control because this queue was too long.
+ * This check might cause a performance penalty an therefore it
+ * is not done for every message removed but only every few messages.
+ */
+ if (++saturationCount > 10)
+ {
+ saturationCount = 0;
+ try
+ {
+ changelogCache.checkAllSaturation();
+ }
+ catch (IOException e)
+ {
+ }
+ }
do {
try
{
@@ -1143,19 +1177,40 @@
}
/**
+ * Decrement the protocol window, then check if it is necessary
+ * to send a WindowMessage and send it.
+ *
+ * @throws IOException when the session becomes unavailable.
+ */
+ public synchronized void decAndCheckWindow() throws IOException
+ {
+ rcvWindow--;
+ checkWindow();
+ }
+
+ /**
* Check the protocol window and send WindowMessage if necessary.
*
* @throws IOException when the session becomes unavailable.
*/
public synchronized void checkWindow() throws IOException
{
- rcvWindow--;
if (rcvWindow < rcvWindowSizeHalf)
{
- WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
- session.publish(msg);
- outAckCount++;
- rcvWindow += rcvWindowSizeHalf;
+ if (flowControl)
+ {
+ if (changelogCache.restartAfterSaturation(this))
+ {
+ flowControl = false;
+ }
+ }
+ if (!flowControl)
+ {
+ WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
+ session.publish(msg);
+ outAckCount++;
+ rcvWindow += rcvWindowSizeHalf;
+ }
}
}
--
Gitblit v1.10.0