From 74328205a692a025fbf44a6af9c9f1f456ef34df Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 17 Sep 2013 07:51:42 +0000
Subject: [PATCH] OPENDJ-1130 (CR-2323) Connect Error when stopping all the servers from a replication topology at the same time

---
 opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java |  113 +++++++++++++++++++++++++++-----------------------------
 1 files changed, 55 insertions(+), 58 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java b/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
index 55519b1..0ba2b64 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -115,8 +115,6 @@
   private int serverId;
   private String baseDn;
   private DbMonitorProvider dbMonitor = new DbMonitorProvider();
-  private boolean shutdown = false;
-  private boolean done = false;
   private DirectoryThread thread;
   private final Object flushLock = new Object();
   private ReplicationServer replicationServer;
@@ -310,30 +308,18 @@
    */
   public void shutdown()
   {
-    if (shutdown)
+    if (thread.isShutdownInitiated())
     {
       return;
     }
 
-    shutdown  = true;
+    thread.initiateShutdown();
+
     synchronized (msgQueue)
     {
       msgQueue.notifyAll();
     }
 
-    synchronized (this)
-    { /* Can this be replaced with thread.join() ? */
-      while (!done)
-      {
-        try
-        {
-          wait();
-        }
-        catch (InterruptedException e)
-        { /* do nothing */}
-      }
-    }
-
     while (msgQueue.size() != 0)
     {
       flush();
@@ -351,55 +337,62 @@
   @Override
   public void run()
   {
-    while (!shutdown)
-    {
-      try
-      {
-        flush();
-        trim();
+    thread.startWork();
 
-        synchronized (msgQueue)
+    try
+    {
+      while (!thread.isShutdownInitiated())
+      {
+        try
         {
-          if (msgQueue.size() < queueLowmark
-              && queueByteSize < queueLowmarkBytes)
+          flush();
+          trim();
+
+          synchronized (msgQueue)
           {
-            try
+            if (msgQueue.size() < queueLowmark
+                && queueByteSize < queueLowmarkBytes)
             {
-              msgQueue.wait(1000);
-            } catch (InterruptedException e)
-            {
-              Thread.currentThread().interrupt();
+              try
+              {
+                msgQueue.wait(1000);
+              }
+              catch (InterruptedException e)
+              {
+                Thread.currentThread().interrupt();
+              }
             }
           }
         }
-      } catch (Exception end)
-      {
-        MessageBuilder mb = new MessageBuilder();
-        mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
-        mb.append(" ");
-        mb.append(stackTraceToSingleLineString(end));
-        logError(mb.toMessage());
-        synchronized (this)
+        catch (Exception end)
         {
-          // set the done variable to true so that this thread don't
-          // get stuck in this dbHandler.shutdown() when it get called
-          // by replicationServer.shutdown();
-          done = true;
+          MessageBuilder mb = new MessageBuilder();
+          mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
+          mb.append(" ");
+          mb.append(stackTraceToSingleLineString(end));
+          logError(mb.toMessage());
+
+          thread.initiateShutdown();
+
+          if (replicationServer != null)
+          {
+            replicationServer.shutdown();
+          }
+          break;
         }
-        if (replicationServer != null)
-        {
-          replicationServer.shutdown();
-        }
-        break;
       }
+
+      // call flush a last time before exiting to make sure that
+      // no change was forgotten in the msgQueue
+      flush();
     }
-    // call flush a last time before exiting to make sure that
-    // no change was forgotten in the msgQueue
-    flush();
+    finally
+    {
+      thread.stopWork();
+    }
 
     synchronized (this)
     {
-      done = true;
       notifyAll();
     }
   }
@@ -450,11 +443,14 @@
         {
           for (int j = 0; j < 50; j++)
           {
+            if (thread.isShutdownInitiated())
+            {
+              return;
+            }
+
             CSN csn = cursor.nextCSN();
             if (csn == null)
             {
-              cursor.close();
-              done = true;
               return;
             }
 
@@ -465,21 +461,22 @@
             else
             {
               firstChange = csn;
-              cursor.close();
-              done = true;
               return;
             }
           }
-          cursor.close();
         }
         catch (ChangelogException e)
         {
           // mark shutdown for this db so that we don't try again to
           // stop it from cursor.close() or methods called by cursor.close()
           cursor.abort();
-          shutdown = true;
+          thread.initiateShutdown();
           throw e;
         }
+        finally
+        {
+          cursor.close();
+        }
       }
     }
   }

--
Gitblit v1.10.0