From 137d1b4ba1992acdd880b61b1a03dc31f0cc1839 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 23 Apr 2014 14:19:01 +0000
Subject: [PATCH] OPENDJ-1448 Remove JReplicaDB flushing thread and msg queue 

---
 opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java |  213 ++--------------------------------------------------
 1 files changed, 10 insertions(+), 203 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index bbde1d1..19406bd 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -28,14 +28,10 @@
 
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.opends.server.admin.std.server.MonitorProviderCfg;
-import org.opends.server.api.DirectoryThread;
 import org.opends.server.api.MonitorProvider;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.DirectoryServer;
@@ -52,8 +48,6 @@
 import org.opends.server.types.InitializationException;
 
 import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.util.StaticUtils.*;
 
 /**
  * This class is used for managing the replicationServer database for each
@@ -67,7 +61,7 @@
  * <p>
  * This class publish some monitoring information below cn=monitor.
  */
-public class JEReplicaDB implements Runnable
+public class JEReplicaDB
 {
 
   /**
@@ -89,29 +83,7 @@
 
   }
 
-  /**
-   * The msgQueue holds all the updates not yet saved to stable storage.
-   * <p>
-   * This blocking queue is only used as a temporary placeholder so that the
-   * write in the stable storage can be grouped for efficiency reason. Adding an
-   * update synchronously add the update to this list. A dedicated thread
-   * flushes this blocking queue.
-   * <p>
-   * Changes are not read back by replicationServer threads that are responsible
-   * for pushing the changes to other replication server or to LDAP server
-   */
-  private final LinkedBlockingQueue<UpdateMsg> msgQueue =
-    new LinkedBlockingQueue<UpdateMsg>();
-
-  /**
-   * Semaphore used to limit the number of bytes used in memory by the queue.
-   * The threads calling {@link #add(UpdateMsg)} method will be blocked if the
-   * size of msgQueue becomes larger than the available permits and will resume
-   * only when the number of available permits allow it.
-   */
-  private final Semaphore queueSizeBytes;
-  private final int queueMaxBytes;
-
+  private final AtomicBoolean shutdown = new AtomicBoolean(false);
   private ReplicationDB db;
   /**
    * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
@@ -122,12 +94,6 @@
   private int serverId;
   private DN baseDN;
   private DbMonitorProvider dbMonitor = new DbMonitorProvider();
-  private DirectoryThread thread;
-  /**
-   * Used to prevent race conditions between threads calling {@link #flush()}.
-   * This can happen with the thread flushing the queue, or else on shutdown.
-   */
-  private final Object flushLock = new Object();
   private ReplicationServer replicationServer;
 
   /**
@@ -147,15 +113,8 @@
     this.replicationServer = replicationServer;
     this.serverId = serverId;
     this.baseDN = baseDN;
-    queueMaxBytes = replicationServer.getQueueSize() * 200;
-    queueSizeBytes = new Semaphore(queueMaxBytes);
     db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
     csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
-    thread = new DirectoryThread(this, "Replication server RS("
-            + replicationServer.getServerId()
-            + ") flusher thread for Replica DS(" + serverId
-            + ") for domain \"" + baseDN + "\"");
-    thread.start();
 
     DirectoryServer.deregisterMonitorProvider(dbMonitor);
     DirectoryServer.registerMonitorProvider(dbMonitor);
@@ -174,34 +133,14 @@
    */
   public void add(UpdateMsg updateMsg) throws ChangelogException
   {
-    if (thread.isShutdownInitiated())
+    if (shutdown.get())
     {
       throw new ChangelogException(
           ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
               .toString(), String.valueOf(baseDN), String.valueOf(serverId)));
     }
 
-    final int msgSize = updateMsg.size();
-    if (msgSize < queueMaxBytes)
-    {
-      try
-      {
-        queueSizeBytes.acquire(msgSize);
-      }
-      catch (InterruptedException e)
-      {
-        throw new ChangelogException(
-            ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(updateMsg
-                .toString(), String.valueOf(baseDN), String.valueOf(serverId),
-                stackTraceToSingleLineString(e)));
-      }
-    }
-    else
-    {
-      // edge case with a very large message
-      collectAllPermits();
-    }
-    msgQueue.add(updateMsg);
+    db.addEntry(updateMsg);
 
     final CSNLimits limits = csnLimits;
     final boolean updateNew = limits.newestCSN == null
@@ -215,22 +154,6 @@
     }
   }
 
