mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
17.51.2013 74328205a692a025fbf44a6af9c9f1f456ef34df
OPENDJ-1130 (CR-2323) Connect Error when stopping all the servers from a replication topology at the same time

Properly marked the state of the thread + checked it from the deeper DbHandler.trim() to ensure the thread stops as fast as possible. DB reentrant locks then ensure the threads properly wait on each other before shutdown.

DirectoryThread.java:
Added ThreadState enum + a threadState AtomicReference instance field.
Added isShutdownInitiated(), initiateShutdown(), startWork(), stopWork(), setThreadStateIfNotShuttingDown(), getThreadState().
Converted comments to javadocs.
In uncaughtException(), do not log ThreadDeath error (coming from Ludo, suggested as part of OPENDJ-1061).

DbHandler.java:
Removed shutdown and done instance fields (replaced by the DirectoryThread.threadState). The done instance field had an ill defined purpose and its usefulness was questionable.
In shutdown(), used DirectoryThread thread state management methods + removed dead code.
In run() and trim(), used DirectoryThread thread state management methods.
2 files modified
316 ■■■■■ changed files
opends/src/server/org/opends/server/api/DirectoryThread.java 203 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java 113 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/api/DirectoryThread.java
@@ -23,34 +23,28 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS
 */
package org.opends.server.api;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.opends.messages.Message;
import org.opends.server.backends.task.Task;
import org.opends.server.core.DirectoryServer;
import static org.opends.server.loggers.debug.DebugLogger.
    debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.loggers.ErrorLogger.logError;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DN;
import static org.opends.server.util.StaticUtils.stackTraceToString;
import static org.opends.server.util.ServerConstants.
    ALERT_TYPE_UNCAUGHT_EXCEPTION;
import static org.opends.server.util.ServerConstants.
    ALERT_DESCRIPTION_UNCAUGHT_EXCEPTION;
import org.opends.messages.Message;
import static org.opends.messages.CoreMessages.
    ERR_UNCAUGHT_THREAD_EXCEPTION;
import org.opends.server.types.DebugLogLevel;
import static org.opends.messages.CoreMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class defines a generic thread that should be the superclass
@@ -75,9 +69,55 @@
     mayInstantiate=true,
     mayExtend=true,
     mayInvoke=true)
public class DirectoryThread
       extends Thread
