| | |
| | | |
| | | import java.io.File; |
| | | import java.util.*; |
| | | import java.util.Map.Entry; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | |
| | | public class JEChangelogDB implements ChangelogDB, ReplicationDomainDB |
| | | { |
| | | /** The tracer object for the debug logger. */ |
| | | protected static final DebugTracer TRACER = getTracer(); |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /** |
| | | * This map contains the List of updates received from each LDAP server. |
| | |
| | | */ |
| | | private final class ChangelogDBPurger extends DirectoryThread |
| | | { |
| | | private static final int DEFAULT_SLEEP = 500; |
| | | |
| | | protected ChangelogDBPurger() |
| | | { |
| | |
| | | { |
| | | try |
| | | { |
| | | final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB; |
| | | if (localCNIndexDB == null) |
| | | { // shutdown has been called |
| | | return; |
| | | } |
| | | |
| | | final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis; |
| | | final MultiDomainServerState purgeUpToCookie = |
| | | localCNIndexDB.purgeUpTo(purgeTimestamp); |
| | | if (purgeUpToCookie == null) |
| | | { // this can happen when the change number index DB is empty |
| | | continue; |
| | | final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0); |
| | | final CSN oldestNotPurgedCSN; |
| | | |
| | | // next code assumes that the compute-change-number config |
| | | // never changes during the life time of an RS |
| | | if (!config.isComputeChangeNumber()) |
| | | { |
| | | oldestNotPurgedCSN = purgeCSN; |
| | | } |
| | | else |
| | | { |
| | | final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB; |
| | | if (localCNIndexDB == null) |
| | | { // shutdown has been initiated |
| | | return; |
| | | } |
| | | |
| | | oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN); |
| | | if (oldestNotPurgedCSN == null) |
| | | { // shutdown may have been initiated... |
| | | if (!isShutdownInitiated()) |
| | | { |
| | | // ... or the change number index DB is empty, |
| | | // wait for new changes to come in. |
| | | |
| | | // Note we cannot sleep for as long as the purge delay |
| | | // (3 days default), because we might receive late updates |
| | | // that will have to be purged before the purge delay elapses. |
| | | // This can particularly happen in case of network partitions. |
| | | sleep(DEFAULT_SLEEP); |
| | | } |
| | | continue; |
| | | } |
| | | } |
| | | |
| | | /* |
| | | * Drive purge of the replica DBs by the oldest non purged cookie in |
| | | * the change number index DB. |
| | | */ |
| | | for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1 |
| | | : domainToReplicaDBs.entrySet()) |
| | | for (final Map<Integer, JEReplicaDB> domainMap |
| | | : domainToReplicaDBs.values()) |
| | | { |
| | | final DN baseDN = entry1.getKey(); |
| | | final Map<Integer, JEReplicaDB> domainMap = entry1.getValue(); |
| | | for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet()) |
| | | for (final JEReplicaDB replicaDB : domainMap.values()) |
| | | { |
| | | final Integer serverId = entry2.getKey(); |
| | | final JEReplicaDB replicaDB = entry2.getValue(); |
| | | replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId)); |
| | | replicaDB.purgeUpTo(oldestNotPurgedCSN); |
| | | } |
| | | } |
| | | |
| | | latestPurgeDate = purgeTimestamp; |
| | | |
| | | // purge delay is specified in seconds so it should not be a problem |
| | | // to sleep for 500 millis |
| | | sleep(500); |
| | | sleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // shutdown initiated? |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Computes how long the purger should sleep before its next purge pass. |
| | | * <p> |
| | | * The current purge time is {@code TimeThread.getTime() - purgeDelayInMillis}. |
| | | * When the supplied CSN is strictly newer than that, the result is the time |
| | | * remaining until the CSN reaches it; otherwise {@code DEFAULT_SLEEP} is |
| | | * returned. |
| | | * |
| | | * @param notPurgedCSN |
| | | * the CSN that has not been purged yet, must not be null |
| | | * @return the time to sleep, always strictly positive |
| | | */ |
| | | private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN) |
| | | { |
| | | final long nextPurgeTime = notPurgedCSN.getTime(); |
| | | final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis; |
| | | // Strict comparison: when both times are equal the difference is 0, and |
| | | // returning it would make the caller loop again without any pause. |
| | | if (currentPurgeTime < nextPurgeTime) |
| | | { |
| | | // sleep till the next CSN to purge, |
| | | return nextPurgeTime - currentPurgeTime; |
| | | } |
| | | // wait a bit before purging more |
| | | return DEFAULT_SLEEP; |
| | | } |
| | | } |
| | | } |