From 905277605ce8651332e1d2eb0752b24c7a739ac8 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 29 Mar 2012 16:59:02 +0000
Subject: [PATCH] Fix OPENDJ-457: Sleeping replication threads prevent server from shutting down
---
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 182 ++++++++++++++++++++++++++++-----------------
1 files changed, 114 insertions(+), 68 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index b0be403..927241a 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -85,6 +85,7 @@
*/
private static final DebugTracer TRACER = getTracer();
private volatile boolean shutdown = false;
+ private final Object startStopLock = new Object();
private volatile Collection<String> servers;
private volatile boolean connected = false;
private volatile String replicationServer = "Not connected";
@@ -235,9 +236,12 @@
*/
public void start()
{
- shutdown = false;
- this.rcvWindow = this.maxRcvWindow;
- this.connect();
+ synchronized (startStopLock)
+ {
+ shutdown = false;
+ this.rcvWindow = this.maxRcvWindow;
+ this.connect();
+ }
}
/**
@@ -247,21 +251,23 @@
*/
public void start(Collection<String> servers)
{
- /*
- * Open Socket to the ReplicationServer
- * Send the Start message
- */
- shutdown = false;
- this.servers = servers;
-
- if (servers.size() < 1)
+ synchronized (startStopLock)
{
- Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get();
- logError(message);
- }
+ /*
+ * Open Socket to the ReplicationServer and send the Start message
+ */
+ shutdown = false;
+ this.servers = servers;
- this.rcvWindow = this.maxRcvWindow;
- this.connect();
+ if (servers.size() < 1)
+ {
+ Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get();
+ logError(message);
+ }
+
+ this.rcvWindow = this.maxRcvWindow;
+ this.connect();
+ }
}
/**
@@ -2115,7 +2121,7 @@
*/
public void reStart(boolean infiniteTry)
{
- reStart(this.session, infiniteTry);
+ reStart(session, infiniteTry);
}
/**
@@ -2126,7 +2132,6 @@
*/
public void reStart(ProtocolSession failingSession, boolean infiniteTry)
{
-
if (failingSession != null)
{
failingSession.close();
@@ -2135,43 +2140,64 @@
if (failingSession == session)
{
- this.connected = false;
+ connected = false;
rsGroupId = (byte) -1;
rsServerId = -1;
rsServerUrl = null;
session = null;
}
- while (!this.connected && (!this.shutdown))
+
+ while (true)
{
- try
+ // Synchronize inside the loop in order to allow shutdown.
+ boolean needSleep = false;
+
+ synchronized (startStopLock)
{
- this.connect();
- } catch (Exception e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
- baseDn, e.getLocalizedMessage()));
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
+ if (connected || shutdown)
+ {
+ break;
+ }
+
+ try
+ {
+ connect();
+ }
+ catch (Exception e)
+ {
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(baseDn,
+ e.getLocalizedMessage()));
+ mb.append(stackTraceToSingleLineString(e));
+ logError(mb.toMessage());
+ }
+
+ if (connected || !infiniteTry)
+ {
+ break;
+ }
+
+ needSleep = true;
}
- if ((!connected) && (!infiniteTry))
- break;
- if ((!connected) && (!shutdown))
+
+ if (needSleep)
{
try
{
Thread.sleep(500);
- } catch (InterruptedException e)
+ }
+ catch (InterruptedException e)
{
// ignore
}
}
}
+
if (debugEnabled())
- debugInfo(this +
- " end restart : connected=" + connected +
- " with RSid=" + this.getRsServerId() +
- " genid=" + this.generationID);
+ {
+ debugInfo(this + " end restart : connected=" + connected + " with RSid="
+ + this.getRsServerId() + " genid=" + this.generationID);
+ }
}
/**
@@ -2376,9 +2402,9 @@
boolean reconnectOnFailure, boolean returnOnTopoChange)
throws SocketTimeoutException
{
- while (shutdown == false)
+ while (!shutdown)
{
- if ((reconnectOnFailure) && (!connected))
+ if (reconnectOnFailure && !connected)
{
// infinite try to reconnect
reStart(null, true);
@@ -2386,12 +2412,17 @@
// Save session information for later in case we need it for log messages
// after the session has been closed and/or failed.
- final ProtocolSession failingSession = session;
- final int replicationServerID = rsServerId;
+ final ProtocolSession savedSession = session;
+ if (savedSession == null)
+ {
+ // Must be shutting down.
+ break;
+ }
+ final int replicationServerID = rsServerId;
try
{
- ReplicationMsg msg = session.receive();
+ ReplicationMsg msg = savedSession.receive();
if (msg instanceof UpdateMsg)
{
synchronized (this)
@@ -2403,7 +2434,8 @@
{
WindowMsg windowMsg = (WindowMsg) msg;
sendWindow.release(windowMsg.getNumAck());
- } else if (msg instanceof TopologyMsg)
+ }
+ else if (msg instanceof TopologyMsg)
{
TopologyMsg topoMsg = (TopologyMsg) msg;
receiveTopo(topoMsg);
@@ -2417,20 +2449,22 @@
if (returnOnTopoChange)
return msg;
- } else if (msg instanceof StopMsg)
+ }
+ else if (msg instanceof StopMsg)
{
/*
* RS performs a proper disconnection
*/
Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED
.get(replicationServerID,
- failingSession.getReadableRemoteAddress(),
- serverId, baseDn);
+ savedSession.getReadableRemoteAddress(),
+ serverId, baseDn);
logError(message);
// Try to find a suitable RS
- this.reStart(failingSession, true);
- } else if (msg instanceof MonitorMsg)
+ this.reStart(savedSession, true);
+ }
+ else if (msg instanceof MonitorMsg)
{
// This is the response to a MonitorRequest that was sent earlier or
// the regular message of the monitoring publisher of the RS.
@@ -2490,14 +2524,14 @@
{
message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
serverId, replicationServerID,
- failingSession.getReadableRemoteAddress(),
+ savedSession.getReadableRemoteAddress(),
baseDn);
}
else
{
message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
serverId, replicationServerID,
- failingSession.getReadableRemoteAddress(),
+ savedSession.getReadableRemoteAddress(),
bestServerInfo.getServerId(), baseDn);
}
logError(message);
@@ -2508,36 +2542,45 @@
mustRunBestServerCheckingAlgorithm = 0;
}
}
- } else
+ }
+ else
{
return msg;
}
- } catch (SocketTimeoutException e)
+ }
+ catch (SocketTimeoutException e)
{
throw e;
- } catch (Exception e)
+ }
+ catch (Exception e)
{
if (debugEnabled())
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
- if (shutdown == false)
+ if (!shutdown)
{
- if ((session == null) || (!session.closeInitiated()))
+ final ProtocolSession tmpSession = session;
+ if (tmpSession == null || !tmpSession.closeInitiated())
{
/*
* We did not initiate the close on our side, log an error message.
*/
Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED
.get(serverId, baseDn, replicationServerID,
- failingSession.getReadableRemoteAddress());
+ savedSession.getReadableRemoteAddress());
logError(message);
}
+
if (reconnectOnFailure)
- reStart(failingSession, true);
+ {
+ reStart(savedSession, true);
+ }
else
- break; // does not seem necessary to explicitely disconnect ..
+ {
+ break; // does not seem necessary to explicitly disconnect ..
+ }
}
}
} // while !shutdown
@@ -2614,17 +2657,20 @@
" close the connection to replication server " + rsServerId + " for" +
" domain " + baseDn);
- stopRSHeartBeatMonitoring();
- stopChangeTimeHeartBeatPublishing();
- replicationServer = "stopped";
- shutdown = true;
- connected = false;
- rsGroupId = (byte) -1;
- rsServerId = -1;
- rsServerUrl = null;
- if (session != null)
+ synchronized (startStopLock)
{
- session.close();
+ shutdown = true;
+ connected = false;
+ stopRSHeartBeatMonitoring();
+ stopChangeTimeHeartBeatPublishing();
+ replicationServer = "stopped";
+ rsGroupId = (byte) -1;
+ rsServerId = -1;
+ rsServerUrl = null;
+ if (session != null)
+ {
+ session.close();
+ }
}
}
--
Gitblit v1.10.0