From 905277605ce8651332e1d2eb0752b24c7a739ac8 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 29 Mar 2012 16:59:02 +0000
Subject: [PATCH] Fix OPENDJ-457: Sleeping replication threads prevent server from shutting down

---
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java |  182 ++++++++++++++++++++++++++++-----------------
 1 files changed, 114 insertions(+), 68 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index b0be403..927241a 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -85,6 +85,7 @@
    */
   private static final DebugTracer TRACER = getTracer();
   private volatile boolean shutdown = false;
+  private final Object startStopLock = new Object();
   private volatile Collection<String> servers;
   private volatile boolean connected = false;
   private volatile String replicationServer = "Not connected";
@@ -235,9 +236,12 @@
    */
   public void start()
   {
-    shutdown = false;
-    this.rcvWindow = this.maxRcvWindow;
-    this.connect();
+    synchronized (startStopLock)
+    {
+      shutdown = false;
+      this.rcvWindow = this.maxRcvWindow;
+      this.connect();
+    }
   }
 
   /**
@@ -247,21 +251,23 @@
    */
   public void start(Collection<String> servers)
   {
-    /*
-     * Open Socket to the ReplicationServer
-     * Send the Start message
-     */
-    shutdown = false;
-    this.servers = servers;
-
-    if (servers.size() < 1)
+    synchronized (startStopLock)
     {
-      Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get();
-      logError(message);
-    }
+      /*
+       * Open Socket to the ReplicationServer Send the Start message
+       */
+      shutdown = false;
+      this.servers = servers;
 
-    this.rcvWindow = this.maxRcvWindow;
-    this.connect();
+      if (servers.size() < 1)
+      {
+        Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get();
+        logError(message);
+      }
+
+      this.rcvWindow = this.maxRcvWindow;
+      this.connect();
+    }
   }
 
   /**
@@ -2115,7 +2121,7 @@
    */
   public void reStart(boolean infiniteTry)
   {
-    reStart(this.session, infiniteTry);
+    reStart(session, infiniteTry);
   }
 
   /**
@@ -2126,7 +2132,6 @@
    */
   public void reStart(ProtocolSession failingSession, boolean infiniteTry)
   {
-
     if (failingSession != null)
     {
       failingSession.close();
@@ -2135,43 +2140,64 @@
 
     if (failingSession == session)
     {
-      this.connected = false;
+      connected = false;
       rsGroupId = (byte) -1;
       rsServerId = -1;
       rsServerUrl = null;
       session = null;
     }
-    while (!this.connected && (!this.shutdown))
+
+    while (true)
     {
-      try
+      // Synchronize inside the loop in order to allow shutdown.
+      boolean needSleep = false;
+
+      synchronized (startStopLock)
       {
-        this.connect();
-      } catch (Exception e)
-      {
-        MessageBuilder mb = new MessageBuilder();
-        mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
-          baseDn, e.getLocalizedMessage()));
-        mb.append(stackTraceToSingleLineString(e));
-        logError(mb.toMessage());
+        if (connected || shutdown)
+        {
+          break;
+        }
+
+        try
+        {
+          connect();
+        }
+        catch (Exception e)
+        {
+          MessageBuilder mb = new MessageBuilder();
+          mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(baseDn,
+              e.getLocalizedMessage()));
+          mb.append(stackTraceToSingleLineString(e));
+          logError(mb.toMessage());
+        }
+
+        if (connected || !infiniteTry)
+        {
+          break;
+        }
+
+        needSleep = true;
       }
-      if ((!connected) && (!infiniteTry))
-        break;
-      if ((!connected) && (!shutdown))
+
+      if (needSleep)
       {
         try
         {
           Thread.sleep(500);
-        } catch (InterruptedException e)
+        }
+        catch (InterruptedException e)
         {
           // ignore
         }
       }
     }
+
     if (debugEnabled())
