| | |
| | | import java.util.Map.Entry; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.ConcurrentSkipListSet; |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.server.api.DirectoryThread; |
| | |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /**
| | | * State read by the {@link #run()} method to decide whether it must clear
| | | * its state, described here for both fields declared below.
| | | * <p>
| | | * {@code doClear}: if this is true, then the {@link #run()} method must
| | | * clear its state. Otherwise the run method executes normally.
| | | * <p>
| | | * {@code domainsToClear}: if it contains nothing, then the run method
| | | * executes normally. Otherwise, the {@link #run()} method must clear its
| | | * state for the supplied domain baseDNs. If a supplied domain is
| | | * {@link DN#NULL_DN}, then all domains will be cleared.
| | | */
| | | private final AtomicBoolean doClear = new AtomicBoolean(); |
| | | private final ConcurrentSkipListSet<DN> domainsToClear = |
| | | new ConcurrentSkipListSet<DN>(); |
| | | private final ChangelogDB changelogDB; |
| | | /** Only used for initialization, and then discarded. */ |
| | | private ChangelogState changelogState; |
| | |
| | | { |
| | | try |
| | | { |
| | | if (doClear.get()) |
| | | if (!domainsToClear.isEmpty()) |
| | | { |
| | | removeAllCursors(); |
| | | while (!domainsToClear.isEmpty()) |
| | | { |
| | | final DN baseDNToClear = domainsToClear.first(); |
| | | removeCursors(baseDNToClear); |
| | | // Only release the waiting thread |
| | | // once this domain's state has been cleared. |
| | | domainsToClear.remove(baseDNToClear); |
| | | } |
| | | resetNextChangeForInsertDBCursor(); |
| | | // No need to use CAS here because it is only for unit tests and at |
| | | // this point all will have been cleaned up anyway. |
| | | doClear.set(false); |
| | | } |
| | | else |
| | | { |
| | |
| | | } |
| | | finally |
| | | { |
| | | removeAllCursors(); |
| | | removeCursors(DN.NULL_DN); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | private void removeAllCursors() |
| | | private void removeCursors(DN baseDN) |
| | | { |
| | | if (nextChangeForInsertDBCursor != null) |
| | | { |
| | | nextChangeForInsertDBCursor.close(); |
| | | nextChangeForInsertDBCursor = null; |
| | | } |
| | | for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values()) |
| | | if (DN.NULL_DN.equals(baseDN)) |
| | | { |
| | | StaticUtils.close(map.values()); |
| | | // close all cursors |
| | | for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values()) |
| | | { |
| | | StaticUtils.close(map.values()); |
| | | } |
| | | allCursors.clear(); |
| | | newCursors.clear(); |
| | | } |
| | | allCursors.clear(); |
| | | newCursors.clear(); |
| | | else |
| | | { |
| | | // close cursors for this DN |
| | | final Map<Integer, DBCursor<UpdateMsg>> map = allCursors.remove(baseDN); |
| | | if (map != null) |
| | | { |
| | | StaticUtils.close(map.values()); |
| | | } |
| | | newCursors.remove(baseDN); |
| | | } |
| | | } |
| | | |
| | | private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>> |
| | |
| | | } |
| | | |
| | | /** |
| | | * Asks the current thread to clear its state and blocks until state is |
| | | * cleared. |
| | | * Asks the current thread to clear its state for the specified domain. |
| | | * <p> |
| | | * This method is only useful for unit tests. |
| | | * Note: This method blocks the current thread until state is cleared. |
| | | * |
| | | * @param baseDN the baseDN to be cleared from this thread's state. |
| | | * {@code null} and {@link DN#NULL_DN} mean "clear all domains". |
| | | */ |
| | | public void clear() |
| | | public void clear(DN baseDN) |
| | | { |
| | | doClear.set(true); |
| | | while (doClear.get() && !State.TERMINATED.equals(getState())) |
| | | // Use DN.NULL_DN to say "clear all domains" |
| | | final DN baseDNToClear = baseDN != null ? baseDN : DN.NULL_DN; |
| | | domainsToClear.add(baseDNToClear); |
| | | while (domainsToClear.contains(baseDNToClear) |
| | | && !State.TERMINATED.equals(getState())) |
| | | { |
| | | // wait until clear() has been done by thread, always waking it up |
| | | synchronized (this) |
| | | { |
| | | notify(); |
| | | } |
| | | // ensures unit tests wait that this thread's state is cleaned up |
| | | // ensures thread wait that this thread's state is cleaned up |
| | | Thread.yield(); |
| | | } |
| | | } |