From 74328205a692a025fbf44a6af9c9f1f456ef34df Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 17 Sep 2013 07:51:42 +0000
Subject: [PATCH] OPENDJ-1130 (CR-2323) Connect Error when stopping all the servers from a replication topology at the same time

---
 opends/src/server/org/opends/server/api/DirectoryThread.java |  203 +++++++++++++++++++++++++++++++++++++++++---------
 1 files changed, 164 insertions(+), 39 deletions(-)

diff --git a/opends/src/server/org/opends/server/api/DirectoryThread.java b/opends/src/server/org/opends/server/api/DirectoryThread.java
index cb3e9f0..98fa584 100644
--- a/opends/src/server/org/opends/server/api/DirectoryThread.java
+++ b/opends/src/server/org/opends/server/api/DirectoryThread.java
@@ -23,34 +23,28 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
+ *      Portions Copyright 2013 ForgeRock AS
  */
 package org.opends.server.api;
 
-
-
-import java.util.Map;
 import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
+import org.opends.messages.Message;
 import org.opends.server.backends.task.Task;
 import org.opends.server.core.DirectoryServer;
-import static org.opends.server.loggers.debug.DebugLogger.
-    debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import org.opends.server.loggers.debug.DebugTracer;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import org.opends.server.types.DebugLogLevel;
 import org.opends.server.types.DN;
-import static org.opends.server.util.StaticUtils.stackTraceToString;
-import static org.opends.server.util.ServerConstants.
-    ALERT_TYPE_UNCAUGHT_EXCEPTION;
-import static org.opends.server.util.ServerConstants.
-    ALERT_DESCRIPTION_UNCAUGHT_EXCEPTION;
-import org.opends.messages.Message;
-import static org.opends.messages.CoreMessages.
-    ERR_UNCAUGHT_THREAD_EXCEPTION;
+import org.opends.server.types.DebugLogLevel;
 
