From 4f4948ee52a68b9efe6bb4584bc95f9f25b0fa5c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 04 Apr 2014 14:07:28 +0000
Subject: [PATCH] OPENDJ-1177 (CR-3314) Re-implement changelog purging logic
---
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 88 +++++++++++++++++++++++++++++---------------
1 files changed, 58 insertions(+), 30 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index a88cf62..76729c5 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -27,19 +27,17 @@
import java.io.File;
import java.util.*;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.forgerock.i18n.LocalizableMessageBuilder;
-import org.opends.server.admin.std.server.ReplicationServerCfg;
-import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.config.server.ConfigException;
+import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -835,6 +833,7 @@
*/
private final class ChangelogDBPurger extends DirectoryThread
{
+ private static final int DEFAULT_SLEEP = 500;
protected ChangelogDBPurger()
{
@@ -851,42 +850,58 @@
{
try
{
- final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
- if (localCNIndexDB == null)
- { // shutdown has been called
- return;
- }
-
final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
- final MultiDomainServerState purgeUpToCookie =
- localCNIndexDB.purgeUpTo(purgeTimestamp);
- if (purgeUpToCookie == null)
- { // this can happen when the change number index DB is empty
- continue;
+ final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
+ final CSN oldestNotPurgedCSN;
+
+ // the following code assumes that the compute-change-number config
+ // never changes during the lifetime of an RS
+ if (!config.isComputeChangeNumber())
+ {
+ oldestNotPurgedCSN = purgeCSN;
+ }
+ else
+ {
+ final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
+ if (localCNIndexDB == null)
+ { // shutdown has been initiated
+ return;
+ }
+
+ oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
+ if (oldestNotPurgedCSN == null)
+ { // shutdown may have been initiated...
+ if (!isShutdownInitiated())
+ {
+ // ... or the change number index DB is empty,
+ // wait for new changes to come in.
+
+ // Note we cannot sleep for as long as the purge delay
+ // (3 days default), because we might receive late updates
+ // that will have to be purged before the purge delay elapses.
+ // This can particularly happen in case of network partitions.
+ sleep(DEFAULT_SLEEP);
+ }
+ continue;
+ }
}
- /*
- * Drive purge of the replica DBs by the oldest non purged cookie in
- * the change number index DB.
- */
- for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1
- : domainToReplicaDBs.entrySet())
+ for (final Map<Integer, JEReplicaDB> domainMap
+ : domainToReplicaDBs.values())
{
- final DN baseDN = entry1.getKey();
- final Map<Integer, JEReplicaDB> domainMap = entry1.getValue();
- for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet())
+ for (final JEReplicaDB replicaDB : domainMap.values())
{
- final Integer serverId = entry2.getKey();
- final JEReplicaDB replicaDB = entry2.getValue();
- replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId));
+ replicaDB.purgeUpTo(oldestNotPurgedCSN);
}
}
latestPurgeDate = purgeTimestamp;
- // purge delay is specified in seconds so it should not be a problem
- // to sleep for 500 millis
- sleep(500);
+ sleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
+ }
+ catch (InterruptedException e)
+ {
+ // shutdown initiated?
}
catch (Exception e)
{
@@ -898,5 +913,18 @@
}
}
}
+
+ private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
+ { // returns how many millis the purger should sleep before its next pass
+ final long nextPurgeTime = notPurgedCSN.getTime();
+ final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis;
+ if (currentPurgeTime < nextPurgeTime)
+ { // strict '<': equal times would give a zero-length sleep, i.e. no pause
+ // sleep till the next CSN to purge becomes old enough,
+ return nextPurgeTime - currentPurgeTime;
+ }
+ // CSN time already reached (or passed): wait a bit before purging more
+ return DEFAULT_SLEEP;
+ }
}
}
--
Gitblit v1.10.0