From 0b8de8e6dbc4760fa587d12dd868c7c9f577b261 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 17 Sep 2013 07:51:42 +0000
Subject: [PATCH] OPENDJ-1130 (CR-2323) Connect Error when stopping all the servers from a replication topology at the same time
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java | 113 +++++++++---------
opendj-sdk/opends/src/server/org/opends/server/api/DirectoryThread.java | 203 +++++++++++++++++++++++++++------
2 files changed, 219 insertions(+), 97 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/api/DirectoryThread.java b/opendj-sdk/opends/src/server/org/opends/server/api/DirectoryThread.java
index cb3e9f0..98fa584 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/api/DirectoryThread.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/api/DirectoryThread.java
@@ -23,34 +23,28 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS
*/
package org.opends.server.api;
-
-
-import java.util.Map;
import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.opends.messages.Message;
import org.opends.server.backends.task.Task;
import org.opends.server.core.DirectoryServer;
-import static org.opends.server.loggers.debug.DebugLogger.
- debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import org.opends.server.loggers.debug.DebugTracer;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DN;
-import static org.opends.server.util.StaticUtils.stackTraceToString;
-import static org.opends.server.util.ServerConstants.
- ALERT_TYPE_UNCAUGHT_EXCEPTION;
-import static org.opends.server.util.ServerConstants.
- ALERT_DESCRIPTION_UNCAUGHT_EXCEPTION;
-import org.opends.messages.Message;
-import static org.opends.messages.CoreMessages.
- ERR_UNCAUGHT_THREAD_EXCEPTION;
+import org.opends.server.types.DebugLogLevel;
+import static org.opends.messages.CoreMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.ServerConstants.*;
+import static org.opends.server.util.StaticUtils.*;
/**
* This class defines a generic thread that should be the superclass
@@ -75,9 +69,55 @@
mayInstantiate=true,
mayExtend=true,
mayInvoke=true)
-public class DirectoryThread
- extends Thread
+public class DirectoryThread extends Thread
{
+
+ /**
+ * Enumeration holding the "logical" (application) thread states, as opposed
+ * to the operating system-level java.lang.Thread.State.
+ */
+ private static enum ThreadState
+ {
+ /** The current thread is currently not doing any useful work. */
+ IDLE(false),
+ /** The current thread is currently processing a task, doing useful work. */
+ PROCESSING(false),
+ /** The current thread is in the process of shutting down. */
+ SHUTTING_DOWN(true),
+ /**
+ * The current thread has stopped running. Equivalent to
+ * java.lang.Thread.State.TERMINATED.
+ */
+ STOPPED(true);
+
+ /**
+ * Whether this state implies a shutdown has been initiated or completed.
+ */
+ private final boolean shutdownInitiated;
+
+ /**
+ * Constructs an instance of this enum.
+ *
+ * @param shutdownInitiated
+ * whether this state implies a shutdown was initiated.
+ */
+ private ThreadState(boolean shutdownInitiated)
+ {
+ this.shutdownInitiated = shutdownInitiated;
+ }
+
+ /**
+ * Returns whether the current thread started the shutdown process.
+ *
+ * @return true if the current thread started the shutdown process, false
+ * otherwise.
+ */
+ public boolean isShutdownInitiated()
+ {
+ return shutdownInitiated;
+ }
+ }
+
/**
* A factory which can be used by thread pool based services such as
* {@code Executor}s to dynamically create new
@@ -85,22 +125,19 @@
*/
public static final class Factory implements ThreadFactory
{
- // The name prefix used for all threads created using this
- // factory.
+ /** The name prefix used for all threads created using this factory. */
private final String threadNamePrefix;
- // The ID to use for the next thread created using this factory.
+ /** The ID to use for the next thread created using this factory. */
private final AtomicInteger nextID = new AtomicInteger();
-
/**
* Creates a new directory thread factory using the provided
* thread name prefix.
*
* @param threadNamePrefix
- * The name prefix used for all threads created using
- * this factory.
+ * The name prefix used for all threads created using this factory.
*/
public Factory(String threadNamePrefix)
{
@@ -116,6 +153,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public Thread newThread(Runnable r)
{
return new DirectoryThread(r, threadNamePrefix + " "
@@ -138,15 +176,19 @@
public static final DirectoryThreadGroup DIRECTORY_THREAD_GROUP =
new DirectoryThreadGroup();
- // The stack trace taken at the time that this thread was created.
+ /** The stack trace taken at the time that this thread was created. */
private StackTraceElement[] creationStackTrace;
- // The task with which this thread is associated, if any.
+ /** The task with which this thread is associated, if any. */
private Task task;
- // A reference to the thread that was used to create this thread.
+ /** A reference to the thread that was used to create this thread. */
private Thread parentThread;
+ /** The current logical thread's state. */
+ private volatile AtomicReference<ThreadState> threadState =
+ new AtomicReference<ThreadState>(ThreadState.IDLE);
+
/**
* A thread group for all directory threads. This implements a
* custom unhandledException handler that logs the error.
@@ -170,6 +212,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public DN getComponentEntryDN() {
return DN.NULL_DN;
}
@@ -177,14 +220,16 @@
/**
* {@inheritDoc}
*/
+ @Override
public String getClassName() {
- return "org.opends.server.api.DirectoryThread";
+ return DirectoryThread.class.getName();
}
/**
* {@inheritDoc}
*/
- public LinkedHashMap<String, String> getAlerts() {
+ @Override
+ public Map<String, String> getAlerts() {
return alerts;
}
@@ -202,6 +247,12 @@
@Override
public void uncaughtException(Thread t, Throwable e)
{
+ if (e instanceof ThreadDeath)
+ {
+ // Ignore ThreadDeath errors that can happen when everything is being
+ // shutdown.
+ return;
+ }
if (debugEnabled())
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
@@ -225,10 +276,7 @@
*/
public DirectoryThread(Runnable target, String threadName)
{
- super (DIRECTORY_THREAD_GROUP, target,
- threadName);
-
-
+ super(DIRECTORY_THREAD_GROUP, target, threadName);
init();
}
@@ -242,8 +290,6 @@
protected DirectoryThread(String threadName)
{
super(DIRECTORY_THREAD_GROUP, threadName);
-
-
init();
}
@@ -331,21 +377,100 @@
/**
- * Retrieves any relevent debug information with which this tread is
+ * Retrieves any relevant debug information with which this tread is
* associated so they can be included in debug messages.
*
* @return debug information about this thread as a string.
*/
public Map<String, String> getDebugProperties()
{
- LinkedHashMap<String, String> properties =
- new LinkedHashMap<String, String>();
+ Map<String, String> properties = new LinkedHashMap<String, String>();
properties.put("parentThread", parentThread.getName() +
"(" + parentThread.getId() + ")");
- properties.put("isDaemon", String.valueOf(this.isDaemon()));
+ properties.put("isDaemon", String.valueOf(isDaemon()));
return properties;
}
+
+ /**
+ * Returns whether the shutdown process has been initiated on the current
+ * thread. It also returns true when the thread is actually terminated.
+ * <p>
+ * Waiting for the thread to terminate should be done by invoking one of the
+ * {@link Thread#join()} methods.
+ *
+ * @return true if the shutdown process has been initiated on the current
+ * thread, false otherwise.
+ */
+ public boolean isShutdownInitiated()
+ {
+ return getThreadState().get().isShutdownInitiated();
+ }
+
+ /**
+ * Instructs the current thread to initiate the shutdown process. The actual
+ * shutdown of the thread is a best effort and is dependent on the
+ * implementation of the {@link Thread#run()} method.
+ */
+ public void initiateShutdown()
+ {
+ setThreadStateIfNotShuttingDown(ThreadState.SHUTTING_DOWN);
+ }
+
+ /**
+ * Sets the current thread state to "processing" if the shutdown process was
+ * not initiated.
+ */
+ public void startWork()
+ {
+ setThreadStateIfNotShuttingDown(ThreadState.PROCESSING);
+ }
+
+ /**
+ * Sets the current thread state to "idle" if the shutdown process was not
+ * initiated.
+ */
+ public void stopWork()
+ {
+ setThreadStateIfNotShuttingDown(ThreadState.IDLE);
+ }
+
+ /**
+ * Sets this thread's current state to the passed in newState if the thread is
+ * not already in a shutting down state.
+ *
+ * @param newState
+ * the new state to set
+ */
+ private void setThreadStateIfNotShuttingDown(ThreadState newState)
+ {
+ ThreadState currentState = this.threadState.get();
+ while (!currentState.isShutdownInitiated())
+ {
+ if (this.threadState.compareAndSet(currentState, newState))
+ {
+ return;
+ }
+ currentState = this.threadState.get();
+ }
+ }
+
+ /**
+ * Returns the current thread state, possibly returning
+ * {@link ThreadState#STOPPED} if the thread is not alive.
+ *
+ * @return an {@link AtomicReference} to a ThreadState. It can be passed down
+ * as a method call parameter.
+ */
+ private AtomicReference<ThreadState> getThreadState()
+ {
+ if (!isAlive())
+ {
+ this.threadState.set(ThreadState.STOPPED);
+ }
+ return this.threadState;
+ }
+
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
index 55519b1..0ba2b64 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -115,8 +115,6 @@
private int serverId;
private String baseDn;
private DbMonitorProvider dbMonitor = new DbMonitorProvider();
- private boolean shutdown = false;
- private boolean done = false;
private DirectoryThread thread;
private final Object flushLock = new Object();
private ReplicationServer replicationServer;
@@ -310,30 +308,18 @@
*/
public void shutdown()
{
- if (shutdown)
+ if (thread.isShutdownInitiated())
{
return;
}
- shutdown = true;
+ thread.initiateShutdown();
+
synchronized (msgQueue)
{
msgQueue.notifyAll();
}
- synchronized (this)
- { /* Can this be replaced with thread.join() ? */
- while (!done)
- {
- try
- {
- wait();
- }
- catch (InterruptedException e)
- { /* do nothing */}
- }
- }
-
while (msgQueue.size() != 0)
{
flush();
@@ -351,55 +337,62 @@
@Override
public void run()
{
- while (!shutdown)
- {
- try
- {
- flush();
- trim();
+ thread.startWork();
- synchronized (msgQueue)
+ try
+ {
+ while (!thread.isShutdownInitiated())
+ {
+ try
{
- if (msgQueue.size() < queueLowmark
- && queueByteSize < queueLowmarkBytes)
+ flush();
+ trim();
+
+ synchronized (msgQueue)
{
- try
+ if (msgQueue.size() < queueLowmark
+ && queueByteSize < queueLowmarkBytes)
{
- msgQueue.wait(1000);
- } catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
+ try
+ {
+ msgQueue.wait(1000);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
}
}
}
- } catch (Exception end)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
- mb.append(" ");
- mb.append(stackTraceToSingleLineString(end));
- logError(mb.toMessage());
- synchronized (this)
+ catch (Exception end)
{
- // set the done variable to true so that this thread don't
- // get stuck in this dbHandler.shutdown() when it get called
- // by replicationServer.shutdown();
- done = true;
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
+ mb.append(" ");
+ mb.append(stackTraceToSingleLineString(end));
+ logError(mb.toMessage());
+
+ thread.initiateShutdown();
+
+ if (replicationServer != null)
+ {
+ replicationServer.shutdown();
+ }
+ break;
}
- if (replicationServer != null)
- {
- replicationServer.shutdown();
- }
- break;
}
+
+ // call flush a last time before exiting to make sure that
+ // no change was forgotten in the msgQueue
+ flush();
}
- // call flush a last time before exiting to make sure that
- // no change was forgotten in the msgQueue
- flush();
+ finally
+ {
+ thread.stopWork();
+ }
synchronized (this)
{
- done = true;
notifyAll();
}
}
@@ -450,11 +443,14 @@
{
for (int j = 0; j < 50; j++)
{
+ if (thread.isShutdownInitiated())
+ {
+ return;
+ }
+
CSN csn = cursor.nextCSN();
if (csn == null)
{
- cursor.close();
- done = true;
return;
}
@@ -465,21 +461,22 @@
else
{
firstChange = csn;
- cursor.close();
- done = true;
return;
}
}
- cursor.close();
}
catch (ChangelogException e)
{
// mark shutdown for this db so that we don't try again to
// stop it from cursor.close() or methods called by cursor.close()
cursor.abort();
- shutdown = true;
+ thread.initiateShutdown();
throw e;
}
+ finally
+ {
+ cursor.close();
+ }
}
}
}
--
Gitblit v1.10.0