From 3c140af6c756b30b325ce3c6ed080e8898e2b7ec Mon Sep 17 00:00:00 2001
From: Fabio Pistolesi <fabio.pistolesi@forgerock.com>
Date: Wed, 24 Feb 2016 13:52:15 +0000
Subject: [PATCH] OPENDJ-2190 Replicas cannot always keep up with sustained high write throughput
---
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java | 47 +++++++++++++++++++++++++++++++++--------------
1 files changed, 33 insertions(+), 14 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java
index 8c19fef..4d1f54d 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS
+ * Portions Copyright 2011-2016 ForgeRock AS.
*/
package org.opends.server.replication.plugin;
@@ -32,6 +32,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
import org.opends.server.api.DirectoryThread;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -49,6 +50,7 @@
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
+ private final ReentrantLock switchQueueLock;
private AtomicBoolean shutdown = new AtomicBoolean(false);
private static int count;
@@ -56,11 +58,13 @@
* Constructor for the ReplayThread.
*
* @param updateToReplayQueue The queue of update messages we have to replay
+ * @param switchQueueLock lock ensuring that updates are moved from one queue to another atomically
*/
- public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue)
+ public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue, ReentrantLock switchQueueLock)
{
- super("Replica replay thread " + count++);
- this.updateToReplayQueue = updateToReplayQueue;
+ super("Replica replay thread " + count++);
+ this.updateToReplayQueue = updateToReplayQueue;
+ this.switchQueueLock = switchQueueLock;
}
/**
@@ -86,19 +90,34 @@
{
try
{
- UpdateToReplay updateToreplay;
- // Loop getting an updateToReplayQueue from the update message queue and
- // replaying matching changes
- while (!shutdown.get() &&
- ((updateToreplay = updateToReplayQueue.poll(1L,
- TimeUnit.SECONDS)) != null))
+ if (switchQueueLock.tryLock(1L, TimeUnit.SECONDS))
{
- // Find replication domain for that update message
- LDAPUpdateMsg updateMsg = updateToreplay.getUpdateMessage();
- LDAPReplicationDomain domain = updateToreplay.getReplicationDomain();
+ LDAPReplicationDomain domain;
+ LDAPUpdateMsg updateMsg;
+ try
+ {
+ if (shutdown.get())
+ {
+ break;
+ }
+ UpdateToReplay updateToreplay = updateToReplayQueue.poll(1L, TimeUnit.SECONDS);
+ if (updateToreplay == null)
+ {
+ continue;
+ }
+ // Find the replication domain for that update message and mark the message as "in progress"
+ updateMsg = updateToreplay.getUpdateMessage();
+ domain = updateToreplay.getReplicationDomain();
+ domain.markInProgress(updateMsg);
+ }
+ finally
+ {
+ switchQueueLock.unlock();
+ }
domain.replay(updateMsg, shutdown);
}
- } catch (Exception e)
+ }
+ catch (Exception e)
{
/*
* catch all exceptions happening so that the thread never dies even
--
Gitblit v1.10.0