From 0b8de8e6dbc4760fa587d12dd868c7c9f577b261 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 17 Sep 2013 07:51:42 +0000
Subject: [PATCH] OPENDJ-1130 (CR-2323) Connect Error when stopping all the servers from a replication topology at the same time

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java |  113 +++++++++---------
 opendj-sdk/opends/src/server/org/opends/server/api/DirectoryThread.java                       |  203 +++++++++++++++++++++++++++------
 2 files changed, 219 insertions(+), 97 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/api/DirectoryThread.java b/opendj-sdk/opends/src/server/org/opends/server/api/DirectoryThread.java
index cb3e9f0..98fa584 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/api/DirectoryThread.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/api/DirectoryThread.java
@@ -23,34 +23,28 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
+ *      Portions Copyright 2013 ForgeRock AS
  */
 package org.opends.server.api;
 
-
-
-import java.util.Map;
 import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
+import org.opends.messages.Message;
 import org.opends.server.backends.task.Task;
 import org.opends.server.core.DirectoryServer;
-import static org.opends.server.loggers.debug.DebugLogger.
-    debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import org.opends.server.loggers.debug.DebugTracer;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import org.opends.server.types.DebugLogLevel;
 import org.opends.server.types.DN;
-import static org.opends.server.util.StaticUtils.stackTraceToString;
-import static org.opends.server.util.ServerConstants.
-    ALERT_TYPE_UNCAUGHT_EXCEPTION;
-import static org.opends.server.util.ServerConstants.
-    ALERT_DESCRIPTION_UNCAUGHT_EXCEPTION;
-import org.opends.messages.Message;
-import static org.opends.messages.CoreMessages.
-    ERR_UNCAUGHT_THREAD_EXCEPTION;
+import org.opends.server.types.DebugLogLevel;
 
