From f9bc2351a98b71be3239da5dc001a903077ebf24 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 13 Jun 2014 20:06:36 +0000
Subject: [PATCH] OPENDJ-1496 (CR-3767) ThreadInterruptedException (JE) when running replication tests
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 11 ++---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 70 ++++++++++++++++++++++++-----------
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 14 +++---
3 files changed, 60 insertions(+), 35 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index db0c9f3..6013bb6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -49,12 +49,13 @@
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.TimeThread;
+
import com.forgerock.opendj.util.Pair;
+import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
-import static org.opends.messages.ReplicationMessages.*;
/**
* Log file implementation of the ChangelogDB interface.
@@ -402,12 +403,6 @@
// - then throw the first encountered exception
ChangelogException firstException = null;
- final ChangeNumberIndexer indexer = cnIndexer.get();
- if (indexer != null)
- {
- indexer.clear();
- }
-
for (DN baseDN : this.domainToReplicaDBs.keySet())
{
removeDomain(baseDN);
@@ -497,6 +492,11 @@
Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
if (domainMap != null)
{
+ final ChangeNumberIndexer indexer = this.cnIndexer.get();
+ if (indexer != null)
+ {
+ indexer.clear(baseDN);
+ }
synchronized (domainMap)
{
domainMap = domainToReplicaDBs.remove(baseDN);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 8a068f6..7060e55 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -29,7 +29,7 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentSkipListSet;
import org.opends.messages.Message;
import org.opends.server.api.DirectoryThread;
@@ -63,10 +63,13 @@
private static final DebugTracer TRACER = getTracer();
/**
- * If this is true, then the {@link #run()} method must clear its state.
- * Otherwise the run method executes normally.
+ * If it contains nothing, then the run method executes normally.
+ * Otherwise, the {@link #run()} method must clear its state
+ * for the supplied domain baseDNs. If a supplied domain is
+ * {@link DN#NULL_DN}, then all domains will be cleared.
*/
- private final AtomicBoolean doClear = new AtomicBoolean();
+ private final ConcurrentSkipListSet<DN> domainsToClear =
+ new ConcurrentSkipListSet<DN>();
private final ChangelogDB changelogDB;
/** Only used for initialization, and then discarded. */
private ChangelogState changelogState;
@@ -525,13 +528,17 @@
{
try
{
- if (doClear.get())
+ if (!domainsToClear.isEmpty())
{
- removeAllCursors();
+ while (!domainsToClear.isEmpty())
+ {
+ final DN baseDNToClear = domainsToClear.first();
+ removeCursors(baseDNToClear);
+ // Only release the waiting thread
+ // once this domain's state has been cleared.
+ domainsToClear.remove(baseDNToClear);
+ }
resetNextChangeForInsertDBCursor();
- // No need to use CAS here because it is only for unit tests and at
- // this point all will have been cleaned up anyway.
- doClear.set(false);
}
else
{
@@ -623,7 +630,7 @@
}
finally
{
- removeAllCursors();
+ removeCursors(DN.NULL_DN);
}
}
@@ -676,19 +683,33 @@
}
}
- private void removeAllCursors()
+ private void removeCursors(DN baseDN)
{
if (nextChangeForInsertDBCursor != null)
{
nextChangeForInsertDBCursor.close();
nextChangeForInsertDBCursor = null;
}
- for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
+ if (DN.NULL_DN.equals(baseDN))
{
- StaticUtils.close(map.values());
+ // close all cursors
+ for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
+ {
+ StaticUtils.close(map.values());
+ }
+ allCursors.clear();
+ newCursors.clear();
}
- allCursors.clear();
- newCursors.clear();
+ else
+ {
+ // close cursors for this DN
+ final Map<Integer, DBCursor<UpdateMsg>> map = allCursors.remove(baseDN);
+ if (map != null)
+ {
+ StaticUtils.close(map.values());
+ }
+ newCursors.remove(baseDN);
+ }
}
private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
@@ -756,22 +777,27 @@
}
/**
- * Asks the current thread to clear its state and blocks until state is
- * cleared.
+ * Asks the current thread to clear its state for the specified domain.
* <p>
- * This method is only useful for unit tests.
+ * Note: This method blocks the current thread until state is cleared.
+ *
+ * @param baseDN the baseDN to be cleared from this thread's state.
+ * {@code null} and {@link DN#NULL_DN} mean "clear all domains".
*/
- public void clear()
+ public void clear(DN baseDN)
{
- doClear.set(true);
- while (doClear.get() && !State.TERMINATED.equals(getState()))
+ // Use DN.NULL_DN to say "clear all domains"
+ final DN baseDNToClear = baseDN != null ? baseDN : DN.NULL_DN;
+ domainsToClear.add(baseDNToClear);
+ while (domainsToClear.contains(baseDNToClear)
+ && !State.TERMINATED.equals(getState()))
{
// wait until clear() has been done by thread, always waking it up
synchronized (this)
{
notify();
}
- // ensures unit tests wait that this thread's state is cleaned up
+ // ensures thread wait that this thread's state is cleaned up
Thread.yield();
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index e1dbcd3..fb9cad9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -462,12 +462,6 @@
// - then throw the first encountered exception
ChangelogException firstException = null;
- final ChangeNumberIndexer indexer = cnIndexer.get();
- if (indexer != null)
- {
- indexer.clear();
- }
-
for (DN baseDN : this.domainToReplicaDBs.keySet())
{
removeDomain(baseDN);
@@ -555,6 +549,11 @@
Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
if (domainMap != null)
{
+ final ChangeNumberIndexer indexer = this.cnIndexer.get();
+ if (indexer != null)
+ {
+ indexer.clear(baseDN);
+ }
synchronized (domainMap)
{
domainMap = domainToReplicaDBs.remove(baseDN);
--
Gitblit v1.10.0