From 08908175782573c536cb485092e473c7f1729281 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 28 Mar 2014 11:50:04 +0000
Subject: [PATCH] OPENDJ-1177 (CR-3278) Re-implement changelog purging logic

---
 opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java |  354 +++++++++++++++++++++++++++++-----------------------------
 1 files changed, 176 insertions(+), 178 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index b85d132..4426b0f 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
- *      Portions copyright 2011-2013 ForgeRock AS
+ *      Portions copyright 2011-2014 ForgeRock AS
  */
 package org.opends.server.replication.server.changelog.je;
 
@@ -30,6 +30,9 @@
 import java.util.Date;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.opends.server.admin.std.server.MonitorProviderCfg;
 import org.opends.server.api.DirectoryThread;
@@ -67,13 +70,33 @@
  */
 public class JEReplicaDB implements Runnable
 {
+
+  /**
+   * Class that allows atomically setting oldest and newest CSNs without
+   * synchronization.
+   *
+   * @Immutable
+   */
+  private static final class CSNLimits
+  {
+    private final CSN oldestCSN;
+    private final CSN newestCSN;
+
+    public CSNLimits(CSN oldestCSN, CSN newestCSN)
+    {
+      this.oldestCSN = oldestCSN;
+      this.newestCSN = newestCSN;
+    }
+
+  }
+
   /**
    * The msgQueue holds all the updates not yet saved to stable storage.
    * <p>
-   * This list is only used as a temporary placeholder so that the write in the
-   * stable storage can be grouped for efficiency reason. Adding an update
-   * synchronously add the update to this list. A dedicated thread loops on
-   * flush() and trim().
+   * This blocking queue is only used as a temporary placeholder so that the
+   * writes to stable storage can be grouped for efficiency reasons. Adding an
+   * update synchronously adds it to this queue. A dedicated thread loops on
+   * {@link #flush()} and {@link #trim()}.
    * <dl>
    * <dt>flush()</dt>
    * <dd>get a number of changes from the in memory list by block and write them
@@ -86,37 +109,35 @@
    * Changes are not read back by replicationServer threads that are responsible
    * for pushing the changes to other replication server or to LDAP server
    */
-  private final LinkedList<UpdateMsg> msgQueue =
-    new LinkedList<UpdateMsg>();
+  private final LinkedBlockingQueue<UpdateMsg> msgQueue =
+    new LinkedBlockingQueue<UpdateMsg>();
 
   /**
-   * The High and low water mark for the max size of the msgQueue. The threads
-   * calling add() method will be blocked if the size of msgQueue becomes larger
-   * than the queueHimark and will resume only when the size of the msgQueue
-   * goes below queueLowmark.
+   * Semaphore used to limit the number of bytes used in memory by the queue.
+   * Threads calling {@link #add(UpdateMsg)} block when the message they add
+   * needs more permits (bytes) than are currently available, and resume only
+   * once enough permits have been released.
    */
-  private int queueMaxSize = 5000;
-  private int queueLowmark = 1000;
-  private int queueHimark = 4000;
-
-  /**
-   * The queue himark and lowmark in bytes, this is set to 100 times the himark
-   * and lowmark in number of updates.
-   */
-  private int queueMaxBytes = 100 * queueMaxSize;
-  private int queueLowmarkBytes = 100 * queueLowmark;
-  private int queueHimarkBytes = 100 * queueHimark;
-
-  /** The number of bytes currently in the queue. */
-  private int queueByteSize = 0;
+  private final Semaphore queueSizeBytes;
+  private final int queueMaxBytes;
 
   private ReplicationDB db;
-  private CSN oldestCSN;
-  private CSN newestCSN;
+  /**
+   * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
+   *
+   * @NonNull
+   */
+  private volatile CSNLimits csnLimits;
   private int serverId;
   private DN baseDN;
   private DbMonitorProvider dbMonitor = new DbMonitorProvider();
   private DirectoryThread thread;
+  /**
+   * Used to prevent race conditions between threads calling {@link #clear()},
+   * {@link #flush()} or {@link #trim()}. This can happen with the thread
+   * flushing the queue, on shutdown or on cursor opening, with a thread
+   * calling clear(), etc.
+   */
   private final Object flushLock = new Object();
   private ReplicationServer replicationServer;
 
@@ -146,16 +167,10 @@
     this.serverId = serverId;
     this.baseDN = baseDN;
     trimAge = replicationServer.getTrimAge();
-    final int queueSize = replicationServer.getQueueSize();
-    queueMaxSize = queueSize;
-    queueLowmark = queueSize / 5;
-    queueHimark = queueSize * 4 / 5;
-    queueMaxBytes = 200 * queueMaxSize;
-    queueLowmarkBytes = 200 * queueLowmark;
-    queueHimarkBytes = 200 * queueLowmark;
+    queueMaxBytes = replicationServer.getQueueSize() * 200;
+    queueSizeBytes = new Semaphore(queueMaxBytes);
     db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
-    oldestCSN = db.readOldestCSN();
-    newestCSN = db.readNewestCSN();
+    csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
     thread = new DirectoryThread(this, "Replication server RS("
         + replicationServer.getServerId()
         + ") changelog checkpointer for Replica DS(" + serverId
@@ -167,61 +182,72 @@
   }
 
   /**
-   * Add an update to the list of messages that must be saved to the db
-   * managed by this db handler.
-   * This method is blocking if the size of the list of message is larger
-   * than its maximum.
+   * Add an update to the list of messages that must be saved to the db managed
+   * by this db handler. This method is blocking if the size of the list of
+   * message is larger than its maximum.
    *
-   * @param update The update that must be saved to the db managed by this db
-   *               handler.
+   * @param updateMsg
+   *          The update message that must be saved to the db managed by this db
+   *          handler.
+   * @throws ChangelogException
+   *           If a database problem happened
    */
-  public void add(UpdateMsg update)
+  public void add(UpdateMsg updateMsg) throws ChangelogException
   {
-    synchronized (msgQueue)
+    if (thread.isShutdownInitiated())
     {
-      int size = msgQueue.size();
-      if (size > queueHimark || queueByteSize > queueHimarkBytes)
-      {
-        msgQueue.notify();
-      }
+      throw new ChangelogException(
+          ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
+              .toString(), String.valueOf(baseDN), String.valueOf(serverId)));
+    }
 
-      while (size > queueMaxSize || queueByteSize > queueMaxBytes)
+    final int msgSize = updateMsg.size();
+    if (msgSize < queueMaxBytes)
+    {
+      try
       {
-        try
-        {
-          msgQueue.wait(500);
-        } catch (InterruptedException e)
-        {
-          // simply loop to try again.
-        }
-        size = msgQueue.size();
+        queueSizeBytes.acquire(msgSize);
       }
+      catch (InterruptedException e)
+      {
+        throw new ChangelogException(
+            ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(updateMsg
+                .toString(), String.valueOf(baseDN), String.valueOf(serverId),
+                stackTraceToSingleLineString(e)));
+      }
+    }
+    else
+    {
+      // edge case with a very large message
+      collectAllPermits();
+    }
+    msgQueue.add(updateMsg);
 
-      queueByteSize += update.size();
-      msgQueue.add(update);
-      if (newestCSN == null || newestCSN.isOlderThan(update.getCSN()))
-      {
-        newestCSN = update.getCSN();
-      }
-      if (oldestCSN == null)
-      {
-        oldestCSN = update.getCSN();
-      }
+    final CSNLimits limits = csnLimits;
+    final boolean updateNew = limits.newestCSN == null
+        || limits.newestCSN.isOlderThan(updateMsg.getCSN());
+    final boolean updateOld = limits.oldestCSN == null;
+    if (updateOld || updateNew)
+    {
+      csnLimits = new CSNLimits(
+          updateOld ? updateMsg.getCSN() : limits.oldestCSN,
+          updateNew ? updateMsg.getCSN() : limits.newestCSN);
     }
   }
 
