From 819f74758a1c464bbf578e70ca8592cc8d101d75 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 02 Apr 2014 09:51:11 +0000
Subject: [PATCH] OPENDJ-1177 (CR-3304) Re-implement changelog purging logic

---
 opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java |  129 ++++++++++--------------------------------
 1 files changed, 32 insertions(+), 97 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index e7309b0..bbde1d1 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -50,7 +50,6 @@
 import org.opends.server.types.Attributes;
 import org.opends.server.types.DN;
 import org.opends.server.types.InitializationException;
-import org.opends.server.util.TimeThread;
 
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.*;
@@ -95,16 +94,8 @@
    * <p>
    * This blocking queue is only used as a temporary placeholder so that the
    * write in the stable storage can be grouped for efficiency reason. Adding an
-   * update synchronously add the update to this list. A dedicated thread loops
-   * on {@link #flush()} and {@link #trim()}.
-   * <dl>
-   * <dt>flush()</dt>
-   * <dd>get a number of changes from the in memory list by block and write them
-   * to the db.</dd>
-   * <dt>trim()</dt>
-   * <dd>deletes from the DB a number of changes that are older than a certain
-   * date.</dd>
-   * </dl>
+   * update synchronously adds the update to this list. A dedicated thread
+   * flushes this blocking queue.
    * <p>
    * Changes are not read back by replicationServer threads that are responsible
    * for pushing the changes to other replication server or to LDAP server
@@ -133,22 +124,12 @@
   private DbMonitorProvider dbMonitor = new DbMonitorProvider();
   private DirectoryThread thread;
   /**
-   * Used to prevent race conditions between threads calling {@link #clear()}
-   * {@link #flush()} or {@link #trim()}. This can happen with the thread
-   * flushing the queue, on shutdown or on cursor opening, a thread calling
-   * clear(), etc.
+   * Used to prevent race conditions between threads calling {@link #flush()}.
+   * This can happen with the thread flushing the queue, or else on shutdown.
    */
   private final Object flushLock = new Object();
   private ReplicationServer replicationServer;
 
-  private long latestTrimDate = 0;
-
-  /**
-   * The trim age in milliseconds. Changes record in the change DB that
-   * are older than this age are removed.
-   */
-  private long trimAge;
-
   /**
    * Creates a new ReplicaDB associated to a given LDAP server.
    *
@@ -166,15 +147,14 @@
     this.replicationServer = replicationServer;
     this.serverId = serverId;
     this.baseDN = baseDN;
-    trimAge = replicationServer.getTrimAge();
     queueMaxBytes = replicationServer.getQueueSize() * 200;
     queueSizeBytes = new Semaphore(queueMaxBytes);
     db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
     csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
     thread = new DirectoryThread(this, "Replication server RS("
-        + replicationServer.getServerId()
-        + ") changelog checkpointer for Replica DS(" + serverId
-        + ") for domain \"" + baseDN + "\"");
+            + replicationServer.getServerId()
+            + ") flusher thread for Replica DS(" + serverId
+            + ") for domain \"" + baseDN + "\"");
     thread.start();
 
     DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -334,9 +314,7 @@
   }
 
   /**
-   * Run method for this class.
-   * Periodically Flushes the ReplicationServerDomain cache from memory to the
-   * stable storage and trims the old updates.
+   * Flushes the replicaDB queue from memory to stable storage.
    */
   @Override
   public void run()
@@ -350,7 +328,6 @@
         try
         {
           flush();
-          trim();
         }
         catch (ChangelogException end)
         {
@@ -390,55 +367,26 @@
   }
 
   /**
-   * Retrieves the latest trim date.
-   * @return the latest trim date.
+   * Synchronously purges changes older than purgeCSN from this replicaDB.
+   *
+   * @param purgeCSN
+   *          The CSN up to which changes can be purged. No purging happens when
+   *          it is null.
+   * @throws ChangelogException
+   *           In case of database problem.
    */
-  public long getLatestTrimDate()
+  void purgeUpTo(final CSN purgeCSN) throws ChangelogException
   {
-    return latestTrimDate;
-  }
-
-
-  /**
-   * Trim old changes from this replicationServer database.
-   * @throws ChangelogException In case of database problem.
-   */
-  private void trim() throws ChangelogException
-  {
-    if (trimAge == 0)
+    if (purgeCSN == null)
     {
       return;
     }
 
-    latestTrimDate = TimeThread.getTime() - trimAge;
-
-    CSN trimDate = new CSN(latestTrimDate, 0, 0);
-
-    // Find the last CSN before the trimDate, in the Database.
-    CSN lastBeforeTrimDate = db.getPreviousCSN(trimDate);
-    if (lastBeforeTrimDate != null)
-    {
-      // If we found it, we want to stop trimming when reaching it.
-      trimDate = lastBeforeTrimDate;
-    }
-
     for (int i = 0; i < 100; i++)
     {
       /*
-       * Perform at least some trimming regardless of the flush backlog. Then
-       * continue trim iterations while the flush backlog is low (below the
-       * lowmark). Once the flush backlog increases, stop trimming and start
-       * flushing more eagerly.
-       */
-      if (i > 20 && isQueueAboveLowMark())
-      {
-        break;
-      }
-
-      /*
-       * the trim is done by group in order to save some CPU, IO bandwidth and
-       * DB caches: start the transaction then do a bunch of remove then
-       * commit.
+       * The purge is done in batches to save CPU, IO bandwidth and DB cache:
+       * start a transaction, perform a batch of removals, then commit.
        */
       /*
        * Matt wrote: The record removal is done as a DB transaction and the
@@ -464,7 +412,7 @@
             return;
           }
 
-          if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
+          if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(purgeCSN))
           {
             cursor.delete();
           }
@@ -490,37 +438,31 @@
     }
   }
 
-  private boolean isQueueAboveLowMark()
-  {
-    final int lowMarkBytes = queueMaxBytes / 5;
-    final int bytesUsed = queueMaxBytes - queueSizeBytes.availablePermits();
-    return bytesUsed > lowMarkBytes;
-  }
-
   /**
    * Flush a number of updates from the memory list to the stable storage.
    * <p>
    * Flush is done by chunk sized to 500 messages, starting from the beginning
    * of the list.
-   *
+   * <p>
+   * This method synchronizes on {@code flushLock}.
    * @throws ChangelogException
    *           If a database problem happened
    */
-  public void flush() throws ChangelogException
+  private void flush() throws ChangelogException
   {
     try
     {
       synchronized (flushLock)
       {
-        final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
-        final UpdateMsg change = msgQueue.poll(500, TimeUnit.MILLISECONDS);
+        final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS);
         if (change == null)
         {
-          // nothing to persist, move on to the trim phase
+          // nothing to persist, check if shutdown was invoked
           return;
         }
 
         // Try to see if there are more changes and persist them all.
+        final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
         changes.add(change);
         msgQueue.drainTo(changes);
 
@@ -604,15 +546,6 @@
   }
 
   /**
-   * Set the Purge delay for this db Handler.
-   * @param delay The purge delay in Milliseconds.
-   */
-  public void setPurgeDelay(long delay)
-  {
-    trimAge = delay;
-  }
-
-  /**
    * Clear the changes from this DB (from both memory cache and DB storage).
    * @throws ChangelogException When an exception occurs while removing the
    * changes from the DB.
@@ -636,13 +569,15 @@
   }
 
   /**
-   * Return the size of the msgQueue (the memory cache of the ReplicaDB).
+   * Return the number of records of this replicaDB.
+   * <p>
    * For test purpose.
-   * @return The memory queue size.
+   *
+   * @return The number of records of this replicaDB.
    */
-  int getQueueSize()
+  long getNumberRecords()
   {
-    return this.msgQueue.size();
+    return db.getNumberRecords();
   }
 
   /**

--
Gitblit v1.10.0