-      debugInfo(this +
-          " end restart : connected=" + connected +
-          " with RSid=" + this.getRsServerId() +
-          " genid=" + this.generationID);
+    {
+      debugInfo(this + " end restart : connected=" + connected + " with RSid="
+          + this.getRsServerId() + " genid=" + this.generationID);
+    }
   }
 
   /**
@@ -2376,9 +2402,9 @@
       boolean reconnectOnFailure, boolean returnOnTopoChange)
     throws SocketTimeoutException
   {
-    while (shutdown == false)
+    while (!shutdown)
     {
-      if ((reconnectOnFailure) && (!connected))
+      if (reconnectOnFailure && !connected)
       {
         // infinite try to reconnect
         reStart(null, true);
@@ -2386,12 +2412,17 @@
 
       // Save session information for later in case we need it for log messages
       // after the session has been closed and/or failed.
-      final ProtocolSession failingSession = session;
-      final int replicationServerID = rsServerId;
+      final ProtocolSession savedSession = session;
+      if (savedSession == null)
+      {
+        // Must be shutting down.
+        break;
+      }
 
+      final int replicationServerID = rsServerId;
       try
       {
-        ReplicationMsg msg = session.receive();
+        ReplicationMsg msg = savedSession.receive();
         if (msg instanceof UpdateMsg)
         {
           synchronized (this)
@@ -2403,7 +2434,8 @@
         {
           WindowMsg windowMsg = (WindowMsg) msg;
           sendWindow.release(windowMsg.getNumAck());
-        } else if (msg instanceof TopologyMsg)
+        }
+        else if (msg instanceof TopologyMsg)
         {
           TopologyMsg topoMsg = (TopologyMsg) msg;
           receiveTopo(topoMsg);
@@ -2417,20 +2449,22 @@
           if (returnOnTopoChange)
             return msg;
 
-        } else if (msg instanceof StopMsg)
+        }
+        else if (msg instanceof StopMsg)
         {
           /*
            * RS performs a proper disconnection
            */
           Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED
               .get(replicationServerID,
-                  failingSession.getReadableRemoteAddress(),
-                  serverId, baseDn);
+                  savedSession.getReadableRemoteAddress(),
+              serverId, baseDn);
           logError(message);
 
           // Try to find a suitable RS
-          this.reStart(failingSession, true);
-        } else if (msg instanceof MonitorMsg)
+          this.reStart(savedSession, true);
+        }
+        else if (msg instanceof MonitorMsg)
         {
           // This is the response to a MonitorRequest that was sent earlier or
           // the regular message of the monitoring publisher of the RS.
@@ -2490,14 +2524,14 @@
                 {
                   message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
                       serverId, replicationServerID,
-                      failingSession.getReadableRemoteAddress(),
+                      savedSession.getReadableRemoteAddress(),
                       baseDn);
                 }
                 else
                 {
                   message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
                       serverId, replicationServerID,
-                      failingSession.getReadableRemoteAddress(),
+                      savedSession.getReadableRemoteAddress(),
                       bestServerInfo.getServerId(), baseDn);
                 }
                 logError(message);
@@ -2508,36 +2542,45 @@
               mustRunBestServerCheckingAlgorithm = 0;
             }
           }
-        } else
+        }
+        else
         {
           return msg;
         }
-      } catch (SocketTimeoutException e)
+      }
+      catch (SocketTimeoutException e)
       {
         throw e;
-      } catch (Exception e)
+      }
+      catch (Exception e)
       {
         if (debugEnabled())
         {
           TRACER.debugCaught(DebugLogLevel.ERROR, e);
         }
 
-        if (shutdown == false)
+        if (!shutdown)
         {
-          if ((session == null) || (!session.closeInitiated()))
+          final ProtocolSession tmpSession = session;
+          if (tmpSession == null || !tmpSession.closeInitiated())
           {
             /*
              * We did not initiate the close on our side, log an error message.
              */
             Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED
                 .get(serverId, baseDn, replicationServerID,
-                    failingSession.getReadableRemoteAddress());
+                    savedSession.getReadableRemoteAddress());
             logError(message);
           }
+
           if (reconnectOnFailure)
-            reStart(failingSession, true);
+          {
+            reStart(savedSession, true);
+          }
           else
-            break; // does not seem necessary to explicitely disconnect ..
+          {
+            break; // does not seem necessary to explicitly disconnect ..
+          }
         }
       }
     } // while !shutdown
@@ -2614,17 +2657,20 @@
         " close the connection to replication server " + rsServerId + " for" +
         " domain " + baseDn);
 
-    stopRSHeartBeatMonitoring();
-    stopChangeTimeHeartBeatPublishing();
-    replicationServer = "stopped";
-    shutdown = true;
-    connected = false;
-    rsGroupId = (byte) -1;
-    rsServerId = -1;
-    rsServerUrl = null;
-    if (session != null)
+    synchronized (startStopLock)
     {
-      session.close();
+      shutdown = true;
+      connected = false;
+      stopRSHeartBeatMonitoring();
+      stopChangeTimeHeartBeatPublishing();
+      replicationServer = "stopped";
+      rsGroupId = (byte) -1;
+      rsServerId = -1;
+      rsServerUrl = null;
+      if (session != null)
+      {
+        session.close();
+      }
     }
   }
 

--
Gitblit v1.10.0