-  /**
-   * Get some changes out of the message queue of the LDAP server.
-   * (from the beginning of the queue)
-   * @param number the maximum number of messages to extract.
-   * @return a List containing number changes extracted from the queue.
-   */
-  private List<UpdateMsg> getChanges(int number)
+  /** Collects all the permits from the {@link #queueSizeBytes} semaphore. */
+  private void collectAllPermits()
   {
-    synchronized (msgQueue)
+    int collectedPermits = queueSizeBytes.drainPermits();
+    while (collectedPermits != queueMaxBytes)
     {
-      final int minAvailableNb = Math.min(number, msgQueue.size());
-      return new LinkedList<UpdateMsg>(msgQueue.subList(0, minAvailableNb));
+      // Do not use Thread.sleep() because:
+      // 1) it is expected the permits will be released very soon
+      // 2) we want to collect all the permits, so do not leave a chance to
+      // other threads to steal them from us.
+      // 3) we want to keep low latency
+      Thread.yield();
+      collectedPermits += queueSizeBytes.drainPermits();
     }
   }
 
@@ -232,7 +258,7 @@
    */
   public CSN getOldestCSN()
   {
-    return oldestCSN;
+    return csnLimits.oldestCSN;
   }
 
   /**
@@ -242,7 +268,7 @@
    */
   public CSN getNewestCSN()
   {
-    return newestCSN;
+    return csnLimits.newestCSN;
   }
 
   /**
@@ -252,9 +278,10 @@
    */
   public long getChangesCount()
   {
-    if (newestCSN != null && oldestCSN != null)
+    final CSNLimits limits = csnLimits;
+    if (limits.newestCSN != null && limits.oldestCSN != null)
     {
-      return newestCSN.getSeqnum() - oldestCSN.getSeqnum() + 1;
+      return limits.newestCSN.getSeqnum() - limits.oldestCSN.getSeqnum() + 1;
     }
     return 0;
   }