-  /** Collects all the permits from the {@link #queueSizeBytes} semaphore. */
-  private void collectAllPermits()
-  {
-    int collectedPermits = queueSizeBytes.drainPermits();
-    while (collectedPermits != queueMaxBytes)
-    {
-      // Do not use Thread.sleep() because:
-      // 1) it is expected the permits will be released very soon
-      // 2) we want to collect all the permits, so do not leave a chance to
-      // other threads to steal them from us.
-      // 3) we want to keep low latency
-      Thread.yield();
-      collectedPermits += queueSizeBytes.drainPermits();
-    }
-  }
-
   /**
    * Get the oldest CSN that has not been purged yet.
    *
@@ -289,80 +212,10 @@
    */
   public void shutdown()
   {
-    if (thread.isShutdownInitiated())
+    if (shutdown.compareAndSet(false, true))
     {
-      return;
-    }
-
-    thread.initiateShutdown();
-
-    while (msgQueue.size() != 0)
-    {
-      try
-      {
-        flush();
-      }
-      catch (ChangelogException e)
-      {
-        // We are already shutting down
-        logError(e.getMessageObject());
-      }
-    }
-
-    db.shutdown();
-    DirectoryServer.deregisterMonitorProvider(dbMonitor);
-  }
-
-  /**
-   * Flushes the replicaDB queue from memory to stable storage.
-   */
-  @Override
-  public void run()
-  {
-    thread.startWork();
-
-    try
-    {
-      while (!thread.isShutdownInitiated())
-      {
-        try
-        {
-          flush();
-        }
-        catch (ChangelogException end)
-        {
-          stop(end);
-          break;
-        }
-      }
-
-      try
-      {
-        // call flush a last time before exiting to make sure that
-        // no change was forgotten in the msgQueue
-        flush();
-      }
-      catch (ChangelogException e)
-      {
-        stop(e);
-      }
-    }
-    finally
-    {
-      thread.stopWork();
-    }
-  }
-
-  private void stop(Exception e)
-  {
-    logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
-        .get(stackTraceToSingleLineString(e)));
-
-    thread.initiateShutdown();
-
-    if (replicationServer != null)
-    {
-      replicationServer.shutdown();
+      db.shutdown();
+      DirectoryServer.deregisterMonitorProvider(dbMonitor);
     }
   }
 
@@ -401,7 +254,7 @@
       {
         for (int j = 0; j < 50; j++)
         {
-          if (thread.isShutdownInitiated())
+          if (shutdown.get())
           {
             return;
           }
@@ -428,7 +281,7 @@
         // mark shutdown for this db so that we don't try again to
         // stop it from cursor.close() or methods called by cursor.close()
         cursor.abort();
-        thread.initiateShutdown();
+        shutdown.set(true);
         throw e;
       }
       finally
@@ -439,47 +292,6 @@
   }
 
   /**
-   * Flush a number of updates from the memory list to the stable storage.
-   * <p>
-   * Flush is done by chunk sized to 500 messages, starting from the beginning
-   * of the list.
-   * <p>
-   * @GuardedBy("flushLock")
-   * @throws ChangelogException
-   *           If a database problem happened
-   */
-  private void flush() throws ChangelogException
-  {
-    try
-    {
-      synchronized (flushLock)
-      {
-        final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS);
-        if (change == null)
-        {
-          // nothing to persist, check if shutdown was invoked
-          return;
-        }
-
-        // Try to see if there are more changes and persist them all.
-        final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
-        changes.add(change);
-        msgQueue.drainTo(changes);
-
-        int totalSize = db.addEntries(changes);
-        // do not release more than queue max size permits
-        // (be careful of the edge case with the very large message)
-        queueSizeBytes.release(Math.min(totalSize, queueMaxBytes));
-      }
-    }
-    catch (InterruptedException e)
-    {
-      throw new ChangelogException(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
-          .get(stackTraceToSingleLineString(e)));
-    }
-  }
-
-  /**
    * This internal class is used to implement the Monitoring capabilities of the
    * ReplicaDB.
    */
@@ -501,9 +313,6 @@
       {
         create(attributes, "last-change", encode(limits.newestCSN));
       }
-      create(attributes, "queue-size", String.valueOf(msgQueue.size()));
-      create(attributes, "queue-size-bytes",
-          String.valueOf(queueMaxBytes - queueSizeBytes.availablePermits()));
       return attributes;
     }
 
@@ -552,8 +361,6 @@
    */
   public void clear() throws ChangelogException
   {
-    collectAllPermits();
-    msgQueue.clear(); // this call should not do anything at all
     db.clear();
     csnLimits = new CSNLimits(null, null);
   }

--
Gitblit v1.10.0