| | |
| | | * The purge delay (in milliseconds). Records in the changelog DB that are |
| | | * older than this delay might be removed. |
| | | */ |
| | | private long purgeDelayInMillis; |
| | | private volatile long purgeDelayInMillis; |
| | | private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>(); |
| | | |
| | | /** The local replication server. */ |
| | |
| | | public void setPurgeDelay(final long purgeDelayInMillis) |
| | | { |
| | | this.purgeDelayInMillis = purgeDelayInMillis; |
| | | final ChangelogDBPurger purger; |
| | | if (purgeDelayInMillis > 0) |
| | | { |
| | | purger = new ChangelogDBPurger(); |
| | | if (cnPurger.compareAndSet(null, purger)) |
| | | { |
| | | purger.start(); |
| | | } // otherwise a purger was already running |
| | | final ChangelogDBPurger newPurger = new ChangelogDBPurger(); |
| | | if (cnPurger.compareAndSet(null, newPurger)) |
| | | { // no purger was running, run this new one |
| | | newPurger.start(); |
| | | } |
| | | else |
| | | { // a purger was already running, just wake that one up |
| | | // to verify if some entries can be purged with the new purge delay |
| | | final ChangelogDBPurger currentPurger = cnPurger.get(); |
| | | synchronized (currentPurger) |
| | | { |
| | | currentPurger.notify(); |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | | purger = cnPurger.getAndSet(null); |
| | | if (purger != null) |
| | | { |
| | | purger.initiateShutdown(); |
| | | final ChangelogDBPurger purgerToStop = cnPurger.getAndSet(null); |
| | | if (purgerToStop != null) |
| | | { // stop this purger |
| | | purgerToStop.initiateShutdown(); |
| | | } |
| | | } |
| | | } |
| | |
| | | // (3 days default), because we might receive late updates |
| | | // that will have to be purged before the purge delay elapses. |
| | | // This can particularly happen in case of network partitions. |
| | | jeFriendlySleep(DEFAULT_SLEEP); |
| | | if (!isShutdownInitiated()) |
| | | { |
| | | synchronized (this) |
| | | { |
| | | if (!isShutdownInitiated()) |
| | | { |
| | | wait(DEFAULT_SLEEP); |
| | | } |
| | | } |
| | | } |
| | | continue; |
| | | } |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); |
| | | if (!isShutdownInitiated()) |
| | | { |
| | | synchronized (this) |
| | | { |
| | | if (!isShutdownInitiated()) |
| | | { |
| | | wait(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This method implements a sleep() that is friendly to Berkeley JE. |
| | | * <p> |
| | | * Originally, {@link Thread#sleep(long)} was used , but waking up a |
| | | * sleeping threads required calling {@link Thread#interrupt()}, and JE |
| | | * threw exceptions when invoked on interrupted threads. |
| | | * <p> |
| | | * The solution is to replace: |
| | | * <ol> |
| | | * <li> {@link Thread#sleep()} with {@link Object#wait(long)}</li> |
| | | * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li> |
| | | * </ol> |
| | | */ |
| | | private void jeFriendlySleep(long millis) throws InterruptedException |
| | | { |
| | | if (!isShutdownInitiated()) |
| | | { |
| | | synchronized (this) |
| | | { |
| | | if (!isShutdownInitiated()) |
| | | { |
| | | wait(millis); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN) |
| | | { |
| | | final long nextPurgeTime = notPurgedCSN.getTime(); |