| | |
| | | if (indexer != null) |
| | | { |
| | | indexer.initiateShutdown(); |
| | | indexer.interrupt(); |
| | | } |
| | | final ChangelogDBPurger purger = cnPurger.getAndSet(null); |
| | | if (purger != null) |
| | | { |
| | | purger.initiateShutdown(); |
| | | purger.interrupt(); |
| | | } |
| | | |
| | | try |
| | |
| | | oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN); |
| | | if (oldestNotPurgedCSN == null) |
| | | { // shutdown may have been initiated... |
| | | if (!isShutdownInitiated()) |
| | | { |
| | | // ... or the change number index DB is empty, |
| | | // wait for new changes to come in. |
| | | // ... or the change number index DB is empty, |
| | | // wait for new changes to come in. |
| | | |
| | | // Note we cannot sleep for as long as the purge delay |
| | | // (3 days default), because we might receive late updates |
| | | // that will have to be purged before the purge delay elapses. |
| | | // This can particularly happen in case of network partitions. |
| | | sleep(DEFAULT_SLEEP); |
| | | } |
| | | // Note we cannot sleep for as long as the purge delay |
| | | // (3 days default), because we might receive late updates |
| | | // that will have to be purged before the purge delay elapses. |
| | | // This can particularly happen in case of network partitions. |
| | | jeFriendlySleep(DEFAULT_SLEEP); |
| | | continue; |
| | | } |
| | | } |
| | |
| | | |
| | | latestPurgeDate = purgeTimestamp; |
| | | |
| | | sleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); |
| | | jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Sleeps in a way that is friendly to Berkeley JE. |
| | | * <p> |
| | | * Originally, {@link Thread#sleep(long)} was used, but waking up a |
| | | * sleeping thread required calling {@link Thread#interrupt()}, and JE |
| | | * threw exceptions when invoked on interrupted threads. |
| | | * <p> |
| | | * The solution is to replace: |
| | | * <ol> |
| | | * <li>{@link Thread#sleep(long)} with {@link Object#wait(long)}</li> |
| | | * <li>{@link Thread#interrupt()} with {@link Object#notify()}</li> |
| | | * </ol> |
| | | * <p> |
| | | * The wait is skipped entirely when shutdown has already been initiated. |
| | | * The shutdown flag is checked a second time while holding this object's |
| | | * monitor, immediately before waiting. |
| | | * |
| | | * @param millis |
| | | *          the maximum time to wait, in milliseconds; it is passed |
| | | *          unchanged to {@link Object#wait(long)}, for which 0 means |
| | | *          "wait until notified" rather than "do not wait" |
| | | * @throws InterruptedException |
| | | *           if the thread is interrupted before or while waiting |
| | | */ |
| | | private void jeFriendlySleep(long millis) throws InterruptedException |
| | | { |
| | | if (!isShutdownInitiated()) |
| | | { |
| | | synchronized (this) |
| | | { |
| | | if (!isShutdownInitiated()) |
| | | { |
| | | wait(millis); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN) |
| | | { |
| | | final long nextPurgeTime = notPurgedCSN.getTime(); |
| | |
| | | public void initiateShutdown() |
| | | { |
| | | super.initiateShutdown(); |
| | | this.interrupt(); // wake up the purger thread for faster shutdown; NOTE(review): jeFriendlySleep's Javadoc says interrupt() was to be replaced by notify() - confirm this call is still needed |
| | | synchronized (this) |
| | | { |
| | | notify(); // wake up a thread waiting on this object's monitor (see jeFriendlySleep) for faster shutdown |
| | | } |
| | | } |
| | | } |
| | | } |