+import static org.opends.messages.CoreMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.ServerConstants.*;
+import static org.opends.server.util.StaticUtils.*;
 
 /**
  * This class defines a generic thread that should be the superclass
@@ -75,9 +69,55 @@
      mayInstantiate=true,
      mayExtend=true,
      mayInvoke=true)
-public class DirectoryThread
-       extends Thread
+public class DirectoryThread extends Thread
 {
+
+  /**
+   * Enumeration holding the "logical" (application) thread states, as opposed
+   * to the operating system-level java.lang.Thread.State.
+   */
+  private static enum ThreadState
+  {
+    /** The current thread is currently not doing any useful work. */
+    IDLE(false),
+    /** The current thread is currently processing a task, doing useful work. */
+    PROCESSING(false),
+    /** The current thread is in the process of shutting down. */
+    SHUTTING_DOWN(true),
+    /**
+     * The current thread has stopped running. Equivalent to
+     * java.lang.Thread.State.TERMINATED.
+     */
+    STOPPED(true);
+
+    /**
+     * Whether this state implies a shutdown has been initiated or completed.
+     */
+    private final boolean shutdownInitiated;
+
+    /**
+     * Constructs an instance of this enum.
+     *
+     * @param shutdownInitiated
+     *          whether this state implies a shutdown was initiated.
+     */
+    private ThreadState(boolean shutdownInitiated)
+    {
+      this.shutdownInitiated = shutdownInitiated;
+    }
+
+    /**
+     * Returns whether the current thread started the shutdown process.
+     *
+     * @return true if the current thread started the shutdown process, false
+     *         otherwise.
+     */
+    public boolean isShutdownInitiated()
+    {
+      return shutdownInitiated;
+    }
+  }
+
   /**
    * A factory which can be used by thread pool based services such as
    * {@code Executor}s to dynamically create new
@@ -85,22 +125,19 @@
    */
   public static final class Factory implements ThreadFactory
   {
-    // The name prefix used for all threads created using this
-    // factory.
+    /** The name prefix used for all threads created using this factory. */
     private final String threadNamePrefix;
 
-    // The ID to use for the next thread created using this factory.
+    /** The ID to use for the next thread created using this factory. */
     private final AtomicInteger nextID = new AtomicInteger();
 
 
-
     /**
      * Creates a new directory thread factory using the provided
      * thread name prefix.
      *
      * @param threadNamePrefix
-     *          The name prefix used for all threads created using
-     *          this factory.
+     *          The name prefix used for all threads created using this factory.
      */
     public Factory(String threadNamePrefix)
     {
@@ -116,6 +153,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public Thread newThread(Runnable r)
     {
       return new DirectoryThread(r, threadNamePrefix + " "
@@ -138,15 +176,19 @@
   public static final DirectoryThreadGroup DIRECTORY_THREAD_GROUP =
       new DirectoryThreadGroup();
 
-  // The stack trace taken at the time that this thread was created.
+  /** The stack trace taken at the time that this thread was created. */
   private StackTraceElement[] creationStackTrace;
 
-  // The task with which this thread is associated, if any.
+  /** The task with which this thread is associated, if any. */
   private Task task;
 
-  // A reference to the thread that was used to create this thread.
+  /** A reference to the thread that was used to create this thread. */
   private Thread parentThread;
 
+  /** The current logical thread's state. */
+  private volatile AtomicReference<ThreadState> threadState =
+      new AtomicReference<ThreadState>(ThreadState.IDLE);
+
   /**
    * A thread group for all directory threads. This implements a
    * custom unhandledException handler that logs the error.
@@ -170,6 +212,7 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public DN getComponentEntryDN() {
       return DN.NULL_DN;
     }
@@ -177,14 +220,16 @@
     /**
      * {@inheritDoc}
      */
+    @Override
     public String getClassName() {
-      return "org.opends.server.api.DirectoryThread";
+      return DirectoryThread.class.getName();
     }
 
     /**
      * {@inheritDoc}
      */
-    public LinkedHashMap<String, String> getAlerts() {
+    @Override
+    public Map<String, String> getAlerts() {
       return alerts;
     }
 
@@ -202,6 +247,12 @@
     @Override
     public void uncaughtException(Thread t, Throwable e)
     {
+      if (e instanceof ThreadDeath)
+      {
+        // Ignore ThreadDeath errors that can happen when everything is being
+        // shutdown.
+        return;
+      }
       if (debugEnabled())
       {
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
@@ -225,10 +276,7 @@
    */
   public DirectoryThread(Runnable target, String threadName)
   {
-    super (DIRECTORY_THREAD_GROUP, target,
-           threadName);
-
-
+    super(DIRECTORY_THREAD_GROUP, target, threadName);
     init();
   }
 
@@ -242,8 +290,6 @@
   protected DirectoryThread(String threadName)
   {
     super(DIRECTORY_THREAD_GROUP, threadName);
-
-
     init();
   }
 
@@ -331,21 +377,100 @@
 
 
   /**
-   * Retrieves any relevent debug information with which this tread is
+   * Retrieves any relevant debug information with which this tread is
    * associated so they can be included in debug messages.
    *
    * @return debug information about this thread as a string.
    */
   public Map<String, String> getDebugProperties()
   {
-    LinkedHashMap<String, String> properties =
-        new LinkedHashMap<String, String>();
+    Map<String, String> properties = new LinkedHashMap<String, String>();
 
     properties.put("parentThread", parentThread.getName() +
         "(" + parentThread.getId() + ")");
-    properties.put("isDaemon", String.valueOf(this.isDaemon()));
+    properties.put("isDaemon", String.valueOf(isDaemon()));
 
     return properties;
   }
+
+  /**
+   * Returns whether the shutdown process has been initiated on the current
+   * thread. It also returns true when the thread is actually terminated.
+   * <p>
+   * Waiting for the thread to terminate should be done by invoking one of the
+   * {@link Thread#join()} methods.
+   *
+   * @return true if the shutdown process has been initiated on the current
+   *         thread, false otherwise.
+   */
+  public boolean isShutdownInitiated()
+  {
+    return getThreadState().get().isShutdownInitiated();
+  }
+
+  /**
+   * Instructs the current thread to initiate the shutdown process. The actual
+   * shutdown of the thread is a best effort and is dependent on the
+   * implementation of the {@link Thread#run()} method.
+   */
+  public void initiateShutdown()
+  {
+    setThreadStateIfNotShuttingDown(ThreadState.SHUTTING_DOWN);
+  }
+
+  /**
+   * Sets the current thread state to "processing" if the shutdown process was
+   * not initiated.
+   */
+  public void startWork()
+  {
+    setThreadStateIfNotShuttingDown(ThreadState.PROCESSING);
+  }
+
+  /**
+   * Sets the current thread state to "idle" if the shutdown process was not
+   * initiated.
+   */
+  public void stopWork()
+  {
+    setThreadStateIfNotShuttingDown(ThreadState.IDLE);
+  }
+
+  /**
+   * Sets this thread's current state to the passed in newState if the thread is
+   * not already in a shutting down state.
+   *
+   * @param newState
+   *          the new state to set
+   */
+  private void setThreadStateIfNotShuttingDown(ThreadState newState)
+  {
+    ThreadState currentState = this.threadState.get();
+    while (!currentState.isShutdownInitiated())
+    {
+      if (this.threadState.compareAndSet(currentState, newState))
+      {
+        return;
+      }
+      currentState = this.threadState.get();
+    }
+  }
+
+  /**
+   * Returns the current thread state, possibly returning
+   * {@link ThreadState#STOPPED} if the thread is not alive.
+   *
+   * @return an {@link AtomicReference} to a ThreadState. It can be passed down
+   *         as a method call parameter.
+   */
+  private AtomicReference<ThreadState> getThreadState()
+  {
+    if (!isAlive())
+    {
+      this.threadState.set(ThreadState.STOPPED);
+    }
+    return this.threadState;
+  }
+
 }
 

--
Gitblit v1.10.0