From 856fdd0571358c660afaf379f8e774ab8b24f05c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 24 Jun 2013 15:20:37 +0000
Subject: [PATCH] OPENDJ-885 (CR-1909) Replication replay may lose changes if it can't acquire a writeLock 

---
 opends/src/server/org/opends/server/replication/plugin/ReplayThread.java |   27 +++++++++++++--------------
 1 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java b/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
index 1304346..a57c110 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -26,20 +26,20 @@
  *      Portions Copyright 2011-2013 ForgeRock AS
  */
 package org.opends.server.replication.plugin;
-import org.opends.server.replication.protocol.LDAPUpdateMsg;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.opends.messages.Message;
-
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.protocol.LDAPUpdateMsg;
 
 /**
  * Thread that is used to get message from the replication servers (stored
@@ -56,7 +56,7 @@
   private static final DebugTracer TRACER = getTracer();
 
   private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
-  private volatile boolean shutdown = false;
+  private AtomicBoolean shutdown = new AtomicBoolean(false);
   private static int count = 0;
 
   /**
@@ -75,7 +75,7 @@
    */
   public void shutdown()
   {
-    shutdown = true;
+    shutdown.set(true);
   }
 
   /**
@@ -84,27 +84,26 @@
   @Override
   public void run()
   {
-
     if (debugEnabled())
     {
       TRACER.debugInfo("Replication Replay thread starting.");
     }
 
-    while (!shutdown)
+    while (!shutdown.get())
     {
       try
       {
         UpdateToReplay updateToreplay;
         // Loop getting an updateToReplayQueue from the update message queue and
         // replaying matching changes
-        while ( (!shutdown) &&
+        while (!shutdown.get() &&
           ((updateToreplay = updateToReplayQueue.poll(1L,
           TimeUnit.SECONDS)) != null))
         {
           // Find replication domain for that update message
           LDAPUpdateMsg updateMsg = updateToreplay.getUpdateMessage();
           LDAPReplicationDomain domain = updateToreplay.getReplicationDomain();
-          domain.replay(updateMsg);
+          domain.replay(updateMsg, shutdown);
         }
       } catch (Exception e)
       {

--
Gitblit v1.10.0