@@ -276,36 +303,13 @@
   {
     if (startAfterCSN == null)
     {
+      // flush any potential changes before opening the cursor
       flush();
     }
     return new JEReplicaDBCursor(db, startAfterCSN, this);
   }
 
   /**
-   * Removes the provided number of messages from the beginning of the msgQueue.
-   *
-   * @param number the number of changes to be removed.
-   */
-  private void clearQueue(int number)
-  {
-    synchronized (msgQueue)
-    {
-      int current = 0;
-      while (current < number && !msgQueue.isEmpty())
-      {
-        UpdateMsg msg = msgQueue.remove(); // remove first
-        queueByteSize -= msg.size();
-        current++;
-      }
-      if (msgQueue.size() < queueLowmark
-          && queueByteSize < queueLowmarkBytes)
-      {
-        msgQueue.notifyAll();
-      }
-    }
-  }
-
-  /**
    * Shutdown this ReplicaDB.
    */
   public void shutdown()
@@ -317,11 +321,6 @@
 
     thread.initiateShutdown();
 
-    synchronized (msgQueue)
-    {
-      msgQueue.notifyAll();
-    }
-
     while (msgQueue.size() != 0)
     {
       try
@@ -331,8 +330,7 @@
       catch (ChangelogException e)
       {
         // We are already shutting down
-        logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
-            .get(stackTraceToSingleLineString(e)));
+        logError(e.getMessageObject());
       }
     }
 
@@ -358,27 +356,8 @@
         {
           flush();
           trim();
-
-          synchronized (msgQueue)
-          {
-            if (msgQueue.size() < queueLowmark
-                && queueByteSize < queueLowmarkBytes)
-            {
-              try
-              {
-                msgQueue.wait(1000);
-              }
-              catch (InterruptedException e)
-              {
-                // Do not reset the interrupt flag here,
-                // because otherwise JE will barf next time flush() is called:
-                // JE 5.0.97 refuses to persist changes to the DB when invoked
-                // from a Thread with the interrupt flag set to true.
-              }
-            }
-          }
         }
-        catch (Exception end)
+        catch (ChangelogException end)
         {
           stop(end);
           break;
@@ -400,11 +379,6 @@
     {
       thread.stopWork();
     }
-
-    synchronized (this)
-    {
-      notifyAll();
-    }
   }
 
   private void stop(Exception e)
@@ -453,13 +427,34 @@
       trimDate = lastBeforeTrimDate;
     }
 
+    final int queueLowMarkBytes = queueMaxBytes / 5;
     for (int i = 0; i < 100; i++)
     {
+      /*
+       * Always perform some trimming (the first iterations), then keep going
+       * only while the flush backlog, in bytes held by the in-memory queue,
+       * stays at or below the lowmark. Above it, stop so flushing can resume.
+       */
+      final int backlog = queueMaxBytes - queueSizeBytes.availablePermits();
+      if (i > 20 && backlog > queueLowMarkBytes)
+      {
+        break;
+      }
+
       synchronized (flushLock)
       {
         /*
-         * the trim is done by group in order to save some CPU and IO bandwidth
-         * start the transaction then do a bunch of remove then commit
+         * the trim is done by group in order to save some CPU, IO bandwidth and
+         * DB caches: start the transaction then do a bunch of remove then
+         * commit.
+         */
+        /*
+         * Matt wrote: The record removal is done as a DB transaction and the
+         * deleted records are only "deleted" on commit. While the txn/cursor is
+         * open the records to be deleted will, I think, be pinned in the DB
+         * cache. In other words, the larger the transaction (the more records
+         * deleted during a single batch) the more DB cache will be used to
+         * process the transaction.
          */
         final ReplServerDBCursor cursor = db.openDeleteCursor();
         try
@@ -477,13 +472,13 @@
               return;
             }
 
-            if (!csn.equals(newestCSN) && csn.isOlderThan(trimDate))
+            if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
             {
               cursor.delete();
             }
             else
             {
-              oldestCSN = csn;
+              csnLimits = new CSNLimits(csn, csnLimits.newestCSN);
               return;
             }
           }
