From 33aceef7810e559fd0cf68b2f2df715383e1aa00 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Tue, 20 Mar 2012 11:19:25 +0000
Subject: [PATCH] Fix OPENDJ-457: Sleeping replication threads prevent server from shutting down

---
 opendj-sdk/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java |   34 +++++---
 opendj-sdk/opends/src/server/org/opends/server/core/IdleTimeLimitThread.java                       |   35 +++++++-
 opendj-sdk/opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java           |   55 +++++++++----
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java           |   42 ++++++----
 opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java           |    4 
 opendj-sdk/opends/src/server/org/opends/server/core/ServerShutdownMonitor.java                     |    3 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java              |   25 +++--
 7 files changed, 130 insertions(+), 68 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/core/IdleTimeLimitThread.java b/opendj-sdk/opends/src/server/org/opends/server/core/IdleTimeLimitThread.java
index 57750d0..403cd1e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/core/IdleTimeLimitThread.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/core/IdleTimeLimitThread.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2008 Sun Microsystems, Inc.
+ *      Portions copyright 2012 ForgeRock AS.
  */
 package org.opends.server.core;
 import org.opends.messages.Message;
@@ -38,6 +39,7 @@
 import org.opends.server.types.DisconnectReason;
 
 import static org.opends.server.loggers.debug.DebugLogger.*;
+
 import org.opends.server.loggers.ErrorLogger;
 import static org.opends.messages.CoreMessages.*;
 
@@ -60,8 +62,9 @@
 
 
 
-  // Indicates whether a shutdown request has been received.
-  private boolean shutdownRequested;
+  // Shutdown monitor state.
+  private volatile boolean shutdownRequested;
+  private final Object shutdownLock = new Object();
 
 
 
@@ -93,10 +96,26 @@
     {
       try
       {
-        try
+        synchronized (shutdownLock)
         {
-          sleep(sleepTime);
-        } catch (InterruptedException ie) {}
+          if (!shutdownRequested)
+          {
+            try
+            {
+              shutdownLock.wait(sleepTime);
+            }
+            catch (InterruptedException e)
+            {
+              // Server shutdown monitor may interrupt slow threads.
+              if (debugEnabled())
+              {
+                TRACER.debugCaught(DebugLogLevel.ERROR, e);
+              }
+              shutdownRequested = true;
+              break;
+            }
+          }
+        }
 
         sleepTime = 5000L;
         for (ConnectionHandler<?> ch : DirectoryServer.getConnectionHandlers())
@@ -181,7 +200,11 @@
    */
   public void processServerShutdown(Message reason)
   {
-    shutdownRequested = true;
+    synchronized (shutdownLock)
+    {
+      shutdownRequested = true;
+      shutdownLock.notifyAll();
+    }
   }
 }
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/core/ServerShutdownMonitor.java b/opendj-sdk/opends/src/server/org/opends/server/core/ServerShutdownMonitor.java
index c16952c..5607ecf 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/core/ServerShutdownMonitor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/core/ServerShutdownMonitor.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2006-2009 Sun Microsystems, Inc.
+ *      Portions copyright 2012 ForgeRock AS.
  */
 package org.opends.server.core;
 
@@ -43,7 +44,7 @@
 {
   // Indicates whether the monitor has completed and the shutdown may be
   // finalized with a call to System.exit;
-  private boolean monitorDone;
+  private volatile boolean monitorDone;
 
   // The list of threads that need to be monitored.
   private LinkedList<Thread> threadList;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java b/opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java
index 45cd861..10515e1 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java
@@ -23,7 +23,7 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
- *      Portions copyright 2011 ForgeRock AS
+ *      Portions copyright 2011-2012 ForgeRock AS
  */
 package org.opends.server.protocols.ldap;
 import static org.opends.messages.ProtocolMessages.*;
@@ -155,7 +155,7 @@
 
   // Indicates whether the Directory Server is in the process of
   // shutting down.
-  private boolean shutdownRequested;
+  private volatile boolean shutdownRequested;
 
   /* Internal LDAP connection handler state */
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
index 778a5a4..86f9024 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
@@ -23,7 +23,7 @@
  *
  *
  *      Copyright 2008 Sun Microsystems, Inc.
- *      Portions Copyright 2011 ForgeRock AS
+ *      Portions Copyright 2011-2012 ForgeRock AS
  */
 
 package org.opends.server.replication.protocol;
@@ -32,6 +32,7 @@
 import static org.opends.server.loggers.debug.DebugLogger.*;
 
 import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.types.DebugLogLevel;
 
 import java.io.IOException;
 
@@ -123,32 +124,37 @@
           }
         }
 
