From 33aceef7810e559fd0cf68b2f2df715383e1aa00 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Tue, 20 Mar 2012 11:19:25 +0000
Subject: [PATCH] Fix OPENDJ-457: Sleeping replication threads prevent server from shutting down
---
opendj-sdk/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java | 34 +++++---
opendj-sdk/opends/src/server/org/opends/server/core/IdleTimeLimitThread.java | 35 +++++++-
opendj-sdk/opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java | 55 +++++++++----
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java | 42 ++++++----
opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java | 4
opendj-sdk/opends/src/server/org/opends/server/core/ServerShutdownMonitor.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java | 25 +++--
7 files changed, 130 insertions(+), 68 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/core/IdleTimeLimitThread.java b/opendj-sdk/opends/src/server/org/opends/server/core/IdleTimeLimitThread.java
index 57750d0..403cd1e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/core/IdleTimeLimitThread.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/core/IdleTimeLimitThread.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2008 Sun Microsystems, Inc.
+ * Portions copyright 2012 ForgeRock AS.
*/
package org.opends.server.core;
import org.opends.messages.Message;
@@ -38,6 +39,7 @@
import org.opends.server.types.DisconnectReason;
import static org.opends.server.loggers.debug.DebugLogger.*;
+
import org.opends.server.loggers.ErrorLogger;
import static org.opends.messages.CoreMessages.*;
@@ -60,8 +62,9 @@
- // Indicates whether a shutdown request has been received.
- private boolean shutdownRequested;
+ // Shutdown monitor state.
+ private volatile boolean shutdownRequested;
+ private final Object shutdownLock = new Object();
@@ -93,10 +96,26 @@
{
try
{
- try
+ synchronized (shutdownLock)
{
- sleep(sleepTime);
- } catch (InterruptedException ie) {}
+ if (!shutdownRequested)
+ {
+ try
+ {
+ shutdownLock.wait(sleepTime);
+ }
+ catch (InterruptedException e)
+ {
+ // Server shutdown monitor may interrupt slow threads.
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ shutdownRequested = true;
+ break;
+ }
+ }
+ }
sleepTime = 5000L;
for (ConnectionHandler<?> ch : DirectoryServer.getConnectionHandlers())
@@ -181,7 +200,11 @@
*/
public void processServerShutdown(Message reason)
{
- shutdownRequested = true;
+ synchronized (shutdownLock)
+ {
+ shutdownRequested = true;
+ shutdownLock.notifyAll();
+ }
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/core/ServerShutdownMonitor.java b/opendj-sdk/opends/src/server/org/opends/server/core/ServerShutdownMonitor.java
index c16952c..5607ecf 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/core/ServerShutdownMonitor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/core/ServerShutdownMonitor.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
+ * Portions copyright 2012 ForgeRock AS.
*/
package org.opends.server.core;
@@ -43,7 +44,7 @@
{
// Indicates whether the monitor has completed and the shutdown may be
// finalized with a call to System.exit;
- private boolean monitorDone;
+ private volatile boolean monitorDone;
// The list of threads that need to be monitored.
private LinkedList<Thread> threadList;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java b/opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java
index 45cd861..10515e1 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPConnectionHandler.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions copyright 2011 ForgeRock AS
+ * Portions copyright 2011-2012 ForgeRock AS
*/
package org.opends.server.protocols.ldap;
import static org.opends.messages.ProtocolMessages.*;
@@ -155,7 +155,7 @@
// Indicates whether the Directory Server is in the process of
// shutting down.
- private boolean shutdownRequested;
+ private volatile boolean shutdownRequested;
/* Internal LDAP connection handler state */
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
index 778a5a4..86f9024 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2008 Sun Microsystems, Inc.
- * Portions Copyright 2011 ForgeRock AS
+ * Portions Copyright 2011-2012 ForgeRock AS
*/
package org.opends.server.replication.protocol;
@@ -32,6 +32,7 @@
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.types.DebugLogLevel;
import java.io.IOException;
@@ -123,32 +124,37 @@
}
}
- try
+ long sleepTime = session.getLastPublishTime() +
+ heartbeatInterval - now;
+ if (sleepTime <= 0)
{
- long sleepTime = session.getLastPublishTime() +
- heartbeatInterval - now;
- if (sleepTime <= 0)
- {
- sleepTime = heartbeatInterval;
- }
+ sleepTime = heartbeatInterval;
+ }
- if (debugEnabled())
- {
- TRACER.debugVerbose("Heartbeat thread sleeping for %d", sleepTime);
- }
+ if (debugEnabled())
+ {
+ TRACER.debugVerbose("Heartbeat thread sleeping for %d", sleepTime);
+ }
- synchronized (shutdownLock)
+ synchronized (shutdownLock)
+ {
+ if (!shutdown)
{
- if (!shutdown)
+ try
{
shutdownLock.wait(sleepTime);
}
+ catch (InterruptedException e)
+ {
+ // Server shutdown monitor may interrupt slow threads.
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ shutdown = true;
+ }
}
}
- catch (InterruptedException e)
- {
- // Keep looping.
- }
}
}
catch (IOException e)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
index 05d97d0..e2e5b28 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2008-2009 Sun Microsystems, Inc.
- * Portions Copyright 2011 ForgeRock AS
+ * Portions Copyright 2011-2012 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -34,6 +34,7 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
+import org.opends.server.types.DebugLogLevel;
/**
* This thread is in charge of periodically determining if the connected
@@ -100,21 +101,25 @@
boolean interrupted = false;
while (!shutdown && !interrupted)
{
- try
+ synchronized (shutdownLock)
{
- synchronized (shutdownLock)
+ if (!shutdown)
{
- if (!shutdown)
+ try
{
shutdownLock.wait(STATUS_ANALYZER_SLEEP_TIME);
}
+ catch (InterruptedException e)
+ {
+ // Server shutdown monitor may interrupt slow threads.
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ shutdown = true;
+ break;
+ }
}
- } catch (InterruptedException ex)
- {
- TRACER.debugInfo("Status analyzer for dn " +
- replicationServerDomain.getBaseDn().toString() + " in RS " +
- replicationServerDomain.getReplicationServer().getServerId() +
- " has been interrupted while sleeping.");
}
// Go through each connected DS, get the number of pending changes we have
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
index 597a6d3..03ad1db 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
- * Portions Copyright 2011 ForgeRock AS
+ * Portions Copyright 2011-2012 ForgeRock AS
*/
package org.opends.server.replication.service;
@@ -35,6 +35,7 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.TimeThread;
import java.io.IOException;
@@ -111,27 +112,32 @@
session.publish(ctHeartbeatMsg);
}
- try
+ long sleepTime = session.getLastPublishTime() +
+ heartbeatInterval - now;
+ if (sleepTime <= 0)
{
- long sleepTime = session.getLastPublishTime() +
- heartbeatInterval - now;
- if (sleepTime <= 0)
- {
- sleepTime = heartbeatInterval;
- }
+ sleepTime = heartbeatInterval;
+ }
- synchronized (shutdownLock)
+ synchronized (shutdownLock)
+ {
+ if (!shutdown)
{
- if (!shutdown)
+ try
{
shutdownLock.wait(sleepTime);
}
+ catch (InterruptedException e)
+ {
+ // Server shutdown monitor may interrupt slow threads.
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ shutdown = true;
+ }
}
}
- catch (InterruptedException e)
- {
- // Keep looping.
- }
}
}
catch (IOException e)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java
index d2325c8..6345f48 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2007-2009 Sun Microsystems, Inc.
- * Portions Copyright 2011 ForgeRock AS
+ * Portions Copyright 2011-2012 ForgeRock AS
*/
package org.opends.server.replication.service;
@@ -31,10 +31,10 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.types.DebugLogLevel;
import org.opends.server.api.DirectoryThread;
@@ -70,9 +70,10 @@
/**
- * Set this to stop the thread.
+ * Thread life-cycle state.
*/
private volatile boolean shutdown = false;
+ private final Object shutdownLock = new Object();
@@ -110,7 +111,11 @@
*/
public void shutdown()
{
- shutdown = true;
+ synchronized (shutdownLock)
+ {
+ shutdown = true;
+ shutdownLock.notifyAll();
+ }
}
/**
@@ -119,27 +124,30 @@
@Override
public void run()
{
- boolean gotOneFailure = false;
if (debugEnabled())
{
TRACER.debugInfo(this + " is starting, expected interval is " +
heartbeatInterval);
}
+
try
{
+ boolean gotOneFailure = false;
while (!shutdown)
{
long now = System.currentTimeMillis();
long lastReceiveTime = session.getLastReceiveTime();
if (now > lastReceiveTime + heartbeatInterval)
{
- if (gotOneFailure == true)
+ if (gotOneFailure)
{
// Heartbeat is well overdue so the server is assumed to be dead.
logError(WARN_HEARTBEAT_FAILURE.get(serverID,
- replicationServerID,
- session.getReadableRemoteAddress(), baseDN));
- session.close();
+ replicationServerID, session.getReadableRemoteAddress(),
+ baseDN));
+
+ // Exit monitor and close session.
+ shutdown = true;
break;
}
else
@@ -151,13 +159,26 @@
{
gotOneFailure = false;
}
- try
+
+ // Sleep.
+ synchronized (shutdownLock)
{
- Thread.sleep(heartbeatInterval);
- }
- catch (InterruptedException e)
- {
- // That's OK.
+ if (!shutdown)
+ {
+ try
+ {
+ shutdownLock.wait(heartbeatInterval);
+ }
+ catch (InterruptedException e)
+ {
+ // Server shutdown monitor may interrupt slow threads.
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ shutdown = true;
+ }
+ }
}
}
}
@@ -165,9 +186,9 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("Heartbeat monitor is exiting." +
- stackTraceToSingleLineString(new Exception()));
+ TRACER.debugInfo("Heartbeat monitor is exiting");
}
+ session.close();
}
}
}
--
Gitblit v1.10.0