From b0acea5e1ca30af24c2f976ee0dd8dc43d31ea58 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 31 Jan 2007 17:21:44 +0000
Subject: [PATCH] Fix for Issue 1162 : Synchronization server deadlock when using multiple masters and max delay feature
---
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java | 3
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java | 67 ++++++++++++++++++++--
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java | 2
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java | 85 ++++++++++++++--------------
4 files changed, 107 insertions(+), 50 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
index bb29fc6..b54336a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
@@ -42,7 +42,6 @@
import org.opends.server.synchronization.protocol.UpdateMessage;
import java.io.IOException;
-import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
@@ -192,15 +191,11 @@
/*
* Push the message to the changelog servers
*/
- HashSet<ServerHandler> saturatedHandler = new HashSet<ServerHandler>();
if (!sourceHandler.isChangelogServer())
{
for (ServerHandler handler : changelogServers.values())
{
- handler.add(update);
-
- if (handler.isSaturated(update.getChangeNumber(), sourceHandler))
- saturatedHandler.add(handler);
+ handler.add(update, sourceHandler);
}
}
@@ -215,45 +210,11 @@
continue;
}
- handler.add(update);
-
- if (handler.isSaturated(update.getChangeNumber(), sourceHandler))
- saturatedHandler.add(handler);
+ handler.add(update, sourceHandler);
}
- /*
- * Check if some flow control must be done.
- * Flow control is done by stopping the reader thread of this
- * server. The messages will therefore accumulate in the TCP socket
- * and will block the writer thread(s) on the other side.
- */
- while (!saturatedHandler.isEmpty())
- {
- HashSet<ServerHandler> restartedHandler = new HashSet<ServerHandler>();
- for (ServerHandler handler : saturatedHandler)
- {
- if (handler.restartAfterSaturation(sourceHandler))
- {
- restartedHandler.add(handler);
- }
- }
- for (ServerHandler handler : restartedHandler)
- saturatedHandler.remove(handler);
- synchronized(flowControlLock)
- {
- if (!saturatedHandler.isEmpty())
- {
- try
- {
- flowControlLock.wait(100);
- } catch (Exception e)
- {
- // just loop
- }
- }
- }
- }
+
}
/**
@@ -568,4 +529,44 @@
{
return "ChangelogCache " + baseDn;
}
+
+ /**
+ * Check if some server Handler should be removed from flow control state.
+ * @throws IOException If an error happened.
+ */
+ public void checkAllSaturation() throws IOException
+ {
+ for (ServerHandler handler : changelogServers.values())
+ {
+ handler.checkWindow();
+ }
+
+ for (ServerHandler handler : connectedServers.values())
+ {
+ handler.checkWindow();
+ }
+ }
+
+ /**
+ * Check if a server that was in flow control can now restart
+ * sending updates.
+ * @param sourceHandler The server that must be checked.
+ * @return true if the server can restart sending changes.
+ * false if the server can't restart sending changes.
+ */
+ public boolean restartAfterSaturation(ServerHandler sourceHandler)
+ {
+ for (ServerHandler handler : changelogServers.values())
+ {
+ if (!handler.restartAfterSaturation(sourceHandler))
+ return false;
+ }
+
+ for (ServerHandler handler : connectedServers.values())
+ {
+ if (!handler.restartAfterSaturation(sourceHandler))
+ return false;
+ }
+ return true;
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
index a8785c3..6c74065 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -103,6 +103,10 @@
private ServerReader reader;
private Semaphore sendWindow;
private int sendWindowSize;
+ private boolean flowControl = false; // indicate that the server is
+ // flow controlled and should
+ // be stopped from sending messages.
+ private int saturationCount = 0;
private static Map<ChangeNumber, ChangelogAckMessageList>
changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
@@ -594,8 +598,9 @@
* managed by this ServerHandler.
*
* @param update The update that must be added to the list of updates.
+ * @param sourceHandler The server that sent the update.
*/
- public void add(UpdateMessage update)
+ public void add(UpdateMessage update, ServerHandler sourceHandler)
{
synchronized (msgQueue)
{
@@ -617,6 +622,17 @@
msgQueue.removeFirst();
}
}
+
+ if (isSaturated(update.getChangeNumber(), sourceHandler))
+ {
+ sourceHandler.setSaturated(true);
+ }
+
+ }
+
+ private void setSaturated(boolean value)
+ {
+ flowControl = value;
}
/**
@@ -630,6 +646,24 @@
{
boolean interrupted = true;
UpdateMessage msg = getnextMessage();
+
+ /*
+ * When we remove a message from the queue we need to check if another
+ * server is waiting in flow control because this queue was too long.
+ * This check might cause a performance penalty and therefore it
+ * is not done for every message removed but only every few messages.
+ */
+ if (++saturationCount > 10)
+ {
+ saturationCount = 0;
+ try
+ {
+ changelogCache.checkAllSaturation();
+ }
+ catch (IOException e)
+ {
+ }
+ }
do {
try
{
@@ -1143,19 +1177,40 @@
}
/**
+ * Decrement the protocol window, then check if it is necessary
+ * to send a WindowMessage and send it.
+ *
+ * @throws IOException when the session becomes unavailable.
+ */
+ public synchronized void decAndCheckWindow() throws IOException
+ {
+ rcvWindow--;
+ checkWindow();
+ }
+
+ /**
* Check the protocol window and send WindowMessage if necessary.
*
* @throws IOException when the session becomes unavailable.
*/
public synchronized void checkWindow() throws IOException
{
- rcvWindow--;
if (rcvWindow < rcvWindowSizeHalf)
{
- WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
- session.publish(msg);
- outAckCount++;
- rcvWindow += rcvWindowSizeHalf;
+ if (flowControl)
+ {
+ if (changelogCache.restartAfterSaturation(this))
+ {
+ flowControl = false;
+ }
+ }
+ if (!flowControl)
+ {
+ WindowMessage msg = new WindowMessage(rcvWindowSizeHalf);
+ session.publish(msg);
+ outAckCount++;
+ rcvWindow += rcvWindowSizeHalf;
+ }
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
index 94cd21c..1435bd2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
@@ -102,12 +102,13 @@
if (msg instanceof AckMessage)
{
AckMessage ack = (AckMessage) msg;
+ handler.checkWindow();
changelogCache.ack(ack, serverId);
}
else if (msg instanceof UpdateMessage)
{
UpdateMessage update = (UpdateMessage) msg;
- handler.checkWindow();
+ handler.decAndCheckWindow();
changelogCache.put(update, handler);
}
else if (msg instanceof WindowMessage)
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
index 743aa4a..e34d027 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -464,7 +464,7 @@
* This test is sconfigured for a relatively low stress
* but can be changed using TOTAL_MSG and THREADS consts.
*/
- @Test(enabled=false, groups="slow")
+ @Test(enabled=true, groups="slow")
public void multipleWriterMultipleReader() throws Exception
{
ChangelogBroker server = null;
--
Gitblit v1.10.0