opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -124,7 +124,7 @@ * The purge delay (in milliseconds). Records in the changelog DB that are * older than this delay might be removed. */ private long purgeDelayInMillis; private volatile long purgeDelayInMillis; private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>(); /** The local replication server. */ @@ -579,21 +579,29 @@ public void setPurgeDelay(final long purgeDelayInMillis) { this.purgeDelayInMillis = purgeDelayInMillis; final ChangelogDBPurger purger; if (purgeDelayInMillis > 0) { purger = new ChangelogDBPurger(); if (cnPurger.compareAndSet(null, purger)) { purger.start(); } // otherwise a purger was already running final ChangelogDBPurger newPurger = new ChangelogDBPurger(); if (cnPurger.compareAndSet(null, newPurger)) { // no purger was running, run this new one newPurger.start(); } else { // a purger was already running, just wake that one up // to verify if some entries can be purged with the new purge delay final ChangelogDBPurger currentPurger = cnPurger.get(); synchronized (currentPurger) { currentPurger.notify(); } } } else { purger = cnPurger.getAndSet(null); if (purger != null) { purger.initiateShutdown(); final ChangelogDBPurger purgerToStop = cnPurger.getAndSet(null); if (purgerToStop != null) { // stop this purger purgerToStop.initiateShutdown(); } } } @@ -910,7 +918,16 @@ // (3 days default), because we might receive late updates // that will have to be purged before the purge delay elapses. // This can particularly happen in case of network partitions. jeFriendlySleep(DEFAULT_SLEEP); if (!isShutdownInitiated()) { synchronized (this) { if (!isShutdownInitiated()) { wait(DEFAULT_SLEEP); } } } continue; } } @@ -923,7 +940,16 @@ } } jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); if (!isShutdownInitiated()) { synchronized (this) { if (!isShutdownInitiated()) { wait(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); } } } } catch (InterruptedException e) { @@ -940,33 +966,6 @@ } } /** * This method implements a sleep() that is friendly to Berkeley JE. * <p> * Originally, {@link Thread#sleep(long)} was used , but waking up a * sleeping threads required calling {@link Thread#interrupt()}, and JE * threw exceptions when invoked on interrupted threads. * <p> * The solution is to replace: * <ol> * <li> {@link Thread#sleep()} with {@link Object#wait(long)}</li> * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li> * </ol> */ private void jeFriendlySleep(long millis) throws InterruptedException { if (!isShutdownInitiated()) { synchronized (this) { if (!isShutdownInitiated()) { wait(millis); } } } } private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN) { final long nextPurgeTime = notPurgedCSN.getTime(); opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -119,7 +119,7 @@ * The purge delay (in milliseconds). Records in the changelog DB that are * older than this delay might be removed. */ private long purgeDelayInMillis; private volatile long purgeDelayInMillis; private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>(); /** The local replication server. */ @@ -622,21 +622,29 @@ public void setPurgeDelay(final long purgeDelayInMillis) { this.purgeDelayInMillis = purgeDelayInMillis; final ChangelogDBPurger purger; if (purgeDelayInMillis > 0) { purger = new ChangelogDBPurger(); if (cnPurger.compareAndSet(null, purger)) { purger.start(); } // otherwise a purger was already running final ChangelogDBPurger newPurger = new ChangelogDBPurger(); if (cnPurger.compareAndSet(null, newPurger)) { // no purger was running, run this new one newPurger.start(); } else { // a purger was already running, just wake that one up // to verify if some entries can be purged with the new purge delay final ChangelogDBPurger currentPurger = cnPurger.get(); synchronized (currentPurger) { currentPurger.notify(); } } } else { purger = cnPurger.getAndSet(null); if (purger != null) { purger.initiateShutdown(); final ChangelogDBPurger purgerToStop = cnPurger.getAndSet(null); if (purgerToStop != null) { // stop this purger purgerToStop.initiateShutdown(); } } } @@ -953,7 +961,16 @@ // (3 days default), because we might receive late updates // that will have to be purged before the purge delay elapses. // This can particularly happen in case of network partitions. jeFriendlySleep(DEFAULT_SLEEP); if (!isShutdownInitiated()) { synchronized (this) { if (!isShutdownInitiated()) { wait(DEFAULT_SLEEP); } } } continue; } } @@ -966,7 +983,16 @@ } } jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); if (!isShutdownInitiated()) { synchronized (this) { if (!isShutdownInitiated()) { wait(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); } } } } catch (InterruptedException e) { @@ -983,33 +1009,6 @@ } } /** * This method implements a sleep() that is friendly to Berkeley JE. * <p> * Originally, {@link Thread#sleep(long)} was used , but waking up a * sleeping threads required calling {@link Thread#interrupt()}, and JE * threw exceptions when invoked on interrupted threads. * <p> * The solution is to replace: * <ol> * <li> {@link Thread#sleep()} with {@link Object#wait(long)}</li> * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li> * </ol> */ private void jeFriendlySleep(long millis) throws InterruptedException { if (!isShutdownInitiated()) { synchronized (this) { if (!isShutdownInitiated()) { wait(millis); } } } } private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN) { final long nextPurgeTime = notPurgedCSN.getTime();