From aacb8bbf0a764ce8eb205e0f6376c055b3e1baa8 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 03 Apr 2014 13:30:34 +0000
Subject: [PATCH] OPENDJ-1177 (CR-3314) Re-implement changelog purging logic

---
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java |   86 ++++++++++++++++++++++++++++--------------
 1 files changed, 57 insertions(+), 29 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index f18bd1e..4241fce 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -27,7 +27,6 @@
 
 import java.io.File;
 import java.util.*;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,7 +39,6 @@
 import org.opends.server.config.ConfigException;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ChangelogState;
@@ -64,7 +62,7 @@
 public class JEChangelogDB implements ChangelogDB, ReplicationDomainDB
 {
   /** The tracer object for the debug logger. */
-  protected static final DebugTracer TRACER = getTracer();
+  private static final DebugTracer TRACER = getTracer();
 
   /**
    * This map contains the List of updates received from each LDAP server.
@@ -846,6 +844,7 @@
    */
   private final class ChangelogDBPurger extends DirectoryThread
   {
+    private static final int DEFAULT_SLEEP = 500;
 
     protected ChangelogDBPurger()
     {
@@ -862,42 +861,58 @@
       {
         try
         {
-          final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
-          if (localCNIndexDB == null)
-          { // shutdown has been called
-            return;
-          }
-
           final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
-          final MultiDomainServerState purgeUpToCookie =
-              localCNIndexDB.purgeUpTo(purgeTimestamp);
-          if (purgeUpToCookie == null)
-          { // this can happen when the change number index DB is empty
-            continue;
+          final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
+          final CSN oldestNotPurgedCSN;
+
+          // next code assumes that the compute-change-number config
+          // never changes during the life time of an RS
+          if (!config.isComputeChangeNumber())
+          {
+            oldestNotPurgedCSN = purgeCSN;
+          }
+          else
+          {
+            final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
+            if (localCNIndexDB == null)
+            { // shutdown has been initiated
+              return;
+            }
+
+            oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
+            if (oldestNotPurgedCSN == null)
+            { // shutdown may have been initiated...
+              if (!isShutdownInitiated())
+              {
+                // ... or the change number index DB is empty,
+                // wait for new changes to come in.
+
+                // Note we cannot sleep for as long as the purge delay
+                // (3 days default), because we might receive late updates
+                // that will have to be purged before the purge delay elapses.
+                // This can particularly happen in case of network partitions.
+                sleep(DEFAULT_SLEEP);
+              }
+              continue;
+            }
           }
 
-          /*
-           * Drive purge of the replica DBs by the oldest non purged cookie in
-           * the change number index DB.
-           */
-          for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1
-              : domainToReplicaDBs.entrySet())
+          for (final Map<Integer, JEReplicaDB> domainMap
+              : domainToReplicaDBs.values())
           {
-            final DN baseDN = entry1.getKey();
-            final Map<Integer, JEReplicaDB> domainMap = entry1.getValue();
-            for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet())
+            for (final JEReplicaDB replicaDB : domainMap.values())
             {
-              final Integer serverId = entry2.getKey();
-              final JEReplicaDB replicaDB = entry2.getValue();
-              replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId));
+              replicaDB.purgeUpTo(oldestNotPurgedCSN);
             }
           }
 
           latestPurgeDate = purgeTimestamp;
 
-          // purge delay is specified in seconds so it should not be a problem
-          // to sleep for 500 millis
-          sleep(500);
+          sleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
+        }
+        catch (InterruptedException e)
+        {
+          // shutdown initiated?
         }
         catch (Exception e)
         {
@@ -910,5 +925,18 @@
         }
       }
     }
+
+    private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
+    {
+      final long nextPurgeTime = notPurgedCSN.getTime();
+      final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis;
+      if (currentPurgeTime <= nextPurgeTime)
+      {
+        // not purgeable yet: sleep until this CSN reaches the purge threshold
+        return nextPurgeTime - currentPurgeTime;
+      }
+      // CSN is already older than the purge threshold: retry after DEFAULT_SLEEP
+      return DEFAULT_SLEEP;
+    }
   }
 }

--
Gitblit v1.10.0