From f9bc2351a98b71be3239da5dc001a903077ebf24 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 13 Jun 2014 20:06:36 +0000
Subject: [PATCH] OPENDJ-1496 (CR-3767) ThreadInterruptedException (JE) when running replication tests

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java       |   11 ++---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java |   70 ++++++++++++++++++++++++-----------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java   |   14 +++---
 3 files changed, 60 insertions(+), 35 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index db0c9f3..6013bb6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -49,12 +49,13 @@
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.util.StaticUtils;
 import org.opends.server.util.TimeThread;
+
 import com.forgerock.opendj.util.Pair;
 
+import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
 import static org.opends.server.util.StaticUtils.*;
-import static org.opends.messages.ReplicationMessages.*;
 
 /**
  * Log file implementation of the ChangelogDB interface.
@@ -402,12 +403,6 @@
     // - then throw the first encountered exception
     ChangelogException firstException = null;
 
-    final ChangeNumberIndexer indexer = cnIndexer.get();
-    if (indexer != null)
-    {
-      indexer.clear();
-    }
-
     for (DN baseDN : this.domainToReplicaDBs.keySet())
     {
       removeDomain(baseDN);
@@ -497,6 +492,11 @@
     Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
     if (domainMap != null)
     {
+      final ChangeNumberIndexer indexer = this.cnIndexer.get();
+      if (indexer != null)
+      {
+        indexer.clear(baseDN);
+      }
       synchronized (domainMap)
       {
         domainMap = domainToReplicaDBs.remove(baseDN);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 8a068f6..7060e55 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -29,7 +29,7 @@
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentSkipListSet;
 
 import org.opends.messages.Message;
 import org.opends.server.api.DirectoryThread;
@@ -63,10 +63,13 @@
   private static final DebugTracer TRACER = getTracer();
 
   /**
-   * If this is true, then the {@link #run()} method must clear its state.
-   * Otherwise the run method executes normally.
+   * If it contains nothing, then the run method executes normally.
+   * Otherwise, the {@link #run()} method must clear its state
+   * for the supplied domain baseDNs. If a supplied domain is
+   * {@link DN#NULL_DN}, then all domains will be cleared.
    */
-  private final AtomicBoolean doClear = new AtomicBoolean();
+  private final ConcurrentSkipListSet<DN> domainsToClear =
+      new ConcurrentSkipListSet<DN>();
   private final ChangelogDB changelogDB;
   /** Only used for initialization, and then discarded. */
   private ChangelogState changelogState;
@@ -525,13 +528,17 @@
       {
         try
         {
-          if (doClear.get())
+          if (!domainsToClear.isEmpty())
           {
-            removeAllCursors();
+            while (!domainsToClear.isEmpty())
+            {
+              final DN baseDNToClear = domainsToClear.first();
+              removeCursors(baseDNToClear);
+              // Only release the waiting thread
+              // once this domain's state has been cleared.
+              domainsToClear.remove(baseDNToClear);
+            }
             resetNextChangeForInsertDBCursor();
-            // No need to use CAS here because it is only for unit tests and at
-            // this point all will have been cleaned up anyway.
-            doClear.set(false);
           }
           else
           {
@@ -623,7 +630,7 @@
     }
     finally
     {
-      removeAllCursors();
+      removeCursors(DN.NULL_DN);
     }
   }
 
@@ -676,19 +683,33 @@
     }
   }
 
-  private void removeAllCursors()
+  private void removeCursors(DN baseDN)
   {
     if (nextChangeForInsertDBCursor != null)
     {
       nextChangeForInsertDBCursor.close();
       nextChangeForInsertDBCursor = null;
     }
-    for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
+    if (DN.NULL_DN.equals(baseDN))
     {
-      StaticUtils.close(map.values());
+      // close all cursors
+      for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
+      {
+        StaticUtils.close(map.values());
+      }
+      allCursors.clear();
+      newCursors.clear();
     }
-    allCursors.clear();
-    newCursors.clear();
+    else
+    {
+      // close cursors for this DN
+      final Map<Integer, DBCursor<UpdateMsg>> map = allCursors.remove(baseDN);
+      if (map != null)
+      {
+        StaticUtils.close(map.values());
+      }
+      newCursors.remove(baseDN);
+    }
   }
 
   private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
@@ -756,22 +777,27 @@
   }
 
   /**
-   * Asks the current thread to clear its state and blocks until state is
-   * cleared.
+   * Asks the current thread to clear its state for the specified domain.
    * <p>
-   * This method is only useful for unit tests.
+   * Note: This method blocks the current thread until state is cleared.
+   *
+   * @param baseDN the baseDN to be cleared from this thread's state.
+   *               {@code null} and {@link DN#NULL_DN} mean "clear all domains".
    */
-  public void clear()
+  public void clear(DN baseDN)
   {
-    doClear.set(true);
-    while (doClear.get() && !State.TERMINATED.equals(getState()))
+    // Use DN.NULL_DN to say "clear all domains"
+    final DN baseDNToClear = baseDN != null ? baseDN : DN.NULL_DN;
+    domainsToClear.add(baseDNToClear);
+    while (domainsToClear.contains(baseDNToClear)
+        && !State.TERMINATED.equals(getState()))
     {
       // wait until clear() has been done by thread, always waking it up
       synchronized (this)
       {
         notify();
       }
-      // ensures unit tests wait that this thread's state is cleaned up
+      // ensures thread wait that this thread's state is cleaned up
       Thread.yield();
     }
   }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index e1dbcd3..fb9cad9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -462,12 +462,6 @@
     // - then throw the first encountered exception
     ChangelogException firstException = null;
 
-    final ChangeNumberIndexer indexer = cnIndexer.get();
-    if (indexer != null)
-    {
-      indexer.clear();
-    }
-
     for (DN baseDN : this.domainToReplicaDBs.keySet())
     {
       removeDomain(baseDN);
@@ -555,6 +549,11 @@
     Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
     if (domainMap != null)
     {
+      final ChangeNumberIndexer indexer = this.cnIndexer.get();
+      if (indexer != null)
+      {
+        indexer.clear(baseDN);
+      }
       synchronized (domainMap)
       {
         domainMap = domainToReplicaDBs.remove(baseDN);

--
Gitblit v1.10.0