@@ -515,32 +510,33 @@
    */
   public void flush() throws ChangelogException
   {
-    int size;
-    int chunksize = Math.min(queueMaxSize, 500);
-
-    do
+    try
     {
-      synchronized(flushLock)
+      synchronized (flushLock)
       {
-        // get N (or less) messages from the queue to save to the DB
-        // (from the beginning of the queue)
-        List<UpdateMsg> changes = getChanges(chunksize);
-
-        // if no more changes to save exit immediately.
-        if (changes == null || (size = changes.size()) == 0)
+        final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
+        final UpdateMsg change = msgQueue.poll(500, TimeUnit.MILLISECONDS);
+        if (change == null)
         {
+          // nothing to persist, move on to the trim phase
           return;
         }
 
-        // save the change to the stable storage.
-        db.addEntries(changes);
+        // Try to see if there are more changes and persist them all.
+        changes.add(change);
+        msgQueue.drainTo(changes);
 
-        // remove the changes from the list of changes to be saved
-        // (remove from the beginning of the queue)
-        clearQueue(changes.size());
+        int totalSize = db.addEntries(changes);
+        // do not release more than queue max size permits
+        // (be careful of the edge case with the very large message)
+        queueSizeBytes.release(Math.min(totalSize, queueMaxBytes));
       }
-      // loop while there are more changes in the queue
-    } while (size == chunksize);
+    }
+    catch (InterruptedException e)
+    {
+      throw new ChangelogException(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
+          .get(stackTraceToSingleLineString(e)));
+    }
   }
 
   /**
@@ -556,25 +552,28 @@
     public List<Attribute> getMonitorData()
     {
       List<Attribute> attributes = new ArrayList<Attribute>();
-      attributes.add(Attributes.create("replicationServer-database",
-          String.valueOf(serverId)));
-      attributes.add(Attributes.create("domain-name",
-          baseDN.toNormalizedString()));
-      if (oldestCSN != null)
+      create(attributes, "replicationServer-database", String.valueOf(serverId));
+      create(attributes, "domain-name", baseDN.toNormalizedString());
+      final CSNLimits limits = csnLimits;
+      if (limits.oldestCSN != null)
       {
-        attributes.add(Attributes.create("first-change", encode(oldestCSN)));
+        create(attributes, "first-change", encode(limits.oldestCSN));
       }
-      if (newestCSN != null)
+      if (limits.newestCSN != null)
       {
-        attributes.add(Attributes.create("last-change", encode(newestCSN)));
+        create(attributes, "last-change", encode(limits.newestCSN));
       }
-      attributes.add(
-          Attributes.create("queue-size", String.valueOf(msgQueue.size())));
-      attributes.add(
-          Attributes.create("queue-size-bytes", String.valueOf(queueByteSize)));
+      create(attributes, "queue-size", String.valueOf(msgQueue.size()));
+      create(attributes, "queue-size-bytes",
+          String.valueOf(queueMaxBytes - queueSizeBytes.availablePermits()));
       return attributes;
     }
 
+    private void create(List<Attribute> attributes, String name, String value)
+    {
+      attributes.add(Attributes.create(name, value));
+    }
+
     private String encode(CSN csn)
     {
       return csn + " " + new Date(csn.getTime());
@@ -609,8 +608,9 @@
   @Override
   public String toString()
   {
+    final CSNLimits limits = csnLimits;
     return getClass().getSimpleName() + " " + baseDN + " " + serverId + " "
-        + oldestCSN + " " + newestCSN;
+        + limits.oldestCSN + " " + limits.newestCSN;
   }
 
   /**
@@ -631,12 +631,21 @@
   {
-    synchronized(flushLock)
+    // Take every permit BEFORE flushLock: queued updates only give their
+    // permits back through flush(), which itself needs flushLock. Holding
+    // them all also keeps add() callers out while the DB is wiped.
+    collectAllPermits();
+    try
     {
-      msgQueue.clear();
-      queueByteSize = 0;
-
-      db.clear();
-      oldestCSN = db.readOldestCSN();
-      newestCSN = db.readNewestCSN();
+      synchronized (flushLock)
+      {
+        msgQueue.clear();
+        db.clear();
+        csnLimits = new CSNLimits(null, null);
+      }
+    }
+    finally
+    {
+      // hand the permits back, otherwise every later add() blocks forever
+      queueSizeBytes.release(queueMaxBytes);
     }
   }
 
@@ -655,7 +664,7 @@
    * For test purpose.
    * @return The memory queue size.
    */
-  public int getQueueSize()
+  int getQueueSize()
   {
     return this.msgQueue.size();
   }

--
Gitblit v1.10.0