From 3c140af6c756b30b325ce3c6ed080e8898e2b7ec Mon Sep 17 00:00:00 2001
From: Fabio Pistolesi <fabio.pistolesi@forgerock.com>
Date: Wed, 24 Feb 2016 13:52:15 +0000
Subject: [PATCH] OPENDJ-2190 Replicas cannot always keep up with sustained high write throughput

---
 opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java |   47 +++++++++++++++++++++++++++++++++--------------
 1 files changed, 33 insertions(+), 14 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java
index 8c19fef..4d1f54d 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2006-2008 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2015 ForgeRock AS
+ *      Portions Copyright 2011-2016 ForgeRock AS.
  */
 package org.opends.server.replication.plugin;
 
@@ -32,6 +32,7 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.opends.server.api.DirectoryThread;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -49,6 +50,7 @@
   private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
 
   private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
+  private final ReentrantLock switchQueueLock;
   private AtomicBoolean shutdown = new AtomicBoolean(false);
   private static int count;
 
@@ -56,11 +58,13 @@
    * Constructor for the ReplayThread.
    *
    * @param updateToReplayQueue The queue of update messages we have to replay
+   * @param switchQueueLock lock to ensure moving updates from one queue to another is atomic
    */
-  public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue)
+  public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue, ReentrantLock switchQueueLock)
   {
-     super("Replica replay thread " + count++);
-     this.updateToReplayQueue = updateToReplayQueue;
+    super("Replica replay thread " + count++);
+    this.updateToReplayQueue = updateToReplayQueue;
+    this.switchQueueLock = switchQueueLock;
   }
 
   /**
@@ -86,19 +90,34 @@
     {
       try
       {
-        UpdateToReplay updateToreplay;
-        // Loop getting an updateToReplayQueue from the update message queue and
-        // replaying matching changes
-        while (!shutdown.get() &&
-          ((updateToreplay = updateToReplayQueue.poll(1L,
-          TimeUnit.SECONDS)) != null))
+        if (switchQueueLock.tryLock(1L, TimeUnit.SECONDS))
         {
-          // Find replication domain for that update message
-          LDAPUpdateMsg updateMsg = updateToreplay.getUpdateMessage();
-          LDAPReplicationDomain domain = updateToreplay.getReplicationDomain();
+          LDAPReplicationDomain domain;
+          LDAPUpdateMsg updateMsg;
+          try
+          {
+            if (shutdown.get())
+            {
+              break;
+            }
+            UpdateToReplay updateToreplay = updateToReplayQueue.poll(1L, TimeUnit.SECONDS);
+            if (updateToreplay == null)
+            {
+              continue;
+            }
+            // Find replication domain for that update message and mark it as "in progress"
+            updateMsg = updateToreplay.getUpdateMessage();
+            domain = updateToreplay.getReplicationDomain();
+            domain.markInProgress(updateMsg);
+          }
+          finally
+          {
+            switchQueueLock.unlock();
+          }
           domain.replay(updateMsg, shutdown);
         }
-      } catch (Exception e)
+      }
+      catch (Exception e)
       {
         /*
          * catch all exceptions happening so that the thread never dies even

--
Gitblit v1.10.0