-        try
+        long sleepTime = session.getLastPublishTime() +
+            heartbeatInterval - now;
+        if (sleepTime <= 0)
         {
-          long sleepTime = session.getLastPublishTime() +
-              heartbeatInterval - now;
-          if (sleepTime <= 0)
-          {
-            sleepTime = heartbeatInterval;
-          }
+          sleepTime = heartbeatInterval;
+        }
 
-          if (debugEnabled())
-          {
-            TRACER.debugVerbose("Heartbeat thread sleeping for %d", sleepTime);
-          }
+        if (debugEnabled())
+        {
+          TRACER.debugVerbose("Heartbeat thread sleeping for %d", sleepTime);
+        }
 
-          synchronized (shutdownLock)
+        synchronized (shutdownLock)
+        {
+          if (!shutdown)
           {
-            if (!shutdown)
+            try
             {
               shutdownLock.wait(sleepTime);
             }
+            catch (InterruptedException e)
+            {
+              // Server shutdown monitor may interrupt slow threads.
+              if (debugEnabled())
+              {
+                TRACER.debugCaught(DebugLogLevel.ERROR, e);
+              }
+              shutdown = true;
+            }
           }
         }
-        catch (InterruptedException e)
-        {
-          // Keep looping.
-        }
       }
     }
     catch (IOException e)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
index 05d97d0..e2e5b28 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -23,7 +23,7 @@
  *
  *
  *      Copyright 2008-2009 Sun Microsystems, Inc.
- *      Portions Copyright 2011 ForgeRock AS
+ *      Portions Copyright 2011-2012 ForgeRock AS
  */
 package org.opends.server.replication.server;
 
@@ -34,6 +34,7 @@
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.common.StatusMachineEvent;
+import org.opends.server.types.DebugLogLevel;
 
 /**
  * This thread is in charge of periodically determining if the connected
@@ -100,21 +101,25 @@
     boolean interrupted = false;
     while (!shutdown && !interrupted)
     {
-      try
+      synchronized (shutdownLock)
       {
-        synchronized (shutdownLock)
+        if (!shutdown)
         {
-          if (!shutdown)
+          try
           {
             shutdownLock.wait(STATUS_ANALYZER_SLEEP_TIME);
           }
+          catch (InterruptedException e)
+          {
+            // Server shutdown monitor may interrupt slow threads.
+            if (debugEnabled())
+            {
+              TRACER.debugCaught(DebugLogLevel.ERROR, e);
+            }
+            shutdown = true;
+            break;
+          }
         }
-      } catch (InterruptedException ex)
-      {
-        TRACER.debugInfo("Status analyzer for dn " +
-            replicationServerDomain.getBaseDn().toString() + " in RS " +
-            replicationServerDomain.getReplicationServer().getServerId() +
-            " has been interrupted while sleeping.");
       }
 
       // Go through each connected DS, get the number of pending changes we have
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
index 597a6d3..03ad1db 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -23,7 +23,7 @@
  *
  *
  *      Copyright 2009 Sun Microsystems, Inc.
- *      Portions Copyright 2011 ForgeRock AS
+ *      Portions Copyright 2011-2012 ForgeRock AS
  */
 
 package org.opends.server.replication.service;
@@ -35,6 +35,7 @@
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
 import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.types.DebugLogLevel;
 import org.opends.server.util.TimeThread;
 
 import java.io.IOException;
@@ -111,27 +112,32 @@
           session.publish(ctHeartbeatMsg);
         }
 
