From 947e952c52717610f323f6d4a60e0017f68621a1 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 05 Nov 2014 16:46:08 +0000
Subject: [PATCH] OPENDJ-1618 (CR-5155) External changelog not purged after the purge delay

---
 opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java |   79 +++++++++++++++++++--------------------
 1 files changed, 39 insertions(+), 40 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index e3ddb3e..3d2730c 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -124,7 +124,7 @@
    * The purge delay (in milliseconds). Records in the changelog DB that are
    * older than this delay might be removed.
    */
-  private long purgeDelayInMillis;
+  private volatile long purgeDelayInMillis;
   private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>();
 
   /** The local replication server. */
@@ -579,21 +579,29 @@
   public void setPurgeDelay(final long purgeDelayInMillis)
   {
     this.purgeDelayInMillis = purgeDelayInMillis;
-    final ChangelogDBPurger purger;
     if (purgeDelayInMillis > 0)
     {
-      purger = new ChangelogDBPurger();
-      if (cnPurger.compareAndSet(null, purger))
-      {
-        purger.start();
-      } // otherwise a purger was already running
+      final ChangelogDBPurger newPurger = new ChangelogDBPurger();
+      if (cnPurger.compareAndSet(null, newPurger))
+      { // no purger was running, run this new one
+        newPurger.start();
+      }
+      else
+      { // a purger was already running, just wake that one up
+        // to verify if some entries can be purged with the new purge delay
+        final ChangelogDBPurger currentPurger = cnPurger.get();
+        synchronized (currentPurger)
+        {
+          currentPurger.notify();
+        }
+      }
     }
     else
     {
-      purger = cnPurger.getAndSet(null);
-      if (purger != null)
-      {
-        purger.initiateShutdown();
+      final ChangelogDBPurger purgerToStop = cnPurger.getAndSet(null);
+      if (purgerToStop != null)
+      { // stop this purger
+        purgerToStop.initiateShutdown();
       }
     }
   }
@@ -910,7 +918,16 @@
               // (3 days default), because we might receive late updates
               // that will have to be purged before the purge delay elapses.
               // This can particularly happen in case of network partitions.
-              jeFriendlySleep(DEFAULT_SLEEP);
+              if (!isShutdownInitiated())
+              {
+                synchronized (this)
+                {
+                  if (!isShutdownInitiated())
+                  {
+                    wait(DEFAULT_SLEEP);
+                  }
+                }
+              }
               continue;
             }
           }
@@ -923,7 +940,16 @@
             }
           }
 
-          jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
+          if (!isShutdownInitiated())
+          {
+            synchronized (this)
+            {
+              if (!isShutdownInitiated())
+              {
+                wait(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
+              }
+            }
+          }
         }
         catch (InterruptedException e)
         {
@@ -940,33 +966,6 @@
       }
     }
 
-    /**
-     * This method implements a sleep() that is friendly to Berkeley JE.
-     * <p>
-     * Originally, {@link Thread#sleep(long)} was used , but waking up a
-     * sleeping threads required calling {@link Thread#interrupt()}, and JE
-     * threw exceptions when invoked on interrupted threads.
-     * <p>
-     * The solution is to replace:
-     * <ol>
-     * <li> {@link Thread#sleep()} with {@link Object#wait(long)}</li>
-     * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li>
-     * </ol>
-     */
-    private void jeFriendlySleep(long millis) throws InterruptedException
-    {
-      if (!isShutdownInitiated())
-      {
-        synchronized (this)
-        {
-          if (!isShutdownInitiated())
-          {
-            wait(millis);
-          }
-        }
-      }
-    }
-
     private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
     {
       final long nextPurgeTime = notPurgedCSN.getTime();

--
Gitblit v1.10.0