From d19acb303c4ff90e48fd98ce2d7ba739ca9ea2db Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 18 Nov 2009 16:55:52 +0000
Subject: [PATCH] Fix for Issue 4300 : stop replication server cause OutOfMemoryError

---
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java |   72 +++++++++++++++++++++++++++++------
 1 files changed, 59 insertions(+), 13 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index c1eb5e5..24fc481 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -188,6 +188,7 @@
 
   private long generationID;
   private int updateDoneCount = 0;
+  private boolean connectRequiresRecovery = false;
 
   /**
    * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
@@ -694,6 +695,21 @@
                 rsServerId = serverInfo.getServerId();
                 rsServerUrl = bestServer;
 
+                receiveTopo(topologyMsg);
+
+                // Log a message to let the administrator know that the failure
+                // was resolved.
+                // Wakeup all the thread that were waiting on the window
+                // on the previous connection.
+                connectionError = false;
+                if (sendWindow != null)
+                {
+                  sendWindow.release(Integer.MAX_VALUE);
+                }
+                sendWindow = new Semaphore(maxSendWindow);
+                rcvWindow = maxRcvWindow;
+                connected = true;
+
                 // May have created a broker with null replication domain for
                 // unit test purpose.
                 if (domain != null)
@@ -703,8 +719,7 @@
                       serverInfo.getGenerationId(),
                       session);
                 }
-                receiveTopo(topologyMsg);
-                connected = true;
+
                 if (getRsGroupId() != groupId)
                 {
                  // Connected to replication server with wrong group id:
@@ -766,17 +781,6 @@
 
       if (connected)
       {
-        // Log a message to let the administrator know that the failure was
-        // resolved.
-        // Wakeup all the thread that were waiting on the window
-        // on the previous connection.
-        connectionError = false;
-        if (sendWindow != null)
-        {
-          sendWindow.release(Integer.MAX_VALUE);
-        }
-        sendWindow = new Semaphore(maxSendWindow);
-        rcvWindow = maxRcvWindow;
         connectPhaseLock.notify();
 
         if ((serverInfo.getGenerationId() == this.getGenerationID()) ||
@@ -1786,6 +1790,25 @@
    */
   public void publish(ReplicationMsg msg)
   {
+    _publish(msg, false);
+  }
+
+  /**
+   * Publish a recovery message to the other servers.
+   * @param msg the message to publish
+   */
+  public void publishRecovery(ReplicationMsg msg)
+  {
+    _publish(msg, true);
+  }
+
+  /**
+   * Publish a message to the other servers.
+   * @param msg the message to publish
+   * @param recoveryMsg the message is a recovery Message
+   */
+  void _publish(ReplicationMsg msg, boolean recoveryMsg)
+  {
     boolean done = false;
 
     while (!done && !shutdown)
@@ -1825,6 +1848,15 @@
           currentWindowSemaphore = sendWindow;
         }
 
+        // If the Replication domain has decided that there is a need to
+        // recover some changes then it is not allowed to send this
+        // change but it will be the responsibility of the recovery thread to
+        // do it.
+        if (!recoveryMsg & connectRequiresRecovery)
+        {
+          return;
+        }
+
         if (msg instanceof UpdateMsg)
         {
           // Acquiring the window credit must be done outside of the
@@ -2548,4 +2580,18 @@
       ctHeartbeatPublisherThread = null;
     }
   }
+
+  /**
+   * Set the connectRequiresRecovery to the provided value.
+   * This flag is used to indicate if a recovery of Update is necessary
+   * after a reconnection to a RS.
+   * It is the responsibility of the ReplicationDomain to set it during the
+   * sessionInitiated phase.
+   *
+   * @param b the new value of the connectRequiresRecovery.
+   */
+  public void setRecoveryRequired(boolean b)
+  {
+    connectRequiresRecovery = b;
+  }
 }

--
Gitblit v1.10.0