mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
05.46.2014 947e952c52717610f323f6d4a60e0017f68621a1
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -124,7 +124,7 @@
   * The purge delay (in milliseconds). Records in the changelog DB that are
   * older than this delay might be removed.
   */
  private long purgeDelayInMillis;
  private volatile long purgeDelayInMillis;
  private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>();
  /** The local replication server. */
@@ -579,21 +579,29 @@
  public void setPurgeDelay(final long purgeDelayInMillis)
  {
    this.purgeDelayInMillis = purgeDelayInMillis;
    final ChangelogDBPurger purger;
    if (purgeDelayInMillis > 0)
    {
      purger = new ChangelogDBPurger();
      if (cnPurger.compareAndSet(null, purger))
      {
        purger.start();
      } // otherwise a purger was already running
      final ChangelogDBPurger newPurger = new ChangelogDBPurger();
      if (cnPurger.compareAndSet(null, newPurger))
      { // no purger was running, run this new one
        newPurger.start();
      }
      else
      { // a purger was already running, just wake that one up
        // to verify if some entries can be purged with the new purge delay
        final ChangelogDBPurger currentPurger = cnPurger.get();
        synchronized (currentPurger)
        {
          currentPurger.notify();
        }
      }
    }
    else
    {
      purger = cnPurger.getAndSet(null);
      if (purger != null)
      {
        purger.initiateShutdown();
      final ChangelogDBPurger purgerToStop = cnPurger.getAndSet(null);
      if (purgerToStop != null)
      { // stop this purger
        purgerToStop.initiateShutdown();
      }
    }
  }
@@ -910,7 +918,16 @@
              // (3 days default), because we might receive late updates
              // that will have to be purged before the purge delay elapses.
              // This can particularly happen in case of network partitions.
              jeFriendlySleep(DEFAULT_SLEEP);
              if (!isShutdownInitiated())
              {
                synchronized (this)
                {
                  if (!isShutdownInitiated())
                  {
                    wait(DEFAULT_SLEEP);
                  }
                }
              }
              continue;
            }
          }
@@ -923,7 +940,16 @@
            }
          }
          jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
          if (!isShutdownInitiated())
          {
            synchronized (this)
            {
              if (!isShutdownInitiated())
              {
                wait(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
              }
            }
          }
        }
        catch (InterruptedException e)
        {
@@ -940,33 +966,6 @@
      }
    }
    /**
     * This method implements a sleep() that is friendly to Berkeley JE.
     * <p>
     * Originally, {@link Thread#sleep(long)} was used , but waking up a
     * sleeping threads required calling {@link Thread#interrupt()}, and JE
     * threw exceptions when invoked on interrupted threads.
     * <p>
     * The solution is to replace:
     * <ol>
     * <li> {@link Thread#sleep()} with {@link Object#wait(long)}</li>
     * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li>
     * </ol>
     */
    private void jeFriendlySleep(long millis) throws InterruptedException
    {
      if (!isShutdownInitiated())
      {
        synchronized (this)
        {
          if (!isShutdownInitiated())
          {
            wait(millis);
          }
        }
      }
    }
    private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
    {
      final long nextPurgeTime = notPurgedCSN.getTime();