From 3253906b33605684c3e071a6e5c3af0f20c9e375 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 04 Nov 2013 11:06:40 +0000
Subject: [PATCH] OPENDJ-1190 (CR-2523) Under rare circumstances the DS replication recovery thread (RSUpdater) can spin

---
 opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java |   85 ++++++++++++++++++++++++++++++++++--------
 1 files changed, 68 insertions(+), 17 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 821f7b8..57535f3 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -34,6 +34,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.DataFormatException;
 
 import org.opends.messages.Category;
@@ -199,6 +200,8 @@
    * is not updated too early.
    */
   private final PendingChanges pendingChanges;
+  private final AtomicReference<RSUpdater> rsUpdater =
+      new AtomicReference<RSUpdater>(null);
 
   /**
    * It contain the updates that were done on other servers, transmitted
@@ -335,7 +338,7 @@
    * The thread that periodically saves the ServerState of this
    * LDAPReplicationDomain in the database.
    */
-  private class  ServerStateFlush extends DirectoryThread
+  private class ServerStateFlush extends DirectoryThread
   {
     protected ServerStateFlush()
     {
@@ -351,7 +354,7 @@
     {
       done = false;
 
-      while (!shutdown)
+      while (!isShutdownInitiated())
       {
         try
         {
@@ -368,6 +371,7 @@
         catch (InterruptedException e)
         {
           // Thread interrupted: check for shutdown.
+          Thread.currentThread().interrupt();
         }
       }
       state.save();
@@ -383,6 +387,11 @@
   private class RSUpdater extends DirectoryThread
   {
     private final CSN startCSN;
+    /**
+     * Used to communicate that the current thread computation needs to
+     * shut down.
+     */
+    private AtomicBoolean shutdown = new AtomicBoolean(false);
 
     protected RSUpdater(CSN replServerMaxCSN)
     {
@@ -400,8 +409,7 @@
     {
       // Replication server is missing some of our changes: let's
       // send them to him.
-      Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
-      logError(message);
+      logError(DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get());
 
       /*
        * Get all the changes that have not been seen by this
@@ -409,10 +417,9 @@
        */
       try
       {
-        if (buildAndPublishMissingChanges(startCSN, broker))
+        if (buildAndPublishMissingChanges(startCSN, broker, shutdown))
         {
-          message = DEBUG_CHANGES_SENT.get();
-          logError(message);
+          logError(DEBUG_CHANGES_SENT.get());
           synchronized(replayOperations)
           {
             replayOperations.clear();
@@ -427,8 +434,7 @@
            * Log an error for the repair tool
            * that will need to re-synchronize the servers.
            */
-          message = ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString());
-          logError(message);
+          logError(ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString()));
         }
       } catch (Exception e)
       {
@@ -439,14 +445,24 @@
          * Log an error for the repair tool
          * that will need to re-synchronize the servers.
          */
-        message = ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString());
-        logError(message);
+        logError(ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString()));
       }
       finally
       {
         broker.setRecoveryRequired(false);
+        // RSUpdater thread has finished its work, let's remove it from memory
+        // so another RSUpdater thread can be started if needed.
+        rsUpdater.compareAndSet(this, null);
       }
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public void initiateShutdown()
+    {
+      this.shutdown.set(true);
+      super.initiateShutdown();
+    }
   }
 
 
@@ -2372,10 +2388,16 @@
     if (!shutdown)
     {
       shutdown = true;
+      final RSUpdater rsUpdater = this.rsUpdater.get();
+      if (rsUpdater != null)
+      {
+        rsUpdater.initiateShutdown();
+      }
 
       // stop the thread in charge of flushing the ServerState.
       if (flushThread != null)
       {
+        flushThread.initiateShutdown();
         synchronized (flushThread)
         {
           flushThread.notify();
@@ -4356,7 +4378,11 @@
         {
           pendingChanges.setRecovering(true);
           broker.setRecoveryRequired(true);
-          new RSUpdater(replServerMaxCSN).start();
+          final RSUpdater rsUpdater = new RSUpdater(replServerMaxCSN);
+          if (this.rsUpdater.compareAndSet(null, rsUpdater))
+          {
+            rsUpdater.start();
+          }
         }
       }
     } catch (Exception e)
@@ -4375,12 +4401,14 @@
    *          The CSN where we need to start the search
    * @param session
    *          The session to use to publish the changes
+   * @param shutdown
+   *          flag that is set to true when the current run must be stopped
    * @return A boolean indicating he success of the operation.
    * @throws Exception
    *           if an Exception happens during the search.
    */
   public boolean buildAndPublishMissingChanges(CSN startCSN,
-      ReplicationBroker session) throws Exception
+      ReplicationBroker session, AtomicBoolean shutdown) throws Exception
   {
     // Trim the changes in replayOperations that are older than the startCSN.
     synchronized (replayOperations)
@@ -4388,6 +4416,10 @@
       Iterator<CSN> it = replayOperations.keySet().iterator();
       while (it.hasNext())
       {
+        if (shutdown.get())
+        {
+          return false;
+        }
         if (it.next().isNewerThan(startCSN))
         {
           break;
@@ -4401,6 +4433,11 @@
     CSN currentStartCSN = startCSN;
     do
     {
+      if (shutdown.get())
+      {
+        return false;
+      }
+
       lastRetrievedChange = null;
       // We can't do the search in one go because we need to store the results
       // so that we are sure we send the operations in order and because the
@@ -4417,15 +4454,21 @@
 
       // Publish and remove all the changes from the replayOperations list
       // that are older than the endCSN.
-      List<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
+      final List<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
       synchronized (replayOperations)
       {
         Iterator<FakeOperation> itOp = replayOperations.values().iterator();
         while (itOp.hasNext())
         {
+          if (shutdown.get())
+          {
+            return false;
+          }
           FakeOperation fakeOp = itOp.next();
           if (fakeOp.getCSN().isNewerThan(endCSN) // sanity check
-              || !state.cover(fakeOp.getCSN()))
+              || !state.cover(fakeOp.getCSN())
+              // do not look for replay operations in the future
+              || endCSN.isNewerThan(now()))
           {
             break;
           }
@@ -4438,9 +4481,13 @@
 
       for (FakeOperation opToSend : opsToSend)
       {
+        if (shutdown.get())
+        {
+          return false;
+        }
         session.publishRecovery(opToSend.generateMessage());
       }
-      opsToSend.clear();
+
       if (lastRetrievedChange != null)
       {
         currentStartCSN = lastRetrievedChange;
@@ -4449,13 +4496,16 @@
       {
         currentStartCSN = endCSN;
       }
-
     } while (pendingChanges.recoveryUntil(lastRetrievedChange)
           && op.getResultCode().equals(ResultCode.SUCCESS));
 
     return op.getResultCode().equals(ResultCode.SUCCESS);
   }
 
+  private static CSN now()
+  {
+    return new CSN(TimeThread.getTime(), 0, 0);
+  }
 
   /**
    * Search for the changes that happened since fromCSN based on the historical
@@ -4589,6 +4639,7 @@
         catch (InterruptedException e)
         {
           // Thread interrupted: check for shutdown.
+          Thread.currentThread().interrupt();
         }
       }
 

--
Gitblit v1.10.0