-        try
+        long sleepTime = session.getLastPublishTime() +
+            heartbeatInterval - now;
+        if (sleepTime <= 0)
         {
-          long sleepTime = session.getLastPublishTime() +
-              heartbeatInterval - now;
-          if (sleepTime <= 0)
-          {
-            sleepTime = heartbeatInterval;
-          }
+          sleepTime = heartbeatInterval;
+        }
 
-          synchronized (shutdownLock)
+        synchronized (shutdownLock)
+        {
+          if (!shutdown)
           {
-            if (!shutdown)
+            try
             {
               shutdownLock.wait(sleepTime);
             }
+            catch (InterruptedException e)
+            {
+              // Server shutdown monitor may interrupt slow threads.
+              if (debugEnabled())
+              {
+                TRACER.debugCaught(DebugLogLevel.ERROR, e);
+              }
+              shutdown = true;
+            }
           }
         }
-        catch (InterruptedException e)
-        {
-          // Keep looping.
-        }
       }
     }
     catch (IOException e)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java
index d2325c8..6345f48 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java
@@ -23,7 +23,7 @@
  *
  *
  *      Copyright 2007-2009 Sun Microsystems, Inc.
- *      Portions Copyright 2011 ForgeRock AS
+ *      Portions Copyright 2011-2012 ForgeRock AS
  */
 
 package org.opends.server.replication.service;
@@ -31,10 +31,10 @@
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.types.DebugLogLevel;
 
 import org.opends.server.api.DirectoryThread;
 
@@ -70,9 +70,10 @@
 
 
   /**
-   * Set this to stop the thread.
+   * Thread life-cycle state.
    */
   private volatile boolean shutdown = false;
+  private final Object shutdownLock = new Object();
 
 
 
@@ -110,7 +111,11 @@
    */
   public void shutdown()
   {
-    shutdown = true;
+    synchronized (shutdownLock)
+    {
+      shutdown = true;
+      shutdownLock.notifyAll();
+    }
   }
 
   /**
@@ -119,27 +124,30 @@
   @Override
   public void run()
   {
-    boolean gotOneFailure = false;
     if (debugEnabled())
     {
       TRACER.debugInfo(this + " is starting, expected interval is " +
                 heartbeatInterval);
     }
+
     try
     {
+      boolean gotOneFailure = false;
       while (!shutdown)
       {
         long now = System.currentTimeMillis();
         long lastReceiveTime = session.getLastReceiveTime();
         if (now > lastReceiveTime + heartbeatInterval)
         {
-          if (gotOneFailure == true)
+          if (gotOneFailure)
           {
             // Heartbeat is well overdue so the server is assumed to be dead.
             logError(WARN_HEARTBEAT_FAILURE.get(serverID,
-                replicationServerID,
-                session.getReadableRemoteAddress(), baseDN));
-            session.close();
+                replicationServerID, session.getReadableRemoteAddress(),
+                baseDN));
+
+            // Exit monitor and close session.
+            shutdown = true;
             break;
           }
           else
@@ -151,13 +159,26 @@
         {
           gotOneFailure = false;
         }
-        try
+
+        // Sleep.
+        synchronized (shutdownLock)
         {
-          Thread.sleep(heartbeatInterval);
-        }
-        catch (InterruptedException e)
-        {
-          // That's OK.
+          if (!shutdown)
+          {
+            try
+            {
+              shutdownLock.wait(heartbeatInterval);
+            }
+            catch (InterruptedException e)
+            {
+              // Server shutdown monitor may interrupt slow threads.
+              if (debugEnabled())
+              {
+                TRACER.debugCaught(DebugLogLevel.ERROR, e);
+              }
+              shutdown = true;
+            }
+          }
         }
       }
     }
@@ -165,9 +186,9 @@
     {
       if (debugEnabled())
       {
-        TRACER.debugInfo("Heartbeat monitor is exiting." +
-            stackTraceToSingleLineString(new Exception()));
+        TRACER.debugInfo("Heartbeat monitor is exiting");
       }
+      session.close();
     }
   }
 }

--
Gitblit v1.10.0