From aacb8bbf0a764ce8eb205e0f6376c055b3e1baa8 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 03 Apr 2014 13:30:34 +0000
Subject: [PATCH] OPENDJ-1177 (CR-3314) Re-implement changelog purging logic
---
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 86 ++++++++++++++++++++++++++++--------------
1 files changed, 57 insertions(+), 29 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index f18bd1e..4241fce 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -27,7 +27,6 @@
import java.io.File;
import java.util.*;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,7 +39,6 @@
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -64,7 +62,7 @@
public class JEChangelogDB implements ChangelogDB, ReplicationDomainDB
{
/** The tracer object for the debug logger. */
- protected static final DebugTracer TRACER = getTracer();
+ private static final DebugTracer TRACER = getTracer();
/**
* This map contains the List of updates received from each LDAP server.
@@ -846,6 +844,7 @@
*/
private final class ChangelogDBPurger extends DirectoryThread
{
+ private static final int DEFAULT_SLEEP = 500;
protected ChangelogDBPurger()
{
@@ -862,42 +861,58 @@
{
try
{
- final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
- if (localCNIndexDB == null)
- { // shutdown has been called
- return;
- }
-
final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
- final MultiDomainServerState purgeUpToCookie =
- localCNIndexDB.purgeUpTo(purgeTimestamp);
- if (purgeUpToCookie == null)
- { // this can happen when the change number index DB is empty
- continue;
+ final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
+ final CSN oldestNotPurgedCSN;
+
+ // next code assumes that the compute-change-number config
+ // never changes during the life time of an RS
+ if (!config.isComputeChangeNumber())
+ {
+ oldestNotPurgedCSN = purgeCSN;
+ }
+ else
+ {
+ final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
+ if (localCNIndexDB == null)
+ { // shutdown has been initiated
+ return;
+ }
+
+ oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
+ if (oldestNotPurgedCSN == null)
+ { // shutdown may have been initiated...
+ if (!isShutdownInitiated())
+ {
+ // ... or the change number index DB is empty,
+ // wait for new changes to come in.
+
+ // Note we cannot sleep for as long as the purge delay
+ // (3 days default), because we might receive late updates
+ // that will have to be purged before the purge delay elapses.
+ // This can particularly happen in case of network partitions.
+ sleep(DEFAULT_SLEEP);
+ }
+ continue;
+ }
}
- /*
- * Drive purge of the replica DBs by the oldest non purged cookie in
- * the change number index DB.
- */
- for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1
- : domainToReplicaDBs.entrySet())
+ for (final Map<Integer, JEReplicaDB> domainMap
+ : domainToReplicaDBs.values())
{
- final DN baseDN = entry1.getKey();
- final Map<Integer, JEReplicaDB> domainMap = entry1.getValue();
- for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet())
+ for (final JEReplicaDB replicaDB : domainMap.values())
{
- final Integer serverId = entry2.getKey();
- final JEReplicaDB replicaDB = entry2.getValue();
- replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId));
+ replicaDB.purgeUpTo(oldestNotPurgedCSN);
}
}
latestPurgeDate = purgeTimestamp;
- // purge delay is specified in seconds so it should not be a problem
- // to sleep for 500 millis
- sleep(500);
+ sleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
+ }
+ catch (InterruptedException e)
+ {
+ // shutdown initiated?
}
catch (Exception e)
{
@@ -910,5 +925,18 @@
}
}
}
+
+ private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
+ {
+ final long nextPurgeTime = notPurgedCSN.getTime();
+ final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis;
+ if (currentPurgeTime <= nextPurgeTime)
+ {
+ // sleep till the next CSN to purge,
+ return nextPurgeTime - currentPurgeTime;
+ }
+ // wait a bit before purging more
+ return DEFAULT_SLEEP;
+ }
}
}
--
Gitblit v1.10.0