From 856fdd0571358c660afaf379f8e774ab8b24f05c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 24 Jun 2013 15:20:37 +0000
Subject: [PATCH] OPENDJ-885 (CR-1909) Replication replay may lose changes if it can't acquire a writeLock
---
opends/src/server/org/opends/server/replication/plugin/ReplayThread.java | 27 +++++++++++++--------------
1 files changed, 13 insertions(+), 14 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java b/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
index 1304346..a57c110 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -26,20 +26,20 @@
* Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.plugin;
-import org.opends.server.replication.protocol.LDAPUpdateMsg;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.opends.messages.Message;
-
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.protocol.LDAPUpdateMsg;
/**
* Thread that is used to get message from the replication servers (stored
@@ -56,7 +56,7 @@
private static final DebugTracer TRACER = getTracer();
private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
- private volatile boolean shutdown = false;
+ private AtomicBoolean shutdown = new AtomicBoolean(false);
private static int count = 0;
/**
@@ -75,7 +75,7 @@
*/
public void shutdown()
{
- shutdown = true;
+ shutdown.set(true);
}
/**
@@ -84,27 +84,26 @@
@Override
public void run()
{
-
if (debugEnabled())
{
TRACER.debugInfo("Replication Replay thread starting.");
}
- while (!shutdown)
+ while (!shutdown.get())
{
try
{
UpdateToReplay updateToreplay;
// Loop getting an updateToReplayQueue from the update message queue and
// replaying matching changes
- while ( (!shutdown) &&
+ while (!shutdown.get() &&
((updateToreplay = updateToReplayQueue.poll(1L,
TimeUnit.SECONDS)) != null))
{
// Find replication domain for that update message
LDAPUpdateMsg updateMsg = updateToreplay.getUpdateMessage();
LDAPReplicationDomain domain = updateToreplay.getReplicationDomain();
- domain.replay(updateMsg);
+ domain.replay(updateMsg, shutdown);
}
} catch (Exception e)
{
--
Gitblit v1.10.0