mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
05.59.2014 ce28cf867009ee108e837cdd9de814fbcda77b8a
OPENDJ-1618 (CR-5155) External changelog not purged after the purge delay

The ChangelogDBPurger is smart: when changes exist in the DB, it sleeps until the oldest change can be purged.
The problem is that when an administrator changes the purge delay, the ChangelogDBPurger is not woken up to check whether the oldest change can now be purged under the new purge delay.
This fix wakes up the ChangelogDBPurger when the purge delay changes.


FileChangelogDB.java, JEChangelogDB.java:
In setPurgeDelay(), ensured an existing ChangelogDBPurger is woken up by a change to the purge delay.
Made purgeDelayInMillis volatile. In ChangelogDBPurger.run(), inlined jeFriendlySleep() to ensure the wait time is computed from the very latest purgeDelayInMillis.
2 files modified
158 ■■■■ changed files
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 79 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 79 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -122,7 +122,7 @@
   * The purge delay (in milliseconds). Records in the changelog DB that are
   * older than this delay might be removed.
   */
  private long purgeDelayInMillis;
  private volatile long purgeDelayInMillis;
  private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>();
  /** The local replication server. */
@@ -574,21 +574,29 @@
  public void setPurgeDelay(final long purgeDelayInMillis)
  {
    this.purgeDelayInMillis = purgeDelayInMillis;
    final ChangelogDBPurger purger;
    if (purgeDelayInMillis > 0)
    {
      purger = new ChangelogDBPurger();
      if (cnPurger.compareAndSet(null, purger))
      {
        purger.start();
      } // otherwise a purger was already running
      final ChangelogDBPurger newPurger = new ChangelogDBPurger();
      if (cnPurger.compareAndSet(null, newPurger))
      { // no purger was running, run this new one
        newPurger.start();
      }
      else
      { // a purger was already running, just wake that one up
        // to verify if some entries can be purged with the new purge delay
        final ChangelogDBPurger currentPurger = cnPurger.get();
        synchronized (currentPurger)
        {
          currentPurger.notify();
        }
      }
    }
    else
    {
      purger = cnPurger.getAndSet(null);
      if (purger != null)
      {
        purger.initiateShutdown();
      final ChangelogDBPurger purgerToStop = cnPurger.getAndSet(null);
      if (purgerToStop != null)
      { // stop this purger
        purgerToStop.initiateShutdown();
      }
    }
  }
@@ -902,7 +910,16 @@
              // (3 days default), because we might receive late updates
              // that will have to be purged before the purge delay elapses.
              // This can particularly happen in case of network partitions.
              jeFriendlySleep(DEFAULT_SLEEP);
              if (!isShutdownInitiated())
              {
                synchronized (this)
                {
                  if (!isShutdownInitiated())
                  {
                    wait(DEFAULT_SLEEP);
                  }
                }
              }
              continue;
            }
          }
@@ -915,7 +932,16 @@
            }
          }
          jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
          if (!isShutdownInitiated())
          {
            synchronized (this)
            {
              if (!isShutdownInitiated())
              {
                wait(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
              }
            }
          }
        }
        catch (InterruptedException e)
        {
@@ -932,33 +958,6 @@
      }
    }
    /**
     * This method implements a sleep() that is friendly to Berkeley JE.
     * <p>
     * Originally, {@link Thread#sleep(long)} was used , but waking up a
     * sleeping threads required calling {@link Thread#interrupt()}, and JE
     * threw exceptions when invoked on interrupted threads.
     * <p>
     * The solution is to replace:
     * <ol>
     * <li> {@link Thread#sleep()} with {@link Object#wait(long)}</li>
     * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li>
     * </ol>
     */
    private void jeFriendlySleep(long millis) throws InterruptedException
    {
      if (!isShutdownInitiated())
      {
        synchronized (this)
        {
          if (!isShutdownInitiated())
          {
            wait(millis);
          }
        }
      }
    }
    private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
    {
      final long nextPurgeTime = notPurgedCSN.getTime();
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -117,7 +117,7 @@
   * The purge delay (in milliseconds). Records in the changelog DB that are
   * older than this delay might be removed.
   */
  private long purgeDelayInMillis;
  private volatile long purgeDelayInMillis;
  private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>();
  /** The local replication server. */
@@ -617,21 +617,29 @@
  public void setPurgeDelay(final long purgeDelayInMillis)
  {
    this.purgeDelayInMillis = purgeDelayInMillis;
    final ChangelogDBPurger purger;
    if (purgeDelayInMillis > 0)
    {
      purger = new ChangelogDBPurger();
      if (cnPurger.compareAndSet(null, purger))
      {
        purger.start();
      } // otherwise a purger was already running
      final ChangelogDBPurger newPurger = new ChangelogDBPurger();
      if (cnPurger.compareAndSet(null, newPurger))
      { // no purger was running, run this new one
        newPurger.start();
      }
      else
      { // a purger was already running, just wake that one up
        // to verify if some entries can be purged with the new purge delay
        final ChangelogDBPurger currentPurger = cnPurger.get();
        synchronized (currentPurger)
        {
          currentPurger.notify();
        }
      }
    }
    else
    {
      purger = cnPurger.getAndSet(null);
      if (purger != null)
      {
        purger.initiateShutdown();
      final ChangelogDBPurger purgerToStop = cnPurger.getAndSet(null);
      if (purgerToStop != null)
      { // stop this purger
        purgerToStop.initiateShutdown();
      }
    }
  }
@@ -945,7 +953,16 @@
              // (3 days default), because we might receive late updates
              // that will have to be purged before the purge delay elapses.
              // This can particularly happen in case of network partitions.
              jeFriendlySleep(DEFAULT_SLEEP);
              if (!isShutdownInitiated())
              {
                synchronized (this)
                {
                  if (!isShutdownInitiated())
                  {
                    wait(DEFAULT_SLEEP);
                  }
                }
              }
              continue;
            }
          }
@@ -958,7 +975,16 @@
            }
          }
          jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
          if (!isShutdownInitiated())
          {
            synchronized (this)
            {
              if (!isShutdownInitiated())
              {
                wait(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
              }
            }
          }
        }
        catch (InterruptedException e)
        {
@@ -975,33 +1001,6 @@
      }
    }
    /**
     * This method implements a sleep() that is friendly to Berkeley JE.
     * <p>
     * Originally, {@link Thread#sleep(long)} was used , but waking up a
     * sleeping threads required calling {@link Thread#interrupt()}, and JE
     * threw exceptions when invoked on interrupted threads.
     * <p>
     * The solution is to replace:
     * <ol>
     * <li> {@link Thread#sleep()} with {@link Object#wait(long)}</li>
     * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li>
     * </ol>
     */
    private void jeFriendlySleep(long millis) throws InterruptedException
    {
      if (!isShutdownInitiated())
      {
        synchronized (this)
        {
          if (!isShutdownInitiated())
          {
            wait(millis);
          }
        }
      }
    }
    private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
    {
      final long nextPurgeTime = notPurgedCSN.getTime();