From 947e952c52717610f323f6d4a60e0017f68621a1 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 05 Nov 2014 16:46:08 +0000
Subject: [PATCH] OPENDJ-1618 (CR-5155) External changelog not purged after the purge delay
---
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 79 +++++++++++++++++++--------------------
1 files changed, 39 insertions(+), 40 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 085504d..89d2d9e 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -119,7 +119,7 @@
* The purge delay (in milliseconds). Records in the changelog DB that are
* older than this delay might be removed.
*/
- private long purgeDelayInMillis;
+ private volatile long purgeDelayInMillis;
private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>();
/** The local replication server. */
@@ -622,21 +622,29 @@
public void setPurgeDelay(final long purgeDelayInMillis)
{
this.purgeDelayInMillis = purgeDelayInMillis;
- final ChangelogDBPurger purger;
if (purgeDelayInMillis > 0)
{
- purger = new ChangelogDBPurger();
- if (cnPurger.compareAndSet(null, purger))
- {
- purger.start();
- } // otherwise a purger was already running
+ final ChangelogDBPurger newPurger = new ChangelogDBPurger();
+ if (cnPurger.compareAndSet(null, newPurger))
+ { // no purger was running, run this new one
+ newPurger.start();
+ }
+ else
+ { // a purger was already running, just wake that one up
+ // to verify if some entries can be purged with the new purge delay
+ final ChangelogDBPurger currentPurger = cnPurger.get();
+ synchronized (currentPurger)
+ {
+ currentPurger.notify();
+ }
+ }
}
else
{
- purger = cnPurger.getAndSet(null);
- if (purger != null)
- {
- purger.initiateShutdown();
+ final ChangelogDBPurger purgerToStop = cnPurger.getAndSet(null);
+ if (purgerToStop != null)
+ { // stop this purger
+ purgerToStop.initiateShutdown();
}
}
}
@@ -953,7 +961,16 @@
// (3 days default), because we might receive late updates
// that will have to be purged before the purge delay elapses.
// This can particularly happen in case of network partitions.
- jeFriendlySleep(DEFAULT_SLEEP);
+ if (!isShutdownInitiated())
+ {
+ synchronized (this)
+ {
+ if (!isShutdownInitiated())
+ {
+ wait(DEFAULT_SLEEP);
+ }
+ }
+ }
continue;
}
}
@@ -966,7 +983,16 @@
}
}
- jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
+ if (!isShutdownInitiated())
+ {
+ synchronized (this)
+ {
+ if (!isShutdownInitiated())
+ {
+ wait(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
+ }
+ }
+ }
}
catch (InterruptedException e)
{
@@ -983,33 +1009,6 @@
}
}
- /**
- * This method implements a sleep() that is friendly to Berkeley JE.
- * <p>
- * Originally, {@link Thread#sleep(long)} was used , but waking up a
- * sleeping threads required calling {@link Thread#interrupt()}, and JE
- * threw exceptions when invoked on interrupted threads.
- * <p>
- * The solution is to replace:
- * <ol>
- * <li> {@link Thread#sleep()} with {@link Object#wait(long)}</li>
- * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li>
- * </ol>
- */
- private void jeFriendlySleep(long millis) throws InterruptedException
- {
- if (!isShutdownInitiated())
- {
- synchronized (this)
- {
- if (!isShutdownInitiated())
- {
- wait(millis);
- }
- }
- }
- }
-
private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
{
final long nextPurgeTime = notPurgedCSN.getTime();
--
Gitblit v1.10.0