public class DirectoryThread extends Thread
{
  /**
   * Enumeration holding the "logical" (application) thread states, as opposed
   * to the operating system-level java.lang.Thread.State.
   */
  private static enum ThreadState
  {
    /** The current thread is currently not doing any useful work. */
    IDLE(false),
    /** The current thread is currently processing a task, doing useful work. */
    PROCESSING(false),
    /** The current thread is in the process of shutting down. */
    SHUTTING_DOWN(true),
    /**
     * The current thread has stopped running. Equivalent to
     * java.lang.Thread.State.TERMINATED.
     */
    STOPPED(true);
    /**
     * Whether this state implies a shutdown has been initiated or completed.
     */
    private final boolean shutdownInitiated;
    /**
     * Constructs an instance of this enum.
     *
     * @param shutdownInitiated
     *          whether this state implies a shutdown was initiated.
     */
    private ThreadState(boolean shutdownInitiated)
    {
      this.shutdownInitiated = shutdownInitiated;
    }
    /**
     * Returns whether the current thread started the shutdown process.
     *
     * @return true if the current thread started the shutdown process, false
     *         otherwise.
     */
    public boolean isShutdownInitiated()
    {
      return shutdownInitiated;
    }
  }
  /**
   * A factory which can be used by thread pool based services such as
   * {@code Executor}s to dynamically create new
@@ -85,22 +125,19 @@
   */
  public static final class Factory implements ThreadFactory
  {
    // The name prefix used for all threads created using this
    // factory.
    /** The name prefix used for all threads created using this factory. */
    private final String threadNamePrefix;
    // The ID to use for the next thread created using this factory.
    /** The ID to use for the next thread created using this factory. */
    private final AtomicInteger nextID = new AtomicInteger();
    /**
     * Creates a new directory thread factory using the provided
     * thread name prefix.
     *
     * @param threadNamePrefix
     *          The name prefix used for all threads created using
     *          this factory.
     *          The name prefix used for all threads created using this factory.
     */
    public Factory(String threadNamePrefix)
    {
@@ -116,6 +153,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public Thread newThread(Runnable r)
    {
      return new DirectoryThread(r, threadNamePrefix + " "
@@ -138,15 +176,19 @@
  public static final DirectoryThreadGroup DIRECTORY_THREAD_GROUP =
      new DirectoryThreadGroup();
  // The stack trace taken at the time that this thread was created.
  /** The stack trace taken at the time that this thread was created. */
  private StackTraceElement[] creationStackTrace;
  // The task with which this thread is associated, if any.
  /** The task with which this thread is associated, if any. */
  private Task task;
  // A reference to the thread that was used to create this thread.
  /** A reference to the thread that was used to create this thread. */
  private Thread parentThread;
  /** The current logical thread's state. */
  private volatile AtomicReference<ThreadState> threadState =
      new AtomicReference<ThreadState>(ThreadState.IDLE);
  /**
   * A thread group for all directory threads. This implements a
   * custom unhandledException handler that logs the error.
@@ -170,6 +212,7 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public DN getComponentEntryDN() {
      return DN.NULL_DN;
    }
@@ -177,14 +220,16 @@
    /**
     * {@inheritDoc}
     */
    @Override
    public String getClassName() {
      return "org.opends.server.api.DirectoryThread";
      return DirectoryThread.class.getName();
    }
    /**
     * {@inheritDoc}
     */
    public LinkedHashMap<String, String> getAlerts() {
    @Override
    public Map<String, String> getAlerts() {
      return alerts;
    }
@@ -202,6 +247,12 @@
    @Override
    public void uncaughtException(Thread t, Throwable e)
    {
      if (e instanceof ThreadDeath)
      {
        // Ignore ThreadDeath errors that can happen when everything is being
        // shutdown.
        return;
      }
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
@@ -225,10 +276,7 @@
   */
  public DirectoryThread(Runnable target, String threadName)
  {
    super (DIRECTORY_THREAD_GROUP, target,
           threadName);
    super(DIRECTORY_THREAD_GROUP, target, threadName);
    init();
  }
@@ -242,8 +290,6 @@
  protected DirectoryThread(String threadName)
  {
    super(DIRECTORY_THREAD_GROUP, threadName);
    init();
  }
@@ -331,21 +377,100 @@
  /**
   * Retrieves any relevent debug information with which this tread is
   * Retrieves any relevant debug information with which this thread is
   * associated so they can be included in debug messages.
   *
   * @return debug information about this thread as a string.
   */
  public Map<String, String> getDebugProperties()
  {
    LinkedHashMap<String, String> properties =
        new LinkedHashMap<String, String>();
    Map<String, String> properties = new LinkedHashMap<String, String>();
    properties.put("parentThread", parentThread.getName() +
        "(" + parentThread.getId() + ")");
    properties.put("isDaemon", String.valueOf(this.isDaemon()));
    properties.put("isDaemon", String.valueOf(isDaemon()));
    return properties;
  }
  /**
   * Returns whether the shutdown process has been initiated on the current
   * thread. It also returns true when the thread is actually terminated.
   * <p>
   * Waiting for the thread to terminate should be done by invoking one of the
   * {@link Thread#join()} methods.
   *
   * @return true if the shutdown process has been initiated on the current
   *         thread, false otherwise.
   */
  public boolean isShutdownInitiated()
  {
    return getThreadState().get().isShutdownInitiated();
  }
  /**
   * Instructs the current thread to initiate the shutdown process. The actual
   * shutdown of the thread is a best effort and is dependent on the
   * implementation of the {@link Thread#run()} method.
   */
  public void initiateShutdown()
  {
    setThreadStateIfNotShuttingDown(ThreadState.SHUTTING_DOWN);
  }
  /**
   * Sets the current thread state to "processing" if the shutdown process was
   * not initiated.
   */
  public void startWork()
  {
    setThreadStateIfNotShuttingDown(ThreadState.PROCESSING);
  }
  /**
   * Sets the current thread state to "idle" if the shutdown process was not
   * initiated.
   */
  public void stopWork()
  {
    setThreadStateIfNotShuttingDown(ThreadState.IDLE);
  }
  /**
   * Sets this thread's current state to the passed in newState if the thread is
   * not already in a shutting down state.
   *
   * @param newState
   *          the new state to set
   */
  private void setThreadStateIfNotShuttingDown(ThreadState newState)
  {
    ThreadState currentState = this.threadState.get();
    while (!currentState.isShutdownInitiated())
    {
      if (this.threadState.compareAndSet(currentState, newState))
      {
        return;
      }
      currentState = this.threadState.get();
    }
  }
  /**
   * Returns the current thread state, possibly returning
   * {@link ThreadState#STOPPED} if the thread is not alive.
   *
   * @return an {@link AtomicReference} to a ThreadState. It can be passed down
   *         as a method call parameter.
   */
  private AtomicReference<ThreadState> getThreadState()
  {
    if (!isAlive())
    {
      this.threadState.set(ThreadState.STOPPED);
    }
    return this.threadState;
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -115,8 +115,6 @@
  private int serverId;
  private String baseDn;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private boolean shutdown = false;
  private boolean done = false;
  private DirectoryThread thread;
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;
@@ -310,30 +308,18 @@
   */
  public void shutdown()
  {
    if (shutdown)
    if (thread.isShutdownInitiated())
    {
      return;
    }
    shutdown  = true;
    thread.initiateShutdown();
    synchronized (msgQueue)
    {
      msgQueue.notifyAll();
    }
    synchronized (this)
    { /* Can this be replaced with thread.join() ? */
      while (!done)
      {
        try
        {
          wait();
        }
        catch (InterruptedException e)
        { /* do nothing */}
      }
    }
    while (msgQueue.size() != 0)
    {
      flush();
@@ -351,55 +337,62 @@
  @Override
  public void run()
  {
    while (!shutdown)
    {
      try
      {
        flush();
        trim();
    thread.startWork();
        synchronized (msgQueue)
    try
    {
      while (!thread.isShutdownInitiated())
      {
        try
        {
          if (msgQueue.size() < queueLowmark
              && queueByteSize < queueLowmarkBytes)
          flush();
          trim();
          synchronized (msgQueue)
          {
            try
            if (msgQueue.size() < queueLowmark
                && queueByteSize < queueLowmarkBytes)
            {
              msgQueue.wait(1000);
            } catch (InterruptedException e)
            {
              Thread.currentThread().interrupt();
              try
              {
                msgQueue.wait(1000);
              }
              catch (InterruptedException e)
              {
                Thread.currentThread().interrupt();
              }
            }
          }
        }
      } catch (Exception end)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
        mb.append(" ");
        mb.append(stackTraceToSingleLineString(end));
        logError(mb.toMessage());
        synchronized (this)
        catch (Exception end)
        {
          // set the done variable to true so that this thread don't
          // get stuck in this dbHandler.shutdown() when it get called
          // by replicationServer.shutdown();
          done = true;
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
          mb.append(" ");
          mb.append(stackTraceToSingleLineString(end));
          logError(mb.toMessage());
          thread.initiateShutdown();
          if (replicationServer != null)
          {
            replicationServer.shutdown();
          }
          break;
        }
        if (replicationServer != null)
        {
          replicationServer.shutdown();
        }
        break;
      }
      // call flush a last time before exiting to make sure that
      // no change was forgotten in the msgQueue
      flush();
    }
    // call flush a last time before exiting to make sure that
    // no change was forgotten in the msgQueue
    flush();
    finally
    {
      thread.stopWork();
    }
    synchronized (this)
    {
      done = true;
      notifyAll();
    }
  }
@@ -450,11 +443,14 @@
        {
          for (int j = 0; j < 50; j++)
          {
            if (thread.isShutdownInitiated())
            {
              return;
            }
            CSN csn = cursor.nextCSN();
            if (csn == null)
            {
              cursor.close();
              done = true;
              return;
            }
@@ -465,21 +461,22 @@
            else
            {
              firstChange = csn;
              cursor.close();
              done = true;
              return;
            }
          }
          cursor.close();
        }
        catch (ChangelogException e)
        {
          // mark shutdown for this db so that we don't try again to
          // stop it from cursor.close() or methods called by cursor.close()
          cursor.abort();
          shutdown = true;
          thread.initiateShutdown();
          throw e;
        }
        finally
        {
          cursor.close();
        }
      }
    }
  }