From 74328205a692a025fbf44a6af9c9f1f456ef34df Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 17 Sep 2013 07:51:42 +0000
Subject: [PATCH] OPENDJ-1130 (CR-2323) Connect Error when stopping all the servers from a replication topology at the same time
---
opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java | 113 +++++++++++++++++++++++++++-----------------------------
1 files changed, 55 insertions(+), 58 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java b/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
index 55519b1..0ba2b64 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -115,8 +115,6 @@
private int serverId;
private String baseDn;
private DbMonitorProvider dbMonitor = new DbMonitorProvider();
- private boolean shutdown = false;
- private boolean done = false;
private DirectoryThread thread;
private final Object flushLock = new Object();
private ReplicationServer replicationServer;
@@ -310,30 +308,18 @@
*/
public void shutdown()
{
- if (shutdown)
+ if (thread.isShutdownInitiated())
{
return;
}
- shutdown = true;
+ thread.initiateShutdown();
+
synchronized (msgQueue)
{
msgQueue.notifyAll();
}
- synchronized (this)
- { /* Can this be replaced with thread.join() ? */
- while (!done)
- {
- try
- {
- wait();
- }
- catch (InterruptedException e)
- { /* do nothing */}
- }
- }
-
while (msgQueue.size() != 0)
{
flush();
@@ -351,55 +337,62 @@
@Override
public void run()
{
- while (!shutdown)
- {
- try
- {
- flush();
- trim();
+ thread.startWork();
- synchronized (msgQueue)
+ try
+ {
+ while (!thread.isShutdownInitiated())
+ {
+ try
{
- if (msgQueue.size() < queueLowmark
- && queueByteSize < queueLowmarkBytes)
+ flush();
+ trim();
+
+ synchronized (msgQueue)
{
- try
+ if (msgQueue.size() < queueLowmark
+ && queueByteSize < queueLowmarkBytes)
{
- msgQueue.wait(1000);
- } catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
+ try
+ {
+ msgQueue.wait(1000);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
}
}
}
- } catch (Exception end)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
- mb.append(" ");
- mb.append(stackTraceToSingleLineString(end));
- logError(mb.toMessage());
- synchronized (this)
+ catch (Exception end)
{
- // set the done variable to true so that this thread don't
- // get stuck in this dbHandler.shutdown() when it get called
- // by replicationServer.shutdown();
- done = true;
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
+ mb.append(" ");
+ mb.append(stackTraceToSingleLineString(end));
+ logError(mb.toMessage());
+
+ thread.initiateShutdown();
+
+ if (replicationServer != null)
+ {
+ replicationServer.shutdown();
+ }
+ break;
}
- if (replicationServer != null)
- {
- replicationServer.shutdown();
- }
- break;
}
+
+ // call flush a last time before exiting to make sure that
+ // no change was forgotten in the msgQueue
+ flush();
}
- // call flush a last time before exiting to make sure that
- // no change was forgotten in the msgQueue
- flush();
+ finally
+ {
+ thread.stopWork();
+ }
synchronized (this)
{
- done = true;
notifyAll();
}
}
@@ -450,11 +443,14 @@
{
for (int j = 0; j < 50; j++)
{
+ if (thread.isShutdownInitiated())
+ {
+ return;
+ }
+
CSN csn = cursor.nextCSN();
if (csn == null)
{
- cursor.close();
- done = true;
return;
}
@@ -465,21 +461,22 @@
else
{
firstChange = csn;
- cursor.close();
- done = true;
return;
}
}
- cursor.close();
}
catch (ChangelogException e)
{
// mark shutdown for this db so that we don't try again to
// stop it from cursor.close() or methods called by cursor.close()
cursor.abort();
- shutdown = true;
+ thread.initiateShutdown();
throw e;
}
+ finally
+ {
+ cursor.close();
+ }
}
}
}
--
Gitblit v1.10.0