+import static org.opends.messages.CoreMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.ServerConstants.*;
+import static org.opends.server.util.StaticUtils.*;
 
 /**
  * This class defines a generic thread that should be the superclass
@@ -75,9 +69,55 @@
      mayInstantiate=true,
      mayExtend=true,
      mayInvoke=true)
-public class DirectoryThread
-       extends Thread
+public class DirectoryThread extends Thread
 {
+
+  /**
+   * Enumeration of the "logical" (application-level) states of a thread, as
+   * opposed to the virtual machine states of java.lang.Thread.State.
+   */
+  private static enum ThreadState
+  {
+    /** The thread is not currently doing any useful work. */
+    IDLE(false),
+    /** The thread is currently processing a task, doing useful work. */
+    PROCESSING(false),
+    /** The thread has been asked to stop and is shutting down. */
+    SHUTTING_DOWN(true),
+    /**
+     * The thread has stopped running. Equivalent to
+     * java.lang.Thread.State.TERMINATED.
+     */
+    STOPPED(true);
+
+    /**
+     * Whether this state implies a shutdown has been initiated or completed.
+     */
+    private final boolean shutdownInitiated;
+
+    /**
+     * Constructs an instance of this enum.
+     *
+     * @param shutdownInitiated
+     *          whether this state implies a shutdown was initiated.
+     */
+    private ThreadState(boolean shutdownInitiated)
+    {
+      this.shutdownInitiated = shutdownInitiated;
+    }
+
+    /**
+     * Returns whether this state means that a shutdown was initiated.
+     *
+     * @return true for {@link #SHUTTING_DOWN} and {@link #STOPPED}, false
+     *         for {@link #IDLE} and {@link #PROCESSING}.
+     */
+    public boolean isShutdownInitiated()
+    {
+      return shutdownInitiated;
+    }
+  }
+
   /**
    * A factory which can be used by thread pool based services such as
    * {@code Executor}s to dynamically create new
@@ -85,22 +125,19 @@
    */
   public static final class Factory implements ThreadFactory
   {
-    // The name prefix used for all threads created using this
-    // factory.
+    /** The name prefix used for all threads created using this factory. */
     private final String threadNamePrefix;
 
-    // The ID to use for the next thread created using this factory.
+    /** The ID to use for the next thread created using this factory. */
     private final AtomicInteger nextID = new AtomicInteger();
 
 
-
     /**
      * Creates a new directory thread factory using the provided
      * thread name prefix.
      *
      * @param threadNamePrefix
-     *          The name prefix used for all threads created using
-     *          this factory.
+     *          The name prefix used for all threads created using this factory.
      */
     public Factory(String threadNamePrefix)
     {
@@ -116,6 +153,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public Thread newThread(Runnable r)
     {
       return new DirectoryThread(r, threadNamePrefix + " "
@@ -138,15 +176,19 @@
   public static final DirectoryThreadGroup DIRECTORY_THREAD_GROUP =
       new DirectoryThreadGroup();
 
-  // The stack trace taken at the time that this thread was created.
+  /** The stack trace taken at the time that this thread was created. */
   private StackTraceElement[] creationStackTrace;
 
-  // The task with which this thread is associated, if any.
+  /** The task with which this thread is associated, if any. */
   private Task task;
 
-  // A reference to the thread that was used to create this thread.
+  /** A reference to the thread that was used to create this thread. */
   private Thread parentThread;
 
+  /** This thread's logical state; the reference itself is never replaced. */
+  private final AtomicReference<ThreadState> threadState =
+      new AtomicReference<ThreadState>(ThreadState.IDLE);
+
   /**
    * A thread group for all directory threads. This implements a
    * custom unhandledException handler that logs the error.
@@ -170,6 +212,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public DN getComponentEntryDN() {
       return DN.NULL_DN;
     }
@@ -177,14 +220,16 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public String getClassName() {
-      return "org.opends.server.api.DirectoryThread";
+      return DirectoryThread.class.getName();
     }
 
     /**
      * {@inheritDoc}
      */
-    public LinkedHashMap<String, String> getAlerts() {
+    @Override
+    public Map<String, String> getAlerts() {
       return alerts;
     }
 
@@ -202,6 +247,12 @@
     @Override
     public void uncaughtException(Thread t, Throwable e)
     {
+      if (e instanceof ThreadDeath)
+      {
+        // Ignore ThreadDeath errors that can happen when everything is being
+        // shutdown.
+        return;
+      }
       if (debugEnabled())
       {
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
@@ -225,10 +276,7 @@
    */
   public DirectoryThread(Runnable target, String threadName)
   {
-    super (DIRECTORY_THREAD_GROUP, target,
-           threadName);
-
-
+    super(DIRECTORY_THREAD_GROUP, target, threadName);
     init();
   }
 
@@ -242,8 +290,6 @@
   protected DirectoryThread(String threadName)
   {
     super(DIRECTORY_THREAD_GROUP, threadName);
-
-
     init();
   }
 
@@ -331,21 +377,100 @@
 
 
   /**
-   * Retrieves any relevent debug information with which this tread is
+   * Retrieves any relevant debug information with which this thread is
    * associated so they can be included in debug messages.
    *
    * @return debug information about this thread as a string.
    */
   public Map<String, String> getDebugProperties()
   {
-    LinkedHashMap<String, String> properties =
-        new LinkedHashMap<String, String>();
+    Map<String, String> properties = new LinkedHashMap<String, String>();
 
     properties.put("parentThread", parentThread.getName() +
         "(" + parentThread.getId() + ")");
-    properties.put("isDaemon", String.valueOf(this.isDaemon()));
+    properties.put("isDaemon", String.valueOf(isDaemon()));
 
     return properties;
   }
+
+  /**
+   * Returns whether the shutdown process has been initiated on this thread
+   * (not the calling one). It also returns true once the thread has terminated.
+   * <p>
+   * Waiting for the thread to terminate should be done by invoking one of the
+   * {@link Thread#join()} methods.
+   *
+   * @return true if the shutdown process has been initiated on this thread or
+   *         if it is already stopped, false otherwise.
+   */
+  public boolean isShutdownInitiated()
+  {
+    return getThreadState().get().isShutdownInitiated();
+  }
+
+  /**
+   * Asks this thread to shut down by flagging its logical state. This is only
+   * a request: nothing is interrupted here, so the {@link Thread#run()}
+   * implementation has to poll {@link #isShutdownInitiated()} and exit.
+   */
+  public void initiateShutdown()
+  {
+    setThreadStateIfNotShuttingDown(ThreadState.SHUTTING_DOWN);
+  }
+
+  /**
+   * Marks this thread as "processing". The call has no effect when the
+   * shutdown process was already initiated.
+   */
+  public void startWork()
+  {
+    setThreadStateIfNotShuttingDown(ThreadState.PROCESSING);
+  }
+
+  /**
+   * Marks this thread as "idle". The call has no effect when the shutdown
+   * process was already initiated.
+   */
+  public void stopWork()
+  {
+    setThreadStateIfNotShuttingDown(ThreadState.IDLE);
+  }
+
+  /**
+   * Atomically moves this thread to the passed in newState, retrying on lost
+   * races, unless a shutdown state was reached: such states are never left.
+   *
+   * @param newState
+   *          the state to switch to
+   */
+  private void setThreadStateIfNotShuttingDown(ThreadState newState)
+  {
+    for (;;)
+    {
+      final ThreadState observed = this.threadState.get();
+      if (observed.isShutdownInitiated()
+          || this.threadState.compareAndSet(observed, newState))
+      {
+        return;
+      }
+    }
+  }
+
+  /**
+   * Returns this thread's logical state, first recording
+   * {@link ThreadState#STOPPED} once the JVM reports the thread as terminated.
+   *
+   * @return the {@link AtomicReference} holding the state. A thread that has
+   *         not been started yet keeps its initial state (it is not STOPPED).
+   */
+  private AtomicReference<ThreadState> getThreadState()
+  {
+    if (getState() == Thread.State.TERMINATED)
+    {
+      this.threadState.set(ThreadState.STOPPED);
+    }
+    return this.threadState;
+  }
+
 }
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
index 55519b1..0ba2b64 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -115,8 +115,6 @@
   private int serverId;
   private String baseDn;
   private DbMonitorProvider dbMonitor = new DbMonitorProvider();
-  private boolean shutdown = false;
-  private boolean done = false;
   private DirectoryThread thread;
   private final Object flushLock = new Object();
   private ReplicationServer replicationServer;
@@ -310,30 +308,18 @@
    */
   public void shutdown()
   {
-    if (shutdown)
+    if (thread.isShutdownInitiated())
     {
       return;
     }
 
-    shutdown  = true;
+    thread.initiateShutdown();
+
     synchronized (msgQueue)
     {
       msgQueue.notifyAll();
     }
 
-    synchronized (this)
-    { /* Can this be replaced with thread.join() ? */
-      while (!done)
-      {
-        try
-        {
-          wait();
-        }
-        catch (InterruptedException e)
-        { /* do nothing */}
-      }
-    }
-
     while (msgQueue.size() != 0)
     {
       flush();
@@ -351,55 +337,62 @@
   @Override
   public void run()
   {
-    while (!shutdown)
-    {
-      try
-      {
-        flush();
-        trim();
+    thread.startWork();
 
-        synchronized (msgQueue)
+    try
+    {
+      while (!thread.isShutdownInitiated())
+      {
+        try
         {
-          if (msgQueue.size() < queueLowmark
-              && queueByteSize < queueLowmarkBytes)
+          flush();
+          trim();
+
+          synchronized (msgQueue)
           {
-            try
+            if (msgQueue.size() < queueLowmark
+                && queueByteSize < queueLowmarkBytes)
             {
-              msgQueue.wait(1000);
-            } catch (InterruptedException e)
-            {
-              Thread.currentThread().interrupt();
+              try
+              {
+                msgQueue.wait(1000);
+              }
+              catch (InterruptedException e)
+              {
+                Thread.currentThread().interrupt();
+              }
             }
           }
         }
-      } catch (Exception end)
-      {
-        MessageBuilder mb = new MessageBuilder();
-        mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
-        mb.append(" ");
-        mb.append(stackTraceToSingleLineString(end));
-        logError(mb.toMessage());
-        synchronized (this)
+        catch (Exception end)
         {
-          // set the done variable to true so that this thread don't
-          // get stuck in this dbHandler.shutdown() when it get called
-          // by replicationServer.shutdown();
-          done = true;
+          MessageBuilder mb = new MessageBuilder();
+          mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
+          mb.append(" ");
+          mb.append(stackTraceToSingleLineString(end));
+          logError(mb.toMessage());
+
+          thread.initiateShutdown();
+
+          if (replicationServer != null)
+          {
+            replicationServer.shutdown();
+          }
+          break;
         }
-        if (replicationServer != null)
-        {
-          replicationServer.shutdown();
-        }
-        break;
       }
+
+      // call flush a last time before exiting to make sure that
+      // no change was forgotten in the msgQueue
+      flush();
     }
-    // call flush a last time before exiting to make sure that
-    // no change was forgotten in the msgQueue
-    flush();
+    finally
+    {
+      thread.stopWork();
+    }
 
     synchronized (this)
     {
-      done = true;
       notifyAll();
     }
   }
@@ -450,11 +443,14 @@
         {
           for (int j = 0; j < 50; j++)
           {
+            if (thread.isShutdownInitiated())
+            {
+              return;
+            }
+
             CSN csn = cursor.nextCSN();
             if (csn == null)
             {
-              cursor.close();
-              done = true;
               return;
             }
 
@@ -465,21 +461,22 @@
             else
             {
               firstChange = csn;
-              cursor.close();
-              done = true;
               return;
             }
           }
-          cursor.close();
         }
         catch (ChangelogException e)
         {
           // mark shutdown for this db so that we don't try again to
           // stop it from cursor.close() or methods called by cursor.close()
           cursor.abort();
-          shutdown = true;
+          thread.initiateShutdown();
           throw e;
         }
+        finally
+        {
+          cursor.close();
+        }
       }
     }
   }

--
Gitblit v1.10.0