From 5bf287bc9f92c5b0893e1dade87453be153d07c1 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 17 Dec 2013 16:30:13 +0000
Subject: [PATCH] OPENDJ-1172 Deadlock between replication threads during shutdown

---
 opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java |   63 +++++++++++++++++++++----------
 1 files changed, 42 insertions(+), 21 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index d9779e6..b85d132 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -31,7 +31,6 @@
 import java.util.LinkedList;
 import java.util.List;
 
-import org.opends.messages.MessageBuilder;
 import org.opends.server.admin.std.server.MonitorProviderCfg;
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.api.MonitorProvider;
@@ -270,7 +269,7 @@
    * @return a new {@link DBCursor} that allows to browse the db managed by this
    *         ReplicaDB and starting at the position defined by a given CSN.
    * @throws ChangelogException
-   *           if a database problem happened.
+   *           if a database problem happened
    */
   public DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN)
       throws ChangelogException
@@ -325,7 +324,16 @@
 
     while (msgQueue.size() != 0)
     {
-      flush();
+      try
+      {
+        flush();
+      }
+      catch (ChangelogException e)
+      {
+        // Already shutting down: only log the error, do not rethrow it
+        logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
+            .get(stackTraceToSingleLineString(e)));
+      }
     }
 
     db.shutdown();
@@ -372,25 +380,21 @@
         }
         catch (Exception end)
         {
-          MessageBuilder mb = new MessageBuilder();
-          mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
-          mb.append(" ");
-          mb.append(stackTraceToSingleLineString(end));
-          logError(mb.toMessage());
-
-          thread.initiateShutdown();
-
-          if (replicationServer != null)
-          {
-            replicationServer.shutdown();
-          }
+          stop(end);
           break;
         }
       }
 
-      // call flush a last time before exiting to make sure that
-      // no change was forgotten in the msgQueue
-      flush();
+      try
+      {
+        // call flush a last time before exiting to make sure that
+        // no change was forgotten in the msgQueue
+        flush();
+      }
+      catch (ChangelogException e)
+      {
+        stop(e);
+      }
     }
     finally
     {
@@ -403,6 +407,19 @@
     }
   }
 
+  private void stop(Exception e)
+  {
+    logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
+        .get(stackTraceToSingleLineString(e)));
+    // After logging the failure, ask the thread to shut down.
+    thread.initiateShutdown();
+    // Also shut down the replication server, when one is set.
+    if (replicationServer != null)
+    {
+      replicationServer.shutdown();
+    }
+  }
+
   /**
    * Retrieves the latest trim date.
    * @return the latest trim date.
@@ -489,10 +506,14 @@
 
   /**
    * Flush a number of updates from the memory list to the stable storage.
-   * Flush is done by chunk sized to 500 messages, starting from the
-   * beginning of the list.
+   * <p>
+   * Flushing is done in chunks of at most 500 messages, starting from the
+   * beginning of the list.
+   *
+   * @throws ChangelogException
+   *           If a database problem happened
    */
-  public void flush()
+  public void flush() throws ChangelogException
   {
     int size;
     int chunksize = Math.min(queueMaxSize, 500);

--
Gitblit v1.10.0