From 14e5a18a4ba3146470fd4d82152b6f061b442dce Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 31 Mar 2014 09:40:11 +0000
Subject: [PATCH] OPENDJ-1177 Re-implement changelog purging logic
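
Re-implement the trim loop so that the batched delete cursor is opened
outside the flushLock synchronized block. The low water mark test is
extracted into a new isQueueAboveLowMark() helper computed from the
queueSizeBytes semaphore permits rather than from msgQueue.size(), so
trimming stops once the flush backlog grows, as the surrounding comment
intends. generateCursorFrom() no longer flushes pending changes before
opening a cursor, and clear() no longer synchronizes on flushLock.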

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java |  117 ++++++++++++++++++++++++++++------------------------------
 1 file changed, 56 insertions(+), 61 deletions(-)
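
Note (below the "---" cut line, so not part of the commit message): a
minimal, standalone sketch of the byte accounting behind the new
isQueueAboveLowMark() helper, assuming queueSizeBytes is a
java.util.concurrent.Semaphore created with queueMaxBytes permits that
are acquired as messages are queued and released as they are flushed.
The add()/flushed() names and the capacity value below are illustrative
only, not the actual JEReplicaDB methods or configuration.

import java.util.concurrent.Semaphore;

class QueueLowMarkSketch
{
  // Hypothetical capacity; the real value comes from the server configuration.
  private final int queueMaxBytes = 1000000;
  private final Semaphore queueSizeBytes = new Semaphore(queueMaxBytes);

  /** A message of msgBytes is queued: consume permits (illustrative name). */
  void add(int msgBytes) throws InterruptedException
  {
    queueSizeBytes.acquire(msgBytes);
  }

  /** A message of msgBytes has been flushed: give permits back (illustrative name). */
  void flushed(int msgBytes)
  {
    queueSizeBytes.release(msgBytes);
  }

  /** Same computation as the new helper in JEReplicaDB. */
  boolean isQueueAboveLowMark()
  {
    final int lowMarkBytes = queueMaxBytes / 5;
    final int bytesUsed = queueMaxBytes - queueSizeBytes.availablePermits();
    return bytesUsed > lowMarkBytes;
  }

  public static void main(String[] args) throws InterruptedException
  {
    final QueueLowMarkSketch queue = new QueueLowMarkSketch();
    queue.add(300000);
    System.out.println(queue.isQueueAboveLowMark()); // true: 300000 > 200000
    queue.flushed(300000);
    System.out.println(queue.isQueueAboveLowMark()); // false: 0 bytes used
  }
}

The trim loop itself keeps the rationale spelled out in the in-code
comments: deletions go through a single delete cursor per batch (at most
50 records), so the records pinned in the DB cache by the open
transaction stay bounded.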

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index 4af7853..e7309b0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -301,11 +301,6 @@
   public DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN)
       throws ChangelogException
   {
-    if (startAfterCSN == null)
-    {
-      // flush any potential changes before opening the cursor
-      flush();
-    }
     return new JEReplicaDBCursor(db, startAfterCSN, this);
   }
 
@@ -427,7 +422,6 @@
       trimDate = lastBeforeTrimDate;
     }
 
-    final int queueLowMarkBytes = queueMaxBytes / 5;
     for (int i = 0; i < 100; i++)
     {
       /*
@@ -436,69 +430,73 @@
        * lowmark). Once the flush backlog increases, stop trimming and start
        * flushing more eagerly.
        */
-      if (i > 20 && msgQueue.size() < queueLowMarkBytes)
+      if (i > 20 && isQueueAboveLowMark())
       {
         break;
       }
 
-      synchronized (flushLock)
+      /*
+       * The trim is done in batches in order to save CPU, IO bandwidth and
+       * DB cache: start the transaction, delete a batch of records, then
+       * commit.
+       */
+      /*
+       * Matt wrote: The record removal is done as a DB transaction and the
+       * deleted records are only "deleted" on commit. While the txn/cursor is
+       * open the records to be deleted will, I think, be pinned in the DB
+       * cache. In other words, the larger the transaction (the more records
+       * deleted during a single batch) the more DB cache will be used to
+       * process the transaction.
+       */
+      final ReplServerDBCursor cursor = db.openDeleteCursor();
+      try
       {
-        /*
-         * the trim is done by group in order to save some CPU, IO bandwidth and
-         * DB caches: start the transaction then do a bunch of remove then
-         * commit.
-         */
-        /*
-         * Matt wrote: The record removal is done as a DB transaction and the
-         * deleted records are only "deleted" on commit. While the txn/cursor is
-         * open the records to be deleted will, I think, be pinned in the DB
-         * cache. In other words, the larger the transaction (the more records
-         * deleted during a single batch) the more DB cache will be used to
-         * process the transaction.
-         */
-        final ReplServerDBCursor cursor = db.openDeleteCursor();
-        try
+        for (int j = 0; j < 50; j++)
         {
-          for (int j = 0; j < 50; j++)
+          if (thread.isShutdownInitiated())
           {
-            if (thread.isShutdownInitiated())
-            {
-              return;
-            }
+            return;
+          }
 
-            CSN csn = cursor.nextCSN();
-            if (csn == null)
-            {
-              return;
-            }
+          CSN csn = cursor.nextCSN();
+          if (csn == null)
+          {
+            return;
+          }
 
-            if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
-            {
-              cursor.delete();
-            }
-            else
-            {
-              csnLimits = new CSNLimits(csn, csnLimits.newestCSN);
-              return;
-            }
+          if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
+          {
+            cursor.delete();
+          }
+          else
+          {
+            csnLimits = new CSNLimits(csn, csnLimits.newestCSN);
+            return;
           }
         }
-        catch (ChangelogException e)
-        {
-          // mark shutdown for this db so that we don't try again to
-          // stop it from cursor.close() or methods called by cursor.close()
-          cursor.abort();
-          thread.initiateShutdown();
-          throw e;
-        }
-        finally
-        {
-          cursor.close();
-        }
+      }
+      catch (ChangelogException e)
+      {
+      // Mark this DB as shut down so that we do not try to stop it again
+      // from cursor.close() or from methods called by cursor.close()
+        cursor.abort();
+        thread.initiateShutdown();
+        throw e;
+      }
+      finally
+      {
+        cursor.close();
       }
     }
   }
 
+  private boolean isQueueAboveLowMark()
+  {
+    final int lowMarkBytes = queueMaxBytes / 5;
+    final int bytesUsed = queueMaxBytes - queueSizeBytes.availablePermits();
+    return bytesUsed > lowMarkBytes;
+  }
+
   /**
    * Flush a number of updates from the memory list to the stable storage.
    * <p>
@@ -621,13 +619,10 @@
    */
   public void clear() throws ChangelogException
   {
-    synchronized(flushLock)
-    {
-      collectAllPermits();
-      msgQueue.clear();
-      db.clear();
-      csnLimits = new CSNLimits(null, null);
-    }
+    collectAllPermits();
+    msgQueue.clear(); // the queue is expected to be empty already, so this should be a no-op
+    db.clear();
+    csnLimits = new CSNLimits(null, null);
   }
 
   /**

--
Gitblit v1.10.0