From 9ca54cae8c40fcd3fd4b85414c4be5aa3c3c77d6 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 20 May 2014 14:06:04 +0000
Subject: [PATCH] Fixing JEChangeNumberIndexDBTest random tests. JE was throwing exception when the thread accessing it had been interrupted which happens frequently on single core machines. The solution is to replace the use of Thread.sleep(long) + Thread.interrupt() with Object.wait(long) + Object.notify() on thread shutdown.
---
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 53 +++++++++++++----
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java | 76 +++++++++++++------------
2 files changed, 79 insertions(+), 50 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
index dbf76c5..2c6d9a8 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -30,6 +30,7 @@
import java.net.*;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.forgerock.i18n.LocalizableMessage;
@@ -87,23 +88,24 @@
new HashMap<DN, ReplicationServerDomain>();
private final ChangelogDB changelogDB;
- private volatile boolean shutdown = false;
+ private final AtomicBoolean shutdown = new AtomicBoolean();
private boolean stopListen = false;
- private ReplSessionSecurity replSessionSecurity;
+ private final ReplSessionSecurity replSessionSecurity;
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
- private static String eclWorkflowID =
+ private static final String eclWorkflowID =
"External Changelog Workflow ID";
private ECLWorkflowElement eclwe;
- private AtomicReference<WorkflowImpl> eclWorkflowImpl =
+ private final AtomicReference<WorkflowImpl> eclWorkflowImpl =
new AtomicReference<WorkflowImpl>();
/**
* This is required for unit testing, so that we can keep track of all the
* replication servers which are running in the VM.
*/
- private static Set<Integer> localPorts = new CopyOnWriteArraySet<Integer>();
+ private static final Set<Integer> localPorts =
+ new CopyOnWriteArraySet<Integer>();
// Monitors for synchronizing domain creation with the connect thread.
private final Object domainTicketLock = new Object();
@@ -114,7 +116,7 @@
* Holds the list of all replication servers instantiated in this VM.
* This allows to perform clean up of the RS databases in unit tests.
*/
- private static List<ReplicationServer> allInstances =
+ private static final List<ReplicationServer> allInstances =
new ArrayList<ReplicationServer>();
/**
@@ -165,7 +167,6 @@
* ports from other replication servers or from LDAP servers
* and spawn further thread responsible for handling those connections
*/
-
void runListen()
{
logger.info(NOTE_REPLICATION_SERVER_LISTENING,
@@ -173,12 +174,11 @@
listenSocket.getInetAddress().getHostAddress(),
listenSocket.getLocalPort());
- while (!shutdown && !stopListen)
+ while (!shutdown.get() && !stopListen)
{
// Wait on the replicationServer port.
// Read incoming messages and create LDAP or ReplicationServer listener
// and Publisher.
-
try
{
Session session;
@@ -192,14 +192,18 @@
session = replSessionSecurity.createServerSession(newSocket,
timeoutMS);
if (session == null) // Error, go back to accept
+ {
continue;
+ }
}
catch (Exception e)
{
// If problems happen during the SSL handshake, it is necessary
// to close the socket to free the associated resources.
if (newSocket != null)
+ {
newSocket.close();
+ }
continue;
}
@@ -241,7 +245,7 @@
// Just log debug information and loop.
// Do not log the message during shutdown.
logger.traceException(e);
- if (!shutdown)
+ if (!shutdown.get())
{
logger.error(ERR_EXCEPTION_LISTENING, e.getLocalizedMessage());
}
@@ -259,7 +263,7 @@
{
synchronized (connectThreadLock)
{
- while (!shutdown)
+ while (!shutdown.get())
{
HostPort localAddress = HostPort.localAddress(getReplicationPort());
for (ReplicationServerDomain domain : getReplicationServerDomains())
@@ -336,8 +340,10 @@
boolean sslEncryption = replSessionSecurity.isSslEncryption();
if (logger.isTraceEnabled())
+ {
logger.trace("RS " + getMonitorInstanceName() + " connects to "
+ remoteServerAddress);
+ }
Socket socket = new Socket();
Session session = null;
@@ -365,7 +371,7 @@
*/
private void initialize()
{
- shutdown = false;
+ shutdown.set(false);
try
{
@@ -377,14 +383,16 @@
// creates working threads: we must first connect, then start to listen.
if (logger.isTraceEnabled())
- logger.trace("RS " +getMonitorInstanceName()+
- " creates connect thread");
+ {
+ logger.trace("RS " + getMonitorInstanceName() + " creates connect thread");
+ }
connectThread = new ReplicationServerConnectThread(this);
connectThread.start();
if (logger.isTraceEnabled())
- logger.trace("RS " +getMonitorInstanceName()+
- " creates listen thread");
+ {
+ logger.trace("RS " + getMonitorInstanceName() + " creates listen thread");
+ }
listenThread = new ReplicationServerListenThread(this);
listenThread.start();
@@ -399,8 +407,9 @@
eclwe = new ECLWorkflowElement(this);
if (logger.isTraceEnabled())
- logger.trace("RS " +getMonitorInstanceName()+
- " successfully initialized");
+ {
+ logger.trace("RS " + getMonitorInstanceName() + " successfully initialized");
+ }
} catch (UnknownHostException e)
{
logger.error(ERR_UNKNOWN_HOSTNAME);
@@ -577,7 +586,7 @@
/**
* Waits for connections to this ReplicationServer.
*/
- public void waitConnections()
+ void waitConnections()
{
// Acquire a domain ticket and wait for a complete cycle of the connect
// thread.
@@ -599,7 +608,7 @@
// Wait until the connect thread has processed next connect phase.
synchronized (domainTicketLock)
{
- while (myDomainTicket > domainTicket && !shutdown)
+ while (myDomainTicket > domainTicket && !shutdown.get())
{
try
{
@@ -622,10 +631,10 @@
{
localPorts.remove(getReplicationPort());
- if (shutdown)
+ if (!shutdown.compareAndSet(false, true))
+ {
return;
-
- shutdown = true;
+ }
// shutdown the connect thread
if (connectThread != null)
@@ -635,8 +644,6 @@
// shutdown the listener thread
close(listenSocket);
-
- // shutdown the listen thread
if (listenThread != null)
{
listenThread.interrupt();
@@ -705,9 +712,7 @@
}
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public ConfigChangeResult applyConfigurationChange(
ReplicationServerCfg configuration)
@@ -856,9 +861,7 @@
}
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public boolean isConfigurationChangeAcceptable(
ReplicationServerCfg configuration, List<LocalizableMessage> unacceptableReasons)
@@ -875,10 +878,8 @@
*/
public long getGenerationId(DN baseDN)
{
- ReplicationServerDomain rsd = getReplicationServerDomain(baseDN);
- if (rsd!=null)
- return rsd.getGenerationId();
- return -1;
+ final ReplicationServerDomain rsd = getReplicationServerDomain(baseDN);
+ return rsd != null ? rsd.getGenerationId() : -1;
}
/**
@@ -899,8 +900,9 @@
public void remove()
{
if (logger.isTraceEnabled())
+ {
logger.trace("RS " + getMonitorInstanceName() + " starts removing");
-
+ }
shutdown();
}
@@ -983,7 +985,9 @@
}
if (serversToDisconnect.isEmpty())
+ {
return;
+ }
for (ReplicationServerDomain domain: getReplicationServerDomains())
{
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 6a5a11b..a5540a2 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -377,13 +377,11 @@
if (indexer != null)
{
indexer.initiateShutdown();
- indexer.interrupt();
}
final ChangelogDBPurger purger = cnPurger.getAndSet(null);
if (purger != null)
{
purger.initiateShutdown();
- purger.interrupt();
}
try
@@ -827,17 +825,14 @@
oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
if (oldestNotPurgedCSN == null)
{ // shutdown may have been initiated...
- if (!isShutdownInitiated())
- {
- // ... or the change number index DB is empty,
- // wait for new changes to come in.
+ // ... or the change number index DB is empty,
+ // wait for new changes to come in.
- // Note we cannot sleep for as long as the purge delay
- // (3 days default), because we might receive late updates
- // that will have to be purged before the purge delay elapses.
- // This can particularly happen in case of network partitions.
- sleep(DEFAULT_SLEEP);
- }
+ // Note we cannot sleep for as long as the purge delay
+ // (3 days default), because we might receive late updates
+ // that will have to be purged before the purge delay elapses.
+ // This can particularly happen in case of network partitions.
+ jeFriendlySleep(DEFAULT_SLEEP);
continue;
}
}
@@ -853,7 +848,7 @@
latestPurgeDate = purgeTimestamp;
- sleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
+ jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
}
catch (InterruptedException e)
{
@@ -870,6 +865,33 @@
}
}
+ /**
+ * This method implements a sleep() that is friendly to Berkeley JE.
+ * <p>
+ * Originally, {@link Thread#sleep(long)} was used , but waking up a
+ * sleeping threads required calling {@link Thread#interrupt()}, and JE
+ * threw exceptions when invoked on interrupted threads.
+ * <p>
+ * The solution is to replace:
+ * <ol>
+ * <li> {@link Thread#sleep()} with {@link Object#wait(long)}</li>
+ * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li>
+ * </ol>
+ */
+ private void jeFriendlySleep(long millis) throws InterruptedException
+ {
+ if (!isShutdownInitiated())
+ {
+ synchronized (this)
+ {
+ if (!isShutdownInitiated())
+ {
+ wait(millis);
+ }
+ }
+ }
+ }
+
private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
{
final long nextPurgeTime = notPurgedCSN.getTime();
@@ -888,7 +910,10 @@
public void initiateShutdown()
{
super.initiateShutdown();
- this.interrupt(); // wake up the purger thread for faster shutdown
+ synchronized (this)
+ {
+ notify(); // wake up the purger thread for faster shutdown
+ }
}
}
}
--
Gitblit v1.10.0