| | |
| | | |
| | | /** The local replication server. */ |
| | | private final ReplicationServer replicationServer; |
| | | |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(); |
| | | |
| | | static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB = |
| | |
| | | if (indexer != null) |
| | | { |
| | | indexer.initiateShutdown(); |
| | | indexer.interrupt(); |
| | | } |
| | | final ChangelogDBPurger purger = cnPurger.getAndSet(null); |
| | | if (purger != null) |
| | | { |
| | | purger.initiateShutdown(); |
| | | purger.interrupt(); |
| | | } |
| | | |
| | | try |
| | |
| | | final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null; |
| | | cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null); |
| | | } |
| | | // recycle exhausted cursors, |
| | | // because client code will not manage the cursors itself |
| | | return new CompositeDBCursor<Void>(cursors, true); |
| | | } |
| | | |
| | |
| | | oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN); |
| | | if (oldestNotPurgedCSN == null) |
| | | { // shutdown may have been initiated... |
| | | if (!isShutdownInitiated()) |
| | | { |
| | | // ... or the change number index DB is empty, |
| | | // wait for new changes to come in. |
| | | // ... or the change number index DB is empty, |
| | | // wait for new changes to come in. |
| | | |
| | | // Note we cannot sleep for as long as the purge delay |
| | | // (3 days default), because we might receive late updates |
| | | // that will have to be purged before the purge delay elapses. |
| | | // This can particularly happen in case of network partitions. |
| | | sleep(DEFAULT_SLEEP); |
| | | } |
| | | // Note we cannot sleep for as long as the purge delay |
| | | // (3 days default), because we might receive late updates |
| | | // that will have to be purged before the purge delay elapses. |
| | | // This can particularly happen in case of network partitions. |
| | | jeFriendlySleep(DEFAULT_SLEEP); |
| | | continue; |
| | | } |
| | | } |
| | |
| | | |
| | | latestPurgeDate = purgeTimestamp; |
| | | |
| | | sleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); |
| | | jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This method implements a sleep() that is friendly to Berkeley JE. |
| | | * <p> |
| | | * Originally, {@link Thread#sleep(long)} was used , but waking up a |
| | | * sleeping threads required calling {@link Thread#interrupt()}, and JE |
| | | * threw exceptions when invoked on interrupted threads. |
| | | * <p> |
| | | * The solution is to replace: |
| | | * <ol> |
| | | * <li> {@link Thread#sleep()} with {@link Object#wait(long)}</li> |
| | | * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li> |
| | | * </ol> |
| | | */ |
| | | private void jeFriendlySleep(long millis) throws InterruptedException |
| | | { |
| | | if (!isShutdownInitiated()) |
| | | { |
| | | synchronized (this) |
| | | { |
| | | if (!isShutdownInitiated()) |
| | | { |
| | | wait(millis); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN) |
| | | { |
| | | final long nextPurgeTime = notPurgedCSN.getTime(); |
| | |
| | | // wait a bit before purging more |
| | | return DEFAULT_SLEEP; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void initiateShutdown() |
| | | { |
| | | super.initiateShutdown(); |
| | | synchronized (this) |
| | | { |
| | | notify(); // wake up the purger thread for faster shutdown |
| | | } |
| | | } |
| | | } |
| | | } |