mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
13.06.2014 01e198c6c4c3bbbfe57ebc2f9b749d74c5c781fc
OPENDJ-1496 (CR-3767) ThreadInterruptedException (JE) when running replication tests

Issue: When the generationId is reset in RS2, the ServerReader thread blocks waiting for the ChangeNumberIndexer thread, which is itself waiting for new changes.
The fix ensures that the ChangeNumberIndexer thread releases its replicaDB cursors when a replicaDB is being cleared.


ChangeNumberIndexer.java:
Replaced the AtomicBoolean doClear field with a ConcurrentSkipListSet<DN> domainsToClear.
Replaced removeAllCursors() with removeCursors(DN baseDN).
Added a DN parameter to clear().

JEChangelogDB.java, FileChangelogDB.java:
ChangeNumberIndexer.clear(baseDN) is now called from removeDomain() rather than from clearDB().
3 files modified
95 lines changed
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java (14 lines changed)
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java (70 lines changed)
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java (11 lines changed)
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -49,12 +49,13 @@
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.TimeThread;
import com.forgerock.opendj.util.Pair;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import static org.opends.messages.ReplicationMessages.*;
/**
 * Log file implementation of the ChangelogDB interface.
@@ -402,12 +403,6 @@
    // - then throw the first encountered exception
    ChangelogException firstException = null;
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      indexer.clear();
    }
    for (DN baseDN : this.domainToReplicaDBs.keySet())
    {
      removeDomain(baseDN);
@@ -497,6 +492,11 @@
    Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
    if (domainMap != null)
    {
      final ChangeNumberIndexer indexer = this.cnIndexer.get();
      if (indexer != null)
      {
        indexer.clear(baseDN);
      }
      synchronized (domainMap)
      {
        domainMap = domainToReplicaDBs.remove(baseDN);
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -29,7 +29,7 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentSkipListSet;
import org.opends.messages.Message;
import org.opends.server.api.DirectoryThread;
@@ -63,10 +63,13 @@
  private static final DebugTracer TRACER = getTracer();
  /**
   * If this is true, then the {@link #run()} method must clear its state.
   * Otherwise the run method executes normally.
   * If it contains nothing, then the run method executes normally.
   * Otherwise, the {@link #run()} method must clear its state
   * for the supplied domain baseDNs. If a supplied domain is
   * {@link DN#NULL_DN}, then all domains will be cleared.
   */
  private final AtomicBoolean doClear = new AtomicBoolean();
  private final ConcurrentSkipListSet<DN> domainsToClear =
      new ConcurrentSkipListSet<DN>();
  private final ChangelogDB changelogDB;
  /** Only used for initialization, and then discarded. */
  private ChangelogState changelogState;
@@ -525,13 +528,17 @@
      {
        try
        {
          if (doClear.get())
          if (!domainsToClear.isEmpty())
          {
            removeAllCursors();
            while (!domainsToClear.isEmpty())
            {
              final DN baseDNToClear = domainsToClear.first();
              removeCursors(baseDNToClear);
              // Only release the waiting thread
              // once this domain's state has been cleared.
              domainsToClear.remove(baseDNToClear);
            }
            resetNextChangeForInsertDBCursor();
            // No need to use CAS here because it is only for unit tests and at
            // this point all will have been cleaned up anyway.
            doClear.set(false);
          }
          else
          {
@@ -623,7 +630,7 @@
    }
    finally
    {
      removeAllCursors();
      removeCursors(DN.NULL_DN);
    }
  }
@@ -676,19 +683,33 @@
    }
  }
  private void removeAllCursors()
  private void removeCursors(DN baseDN)
  {
    if (nextChangeForInsertDBCursor != null)
    {
      nextChangeForInsertDBCursor.close();
      nextChangeForInsertDBCursor = null;
    }
    for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
    if (DN.NULL_DN.equals(baseDN))
    {
      StaticUtils.close(map.values());
      // close all cursors
      for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
      {
        StaticUtils.close(map.values());
      }
      allCursors.clear();
      newCursors.clear();
    }
    allCursors.clear();
    newCursors.clear();
    else
    {
      // close cursors for this DN
      final Map<Integer, DBCursor<UpdateMsg>> map = allCursors.remove(baseDN);
      if (map != null)
      {
        StaticUtils.close(map.values());
      }
      newCursors.remove(baseDN);
    }
  }
  private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
@@ -756,22 +777,27 @@
  }
  /**
   * Asks the current thread to clear its state and blocks until state is
   * cleared.
   * Asks the current thread to clear its state for the specified domain.
   * <p>
   * This method is only useful for unit tests.
   * Note: This method blocks the current thread until state is cleared.
   *
   * @param baseDN the baseDN to be cleared from this thread's state.
   *               {@code null} and {@link DN#NULL_DN} mean "clear all domains".
   */
  public void clear()
  public void clear(DN baseDN)
  {
    doClear.set(true);
    while (doClear.get() && !State.TERMINATED.equals(getState()))
    // Use DN.NULL_DN to say "clear all domains"
    final DN baseDNToClear = baseDN != null ? baseDN : DN.NULL_DN;
    domainsToClear.add(baseDNToClear);
    while (domainsToClear.contains(baseDNToClear)
        && !State.TERMINATED.equals(getState()))
    {
      // wait until clear() has been done by thread, always waking it up
      synchronized (this)
      {
        notify();
      }
      // ensures unit tests wait that this thread's state is cleaned up
      // ensures thread wait that this thread's state is cleaned up
      Thread.yield();
    }
  }
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -462,12 +462,6 @@
    // - then throw the first encountered exception
    ChangelogException firstException = null;
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      indexer.clear();
    }
    for (DN baseDN : this.domainToReplicaDBs.keySet())
    {
      removeDomain(baseDN);
@@ -555,6 +549,11 @@
    Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
    if (domainMap != null)
    {
      final ChangeNumberIndexer indexer = this.cnIndexer.get();
      if (indexer != null)
      {
        indexer.clear(baseDN);
      }
      synchronized (domainMap)
      {
        domainMap = domainToReplicaDBs.remove(baseDN);