opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -122,7 +122,7 @@ * The purge delay (in milliseconds). Records in the changelog DB that are * older than this delay might be removed. */ private long purgeDelayInMillis; private volatile long purgeDelayInMillis; private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>(); /** The local replication server. */ @@ -574,21 +574,29 @@ public void setPurgeDelay(final long purgeDelayInMillis) { this.purgeDelayInMillis = purgeDelayInMillis; final ChangelogDBPurger purger; if (purgeDelayInMillis > 0) { purger = new ChangelogDBPurger(); if (cnPurger.compareAndSet(null, purger)) { purger.start(); } // otherwise a purger was already running final ChangelogDBPurger newPurger = new ChangelogDBPurger(); if (cnPurger.compareAndSet(null, newPurger)) { // no purger was running, run this new one newPurger.start(); } else { // a purger was already running, just wake that one up // to verify if some entries can be purged with the new purge delay final ChangelogDBPurger currentPurger = cnPurger.get(); synchronized (currentPurger) { currentPurger.notify(); } } } else { purger = cnPurger.getAndSet(null); if (purger != null) { purger.initiateShutdown(); final ChangelogDBPurger purgerToStop = cnPurger.getAndSet(null); if (purgerToStop != null) { // stop this purger purgerToStop.initiateShutdown(); } } } @@ -902,7 +910,16 @@ // (3 days default), because we might receive late updates // that will have to be purged before the purge delay elapses. // This can particularly happen in case of network partitions. jeFriendlySleep(DEFAULT_SLEEP); if (!isShutdownInitiated()) { synchronized (this) { if (!isShutdownInitiated()) { wait(DEFAULT_SLEEP); } } } continue; } } @@ -915,7 +932,16 @@ } } jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); if (!isShutdownInitiated()) { synchronized (this) { if (!isShutdownInitiated()) { wait(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); } } } } catch (InterruptedException e) { @@ -932,33 +958,6 @@ } } /** * This method implements a sleep() that is friendly to Berkeley JE. * <p> * Originally, {@link Thread#sleep(long)} was used , but waking up a * sleeping threads required calling {@link Thread#interrupt()}, and JE * threw exceptions when invoked on interrupted threads. * <p> * The solution is to replace: * <ol> * <li> {@link Thread#sleep()} with {@link Object#wait(long)}</li> * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li> * </ol> */ private void jeFriendlySleep(long millis) throws InterruptedException { if (!isShutdownInitiated()) { synchronized (this) { if (!isShutdownInitiated()) { wait(millis); } } } } private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN) { final long nextPurgeTime = notPurgedCSN.getTime(); opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -117,7 +117,7 @@ * The purge delay (in milliseconds). Records in the changelog DB that are * older than this delay might be removed. */ private long purgeDelayInMillis; private volatile long purgeDelayInMillis; private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>(); /** The local replication server. */ @@ -617,21 +617,29 @@ public void setPurgeDelay(final long purgeDelayInMillis) { this.purgeDelayInMillis = purgeDelayInMillis; final ChangelogDBPurger purger; if (purgeDelayInMillis > 0) { purger = new ChangelogDBPurger(); if (cnPurger.compareAndSet(null, purger)) { purger.start(); } // otherwise a purger was already running final ChangelogDBPurger newPurger = new ChangelogDBPurger(); if (cnPurger.compareAndSet(null, newPurger)) { // no purger was running, run this new one newPurger.start(); } else { // a purger was already running, just wake that one up // to verify if some entries can be purged with the new purge delay final ChangelogDBPurger currentPurger = cnPurger.get(); synchronized (currentPurger) { currentPurger.notify(); } } } else { purger = cnPurger.getAndSet(null); if (purger != null) { purger.initiateShutdown(); final ChangelogDBPurger purgerToStop = cnPurger.getAndSet(null); if (purgerToStop != null) { // stop this purger purgerToStop.initiateShutdown(); } } } @@ -945,7 +953,16 @@ // (3 days default), because we might receive late updates // that will have to be purged before the purge delay elapses. // This can particularly happen in case of network partitions. jeFriendlySleep(DEFAULT_SLEEP); if (!isShutdownInitiated()) { synchronized (this) { if (!isShutdownInitiated()) { wait(DEFAULT_SLEEP); } } } continue; } } @@ -958,7 +975,16 @@ } } jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); if (!isShutdownInitiated()) { synchronized (this) { if (!isShutdownInitiated()) { wait(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); } } } } catch (InterruptedException e) { @@ -975,33 +1001,6 @@ } } /** * This method implements a sleep() that is friendly to Berkeley JE. * <p> * Originally, {@link Thread#sleep(long)} was used , but waking up a * sleeping threads required calling {@link Thread#interrupt()}, and JE * threw exceptions when invoked on interrupted threads. * <p> * The solution is to replace: * <ol> * <li> {@link Thread#sleep()} with {@link Object#wait(long)}</li> * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li> * </ol> */ private void jeFriendlySleep(long millis) throws InterruptedException { if (!isShutdownInitiated()) { synchronized (this) { if (!isShutdownInitiated()) { wait(millis); } } } } private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN) { final long nextPurgeTime = notPurgedCSN.getTime();