From d19acb303c4ff90e48fd98ce2d7ba739ca9ea2db Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 18 Nov 2009 16:55:52 +0000
Subject: [PATCH] Fix for Issue 4300 : stop replication server cause OutOfMemoryError
---
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 72 +++++++++++++++++++++++++++++------
1 files changed, 59 insertions(+), 13 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index c1eb5e5..24fc481 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -188,6 +188,7 @@
private long generationID;
private int updateDoneCount = 0;
+ private boolean connectRequiresRecovery = false;
/**
* Creates a new ReplicationServer Broker for a particular ReplicationDomain.
@@ -694,6 +695,21 @@
rsServerId = serverInfo.getServerId();
rsServerUrl = bestServer;
+ receiveTopo(topologyMsg);
+
+ // Log a message to let the administrator know that the failure
+ // was resolved.
+ // Wakeup all the thread that were waiting on the window
+ // on the previous connection.
+ connectionError = false;
+ if (sendWindow != null)
+ {
+ sendWindow.release(Integer.MAX_VALUE);
+ }
+ sendWindow = new Semaphore(maxSendWindow);
+ rcvWindow = maxRcvWindow;
+ connected = true;
+
// May have created a broker with null replication domain for
// unit test purpose.
if (domain != null)
@@ -703,8 +719,7 @@
serverInfo.getGenerationId(),
session);
}
- receiveTopo(topologyMsg);
- connected = true;
+
if (getRsGroupId() != groupId)
{
// Connected to replication server with wrong group id:
@@ -766,17 +781,6 @@
if (connected)
{
- // Log a message to let the administrator know that the failure was
- // resolved.
- // Wakeup all the thread that were waiting on the window
- // on the previous connection.
- connectionError = false;
- if (sendWindow != null)
- {
- sendWindow.release(Integer.MAX_VALUE);
- }
- sendWindow = new Semaphore(maxSendWindow);
- rcvWindow = maxRcvWindow;
connectPhaseLock.notify();
if ((serverInfo.getGenerationId() == this.getGenerationID()) ||
@@ -1786,6 +1790,25 @@
*/
public void publish(ReplicationMsg msg)
{
+ _publish(msg, false);
+ }
+
+ /**
+ * Publish a recovery message to the other servers.
+ * @param msg the message to publish
+ */
+ public void publishRecovery(ReplicationMsg msg)
+ {
+ _publish(msg, true);
+ }
+
+ /**
+ * Publish a message to the other servers.
+ * @param msg the message to publish
+ * @param recoveryMsg the message is a recovery Message
+ */
+ void _publish(ReplicationMsg msg, boolean recoveryMsg)
+ {
boolean done = false;
while (!done && !shutdown)
@@ -1825,6 +1848,15 @@
currentWindowSemaphore = sendWindow;
}
+ // If the Replication domain has decided that there is a need to
+ // recover some changes then it is not allowed to send this
+ // change but it will be the responsibility of the recovery thread to
+ // do it.
+ if (!recoveryMsg & connectRequiresRecovery)
+ {
+ return;
+ }
+
if (msg instanceof UpdateMsg)
{
// Acquiring the window credit must be done outside of the
@@ -2548,4 +2580,18 @@
ctHeartbeatPublisherThread = null;
}
}
+
+ /**
+ * Set the connectRequiresRecovery to the provided value.
+ * This flag is used to indicate if a recovery of Update is necessary
+ * after a reconnection to a RS.
+ * It is the responsibility of the ReplicationDomain to set it during the
+ * sessionInitiated phase.
+ *
+ * @param b the new value of the connectRequiresRecovery.
+ */
+ public void setRecoveryRequired(boolean b)
+ {
+ connectRequiresRecovery = b;
+ }
}
--
Gitblit v1.10.0