mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
13.06.2014 f9bc2351a98b71be3239da5dc001a903077ebf24
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -49,12 +49,13 @@
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.TimeThread;
import com.forgerock.opendj.util.Pair;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import static org.opends.messages.ReplicationMessages.*;
/**
 * Log file implementation of the ChangelogDB interface.
@@ -402,12 +403,6 @@
    // - then throw the first encountered exception
    ChangelogException firstException = null;
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      indexer.clear();
    }
    for (DN baseDN : this.domainToReplicaDBs.keySet())
    {
      removeDomain(baseDN);
@@ -497,6 +492,11 @@
    Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
    if (domainMap != null)
    {
      final ChangeNumberIndexer indexer = this.cnIndexer.get();
      if (indexer != null)
      {
        indexer.clear(baseDN);
      }
      synchronized (domainMap)
      {
        domainMap = domainToReplicaDBs.remove(baseDN);
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -29,7 +29,7 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentSkipListSet;
import org.opends.messages.Message;
import org.opends.server.api.DirectoryThread;
@@ -63,10 +63,13 @@
  private static final DebugTracer TRACER = getTracer();
  /**
   * If this is true, then the {@link #run()} method must clear its state.
   * Otherwise the run method executes normally.
   * If it contains nothing, then the run method executes normally.
   * Otherwise, the {@link #run()} method must clear its state
   * for the supplied domain baseDNs. If a supplied domain is
   * {@link DN#NULL_DN}, then all domains will be cleared.
   */
  private final AtomicBoolean doClear = new AtomicBoolean();
  private final ConcurrentSkipListSet<DN> domainsToClear =
      new ConcurrentSkipListSet<DN>();
  private final ChangelogDB changelogDB;
  /** Only used for initialization, and then discarded. */
  private ChangelogState changelogState;
@@ -525,13 +528,17 @@
      {
        try
        {
          if (doClear.get())
          if (!domainsToClear.isEmpty())
          {
            removeAllCursors();
            while (!domainsToClear.isEmpty())
            {
              final DN baseDNToClear = domainsToClear.first();
              removeCursors(baseDNToClear);
              // Only release the waiting thread
              // once this domain's state has been cleared.
              domainsToClear.remove(baseDNToClear);
            }
            resetNextChangeForInsertDBCursor();
            // No need to use CAS here because it is only for unit tests and at
            // this point all will have been cleaned up anyway.
            doClear.set(false);
          }
          else
          {
@@ -623,7 +630,7 @@
    }
    finally
    {
      removeAllCursors();
      removeCursors(DN.NULL_DN);
    }
  }
@@ -676,19 +683,33 @@
    }
  }
  private void removeAllCursors()
  private void removeCursors(DN baseDN)
  {
    if (nextChangeForInsertDBCursor != null)
    {
      nextChangeForInsertDBCursor.close();
      nextChangeForInsertDBCursor = null;
    }
    for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
    if (DN.NULL_DN.equals(baseDN))
    {
      StaticUtils.close(map.values());
      // close all cursors
      for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
      {
        StaticUtils.close(map.values());
      }
      allCursors.clear();
      newCursors.clear();
    }
    allCursors.clear();
    newCursors.clear();
    else
    {
      // close cursors for this DN
      final Map<Integer, DBCursor<UpdateMsg>> map = allCursors.remove(baseDN);
      if (map != null)
      {
        StaticUtils.close(map.values());
      }
      newCursors.remove(baseDN);
    }
  }
  private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
@@ -756,22 +777,27 @@
  }
  /**
   * Asks the current thread to clear its state and blocks until state is
   * cleared.
   * Asks the current thread to clear its state for the specified domain.
   * <p>
   * This method is only useful for unit tests.
   * Note: This method blocks the current thread until state is cleared.
   *
   * @param baseDN the baseDN to be cleared from this thread's state.
   *               {@code null} and {@link DN#NULL_DN} mean "clear all domains".
   */
  public void clear()
  public void clear(DN baseDN)
  {
    doClear.set(true);
    while (doClear.get() && !State.TERMINATED.equals(getState()))
    // Use DN.NULL_DN to say "clear all domains"
    final DN baseDNToClear = baseDN != null ? baseDN : DN.NULL_DN;
    domainsToClear.add(baseDNToClear);
    while (domainsToClear.contains(baseDNToClear)
        && !State.TERMINATED.equals(getState()))
    {
      // wait until clear() has been done by thread, always waking it up
      synchronized (this)
      {
        notify();
      }
      // ensures unit tests wait that this thread's state is cleaned up
      // ensures thread wait that this thread's state is cleaned up
      Thread.yield();
    }
  }
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -462,12 +462,6 @@
    // - then throw the first encountered exception
    ChangelogException firstException = null;
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      indexer.clear();
    }
    for (DN baseDN : this.domainToReplicaDBs.keySet())
    {
      removeDomain(baseDN);
@@ -555,6 +549,11 @@
    Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
    if (domainMap != null)
    {
      final ChangeNumberIndexer indexer = this.cnIndexer.get();
      if (indexer != null)
      {
        indexer.clear(baseDN);
      }
      synchronized (domainMap)
      {
        domainMap = domainToReplicaDBs.remove(baseDN);